#!/usr/bin/env python3 """Bounded public-source probe for Polymarket Checkpoint 2. This is not a collector. It performs a small, finite set of public requests to prove which endpoints can support a future raw-first order book archive. """ from __future__ import annotations import argparse import base64 import datetime as dt import hashlib import json import os import socket import ssl import struct import sys import time import urllib.error import urllib.parse import urllib.request from pathlib import Path from typing import Any GAMMA_BASE = "https://gamma-api.polymarket.com" CLOB_BASE = "https://clob.polymarket.com" DATA_API_BASE = "https://data-api.polymarket.com" MARKET_WS_URL = "wss://ws-subscriptions-clob.polymarket.com/ws/market" DEFAULT_PROBE_JSON = Path("data/probes/polymarket_public_sources_probe_v1.json") DEFAULT_PROBE_MD = Path("data/probes/polymarket_public_sources_probe_v1.md") DEFAULT_CHECKPOINT_REPORT = Path( "reports/checkpoints/checkpoint_002_polymarket_public_sources.md" ) DEFAULT_CHECKPOINT_MANIFEST = Path( "data/manifests/checkpoint_002_polymarket_public_sources.json" ) OFFICIAL_SOURCES = [ { "name": "Fetching Markets", "url": "https://docs.polymarket.com/market-data/fetching-markets.md", "finding": "Use Gamma events with active=true&closed=false for active market discovery; events contain markets.", }, { "name": "List markets", "url": "https://docs.polymarket.com/api-reference/markets/list-markets.md", "finding": "Gamma /markets supports active/closed, slug, tag_id, condition_ids, clob_token_ids, end_date, limit, offset, and sorting parameters.", }, { "name": "Public search", "url": "https://docs.polymarket.com/api-reference/search/search-markets-events-and-profiles.md", "finding": "Gamma /public-search supports q, events_status, limit_per_type, search_tags, recurrence, and tag filters.", }, { "name": "Get order book", "url": "https://docs.polymarket.com/api-reference/market-data/get-order-book.md", "finding": "CLOB GET /book takes token_id and returns an order book summary.", }, { "name": "Get order books", "url": "https://docs.polymarket.com/api-reference/market-data/get-order-books-request-body.md", "finding": "CLOB POST /books takes an array of token_id objects and returns multiple book summaries.", }, { "name": "Market websocket", "url": "https://docs.polymarket.com/market-data/websocket/market-channel.md", "finding": "Public websocket supports market subscriptions by outcome token asset IDs.", }, { "name": "Recent trades", "url": "https://docs.polymarket.com/api-reference/core/get-trades-for-a-user-or-markets.md", "finding": "Data API GET /trades is public and can filter by condition ID in the market query parameter.", }, { "name": "Authenticated CLOB trades", "url": "https://docs.polymarket.com/api-reference/trade/get-trades.md", "finding": "CLOB GET /trades exists but requires API-key authentication, so it is not used for this public-data checkpoint.", }, { "name": "Rate limits", "url": "https://docs.polymarket.com/api-reference/rate-limits.md", "finding": "Official rate limits are documented for Gamma, Data API, CLOB market data, and websocket-adjacent endpoints.", }, ] DOCUMENTED_RATE_LIMITS = { "gamma": { "base_url": GAMMA_BASE, "general": "4,000 req / 10s", "/events": "500 req / 10s", "/markets": "300 req / 10s", "/markets + /events listing": "900 req / 10s", "/public-search": "350 req / 10s", }, "data_api": { "base_url": DATA_API_BASE, "general": "1,000 req / 10s", "/trades": "200 req / 10s", }, "clob": { "base_url": CLOB_BASE, "general": "9,000 req / 10s", "/book": "1,500 req / 10s", "/books": "500 req / 10s", "/price": "1,500 req / 10s", "/prices": "500 req / 10s", "/midpoint": "1,500 req / 10s", "/midpoints": "500 req / 10s", "/prices-history": "1,000 req / 10s", }, } SAFE_RESPONSE_HEADERS = { "age", "cache-control", "cf-cache-status", "cf-ray", "content-encoding", "content-length", "content-type", "date", "expires", "last-modified", "ratelimit-limit", "ratelimit-remaining", "ratelimit-reset", "retry-after", "server", "strict-transport-security", "x-ratelimit-limit", "x-ratelimit-remaining", "x-ratelimit-reset", } def utc_now() -> dt.datetime: return dt.datetime.now(dt.UTC) def iso_z(value: dt.datetime | None = None) -> str: value = value or utc_now() return value.astimezone(dt.UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z") def parse_iso(value: Any) -> dt.datetime | None: if not value or not isinstance(value, str): return None text = value.strip() if not text: return None if text.endswith("Z"): text = text[:-1] + "+00:00" try: parsed = dt.datetime.fromisoformat(text) except ValueError: return None if parsed.tzinfo is None: parsed = parsed.replace(tzinfo=dt.UTC) return parsed.astimezone(dt.UTC) def sha256_file(path: Path) -> str: digest = hashlib.sha256() with path.open("rb") as handle: for chunk in iter(lambda: handle.read(1024 * 1024), b""): digest.update(chunk) return digest.hexdigest() def normalize_params(params: dict[str, Any] | None) -> dict[str, Any] | None: if not params: return None normalized: dict[str, Any] = {} for key, value in params.items(): if isinstance(value, bool): normalized[key] = "true" if value else "false" elif isinstance(value, list): normalized[key] = [ "true" if item is True else "false" if item is False else item for item in value ] else: normalized[key] = value return normalized def filter_headers(headers: Any) -> dict[str, str]: safe: dict[str, str] = {} for key, value in dict(headers).items(): lower = key.lower() if lower in SAFE_RESPONSE_HEADERS: safe[key] = value return safe def rate_limit_headers(headers: dict[str, str]) -> dict[str, str]: result: dict[str, str] = {} for key, value in headers.items(): lower = key.lower() if "ratelimit" in lower or lower == "retry-after": result[key] = value return result def decode_json_maybe(text: str) -> tuple[Any | None, str | None]: try: return json.loads(text), None except json.JSONDecodeError as exc: return None, str(exc) def encode_url(url: str, params: dict[str, Any] | None = None) -> str: params = normalize_params(params) if not params: return url query = urllib.parse.urlencode(params, doseq=True) separator = "&" if urllib.parse.urlparse(url).query else "?" return f"{url}{separator}{query}" def http_json_request( name: str, method: str, url: str, *, params: dict[str, Any] | None = None, json_body: Any | None = None, timeout_seconds: float = 15.0, ) -> dict[str, Any]: started_monotonic = time.monotonic() started_at = iso_z() full_url = encode_url(url, params) headers = { "Accept": "application/json", "User-Agent": "orderbooks-checkpoint-2-probe/1.0", } data = None if json_body is not None: data = json.dumps(json_body, separators=(",", ":")).encode("utf-8") headers["Content-Type"] = "application/json" request = urllib.request.Request( full_url, data=data, headers=headers, method=method.upper(), ) status_code: int | None = None response_headers: dict[str, str] = {} response_text = "" error: str | None = None try: with urllib.request.urlopen(request, timeout=timeout_seconds) as response: status_code = response.status response_headers = filter_headers(response.headers) response_text = response.read().decode("utf-8", errors="replace") except urllib.error.HTTPError as exc: status_code = exc.code response_headers = filter_headers(exc.headers) response_text = exc.read().decode("utf-8", errors="replace") error = f"HTTPError: {exc}" except Exception as exc: # noqa: BLE001 - preserve probe failure evidence error = f"{type(exc).__name__}: {exc}" duration_ms = round((time.monotonic() - started_monotonic) * 1000, 3) parsed_json, json_error = decode_json_maybe(response_text) if response_text else (None, None) return { "name": name, "started_at_utc": started_at, "ended_at_utc": iso_z(), "duration_ms": duration_ms, "request": { "method": method.upper(), "url": url, "full_url": full_url, "params": normalize_params(params), "json_body": json_body, }, "response": { "status_code": status_code, "headers": response_headers, "observed_rate_limit_headers": rate_limit_headers(response_headers), "json": parsed_json, "json_error": json_error, "text_preview": response_text[:1000] if parsed_json is None else None, }, "ok": error is None and status_code is not None and 200 <= status_code < 300, "error": error, } def coerce_json_array(value: Any) -> list[Any]: if isinstance(value, list): return value if isinstance(value, str): try: parsed = json.loads(value) except json.JSONDecodeError: return [] return parsed if isinstance(parsed, list) else [] return [] def first_market(event: dict[str, Any]) -> dict[str, Any] | None: markets = event.get("markets") if isinstance(markets, list) and markets and isinstance(markets[0], dict): return markets[0] return None def event_matches_btc_up_down(event: dict[str, Any]) -> bool: market = first_market(event) or {} outcomes = [str(item).lower() for item in coerce_json_array(market.get("outcomes"))] text = " ".join( str(event.get(key, "") or "") for key in ("title", "slug", "ticker", "seriesSlug", "description") ).lower() tag_text = " ".join( str(tag.get("slug", "") or tag.get("label", "") or "") for tag in (event.get("tags") or []) if isinstance(tag, dict) ).lower() series_slug = str(event.get("seriesSlug") or "").lower() has_btc_text = "bitcoin" in text or "btc" in text or "bitcoin" in tag_text has_up_down_text = ("up" in text and "down" in text) or "up-or-down" in tag_text has_up_down_outcomes = set(outcomes) == {"up", "down"} return bool( (series_slug.startswith("btc-up-or-down") or (has_btc_text and has_up_down_text)) and has_up_down_outcomes ) def candidate_record(event: dict[str, Any], now: dt.datetime, min_lead_seconds: int) -> dict[str, Any] | None: market = first_market(event) if not market: return None token_ids = [str(item) for item in coerce_json_array(market.get("clobTokenIds"))] outcomes = [str(item) for item in coerce_json_array(market.get("outcomes"))] if len(token_ids) < 1 or not outcomes: return None event_end = parse_iso(event.get("endDate")) market_end = parse_iso(market.get("endDate")) candidate_end = market_end or event_end has_future_lead = bool(candidate_end and candidate_end >= now + dt.timedelta(seconds=min_lead_seconds)) accepting_orders = market.get("acceptingOrders") is True active_open = ( event.get("active") is True and event.get("closed") is False and market.get("active") is True and market.get("closed") is False ) score = 0 score += 1000 if accepting_orders else 0 score += 500 if has_future_lead else 0 score += 200 if active_open else 0 score += 100 if str(event.get("seriesSlug") or "").startswith("btc-up-or-down") else 0 score += 50 if len(token_ids) >= 2 and len(outcomes) >= 2 else 0 if not active_open: score -= 1000 if candidate_end and candidate_end < now: score -= 250 return { "score": score, "event": event, "market": market, "event_end_utc": iso_z(event_end) if event_end else None, "market_end_utc": iso_z(market_end) if market_end else None, "has_future_lead": has_future_lead, "accepting_orders": accepting_orders, "active_open": active_open, "token_ids": token_ids, "outcomes": outcomes, } def select_btc_up_down_market( events: list[dict[str, Any]], now: dt.datetime, min_lead_seconds: int ) -> tuple[dict[str, Any] | None, list[dict[str, Any]]]: candidates = [] for event in events: if not isinstance(event, dict) or not event_matches_btc_up_down(event): continue record = candidate_record(event, now, min_lead_seconds) if record: candidates.append(record) def sort_key(record: dict[str, Any]) -> tuple[int, float]: end = parse_iso(record["market"].get("endDate")) or parse_iso(record["event"].get("endDate")) end_ts = end.timestamp() if end else float("inf") return (-int(record["score"]), end_ts) candidates.sort(key=sort_key) summarized = [ summarize_candidate(candidate, include_tokens=False) for candidate in candidates[:20] ] return (candidates[0] if candidates else None), summarized def summarize_candidate(candidate: dict[str, Any], include_tokens: bool = True) -> dict[str, Any]: event = candidate["event"] market = candidate["market"] summary = { "score": candidate["score"], "event_id": event.get("id"), "event_slug": event.get("slug"), "event_title": event.get("title"), "event_end_utc": candidate.get("event_end_utc"), "event_active": event.get("active"), "event_closed": event.get("closed"), "series_slug": event.get("seriesSlug"), "market_id": market.get("id"), "market_slug": market.get("slug"), "condition_id": market.get("conditionId"), "market_end_utc": candidate.get("market_end_utc"), "market_active": market.get("active"), "market_closed": market.get("closed"), "accepting_orders": market.get("acceptingOrders"), "enable_order_book": market.get("enableOrderBook"), "outcomes": candidate.get("outcomes"), "has_future_lead": candidate.get("has_future_lead"), } if include_tokens: summary["clob_token_ids"] = candidate.get("token_ids") return summary def field_names(payload: Any) -> list[str]: keys: set[str] = set() if isinstance(payload, dict): keys.update(str(key) for key in payload.keys()) elif isinstance(payload, list): for item in payload[:10]: if isinstance(item, dict): keys.update(str(key) for key in item.keys()) return sorted(keys) def nested_field_names(payload: Any, key: str) -> list[str]: values: list[Any] = [] if isinstance(payload, dict): candidate = payload.get(key) if isinstance(candidate, list): values.extend(candidate[:10]) elif isinstance(payload, list): for item in payload[:10]: if isinstance(item, dict) and isinstance(item.get(key), list): values.extend(item[key][:10]) keys: set[str] = set() for item in values: if isinstance(item, dict): keys.update(str(field) for field in item.keys()) return sorted(keys) def send_ws_frame(sock: ssl.SSLSocket, opcode: int, payload: bytes) -> None: mask = os.urandom(4) header = bytearray([0x80 | opcode]) length = len(payload) if length < 126: header.append(0x80 | length) elif length < 65536: header.append(0x80 | 126) header.extend(struct.pack("!H", length)) else: header.append(0x80 | 127) header.extend(struct.pack("!Q", length)) masked = bytes(byte ^ mask[index % 4] for index, byte in enumerate(payload)) sock.sendall(header + mask + masked) def read_exact(sock: ssl.SSLSocket, length: int) -> bytes: data = bytearray() while len(data) < length: chunk = sock.recv(length - len(data)) if not chunk: raise EOFError("websocket connection closed while reading frame") data.extend(chunk) return bytes(data) def read_ws_frame(sock: ssl.SSLSocket) -> tuple[int, bytes]: first, second = read_exact(sock, 2) opcode = first & 0x0F length = second & 0x7F masked = bool(second & 0x80) if length == 126: length = struct.unpack("!H", read_exact(sock, 2))[0] elif length == 127: length = struct.unpack("!Q", read_exact(sock, 8))[0] mask = read_exact(sock, 4) if masked else b"" payload = read_exact(sock, length) if length else b"" if masked: payload = bytes(byte ^ mask[index % 4] for index, byte in enumerate(payload)) return opcode, payload def parse_ws_headers(raw_headers: str) -> tuple[str, dict[str, str]]: lines = raw_headers.split("\r\n") status_line = lines[0] if lines else "" headers: dict[str, str] = {} for line in lines[1:]: if ":" not in line: continue key, value = line.split(":", 1) headers[key.strip()] = value.strip() return status_line, filter_headers(headers) def classify_ws_payload(payload: Any) -> list[str]: event_types: list[str] = [] items = payload if isinstance(payload, list) else [payload] for item in items: if not isinstance(item, dict): continue event_type = item.get("event_type") if event_type: event_types.append(str(event_type)) elif {"market", "asset_id", "bids", "asks", "timestamp"}.issubset(item.keys()): event_types.append("book_without_event_type") else: event_types.append("unknown_object") return event_types def websocket_probe( url: str, token_ids: list[str], *, timeout_seconds: float, max_messages: int, ) -> dict[str, Any]: started_monotonic = time.monotonic() started_at = iso_z() parsed = urllib.parse.urlparse(url) host = parsed.hostname if not host: return {"ok": False, "error": "missing websocket host"} port = parsed.port or 443 path = parsed.path or "/" if parsed.query: path = f"{path}?{parsed.query}" subscription = { "assets_ids": token_ids, "type": "market", "custom_feature_enabled": True, } result: dict[str, Any] = { "name": "clob_market_websocket", "started_at_utc": started_at, "request": { "url": url, "subscription": subscription, "max_messages": max_messages, "timeout_seconds": timeout_seconds, }, "handshake": {}, "messages": [], "message_event_types": [], "ok": False, "error": None, } sock: ssl.SSLSocket | None = None try: raw_sock = socket.create_connection((host, port), timeout=timeout_seconds) sock = ssl.create_default_context().wrap_socket(raw_sock, server_hostname=host) sock.settimeout(timeout_seconds) key = base64.b64encode(os.urandom(16)).decode("ascii") request = ( f"GET {path} HTTP/1.1\r\n" f"Host: {host}\r\n" "Upgrade: websocket\r\n" "Connection: Upgrade\r\n" f"Sec-WebSocket-Key: {key}\r\n" "Sec-WebSocket-Version: 13\r\n" "User-Agent: orderbooks-checkpoint-2-probe/1.0\r\n" "\r\n" ) sock.sendall(request.encode("ascii")) raw_headers = bytearray() while b"\r\n\r\n" not in raw_headers: raw_headers.extend(sock.recv(4096)) if len(raw_headers) > 65536: raise ValueError("websocket handshake headers exceeded 64 KiB") header_text = bytes(raw_headers).split(b"\r\n\r\n", 1)[0].decode( "iso-8859-1", errors="replace" ) status_line, response_headers = parse_ws_headers(header_text) result["handshake"] = { "status_line": status_line, "headers": response_headers, "observed_rate_limit_headers": rate_limit_headers(response_headers), } if " 101 " not in status_line: raise ValueError(f"websocket upgrade failed: {status_line}") send_ws_frame(sock, 0x1, json.dumps(subscription).encode("utf-8")) deadline = time.monotonic() + timeout_seconds while time.monotonic() < deadline and len(result["messages"]) < max_messages: remaining = max(0.1, deadline - time.monotonic()) sock.settimeout(remaining) opcode, payload_bytes = read_ws_frame(sock) if opcode == 0x8: result["messages"].append({"opcode": opcode, "close": True}) break if opcode == 0x9: send_ws_frame(sock, 0xA, payload_bytes) continue if opcode != 0x1: result["messages"].append( {"opcode": opcode, "payload_length_bytes": len(payload_bytes)} ) continue text = payload_bytes.decode("utf-8", errors="replace") parsed_payload, json_error = decode_json_maybe(text) event_types = classify_ws_payload(parsed_payload) result["message_event_types"].extend(event_types) result["messages"].append( { "opcode": opcode, "payload_length_bytes": len(payload_bytes), "json": parsed_payload, "json_error": json_error, "event_types": event_types, "text_preview": None if parsed_payload is not None else text[:1000], } ) result["ok"] = bool(result["messages"]) except Exception as exc: # noqa: BLE001 - preserve probe failure evidence result["error"] = f"{type(exc).__name__}: {exc}" finally: if sock is not None: try: send_ws_frame(sock, 0x8, b"") except Exception: pass try: sock.close() except Exception: pass result["ended_at_utc"] = iso_z() result["duration_ms"] = round((time.monotonic() - started_monotonic) * 1000, 3) result["message_event_types"] = sorted(set(result["message_event_types"])) return result def request_json_payload(record: dict[str, Any]) -> Any: return (record.get("response") or {}).get("json") def top_level_field_summary(requests: dict[str, dict[str, Any]], websocket: dict[str, Any]) -> dict[str, Any]: summary: dict[str, Any] = {} for name, record in requests.items(): payload = request_json_payload(record) summary[name] = { "top_level_fields": field_names(payload), "bid_ask_level_fields": sorted( set(nested_field_names(payload, "bids") + nested_field_names(payload, "asks")) ), } ws_fields: set[str] = set() ws_level_fields: set[str] = set() for message in websocket.get("messages", []): payload = message.get("json") ws_fields.update(field_names(payload)) ws_level_fields.update(nested_field_names(payload, "bids")) ws_level_fields.update(nested_field_names(payload, "asks")) if isinstance(payload, dict) and isinstance(payload.get("price_changes"), list): for change in payload["price_changes"][:10]: if isinstance(change, dict): ws_level_fields.update(str(key) for key in change.keys()) summary["clob_market_websocket"] = { "top_level_fields": sorted(ws_fields), "nested_level_or_change_fields": sorted(ws_level_fields), "event_types_observed": websocket.get("message_event_types", []), } return summary def build_endpoint_findings( selected: dict[str, Any], requests: dict[str, dict[str, Any]], websocket: dict[str, Any], ) -> dict[str, Any]: market = selected["market"] outcomes = selected["outcomes"] token_ids = selected["token_ids"] outcome_tokens = [ {"outcome": outcome, "token_id": token_ids[index] if index < len(token_ids) else None} for index, outcome in enumerate(outcomes) ] book_payload = request_json_payload(requests["clob_get_book"]) books_payload = request_json_payload(requests["clob_post_books"]) trades_payload = request_json_payload(requests["data_api_recent_trades"]) return { "active_market_discovery": { "endpoint": f"{GAMMA_BASE}/events", "method": "GET", "params_used": requests["gamma_events_bitcoin_tag"]["request"]["params"], "answer": "Use Gamma /events with active=true&closed=false and pagination; events include their markets. /markets can fetch individual market records by slug, condition_ids, or clob_token_ids.", }, "btc_up_down_filtering": { "answer": "Filter Gamma events/markets by Bitcoin tag evidence, seriesSlug beginning btc-up-or-down, text containing BTC/Bitcoin plus Up/Down, and market outcomes exactly ['Up', 'Down'].", "source_fields": [ "event.seriesSlug", "event.tags[].slug or label", "event.title", "event.slug", "market.outcomes", "market.clobTokenIds", ], }, "condition_and_token_resolution": { "condition_id_field": "market.conditionId", "outcomes_field": "market.outcomes", "token_ids_field": "market.clobTokenIds", "mapping_rule": "Parse outcomes and clobTokenIds as arrays and map by index.", "selected_condition_id": market.get("conditionId"), "selected_outcome_tokens": outcome_tokens, }, "single_order_book": { "endpoint": f"{CLOB_BASE}/book", "method": "GET", "params": {"token_id": token_ids[0]}, "status_code": requests["clob_get_book"]["response"]["status_code"], "field_count": len(field_names(book_payload)), "bid_levels": len(book_payload.get("bids", [])) if isinstance(book_payload, dict) else None, "ask_levels": len(book_payload.get("asks", [])) if isinstance(book_payload, dict) else None, }, "batch_order_books": { "endpoint": f"{CLOB_BASE}/books", "method": "POST", "json_body_shape": [{"token_id": ""}], "status_code": requests["clob_post_books"]["response"]["status_code"], "book_count": len(books_payload) if isinstance(books_payload, list) else None, }, "market_websocket": { "endpoint": MARKET_WS_URL, "subscription_shape": { "assets_ids": ["", ""], "type": "market", "custom_feature_enabled": True, }, "probe_ok": websocket.get("ok"), "message_count": len(websocket.get("messages", [])), "event_types_observed": websocket.get("message_event_types", []), "note": "Initial observed snapshot may arrive as a JSON array of book objects without event_type, followed by event-typed updates.", }, "trades": { "public_recent_trades_endpoint": f"{DATA_API_BASE}/trades", "public_recent_trades_method": "GET", "params": {"market": market.get("conditionId"), "limit": 10, "offset": 0}, "public_recent_trades_status_code": requests["data_api_recent_trades"]["response"]["status_code"], "public_recent_trade_count": len(trades_payload) if isinstance(trades_payload, list) else None, "websocket_trade_event": "market websocket documents and can emit last_trade_price for subscribed assets", "excluded_endpoint": "CLOB GET /trades requires readonly or level 2 API key authentication and was not used.", }, "rate_limits": { "documented": DOCUMENTED_RATE_LIMITS, "observed_headers": { name: record["response"].get("observed_rate_limit_headers", {}) for name, record in requests.items() } | { "clob_market_websocket": websocket.get("handshake", {}).get( "observed_rate_limit_headers", {} ) }, "observed_note": "The bounded probe did not intentionally approach limits; absence of rate-limit headers is not a limit test.", }, "timestamps": { "gamma_metadata": [ "startDate", "creationDate", "endDate", "createdAt", "updatedAt", "acceptingOrdersTimestamp", "eventStartTime", ], "clob_book": "timestamp string in order-book payload; observed as Unix epoch milliseconds.", "market_websocket": "timestamp string in websocket book/price/trade updates; observed as Unix epoch milliseconds.", "data_api_trades": "timestamp integer in recent trade payload; observed as Unix epoch seconds.", "probe_metadata": ["started_at_utc", "ended_at_utc", "duration_ms"], }, } def markdown_table_row(values: list[Any]) -> str: return "| " + " | ".join(str(value).replace("\n", " ") for value in values) + " |" def write_probe_markdown(probe: dict[str, Any], path: Path) -> None: selected = probe["selected_market"] gate = probe["gate"] endpoint_findings = probe["endpoint_findings"] validation = probe["validation_summary"] lines = [ "# Polymarket Public Sources Probe v1", "", f"Artifact status: `{probe['artifact_status']}`", "", "## Gate", "", f"Status: `{gate['status']}`", "", gate["reason"], "", "## Scope", "", "Bounded public endpoint probe only. No collector, no trading, no private endpoints, no secrets.", "", "## Selected Market", "", markdown_table_row(["Field", "Value"]), markdown_table_row(["---", "---"]), markdown_table_row(["event_slug", selected.get("event_slug")]), markdown_table_row(["event_title", selected.get("event_title")]), markdown_table_row(["series_slug", selected.get("series_slug")]), markdown_table_row(["market_id", selected.get("market_id")]), markdown_table_row(["condition_id", selected.get("condition_id")]), markdown_table_row(["market_end_utc", selected.get("market_end_utc")]), markdown_table_row(["accepting_orders", selected.get("accepting_orders")]), markdown_table_row(["outcomes", json.dumps(selected.get("outcomes"))]), markdown_table_row(["clob_token_ids", json.dumps(selected.get("clob_token_ids"))]), "", "## Questions Answered", "", markdown_table_row(["Question", "Answer"]), markdown_table_row(["---", "---"]), markdown_table_row( [ "How are active markets discovered?", endpoint_findings["active_market_discovery"]["answer"], ] ), markdown_table_row( [ "How can BTC up/down markets be filtered?", endpoint_findings["btc_up_down_filtering"]["answer"], ] ), markdown_table_row( [ "How are conditionId and token IDs resolved?", endpoint_findings["condition_and_token_resolution"]["mapping_rule"], ] ), markdown_table_row( [ "How is the current order book fetched?", f"GET {CLOB_BASE}/book?token_id=", ] ), markdown_table_row( [ "Is there a batch order-book endpoint?", f"Yes: POST {CLOB_BASE}/books with an array of token_id objects.", ] ), markdown_table_row( [ "Is there a market websocket?", f"Yes: {MARKET_WS_URL}; bounded probe ok={endpoint_findings['market_websocket']['probe_ok']}.", ] ), markdown_table_row( [ "Is there a trade websocket or recent trades endpoint?", f"Market websocket can emit last_trade_price; public recent trades are at GET {DATA_API_BASE}/trades?market=.", ] ), markdown_table_row( [ "What rate limits are documented or observed?", "Official docs list Gamma, Data API, and CLOB limits; this bounded probe observed no Retry-After or rate-limit headers.", ] ), markdown_table_row( [ "What fields are returned?", "See field summary in the JSON artifact; key fields include conditionId, outcomes, clobTokenIds, bids, asks, timestamp, hash, price, size.", ] ), markdown_table_row( [ "What timestamps exist?", "Gamma ISO date fields, CLOB/websocket epoch-millisecond strings, Data API trade epoch seconds, and probe request timestamps.", ] ), "", "## Endpoint Evidence", "", markdown_table_row(["Name", "Method", "URL", "Status", "Duration ms"]), markdown_table_row(["---", "---", "---", "---", "---"]), ] for name, record in probe["requests"].items(): request = record["request"] response = record["response"] lines.append( markdown_table_row( [ name, request["method"], request["full_url"], response["status_code"], record["duration_ms"], ] ) ) ws = probe["websocket_probe"] lines.extend( [ markdown_table_row( [ "clob_market_websocket", "WSS", MARKET_WS_URL, ws.get("handshake", {}).get("status_line"), ws.get("duration_ms"), ] ), "", "## Field Summary", "", "The full raw JSON payloads and websocket messages are preserved in the JSON probe artifact.", "", "```json", json.dumps(probe["field_summary"], indent=2, sort_keys=True), "```", "", "## Rate Limits", "", "Documented limits from official docs:", "", "```json", json.dumps(DOCUMENTED_RATE_LIMITS, indent=2, sort_keys=True), "```", "", "Observed rate-limit headers in this bounded run:", "", "```json", json.dumps(endpoint_findings["rate_limits"]["observed_headers"], indent=2, sort_keys=True), "```", "", "## Validation Evidence", "", markdown_table_row(["Check", "Result"]), markdown_table_row(["---", "---"]), markdown_table_row(["market_metadata_fetched", validation["market_metadata_fetched"]]), markdown_table_row(["single_order_book_fetched", validation["single_order_book_fetched"]]), markdown_table_row(["batch_order_books_fetched", validation["batch_order_books_fetched"]]), markdown_table_row(["recent_trades_checked", validation["recent_trades_checked"]]), markdown_table_row(["websocket_checked", validation["websocket_checked"]]), "", "## Official Sources", "", ] ) for source in OFFICIAL_SOURCES: lines.append(f"- [{source['name']}]({source['url']}): {source['finding']}") lines.extend( [ "", "## Strongest Fake-Progress Risk", "", probe["fake_progress_risk"], "", "## Next Smallest Step", "", probe["next_step"], "", ] ) path.parent.mkdir(parents=True, exist_ok=True) path.write_text("\n".join(lines), encoding="utf-8") def write_checkpoint_report(probe: dict[str, Any], path: Path) -> None: gate = probe["gate"] files = [ "scripts/probe_polymarket_public_sources.py", str(DEFAULT_PROBE_JSON), str(DEFAULT_PROBE_MD), str(DEFAULT_CHECKPOINT_MANIFEST), str(DEFAULT_CHECKPOINT_REPORT), ] lines = [ "# Checkpoint 2: Polymarket Public Data Source Probe", "", f"Gate: {gate['status']}", "", f"Started at UTC: {probe['started_at_utc']}", f"Ended at UTC: {probe['ended_at_utc']}", "", "## Scope", "", "Built a bounded public Polymarket source probe. Explicitly excluded collector implementation, polling, dashboards, databases, trading, order placement, wallet logic, private endpoints, and secrets.", "", "## Files Created Or Changed", "", markdown_table_row(["Path", "Kind", "Status"]), markdown_table_row(["---", "---", "---"]), ] kind_by_path = { "scripts/probe_polymarket_public_sources.py": "bounded probe script", str(DEFAULT_PROBE_JSON): "raw probe evidence", str(DEFAULT_PROBE_MD): "probe report", str(DEFAULT_CHECKPOINT_MANIFEST): "checkpoint manifest", str(DEFAULT_CHECKPOINT_REPORT): "checkpoint report", } for file_path in files: lines.append(markdown_table_row([file_path, kind_by_path[file_path], "valid"])) lines.extend( [ "", "## Validation", "", "Commands run by the builder:", "", "```sh", probe["command"], "```", "", f"Result: {gate['status']} - {gate['reason']}", "", "Evidence summary:", "", markdown_table_row(["Evidence", "Result"]), markdown_table_row(["---", "---"]), markdown_table_row(["selected_condition_id", probe["selected_market"].get("condition_id")]), markdown_table_row(["selected_tokens", json.dumps(probe["selected_market"].get("clob_token_ids"))]), markdown_table_row(["GET /book status", probe["requests"]["clob_get_book"]["response"]["status_code"]]), markdown_table_row(["POST /books status", probe["requests"]["clob_post_books"]["response"]["status_code"]]), markdown_table_row(["GET /trades status", probe["requests"]["data_api_recent_trades"]["response"]["status_code"]]), markdown_table_row(["websocket ok", probe["websocket_probe"].get("ok")]), "", "## Endpoint Findings", "", "- Active discovery: Gamma `GET /events?active=true&closed=false`, with pagination. Events include market records.", "- BTC up/down filtering: Bitcoin tag plus `seriesSlug` beginning `btc-up-or-down`, text containing BTC/Bitcoin and Up/Down, and outcomes `Up`/`Down`.", "- Token resolution: parse market `outcomes` and `clobTokenIds`, then map by index; condition ID is `conditionId`.", "- Single book: CLOB `GET /book?token_id=`.", "- Batch books: CLOB `POST /books` with `[{'token_id': ''}, ...]`.", "- Market websocket: public `wss://ws-subscriptions-clob.polymarket.com/ws/market` subscription by `assets_ids`.", "- Recent trades: public Data API `GET /trades?market=`; authenticated CLOB `GET /trades` was excluded.", "", "## Strongest Fake-Progress Risk", "", probe["fake_progress_risk"], "", "## Next Smallest Step", "", probe["next_step"], "", ] ) path.parent.mkdir(parents=True, exist_ok=True) path.write_text("\n".join(lines), encoding="utf-8") def write_checkpoint_manifest( probe: dict[str, Any], path: Path, artifacts: list[dict[str, Any]], ) -> None: manifest = { "schema_name": "checkpoint_manifest", "schema_version": 1, "checkpoint_id": 2, "checkpoint_name": "Polymarket Public Data Source Probe", "status": probe["gate"]["status"], "started_at_utc": probe["started_at_utc"], "ended_at_utc": probe["ended_at_utc"], "scope": "Bounded public endpoint probe only; no collector, trading, wallet, private endpoint, database, dashboard, or generic multi-market implementation.", "artifacts": artifacts, "validation": { "commands": [ { "command": probe["command"], "result": "exit_code_0" if probe["gate"]["status"] == "PASS" else "completed", "summary": probe["gate"]["reason"], } ], "summary": probe["validation_summary"], }, "decisions": [ { "decision": "Use Gamma events plus market records for discovery instead of adding a generic discovery framework.", "reason": "Checkpoint 2 only needs source identification; Checkpoint 3 can turn this into a small BTC discovery script.", }, { "decision": "Use public CLOB market-data endpoints and public Data API trades; exclude authenticated CLOB trade endpoints.", "reason": "Project rules require public data only and no secrets.", }, ], "assumptions": [ "Gamma market outcomes and clobTokenIds arrays align by index.", "CLOB/websocket order-book timestamps observed as epoch milliseconds should be preserved raw until later normalization confirms semantics.", "Data API public trade timestamps observed as epoch seconds should be preserved raw until later normalization confirms semantics.", ], "fake_progress_risk": probe["fake_progress_risk"], "next_step": probe["next_step"], } path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(manifest, indent=2, sort_keys=True) + "\n", encoding="utf-8") def build_probe(args: argparse.Namespace) -> dict[str, Any]: started_at = iso_z() now = utc_now() command = " ".join([Path(sys.argv[0]).as_posix(), *sys.argv[1:]]) or str(sys.argv[0]) requests: dict[str, dict[str, Any]] = {} requests["gamma_events_bitcoin_tag"] = http_json_request( "gamma_events_bitcoin_tag", "GET", f"{GAMMA_BASE}/events", params={ "tag_id": 235, "related_tags": True, "active": True, "closed": False, "limit": args.events_limit, "order": "endDate", "ascending": True, }, timeout_seconds=args.http_timeout, ) requests["gamma_public_search_btc_up_down"] = http_json_request( "gamma_public_search_btc_up_down", "GET", f"{GAMMA_BASE}/public-search", params={ "q": "bitcoin up or down", "events_status": "active", "limit_per_type": args.search_limit, "keep_closed_markets": 0, "search_tags": True, }, timeout_seconds=args.http_timeout, ) event_sources: list[dict[str, Any]] = [] events_payload = request_json_payload(requests["gamma_events_bitcoin_tag"]) if isinstance(events_payload, list): event_sources.extend(events_payload) search_payload = request_json_payload(requests["gamma_public_search_btc_up_down"]) if isinstance(search_payload, dict) and isinstance(search_payload.get("events"), list): event_sources.extend(search_payload["events"]) selected, candidates = select_btc_up_down_market( event_sources, now, args.min_future_lead_seconds ) if selected is None: raise RuntimeError("No BTC up/down market candidate with CLOB token IDs found") selected_summary = summarize_candidate(selected) market_slug = selected["market"].get("slug") or selected["event"].get("slug") requests["gamma_market_by_slug"] = http_json_request( "gamma_market_by_slug", "GET", f"{GAMMA_BASE}/markets", params={"slug": market_slug}, timeout_seconds=args.http_timeout, ) token_ids = selected["token_ids"] condition_id = selected["market"].get("conditionId") requests["clob_get_book"] = http_json_request( "clob_get_book", "GET", f"{CLOB_BASE}/book", params={"token_id": token_ids[0]}, timeout_seconds=args.http_timeout, ) requests["clob_post_books"] = http_json_request( "clob_post_books", "POST", f"{CLOB_BASE}/books", json_body=[{"token_id": token_id} for token_id in token_ids[:2]], timeout_seconds=args.http_timeout, ) requests["data_api_recent_trades"] = http_json_request( "data_api_recent_trades", "GET", f"{DATA_API_BASE}/trades", params={"market": condition_id, "limit": args.trades_limit, "offset": 0}, timeout_seconds=args.http_timeout, ) if args.skip_websocket: ws_probe = { "name": "clob_market_websocket", "started_at_utc": iso_z(), "ended_at_utc": iso_z(), "duration_ms": 0, "request": { "url": MARKET_WS_URL, "subscription": { "assets_ids": token_ids[:2], "type": "market", "custom_feature_enabled": True, }, "max_messages": args.websocket_messages, "timeout_seconds": args.websocket_timeout, }, "handshake": {}, "messages": [], "message_event_types": [], "ok": False, "error": "Skipped by --skip-websocket", } else: ws_probe = websocket_probe( MARKET_WS_URL, token_ids[:2], timeout_seconds=args.websocket_timeout, max_messages=args.websocket_messages, ) market_payload = request_json_payload(requests["gamma_market_by_slug"]) book_payload = request_json_payload(requests["clob_get_book"]) books_payload = request_json_payload(requests["clob_post_books"]) trades_payload = request_json_payload(requests["data_api_recent_trades"]) validation_summary = { "market_metadata_fetched": bool( requests["gamma_market_by_slug"]["ok"] and isinstance(market_payload, list) and len(market_payload) >= 1 ), "single_order_book_fetched": bool( requests["clob_get_book"]["ok"] and isinstance(book_payload, dict) and book_payload.get("asset_id") and isinstance(book_payload.get("bids"), list) and isinstance(book_payload.get("asks"), list) ), "batch_order_books_fetched": bool( requests["clob_post_books"]["ok"] and isinstance(books_payload, list) and len(books_payload) >= 1 ), "recent_trades_checked": bool( requests["data_api_recent_trades"]["ok"] and isinstance(trades_payload, list) ), "websocket_checked": bool(ws_probe.get("ok")), } pass_condition_met = ( validation_summary["market_metadata_fetched"] and validation_summary["single_order_book_fetched"] ) gate_status = "PASS" if pass_condition_met else "FAIL" reason = ( "Fetched at least one active market metadata record and one current CLOB order book." if pass_condition_met else "Did not fetch both required active market metadata and current order book evidence." ) probe: dict[str, Any] = { "schema_name": "polymarket_public_sources_probe", "schema_version": 1, "artifact_status": "valid" if pass_condition_met else "partial", "checkpoint_id": 2, "checkpoint_name": "Polymarket Public Data Source Probe", "started_at_utc": started_at, "ended_at_utc": iso_z(), "command": command, "scope": "Bounded public endpoint probe only; no collector implementation.", "official_sources": OFFICIAL_SOURCES, "selected_market": selected_summary, "candidate_markets_considered": candidates, "requests": requests, "websocket_probe": ws_probe, "validation_summary": validation_summary, "gate": {"status": gate_status, "reason": reason}, "fake_progress_risk": "Mistaking one successful short probe for a reliable collector. This checkpoint only proves endpoint availability and payload shape at probe time.", "next_step": "Checkpoint 3: build a small BTC market discovery script that reliably outputs current active BTC up/down markets with condition IDs and both outcome token IDs.", } probe["field_summary"] = top_level_field_summary(requests, ws_probe) probe["endpoint_findings"] = build_endpoint_findings(selected, requests, ws_probe) return probe def write_outputs(args: argparse.Namespace, probe: dict[str, Any]) -> None: args.output_json.parent.mkdir(parents=True, exist_ok=True) args.output_json.write_text( json.dumps(probe, indent=2, sort_keys=True) + "\n", encoding="utf-8" ) write_probe_markdown(probe, args.output_markdown) write_checkpoint_report(probe, args.checkpoint_report) artifact_paths = [ ("scripts/probe_polymarket_public_sources.py", "bounded_probe_script"), (args.output_json.as_posix(), "raw_probe_evidence"), (args.output_markdown.as_posix(), "probe_report"), (args.checkpoint_report.as_posix(), "checkpoint_report"), ] artifacts = [] for artifact_path, kind in artifact_paths: path = Path(artifact_path) artifacts.append( { "path": artifact_path, "kind": kind, "status": "valid" if path.exists() and path.stat().st_size > 0 else "missing", "sha256": sha256_file(path) if path.exists() and path.is_file() else None, } ) artifacts.append( { "path": args.checkpoint_manifest.as_posix(), "kind": "checkpoint_manifest", "status": "valid", } ) write_checkpoint_manifest(probe, args.checkpoint_manifest, artifacts) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Probe public Polymarket data sources for Checkpoint 2." ) parser.add_argument("--output-json", type=Path, default=DEFAULT_PROBE_JSON) parser.add_argument("--output-markdown", type=Path, default=DEFAULT_PROBE_MD) parser.add_argument("--checkpoint-report", type=Path, default=DEFAULT_CHECKPOINT_REPORT) parser.add_argument( "--checkpoint-manifest", type=Path, default=DEFAULT_CHECKPOINT_MANIFEST ) parser.add_argument("--events-limit", type=int, default=100) parser.add_argument("--search-limit", type=int, default=20) parser.add_argument("--trades-limit", type=int, default=10) parser.add_argument("--http-timeout", type=float, default=15.0) parser.add_argument("--websocket-timeout", type=float, default=8.0) parser.add_argument("--websocket-messages", type=int, default=3) parser.add_argument("--min-future-lead-seconds", type=int, default=60) parser.add_argument("--skip-websocket", action="store_true") return parser.parse_args() def main() -> int: args = parse_args() probe = build_probe(args) write_outputs(args, probe) print( json.dumps( { "status": probe["gate"]["status"], "output_json": args.output_json.as_posix(), "output_markdown": args.output_markdown.as_posix(), "checkpoint_report": args.checkpoint_report.as_posix(), "checkpoint_manifest": args.checkpoint_manifest.as_posix(), "selected_market": probe["selected_market"], "validation_summary": probe["validation_summary"], }, indent=2, sort_keys=True, ) ) return 0 if probe["gate"]["status"] == "PASS" else 1 if __name__ == "__main__": raise SystemExit(main())