orderbooks/scripts/collect_polymarket_ws_orderbooks.py

1330 lines
59 KiB
Python
Executable file

#!/usr/bin/env python3
"""Long-running raw Polymarket BTC websocket recorder with REST checkpoints.
Checkpoint 10D scope: public BTC up/down market data only. The recorder writes
raw websocket text exactly as received, keeps REST /books checkpoints as
recovery/comparison evidence, and records rotation/reconnect/stale/divergence
counters in manifests. It does not trade, sign, authenticate, or handle keys.
"""
from __future__ import annotations
import argparse
import base64
import datetime as dt
import gzip
import hashlib
import json
import os
import signal
import socket
import ssl
import struct
import subprocess
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from decimal import Decimal, InvalidOperation
from pathlib import Path
from typing import Any
# Recorder identity, embedded in archived rows and manifests.
COLLECTOR_NAME = "polymarket_ws_orderbook_recorder"
COLLECTOR_VERSION = "0.1.1"
# Schema identifiers for the three artifact kinds written by this recorder:
# raw websocket message envelopes, REST /books checkpoint rows, and run manifests.
WS_SCHEMA_NAME = "raw_polymarket_market_ws_message"
REST_SCHEMA_NAME = "raw_polymarket_books_checkpoint"
MANIFEST_SCHEMA_NAME = "polymarket_ws_recorder_manifest"
SCHEMA_VERSION = 1
# NOTE(review): presumably the default for --config; the argument parser is not visible here.
DEFAULT_CONFIG_PATH = Path("config/polymarket_ws_collector.example.yaml")
# Fallback locations used by build_config when neither the CLI, an
# ORDERBOOKS_WS_* environment variable, nor the config file supplies a value.
DEFAULT_DISCOVERY_PATH = Path("data/discovery/polymarket_btc_markets_latest.json")
DEFAULT_DISCOVERY_DIR = Path("data/discovery")
DEFAULT_RAW_OUTPUT_ROOT = Path("/var/lib/orderbooks/raw_orderbooks")
DEFAULT_MANIFEST_DIR = Path("/var/lib/orderbooks/manifests")
DEFAULT_MANIFEST_PATH = Path("/var/lib/orderbooks/manifests/polymarket_ws_recorder_latest.json")
# Public Polymarket endpoints: market-channel websocket and CLOB REST order books.
MARKET_WS_URL = "wss://ws-subscriptions-clob.polymarket.com/ws/market"
CLOB_BOOKS_URL = "https://clob.polymarket.com/books"
# Discovery helper run (when discovery_execute is enabled) to refresh the market list.
DISCOVERY_SCRIPT = Path("scripts/discover_polymarket_btc_markets.py")
# Lowercase allow-list of HTTP response headers that may be recorded;
# filter_headers compares header names case-insensitively against it.
SAFE_RESPONSE_HEADERS = {
    "cache-control",
    "cf-cache-status",
    "cf-ray",
    "content-length",
    "content-type",
    "date",
    "retry-after",
    "server",
    "x-ratelimit-limit",
    "x-ratelimit-remaining",
    "x-ratelimit-reset",
    "ratelimit-limit",
    "ratelimit-remaining",
    "ratelimit-reset",
}
# Graceful-shutdown flags written by handle_stop (signal handler).
STOP_REQUESTED = False
STOP_SIGNAL: str | None = None
def handle_stop(signum: int, _frame: Any) -> None:
    """Signal handler: request a graceful shutdown and remember which signal asked.

    Only the module-level STOP_REQUESTED / STOP_SIGNAL flags are touched; the
    code that polls them is not part of this function.
    """
    global STOP_REQUESTED, STOP_SIGNAL
    STOP_REQUESTED = True
    received = signal.Signals(signum)
    STOP_SIGNAL = received.name
def utc_now() -> dt.datetime:
return dt.datetime.now(dt.UTC)
def iso_z(value: dt.datetime | None = None) -> str:
value = value or utc_now()
return value.astimezone(dt.UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z")
def compact_timestamp(value: dt.datetime | None = None) -> str:
value = value or utc_now()
return value.astimezone(dt.UTC).strftime("%Y%m%dT%H%M%SZ")
def parse_iso(value: Any) -> dt.datetime | None:
if not isinstance(value, str) or not value.strip():
return None
text = value.strip()
if text.endswith("Z"):
text = text[:-1] + "+00:00"
try:
parsed = dt.datetime.fromisoformat(text)
except ValueError:
return None
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=dt.UTC)
return parsed.astimezone(dt.UTC)
def sha256_bytes(data: bytes) -> str:
    """Return the lowercase hexadecimal SHA-256 digest of *data*."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.hexdigest()
def sha256_file(path: Path) -> str:
    """Return the hex SHA-256 of a file's contents, streaming it in 1 MiB chunks."""
    hasher = hashlib.sha256()
    with path.open("rb") as stream:
        while block := stream.read(1 << 20):
            hasher.update(block)
    return hasher.hexdigest()
def parse_scalar(value: str) -> Any:
    """Convert a raw config or environment string into a Python scalar.

    Rules, applied after stripping surrounding whitespace:
    - empty -> ``""``
    - text wrapped in a matching pair of single or double quotes -> the inner
      text, verbatim (no further conversion)
    - ``true`` / ``false`` (any case) -> bool
    - ``null`` / ``none`` (any case) -> None
    - otherwise int, then float, else the stripped string itself.
    """
    value = value.strip()
    if not value:
        return ""
    # Require two characters so a lone quote is not mistaken for an empty
    # quoted string (previously '"' parsed to "").
    if len(value) >= 2 and value[0] in {"'", '"'} and value[-1] == value[0]:
        return value[1:-1]
    lower = value.lower()
    if lower in {"true", "false"}:
        return lower == "true"
    if lower in {"null", "none"}:
        return None
    try:
        return int(value)
    except ValueError:
        pass
    try:
        return float(value)
    except ValueError:
        return value
def _strip_yaml_comment(line: str) -> str:
    """Drop a trailing YAML-style comment from *line*.

    A comment starts at a ``#`` at the beginning of the line or preceded by
    whitespace; a ``#`` glued to other text (e.g. ``https://host/#frag``) is
    kept as part of the value. Quoting is not tracked, so ``"a # b"`` is still
    cut at `` #``.
    """
    for index, char in enumerate(line):
        if char == "#" and (index == 0 or line[index - 1].isspace()):
            return line[:index]
    return line


def load_flat_yaml(path: Path) -> dict[str, Any]:
    """Parse a flat ``key: value`` config file into a dict.

    Only single-level mappings are supported. Each non-blank, non-comment line
    must contain ``:``; the text before the first colon is the key and the rest
    is converted with parse_scalar. Later duplicate keys win. A missing file
    yields an empty dict.

    Raises:
        ValueError: a line has no colon, or its key is empty.
    """
    config: dict[str, Any] = {}
    if not path.exists():
        return config
    for line_number, raw_line in enumerate(path.read_text(encoding="utf-8").splitlines(), 1):
        line = _strip_yaml_comment(raw_line).strip()
        if not line:
            continue
        if ":" not in line:
            raise ValueError(f"Unsupported config line {line_number}: {raw_line}")
        key, value = line.split(":", 1)
        key = key.strip()
        if not key:
            raise ValueError(f"Missing config key on line {line_number}")
        config[key] = parse_scalar(value)
    return config
def config_value(config: dict[str, Any], args: argparse.Namespace, key: str, default: Any) -> Any:
    """Resolve one setting with precedence CLI > environment > config file > default.

    A CLI attribute counts only when it is not None. The environment variable
    is ``ORDERBOOKS_WS_<KEY>`` (upper-cased) and its text is run through
    parse_scalar.
    """
    cli_value = getattr(args, key, None)
    if cli_value is not None:
        return cli_value
    env_text = os.environ.get(f"ORDERBOOKS_WS_{key.upper()}")
    if env_text is not None:
        return parse_scalar(env_text)
    return config.get(key, default)
def filter_headers(headers: Any) -> dict[str, str]:
    """Return only the headers whose names are on SAFE_RESPONSE_HEADERS.

    Name matching is case-insensitive, and surviving headers keep their
    original casing. *headers* may be anything ``dict()`` accepts (a mapping
    or an ``http.client.HTTPMessage``).
    """
    return {
        name: text
        for name, text in dict(headers).items()
        if name.lower() in SAFE_RESPONSE_HEADERS
    }
def dec(value: Any) -> Decimal:
    """Coerce an API price/size field to Decimal, mapping bad input to ``Decimal("0")``.

    None, unparseable text, and non-finite values (NaN, sNaN, +/-Infinity) all
    become zero. Non-finite results are rejected on purpose: ordering
    comparisons on a NaN Decimal raise InvalidOperation (which would break
    price sorting), and a signaling NaN raises even on ``==``.
    """
    if value is None:
        return Decimal("0")
    try:
        result = Decimal(str(value))
    except InvalidOperation:
        return Decimal("0")
    return result if result.is_finite() else Decimal("0")
def dec_to_str(value: Decimal) -> str:
    """Render a Decimal in plain (non-exponent) notation without trailing fractional zeros.

    Examples: ``1.500`` -> ``"1.5"``, ``1E+2`` -> ``"100"``, ``0.000`` -> ``"0"``.
    """
    rendered = f"{value:f}"
    if "." not in rendered:
        return rendered or "0"
    trimmed = rendered.rstrip("0").rstrip(".")
    return trimmed or "0"
def level_map(levels: Any) -> dict[str, Decimal]:
    """Turn ``[{"price": ..., "size": ...}, ...]`` into ``{normalized_price: size}``.

    Non-list input yields an empty map and non-dict entries are skipped.
    Prices are normalized with ``dec_to_str(dec(...))`` so equal prices share a
    key; a later duplicate price overwrites an earlier one.
    """
    if not isinstance(levels, list):
        return {}
    return {
        dec_to_str(dec(entry.get("price"))): dec(entry.get("size"))
        for entry in levels
        if isinstance(entry, dict)
    }
def top_levels(book: dict[str, Decimal], side: str, top_n: int) -> list[tuple[str, Decimal]]:
    """Return the best *top_n* ``(price, size)`` levels of one book side.

    Bids (``side == "bids"``) are ordered by descending numeric price; any
    other side is ordered ascending.
    """
    descending = side == "bids"
    ranked = sorted(book.items(), key=lambda level: Decimal(level[0]), reverse=descending)
    return ranked[:top_n]
def summarize_book(bids: dict[str, Decimal], asks: dict[str, Decimal], top_n: int) -> dict[str, Any]:
    """Summarize a two-sided book: best prices, spread, level counts, and top-N levels.

    Prices are the normalized strings used as map keys. Sizes and the spread
    are rendered with dec_to_str. ``spread`` is ``best_ask - best_bid`` and is
    None unless both sides have a level within the top *top_n*. Level counts
    cover the whole book, not just the top *top_n*.
    """
    leading_bids = top_levels(bids, "bids", top_n)
    leading_asks = top_levels(asks, "asks", top_n)
    best_bid = leading_bids[0][0] if leading_bids else None
    best_ask = leading_asks[0][0] if leading_asks else None
    spread = None
    if best_bid and best_ask:
        spread = dec_to_str(Decimal(best_ask) - Decimal(best_bid))
    return {
        "best_bid": best_bid,
        "best_ask": best_ask,
        "spread": spread,
        "bid_level_count": len(bids),
        "ask_level_count": len(asks),
        "top_bids": [dict(price=price, size=dec_to_str(size)) for price, size in leading_bids],
        "top_asks": [dict(price=price, size=dec_to_str(size)) for price, size in leading_asks],
    }
def compare_side(local: dict[str, Decimal], rest: dict[str, Decimal], side: str, top_n: int) -> dict[str, Any]:
    """Diff the top *top_n* levels of one side of the local book against the REST book.

    Returns:
    - ``missing_prices``: prices present only in the REST top levels
    - ``extra_prices``: prices present only locally
    - ``size_deltas``: prices in both where the sizes differ

    All price lists use the side's book order (bids descending, asks ascending).
    """
    descending = side == "bids"
    local_top = dict(top_levels(local, side, top_n))
    rest_top = dict(top_levels(rest, side, top_n))
    local_prices = set(local_top)
    rest_prices = set(rest_top)

    def in_book_order(prices: set[str]) -> list[str]:
        return sorted(prices, key=Decimal, reverse=descending)

    missing = in_book_order(rest_prices - local_prices)
    extra = in_book_order(local_prices - rest_prices)
    size_deltas = []
    for price in in_book_order(local_prices & rest_prices):
        local_size = local_top[price]
        rest_size = rest_top[price]
        delta = local_size - rest_size
        if delta == 0:
            continue
        size_deltas.append({
            "price": price,
            "local_size": dec_to_str(local_size),
            "rest_size": dec_to_str(rest_size),
            "delta": dec_to_str(delta),
        })
    return {"missing_prices": missing, "extra_prices": extra, "size_deltas": size_deltas}
class BookState:
def __init__(self, token_meta: dict[str, Any]) -> None:
self.token_meta = token_meta
self.bids: dict[str, Decimal] = {}
self.asks: dict[str, Decimal] = {}
self.initialized = False
self.messages_applied = 0
self.messages_skipped = 0
self.unknown_messages = 0
self.last_update_received_at_utc: str | None = None
def apply_book(self, item: dict[str, Any], received_at_utc: str) -> None:
self.bids = level_map(item.get("bids"))
self.asks = level_map(item.get("asks"))
self.initialized = True
self.messages_applied += 1
self.last_update_received_at_utc = received_at_utc
def apply_change(self, change: dict[str, Any], received_at_utc: str) -> str | None:
if not self.initialized:
self.messages_skipped += 1
return "price_change_before_book_snapshot"
side = str(change.get("side") or "").upper()
price = dec_to_str(dec(change.get("price")))
size = dec(change.get("size"))
if side == "BUY":
book_side = self.bids
elif side == "SELL":
book_side = self.asks
else:
self.messages_skipped += 1
return f"unsupported_price_change_side:{side}"
if size == 0:
book_side.pop(price, None)
else:
book_side[price] = size
self.messages_applied += 1
self.last_update_received_at_utc = received_at_utc
return None
def summary(self, top_n: int) -> dict[str, Any]:
state_quality = "insufficient_events"
if self.initialized and self.messages_applied > 1:
state_quality = "initialized_and_updated"
elif self.initialized:
state_quality = "snapshot_only"
return {
"token": self.token_meta,
"initialized": self.initialized,
"state_quality": state_quality,
"messages_applied": self.messages_applied,
"messages_skipped": self.messages_skipped,
"unknown_messages": self.unknown_messages,
"last_update_received_at_utc": self.last_update_received_at_utc,
**summarize_book(self.bids, self.asks, top_n),
}
class ArchiveWriter:
    """Append JSONL rows to hourly gzip files and publish each file by rename on close.

    Rows are written to a hidden ``.<name>.open`` temp file under
    ``<root>/<subdir>/YYYY/MM/DD/HH/``. When the UTC hour of a write changes,
    or close() is called, the temp file is renamed to its final ``.jsonl.gz``
    name and a summary (rows, bytes, SHA-256) is appended to ``closed_files``.
    A published archive is never overwritten: if the final name is already
    taken, a numbered ``_partN`` sibling name is used instead.
    """

    def __init__(self, *, root: Path, subdir: str, prefix: str, run_id: str) -> None:
        self.root = root
        self.subdir = subdir
        self.prefix = prefix
        self.run_id = run_id
        # State of the currently open hourly file; reset by close().
        self.current_hour: str | None = None
        self.temp_path: Path | None = None
        self.final_path: Path | None = None
        # Text-mode stream from gzip.open(..., "at") (a text wrapper, not a GzipFile).
        self.handle: Any = None
        self.rows = 0
        self.started_at_utc: str | None = None
        # One summary dict per published file, in close order.
        self.closed_files: list[dict[str, Any]] = []

    def _paths_for(self, when: dt.datetime) -> tuple[str, Path, Path]:
        """Return ``(hour_key, temp_path, final_path)`` for the UTC hour containing *when*."""
        hour = when.astimezone(dt.UTC).strftime("%Y/%m/%d/%H")
        hour_id = when.astimezone(dt.UTC).strftime("%Y%m%dT%H0000Z")
        directory = self.root / self.subdir / hour
        final_path = directory / f"{self.prefix}_{self.run_id}_{hour_id}.jsonl.gz"
        temp_path = directory / f".{final_path.name}.open"
        return hour, temp_path, final_path

    def ensure_open(self, when: dt.datetime | None = None) -> None:
        """Make sure the file for *when*'s hour (default: now) is open, rotating if needed."""
        when = when or utc_now()
        hour, temp_path, final_path = self._paths_for(when)
        if self.handle is not None and self.current_hour == hour:
            return
        # Hour changed (or nothing open yet): publish the previous file first.
        self.close(ended_at=when)
        temp_path.parent.mkdir(parents=True, exist_ok=True)
        self.current_hour = hour
        self.temp_path = temp_path
        self.final_path = final_path
        self.rows = 0
        self.started_at_utc = iso_z(when)
        self.handle = gzip.open(temp_path, "at", encoding="utf-8")

    def write(self, row: dict[str, Any], when: dt.datetime | None = None) -> None:
        """Serialize *row* as one compact, key-sorted JSON line into the current hourly file."""
        self.ensure_open(when)
        assert self.handle is not None
        self.handle.write(json.dumps(row, separators=(",", ":"), sort_keys=True) + "\n")
        self.rows += 1
        # Periodic flush bounds how much buffered data a crash can lose.
        if self.rows % 100 == 0:
            self.handle.flush()

    @staticmethod
    def _unused_final_path(final_path: Path) -> Path:
        """Return *final_path*, or the first free ``<stem>_partN`` sibling if it already exists."""
        if not final_path.exists():
            return final_path
        suffix = ".jsonl.gz" if final_path.name.endswith(".jsonl.gz") else ""
        stem = final_path.name[: len(final_path.name) - len(suffix)]
        part = 2
        while True:
            candidate = final_path.with_name(f"{stem}_part{part}{suffix}")
            if not candidate.exists():
                return candidate
            part += 1

    def close(self, ended_at: dt.datetime | None = None) -> None:
        """Close and publish the open file, if any, and record it in ``closed_files``.

        If the hourly final name is already taken (e.g. close() followed by more
        writes in the same hour), the file is published under a ``_partN`` name
        rather than deleting the earlier archive.
        """
        if self.handle is None:
            return
        ended_at = ended_at or utc_now()
        self.handle.close()
        assert self.temp_path is not None and self.final_path is not None
        self.final_path.parent.mkdir(parents=True, exist_ok=True)
        published_path = self._unused_final_path(self.final_path)
        self.temp_path.rename(published_path)
        self.closed_files.append({
            "path": published_path.as_posix(),
            "kind": self.prefix,
            "started_at_utc": self.started_at_utc,
            "ended_at_utc": iso_z(ended_at),
            "rows": self.rows,
            "bytes": published_path.stat().st_size,
            "sha256": sha256_file(published_path),
            "status": "valid" if self.rows > 0 else "empty",
        })
        self.current_hour = None
        self.temp_path = None
        self.final_path = None
        self.handle = None
        self.rows = 0
        self.started_at_utc = None

    def open_file_summary(self) -> dict[str, Any] | None:
        """Describe the in-progress temp file, or return None when nothing is open."""
        if self.temp_path is None:
            return None
        summary = {
            "path": self.temp_path.as_posix(),
            "final_path": self.final_path.as_posix() if self.final_path else None,
            "kind": self.prefix,
            "started_at_utc": self.started_at_utc,
            "rows_written": self.rows,
            "status": "open",
        }
        if self.temp_path.exists():
            stat = self.temp_path.stat()
            summary.update({
                "bytes": stat.st_size,
                "mtime_utc": dt.datetime.fromtimestamp(stat.st_mtime, dt.UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z"),
            })
        return summary
def send_ws_frame(sock: ssl.SSLSocket, opcode: int, payload: bytes) -> None:
    """Send one unfragmented, client-masked websocket frame (RFC 6455 section 5.2).

    FIN is always set. The payload length uses the 7-bit, 16-bit, or 64-bit
    form as required, and the payload is masked with a fresh random 4-byte key.
    """
    key = os.urandom(4)
    size = len(payload)
    frame = bytearray((0x80 | opcode,))
    if size >= 65536:
        frame.append(0x80 | 127)
        frame += struct.pack("!Q", size)
    elif size >= 126:
        frame.append(0x80 | 126)
        frame += struct.pack("!H", size)
    else:
        frame.append(0x80 | size)
    frame += key
    frame += bytes(value ^ key[offset % 4] for offset, value in enumerate(payload))
    sock.sendall(frame)
def read_exact(sock: ssl.SSLSocket, length: int, chunk_size: int = 65536) -> bytes:
    """Read exactly *length* bytes from *sock*.

    Each ``recv`` call asks for at most *chunk_size* bytes. CPython's socket
    and SSL ``recv(n)`` allocate an n-byte buffer before reading, so asking
    for the whole remaining length would let a huge (or bogus) frame length
    force one enormous allocation per call.

    Raises:
        EOFError: the peer closed the connection before *length* bytes arrived.
    """
    step = max(1, chunk_size)
    data = bytearray()
    while len(data) < length:
        chunk = sock.recv(min(step, length - len(data)))
        if not chunk:
            raise EOFError("websocket connection closed while reading frame")
        data += chunk
    return bytes(data)
def read_ws_frame(sock: ssl.SSLSocket) -> tuple[int, bytes]:
    """Read one websocket frame and return ``(opcode, payload)``.

    Handles 7-, 16-, and 64-bit payload lengths and unmasks masked frames. The
    FIN and RSV bits are not inspected, so a continuation frame is returned
    with opcode 0. Any reassembly is left to the caller.
    """
    head = read_exact(sock, 2)
    opcode = head[0] & 0x0F
    size = head[1] & 0x7F
    is_masked = (head[1] & 0x80) != 0
    if size == 126:
        (size,) = struct.unpack("!H", read_exact(sock, 2))
    elif size == 127:
        (size,) = struct.unpack("!Q", read_exact(sock, 8))
    key = read_exact(sock, 4) if is_masked else b""
    body = read_exact(sock, size) if size else b""
    if not is_masked:
        return opcode, body
    return opcode, bytes(value ^ key[offset % 4] for offset, value in enumerate(body))
def parse_ws_headers(raw_headers: str) -> tuple[str, dict[str, str]]:
    """Split a raw HTTP response head into its status line and allow-listed headers.

    Lines without a colon are ignored. Names and values are whitespace-stripped,
    and only headers accepted by filter_headers are returned.
    """
    status_line, *header_lines = raw_headers.split("\r\n")
    collected: dict[str, str] = {}
    for header_line in header_lines:
        name, separator, text = header_line.partition(":")
        if not separator:
            continue
        collected[name.strip()] = text.strip()
    return status_line, filter_headers(collected)
def open_websocket(url: str, timeout_seconds: float) -> tuple[ssl.SSLSocket, dict[str, Any]]:
    """Open a TLS websocket connection to *url* and complete the client handshake.

    Returns the connected socket (timeout set to *timeout_seconds*) and a dict
    with the response status line and allow-listed response headers.

    Raises:
        ValueError: missing host, non-101 upgrade response, or a response head over 64 KiB.
        EOFError: the server closed the connection before the response head ended.
        OSError / ssl.SSLError: connect, TLS, or socket I/O failures.

    The socket is closed before any exception propagates.
    """
    parsed = urllib.parse.urlparse(url)
    host = parsed.hostname
    if not host:
        raise ValueError("missing websocket host")
    port = parsed.port or 443
    path = parsed.path or "/"
    if parsed.query:
        path = f"{path}?{parsed.query}"
    raw_sock = socket.create_connection((host, port), timeout=timeout_seconds)
    try:
        sock = ssl.create_default_context().wrap_socket(raw_sock, server_hostname=host)
    except BaseException:
        raw_sock.close()
        raise
    try:
        sock.settimeout(timeout_seconds)
        key = base64.b64encode(os.urandom(16)).decode("ascii")
        request = (
            f"GET {path} HTTP/1.1\r\n"
            f"Host: {host}\r\n"
            "Upgrade: websocket\r\n"
            "Connection: Upgrade\r\n"
            f"Sec-WebSocket-Key: {key}\r\n"
            "Sec-WebSocket-Version: 13\r\n"
            f"User-Agent: orderbooks-polymarket-ws-recorder/{COLLECTOR_VERSION}\r\n"
            "\r\n"
        )
        sock.sendall(request.encode("ascii"))
        raw_headers = bytearray()
        # Read one byte at a time so nothing past the blank line ending the
        # response head is consumed: frame bytes the server sends right after
        # the 101 response must stay in the socket for later frame reads.
        while not raw_headers.endswith(b"\r\n\r\n"):
            chunk = sock.recv(1)
            if not chunk:
                # Previously an empty recv looped forever.
                raise EOFError("websocket connection closed during handshake")
            raw_headers += chunk
            if len(raw_headers) > 65536:
                raise ValueError("websocket handshake headers exceeded 64 KiB")
        header_text = bytes(raw_headers[:-4]).decode("iso-8859-1", errors="replace")
        status_line, response_headers = parse_ws_headers(header_text)
        # Compare the status-code field itself; the reason phrase is optional.
        status_parts = status_line.split()
        if len(status_parts) < 2 or status_parts[1] != "101":
            raise ValueError(f"websocket upgrade failed: {status_line}")
    except BaseException:
        sock.close()
        raise
    return sock, {"status_line": status_line, "headers": response_headers}
def decode_json_maybe(text: str) -> tuple[Any | None, str | None]:
try:
return json.loads(text), None
except json.JSONDecodeError as exc:
return None, str(exc)
def classify_payload(payload: Any) -> list[str]:
    """Label each item of a decoded websocket payload.

    A non-list payload is treated as a one-item list. Each item's label is:
    - its ``event_type`` (stringified), when that is truthy
    - ``"book"`` for dicts carrying ``asset_id``, ``bids`` and ``asks``
    - ``"unknown_object"`` for any other dict
    - the Python type name for non-dicts.
    """
    def label(item: Any) -> str:
        if not isinstance(item, dict):
            return type(item).__name__
        if item.get("event_type"):
            return str(item["event_type"])
        if {"asset_id", "bids", "asks"}.issubset(item):
            return "book"
        return "unknown_object"

    entries = payload if isinstance(payload, list) else [payload]
    return [label(entry) for entry in entries]
def raw_items(payload: Any) -> list[dict[str, Any]]:
    """Return the dict entries of *payload*, treating a non-list payload as a one-item list."""
    candidates = payload if isinstance(payload, list) else [payload]
    return list(filter(lambda entry: isinstance(entry, dict), candidates))
def load_discovery(path: Path) -> dict[str, Any]:
    """Read and decode the UTF-8 discovery JSON artifact at *path*."""
    with path.open(encoding="utf-8") as stream:
        return json.load(stream)
def market_is_usable(market: dict[str, Any], now: dt.datetime, safety_seconds: int) -> tuple[bool, list[str]]:
    """Decide whether a discovered market can be recorded; return ``(usable, reasons)``.

    The market must be active, not closed, accepting orders, and order-book
    enabled (each flag compared by identity with True/False). Its end time
    must be more than *safety_seconds* after *now*. It needs at least two
    tokens whose first two dict entries are outcomes ``Up``, ``Down`` with
    truthy token ids. Every failed check contributes one reason string.
    """
    reasons: list[str] = []
    flag_checks = (
        ("active", True, "not_active"),
        ("closed", False, "closed"),
        ("accepting_orders", True, "not_accepting_orders"),
        ("enable_order_book", True, "order_book_not_enabled"),
    )
    for field, required, reason in flag_checks:
        if market.get(field) is not required:
            reasons.append(reason)
    end_time = parse_iso(market.get("end_time_utc"))
    if end_time is None:
        reasons.append("missing_end_time")
    elif end_time <= now + dt.timedelta(seconds=safety_seconds):
        reasons.append("too_close_to_end_or_expired")
    tokens = market.get("tokens")
    if isinstance(tokens, list) and len(tokens) >= 2:
        token_dicts = [token for token in tokens if isinstance(token, dict)]
        outcomes = [token.get("outcome") for token in token_dicts]
        token_ids = [token.get("token_id") for token in token_dicts]
        if outcomes[:2] != ["Up", "Down"] or not all(token_ids[:2]):
            reasons.append("bad_up_down_token_mapping")
    else:
        reasons.append("missing_two_tokens")
    return not reasons, reasons
def select_markets(discovery: dict[str, Any], market_limit: int, market_end_safety_seconds: int) -> tuple[list[dict[str, Any]], dict[str, int]]:
    """Pick usable markets from a discovery artifact, in artifact order.

    Stops once *market_limit* markets are selected (a limit <= 0 means no
    limit). Returns the selected markets and a key-sorted tally of rejection
    reasons for the markets that were examined.
    """
    now = utc_now()
    chosen: list[dict[str, Any]] = []
    rejections: dict[str, int] = {}

    def reject(reason: str) -> None:
        rejections[reason] = rejections.get(reason, 0) + 1

    for candidate in discovery.get("normalized_markets") or []:
        if not isinstance(candidate, dict):
            reject("not_object")
            continue
        usable, reasons = market_is_usable(candidate, now, market_end_safety_seconds)
        if usable:
            chosen.append(candidate)
            if 0 < market_limit <= len(chosen):
                break
        else:
            for reason in reasons:
                reject(reason)
    return chosen, dict(sorted(rejections.items()))
def flatten_tokens(markets: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Expand markets into one record per outcome token, carrying market context.

    Only the first two entries of each market's ``tokens`` are considered, and
    non-dict entries among them are skipped. ``token_id`` is stringified.
    """
    return [
        {
            "market_name": market.get("market_name"),
            "market_slug": market.get("market_slug"),
            "condition_id": market.get("condition_id"),
            "token_id": str(token.get("token_id")),
            "outcome": token.get("outcome"),
            "outcome_index": token.get("outcome_index"),
            "market_end_time_utc": market.get("end_time_utc"),
        }
        for market in markets
        for token in market.get("tokens", [])[:2]
        if isinstance(token, dict)
    ]
def run_discovery(config: dict[str, Any], counters: dict[str, Any], warnings: list[str], errors: list[dict[str, Any]]) -> None:
script_path = Path(config["discovery_script_path"])
if not config["discovery_execute"]:
return
if not script_path.exists():
warnings.append(f"discovery script missing; using existing artifact only: {script_path}")
return
output_json = Path(config["discovery_path"])
output_json.parent.mkdir(parents=True, exist_ok=True)
cmd = [
sys.executable,
script_path.as_posix(),
"--output-json", output_json.as_posix(),
"--manifest", str(Path(config["discovery_dir"]) / "polymarket_btc_markets_manifest.json"),
"--markdown", str(Path(config["discovery_dir"]) / "polymarket_btc_markets.md"),
"--max-pages", str(config["discovery_max_pages"]),
"--limit", str(config["discovery_page_limit"]),
"--timeout", str(config["request_timeout_seconds"]),
]
started = time.monotonic()
result = subprocess.run(cmd, text=True, capture_output=True, timeout=max(30, int(config["request_timeout_seconds"] * (config["discovery_max_pages"] + 1))))
counters["discovery_refresh_count"] += 1
counters["last_discovery_duration_ms"] = round((time.monotonic() - started) * 1000, 3)
if result.returncode != 0:
counters["discovery_failure_count"] += 1
errors.append({
"stage": "discovery_refresh",
"returncode": result.returncode,
"stderr_tail": result.stderr[-2000:],
"stdout_tail": result.stdout[-1000:],
})
def refresh_market_state(config: dict[str, Any], counters: dict[str, Any], warnings: list[str], errors: list[dict[str, Any]]) -> tuple[list[dict[str, Any]], list[dict[str, Any]], dict[str, int]]:
    """Optionally rerun discovery, then reload the artifact and reselect markets/tokens.

    Updates the tracking counters (market/token counts, artifact path and
    SHA-256) and returns ``(markets, tokens, rejection_counts)``. Loading the
    artifact raises if it is missing or invalid JSON.
    """
    run_discovery(config, counters, warnings, errors)
    artifact = Path(config["discovery_path"])
    selected_markets, rejections = select_markets(
        load_discovery(artifact),
        int(config["market_limit"]),
        int(config["market_end_safety_seconds"]),
    )
    selected_tokens = flatten_tokens(selected_markets)
    counters["markets_tracked"] = len(selected_markets)
    counters["tokens_tracked"] = len(selected_tokens)
    counters["last_discovery_path"] = artifact.as_posix()
    counters["last_discovery_sha256"] = sha256_file(artifact) if artifact.exists() else None
    return selected_markets, selected_tokens, rejections
def apply_payload_to_books(payload: Any, states: dict[str, BookState], received_at_utc: str, counters: dict[str, Any], warnings: list[str]) -> None:
    """Fold one decoded websocket payload into the tracked BookStates and counters.

    Per item:
    - ``book``: replaces the matching token's book (unknown tokens are ignored).
    - ``price_change``: applies each ``price_changes`` entry to its token's
      book. Skip reasons are tallied in ``book_update_skip_counts``.
    - ``best_bid_ask`` / ``last_trade_price`` / ``new_market``: counted only.
    - anything else: counted in ``unknown_event_counts`` and charged to every
      tracked state's ``unknown_messages``, since it can't be attributed to one
      token.

    Event types are stringified, as classify_payload does, so counter keys are
    always hashable ``str``. The manifest is dumped with ``sort_keys=True``,
    which fails on mixed key types. Repeated warnings are recorded once, so a
    persistently malformed feed cannot grow *warnings* without bound.
    """
    event_type_counts = counters["event_type_counts"]
    for item in raw_items(payload):
        declared_type = item.get("event_type")
        if declared_type:
            event_type = str(declared_type)
        elif {"asset_id", "bids", "asks"}.issubset(item.keys()):
            event_type = "book"
        else:
            event_type = "unknown_object"
        event_type_counts[event_type] = event_type_counts.get(event_type, 0) + 1
        if event_type == "book":
            state = states.get(str(item.get("asset_id") or ""))
            if state:
                state.apply_book(item, received_at_utc)
        elif event_type == "price_change":
            changes = item.get("price_changes")
            if not isinstance(changes, list):
                counters["parser_warning_count"] += 1
                message = "price_change event without price_changes list"
                if message not in warnings:
                    warnings.append(message)
                continue
            for change in changes:
                if not isinstance(change, dict):
                    continue
                state = states.get(str(change.get("asset_id") or ""))
                if not state:
                    continue
                reason = state.apply_change(change, received_at_utc)
                if reason:
                    counters["book_update_skip_counts"][reason] = counters["book_update_skip_counts"].get(reason, 0) + 1
        elif event_type in {"best_bid_ask", "last_trade_price", "new_market"}:
            counters["non_mutating_event_counts"][event_type] = counters["non_mutating_event_counts"].get(event_type, 0) + 1
        else:
            counters["unknown_event_counts"][event_type] = counters["unknown_event_counts"].get(event_type, 0) + 1
            for state in states.values():
                state.unknown_messages += 1
def compare_state_to_rest(state: BookState, rest_item: dict[str, Any], top_n: int) -> dict[str, Any]:
    """Compare a local BookState with one REST /books entry and flag what differs.

    The ``*_affected`` flags cover best prices, spread, level counts, and top-N
    price membership. ``size_only_divergent`` is true when only sizes differ at
    shared top-N prices. ``comparison_status`` is ``"match"`` only when nothing
    differs.
    """
    rest_bids = level_map(rest_item.get("bids"))
    rest_asks = level_map(rest_item.get("asks"))
    local = summarize_book(state.bids, state.asks, top_n)
    remote = summarize_book(rest_bids, rest_asks, top_n)
    bid_diff = compare_side(state.bids, rest_bids, "bids", top_n)
    ask_diff = compare_side(state.asks, rest_asks, "asks", top_n)
    flags = {
        "best_bid_affected": local["best_bid"] != remote["best_bid"],
        "best_ask_affected": local["best_ask"] != remote["best_ask"],
        "spread_affected": local["spread"] != remote["spread"],
        "level_count_affected": (local["bid_level_count"], local["ask_level_count"]) != (remote["bid_level_count"], remote["ask_level_count"]),
        "price_membership_affected": any(
            diff[key] for diff in (bid_diff, ask_diff) for key in ("missing_prices", "extra_prices")
        ),
    }
    bid_deltas = len(bid_diff["size_deltas"])
    ask_deltas = len(ask_diff["size_deltas"])
    structural = any(flags.values())
    has_size_deltas = bool(bid_deltas + ask_deltas)
    return {
        "comparison_status": "divergent" if structural or has_size_deltas else "match",
        **flags,
        "size_only_divergent": has_size_deltas and not structural,
        "bid_size_delta_count": bid_deltas,
        "ask_size_delta_count": ask_deltas,
    }
def http_post_books(url: str, token_ids: list[str], timeout_seconds: float) -> dict[str, Any]:
    """POST ``[{"token_id": ...}, ...]`` to the CLOB /books endpoint and capture the outcome.

    Transport errors, HTTP error statuses, and failures while reading an error
    body are recorded in the returned ``error`` field rather than raised.
    ``ok`` is true only for a 2xx response whose body (if any) parsed as JSON.
    Only allow-listed response headers are kept. The body is kept as parsed
    JSON plus its SHA-256 and length, with a 1000-character text preview when
    it is not JSON.
    """
    requested_at_utc = iso_z()
    started = time.monotonic()
    request_body = [{"token_id": token_id} for token_id in token_ids]
    body_bytes = json.dumps(request_body, separators=(",", ":")).encode("utf-8")
    status_code: int | None = None
    headers: dict[str, str] = {}
    response_text = ""
    error: str | None = None
    try:
        request = urllib.request.Request(
            url,
            data=body_bytes,
            headers={
                "Accept": "application/json",
                "Content-Type": "application/json",
                "User-Agent": f"orderbooks-polymarket-ws-recorder/{COLLECTOR_VERSION}",
            },
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=timeout_seconds) as response:
            status_code = response.status
            headers = filter_headers(response.headers)
            response_text = response.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as exc:
        status_code = exc.code
        headers = filter_headers(exc.headers)
        error = f"HTTPError: {exc}"
        # Reading the error body can fail too (timeout, reset, truncated body).
        # Keep the status/headers evidence instead of letting that secondary
        # failure escape this except clause and abort the checkpoint.
        try:
            response_text = exc.read().decode("utf-8", errors="replace")
        except Exception as read_exc:  # noqa: BLE001 - preserve failure evidence
            error = f"{error}; error body read failed: {type(read_exc).__name__}: {read_exc}"
    except Exception as exc:  # noqa: BLE001 - preserve failure evidence
        error = f"{type(exc).__name__}: {exc}"
    parsed_json, json_error = decode_json_maybe(response_text) if response_text else (None, None)
    return {
        "requested_at_utc": requested_at_utc,
        "received_at_utc": iso_z(),
        "duration_ms": round((time.monotonic() - started) * 1000, 3),
        "request_body": request_body,
        "status_code": status_code,
        "headers": headers,
        "raw_response_json": parsed_json,
        "json_error": json_error,
        "raw_response_text_sha256": sha256_bytes(response_text.encode("utf-8")),
        "raw_response_length_bytes": len(response_text.encode("utf-8")),
        "raw_response_text_preview": response_text[:1000] if parsed_json is None else None,
        "error": error,
        "ok": error is None and status_code is not None and 200 <= status_code < 300 and json_error is None,
    }
def fetch_rest_checkpoint(
    *,
    config: dict[str, Any],
    rest_writer: ArchiveWriter,
    checkpoint_sequence: int,
    tokens: list[dict[str, Any]],
    states: dict[str, BookState],
    counters: dict[str, Any],
) -> None:
    """Take one REST /books checkpoint of every tracked token and archive it.

    Token ids are requested in batches of ``rest_batch_size`` (at least 1).
    For each batch this:
    - updates the REST request / success / failure / 429 counters;
    - compares every returned book with the local BookState, counting tokens
      without an initialized state as ``no_state_count``;
    - adds the batch's comparison tallies into
      ``counters["rest_comparison_counts"]``;
    - writes one checkpoint row to *rest_writer*.
    """
    tracked_ids = [token["token_id"] for token in tokens]
    batch_size = max(1, int(config["rest_batch_size"]))
    batch_count = (len(tracked_ids) + batch_size - 1) // batch_size
    flag_keys = (
        "best_bid_affected",
        "best_ask_affected",
        "spread_affected",
        "level_count_affected",
        "price_membership_affected",
        "size_only_divergent",
    )
    tally_keys = (
        "match_count",
        "divergent_count",
        "no_state_count",
        *(f"{flag}_count" for flag in flag_keys),
        "bid_size_delta_count",
        "ask_size_delta_count",
    )
    for batch_index, offset in enumerate(range(0, len(tracked_ids), batch_size), start=1):
        batch = tracked_ids[offset:offset + batch_size]
        response = http_post_books(config["clob_books_url"], batch, float(config["request_timeout_seconds"]))
        counters["rest_request_count"] += 1
        counters["rest_success_count" if response["ok"] else "rest_failure_count"] += 1
        if response.get("status_code") == 429:
            counters["rest_rate_limit_count"] += 1
        tally = dict.fromkeys(tally_keys, 0)
        rest_books = response.get("raw_response_json")
        if isinstance(rest_books, list):
            for rest_item in rest_books:
                if not isinstance(rest_item, dict):
                    continue
                state = states.get(str(rest_item.get("asset_id") or ""))
                if not state or not state.initialized:
                    tally["no_state_count"] += 1
                    continue
                verdict = compare_state_to_rest(state, rest_item, int(config["top_n"]))
                tally["match_count" if verdict["comparison_status"] == "match" else "divergent_count"] += 1
                for flag in flag_keys:
                    if verdict[flag]:
                        tally[f"{flag}_count"] += 1
                tally["bid_size_delta_count"] += verdict["bid_size_delta_count"]
                tally["ask_size_delta_count"] += verdict["ask_size_delta_count"]
        running_totals = counters["rest_comparison_counts"]
        for key, amount in tally.items():
            running_totals[key] = running_totals.get(key, 0) + amount
        rest_writer.write({
            "schema_name": REST_SCHEMA_NAME,
            "schema_version": SCHEMA_VERSION,
            "collector": {"name": COLLECTOR_NAME, "version": COLLECTOR_VERSION},
            "checkpoint_sequence": checkpoint_sequence,
            "batch_index": batch_index,
            "batch_count": batch_count,
            "token_ids": batch,
            "tokens_tracked_count": len(tracked_ids),
            "response": response,
            "comparison_summary": tally,
        })
def build_ws_envelope(
    *,
    run_id: str,
    session_id: str,
    connection_sequence: int,
    message_sequence: int,
    global_message_sequence: int,
    received_at_utc: str,
    websocket_url: str,
    subscription: dict[str, Any],
    tokens: list[dict[str, Any]],
    opcode: int,
    payload_bytes: bytes,
) -> tuple[dict[str, Any], Any | None, list[str], bool]:
    """Wrap one received websocket frame in the raw-archive envelope.

    Returns ``(envelope, parsed_json, event_types, parsed_ok)``. The payload is
    decoded as strict UTF-8. If that fails, the archived text is decoded with
    replacement characters, the decode error is reported as ``json_error``,
    and no JSON parse is attempted. ``payload_sha256`` covers the raw bytes.
    ``parsed_ok`` is ``parsed_json is not None``.
    """
    try:
        raw_text = payload_bytes.decode("utf-8")
    except UnicodeDecodeError as problem:
        decode_error = str(problem)
        raw_text = payload_bytes.decode("utf-8", errors="replace")
        parsed_json, json_error = None, decode_error
    else:
        parsed_json, json_error = decode_json_maybe(raw_text)
    if parsed_json is None:
        event_types = ["unparseable_text"]
    else:
        event_types = classify_payload(parsed_json)
    envelope = {
        "schema_name": WS_SCHEMA_NAME,
        "schema_version": SCHEMA_VERSION,
        "collector": {"name": COLLECTOR_NAME, "version": COLLECTOR_VERSION},
        "run_id": run_id,
        "session_id": session_id,
        "connection_sequence": connection_sequence,
        "message_sequence": message_sequence,
        "global_message_sequence": global_message_sequence,
        "received_at_utc": received_at_utc,
        "websocket": {"url": websocket_url},
        "subscription": subscription,
        "tokens_tracked": tokens,
        "opcode": opcode,
        "payload_length_bytes": len(payload_bytes),
        "payload_sha256": sha256_bytes(payload_bytes),
        "raw_text": raw_text,
        "json": parsed_json,
        "json_error": json_error,
        "classified_event_types": event_types,
    }
    return envelope, parsed_json, event_types, parsed_json is not None
def initial_counters() -> dict[str, Any]:
    """Return a fresh counters dict for one recorder run.

    Totals start at 0 and per-category tallies at a new empty dict on every
    call, so runs never share state. Last-seen timestamps, gaps, and durations
    start at None.
    """
    return dict(
        websocket_message_count=0,
        websocket_parsed_json_count=0,
        websocket_parse_error_count=0,
        websocket_opcode_counts={},
        event_type_counts={},
        non_mutating_event_counts={},
        unknown_event_counts={},
        book_update_skip_counts={},
        parser_warning_count=0,
        connection_count=0,
        reconnect_count=0,
        subscription_change_count=0,
        stale_feed_count=0,
        consecutive_stale_reconnects=0,
        max_gap_seconds=None,
        last_message_received_at_utc=None,
        last_successful_ws_message_at_utc=None,
        seconds_since_last_ws_message=None,
        rest_request_count=0,
        rest_success_count=0,
        rest_failure_count=0,
        rest_rate_limit_count=0,
        rest_comparison_counts={},
        discovery_refresh_count=0,
        discovery_failure_count=0,
        last_discovery_duration_ms=None,
        markets_tracked=0,
        tokens_tracked=0,
        last_discovery_path=None,
        last_discovery_sha256=None,
    )
def summarize_states(states: dict[str, BookState], top_n: int) -> list[dict[str, Any]]:
return [state.summary(top_n) for state in states.values()]
def seconds_since_iso(value: str | None) -> float | None:
    """Return seconds elapsed since ISO timestamp *value*, or None if it can't be parsed.

    The result is rounded to milliseconds and clamped at 0 for future timestamps.
    """
    moment = parse_iso(value)
    if moment is None:
        return None
    elapsed = (utc_now() - moment).total_seconds()
    return round(elapsed if elapsed > 0.0 else 0.0, 3)
def _write_text_atomic(path: Path, text: str) -> None:
    """Write *text* (UTF-8) to *path* through a same-directory temp file and ``os.replace``.

    A reader polling *path* sees either the previous complete file or the new
    one, never a truncated partial write. The temp file is removed if the
    write fails.
    """
    temp_path = path.with_name(f".{path.name}.{os.getpid()}.tmp")
    try:
        temp_path.write_text(text, encoding="utf-8")
        os.replace(temp_path, path)
    except BaseException:
        temp_path.unlink(missing_ok=True)
        raise


def write_manifest(
    *,
    config: dict[str, Any],
    run_id: str,
    started_at_utc: str,
    status: str,
    gate_status: str,
    shutdown_reason: str | None,
    markets: list[dict[str, Any]],
    tokens: list[dict[str, Any]],
    counters: dict[str, Any],
    ws_writer: ArchiveWriter,
    rest_writer: ArchiveWriter,
    states: dict[str, BookState],
    warnings: list[str],
    errors: list[dict[str, Any]],
    recent_sessions: list[dict[str, Any]] | None = None,
    current_session: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Build the run manifest and write it to the "latest" path and a timestamped copy.

    Refreshes ``counters["seconds_since_last_ws_message"]`` first (the counters
    dict is mutated). Only the last 20 recent sessions and errors are included,
    and warnings are deduplicated and sorted. Both files are written
    atomically, so the frequently rewritten "latest" manifest is never
    observed half-written. Returns the manifest dict.
    """
    counters["seconds_since_last_ws_message"] = seconds_since_iso(counters.get("last_successful_ws_message_at_utc"))
    open_file_summaries = [summary for summary in [ws_writer.open_file_summary(), rest_writer.open_file_summary()] if summary is not None]
    manifest = {
        "schema_name": MANIFEST_SCHEMA_NAME,
        "schema_version": 1,
        "collector": {"name": COLLECTOR_NAME, "version": COLLECTOR_VERSION},
        "run_id": run_id,
        "started_at_utc": started_at_utc,
        "updated_at_utc": iso_z(),
        "status": status,
        "gate_status": gate_status,
        "shutdown_reason": shutdown_reason,
        "production_ready": False,
        "public_data_only": True,
        "trading_enabled": False,
        "command": config.get("command"),
        "config": public_config(config),
        "markets_tracked": markets,
        "tokens_tracked": tokens,
        "current_subscription_token_ids": [token.get("token_id") for token in tokens],
        "current_tracked_market_end_times": [market.get("end_time_utc") for market in markets],
        "last_successful_ws_message_at_utc": counters.get("last_successful_ws_message_at_utc"),
        "seconds_since_last_ws_message": counters.get("seconds_since_last_ws_message"),
        "consecutive_stale_reconnects": counters.get("consecutive_stale_reconnects", 0),
        "recent_sessions": (recent_sessions or [])[-20:],
        "current_session": current_session,
        "counters": counters,
        "state_summary": summarize_states(states, int(config["top_n"])),
        "output_files": [*ws_writer.closed_files, *rest_writer.closed_files],
        "open_files": open_file_summaries,
        "warnings": sorted(set(warnings)),
        "errors": errors[-20:],
    }
    manifest_dir = Path(config["manifest_dir"])
    manifest_dir.mkdir(parents=True, exist_ok=True)
    manifest_path = Path(config["manifest_path"])
    manifest_path.parent.mkdir(parents=True, exist_ok=True)
    immutable_path = manifest_dir / f"polymarket_ws_recorder_{run_id}_{compact_timestamp()}.json"
    text = json.dumps(manifest, indent=2, sort_keys=True) + "\n"
    _write_text_atomic(manifest_path, text)
    _write_text_atomic(immutable_path, text)
    return manifest
def public_config(config: dict[str, Any]) -> dict[str, Any]:
    """Return a shallow, JSON-friendly copy of ``config``.

    ``pathlib.Path`` values are rendered as POSIX strings; every other value is
    passed through unchanged. Key order is preserved.
    """
    return {
        key: (value.as_posix() if isinstance(value, Path) else value)
        for key, value in config.items()
    }
def _config_bool(value: Any) -> bool:
    """Interpret a config flag, treating common textual "false" spellings as False.

    ``bool("false")`` is True, so a flag that arrives as a string (e.g. from a flat
    YAML loader or CLI text) would otherwise always enable the option. Non-string
    values keep plain ``bool()`` semantics, so real booleans are unaffected.
    """
    if isinstance(value, str):
        return value.strip().lower() not in {"", "0", "false", "no", "off"}
    return bool(value)


def build_config(args: argparse.Namespace) -> dict[str, Any]:
    """Merge CLI arguments, the optional flat-YAML config file and defaults into one dict.

    Each key is resolved through ``config_value(file_config, args, key, default)``.
    Precedence is defined there (presumably CLI over file over default; verify). The
    result is coerced to the types the recorder expects, with paths as ``Path``.

    Raises:
        ValueError: if a value is outside its allowed range. Intervals, timeouts and
            backoffs must be positive. A zero reconnect backoff doubles to zero
            forever and reconnects in a tight loop. Zero intervals would run
            discovery, REST checkpoints or manifest writes on every loop pass.
    """
    file_config = load_flat_yaml(args.config) if args.config else {}
    duration = config_value(file_config, args, "duration_seconds", None)
    config = {
        "config_path": args.config.as_posix() if args.config else None,
        "config_sha256": sha256_file(args.config) if args.config and args.config.exists() else None,
        "discovery_path": Path(config_value(file_config, args, "discovery_path", DEFAULT_DISCOVERY_PATH)),
        "discovery_dir": Path(config_value(file_config, args, "discovery_dir", DEFAULT_DISCOVERY_DIR)),
        "discovery_script_path": Path(config_value(file_config, args, "discovery_script_path", DISCOVERY_SCRIPT)),
        "discovery_execute": _config_bool(config_value(file_config, args, "discovery_execute", True)),
        "discovery_refresh_interval_seconds": float(config_value(file_config, args, "discovery_refresh_interval_seconds", 600)),
        "discovery_max_pages": int(config_value(file_config, args, "discovery_max_pages", 3)),
        "discovery_page_limit": int(config_value(file_config, args, "discovery_page_limit", 100)),
        "raw_output_root": Path(config_value(file_config, args, "raw_output_root", DEFAULT_RAW_OUTPUT_ROOT)),
        "manifest_dir": Path(config_value(file_config, args, "manifest_dir", DEFAULT_MANIFEST_DIR)),
        "manifest_path": Path(config_value(file_config, args, "manifest_path", DEFAULT_MANIFEST_PATH)),
        "websocket_url": str(config_value(file_config, args, "websocket_url", MARKET_WS_URL)),
        "clob_books_url": str(config_value(file_config, args, "clob_books_url", CLOB_BOOKS_URL)),
        # 0 (or an empty value) means "no limit": track all active BTC markets.
        "market_limit": int(config_value(file_config, args, "market_limit", 0) or 0),
        "market_end_safety_seconds": int(config_value(file_config, args, "market_end_safety_seconds", 420)),
        "rest_checkpoint_interval_seconds": float(config_value(file_config, args, "rest_checkpoint_interval_seconds", 60)),
        "rest_batch_size": int(config_value(file_config, args, "rest_batch_size", 50)),
        "top_n": int(config_value(file_config, args, "top_n", 10)),
        "first_message_timeout_seconds": float(config_value(file_config, args, "first_message_timeout_seconds", 90)),
        "stale_feed_threshold_seconds": float(config_value(file_config, args, "stale_feed_threshold_seconds", 90)),
        "request_timeout_seconds": float(config_value(file_config, args, "request_timeout_seconds", 15)),
        "websocket_timeout_seconds": float(config_value(file_config, args, "websocket_timeout_seconds", 10)),
        "reconnect_backoff_seconds": float(config_value(file_config, args, "reconnect_backoff_seconds", 3)),
        "max_reconnect_backoff_seconds": float(config_value(file_config, args, "max_reconnect_backoff_seconds", 60)),
        "max_consecutive_stale_reconnects_before_discovery_refresh": int(config_value(file_config, args, "max_consecutive_stale_reconnects_before_discovery_refresh", 3)),
        "duration_seconds": float(duration) if duration is not None else None,
        "manifest_write_interval_seconds": float(config_value(file_config, args, "manifest_write_interval_seconds", 300)),
    }
    if config["rest_batch_size"] < 1:
        raise ValueError("rest_batch_size must be >= 1")
    if config["market_limit"] < 0:
        raise ValueError("market_limit must be >= 0; use 0 for all active BTC markets")
    if config["first_message_timeout_seconds"] <= 0:
        raise ValueError("first_message_timeout_seconds must be > 0")
    if config["stale_feed_threshold_seconds"] <= 0:
        raise ValueError("stale_feed_threshold_seconds must be > 0")
    if config["max_consecutive_stale_reconnects_before_discovery_refresh"] < 1:
        raise ValueError("max_consecutive_stale_reconnects_before_discovery_refresh must be >= 1")
    for key in (
        "discovery_refresh_interval_seconds",
        "rest_checkpoint_interval_seconds",
        "manifest_write_interval_seconds",
        "request_timeout_seconds",
        "websocket_timeout_seconds",
        "reconnect_backoff_seconds",
        "max_reconnect_backoff_seconds",
    ):
        if config[key] <= 0:
            raise ValueError(f"{key} must be > 0")
    # 0 stays accepted: run_recorder treats a falsy duration as "run continuously".
    # A negative duration would only end the run immediately, so reject it.
    if config["duration_seconds"] is not None and config["duration_seconds"] < 0:
        raise ValueError("duration_seconds must be >= 0")
    return config
def run_recorder(config: dict[str, Any], command: str) -> dict[str, Any]:
    """Record raw websocket traffic and REST checkpoints until stopped or the deadline passes.

    Loop structure:
      * Outer loop: refresh discovery when due or when no tokens are tracked, then
        open one websocket session subscribed to the current token set.
      * Inner loop: take REST checkpoints, write periodic manifests and refresh
        discovery in-session (a changed token set forces a reconnect). It also
        detects first-message and stale-feed timeouts and archives every text
        frame through ``ws_writer``.
    Any exception inside a session is recorded in ``errors`` and followed by a
    capped, doubling backoff before reconnecting.

    Args:
        config: Output of ``build_config``. ``command`` is stored into it under "command".
        command: The invoking command line, recorded in every manifest.

    Returns:
        The final manifest written by ``write_manifest``. ``gate_status`` is "PASS"
        only when at least one websocket message and one successful REST checkpoint
        were recorded and both writers produced closed files.
    """
    signal.signal(signal.SIGINT, handle_stop)
    signal.signal(signal.SIGTERM, handle_stop)
    started = utc_now()
    started_at_utc = iso_z(started)
    run_id = compact_timestamp(started)
    deadline = time.monotonic() + float(config["duration_seconds"]) if config["duration_seconds"] else None
    counters = initial_counters()
    warnings: list[str] = []
    errors: list[dict[str, Any]] = []
    markets: list[dict[str, Any]] = []
    tokens: list[dict[str, Any]] = []
    states: dict[str, BookState] = {}
    rejection_counts: dict[str, int] = {}
    recent_sessions: list[dict[str, Any]] = []
    current_session: dict[str, Any] | None = None
    runtime_status = "RUNNING_RECONNECTING"
    ws_writer = ArchiveWriter(root=Path(config["raw_output_root"]), subdir="polymarket/ws_raw", prefix="polymarket_ws_raw", run_id=run_id)
    rest_writer = ArchiveWriter(root=Path(config["raw_output_root"]), subdir="polymarket/rest_checkpoints", prefix="polymarket_rest_checkpoints", run_id=run_id)
    next_discovery = 0.0
    next_rest_checkpoint = 0.0
    next_manifest_write = time.monotonic() + float(config["manifest_write_interval_seconds"])
    checkpoint_sequence = 0
    global_sequence = 0
    connection_sequence = 0
    reconnect_backoff = float(config["reconnect_backoff_seconds"])
    last_text_message_monotonic: float | None = None
    shutdown_reason: str | None = None
    config["command"] = command
    # Manifests only report errors[-20:] and sorted(set(warnings)). Old errors and
    # duplicate warnings can therefore be dropped without changing manifest content.
    max_retained_errors = 200

    def compact_evidence() -> None:
        """Bound the in-memory error/warning lists in place for long continuous runs."""
        del errors[:-max_retained_errors]
        if len(warnings) > max_retained_errors:
            # Order-preserving dedupe; list identity is kept because helpers hold it.
            warnings[:] = list(dict.fromkeys(warnings))

    def sleep_unless_stopping(seconds: float) -> None:
        """Sleep up to ``seconds``, waking early on a stop request or at the run deadline.

        If ``handle_stop`` returns normally, a plain ``time.sleep`` resumes after the
        signal (PEP 475). A long backoff would then delay shutdown and could overshoot
        a bounded run's deadline.
        """
        wake_at = time.monotonic() + max(seconds, 0.0)
        if deadline is not None:
            wake_at = min(wake_at, deadline)
        while not STOP_REQUESTED:
            remaining = wake_at - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(remaining, 0.5))

    def finish_current_session(reason: str, error: str | None = None) -> None:
        """Close the active session record and keep only the 20 most recent sessions."""
        nonlocal current_session
        if current_session is None:
            return
        current_session["closed_at_utc"] = iso_z()
        # A more specific close_reason set inside the session loop wins over ``reason``.
        current_session["close_reason"] = current_session.get("close_reason") or reason
        if error:
            current_session["error"] = error
        recent_sessions.append(dict(current_session))
        del recent_sessions[:-20]
        current_session = None

    def write_runtime_manifest(status: str, gate_status: str = "IN_PROGRESS", reason: str | None = None) -> dict[str, Any]:
        """Write an in-progress manifest snapshot of the current run state."""
        compact_evidence()
        return write_manifest(
            config=config,
            run_id=run_id,
            started_at_utc=started_at_utc,
            status=status,
            gate_status=gate_status,
            shutdown_reason=reason,
            markets=markets,
            tokens=tokens,
            counters=counters,
            ws_writer=ws_writer,
            rest_writer=rest_writer,
            states=states,
            warnings=warnings,
            errors=errors,
            recent_sessions=recent_sessions,
            current_session=current_session,
        )

    try:
        while not STOP_REQUESTED:
            now_outer = time.monotonic()
            if deadline is not None and now_outer >= deadline:
                shutdown_reason = "duration_elapsed"
                break
            if now_outer >= next_discovery or not tokens:
                old_token_ids = [token["token_id"] for token in tokens]
                try:
                    markets, tokens, rejection_counts = refresh_market_state(config, counters, warnings, errors)
                    counters["market_rejection_counts"] = rejection_counts
                except Exception as exc:  # noqa: BLE001 - preserve evidence and retry
                    counters["discovery_failure_count"] += 1
                    errors.append({"stage": "load_discovery", "error": f"{type(exc).__name__}: {exc}"})
                    runtime_status = "RUNNING_RECONNECTING"
                    write_runtime_manifest(runtime_status)
                    sleep_unless_stopping(min(reconnect_backoff, 30))
                    next_discovery = time.monotonic() + float(config["discovery_refresh_interval_seconds"])
                    continue
                new_token_ids = [token["token_id"] for token in tokens]
                if old_token_ids and old_token_ids != new_token_ids:
                    counters["subscription_change_count"] += 1
                    warnings.append("subscription_rotated_after_discovery_refresh")
                # Keep existing book state for tokens still tracked; start fresh for new ones.
                states = {token["token_id"]: states.get(token["token_id"], BookState(token)) for token in tokens}
                next_discovery = time.monotonic() + float(config["discovery_refresh_interval_seconds"])
            if not tokens:
                runtime_status = "RUNNING_NO_ACTIVE_TOKENS"
                warnings.append("no_active_tokens_after_discovery_refresh")
                write_runtime_manifest(runtime_status)
                sleep_unless_stopping(min(reconnect_backoff, 30))
                continue
            token_ids = [token["token_id"] for token in tokens]
            subscription = {"assets_ids": token_ids, "type": "market", "custom_feature_enabled": True}
            connection_sequence += 1
            session_id = f"{run_id}-ws{connection_sequence}"
            current_session = {
                "session_id": session_id,
                "connection_sequence": connection_sequence,
                "connected_at_utc": None,
                "subscribed_at_utc": None,
                "closed_at_utc": None,
                "messages": 0,
                "close_reason": None,
                "stale_reason": None,
                "first_message_latency_seconds": None,
                "last_message_received_at_utc": None,
                "token_count": len(token_ids),
                "market_slugs": [market.get("market_slug") for market in markets],
                "market_end_times": [market.get("end_time_utc") for market in markets],
            }
            sock: ssl.SSLSocket | None = None
            session_last_text_message_monotonic: float | None = None
            session_subscribed_monotonic: float | None = None
            session_message_sequence = 0
            stale_raised = False
            try:
                runtime_status = "RUNNING_RECONNECTING"
                sock, handshake = open_websocket(str(config["websocket_url"]), float(config["websocket_timeout_seconds"]))
                counters["connection_count"] += 1
                current_session["connected_at_utc"] = iso_z()
                current_session["handshake_status_line"] = handshake.get("status_line")
                send_ws_frame(sock, 1, json.dumps(subscription, separators=(",", ":")).encode("utf-8"))
                current_session["subscribed_at_utc"] = iso_z()
                session_subscribed_monotonic = time.monotonic()
                while not STOP_REQUESTED:
                    now_monotonic = time.monotonic()
                    if deadline is not None and now_monotonic >= deadline:
                        shutdown_reason = "duration_elapsed"
                        finish_current_session(shutdown_reason)
                        break
                    if now_monotonic >= next_rest_checkpoint:
                        checkpoint_sequence += 1
                        fetch_rest_checkpoint(config=config, rest_writer=rest_writer, checkpoint_sequence=checkpoint_sequence, tokens=tokens, states=states, counters=counters)
                        next_rest_checkpoint = now_monotonic + float(config["rest_checkpoint_interval_seconds"])
                    if now_monotonic >= next_manifest_write:
                        write_runtime_manifest(runtime_status)
                        next_manifest_write = now_monotonic + float(config["manifest_write_interval_seconds"])
                    if now_monotonic >= next_discovery:
                        previous_token_ids = token_ids
                        markets, tokens, rejection_counts = refresh_market_state(config, counters, warnings, errors)
                        counters["market_rejection_counts"] = rejection_counts
                        token_ids = [token["token_id"] for token in tokens]
                        next_discovery = now_monotonic + float(config["discovery_refresh_interval_seconds"])
                        if token_ids != previous_token_ids:
                            counters["subscription_change_count"] += 1
                            states = {token["token_id"]: states.get(token["token_id"], BookState(token)) for token in tokens}
                            # The subscription is fixed per connection, so a new token set
                            # is applied by dropping this session and reconnecting.
                            if token_ids:
                                current_session["close_reason"] = "subscription_rotated"
                                raise RuntimeError("tracked token set changed; reconnecting with new subscription")
                            current_session["close_reason"] = "no_active_tokens"
                            raise RuntimeError("no active tokens after discovery refresh")
                    if session_last_text_message_monotonic is None:
                        first_wait = now_monotonic - (session_subscribed_monotonic or now_monotonic)
                        if first_wait > float(config["first_message_timeout_seconds"]):
                            counters["stale_feed_count"] += 1
                            counters["consecutive_stale_reconnects"] += 1
                            stale_raised = True
                            reason = f"no first websocket text message for {first_wait:.3f}s"
                            current_session["stale_reason"] = reason
                            current_session["close_reason"] = "first_message_timeout"
                            runtime_status = "RUNNING_RECONNECTING"
                            raise TimeoutError(reason)
                    else:
                        silence = now_monotonic - session_last_text_message_monotonic
                        if silence > float(config["stale_feed_threshold_seconds"]):
                            counters["stale_feed_count"] += 1
                            counters["consecutive_stale_reconnects"] += 1
                            stale_raised = True
                            reason = f"stale websocket feed for {silence:.3f}s"
                            current_session["stale_reason"] = reason
                            current_session["close_reason"] = "stale_feed"
                            runtime_status = "RUNNING_RECONNECTING"
                            raise TimeoutError(reason)
                    try:
                        opcode, payload = read_ws_frame(sock)
                    except socket.timeout:
                        # No frame within the socket timeout: loop back to re-run the checks above.
                        continue
                    counters["websocket_opcode_counts"][str(opcode)] = counters["websocket_opcode_counts"].get(str(opcode), 0) + 1
                    if opcode == 1:
                        received_at_utc = iso_z()
                        now_for_gap = time.monotonic()
                        # The gap spans reconnects because last_text_message_monotonic is run-wide.
                        if last_text_message_monotonic is not None:
                            gap = round(now_for_gap - last_text_message_monotonic, 3)
                            previous = counters.get("max_gap_seconds")
                            counters["max_gap_seconds"] = gap if previous is None else max(previous, gap)
                        if session_message_sequence == 0 and session_subscribed_monotonic is not None:
                            current_session["first_message_latency_seconds"] = round(now_for_gap - session_subscribed_monotonic, 3)
                            reconnect_backoff = float(config["reconnect_backoff_seconds"])
                        last_text_message_monotonic = now_for_gap
                        session_last_text_message_monotonic = now_for_gap
                        counters["last_message_received_at_utc"] = received_at_utc
                        counters["last_successful_ws_message_at_utc"] = received_at_utc
                        counters["consecutive_stale_reconnects"] = 0
                        runtime_status = "RUNNING_RECEIVING"
                        session_message_sequence += 1
                        current_session["messages"] = session_message_sequence
                        current_session["last_message_received_at_utc"] = received_at_utc
                        global_sequence += 1
                        envelope, parsed_json, event_types, parse_ok = build_ws_envelope(
                            run_id=run_id,
                            session_id=session_id,
                            connection_sequence=connection_sequence,
                            message_sequence=session_message_sequence,
                            global_message_sequence=global_sequence,
                            received_at_utc=received_at_utc,
                            websocket_url=str(config["websocket_url"]),
                            subscription=subscription,
                            tokens=tokens,
                            opcode=opcode,
                            payload_bytes=payload,
                        )
                        ws_writer.write(envelope)
                        counters["websocket_message_count"] += 1
                        if parse_ok:
                            counters["websocket_parsed_json_count"] += 1
                            apply_payload_to_books(parsed_json, states, received_at_utc, counters, warnings)
                        else:
                            counters["websocket_parse_error_count"] += 1
                    elif opcode == 8:
                        current_session["close_reason"] = "websocket_close_frame"
                        raise EOFError("websocket close frame received")
                    elif opcode == 9:
                        # Answer ping with pong, echoing the payload.
                        send_ws_frame(sock, 10, payload)
                    elif opcode == 10:
                        continue
                    else:
                        warnings.append(f"ignored websocket opcode {opcode}")
                if shutdown_reason:
                    break
            except Exception as exc:  # noqa: BLE001 - preserve reconnect evidence
                error_text = f"{type(exc).__name__}: {exc}"
                errors.append({"stage": "websocket_session", "connection_sequence": connection_sequence, "error": error_text})
                compact_evidence()
                counters["reconnect_count"] += 1
                if not stale_raised:
                    counters["consecutive_stale_reconnects"] = 0
                finish_current_session(current_session.get("close_reason") if current_session else type(exc).__name__, error_text)
                runtime_status = "RUNNING_RECONNECTING"
                # Force a manifest write as soon as the next session starts looping.
                next_manifest_write = 0.0
                if stale_raised and counters["consecutive_stale_reconnects"] >= int(config["max_consecutive_stale_reconnects_before_discovery_refresh"]):
                    next_discovery = 0.0
                    warnings.append("forced_discovery_refresh_after_consecutive_stale_reconnects")
                if STOP_REQUESTED:
                    shutdown_reason = STOP_SIGNAL or "stop_requested"
                    break
                sleep_unless_stopping(min(reconnect_backoff, float(config["max_reconnect_backoff_seconds"])))
                reconnect_backoff = min(reconnect_backoff * 2, float(config["max_reconnect_backoff_seconds"]))
            finally:
                if sock is not None:
                    try:
                        sock.close()
                    except OSError:
                        pass
                if current_session is not None and shutdown_reason:
                    finish_current_session(shutdown_reason)
            # Deliberately outside ``finally``: a break there would silently discard any
            # in-flight exception, and is a SyntaxWarning from Python 3.14 (PEP 765).
            if shutdown_reason:
                break
    finally:
        if STOP_REQUESTED and shutdown_reason is None:
            shutdown_reason = STOP_SIGNAL or "stop_requested"
        if current_session is not None:
            finish_current_session(shutdown_reason or "loop_exited")
        ws_writer.close()
        rest_writer.close()
    if shutdown_reason is None:
        shutdown_reason = "loop_exited"
    if counters["websocket_message_count"] > 0 and counters["rest_success_count"] > 0 and ws_writer.closed_files and rest_writer.closed_files:
        gate_status = "PASS"
    else:
        gate_status = "BLOCKED_RUNTIME_EVIDENCE"
    status = "INTERRUPTED" if STOP_SIGNAL else ("COMPLETED_BOUNDED" if config["duration_seconds"] else "STOPPED")
    manifest = write_manifest(
        config=config,
        run_id=run_id,
        started_at_utc=started_at_utc,
        status=status,
        gate_status=gate_status,
        shutdown_reason=shutdown_reason,
        markets=markets,
        tokens=tokens,
        counters=counters,
        ws_writer=ws_writer,
        rest_writer=rest_writer,
        states=states,
        warnings=warnings,
        errors=errors,
        recent_sessions=recent_sessions,
        current_session=current_session,
    )
    return manifest
def parse_args() -> argparse.Namespace:
    """Parse the recorder's command-line options from ``sys.argv``.

    Every option except ``--config`` defaults to None. A None value presumably lets
    ``build_config``/``config_value`` fall back to the config file or built-in
    default (confirm in ``config_value``).
    """
    cli = argparse.ArgumentParser(
        description="Long-running Polymarket BTC websocket raw recorder with REST checkpoints."
    )
    cli.add_argument("--config", type=Path, default=DEFAULT_CONFIG_PATH)
    cli.add_argument(
        "--duration-seconds",
        type=float,
        default=None,
        help="Bounded run duration for smoke tests. Default is continuous runtime.",
    )
    cli.add_argument(
        "--market-limit",
        type=int,
        default=None,
        help="Limit active BTC markets for smoke tests. Use 0 for all.",
    )
    # Plain path overrides share the same shape, so register them in one pass.
    for flag in (
        "--raw-output-root",
        "--manifest-dir",
        "--manifest-path",
        "--discovery-path",
        "--discovery-dir",
    ):
        cli.add_argument(flag, type=Path, default=None)
    return cli.parse_args()
def main() -> int:
    """Command-line entry point.

    Builds the config, runs the recorder, and prints the manifest path, run id and
    gate status as KEY=VALUE lines. Returns 0 when the gate is "PASS", otherwise 1.
    """
    config = build_config(parse_args())
    invocation = " ".join((Path(sys.argv[0]).as_posix(), *sys.argv[1:]))
    manifest = run_recorder(config, invocation)
    print(f"WS_RECORDER_MANIFEST={config['manifest_path']}")
    print(f"WS_RECORDER_RUN_ID={manifest['run_id']}")
    print(f"WS_RECORDER_GATE={manifest['gate_status']}")
    gate_failed = manifest["gate_status"] != "PASS"
    return int(gate_failed)
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    sys.exit(main())