import asyncio import json import asyncpg from redis.asyncio import Redis from app.celery_app import celery from app.core.config import get_settings from app.services.executors import ( run_ansible_playbook, run_molecule_test_for_playbook, run_molecule_test_for_role, ) from app.services.os_registry_pull import fetch_os_registry_molecule_env async def _append_log(conn: asyncpg.Connection, redis: Redis, job_id: str, line: str) -> None: await conn.execute( "insert into job_logs (job_id, log_line) values ($1::uuid, $2)", job_id, line, ) await redis.publish(f"job:{job_id}:logs", line) async def _run_job(job_id: str) -> None: settings = get_settings() conn = await asyncpg.connect(settings.database_dsn) redis = Redis.from_url(settings.redis_url, decode_responses=True) try: await conn.execute("update jobs set status='running', started_at=now() where id=$1::uuid", job_id) row = await conn.fetchrow( """ select p.playbook_yaml, i.inventory_text, j.extra_vars from jobs j join playbooks p on p.id = j.playbook_id join inventories i on i.id = p.inventory_id where j.id=$1::uuid """, job_id, ) if not row: await conn.execute("update jobs set status='failed', finished_at=now() where id=$1::uuid", job_id) return return_code = 1 for line in run_ansible_playbook( playbook_yaml=row["playbook_yaml"], inventory_text=row["inventory_text"], extra_vars=row["extra_vars"] or {}, ): if line.startswith("__RETURN_CODE__:"): return_code = int(line.split(":", maxsplit=1)[1]) continue await _append_log(conn, redis, job_id, line) final_status = "success" if return_code == 0 else "failed" await conn.execute( "update jobs set status=$1, finished_at=now() where id=$2::uuid", final_status, job_id, ) await redis.publish(f"job:{job_id}:logs", f"[job] finished with status={final_status}") finally: await redis.close() await conn.close() @celery.task(name="run_playbook_job") def run_playbook_job(job_id: str) -> None: asyncio.run(_run_job(job_id)) async def _append_test_log(conn: asyncpg.Connection, redis: Redis, test_id: str, line: str) -> None: await conn.execute( "insert into test_logs (test_run_id, log_line) values ($1::uuid, $2)", test_id, line, ) await redis.publish(f"test:{test_id}:logs", line) async def _run_molecule_test(test_id: str) -> None: settings = get_settings() conn = await asyncpg.connect(settings.database_dsn) redis = Redis.from_url(settings.redis_url, decode_responses=True) try: await conn.execute( "update test_runs set status='running', started_at=now() where id=$1::uuid", test_id, ) row = await conn.fetchrow( """ select tr.playbook_id::text as playbook_id, tr.role_id::text as role_id, tr.hosts_blueprint, tr.extra_vars, p.playbook_yaml, r.name as role_name, r.content as role_content from test_runs tr left join playbooks p on p.id = tr.playbook_id left join ansible_roles r on r.id = tr.role_id where tr.id=$1::uuid """, test_id, ) if not row: await conn.execute("update test_runs set status='failed', finished_at=now() where id=$1::uuid", test_id) return return_code = 1 molecule_env = await fetch_os_registry_molecule_env(conn) if row["playbook_id"]: iterator = run_molecule_test_for_playbook( playbook_yaml=row["playbook_yaml"], hosts=row["hosts_blueprint"] or [], extra_vars=row["extra_vars"] or {}, molecule_extra_env=molecule_env, ) else: role_content = row["role_content"] or {} role_tasks_yaml = role_content.get("tasks_yaml", "- name: ping\n ansible.builtin.ping:\n") iterator = run_molecule_test_for_role( role_name=row["role_name"] or "inline_role", role_tasks_yaml=role_tasks_yaml, hosts=row["hosts_blueprint"] or [], extra_vars=row["extra_vars"] or {}, molecule_extra_env=molecule_env, ) for line in iterator: if line.startswith("__RETURN_CODE__:"): return_code = int(line.split(":", maxsplit=1)[1]) continue await _append_test_log(conn, redis, test_id, line) final_status = "success" if return_code == 0 else "failed" await conn.execute( "update test_runs set status=$1, finished_at=now() where id=$2::uuid", final_status, test_id, ) await redis.publish(f"test:{test_id}:logs", f"[test] finished with status={final_status}") finally: await redis.close() await conn.close() @celery.task(name="run_molecule_test") def run_molecule_test(test_id: str) -> None: asyncio.run(_run_molecule_test(test_id))