#!/usr/bin/env bash
#
# VPS runtime smoke check for the orderbook collector and uploader services.
# Settings come from ORDERBOOKS_* environment variables (defaults below),
# can be overridden by command-line options, and are then passed to an inline
# Python program that performs the checks and writes a JSON evidence file.

# Unset variables are errors; a pipeline fails if any of its stages fails.
set -uo pipefail

# Filesystem layout. RAW_DIR and MANIFEST_DIR default to sub-directories of
# DATA_DIR unless their own environment override is set.
APP_DIR="${ORDERBOOKS_APP_DIR:-/opt/orderbooks}"
DATA_DIR="${ORDERBOOKS_DATA_DIR:-/var/lib/orderbooks}"
RAW_DIR="${ORDERBOOKS_OUTPUT_DIR:-${DATA_DIR}/raw_orderbooks}"
MANIFEST_DIR="${ORDERBOOKS_MANIFEST_DIR:-${DATA_DIR}/manifests}"

# systemd unit names of the two services under test.
COLLECTOR_SERVICE="${ORDERBOOKS_COLLECTOR_SERVICE:-polymarket-orderbook-collector.service}"
UPLOADER_SERVICE="${ORDERBOOKS_UPLOADER_SERVICE:-polymarket-orderbook-uploader.service}"

# Maximum number of seconds to wait for a valid collector cycle.
WAIT_SECONDS="${ORDERBOOKS_SMOKE_WAIT_SECONDS:-900}"

# UTC timestamp of this run; it makes the default evidence file name unique.
RUN_ID="$(date -u +%Y%m%dT%H%M%SZ)"
EVIDENCE_PATH="${ORDERBOOKS_SMOKE_EVIDENCE_PATH:-${MANIFEST_DIR}/vps_runtime_smoke_${RUN_ID}.json}"

# Interpreter used to run the embedded Python program.
PYTHON_BIN="${ORDERBOOKS_PYTHON:-python3}"

#######################################
# Print the command-line help text.
# Globals:   none
# Arguments: none
# Outputs:   the help text on stdout (callers redirect it to stderr when
#            reporting a usage error)
#######################################
usage() {
  # Quoted delimiter: the text is emitted literally, with no expansion.
  cat <<'EOF'
Usage: scripts/vps_runtime_smoke_check.sh [options]

Run on the VPS after installing collector/uploader systemd units. The check
records durable JSON evidence, forces one collector service restart, verifies
old raw gzip files still parse and keep their checksum, waits for a later valid
collector cycle, then starts the uploader service and records upload evidence.

Options:
--app-dir DIR App checkout. Default: /opt/orderbooks.
--data-dir DIR Data root. Default: /var/lib/orderbooks.
--raw-dir DIR Raw output dir. Default: DATA_DIR/raw_orderbooks.
--manifest-dir DIR Manifest dir. Default: DATA_DIR/manifests.
--collector-service NAME systemd collector service name.
--uploader-service NAME systemd uploader service name.
--wait-seconds N Max wait for valid cycles. Default: 900.
--evidence-path PATH JSON evidence output path.
--help Show this help.

This script does not delete raw files or manifests. Failures are written to the
evidence JSON and should be preserved for review.
EOF
}

#######################################
# Parse command-line options into the configuration globals.
#
# Explicit options win regardless of their order on the command line:
# --data-dir only re-derives RAW_DIR / MANIFEST_DIR when --raw-dir /
# --manifest-dir were not given (their environment overrides still take
# precedence over the derived value). When neither --evidence-path nor
# ORDERBOOKS_SMOKE_EVIDENCE_PATH is supplied, the default evidence path is
# recomputed so that it follows the final MANIFEST_DIR.
#
# Globals:   APP_DIR, DATA_DIR, RAW_DIR, MANIFEST_DIR, COLLECTOR_SERVICE,
#            UPLOADER_SERVICE, WAIT_SECONDS, EVIDENCE_PATH (written);
#            RUN_ID and the ORDERBOOKS_* overrides (read)
# Arguments: the script's command-line arguments
# Outputs:   usage and error messages on stderr; help on stdout for --help
# Returns:   exits 0 after --help, exits 2 on an unknown option, a missing
#            option value or a non-numeric --wait-seconds
#######################################
parse_args() {
  local data_dir_given=0 raw_dir_given=0 manifest_dir_given=0
  local evidence_path_given=0

  while (( $# > 0 )); do
    # First pass: reject unknown options and options without a value, so
    # "$2" is never read unset (which `set -u` would report cryptically).
    case "$1" in
      --help)
        usage
        exit 0
        ;;
      --app-dir | --data-dir | --raw-dir | --manifest-dir | \
      --collector-service | --uploader-service | --wait-seconds | --evidence-path)
        if (( $# < 2 )); then
          echo "Missing value for $1" >&2
          usage >&2
          exit 2
        fi
        ;;
      *)
        echo "Unknown argument: $1" >&2
        usage >&2
        exit 2
        ;;
    esac

    # Second pass: store the value.
    case "$1" in
      --app-dir) APP_DIR="$2" ;;
      --data-dir) DATA_DIR="$2"; data_dir_given=1 ;;
      --raw-dir) RAW_DIR="$2"; raw_dir_given=1 ;;
      --manifest-dir) MANIFEST_DIR="$2"; manifest_dir_given=1 ;;
      --collector-service) COLLECTOR_SERVICE="$2" ;;
      --uploader-service) UPLOADER_SERVICE="$2" ;;
      --wait-seconds)
        # The Python side converts this with int(); fail here with a usage
        # error instead of a traceback.
        if [[ ! "$2" =~ ^[0-9]+$ ]]; then
          echo "Invalid --wait-seconds value: $2" >&2
          usage >&2
          exit 2
        fi
        WAIT_SECONDS="$2"
        ;;
      --evidence-path) EVIDENCE_PATH="$2"; evidence_path_given=1 ;;
    esac
    shift 2
  done

  # Derived directories follow --data-dir unless they were set explicitly.
  if (( data_dir_given )); then
    if (( ! raw_dir_given )); then
      RAW_DIR="${ORDERBOOKS_OUTPUT_DIR:-${DATA_DIR}/raw_orderbooks}"
    fi
    if (( ! manifest_dir_given )); then
      MANIFEST_DIR="${ORDERBOOKS_MANIFEST_DIR:-${DATA_DIR}/manifests}"
    fi
  fi

  # The default evidence file lives in the final manifest directory.
  if (( ! evidence_path_given )) && [[ -z "${ORDERBOOKS_SMOKE_EVIDENCE_PATH:-}" ]]; then
    EVIDENCE_PATH="${MANIFEST_DIR}/vps_runtime_smoke_${RUN_ID}.json"
  fi
}

parse_args "$@"

# errexit is not enabled, so check the directory creation explicitly.
if ! mkdir -p -- "$(dirname -- "${EVIDENCE_PATH}")"; then
  echo "Cannot create evidence directory for ${EVIDENCE_PATH}" >&2
  exit 1
fi

# Run the embedded Python program from stdin ("-"), passing the resolved
# settings as positional arguments. The quoted heredoc delimiter keeps the
# shell from expanding anything inside the Python source.
PYTHONDONTWRITEBYTECODE=1 "${PYTHON_BIN}" - \
  "$APP_DIR" "$DATA_DIR" "$RAW_DIR" "$MANIFEST_DIR" \
  "$COLLECTOR_SERVICE" "$UPLOADER_SERVICE" "$WAIT_SECONDS" "$EVIDENCE_PATH" <<'PY_SMOKE'
import datetime as dt
import gzip
import hashlib
import json
import subprocess
import sys
import time
from pathlib import Path

# Positional arguments supplied by the shell wrapper, in this fixed order.
app_dir = Path(sys.argv[1])
data_dir = Path(sys.argv[2])
raw_dir = Path(sys.argv[3])
manifest_dir = Path(sys.argv[4])
collector_service = sys.argv[5]
uploader_service = sys.argv[6]
wait_seconds = int(sys.argv[7])
evidence_path = Path(sys.argv[8])

# Run start, in UTC, truncated to whole seconds. dt.timezone.utc is used
# rather than the dt.UTC alias because the alias only exists on Python 3.11+.
started = dt.datetime.now(dt.timezone.utc).replace(microsecond=0)

# Shared, mutable run state: one entry per executed command, and one
# human-readable message per detected failure.
checks = []
failures = []


def iso_now():
    """Return the current UTC time as an ISO-8601 string with a 'Z' suffix.

    The value is truncated to whole seconds, e.g. '2024-05-01T12:34:56Z'.
    """
    # dt.timezone.utc instead of dt.UTC: the alias was only added in 3.11.
    now = dt.datetime.now(dt.timezone.utc).replace(microsecond=0)
    return now.isoformat().replace('+00:00', 'Z')


def run(command):
    """Run a command, record the outcome in ``checks`` and return the record.

    Args:
        command: argument list passed to ``subprocess.run`` (no shell).

    Returns:
        A dict with the command, its exit code, the last 4000 characters of
        stdout and of stderr, and the UTC time at which the record was made.
        A non-zero exit code does not raise; callers inspect ``exit_code``.
    """
    proc = subprocess.run(
        command,
        text=True,
        # Command output (journalctl in particular) may contain bytes that
        # are not valid in the locale encoding; substitute them instead of
        # letting a UnicodeDecodeError abort the whole run.
        errors='replace',
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    item = {
        'command': command,
        'exit_code': proc.returncode,
        'stdout_tail': proc.stdout[-4000:],
        'stderr_tail': proc.stderr[-4000:],
        'ran_at_utc': iso_now(),
    }
    checks.append(item)
    return item


def sha256(path):
    """Return the hex SHA-256 digest of the file at ``path``.

    Args:
        path: a ``pathlib.Path`` (anything with a compatible ``open('rb')``).

    The file is read in 1 MiB chunks so large files are not loaded whole.
    """
    digest = hashlib.sha256()
    with path.open('rb') as handle:
        for chunk in iter(lambda: handle.read(1024 * 1024), b''):
            digest.update(chunk)
    return digest.hexdigest()


def parse_raw(path):
    """Parse a gzip-compressed file holding one JSON document per line.

    Blank and whitespace-only lines are skipped; every other line must be
    valid JSON, otherwise the ``json`` error propagates to the caller.

    Args:
        path: location of the ``.gz`` file.

    Returns:
        ``(rows, first_keys)``: the number of parsed rows and the sorted keys
        of the first row (an empty list when the file has no rows). The first
        row must therefore be a JSON object.
    """
    rows = 0
    first_keys = []
    with gzip.open(path, 'rt', encoding='utf-8') as handle:
        for line in handle:
            if not line.strip():
                continue
            obj = json.loads(line)
            if rows == 0:
                first_keys = sorted(obj.keys())
            rows += 1
    return rows, first_keys


def collector_manifests():
    """List collector manifests in ``manifest_dir``, oldest first.

    Returns:
        Paths matching ``polymarket_orderbook_collector_*.json`` sorted by
        modification time (ascending), or an empty list when the manifest
        directory does not exist.
    """
    if not manifest_dir.exists():
        return []
    return sorted(
        manifest_dir.glob('polymarket_orderbook_collector_*.json'),
        key=lambda path: path.stat().st_mtime,
    )


def validate_collector(path):
    """Load one collector manifest and verify the raw files it lists.

    Every entry of the manifest's ``output_files`` is re-parsed and re-hashed
    from disk and compared with the row count and SHA-256 recorded in the
    manifest.

    Args:
        path: location of the collector manifest JSON file.

    Returns:
        A dict with the manifest path, the parsed manifest, one evidence
        record per output file, and ``valid``. ``valid`` is True only when the
        manifest reports gate_status 'PASS', a positive ``rows_written``, a
        ``failure_count`` of 0 and no ``failures``, lists at least one output
        file, and every file has rows, matches the manifest's row count and
        checksum, resolves to a location under ``raw_dir`` and has no
        ``live_sample`` path component.

    Raises:
        Whatever reading or parsing the manifest or a raw file raises (missing
        file, invalid JSON, missing 'path' key, ...); callers handle these.
    """
    manifest = json.loads(path.read_text(encoding='utf-8'))

    output_files = []
    for item in manifest.get('output_files', []):
        raw_path = Path(item['path'])
        rows, first_keys = parse_raw(raw_path)
        actual_sha = sha256(raw_path)
        output_files.append({
            'path': str(raw_path),
            'bytes': raw_path.stat().st_size,
            'manifest_rows': item.get('rows'),
            'rows_parsed': rows,
            'row_count_matches_manifest': rows == item.get('rows'),
            'manifest_sha256': item.get('sha256'),
            'actual_sha256': actual_sha,
            'sha256_matches_manifest': actual_sha == item.get('sha256'),
            'first_row_keys': first_keys,
            # Both sides are resolved so symlinks and '..' cannot fake this.
            'under_raw_dir': raw_path.resolve().is_relative_to(raw_dir.resolve()),
            'uses_live_sample_path': 'live_sample' in raw_path.parts,
        })

    valid = (
        manifest.get('gate_status') == 'PASS'
        and manifest.get('rows_written', 0) > 0
        and manifest.get('failure_count') == 0
        and not manifest.get('failures')
        and bool(output_files)
        and all(
            item['rows_parsed'] > 0
            and item['row_count_matches_manifest']
            and item['sha256_matches_manifest']
            and item['under_raw_dir']
            and not item['uses_live_sample_path']
            for item in output_files
        )
    )
    return {
        'path': str(path),
        'manifest': manifest,
        'output_files': output_files,
        'valid': valid,
    }


def latest_valid_after(after_mtime=0):
    """Wait for a valid collector manifest modified after ``after_mtime``.

    The manifest directory is scanned newest-first. At least one scan is
    always performed; further scans follow every 10 seconds (or less, so the
    deadline is not overshot) until ``wait_seconds`` have elapsed.

    Args:
        after_mtime: only manifests whose mtime is strictly greater than this
            value are considered.

    Returns:
        The ``validate_collector`` result of the newest valid manifest.

    Raises:
        TimeoutError: no valid manifest appeared in time. The message carries
            the problem found with the newest candidate in the most recent
            scan that had one, or a generic message if none was ever seen.
    """
    # Monotonic clock: the deadline must not move when wall time is adjusted.
    deadline = time.monotonic() + wait_seconds
    last_error = None
    while True:
        scan_error = None
        for path in reversed(collector_manifests()):
            try:
                # stat() is inside the handler because the file may vanish
                # between the directory listing and this check.
                if path.stat().st_mtime <= after_mtime:
                    continue
                result = validate_collector(path)
            except Exception as exc:
                problem = str(exc)
            else:
                if result['valid']:
                    return result
                problem = f"latest candidate invalid: {path}"
            # Keep the newest candidate's problem; older candidates must not
            # overwrite it, or the report would name the wrong manifest.
            if scan_error is None:
                scan_error = problem
        if scan_error is not None:
            last_error = scan_error
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(min(10, remaining))
    raise TimeoutError(last_error or f'no valid collector manifest after mtime {after_mtime}')


def latest_upload_after(after_mtime=0):
    """Load the newest uploader manifest modified at or after ``after_mtime``.

    Args:
        after_mtime: manifests with an older mtime are ignored.

    Returns:
        A dict with the manifest path, the parsed manifest, the verified-file
        count (``counts.verified`` when present, otherwise the length of
        ``verified_files``) and ``valid``. ``valid`` is True only when the
        manifest reports operation_status 'UPLOAD_VERIFIED', gate_status
        'PASS', rclone copy and check exit codes of 0, and a positive
        verified count.

    Raises:
        FileNotFoundError: no ``upload_archive_*.json`` manifest is recent
            enough.
    """
    # Stat each candidate once, then keep the recent ones ordered by mtime.
    stamped = [(path.stat().st_mtime, path) for path in manifest_dir.glob('upload_archive_*.json')]
    recent = sorted((entry for entry in stamped if entry[0] >= after_mtime), key=lambda entry: entry[0])
    if not recent:
        raise FileNotFoundError('no upload_archive_*.json manifest found after uploader run')
    path = recent[-1][1]

    manifest = json.loads(path.read_text(encoding='utf-8'))
    verified_count = manifest.get('counts', {}).get('verified', len(manifest.get('verified_files', [])))
    return {
        'path': str(path),
        'manifest': manifest,
        'verified_count': verified_count,
        'valid': (
            manifest.get('operation_status') == 'UPLOAD_VERIFIED'
            and manifest.get('gate_status') == 'PASS'
            and manifest.get('rclone', {}).get('copy_exit_code') == 0
            and manifest.get('rclone', {}).get('check_exit_code') == 0
            and verified_count > 0
        ),
    }


# Evidence document written at the end of the run. 'checks' and 'failures'
# are the live module-level lists, so entries appended later are included.
# gate_status stays 'ERROR' unless the run reaches the PASS/FAIL decision.
summary = {
    'schema_name': 'vps_runtime_smoke_result',
    'schema_version': 1,
    'started_at_utc': started.isoformat().replace('+00:00', 'Z'),
    'ended_at_utc': None,
    'gate_status': 'ERROR',
    'production_ready': False,
    'app_dir': str(app_dir),
    'data_dir': str(data_dir),
    'raw_dir': str(raw_dir),
    'manifest_dir': str(manifest_dir),
    'collector_service': collector_service,
    'uploader_service': uploader_service,
    'wait_seconds': wait_seconds,
    'checks': checks,
    'failures': failures,
}

try:
    # 1. The collector must already be running under systemd.
    active = run(['systemctl', 'is-active', collector_service])
    if active['exit_code'] != 0:
        failures.append('collector service is not active under systemd')
        raise RuntimeError('collector service not active')

    # 2. Baseline: the newest valid collector cycle before the restart.
    #    Results are stored in the summary as soon as they exist so they are
    #    preserved in the evidence even if a later step raises.
    before = latest_valid_after(0)
    summary['before_restart_collector'] = before
    before_mtime = Path(before['path']).stat().st_mtime

    # 3. Force one restart and confirm the service came back.
    restart_requested_at = time.time()
    restart = run(['systemctl', 'restart', collector_service])
    if restart['exit_code'] != 0:
        failures.append('collector service restart command failed')
        raise RuntimeError('restart failed')
    active_after = run(['systemctl', 'is-active', collector_service])
    if active_after['exit_code'] != 0:
        failures.append('collector service is not active after restart')
        raise RuntimeError('collector inactive after restart')

    # 4. Wait for a valid cycle written after the restart was requested. The
    #    restart time is part of the threshold because a manifest that was
    #    still being written during the baseline scan is newer than the
    #    baseline yet was produced by the old process.
    after = latest_valid_after(max(before_mtime, restart_requested_at))
    summary['after_restart_collector'] = after

    # 5. Every raw file of the baseline cycle must still parse to the same
    #    number of rows and keep its checksum. A file that no longer parses
    #    raises and is reported through the outer handler.
    old_file_unchanged = True
    for old_raw in before['output_files']:
        old_raw_path = Path(old_raw['path'])
        old_rows_after, _ = parse_raw(old_raw_path)
        if sha256(old_raw_path) != old_raw['actual_sha256'] or old_rows_after != old_raw['rows_parsed']:
            old_file_unchanged = False
    summary['old_raw_file_unchanged_after_restart'] = old_file_unchanged
    if not old_file_unchanged:
        failures.append('raw file from before restart changed or stopped parsing')

    # 6. Run the uploader and look for the manifest it produced.
    # NOTE(review): the manifest is looked up right after `systemctl start`
    # returns, which assumes the unit blocks until the upload has finished
    # (e.g. Type=oneshot) - confirm against the unit file.
    upload_start_mtime = time.time()
    upload_run = run(['systemctl', 'start', uploader_service])
    if upload_run['exit_code'] != 0:
        failures.append('uploader service start failed')
    try:
        # Two seconds of slack on the threshold - presumably to tolerate
        # coarse filesystem timestamps; confirm.
        upload = latest_upload_after(upload_start_mtime - 2)
        if not upload.get('valid'):
            failures.append('uploader did not produce a verified upload manifest with at least one verified file')
    except Exception as exc:
        upload = {'path': None, 'valid': False, 'error': str(exc)}
        failures.append(str(exc))
    summary['upload_result'] = upload

    # 7. Capture recent service logs; run() stores their tails in 'checks'.
    collector_logs = run(['journalctl', '-u', collector_service, '-n', '80', '--no-pager'])
    uploader_logs = run(['journalctl', '-u', uploader_service, '-n', '80', '--no-pager'])
    summary['collector_log_check_exit_code'] = collector_logs['exit_code']
    summary['uploader_log_check_exit_code'] = uploader_logs['exit_code']

    if after['valid'] and old_file_unchanged and upload.get('valid') and not failures:
        summary['gate_status'] = 'PASS'
    else:
        summary['gate_status'] = 'FAIL'
except Exception as exc:
    # gate_status keeps its initial 'ERROR' value on this path.
    failures.append(str(exc))
    summary['exception'] = repr(exc)
finally:
    # The evidence file is written on every path, including errors.
    summary['ended_at_utc'] = iso_now()
    evidence_path.parent.mkdir(parents=True, exist_ok=True)
    evidence_path.write_text(json.dumps(summary, indent=2, sort_keys=True) + '\n', encoding='utf-8')

# Machine-readable result lines, then a non-zero exit unless the gate passed.
print(f"SMOKE_EVIDENCE={evidence_path}")
print(f"SMOKE_GATE={summary['gate_status']}")
if summary['gate_status'] != 'PASS':
    sys.exit(1)
PY_SMOKE