orderbooks/scripts/collect_polymarket_orderbooks.py
philipp 284e465588
Some checks failed
deploy / deploy (push) Has been cancelled
Prepare Kubernetes orderbooks deployment
2026-04-18 11:23:28 +02:00

668 lines
26 KiB
Python
Executable file

#!/usr/bin/env python3
"""Minimal raw Polymarket order-book snapshot sample collector.
Checkpoint 4 scope: finite sample run only. This script reads the BTC discovery
artifact, fetches public CLOB batch order books for a small market set, writes
raw gzip JSONL envelopes, and closes with a manifest. It is not a daemon and it
does not trade.
"""
from __future__ import annotations
import argparse
import datetime as dt
import gzip
import hashlib
import json
import signal
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
from typing import Any
# Collector identity written into every snapshot envelope and the run manifest.
COLLECTOR_NAME = "polymarket_orderbook_collector"
COLLECTOR_VERSION = "0.1.0"
# Schema tag written into each raw snapshot envelope.
SCHEMA_NAME = "raw_orderbook_snapshot"
SCHEMA_VERSION = 1
# Default batch order-book endpoint; overridable via the `clob_books_url` setting.
CLOB_BOOKS_URL = "https://clob.polymarket.com/books"
# Default paths used when neither the CLI nor the config file supplies a value.
DEFAULT_CONFIG_PATH = Path("config/polymarket_collector.example.yaml")
DEFAULT_DISCOVERY_PATH = Path("data/discovery/polymarket_btc_markets_latest.json")
DEFAULT_OUTPUT_DIR = Path("data/live_sample")
DEFAULT_MANIFEST_PATH = Path("data/manifests/orderbook_collector_sample_manifest.json")
# Allow-list of response header names (lower case) that filter_headers keeps;
# every other response header is dropped before it is recorded.
SAFE_RESPONSE_HEADERS = {
"cache-control",
"cf-cache-status",
"cf-ray",
"content-length",
"content-type",
"date",
"retry-after",
"server",
"x-ratelimit-limit",
"x-ratelimit-remaining",
"x-ratelimit-reset",
"ratelimit-limit",
"ratelimit-remaining",
"ratelimit-reset",
}
# Shutdown state: set by handle_stop and polled by the request and collection loops.
STOP_REQUESTED = False
STOP_SIGNAL: str | None = None
def handle_stop(signum: int, _frame: Any) -> None:
    """Signal handler that records a shutdown request.

    It only flips the module-level flags; the running loops poll
    ``STOP_REQUESTED`` and wind down on their own.

    Args:
        signum: Number of the signal that was delivered.
        _frame: Interrupted stack frame (unused).
    """
    global STOP_REQUESTED
    global STOP_SIGNAL
    STOP_REQUESTED = True
    # Keep the symbolic name (e.g. "SIGTERM") so it can be reported later.
    STOP_SIGNAL = signal.Signals(signum).name
def utc_now() -> dt.datetime:
return dt.datetime.now(dt.UTC)
def iso_z(value: dt.datetime | None = None) -> str:
value = value or utc_now()
return value.astimezone(dt.UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z")
def compact_timestamp(value: dt.datetime | None = None) -> str:
value = value or utc_now()
return value.astimezone(dt.UTC).strftime("%Y%m%dT%H%M%SZ")
def parse_iso(value: Any) -> dt.datetime | None:
if not isinstance(value, str) or not value.strip():
return None
text = value.strip()
if text.endswith("Z"):
text = text[:-1] + "+00:00"
try:
parsed = dt.datetime.fromisoformat(text)
except ValueError:
return None
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=dt.UTC)
return parsed.astimezone(dt.UTC)
def sha256_file(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*.

    The file is read in 1 MiB chunks so large files are never held in memory
    all at once.
    """
    chunk_size = 1024 * 1024
    hasher = hashlib.sha256()
    with path.open("rb") as stream:
        while block := stream.read(chunk_size):
            hasher.update(block)
    return hasher.hexdigest()
def parse_scalar(value: str) -> Any:
    """Convert one raw config scalar into a Python value.

    Conversion order: empty text -> ``""``; a value wrapped in matching single
    or double quotes -> the text between the quotes; ``true``/``false`` ->
    ``bool``; ``null``/``none`` -> ``None`` (both case-insensitive); then
    ``int``, then ``float``; anything else is returned as the stripped string.

    Args:
        value: Raw text taken from the right-hand side of a ``key: value`` line.

    Returns:
        The converted value.
    """
    value = value.strip()
    if not value:
        return ""
    # At least two characters are required: a lone quote character is its own
    # first and last character and must not collapse into an empty string.
    if len(value) >= 2 and value[0] in {"'", '"'} and value[-1] == value[0]:
        return value[1:-1]
    lower = value.lower()
    if lower in {"true", "false"}:
        return lower == "true"
    if lower in {"null", "none"}:
        return None
    for caster in (int, float):
        try:
            return caster(value)
        except ValueError:
            continue
    return value
def _strip_inline_comment(raw_line: str) -> str:
    """Remove a trailing ``#`` comment from one config line.

    A ``#`` inside a quoted value (``key: "a#b"``) is preserved. Every other
    line is cut at the first ``#``.
    """
    head, separator, tail = raw_line.partition(":")
    if separator and "#" not in head:
        value = tail.lstrip()
        if value[:1] in {"'", '"'}:
            closing = value.find(value[0], 1)
            if closing != -1:
                quoted = value[: closing + 1]
                # Only text after the closing quote can be a comment.
                remainder = value[closing + 1 :].split("#", 1)[0]
                return f"{head}:{quoted}{remainder}".strip()
    return raw_line.split("#", 1)[0].strip()


def load_flat_yaml(path: Path) -> dict[str, Any]:
    """Parse the flat YAML subset used by the example config.

    Only single-line ``key: value`` pairs are understood. Blank lines and
    ``#`` comments are skipped; a ``#`` inside a quoted value is kept as part
    of the value.

    Args:
        path: Config file location. A missing file yields an empty mapping.

    Returns:
        Mapping of keys to values converted by ``parse_scalar``.

    Raises:
        ValueError: If a non-blank line has no ``:`` or has an empty key.
    """
    config: dict[str, Any] = {}
    if not path.exists():
        return config
    lines = path.read_text(encoding="utf-8").splitlines()
    for line_number, raw_line in enumerate(lines, 1):
        line = _strip_inline_comment(raw_line)
        if not line:
            continue
        if ":" not in line:
            raise ValueError(f"Unsupported config line {line_number}: {raw_line}")
        # Split on the first colon only so values such as URLs stay intact.
        key, value = line.split(":", 1)
        key = key.strip()
        if not key:
            raise ValueError(f"Missing config key on line {line_number}")
        config[key] = parse_scalar(value)
    return config
def config_digest(path: Path | None) -> str | None:
    """Return the SHA-256 hex digest of the config file, if there is one.

    Returns ``None`` when no path was given or the file does not exist.
    """
    if path is not None and path.exists():
        return sha256_file(path)
    return None
def filter_headers(headers: Any) -> dict[str, str]:
    """Keep only the allow-listed response headers.

    Names are matched case-insensitively against ``SAFE_RESPONSE_HEADERS``;
    the returned mapping keeps each name in the casing it arrived with.

    Args:
        headers: Any object accepted by ``dict()``, such as a header message.
    """
    return {
        name: content
        for name, content in dict(headers).items()
        if name.lower() in SAFE_RESPONSE_HEADERS
    }
def http_post_json(
    *,
    url: str,
    json_body: Any,
    timeout_seconds: float,
    max_retries: int,
    backoff_seconds: float,
) -> dict[str, Any]:
    """POST a JSON body and return a full record of the exchange.

    The request is retried only on HTTP 429 and 5xx responses, up to
    *max_retries* extra attempts, and never after a stop was requested.
    Transport-level failures (no status code) are recorded but not retried.
    The wait between attempts is ``backoff_seconds * 2**attempt_index``,
    raised to the server's ``Retry-After`` value when that header carries a
    finite, non-negative number of seconds.

    Args:
        url: Endpoint to call.
        json_body: JSON-serialisable payload.
        timeout_seconds: Timeout passed to ``urlopen`` for each attempt.
        max_retries: Number of retries allowed after the first attempt.
        backoff_seconds: Base delay for the exponential backoff.

    Returns:
        A dict with ``request``, ``response`` (status, filtered headers,
        parsed JSON or a 1000-character text preview), ``attempts`` (one entry
        per attempt), the summed ``duration_ms`` and an ``ok`` flag. ``ok`` is
        true for a 2xx final status whose body did not fail JSON decoding.
    """
    body_bytes = json.dumps(json_body, separators=(",", ":")).encode("utf-8")
    attempts: list[dict[str, Any]] = []
    final_json: Any | None = None
    final_text_preview: str | None = None
    final_json_error: str | None = None
    final_status_code: int | None = None
    final_headers: dict[str, str] = {}
    for attempt_index in range(max_retries + 1):
        started_at = iso_z()
        started_monotonic = time.monotonic()
        status_code: int | None = None
        response_headers: dict[str, str] = {}
        response_text = ""
        error: str | None = None
        try:
            request = urllib.request.Request(
                url,
                data=body_bytes,
                headers={
                    "Accept": "application/json",
                    "Content-Type": "application/json",
                    "User-Agent": "orderbooks-checkpoint-4-sample/0.1.0",
                },
                method="POST",
            )
            with urllib.request.urlopen(request, timeout=timeout_seconds) as response:
                status_code = response.status
                response_headers = filter_headers(response.headers)
                response_text = response.read().decode("utf-8", errors="replace")
        except urllib.error.HTTPError as exc:
            # Non-2xx responses still carry a status, headers and a body.
            status_code = exc.code
            response_headers = filter_headers(exc.headers)
            response_text = exc.read().decode("utf-8", errors="replace")
            error = f"HTTPError: {exc}"
        except Exception as exc:  # noqa: BLE001 - preserve request failure evidence
            error = f"{type(exc).__name__}: {exc}"
        duration_ms = round((time.monotonic() - started_monotonic) * 1000, 3)
        parsed_json = None
        json_error = None
        if response_text:
            try:
                parsed_json = json.loads(response_text)
            except json.JSONDecodeError as exc:
                json_error = str(exc)
        attempts.append(
            {
                "attempt": attempt_index + 1,
                "started_at_utc": started_at,
                "ended_at_utc": iso_z(),
                "duration_ms": duration_ms,
                "status_code": status_code,
                "headers": response_headers,
                "error": error,
                "json_error": json_error,
            }
        )
        # The most recent attempt always defines the reported final response.
        final_json = parsed_json
        final_json_error = json_error
        final_text_preview = response_text[:1000] if parsed_json is None else None
        final_status_code = status_code
        final_headers = response_headers
        retryable = status_code == 429 or (status_code is not None and 500 <= status_code <= 599)
        if error is None and status_code is not None and 200 <= status_code < 300:
            break
        if not retryable or attempt_index >= max_retries or STOP_REQUESTED:
            break
        # Header names are case-insensitive and filter_headers keeps the
        # server's casing, so match on the lower-cased name.
        retry_after = next(
            (
                content
                for name, content in response_headers.items()
                if name.lower() == "retry-after"
            ),
            None,
        )
        sleep_seconds = backoff_seconds * (2**attempt_index)
        if retry_after:
            try:
                hinted_seconds: float | None = float(retry_after)
            except ValueError:
                # Not a plain number of seconds (e.g. a date); keep the backoff.
                hinted_seconds = None
            # NaN and infinity are ignored: time.sleep cannot take them.
            if hinted_seconds is not None and 0 <= hinted_seconds < float("inf"):
                sleep_seconds = max(sleep_seconds, hinted_seconds)
        time.sleep(sleep_seconds)
    return {
        "request": {
            "method": "POST",
            "url": url,
            "json_body": json_body,
        },
        "response": {
            "status_code": final_status_code,
            "headers": final_headers,
            "json": final_json,
            "json_error": final_json_error,
            "text_preview": final_text_preview,
        },
        "attempts": attempts,
        "duration_ms": round(sum(attempt["duration_ms"] for attempt in attempts), 3),
        "ok": final_status_code is not None and 200 <= final_status_code < 300 and final_json_error is None,
    }
def load_discovery(path: Path) -> dict[str, Any]:
    """Read the discovery artifact at *path* and return the decoded JSON."""
    with path.open("r", encoding="utf-8") as stream:
        return json.load(stream)
def market_is_usable(market: dict[str, Any], now: dt.datetime, safety_seconds: int) -> tuple[bool, list[str]]:
    """Decide whether a discovered market can be sampled.

    Args:
        market: One normalized market record from the discovery artifact.
        now: Reference time for the expiry check.
        safety_seconds: Minimum time that must remain before the market ends.

    Returns:
        ``(usable, reasons)``, where *reasons* lists every failed check and is
        empty exactly when the market is usable.
    """
    # (field, value it must be identical to, reason recorded otherwise)
    flag_checks = (
        ("active", True, "not_active"),
        ("closed", False, "closed"),
        ("accepting_orders", True, "not_accepting_orders"),
        ("enable_order_book", True, "order_book_not_enabled"),
    )
    reasons: list[str] = [
        reason for field, required, reason in flag_checks if market.get(field) is not required
    ]

    end_time = parse_iso(market.get("end_time_utc"))
    if end_time is None:
        reasons.append("missing_end_time")
    elif end_time <= now + dt.timedelta(seconds=safety_seconds):
        reasons.append("too_close_to_end_or_expired")

    tokens = market.get("tokens")
    if isinstance(tokens, list) and len(tokens) >= 2:
        # Only dict entries count; the first two must be Up then Down with ids.
        leading = [entry for entry in tokens if isinstance(entry, dict)][:2]
        outcomes = [entry.get("outcome") for entry in leading]
        ids_present = all(entry.get("token_id") for entry in leading)
        if outcomes != ["Up", "Down"] or not ids_present:
            reasons.append("bad_up_down_token_mapping")
    else:
        reasons.append("missing_two_tokens")
    return not reasons, reasons
def select_markets(
    discovery: dict[str, Any],
    *,
    market_limit: int,
    market_end_safety_seconds: int,
) -> tuple[list[dict[str, Any]], dict[str, int]]:
    """Pick the first usable markets from the discovery artifact.

    Markets are taken in artifact order and scanning stops once
    *market_limit* have been accepted, so rejections are only counted for
    entries seen before that point.

    Returns:
        ``(selected, rejection_counts)``, with the counts keyed by rejection
        reason and sorted by reason.
    """
    reference_time = utc_now()
    chosen: list[dict[str, Any]] = []
    rejected: dict[str, int] = {}

    def tally(reason: str) -> None:
        rejected[reason] = rejected.get(reason, 0) + 1

    for candidate in discovery.get("normalized_markets") or []:
        if not isinstance(candidate, dict):
            tally("not_object")
            continue
        usable, reasons = market_is_usable(candidate, reference_time, market_end_safety_seconds)
        if usable:
            chosen.append(candidate)
            if len(chosen) >= market_limit:
                break
        else:
            for reason in reasons:
                tally(reason)
    return chosen, {reason: rejected[reason] for reason in sorted(rejected)}
def flatten_tokens(markets: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Expand markets into one metadata record per tracked token.

    For each market the first two dict-shaped token entries are used.
    Non-dict entries are skipped instead of being indexed blindly, and a
    missing or null ``tokens`` field contributes no records.

    Args:
        markets: Market records, each with an optional ``tokens`` list.

    Returns:
        Records holding the market identity fields plus the token's
        ``token_id`` (always a string) and ``outcome``.
    """
    flattened: list[dict[str, Any]] = []
    for market in markets:
        raw_tokens = market.get("tokens") or []
        dict_tokens = [token for token in raw_tokens if isinstance(token, dict)]
        for token in dict_tokens[:2]:
            flattened.append(
                {
                    "market_name": market.get("market_name"),
                    "market_slug": market.get("market_slug"),
                    "condition_id": market.get("condition_id"),
                    "token_id": str(token.get("token_id")),
                    "outcome": token.get("outcome"),
                    "market_end_time_utc": market.get("end_time_utc"),
                }
            )
    return flattened
def build_snapshot_envelope(
    *,
    raw_book: dict[str, Any],
    token_meta: dict[str, Any],
    collected_at_utc: str,
    sequence: int,
    request_record: dict[str, Any],
    response_index: int,
) -> dict[str, Any]:
    """Wrap one raw order book in the envelope written to the JSONL output.

    Args:
        raw_book: The order-book object exactly as it was returned.
        token_meta: Token record describing the market and outcome.
        collected_at_utc: Timestamp shared by all rows of the same request.
        sequence: Running row number within the run.
        request_record: Request record as returned by ``http_post_json``.
        response_index: Position of *raw_book* in the response list.

    Returns:
        A dict with schema, collector, market, collection and request
        metadata, plus the untouched book under ``raw``.
    """
    market_fields = (
        "market_name",
        "market_slug",
        "condition_id",
        "token_id",
        "outcome",
        "market_end_time_utc",
    )
    sent = request_record["request"]
    request_section = {
        "method": sent["method"],
        "url": sent["url"],
        "params": None,
        "json_body": sent["json_body"],
        "status_code": request_record["response"]["status_code"],
        "duration_ms": request_record["duration_ms"],
        "attempts": request_record["attempts"],
    }
    collection_section = {
        "collected_at_utc": collected_at_utc,
        "sequence": sequence,
        "response_index": response_index,
    }
    return {
        "schema_name": SCHEMA_NAME,
        "schema_version": SCHEMA_VERSION,
        "collector": {"name": COLLECTOR_NAME, "version": COLLECTOR_VERSION},
        "market": {field: token_meta.get(field) for field in market_fields},
        "collection": collection_section,
        "request": request_section,
        "raw": raw_book,
    }
def summarize_output_file(path: Path, rows_written: int) -> dict[str, Any]:
    """Describe an output file for the manifest.

    Returns:
        A dict with the POSIX path, a ``status`` of ``"valid"`` (the file
        exists and is non-empty) or ``"missing"``, the size in bytes, the row
        count passed in, and the SHA-256 digest (``None`` when the file does
        not exist).
    """
    present = path.exists()
    size = path.stat().st_size if present else 0
    digest = sha256_file(path) if present else None
    return {
        "path": path.as_posix(),
        "status": "valid" if size > 0 else "missing",
        "bytes": size,
        "rows": rows_written,
        "sha256": digest,
    }
def write_manifest(path: Path, manifest: dict[str, Any]) -> None:
    """Write *manifest* to *path* as indented, key-sorted JSON.

    Missing parent directories are created, and the file ends with a newline.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    rendered = json.dumps(manifest, indent=2, sort_keys=True)
    path.write_text(f"{rendered}\n", encoding="utf-8")
def config_value(config: dict[str, Any], args: argparse.Namespace, key: str, default: Any) -> Any:
    """Resolve one setting with the precedence CLI > config file > default.

    A CLI attribute counts as supplied only when it is not ``None``.
    """
    override = getattr(args, key)
    return config.get(key, default) if override is None else override
def build_runtime_config(args: argparse.Namespace) -> dict[str, Any]:
    """Merge CLI arguments, the config file and defaults into runtime settings.

    Each setting is resolved through ``config_value`` and coerced to its
    expected type. The returned mapping also carries the config path, its
    digest and the parsed file contents for the manifest.

    Args:
        args: Parsed command-line arguments.

    Returns:
        The resolved runtime settings.

    Raises:
        ValueError: If ``market_limit < 1``, ``interval_seconds <= 0``,
            ``duration_seconds <= 0``, ``request_timeout_seconds <= 0``,
            ``max_retries < 0`` or ``backoff_seconds < 0``.
    """
    config_path = args.config
    file_config = load_flat_yaml(config_path) if config_path else {}

    def setting(key: str, default: Any) -> Any:
        return config_value(file_config, args, key, default)

    runtime = {
        "discovery_path": Path(setting("discovery_path", DEFAULT_DISCOVERY_PATH)),
        "output_dir": Path(setting("output_dir", DEFAULT_OUTPUT_DIR)),
        "manifest_path": Path(setting("manifest_path", DEFAULT_MANIFEST_PATH)),
        "market_limit": int(setting("market_limit", 2)),
        "interval_seconds": float(setting("interval_seconds", 30.0)),
        "duration_seconds": float(setting("duration_seconds", 300.0)),
        "request_timeout_seconds": float(setting("request_timeout_seconds", 15.0)),
        "max_retries": int(setting("max_retries", 2)),
        "backoff_seconds": float(setting("backoff_seconds", 2.0)),
        "market_end_safety_seconds": int(setting("market_end_safety_seconds", 420)),
        "clob_books_url": str(setting("clob_books_url", CLOB_BOOKS_URL)),
        "config_path": config_path,
        "config_sha256": config_digest(config_path),
        "config_snapshot": file_config,
    }
    if runtime["market_limit"] < 1:
        raise ValueError("market_limit must be >= 1")
    if runtime["interval_seconds"] <= 0:
        raise ValueError("interval_seconds must be > 0")
    if runtime["duration_seconds"] <= 0:
        raise ValueError("duration_seconds must be > 0")
    # Reject values that would otherwise only fail, or silently send no
    # request at all, once the run is under way.
    if runtime["request_timeout_seconds"] <= 0:
        raise ValueError("request_timeout_seconds must be > 0")
    if runtime["max_retries"] < 0:
        raise ValueError("max_retries must be >= 0")
    if runtime["backoff_seconds"] < 0:
        raise ValueError("backoff_seconds must be >= 0")
    return runtime
def run_collection(runtime: dict[str, Any], command: str) -> tuple[dict[str, Any], Path]:
    """Run one bounded collection and build its manifest.

    Installs SIGINT/SIGTERM handlers, selects markets from the discovery
    artifact, then polls the batch books endpoint once per interval until the
    configured duration elapses or a stop is requested. Every returned book
    that matches a tracked token is written as one gzip JSONL row.

    Gate status in the manifest:
        * ``BLOCKED`` - no tokens could be selected, so nothing was requested.
        * ``PASS`` - at least one row was written for every tracked token.
        * ``FAIL`` - anything else.

    Args:
        runtime: Settings produced by ``build_runtime_config``.
        command: Command line recorded in the manifest.

    Returns:
        ``(manifest, output_file)``. The manifest is not written here.
    """
    signal.signal(signal.SIGINT, handle_stop)
    signal.signal(signal.SIGTERM, handle_stop)
    started = utc_now()
    started_at_utc = iso_z(started)
    discovery_path: Path = runtime["discovery_path"]
    discovery = load_discovery(discovery_path)
    selected_markets, rejection_counts = select_markets(
        discovery,
        market_limit=runtime["market_limit"],
        market_end_safety_seconds=runtime["market_end_safety_seconds"],
    )
    warnings: list[str] = []
    failures: list[dict[str, Any]] = []
    if not selected_markets:
        warnings.append("No usable active BTC markets found in discovery input.")
    tokens = flatten_tokens(selected_markets)
    run_id = compact_timestamp(started)
    output_dir = runtime["output_dir"] / "polymarket" / "orderbooks" / run_id
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / f"polymarket_orderbooks_{run_id}.jsonl.gz"
    request_count = 0
    success_count = 0
    failure_count = 0
    status_code_counts: dict[str, int] = {}
    rows_written = 0
    sequence = 0
    token_row_counts = {token["token_id"]: 0 for token in tokens}
    deadline = time.monotonic() + runtime["duration_seconds"]
    token_by_id = {token["token_id"]: token for token in tokens}
    # The same batch body is sent on every poll.
    request_body = [{"token_id": token["token_id"]} for token in tokens]
    with gzip.open(output_file, "wt", encoding="utf-8") as handle:
        while tokens and not STOP_REQUESTED and time.monotonic() < deadline:
            loop_started = time.monotonic()
            collected_at_utc = iso_z()
            request_count += 1
            request_record = http_post_json(
                url=runtime["clob_books_url"],
                json_body=request_body,
                timeout_seconds=runtime["request_timeout_seconds"],
                max_retries=runtime["max_retries"],
                backoff_seconds=runtime["backoff_seconds"],
            )
            status_code = request_record["response"]["status_code"]
            status_key = str(status_code)
            status_code_counts[status_key] = status_code_counts.get(status_key, 0) + 1
            if request_record["ok"] and isinstance(request_record["response"]["json"], list):
                success_count += 1
                for response_index, raw_book in enumerate(request_record["response"]["json"]):
                    if not isinstance(raw_book, dict):
                        failure_count += 1
                        failures.append(
                            {
                                "collected_at_utc": collected_at_utc,
                                "reason": "book_response_item_not_object",
                                "response_index": response_index,
                            }
                        )
                        continue
                    asset_id = str(raw_book.get("asset_id") or "")
                    token_meta = token_by_id.get(asset_id)
                    if token_meta is None:
                        failure_count += 1
                        failures.append(
                            {
                                "collected_at_utc": collected_at_utc,
                                "reason": "unknown_asset_id_in_book_response",
                                "asset_id": asset_id,
                            }
                        )
                        continue
                    sequence += 1
                    envelope = build_snapshot_envelope(
                        raw_book=raw_book,
                        token_meta=token_meta,
                        collected_at_utc=collected_at_utc,
                        sequence=sequence,
                        request_record=request_record,
                        response_index=response_index,
                    )
                    handle.write(json.dumps(envelope, separators=(",", ":"), sort_keys=True) + "\n")
                    rows_written += 1
                    token_row_counts[asset_id] = token_row_counts.get(asset_id, 0) + 1
                # Flush once per batch so an interrupted run keeps the rows so far.
                handle.flush()
            else:
                failure_count += 1
                failures.append(
                    {
                        "collected_at_utc": collected_at_utc,
                        "reason": "request_failed_or_non_json_list",
                        "status_code": status_code,
                        "attempts": request_record["attempts"],
                        "json_error": request_record["response"]["json_error"],
                        "text_preview": request_record["response"]["text_preview"],
                    }
                )
            # Wait out the rest of the interval in slices of at most one
            # second so a stop request or the deadline is noticed promptly.
            remaining_interval = runtime["interval_seconds"] - (time.monotonic() - loop_started)
            while remaining_interval > 0 and not STOP_REQUESTED and time.monotonic() < deadline:
                sleep_for = min(remaining_interval, deadline - time.monotonic(), 1.0)
                if sleep_for <= 0:
                    break
                time.sleep(sleep_for)
                remaining_interval = runtime["interval_seconds"] - (time.monotonic() - loop_started)
    # The gzip stream is closed here, so size and digest describe the final file.
    ended = utc_now()
    ended_at_utc = iso_z(ended)
    duration_seconds_actual = round((ended - started).total_seconds(), 3)
    if STOP_REQUESTED:
        warnings.append(f"Graceful shutdown requested by {STOP_SIGNAL}.")
    if runtime["duration_seconds"] < 300:
        warnings.append("Configured run duration was shorter than the roadmap 5-minute sample target.")
    output_summary = summarize_output_file(output_file, rows_written)
    # Without tokens the loop never runs; with tokens but no request no row
    # can exist, so these three outcomes cover every case.
    if not tokens:
        gate_status = "BLOCKED"
    elif rows_written > 0 and all(count > 0 for count in token_row_counts.values()):
        gate_status = "PASS"
    else:
        gate_status = "FAIL"
    manifest = {
        "schema_name": "orderbook_collector_sample_manifest",
        "schema_version": 1,
        "checkpoint_id": 4,
        "checkpoint_name": "Minimal Orderbook Snapshot Collector",
        "gate_status": gate_status,
        "collector": {
            "name": COLLECTOR_NAME,
            "version": COLLECTOR_VERSION,
        },
        "started_at_utc": started_at_utc,
        "ended_at_utc": ended_at_utc,
        "run_duration_seconds": duration_seconds_actual,
        "configured_duration_seconds": runtime["duration_seconds"],
        "interval_seconds": runtime["interval_seconds"],
        "command": command,
        "config": {
            "path": runtime["config_path"].as_posix() if runtime["config_path"] else None,
            "sha256": runtime["config_sha256"],
            "snapshot": runtime["config_snapshot"],
            "effective": {
                "discovery_path": discovery_path.as_posix(),
                "output_dir": runtime["output_dir"].as_posix(),
                "manifest_path": runtime["manifest_path"].as_posix(),
                "market_limit": runtime["market_limit"],
                "interval_seconds": runtime["interval_seconds"],
                "duration_seconds": runtime["duration_seconds"],
                "request_timeout_seconds": runtime["request_timeout_seconds"],
                "max_retries": runtime["max_retries"],
                "backoff_seconds": runtime["backoff_seconds"],
                "market_end_safety_seconds": runtime["market_end_safety_seconds"],
                "clob_books_url": runtime["clob_books_url"],
            },
        },
        "discovery": {
            "path": discovery_path.as_posix(),
            "fetched_at_utc": discovery.get("fetched_at_utc"),
            "source_summary": discovery.get("summary"),
            "rejection_counts_before_selection": rejection_counts,
        },
        "markets_tracked": [
            {
                "market_name": market.get("market_name"),
                "market_slug": market.get("market_slug"),
                "condition_id": market.get("condition_id"),
                "end_time_utc": market.get("end_time_utc"),
            }
            for market in selected_markets
        ],
        "tokens_tracked": tokens,
        "request_count": request_count,
        "success_count": success_count,
        "failure_count": failure_count,
        "status_code_counts": dict(sorted(status_code_counts.items())),
        "rows_written": rows_written,
        "token_row_counts": token_row_counts,
        "output_files": [output_summary],
        "failures": failures,
        "warnings": warnings,
        "known_gaps": [
            "This is a short run-rotated sample, not a daemon.",
            "Hourly rotation is documented but not implemented in this checkpoint.",
            "No websocket capture, normalization, upload, systemd unit, dashboard, database, or trading behavior is included.",
            "A 5-minute sample proves file-writing behavior only; it does not prove 24/7 reliability.",
        ],
        "fake_progress_risk": "A small successful sample can still hide long-run gaps, stale discovery, endpoint schema drift, and missed intervals. Reliability remains gated on the future 24h soak test.",
        "next_step": "Checkpoint 5 should normalize this raw sample while preserving raw file references, or rerun a fresh short sample if the orchestrator wants more raw evidence first.",
    }
    return manifest, output_file
def parse_args() -> argparse.Namespace:
    """Parse the command line.

    Every override flag defaults to ``None`` so that an omitted flag falls
    through to the config file and then to the built-in default.
    """
    parser = argparse.ArgumentParser(
        description="Collect a bounded raw gzip JSONL sample of Polymarket BTC order books."
    )
    parser.add_argument("--config", type=Path, default=DEFAULT_CONFIG_PATH)
    # (flag, converter) for each optional override, in help-output order.
    overrides = (
        ("--discovery-path", Path),
        ("--output-dir", Path),
        ("--manifest-path", Path),
        ("--market-limit", int),
        ("--interval-seconds", float),
        ("--duration-seconds", float),
        ("--request-timeout-seconds", float),
        ("--max-retries", int),
        ("--backoff-seconds", float),
        ("--market-end-safety-seconds", int),
        ("--clob-books-url", str),
    )
    for flag, converter in overrides:
        parser.add_argument(flag, type=converter, default=None)
    return parser.parse_args()
def main() -> int:
    """Run one sample collection, write the manifest and print a summary.

    Returns:
        ``0`` when the manifest gate status is ``PASS``, otherwise ``1``.
    """
    args = parse_args()
    script = Path(sys.argv[0]).as_posix()
    command = " ".join([script, *sys.argv[1:]])
    runtime = build_runtime_config(args)
    manifest, output_file = run_collection(runtime, command)
    manifest_path = runtime["manifest_path"]
    write_manifest(manifest_path, manifest)

    summary = {
        "gate_status": manifest["gate_status"],
        "manifest_path": manifest_path.as_posix(),
        "output_file": output_file.as_posix(),
        "markets_tracked": manifest["markets_tracked"],
        "tokens_tracked": len(manifest["tokens_tracked"]),
        "request_count": manifest["request_count"],
        "success_count": manifest["success_count"],
        "failure_count": manifest["failure_count"],
        "rows_written": manifest["rows_written"],
        "warnings": manifest["warnings"],
    }
    print(json.dumps(summary, indent=2, sort_keys=True))
    if manifest["gate_status"] == "PASS":
        return 0
    return 1


if __name__ == "__main__":
    raise SystemExit(main())