import json
import os
import platform as py_platform
import shlex
import shutil
import subprocess
import tempfile
from collections.abc import Generator
from pathlib import Path

from app.core.config import get_settings

# Collections required by Molecule docker driver are baked into the runner image (Dockerfile); skip Galaxy during runs.
_MOLECULE_DEPENDENCY = {"name": "galaxy", "enabled": False}
# Docker targets often lack a usable ~/.ansible; use RAM-backed tmp in target containers.
_MOLECULE_REMOTE_TMP = "/dev/shm/.ansible-remote-tmp"
_MOLECULE_DOCKER_PLAYBOOK_SRC = Path(__file__).resolve().parent / "molecule_docker_playbook"
# Prefix of the sentinel line that carries a child's exit status inside the streamed output.
_RETURN_CODE_PREFIX = "__RETURN_CODE__:"
# Upper bound (seconds) for short docker CLI calls so a wedged daemon cannot hang a run forever.
_DOCKER_CLI_TIMEOUT_SECONDS = 60
# Molecule docker plugin noise that is dropped from streamed molecule output.
_MOLECULE_NOISE_LINE = "Driver docker does not provide a schema."


def _molecule_runtime_path() -> str:
    """Ensure `docker` from /usr/local/bin is visible to molecule and ansible-playbook children.

    Returns the fixed system prefix followed by the current ``PATH`` (when it is non-empty).
    """
    prefix = "/usr/local/bin:/usr/bin:/usr/sbin:/sbin:/bin"
    tail = (os.environ.get("PATH") or "").strip()
    return f"{prefix}:{tail}" if tail else prefix


def _molecule_provisioner_env() -> dict[str, str]:
    """Return the environment mapping written into the ``provisioner.env`` section of molecule.yml."""
    return {"PATH": _molecule_runtime_path(), "ANSIBLE_REMOTE_TMP": _MOLECULE_REMOTE_TMP}


def _molecule_process_env(molecule_extra_env: dict[str, str] | None = None) -> dict[str, str]:
    """Environment for molecule / ansible-playbook child processes (OS image pull target + optional registry auth).

    Starts from a copy of ``os.environ``, forces PATH / remote tmp / colour, adds the
    configured Docker Hub repository when set, and finally overlays ``molecule_extra_env``.
    Extra entries whose value is ``None`` or blank after stripping are ignored.
    """
    env = os.environ.copy()
    env["PATH"] = _molecule_runtime_path()
    env["ANSIBLE_REMOTE_TMP"] = _MOLECULE_REMOTE_TMP
    env["ANSIBLE_FORCE_COLOR"] = "1"
    hub = (get_settings().roleforge_os_docker_hub_repository or "").strip()
    if hub:
        env["ROLEFORGE_OS_DOCKER_HUB_REPOSITORY"] = hub
        # Only a default: an already exported pull repository wins.
        env.setdefault("ROLEFORGE_OS_PULL_REPOSITORY", hub)
    for key, value in (molecule_extra_env or {}).items():
        if value is None:
            continue
        text = str(value).strip()
        if text:
            env[str(key)] = text
    return env


def _molecule_group_vars(extra_vars: dict | None) -> dict:
    """Inject stable transport vars for community.docker while preserving user extra vars.

    User-supplied values take precedence; the input mapping is not mutated.
    """
    base = dict(extra_vars or {})
    # docker connection may default to a non-writable user inside the target image
    base.setdefault("ansible_user", "root")
    base.setdefault("ansible_remote_tmp", _MOLECULE_REMOTE_TMP)
    return base


def _platform_command(host: dict) -> str | list[str]:
    """Return the container command for a host: an argv list for multi-word commands, else a string.

    A missing or blank ``command`` falls back to ``tail -f /dev/null``. A command that
    cannot be tokenised (e.g. unbalanced quotes) is passed through unchanged.
    """
    raw = str(host.get("command", "tail -f /dev/null") or "").strip()
    if not raw:
        raw = "tail -f /dev/null"
    try:
        parts = shlex.split(raw)
    except ValueError:
        # shlex signals unbalanced quoting with ValueError; keep the raw string.
        return raw
    return parts if len(parts) > 1 else (parts[0] if parts else raw)


def _run_docker(args: list[str], timeout: float) -> "subprocess.CompletedProcess[str] | None":
    """Run ``docker <args>`` with captured text output.

    Returns the completed process, or ``None`` when the docker binary cannot be
    executed or the call exceeds ``timeout`` seconds, so callers can degrade
    instead of crashing.
    """
    try:
        return subprocess.run(
            ["docker", *args],
            capture_output=True,
            text=True,
            timeout=timeout,
            env={**os.environ, "PATH": _molecule_runtime_path()},
        )
    except (OSError, subprocess.TimeoutExpired):
        return None


def _docker_engine_architecture() -> str | None:
    """`docker info` daemon Architecture — matches the engine (e.g. Apple Silicon), not the API container CPU.

    Returns the lower-cased architecture string, or ``None`` when docker is
    unavailable, times out, fails, or prints nothing.
    """
    proc = _run_docker(["info", "-f", "{{.Architecture}}"], timeout=30)
    if proc is None or proc.returncode != 0:
        return None
    return proc.stdout.strip().lower() or None


def _effective_docker_host_arm_amd() -> tuple[bool, bool]:
    """ARM vs AMD64 for platform selection: prefer Docker daemon arch, fallback to this process.

    Returns ``(is_arm, is_amd)``; both are False for an unrecognised architecture.
    """
    raw = _docker_engine_architecture() or py_platform.machine().lower()
    arm = raw in ("aarch64", "arm64", "armv8l", "armv7l")
    amd = raw in ("x86_64", "amd64", "i386", "i686")
    return arm, amd


def _docker_local_image_architecture(image: str) -> str | None:
    """Return the lower-cased architecture of a locally present image, or ``None`` if it cannot be read."""
    img = str(image or "").strip()
    if not img:
        return None
    proc = _run_docker(["image", "inspect", "-f", "{{.Architecture}}", img], timeout=45)
    if proc is None or proc.returncode != 0:
        return None
    return proc.stdout.strip().lower() or None


def _manifest_includes_linux_arm64(image: str) -> bool:
    """True if docker manifest list/index lists a linux/arm64 variant (avoid 404 on amd64-only tags).

    Any failure (docker unavailable, non-zero exit, invalid JSON, unexpected JSON
    shape) is reported as False.
    """
    img = str(image or "").strip()
    if not img:
        return False
    proc = _run_docker(["manifest", "inspect", img], timeout=90)
    if proc is None or proc.returncode != 0:
        return False
    try:
        data = json.loads(proc.stdout)
    except json.JSONDecodeError:
        return False
    manifests = data.get("manifests") if isinstance(data, dict) else None
    if not isinstance(manifests, list):
        return False
    for entry in manifests:
        if not isinstance(entry, dict):
            continue
        pl = entry.get("platform") or {}
        if not isinstance(pl, dict):
            continue
        # A missing "os" is treated as linux.
        if str(pl.get("os", "linux")).lower() != "linux":
            continue
        arch = str(pl.get("architecture") or "").lower()
        if arch in ("arm64", "aarch64"):
            return True
        variant = str(pl.get("variant") or "").lower()
        if arch == "arm" and variant == "v8":
            return True
    return False


def _molecule_docker_platform_for_image(host: dict, image: str) -> str | None:
    """OCI platform for docker_container: honor explicit host.platform; else match daemon + image capabilities.

    Returns ``None`` when no ``platform`` key should be written for the host.
    """
    explicit = str(host.get("platform") or "").strip()
    if explicit:
        return explicit
    host_arm, host_amd = _effective_docker_host_arm_amd()
    arch = _docker_local_image_architecture(image)
    img_arm = arch in ("arm64", "aarch64")
    img_amd = arch in ("amd64", "x86_64")
    if host_arm:
        if img_arm:
            return None
        # Prefer arm64 only when this tag actually publishes it; else omit platform (amd64 + emulation).
        if (img_amd or arch is None) and _manifest_includes_linux_arm64(str(image)):
            return "linux/arm64"
        return None
    if host_amd and img_arm:
        return "linux/arm64"
    return None


def _platform_systemd_overrides(host: dict) -> dict:
    """Return extra platform keys for hosts flagged ``systemd``; an empty dict otherwise."""
    if not bool(host.get("systemd")):
        return {}
    # Systemd-in-Docker needs cgroup rw + writable /run. molecule-plugins may merge Podman-style dict tmpfs;
    # vendored provisioner create.yml converts that mapping to docker_container's list-of-strings format.
    return {
        "tmpfs": [
            "/run:rw,noexec,nosuid,size=65536k",
            "/run/lock:rw,noexec,nosuid,size=65536k",
        ],
        "volumes": ["/sys/fs/cgroup:/sys/fs/cgroup:rw"],
        "cgroupns_mode": "host",
    }


def _stream_process(
    command: list[str],
    env: dict[str, str],
    cwd: str | None = None,
    skip_containing: tuple[str, ...] = (),
) -> Generator[str, None, int]:
    """Run ``command`` and yield its merged stdout/stderr line by line.

    Lines containing any of ``skip_containing`` are dropped. The generator's
    return value is the child's exit code. If the consumer stops iterating
    early, the child is terminated and its pipe closed instead of being leaked.
    """
    process = subprocess.Popen(
        command,
        cwd=cwd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1,
        env=env,
    )
    try:
        assert process.stdout is not None  # guaranteed by stdout=PIPE; narrows the type
        for line in process.stdout:
            text = line.rstrip("\n")
            if any(marker in text for marker in skip_containing):
                continue
            yield text
        return process.wait()
    finally:
        # Reached on normal completion, on errors, and on generator close().
        if process.poll() is None:
            process.terminate()
            try:
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()
        if process.stdout is not None:
            process.stdout.close()


def run_ansible_playbook(
    playbook_yaml: str,
    inventory_text: str,
    extra_vars: dict,
) -> Generator[str, None, int]:
    """Run ``ansible-playbook`` on the given playbook/inventory text and stream its output.

    The playbook and inventory are written to a temporary directory that is
    removed when the generator finishes. Yields each output line, then a final
    ``__RETURN_CODE__:<code>`` sentinel; the generator's return value is the
    same exit code.
    """
    with tempfile.TemporaryDirectory(prefix="roleforge-") as temp_dir:
        work_dir = Path(temp_dir)
        playbook_path = work_dir / "playbook.yml"
        inventory_path = work_dir / "inventory.ini"
        playbook_path.write_text(playbook_yaml, encoding="utf-8")
        inventory_path.write_text(inventory_text, encoding="utf-8")
        command = [
            "ansible-playbook",
            str(playbook_path),
            "-i",
            str(inventory_path),
            "--extra-vars",
            json.dumps(extra_vars),
        ]
        env = os.environ.copy()
        env["ANSIBLE_FORCE_COLOR"] = "1"
        return_code = yield from _stream_process(command, env=env)
        yield f"{_RETURN_CODE_PREFIX}{return_code}"
        return return_code


def _molecule_ansible_cfg_text() -> str:
    """Return the ansible.cfg contents used for Molecule runs."""
    return (
        "[defaults]\n"
        "stdout_callback = ansible.builtin.default\n"
        "remote_user = root\n"
        f"remote_tmp = {_MOLECULE_REMOTE_TMP}\n"
        "pipelining = True\n"
        "\n"
        "[callback_default]\n"
        "result_format = yaml\n"
    )


def _write_molecule_ansible_cfgs(project_root: Path, scenario_dir: Path) -> None:
    """Write ansible.cfg at project root and under the scenario (Molecule cwd varies)."""
    text = _molecule_ansible_cfg_text()
    (project_root / "ansible.cfg").write_text(text, encoding="utf-8")
    (scenario_dir / "ansible.cfg").write_text(text, encoding="utf-8")


def _install_molecule_docker_playbook(scenario_dir: Path) -> None:
    """Copy the vendored molecule-plugins docker create playbook tree (+tasks/) into the scenario.

    Files are copied verbatim, preserving the relative layout; any tmpfs patching
    lives in the vendored sources themselves.
    """
    src_root = _MOLECULE_DOCKER_PLAYBOOK_SRC
    for path in src_root.rglob("*"):
        if not path.is_file():
            continue
        dest = scenario_dir / path.relative_to(src_root)
        dest.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(path, dest)


def _run_streaming(command: list[str], cwd: str, env: dict[str, str]) -> Generator[str, None, int]:
    """Stream a molecule command's output, then a ``__RETURN_CODE__:<code>`` sentinel line.

    The generator's return value is the exit code as well, so callers using
    ``yield from`` do not have to parse the sentinel.
    """
    # Molecule docker plugin noise: keep logs focused on actionable output.
    code = yield from _stream_process(
        command,
        env=env,
        cwd=cwd,
        skip_containing=(_MOLECULE_NOISE_LINE,),
    )
    yield f"{_RETURN_CODE_PREFIX}{code}"
    return code


def _run_molecule_step(command: list[str], cwd: str, env: dict[str, str]) -> Generator[str, None, int]:
    """Stream a molecule step and return its exit code so the caller can stop early on failure.

    The code comes from the child process itself rather than from parsing the
    streamed text, so output that happens to look like the sentinel cannot
    change the result.
    """
    rc = yield from _run_streaming(command=command, cwd=cwd, env=env)
    return rc


def _dump_container_debug(name: str, tail: int = 200) -> Generator[str, None, None]:
    """Emit inspect/status and recent logs for a docker container into test output.

    ``tail`` limits how many recent log lines are requested from ``docker logs``.
    """
    inspect_proc = _run_docker(
        ["inspect", "-f", "{{.State.Status}}|{{.State.ExitCode}}|{{.State.Error}}", name],
        timeout=_DOCKER_CLI_TIMEOUT_SECONDS,
    )
    if inspect_proc is None or inspect_proc.returncode != 0:
        yield f"[roleforge] container inspect failed ({name})"
        if inspect_proc is not None:
            yield from inspect_proc.stderr.splitlines()
        return
    yield f"[roleforge] container state ({name}): {inspect_proc.stdout.strip()}"

    logs_proc = _run_docker(["logs", f"--tail={tail}", name], timeout=_DOCKER_CLI_TIMEOUT_SECONDS)
    if logs_proc is None or logs_proc.returncode != 0:
        yield f"[roleforge] container logs unavailable ({name})"
        if logs_proc is not None:
            yield from logs_proc.stderr.splitlines()
        return
    yield f"[roleforge] container logs ({name}) begin"
    yield from (logs_proc.stdout or "").splitlines()
    # `docker logs` relays the container's stderr stream on its own stderr; include it
    # so error output from the container is not lost.
    yield from (logs_proc.stderr or "").splitlines()
    yield f"[roleforge] container logs ({name}) end"


def _preflight_container(name: str) -> Generator[str, None, bool]:
    """Make sure one container is running and has a world-writable tmp dir; return success."""
    state = _run_docker(
        ["inspect", "-f", "{{.State.Running}}", name],
        timeout=_DOCKER_CLI_TIMEOUT_SECONDS,
    )
    if state is None or state.returncode != 0:
        yield f"[roleforge] preflight failed ({name}): cannot inspect container"
        if state is not None:
            yield from state.stderr.splitlines()
        return False

    if state.stdout.strip().lower() != "true":
        start = _run_docker(["start", name], timeout=_DOCKER_CLI_TIMEOUT_SECONDS)
        if start is None or start.returncode != 0:
            yield f"[roleforge] preflight failed ({name}): container is stopped and cannot be started"
            if start is not None:
                yield from start.stderr.splitlines()
            yield from _dump_container_debug(name)
            return False
        yield f"[roleforge] preflight: started stopped container {name}"

    # NOTE(review): this prepares /tmp/.ansible-remote-tmp while the configured remote tmp
    # is _MOLECULE_REMOTE_TMP (/dev/shm/...) — confirm whether the two are meant to differ.
    proc = _run_docker(
        [
            "exec",
            name,
            "sh",
            "-lc",
            "mkdir -p /tmp/.ansible-remote-tmp && chmod 1777 /tmp /tmp/.ansible-remote-tmp",
        ],
        timeout=_DOCKER_CLI_TIMEOUT_SECONDS,
    )
    if proc is not None and proc.returncode == 0:
        yield f"[roleforge] preflight ok: {name}"
        return True

    if proc is None:
        yield f"[roleforge] preflight failed ({name}): docker exec did not complete"
    else:
        yield f"[roleforge] preflight failed ({name}), exit={proc.returncode}"
        yield from proc.stdout.splitlines()
        yield from proc.stderr.splitlines()
    yield from _dump_container_debug(name)
    return False


def _docker_tmp_preflight(hosts: list[dict]) -> Generator[str, None, bool]:
    """Prepare ansible tmp dir in Molecule target containers before converge.

    Hosts without a name are skipped. Every named host is attempted even after
    an earlier failure; the generator's return value is True only when all
    attempted hosts succeeded.
    """
    ok = True
    for host in hosts:
        name = str(host.get("name") or "").strip()
        if not name:
            continue
        host_ok = yield from _preflight_container(name)
        ok = ok and host_ok
    return ok


def _molecule_platforms(hosts: list[dict]) -> list[dict]:
    """Build the molecule.yml ``platforms`` list from the host definitions."""
    platforms = []
    for host in hosts:
        item = {
            "name": host["name"],
            "image": host.get("image", "roleforge-backend:latest"),
            "command": _platform_command(host),
            "user": "root",
            "privileged": host.get("privileged", True),
            "pre_build_image": True,
        }
        item.update(_platform_systemd_overrides(host))
        plat = _molecule_docker_platform_for_image(host, str(item["image"]))
        if plat:
            item["platform"] = plat
        platforms.append(item)
    return platforms


def _write_molecule_scenario(root: Path, hosts: list[dict], extra_vars: dict, converge_yaml: str) -> None:
    """Create ``molecule/default`` under ``root`` with vendored playbooks, ansible.cfg, molecule.yml and converge.yml."""
    scenario_dir = root / "molecule" / "default"
    scenario_dir.mkdir(parents=True, exist_ok=True)
    _install_molecule_docker_playbook(scenario_dir)
    _write_molecule_ansible_cfgs(root, scenario_dir)
    molecule_config = {
        "dependency": _MOLECULE_DEPENDENCY,
        "driver": {"name": "docker"},
        "platforms": _molecule_platforms(hosts),
        "provisioner": {
            "name": "ansible",
            "env": _molecule_provisioner_env(),
            "playbooks": {"create": "create.yml", "converge": "converge.yml"},
            "inventory": {"group_vars": {"all": _molecule_group_vars(extra_vars)}},
        },
        "scenario": {"name": "default", "converge_sequence": ["converge"]},
    }
    # The config is serialised as JSON into molecule.yml.
    (scenario_dir / "molecule.yml").write_text(
        json.dumps(molecule_config, indent=2),
        encoding="utf-8",
    )
    (scenario_dir / "converge.yml").write_text(converge_yaml, encoding="utf-8")


def _molecule_create_and_converge(
    root: Path,
    hosts: list[dict],
    env: dict[str, str],
) -> Generator[str, None, int]:
    """Run ``molecule create``, the tmp-dir preflight, then ``molecule converge``.

    Stops after the first failing stage. Each stage's ``__RETURN_CODE__`` sentinel
    is streamed; the generator's return value is the final exit code.
    """
    create_command = ["molecule", "create", "-s", "default"]
    create_rc = yield from _run_molecule_step(command=create_command, cwd=str(root), env=env)
    if create_rc != 0:
        return create_rc
    preflight_ok = yield from _docker_tmp_preflight(hosts)
    if not preflight_ok:
        yield f"{_RETURN_CODE_PREFIX}1"
        return 1
    # No extra `-v` on ansible-playbook — verbosity prints full result dicts (JSON blobs) on skip/ok.
    converge_command = ["molecule", "converge", "-s", "default"]
    converge_rc = yield from _run_streaming(command=converge_command, cwd=str(root), env=env)
    return converge_rc


def run_molecule_test_for_playbook(
    playbook_yaml: str,
    hosts: list[dict],
    extra_vars: dict,
    *,
    molecule_extra_env: dict[str, str] | None = None,
) -> Generator[str, None, int]:
    """Run a playbook as the converge step of a throw-away Molecule docker scenario.

    ``hosts`` defaults to a single ``test-node-1`` host when empty. Yields the
    streamed output lines including ``__RETURN_CODE__:<code>`` sentinels; the
    generator's return value is the exit code of the last stage that ran.
    """
    hosts = hosts or [{"name": "test-node-1", "groups": ["all"]}]
    with tempfile.TemporaryDirectory(prefix="roleforge-molecule-") as temp_dir:
        root = Path(temp_dir)
        _write_molecule_scenario(root, hosts, extra_vars, playbook_yaml)
        env = _molecule_process_env(molecule_extra_env)
        rc = yield from _molecule_create_and_converge(root, hosts, env)
        return rc


def run_molecule_test_for_role(
    role_name: str,
    role_tasks_yaml: str,
    hosts: list[dict],
    extra_vars: dict,
    *,
    molecule_extra_env: dict[str, str] | None = None,
) -> Generator[str, None, int]:
    """Run a role's tasks inside a throw-away Molecule docker scenario.

    The tasks are written to ``roles/<role_name>/tasks/main.yml`` in a temporary
    project and applied by a generated converge playbook. ``hosts`` defaults to
    a single ``test-node-1`` host when empty. Yields the streamed output lines
    including ``__RETURN_CODE__:<code>`` sentinels; the generator's return value
    is the exit code of the last stage that ran.

    Raises:
        ValueError: if ``role_name`` would place the role outside the temporary
            project's ``roles`` directory (e.g. contains ``..`` or is absolute).
    """
    hosts = hosts or [{"name": "test-node-1", "groups": ["all"]}]
    with tempfile.TemporaryDirectory(prefix="roleforge-molecule-role-") as temp_dir:
        root = Path(temp_dir)
        roles_root = root / "roles"
        tasks_dir = roles_root / role_name / "tasks"
        # Refuse names that would write outside the temporary project.
        if not tasks_dir.resolve().is_relative_to(roles_root.resolve()):
            raise ValueError(f"invalid role name: {role_name!r}")
        tasks_dir.mkdir(parents=True, exist_ok=True)
        (tasks_dir / "main.yml").write_text(role_tasks_yaml, encoding="utf-8")

        converge = (
            "- hosts: all\n"
            "  gather_facts: true\n"
            "  pre_tasks:\n"
            "    - name: Prepare Ansible remote tmp\n"
            "      raw: mkdir -p /tmp/.ansible-remote-tmp && chmod 1777 /tmp /tmp/.ansible-remote-tmp\n"
            "  roles:\n"
            f"    - role: {role_name}\n"
        )
        _write_molecule_scenario(root, hosts, extra_vars, converge)

        env = _molecule_process_env(molecule_extra_env)
        env["ANSIBLE_ROLES_PATH"] = str(roles_root)
        rc = yield from _molecule_create_and_converge(root, hosts, env)
        return rc