diff --git a/data/manifests/checkpoint_011_verified_purge.json b/data/manifests/checkpoint_011_verified_purge.json new file mode 100644 index 0000000..b1a00f1 --- /dev/null +++ b/data/manifests/checkpoint_011_verified_purge.json @@ -0,0 +1,42 @@ +{ + "active_checkpoint": "Checkpoint 11: Verified Upload Purge", + "artifacts": [ + "scripts/upload_archive_rclone.sh", + "scripts/purge_uploaded_local_files.sh", + "deploy/k8s/base/cronjob-uploader.yaml", + "systemd/polymarket-orderbook-uploader.service", + "docs/GOOGLE_DRIVE_OFFLOAD.md", + "docs/KUBERNETES_DEPLOYMENT.md", + "docs/POLYMARKET_WEBSOCKET_RECORDER.md", + "docs/VPS_CUTOVER_RUNBOOK.md", + "data/manifests/upload_archive_purge_validation_sample.json", + "data/manifests/purge_uploaded_local_validation_sample.json", + "data/manifests/purge_uploaded_local_validation_summary.json", + "reports/checkpoints/checkpoint_011_verified_purge.md" + ], + "gate_status": "READY_FOR_DEPLOY_NOT_LIVE", + "project_rules_added": [ + "Previously verified local files may be purged in later cycles using a durable verified-upload index.", + "The verified-upload index itself is protected from purge.", + "Purge uses prior verified-upload evidence and local checksum matching before deletion." 
+ ], + "validation": { + "bash_syntax": "PASS", + "systemd_verify": "PASS", + "kustomize_render": "PASS", + "kubernetes_server_dry_run": "PASS", + "local_rclone_validation": { + "upload_gate_status": "PASS", + "upload_operation_status": "UPLOAD_VERIFIED", + "verified_index_status": "updated", + "purge_gate_status": "PASS", + "purge_operation_status": "PURGE_PASS", + "purge_deleted_count": 2, + "old_raw_exists_after_purge": false, + "old_manifest_exists_after_purge": false, + "recent_raw_exists_after_purge": true + } + }, + "strongest_fake_progress_risk": "The change is validated locally and in Kubernetes server dry-run only; the live cluster still needs a new image build and canary deploy before periodic purge is actually running there.", + "next_smallest_step": "Commit and push the change, deploy the websocket canary image that also updates the uploader CronJob, then inspect the next upload and purge manifests plus PVC usage." +} diff --git a/deploy/k8s/base/cronjob-uploader.yaml b/deploy/k8s/base/cronjob-uploader.yaml index d2fb6e6..863c65b 100644 --- a/deploy/k8s/base/cronjob-uploader.yaml +++ b/deploy/k8s/base/cronjob-uploader.yaml @@ -38,9 +38,15 @@ spec: imagePullPolicy: IfNotPresent command: - /bin/bash - - /app/scripts/upload_archive_rclone.sh - - --execute - - --cleanup-after-verify + - -lc + - | + upload_exit=0 + /app/scripts/upload_archive_rclone.sh --execute --cleanup-after-verify || upload_exit=$? + purge_exit=0 + /app/scripts/purge_uploaded_local_files.sh --execute || purge_exit=$? + if [[ "${upload_exit}" -ne 0 || "${purge_exit}" -ne 0 ]]; then + exit 1 + fi env: - name: ORDERBOOKS_DATA_DIR value: /var/lib/orderbooks diff --git a/docs/GOOGLE_DRIVE_OFFLOAD.md b/docs/GOOGLE_DRIVE_OFFLOAD.md index a61dbcd..5a3791b 100644 --- a/docs/GOOGLE_DRIVE_OFFLOAD.md +++ b/docs/GOOGLE_DRIVE_OFFLOAD.md @@ -14,13 +14,15 @@ must still pass. 
Included: - `scripts/upload_archive_rclone.sh` +- `scripts/purge_uploaded_local_files.sh` - `systemd/polymarket-orderbook-uploader.service` - `systemd/polymarket-orderbook-uploader.timer` - dry-run mode by default - real upload only with `--execute` - rclone verification with `rclone check` - per-run upload manifests -- optional local cleanup only after successful verification +- verified-upload index tracking +- periodic local purge of previously verified files Excluded: @@ -162,7 +164,7 @@ The upload gate is `PASS` only when the copy succeeds and verification succeeds. Local files are kept by default, even after upload verification. -Cleanup requires an explicit flag: +Immediate same-run cleanup requires an explicit flag: ```sh /opt/orderbooks/scripts/upload_archive_rclone.sh \ @@ -176,6 +178,28 @@ Cleanup requires an explicit flag: Cleanup deletes only files that were selected for upload, uploaded, verified, and older than the retention window. The default retention window is 7 days. +The uploader also maintains a durable verified-upload index at: + +```text +/var/lib/orderbooks/manifests/upload_verified_index.json +``` + +That index records files that have already passed `rclone copy` and +`rclone check`. The periodic purge step uses that index to delete previously +verified local files after the retention window, even when the current upload +run is not the one that first verified them. + +Run the purge manually with: + +```sh +/opt/orderbooks/scripts/purge_uploaded_local_files.sh \ + --execute \ + --data-dir /var/lib/orderbooks \ + --retention-days 7 +``` + +The periodic systemd/Kubernetes runtime runs upload and purge together. 
+ ## Upload Manifest Each run writes a manifest such as: @@ -199,6 +223,22 @@ The manifest records: - start/end time - rclone copy/check exit codes - gate status +- verified-upload index update summary + +Each purge run writes a separate manifest such as: + +```text +/var/lib/orderbooks/manifests/purge_uploaded_local_YYYYMMDDTHHMMSSZ.json +``` + +The purge manifest records: + +- verified-index path and record count +- eligible files older than retention +- deleted local files +- skipped files such as checksum mismatches +- retention configuration +- gate and operation status For this repository, the sample manifest path is: @@ -247,6 +287,9 @@ Run one upload immediately: sudo systemctl start polymarket-orderbook-uploader.service ``` +That service now runs upload verification first and then runs the verified-file +purge step in the same timer cycle. + ## Logs Use the systemd journal: @@ -290,5 +333,5 @@ PASS - Long-run upload reliability. - Interaction between hourly uploads and a 24h collector soak test. -- Retention cleanup after verified upload. +- Long-run purge behavior under repeated intermittent `rclone check` failures. - Production readiness. 
diff --git a/docs/VPS_CUTOVER_RUNBOOK.md b/docs/VPS_CUTOVER_RUNBOOK.md index 59d66c8..54ef679 100644 --- a/docs/VPS_CUTOVER_RUNBOOK.md +++ b/docs/VPS_CUTOVER_RUNBOOK.md @@ -81,7 +81,7 @@ Prepare repository permissions and the Python virtualenv: ```sh cd /opt/orderbooks -sudo chmod +x scripts/run_polymarket_collector_cycle.sh scripts/upload_archive_rclone.sh scripts/vps_preflight_check.sh scripts/vps_runtime_smoke_check.sh +sudo chmod +x scripts/run_polymarket_collector_cycle.sh scripts/upload_archive_rclone.sh scripts/purge_uploaded_local_files.sh scripts/vps_preflight_check.sh scripts/vps_runtime_smoke_check.sh sudo python3 -m venv .venv sudo .venv/bin/python -m pip install --upgrade pip sudo chown -R root:root /opt/orderbooks @@ -141,7 +141,10 @@ ORDERBOOKS_UPLOAD_MIN_AGE_SECONDS=600 ``` The uploader verifies uploads with `rclone check`. Dry runs do not prove remote -write access. +write access. Successful uploads update +`/var/lib/orderbooks/manifests/upload_verified_index.json`, and the uploader +service also runs a purge step that deletes older previously verified local +files after the retention window. ## Run VPS Preflight diff --git a/reports/checkpoints/checkpoint_011_verified_purge.md b/reports/checkpoints/checkpoint_011_verified_purge.md new file mode 100644 index 0000000..cc01eec --- /dev/null +++ b/reports/checkpoints/checkpoint_011_verified_purge.md @@ -0,0 +1,89 @@ +# Checkpoint 11: Verified Upload Purge + +## Gate + +`READY_FOR_DEPLOY_NOT_LIVE` + +The purge implementation is validated locally and the Kubernetes apply set +passes server dry-run, but this change has not been built into a new cluster +image yet. + +## Goal + +Add periodic local deletion of files that have already been uploaded and +verified on the remote, without relying only on the current upload run. 
+ +## What Changed + +- `scripts/upload_archive_rclone.sh` + - writes/updates a durable verified-upload index at + `/var/lib/orderbooks/manifests/upload_verified_index.json` + - records verified-index update summary in each upload manifest +- `scripts/purge_uploaded_local_files.sh` + - reads the verified-upload index + - deletes only files older than retention with matching local SHA-256 + - protects the verified-upload index itself + - writes a purge manifest under `/var/lib/orderbooks/manifests/` +- `deploy/k8s/base/cronjob-uploader.yaml` + - runs upload verification and purge in the same periodic CronJob cycle +- `systemd/polymarket-orderbook-uploader.service` + - runs upload verification and purge in the same periodic service execution +- docs updated: + - `docs/GOOGLE_DRIVE_OFFLOAD.md` + - `docs/KUBERNETES_DEPLOYMENT.md` + - `docs/POLYMARKET_WEBSOCKET_RECORDER.md` + - `docs/VPS_CUTOVER_RUNBOOK.md` + +## Validation Evidence + +Local validation used a temporary data directory and a local `rclone` +destination path, not Google Drive, to prove the full flow: + +1. real `rclone copy` +2. real `rclone check` +3. verified-upload index update +4. purge of files older than retention +5. retention of a newer local file + +Durable artifacts: + +- `data/manifests/upload_archive_purge_validation_sample.json` +- `data/manifests/purge_uploaded_local_validation_sample.json` +- `data/manifests/purge_uploaded_local_validation_summary.json` + +Observed result: + +- upload gate: `PASS` +- upload operation: `UPLOAD_VERIFIED` +- verified index status: `updated` +- purge gate: `PASS` +- purge operation: `PURGE_PASS` +- deleted files: `2` +- retained newer file: `1` + +Kubernetes validation: + +- `kubectl kustomize deploy/k8s/base` +- `KUBECONFIG=../nuri/unrip3/.state/hetzner/kubeconfig.yaml kubectl apply -k deploy/k8s/base --dry-run=server` + +Both passed. 
+ +## Live Runtime Context + +Before this change, the live cluster was already deleting files older than the +3-day retention window, but only during successful upload runs. The live disk +shape still showed many retained recent files, especially manifests within the +retention window. This checkpoint adds a separate verified-file purge phase so +older already-verified files can be removed based on durable local evidence. + +## Strongest Fake-Progress Risk + +This is not deployed yet. The current cluster image still runs the previous +uploader behavior until a new image is built and the canary deploy is applied. + +## Next Smallest Step + +Commit and push this source change to Forgejo `main`, run +`scripts/deploy/deploy_ws_canary_kaniko.sh --git-ref <commit-sha>`, and then check +the next `upload_archive_*.json`, `purge_uploaded_local_*.json`, and PVC usage +to confirm the live CronJob is purging as designed. diff --git a/scripts/purge_uploaded_local_files.sh b/scripts/purge_uploaded_local_files.sh new file mode 100755 index 0000000..ff2ed92 --- /dev/null +++ b/scripts/purge_uploaded_local_files.sh @@ -0,0 +1,299 @@ +#!/usr/bin/env bash +set -uo pipefail + +SCRIPT_NAME="orderbooks_verified_file_purger" +SCRIPT_VERSION="0.1.0" + +MODE="dry-run" +DATA_DIR="${ORDERBOOKS_UPLOAD_DATA_DIR:-${ORDERBOOKS_DATA_DIR:-/var/lib/orderbooks}}" +MANIFEST_DIR="${ORDERBOOKS_UPLOAD_MANIFEST_DIR:-}" +MANIFEST_PATH="${ORDERBOOKS_PURGE_MANIFEST_PATH:-}" +VERIFIED_INDEX_PATH="${ORDERBOOKS_UPLOAD_VERIFIED_INDEX_PATH:-}" +RETENTION_DAYS="${ORDERBOOKS_UPLOAD_RETENTION_DAYS:-7}" + +usage() { + cat <<'EOF' +Usage: scripts/purge_uploaded_local_files.sh [options] + +Deletes local files only when they have prior verified-upload evidence in the +verified-upload index and are older than the retention window. + +Options: + --dry-run Plan purge only (default). + --execute Delete eligible local files. + --data-dir DIR Base data directory. Default: /var/lib/orderbooks.
+ --manifest-dir DIR Purge manifest output directory. Default: DATA_DIR/manifests. + --manifest-path PATH Exact purge manifest path. + --verified-index-path PATH Verified-upload index path. Default: MANIFEST_DIR/upload_verified_index.json. + --retention-days N Keep at least N days locally. Default: 7. + --help Show this help. +EOF +} + +while [[ $# -gt 0 ]]; do + case "$1" in + --dry-run) + MODE="dry-run" + shift + ;; + --execute) + MODE="execute" + shift + ;; + --data-dir) + DATA_DIR="$2" + shift 2 + ;; + --manifest-dir) + MANIFEST_DIR="$2" + shift 2 + ;; + --manifest-path) + MANIFEST_PATH="$2" + shift 2 + ;; + --verified-index-path) + VERIFIED_INDEX_PATH="$2" + shift 2 + ;; + --retention-days) + RETENTION_DAYS="$2" + shift 2 + ;; + --help) + usage + exit 0 + ;; + *) + echo "Unknown argument: $1" >&2 + usage >&2 + exit 2 + ;; + esac +done + +if [[ -z "${MANIFEST_DIR}" ]]; then + MANIFEST_DIR="${DATA_DIR%/}/manifests" +fi +if [[ -z "${VERIFIED_INDEX_PATH}" ]]; then + VERIFIED_INDEX_PATH="${MANIFEST_DIR%/}/upload_verified_index.json" +fi + +STARTED_AT="$(date -u +%Y-%m-%dT%H:%M:%SZ)" +RUN_ID="$(date -u +%Y%m%dT%H%M%SZ)" +if [[ -z "${MANIFEST_PATH}" ]]; then + MANIFEST_PATH="${MANIFEST_DIR%/}/purge_uploaded_local_${RUN_ID}.json" +fi + +mkdir -p "$(dirname "${MANIFEST_PATH}")" + +export SCRIPT_NAME SCRIPT_VERSION MODE DATA_DIR MANIFEST_DIR MANIFEST_PATH VERIFIED_INDEX_PATH RETENTION_DAYS STARTED_AT + +python3 - <<'PY' +import datetime as dt +import hashlib +import json +import os +import sys +import tempfile +from pathlib import Path + +script_name = os.environ["SCRIPT_NAME"] +script_version = os.environ["SCRIPT_VERSION"] +mode = os.environ["MODE"] +data_dir = Path(os.environ["DATA_DIR"]).resolve() +manifest_dir = Path(os.environ["MANIFEST_DIR"]).resolve() +manifest_path = Path(os.environ["MANIFEST_PATH"]).resolve() +verified_index_path = Path(os.environ["VERIFIED_INDEX_PATH"]).resolve() +retention_days = int(os.environ["RETENTION_DAYS"]) +started_at = 
os.environ["STARTED_AT"] +now = dt.datetime.now(dt.timezone.utc)  # dt.UTC alias only exists on Python 3.11+; timezone.utc is the same object everywhere +ended_at = now.replace(microsecond=0).isoformat().replace("+00:00", "Z") +cutoff = now - dt.timedelta(days=retention_days) + + +def iso_z_from_ts(ts: float) -> str: + return dt.datetime.fromtimestamp(ts, dt.timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") + + +def sha256_file(path: Path) -> str: + digest = hashlib.sha256() + with path.open("rb") as handle: + for chunk in iter(lambda: handle.read(1024 * 1024), b""): + digest.update(chunk) + return digest.hexdigest() + + +def write_atomic_json(path: Path, payload: dict) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with tempfile.NamedTemporaryFile("w", encoding="utf-8", dir=str(path.parent), delete=False) as tmp: + json.dump(payload, tmp, indent=2, sort_keys=True) + tmp.write("\n") + tmp_path = Path(tmp.name) + os.replace(tmp_path, path) + + +manifest = { + "schema_name": "purge_uploaded_local_manifest", + "schema_version": 1, + "purger": {"name": script_name, "version": script_version}, + "started_at_utc": started_at, + "ended_at_utc": ended_at, + "command_mode": mode, + "operation_status": None, + "gate_status": "PASS", + "config": { + "data_dir": str(data_dir), + "manifest_dir": str(manifest_dir), + "manifest_path": str(manifest_path), + "verified_index_path": str(verified_index_path), + "retention_days": retention_days, + }, + "verified_index": { + "path": str(verified_index_path), + "exists": verified_index_path.exists(), + "record_count_before": 0, + "record_count_after": 0, + }, + "candidate_files": [], + "deleted_local_files": [], + "skipped_files": [], + "counts": { + "eligible": 0, + "deleted": 0, + "within_retention": 0, + "already_absent": 0, + "protected": 0, + "sha256_mismatch": 0, + "invalid_records": 0, + }, + "warnings": [], + "known_gaps": [ + "Purge trusts prior verified-upload evidence in the local verified index and does not re-run rclone copy/check during deletion.", + "Protected local state files, including the
verified-upload index itself, are not deleted by this script.", + ], +} + +if not verified_index_path.exists(): + manifest["operation_status"] = "NO_VERIFIED_INDEX" + write_atomic_json(manifest_path, manifest) + print(json.dumps({ + "gate_status": manifest["gate_status"], + "operation_status": manifest["operation_status"], + "manifest_path": str(manifest_path), + "eligible_files": 0, + "deleted_files": 0, + }, indent=2, sort_keys=True)) + sys.exit(0) + +try: + index_doc = json.loads(verified_index_path.read_text(encoding="utf-8")) +except Exception as exc: + manifest["operation_status"] = "INDEX_READ_FAILED" + manifest["gate_status"] = "FAIL" + manifest["warnings"].append(f"failed to read verified-upload index: {exc}") + write_atomic_json(manifest_path, manifest) + print(json.dumps({ + "gate_status": manifest["gate_status"], + "operation_status": manifest["operation_status"], + "manifest_path": str(manifest_path), + "eligible_files": 0, + "deleted_files": 0, + }, indent=2, sort_keys=True)) + sys.exit(1) + +records = index_doc.get("records", []) +manifest["verified_index"]["record_count_before"] = len(records) +protected_path = verified_index_path.resolve() +index_changed = False +deleted_at = ended_at + +for record in records: + relative_path = record.get("relative_path") + sha256_expected = record.get("sha256") + if not relative_path or not sha256_expected: + manifest["counts"]["invalid_records"] += 1 + manifest["skipped_files"].append({ + "relative_path": relative_path, + "reason": "invalid_index_record", + }) + continue + + local_path = (data_dir / relative_path).resolve() + if local_path == protected_path: + manifest["counts"]["protected"] += 1 + continue + + if not local_path.exists(): + manifest["counts"]["already_absent"] += 1 + if mode == "execute" and record.get("local_deleted_at_utc") is None: + record["local_deleted_at_utc"] = deleted_at + index_changed = True + continue + + stat = local_path.stat() + mtime = dt.datetime.fromtimestamp(stat.st_mtime,
dt.timezone.utc) + if mtime >= cutoff: + manifest["counts"]["within_retention"] += 1 + continue + + sha256_actual = sha256_file(local_path) + if sha256_actual != sha256_expected: + manifest["counts"]["sha256_mismatch"] += 1 + manifest["skipped_files"].append({ + "relative_path": relative_path, + "local_path": str(local_path), + "kind": record.get("kind"), + "reason": "sha256_mismatch", + "expected_sha256": sha256_expected, + "actual_sha256": sha256_actual, + }) + continue + + candidate = { + "relative_path": relative_path, + "local_path": str(local_path), + "kind": record.get("kind"), + "bytes": stat.st_size, + "mtime_utc": iso_z_from_ts(stat.st_mtime), + "sha256": sha256_actual, + "first_verified_at_utc": record.get("first_verified_at_utc"), + "last_verified_at_utc": record.get("last_verified_at_utc"), + "last_verified_by_manifest": record.get("last_verified_by_manifest"), + } + manifest["candidate_files"].append(candidate) + manifest["counts"]["eligible"] += 1 + + if mode == "execute": + local_path.unlink() + record["local_deleted_at_utc"] = deleted_at + index_changed = True + manifest["deleted_local_files"].append({**candidate, "deleted_at_utc": deleted_at}) + +manifest["counts"]["deleted"] = len(manifest["deleted_local_files"]) +manifest["verified_index"]["record_count_after"] = len(records) + +if mode == "execute" and index_changed: + index_doc["updated_at_utc"] = ended_at + write_atomic_json(verified_index_path, index_doc) + +if manifest["operation_status"] is None: + if manifest["counts"]["eligible"] == 0: + manifest["operation_status"] = "NO_ELIGIBLE_FILES" + elif mode == "dry-run": + manifest["operation_status"] = "DRY_RUN_PASS" + else: + manifest["operation_status"] = "PURGE_PASS" + +write_atomic_json(manifest_path, manifest) +print(json.dumps({ + "gate_status": manifest["gate_status"], + "operation_status": manifest["operation_status"], + "manifest_path": str(manifest_path), + "eligible_files": manifest["counts"]["eligible"], + "deleted_files":
manifest["counts"]["deleted"], +}, indent=2, sort_keys=True)) + +if manifest["gate_status"] != "PASS": + sys.exit(1) +PY diff --git a/scripts/upload_archive_rclone.sh b/scripts/upload_archive_rclone.sh index beb8b1e..243107f 100755 --- a/scripts/upload_archive_rclone.sh +++ b/scripts/upload_archive_rclone.sh @@ -11,6 +11,7 @@ RAW_DIR="${ORDERBOOKS_UPLOAD_RAW_DIR:-}" SOURCE_MANIFEST_DIR="${ORDERBOOKS_UPLOAD_SOURCE_MANIFEST_DIR:-}" MANIFEST_DIR="${ORDERBOOKS_UPLOAD_MANIFEST_DIR:-}" MANIFEST_PATH="${ORDERBOOKS_UPLOAD_MANIFEST_PATH:-}" +VERIFIED_INDEX_PATH="${ORDERBOOKS_UPLOAD_VERIFIED_INDEX_PATH:-}" DEST="${ORDERBOOKS_RCLONE_DEST:-}" RCLONE_BIN="${ORDERBOOKS_RCLONE_BIN:-rclone}" MIN_AGE_SECONDS="${ORDERBOOKS_UPLOAD_MIN_AGE_SECONDS:-600}" @@ -34,6 +35,7 @@ Options: --source-manifest-dir DIR Source collector manifest directory. Default: DATA_DIR/manifests. --manifest-dir DIR Upload manifest output directory. Default: DATA_DIR/manifests. --manifest-path PATH Exact upload manifest path. + --verified-index-path PATH Verified-upload index path. Default: MANIFEST_DIR/upload_verified_index.json. --dest REMOTE:PATH rclone destination. Or set ORDERBOOKS_RCLONE_DEST. --min-age-seconds N Skip files modified within N seconds. Default: 600. --retention-days N Keep at least N days locally. Default: 7. 
@@ -76,6 +78,10 @@ while [[ $# -gt 0 ]]; do MANIFEST_PATH="$2" shift 2 ;; + --verified-index-path) + VERIFIED_INDEX_PATH="$2" + shift 2 + ;; --dest) DEST="$2" shift 2 @@ -113,6 +119,9 @@ fi if [[ -z "${MANIFEST_DIR}" ]]; then MANIFEST_DIR="${DATA_DIR%/}/manifests" fi +if [[ -z "${VERIFIED_INDEX_PATH}" ]]; then + VERIFIED_INDEX_PATH="${MANIFEST_DIR%/}/upload_verified_index.json" +fi STARTED_AT="$(date -u +%Y-%m-%dT%H:%M:%SZ)" RUN_ID="$(date -u +%Y%m%dT%H%M%SZ)" @@ -127,6 +136,7 @@ PLAN_PATH="${TMPDIR}/plan.json" RCLONE_COPY_LOG="${TMPDIR}/rclone_copy.log" RCLONE_CHECK_LOG="${TMPDIR}/rclone_check.log" CLEANUP_PATH="${TMPDIR}/cleanup.json" +INDEX_UPDATE_PATH="${TMPDIR}/verified_index_update.json" STAGING_DIR="${TMPDIR}/stage" mkdir -p "$(dirname "${MANIFEST_PATH}")" "${STAGING_DIR}" @@ -324,13 +334,127 @@ PY ENDED_AT="$(date -u +%Y-%m-%dT%H:%M:%SZ)" +if ! python3 - "$PLAN_PATH" "$VERIFIED_INDEX_PATH" "$INDEX_UPDATE_PATH" "$OPERATION_STATUS" "$ENDED_AT" "$MANIFEST_PATH" "$DATA_DIR" <<'PY' +import json +import os +import tempfile +import sys +from pathlib import Path + +plan_path = Path(sys.argv[1]) +index_path = Path(sys.argv[2]) +summary_path = Path(sys.argv[3]) +operation_status = sys.argv[4] +ended_at = sys.argv[5] +manifest_path = Path(sys.argv[6]) +data_dir = Path(sys.argv[7]).resolve() + +summary = { + "path": str(index_path), + "status": "skipped", + "updated": False, + "record_count": 0, + "new_records": 0, + "updated_records": 0, + "reason": "upload_not_verified", +} + +records_by_rel = {} +existing = {} +if index_path.exists(): + existing = json.loads(index_path.read_text(encoding="utf-8")) + for record in existing.get("records", []): + rel = record.get("relative_path") + if rel: + records_by_rel[rel] = record + +if operation_status == "UPLOAD_VERIFIED": + plan = json.loads(plan_path.read_text(encoding="utf-8")) + for item in plan.get("selected_files", []): + rel = item["relative_path"] + local_path = Path(item["local_path"]).resolve() + try: + relative_path = local_path.relative_to(data_dir).as_posix() + except ValueError: + relative_path = rel + prev = records_by_rel.get(relative_path, {})  # look up under the stored relative_path, the same key the index is reloaded with + first_verified =
prev.get("first_verified_at_utc") or ended_at + changed = ( + not prev + or prev.get("sha256") != item["sha256"] + or prev.get("bytes") != item["bytes"] + or prev.get("mtime_utc") != item["mtime_utc"] + or prev.get("local_deleted_at_utc") is not None + ) + merged = { + "relative_path": relative_path, + "local_path": str(local_path), + "kind": item["kind"], + "bytes": item["bytes"], + "mtime_utc": item["mtime_utc"], + "sha256": item["sha256"], + "first_verified_at_utc": first_verified, + "last_verified_at_utc": ended_at, + "last_verified_by_manifest": str(manifest_path), + "local_deleted_at_utc": None, + } + records_by_rel[relative_path] = merged + if prev: + if changed: + summary["updated_records"] += 1 + else: + summary["new_records"] += 1 + + index_doc = { + "schema_name": "verified_upload_index", + "schema_version": 1, + "updated_at_utc": ended_at, + "records": [records_by_rel[key] for key in sorted(records_by_rel)], + } + index_path.parent.mkdir(parents=True, exist_ok=True) + with tempfile.NamedTemporaryFile("w", encoding="utf-8", dir=str(index_path.parent), delete=False) as tmp: + json.dump(index_doc, tmp, indent=2, sort_keys=True) + tmp.write("\n") + tmp_path = Path(tmp.name) + os.replace(tmp_path, index_path) + summary["status"] = "updated" + summary["updated"] = True + summary["record_count"] = len(index_doc["records"]) + summary["reason"] = None +else: + summary["record_count"] = len(records_by_rel) + +summary_path.write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n", encoding="utf-8") +PY +then + python3 - "$INDEX_UPDATE_PATH" "$VERIFIED_INDEX_PATH" <<'PY' +import json +import sys +from pathlib import Path + +summary_path = Path(sys.argv[1]) +index_path = Path(sys.argv[2]) +summary = { + "path": str(index_path), + "status": "failed", + "updated": False, + "record_count": 0, + "new_records": 0, +
"updated_records": 0, + "reason": "index_update_failed", +} +summary_path.write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n", encoding="utf-8") +PY +fi + export SCRIPT_NAME SCRIPT_VERSION STARTED_AT ENDED_AT export MODE OPERATION_STATUS GATE_STATUS export RCLONE_BIN RCLONE_AVAILABLE RCLONE_VERSION DEST export COPY_ATTEMPTED CHECK_ATTEMPTED COPY_EXIT_CODE CHECK_EXIT_CODE export DATA_DIR RAW_DIR SOURCE_MANIFEST_DIR MIN_AGE_SECONDS RETENTION_DAYS CLEANUP_AFTER_VERIFY +export VERIFIED_INDEX_PATH -python3 - "$PLAN_PATH" "$CLEANUP_PATH" "$MANIFEST_PATH" <<'PY' +python3 - "$PLAN_PATH" "$CLEANUP_PATH" "$INDEX_UPDATE_PATH" "$MANIFEST_PATH" <<'PY' import json import os import sys @@ -338,7 +462,8 @@ from pathlib import Path plan = json.loads(Path(sys.argv[1]).read_text()) cleanup = json.loads(Path(sys.argv[2]).read_text()) -manifest_path = Path(sys.argv[3]) +index_update = json.loads(Path(sys.argv[3]).read_text()) +manifest_path = Path(sys.argv[4]) mode = os.environ["MODE"] operation_status = os.environ["OPERATION_STATUS"] @@ -393,10 +518,12 @@ manifest = { "raw_dir": os.environ["RAW_DIR"], "source_manifest_dir": os.environ["SOURCE_MANIFEST_DIR"], "manifest_path": str(manifest_path), + "verified_index_path": os.environ["VERIFIED_INDEX_PATH"], "min_age_seconds": int(os.environ["MIN_AGE_SECONDS"]), "retention_days": int(os.environ["RETENTION_DAYS"]), "cleanup_after_verify": os.environ["CLEANUP_AFTER_VERIFY"] == "1", }, + "verified_index": index_update, "planned_files": selected, "attempted_files": attempted_files, "dry_run_files": dry_run_files, @@ -432,6 +559,8 @@ if operation_status == "BLOCKED_DEST_MISSING": manifest["warnings"].append("No rclone destination was configured; set --dest or ORDERBOOKS_RCLONE_DEST.") if mode == "dry-run": manifest["warnings"].append("Dry-run mode does not perform a real upload; checkpoint real-upload gate remains blocked.") +if index_update.get("status") == "failed": + manifest["warnings"].append("Verified-upload index update 
failed; previously verified file purge may lag until a later successful update.") manifest_path.parent.mkdir(parents=True, exist_ok=True) manifest_path.write_text(json.dumps(manifest, indent=2, sort_keys=True) + "\n", encoding="utf-8") diff --git a/systemd/polymarket-orderbook-uploader.service b/systemd/polymarket-orderbook-uploader.service index a88f910..077bd17 100644 --- a/systemd/polymarket-orderbook-uploader.service +++ b/systemd/polymarket-orderbook-uploader.service @@ -16,7 +16,7 @@ Environment=ORDERBOOKS_UPLOAD_MIN_AGE_SECONDS=600 Environment=ORDERBOOKS_UPLOAD_RETENTION_DAYS=7 Environment=ORDERBOOKS_RCLONE_BIN=/usr/bin/rclone EnvironmentFile=-/etc/orderbooks/orderbook-uploader.env -ExecStart=/bin/bash /opt/orderbooks/scripts/upload_archive_rclone.sh --execute +ExecStart=/bin/bash -lc 'upload_exit=0; /opt/orderbooks/scripts/upload_archive_rclone.sh --execute --cleanup-after-verify || upload_exit=$?; purge_exit=0; /opt/orderbooks/scripts/purge_uploaded_local_files.sh --execute || purge_exit=$?; if [ "$upload_exit" -ne 0 ] || [ "$purge_exit" -ne 0 ]; then exit 1; fi' StandardOutput=journal StandardError=journal SyslogIdentifier=polymarket-orderbook-uploader @@ -26,4 +26,3 @@ ProtectSystem=strict ProtectHome=true ReadWritePaths=/var/lib/orderbooks StateDirectory=orderbooks -