Files
RoleForge/app/tasks/runner.py
Sergey Antropoff 01d598eea5 - Админка: настройка pull-реестра (Hub / Harbor / Nexus) в БД, шифрование секретов;
обновлён /admin/config и API для os_registry.
- Molecule/раннер: env из конфигурации, ensure roleforge-os (ensure_roleforge_os.yml),
  os_registry_pull и доработки executors / runner / create.yml.
- /admin/os-images: выбор реестра, buildx (в т.ч. split amd64+arm64 + imagetools),
  опция --no-cache, стрим логов; domain.py: план команд build, ретраи push.
- UI: брендинг (app_name, app_tagline) из app_config через get_ui_branding_context;
  base.xhtml, role-create / role-view, core.js, pages-main, стили.
- Dockerfiles: требование Python ≥3.9 (assert), доработки alt9/astra/debian9/ubuntu20
  и др.; новые Dockerfile.arm64 для centos7/centos8.
- Конфиг: .env.example, config.py, pyproject.toml.
2026-05-06 07:52:29 +03:00

153 lines
5.3 KiB
Python

import asyncio
import json
import asyncpg
from redis.asyncio import Redis
from app.celery_app import celery
from app.core.config import get_settings
from app.services.executors import (
run_ansible_playbook,
run_molecule_test_for_playbook,
run_molecule_test_for_role,
)
from app.services.os_registry_pull import fetch_os_registry_molecule_env
async def _append_log(conn: asyncpg.Connection, redis: Redis, job_id: str, line: str) -> None:
await conn.execute(
"insert into job_logs (job_id, log_line) values ($1::uuid, $2)",
job_id,
line,
)
await redis.publish(f"job:{job_id}:logs", line)
async def _run_job(job_id: str) -> None:
settings = get_settings()
conn = await asyncpg.connect(settings.database_dsn)
redis = Redis.from_url(settings.redis_url, decode_responses=True)
try:
await conn.execute("update jobs set status='running', started_at=now() where id=$1::uuid", job_id)
row = await conn.fetchrow(
"""
select p.playbook_yaml, i.inventory_text, j.extra_vars
from jobs j
join playbooks p on p.id = j.playbook_id
join inventories i on i.id = p.inventory_id
where j.id=$1::uuid
""",
job_id,
)
if not row:
await conn.execute("update jobs set status='failed', finished_at=now() where id=$1::uuid", job_id)
return
return_code = 1
for line in run_ansible_playbook(
playbook_yaml=row["playbook_yaml"],
inventory_text=row["inventory_text"],
extra_vars=row["extra_vars"] or {},
):
if line.startswith("__RETURN_CODE__:"):
return_code = int(line.split(":", maxsplit=1)[1])
continue
await _append_log(conn, redis, job_id, line)
final_status = "success" if return_code == 0 else "failed"
await conn.execute(
"update jobs set status=$1, finished_at=now() where id=$2::uuid",
final_status,
job_id,
)
await redis.publish(f"job:{job_id}:logs", f"[job] finished with status={final_status}")
finally:
await redis.close()
await conn.close()
@celery.task(name="run_playbook_job")
def run_playbook_job(job_id: str) -> None:
asyncio.run(_run_job(job_id))
async def _append_test_log(conn: asyncpg.Connection, redis: Redis, test_id: str, line: str) -> None:
await conn.execute(
"insert into test_logs (test_run_id, log_line) values ($1::uuid, $2)",
test_id,
line,
)
await redis.publish(f"test:{test_id}:logs", line)
async def _run_molecule_test(test_id: str) -> None:
settings = get_settings()
conn = await asyncpg.connect(settings.database_dsn)
redis = Redis.from_url(settings.redis_url, decode_responses=True)
try:
await conn.execute(
"update test_runs set status='running', started_at=now() where id=$1::uuid",
test_id,
)
row = await conn.fetchrow(
"""
select tr.playbook_id::text as playbook_id,
tr.role_id::text as role_id,
tr.hosts_blueprint,
tr.extra_vars,
p.playbook_yaml,
r.name as role_name,
r.content as role_content
from test_runs tr
left join playbooks p on p.id = tr.playbook_id
left join ansible_roles r on r.id = tr.role_id
where tr.id=$1::uuid
""",
test_id,
)
if not row:
await conn.execute("update test_runs set status='failed', finished_at=now() where id=$1::uuid", test_id)
return
return_code = 1
molecule_env = await fetch_os_registry_molecule_env(conn)
if row["playbook_id"]:
iterator = run_molecule_test_for_playbook(
playbook_yaml=row["playbook_yaml"],
hosts=row["hosts_blueprint"] or [],
extra_vars=row["extra_vars"] or {},
molecule_extra_env=molecule_env,
)
else:
role_content = row["role_content"] or {}
role_tasks_yaml = role_content.get("tasks_yaml", "- name: ping\n ansible.builtin.ping:\n")
iterator = run_molecule_test_for_role(
role_name=row["role_name"] or "inline_role",
role_tasks_yaml=role_tasks_yaml,
hosts=row["hosts_blueprint"] or [],
extra_vars=row["extra_vars"] or {},
molecule_extra_env=molecule_env,
)
for line in iterator:
if line.startswith("__RETURN_CODE__:"):
return_code = int(line.split(":", maxsplit=1)[1])
continue
await _append_test_log(conn, redis, test_id, line)
final_status = "success" if return_code == 0 else "failed"
await conn.execute(
"update test_runs set status=$1, finished_at=now() where id=$2::uuid",
final_status,
test_id,
)
await redis.publish(f"test:{test_id}:logs", f"[test] finished with status={final_status}")
finally:
await redis.close()
await conn.close()
@celery.task(name="run_molecule_test")
def run_molecule_test(test_id: str) -> None:
asyncio.run(_run_molecule_test(test_id))