#!/usr/bin/env bash
#
# Runtime smoke check for the deployed orderbooks Kubernetes workload.
#
# Flow:
#   1. wait for the collector Deployment rollout and a ready collector pod;
#   2. require a valid collector manifest (raw gzip JSONL re-parsed and
#      sha256-checked inside the pod) written after that pod started;
#   3. delete the pod, wait for a replacement, re-verify the old raw file and
#      require a new valid manifest written after the restart;
#   4. wait until raw/manifest files are old enough for upload, create a Job
#      from the uploader CronJob and require an UPLOAD_VERIFIED manifest.
# Every kubectl invocation is recorded into a local JSON evidence file.
#
# Each setting below can be overridden by the environment variable shown or
# by the matching command-line flag (see --help).
#
# Exit status: 0 when the smoke gate passes, 1 when it fails, 2 on bad usage.
set -euo pipefail

NAMESPACE="${ORDERBOOKS_K8S_NAMESPACE:-orderbooks}"
DEPLOYMENT="${ORDERBOOKS_K8S_COLLECTOR_DEPLOYMENT:-orderbooks-collector}"
CRONJOB="${ORDERBOOKS_K8S_UPLOADER_CRONJOB:-orderbooks-uploader}"
RAW_DIR="${ORDERBOOKS_K8S_RAW_DIR:-/var/lib/orderbooks/raw_orderbooks}"
MANIFEST_DIR="${ORDERBOOKS_K8S_MANIFEST_DIR:-/var/lib/orderbooks/manifests}"
WAIT_SECONDS="${ORDERBOOKS_K8S_SMOKE_WAIT_SECONDS:-1200}"
UPLOAD_MIN_AGE_SECONDS="${ORDERBOOKS_UPLOAD_MIN_AGE_SECONDS:-600}"
KUBECTL_BIN="${ORDERBOOKS_KUBECTL:-kubectl}"
RUN_ID="$(date -u +%Y%m%dT%H%M%SZ)"
EVIDENCE_PATH="${ORDERBOOKS_K8S_SMOKE_EVIDENCE_PATH:-data/manifests/k8s_runtime_smoke_${RUN_ID}.json}"

usage() {
  cat <<'EOF'
Usage: scripts/k8s_runtime_smoke_check.sh [options]

Run after the orderbooks Kubernetes workload is deployed. The script uses
kubectl, writes local JSON evidence, deletes one collector pod to force a
Deployment restart, verifies raw gzip JSONL files and manifests on the PVC,
then triggers the uploader CronJob and requires a verified upload manifest.

Options:
  --namespace NAME            Namespace. Default: orderbooks.
  --deployment NAME           Collector deployment. Default: orderbooks-collector.
  --cronjob NAME              Uploader CronJob. Default: orderbooks-uploader.
  --raw-dir PATH              Raw path inside collector pod.
                              Default: /var/lib/orderbooks/raw_orderbooks.
  --manifest-dir PATH         Manifest path inside collector pod.
                              Default: /var/lib/orderbooks/manifests.
  --wait-seconds N            Max wait for collector/upload evidence. Default: 1200.
  --upload-min-age-seconds N  Wait for at least one raw/manifest file to be this
                              old before upload. Default: 600.
  --evidence-path PATH        Local JSON evidence path.
                              Default: data/manifests/k8s_runtime_smoke_<UTC run id>.json.
  --kubectl PATH              kubectl binary. Default: kubectl.
  --help                      Show this help.

This script does not read or print rclone config contents.
EOF
}

# Abort with a usage error when option "$1" has no value. Without this check
# "$2" trips `set -u` with an unhelpful "unbound variable" message.
require_value() {
  if (( $# < 2 )); then
    printf 'Missing value for %s\n' "$1" >&2
    usage >&2
    exit 2
  fi
}

# Abort unless "$2" (the value given for option "$1") is a non-negative
# integer. The Python driver would otherwise die with a traceback before it
# could write any evidence.
require_non_negative_int() {
  if [[ ! "$2" =~ ^[0-9]+$ ]]; then
    printf '%s must be a non-negative integer, got: %s\n' "$1" "$2" >&2
    exit 2
  fi
}

while (( $# > 0 )); do
  case "$1" in
    --namespace) require_value "$@"; NAMESPACE="$2"; shift 2 ;;
    --deployment) require_value "$@"; DEPLOYMENT="$2"; shift 2 ;;
    --cronjob) require_value "$@"; CRONJOB="$2"; shift 2 ;;
    --raw-dir) require_value "$@"; RAW_DIR="$2"; shift 2 ;;
    --manifest-dir) require_value "$@"; MANIFEST_DIR="$2"; shift 2 ;;
    --wait-seconds) require_value "$@"; WAIT_SECONDS="$2"; shift 2 ;;
    --upload-min-age-seconds) require_value "$@"; UPLOAD_MIN_AGE_SECONDS="$2"; shift 2 ;;
    --evidence-path) require_value "$@"; EVIDENCE_PATH="$2"; shift 2 ;;
    --kubectl) require_value "$@"; KUBECTL_BIN="$2"; shift 2 ;;
    --help) usage; exit 0 ;;
    *) echo "Unknown argument: $1" >&2; usage >&2; exit 2 ;;
  esac
done

require_non_negative_int --wait-seconds "$WAIT_SECONDS"
require_non_negative_int --upload-min-age-seconds "$UPLOAD_MIN_AGE_SECONDS"

mkdir -p "$(dirname "${EVIDENCE_PATH}")"

# The driver is fed on stdin via a quoted heredoc (no shell expansion inside);
# its exit status (0 = PASS, 1 = not PASS) becomes the script's exit status.
PYTHONDONTWRITEBYTECODE=1 python3 - \
  "$KUBECTL_BIN" "$NAMESPACE" "$DEPLOYMENT" "$CRONJOB" "$RAW_DIR" \
  "$MANIFEST_DIR" "$WAIT_SECONDS" "$UPLOAD_MIN_AGE_SECONDS" "$EVIDENCE_PATH" \
  <<'PY_SMOKE'
"""Smoke-check driver.

argv: kubectl namespace deployment cronjob raw_dir manifest_dir
      wait_seconds upload_min_age_seconds evidence_path
"""
import datetime as dt
import json
import subprocess
import sys
import time
from pathlib import Path

kubectl = sys.argv[1]
namespace = sys.argv[2]
deployment = sys.argv[3]
cronjob = sys.argv[4]
raw_dir = sys.argv[5]
manifest_dir = sys.argv[6]
wait_seconds = int(sys.argv[7])
upload_min_age_seconds = int(sys.argv[8])
evidence_path = Path(sys.argv[9])

# Label selector used to find collector pods.
COLLECTOR_SELECTOR = 'app.kubernetes.io/name=orderbooks,app.kubernetes.io/component=collector'
# Characters of stdout/stderr kept per recorded command.
OUTPUT_TAIL_CHARS = 6000

# Every external command run by this driver is appended to `checks`; both
# lists are serialized into the evidence file.
checks = []
failures = []


def iso_now():
    """Return the current UTC time as ISO-8601 with seconds precision and a 'Z' suffix."""
    # dt.timezone.utc instead of dt.UTC: the dt.UTC alias needs Python >= 3.11.
    return dt.datetime.now(dt.timezone.utc).replace(microsecond=0).isoformat().replace('+00:00', 'Z')


started_at = iso_now()


def _as_text(value):
    """Normalize TimeoutExpired output (None, bytes or str) to str."""
    if value is None:
        return ''
    if isinstance(value, bytes):
        return value.decode('utf-8', errors='replace')
    return value


def capture(command, input_text=None, timeout=None):
    """Run `command`, record it in `checks`, and return (CompletedProcess, check item).

    A timeout is recorded with exit_code None before subprocess.TimeoutExpired
    is re-raised, so it still shows up in the evidence.
    """
    try:
        proc = subprocess.run(
            command,
            input=input_text,
            text=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as exc:
        checks.append({
            'command': command,
            'exit_code': None,
            'stdout_tail': _as_text(exc.stdout)[-OUTPUT_TAIL_CHARS:],
            'stderr_tail': _as_text(exc.stderr)[-OUTPUT_TAIL_CHARS:],
            'timed_out_after_seconds': timeout,
            'ran_at_utc': iso_now(),
        })
        raise
    item = {
        'command': command,
        'exit_code': proc.returncode,
        'stdout_tail': proc.stdout[-OUTPUT_TAIL_CHARS:],
        'stderr_tail': proc.stderr[-OUTPUT_TAIL_CHARS:],
        'ran_at_utc': iso_now(),
    }
    checks.append(item)
    return proc, item


def run(command, input_text=None, timeout=None):
    """Run and record `command`; return only the recorded check item."""
    _proc, item = capture(command, input_text=input_text, timeout=timeout)
    return item


def run_json(command, input_text=None, timeout=None):
    """Run and record `command`, returning its stdout parsed as JSON.

    Raises RuntimeError when the command exits non-zero.
    """
    proc, item = capture(command, input_text=input_text, timeout=timeout)
    if item['exit_code'] != 0:
        raise RuntimeError(f"command failed: {' '.join(command)}")
    return json.loads(proc.stdout)


def pod_ready(pod):
    """True when the pod is Running, not being deleted, and every container is ready."""
    if pod.get('metadata', {}).get('deletionTimestamp'):
        return False
    if pod.get('status', {}).get('phase') != 'Running':
        return False
    statuses = pod.get('status', {}).get('containerStatuses') or []
    return bool(statuses) and all(status.get('ready') for status in statuses)


def get_collector_pod(exclude_names=None):
    """Poll (every 10 s, up to wait_seconds) for a ready collector pod.

    Returns (name, pod object) of the most recently created ready pod whose
    name is not in `exclude_names`; raises TimeoutError otherwise.
    """
    exclude_names = set(exclude_names or [])
    deadline = time.time() + wait_seconds
    last = None
    while time.time() <= deadline:
        pods = run_json([kubectl, '-n', namespace, 'get', 'pods', '-l', COLLECTOR_SELECTOR, '-o', 'json'])
        items = pods.get('items', [])
        ready = [
            pod for pod in items
            if pod_ready(pod) and pod.get('metadata', {}).get('name') not in exclude_names
        ]
        if ready:
            ready.sort(key=lambda pod: pod.get('metadata', {}).get('creationTimestamp', ''))
            return ready[-1]['metadata']['name'], ready[-1]
        last = items
        time.sleep(10)
    raise TimeoutError(f'no ready collector pod found; last pods={last}')


def pod_start_epoch(pod):
    """Epoch seconds of the pod's status.startTime (or creationTimestamp); 0 if neither is set."""
    timestamp = pod.get('status', {}).get('startTime') or pod.get('metadata', {}).get('creationTimestamp')
    if not timestamp:
        return 0
    return dt.datetime.fromisoformat(timestamp.replace('Z', '+00:00')).timestamp()


def exec_python(pod, code, args, ok_exit_codes=(0,)):
    """Run `code` with python3 inside `pod` (via kubectl exec) and return its JSON stdout.

    The in-pod validators report a negative result as JSON plus exit code 2.
    Callers that want that report pass ok_exit_codes=(0, 2) instead of
    getting a RuntimeError that hides it. Any other exit code raises
    RuntimeError carrying the stderr tail.
    """
    command = [kubectl, '-n', namespace, 'exec', '-i', pod, '--', 'python3', '-', *args]
    proc, item = capture(command, input_text=code, timeout=wait_seconds + 60)
    if item['exit_code'] not in ok_exit_codes:
        raise RuntimeError(f"pod python command failed in {pod}: {item['stderr_tail']}")
    return json.loads(proc.stdout)


def pod_epoch(pod):
    """Return the wall-clock time (epoch seconds) as seen inside `pod`.

    The pod-side clock is used so mtime comparisons on the PVC do not depend
    on the local workstation's clock.
    """
    return float(exec_python(pod, pod_clock_code, [])['epoch'])


def wait_for_valid_collector(pod, after_mtime, label):
    """Poll (every 15 s, up to wait_seconds) until `pod` has a valid collector manifest newer than after_mtime.

    Returns the validator report tagged with 'wait_label'. Transient exec
    errors are retried. On timeout, raises TimeoutError carrying the last
    report or error.
    """
    deadline = time.time() + wait_seconds
    last_error = None
    while time.time() <= deadline:
        try:
            result = exec_python(
                pod,
                collector_validation_code,
                [manifest_dir, raw_dir, str(after_mtime)],
                ok_exit_codes=(0, 2),
            )
            if result.get('valid'):
                result['wait_label'] = label
                return result
            last_error = result
        except Exception as exc:
            last_error = repr(exc)
        time.sleep(15)
    raise TimeoutError(f'no valid {label} collector manifest found before timeout: {last_error}')


def wait_for_upload_eligible_files(pod):
    """Poll (every 15 s, up to wait_seconds) until raw and manifest files are at least upload_min_age_seconds old.

    Transient exec errors are retried, as in wait_for_valid_collector.
    """
    deadline = time.time() + wait_seconds
    last = None
    while time.time() <= deadline:
        try:
            result = exec_python(pod, upload_eligibility_code, [raw_dir, manifest_dir, str(upload_min_age_seconds)])
            if result.get('eligible'):
                return result
            last = result
        except Exception as exc:
            last = repr(exc)
        time.sleep(15)
    raise TimeoutError(f'no upload-eligible raw/manifest files before timeout: {last}')


# In-pod script: print the pod's current epoch time as JSON.
pod_clock_code = r'''
import json
import time

print(json.dumps({'epoch': time.time()}))
'''

# In-pod script. argv: manifest_dir raw_dir after_mtime.
# Examines collector manifests with mtime > after_mtime, newest first, and
# re-parses / sha256-checks every listed raw file. It prints the first valid
# report and exits 0. Otherwise it prints the report for the newest candidate
# that was checked (or a "no candidates" report) and exits 2.
collector_validation_code = r'''
import gzip
import hashlib
import json
import sys
from pathlib import Path

manifest_dir = Path(sys.argv[1])
raw_dir = Path(sys.argv[2])
after_mtime = float(sys.argv[3])


def sha256(path):
    digest = hashlib.sha256()
    with path.open('rb') as handle:
        for chunk in iter(lambda: handle.read(1024 * 1024), b''):
            digest.update(chunk)
    return digest.hexdigest()


def parse_raw(path):
    rows = 0
    first_keys = []
    with gzip.open(path, 'rt', encoding='utf-8') as handle:
        for line in handle:
            if not line.strip():
                continue
            obj = json.loads(line)
            if rows == 0:
                first_keys = sorted(obj.keys())
            rows += 1
    return rows, first_keys


def validate(path):
    manifest = json.loads(path.read_text(encoding='utf-8'))
    output_files = []
    for item in manifest.get('output_files', []):
        raw_path = Path(item['path'])
        rows, first_keys = parse_raw(raw_path)
        actual_sha = sha256(raw_path)
        raw_stat = raw_path.stat()
        output_files.append({
            'path': str(raw_path),
            'bytes': raw_stat.st_size,
            'mtime': raw_stat.st_mtime,
            'manifest_rows': item.get('rows'),
            'rows_parsed': rows,
            'row_count_matches_manifest': rows == item.get('rows'),
            'manifest_sha256': item.get('sha256'),
            'actual_sha256': actual_sha,
            'sha256_matches_manifest': actual_sha == item.get('sha256'),
            'under_raw_dir': raw_path.resolve().is_relative_to(raw_dir.resolve()),
            'first_row_keys': first_keys,
        })
    valid = (
        manifest.get('gate_status') == 'PASS'
        and manifest.get('rows_written', 0) > 0
        and manifest.get('failure_count') == 0
        and not manifest.get('failures')
        and bool(output_files)
        and all(
            item['rows_parsed'] > 0
            and item['row_count_matches_manifest']
            and item['sha256_matches_manifest']
            and item['under_raw_dir']
            for item in output_files
        )
    )
    return {
        'path': str(path),
        'mtime': path.stat().st_mtime,
        'manifest_summary': {
            'gate_status': manifest.get('gate_status'),
            'rows_written': manifest.get('rows_written'),
            'failure_count': manifest.get('failure_count'),
            'failures_present': bool(manifest.get('failures')),
            'output_file_count': len(manifest.get('output_files', [])),
            'started_at_utc': manifest.get('started_at_utc'),
            'ended_at_utc': manifest.get('ended_at_utc'),
        },
        'output_files': output_files,
        'valid': valid,
    }


candidates = sorted(manifest_dir.glob('polymarket_orderbook_collector_*.json'), key=lambda p: p.stat().st_mtime)
candidates = [path for path in candidates if path.stat().st_mtime > after_mtime]
newest_failure = None
for path in reversed(candidates):
    try:
        result = validate(path)
    except Exception as exc:
        result = {'path': str(path), 'valid': False, 'error': repr(exc)}
    if result['valid']:
        print(json.dumps(result, sort_keys=True))
        sys.exit(0)
    # Keep the newest failing candidate: it is the most relevant diagnostic.
    if newest_failure is None:
        newest_failure = result
print(json.dumps(newest_failure or {'valid': False, 'error': 'no collector manifest candidates'}, sort_keys=True))
sys.exit(2)
'''

# In-pod script. argv: path expected_sha256 expected_rows.
# Re-reads one raw gzip JSONL file and reports whether its sha256 and
# non-blank row count still match the expected values. Always exits 0 unless
# it raises.
raw_check_code = r'''
import gzip
import hashlib
import json
import sys
from pathlib import Path

path = Path(sys.argv[1])
expected_sha = sys.argv[2]
expected_rows = int(sys.argv[3])


def sha256(path):
    digest = hashlib.sha256()
    with path.open('rb') as handle:
        for chunk in iter(lambda: handle.read(1024 * 1024), b''):
            digest.update(chunk)
    return digest.hexdigest()


rows = 0
with gzip.open(path, 'rt', encoding='utf-8') as handle:
    for line in handle:
        if line.strip():
            json.loads(line)
            rows += 1
actual_sha = sha256(path)
print(json.dumps({
    'path': str(path),
    'expected_sha256': expected_sha,
    'actual_sha256': actual_sha,
    'sha256_matches': actual_sha == expected_sha,
    'expected_rows': expected_rows,
    'actual_rows': rows,
    'row_count_matches': rows == expected_rows,
}, sort_keys=True))
'''

# In-pod script. argv: manifest_dir after_mtime.
# Validates the newest upload_archive_*.json with mtime >= after_mtime.
# Prints a JSON report, and exits 2 when there is no candidate or the
# manifest is not UPLOAD_VERIFIED/PASS with clean rclone exit codes and at
# least one verified file.
upload_validation_code = r'''
import json
import sys
from pathlib import Path

manifest_dir = Path(sys.argv[1])
after_mtime = float(sys.argv[2])

candidates = sorted(manifest_dir.glob('upload_archive_*.json'), key=lambda p: p.stat().st_mtime)
candidates = [path for path in candidates if path.stat().st_mtime >= after_mtime]
if not candidates:
    print(json.dumps({'valid': False, 'error': 'no upload manifest candidates'}, sort_keys=True))
    sys.exit(2)

path = candidates[-1]
manifest = json.loads(path.read_text(encoding='utf-8'))
verified_count = manifest.get('counts', {}).get('verified', len(manifest.get('verified_files', [])))
valid = (
    manifest.get('operation_status') == 'UPLOAD_VERIFIED'
    and manifest.get('gate_status') == 'PASS'
    and manifest.get('rclone', {}).get('copy_exit_code') == 0
    and manifest.get('rclone', {}).get('check_exit_code') == 0
    and verified_count > 0
)
verified_files = manifest.get('verified_files', [])
print(json.dumps({
    'path': str(path),
    'mtime': path.stat().st_mtime,
    'manifest_summary': {
        'operation_status': manifest.get('operation_status'),
        'gate_status': manifest.get('gate_status'),
        'counts': manifest.get('counts', {}),
        'planned_file_count': len(manifest.get('planned_files', [])),
        'attempted_file_count': len(manifest.get('attempted_files', [])),
        'uploaded_file_count': len(manifest.get('uploaded_files', [])),
        'verified_file_count': verified_count,
        'rclone_copy_exit_code': manifest.get('rclone', {}).get('copy_exit_code'),
        'rclone_check_exit_code': manifest.get('rclone', {}).get('check_exit_code'),
        'started_at_utc': manifest.get('started_at_utc'),
        'ended_at_utc': manifest.get('ended_at_utc'),
    },
    'verified_count': verified_count,
    'verified_file_samples': [
        {
            'relative_path': item.get('relative_path'),
            'bytes': item.get('bytes'),
            'sha256': item.get('sha256'),
            'kind': item.get('kind'),
        }
        for item in verified_files[:5]
    ],
    'valid': valid,
}, sort_keys=True))
if not valid:
    sys.exit(2)
'''

# In-pod script. argv: raw_dir manifest_dir min_age_seconds.
# Reports whether at least one *.jsonl.gz raw file and one collector manifest
# are at least min_age_seconds old (by mtime). Always exits 0 unless it raises.
upload_eligibility_code = r'''
import json
import sys
import time
from pathlib import Path

raw_dir = Path(sys.argv[1])
manifest_dir = Path(sys.argv[2])
min_age_seconds = int(sys.argv[3])
now = time.time()


def eligible_files(root, pattern):
    if not root.exists():
        return []
    items = []
    for path in sorted(root.rglob(pattern)):
        if not path.is_file():
            continue
        age = max(0, int(now - path.stat().st_mtime))
        if age >= min_age_seconds:
            items.append({'path': str(path), 'bytes': path.stat().st_size, 'age_seconds': age})
    return items


raw_files = eligible_files(raw_dir, '*.jsonl.gz')
manifest_files = eligible_files(manifest_dir, 'polymarket_orderbook_collector_*.json')
print(json.dumps({
    'eligible': bool(raw_files) and bool(manifest_files),
    'min_age_seconds': min_age_seconds,
    'raw_eligible_count': len(raw_files),
    'manifest_eligible_count': len(manifest_files),
    'raw_sample': raw_files[:3],
    'manifest_sample': manifest_files[:3],
}, sort_keys=True))
'''

summary = {
    'schema_name': 'k8s_runtime_smoke_result',
    'schema_version': 1,
    'started_at_utc': started_at,
    'ended_at_utc': None,
    'gate_status': 'ERROR',
    'production_ready': False,
    'namespace': namespace,
    'deployment': deployment,
    'cronjob': cronjob,
    'raw_dir': raw_dir,
    'manifest_dir': manifest_dir,
    'wait_seconds': wait_seconds,
    'upload_min_age_seconds': upload_min_age_seconds,
    'checks': checks,
    'failures': failures,
}

# Intermediate results are written into `summary` as soon as they exist, so
# a failing run still leaves the evidence gathered up to the failure.
try:
    rollout = run([kubectl, '-n', namespace, 'rollout', 'status', f'deployment/{deployment}', f'--timeout={wait_seconds}s'])
    if rollout['exit_code'] != 0:
        raise RuntimeError('collector deployment rollout is not healthy')

    pod_name, pod_obj = get_collector_pod()
    summary['initial_collector_pod'] = pod_name
    # Only manifests written after this pod started count; 5 s of slack for
    # skew between the API server clock and file mtimes.
    initial_after_mtime = max(0, pod_start_epoch(pod_obj) - 5)
    before = wait_for_valid_collector(pod_name, initial_after_mtime, 'initial')
    summary['before_restart_collector'] = before
    before_mtime = before['mtime']
    old_file = before['output_files'][0]

    delete_pod = run([kubectl, '-n', namespace, 'delete', 'pod', pod_name, '--wait=false'])
    if delete_pod['exit_code'] != 0:
        raise RuntimeError('failed to delete collector pod for restart test')
    rollout_after = run([kubectl, '-n', namespace, 'rollout', 'status', f'deployment/{deployment}', f'--timeout={wait_seconds}s'])
    if rollout_after['exit_code'] != 0:
        raise RuntimeError('collector deployment did not recover after pod delete')

    # NOTE(review): assumes a single-replica collector Deployment. With more
    # replicas this may pick a pre-existing pod rather than the replacement.
    new_pod, new_pod_obj = get_collector_pod(exclude_names={pod_name})
    summary['post_restart_collector_pod'] = new_pod

    old_check = exec_python(new_pod, raw_check_code, [old_file['path'], old_file['actual_sha256'], str(old_file['rows_parsed'])])
    summary['old_raw_file_after_restart'] = old_check
    if not old_check.get('sha256_matches') or not old_check.get('row_count_matches'):
        raise RuntimeError('old raw file changed or stopped parsing after pod restart')

    # The post-restart manifest must be newer than the initial one AND
    # written after the replacement pod started. Without the second bound,
    # a manifest flushed by the terminating old pod would be accepted.
    post_restart_after_mtime = max(before_mtime, pod_start_epoch(new_pod_obj) - 5)
    after = wait_for_valid_collector(new_pod, post_restart_after_mtime, 'post_restart')
    summary['after_restart_collector'] = after

    upload_eligibility = wait_for_upload_eligible_files(new_pod)
    summary['upload_eligibility'] = upload_eligibility

    # Pod-side clock (not the local one) because it is compared to PVC mtimes.
    upload_start_mtime = pod_epoch(new_pod) - 2
    job_name = 'orderbooks-uploader-smoke-' + dt.datetime.now(dt.timezone.utc).strftime('%Y%m%dt%H%M%Sz').lower()
    summary['uploader_job'] = job_name
    run([kubectl, '-n', namespace, 'delete', 'job', job_name, '--ignore-not-found=true'])
    create_job = run([kubectl, '-n', namespace, 'create', 'job', job_name, f'--from=cronjob/{cronjob}'])
    if create_job['exit_code'] != 0:
        raise RuntimeError('failed to create uploader smoke job from CronJob')
    wait_upload = run([kubectl, '-n', namespace, 'wait', '--for=condition=Complete', f'--timeout={wait_seconds}s', f'job/{job_name}'])
    logs = run([kubectl, '-n', namespace, 'logs', f'job/{job_name}'])
    summary['uploader_log_check_exit_code'] = logs['exit_code']
    if wait_upload['exit_code'] != 0:
        raise RuntimeError('uploader smoke job did not complete')

    upload = exec_python(new_pod, upload_validation_code, [manifest_dir, str(upload_start_mtime)], ok_exit_codes=(0, 2))
    summary['upload_result'] = upload
    if not upload.get('valid'):
        raise RuntimeError('upload manifest did not verify at least one file')

    summary['gate_status'] = 'PASS'
except Exception as exc:
    failures.append(str(exc))
    summary['exception'] = repr(exc)
    summary['gate_status'] = 'FAIL'
finally:
    summary['ended_at_utc'] = iso_now()
    evidence_path.parent.mkdir(parents=True, exist_ok=True)
    evidence_path.write_text(json.dumps(summary, indent=2, sort_keys=True) + '\n', encoding='utf-8')
    print(f'K8S_SMOKE_EVIDENCE={evidence_path}')
    print(f'K8S_SMOKE_GATE={summary["gate_status"]}')
    if summary['gate_status'] != 'PASS':
        sys.exit(1)
PY_SMOKE