Harden websocket recorder reconnect reliability

This commit is contained in:
philipp 2026-04-19 21:43:13 +02:00
parent 51e0fa5a37
commit 561e222250
6 changed files with 420 additions and 21 deletions

View file

@ -27,11 +27,13 @@ market_end_safety_seconds: 420
rest_checkpoint_interval_seconds: 60 rest_checkpoint_interval_seconds: 60
rest_batch_size: 50 rest_batch_size: 50
top_n: 10 top_n: 10
stale_feed_threshold_seconds: 30 first_message_timeout_seconds: 90
stale_feed_threshold_seconds: 90
request_timeout_seconds: 15 request_timeout_seconds: 15
websocket_timeout_seconds: 10 websocket_timeout_seconds: 10
reconnect_backoff_seconds: 3 reconnect_backoff_seconds: 3
max_reconnect_backoff_seconds: 60 max_reconnect_backoff_seconds: 60
max_consecutive_stale_reconnects_before_discovery_refresh: 3
manifest_write_interval_seconds: 300 manifest_write_interval_seconds: 300
# Continuous by default. Set via CLI or env ORDERBOOKS_WS_DURATION_SECONDS for smoke tests. # Continuous by default. Set via CLI or env ORDERBOOKS_WS_DURATION_SECONDS for smoke tests.

View file

@ -46,10 +46,12 @@ data:
rest_checkpoint_interval_seconds: 60 rest_checkpoint_interval_seconds: 60
rest_batch_size: 50 rest_batch_size: 50
top_n: 10 top_n: 10
stale_feed_threshold_seconds: 30 first_message_timeout_seconds: 90
stale_feed_threshold_seconds: 90
request_timeout_seconds: 15 request_timeout_seconds: 15
websocket_timeout_seconds: 10 websocket_timeout_seconds: 10
reconnect_backoff_seconds: 3 reconnect_backoff_seconds: 3
max_reconnect_backoff_seconds: 60 max_reconnect_backoff_seconds: 60
max_consecutive_stale_reconnects_before_discovery_refresh: 3
manifest_write_interval_seconds: 60 manifest_write_interval_seconds: 60
duration_seconds: null duration_seconds: null

View file

@ -159,7 +159,7 @@ for recovery and divergence evidence.
The script and example config default to `market_limit: 0`, which means all The script and example config default to `market_limit: 0`, which means all
discovered active BTC Up/Down markets. The Kubernetes canary config currently discovered active BTC Up/Down markets. The Kubernetes canary config currently
sets `market_limit: 2` and `manifest_write_interval_seconds: 60` as explicit sets `market_limit: 2`, `manifest_write_interval_seconds: 60`, `first_message_timeout_seconds: 90`, and `stale_feed_threshold_seconds: 90` as explicit
smoke/safety settings. The 10D local bounded run smoke/safety settings. The 10D local bounded run
wrote about 3.35 MB of compressed websocket data in two minutes for two markets; wrote about 3.35 MB of compressed websocket data in two minutes for two markets;
running all active BTC markets on the current 10Gi PVC needs a separate sizing running all active BTC markets on the current 10Gi PVC needs a separate sizing
@ -218,6 +218,22 @@ The smoke gate uses `kubectl`, not systemd. It writes local JSON evidence under
A failed smoke run still writes JSON evidence and exits nonzero. Preserve failed A failed smoke run still writes JSON evidence and exits nonzero. Preserve failed
manifests, raw files, upload manifests, and pod logs for review. manifests, raw files, upload manifests, and pod logs for review.
## Websocket Reliability Observation
After deploying a websocket recorder reliability fix, run a read-only bounded
observation before treating the canary as unattended:
```sh
KUBECONFIG=../nuri/unrip3/.state/hetzner/kubeconfig.yaml \
scripts/k8s_ws_reliability_check.sh --wait-seconds 1800
```
The observation fails if websocket message counts and archive mtimes do not
advance while active tokens exist, if REST checkpoints stop succeeding, if parse
errors appear, or if reconnect/stale counters grow rapidly without recovery. It
also records the REST collector image/readiness before and after the observation.
## Not Included ## Not Included
- No trading, signing, wallets, private keys, or API keys. - No trading, signing, wallets, private keys, or API keys.

View file

@ -119,3 +119,19 @@ and records reconnect, stale-feed, REST failure, parser, and divergence counters
Current gzip files use hidden `.open` names until closed. The uploader skips Current gzip files use hidden `.open` names until closed. The uploader skips
open/temporary files and deletes local archives only when `--cleanup-after-verify` open/temporary files and deletes local archives only when `--cleanup-after-verify`
is used after rclone verification succeeds. is used after rclone verification succeeds.
## Reliability Semantics
Checkpoint 10E fixed the stale reconnect loop by making stale timers
session-local. A new websocket session starts with `first_message_timeout_seconds`
grace before stale detection can fire. After the first text message, normal
`stale_feed_threshold_seconds` applies to that session only. Run-level
`last_successful_ws_message_at_utc` is still preserved for observability, but it
is not used to stale-fail a fresh connection.
After `max_consecutive_stale_reconnects_before_discovery_refresh` stale
reconnects, the recorder forces discovery before the next subscription. This
prevents expired or rotated BTC token IDs from causing an endless reconnect loop.
Manifests distinguish `RUNNING_RECEIVING`, `RUNNING_RECONNECTING`, and
`RUNNING_NO_ACTIVE_TOKENS`, and include recent session summaries plus current
open archive paths.

View file

@ -32,7 +32,7 @@ from typing import Any
COLLECTOR_NAME = "polymarket_ws_orderbook_recorder" COLLECTOR_NAME = "polymarket_ws_orderbook_recorder"
COLLECTOR_VERSION = "0.1.0" COLLECTOR_VERSION = "0.1.1"
WS_SCHEMA_NAME = "raw_polymarket_market_ws_message" WS_SCHEMA_NAME = "raw_polymarket_market_ws_message"
REST_SCHEMA_NAME = "raw_polymarket_books_checkpoint" REST_SCHEMA_NAME = "raw_polymarket_books_checkpoint"
MANIFEST_SCHEMA_NAME = "polymarket_ws_recorder_manifest" MANIFEST_SCHEMA_NAME = "polymarket_ws_recorder_manifest"
@ -369,6 +369,25 @@ class ArchiveWriter:
self.rows = 0 self.rows = 0
self.started_at_utc = None self.started_at_utc = None
def open_file_summary(self) -> dict[str, Any] | None:
if self.temp_path is None:
return None
summary = {
"path": self.temp_path.as_posix(),
"final_path": self.final_path.as_posix() if self.final_path else None,
"kind": self.prefix,
"started_at_utc": self.started_at_utc,
"rows_written": self.rows,
"status": "open",
}
if self.temp_path.exists():
stat = self.temp_path.stat()
summary.update({
"bytes": stat.st_size,
"mtime_utc": dt.datetime.fromtimestamp(stat.st_mtime, dt.UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z"),
})
return summary
def send_ws_frame(sock: ssl.SSLSocket, opcode: int, payload: bytes) -> None: def send_ws_frame(sock: ssl.SSLSocket, opcode: int, payload: bytes) -> None:
mask = os.urandom(4) mask = os.urandom(4)
@ -838,8 +857,11 @@ def initial_counters() -> dict[str, Any]:
"reconnect_count": 0, "reconnect_count": 0,
"subscription_change_count": 0, "subscription_change_count": 0,
"stale_feed_count": 0, "stale_feed_count": 0,
"consecutive_stale_reconnects": 0,
"max_gap_seconds": None, "max_gap_seconds": None,
"last_message_received_at_utc": None, "last_message_received_at_utc": None,
"last_successful_ws_message_at_utc": None,
"seconds_since_last_ws_message": None,
"rest_request_count": 0, "rest_request_count": 0,
"rest_success_count": 0, "rest_success_count": 0,
"rest_failure_count": 0, "rest_failure_count": 0,
@ -859,6 +881,13 @@ def summarize_states(states: dict[str, BookState], top_n: int) -> list[dict[str,
return [state.summary(top_n) for state in states.values()] return [state.summary(top_n) for state in states.values()]
def seconds_since_iso(value: str | None) -> float | None:
parsed = parse_iso(value)
if parsed is None:
return None
return round(max(0.0, (utc_now() - parsed).total_seconds()), 3)
def write_manifest( def write_manifest(
*, *,
config: dict[str, Any], config: dict[str, Any],
@ -875,7 +904,11 @@ def write_manifest(
states: dict[str, BookState], states: dict[str, BookState],
warnings: list[str], warnings: list[str],
errors: list[dict[str, Any]], errors: list[dict[str, Any]],
recent_sessions: list[dict[str, Any]] | None = None,
current_session: dict[str, Any] | None = None,
) -> dict[str, Any]: ) -> dict[str, Any]:
counters["seconds_since_last_ws_message"] = seconds_since_iso(counters.get("last_successful_ws_message_at_utc"))
open_file_summaries = [summary for summary in [ws_writer.open_file_summary(), rest_writer.open_file_summary()] if summary is not None]
manifest = { manifest = {
"schema_name": MANIFEST_SCHEMA_NAME, "schema_name": MANIFEST_SCHEMA_NAME,
"schema_version": 1, "schema_version": 1,
@ -893,10 +926,17 @@ def write_manifest(
"config": public_config(config), "config": public_config(config),
"markets_tracked": markets, "markets_tracked": markets,
"tokens_tracked": tokens, "tokens_tracked": tokens,
"current_subscription_token_ids": [token.get("token_id") for token in tokens],
"current_tracked_market_end_times": [market.get("end_time_utc") for market in markets],
"last_successful_ws_message_at_utc": counters.get("last_successful_ws_message_at_utc"),
"seconds_since_last_ws_message": counters.get("seconds_since_last_ws_message"),
"consecutive_stale_reconnects": counters.get("consecutive_stale_reconnects", 0),
"recent_sessions": (recent_sessions or [])[-20:],
"current_session": current_session,
"counters": counters, "counters": counters,
"state_summary": summarize_states(states, int(config["top_n"])), "state_summary": summarize_states(states, int(config["top_n"])),
"output_files": [*ws_writer.closed_files, *rest_writer.closed_files], "output_files": [*ws_writer.closed_files, *rest_writer.closed_files],
"open_files": [path.as_posix() for path in [ws_writer.temp_path, rest_writer.temp_path] if path is not None], "open_files": open_file_summaries,
"warnings": sorted(set(warnings)), "warnings": sorted(set(warnings)),
"errors": errors[-20:], "errors": errors[-20:],
} }
@ -944,11 +984,13 @@ def build_config(args: argparse.Namespace) -> dict[str, Any]:
"rest_checkpoint_interval_seconds": float(config_value(file_config, args, "rest_checkpoint_interval_seconds", 60)), "rest_checkpoint_interval_seconds": float(config_value(file_config, args, "rest_checkpoint_interval_seconds", 60)),
"rest_batch_size": int(config_value(file_config, args, "rest_batch_size", 50)), "rest_batch_size": int(config_value(file_config, args, "rest_batch_size", 50)),
"top_n": int(config_value(file_config, args, "top_n", 10)), "top_n": int(config_value(file_config, args, "top_n", 10)),
"stale_feed_threshold_seconds": float(config_value(file_config, args, "stale_feed_threshold_seconds", 30)), "first_message_timeout_seconds": float(config_value(file_config, args, "first_message_timeout_seconds", 90)),
"stale_feed_threshold_seconds": float(config_value(file_config, args, "stale_feed_threshold_seconds", 90)),
"request_timeout_seconds": float(config_value(file_config, args, "request_timeout_seconds", 15)), "request_timeout_seconds": float(config_value(file_config, args, "request_timeout_seconds", 15)),
"websocket_timeout_seconds": float(config_value(file_config, args, "websocket_timeout_seconds", 10)), "websocket_timeout_seconds": float(config_value(file_config, args, "websocket_timeout_seconds", 10)),
"reconnect_backoff_seconds": float(config_value(file_config, args, "reconnect_backoff_seconds", 3)), "reconnect_backoff_seconds": float(config_value(file_config, args, "reconnect_backoff_seconds", 3)),
"max_reconnect_backoff_seconds": float(config_value(file_config, args, "max_reconnect_backoff_seconds", 60)), "max_reconnect_backoff_seconds": float(config_value(file_config, args, "max_reconnect_backoff_seconds", 60)),
"max_consecutive_stale_reconnects_before_discovery_refresh": int(config_value(file_config, args, "max_consecutive_stale_reconnects_before_discovery_refresh", 3)),
"duration_seconds": float(duration) if duration is not None else None, "duration_seconds": float(duration) if duration is not None else None,
"manifest_write_interval_seconds": float(config_value(file_config, args, "manifest_write_interval_seconds", 300)), "manifest_write_interval_seconds": float(config_value(file_config, args, "manifest_write_interval_seconds", 300)),
} }
@ -956,6 +998,12 @@ def build_config(args: argparse.Namespace) -> dict[str, Any]:
raise ValueError("rest_batch_size must be >= 1") raise ValueError("rest_batch_size must be >= 1")
if config["market_limit"] < 0: if config["market_limit"] < 0:
raise ValueError("market_limit must be >= 0; use 0 for all active BTC markets") raise ValueError("market_limit must be >= 0; use 0 for all active BTC markets")
if config["first_message_timeout_seconds"] <= 0:
raise ValueError("first_message_timeout_seconds must be > 0")
if config["stale_feed_threshold_seconds"] <= 0:
raise ValueError("stale_feed_threshold_seconds must be > 0")
if config["max_consecutive_stale_reconnects_before_discovery_refresh"] < 1:
raise ValueError("max_consecutive_stale_reconnects_before_discovery_refresh must be >= 1")
return config return config
@ -973,6 +1021,9 @@ def run_recorder(config: dict[str, Any], command: str) -> dict[str, Any]:
tokens: list[dict[str, Any]] = [] tokens: list[dict[str, Any]] = []
states: dict[str, BookState] = {} states: dict[str, BookState] = {}
rejection_counts: dict[str, int] = {} rejection_counts: dict[str, int] = {}
recent_sessions: list[dict[str, Any]] = []
current_session: dict[str, Any] | None = None
runtime_status = "RUNNING_RECONNECTING"
ws_writer = ArchiveWriter(root=Path(config["raw_output_root"]), subdir="polymarket/ws_raw", prefix="polymarket_ws_raw", run_id=run_id) ws_writer = ArchiveWriter(root=Path(config["raw_output_root"]), subdir="polymarket/ws_raw", prefix="polymarket_ws_raw", run_id=run_id)
rest_writer = ArchiveWriter(root=Path(config["raw_output_root"]), subdir="polymarket/rest_checkpoints", prefix="polymarket_rest_checkpoints", run_id=run_id) rest_writer = ArchiveWriter(root=Path(config["raw_output_root"]), subdir="polymarket/rest_checkpoints", prefix="polymarket_rest_checkpoints", run_id=run_id)
@ -985,29 +1036,69 @@ def run_recorder(config: dict[str, Any], command: str) -> dict[str, Any]:
reconnect_backoff = float(config["reconnect_backoff_seconds"]) reconnect_backoff = float(config["reconnect_backoff_seconds"])
last_text_message_monotonic: float | None = None last_text_message_monotonic: float | None = None
shutdown_reason: str | None = None shutdown_reason: str | None = None
config["command"] = command
def finish_current_session(reason: str, error: str | None = None) -> None:
nonlocal current_session
if current_session is None:
return
current_session["closed_at_utc"] = iso_z()
current_session["close_reason"] = current_session.get("close_reason") or reason
if error:
current_session["error"] = error
recent_sessions.append(dict(current_session))
del recent_sessions[:-20]
current_session = None
def write_runtime_manifest(status: str, gate_status: str = "IN_PROGRESS", reason: str | None = None) -> dict[str, Any]:
return write_manifest(
config=config,
run_id=run_id,
started_at_utc=started_at_utc,
status=status,
gate_status=gate_status,
shutdown_reason=reason,
markets=markets,
tokens=tokens,
counters=counters,
ws_writer=ws_writer,
rest_writer=rest_writer,
states=states,
warnings=warnings,
errors=errors,
recent_sessions=recent_sessions,
current_session=current_session,
)
try: try:
while not STOP_REQUESTED: while not STOP_REQUESTED:
if deadline is not None and time.monotonic() >= deadline: now_outer = time.monotonic()
if deadline is not None and now_outer >= deadline:
shutdown_reason = "duration_elapsed" shutdown_reason = "duration_elapsed"
break break
if time.monotonic() >= next_discovery or not tokens: if now_outer >= next_discovery or not tokens:
old_token_ids = [token["token_id"] for token in tokens] old_token_ids = [token["token_id"] for token in tokens]
try: try:
markets, tokens, rejection_counts = refresh_market_state(config, counters, warnings, errors) markets, tokens, rejection_counts = refresh_market_state(config, counters, warnings, errors)
counters["market_rejection_counts"] = rejection_counts
except Exception as exc: # noqa: BLE001 - preserve evidence and retry except Exception as exc: # noqa: BLE001 - preserve evidence and retry
counters["discovery_failure_count"] += 1 counters["discovery_failure_count"] += 1
errors.append({"stage": "load_discovery", "error": f"{type(exc).__name__}: {exc}"}) errors.append({"stage": "load_discovery", "error": f"{type(exc).__name__}: {exc}"})
runtime_status = "RUNNING_RECONNECTING"
write_runtime_manifest(runtime_status)
time.sleep(min(reconnect_backoff, 30)) time.sleep(min(reconnect_backoff, 30))
next_discovery = time.monotonic() + float(config["discovery_refresh_interval_seconds"]) next_discovery = time.monotonic() + float(config["discovery_refresh_interval_seconds"])
continue continue
new_token_ids = [token["token_id"] for token in tokens] new_token_ids = [token["token_id"] for token in tokens]
if old_token_ids and old_token_ids != new_token_ids: if old_token_ids and old_token_ids != new_token_ids:
counters["subscription_change_count"] += 1 counters["subscription_change_count"] += 1
warnings.append("subscription_rotated_after_discovery_refresh")
states = {token["token_id"]: states.get(token["token_id"], BookState(token)) for token in tokens} states = {token["token_id"]: states.get(token["token_id"], BookState(token)) for token in tokens}
next_discovery = time.monotonic() + float(config["discovery_refresh_interval_seconds"]) next_discovery = time.monotonic() + float(config["discovery_refresh_interval_seconds"])
if not tokens: if not tokens:
warnings.append("no active BTC Up/Down tokens available after discovery") runtime_status = "RUNNING_NO_ACTIVE_TOKENS"
warnings.append("no_active_tokens_after_discovery_refresh")
write_runtime_manifest(runtime_status)
time.sleep(min(reconnect_backoff, 30)) time.sleep(min(reconnect_backoff, 30))
continue continue
@ -1015,40 +1106,84 @@ def run_recorder(config: dict[str, Any], command: str) -> dict[str, Any]:
subscription = {"assets_ids": token_ids, "type": "market", "custom_feature_enabled": True} subscription = {"assets_ids": token_ids, "type": "market", "custom_feature_enabled": True}
connection_sequence += 1 connection_sequence += 1
session_id = f"{run_id}-ws{connection_sequence}" session_id = f"{run_id}-ws{connection_sequence}"
current_session = {
"session_id": session_id,
"connection_sequence": connection_sequence,
"connected_at_utc": None,
"subscribed_at_utc": None,
"closed_at_utc": None,
"messages": 0,
"close_reason": None,
"stale_reason": None,
"first_message_latency_seconds": None,
"last_message_received_at_utc": None,
"token_count": len(token_ids),
"market_slugs": [market.get("market_slug") for market in markets],
"market_end_times": [market.get("end_time_utc") for market in markets],
}
sock: ssl.SSLSocket | None = None sock: ssl.SSLSocket | None = None
session_last_text_message_monotonic: float | None = None
session_subscribed_monotonic: float | None = None
session_message_sequence = 0
stale_raised = False
try: try:
runtime_status = "RUNNING_RECONNECTING"
sock, handshake = open_websocket(str(config["websocket_url"]), float(config["websocket_timeout_seconds"])) sock, handshake = open_websocket(str(config["websocket_url"]), float(config["websocket_timeout_seconds"]))
counters["connection_count"] += 1 counters["connection_count"] += 1
current_session["connected_at_utc"] = iso_z()
current_session["handshake_status_line"] = handshake.get("status_line")
send_ws_frame(sock, 1, json.dumps(subscription, separators=(",", ":")).encode("utf-8")) send_ws_frame(sock, 1, json.dumps(subscription, separators=(",", ":")).encode("utf-8"))
session_message_sequence = 0 current_session["subscribed_at_utc"] = iso_z()
session_started_at_utc = iso_z() session_subscribed_monotonic = time.monotonic()
reconnect_backoff = float(config["reconnect_backoff_seconds"])
while not STOP_REQUESTED: while not STOP_REQUESTED:
now_monotonic = time.monotonic() now_monotonic = time.monotonic()
if deadline is not None and now_monotonic >= deadline: if deadline is not None and now_monotonic >= deadline:
shutdown_reason = "duration_elapsed" shutdown_reason = "duration_elapsed"
finish_current_session(shutdown_reason)
break break
if now_monotonic >= next_rest_checkpoint: if now_monotonic >= next_rest_checkpoint:
checkpoint_sequence += 1 checkpoint_sequence += 1
fetch_rest_checkpoint(config=config, rest_writer=rest_writer, checkpoint_sequence=checkpoint_sequence, tokens=tokens, states=states, counters=counters) fetch_rest_checkpoint(config=config, rest_writer=rest_writer, checkpoint_sequence=checkpoint_sequence, tokens=tokens, states=states, counters=counters)
next_rest_checkpoint = now_monotonic + float(config["rest_checkpoint_interval_seconds"]) next_rest_checkpoint = now_monotonic + float(config["rest_checkpoint_interval_seconds"])
if now_monotonic >= next_manifest_write: if now_monotonic >= next_manifest_write:
write_manifest(config=config, run_id=run_id, started_at_utc=started_at_utc, status="RUNNING", gate_status="IN_PROGRESS", shutdown_reason=None, markets=markets, tokens=tokens, counters=counters, ws_writer=ws_writer, rest_writer=rest_writer, states=states, warnings=warnings, errors=errors) write_runtime_manifest(runtime_status)
next_manifest_write = now_monotonic + float(config["manifest_write_interval_seconds"]) next_manifest_write = now_monotonic + float(config["manifest_write_interval_seconds"])
if now_monotonic >= next_discovery: if now_monotonic >= next_discovery:
previous_token_ids = token_ids previous_token_ids = token_ids
markets, tokens, rejection_counts = refresh_market_state(config, counters, warnings, errors) markets, tokens, rejection_counts = refresh_market_state(config, counters, warnings, errors)
counters["market_rejection_counts"] = rejection_counts
token_ids = [token["token_id"] for token in tokens] token_ids = [token["token_id"] for token in tokens]
next_discovery = now_monotonic + float(config["discovery_refresh_interval_seconds"]) next_discovery = now_monotonic + float(config["discovery_refresh_interval_seconds"])
if token_ids != previous_token_ids: if token_ids != previous_token_ids:
counters["subscription_change_count"] += 1 counters["subscription_change_count"] += 1
states = {token["token_id"]: states.get(token["token_id"], BookState(token)) for token in tokens} states = {token["token_id"]: states.get(token["token_id"], BookState(token)) for token in tokens}
if token_ids:
current_session["close_reason"] = "subscription_rotated"
raise RuntimeError("tracked token set changed; reconnecting with new subscription") raise RuntimeError("tracked token set changed; reconnecting with new subscription")
if last_text_message_monotonic is not None: current_session["close_reason"] = "no_active_tokens"
silence = now_monotonic - last_text_message_monotonic raise RuntimeError("no active tokens after discovery refresh")
if session_last_text_message_monotonic is None:
first_wait = now_monotonic - (session_subscribed_monotonic or now_monotonic)
if first_wait > float(config["first_message_timeout_seconds"]):
counters["stale_feed_count"] += 1
counters["consecutive_stale_reconnects"] += 1
stale_raised = True
reason = f"no first websocket text message for {first_wait:.3f}s"
current_session["stale_reason"] = reason
current_session["close_reason"] = "first_message_timeout"
runtime_status = "RUNNING_RECONNECTING"
raise TimeoutError(reason)
else:
silence = now_monotonic - session_last_text_message_monotonic
if silence > float(config["stale_feed_threshold_seconds"]): if silence > float(config["stale_feed_threshold_seconds"]):
counters["stale_feed_count"] += 1 counters["stale_feed_count"] += 1
raise TimeoutError(f"stale websocket feed for {silence:.3f}s") counters["consecutive_stale_reconnects"] += 1
stale_raised = True
reason = f"stale websocket feed for {silence:.3f}s"
current_session["stale_reason"] = reason
current_session["close_reason"] = "stale_feed"
runtime_status = "RUNNING_RECONNECTING"
raise TimeoutError(reason)
try: try:
opcode, payload = read_ws_frame(sock) opcode, payload = read_ws_frame(sock)
except socket.timeout: except socket.timeout:
@ -1061,9 +1196,18 @@ def run_recorder(config: dict[str, Any], command: str) -> dict[str, Any]:
gap = round(now_for_gap - last_text_message_monotonic, 3) gap = round(now_for_gap - last_text_message_monotonic, 3)
previous = counters.get("max_gap_seconds") previous = counters.get("max_gap_seconds")
counters["max_gap_seconds"] = gap if previous is None else max(previous, gap) counters["max_gap_seconds"] = gap if previous is None else max(previous, gap)
if session_message_sequence == 0 and session_subscribed_monotonic is not None:
current_session["first_message_latency_seconds"] = round(now_for_gap - session_subscribed_monotonic, 3)
reconnect_backoff = float(config["reconnect_backoff_seconds"])
last_text_message_monotonic = now_for_gap last_text_message_monotonic = now_for_gap
session_last_text_message_monotonic = now_for_gap
counters["last_message_received_at_utc"] = received_at_utc counters["last_message_received_at_utc"] = received_at_utc
counters["last_successful_ws_message_at_utc"] = received_at_utc
counters["consecutive_stale_reconnects"] = 0
runtime_status = "RUNNING_RECEIVING"
session_message_sequence += 1 session_message_sequence += 1
current_session["messages"] = session_message_sequence
current_session["last_message_received_at_utc"] = received_at_utc
global_sequence += 1 global_sequence += 1
envelope, parsed_json, event_types, parse_ok = build_ws_envelope( envelope, parsed_json, event_types, parse_ok = build_ws_envelope(
run_id=run_id, run_id=run_id,
@ -1086,6 +1230,7 @@ def run_recorder(config: dict[str, Any], command: str) -> dict[str, Any]:
else: else:
counters["websocket_parse_error_count"] += 1 counters["websocket_parse_error_count"] += 1
elif opcode == 8: elif opcode == 8:
current_session["close_reason"] = "websocket_close_frame"
raise EOFError("websocket close frame received") raise EOFError("websocket close frame received")
elif opcode == 9: elif opcode == 9:
send_ws_frame(sock, 10, payload) send_ws_frame(sock, 10, payload)
@ -1096,8 +1241,17 @@ def run_recorder(config: dict[str, Any], command: str) -> dict[str, Any]:
if shutdown_reason: if shutdown_reason:
break break
except Exception as exc: # noqa: BLE001 - preserve reconnect evidence except Exception as exc: # noqa: BLE001 - preserve reconnect evidence
errors.append({"stage": "websocket_session", "connection_sequence": connection_sequence, "error": f"{type(exc).__name__}: {exc}"}) error_text = f"{type(exc).__name__}: {exc}"
errors.append({"stage": "websocket_session", "connection_sequence": connection_sequence, "error": error_text})
counters["reconnect_count"] += 1 counters["reconnect_count"] += 1
if not stale_raised:
counters["consecutive_stale_reconnects"] = 0
finish_current_session(current_session.get("close_reason") if current_session else type(exc).__name__, error_text)
runtime_status = "RUNNING_RECONNECTING"
next_manifest_write = 0.0
if stale_raised and counters["consecutive_stale_reconnects"] >= int(config["max_consecutive_stale_reconnects_before_discovery_refresh"]):
next_discovery = 0.0
warnings.append("forced_discovery_refresh_after_consecutive_stale_reconnects")
if STOP_REQUESTED: if STOP_REQUESTED:
shutdown_reason = STOP_SIGNAL or "stop_requested" shutdown_reason = STOP_SIGNAL or "stop_requested"
break break
@ -1109,11 +1263,15 @@ def run_recorder(config: dict[str, Any], command: str) -> dict[str, Any]:
sock.close() sock.close()
except OSError: except OSError:
pass pass
if current_session is not None and shutdown_reason:
finish_current_session(shutdown_reason)
if shutdown_reason: if shutdown_reason:
break break
finally: finally:
if STOP_REQUESTED and shutdown_reason is None: if STOP_REQUESTED and shutdown_reason is None:
shutdown_reason = STOP_SIGNAL or "stop_requested" shutdown_reason = STOP_SIGNAL or "stop_requested"
if current_session is not None:
finish_current_session(shutdown_reason or "loop_exited")
ws_writer.close() ws_writer.close()
rest_writer.close() rest_writer.close()
@ -1124,11 +1282,26 @@ def run_recorder(config: dict[str, Any], command: str) -> dict[str, Any]:
else: else:
gate_status = "BLOCKED_RUNTIME_EVIDENCE" gate_status = "BLOCKED_RUNTIME_EVIDENCE"
status = "INTERRUPTED" if STOP_SIGNAL else ("COMPLETED_BOUNDED" if config["duration_seconds"] else "STOPPED") status = "INTERRUPTED" if STOP_SIGNAL else ("COMPLETED_BOUNDED" if config["duration_seconds"] else "STOPPED")
config["command"] = command manifest = write_manifest(
manifest = write_manifest(config=config, run_id=run_id, started_at_utc=started_at_utc, status=status, gate_status=gate_status, shutdown_reason=shutdown_reason, markets=markets, tokens=tokens, counters=counters, ws_writer=ws_writer, rest_writer=rest_writer, states=states, warnings=warnings, errors=errors) config=config,
run_id=run_id,
started_at_utc=started_at_utc,
status=status,
gate_status=gate_status,
shutdown_reason=shutdown_reason,
markets=markets,
tokens=tokens,
counters=counters,
ws_writer=ws_writer,
rest_writer=rest_writer,
states=states,
warnings=warnings,
errors=errors,
recent_sessions=recent_sessions,
current_session=current_session,
)
return manifest return manifest
def parse_args() -> argparse.Namespace: def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Long-running Polymarket BTC websocket raw recorder with REST checkpoints.") parser = argparse.ArgumentParser(description="Long-running Polymarket BTC websocket raw recorder with REST checkpoints.")
parser.add_argument("--config", type=Path, default=DEFAULT_CONFIG_PATH) parser.add_argument("--config", type=Path, default=DEFAULT_CONFIG_PATH)

View file

@ -0,0 +1,190 @@
#!/usr/bin/env bash
set -euo pipefail
NAMESPACE="${ORDERBOOKS_K8S_NAMESPACE:-orderbooks}"
WS_DEPLOYMENT="${ORDERBOOKS_WS_DEPLOYMENT:-orderbooks-ws-recorder}"
REST_DEPLOYMENT="${ORDERBOOKS_REST_DEPLOYMENT:-orderbooks-collector}"
WAIT_SECONDS="${ORDERBOOKS_K8S_WS_RELIABILITY_WAIT_SECONDS:-1800}"
OUTPUT_PATH=""
RAW_DIR="/var/lib/orderbooks/raw_orderbooks"
MANIFEST_PATH="/var/lib/orderbooks/manifests/polymarket_ws_recorder_latest.json"
usage() {
cat <<'EOF'
Usage: scripts/k8s_ws_reliability_check.sh [options]
Read-only bounded observation for the Kubernetes websocket recorder canary. It
writes compact local JSON evidence and does not print secret contents.
Options:
--namespace NAME Namespace. Default: orderbooks.
--deployment NAME Websocket recorder Deployment. Default: orderbooks-ws-recorder.
--rest-deployment NAME REST collector Deployment to prove unchanged. Default: orderbooks-collector.
--wait-seconds N Observation window. Default: 1800.
--output PATH Local evidence JSON path.
--raw-dir PATH In-pod raw root. Default: /var/lib/orderbooks/raw_orderbooks.
--manifest-path PATH In-pod websocket manifest path.
--help Show help.
EOF
}
while [[ $# -gt 0 ]]; do
case "$1" in
--namespace) NAMESPACE="$2"; shift 2 ;;
--deployment) WS_DEPLOYMENT="$2"; shift 2 ;;
--rest-deployment) REST_DEPLOYMENT="$2"; shift 2 ;;
--wait-seconds) WAIT_SECONDS="$2"; shift 2 ;;
--output) OUTPUT_PATH="$2"; shift 2 ;;
--raw-dir) RAW_DIR="$2"; shift 2 ;;
--manifest-path) MANIFEST_PATH="$2"; shift 2 ;;
--help) usage; exit 0 ;;
*) echo "unknown argument: $1" >&2; usage >&2; exit 2 ;;
esac
done
command -v kubectl >/dev/null 2>&1 || { echo "kubectl is required" >&2; exit 2; }
command -v python3 >/dev/null 2>&1 || { echo "python3 is required" >&2; exit 2; }
RUN_ID="$(date -u +%Y%m%dT%H%M%SZ)"
if [[ -z "$OUTPUT_PATH" ]]; then
OUTPUT_PATH="data/manifests/ws_reliability_observation_${RUN_ID}.json"
fi
mkdir -p "$(dirname "$OUTPUT_PATH")"
TMPDIR="$(mktemp -d)"
trap 'rm -rf "$TMPDIR"' EXIT
pod_for_deployment() {
local deployment="$1"
local selector
selector="$(kubectl -n "$NAMESPACE" get deployment "$deployment" -o json | python3 -c 'import json, sys; labels=json.load(sys.stdin)["spec"]["selector"]["matchLabels"]; print(",".join(f"{k}={v}" for k,v in sorted(labels.items())))')"
[[ -n "$selector" ]] || return 1
kubectl -n "$NAMESPACE" get pod -l "$selector" -o jsonpath='{.items[?(@.status.phase=="Running")].metadata.name}' | awk '{print $1}'
}
kubectl -n "$NAMESPACE" rollout status "deployment/${REST_DEPLOYMENT}" --timeout=120s >/dev/null
kubectl -n "$NAMESPACE" rollout status "deployment/${WS_DEPLOYMENT}" --timeout=180s >/dev/null
REST_IMAGE_BEFORE="$(kubectl -n "$NAMESPACE" get deployment "$REST_DEPLOYMENT" -o jsonpath='{.spec.template.spec.containers[0].image}')"
REST_READY_BEFORE="$(kubectl -n "$NAMESPACE" get deployment "$REST_DEPLOYMENT" -o jsonpath='{.status.readyReplicas}/{.spec.replicas}')"
WS_IMAGE="$(kubectl -n "$NAMESPACE" get deployment "$WS_DEPLOYMENT" -o jsonpath='{.spec.template.spec.containers[0].image}')"
WS_POD="$(pod_for_deployment "$WS_DEPLOYMENT")"
[[ -n "$WS_POD" ]] || { echo "missing running websocket pod" >&2; exit 1; }
SUMMARY_PY="$TMPDIR/reliability-summary.py"
cat >"$SUMMARY_PY" <<'PY_SUMMARY'
import json, os, time
from pathlib import Path
raw=Path(os.environ['RAW_DIR'])
manifest_path=Path(os.environ['MANIFEST_PATH'])
o=json.loads(manifest_path.read_text())
def file_summary(path):
if not path:
return None
p=Path(path)
if not p.exists():
return {'path': str(p), 'exists': False}
st=p.stat()
return {'path': str(p), 'exists': True, 'bytes': st.st_size, 'mtime_utc': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(st.st_mtime))}
open_files=o.get('open_files') or []
ws_open=next((x for x in open_files if 'ws_raw' in str(x.get('path'))), None)
rest_open=next((x for x in open_files if 'rest_checkpoints' in str(x.get('path'))), None)
ws_files=sorted(raw.glob('polymarket/ws_raw/**/*.jsonl.gz'), key=lambda p:p.stat().st_mtime)
rest_files=sorted(raw.glob('polymarket/rest_checkpoints/**/*.jsonl.gz'), key=lambda p:p.stat().st_mtime)
print(json.dumps({
'sampled_at_utc': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
'manifest_path': str(manifest_path),
'manifest_mtime_utc': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(manifest_path.stat().st_mtime)),
'run_id': o.get('run_id'),
'status': o.get('status'),
'gate_status': o.get('gate_status'),
'updated_at_utc': o.get('updated_at_utc'),
'seconds_since_last_ws_message': o.get('seconds_since_last_ws_message'),
'last_successful_ws_message_at_utc': o.get('last_successful_ws_message_at_utc'),
'current_subscription_token_ids': o.get('current_subscription_token_ids'),
'current_tracked_market_end_times': o.get('current_tracked_market_end_times'),
'current_session': o.get('current_session'),
'recent_sessions': (o.get('recent_sessions') or [])[-5:],
'counters': o.get('counters') or {},
'ws_open_file': ws_open,
'rest_open_file': rest_open,
'latest_closed_ws': file_summary(ws_files[-1]) if ws_files else None,
'latest_closed_rest': file_summary(rest_files[-1]) if rest_files else None,
}, sort_keys=True))
PY_SUMMARY
sample_pod() {
kubectl -n "$NAMESPACE" exec "$WS_POD" -- env RAW_DIR="$RAW_DIR" MANIFEST_PATH="$MANIFEST_PATH" python3 -c "$(cat "$SUMMARY_PY")"
}
START_JSON="$(sample_pod)"
sleep "$WAIT_SECONDS"
kubectl -n "$NAMESPACE" rollout status "deployment/${WS_DEPLOYMENT}" --timeout=180s >/dev/null
WS_POD_AFTER="$(pod_for_deployment "$WS_DEPLOYMENT")"
if [[ "$WS_POD_AFTER" != "$WS_POD" ]]; then
WS_POD="$WS_POD_AFTER"
fi
END_JSON="$(sample_pod)"
REST_IMAGE_AFTER="$(kubectl -n "$NAMESPACE" get deployment "$REST_DEPLOYMENT" -o jsonpath='{.spec.template.spec.containers[0].image}')"
REST_READY_AFTER="$(kubectl -n "$NAMESPACE" get deployment "$REST_DEPLOYMENT" -o jsonpath='{.status.readyReplicas}/{.spec.replicas}')"
WRITE_PY="$TMPDIR/write.py"
cat >"$WRITE_PY" <<'PY_WRITE'
import datetime as dt, json, sys
from pathlib import Path
(output_path, namespace, ws_deployment, rest_deployment, wait_seconds, ws_image, rest_image_before, rest_ready_before, rest_image_after, rest_ready_after)=sys.argv[1:11]
start=json.loads(sys.stdin.readline())
end=json.loads(sys.stdin.readline())
def c(obj, key):
return (obj.get('counters') or {}).get(key)
def num(value):
return 0 if value is None else value
ws_delta=num(c(end,'websocket_message_count'))-num(c(start,'websocket_message_count'))
rest_delta=num(c(end,'rest_success_count'))-num(c(start,'rest_success_count'))
stale_delta=num(c(end,'stale_feed_count'))-num(c(start,'stale_feed_count'))
reconnect_delta=num(c(end,'reconnect_count'))-num(c(start,'reconnect_count'))
parse_errors=num(c(end,'websocket_parse_error_count'))
threshold=float(((end.get('counters') or {}).get('stale_feed_threshold_seconds') or 90))
seconds_since=end.get('seconds_since_last_ws_message')
active_tokens=len(end.get('current_subscription_token_ids') or [])
start_ws_open=start.get('ws_open_file') or {}
end_ws_open=end.get('ws_open_file') or {}
file_advanced=False
if start_ws_open and end_ws_open:
file_advanced = end_ws_open.get('path') != start_ws_open.get('path') or num(end_ws_open.get('bytes')) > num(start_ws_open.get('bytes')) or end_ws_open.get('mtime_utc') != start_ws_open.get('mtime_utc')
reasons=[]
if active_tokens and ws_delta <= 0:
reasons.append('websocket_message_count did not increase while active tokens existed')
if active_tokens and not file_advanced:
reasons.append('websocket open file did not advance while active tokens existed')
if rest_delta <= 0:
reasons.append('REST checkpoint success count did not increase')
if parse_errors != 0:
reasons.append('websocket_parse_error_count is nonzero')
if active_tokens and seconds_since is not None and seconds_since > max(threshold * 2, 180):
reasons.append('seconds_since_last_ws_message exceeded reliability threshold')
if reconnect_delta > 3 or stale_delta > 3:
reasons.append('reconnect/stale counters grew rapidly during observation')
if rest_image_before != rest_image_after or rest_ready_before != rest_ready_after:
reasons.append('REST collector image/readiness changed')
gate='WS_RELIABILITY_OBSERVATION_PASS' if not reasons else 'BLOCKED_WS_STALE_LOOP'
manifest={
'schema_name':'ws_reliability_observation',
'schema_version':1,
'written_at_utc':dt.datetime.now(dt.UTC).replace(microsecond=0).isoformat().replace('+00:00','Z'),
'gate_status':gate,
'namespace':namespace,
'ws_deployment':ws_deployment,
'rest_deployment':rest_deployment,
'wait_seconds':int(float(wait_seconds)),
'ws_image':ws_image,
'rest_collector':{'image_before':rest_image_before,'ready_before':rest_ready_before,'image_after':rest_image_after,'ready_after':rest_ready_after,'unchanged':rest_image_before==rest_image_after and rest_ready_before==rest_ready_after},
'start':start,
'end':end,
'deltas':{'websocket_message_count':ws_delta,'rest_success_count':rest_delta,'stale_feed_count':stale_delta,'reconnect_count':reconnect_delta},
'file_advanced':file_advanced,
'reasons':reasons,
'production_ready':False,
}
Path(output_path).write_text(json.dumps(manifest, indent=2, sort_keys=True)+'\n')
print(json.dumps({'gate_status':gate,'evidence_path':output_path,'deltas':manifest['deltas'],'reasons':reasons}, indent=2, sort_keys=True))
raise SystemExit(0 if gate == 'WS_RELIABILITY_OBSERVATION_PASS' else 1)
PY_WRITE
printf '%s\n%s\n' "$START_JSON" "$END_JSON" | python3 "$WRITE_PY" "$OUTPUT_PATH" "$NAMESPACE" "$WS_DEPLOYMENT" "$REST_DEPLOYMENT" "$WAIT_SECONDS" "$WS_IMAGE" "$REST_IMAGE_BEFORE" "$REST_READY_BEFORE" "$REST_IMAGE_AFTER" "$REST_READY_AFTER"