#!/usr/bin/env python3 """Long-running raw Polymarket BTC websocket recorder with REST checkpoints. Checkpoint 10D scope: public BTC up/down market data only. The recorder writes raw websocket text exactly as received, keeps REST /books checkpoints as recovery/comparison evidence, and records rotation/reconnect/stale/divergence counters in manifests. It does not trade, sign, authenticate, or handle keys. """ from __future__ import annotations import argparse import base64 import datetime as dt import gzip import hashlib import json import os import signal import socket import ssl import struct import subprocess import sys import time import urllib.error import urllib.parse import urllib.request from decimal import Decimal, InvalidOperation from pathlib import Path from typing import Any COLLECTOR_NAME = "polymarket_ws_orderbook_recorder" COLLECTOR_VERSION = "0.1.0" WS_SCHEMA_NAME = "raw_polymarket_market_ws_message" REST_SCHEMA_NAME = "raw_polymarket_books_checkpoint" MANIFEST_SCHEMA_NAME = "polymarket_ws_recorder_manifest" SCHEMA_VERSION = 1 DEFAULT_CONFIG_PATH = Path("config/polymarket_ws_collector.example.yaml") DEFAULT_DISCOVERY_PATH = Path("data/discovery/polymarket_btc_markets_latest.json") DEFAULT_DISCOVERY_DIR = Path("data/discovery") DEFAULT_RAW_OUTPUT_ROOT = Path("/var/lib/orderbooks/raw_orderbooks") DEFAULT_MANIFEST_DIR = Path("/var/lib/orderbooks/manifests") DEFAULT_MANIFEST_PATH = Path("/var/lib/orderbooks/manifests/polymarket_ws_recorder_latest.json") MARKET_WS_URL = "wss://ws-subscriptions-clob.polymarket.com/ws/market" CLOB_BOOKS_URL = "https://clob.polymarket.com/books" DISCOVERY_SCRIPT = Path("scripts/discover_polymarket_btc_markets.py") SAFE_RESPONSE_HEADERS = { "cache-control", "cf-cache-status", "cf-ray", "content-length", "content-type", "date", "retry-after", "server", "x-ratelimit-limit", "x-ratelimit-remaining", "x-ratelimit-reset", "ratelimit-limit", "ratelimit-remaining", "ratelimit-reset", } STOP_REQUESTED = False STOP_SIGNAL: str | None = None def handle_stop(signum: int, _frame: Any) -> None: global STOP_REQUESTED, STOP_SIGNAL STOP_REQUESTED = True STOP_SIGNAL = signal.Signals(signum).name def utc_now() -> dt.datetime: return dt.datetime.now(dt.UTC) def iso_z(value: dt.datetime | None = None) -> str: value = value or utc_now() return value.astimezone(dt.UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z") def compact_timestamp(value: dt.datetime | None = None) -> str: value = value or utc_now() return value.astimezone(dt.UTC).strftime("%Y%m%dT%H%M%SZ") def parse_iso(value: Any) -> dt.datetime | None: if not isinstance(value, str) or not value.strip(): return None text = value.strip() if text.endswith("Z"): text = text[:-1] + "+00:00" try: parsed = dt.datetime.fromisoformat(text) except ValueError: return None if parsed.tzinfo is None: parsed = parsed.replace(tzinfo=dt.UTC) return parsed.astimezone(dt.UTC) def sha256_bytes(data: bytes) -> str: return hashlib.sha256(data).hexdigest() def sha256_file(path: Path) -> str: digest = hashlib.sha256() with path.open("rb") as handle: for chunk in iter(lambda: handle.read(1024 * 1024), b""): digest.update(chunk) return digest.hexdigest() def parse_scalar(value: str) -> Any: value = value.strip() if not value: return "" if value[0] in {"'", '"'} and value[-1:] == value[0]: return value[1:-1] lower = value.lower() if lower in {"true", "false"}: return lower == "true" if lower in {"null", "none"}: return None try: return int(value) except ValueError: pass try: return float(value) except ValueError: return value def load_flat_yaml(path: Path) -> dict[str, Any]: config: dict[str, Any] = {} if not path.exists(): return config for line_number, raw_line in enumerate(path.read_text(encoding="utf-8").splitlines(), 1): line = raw_line.split("#", 1)[0].strip() if not line: continue if ":" not in line: raise ValueError(f"Unsupported config line {line_number}: {raw_line}") key, value = line.split(":", 1) key = key.strip() if not key: raise ValueError(f"Missing config key on line {line_number}") config[key] = parse_scalar(value) return config def config_value(config: dict[str, Any], args: argparse.Namespace, key: str, default: Any) -> Any: value = getattr(args, key, None) if value is not None: return value env_key = "ORDERBOOKS_WS_" + key.upper() if env_key in os.environ: return parse_scalar(os.environ[env_key]) return config.get(key, default) def filter_headers(headers: Any) -> dict[str, str]: safe: dict[str, str] = {} for key, value in dict(headers).items(): if key.lower() in SAFE_RESPONSE_HEADERS: safe[key] = value return safe def dec(value: Any) -> Decimal: if value is None: return Decimal("0") try: return Decimal(str(value)) except InvalidOperation: return Decimal("0") def dec_to_str(value: Decimal) -> str: text = format(value, "f") if "." in text: text = text.rstrip("0").rstrip(".") return text or "0" def level_map(levels: Any) -> dict[str, Decimal]: result: dict[str, Decimal] = {} if not isinstance(levels, list): return result for item in levels: if not isinstance(item, dict): continue price = dec_to_str(dec(item.get("price"))) result[price] = dec(item.get("size")) return result def top_levels(book: dict[str, Decimal], side: str, top_n: int) -> list[tuple[str, Decimal]]: return sorted(book.items(), key=lambda item: Decimal(item[0]), reverse=(side == "bids"))[:top_n] def summarize_book(bids: dict[str, Decimal], asks: dict[str, Decimal], top_n: int) -> dict[str, Any]: bid_levels = top_levels(bids, "bids", top_n) ask_levels = top_levels(asks, "asks", top_n) best_bid = bid_levels[0][0] if bid_levels else None best_ask = ask_levels[0][0] if ask_levels else None spread = dec_to_str(Decimal(best_ask) - Decimal(best_bid)) if best_bid and best_ask else None return { "best_bid": best_bid, "best_ask": best_ask, "spread": spread, "bid_level_count": len(bids), "ask_level_count": len(asks), "top_bids": [{"price": price, "size": dec_to_str(size)} for price, size in bid_levels], "top_asks": [{"price": price, "size": dec_to_str(size)} for price, size in ask_levels], } def compare_side(local: dict[str, Decimal], rest: dict[str, Decimal], side: str, top_n: int) -> dict[str, Any]: local_top = dict(top_levels(local, side, top_n)) rest_top = dict(top_levels(rest, side, top_n)) missing = sorted(set(rest_top) - set(local_top), key=Decimal, reverse=(side == "bids")) extra = sorted(set(local_top) - set(rest_top), key=Decimal, reverse=(side == "bids")) size_deltas = [] for price in sorted(set(local_top) & set(rest_top), key=Decimal, reverse=(side == "bids")): delta = local_top[price] - rest_top[price] if delta != 0: size_deltas.append({ "price": price, "local_size": dec_to_str(local_top[price]), "rest_size": dec_to_str(rest_top[price]), "delta": dec_to_str(delta), }) return {"missing_prices": missing, "extra_prices": extra, "size_deltas": size_deltas} class BookState: def __init__(self, token_meta: dict[str, Any]) -> None: self.token_meta = token_meta self.bids: dict[str, Decimal] = {} self.asks: dict[str, Decimal] = {} self.initialized = False self.messages_applied = 0 self.messages_skipped = 0 self.unknown_messages = 0 self.last_update_received_at_utc: str | None = None def apply_book(self, item: dict[str, Any], received_at_utc: str) -> None: self.bids = level_map(item.get("bids")) self.asks = level_map(item.get("asks")) self.initialized = True self.messages_applied += 1 self.last_update_received_at_utc = received_at_utc def apply_change(self, change: dict[str, Any], received_at_utc: str) -> str | None: if not self.initialized: self.messages_skipped += 1 return "price_change_before_book_snapshot" side = str(change.get("side") or "").upper() price = dec_to_str(dec(change.get("price"))) size = dec(change.get("size")) if side == "BUY": book_side = self.bids elif side == "SELL": book_side = self.asks else: self.messages_skipped += 1 return f"unsupported_price_change_side:{side}" if size == 0: book_side.pop(price, None) else: book_side[price] = size self.messages_applied += 1 self.last_update_received_at_utc = received_at_utc return None def summary(self, top_n: int) -> dict[str, Any]: state_quality = "insufficient_events" if self.initialized and self.messages_applied > 1: state_quality = "initialized_and_updated" elif self.initialized: state_quality = "snapshot_only" return { "token": self.token_meta, "initialized": self.initialized, "state_quality": state_quality, "messages_applied": self.messages_applied, "messages_skipped": self.messages_skipped, "unknown_messages": self.unknown_messages, "last_update_received_at_utc": self.last_update_received_at_utc, **summarize_book(self.bids, self.asks, top_n), } class ArchiveWriter: def __init__(self, *, root: Path, subdir: str, prefix: str, run_id: str) -> None: self.root = root self.subdir = subdir self.prefix = prefix self.run_id = run_id self.current_hour: str | None = None self.temp_path: Path | None = None self.final_path: Path | None = None self.handle: gzip.GzipFile | None = None self.rows = 0 self.started_at_utc: str | None = None self.closed_files: list[dict[str, Any]] = [] def _paths_for(self, when: dt.datetime) -> tuple[str, Path, Path]: hour = when.astimezone(dt.UTC).strftime("%Y/%m/%d/%H") hour_id = when.astimezone(dt.UTC).strftime("%Y%m%dT%H0000Z") directory = self.root / self.subdir / hour final_path = directory / f"{self.prefix}_{self.run_id}_{hour_id}.jsonl.gz" temp_path = directory / f".{final_path.name}.open" return hour, temp_path, final_path def ensure_open(self, when: dt.datetime | None = None) -> None: when = when or utc_now() hour, temp_path, final_path = self._paths_for(when) if self.handle is not None and self.current_hour == hour: return self.close(ended_at=when) temp_path.parent.mkdir(parents=True, exist_ok=True) self.current_hour = hour self.temp_path = temp_path self.final_path = final_path self.rows = 0 self.started_at_utc = iso_z(when) self.handle = gzip.open(temp_path, "at", encoding="utf-8") def write(self, row: dict[str, Any], when: dt.datetime | None = None) -> None: self.ensure_open(when) assert self.handle is not None self.handle.write(json.dumps(row, separators=(",", ":"), sort_keys=True) + "\n") self.rows += 1 if self.rows % 100 == 0: self.handle.flush() def close(self, ended_at: dt.datetime | None = None) -> None: if self.handle is None: return ended_at = ended_at or utc_now() self.handle.close() assert self.temp_path is not None and self.final_path is not None self.final_path.parent.mkdir(parents=True, exist_ok=True) if self.final_path.exists(): self.final_path.unlink() self.temp_path.rename(self.final_path) self.closed_files.append({ "path": self.final_path.as_posix(), "kind": self.prefix, "started_at_utc": self.started_at_utc, "ended_at_utc": iso_z(ended_at), "rows": self.rows, "bytes": self.final_path.stat().st_size, "sha256": sha256_file(self.final_path), "status": "valid" if self.rows > 0 else "empty", }) self.current_hour = None self.temp_path = None self.final_path = None self.handle = None self.rows = 0 self.started_at_utc = None def send_ws_frame(sock: ssl.SSLSocket, opcode: int, payload: bytes) -> None: mask = os.urandom(4) header = bytearray([0x80 | opcode]) length = len(payload) if length < 126: header.append(0x80 | length) elif length < 65536: header.append(0x80 | 126) header.extend(struct.pack("!H", length)) else: header.append(0x80 | 127) header.extend(struct.pack("!Q", length)) masked = bytes(byte ^ mask[index % 4] for index, byte in enumerate(payload)) sock.sendall(header + mask + masked) def read_exact(sock: ssl.SSLSocket, length: int) -> bytes: data = bytearray() while len(data) < length: chunk = sock.recv(length - len(data)) if not chunk: raise EOFError("websocket connection closed while reading frame") data.extend(chunk) return bytes(data) def read_ws_frame(sock: ssl.SSLSocket) -> tuple[int, bytes]: first, second = read_exact(sock, 2) opcode = first & 0x0F length = second & 0x7F masked = bool(second & 0x80) if length == 126: length = struct.unpack("!H", read_exact(sock, 2))[0] elif length == 127: length = struct.unpack("!Q", read_exact(sock, 8))[0] mask = read_exact(sock, 4) if masked else b"" payload = read_exact(sock, length) if length else b"" if masked: payload = bytes(byte ^ mask[index % 4] for index, byte in enumerate(payload)) return opcode, payload def parse_ws_headers(raw_headers: str) -> tuple[str, dict[str, str]]: lines = raw_headers.split("\r\n") status_line = lines[0] if lines else "" headers: dict[str, str] = {} for line in lines[1:]: if ":" not in line: continue key, value = line.split(":", 1) headers[key.strip()] = value.strip() return status_line, filter_headers(headers) def open_websocket(url: str, timeout_seconds: float) -> tuple[ssl.SSLSocket, dict[str, Any]]: parsed = urllib.parse.urlparse(url) host = parsed.hostname if not host: raise ValueError("missing websocket host") port = parsed.port or 443 path = parsed.path or "/" if parsed.query: path = f"{path}?{parsed.query}" raw_sock = socket.create_connection((host, port), timeout=timeout_seconds) sock = ssl.create_default_context().wrap_socket(raw_sock, server_hostname=host) sock.settimeout(timeout_seconds) key = base64.b64encode(os.urandom(16)).decode("ascii") request = ( f"GET {path} HTTP/1.1\r\n" f"Host: {host}\r\n" "Upgrade: websocket\r\n" "Connection: Upgrade\r\n" f"Sec-WebSocket-Key: {key}\r\n" "Sec-WebSocket-Version: 13\r\n" f"User-Agent: orderbooks-polymarket-ws-recorder/{COLLECTOR_VERSION}\r\n" "\r\n" ) sock.sendall(request.encode("ascii")) raw_headers = bytearray() while b"\r\n\r\n" not in raw_headers: raw_headers.extend(sock.recv(4096)) if len(raw_headers) > 65536: raise ValueError("websocket handshake headers exceeded 64 KiB") header_text = bytes(raw_headers).split(b"\r\n\r\n", 1)[0].decode("iso-8859-1", errors="replace") status_line, response_headers = parse_ws_headers(header_text) if " 101 " not in status_line: raise ValueError(f"websocket upgrade failed: {status_line}") return sock, {"status_line": status_line, "headers": response_headers} def decode_json_maybe(text: str) -> tuple[Any | None, str | None]: try: return json.loads(text), None except json.JSONDecodeError as exc: return None, str(exc) def classify_payload(payload: Any) -> list[str]: items = payload if isinstance(payload, list) else [payload] event_types: list[str] = [] for item in items: if not isinstance(item, dict): event_types.append(type(item).__name__) elif item.get("event_type"): event_types.append(str(item["event_type"])) elif {"asset_id", "bids", "asks"}.issubset(item.keys()): event_types.append("book") else: event_types.append("unknown_object") return event_types def raw_items(payload: Any) -> list[dict[str, Any]]: items = payload if isinstance(payload, list) else [payload] return [item for item in items if isinstance(item, dict)] def load_discovery(path: Path) -> dict[str, Any]: return json.loads(path.read_text(encoding="utf-8")) def market_is_usable(market: dict[str, Any], now: dt.datetime, safety_seconds: int) -> tuple[bool, list[str]]: reasons: list[str] = [] if market.get("active") is not True: reasons.append("not_active") if market.get("closed") is not False: reasons.append("closed") if market.get("accepting_orders") is not True: reasons.append("not_accepting_orders") if market.get("enable_order_book") is not True: reasons.append("order_book_not_enabled") end_time = parse_iso(market.get("end_time_utc")) if end_time is None: reasons.append("missing_end_time") elif end_time <= now + dt.timedelta(seconds=safety_seconds): reasons.append("too_close_to_end_or_expired") tokens = market.get("tokens") if not isinstance(tokens, list) or len(tokens) < 2: reasons.append("missing_two_tokens") else: outcomes = [token.get("outcome") for token in tokens if isinstance(token, dict)] token_ids = [token.get("token_id") for token in tokens if isinstance(token, dict)] if outcomes[:2] != ["Up", "Down"] or not all(token_ids[:2]): reasons.append("bad_up_down_token_mapping") return not reasons, reasons def select_markets(discovery: dict[str, Any], market_limit: int, market_end_safety_seconds: int) -> tuple[list[dict[str, Any]], dict[str, int]]: now = utc_now() selected: list[dict[str, Any]] = [] rejection_counts: dict[str, int] = {} for market in discovery.get("normalized_markets") or []: if not isinstance(market, dict): rejection_counts["not_object"] = rejection_counts.get("not_object", 0) + 1 continue usable, reasons = market_is_usable(market, now, market_end_safety_seconds) if not usable: for reason in reasons: rejection_counts[reason] = rejection_counts.get(reason, 0) + 1 continue selected.append(market) if market_limit > 0 and len(selected) >= market_limit: break return selected, dict(sorted(rejection_counts.items())) def flatten_tokens(markets: list[dict[str, Any]]) -> list[dict[str, Any]]: tokens: list[dict[str, Any]] = [] for market in markets: for token in market.get("tokens", [])[:2]: if not isinstance(token, dict): continue tokens.append({ "market_name": market.get("market_name"), "market_slug": market.get("market_slug"), "condition_id": market.get("condition_id"), "token_id": str(token.get("token_id")), "outcome": token.get("outcome"), "outcome_index": token.get("outcome_index"), "market_end_time_utc": market.get("end_time_utc"), }) return tokens def run_discovery(config: dict[str, Any], counters: dict[str, Any], warnings: list[str], errors: list[dict[str, Any]]) -> None: script_path = Path(config["discovery_script_path"]) if not config["discovery_execute"]: return if not script_path.exists(): warnings.append(f"discovery script missing; using existing artifact only: {script_path}") return output_json = Path(config["discovery_path"]) output_json.parent.mkdir(parents=True, exist_ok=True) cmd = [ sys.executable, script_path.as_posix(), "--output-json", output_json.as_posix(), "--manifest", str(Path(config["discovery_dir"]) / "polymarket_btc_markets_manifest.json"), "--markdown", str(Path(config["discovery_dir"]) / "polymarket_btc_markets.md"), "--max-pages", str(config["discovery_max_pages"]), "--limit", str(config["discovery_page_limit"]), "--timeout", str(config["request_timeout_seconds"]), ] started = time.monotonic() result = subprocess.run(cmd, text=True, capture_output=True, timeout=max(30, int(config["request_timeout_seconds"] * (config["discovery_max_pages"] + 1)))) counters["discovery_refresh_count"] += 1 counters["last_discovery_duration_ms"] = round((time.monotonic() - started) * 1000, 3) if result.returncode != 0: counters["discovery_failure_count"] += 1 errors.append({ "stage": "discovery_refresh", "returncode": result.returncode, "stderr_tail": result.stderr[-2000:], "stdout_tail": result.stdout[-1000:], }) def refresh_market_state(config: dict[str, Any], counters: dict[str, Any], warnings: list[str], errors: list[dict[str, Any]]) -> tuple[list[dict[str, Any]], list[dict[str, Any]], dict[str, int]]: run_discovery(config, counters, warnings, errors) discovery_path = Path(config["discovery_path"]) discovery = load_discovery(discovery_path) markets, rejection_counts = select_markets(discovery, int(config["market_limit"]), int(config["market_end_safety_seconds"])) tokens = flatten_tokens(markets) counters["markets_tracked"] = len(markets) counters["tokens_tracked"] = len(tokens) counters["last_discovery_path"] = discovery_path.as_posix() counters["last_discovery_sha256"] = sha256_file(discovery_path) if discovery_path.exists() else None return markets, tokens, rejection_counts def apply_payload_to_books(payload: Any, states: dict[str, BookState], received_at_utc: str, counters: dict[str, Any], warnings: list[str]) -> None: for item in raw_items(payload): event_type = item.get("event_type") or ("book" if {"asset_id", "bids", "asks"}.issubset(item.keys()) else "unknown_object") counters["event_type_counts"][event_type] = counters["event_type_counts"].get(event_type, 0) + 1 if event_type == "book": state = states.get(str(item.get("asset_id") or "")) if state: state.apply_book(item, received_at_utc) elif event_type == "price_change": changes = item.get("price_changes") if not isinstance(changes, list): counters["parser_warning_count"] += 1 warnings.append("price_change event without price_changes list") continue for change in changes: if not isinstance(change, dict): continue state = states.get(str(change.get("asset_id") or "")) if not state: continue reason = state.apply_change(change, received_at_utc) if reason: counters["book_update_skip_counts"][reason] = counters["book_update_skip_counts"].get(reason, 0) + 1 elif event_type in {"best_bid_ask", "last_trade_price", "new_market"}: counters["non_mutating_event_counts"][event_type] = counters["non_mutating_event_counts"].get(event_type, 0) + 1 else: counters["unknown_event_counts"][event_type] = counters["unknown_event_counts"].get(event_type, 0) + 1 for state in states.values(): state.unknown_messages += 1 def compare_state_to_rest(state: BookState, rest_item: dict[str, Any], top_n: int) -> dict[str, Any]: rest_bids = level_map(rest_item.get("bids")) rest_asks = level_map(rest_item.get("asks")) local_summary = summarize_book(state.bids, state.asks, top_n) rest_summary = summarize_book(rest_bids, rest_asks, top_n) bid_diff = compare_side(state.bids, rest_bids, "bids", top_n) ask_diff = compare_side(state.asks, rest_asks, "asks", top_n) best_bid_affected = local_summary["best_bid"] != rest_summary["best_bid"] best_ask_affected = local_summary["best_ask"] != rest_summary["best_ask"] spread_affected = local_summary["spread"] != rest_summary["spread"] level_count_affected = local_summary["bid_level_count"] != rest_summary["bid_level_count"] or local_summary["ask_level_count"] != rest_summary["ask_level_count"] price_membership_affected = bool(bid_diff["missing_prices"] or bid_diff["extra_prices"] or ask_diff["missing_prices"] or ask_diff["extra_prices"]) size_delta_count = len(bid_diff["size_deltas"]) + len(ask_diff["size_deltas"]) divergent = any([best_bid_affected, best_ask_affected, spread_affected, level_count_affected, price_membership_affected, size_delta_count]) return { "comparison_status": "divergent" if divergent else "match", "best_bid_affected": best_bid_affected, "best_ask_affected": best_ask_affected, "spread_affected": spread_affected, "level_count_affected": level_count_affected, "price_membership_affected": price_membership_affected, "size_only_divergent": bool(size_delta_count) and not any([best_bid_affected, best_ask_affected, spread_affected, level_count_affected, price_membership_affected]), "bid_size_delta_count": len(bid_diff["size_deltas"]), "ask_size_delta_count": len(ask_diff["size_deltas"]), } def http_post_books(url: str, token_ids: list[str], timeout_seconds: float) -> dict[str, Any]: requested_at_utc = iso_z() started = time.monotonic() request_body = [{"token_id": token_id} for token_id in token_ids] body_bytes = json.dumps(request_body, separators=(",", ":")).encode("utf-8") status_code: int | None = None headers: dict[str, str] = {} response_text = "" error: str | None = None try: request = urllib.request.Request( url, data=body_bytes, headers={ "Accept": "application/json", "Content-Type": "application/json", "User-Agent": f"orderbooks-polymarket-ws-recorder/{COLLECTOR_VERSION}", }, method="POST", ) with urllib.request.urlopen(request, timeout=timeout_seconds) as response: status_code = response.status headers = filter_headers(response.headers) response_text = response.read().decode("utf-8", errors="replace") except urllib.error.HTTPError as exc: status_code = exc.code headers = filter_headers(exc.headers) response_text = exc.read().decode("utf-8", errors="replace") error = f"HTTPError: {exc}" except Exception as exc: # noqa: BLE001 - preserve failure evidence error = f"{type(exc).__name__}: {exc}" parsed_json, json_error = decode_json_maybe(response_text) if response_text else (None, None) return { "requested_at_utc": requested_at_utc, "received_at_utc": iso_z(), "duration_ms": round((time.monotonic() - started) * 1000, 3), "request_body": request_body, "status_code": status_code, "headers": headers, "raw_response_json": parsed_json, "json_error": json_error, "raw_response_text_sha256": sha256_bytes(response_text.encode("utf-8")), "raw_response_length_bytes": len(response_text.encode("utf-8")), "raw_response_text_preview": response_text[:1000] if parsed_json is None else None, "error": error, "ok": error is None and status_code is not None and 200 <= status_code < 300 and json_error is None, } def fetch_rest_checkpoint( *, config: dict[str, Any], rest_writer: ArchiveWriter, checkpoint_sequence: int, tokens: list[dict[str, Any]], states: dict[str, BookState], counters: dict[str, Any], ) -> None: token_ids = [token["token_id"] for token in tokens] batch_size = max(1, int(config["rest_batch_size"])) for batch_index, start in enumerate(range(0, len(token_ids), batch_size), 1): batch = token_ids[start:start + batch_size] response = http_post_books(config["clob_books_url"], batch, float(config["request_timeout_seconds"])) counters["rest_request_count"] += 1 if response["ok"]: counters["rest_success_count"] += 1 else: counters["rest_failure_count"] += 1 if response.get("status_code") == 429: counters["rest_rate_limit_count"] += 1 comparison = { "match_count": 0, "divergent_count": 0, "no_state_count": 0, "best_bid_affected_count": 0, "best_ask_affected_count": 0, "spread_affected_count": 0, "level_count_affected_count": 0, "price_membership_affected_count": 0, "size_only_divergent_count": 0, "bid_size_delta_count": 0, "ask_size_delta_count": 0, } payload = response.get("raw_response_json") if isinstance(payload, list): for rest_item in payload: if not isinstance(rest_item, dict): continue token_id = str(rest_item.get("asset_id") or "") state = states.get(token_id) if not state or not state.initialized: comparison["no_state_count"] += 1 continue cmp = compare_state_to_rest(state, rest_item, int(config["top_n"])) if cmp["comparison_status"] == "match": comparison["match_count"] += 1 else: comparison["divergent_count"] += 1 for key in ["best_bid_affected", "best_ask_affected", "spread_affected", "level_count_affected", "price_membership_affected", "size_only_divergent"]: if cmp[key]: comparison[f"{key}_count"] += 1 comparison["bid_size_delta_count"] += cmp["bid_size_delta_count"] comparison["ask_size_delta_count"] += cmp["ask_size_delta_count"] for key, value in comparison.items(): counters["rest_comparison_counts"][key] = counters["rest_comparison_counts"].get(key, 0) + value rest_writer.write({ "schema_name": REST_SCHEMA_NAME, "schema_version": SCHEMA_VERSION, "collector": {"name": COLLECTOR_NAME, "version": COLLECTOR_VERSION}, "checkpoint_sequence": checkpoint_sequence, "batch_index": batch_index, "batch_count": (len(token_ids) + batch_size - 1) // batch_size, "token_ids": batch, "tokens_tracked_count": len(token_ids), "response": response, "comparison_summary": comparison, }) def build_ws_envelope( *, run_id: str, session_id: str, connection_sequence: int, message_sequence: int, global_message_sequence: int, received_at_utc: str, websocket_url: str, subscription: dict[str, Any], tokens: list[dict[str, Any]], opcode: int, payload_bytes: bytes, ) -> tuple[dict[str, Any], Any | None, list[str], bool]: decode_error = None try: raw_text = payload_bytes.decode("utf-8") except UnicodeDecodeError as exc: decode_error = str(exc) raw_text = payload_bytes.decode("utf-8", errors="replace") parsed_json, json_error = decode_json_maybe(raw_text) if decode_error is None else (None, decode_error) event_types = classify_payload(parsed_json) if parsed_json is not None else ["unparseable_text"] envelope = { "schema_name": WS_SCHEMA_NAME, "schema_version": SCHEMA_VERSION, "collector": {"name": COLLECTOR_NAME, "version": COLLECTOR_VERSION}, "run_id": run_id, "session_id": session_id, "connection_sequence": connection_sequence, "message_sequence": message_sequence, "global_message_sequence": global_message_sequence, "received_at_utc": received_at_utc, "websocket": {"url": websocket_url}, "subscription": subscription, "tokens_tracked": tokens, "opcode": opcode, "payload_length_bytes": len(payload_bytes), "payload_sha256": sha256_bytes(payload_bytes), "raw_text": raw_text, "json": parsed_json, "json_error": json_error, "classified_event_types": event_types, } return envelope, parsed_json, event_types, parsed_json is not None def initial_counters() -> dict[str, Any]: return { "websocket_message_count": 0, "websocket_parsed_json_count": 0, "websocket_parse_error_count": 0, "websocket_opcode_counts": {}, "event_type_counts": {}, "non_mutating_event_counts": {}, "unknown_event_counts": {}, "book_update_skip_counts": {}, "parser_warning_count": 0, "connection_count": 0, "reconnect_count": 0, "subscription_change_count": 0, "stale_feed_count": 0, "max_gap_seconds": None, "last_message_received_at_utc": None, "rest_request_count": 0, "rest_success_count": 0, "rest_failure_count": 0, "rest_rate_limit_count": 0, "rest_comparison_counts": {}, "discovery_refresh_count": 0, "discovery_failure_count": 0, "last_discovery_duration_ms": None, "markets_tracked": 0, "tokens_tracked": 0, "last_discovery_path": None, "last_discovery_sha256": None, } def summarize_states(states: dict[str, BookState], top_n: int) -> list[dict[str, Any]]: return [state.summary(top_n) for state in states.values()] def write_manifest( *, config: dict[str, Any], run_id: str, started_at_utc: str, status: str, gate_status: str, shutdown_reason: str | None, markets: list[dict[str, Any]], tokens: list[dict[str, Any]], counters: dict[str, Any], ws_writer: ArchiveWriter, rest_writer: ArchiveWriter, states: dict[str, BookState], warnings: list[str], errors: list[dict[str, Any]], ) -> dict[str, Any]: manifest = { "schema_name": MANIFEST_SCHEMA_NAME, "schema_version": 1, "collector": {"name": COLLECTOR_NAME, "version": COLLECTOR_VERSION}, "run_id": run_id, "started_at_utc": started_at_utc, "updated_at_utc": iso_z(), "status": status, "gate_status": gate_status, "shutdown_reason": shutdown_reason, "production_ready": False, "public_data_only": True, "trading_enabled": False, "command": config.get("command"), "config": public_config(config), "markets_tracked": markets, "tokens_tracked": tokens, "counters": counters, "state_summary": summarize_states(states, int(config["top_n"])), "output_files": [*ws_writer.closed_files, *rest_writer.closed_files], "open_files": [path.as_posix() for path in [ws_writer.temp_path, rest_writer.temp_path] if path is not None], "warnings": sorted(set(warnings)), "errors": errors[-20:], } manifest_dir = Path(config["manifest_dir"]) manifest_dir.mkdir(parents=True, exist_ok=True) manifest_path = Path(config["manifest_path"]) manifest_path.parent.mkdir(parents=True, exist_ok=True) immutable_path = manifest_dir / f"polymarket_ws_recorder_{run_id}_{compact_timestamp()}.json" text = json.dumps(manifest, indent=2, sort_keys=True) + "\n" manifest_path.write_text(text, encoding="utf-8") immutable_path.write_text(text, encoding="utf-8") return manifest def public_config(config: dict[str, Any]) -> dict[str, Any]: result = {} for key, value in config.items(): if isinstance(value, Path): result[key] = value.as_posix() else: result[key] = value return result def build_config(args: argparse.Namespace) -> dict[str, Any]: file_config = load_flat_yaml(args.config) if args.config else {} duration = config_value(file_config, args, "duration_seconds", None) config = { "config_path": args.config.as_posix() if args.config else None, "config_sha256": sha256_file(args.config) if args.config and args.config.exists() else None, "discovery_path": Path(config_value(file_config, args, "discovery_path", DEFAULT_DISCOVERY_PATH)), "discovery_dir": Path(config_value(file_config, args, "discovery_dir", DEFAULT_DISCOVERY_DIR)), "discovery_script_path": Path(config_value(file_config, args, "discovery_script_path", DISCOVERY_SCRIPT)), "discovery_execute": bool(config_value(file_config, args, "discovery_execute", True)), "discovery_refresh_interval_seconds": float(config_value(file_config, args, "discovery_refresh_interval_seconds", 600)), "discovery_max_pages": int(config_value(file_config, args, "discovery_max_pages", 3)), "discovery_page_limit": int(config_value(file_config, args, "discovery_page_limit", 100)), "raw_output_root": Path(config_value(file_config, args, "raw_output_root", DEFAULT_RAW_OUTPUT_ROOT)), "manifest_dir": Path(config_value(file_config, args, "manifest_dir", DEFAULT_MANIFEST_DIR)), "manifest_path": Path(config_value(file_config, args, "manifest_path", DEFAULT_MANIFEST_PATH)), "websocket_url": str(config_value(file_config, args, "websocket_url", MARKET_WS_URL)), "clob_books_url": str(config_value(file_config, args, "clob_books_url", CLOB_BOOKS_URL)), "market_limit": int(config_value(file_config, args, "market_limit", 0) or 0), "market_end_safety_seconds": int(config_value(file_config, args, "market_end_safety_seconds", 420)), "rest_checkpoint_interval_seconds": float(config_value(file_config, args, "rest_checkpoint_interval_seconds", 60)), "rest_batch_size": int(config_value(file_config, args, "rest_batch_size", 50)), "top_n": int(config_value(file_config, args, "top_n", 10)), "stale_feed_threshold_seconds": float(config_value(file_config, args, "stale_feed_threshold_seconds", 30)), "request_timeout_seconds": float(config_value(file_config, args, "request_timeout_seconds", 15)), "websocket_timeout_seconds": float(config_value(file_config, args, "websocket_timeout_seconds", 10)), "reconnect_backoff_seconds": float(config_value(file_config, args, "reconnect_backoff_seconds", 3)), "max_reconnect_backoff_seconds": float(config_value(file_config, args, "max_reconnect_backoff_seconds", 60)), "duration_seconds": float(duration) if duration is not None else None, "manifest_write_interval_seconds": float(config_value(file_config, args, "manifest_write_interval_seconds", 300)), } if config["rest_batch_size"] < 1: raise ValueError("rest_batch_size must be >= 1") if config["market_limit"] < 0: raise ValueError("market_limit must be >= 0; use 0 for all active BTC markets") return config def run_recorder(config: dict[str, Any], command: str) -> dict[str, Any]: signal.signal(signal.SIGINT, handle_stop) signal.signal(signal.SIGTERM, handle_stop) started = utc_now() started_at_utc = iso_z(started) run_id = compact_timestamp(started) deadline = time.monotonic() + float(config["duration_seconds"]) if config["duration_seconds"] else None counters = initial_counters() warnings: list[str] = [] errors: list[dict[str, Any]] = [] markets: list[dict[str, Any]] = [] tokens: list[dict[str, Any]] = [] states: dict[str, BookState] = {} rejection_counts: dict[str, int] = {} ws_writer = ArchiveWriter(root=Path(config["raw_output_root"]), subdir="polymarket/ws_raw", prefix="polymarket_ws_raw", run_id=run_id) rest_writer = ArchiveWriter(root=Path(config["raw_output_root"]), subdir="polymarket/rest_checkpoints", prefix="polymarket_rest_checkpoints", run_id=run_id) next_discovery = 0.0 next_rest_checkpoint = 0.0 next_manifest_write = time.monotonic() + float(config["manifest_write_interval_seconds"]) checkpoint_sequence = 0 global_sequence = 0 connection_sequence = 0 reconnect_backoff = float(config["reconnect_backoff_seconds"]) last_text_message_monotonic: float | None = None shutdown_reason: str | None = None try: while not STOP_REQUESTED: if deadline is not None and time.monotonic() >= deadline: shutdown_reason = "duration_elapsed" break if time.monotonic() >= next_discovery or not tokens: old_token_ids = [token["token_id"] for token in tokens] try: markets, tokens, rejection_counts = refresh_market_state(config, counters, warnings, errors) except Exception as exc: # noqa: BLE001 - preserve evidence and retry counters["discovery_failure_count"] += 1 errors.append({"stage": "load_discovery", "error": f"{type(exc).__name__}: {exc}"}) time.sleep(min(reconnect_backoff, 30)) next_discovery = time.monotonic() + float(config["discovery_refresh_interval_seconds"]) continue new_token_ids = [token["token_id"] for token in tokens] if old_token_ids and old_token_ids != new_token_ids: counters["subscription_change_count"] += 1 states = {token["token_id"]: states.get(token["token_id"], BookState(token)) for token in tokens} next_discovery = time.monotonic() + float(config["discovery_refresh_interval_seconds"]) if not tokens: warnings.append("no active BTC Up/Down tokens available after discovery") time.sleep(min(reconnect_backoff, 30)) continue token_ids = [token["token_id"] for token in tokens] subscription = {"assets_ids": token_ids, "type": "market", "custom_feature_enabled": True} connection_sequence += 1 session_id = f"{run_id}-ws{connection_sequence}" sock: ssl.SSLSocket | None = None try: sock, handshake = open_websocket(str(config["websocket_url"]), float(config["websocket_timeout_seconds"])) counters["connection_count"] += 1 send_ws_frame(sock, 1, json.dumps(subscription, separators=(",", ":")).encode("utf-8")) session_message_sequence = 0 session_started_at_utc = iso_z() reconnect_backoff = float(config["reconnect_backoff_seconds"]) while not STOP_REQUESTED: now_monotonic = time.monotonic() if deadline is not None and now_monotonic >= deadline: shutdown_reason = "duration_elapsed" break if now_monotonic >= next_rest_checkpoint: checkpoint_sequence += 1 fetch_rest_checkpoint(config=config, rest_writer=rest_writer, checkpoint_sequence=checkpoint_sequence, tokens=tokens, states=states, counters=counters) next_rest_checkpoint = now_monotonic + float(config["rest_checkpoint_interval_seconds"]) if now_monotonic >= next_manifest_write: write_manifest(config=config, run_id=run_id, started_at_utc=started_at_utc, status="RUNNING", gate_status="IN_PROGRESS", shutdown_reason=None, markets=markets, tokens=tokens, counters=counters, ws_writer=ws_writer, rest_writer=rest_writer, states=states, warnings=warnings, errors=errors) next_manifest_write = now_monotonic + float(config["manifest_write_interval_seconds"]) if now_monotonic >= next_discovery: previous_token_ids = token_ids markets, tokens, rejection_counts = refresh_market_state(config, counters, warnings, errors) token_ids = [token["token_id"] for token in tokens] next_discovery = now_monotonic + float(config["discovery_refresh_interval_seconds"]) if token_ids != previous_token_ids: counters["subscription_change_count"] += 1 states = {token["token_id"]: states.get(token["token_id"], BookState(token)) for token in tokens} raise RuntimeError("tracked token set changed; reconnecting with new subscription") if last_text_message_monotonic is not None: silence = now_monotonic - last_text_message_monotonic if silence > float(config["stale_feed_threshold_seconds"]): counters["stale_feed_count"] += 1 raise TimeoutError(f"stale websocket feed for {silence:.3f}s") try: opcode, payload = read_ws_frame(sock) except socket.timeout: continue counters["websocket_opcode_counts"][str(opcode)] = counters["websocket_opcode_counts"].get(str(opcode), 0) + 1 if opcode == 1: received_at_utc = iso_z() now_for_gap = time.monotonic() if last_text_message_monotonic is not None: gap = round(now_for_gap - last_text_message_monotonic, 3) previous = counters.get("max_gap_seconds") counters["max_gap_seconds"] = gap if previous is None else max(previous, gap) last_text_message_monotonic = now_for_gap counters["last_message_received_at_utc"] = received_at_utc session_message_sequence += 1 global_sequence += 1 envelope, parsed_json, event_types, parse_ok = build_ws_envelope( run_id=run_id, session_id=session_id, connection_sequence=connection_sequence, message_sequence=session_message_sequence, global_message_sequence=global_sequence, received_at_utc=received_at_utc, websocket_url=str(config["websocket_url"]), subscription=subscription, tokens=tokens, opcode=opcode, payload_bytes=payload, ) ws_writer.write(envelope) counters["websocket_message_count"] += 1 if parse_ok: counters["websocket_parsed_json_count"] += 1 apply_payload_to_books(parsed_json, states, received_at_utc, counters, warnings) else: counters["websocket_parse_error_count"] += 1 elif opcode == 8: raise EOFError("websocket close frame received") elif opcode == 9: send_ws_frame(sock, 10, payload) elif opcode == 10: continue else: warnings.append(f"ignored websocket opcode {opcode}") if shutdown_reason: break except Exception as exc: # noqa: BLE001 - preserve reconnect evidence errors.append({"stage": "websocket_session", "connection_sequence": connection_sequence, "error": f"{type(exc).__name__}: {exc}"}) counters["reconnect_count"] += 1 if STOP_REQUESTED: shutdown_reason = STOP_SIGNAL or "stop_requested" break time.sleep(min(reconnect_backoff, float(config["max_reconnect_backoff_seconds"]))) reconnect_backoff = min(reconnect_backoff * 2, float(config["max_reconnect_backoff_seconds"])) finally: if sock is not None: try: sock.close() except OSError: pass if shutdown_reason: break finally: if STOP_REQUESTED and shutdown_reason is None: shutdown_reason = STOP_SIGNAL or "stop_requested" ws_writer.close() rest_writer.close() if shutdown_reason is None: shutdown_reason = "loop_exited" if counters["websocket_message_count"] > 0 and counters["rest_success_count"] > 0 and ws_writer.closed_files and rest_writer.closed_files: gate_status = "PASS" else: gate_status = "BLOCKED_RUNTIME_EVIDENCE" status = "INTERRUPTED" if STOP_SIGNAL else ("COMPLETED_BOUNDED" if config["duration_seconds"] else "STOPPED") config["command"] = command manifest = write_manifest(config=config, run_id=run_id, started_at_utc=started_at_utc, status=status, gate_status=gate_status, shutdown_reason=shutdown_reason, markets=markets, tokens=tokens, counters=counters, ws_writer=ws_writer, rest_writer=rest_writer, states=states, warnings=warnings, errors=errors) return manifest def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Long-running Polymarket BTC websocket raw recorder with REST checkpoints.") parser.add_argument("--config", type=Path, default=DEFAULT_CONFIG_PATH) parser.add_argument("--duration-seconds", type=float, default=None, help="Bounded run duration for smoke tests. Default is continuous runtime.") parser.add_argument("--market-limit", type=int, default=None, help="Limit active BTC markets for smoke tests. Use 0 for all.") parser.add_argument("--raw-output-root", type=Path, default=None) parser.add_argument("--manifest-dir", type=Path, default=None) parser.add_argument("--manifest-path", type=Path, default=None) parser.add_argument("--discovery-path", type=Path, default=None) parser.add_argument("--discovery-dir", type=Path, default=None) return parser.parse_args() def main() -> int: args = parse_args() config = build_config(args) command = " ".join([Path(sys.argv[0]).as_posix(), *sys.argv[1:]]) manifest = run_recorder(config, command) print(f"WS_RECORDER_MANIFEST={config['manifest_path']}") print(f"WS_RECORDER_RUN_ID={manifest['run_id']}") print(f"WS_RECORDER_GATE={manifest['gate_status']}") return 0 if manifest["gate_status"] == "PASS" else 1 if __name__ == "__main__": raise SystemExit(main())