unrip/scripts/workflow/common.py
philipp 54dc05a94c Archive first live trade loop and open funding visibility turn
Proof: Preserve the completed first live BTC/EURe trade loop and establish the next approved implementation proof around pre-credit funding visibility and operator alerts.
Assumptions: The live-trade loop is sufficiently proven by the recorded deposits, withdrawals, durable command/result chain, and successful mainnet quote responses; the next highest-value slice is operational visibility rather than new execution breadth.
Still fake: The newly opened funding-visibility and alert turn is planning only; no pre-credit watcher or durable alert evaluator is implemented yet.
2026-04-03 01:07:02 +02:00

154 lines
4.2 KiB
Python
Executable file

#!/usr/bin/env python3
from __future__ import annotations
import re
import subprocess
from datetime import datetime, timezone
from pathlib import Path
# Third ancestor of this file's resolved path; with the file living at
# <root>/scripts/workflow/common.py this is the repository root.
ROOT_DIR = Path(__file__).resolve().parents[2]
# Markdown documents kept directly under the root.
BACKLOG_PATH = ROOT_DIR / "BACKLOG.md"
ARCHIVE_PATH = ROOT_DIR / "ARCHIVE.md"
PROOF_PATH = ROOT_DIR / "PROOF.md"
IMPLEMENTATION_PATH = ROOT_DIR / "IMPLEMENTATION.md"
RESEARCH_ACTIVE_PATH = ROOT_DIR / "research/ACTIVE.md"
# Archive directories, one per lane that has its own archive folder.
IMPLEMENTATION_ARCHIVE_DIR = ROOT_DIR / "archive/implementation"
RESEARCH_ARCHIVE_DIR = ROOT_DIR / "archive/research"
# Lane name -> letter that prefixes backlog ids of that lane (ids are written
# as "[<letter><number>]", e.g. "[I001]"; see next_backlog_id).
LANE_PREFIX = {
    "implementation": "I",
    "research": "R",
    "ops": "O",
    "bug": "B",
}
# Lane name -> "## " heading text. Not referenced in this part of the file;
# presumably the section headings of BACKLOG.md -- confirm against callers.
BACKLOG_SECTION = {
    "implementation": "## Implementation Candidates",
    "research": "## Research Candidates",
    "ops": "## Ops Candidates",
    "bug": "## Bugs",
}
# Section key -> heading line looked up in ARCHIVE.md by append_archive_line.
ARCHIVE_SECTION = {
    "implementation": "## Implementation Turns",
    "research": "## Research Turns",
    "planning": "## Planning Events",
}
def now_utc() -> datetime:
    """Return the current moment as a timezone-aware datetime in UTC."""
    return datetime.now(tz=timezone.utc)
def today_iso() -> str:
    """Return the current UTC calendar date in ISO 8601 form (YYYY-MM-DD)."""
    current = now_utc()
    return current.date().isoformat()
def timestamp_slug() -> str:
    """Return the current UTC time as a compact ``YYYYMMDDTHHMMSSZ`` stamp."""
    stamp_format = "%Y%m%dT%H%M%SZ"
    return now_utc().strftime(stamp_format)
def slugify(value: str) -> str:
    """Turn ``value`` into a lowercase, hyphen-separated slug.

    Every run of characters outside ``a-z0-9`` (after lowercasing) acts as a
    single separator; leading and trailing separators are dropped. When no
    usable characters remain, the fallback slug ``"turn"`` is returned.
    """
    pieces = re.split(r"[^a-z0-9]+", value.lower())
    # Only the first and last piece can be empty (input starting or ending
    # with a separator run); dropping them mirrors stripping edge hyphens.
    joined = "-".join(piece for piece in pieces if piece)
    return joined if joined else "turn"
def load_text(path: Path) -> str:
    """Read the file at ``path`` and return its contents decoded as UTF-8."""
    with path.open("r", encoding="utf-8") as handle:
        return handle.read()
def save_text(path: Path, content: str) -> None:
    """Write ``content`` to ``path`` as UTF-8, ending in exactly one newline.

    All trailing whitespace of ``content`` is removed before the single
    terminating newline is appended.
    """
    normalized = f"{content.rstrip()}\n"
    with path.open("w", encoding="utf-8") as handle:
        handle.write(normalized)
def insert_into_section(document: str, heading: str, block: str) -> str:
    """Append ``block`` to the end of the ``heading`` section of ``document``.

    A section starts at the first line equal to ``heading`` and runs until the
    next line beginning with ``"## "`` or the end of the document.

    Args:
        document: Full markdown text.
        heading: Exact heading line to look for (for example ``"## Bugs"``).
        block: Text to insert; it may span several lines.

    Returns:
        The updated document, terminated by a single newline.

    Raises:
        SystemExit: If no line of ``document`` equals ``heading``.
    """
    lines = document.splitlines()
    try:
        start = lines.index(heading)
    except ValueError as exc:
        raise SystemExit(f"heading not found: {heading}") from exc

    # The section ends at the next second-level heading, or at end of file.
    end = next(
        (idx for idx in range(start + 1, len(lines)) if lines[idx].startswith("## ")),
        len(lines),
    )

    # Step back over blank separator lines so the new block joins the section
    # body instead of landing after the gap, glued to the following heading.
    insert_at = end
    while insert_at > start + 1 and not lines[insert_at - 1].strip():
        insert_at -= 1

    return "\n".join(lines[:insert_at] + [block] + lines[insert_at:]) + "\n"
def append_archive_line(section: str, line: str) -> None:
    """Add ``line`` to the ARCHIVE.md section selected by ``section``.

    ``section`` must be a key of ``ARCHIVE_SECTION``; the archive file is read,
    the line is inserted under the mapped heading, and the file is rewritten.
    """
    archive_text = load_text(ARCHIVE_PATH)
    heading = ARCHIVE_SECTION[section]
    save_text(ARCHIVE_PATH, insert_into_section(archive_text, heading, line))
def read_backlog_lines() -> list[str]:
    """Return the contents of BACKLOG.md as a list of lines without newlines."""
    backlog_text = load_text(BACKLOG_PATH)
    return backlog_text.splitlines()
def write_backlog_lines(lines: list[str]) -> None:
    """Replace the contents of BACKLOG.md with ``lines`` joined by newlines."""
    joined = "\n".join(lines)
    save_text(BACKLOG_PATH, joined)
def next_backlog_id(lane: str) -> str:
    """Return the next unused backlog id for ``lane``.

    The id is the lane's prefix letter followed by a number one higher than
    the largest ``[<prefix><digits>]`` marker found in the backlog (only the
    first marker on each line is considered), zero-padded to three digits.

    Raises:
        KeyError: If ``lane`` is not a key of ``LANE_PREFIX``.
    """
    prefix = LANE_PREFIX[lane]
    id_pattern = re.compile(rf"\[{re.escape(prefix)}(\d+)\]")
    hits = (id_pattern.search(line) for line in read_backlog_lines())
    numbers = [int(hit.group(1)) for hit in hits if hit]
    next_number = max(numbers, default=0) + 1
    return f"{prefix}{next_number:03d}"
def backlog_entry_map() -> dict[str, str]:
    """Map each backlog id to the text that follows it on its entry line.

    Only lines that start with ``- [<LETTER><digits>] `` count as entries; if
    an id occurs more than once, the text of the last occurrence is kept.
    """
    entry_pattern = re.compile(r"- \[([A-Z]\d+)\] (.+)")
    hits = (entry_pattern.match(line) for line in read_backlog_lines())
    return {hit.group(1): hit.group(2) for hit in hits if hit}
def remove_backlog_ids(ids: list[str]) -> None:
    """Delete every backlog entry line whose id is listed in ``ids``.

    Lines that are not entries, or whose id is not listed, are written back
    unchanged and in their original order.
    """
    doomed = set(ids)
    entry_pattern = re.compile(r"- \[([A-Z]\d+)\] ")

    def is_doomed(line: str) -> bool:
        hit = entry_pattern.match(line)
        return hit is not None and hit.group(1) in doomed

    survivors = [line for line in read_backlog_lines() if not is_doomed(line)]
    write_backlog_lines(survivors)
def git_has_changes() -> bool:
    """Report whether ``git status --porcelain`` lists anything for the repo.

    Raises:
        subprocess.CalledProcessError: If the git command exits non-zero.
    """
    command = ["git", "-C", str(ROOT_DIR), "status", "--porcelain"]
    completed = subprocess.run(command, check=True, capture_output=True, text=True)
    # Porcelain output is empty exactly when the working tree is clean.
    return completed.stdout.strip() != ""
def path_has_changes(paths: list[Path]) -> bool:
    """Report whether git sees changes under any of ``paths``.

    Args:
        paths: Paths located inside ``ROOT_DIR``.

    Returns:
        True when ``git status --porcelain`` restricted to ``paths`` prints
        anything; False otherwise, including when ``paths`` is empty.

    Raises:
        ValueError: If a path is not located under ``ROOT_DIR``.
        subprocess.CalledProcessError: If the git command exits non-zero.
    """
    # Without a pathspec git would report on the whole repository, which is
    # not what "changes in none of the paths" should mean.
    if not paths:
        return False
    rel_paths = [str(path.relative_to(ROOT_DIR)) for path in paths]
    result = subprocess.run(
        ["git", "-C", str(ROOT_DIR), "status", "--porcelain", "--", *rel_paths],
        check=True,
        capture_output=True,
        text=True,
    )
    return bool(result.stdout.strip())
def git_commit(message: str, paths: list[Path]) -> None:
    """Stage and commit ``paths`` with ``message`` if any of them changed.

    Does nothing when ``paths`` is empty or git reports no changes for them.

    Args:
        message: Commit message.
        paths: Paths located inside ``ROOT_DIR`` to include in the commit.

    Raises:
        ValueError: If a path is not located under ``ROOT_DIR``.
        subprocess.CalledProcessError: If a git command exits non-zero.
    """
    if not paths or not path_has_changes(paths):
        return
    rel_paths = [str(path.relative_to(ROOT_DIR)) for path in paths]
    git = ["git", "-C", str(ROOT_DIR)]
    # `add` is still needed so that untracked files become known to git.
    subprocess.run([*git, "add", "--", *rel_paths], check=True)
    # Passing the pathspec to `commit` limits the commit to these paths, so
    # unrelated content that was already staged is not swept into it.
    subprocess.run([*git, "commit", "-m", message, "--", *rel_paths], check=True)