orderbooks/scripts/k8s_runtime_smoke_check.sh
philipp 284e465588
Some checks failed
deploy / deploy (push) Has been cancelled
Prepare Kubernetes orderbooks deployment
2026-04-18 11:23:28 +02:00

466 lines
17 KiB
Bash
Executable file

#!/usr/bin/env bash
# Kubernetes runtime smoke check for the orderbooks workload; see usage()
# (--help) for what the run does and which options it accepts.
#
# Shell options: -u makes any reference to an unset variable an error, and
# pipefail makes a pipeline fail when any stage fails. errexit (-e) is not
# enabled here.
set -uo pipefail
# Configuration defaults. Each value can be preset through the environment
# variable named in its expansion; the hard-coded fallback applies when that
# variable is unset or empty.
NAMESPACE="${ORDERBOOKS_K8S_NAMESPACE:-orderbooks}"
DEPLOYMENT="${ORDERBOOKS_K8S_COLLECTOR_DEPLOYMENT:-orderbooks-collector}"
CRONJOB="${ORDERBOOKS_K8S_UPLOADER_CRONJOB:-orderbooks-uploader}"
RAW_DIR="${ORDERBOOKS_K8S_RAW_DIR:-/var/lib/orderbooks/raw_orderbooks}"
MANIFEST_DIR="${ORDERBOOKS_K8S_MANIFEST_DIR:-/var/lib/orderbooks/manifests}"
WAIT_SECONDS="${ORDERBOOKS_K8S_SMOKE_WAIT_SECONDS:-1200}"
UPLOAD_MIN_AGE_SECONDS="${ORDERBOOKS_UPLOAD_MIN_AGE_SECONDS:-600}"
KUBECTL_BIN="${ORDERBOOKS_KUBECTL:-kubectl}"
# UTC timestamp taken once at startup; it only appears in the default evidence
# file name, so RUN_ID must be assigned before EVIDENCE_PATH.
RUN_ID="$(date -u +%Y%m%dT%H%M%SZ)"
EVIDENCE_PATH="${ORDERBOOKS_K8S_SMOKE_EVIDENCE_PATH:-data/manifests/k8s_runtime_smoke_${RUN_ID}.json}"
#######################################
# Print the command-line help text.
# Arguments: none
# Outputs:   writes the help text to stdout (callers redirect it to stderr
#            when reporting an error)
#######################################
usage() {
  # One argument per output line; '%s\n' is reused for every argument, so the
  # text is emitted verbatim without any escape or option interpretation.
  printf '%s\n' \
    'Usage: scripts/k8s_runtime_smoke_check.sh [options]' \
    'Run after the orderbooks Kubernetes workload is deployed. The script uses' \
    'kubectl, writes local JSON evidence, deletes one collector pod to force a' \
    'Deployment restart, verifies raw gzip JSONL files and manifests on the PVC,' \
    'then triggers the uploader CronJob and requires a verified upload manifest.' \
    'Options:' \
    '--namespace NAME Namespace. Default: orderbooks.' \
    '--deployment NAME Collector deployment. Default: orderbooks-collector.' \
    '--cronjob NAME Uploader CronJob. Default: orderbooks-uploader.' \
    '--raw-dir PATH Raw path inside collector pod. Default: /var/lib/orderbooks/raw_orderbooks.' \
    '--manifest-dir PATH Manifest path inside collector pod. Default: /var/lib/orderbooks/manifests.' \
    '--wait-seconds N Max wait for collector/upload evidence. Default: 1200.' \
    '--upload-min-age-seconds N' \
    'Wait for at least one raw/manifest file to be this old before upload. Default: 600.' \
    '--evidence-path PATH Local JSON evidence path.' \
    '--kubectl PATH kubectl binary. Default: kubectl.' \
    '--help Show this help.' \
    'This script does not read or print rclone config contents.'
}
#######################################
# Abort with a usage error when an option is missing its value.
# Without this check a trailing option such as `--namespace` would expand an
# unset "$2", which under `set -u` kills the script with a bare
# "unbound variable" message instead of a usage error.
# Arguments: $1 - option name being parsed
#            $2 - number of arguments still unparsed, including the option
# Outputs:   error message and usage text on stderr
# Returns:   0 when a value is present; exits 2 otherwise
#######################################
require_value() {
  if (( $2 < 2 )); then
    echo "Missing value for $1" >&2
    usage >&2
    exit 2
  fi
}

#######################################
# Parse command-line options into the configuration globals.
# Globals:   NAMESPACE, DEPLOYMENT, CRONJOB, RAW_DIR, MANIFEST_DIR,
#            WAIT_SECONDS, UPLOAD_MIN_AGE_SECONDS, EVIDENCE_PATH, KUBECTL_BIN
#            (written when the matching option is given)
# Arguments: the script's command-line arguments
# Returns:   0 on success; exits 0 after --help; exits 2 on an unknown
#            argument or an option without a value
#######################################
parse_args() {
  while [[ $# -gt 0 ]]; do
    case "$1" in
      --namespace) require_value "$1" "$#"; NAMESPACE="$2"; shift 2 ;;
      --deployment) require_value "$1" "$#"; DEPLOYMENT="$2"; shift 2 ;;
      --cronjob) require_value "$1" "$#"; CRONJOB="$2"; shift 2 ;;
      --raw-dir) require_value "$1" "$#"; RAW_DIR="$2"; shift 2 ;;
      --manifest-dir) require_value "$1" "$#"; MANIFEST_DIR="$2"; shift 2 ;;
      --wait-seconds) require_value "$1" "$#"; WAIT_SECONDS="$2"; shift 2 ;;
      --upload-min-age-seconds) require_value "$1" "$#"; UPLOAD_MIN_AGE_SECONDS="$2"; shift 2 ;;
      --evidence-path) require_value "$1" "$#"; EVIDENCE_PATH="$2"; shift 2 ;;
      --kubectl) require_value "$1" "$#"; KUBECTL_BIN="$2"; shift 2 ;;
      --help) usage; exit 0 ;;
      *) echo "Unknown argument: $1" >&2; usage >&2; exit 2 ;;
    esac
  done
}
parse_args "$@"
# Create the directory that will hold the evidence file.
mkdir -p "$(dirname "${EVIDENCE_PATH}")"
# Run the embedded Python driver: `python3 -` reads the program from stdin,
# which is the heredoc that follows. The delimiter is quoted ('PY_SMOKE'), so
# the shell performs no expansion inside the heredoc; every setting is handed
# to Python as a positional argument instead. PYTHONDONTWRITEBYTECODE=1 stops
# Python from writing .pyc files.
PYTHONDONTWRITEBYTECODE=1 python3 - "$KUBECTL_BIN" "$NAMESPACE" "$DEPLOYMENT" "$CRONJOB" "$RAW_DIR" "$MANIFEST_DIR" "$WAIT_SECONDS" "$UPLOAD_MIN_AGE_SECONDS" "$EVIDENCE_PATH" <<'PY_SMOKE'
import datetime as dt
import json
import subprocess
import sys
import time
from pathlib import Path
# Positional arguments, in the order the wrapping shell script passes them.
kubectl = sys.argv[1]
namespace = sys.argv[2]
deployment = sys.argv[3]
cronjob = sys.argv[4]
raw_dir = sys.argv[5]
manifest_dir = sys.argv[6]
wait_seconds = int(sys.argv[7])
upload_min_age_seconds = int(sys.argv[8])
evidence_path = Path(sys.argv[9])
# Start time as a second-precision UTC timestamp with a 'Z' suffix.
# dt.timezone.utc is used instead of the dt.UTC alias because the alias only
# exists on Python 3.11+, and failing here would abort the run before any
# evidence is written.
started_at = dt.datetime.now(dt.timezone.utc).replace(microsecond=0).isoformat().replace('+00:00', 'Z')
# Shared accumulators: every executed command is appended to `checks`, every
# failure message to `failures`.
checks = []
failures = []
def iso_now():
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``.

    Microseconds are dropped and the ``+00:00`` offset is rewritten to ``Z``.
    ``dt.timezone.utc`` is used rather than the ``dt.UTC`` alias, which is only
    available on Python 3.11 and newer.
    """
    now = dt.datetime.now(dt.timezone.utc).replace(microsecond=0)
    return now.isoformat().replace('+00:00', 'Z')
def capture(command, input_text=None, timeout=None):
    """Run ``command`` and append a record of the outcome to ``checks``.

    Args:
        command: argv list passed to ``subprocess.run``.
        input_text: optional text fed to the command's stdin.
        timeout: optional timeout in seconds.

    Returns:
        ``(proc, item)``: the ``CompletedProcess`` and the dict that was
        appended to ``checks`` (command, exit code, last 6000 characters of
        stdout and stderr, UTC timestamp).

    Raises:
        subprocess.TimeoutExpired: when the command exceeds ``timeout``. A
            record with ``exit_code`` ``None`` and ``timed_out`` ``True`` is
            appended first, so the evidence still names the command that hung.
    """
    def tail(value, limit=6000):
        # On timeout the captured output can be None or raw bytes even though
        # text mode was requested, so normalise before slicing.
        if value is None:
            return ''
        if isinstance(value, bytes):
            value = value.decode('utf-8', errors='replace')
        return value[-limit:]

    try:
        proc = subprocess.run(command, input=input_text, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout)
    except subprocess.TimeoutExpired as exc:
        checks.append({
            'command': command,
            'exit_code': None,
            'timed_out': True,
            'stdout_tail': tail(exc.stdout),
            'stderr_tail': tail(exc.stderr),
            'ran_at_utc': iso_now(),
        })
        raise
    item = {
        'command': command,
        'exit_code': proc.returncode,
        'stdout_tail': proc.stdout[-6000:],
        'stderr_tail': proc.stderr[-6000:],
        'ran_at_utc': iso_now(),
    }
    checks.append(item)
    return proc, item
def run(command, input_text=None, timeout=None):
    """Execute ``command`` through ``capture`` and return only its check record.

    The completed-process object is discarded; callers inspect the returned
    dict (for example its ``exit_code``).
    """
    return capture(command, input_text=input_text, timeout=timeout)[1]
def run_json(command, input_text=None, timeout=None):
    """Execute ``command`` through ``capture`` and decode its stdout as JSON.

    Raises:
        RuntimeError: when the command exits with a non-zero status.
    """
    completed, record = capture(command, input_text=input_text, timeout=timeout)
    if record['exit_code'] == 0:
        return json.loads(completed.stdout)
    raise RuntimeError(f"command failed: {' '.join(command)}")
def pod_ready(pod):
    """Return True when ``pod`` is a live, running pod with all containers ready.

    Args:
        pod: one pod object as decoded from ``kubectl get pods -o json``.

    A pod with ``metadata.deletionTimestamp`` set is rejected: it has been
    asked to terminate, yet it can still report phase ``Running`` with ready
    containers. Without this check the pod deleted for the restart test could
    be picked again as the "ready" collector.
    """
    if (pod.get('metadata') or {}).get('deletionTimestamp'):
        return False
    status = pod.get('status') or {}
    if status.get('phase') != 'Running':
        return False
    statuses = status.get('containerStatuses') or []
    return bool(statuses) and all(entry.get('ready') for entry in statuses)
def get_collector_pod():
    """Poll until a ready collector pod exists and return ``(name, pod_object)``.

    Pods are listed by the fixed collector label selector every 10 seconds for
    up to ``wait_seconds``. When several pods are ready, the one with the
    greatest ``creationTimestamp`` string is returned.

    Raises:
        TimeoutError: when no ready pod shows up before the deadline; the
            message carries the pods seen in the final poll.
    """
    label_selector = 'app.kubernetes.io/name=orderbooks,app.kubernetes.io/component=collector'
    give_up_at = time.time() + wait_seconds
    last_seen = None
    while time.time() <= give_up_at:
        listing = run_json([kubectl, '-n', namespace, 'get', 'pods', '-l', label_selector, '-o', 'json'])
        last_seen = listing.get('items', [])
        # Stable ascending sort, so the newest ready pod ends up last.
        ready_pods = sorted(
            (candidate for candidate in last_seen if pod_ready(candidate)),
            key=lambda candidate: candidate.get('metadata', {}).get('creationTimestamp', ''),
        )
        if ready_pods:
            newest = ready_pods[-1]
            return newest['metadata']['name'], newest
        time.sleep(10)
    raise TimeoutError(f'no ready collector pod found; last pods={last_seen}')
def exec_python(pod, code, args):
    """Run ``code`` with ``python3 -`` inside ``pod`` and return its JSON output.

    Args:
        pod: name of the pod to exec into.
        code: Python source fed to the in-pod interpreter on stdin.
        args: list of string arguments passed to the in-pod script.

    Returns:
        The decoded JSON document the script printed on stdout.

    Raises:
        RuntimeError: when the command fails without a structured verdict.

    The validation scripts embedded in this file print a JSON verdict and then
    exit with status 2 when the verdict is negative. That verdict is returned
    instead of raised, so callers can inspect ``valid`` and keep the details;
    previously it was replaced by an exception quoting an empty stderr.
    """
    command = [kubectl, '-n', namespace, 'exec', '-i', pod, '--', 'python3', '-', *args]
    proc, item = capture(command, input_text=code, timeout=wait_seconds + 60)
    if item['exit_code'] == 0:
        return json.loads(proc.stdout)
    if item['exit_code'] == 2:
        try:
            verdict = json.loads(proc.stdout)
        except ValueError:
            # Exit status 2 without parseable JSON is a genuine failure.
            verdict = None
        if isinstance(verdict, dict):
            return verdict
    raise RuntimeError(f"pod python command failed in {pod}: {item['stderr_tail']}")
def wait_for_valid_collector(pod, after_mtime, label):
    """Poll ``pod`` until the in-pod validator reports a valid collector manifest.

    Args:
        pod: pod to run the validator in.
        after_mtime: mtime threshold forwarded to the validator.
        label: stored in the returned result as ``wait_label`` and used in the
            timeout message.

    Returns:
        The validator's result dict with ``wait_label`` added.

    Raises:
        TimeoutError: when no valid result arrives within ``wait_seconds``;
            the message carries the most recent invalid result or error.
    """
    give_up_at = time.time() + wait_seconds
    latest_problem = None
    while time.time() <= give_up_at:
        try:
            outcome = exec_python(pod, collector_validation_code, [manifest_dir, raw_dir, str(after_mtime)])
            if outcome.get('valid'):
                outcome['wait_label'] = label
                return outcome
            latest_problem = outcome
        except Exception as exc:
            # Any failure is remembered and retried on the next poll.
            latest_problem = repr(exc)
        time.sleep(15)
    raise TimeoutError(f'no valid {label} collector manifest found before timeout: {latest_problem}')
def wait_for_upload_eligible_files(pod):
    """Poll ``pod`` until the in-pod eligibility script reports ``eligible``.

    The script is run every 15 seconds for up to ``wait_seconds``. An error
    from ``exec_python`` is not caught here and ends the wait immediately.

    Returns:
        The eligibility result dict of the first poll that reports eligible.

    Raises:
        TimeoutError: when no poll reports eligible before the deadline; the
            message carries the most recent result.
    """
    cutoff = time.time() + wait_seconds
    previous = None
    while time.time() <= cutoff:
        previous = exec_python(pod, upload_eligibility_code, [raw_dir, manifest_dir, str(upload_min_age_seconds)])
        if previous.get('eligible'):
            return previous
        time.sleep(15)
    raise TimeoutError(f'no upload-eligible raw/manifest files before timeout: {previous}')
# Source of the script that wait_for_valid_collector runs inside the collector
# pod. The text is sent verbatim to the in-pod interpreter, so it must remain a
# self-contained program.
#   argv[1] manifest directory, argv[2] raw-data directory, argv[3] mtime threshold.
# It looks at manifests named polymarket_orderbook_collector_*.json whose mtime
# is strictly greater than the threshold, newest first. For each manifest,
# every listed output file is re-read as gzip JSONL (non-blank lines counted
# and JSON-decoded), hashed with SHA-256, compared with the row count and hash
# stored in the manifest, and checked to resolve to a path under the raw
# directory.
# A manifest is valid when gate_status is 'PASS', rows_written > 0,
# failure_count == 0, 'failures' is empty, at least one output file is listed,
# and every output file passes the checks above.
# Output: the first valid result is printed as JSON and the script exits 0.
# Otherwise the result of the last manifest examined (the oldest candidate,
# because iteration is newest-first) or a "no candidates" error is printed and
# the script exits 2.
collector_validation_code = r'''
import gzip
import hashlib
import json
import sys
from pathlib import Path
manifest_dir = Path(sys.argv[1])
raw_dir = Path(sys.argv[2])
after_mtime = float(sys.argv[3])
def sha256(path):
digest = hashlib.sha256()
with path.open('rb') as handle:
for chunk in iter(lambda: handle.read(1024 * 1024), b''):
digest.update(chunk)
return digest.hexdigest()
def parse_raw(path):
rows = 0
first_keys = []
with gzip.open(path, 'rt', encoding='utf-8') as handle:
for line in handle:
if not line.strip():
continue
obj = json.loads(line)
if rows == 0:
first_keys = sorted(obj.keys())
rows += 1
return rows, first_keys
def validate(path):
manifest = json.loads(path.read_text(encoding='utf-8'))
output_files = []
for item in manifest.get('output_files', []):
raw_path = Path(item['path'])
rows, first_keys = parse_raw(raw_path)
actual_sha = sha256(raw_path)
output_files.append({
'path': str(raw_path),
'bytes': raw_path.stat().st_size,
'mtime': raw_path.stat().st_mtime,
'manifest_rows': item.get('rows'),
'rows_parsed': rows,
'row_count_matches_manifest': rows == item.get('rows'),
'manifest_sha256': item.get('sha256'),
'actual_sha256': actual_sha,
'sha256_matches_manifest': actual_sha == item.get('sha256'),
'under_raw_dir': raw_path.resolve().is_relative_to(raw_dir.resolve()),
'first_row_keys': first_keys,
})
valid = (
manifest.get('gate_status') == 'PASS'
and manifest.get('rows_written', 0) > 0
and manifest.get('failure_count') == 0
and not manifest.get('failures')
and bool(output_files)
and all(item['rows_parsed'] > 0 and item['row_count_matches_manifest'] and item['sha256_matches_manifest'] and item['under_raw_dir'] for item in output_files)
)
return {
'path': str(path),
'mtime': path.stat().st_mtime,
'manifest_summary': {
'gate_status': manifest.get('gate_status'),
'rows_written': manifest.get('rows_written'),
'failure_count': manifest.get('failure_count'),
'failures_present': bool(manifest.get('failures')),
'output_file_count': len(manifest.get('output_files', [])),
'started_at_utc': manifest.get('started_at_utc'),
'ended_at_utc': manifest.get('ended_at_utc'),
},
'output_files': output_files,
'valid': valid,
}
candidates = sorted(manifest_dir.glob('polymarket_orderbook_collector_*.json'), key=lambda p: p.stat().st_mtime)
candidates = [path for path in candidates if path.stat().st_mtime > after_mtime]
latest = None
for path in reversed(candidates):
try:
result = validate(path)
except Exception as exc:
latest = {'path': str(path), 'valid': False, 'error': repr(exc)}
continue
latest = result
if result['valid']:
print(json.dumps(result, sort_keys=True))
sys.exit(0)
print(json.dumps(latest or {'valid': False, 'error': 'no collector manifest candidates'}, sort_keys=True))
sys.exit(2)
'''
# Source of the in-pod script that re-checks one raw file after the restart.
# The text is sent verbatim to the in-pod interpreter.
#   argv[1] file path, argv[2] expected SHA-256 hex digest, argv[3] expected row count.
# It reads the file as gzip text, JSON-decodes every non-blank line (a line
# that is not valid JSON makes the script fail), counts those lines, hashes
# the file, and prints one JSON object holding the expected and actual values
# together with the 'sha256_matches' and 'row_count_matches' flags.
# A mismatch does not change the exit status; the caller has to read the flags.
raw_check_code = r'''
import gzip
import hashlib
import json
import sys
from pathlib import Path
path = Path(sys.argv[1])
expected_sha = sys.argv[2]
expected_rows = int(sys.argv[3])
def sha256(path):
digest = hashlib.sha256()
with path.open('rb') as handle:
for chunk in iter(lambda: handle.read(1024 * 1024), b''):
digest.update(chunk)
return digest.hexdigest()
rows = 0
with gzip.open(path, 'rt', encoding='utf-8') as handle:
for line in handle:
if line.strip():
json.loads(line)
rows += 1
actual_sha = sha256(path)
print(json.dumps({
'path': str(path),
'expected_sha256': expected_sha,
'actual_sha256': actual_sha,
'sha256_matches': actual_sha == expected_sha,
'expected_rows': expected_rows,
'actual_rows': rows,
'row_count_matches': rows == expected_rows,
}, sort_keys=True))
'''
# Source of the in-pod script that checks the uploader's result manifest.
# The text is sent verbatim to the in-pod interpreter.
#   argv[1] manifest directory, argv[2] mtime threshold.
# Only the newest upload_archive_*.json whose mtime is >= the threshold is
# examined. It is valid when operation_status is 'UPLOAD_VERIFIED', gate_status
# is 'PASS', both rclone exit codes (copy and check) are 0, and the verified
# count is > 0. The verified count is counts.verified, falling back to the
# length of 'verified_files' when that key is absent.
# Output: one JSON object with a manifest summary and up to five verified-file
# samples. The script exits 2 when there is no candidate manifest or when the
# manifest is not valid.
upload_validation_code = r'''
import json
import sys
from pathlib import Path
manifest_dir = Path(sys.argv[1])
after_mtime = float(sys.argv[2])
candidates = sorted(manifest_dir.glob('upload_archive_*.json'), key=lambda p: p.stat().st_mtime)
candidates = [path for path in candidates if path.stat().st_mtime >= after_mtime]
if not candidates:
print(json.dumps({'valid': False, 'error': 'no upload manifest candidates'}, sort_keys=True))
sys.exit(2)
path = candidates[-1]
manifest = json.loads(path.read_text(encoding='utf-8'))
verified_count = manifest.get('counts', {}).get('verified', len(manifest.get('verified_files', [])))
valid = (
manifest.get('operation_status') == 'UPLOAD_VERIFIED'
and manifest.get('gate_status') == 'PASS'
and manifest.get('rclone', {}).get('copy_exit_code') == 0
and manifest.get('rclone', {}).get('check_exit_code') == 0
and verified_count > 0
)
verified_files = manifest.get('verified_files', [])
print(json.dumps({
'path': str(path),
'mtime': path.stat().st_mtime,
'manifest_summary': {
'operation_status': manifest.get('operation_status'),
'gate_status': manifest.get('gate_status'),
'counts': manifest.get('counts', {}),
'planned_file_count': len(manifest.get('planned_files', [])),
'attempted_file_count': len(manifest.get('attempted_files', [])),
'uploaded_file_count': len(manifest.get('uploaded_files', [])),
'verified_file_count': verified_count,
'rclone_copy_exit_code': manifest.get('rclone', {}).get('copy_exit_code'),
'rclone_check_exit_code': manifest.get('rclone', {}).get('check_exit_code'),
'started_at_utc': manifest.get('started_at_utc'),
'ended_at_utc': manifest.get('ended_at_utc'),
},
'verified_count': verified_count,
'verified_file_samples': [
{
'relative_path': item.get('relative_path'),
'bytes': item.get('bytes'),
'sha256': item.get('sha256'),
'kind': item.get('kind'),
}
for item in verified_files[:5]
],
'valid': valid,
}, sort_keys=True))
if not valid:
sys.exit(2)
'''
# Source of the in-pod script that reports whether files old enough for the
# uploader exist. The text is sent verbatim to the in-pod interpreter.
#   argv[1] raw-data directory, argv[2] manifest directory, argv[3] minimum age in seconds.
# It searches recursively for '*.jsonl.gz' under the raw directory and for
# 'polymarket_orderbook_collector_*.json' under the manifest directory, keeping
# regular files whose age (now minus mtime, floored at 0) is at least the
# minimum. A directory that does not exist contributes no files.
# Output: one JSON object; 'eligible' is true only when both searches found at
# least one file. Counts and the first three entries of each list are included.
upload_eligibility_code = r'''
import json
import sys
import time
from pathlib import Path
raw_dir = Path(sys.argv[1])
manifest_dir = Path(sys.argv[2])
min_age_seconds = int(sys.argv[3])
now = time.time()
def eligible_files(root, pattern):
if not root.exists():
return []
items = []
for path in sorted(root.rglob(pattern)):
if not path.is_file():
continue
age = max(0, int(now - path.stat().st_mtime))
if age >= min_age_seconds:
items.append({'path': str(path), 'bytes': path.stat().st_size, 'age_seconds': age})
return items
raw_files = eligible_files(raw_dir, '*.jsonl.gz')
manifest_files = eligible_files(manifest_dir, 'polymarket_orderbook_collector_*.json')
print(json.dumps({
'eligible': bool(raw_files) and bool(manifest_files),
'min_age_seconds': min_age_seconds,
'raw_eligible_count': len(raw_files),
'manifest_eligible_count': len(manifest_files),
'raw_sample': raw_files[:3],
'manifest_sample': manifest_files[:3],
}, sort_keys=True))
'''
# Evidence document for this run. It starts out as an error result and is
# updated as the run progresses. `checks` and `failures` are stored by
# reference, so entries appended later show up in the document.
summary = dict(
    schema_name='k8s_runtime_smoke_result',
    schema_version=1,
    started_at_utc=started_at,
    ended_at_utc=None,
    gate_status='ERROR',
    production_ready=False,
    namespace=namespace,
    deployment=deployment,
    cronjob=cronjob,
    raw_dir=raw_dir,
    manifest_dir=manifest_dir,
    upload_min_age_seconds=upload_min_age_seconds,
    checks=checks,
    failures=failures,
)
# Main smoke-check sequence. Any exception turns the gate into FAIL; the
# evidence file is written in all cases. Intermediate results are stored in
# `summary` as soon as they are known, so a FAIL evidence file still shows
# every step that completed before the failure.
try:
    # Step 1: the collector Deployment must be healthy and must already have
    # produced a valid manifest.
    rollout = run([kubectl, '-n', namespace, 'rollout', 'status', f'deployment/{deployment}', f'--timeout={wait_seconds}s'])
    if rollout['exit_code'] != 0:
        raise RuntimeError('collector deployment rollout is not healthy')
    pod_name, _ = get_collector_pod()
    summary['initial_collector_pod'] = pod_name
    before = wait_for_valid_collector(pod_name, 0, 'initial')
    summary['before_restart_collector'] = before
    before_mtime = before['mtime']
    old_file = before['output_files'][0]

    # Step 2: delete the pod and wait for the Deployment to recover.
    delete_pod = run([kubectl, '-n', namespace, 'delete', 'pod', pod_name, '--wait=false'])
    if delete_pod['exit_code'] != 0:
        raise RuntimeError('failed to delete collector pod for restart test')
    rollout_after = run([kubectl, '-n', namespace, 'rollout', 'status', f'deployment/{deployment}', f'--timeout={wait_seconds}s'])
    if rollout_after['exit_code'] != 0:
        raise RuntimeError('collector deployment did not recover after pod delete')
    # The delete does not wait for the pod to disappear (--wait=false), so the
    # deleted pod may still be returned as ready. Poll until a different pod is
    # selected; otherwise the following checks would not prove a restart.
    replacement_deadline = time.time() + wait_seconds
    new_pod, _ = get_collector_pod()
    while new_pod == pod_name:
        if time.time() > replacement_deadline:
            raise RuntimeError('collector pod was not replaced after pod delete')
        time.sleep(5)
        new_pod, _ = get_collector_pod()
    summary['post_restart_collector_pod'] = new_pod

    # Step 3: the file written before the restart must be unchanged, and the
    # new pod must produce a newer valid manifest.
    old_check = exec_python(new_pod, raw_check_code, [old_file['path'], old_file['actual_sha256'], str(old_file['rows_parsed'])])
    summary['old_raw_file_after_restart'] = old_check
    if not old_check.get('sha256_matches') or not old_check.get('row_count_matches'):
        raise RuntimeError('old raw file changed or stopped parsing after pod restart')
    after = wait_for_valid_collector(new_pod, before_mtime, 'post_restart')
    summary['after_restart_collector'] = after

    # Step 4: run the uploader once from its CronJob template and require a
    # verified upload manifest written no earlier than two seconds before the
    # job was created.
    upload_eligibility = wait_for_upload_eligible_files(new_pod)
    summary['upload_eligibility'] = upload_eligibility
    upload_start_mtime = time.time() - 2
    # dt.timezone.utc instead of the dt.UTC alias, which needs Python 3.11+.
    job_name = 'orderbooks-uploader-smoke-' + dt.datetime.now(dt.timezone.utc).strftime('%Y%m%dt%H%M%Sz').lower()
    summary['uploader_job'] = job_name
    run([kubectl, '-n', namespace, 'delete', 'job', job_name, '--ignore-not-found=true'])
    create_job = run([kubectl, '-n', namespace, 'create', 'job', job_name, f'--from=cronjob/{cronjob}'])
    if create_job['exit_code'] != 0:
        raise RuntimeError('failed to create uploader smoke job from CronJob')
    wait_upload = run([kubectl, '-n', namespace, 'wait', '--for=condition=Complete', f'--timeout={wait_seconds}s', f'job/{job_name}'])
    # Logs are fetched before the completion check so they are recorded in
    # `checks` even when the job failed.
    logs = run([kubectl, '-n', namespace, 'logs', f'job/{job_name}'])
    summary['uploader_log_check_exit_code'] = logs['exit_code']
    if wait_upload['exit_code'] != 0:
        raise RuntimeError('uploader smoke job did not complete')
    upload = exec_python(new_pod, upload_validation_code, [manifest_dir, str(upload_start_mtime)])
    summary['upload_result'] = upload
    if not upload.get('valid'):
        raise RuntimeError('upload manifest did not verify at least one file')
    summary['gate_status'] = 'PASS'
except Exception as exc:
    failures.append(str(exc))
    summary['exception'] = repr(exc)
    summary['gate_status'] = 'FAIL'
finally:
    summary['ended_at_utc'] = iso_now()
    evidence_path.parent.mkdir(parents=True, exist_ok=True)
    evidence_path.write_text(json.dumps(summary, indent=2, sort_keys=True) + '\n', encoding='utf-8')
print(f'K8S_SMOKE_EVIDENCE={evidence_path}')
print(f'K8S_SMOKE_GATE={summary["gate_status"]}')
if summary['gate_status'] != 'PASS':
    sys.exit(1)
PY_SMOKE