From 561e222250df674c5ea079eb321529072503a6aa Mon Sep 17 00:00:00 2001 From: philipp Date: Sun, 19 Apr 2026 21:43:13 +0200 Subject: [PATCH] Harden websocket recorder reconnect reliability --- config/polymarket_ws_collector.example.yaml | 4 +- deploy/k8s/base/configmap.yaml | 4 +- docs/KUBERNETES_DEPLOYMENT.md | 18 +- docs/POLYMARKET_WEBSOCKET_RECORDER.md | 16 ++ scripts/collect_polymarket_ws_orderbooks.py | 209 ++++++++++++++++++-- scripts/k8s_ws_reliability_check.sh | 190 ++++++++++++++++++ 6 files changed, 420 insertions(+), 21 deletions(-) create mode 100755 scripts/k8s_ws_reliability_check.sh diff --git a/config/polymarket_ws_collector.example.yaml b/config/polymarket_ws_collector.example.yaml index cb000e9..960a91e 100644 --- a/config/polymarket_ws_collector.example.yaml +++ b/config/polymarket_ws_collector.example.yaml @@ -27,11 +27,13 @@ market_end_safety_seconds: 420 rest_checkpoint_interval_seconds: 60 rest_batch_size: 50 top_n: 10 -stale_feed_threshold_seconds: 30 +first_message_timeout_seconds: 90 +stale_feed_threshold_seconds: 90 request_timeout_seconds: 15 websocket_timeout_seconds: 10 reconnect_backoff_seconds: 3 max_reconnect_backoff_seconds: 60 +max_consecutive_stale_reconnects_before_discovery_refresh: 3 manifest_write_interval_seconds: 300 # Continuous by default. Set via CLI or env ORDERBOOKS_WS_DURATION_SECONDS for smoke tests. 
diff --git a/deploy/k8s/base/configmap.yaml b/deploy/k8s/base/configmap.yaml index 1244dd2..35a8866 100644 --- a/deploy/k8s/base/configmap.yaml +++ b/deploy/k8s/base/configmap.yaml @@ -46,10 +46,12 @@ data: rest_checkpoint_interval_seconds: 60 rest_batch_size: 50 top_n: 10 - stale_feed_threshold_seconds: 30 + first_message_timeout_seconds: 90 + stale_feed_threshold_seconds: 90 request_timeout_seconds: 15 websocket_timeout_seconds: 10 reconnect_backoff_seconds: 3 max_reconnect_backoff_seconds: 60 + max_consecutive_stale_reconnects_before_discovery_refresh: 3 manifest_write_interval_seconds: 60 duration_seconds: null diff --git a/docs/KUBERNETES_DEPLOYMENT.md b/docs/KUBERNETES_DEPLOYMENT.md index 1af44f9..11ec7e2 100644 --- a/docs/KUBERNETES_DEPLOYMENT.md +++ b/docs/KUBERNETES_DEPLOYMENT.md @@ -159,7 +159,7 @@ for recovery and divergence evidence. The script and example config default to `market_limit: 0`, which means all discovered active BTC Up/Down markets. The Kubernetes canary config currently -sets `market_limit: 2` and `manifest_write_interval_seconds: 60` as explicit +sets `market_limit: 2`, `manifest_write_interval_seconds: 60`, `first_message_timeout_seconds: 90`, and `stale_feed_threshold_seconds: 90` as explicit smoke/safety settings. The 10D local bounded run wrote about 3.35 MB of compressed websocket data in two minutes for two markets; running all active BTC markets on the current 10Gi PVC needs a separate sizing @@ -218,6 +218,22 @@ The smoke gate uses `kubectl`, not systemd. It writes local JSON evidence under A failed smoke run still writes JSON evidence and exits nonzero. Preserve failed manifests, raw files, upload manifests, and pod logs for review. 
+ +## Websocket Reliability Observation + +After deploying a websocket recorder reliability fix, run a read-only bounded +observation before treating the canary as unattended: + +```sh +KUBECONFIG=../nuri/unrip3/.state/hetzner/kubeconfig.yaml \ + scripts/k8s_ws_reliability_check.sh --wait-seconds 1800 +``` + +The observation fails if websocket message counts and archive mtimes do not +advance while active tokens exist, if REST checkpoints stop succeeding, if parse +errors appear, or if the reconnect or stale counter grows by more than 3. It also +records the REST collector image/readiness before and after, and fails if either changed. + ## Not Included - No trading, signing, wallets, private keys, or API keys. diff --git a/docs/POLYMARKET_WEBSOCKET_RECORDER.md b/docs/POLYMARKET_WEBSOCKET_RECORDER.md index 64ff4fe..7ca6d50 100644 --- a/docs/POLYMARKET_WEBSOCKET_RECORDER.md +++ b/docs/POLYMARKET_WEBSOCKET_RECORDER.md @@ -119,3 +119,19 @@ and records reconnect, stale-feed, REST failure, parser, and divergence counters Current gzip files use hidden `.open` names until closed. The uploader skips open/temporary files and deletes local archives only when `--cleanup-after-verify` is used after rclone verification succeeds. + +## Reliability Semantics + +Checkpoint 10E fixed the stale reconnect loop by making stale timers +session-local. A new websocket session starts with `first_message_timeout_seconds` +grace before stale detection can fire. After the first text message, normal +`stale_feed_threshold_seconds` applies to that session only. Run-level +`last_successful_ws_message_at_utc` is still preserved for observability, but it +is not used to stale-fail a fresh connection. + +After `max_consecutive_stale_reconnects_before_discovery_refresh` consecutive stale +reconnects, the recorder forces discovery before the next subscription. This +prevents expired or rotated BTC token IDs from causing an endless reconnect loop. 
+Manifests distinguish `RUNNING_RECEIVING`, `RUNNING_RECONNECTING`, and +`RUNNING_NO_ACTIVE_TOKENS`, and include recent session summaries plus current +open archive paths. diff --git a/scripts/collect_polymarket_ws_orderbooks.py b/scripts/collect_polymarket_ws_orderbooks.py index f0356a7..f280920 100755 --- a/scripts/collect_polymarket_ws_orderbooks.py +++ b/scripts/collect_polymarket_ws_orderbooks.py @@ -32,7 +32,7 @@ from typing import Any COLLECTOR_NAME = "polymarket_ws_orderbook_recorder" -COLLECTOR_VERSION = "0.1.0" +COLLECTOR_VERSION = "0.1.1" WS_SCHEMA_NAME = "raw_polymarket_market_ws_message" REST_SCHEMA_NAME = "raw_polymarket_books_checkpoint" MANIFEST_SCHEMA_NAME = "polymarket_ws_recorder_manifest" @@ -369,6 +369,25 @@ class ArchiveWriter: self.rows = 0 self.started_at_utc = None + def open_file_summary(self) -> dict[str, Any] | None: + if self.temp_path is None: + return None + summary = { + "path": self.temp_path.as_posix(), + "final_path": self.final_path.as_posix() if self.final_path else None, + "kind": self.prefix, + "started_at_utc": self.started_at_utc, + "rows_written": self.rows, + "status": "open", + } + if self.temp_path.exists(): + stat = self.temp_path.stat() + summary.update({ + "bytes": stat.st_size, + "mtime_utc": dt.datetime.fromtimestamp(stat.st_mtime, dt.UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z"), + }) + return summary + def send_ws_frame(sock: ssl.SSLSocket, opcode: int, payload: bytes) -> None: mask = os.urandom(4) @@ -838,8 +857,11 @@ def initial_counters() -> dict[str, Any]: "reconnect_count": 0, "subscription_change_count": 0, "stale_feed_count": 0, + "consecutive_stale_reconnects": 0, "max_gap_seconds": None, "last_message_received_at_utc": None, + "last_successful_ws_message_at_utc": None, + "seconds_since_last_ws_message": None, "rest_request_count": 0, "rest_success_count": 0, "rest_failure_count": 0, @@ -859,6 +881,13 @@ def summarize_states(states: dict[str, BookState], top_n: int) -> list[dict[str, 
return [state.summary(top_n) for state in states.values()] +def seconds_since_iso(value: str | None) -> float | None: + parsed = parse_iso(value) + if parsed is None: + return None + return round(max(0.0, (utc_now() - parsed).total_seconds()), 3) + + def write_manifest( *, config: dict[str, Any], @@ -875,7 +904,11 @@ def write_manifest( states: dict[str, BookState], warnings: list[str], errors: list[dict[str, Any]], + recent_sessions: list[dict[str, Any]] | None = None, + current_session: dict[str, Any] | None = None, ) -> dict[str, Any]: + counters["seconds_since_last_ws_message"] = seconds_since_iso(counters.get("last_successful_ws_message_at_utc")) + open_file_summaries = [summary for summary in [ws_writer.open_file_summary(), rest_writer.open_file_summary()] if summary is not None] manifest = { "schema_name": MANIFEST_SCHEMA_NAME, "schema_version": 1, @@ -893,10 +926,17 @@ def write_manifest( "config": public_config(config), "markets_tracked": markets, "tokens_tracked": tokens, + "current_subscription_token_ids": [token.get("token_id") for token in tokens], + "current_tracked_market_end_times": [market.get("end_time_utc") for market in markets], + "last_successful_ws_message_at_utc": counters.get("last_successful_ws_message_at_utc"), + "seconds_since_last_ws_message": counters.get("seconds_since_last_ws_message"), + "consecutive_stale_reconnects": counters.get("consecutive_stale_reconnects", 0), + "recent_sessions": (recent_sessions or [])[-20:], + "current_session": current_session, "counters": counters, "state_summary": summarize_states(states, int(config["top_n"])), "output_files": [*ws_writer.closed_files, *rest_writer.closed_files], - "open_files": [path.as_posix() for path in [ws_writer.temp_path, rest_writer.temp_path] if path is not None], + "open_files": open_file_summaries, "warnings": sorted(set(warnings)), "errors": errors[-20:], } @@ -944,11 +984,13 @@ def build_config(args: argparse.Namespace) -> dict[str, Any]: "rest_checkpoint_interval_seconds": 
float(config_value(file_config, args, "rest_checkpoint_interval_seconds", 60)), "rest_batch_size": int(config_value(file_config, args, "rest_batch_size", 50)), "top_n": int(config_value(file_config, args, "top_n", 10)), - "stale_feed_threshold_seconds": float(config_value(file_config, args, "stale_feed_threshold_seconds", 30)), + "first_message_timeout_seconds": float(config_value(file_config, args, "first_message_timeout_seconds", 90)), + "stale_feed_threshold_seconds": float(config_value(file_config, args, "stale_feed_threshold_seconds", 90)), "request_timeout_seconds": float(config_value(file_config, args, "request_timeout_seconds", 15)), "websocket_timeout_seconds": float(config_value(file_config, args, "websocket_timeout_seconds", 10)), "reconnect_backoff_seconds": float(config_value(file_config, args, "reconnect_backoff_seconds", 3)), "max_reconnect_backoff_seconds": float(config_value(file_config, args, "max_reconnect_backoff_seconds", 60)), + "max_consecutive_stale_reconnects_before_discovery_refresh": int(config_value(file_config, args, "max_consecutive_stale_reconnects_before_discovery_refresh", 3)), "duration_seconds": float(duration) if duration is not None else None, "manifest_write_interval_seconds": float(config_value(file_config, args, "manifest_write_interval_seconds", 300)), } @@ -956,6 +998,12 @@ def build_config(args: argparse.Namespace) -> dict[str, Any]: raise ValueError("rest_batch_size must be >= 1") if config["market_limit"] < 0: raise ValueError("market_limit must be >= 0; use 0 for all active BTC markets") + if config["first_message_timeout_seconds"] <= 0: + raise ValueError("first_message_timeout_seconds must be > 0") + if config["stale_feed_threshold_seconds"] <= 0: + raise ValueError("stale_feed_threshold_seconds must be > 0") + if config["max_consecutive_stale_reconnects_before_discovery_refresh"] < 1: + raise ValueError("max_consecutive_stale_reconnects_before_discovery_refresh must be >= 1") return config @@ -973,6 +1021,9 @@ def 
run_recorder(config: dict[str, Any], command: str) -> dict[str, Any]: tokens: list[dict[str, Any]] = [] states: dict[str, BookState] = {} rejection_counts: dict[str, int] = {} + recent_sessions: list[dict[str, Any]] = [] + current_session: dict[str, Any] | None = None + runtime_status = "RUNNING_RECONNECTING" ws_writer = ArchiveWriter(root=Path(config["raw_output_root"]), subdir="polymarket/ws_raw", prefix="polymarket_ws_raw", run_id=run_id) rest_writer = ArchiveWriter(root=Path(config["raw_output_root"]), subdir="polymarket/rest_checkpoints", prefix="polymarket_rest_checkpoints", run_id=run_id) @@ -985,29 +1036,69 @@ def run_recorder(config: dict[str, Any], command: str) -> dict[str, Any]: reconnect_backoff = float(config["reconnect_backoff_seconds"]) last_text_message_monotonic: float | None = None shutdown_reason: str | None = None + config["command"] = command + + def finish_current_session(reason: str, error: str | None = None) -> None: + nonlocal current_session + if current_session is None: + return + current_session["closed_at_utc"] = iso_z() + current_session["close_reason"] = current_session.get("close_reason") or reason + if error: + current_session["error"] = error + recent_sessions.append(dict(current_session)) + del recent_sessions[:-20] + current_session = None + + def write_runtime_manifest(status: str, gate_status: str = "IN_PROGRESS", reason: str | None = None) -> dict[str, Any]: + return write_manifest( + config=config, + run_id=run_id, + started_at_utc=started_at_utc, + status=status, + gate_status=gate_status, + shutdown_reason=reason, + markets=markets, + tokens=tokens, + counters=counters, + ws_writer=ws_writer, + rest_writer=rest_writer, + states=states, + warnings=warnings, + errors=errors, + recent_sessions=recent_sessions, + current_session=current_session, + ) try: while not STOP_REQUESTED: - if deadline is not None and time.monotonic() >= deadline: + now_outer = time.monotonic() + if deadline is not None and now_outer >= deadline: 
shutdown_reason = "duration_elapsed" break - if time.monotonic() >= next_discovery or not tokens: + if now_outer >= next_discovery or not tokens: old_token_ids = [token["token_id"] for token in tokens] try: markets, tokens, rejection_counts = refresh_market_state(config, counters, warnings, errors) + counters["market_rejection_counts"] = rejection_counts except Exception as exc: # noqa: BLE001 - preserve evidence and retry counters["discovery_failure_count"] += 1 errors.append({"stage": "load_discovery", "error": f"{type(exc).__name__}: {exc}"}) + runtime_status = "RUNNING_RECONNECTING" + write_runtime_manifest(runtime_status) time.sleep(min(reconnect_backoff, 30)) next_discovery = time.monotonic() + float(config["discovery_refresh_interval_seconds"]) continue new_token_ids = [token["token_id"] for token in tokens] if old_token_ids and old_token_ids != new_token_ids: counters["subscription_change_count"] += 1 + warnings.append("subscription_rotated_after_discovery_refresh") states = {token["token_id"]: states.get(token["token_id"], BookState(token)) for token in tokens} next_discovery = time.monotonic() + float(config["discovery_refresh_interval_seconds"]) if not tokens: - warnings.append("no active BTC Up/Down tokens available after discovery") + runtime_status = "RUNNING_NO_ACTIVE_TOKENS" + warnings.append("no_active_tokens_after_discovery_refresh") + write_runtime_manifest(runtime_status) time.sleep(min(reconnect_backoff, 30)) continue @@ -1015,40 +1106,84 @@ def run_recorder(config: dict[str, Any], command: str) -> dict[str, Any]: subscription = {"assets_ids": token_ids, "type": "market", "custom_feature_enabled": True} connection_sequence += 1 session_id = f"{run_id}-ws{connection_sequence}" + current_session = { + "session_id": session_id, + "connection_sequence": connection_sequence, + "connected_at_utc": None, + "subscribed_at_utc": None, + "closed_at_utc": None, + "messages": 0, + "close_reason": None, + "stale_reason": None, + 
"first_message_latency_seconds": None, + "last_message_received_at_utc": None, + "token_count": len(token_ids), + "market_slugs": [market.get("market_slug") for market in markets], + "market_end_times": [market.get("end_time_utc") for market in markets], + } sock: ssl.SSLSocket | None = None + session_last_text_message_monotonic: float | None = None + session_subscribed_monotonic: float | None = None + session_message_sequence = 0 + stale_raised = False try: + runtime_status = "RUNNING_RECONNECTING" sock, handshake = open_websocket(str(config["websocket_url"]), float(config["websocket_timeout_seconds"])) counters["connection_count"] += 1 + current_session["connected_at_utc"] = iso_z() + current_session["handshake_status_line"] = handshake.get("status_line") send_ws_frame(sock, 1, json.dumps(subscription, separators=(",", ":")).encode("utf-8")) - session_message_sequence = 0 - session_started_at_utc = iso_z() - reconnect_backoff = float(config["reconnect_backoff_seconds"]) + current_session["subscribed_at_utc"] = iso_z() + session_subscribed_monotonic = time.monotonic() while not STOP_REQUESTED: now_monotonic = time.monotonic() if deadline is not None and now_monotonic >= deadline: shutdown_reason = "duration_elapsed" + finish_current_session(shutdown_reason) break if now_monotonic >= next_rest_checkpoint: checkpoint_sequence += 1 fetch_rest_checkpoint(config=config, rest_writer=rest_writer, checkpoint_sequence=checkpoint_sequence, tokens=tokens, states=states, counters=counters) next_rest_checkpoint = now_monotonic + float(config["rest_checkpoint_interval_seconds"]) if now_monotonic >= next_manifest_write: - write_manifest(config=config, run_id=run_id, started_at_utc=started_at_utc, status="RUNNING", gate_status="IN_PROGRESS", shutdown_reason=None, markets=markets, tokens=tokens, counters=counters, ws_writer=ws_writer, rest_writer=rest_writer, states=states, warnings=warnings, errors=errors) + write_runtime_manifest(runtime_status) next_manifest_write = 
now_monotonic + float(config["manifest_write_interval_seconds"]) if now_monotonic >= next_discovery: previous_token_ids = token_ids markets, tokens, rejection_counts = refresh_market_state(config, counters, warnings, errors) + counters["market_rejection_counts"] = rejection_counts token_ids = [token["token_id"] for token in tokens] next_discovery = now_monotonic + float(config["discovery_refresh_interval_seconds"]) if token_ids != previous_token_ids: counters["subscription_change_count"] += 1 states = {token["token_id"]: states.get(token["token_id"], BookState(token)) for token in tokens} - raise RuntimeError("tracked token set changed; reconnecting with new subscription") - if last_text_message_monotonic is not None: - silence = now_monotonic - last_text_message_monotonic + if token_ids: + current_session["close_reason"] = "subscription_rotated" + raise RuntimeError("tracked token set changed; reconnecting with new subscription") + current_session["close_reason"] = "no_active_tokens" + raise RuntimeError("no active tokens after discovery refresh") + if session_last_text_message_monotonic is None: + first_wait = now_monotonic - (session_subscribed_monotonic or now_monotonic) + if first_wait > float(config["first_message_timeout_seconds"]): + counters["stale_feed_count"] += 1 + counters["consecutive_stale_reconnects"] += 1 + stale_raised = True + reason = f"no first websocket text message for {first_wait:.3f}s" + current_session["stale_reason"] = reason + current_session["close_reason"] = "first_message_timeout" + runtime_status = "RUNNING_RECONNECTING" + raise TimeoutError(reason) + else: + silence = now_monotonic - session_last_text_message_monotonic if silence > float(config["stale_feed_threshold_seconds"]): counters["stale_feed_count"] += 1 - raise TimeoutError(f"stale websocket feed for {silence:.3f}s") + counters["consecutive_stale_reconnects"] += 1 + stale_raised = True + reason = f"stale websocket feed for {silence:.3f}s" + current_session["stale_reason"] = 
reason + current_session["close_reason"] = "stale_feed" + runtime_status = "RUNNING_RECONNECTING" + raise TimeoutError(reason) try: opcode, payload = read_ws_frame(sock) except socket.timeout: @@ -1061,9 +1196,18 @@ def run_recorder(config: dict[str, Any], command: str) -> dict[str, Any]: gap = round(now_for_gap - last_text_message_monotonic, 3) previous = counters.get("max_gap_seconds") counters["max_gap_seconds"] = gap if previous is None else max(previous, gap) + if session_message_sequence == 0 and session_subscribed_monotonic is not None: + current_session["first_message_latency_seconds"] = round(now_for_gap - session_subscribed_monotonic, 3) + reconnect_backoff = float(config["reconnect_backoff_seconds"]) last_text_message_monotonic = now_for_gap + session_last_text_message_monotonic = now_for_gap counters["last_message_received_at_utc"] = received_at_utc + counters["last_successful_ws_message_at_utc"] = received_at_utc + counters["consecutive_stale_reconnects"] = 0 + runtime_status = "RUNNING_RECEIVING" session_message_sequence += 1 + current_session["messages"] = session_message_sequence + current_session["last_message_received_at_utc"] = received_at_utc global_sequence += 1 envelope, parsed_json, event_types, parse_ok = build_ws_envelope( run_id=run_id, @@ -1086,6 +1230,7 @@ def run_recorder(config: dict[str, Any], command: str) -> dict[str, Any]: else: counters["websocket_parse_error_count"] += 1 elif opcode == 8: + current_session["close_reason"] = "websocket_close_frame" raise EOFError("websocket close frame received") elif opcode == 9: send_ws_frame(sock, 10, payload) @@ -1096,8 +1241,17 @@ def run_recorder(config: dict[str, Any], command: str) -> dict[str, Any]: if shutdown_reason: break except Exception as exc: # noqa: BLE001 - preserve reconnect evidence - errors.append({"stage": "websocket_session", "connection_sequence": connection_sequence, "error": f"{type(exc).__name__}: {exc}"}) + error_text = f"{type(exc).__name__}: {exc}" + 
errors.append({"stage": "websocket_session", "connection_sequence": connection_sequence, "error": error_text}) counters["reconnect_count"] += 1 + if not stale_raised: + counters["consecutive_stale_reconnects"] = 0 + finish_current_session(current_session.get("close_reason") if current_session else type(exc).__name__, error_text) + runtime_status = "RUNNING_RECONNECTING" + next_manifest_write = 0.0 + if stale_raised and counters["consecutive_stale_reconnects"] >= int(config["max_consecutive_stale_reconnects_before_discovery_refresh"]): + next_discovery = 0.0 + warnings.append("forced_discovery_refresh_after_consecutive_stale_reconnects") if STOP_REQUESTED: shutdown_reason = STOP_SIGNAL or "stop_requested" break @@ -1109,11 +1263,15 @@ def run_recorder(config: dict[str, Any], command: str) -> dict[str, Any]: sock.close() except OSError: pass + if current_session is not None and shutdown_reason: + finish_current_session(shutdown_reason) if shutdown_reason: break finally: if STOP_REQUESTED and shutdown_reason is None: shutdown_reason = STOP_SIGNAL or "stop_requested" + if current_session is not None: + finish_current_session(shutdown_reason or "loop_exited") ws_writer.close() rest_writer.close() @@ -1124,11 +1282,26 @@ def run_recorder(config: dict[str, Any], command: str) -> dict[str, Any]: else: gate_status = "BLOCKED_RUNTIME_EVIDENCE" status = "INTERRUPTED" if STOP_SIGNAL else ("COMPLETED_BOUNDED" if config["duration_seconds"] else "STOPPED") - config["command"] = command - manifest = write_manifest(config=config, run_id=run_id, started_at_utc=started_at_utc, status=status, gate_status=gate_status, shutdown_reason=shutdown_reason, markets=markets, tokens=tokens, counters=counters, ws_writer=ws_writer, rest_writer=rest_writer, states=states, warnings=warnings, errors=errors) + manifest = write_manifest( + config=config, + run_id=run_id, + started_at_utc=started_at_utc, + status=status, + gate_status=gate_status, + shutdown_reason=shutdown_reason, + markets=markets, + 
tokens=tokens, + counters=counters, + ws_writer=ws_writer, + rest_writer=rest_writer, + states=states, + warnings=warnings, + errors=errors, + recent_sessions=recent_sessions, + current_session=current_session, + ) return manifest - def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Long-running Polymarket BTC websocket raw recorder with REST checkpoints.") parser.add_argument("--config", type=Path, default=DEFAULT_CONFIG_PATH)
diff --git a/scripts/k8s_ws_reliability_check.sh b/scripts/k8s_ws_reliability_check.sh
new file mode 100755
index 0000000..c073bd8
--- /dev/null
+++ b/scripts/k8s_ws_reliability_check.sh
@@ -0,0 +1,211 @@
+#!/usr/bin/env bash
+# Read-only reliability observation for the Kubernetes websocket recorder canary.
+#
+# Samples the recorder manifest inside the running pod, sleeps for the
+# observation window, samples again, and writes a local JSON evidence file whose
+# gate_status is derived from the differences between the two samples.
+set -euo pipefail
+
+NAMESPACE="${ORDERBOOKS_K8S_NAMESPACE:-orderbooks}"
+WS_DEPLOYMENT="${ORDERBOOKS_WS_DEPLOYMENT:-orderbooks-ws-recorder}"
+REST_DEPLOYMENT="${ORDERBOOKS_REST_DEPLOYMENT:-orderbooks-collector}"
+WAIT_SECONDS="${ORDERBOOKS_K8S_WS_RELIABILITY_WAIT_SECONDS:-1800}"
+OUTPUT_PATH=""
+RAW_DIR="/var/lib/orderbooks/raw_orderbooks"
+MANIFEST_PATH="/var/lib/orderbooks/manifests/polymarket_ws_recorder_latest.json"
+
+# Print CLI help to stdout.
+usage() {
+  cat <<'EOF'
+Usage: scripts/k8s_ws_reliability_check.sh [options]
+
+Read-only bounded observation for the Kubernetes websocket recorder canary. It
+writes compact local JSON evidence and does not print secret contents.
+
+Options:
+  --namespace NAME          Namespace. Default: orderbooks.
+  --deployment NAME         Websocket recorder Deployment. Default: orderbooks-ws-recorder.
+  --rest-deployment NAME    REST collector Deployment to prove unchanged. Default: orderbooks-collector.
+  --wait-seconds N          Observation window. Default: 1800.
+  --output PATH             Local evidence JSON path.
+  --raw-dir PATH            In-pod raw root. Default: /var/lib/orderbooks/raw_orderbooks.
+  --manifest-path PATH      In-pod websocket manifest path.
+  --help                    Show help.
+EOF
+}
+
+while [[ $# -gt 0 ]]; do
+  case "$1" in
+    --namespace) NAMESPACE="$2"; shift 2 ;;
+    --deployment) WS_DEPLOYMENT="$2"; shift 2 ;;
+    --rest-deployment) REST_DEPLOYMENT="$2"; shift 2 ;;
+    --wait-seconds) WAIT_SECONDS="$2"; shift 2 ;;
+    --output) OUTPUT_PATH="$2"; shift 2 ;;
+    --raw-dir) RAW_DIR="$2"; shift 2 ;;
+    --manifest-path) MANIFEST_PATH="$2"; shift 2 ;;
+    --help) usage; exit 0 ;;
+    *) echo "unknown argument: $1" >&2; usage >&2; exit 2 ;;
+  esac
+done
+
+# Reject a bad window up front instead of failing at `sleep` after the first sample.
+[[ "$WAIT_SECONDS" =~ ^[0-9]+([.][0-9]+)?$ ]] || { echo "--wait-seconds must be a non-negative number" >&2; exit 2; }
+command -v kubectl >/dev/null 2>&1 || { echo "kubectl is required" >&2; exit 2; }
+command -v python3 >/dev/null 2>&1 || { echo "python3 is required" >&2; exit 2; }
+RUN_ID="$(date -u +%Y%m%dT%H%M%SZ)"
+if [[ -z "$OUTPUT_PATH" ]]; then
+  OUTPUT_PATH="data/manifests/ws_reliability_observation_${RUN_ID}.json"
+fi
+mkdir -p "$(dirname "$OUTPUT_PATH")"
+# Deliberately not named TMPDIR, which mktemp and Python's tempfile consult.
+WORK_DIR="$(mktemp -d)"
+trap 'rm -rf "$WORK_DIR"' EXIT
+
+# Print the name of the first Running pod matched by a Deployment's selector labels.
+pod_for_deployment() {
+  local deployment="$1"
+  local selector
+  selector="$(kubectl -n "$NAMESPACE" get deployment "$deployment" -o json | python3 -c 'import json, sys; labels=json.load(sys.stdin)["spec"]["selector"]["matchLabels"]; print(",".join(f"{k}={v}" for k,v in sorted(labels.items())))')"
+  [[ -n "$selector" ]] || return 1
+  kubectl -n "$NAMESPACE" get pod -l "$selector" -o jsonpath='{.items[?(@.status.phase=="Running")].metadata.name}' | awk '{print $1}'
+}
+
+kubectl -n "$NAMESPACE" rollout status "deployment/${REST_DEPLOYMENT}" --timeout=120s >/dev/null
+kubectl -n "$NAMESPACE" rollout status "deployment/${WS_DEPLOYMENT}" --timeout=180s >/dev/null
+REST_IMAGE_BEFORE="$(kubectl -n "$NAMESPACE" get deployment "$REST_DEPLOYMENT" -o jsonpath='{.spec.template.spec.containers[0].image}')"
+REST_READY_BEFORE="$(kubectl -n "$NAMESPACE" get deployment "$REST_DEPLOYMENT" -o jsonpath='{.status.readyReplicas}/{.spec.replicas}')"
+WS_IMAGE="$(kubectl -n "$NAMESPACE" get deployment "$WS_DEPLOYMENT" -o jsonpath='{.spec.template.spec.containers[0].image}')"
+WS_POD="$(pod_for_deployment "$WS_DEPLOYMENT")"
+[[ -n "$WS_POD" ]] || { echo "missing running websocket pod" >&2; exit 1; }
+
+# In-pod sampler: condenses the manifest and archive file stats into one JSON line.
+SUMMARY_PY="$WORK_DIR/reliability-summary.py"
+cat >"$SUMMARY_PY" <<'PY_SUMMARY'
+import json, os, time
+from pathlib import Path
+raw=Path(os.environ['RAW_DIR'])
+manifest_path=Path(os.environ['MANIFEST_PATH'])
+o=json.loads(manifest_path.read_text())
+def file_summary(path):
+    if not path:
+        return None
+    p=Path(path)
+    if not p.exists():
+        return {'path': str(p), 'exists': False}
+    st=p.stat()
+    return {'path': str(p), 'exists': True, 'bytes': st.st_size, 'mtime_utc': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(st.st_mtime))}
+open_files=o.get('open_files') or []
+ws_open=next((x for x in open_files if 'ws_raw' in str(x.get('path'))), None)
+rest_open=next((x for x in open_files if 'rest_checkpoints' in str(x.get('path'))), None)
+ws_files=sorted(raw.glob('polymarket/ws_raw/**/*.jsonl.gz'), key=lambda p:p.stat().st_mtime)
+rest_files=sorted(raw.glob('polymarket/rest_checkpoints/**/*.jsonl.gz'), key=lambda p:p.stat().st_mtime)
+print(json.dumps({
+    'sampled_at_utc': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
+    'manifest_path': str(manifest_path),
+    'manifest_mtime_utc': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(manifest_path.stat().st_mtime)),
+    'run_id': o.get('run_id'),
+    'status': o.get('status'),
+    'gate_status': o.get('gate_status'),
+    'updated_at_utc': o.get('updated_at_utc'),
+    'seconds_since_last_ws_message': o.get('seconds_since_last_ws_message'),
+    'last_successful_ws_message_at_utc': o.get('last_successful_ws_message_at_utc'),
+    'stale_feed_threshold_seconds': (o.get('config') or {}).get('stale_feed_threshold_seconds'),
+    'current_subscription_token_ids': o.get('current_subscription_token_ids'),
+    'current_tracked_market_end_times': o.get('current_tracked_market_end_times'),
+    'current_session': o.get('current_session'),
+    'recent_sessions': (o.get('recent_sessions') or [])[-5:],
+    'counters': o.get('counters') or {},
+    'ws_open_file': ws_open,
+    'rest_open_file': rest_open,
+    'latest_closed_ws': file_summary(ws_files[-1]) if ws_files else None,
+    'latest_closed_rest': file_summary(rest_files[-1]) if rest_files else None,
+}, sort_keys=True))
+PY_SUMMARY
+
+# Run the sampler in the current websocket pod and print its JSON line.
+sample_pod() {
+  kubectl -n "$NAMESPACE" exec "$WS_POD" -- env RAW_DIR="$RAW_DIR" MANIFEST_PATH="$MANIFEST_PATH" python3 -c "$(cat "$SUMMARY_PY")"
+}
+
+START_JSON="$(sample_pod)"
+sleep "$WAIT_SECONDS"
+kubectl -n "$NAMESPACE" rollout status "deployment/${WS_DEPLOYMENT}" --timeout=180s >/dev/null
+WS_POD_AFTER="$(pod_for_deployment "$WS_DEPLOYMENT")"
+[[ -n "$WS_POD_AFTER" ]] || { echo "missing running websocket pod after observation" >&2; exit 1; }
+if [[ "$WS_POD_AFTER" != "$WS_POD" ]]; then
+  WS_POD="$WS_POD_AFTER"
+fi
+END_JSON="$(sample_pod)"
+REST_IMAGE_AFTER="$(kubectl -n "$NAMESPACE" get deployment "$REST_DEPLOYMENT" -o jsonpath='{.spec.template.spec.containers[0].image}')"
+REST_READY_AFTER="$(kubectl -n "$NAMESPACE" get deployment "$REST_DEPLOYMENT" -o jsonpath='{.status.readyReplicas}/{.spec.replicas}')"
+
+# Local gate: compares the two samples, writes the evidence file, sets the exit code.
+WRITE_PY="$WORK_DIR/write.py"
+cat >"$WRITE_PY" <<'PY_WRITE'
+import datetime as dt, json, sys
+from pathlib import Path
+(output_path, namespace, ws_deployment, rest_deployment, wait_seconds, ws_image, rest_image_before, rest_ready_before, rest_image_after, rest_ready_after)=sys.argv[1:11]
+start=json.loads(sys.stdin.readline())
+end=json.loads(sys.stdin.readline())
+def c(obj, key):
+    return (obj.get('counters') or {}).get(key)
+def num(value):
+    return 0 if value is None else value
+ws_delta=num(c(end,'websocket_message_count'))-num(c(start,'websocket_message_count'))
+rest_delta=num(c(end,'rest_success_count'))-num(c(start,'rest_success_count'))
+stale_delta=num(c(end,'stale_feed_count'))-num(c(start,'stale_feed_count'))
+reconnect_delta=num(c(end,'reconnect_count'))-num(c(start,'reconnect_count'))
+parse_errors=num(c(end,'websocket_parse_error_count'))
+# Prefer the threshold the sampler forwards from the manifest config; 90 is the fallback.
+threshold=float(end.get('stale_feed_threshold_seconds') or (end.get('counters') or {}).get('stale_feed_threshold_seconds') or 90)
+seconds_since=end.get('seconds_since_last_ws_message')
+active_tokens=len(end.get('current_subscription_token_ids') or [])
+start_ws_open=start.get('ws_open_file') or {}
+end_ws_open=end.get('ws_open_file') or {}
+file_advanced=False
+if end_ws_open and not start_ws_open:
+    # An archive that was first opened during the window counts as progress.
+    file_advanced = True
+elif start_ws_open and end_ws_open:
+    file_advanced = end_ws_open.get('path') != start_ws_open.get('path') or num(end_ws_open.get('bytes')) > num(start_ws_open.get('bytes')) or end_ws_open.get('mtime_utc') != start_ws_open.get('mtime_utc')
+reasons=[]
+if start.get('run_id') != end.get('run_id'):
+    reasons.append('recorder run_id changed during observation; counter deltas are not comparable')
+if active_tokens and ws_delta <= 0:
+    reasons.append('websocket_message_count did not increase while active tokens existed')
+if active_tokens and not file_advanced:
+    reasons.append('websocket open file did not advance while active tokens existed')
+if rest_delta <= 0:
+    reasons.append('REST checkpoint success count did not increase')
+if parse_errors != 0:
+    reasons.append('websocket_parse_error_count is nonzero')
+if active_tokens and seconds_since is not None and seconds_since > max(threshold * 2, 180):
+    reasons.append('seconds_since_last_ws_message exceeded reliability threshold')
+if reconnect_delta > 3 or stale_delta > 3:
+    reasons.append('reconnect/stale counters grew rapidly during observation')
+if rest_image_before != rest_image_after or rest_ready_before != rest_ready_after:
+    reasons.append('REST collector image/readiness changed')
+gate='WS_RELIABILITY_OBSERVATION_PASS' if not reasons else 'BLOCKED_WS_STALE_LOOP'
+manifest={
+    'schema_name':'ws_reliability_observation',
+    'schema_version':1,
+    'written_at_utc':dt.datetime.now(dt.UTC).replace(microsecond=0).isoformat().replace('+00:00','Z'),
+    'gate_status':gate,
+    'namespace':namespace,
+    'ws_deployment':ws_deployment,
+    'rest_deployment':rest_deployment,
+    'wait_seconds':int(float(wait_seconds)),
+    'ws_image':ws_image,
+    'rest_collector':{'image_before':rest_image_before,'ready_before':rest_ready_before,'image_after':rest_image_after,'ready_after':rest_ready_after,'unchanged':rest_image_before==rest_image_after and rest_ready_before==rest_ready_after},
+    'start':start,
+    'end':end,
+    'deltas':{'websocket_message_count':ws_delta,'rest_success_count':rest_delta,'stale_feed_count':stale_delta,'reconnect_count':reconnect_delta},
+    'file_advanced':file_advanced,
+    'reasons':reasons,
+    'production_ready':False,
+}
+Path(output_path).write_text(json.dumps(manifest, indent=2, sort_keys=True)+'\n')
+print(json.dumps({'gate_status':gate,'evidence_path':output_path,'deltas':manifest['deltas'],'reasons':reasons}, indent=2, sort_keys=True))
+raise SystemExit(0 if gate == 'WS_RELIABILITY_OBSERVATION_PASS' else 1)
+PY_WRITE
+printf '%s\n%s\n' "$START_JSON" "$END_JSON" | python3 "$WRITE_PY" "$OUTPUT_PATH" "$NAMESPACE" "$WS_DEPLOYMENT" "$REST_DEPLOYMENT" "$WAIT_SECONDS" "$WS_IMAGE" "$REST_IMAGE_BEFORE" "$REST_READY_BEFORE" "$REST_IMAGE_AFTER" "$REST_READY_AFTER"