orderbooks/scripts/collect_polymarket_ws_orderbooks.py
2026-04-19 19:17:56 +02:00

1157 lines
50 KiB
Python
Executable file

#!/usr/bin/env python3
"""Long-running raw Polymarket BTC websocket recorder with REST checkpoints.
Checkpoint 10D scope: public BTC up/down market data only. The recorder writes
raw websocket text exactly as received, keeps REST /books checkpoints as
recovery/comparison evidence, and records rotation/reconnect/stale/divergence
counters in manifests. It does not trade, sign, authenticate, or handle keys.
"""
from __future__ import annotations
import argparse
import base64
import datetime as dt
import gzip
import hashlib
import json
import os
import signal
import socket
import ssl
import struct
import subprocess
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from decimal import Decimal, InvalidOperation
from pathlib import Path
from typing import Any
# Collector identity, embedded in the "collector" field of archived rows and manifests.
COLLECTOR_NAME = "polymarket_ws_orderbook_recorder"
COLLECTOR_VERSION = "0.1.0"
# Values written to the "schema_name" / "schema_version" fields of each record type.
WS_SCHEMA_NAME = "raw_polymarket_market_ws_message"
REST_SCHEMA_NAME = "raw_polymarket_books_checkpoint"
MANIFEST_SCHEMA_NAME = "polymarket_ws_recorder_manifest"
SCHEMA_VERSION = 1
# Default filesystem locations; build_config() falls back to these when neither
# the CLI, the environment, nor the config file supplies a value.
DEFAULT_CONFIG_PATH = Path("config/polymarket_ws_collector.example.yaml")
DEFAULT_DISCOVERY_PATH = Path("data/discovery/polymarket_btc_markets_latest.json")
DEFAULT_DISCOVERY_DIR = Path("data/discovery")
DEFAULT_RAW_OUTPUT_ROOT = Path("/var/lib/orderbooks/raw_orderbooks")
DEFAULT_MANIFEST_DIR = Path("/var/lib/orderbooks/manifests")
DEFAULT_MANIFEST_PATH = Path("/var/lib/orderbooks/manifests/polymarket_ws_recorder_latest.json")
# Default public endpoints: the market websocket and the REST /books checkpoint URL.
MARKET_WS_URL = "wss://ws-subscriptions-clob.polymarket.com/ws/market"
CLOB_BOOKS_URL = "https://clob.polymarket.com/books"
# Default path of the discovery script that run_discovery() launches as a subprocess.
DISCOVERY_SCRIPT = Path("scripts/discover_polymarket_btc_markets.py")
# Allow-list of response header names (lower-case) that filter_headers() keeps;
# every other header is dropped before anything is archived.
SAFE_RESPONSE_HEADERS = {
    "cache-control",
    "cf-cache-status",
    "cf-ray",
    "content-length",
    "content-type",
    "date",
    "retry-after",
    "server",
    "x-ratelimit-limit",
    "x-ratelimit-remaining",
    "x-ratelimit-reset",
    "ratelimit-limit",
    "ratelimit-remaining",
    "ratelimit-reset",
}
# Shutdown state written by handle_stop(): whether a stop was requested and the
# name of the signal that requested it.
STOP_REQUESTED = False
STOP_SIGNAL: str | None = None
def handle_stop(signum: int, _frame: Any) -> None:
    """Signal handler that records a shutdown request.

    Sets the module-level ``STOP_REQUESTED`` flag and stores the symbolic name
    of the received signal (for example ``"SIGTERM"``) in ``STOP_SIGNAL``.
    The stack frame argument is ignored.
    """
    global STOP_REQUESTED, STOP_SIGNAL
    # Raise the flag first so it is set even if the signal number is unknown.
    STOP_REQUESTED = True
    received = signal.Signals(signum)
    STOP_SIGNAL = received.name
def utc_now() -> dt.datetime:
return dt.datetime.now(dt.UTC)
def iso_z(value: dt.datetime | None = None) -> str:
value = value or utc_now()
return value.astimezone(dt.UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z")
def compact_timestamp(value: dt.datetime | None = None) -> str:
value = value or utc_now()
return value.astimezone(dt.UTC).strftime("%Y%m%dT%H%M%SZ")
def parse_iso(value: Any) -> dt.datetime | None:
if not isinstance(value, str) or not value.strip():
return None
text = value.strip()
if text.endswith("Z"):
text = text[:-1] + "+00:00"
try:
parsed = dt.datetime.fromisoformat(text)
except ValueError:
return None
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=dt.UTC)
return parsed.astimezone(dt.UTC)
def sha256_bytes(data: bytes) -> str:
    """Return the lowercase hexadecimal SHA-256 digest of ``data``."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.hexdigest()
def sha256_file(path: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at ``path``.

    The file is read in 1 MiB chunks so large archives are never loaded into
    memory in one piece.
    """
    chunk_size = 1024 * 1024
    hasher = hashlib.sha256()
    with path.open("rb") as stream:
        while chunk := stream.read(chunk_size):
            hasher.update(chunk)
    return hasher.hexdigest()
def parse_scalar(value: str) -> Any:
    """Convert one raw config or environment string into a Python scalar.

    Rules, applied in order after stripping surrounding whitespace:

    * empty text -> ``""``
    * text wrapped in matching single or double quotes -> the inner text
    * ``true`` / ``false`` (any case) -> ``bool``
    * ``null`` / ``none`` (any case) -> ``None``
    * text accepted by ``int()`` and then ``float()`` -> that number
    * anything else -> the stripped text unchanged
    """
    text = value.strip()
    if not text:
        return ""
    # A quoted value needs an opening AND a closing quote. Without the length
    # check a lone quote character would match itself and become "".
    if len(text) >= 2 and text[0] in {"'", '"'} and text[-1] == text[0]:
        return text[1:-1]
    lowered = text.lower()
    if lowered in {"true", "false"}:
        return lowered == "true"
    if lowered in {"null", "none"}:
        return None
    # Try the narrower numeric type first so "42" stays an int.
    for convert in (int, float):
        try:
            return convert(text)
        except ValueError:
            continue
    return text
def _strip_inline_comment(line: str) -> str:
    """Return ``line`` without its trailing ``# comment``, if it has one.

    A ``#`` normally starts a comment wherever it appears. The one exception
    is a value that begins with a quote and has a matching closing quote:
    a ``#`` between those quotes is data (for example a URL fragment) and
    only a ``#`` after the closing quote starts the comment.
    """
    hash_at = line.find("#")
    if hash_at == -1:
        return line
    colon_at = line.find(":")
    if colon_at != -1:
        after_colon = line[colon_at + 1:]
        value_start = colon_at + 1 + (len(after_colon) - len(after_colon.lstrip()))
        # Only consider quotes when the value itself starts with one and the
        # first "#" lies beyond that opening quote.
        if value_start < hash_at and line[value_start] in {"'", '"'}:
            closing_at = line.find(line[value_start], value_start + 1)
            if closing_at != -1:
                hash_at = line.find("#", closing_at + 1)
    return line if hash_at == -1 else line[:hash_at]


def load_flat_yaml(path: Path) -> dict[str, Any]:
    """Load a flat ``key: value`` config file into a dict.

    Only single-level ``key: value`` lines are supported. Blank lines and
    ``#`` comments are skipped; a ``#`` inside a quoted value is kept as part
    of the value. Values are converted with ``parse_scalar``. A missing file
    yields an empty dict.

    Raises:
        ValueError: if a non-empty line has no ``:`` or has an empty key.
    """
    config: dict[str, Any] = {}
    if not path.exists():
        return config
    for line_number, raw_line in enumerate(path.read_text(encoding="utf-8").splitlines(), 1):
        line = _strip_inline_comment(raw_line).strip()
        if not line:
            continue
        if ":" not in line:
            raise ValueError(f"Unsupported config line {line_number}: {raw_line}")
        key, value = line.split(":", 1)
        key = key.strip()
        if not key:
            raise ValueError(f"Missing config key on line {line_number}")
        config[key] = parse_scalar(value)
    return config
def config_value(config: dict[str, Any], args: argparse.Namespace, key: str, default: Any) -> Any:
    """Resolve one setting using CLI > environment > config file > default.

    A CLI attribute counts only when it is not ``None``. The environment
    variable consulted is ``ORDERBOOKS_WS_<KEY>`` (key upper-cased), and its
    text is converted with ``parse_scalar``.
    """
    from_cli = getattr(args, key, None)
    if from_cli is not None:
        return from_cli
    from_env = os.environ.get("ORDERBOOKS_WS_" + key.upper())
    if from_env is not None:
        return parse_scalar(from_env)
    return config.get(key, default)
def filter_headers(headers: Any) -> dict[str, str]:
    """Return only the allow-listed response headers.

    ``headers`` is anything ``dict()`` accepts. A header is kept, with its
    original spelling, when its lower-cased name is in
    ``SAFE_RESPONSE_HEADERS``.
    """
    return {
        name: content
        for name, content in dict(headers).items()
        if name.lower() in SAFE_RESPONSE_HEADERS
    }
def dec(value: Any) -> Decimal:
    """Convert a feed value to a finite ``Decimal``, falling back to zero.

    ``None``, text that ``Decimal`` cannot parse, and non-finite results
    (NaN, Infinity) all become ``Decimal("0")``.
    """
    zero = Decimal("0")
    if value is None:
        return zero
    try:
        parsed = Decimal(str(value))
    except InvalidOperation:
        return zero
    # "NaN" and "Infinity" parse without error, but ordering comparisons on
    # NaN and arithmetic such as Infinity - Infinity raise InvalidOperation
    # later, when the book is sorted or a spread is computed. Treat them as
    # invalid input here instead.
    if not parsed.is_finite():
        return zero
    return parsed
def dec_to_str(value: Decimal) -> str:
    """Render a ``Decimal`` in plain fixed-point notation without padding.

    Trailing fractional zeros and a dangling decimal point are removed, so
    ``1.500`` becomes ``"1.5"`` and ``0.0`` becomes ``"0"``.
    """
    rendered = f"{value:f}"
    if "." in rendered:
        rendered = rendered.rstrip("0").rstrip(".")
    return rendered if rendered else "0"
def level_map(levels: Any) -> dict[str, Decimal]:
    """Turn a list of ``{"price", "size"}`` dicts into a price -> size map.

    Prices are normalised with ``dec`` and ``dec_to_str`` so equal prices
    share one key; a later entry for the same price overwrites an earlier
    one. A non-list input yields an empty map and non-dict entries are
    ignored.
    """
    if not isinstance(levels, list):
        return {}
    return {
        dec_to_str(dec(entry.get("price"))): dec(entry.get("size"))
        for entry in levels
        if isinstance(entry, dict)
    }
def top_levels(book: dict[str, Decimal], side: str, top_n: int) -> list[tuple[str, Decimal]]:
    """Return the best ``top_n`` ``(price, size)`` pairs of one book side.

    Prices are compared numerically. For ``side == "bids"`` the highest
    price comes first; for any other side the lowest price comes first.
    """
    highest_first = side == "bids"

    def numeric_price(level: tuple[str, Decimal]) -> Decimal:
        return Decimal(level[0])

    ranked = sorted(book.items(), key=numeric_price, reverse=highest_first)
    return ranked[:top_n]
def summarize_book(bids: dict[str, Decimal], asks: dict[str, Decimal], top_n: int) -> dict[str, Any]:
    """Summarise a two-sided book: best prices, spread, depth, top levels.

    ``best_bid``, ``best_ask`` and ``spread`` are ``None`` when the
    corresponding side is empty. Level counts cover the whole book, not just
    the top ``top_n`` levels.
    """

    def render(levels: list[tuple[str, Decimal]]) -> list[dict[str, str]]:
        return [{"price": price, "size": dec_to_str(size)} for price, size in levels]

    bid_levels = top_levels(bids, "bids", top_n)
    ask_levels = top_levels(asks, "asks", top_n)
    best_bid = bid_levels[0][0] if bid_levels else None
    best_ask = ask_levels[0][0] if ask_levels else None
    if best_bid and best_ask:
        spread = dec_to_str(Decimal(best_ask) - Decimal(best_bid))
    else:
        spread = None
    return {
        "best_bid": best_bid,
        "best_ask": best_ask,
        "spread": spread,
        "bid_level_count": len(bids),
        "ask_level_count": len(asks),
        "top_bids": render(bid_levels),
        "top_asks": render(ask_levels),
    }
def compare_side(local: dict[str, Decimal], rest: dict[str, Decimal], side: str, top_n: int) -> dict[str, Any]:
    """Diff the top ``top_n`` levels of the local book against the REST book.

    Returns a dict with:

    * ``missing_prices``: prices in the REST top levels but not the local ones
    * ``extra_prices``: prices in the local top levels but not the REST ones
    * ``size_deltas``: one entry per shared price whose sizes differ

    All three are ordered best price first for the given side.
    """
    highest_first = side == "bids"
    local_top = dict(top_levels(local, side, top_n))
    rest_top = dict(top_levels(rest, side, top_n))

    def best_first(prices: set[str]) -> list[str]:
        return sorted(prices, key=Decimal, reverse=highest_first)

    missing = best_first(set(rest_top) - set(local_top))
    extra = best_first(set(local_top) - set(rest_top))
    size_deltas = []
    for price in best_first(set(local_top) & set(rest_top)):
        local_size = local_top[price]
        rest_size = rest_top[price]
        difference = local_size - rest_size
        if difference == 0:
            continue
        size_deltas.append({
            "price": price,
            "local_size": dec_to_str(local_size),
            "rest_size": dec_to_str(rest_size),
            "delta": dec_to_str(difference),
        })
    return {"missing_prices": missing, "extra_prices": extra, "size_deltas": size_deltas}
class BookState:
def __init__(self, token_meta: dict[str, Any]) -> None:
self.token_meta = token_meta
self.bids: dict[str, Decimal] = {}
self.asks: dict[str, Decimal] = {}
self.initialized = False
self.messages_applied = 0
self.messages_skipped = 0
self.unknown_messages = 0
self.last_update_received_at_utc: str | None = None
def apply_book(self, item: dict[str, Any], received_at_utc: str) -> None:
self.bids = level_map(item.get("bids"))
self.asks = level_map(item.get("asks"))
self.initialized = True
self.messages_applied += 1
self.last_update_received_at_utc = received_at_utc
def apply_change(self, change: dict[str, Any], received_at_utc: str) -> str | None:
if not self.initialized:
self.messages_skipped += 1
return "price_change_before_book_snapshot"
side = str(change.get("side") or "").upper()
price = dec_to_str(dec(change.get("price")))
size = dec(change.get("size"))
if side == "BUY":
book_side = self.bids
elif side == "SELL":
book_side = self.asks
else:
self.messages_skipped += 1
return f"unsupported_price_change_side:{side}"
if size == 0:
book_side.pop(price, None)
else:
book_side[price] = size
self.messages_applied += 1
self.last_update_received_at_utc = received_at_utc
return None
def summary(self, top_n: int) -> dict[str, Any]:
state_quality = "insufficient_events"
if self.initialized and self.messages_applied > 1:
state_quality = "initialized_and_updated"
elif self.initialized:
state_quality = "snapshot_only"
return {
"token": self.token_meta,
"initialized": self.initialized,
"state_quality": state_quality,
"messages_applied": self.messages_applied,
"messages_skipped": self.messages_skipped,
"unknown_messages": self.unknown_messages,
"last_update_received_at_utc": self.last_update_received_at_utc,
**summarize_book(self.bids, self.asks, top_n),
}
class ArchiveWriter:
def __init__(self, *, root: Path, subdir: str, prefix: str, run_id: str) -> None:
self.root = root
self.subdir = subdir
self.prefix = prefix
self.run_id = run_id
self.current_hour: str | None = None
self.temp_path: Path | None = None
self.final_path: Path | None = None
self.handle: gzip.GzipFile | None = None
self.rows = 0
self.started_at_utc: str | None = None
self.closed_files: list[dict[str, Any]] = []
def _paths_for(self, when: dt.datetime) -> tuple[str, Path, Path]:
hour = when.astimezone(dt.UTC).strftime("%Y/%m/%d/%H")
hour_id = when.astimezone(dt.UTC).strftime("%Y%m%dT%H0000Z")
directory = self.root / self.subdir / hour
final_path = directory / f"{self.prefix}_{self.run_id}_{hour_id}.jsonl.gz"
temp_path = directory / f".{final_path.name}.open"
return hour, temp_path, final_path
def ensure_open(self, when: dt.datetime | None = None) -> None:
when = when or utc_now()
hour, temp_path, final_path = self._paths_for(when)
if self.handle is not None and self.current_hour == hour:
return
self.close(ended_at=when)
temp_path.parent.mkdir(parents=True, exist_ok=True)
self.current_hour = hour
self.temp_path = temp_path
self.final_path = final_path
self.rows = 0
self.started_at_utc = iso_z(when)
self.handle = gzip.open(temp_path, "at", encoding="utf-8")
def write(self, row: dict[str, Any], when: dt.datetime | None = None) -> None:
self.ensure_open(when)
assert self.handle is not None
self.handle.write(json.dumps(row, separators=(",", ":"), sort_keys=True) + "\n")
self.rows += 1
if self.rows % 100 == 0:
self.handle.flush()
def close(self, ended_at: dt.datetime | None = None) -> None:
if self.handle is None:
return
ended_at = ended_at or utc_now()
self.handle.close()
assert self.temp_path is not None and self.final_path is not None
self.final_path.parent.mkdir(parents=True, exist_ok=True)
if self.final_path.exists():
self.final_path.unlink()
self.temp_path.rename(self.final_path)
self.closed_files.append({
"path": self.final_path.as_posix(),
"kind": self.prefix,
"started_at_utc": self.started_at_utc,
"ended_at_utc": iso_z(ended_at),
"rows": self.rows,
"bytes": self.final_path.stat().st_size,
"sha256": sha256_file(self.final_path),
"status": "valid" if self.rows > 0 else "empty",
})
self.current_hour = None
self.temp_path = None
self.final_path = None
self.handle = None
self.rows = 0
self.started_at_utc = None
def send_ws_frame(sock: ssl.SSLSocket, opcode: int, payload: bytes) -> None:
    """Send ``payload`` as one final, masked websocket frame.

    The first byte carries the FIN bit plus ``opcode``. The length uses the
    7-bit form below 126 bytes, the 16-bit form below 65536 bytes, and the
    64-bit form otherwise. The payload is XOR-masked with a fresh random
    4-byte key, and the whole frame goes out in a single ``sendall`` call.
    """
    masking_key = os.urandom(4)
    size = len(payload)
    first_byte = 0x80 | opcode
    if size < 126:
        frame = bytearray(struct.pack("!BB", first_byte, 0x80 | size))
    elif size < 65536:
        frame = bytearray(struct.pack("!BBH", first_byte, 0x80 | 126, size))
    else:
        frame = bytearray(struct.pack("!BBQ", first_byte, 0x80 | 127, size))
    frame += masking_key
    frame += bytes(octet ^ masking_key[position % 4] for position, octet in enumerate(payload))
    sock.sendall(frame)
def read_exact(sock: ssl.SSLSocket, length: int) -> bytes:
    """Read exactly ``length`` bytes from ``sock``.

    Keeps calling ``recv`` until enough data has arrived.

    Raises:
        EOFError: if ``recv`` returns no data before ``length`` bytes arrive.
    """
    pieces: list[bytes] = []
    outstanding = length
    while outstanding > 0:
        piece = sock.recv(outstanding)
        if not piece:
            raise EOFError("websocket connection closed while reading frame")
        pieces.append(piece)
        outstanding -= len(piece)
    return b"".join(pieces)
def read_ws_frame(sock: ssl.SSLSocket) -> tuple[int, bytes]:
    """Read one websocket frame and return ``(opcode, payload)``.

    Handles the 7-bit, 16-bit and 64-bit length encodings and unmasks the
    payload when the mask bit is set. Only the opcode nibble of the first
    byte is used: the FIN and reserved bits are not inspected, so a
    fragmented message is returned one frame at a time.
    """
    first, second = read_exact(sock, 2)
    opcode = first & 0x0F
    is_masked = bool(second & 0x80)
    size = second & 0x7F
    if size == 126:
        (size,) = struct.unpack("!H", read_exact(sock, 2))
    elif size == 127:
        (size,) = struct.unpack("!Q", read_exact(sock, 8))
    masking_key = read_exact(sock, 4) if is_masked else b""
    body = read_exact(sock, size) if size else b""
    if is_masked:
        body = bytes(octet ^ masking_key[position % 4] for position, octet in enumerate(body))
    return opcode, body
def parse_ws_headers(raw_headers: str) -> tuple[str, dict[str, str]]:
    """Split an HTTP response head into its status line and safe headers.

    ``raw_headers`` is the CRLF-separated header text without the blank
    terminator line. Lines without a colon are ignored, a repeated header
    keeps its last value, and the result is passed through
    ``filter_headers``.
    """
    status_line, *header_lines = raw_headers.split("\r\n")
    collected: dict[str, str] = {}
    for header_line in header_lines:
        name, separator, content = header_line.partition(":")
        if not separator:
            continue
        collected[name.strip()] = content.strip()
    return status_line, filter_headers(collected)
def open_websocket(url: str, timeout_seconds: float) -> tuple[ssl.SSLSocket, dict[str, Any]]:
    """Open a TLS connection to ``url`` and perform the websocket upgrade.

    Returns the connected socket and a dict holding the response
    ``status_line`` and the filtered response ``headers``. The port defaults
    to 443 and TLS is always used. The ``Sec-WebSocket-Accept`` response
    header is not verified, and any bytes the server sends directly after
    the header terminator are discarded.

    Raises:
        ValueError: the URL has no host, the response headers exceed 64 KiB,
            or the status line does not contain `` 101 ``.
        EOFError: the server closed the connection before the response
            headers were complete.
        OSError: connection or TLS failures from ``socket`` / ``ssl``.
    """
    parsed = urllib.parse.urlparse(url)
    host = parsed.hostname
    if not host:
        raise ValueError("missing websocket host")
    port = parsed.port or 443
    path = parsed.path or "/"
    if parsed.query:
        path = f"{path}?{parsed.query}"
    raw_sock = socket.create_connection((host, port), timeout=timeout_seconds)
    try:
        sock = ssl.create_default_context().wrap_socket(raw_sock, server_hostname=host)
    except BaseException:
        # Do not leak the TCP socket when the TLS handshake fails.
        raw_sock.close()
        raise
    try:
        sock.settimeout(timeout_seconds)
        key = base64.b64encode(os.urandom(16)).decode("ascii")
        request = (
            f"GET {path} HTTP/1.1\r\n"
            f"Host: {host}\r\n"
            "Upgrade: websocket\r\n"
            "Connection: Upgrade\r\n"
            f"Sec-WebSocket-Key: {key}\r\n"
            "Sec-WebSocket-Version: 13\r\n"
            f"User-Agent: orderbooks-polymarket-ws-recorder/{COLLECTOR_VERSION}\r\n"
            "\r\n"
        )
        sock.sendall(request.encode("ascii"))
        raw_headers = bytearray()
        while b"\r\n\r\n" not in raw_headers:
            chunk = sock.recv(4096)
            if not chunk:
                # recv() returns b"" forever once the peer has closed; without
                # this check the loop would never end.
                raise EOFError("websocket connection closed during handshake")
            raw_headers.extend(chunk)
            if len(raw_headers) > 65536:
                raise ValueError("websocket handshake headers exceeded 64 KiB")
        header_text = bytes(raw_headers).split(b"\r\n\r\n", 1)[0].decode("iso-8859-1", errors="replace")
        status_line, response_headers = parse_ws_headers(header_text)
        if " 101 " not in status_line:
            raise ValueError(f"websocket upgrade failed: {status_line}")
    except BaseException:
        # Every failed handshake must release the connection before the
        # error propagates; the caller never receives the socket.
        sock.close()
        raise
    return sock, {"status_line": status_line, "headers": response_headers}
def decode_json_maybe(text: str) -> tuple[Any | None, str | None]:
try:
return json.loads(text), None
except json.JSONDecodeError as exc:
return None, str(exc)
def classify_payload(payload: Any) -> list[str]:
    """Return one event-type label per item of a decoded websocket payload.

    A non-list payload is treated as a single item. Per item:

    * non-dict -> the Python type name (``"str"``, ``"int"``, ...)
    * dict with a truthy ``event_type`` -> that value as a string
    * dict with ``asset_id``, ``bids`` and ``asks`` keys -> ``"book"``
    * any other dict -> ``"unknown_object"``
    """
    book_keys = {"asset_id", "bids", "asks"}

    def label(entry: Any) -> str:
        if not isinstance(entry, dict):
            return type(entry).__name__
        if entry.get("event_type"):
            return str(entry["event_type"])
        if entry.keys() >= book_keys:
            return "book"
        return "unknown_object"

    entries = payload if isinstance(payload, list) else [payload]
    return [label(entry) for entry in entries]
def raw_items(payload: Any) -> list[dict[str, Any]]:
    """Return the dict items of a payload, in order.

    A non-list payload is treated as a one-element list; everything that is
    not a dict is dropped.
    """
    candidates = payload if isinstance(payload, list) else [payload]
    kept: list[dict[str, Any]] = []
    for candidate in candidates:
        if isinstance(candidate, dict):
            kept.append(candidate)
    return kept
def load_discovery(path: Path) -> dict[str, Any]:
    """Read the discovery artifact at ``path`` and return its decoded JSON.

    Errors from opening the file or decoding the JSON propagate to the
    caller.
    """
    artifact_text = path.read_text(encoding="utf-8")
    decoded = json.loads(artifact_text)
    return decoded
def market_is_usable(market: dict[str, Any], now: dt.datetime, safety_seconds: int) -> tuple[bool, list[str]]:
    """Decide whether a discovered market can be recorded.

    Returns ``(usable, reasons)``; ``reasons`` lists every failed check and
    is empty when the market is usable. The checks are:

    * ``active``, ``accepting_orders`` and ``enable_order_book`` are exactly
      ``True`` and ``closed`` is exactly ``False``
    * ``end_time_utc`` parses and is more than ``safety_seconds`` after ``now``
    * ``tokens`` is a list of at least two entries whose first two dict
      entries are the ``Up`` and ``Down`` outcomes, both with a token id
    """
    required_flags = (
        ("active", True, "not_active"),
        ("closed", False, "closed"),
        ("accepting_orders", True, "not_accepting_orders"),
        ("enable_order_book", True, "order_book_not_enabled"),
    )
    # Identity comparison on purpose: truthy stand-ins such as 1 or "true"
    # do not satisfy a flag.
    reasons = [
        reason
        for field, required, reason in required_flags
        if market.get(field) is not required
    ]

    end_time = parse_iso(market.get("end_time_utc"))
    if end_time is None:
        reasons.append("missing_end_time")
    elif end_time <= now + dt.timedelta(seconds=safety_seconds):
        reasons.append("too_close_to_end_or_expired")

    tokens = market.get("tokens")
    if isinstance(tokens, list) and len(tokens) >= 2:
        token_dicts = [token for token in tokens if isinstance(token, dict)]
        outcomes = [token.get("outcome") for token in token_dicts]
        token_ids = [token.get("token_id") for token in token_dicts]
        if outcomes[:2] != ["Up", "Down"] or not all(token_ids[:2]):
            reasons.append("bad_up_down_token_mapping")
    else:
        reasons.append("missing_two_tokens")
    return not reasons, reasons
def select_markets(discovery: dict[str, Any], market_limit: int, market_end_safety_seconds: int) -> tuple[list[dict[str, Any]], dict[str, int]]:
    """Pick the usable markets from a discovery artifact.

    Walks ``discovery["normalized_markets"]`` in order, keeps each market
    that passes ``market_is_usable``, and stops once ``market_limit`` markets
    are kept (a limit of zero or less means no limit). Returns the kept
    markets and a reason -> count map, sorted by reason, for the rejected
    ones.
    """
    reference_time = utc_now()
    accepted: list[dict[str, Any]] = []
    rejected: dict[str, int] = {}

    def count_rejection(reason: str) -> None:
        rejected[reason] = rejected.get(reason, 0) + 1

    for candidate in discovery.get("normalized_markets") or []:
        if not isinstance(candidate, dict):
            count_rejection("not_object")
            continue
        usable, reasons = market_is_usable(candidate, reference_time, market_end_safety_seconds)
        if usable:
            accepted.append(candidate)
            if market_limit > 0 and len(accepted) >= market_limit:
                break
            continue
        for reason in reasons:
            count_rejection(reason)
    return accepted, dict(sorted(rejected.items()))
def flatten_tokens(markets: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Flatten markets into one record per tracked token.

    Only the first two ``tokens`` entries of each market are considered and
    non-dict entries are skipped. Each record combines the market's name,
    slug, condition id and end time with the token's id (as a string),
    outcome and outcome index.
    """
    flattened: list[dict[str, Any]] = []
    for market in markets:
        leading_pair = market.get("tokens", [])[:2]
        flattened.extend(
            {
                "market_name": market.get("market_name"),
                "market_slug": market.get("market_slug"),
                "condition_id": market.get("condition_id"),
                "token_id": str(entry.get("token_id")),
                "outcome": entry.get("outcome"),
                "outcome_index": entry.get("outcome_index"),
                "market_end_time_utc": market.get("end_time_utc"),
            }
            for entry in leading_pair
            if isinstance(entry, dict)
        )
    return flattened
def _output_tail(stream: Any, limit: int) -> str:
    """Return the last ``limit`` characters of captured subprocess output.

    ``TimeoutExpired`` may carry ``None``, ``bytes`` or ``str`` output; all
    three are turned into text here.
    """
    if stream is None:
        return ""
    if isinstance(stream, bytes):
        stream = stream.decode("utf-8", errors="replace")
    return stream[-limit:]


def run_discovery(config: dict[str, Any], counters: dict[str, Any], warnings: list[str], errors: list[dict[str, Any]]) -> None:
    """Refresh the discovery artifact by running the discovery script.

    Does nothing when ``discovery_execute`` is false. When the script file
    is missing, only a warning is recorded. Otherwise the script runs as a
    subprocess of the current interpreter and the refresh counter and
    duration are updated. A non-zero exit code or a timeout counts as a
    failure and is appended to ``errors`` with the tail of the captured
    output; neither raises.
    """
    script_path = Path(config["discovery_script_path"])
    if not config["discovery_execute"]:
        return
    if not script_path.exists():
        warnings.append(f"discovery script missing; using existing artifact only: {script_path}")
        return
    output_json = Path(config["discovery_path"])
    output_json.parent.mkdir(parents=True, exist_ok=True)
    cmd = [
        sys.executable,
        script_path.as_posix(),
        "--output-json", output_json.as_posix(),
        "--manifest", str(Path(config["discovery_dir"]) / "polymarket_btc_markets_manifest.json"),
        "--markdown", str(Path(config["discovery_dir"]) / "polymarket_btc_markets.md"),
        "--max-pages", str(config["discovery_max_pages"]),
        "--limit", str(config["discovery_page_limit"]),
        "--timeout", str(config["request_timeout_seconds"]),
    ]
    # Allow one request timeout per page plus one spare, never under 30 s.
    timeout_seconds = max(30, int(config["request_timeout_seconds"] * (config["discovery_max_pages"] + 1)))
    started = time.monotonic()
    try:
        result = subprocess.run(cmd, text=True, capture_output=True, timeout=timeout_seconds)
    except subprocess.TimeoutExpired as exc:
        # A hung script is a failed refresh, not a reason to abort the
        # caller: record it the same way as a non-zero exit.
        counters["discovery_refresh_count"] += 1
        counters["last_discovery_duration_ms"] = round((time.monotonic() - started) * 1000, 3)
        counters["discovery_failure_count"] += 1
        errors.append({
            "stage": "discovery_refresh",
            "returncode": None,
            "timeout_seconds": timeout_seconds,
            "stderr_tail": _output_tail(exc.stderr, 2000),
            "stdout_tail": _output_tail(exc.stdout, 1000),
        })
        return
    counters["discovery_refresh_count"] += 1
    counters["last_discovery_duration_ms"] = round((time.monotonic() - started) * 1000, 3)
    if result.returncode != 0:
        counters["discovery_failure_count"] += 1
        errors.append({
            "stage": "discovery_refresh",
            "returncode": result.returncode,
            "stderr_tail": result.stderr[-2000:],
            "stdout_tail": result.stdout[-1000:],
        })
def refresh_market_state(config: dict[str, Any], counters: dict[str, Any], warnings: list[str], errors: list[dict[str, Any]]) -> tuple[list[dict[str, Any]], list[dict[str, Any]], dict[str, int]]:
    """Refresh discovery and work out which markets and tokens to track.

    Runs the discovery step, loads the artifact at
    ``config["discovery_path"]``, selects the usable markets, and flattens
    them into token records. ``counters`` receives the market and token
    counts plus the artifact's path and SHA-256. Returns
    ``(markets, tokens, rejection_counts)``.
    """
    run_discovery(config, counters, warnings, errors)
    artifact_path = Path(config["discovery_path"])
    selected_markets, rejection_counts = select_markets(
        load_discovery(artifact_path),
        int(config["market_limit"]),
        int(config["market_end_safety_seconds"]),
    )
    tracked_tokens = flatten_tokens(selected_markets)
    counters["markets_tracked"] = len(selected_markets)
    counters["tokens_tracked"] = len(tracked_tokens)
    counters["last_discovery_path"] = artifact_path.as_posix()
    if artifact_path.exists():
        counters["last_discovery_sha256"] = sha256_file(artifact_path)
    else:
        counters["last_discovery_sha256"] = None
    return selected_markets, tracked_tokens, rejection_counts
def _bump_count(counts: dict[str, int], key: str) -> None:
    """Add one to ``counts[key]``, starting from zero for an unseen key."""
    counts[key] = counts.get(key, 0) + 1


def apply_payload_to_books(payload: Any, states: dict[str, BookState], received_at_utc: str, counters: dict[str, Any], warnings: list[str]) -> None:
    """Apply every event in a decoded websocket payload to the local books.

    Per dict item in the payload:

    * ``book``: replaces the book of the matching ``asset_id``
    * ``price_change``: applies each entry of ``price_changes`` to the book
      of that entry's ``asset_id``; skipped changes are counted by reason
    * ``best_bid_ask`` / ``last_trade_price`` / ``new_market``: counted only
    * anything else: counted as unknown, and ``unknown_messages`` is
      incremented on every tracked book

    Events for untracked asset ids are ignored. ``counters`` and
    ``warnings`` are updated in place.
    """
    for item in raw_items(payload):
        declared_type = item.get("event_type")
        if declared_type:
            # Stringify like classify_payload() does: a non-string value
            # would otherwise become a non-string (or unhashable) counter
            # key and later break sorted-key JSON serialisation.
            event_type = str(declared_type)
        elif {"asset_id", "bids", "asks"}.issubset(item.keys()):
            event_type = "book"
        else:
            event_type = "unknown_object"
        _bump_count(counters["event_type_counts"], event_type)
        if event_type == "book":
            state = states.get(str(item.get("asset_id") or ""))
            if state:
                state.apply_book(item, received_at_utc)
        elif event_type == "price_change":
            changes = item.get("price_changes")
            if not isinstance(changes, list):
                counters["parser_warning_count"] += 1
                warnings.append("price_change event without price_changes list")
                continue
            for change in changes:
                if not isinstance(change, dict):
                    continue
                state = states.get(str(change.get("asset_id") or ""))
                if not state:
                    continue
                reason = state.apply_change(change, received_at_utc)
                if reason:
                    _bump_count(counters["book_update_skip_counts"], reason)
        elif event_type in {"best_bid_ask", "last_trade_price", "new_market"}:
            _bump_count(counters["non_mutating_event_counts"], event_type)
        else:
            _bump_count(counters["unknown_event_counts"], event_type)
            for state in states.values():
                state.unknown_messages += 1
def compare_state_to_rest(state: BookState, rest_item: dict[str, Any], top_n: int) -> dict[str, Any]:
    """Compare a locally rebuilt book with one REST ``/books`` entry.

    The result has ``comparison_status`` (``"match"`` or ``"divergent"``), a
    boolean flag for each aspect that differs (best bid, best ask, spread,
    level counts, price membership within the top ``top_n`` levels), the
    number of size differences per side, and ``size_only_divergent``, which
    is true when sizes are the only difference.
    """
    rest_bids = level_map(rest_item.get("bids"))
    rest_asks = level_map(rest_item.get("asks"))
    local_summary = summarize_book(state.bids, state.asks, top_n)
    rest_summary = summarize_book(rest_bids, rest_asks, top_n)
    bid_diff = compare_side(state.bids, rest_bids, "bids", top_n)
    ask_diff = compare_side(state.asks, rest_asks, "asks", top_n)

    def differs(field: str) -> bool:
        return local_summary[field] != rest_summary[field]

    structural_flags = {
        "best_bid_affected": differs("best_bid"),
        "best_ask_affected": differs("best_ask"),
        "spread_affected": differs("spread"),
        "level_count_affected": differs("bid_level_count") or differs("ask_level_count"),
        "price_membership_affected": bool(
            bid_diff["missing_prices"]
            or bid_diff["extra_prices"]
            or ask_diff["missing_prices"]
            or ask_diff["extra_prices"]
        ),
    }
    structurally_divergent = any(structural_flags.values())
    bid_size_deltas = len(bid_diff["size_deltas"])
    ask_size_deltas = len(ask_diff["size_deltas"])
    sizes_differ = bool(bid_size_deltas + ask_size_deltas)
    return {
        "comparison_status": "divergent" if (structurally_divergent or sizes_differ) else "match",
        **structural_flags,
        "size_only_divergent": sizes_differ and not structurally_divergent,
        "bid_size_delta_count": bid_size_deltas,
        "ask_size_delta_count": ask_size_deltas,
    }
def http_post_books(url: str, token_ids: list[str], timeout_seconds: float) -> dict[str, Any]:
    """POST a batch of token ids to the books endpoint and describe the outcome.

    Never raises for request failures: HTTP errors and any other exception
    are recorded in the returned dict. The dict holds request and response
    timestamps, duration, the request body, status code, filtered headers,
    the decoded JSON (or a 1000-character text preview when there is none),
    a SHA-256 and byte length of the response text, and an ``ok`` flag that
    is true only for an error-free 2xx response without a JSON error.
    """
    requested_at_utc = iso_z()
    started = time.monotonic()
    request_body = [{"token_id": token_id} for token_id in token_ids]
    encoded_body = json.dumps(request_body, separators=(",", ":")).encode("utf-8")
    outgoing_headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
        "User-Agent": f"orderbooks-polymarket-ws-recorder/{COLLECTOR_VERSION}",
    }
    status_code: int | None = None
    headers: dict[str, str] = {}
    response_text = ""
    error: str | None = None
    try:
        request = urllib.request.Request(url, data=encoded_body, headers=outgoing_headers, method="POST")
        with urllib.request.urlopen(request, timeout=timeout_seconds) as response:
            status_code = response.status
            headers = filter_headers(response.headers)
            response_text = response.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as exc:
        # Error responses still carry a status, headers and a body worth keeping.
        status_code = exc.code
        headers = filter_headers(exc.headers)
        response_text = exc.read().decode("utf-8", errors="replace")
        error = f"HTTPError: {exc}"
    except Exception as exc:  # noqa: BLE001 - preserve failure evidence
        error = f"{type(exc).__name__}: {exc}"

    if response_text:
        parsed_json, json_error = decode_json_maybe(response_text)
    else:
        parsed_json, json_error = None, None

    received_at_utc = iso_z()
    duration_ms = round((time.monotonic() - started) * 1000, 3)
    response_bytes = response_text.encode("utf-8")
    succeeded = (
        error is None
        and status_code is not None
        and 200 <= status_code < 300
        and json_error is None
    )
    return {
        "requested_at_utc": requested_at_utc,
        "received_at_utc": received_at_utc,
        "duration_ms": duration_ms,
        "request_body": request_body,
        "status_code": status_code,
        "headers": headers,
        "raw_response_json": parsed_json,
        "json_error": json_error,
        "raw_response_text_sha256": sha256_bytes(response_bytes),
        "raw_response_length_bytes": len(response_bytes),
        "raw_response_text_preview": response_text[:1000] if parsed_json is None else None,
        "error": error,
        "ok": succeeded,
    }
def fetch_rest_checkpoint(
    *,
    config: dict[str, Any],
    rest_writer: ArchiveWriter,
    checkpoint_sequence: int,
    tokens: list[dict[str, Any]],
    states: dict[str, BookState],
    counters: dict[str, Any],
) -> None:
    """Fetch REST books for all tracked tokens and archive one row per batch.

    Token ids are requested in batches of ``config["rest_batch_size"]`` (at
    least one). For each batch the request counters are updated, every
    returned book is compared with the initialised local state of the same
    ``asset_id``, the comparison totals are added to
    ``counters["rest_comparison_counts"]``, and a row with the raw response
    and the batch's comparison summary is written through ``rest_writer``.
    """
    all_token_ids = [entry["token_id"] for entry in tokens]
    per_batch = max(1, int(config["rest_batch_size"]))
    total_batches = (len(all_token_ids) + per_batch - 1) // per_batch
    flag_names = (
        "best_bid_affected",
        "best_ask_affected",
        "spread_affected",
        "level_count_affected",
        "price_membership_affected",
        "size_only_divergent",
    )
    for batch_index, offset in enumerate(range(0, len(all_token_ids), per_batch), 1):
        batch_ids = all_token_ids[offset:offset + per_batch]
        response = http_post_books(config["clob_books_url"], batch_ids, float(config["request_timeout_seconds"]))

        counters["rest_request_count"] += 1
        outcome_key = "rest_success_count" if response["ok"] else "rest_failure_count"
        counters[outcome_key] += 1
        if response.get("status_code") == 429:
            counters["rest_rate_limit_count"] += 1

        tally = {
            "match_count": 0,
            "divergent_count": 0,
            "no_state_count": 0,
            "best_bid_affected_count": 0,
            "best_ask_affected_count": 0,
            "spread_affected_count": 0,
            "level_count_affected_count": 0,
            "price_membership_affected_count": 0,
            "size_only_divergent_count": 0,
            "bid_size_delta_count": 0,
            "ask_size_delta_count": 0,
        }
        books = response.get("raw_response_json")
        if isinstance(books, list):
            for book in books:
                if not isinstance(book, dict):
                    continue
                state = states.get(str(book.get("asset_id") or ""))
                if not state or not state.initialized:
                    tally["no_state_count"] += 1
                    continue
                verdict = compare_state_to_rest(state, book, int(config["top_n"]))
                matched = verdict["comparison_status"] == "match"
                tally["match_count" if matched else "divergent_count"] += 1
                for flag in flag_names:
                    if verdict[flag]:
                        tally[f"{flag}_count"] += 1
                tally["bid_size_delta_count"] += verdict["bid_size_delta_count"]
                tally["ask_size_delta_count"] += verdict["ask_size_delta_count"]

        # Fold this batch into the run-wide comparison totals.
        running_totals = counters["rest_comparison_counts"]
        for name, amount in tally.items():
            running_totals[name] = running_totals.get(name, 0) + amount

        rest_writer.write({
            "schema_name": REST_SCHEMA_NAME,
            "schema_version": SCHEMA_VERSION,
            "collector": {"name": COLLECTOR_NAME, "version": COLLECTOR_VERSION},
            "checkpoint_sequence": checkpoint_sequence,
            "batch_index": batch_index,
            "batch_count": total_batches,
            "token_ids": batch_ids,
            "tokens_tracked_count": len(all_token_ids),
            "response": response,
            "comparison_summary": tally,
        })
def build_ws_envelope(
    *,
    run_id: str,
    session_id: str,
    connection_sequence: int,
    message_sequence: int,
    global_message_sequence: int,
    received_at_utc: str,
    websocket_url: str,
    subscription: dict[str, Any],
    tokens: list[dict[str, Any]],
    opcode: int,
    payload_bytes: bytes,
) -> tuple[dict[str, Any], Any | None, list[str], bool]:
    """Wrap one received websocket payload in an archive envelope.

    The payload is decoded as UTF-8 (undecodable bytes are replaced and the
    decode error recorded) and then parsed as JSON when decoding succeeded.
    Returns ``(envelope, parsed JSON or None, event types, parsed flag)``.
    The envelope stores the text alongside the length and SHA-256 of the
    original bytes, plus run, session and subscription context.
    """
    try:
        raw_text = payload_bytes.decode("utf-8")
    except UnicodeDecodeError as exc:
        # Keep a readable approximation of the text; the hash of the
        # original bytes still identifies the exact payload.
        raw_text = payload_bytes.decode("utf-8", errors="replace")
        parsed_json, json_error = None, str(exc)
    else:
        parsed_json, json_error = decode_json_maybe(raw_text)

    was_parsed = parsed_json is not None
    if was_parsed:
        event_types = classify_payload(parsed_json)
    else:
        event_types = ["unparseable_text"]

    envelope = {
        "schema_name": WS_SCHEMA_NAME,
        "schema_version": SCHEMA_VERSION,
        "collector": {"name": COLLECTOR_NAME, "version": COLLECTOR_VERSION},
        "run_id": run_id,
        "session_id": session_id,
        "connection_sequence": connection_sequence,
        "message_sequence": message_sequence,
        "global_message_sequence": global_message_sequence,
        "received_at_utc": received_at_utc,
        "websocket": {"url": websocket_url},
        "subscription": subscription,
        "tokens_tracked": tokens,
        "opcode": opcode,
        "payload_length_bytes": len(payload_bytes),
        "payload_sha256": sha256_bytes(payload_bytes),
        "raw_text": raw_text,
        "json": parsed_json,
        "json_error": json_error,
        "classified_event_types": event_types,
    }
    return envelope, parsed_json, event_types, was_parsed
def initial_counters() -> dict[str, Any]:
    """Return a fresh counters dict for a recorder run.

    Numeric counters start at zero, per-key tallies start as empty dicts,
    and "last seen" style values start as ``None``. Every call builds new
    nested dicts, so separate runs never share state.
    """
    websocket_counters: dict[str, Any] = {
        "websocket_message_count": 0,
        "websocket_parsed_json_count": 0,
        "websocket_parse_error_count": 0,
        "websocket_opcode_counts": {},
        "event_type_counts": {},
        "non_mutating_event_counts": {},
        "unknown_event_counts": {},
        "book_update_skip_counts": {},
        "parser_warning_count": 0,
        "connection_count": 0,
        "reconnect_count": 0,
        "subscription_change_count": 0,
        "stale_feed_count": 0,
        "max_gap_seconds": None,
        "last_message_received_at_utc": None,
    }
    rest_counters: dict[str, Any] = {
        "rest_request_count": 0,
        "rest_success_count": 0,
        "rest_failure_count": 0,
        "rest_rate_limit_count": 0,
        "rest_comparison_counts": {},
    }
    discovery_counters: dict[str, Any] = {
        "discovery_refresh_count": 0,
        "discovery_failure_count": 0,
        "last_discovery_duration_ms": None,
        "markets_tracked": 0,
        "tokens_tracked": 0,
        "last_discovery_path": None,
        "last_discovery_sha256": None,
    }
    # Merged in this order so the key order matches the grouping above.
    return {**websocket_counters, **rest_counters, **discovery_counters}
def summarize_states(states: dict[str, BookState], top_n: int) -> list[dict[str, Any]]:
    """Return the ``summary(top_n)`` of every tracked book, in dict order."""
    summaries: list[dict[str, Any]] = []
    for book_state in states.values():
        summaries.append(book_state.summary(top_n))
    return summaries
def write_manifest(
    *,
    config: dict[str, Any],
    run_id: str,
    started_at_utc: str,
    status: str,
    gate_status: str,
    shutdown_reason: str | None,
    markets: list[dict[str, Any]],
    tokens: list[dict[str, Any]],
    counters: dict[str, Any],
    ws_writer: ArchiveWriter,
    rest_writer: ArchiveWriter,
    states: dict[str, BookState],
    warnings: list[str],
    errors: list[dict[str, Any]],
) -> dict[str, Any]:
    """Build the run manifest, write it to disk twice, and return it.

    The manifest captures run identity and status, the public config, the
    tracked markets and tokens, counters, per-book summaries, finalized and
    still-open archive files, de-duplicated warnings, and the last 20
    errors. It is written to ``config["manifest_path"]`` (the "latest" copy)
    and to a timestamped, per-run file inside ``config["manifest_dir"]``.
    """
    manifest = {
        "schema_name": MANIFEST_SCHEMA_NAME,
        "schema_version": 1,
        "collector": {"name": COLLECTOR_NAME, "version": COLLECTOR_VERSION},
        "run_id": run_id,
        "started_at_utc": started_at_utc,
        "updated_at_utc": iso_z(),
        "status": status,
        "gate_status": gate_status,
        "shutdown_reason": shutdown_reason,
        "production_ready": False,
        "public_data_only": True,
        "trading_enabled": False,
        "command": config.get("command"),
        "config": public_config(config),
        "markets_tracked": markets,
        "tokens_tracked": tokens,
        "counters": counters,
        "state_summary": summarize_states(states, int(config["top_n"])),
        "output_files": [*ws_writer.closed_files, *rest_writer.closed_files],
        "open_files": [path.as_posix() for path in [ws_writer.temp_path, rest_writer.temp_path] if path is not None],
        "warnings": sorted(set(warnings)),
        "errors": errors[-20:],
    }
    manifest_dir = Path(config["manifest_dir"])
    manifest_dir.mkdir(parents=True, exist_ok=True)
    manifest_path = Path(config["manifest_path"])
    manifest_path.parent.mkdir(parents=True, exist_ok=True)
    immutable_path = manifest_dir / f"polymarket_ws_recorder_{run_id}_{compact_timestamp()}.json"
    text = json.dumps(manifest, indent=2, sort_keys=True) + "\n"
    # The "latest" manifest is overwritten on every call. Writing a sibling
    # temp file and renaming it over the target means a reader sees either
    # the previous manifest or the complete new one, never a partial file.
    pending_path = manifest_path.with_name(f".{manifest_path.name}.tmp")
    pending_path.write_text(text, encoding="utf-8")
    pending_path.replace(manifest_path)
    immutable_path.write_text(text, encoding="utf-8")
    return manifest
def public_config(config: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of ``config`` that can be serialised as JSON.

    ``Path`` values are replaced by their POSIX string form; every other
    value is passed through unchanged.
    """
    return {
        name: setting.as_posix() if isinstance(setting, Path) else setting
        for name, setting in config.items()
    }
def build_config(args: argparse.Namespace) -> dict[str, Any]:
    """Assemble the recorder configuration dictionary.

    Each setting is resolved by ``config_value`` from the flat YAML file named
    by ``args.config`` (when given), the parsed CLI arguments, and a built-in
    default, then coerced to its expected type.

    Raises:
        ValueError: If ``rest_batch_size`` is below 1 or ``market_limit`` is
            negative.
    """
    config_file = args.config
    file_config = load_flat_yaml(config_file) if config_file else {}

    def setting(key: str, default: Any) -> Any:
        # Shorthand for the repeated lookup against file config and CLI args.
        return config_value(file_config, args, key, default)

    duration = setting("duration_seconds", None)
    config = {
        "config_path": config_file.as_posix() if config_file else None,
        # Only hash the config file when it actually exists on disk.
        "config_sha256": sha256_file(config_file) if config_file and config_file.exists() else None,
        "discovery_path": Path(setting("discovery_path", DEFAULT_DISCOVERY_PATH)),
        "discovery_dir": Path(setting("discovery_dir", DEFAULT_DISCOVERY_DIR)),
        "discovery_script_path": Path(setting("discovery_script_path", DISCOVERY_SCRIPT)),
        "discovery_execute": bool(setting("discovery_execute", True)),
        "discovery_refresh_interval_seconds": float(setting("discovery_refresh_interval_seconds", 600)),
        "discovery_max_pages": int(setting("discovery_max_pages", 3)),
        "discovery_page_limit": int(setting("discovery_page_limit", 100)),
        "raw_output_root": Path(setting("raw_output_root", DEFAULT_RAW_OUTPUT_ROOT)),
        "manifest_dir": Path(setting("manifest_dir", DEFAULT_MANIFEST_DIR)),
        "manifest_path": Path(setting("manifest_path", DEFAULT_MANIFEST_PATH)),
        "websocket_url": str(setting("websocket_url", MARKET_WS_URL)),
        "clob_books_url": str(setting("clob_books_url", CLOB_BOOKS_URL)),
        # A falsy market_limit (None, 0, "") collapses to 0.
        "market_limit": int(setting("market_limit", 0) or 0),
        "market_end_safety_seconds": int(setting("market_end_safety_seconds", 420)),
        "rest_checkpoint_interval_seconds": float(setting("rest_checkpoint_interval_seconds", 60)),
        "rest_batch_size": int(setting("rest_batch_size", 50)),
        "top_n": int(setting("top_n", 10)),
        "stale_feed_threshold_seconds": float(setting("stale_feed_threshold_seconds", 30)),
        "request_timeout_seconds": float(setting("request_timeout_seconds", 15)),
        "websocket_timeout_seconds": float(setting("websocket_timeout_seconds", 10)),
        "reconnect_backoff_seconds": float(setting("reconnect_backoff_seconds", 3)),
        "max_reconnect_backoff_seconds": float(setting("max_reconnect_backoff_seconds", 60)),
        "duration_seconds": None if duration is None else float(duration),
        "manifest_write_interval_seconds": float(setting("manifest_write_interval_seconds", 300)),
    }
    if config["rest_batch_size"] < 1:
        raise ValueError("rest_batch_size must be >= 1")
    if config["market_limit"] < 0:
        raise ValueError("market_limit must be >= 0; use 0 for all active BTC markets")
    return config
def run_recorder(config: dict[str, Any], command: str) -> dict[str, Any]:
    """Run the websocket recorder loop and return the final manifest.

    Installs ``handle_stop`` for SIGINT/SIGTERM, then repeatedly refreshes the
    tracked market/token set, opens a websocket session subscribed to those
    tokens, archives every text frame through ``ws_writer``, and interleaves
    REST checkpoints, periodic manifest writes and discovery refreshes. Session
    errors are recorded and followed by a reconnect with exponential backoff.
    The loop ends when a stop is requested or ``config["duration_seconds"]``
    elapses; both archive writers are then closed and a final manifest written.

    Args:
        config: Recorder configuration (as produced by ``build_config``). The
            ``command`` key is set on it as a side effect.
        command: Command line of this invocation, recorded in manifests.

    Returns:
        The final manifest dictionary returned by ``write_manifest``.
    """
    signal.signal(signal.SIGINT, handle_stop)
    signal.signal(signal.SIGTERM, handle_stop)
    # Record the command up front so the periodic RUNNING manifests carry it
    # too, not only the final one.
    config["command"] = command
    started = utc_now()
    started_at_utc = iso_z(started)
    run_id = compact_timestamp(started)
    deadline = time.monotonic() + float(config["duration_seconds"]) if config["duration_seconds"] else None
    counters = initial_counters()
    warnings: list[str] = []
    errors: list[dict[str, Any]] = []
    markets: list[dict[str, Any]] = []
    tokens: list[dict[str, Any]] = []
    states: dict[str, BookState] = {}
    rejection_counts: dict[str, int] = {}
    ws_writer = ArchiveWriter(root=Path(config["raw_output_root"]), subdir="polymarket/ws_raw", prefix="polymarket_ws_raw", run_id=run_id)
    rest_writer = ArchiveWriter(root=Path(config["raw_output_root"]), subdir="polymarket/rest_checkpoints", prefix="polymarket_rest_checkpoints", run_id=run_id)
    # 0.0 forces discovery and a REST checkpoint on the first pass.
    next_discovery = 0.0
    next_rest_checkpoint = 0.0
    next_manifest_write = time.monotonic() + float(config["manifest_write_interval_seconds"])
    checkpoint_sequence = 0
    global_sequence = 0
    connection_sequence = 0
    reconnect_backoff = float(config["reconnect_backoff_seconds"])
    last_text_message_monotonic: float | None = None
    shutdown_reason: str | None = None
    try:
        while not STOP_REQUESTED:
            if deadline is not None and time.monotonic() >= deadline:
                shutdown_reason = "duration_elapsed"
                break
            if time.monotonic() >= next_discovery or not tokens:
                old_token_ids = [token["token_id"] for token in tokens]
                try:
                    markets, tokens, rejection_counts = refresh_market_state(config, counters, warnings, errors)
                except Exception as exc:  # noqa: BLE001 - preserve evidence and retry
                    counters["discovery_failure_count"] += 1
                    errors.append({"stage": "load_discovery", "error": f"{type(exc).__name__}: {exc}"})
                    time.sleep(min(reconnect_backoff, 30))
                    next_discovery = time.monotonic() + float(config["discovery_refresh_interval_seconds"])
                    continue
                new_token_ids = [token["token_id"] for token in tokens]
                if old_token_ids and old_token_ids != new_token_ids:
                    counters["subscription_change_count"] += 1
                # Keep existing book state for tokens that are still tracked.
                states = {token["token_id"]: states.get(token["token_id"], BookState(token)) for token in tokens}
                next_discovery = time.monotonic() + float(config["discovery_refresh_interval_seconds"])
            if not tokens:
                warnings.append("no active BTC Up/Down tokens available after discovery")
                time.sleep(min(reconnect_backoff, 30))
                continue
            token_ids = [token["token_id"] for token in tokens]
            subscription = {"assets_ids": token_ids, "type": "market", "custom_feature_enabled": True}
            connection_sequence += 1
            session_id = f"{run_id}-ws{connection_sequence}"
            sock: ssl.SSLSocket | None = None
            try:
                sock, handshake = open_websocket(str(config["websocket_url"]), float(config["websocket_timeout_seconds"]))
                counters["connection_count"] += 1
                send_ws_frame(sock, 1, json.dumps(subscription, separators=(",", ":")).encode("utf-8"))
                session_message_sequence = 0
                session_started_at_utc = iso_z()
                # Baseline for the stale-feed check of this session (see below).
                session_started_monotonic = time.monotonic()
                # A successful connect resets the backoff to its base value.
                reconnect_backoff = float(config["reconnect_backoff_seconds"])
                while not STOP_REQUESTED:
                    now_monotonic = time.monotonic()
                    if deadline is not None and now_monotonic >= deadline:
                        shutdown_reason = "duration_elapsed"
                        break
                    if now_monotonic >= next_rest_checkpoint:
                        checkpoint_sequence += 1
                        fetch_rest_checkpoint(config=config, rest_writer=rest_writer, checkpoint_sequence=checkpoint_sequence, tokens=tokens, states=states, counters=counters)
                        next_rest_checkpoint = now_monotonic + float(config["rest_checkpoint_interval_seconds"])
                    if now_monotonic >= next_manifest_write:
                        write_manifest(config=config, run_id=run_id, started_at_utc=started_at_utc, status="RUNNING", gate_status="IN_PROGRESS", shutdown_reason=None, markets=markets, tokens=tokens, counters=counters, ws_writer=ws_writer, rest_writer=rest_writer, states=states, warnings=warnings, errors=errors)
                        next_manifest_write = now_monotonic + float(config["manifest_write_interval_seconds"])
                    if now_monotonic >= next_discovery:
                        previous_token_ids = token_ids
                        next_discovery = now_monotonic + float(config["discovery_refresh_interval_seconds"])
                        try:
                            markets, tokens, rejection_counts = refresh_market_state(config, counters, warnings, errors)
                        except Exception as exc:  # noqa: BLE001 - preserve evidence, keep the session
                            # A failed refresh must not tear down a healthy
                            # websocket: record it like the outer loop does and
                            # keep recording with the current token set.
                            counters["discovery_failure_count"] += 1
                            errors.append({"stage": "load_discovery", "error": f"{type(exc).__name__}: {exc}"})
                        else:
                            token_ids = [token["token_id"] for token in tokens]
                            if token_ids != previous_token_ids:
                                counters["subscription_change_count"] += 1
                                states = {token["token_id"]: states.get(token["token_id"], BookState(token)) for token in tokens}
                                # Deliberately routed through the reconnect path
                                # so the new session subscribes to the new set.
                                raise RuntimeError("tracked token set changed; reconnecting with new subscription")
                    if last_text_message_monotonic is not None:
                        # Measure silence from the later of the last text message
                        # and this session's start. The last-message timestamp
                        # survives reconnects; without the session baseline a
                        # fresh session would be declared stale before reading a
                        # single frame, and the recorder would reconnect forever.
                        silence = now_monotonic - max(last_text_message_monotonic, session_started_monotonic)
                        if silence > float(config["stale_feed_threshold_seconds"]):
                            counters["stale_feed_count"] += 1
                            raise TimeoutError(f"stale websocket feed for {silence:.3f}s")
                    try:
                        opcode, payload = read_ws_frame(sock)
                    except socket.timeout:
                        # No frame within the socket timeout: loop so the timers
                        # above are re-evaluated.
                        continue
                    counters["websocket_opcode_counts"][str(opcode)] = counters["websocket_opcode_counts"].get(str(opcode), 0) + 1
                    if opcode == 1:  # text frame
                        received_at_utc = iso_z()
                        now_for_gap = time.monotonic()
                        if last_text_message_monotonic is not None:
                            # The gap is measured across sessions on purpose, so
                            # reconnect outages show up in max_gap_seconds.
                            gap = round(now_for_gap - last_text_message_monotonic, 3)
                            previous = counters.get("max_gap_seconds")
                            counters["max_gap_seconds"] = gap if previous is None else max(previous, gap)
                        last_text_message_monotonic = now_for_gap
                        counters["last_message_received_at_utc"] = received_at_utc
                        session_message_sequence += 1
                        global_sequence += 1
                        envelope, parsed_json, event_types, parse_ok = build_ws_envelope(
                            run_id=run_id,
                            session_id=session_id,
                            connection_sequence=connection_sequence,
                            message_sequence=session_message_sequence,
                            global_message_sequence=global_sequence,
                            received_at_utc=received_at_utc,
                            websocket_url=str(config["websocket_url"]),
                            subscription=subscription,
                            tokens=tokens,
                            opcode=opcode,
                            payload_bytes=payload,
                        )
                        # Archive first, then update in-memory books.
                        ws_writer.write(envelope)
                        counters["websocket_message_count"] += 1
                        if parse_ok:
                            counters["websocket_parsed_json_count"] += 1
                            apply_payload_to_books(parsed_json, states, received_at_utc, counters, warnings)
                        else:
                            counters["websocket_parse_error_count"] += 1
                    elif opcode == 8:  # close frame
                        raise EOFError("websocket close frame received")
                    elif opcode == 9:  # ping: answer with a pong echoing the payload
                        send_ws_frame(sock, 10, payload)
                    elif opcode == 10:  # pong: nothing to do
                        continue
                    else:
                        warnings.append(f"ignored websocket opcode {opcode}")
                if shutdown_reason:
                    break
            except Exception as exc:  # noqa: BLE001 - preserve reconnect evidence
                errors.append({"stage": "websocket_session", "connection_sequence": connection_sequence, "error": f"{type(exc).__name__}: {exc}"})
                counters["reconnect_count"] += 1
                if STOP_REQUESTED:
                    shutdown_reason = STOP_SIGNAL or "stop_requested"
                    break
                time.sleep(min(reconnect_backoff, float(config["max_reconnect_backoff_seconds"])))
                reconnect_backoff = min(reconnect_backoff * 2, float(config["max_reconnect_backoff_seconds"]))
            finally:
                if sock is not None:
                    try:
                        sock.close()
                    except OSError:
                        # Best effort: the socket may already be broken.
                        pass
            if shutdown_reason:
                break
    finally:
        if STOP_REQUESTED and shutdown_reason is None:
            shutdown_reason = STOP_SIGNAL or "stop_requested"
        ws_writer.close()
        rest_writer.close()
    if shutdown_reason is None:
        shutdown_reason = "loop_exited"
    # PASS needs websocket messages, a successful REST checkpoint, and closed
    # archive files from both writers.
    if counters["websocket_message_count"] > 0 and counters["rest_success_count"] > 0 and ws_writer.closed_files and rest_writer.closed_files:
        gate_status = "PASS"
    else:
        gate_status = "BLOCKED_RUNTIME_EVIDENCE"
    status = "INTERRUPTED" if STOP_SIGNAL else ("COMPLETED_BOUNDED" if config["duration_seconds"] else "STOPPED")
    manifest = write_manifest(config=config, run_id=run_id, started_at_utc=started_at_utc, status=status, gate_status=gate_status, shutdown_reason=shutdown_reason, markets=markets, tokens=tokens, counters=counters, ws_writer=ws_writer, rest_writer=rest_writer, states=states, warnings=warnings, errors=errors)
    return manifest
def parse_args() -> argparse.Namespace:
    """Parse the recorder's command-line options from ``sys.argv``.

    Every option except ``--config`` defaults to ``None``.
    """
    parser = argparse.ArgumentParser(
        description="Long-running Polymarket BTC websocket raw recorder with REST checkpoints.",
    )
    parser.add_argument("--config", type=Path, default=DEFAULT_CONFIG_PATH)
    parser.add_argument(
        "--duration-seconds",
        type=float,
        default=None,
        help="Bounded run duration for smoke tests. Default is continuous runtime.",
    )
    parser.add_argument(
        "--market-limit",
        type=int,
        default=None,
        help="Limit active BTC markets for smoke tests. Use 0 for all.",
    )
    # The remaining options are plain optional paths with identical handling.
    path_flags = (
        "--raw-output-root",
        "--manifest-dir",
        "--manifest-path",
        "--discovery-path",
        "--discovery-dir",
    )
    for flag in path_flags:
        parser.add_argument(flag, type=Path, default=None)
    return parser.parse_args()
def main() -> int:
    """Run the recorder from the command line.

    Prints the manifest path, run id and gate status, and returns ``0`` when
    the gate status is ``PASS``, otherwise ``1``.
    """
    cli_args = parse_args()
    config = build_config(cli_args)
    # Reconstruct the invocation with a POSIX-style script path.
    invocation = [Path(sys.argv[0]).as_posix()]
    invocation.extend(sys.argv[1:])
    manifest = run_recorder(config, " ".join(invocation))
    print(f"WS_RECORDER_MANIFEST={config['manifest_path']}")
    print(f"WS_RECORDER_RUN_ID={manifest['run_id']}")
    print(f"WS_RECORDER_GATE={manifest['gate_status']}")
    if manifest["gate_status"] == "PASS":
        return 0
    return 1
# Script entry point: exit the process with main()'s return code.
if __name__ == "__main__":
    sys.exit(main())