orderbooks/scripts/k8s_ws_runtime_smoke_check.sh
2026-04-19 19:22:07 +02:00

422 lines
19 KiB
Bash
Executable file

#!/usr/bin/env bash
# Smoke check for the Kubernetes websocket recorder canary; see usage() for
# the command-line options that override the defaults below.
set -euo pipefail

# Cluster object names. Each one can be preset through its ORDERBOOKS_*
# environment variable; the literal after ":-" applies when it is unset/empty.
NAMESPACE=${ORDERBOOKS_K8S_NAMESPACE:-orderbooks}
WS_DEPLOYMENT=${ORDERBOOKS_WS_DEPLOYMENT:-orderbooks-ws-recorder}
REST_DEPLOYMENT=${ORDERBOOKS_REST_DEPLOYMENT:-orderbooks-collector}
UPLOADER_CRONJOB=${ORDERBOOKS_UPLOADER_CRONJOB:-orderbooks-uploader}

# Upper bound, in seconds, for each polling loop that waits on runtime evidence.
WAIT_SECONDS=${ORDERBOOKS_K8S_WS_SMOKE_WAIT_SECONDS:-900}

# Local evidence file; an empty value means a timestamped default is chosen later.
OUTPUT_PATH=''

# Paths as seen from inside the pods.
RAW_DIR='/var/lib/orderbooks/raw_orderbooks'
MANIFEST_DIR='/var/lib/orderbooks/manifests'

# Values recorded in / passed to the smoke upload step.
UPLOAD_MIN_AGE_SECONDS='600'
SMOKE_UPLOAD_RETENTION_DAYS='3'
# Print the command-line help text to stdout.
# Arguments: none
# Outputs:   one line per printf argument, in order
usage() {
  printf '%s\n' \
    'Usage: scripts/k8s_ws_runtime_smoke_check.sh [options]' \
    'Verifies the Kubernetes websocket recorder canary and writes compact local JSON' \
    'evidence. The script does not print secret contents.' \
    'Options:' \
    '--namespace NAME Namespace. Default: orderbooks.' \
    '--deployment NAME Websocket recorder Deployment. Default: orderbooks-ws-recorder.' \
    '--rest-deployment NAME Existing REST Deployment. Default: orderbooks-collector.' \
    '--cronjob NAME Production uploader CronJob. Default: orderbooks-uploader.' \
    '--wait-seconds N Max wait for runtime evidence. Default: 900.' \
    '--output PATH Local smoke evidence path.' \
    '--raw-dir PATH In-pod raw root. Default: /var/lib/orderbooks/raw_orderbooks.' \
    '--manifest-dir PATH In-pod manifest dir. Default: /var/lib/orderbooks/manifests.' \
    '--upload-min-age-seconds N Production upload min age to record. Default: 600.' \
    '--smoke-upload-retention-days N Retention used by one-off smoke upload Job. Default: 3.' \
    '--help Show help.'
}
# Abort with exit code 2 when an option was given without a following value.
# Without this check, "$2" would trip `set -u` and the user would only see
# an "unbound variable" message.
# Arguments: $1 - option name as typed
#            $2 - number of arguments still unparsed (including the option)
require_option_value() {
  if (( $2 < 2 )); then
    echo "missing value for $1 (see --help)" >&2
    exit 2
  fi
}

# Abort with exit code 2 unless the value is a plain non-negative decimal
# integer. Leading zeros are rejected because bash arithmetic would read
# them as octal.
# Arguments: $1 - option name as typed
#            $2 - value to validate
require_uint_option() {
  local uint_re='^(0|[1-9][0-9]*)$'
  if [[ ! "$2" =~ $uint_re ]]; then
    echo "$1 expects a non-negative integer, got: $2" >&2
    exit 2
  fi
}

# Command-line overrides for the defaults assigned earlier in the script.
# Every value-taking flag consumes exactly two arguments.
while [[ $# -gt 0 ]]; do
  case "$1" in
    --namespace) require_option_value "$1" "$#"; NAMESPACE="$2"; shift 2 ;;
    --deployment) require_option_value "$1" "$#"; WS_DEPLOYMENT="$2"; shift 2 ;;
    --rest-deployment) require_option_value "$1" "$#"; REST_DEPLOYMENT="$2"; shift 2 ;;
    --cronjob) require_option_value "$1" "$#"; UPLOADER_CRONJOB="$2"; shift 2 ;;
    --wait-seconds)
      require_option_value "$1" "$#"
      require_uint_option "$1" "$2"
      WAIT_SECONDS="$2"
      shift 2
      ;;
    --output) require_option_value "$1" "$#"; OUTPUT_PATH="$2"; shift 2 ;;
    --raw-dir) require_option_value "$1" "$#"; RAW_DIR="$2"; shift 2 ;;
    --manifest-dir) require_option_value "$1" "$#"; MANIFEST_DIR="$2"; shift 2 ;;
    --upload-min-age-seconds)
      require_option_value "$1" "$#"
      require_uint_option "$1" "$2"
      UPLOAD_MIN_AGE_SECONDS="$2"
      shift 2
      ;;
    --smoke-upload-retention-days)
      require_option_value "$1" "$#"
      require_uint_option "$1" "$2"
      SMOKE_UPLOAD_RETENTION_DAYS="$2"
      shift 2
      ;;
    --help) usage; exit 0 ;;
    *) echo "unknown argument: $1" >&2; usage >&2; exit 2 ;;
  esac
done
# Local tooling preflight. python3 is needed on this machine as well as
# kubectl: the blocked-evidence writer, the wait-condition check and the
# final evidence writer all run locally through python3.
for required_tool in kubectl python3; do
  command -v "$required_tool" >/dev/null 2>&1 || { echo "${required_tool} is required" >&2; exit 2; }
done

# UTC timestamp identifying this run; reused for the default evidence file name.
RUN_ID="$(date -u +%Y%m%dT%H%M%SZ)"
if [[ -z "$OUTPUT_PATH" ]]; then
  OUTPUT_PATH="data/manifests/k8s_ws_runtime_smoke_${RUN_ID}.json"
fi
mkdir -p "$(dirname "$OUTPUT_PATH")"

# Scratch directory for generated helper scripts and manifests; removed on
# any exit path.
# NOTE(review): the name TMPDIR shadows the conventional environment variable
# of the same name. It is kept because other parts of this script refer to
# "$TMPDIR".
TMPDIR="$(mktemp -d)"
trap 'rm -rf -- "$TMPDIR"' EXIT
# Write a minimal "blocked" evidence document to OUTPUT_PATH.
# Globals:   OUTPUT_PATH (read) - destination file, overwritten
# Arguments: $1 - gate status string stored under "gate_status"
#            $2 - human-readable reason stored under "reason"
# Outputs:   nothing on stdout; the JSON document is written to OUTPUT_PATH
# Returns:   exit status of the python3 invocation
write_blocked() {
  local gate="$1"
  local reason="$2"
  python3 - "$OUTPUT_PATH" "$gate" "$reason" <<'PY_BLOCKED'
import datetime as dt
import json
import sys
from pathlib import Path

path = Path(sys.argv[1])
# dt.timezone.utc is used instead of the dt.UTC alias, which only exists on
# Python 3.11 and newer; the resulting timestamp text is the same.
now_utc = dt.datetime.now(dt.timezone.utc).replace(microsecond=0)
manifest = {
    "schema_name": "k8s_ws_runtime_smoke",
    "schema_version": 1,
    "written_at_utc": now_utc.isoformat().replace('+00:00', 'Z'),
    "gate_status": sys.argv[2],
    "reason": sys.argv[3],
    "production_ready": False,
}
path.write_text(json.dumps(manifest, indent=2, sort_keys=True) + '\n', encoding='utf-8')
PY_BLOCKED
}
# Print the name of one Running pod selected by a Deployment's matchLabels.
# Globals:   NAMESPACE (read)
# Arguments: $1 - Deployment name
# Outputs:   first whitespace-separated pod name returned by kubectl
#            (nothing when no pod is in phase Running)
# Returns:   1 when no label selector could be derived; otherwise the status
#            of the final pipeline
pod_for_deployment() {
  local deploy_name="$1"
  local label_selector
  # Turn spec.selector.matchLabels into a sorted "k=v,k=v" selector string.
  label_selector="$(
    kubectl -n "$NAMESPACE" get deployment "$deploy_name" -o json \
      | python3 -c 'import json, sys; labels=json.load(sys.stdin)["spec"]["selector"]["matchLabels"]; print(",".join(f"{k}={v}" for k,v in sorted(labels.items())))'
  )"
  if [[ -z "$label_selector" ]]; then
    return 1
  fi
  # The jsonpath emits every Running pod name on one line; awk keeps the first.
  kubectl -n "$NAMESPACE" get pod -l "$label_selector" \
    -o jsonpath='{.items[?(@.status.phase=="Running")].metadata.name}' \
    | awk '{print $1}'
}
# Baseline of the REST collector (image and ready/desired replicas), taken
# before the websocket pod is restarted, so the evidence can show whether it
# changed during the run.
#
# Every step below is wrapped explicitly: under `set -e` a bare failing
# kubectl call or a failing REST_POD="$(...)" assignment would end the script
# before any blocked evidence file is written.
if ! REST_IMAGE_BEFORE="$(kubectl -n "$NAMESPACE" get deployment "$REST_DEPLOYMENT" -o jsonpath='{.spec.template.spec.containers[0].image}')"; then
  write_blocked "BLOCKED_K8S_RUNTIME_FAILURE" "could not read REST deployment image"
  exit 1
fi
if ! REST_READY_BEFORE="$(kubectl -n "$NAMESPACE" get deployment "$REST_DEPLOYMENT" -o jsonpath='{.status.readyReplicas}/{.spec.replicas}')"; then
  write_blocked "BLOCKED_K8S_RUNTIME_FAILURE" "could not read REST deployment readiness"
  exit 1
fi

# Both deployments must be fully rolled out before any evidence is sampled.
if ! kubectl -n "$NAMESPACE" rollout status "deployment/${REST_DEPLOYMENT}" --timeout=120s >/dev/null; then
  write_blocked "BLOCKED_K8S_RUNTIME_FAILURE" "REST deployment rollout did not complete"
  exit 1
fi
if ! kubectl -n "$NAMESPACE" rollout status "deployment/${WS_DEPLOYMENT}" --timeout=300s >/dev/null; then
  write_blocked "BLOCKED_K8S_RUNTIME_FAILURE" "websocket deployment rollout did not complete"
  exit 1
fi

# A failed lookup is folded into the "empty name" case handled just below.
REST_POD="$(pod_for_deployment "$REST_DEPLOYMENT")" || REST_POD=""
WS_POD="$(pod_for_deployment "$WS_DEPLOYMENT")" || WS_POD=""
if [[ -z "$REST_POD" || -z "$WS_POD" ]]; then
  write_blocked "BLOCKED_K8S_RUNTIME_FAILURE" "missing running REST or websocket pod"
  exit 1
fi
# Generate the in-pod summary script. It is executed inside a pod and prints
# one JSON object describing the raw archives and manifests found there.
# Inputs are taken from the RAW_DIR, MANIFEST_DIR and CHECK_PATH environment
# variables of the process that runs it.
SUMMARY_PY="${TMPDIR}/summary.py"
cat >"${SUMMARY_PY}" <<'PY_SUMMARY'
import gzip
import hashlib
import json
import os
from pathlib import Path

RAW_ROOT = Path(os.environ.get('RAW_DIR', '/var/lib/orderbooks/raw_orderbooks'))
MANIFEST_ROOT = Path(os.environ.get('MANIFEST_DIR', '/var/lib/orderbooks/manifests'))
CHECK_PATH = os.environ.get('CHECK_PATH') or ''
TEMP_SUFFIXES = ('.open', '.tmp', '.partial')


def file_sha256(path):
    """Hex SHA-256 of the file, read in 1 MiB blocks."""
    digest = hashlib.sha256()
    with path.open('rb') as handle:
        while True:
            block = handle.read(1024 * 1024)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()


def scan_gz(path):
    """Return (number of non-blank JSON lines, first parsed record)."""
    count = 0
    head = None
    with gzip.open(path, 'rt', encoding='utf-8') as handle:
        for raw in handle:
            if not raw.strip():
                continue
            record = json.loads(raw)
            if head is None:
                head = record
            count += 1
    return count, head


def describe_gz(path):
    """Summary dict (path, rows, bytes, sha256, first_schema) for one archive."""
    count, head = scan_gz(path)
    return {
        'path': str(path),
        'rows': count,
        'bytes': path.stat().st_size,
        'sha256': file_sha256(path),
        'first_schema': head.get('schema_name') if isinstance(head, dict) else None,
    }


def manifests_by_mtime(pattern):
    """Manifest files matching the glob, oldest modification time first."""
    return sorted(MANIFEST_ROOT.glob(pattern), key=lambda item: item.stat().st_mtime)


def is_open_or_temp(item):
    """True for dotfiles and for names ending in a temporary suffix."""
    return item.name.startswith('.') or item.name.endswith(TEMP_SUFFIXES)


ws_archives = sorted(RAW_ROOT.glob('polymarket/ws_raw/**/*.jsonl.gz'))
rest_archives = sorted(RAW_ROOT.glob('polymarket/rest_checkpoints/**/*.jsonl.gz'))
pending = sorted(
    [item for item in RAW_ROOT.glob('polymarket/**/*') if item.is_file() and is_open_or_temp(item)]
)
recorder_manifests = manifests_by_mtime('polymarket_ws_recorder_*.json')
upload_manifests = manifests_by_mtime('upload_archive_*.json')

newest_recorder = recorder_manifests[-1] if recorder_manifests else None
newest_upload = upload_manifests[-1] if upload_manifests else None
recorder_doc = json.loads(newest_recorder.read_text()) if newest_recorder is not None else None
upload_doc = json.loads(newest_upload.read_text()) if newest_upload is not None else None

summary = {
    'ws_file_count': len(ws_archives),
    'rest_file_count': len(rest_archives),
    # Only the first 20 paths are listed; the full count is reported separately.
    'open_or_temp_files': [str(item) for item in pending[:20]],
    'open_or_temp_file_count': len(pending),
    'recorder_manifest_count': len(recorder_manifests),
    'upload_manifest_count': len(upload_manifests),
    'latest_manifest_path': str(newest_recorder) if newest_recorder is not None else None,
    'latest_upload_manifest_path': str(newest_upload) if newest_upload is not None else None,
    'latest_manifest': recorder_doc,
    'latest_upload_manifest': upload_doc,
}
if ws_archives:
    summary['latest_ws'] = describe_gz(ws_archives[-1])
if rest_archives:
    summary['latest_rest'] = describe_gz(rest_archives[-1])
if CHECK_PATH:
    target = Path(CHECK_PATH)
    summary['specific_file'] = {'path': CHECK_PATH, 'exists': target.exists()}
    if target.exists():
        summary['specific_file'].update(describe_gz(target))
print(json.dumps(summary, sort_keys=True))
PY_SUMMARY
# Run the generated summary script inside a pod and print its output.
# Globals:   NAMESPACE, RAW_DIR, MANIFEST_DIR, SUMMARY_PY (all read)
# Arguments: $1 - pod name
#            $2 - optional file path passed to the script as CHECK_PATH
# Outputs:   whatever the in-pod python3 process prints
# Returns:   exit status of kubectl exec
summarize_pod() {
  local target_pod="$1"
  local probe_path="${2:-}"
  # The script text travels as a single -c argument, so nothing has to be
  # copied into the pod first.
  local -a remote_cmd=(
    env
    "RAW_DIR=${RAW_DIR}"
    "MANIFEST_DIR=${MANIFEST_DIR}"
    "CHECK_PATH=${probe_path}"
    python3 -c "$(cat "$SUMMARY_PY")"
  )
  kubectl -n "$NAMESPACE" exec "$target_pod" -- "${remote_cmd[@]}"
}
# Generate the local readiness predicate. It reads one summary JSON document
# on stdin and exits 0 when the requested condition holds, 1 otherwise.
#   argv[1] = "initial"      -> websocket and REST counters are both > 0
#   argv[1] = "post_restart" -> manifest run_id is set, differs from argv[2],
#                               and the websocket counter is > 0
WAIT_PY="${TMPDIR}/wait_condition.py"
cat >"${WAIT_PY}" <<'PY_WAIT'
import json
import sys

mode = sys.argv[1]
previous_run = sys.argv[2] if len(sys.argv) > 2 else ''
summary = json.loads(sys.stdin.read())
manifest = summary.get('latest_manifest') or {}
counters = manifest.get('counters') or {}

if mode == 'initial':
    satisfied = (
        counters.get('websocket_message_count', 0) > 0
        and counters.get('rest_success_count', 0) > 0
    )
elif mode == 'post_restart':
    satisfied = bool(
        manifest.get('run_id')
        and manifest.get('run_id') != previous_run
        and counters.get('websocket_message_count', 0) > 0
    )
else:
    # Any other mode never matches.
    satisfied = False

sys.exit(0 if satisfied else 1)
PY_WAIT
# Poll the websocket pod until its latest recorder manifest shows both
# websocket and REST activity, or until WAIT_SECONDS have elapsed.
#
# A failed `kubectl exec` is treated as "not ready yet" rather than aborting:
# under `set -e` a single transient failure in the bare assignment would end
# the script before any blocked evidence is written.
initial_json=""
initial_ready=0
end=$((SECONDS + WAIT_SECONDS))
while [[ $SECONDS -lt $end ]]; do
  initial_json="$(summarize_pod "$WS_POD")" || initial_json=""
  if [[ -n "$initial_json" ]] && python3 "$WAIT_PY" initial "" <<<"$initial_json"; then
    initial_ready=1
    break
  fi
  sleep 15
done
if (( ! initial_ready )); then
  write_blocked "BLOCKED_WS_RECORDER_RUNTIME" "websocket recorder did not expose initial websocket and REST counters before timeout"
  exit 1
fi
# Print one field of a nested object from the summary JSON on stdin.
# Arguments: $1 - top-level key holding an object (e.g. latest_ws)
#            $2 - field inside that object
#            $3 - optional fallback printed when the field is missing or falsy
summary_field() {
  python3 -c 'import json,sys; o=json.loads(sys.stdin.read()); print((o.get(sys.argv[1]) or {}).get(sys.argv[2]) or sys.argv[3])' "$1" "$2" "${3:-}"
}

# Remember the pre-restart run id and the newest websocket archive so they
# can be compared against the state after the restart.
old_run_id="$(summary_field latest_manifest run_id <<<"$initial_json")"
old_ws_path="$(summary_field latest_ws path <<<"$initial_json")"
old_ws_sha="$(summary_field latest_ws sha256 <<<"$initial_json")"
old_ws_rows="$(summary_field latest_ws rows 0 <<<"$initial_json")"

# Restart the recorder by deleting its pod and waiting for the Deployment to
# roll out again. Failures are converted into blocked evidence instead of a
# bare `set -e` abort.
if ! kubectl -n "$NAMESPACE" delete pod "$WS_POD" --wait=true >/dev/null; then
  write_blocked "BLOCKED_WS_RECORDER_RUNTIME" "failed to delete websocket pod for restart check"
  exit 1
fi
if ! kubectl -n "$NAMESPACE" rollout status "deployment/${WS_DEPLOYMENT}" --timeout=300s >/dev/null; then
  write_blocked "BLOCKED_WS_RECORDER_RUNTIME" "websocket deployment did not become ready after restart"
  exit 1
fi
# A failed lookup is folded into the "empty name" case handled just below.
NEW_WS_POD="$(pod_for_deployment "$WS_DEPLOYMENT")" || NEW_WS_POD=""
if [[ -z "$NEW_WS_POD" ]]; then
  write_blocked "BLOCKED_WS_RECORDER_RUNTIME" "websocket pod did not return after restart"
  exit 1
fi
# Snapshot taken right after the restart, including the state of the
# pre-restart archive (passed as CHECK_PATH).
restart_json="$(summarize_pod "$NEW_WS_POD" "$old_ws_path")"

# Poll the new pod until a manifest from a different run_id reports websocket
# messages, or until WAIT_SECONDS have elapsed.
#
# A failed `kubectl exec` counts as "not ready yet": under `set -e` a single
# transient failure in the bare assignment would otherwise end the script
# before blocked evidence is written.
post_json=""
post_ready=0
end=$((SECONDS + WAIT_SECONDS))
while [[ $SECONDS -lt $end ]]; do
  post_json="$(summarize_pod "$NEW_WS_POD" "$old_ws_path")" || post_json=""
  if [[ -n "$post_json" ]] && python3 "$WAIT_PY" post_restart "$old_run_id" <<<"$post_json"; then
    post_ready=1
    break
  fi
  sleep 15
done
if (( ! post_ready )); then
  write_blocked "BLOCKED_WS_RECORDER_RUNTIME" "new websocket pod did not write post-restart manifest evidence before timeout"
  exit 1
fi
# One-off smoke upload Job. It reuses the image of the production uploader
# CronJob and runs the upload script with a minimum age of 0 and the smoke
# retention setting.
#
# An unreadable CronJob or an empty image would otherwise produce a Job with
# no image, and under `set -e` a failing `kubectl apply` would end the script
# without evidence, so both cases write a blocked evidence file.
UPLOADER_IMAGE="$(kubectl -n "$NAMESPACE" get cronjob "$UPLOADER_CRONJOB" -o jsonpath='{.spec.jobTemplate.spec.template.spec.containers[0].image}')" || UPLOADER_IMAGE=""
if [[ -z "$UPLOADER_IMAGE" ]]; then
  write_blocked "BLOCKED_WS_RECORDER_UPLOAD_OR_RETENTION" "could not resolve uploader image from CronJob"
  exit 1
fi

# Job name derived from the run id: lower-cased, underscores mapped to dashes.
JOB_NAME="orderbooks-ws-smoke-upload-${RUN_ID,,}"
JOB_NAME="${JOB_NAME//_/-}"

# The heredoc delimiter is unquoted on purpose: ${JOB_NAME}, ${NAMESPACE},
# ${UPLOADER_IMAGE} and ${SMOKE_UPLOAD_RETENTION_DAYS} are expanded by the shell.
cat >"${TMPDIR}/upload-job.yaml" <<EOF_JOB
apiVersion: batch/v1
kind: Job
metadata:
  name: ${JOB_NAME}
  namespace: ${NAMESPACE}
  labels:
    app.kubernetes.io/name: orderbooks
    app.kubernetes.io/part-of: orderbooks
    app.kubernetes.io/component: ws-smoke-uploader
spec:
  backoffLimit: 0
  ttlSecondsAfterFinished: 86400
  template:
    metadata:
      labels:
        app.kubernetes.io/name: orderbooks
        app.kubernetes.io/component: ws-smoke-uploader
    spec:
      restartPolicy: Never
      imagePullSecrets:
        - name: orderbooks-registry-creds
      securityContext:
        runAsNonRoot: true
        runAsUser: 10001
        runAsGroup: 10001
        fsGroup: 10001
        fsGroupChangePolicy: OnRootMismatch
      containers:
        - name: uploader
          image: ${UPLOADER_IMAGE}
          imagePullPolicy: IfNotPresent
          command:
            - /bin/bash
            - /app/scripts/upload_archive_rclone.sh
            - --execute
            - --cleanup-after-verify
          env:
            - name: ORDERBOOKS_DATA_DIR
              value: /var/lib/orderbooks
            - name: ORDERBOOKS_UPLOAD_DATA_DIR
              value: /var/lib/orderbooks
            - name: ORDERBOOKS_UPLOAD_RAW_DIR
              value: /var/lib/orderbooks/raw_orderbooks
            - name: ORDERBOOKS_UPLOAD_SOURCE_MANIFEST_DIR
              value: /var/lib/orderbooks/manifests
            - name: ORDERBOOKS_UPLOAD_MANIFEST_DIR
              value: /var/lib/orderbooks/manifests
            - name: ORDERBOOKS_UPLOAD_MIN_AGE_SECONDS
              value: "0"
            - name: ORDERBOOKS_UPLOAD_RETENTION_DAYS
              value: "${SMOKE_UPLOAD_RETENTION_DAYS}"
            - name: ORDERBOOKS_RCLONE_BIN
              value: /usr/bin/rclone
            - name: ORDERBOOKS_RCLONE_DEST
              value: gdrive:orderbooks/polymarket
            - name: RCLONE_CONFIG
              value: /etc/rclone/rclone.conf
          volumeMounts:
            - name: orderbooks-data
              mountPath: /var/lib/orderbooks
            - name: rclone-config
              mountPath: /etc/rclone/rclone.conf
              subPath: rclone.conf
              readOnly: true
          resources:
            requests:
              cpu: 50m
              memory: 128Mi
            limits:
              cpu: 500m
              memory: 512Mi
          securityContext:
            allowPrivilegeEscalation: false
            capabilities:
              drop:
                - ALL
      volumes:
        - name: orderbooks-data
          persistentVolumeClaim:
            claimName: orderbooks-data
        - name: rclone-config
          secret:
            secretName: orderbooks-rclone-config
            items:
              - key: rclone.conf
                path: rclone.conf
EOF_JOB
if ! kubectl apply -f "${TMPDIR}/upload-job.yaml" >/dev/null; then
  write_blocked "BLOCKED_WS_RECORDER_UPLOAD_OR_RETENTION" "failed to create smoke upload Job"
  exit 1
fi
# Wait up to 900 seconds for the smoke upload Job to reach a terminal state.
#
# `kubectl wait --for=condition=Complete` only returns early on success; when
# the Job fails it sits out the whole timeout. Polling the Job's true
# conditions lets the script stop as soon as either Complete or Failed is set.
# Lookup errors are deliberately ignored here (best effort): a missing status
# simply leaves JOB_STATUS at a non-"Complete" value, which the evidence
# writer reports.
JOB_STATUS=""
job_deadline=$((SECONDS + 900))
while (( SECONDS < job_deadline )); do
  job_conditions="$(kubectl -n "$NAMESPACE" get job "$JOB_NAME" -o jsonpath='{.status.conditions[?(@.status=="True")].type}' 2>/dev/null || true)"
  case " ${job_conditions} " in
    *" Complete "*) JOB_STATUS="Complete"; break ;;
    *" Failed "*) JOB_STATUS="Failed"; break ;;
  esac
  # Not terminal yet: keep the raw condition list so a timeout is still
  # reported with whatever state was last seen.
  JOB_STATUS="$job_conditions"
  sleep 5
done

# Log tail is best effort; an unavailable log must not abort the run.
JOB_LOG_TAIL="$(kubectl -n "$NAMESPACE" logs "job/${JOB_NAME}" --tail=80 2>/dev/null || true)"

# Final snapshots: pod-side state after the upload, and the REST collector's
# image/readiness for comparison with the values captured at the start.
upload_json="$(summarize_pod "$NEW_WS_POD" "$old_ws_path")"
REST_IMAGE_AFTER="$(kubectl -n "$NAMESPACE" get deployment "$REST_DEPLOYMENT" -o jsonpath='{.spec.template.spec.containers[0].image}')"
REST_READY_AFTER="$(kubectl -n "$NAMESPACE" get deployment "$REST_DEPLOYMENT" -o jsonpath='{.status.readyReplicas}/{.spec.replicas}')"
# Generate the local evidence writer. It takes 19 positional arguments,
# reads five sections from stdin separated by "\n---PART---\n" (initial,
# restart, post-restart and upload summaries as JSON, then the Job log tail),
# evaluates the gate, writes the evidence file and exits 0 only on a pass.
WRITE_PY="${TMPDIR}/write_evidence.py"
cat >"${WRITE_PY}" <<'PY_WRITE'
import datetime as dt, json, sys
from pathlib import Path
(output_path, namespace, ws_deployment, rest_deployment, uploader_cronjob,
 rest_image_before, rest_ready_before, rest_image_after, rest_ready_after,
 ws_pod_before, ws_pod_after, old_ws_path, old_ws_sha, old_ws_rows,
 job_name, job_status, production_min_age, smoke_retention_days, uploader_image) = sys.argv[1:20]
text=sys.stdin.read()
# maxsplit=4: the fifth section is free-form log text and is kept whole even
# if it happens to contain the separator itself.
parts=text.split('\n---PART---\n', 4)
initial=json.loads(parts[0])
restart=json.loads(parts[1])
post=json.loads(parts[2])
upload=json.loads(parts[3])
job_log_tail=parts[4]
reasons=[]
old_ws_rows=int(old_ws_rows or 0)
old_file_preexisting=bool(old_ws_path)
if old_file_preexisting:
    # The archive seen before the restart must still exist with the same
    # hash and row count afterwards.
    specific=(post.get('specific_file') or {})
    if not specific.get('exists') or specific.get('sha256') != old_ws_sha or int(specific.get('rows') or 0) != old_ws_rows:
        reasons.append('pre-existing closed websocket file changed or failed parse after restart')
else:
    # No archive existed before the restart, so one must have appeared.
    latest_after=(restart.get('latest_ws') or post.get('latest_ws') or {})
    if int(latest_after.get('rows') or 0) <= 0:
        reasons.append('SIGTERM did not produce a parseable closed websocket archive')
post_manifest=post.get('latest_manifest') or {}
initial_manifest=initial.get('latest_manifest') or {}
post_counters=post_manifest.get('counters') or {}
if not post_manifest.get('run_id') or post_manifest.get('run_id') == initial_manifest.get('run_id'):
    reasons.append('post-restart recorder manifest did not come from a new run')
if int(post_counters.get('websocket_message_count') or 0) <= 0:
    reasons.append('post-restart recorder did not write websocket message evidence')
if not upload.get('latest_ws') or int((upload.get('latest_ws') or {}).get('rows') or 0) <= 0:
    reasons.append('websocket gzip evidence missing or empty')
if not upload.get('latest_rest') or int((upload.get('latest_rest') or {}).get('rows') or 0) <= 0:
    reasons.append('REST checkpoint gzip evidence missing or empty')
if rest_image_before != rest_image_after:
    reasons.append('REST collector image changed')
if rest_ready_after != rest_ready_before:
    reasons.append('REST collector readiness changed')
if job_status != 'Complete':
    reasons.append('smoke uploader job did not complete')
upload_manifest=upload.get('latest_upload_manifest') or {}
verified_files=upload_manifest.get('verified_files') or []
skipped_files=upload_manifest.get('skipped_files') or []
deleted_files=upload_manifest.get('deleted_local_files') or []
retained_files=upload_manifest.get('retained_local_files') or []
verified_paths={item.get('relative_path') for item in verified_files}
verified_ws_or_rest=[item for item in verified_files if 'polymarket/ws_raw/' in str(item.get('relative_path')) or 'polymarket/rest_checkpoints/' in str(item.get('relative_path'))]
open_count=max(initial.get('open_or_temp_file_count') or 0, post.get('open_or_temp_file_count') or 0, upload.get('open_or_temp_file_count') or 0)
skipped_open=[item for item in skipped_files if item.get('reason') == 'open_or_temporary_file']
unsafe_deletes=[item for item in deleted_files if item.get('relative_path') not in verified_paths]
if upload_manifest.get('gate_status') != 'PASS' or upload_manifest.get('operation_status') != 'UPLOAD_VERIFIED':
    reasons.append('upload manifest did not prove verified upload')
if int((upload_manifest.get('counts') or {}).get('verified') or 0) <= 0:
    reasons.append('upload manifest verified count was zero')
if not verified_ws_or_rest:
    reasons.append('upload manifest did not verify websocket recorder raw/checkpoint files')
if open_count > 0 and not skipped_open:
    reasons.append('open/temp files existed but were not recorded as skipped')
if unsafe_deletes:
    reasons.append('cleanup deleted files not present in verified set')
# Gate selection is keyed on the reason wording: any reason mentioning
# 'upload', 'cleanup' or 'open/temp' selects the upload/retention gate.
gate='WS_RECORDER_K8S_SMOKE_PASS' if not reasons else ('BLOCKED_WS_RECORDER_UPLOAD_OR_RETENTION' if any('upload' in r or 'cleanup' in r or 'open/temp' in r for r in reasons) else 'BLOCKED_WS_RECORDER_RUNTIME')
manifest={
    'schema_name':'k8s_ws_runtime_smoke',
    'schema_version':1,
    # dt.timezone.utc instead of dt.UTC: the alias only exists on Python 3.11+.
    'written_at_utc':dt.datetime.now(dt.timezone.utc).replace(microsecond=0).isoformat().replace('+00:00','Z'),
    'gate_status':gate,
    'namespace':namespace,
    'deployments':{'ws':ws_deployment,'rest':rest_deployment},
    'uploader_cronjob':uploader_cronjob,
    'uploader_image':uploader_image,
    'pods':{'ws_before':ws_pod_before,'ws_after':ws_pod_after},
    'rest_collector':{'image_before':rest_image_before,'ready_before':rest_ready_before,'image_after':rest_image_after,'ready_after':rest_ready_after,'unchanged':rest_image_before == rest_image_after and rest_ready_before == rest_ready_after},
    'restart_check':{'old_file_preexisting':old_file_preexisting,'old_ws_file':{'path':old_ws_path,'sha256':old_ws_sha,'rows':old_ws_rows},'restart_summary':restart.get('specific_file')},
    'initial':initial,
    'post_restart':post,
    'upload':upload,
    # Only the last 4000 characters of the log tail are stored.
    'uploader_job':{'name':job_name,'status':job_status,'log_tail':job_log_tail[-4000:],'production_min_age_seconds':int(production_min_age),'smoke_min_age_seconds':0,'smoke_retention_days':int(smoke_retention_days)},
    'upload_manifest_summary':{'path':upload.get('latest_upload_manifest_path'),'gate_status':upload_manifest.get('gate_status'),'operation_status':upload_manifest.get('operation_status'),'counts':upload_manifest.get('counts'),'verified_ws_or_rest_count':len(verified_ws_or_rest),'skipped_open_or_temp_count':len(skipped_open),'deleted_count':len(deleted_files),'retained_count':len(retained_files),'unsafe_delete_count':len(unsafe_deletes)},
    'reasons':reasons,
    'production_ready':False,
}
path=Path(output_path)
path.write_text(json.dumps(manifest, indent=2, sort_keys=True)+'\n')
print(json.dumps({'gate_status':gate,'evidence_path':str(path),'reasons':reasons}, indent=2, sort_keys=True))
raise SystemExit(0 if gate == 'WS_RECORDER_K8S_SMOKE_PASS' else 1)
PY_WRITE
# Hand everything to the evidence writer: the four JSON summaries and the Job
# log tail go in on stdin (joined by the ---PART--- separator), the scalar
# facts go in as positional arguments. Being the last command, its exit
# status becomes the script's exit status.
evidence_args=(
  "$OUTPUT_PATH"
  "$NAMESPACE"
  "$WS_DEPLOYMENT"
  "$REST_DEPLOYMENT"
  "$UPLOADER_CRONJOB"
  "$REST_IMAGE_BEFORE"
  "$REST_READY_BEFORE"
  "$REST_IMAGE_AFTER"
  "$REST_READY_AFTER"
  "$WS_POD"
  "$NEW_WS_POD"
  "$old_ws_path"
  "$old_ws_sha"
  "$old_ws_rows"
  "$JOB_NAME"
  "$JOB_STATUS"
  "$UPLOAD_MIN_AGE_SECONDS"
  "$SMOKE_UPLOAD_RETENTION_DAYS"
  "$UPLOADER_IMAGE"
)
printf '%s\n---PART---\n%s\n---PART---\n%s\n---PART---\n%s\n---PART---\n%s' \
  "$initial_json" "$restart_json" "$post_json" "$upload_json" "$JOB_LOG_TAIL" \
  | python3 "$WRITE_PY" "${evidence_args[@]}"