From 24a5002d1dbfa08a7571004d82eba281cefd5422 Mon Sep 17 00:00:00 2001 From: philipp Date: Wed, 1 Apr 2026 00:09:10 +0200 Subject: [PATCH] Reduce ingest scope and bootstrap app deploy --- .env.example | 32 +--- .forgejo/workflows/deploy.yml | 26 +++- README.md | 50 +++++- deploy/k8s/base/unrip.yaml | 1 + docs/deployment.md | 188 +++++++++++++++++++++++ docs/next-session-architecture.md | 55 ++++--- scripts/deploy/bootstrap.sh | 183 ++++++++++++++++++++++ scripts/deploy/forgejo_repo_bootstrap.py | 138 +++++++++++++++++ src/apps/dummy-consumer.mjs | 31 +++- src/apps/dummy-executor.mjs | 124 ++++++++++----- src/apps/dummy-reactor.mjs | 81 ++++++---- src/apps/near-intents-ingest.mjs | 40 ++++- src/bus/kafka/consumer.mjs | 34 +++- src/bus/kafka/producer.mjs | 32 +++- src/core/log.mjs | 78 ++++++---- src/core/pair-filter.mjs | 164 +++++++++++++++++++- src/lib/config.mjs | 18 +++ src/venues/near-intents/ws.mjs | 46 ++++-- 18 files changed, 1134 insertions(+), 187 deletions(-) create mode 100644 docs/deployment.md create mode 100755 scripts/deploy/bootstrap.sh create mode 100755 scripts/deploy/forgejo_repo_bootstrap.py diff --git a/.env.example b/.env.example index 040b24b..f8b3a53 100644 --- a/.env.example +++ b/.env.example @@ -1,6 +1,7 @@ # Local dev / container runtime values NEAR_INTENTS_API_KEY=replace_me NEAR_INTENTS_WS_URL=wss://solver-relay-v2.chaindefuser.com/ws +NEAR_INTENTS_PAIR_FILTER=nep141:btc.omft.near->nep141:gnosis-0x420ca0f9b9b604ce0fd9c18ef134c705e5fa3430.omf KAFKA_BROKERS=redpanda:9092 KAFKA_CLIENT_ID=unrip KAFKA_TOPIC_RAW_NEAR_INTENTS_QUOTE=raw.near_intents.quote @@ -11,29 +12,12 @@ KAFKA_CONSUMER_GROUP_DUMMY=dummy-reactor-v1 KAFKA_CONSUMER_GROUP_EXECUTOR=dummy-executor-v1 EXECUTOR_STATE_DIR=/var/lib/unrip/executor-state -# Repo-driven Hetzner bootstrap values live separately from the app .env. -# Copy scripts/hetzner/bootstrap-secrets.env.example to -# scripts/hetzner/bootstrap-secrets.env, configure non-secret values plus *_PASS -# mappings to your pass store, then: -# source scripts/hetzner/bootstrap-secrets.env -# bash scripts/hetzner/bootstrap.sh -# -# Canonical operator flow uses `pass` for sensitive values; explicit env vars still -# override pass-backed lookups for CI/testing. -# -# Expected bootstrap inputs now include: -# - HCLOUD_TOKEN_PASS or HCLOUD_TOKEN -# - SSH_PUBLIC_KEY_PATH -# - PUBLIC_DOMAIN -# - BASE_DOMAIN -# - LETSENCRYPT_EMAIL -# - REGISTRY_USERNAME -# - REGISTRY_PASSWORD_PASS or REGISTRY_PASSWORD -# - NEAR_INTENTS_API_KEY_PASS or NEAR_INTENTS_API_KEY -# - FORGEJO_ADMIN_USERNAME -# - FORGEJO_ADMIN_EMAIL -# - FORGEJO_ADMIN_PASSWORD_PASS or FORGEJO_ADMIN_PASSWORD -# - optional DNS provider creds via *_PASS or direct env vars +# Platform bootstrap values live in the separate infra/platform repo, not in +# this application repo. In the current local split that repo is `../unrip3`. +# Configure and run bootstrap there, then deploy this repo using the app-side +# workflow described in `docs/deployment.md`. # # Future k3s deployment should source the app values from Kubernetes Secret/ConfigMap. -# Hetzner provisioning is workstation-driven after Terraform; cloud-init no longer clones this repo onto the node. +# This repo expects app-side cluster secrets such as: +# - `unrip-secrets` for `NEAR_INTENTS_API_KEY` +# - `unrip-registry-creds` for image pulls and in-cluster Kaniko builds diff --git a/.forgejo/workflows/deploy.yml b/.forgejo/workflows/deploy.yml index 392b8f3..87b0b24 100644 --- a/.forgejo/workflows/deploy.yml +++ b/.forgejo/workflows/deploy.yml @@ -20,7 +20,25 @@ jobs: steps: - name: Install tooling run: | - apk add --no-cache git kubectl + if command -v git >/dev/null 2>&1 && command -v kubectl >/dev/null 2>&1; then + exit 0 + fi + + if command -v apk >/dev/null 2>&1; then + apk add --no-cache git kubectl + exit 0 + fi + + if command -v apt-get >/dev/null 2>&1; then + apt-get update + apt-get install -y git curl ca-certificates + curl -fsSLo /usr/local/bin/kubectl "https://dl.k8s.io/release/$(curl -fsSL https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl" + chmod +x /usr/local/bin/kubectl + exit 0 + fi + + echo "missing git/kubectl and no supported package manager found" >&2 + exit 1 - name: Load kubeconfig run: | @@ -40,7 +58,7 @@ jobs: - name: Resolve deployment settings run: | IMAGE="$REGISTRY_HOST/$PROJECT_NAME:$IMAGE_TAG" - BUILD_JOB="image-build-${GITHUB_SHA:0:12}" + BUILD_JOB="image-build-$(printf '%s' "$GITHUB_SHA" | cut -c1-12)" { echo "IMAGE=$IMAGE" echo "BUILD_JOB=$BUILD_JOB" @@ -115,9 +133,7 @@ jobs: - name: Roll deployments to new image run: | - IFS=',' read -r -a DEPLOYMENTS <<< "$PROJECT_DEPLOYMENTS" - - for deployment in "${DEPLOYMENTS[@]}"; do + printf '%s\n' "$PROJECT_DEPLOYMENTS" | tr ',' '\n' | while IFS= read -r deployment; do deployment="$(echo "$deployment" | xargs)" [ -n "$deployment" ] || continue diff --git a/README.md b/README.md index 79e957c..6457af7 100644 --- a/README.md +++ b/README.md @@ -55,5 +55,51 @@ The shared cluster/platform resources live in the separate infra repository. ## Deployment -This repo includes `.forgejo/workflows/deploy.yml`. -On push to `main`, Forgejo builds the image from this repo root, pushes it to the shared registry, applies `deploy/k8s/base`, and rolls the app deployments in the `unrip` namespace. +This repo is the app-side deployment repo. The shared Hetzner/k3s bootstrap, +Forgejo runner, registry, and other platform services live in the separate +platform repo. + +See `docs/deployment.md` for the full operator path. + +### One-time app bootstrap + +Bootstrap the app namespace, secrets, and Forgejo repo settings from this repo: + +```bash +bash scripts/deploy/bootstrap.sh +``` + +By default, the script uses the adjacent platform checkout at `../unrip3` for: +- `kubeconfig.yaml` +- `kubeconfig.incluster.yaml` +- registry credentials +- the `NEAR_INTENTS_API_KEY` fallback from `../unrip3/.env` + +If you are not using that local split, provide the values yourself via env vars +such as `KUBECONFIG_PATH`, `CI_KUBECONFIG_PATH`, `REGISTRY_HOST`, +`REGISTRY_USERNAME`, `REGISTRY_PASSWORD`, `NEAR_INTENTS_API_KEY`, and either +`FORGEJO_TOKEN` or `FORGEJO_ADMIN_USERNAME` / `FORGEJO_ADMIN_PASSWORD`. + +### Routine deploy + +After bootstrap, deployment is just a push to Forgejo `main`: + +```bash +git push forgejo main +``` + +`.forgejo/workflows/deploy.yml` then: +- applies `deploy/k8s/base` +- builds the image from this repo root inside the cluster with Kaniko +- pushes it to the shared registry +- rolls the `unrip` deployments + +### Observe rollout + +```bash +KUBECONFIG=../unrip3/.state/hetzner/kubeconfig.yaml kubectl -n unrip get deploy,pods,job +KUBECONFIG=../unrip3/.state/hetzner/kubeconfig.yaml kubectl -n unrip rollout status deploy/near-intents-ingest +KUBECONFIG=../unrip3/.state/hetzner/kubeconfig.yaml kubectl -n unrip rollout status deploy/dummy-reactor +KUBECONFIG=../unrip3/.state/hetzner/kubeconfig.yaml kubectl -n unrip rollout status deploy/dummy-executor +KUBECONFIG=../unrip3/.state/hetzner/kubeconfig.yaml kubectl -n unrip rollout status deploy/dummy-consumer +``` diff --git a/deploy/k8s/base/unrip.yaml b/deploy/k8s/base/unrip.yaml index eeebae5..d930a46 100644 --- a/deploy/k8s/base/unrip.yaml +++ b/deploy/k8s/base/unrip.yaml @@ -5,6 +5,7 @@ metadata: namespace: unrip data: NEAR_INTENTS_WS_URL: wss://solver-relay-v2.chaindefuser.com/ws + NEAR_INTENTS_PAIR_FILTER: nep141:btc.omft.near->nep141:gnosis-0x420ca0f9b9b604ce0fd9c18ef134c705e5fa3430.omf KAFKA_BROKERS: redpanda.unrip.svc.cluster.local:9092 KAFKA_CLIENT_ID: unrip KAFKA_TOPIC_RAW_NEAR_INTENTS_QUOTE: raw.near_intents.quote diff --git a/docs/deployment.md b/docs/deployment.md new file mode 100644 index 0000000..d8a1da6 --- /dev/null +++ b/docs/deployment.md @@ -0,0 +1,188 @@ +# Deployment + +This repository owns the app-side deployment assets for `unrip`: + +- application source +- Docker build inputs +- Kubernetes manifests under `deploy/k8s/base/` +- the Forgejo workflow under `.forgejo/workflows/deploy.yml` + +The shared platform bootstrap lives in the separate infra/platform repository. +In the current local split, that repo is available as `../unrip3`. + +## Ownership split + +Platform repo responsibilities: +- Hetzner/OpenTofu provisioning +- cloud-init and k3s bootstrap +- shared registry +- Forgejo and the Forgejo runner +- cert-manager, Traefik, observability, and other cluster services + +This repo responsibilities: +- app image build context +- app namespace manifests +- app runtime secret names +- app deployment workflow + +## Deployment model + +The intended production path is: + +1. bootstrap the shared platform from the platform repo +2. create the `unrip` app secrets in the cluster +3. push this repo to Forgejo +4. let `.forgejo/workflows/deploy.yml` build and roll the app in-cluster + +The image build happens inside Kubernetes via Kaniko. The operator pushes Git, +not Docker images. + +## Platform prerequisite + +Before deploying this repo, the platform repo should already have completed its +Hetzner bootstrap flow. From the current local split, the relevant operator docs +live in: + +- `../unrip3/deploy/hetzner/README.md` +- `../unrip3/deploy/k8s/README.md` +- `../unrip3/docs/hetzner-self-hosted-ci-runbook.md` + +After platform bootstrap, you should have: + +- a reachable Forgejo instance +- a reachable shared registry +- a running Forgejo runner in the cluster +- `../unrip3/.state/hetzner/kubeconfig.yaml` +- `../unrip3/.state/hetzner/kubeconfig.incluster.yaml` + +## One-time app namespace setup + +Apply the namespace first: + +```bash +kubectl apply -f deploy/k8s/base/namespace.yaml +``` + +Create the app runtime secret required by `near-intents-ingest`: + +```bash +kubectl -n unrip create secret generic unrip-secrets \ + --from-literal=NEAR_INTENTS_API_KEY=replace_me +``` + +Create the registry auth secret used both for image pulls and the in-cluster +Kaniko build job: + +```bash +kubectl -n unrip create secret docker-registry unrip-registry-creds \ + --docker-server=registry. \ + --docker-username="$REGISTRY_USERNAME" \ + --docker-password="$REGISTRY_PASSWORD" +``` + +Then apply the app manifests: + +```bash +kubectl apply -k deploy/k8s/base +``` + +Notes: + +- `deploy/k8s/base/unrip.yaml` references `unrip-secrets` and + `unrip-registry-creds`; they are not created by this repo. +- the checked-in image `ghcr.io/example/unrip:bootstrap` is only a placeholder. + The first real image tag is set by the Forgejo workflow or by a manual + `kubectl set image`. + +## Required Forgejo repo settings + +Required repo secret: + +- `KUBECONFIG_B64` + +Required repo variable: + +- `REGISTRY_HOST` + +Recommended repo variables when you do not want to rely on workflow defaults: + +- `PROJECT_NAME` +- `PROJECT_NAMESPACE` +- `PROJECT_DEPLOYMENTS` +- `PROJECT_REGISTRY_SECRET_NAME` + +Recommended values for this repo: + +- `REGISTRY_HOST=registry.` +- `PROJECT_NAME=unrip` +- `PROJECT_NAMESPACE=unrip` +- `PROJECT_DEPLOYMENTS=near-intents-ingest,dummy-reactor,dummy-executor,dummy-consumer` +- `PROJECT_REGISTRY_SECRET_NAME=unrip-registry-creds` + +`KUBECONFIG_B64` should be the base64-encoded contents of the in-cluster +kubeconfig produced by the platform repo, not the public workstation kubeconfig: + +```bash +base64 -w0 ../unrip3/.state/hetzner/kubeconfig.incluster.yaml +``` + +The current workflow does not read `REGISTRY_USERNAME` or `REGISTRY_PASSWORD` +from Forgejo directly. Those values are still needed on the operator side when +creating the `unrip-registry-creds` Kubernetes secret. + +## Seed the repo into Forgejo + +Create the repository in Forgejo, add a `forgejo` remote for this repo, and +push `main`. For example: + +```bash +git remote add forgejo https://git.//$(basename "$PWD").git +git push forgejo HEAD:refs/heads/main +``` + +## Routine deploy flow + +Once the repo exists in Forgejo and the repo settings above are configured: + +```bash +git push forgejo main +``` + +On push to `main`, `.forgejo/workflows/deploy.yml` does the following: + +1. loads kubeconfig from `KUBECONFIG_B64` +2. applies `deploy/k8s/base` +3. creates an in-cluster Kaniko Job in the `unrip` namespace +4. builds and pushes `REGISTRY_HOST/unrip:` +5. updates the four app deployments and waits for rollout + +## Observe rollout + +```bash +kubectl -n unrip get deploy,pods,pvc,job +kubectl -n unrip rollout status deploy/near-intents-ingest +kubectl -n unrip rollout status deploy/dummy-reactor +kubectl -n unrip rollout status deploy/dummy-executor +kubectl -n unrip rollout status deploy/dummy-consumer +``` + +Useful logs: + +```bash +kubectl -n unrip logs deploy/near-intents-ingest -f +kubectl -n unrip logs deploy/dummy-reactor -f +kubectl -n unrip logs deploy/dummy-executor -f +kubectl -n unrip logs deploy/dummy-consumer -f +kubectl -n unrip logs job/redpanda-topic-bootstrap +``` + +## Local development + +Local iteration remains separate from the production path: + +```bash +cp .env.example .env +docker compose up -d --build +``` + +That path is for local testing. Production rollout is Forgejo + Kubernetes. diff --git a/docs/next-session-architecture.md b/docs/next-session-architecture.md index 20045ae..2b726c3 100644 --- a/docs/next-session-architecture.md +++ b/docs/next-session-architecture.md @@ -209,41 +209,46 @@ Target environment: The first version may run on one machine, but deployment structure should already match a future distributed system. ### Current canonical operator path -The repo now documents and partially implements this path as the primary deployment workflow: +After the repo split, the primary deployment path is shared across two repos: +- the separate platform repo owns Hetzner/OpenTofu, cloud-init, k3s bootstrap, + Forgejo, the runner, and the shared registry +- this repo owns the app image, app manifests, and the app rollout workflow -#### Phase 0: workstation bootstrap -1. A local operator workstation prepares bootstrap secrets in `scripts/hetzner/bootstrap-secrets.env`. -2. The operator runs `bash scripts/hetzner/bootstrap.sh`. +#### Phase 0: platform bootstrap (platform repo) +1. A local operator workstation prepares platform bootstrap secrets in the + platform repo. +2. The operator runs the platform repo bootstrap flow. 3. Terraform provisions the server, firewall, network, and cloud-init user-data. -4. cloud-init installs k3s automatically and prepares persistence directories plus bootstrap artifacts. -5. The workstation waits for the public k3s API endpoint to report ready. -6. The workstation writes `.state/hetzner/kubeconfig.yaml`. -7. The workstation injects initial Kubernetes Secrets for app and Forgejo bootstrap. -8. The workstation applies repo-managed Kubernetes manifests under `deploy/k8s/`. -9. The workstation performs the first image/bootstrap delivery attempt for the app workloads. -10. The workstation verifies rollout status. +4. cloud-init installs k3s automatically and prepares persistence directories + plus bootstrap artifacts. +5. The workstation writes the public and in-cluster kubeconfigs. +6. The workstation injects the shared platform secrets and applies the shared + platform manifests. +7. Forgejo and the runner come online in the cluster. -#### Phase 1: self-hosted handoff -1. Forgejo becomes reachable in-cluster. -2. The operator completes initial Forgejo admin/repo setup. -3. This repo is pushed or mirrored into Forgejo. -4. The Forgejo runner becomes the routine app deployment mechanism. -5. Terraform remains the infra mutation entrypoint unless further automated later. +#### Phase 1: app repo handoff (this repo) +1. The operator creates this app repo's Kubernetes secrets such as + `unrip-secrets` and `unrip-registry-creds`. +2. This repo is pushed or mirrored into Forgejo. +3. On push to `main`, the Forgejo runner applies `deploy/k8s/base`, builds the + app image in-cluster with Kaniko, and updates the `unrip` deployments. +4. Terraform remains the infra mutation entrypoint, but app rollout is owned by + this repo's workflow. ### Failure-recovery expectation -The bootstrap path must be rerunnable from the workstation. +The overall bootstrap path must be rerunnable from the workstation. Docs should keep treating recovery as: - fix local secrets/inputs -- rerun the bootstrap script +- rerun the platform repo bootstrap script - inspect the cluster with the generated kubeconfig -- destroy/recreate infra with `scripts/hetzner/destroy.sh` only when required +- destroy/recreate infra from the platform repo only when required ### Current repo-state caveats -The direction is clear, but the implementation is still mid-transition: -- the bootstrap script currently applies `deploy/k8s/base` directly rather than the Hetzner overlay -- kubeconfig/auth handling is not yet fully production-hardened -- first image delivery is still a bootstrap workaround rather than a final registry-native CI path -- Forgejo admin bootstrap, repo creation, and Actions configuration still require operator steps +The direction is clear, but the implementation still has caveats: +- this repo now assumes a pre-bootstrapped platform repo/cluster +- kubeconfig/auth handling is only as strong as the external Forgejo secret management +- base manifests still carry a placeholder bootstrap image until CI rolls the first real tag +- app secrets and registry pull credentials still require operator setup - local Compose remains in the repo for development/testing, not as the canonical production path ### Minimal repo layout target diff --git a/scripts/deploy/bootstrap.sh b/scripts/deploy/bootstrap.sh new file mode 100755 index 0000000..8f60660 --- /dev/null +++ b/scripts/deploy/bootstrap.sh @@ -0,0 +1,183 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR=$(cd "$(dirname "$0")/../.." && pwd) +PLATFORM_REPO_DIR="${PLATFORM_REPO_DIR:-$ROOT_DIR/../unrip3}" +BOOTSTRAP_ENV_FILE="${BOOTSTRAP_ENV_FILE:-$PLATFORM_REPO_DIR/.state/hetzner/bootstrap-secrets.resolved.env}" +PLATFORM_APP_ENV_FILE="${PLATFORM_APP_ENV_FILE:-$PLATFORM_REPO_DIR/.env}" +APP_ENV_FILE="${APP_ENV_FILE:-$ROOT_DIR/.env}" +FORGEJO_REMOTE_NAME="${FORGEJO_REMOTE_NAME:-forgejo}" + +PROJECT_NAME="${PROJECT_NAME:-unrip}" +PROJECT_NAMESPACE="${PROJECT_NAMESPACE:-$PROJECT_NAME}" +PROJECT_DEPLOYMENTS="${PROJECT_DEPLOYMENTS:-near-intents-ingest,dummy-reactor,dummy-executor,dummy-consumer}" +PROJECT_REGISTRY_SECRET_NAME="${PROJECT_REGISTRY_SECRET_NAME:-${PROJECT_NAME}-registry-creds}" +APP_SECRET_NAME="${APP_SECRET_NAME:-${PROJECT_NAME}-secrets}" + +require() { + command -v "$1" >/dev/null 2>&1 || { + echo "missing required command: $1" >&2 + exit 1 + } +} + +load_env_defaults() { + local file="$1" + [[ -f "$file" ]] || return 0 + + eval "$( + python3 - "$file" <<'PY' +import os +import shlex +import sys + +for raw in open(sys.argv[1], 'r', encoding='utf-8'): + line = raw.strip() + if not line or line.startswith('#'): + continue + if line.startswith('export '): + line = line[len('export '):] + if '=' not in line: + continue + key, value = line.split('=', 1) + key = key.strip() + if key in os.environ: + continue + print(f'export {key}={shlex.quote(value)}') +PY + )" +} + +require git +require kubectl +require python3 +require base64 + +load_env_defaults "$BOOTSTRAP_ENV_FILE" +load_env_defaults "$PLATFORM_APP_ENV_FILE" +load_env_defaults "$APP_ENV_FILE" + +REMOTE_URL="${FORGEJO_REMOTE_URL:-$(git -C "$ROOT_DIR" remote get-url "$FORGEJO_REMOTE_NAME")}" +eval "$( + python3 - "$REMOTE_URL" <<'PY' +import sys +from urllib.parse import urlparse + +remote = sys.argv[1] +parsed = urlparse(remote) +if parsed.scheme not in {'http', 'https'}: + raise SystemExit('forgejo remote must use http(s) for automatic bootstrap') + +path = parsed.path.rstrip('/') +if path.endswith('.git'): + path = path[:-4] +parts = [part for part in path.split('/') if part] +if len(parts) < 2: + raise SystemExit('could not parse repo owner/name from forgejo remote') + +base_url = f'{parsed.scheme}://{parsed.hostname}' +if parsed.port: + base_url += f':{parsed.port}' + +if parsed.username and parsed.password: + print(f'export FORGEJO_API_PASSWORD={parsed.password}') + +if parsed.username: + print(f'export FORGEJO_API_USERNAME={parsed.username}') + +print(f'export FORGEJO_URL={base_url}') +print(f'export FORGEJO_REPO_OWNER={parts[-2]}') +print(f'export FORGEJO_REPO_NAME={parts[-1]}') +PY +)" + +: "${KUBECONFIG_PATH:=${KUBECONFIG:-$PLATFORM_REPO_DIR/.state/hetzner/kubeconfig.yaml}}" +: "${CI_KUBECONFIG_PATH:=$PLATFORM_REPO_DIR/.state/hetzner/kubeconfig.incluster.yaml}" +: "${FORGEJO_URL:?set FORGEJO_URL or configure a forgejo remote}" +: "${FORGEJO_REPO_OWNER:?set FORGEJO_REPO_OWNER}" +: "${FORGEJO_REPO_NAME:?set FORGEJO_REPO_NAME}" +: "${FORGEJO_ADMIN_USERNAME:=${FORGEJO_API_USERNAME:-}}" + +if [[ -z "${FORGEJO_TOKEN:-}" ]]; then + : "${FORGEJO_ADMIN_USERNAME:=${FORGEJO_API_USERNAME:-}}" + : "${FORGEJO_ADMIN_USERNAME:?set FORGEJO_TOKEN or FORGEJO_ADMIN_USERNAME}" + : "${FORGEJO_ADMIN_PASSWORD:=${FORGEJO_API_PASSWORD:-}}" + : "${FORGEJO_ADMIN_PASSWORD:?set FORGEJO_TOKEN or FORGEJO_ADMIN_PASSWORD}" +fi + +if [[ ! -f "$KUBECONFIG_PATH" ]]; then + echo "missing kubeconfig: $KUBECONFIG_PATH" >&2 + exit 1 +fi + +if [[ ! -f "$CI_KUBECONFIG_PATH" ]]; then + echo "missing in-cluster kubeconfig: $CI_KUBECONFIG_PATH" >&2 + exit 1 +fi + +export KUBECONFIG="$KUBECONFIG_PATH" + +if [[ -z "${REGISTRY_HOST:-}" ]]; then + REGISTRY_HOST="${REGISTRY_DOMAIN:-}" +fi +if [[ -z "${REGISTRY_HOST:-}" ]]; then + REGISTRY_HOST="$(kubectl get ingress -n registry registry -o jsonpath='{.spec.rules[0].host}' 2>/dev/null || true)" +fi +if [[ -z "${REGISTRY_USERNAME:-}" ]]; then + REGISTRY_USERNAME="$(kubectl get secret registry-secrets -n registry -o jsonpath='{.data.htpasswd}' 2>/dev/null | base64 -d | cut -d: -f1 || true)" +fi + +: "${REGISTRY_HOST:?set REGISTRY_HOST or configure the registry ingress first}" +: "${REGISTRY_USERNAME:?set REGISTRY_USERNAME or bootstrap the shared registry first}" +: "${REGISTRY_PASSWORD:?set REGISTRY_PASSWORD}" +: "${NEAR_INTENTS_API_KEY:?set NEAR_INTENTS_API_KEY}" + +echo "bootstrapping namespace $PROJECT_NAMESPACE" +kubectl apply -f "$ROOT_DIR/deploy/k8s/base/namespace.yaml" + +echo "upserting runtime secret $APP_SECRET_NAME" +kubectl -n "$PROJECT_NAMESPACE" create secret generic "$APP_SECRET_NAME" \ + --from-literal=NEAR_INTENTS_API_KEY="$NEAR_INTENTS_API_KEY" \ + --dry-run=client -o yaml | kubectl apply -f - + +echo "upserting registry pull/push secret $PROJECT_REGISTRY_SECRET_NAME" +kubectl -n "$PROJECT_NAMESPACE" create secret docker-registry "$PROJECT_REGISTRY_SECRET_NAME" \ + --docker-server="$REGISTRY_HOST" \ + --docker-username="$REGISTRY_USERNAME" \ + --docker-password="$REGISTRY_PASSWORD" \ + --dry-run=client -o yaml | kubectl apply -f - + +echo "applying app manifests" +kubectl apply -k "$ROOT_DIR/deploy/k8s/base" + +echo "upserting Forgejo repo settings" +forgejo_args=() +if [[ -n "${FORGEJO_TOKEN:-}" ]]; then + forgejo_args+=(--token "$FORGEJO_TOKEN") +fi +if [[ -n "${FORGEJO_ADMIN_USERNAME:-}" ]]; then + forgejo_args+=(--admin-username "$FORGEJO_ADMIN_USERNAME") +fi +if [[ -n "${FORGEJO_ADMIN_PASSWORD:-}" ]]; then + forgejo_args+=(--admin-password "$FORGEJO_ADMIN_PASSWORD") +fi + +python3 "$ROOT_DIR/scripts/deploy/forgejo_repo_bootstrap.py" \ + --forgejo-url "$FORGEJO_URL" \ + --repo-owner "$FORGEJO_REPO_OWNER" \ + --repo-name "$FORGEJO_REPO_NAME" \ + --ci-kubeconfig "$CI_KUBECONFIG_PATH" \ + --registry-host "$REGISTRY_HOST" \ + --project-name "$PROJECT_NAME" \ + --project-namespace "$PROJECT_NAMESPACE" \ + --project-deployments "$PROJECT_DEPLOYMENTS" \ + --project-registry-secret-name "$PROJECT_REGISTRY_SECRET_NAME" \ + "${forgejo_args[@]}" + +cat < { await consumer.disconnect(); @@ -28,15 +33,29 @@ process.on('SIGTERM', async () => { await consumer.run({ eachMessage: async ({ message }) => { if (!message.value) return; + let event; try { event = parseEventMessage(message.value.toString()); } catch { - logStatus('result consumer received non-JSON message; skipping'); + logger.warn('invalid_json_message', { + topic: config.kafkaTopicExecTradeResult, + }); return; } - assertTradeResult(event); - const payload = event.payload; - console.log(`[result] command_id=${payload.command_id} quote_id=${payload.quote_id} status=${payload.status} result_code=${payload.result_code || 'n/a'}`); + + try { + assertTradeResult(event); + } catch (error) { + logger.error('message_processing_failed', { + venue: event.venue || 'near-intents', + topic: config.kafkaTopicExecTradeResult, + details: { + error: serializeError(error), + command_id: event.payload?.command_id, + quote_id: event.payload?.quote_id, + }, + }); + } }, }); diff --git a/src/apps/dummy-executor.mjs b/src/apps/dummy-executor.mjs index 0992381..ffe0969 100644 --- a/src/apps/dummy-executor.mjs +++ b/src/apps/dummy-executor.mjs @@ -4,27 +4,32 @@ import { createConsumer } from '../bus/kafka/consumer.mjs'; import { createProducer } from '../bus/kafka/producer.mjs'; import { buildEventEnvelope, parseEventMessage } from '../core/event-envelope.mjs'; import { createExecutorStateStore } from '../core/executor-state-store.mjs'; -import { logStatus } from '../core/log.mjs'; +import { createLogger, serializeError } from '../core/log.mjs'; import { assertExecuteTradeCommand, assertTradeResult } from '../core/schemas.mjs'; import { loadConfig } from '../lib/config.mjs'; const config = loadConfig(); +const logger = createLogger({ + service: 'dummy-executor', + component: 'executor', + namespace: config.projectNamespace, +}); const consumer = await createConsumer({ groupId: config.kafkaConsumerGroupExecutor, brokers: config.kafkaBrokers, clientId: config.kafkaClientId, + logger, }); const producer = await createProducer({ brokers: config.kafkaBrokers, clientId: config.kafkaClientId, + logger, }); const stateStore = createExecutorStateStore({ stateDir: config.executorStateDir }); await consumer.subscribe({ topic: config.kafkaTopicCmdExecuteTrade, fromBeginning: false }); -logStatus(`dummy executor subscribed to ${config.kafkaTopicCmdExecuteTrade} as ${config.kafkaConsumerGroupExecutor}`); -logStatus(`dummy executor will publish results to ${config.kafkaTopicExecTradeResult}; state_dir=${config.executorStateDir}`); async function shutdown() { await consumer.disconnect(); @@ -42,52 +47,89 @@ await consumer.run({ try { event = parseEventMessage(message.value.toString()); } catch { - logStatus('dummy executor received non-JSON message; skipping'); + logger.warn('invalid_json_message', { + topic: config.kafkaTopicCmdExecuteTrade, + }); return; } - assertExecuteTradeCommand(event); + try { + assertExecuteTradeCommand(event); - const payload = event.payload; - const commandId = payload.command_id; - const existing = stateStore.get(commandId); - if (existing?.status === 'completed') { - logStatus(`dummy executor skipping duplicate command_id=${commandId}`); - return; - } + const payload = event.payload; + const commandId = payload.command_id; + const pair = `${payload.asset_in}->${payload.asset_out}`; + const existing = stateStore.get(commandId); + if (existing?.status === 'completed') { + logger.warn('duplicate_command_skipped', { + venue: event.venue || 'near-intents', + topic: config.kafkaTopicCmdExecuteTrade, + pair, + details: { + command_id: commandId, + quote_id: payload.quote_id, + }, + }); + return; + } - stateStore.markProcessing(commandId, { - idempotency_key: payload.idempotency_key, - execution_key: payload.execution_key, - quote_id: payload.quote_id, - }); - - const pair = `${payload.asset_in} -> ${payload.asset_out}`; - const result = buildEventEnvelope({ - source: 'dummy-executor', - venue: event.venue || 'near-intents', - eventType: 'trade_result', - eventId: `exec-${commandId}`, - observedAt: event.observed_at, - payload: { - command_id: commandId, + stateStore.markProcessing(commandId, { idempotency_key: payload.idempotency_key, execution_key: payload.execution_key, quote_id: payload.quote_id, - status: 'simulated_sent', - result_code: existing?.status === 'processing' ? 'recovered_inflight' : 'sent', - note: 'dummy executor placeholder result', - }, - }); - assertTradeResult(result); + }); - await producer.sendJson(config.kafkaTopicExecTradeResult, result, { key: payload.execution_key }); - stateStore.markCompleted(commandId, { - idempotency_key: payload.idempotency_key, - execution_key: payload.execution_key, - quote_id: payload.quote_id, - result_event_id: result.event_id, - }); - console.log(`[dummy-executor] result emitted ${pair} quote_id=${payload.quote_id} command_id=${commandId} status=simulated_sent`); + const recoveredInflight = existing?.status === 'processing'; + const result = buildEventEnvelope({ + source: 'dummy-executor', + venue: event.venue || 'near-intents', + eventType: 'trade_result', + eventId: `exec-${commandId}`, + observedAt: event.observed_at, + payload: { + command_id: commandId, + idempotency_key: payload.idempotency_key, + execution_key: payload.execution_key, + quote_id: payload.quote_id, + status: 'simulated_sent', + result_code: recoveredInflight ? 'recovered_inflight' : 'sent', + note: 'dummy executor placeholder result', + }, + }); + assertTradeResult(result); + + await producer.sendJson(config.kafkaTopicExecTradeResult, result, { key: payload.execution_key }); + stateStore.markCompleted(commandId, { + idempotency_key: payload.idempotency_key, + execution_key: payload.execution_key, + quote_id: payload.quote_id, + result_event_id: result.event_id, + }); + + if (recoveredInflight) { + logger.warn('inflight_command_recovered', { + venue: event.venue || 'near-intents', + topic: config.kafkaTopicExecTradeResult, + pair, + details: { + command_id: commandId, + quote_id: payload.quote_id, + }, + }); + } + } catch (error) { + logger.error('message_processing_failed', { + venue: event.venue || 'near-intents', + topic: config.kafkaTopicCmdExecuteTrade, + pair: event.payload?.asset_in && event.payload?.asset_out + ? `${event.payload.asset_in}->${event.payload.asset_out}` + : undefined, + details: { + error: serializeError(error), + command_id: event.payload?.command_id, + quote_id: event.payload?.quote_id, + }, + }); + } }, }); diff --git a/src/apps/dummy-reactor.mjs b/src/apps/dummy-reactor.mjs index 1a8b5ab..4854d29 100644 --- a/src/apps/dummy-reactor.mjs +++ b/src/apps/dummy-reactor.mjs @@ -2,26 +2,31 @@ import process from 'node:process'; import { createConsumer } from '../bus/kafka/consumer.mjs'; import { createProducer } from '../bus/kafka/producer.mjs'; -import { logStatus } from '../core/log.mjs'; +import { createLogger, serializeError } from '../core/log.mjs'; import { loadConfig } from '../lib/config.mjs'; import { buildEventEnvelope, parseEventMessage } from '../core/event-envelope.mjs'; import { assertExecuteTradeCommand, assertNormalizedSwapDemand } from '../core/schemas.mjs'; const config = loadConfig(); +const logger = createLogger({ + service: 'dummy-reactor', + component: 'reactor', + namespace: config.projectNamespace, +}); const consumer = await createConsumer({ groupId: config.kafkaConsumerGroupDummy, brokers: config.kafkaBrokers, clientId: config.kafkaClientId, + logger, }); const producer = await createProducer({ brokers: config.kafkaBrokers, clientId: config.kafkaClientId, + logger, }); await consumer.subscribe({ topic: config.kafkaTopicNormSwapDemand, fromBeginning: false }); -logStatus(`dummy reactor subscribed to ${config.kafkaTopicNormSwapDemand} as ${config.kafkaConsumerGroupDummy}`); -logStatus(`dummy reactor will publish commands to ${config.kafkaTopicCmdExecuteTrade}`); async function shutdown() { await consumer.disconnect(); @@ -39,37 +44,53 @@ await consumer.run({ try { event = parseEventMessage(message.value.toString()); } catch { - logStatus('dummy reactor received non-JSON message; skipping'); + logger.warn('invalid_json_message', { + topic: config.kafkaTopicNormSwapDemand, + }); return; } - assertNormalizedSwapDemand(event); + try { + assertNormalizedSwapDemand(event); - const payload = event.payload; - const pair = `${payload.asset_in} -> ${payload.asset_out}`; - const quoteId = payload.quote_id; - const commandId = `cmd-${quoteId}`; - const command = buildEventEnvelope({ - source: 'dummy-reactor', - venue: event.venue || 'near-intents', - eventType: 'execute_trade', - eventId: commandId, - observedAt: event.observed_at, - payload: { - command_id: commandId, - idempotency_key: `${event.venue || 'near-intents'}:${quoteId}`, - execution_key: `${event.venue || 'near-intents'}:${payload.asset_in}->${payload.asset_out}`, - quote_id: quoteId, - asset_in: payload.asset_in, - asset_out: payload.asset_out, - amount_in: payload.amount_in, - amount_out: payload.amount_out, - reason: 'dummy reactor placeholder decision', - }, - }); - assertExecuteTradeCommand(command); + const payload = event.payload; + const pair = `${payload.asset_in}->${payload.asset_out}`; + const quoteId = payload.quote_id; + const commandId = `cmd-${quoteId}`; + const command = buildEventEnvelope({ + source: 'dummy-reactor', + venue: event.venue || 'near-intents', + eventType: 'execute_trade', + eventId: commandId, + observedAt: event.observed_at, + payload: { + command_id: commandId, + idempotency_key: `${event.venue || 'near-intents'}:${quoteId}`, + execution_key: `${event.venue || 'near-intents'}:${payload.asset_in}->${payload.asset_out}`, + quote_id: quoteId, + asset_in: payload.asset_in, + asset_out: payload.asset_out, + amount_in: payload.amount_in, + amount_out: payload.amount_out, + reason: 'dummy reactor placeholder decision', + }, + }); + assertExecuteTradeCommand(command); - await producer.sendJson(config.kafkaTopicCmdExecuteTrade, command, { key: command.payload.execution_key }); - console.log(`[dummy-reactor] command emitted ${pair} quote_id=${quoteId} command_id=${commandId}`); + await producer.sendJson(config.kafkaTopicCmdExecuteTrade, command, { key: command.payload.execution_key }); + return; + } catch (error) { + logger.error('message_processing_failed', { + venue: event.venue || 'near-intents', + topic: config.kafkaTopicNormSwapDemand, + pair: event.payload?.asset_in && event.payload?.asset_out + ? `${event.payload.asset_in}->${event.payload.asset_out}` + : undefined, + details: { + error: serializeError(error), + quote_id: event.payload?.quote_id, + }, + }); + } }, }); diff --git a/src/apps/near-intents-ingest.mjs b/src/apps/near-intents-ingest.mjs index 27f51c1..ab95059 100644 --- a/src/apps/near-intents-ingest.mjs +++ b/src/apps/near-intents-ingest.mjs @@ -1,32 +1,53 @@ import process from 'node:process'; import { createProducer } from '../bus/kafka/producer.mjs'; -import { logStatus } from '../core/log.mjs'; -import { parsePairFilter } from '../core/pair-filter.mjs'; +import { createLogger } from '../core/log.mjs'; +import { createPairFilterController } from '../core/pair-filter.mjs'; import { loadConfig } from '../lib/config.mjs'; import { startNearIntentsWs } from '../venues/near-intents/ws.mjs'; const config = loadConfig(); -const pairFilter = parsePairFilter(process.argv.slice(2)); +const logger = createLogger({ + service: 'near-intents-ingest', + component: 'ingest', + namespace: config.projectNamespace, +}); +const pairFilterController = createPairFilterController({ + argv: process.argv.slice(2), + env: process.env, + defaultPairFilter: config.nearIntentsPairFilter, + pairFilterFile: config.nearIntentsPairFilterFile, + reloadEveryMs: config.nearIntentsPairFilterReloadMs, + logger: logger.child({ + component: 'filter', + venue: 'near-intents', + }), +}); if (!config.nearIntentsApiKey) { - console.error('Missing NEAR_INTENTS_API_KEY in env or .env'); + logger.error('missing_api_key', { + venue: 'near-intents', + details: { + variable: 'NEAR_INTENTS_API_KEY', + }, + }); process.exit(1); } const producer = await createProducer({ brokers: config.kafkaBrokers, clientId: config.kafkaClientId, + logger, }); -logStatus(`kafka producer connected; raw_topic=${config.kafkaTopicRawNearIntentsQuote}; normalized_topic=${config.kafkaTopicNormSwapDemand}`); -if (pairFilter) logStatus(`pair filter enabled: ${pairFilter[0]} <-> ${pairFilter[1]}`); process.on('SIGINT', async () => { + pairFilterController.close(); await producer.disconnect(); process.exit(0); }); process.on('SIGTERM', async () => { + pairFilterController.close(); await producer.disconnect(); process.exit(0); }); @@ -34,8 +55,13 @@ process.on('SIGTERM', async () => { await startNearIntentsWs({ apiKey: config.nearIntentsApiKey, wsUrl: config.nearIntentsWsUrl, - pairFilter, + getPairFilter: () => pairFilterController.getPairFilter(), producer, rawTopic: config.kafkaTopicRawNearIntentsQuote, normalizedTopic: config.kafkaTopicNormSwapDemand, + namespace: config.projectNamespace, + logger: logger.child({ + component: 'ws', + venue: 'near-intents', + }), }); diff --git a/src/bus/kafka/consumer.mjs b/src/bus/kafka/consumer.mjs index c9d7b47..f1aaf4b 100644 --- a/src/bus/kafka/consumer.mjs +++ b/src/bus/kafka/consumer.mjs @@ -1,11 +1,43 @@ import { Kafka } from 'kafkajs'; +import { serializeError } from '../../core/log.mjs'; function createKafka({ brokers = ['127.0.0.1:9092'], clientId = 'unrip' } = {}) { return new Kafka({ clientId, brokers }); } -export async function createConsumer({ groupId, ...options }) { +export async function createConsumer({ groupId, logger, ...options }) { const consumer = createKafka(options).consumer({ groupId }); + const kafkaLogger = logger ? logger.child({ component: 'kafka' }) : null; + + consumer.on(consumer.events.CONNECT, () => { + kafkaLogger?.info('kafka_connected', { + details: { + client_type: 'consumer', + group_id: groupId, + }, + }); + }); + + consumer.on(consumer.events.DISCONNECT, () => { + kafkaLogger?.warn('kafka_disconnected', { + details: { + client_type: 'consumer', + group_id: groupId, + }, + }); + }); + + consumer.on(consumer.events.CRASH, ({ payload }) => { + kafkaLogger?.error('kafka_consumer_crashed', { + details: { + client_type: 'consumer', + group_id: groupId, + restart: payload?.restart ?? null, + error: serializeError(payload?.error), + }, + }); + }); + await consumer.connect(); return { diff --git a/src/bus/kafka/producer.mjs b/src/bus/kafka/producer.mjs index da4d55f..bf2b5ed 100644 --- a/src/bus/kafka/producer.mjs +++ b/src/bus/kafka/producer.mjs @@ -1,11 +1,41 @@ import { Kafka } from 'kafkajs'; +import { serializeError } from '../../core/log.mjs'; function createKafka({ brokers = ['127.0.0.1:9092'], clientId = 'unrip' } = {}) { return new Kafka({ clientId, brokers }); } -export async function createProducer(options = {}) { +export async function createProducer({ logger, ...options } = {}) { const producer = createKafka(options).producer(); + const kafkaLogger = logger ? logger.child({ component: 'kafka' }) : null; + + producer.on(producer.events.CONNECT, () => { + kafkaLogger?.info('kafka_connected', { + details: { + client_type: 'producer', + }, + }); + }); + + producer.on(producer.events.DISCONNECT, () => { + kafkaLogger?.warn('kafka_disconnected', { + details: { + client_type: 'producer', + }, + }); + }); + + producer.on(producer.events.REQUEST_TIMEOUT, ({ payload }) => { + kafkaLogger?.error('kafka_request_timeout', { + details: { + client_type: 'producer', + broker: payload?.broker ?? null, + client_id: payload?.clientId ?? null, + error: serializeError(payload?.error), + }, + }); + }); + await producer.connect(); return { async sendJson(topic, event, { key = event?.event_id ?? event?.key ?? null } = {}) { diff --git a/src/core/log.mjs b/src/core/log.mjs index 5abc50a..ce7323e 100644 --- a/src/core/log.mjs +++ b/src/core/log.mjs @@ -1,31 +1,55 @@ -export function logStatus(message) { - const time = new Date().toISOString(); - console.error(`[${time}] ${message}`); +import process from 'node:process'; + +export function logJson(event) { + process.stdout.write(`${JSON.stringify(compact({ + ts: new Date().toISOString(), + ...event, + }))}\n`); } -export function startIdleHeartbeat({ - label, - getLastActivityAt, - getStatus, - idleAfterMs = 30_000, - checkEveryMs = 5_000, -}) { - let lastHeartbeatAt = 0; +export function createLogger(bindings = {}) { + const base = compact(bindings); - const timer = setInterval(() => { - const lastActivityAt = getLastActivityAt(); - const idleForMs = Date.now() - lastActivityAt; - - if (idleForMs < idleAfterMs) return; - if (Date.now() - lastHeartbeatAt < idleAfterMs) return; - - const seconds = Math.floor(idleForMs / 1000); - const suffix = getStatus ? `; ${getStatus()}` : ''; - logStatus(`${label} idle ${seconds}s${suffix}`); - lastHeartbeatAt = Date.now(); - }, checkEveryMs); - - if (typeof timer.unref === 'function') timer.unref(); - - return () => clearInterval(timer); + return { + child(extraBindings = {}) { + return createLogger({ ...base, ...extraBindings }); + }, + info(event, fields = {}) { + logJson({ level: 'info', ...base, event, ...compact(fields) }); + }, + warn(event, fields = {}) { + logJson({ level: 'warn', ...base, event, ...compact(fields) }); + }, + error(event, fields = {}) { + logJson({ level: 'error', ...base, event, ...compact(fields) }); + }, + }; +} + +export function serializeError(error) { + if (!error) return null; + if (error instanceof Error) { + return compact({ + name: error.name, + message: error.message, + stack: error.stack, + code: error.code, + }); + } + if (typeof error === 'object') return compact(error); + return { message: String(error) }; +} + +function compact(value) { + if (Array.isArray(value)) { + return value.map((item) => compact(item)); + } + + if (!value || typeof value !== 'object') return value; + + const entries = Object.entries(value) + .filter(([, entry]) => entry !== undefined) + .map(([key, entry]) => [key, compact(entry)]); + + return Object.fromEntries(entries); } diff --git a/src/core/pair-filter.mjs b/src/core/pair-filter.mjs index 172cab3..0c47bb6 100644 --- a/src/core/pair-filter.mjs +++ b/src/core/pair-filter.mjs @@ -1,17 +1,173 @@ +import fs from 'node:fs'; + +export const DEFAULT_NEAR_INTENTS_PAIR_FILTER = + 'nep141:btc.omft.near->nep141:gnosis-0x420ca0f9b9b604ce0fd9c18ef134c705e5fa3430.omf'; + export function parsePairFilter(argv) { const idx = argv.indexOf('--pair'); if (idx === -1) return null; - const raw = argv[idx + 1]; - if (!raw || !raw.includes('->')) { - throw new Error("Use --pair 'asset_a->asset_b'"); + return parsePairFilterValue(argv[idx + 1], { fieldName: '--pair' }); +} + +export function parsePairFilterValue(raw, { fieldName = 'pair filter' } = {}) { + const normalized = String(raw || '').trim(); + if (!normalized) return null; + + if (['all', '*', 'off', 'none', 'disabled'].includes(normalized.toLowerCase())) { + return null; } - const [a, b] = raw.split('->').map((x) => x.trim().toLowerCase()); + + if (!normalized.includes('->')) { + throw new Error(`Use ${fieldName} like 'asset_a->asset_b'`); + } + + const [a, b] = normalized.split('->').map((value) => value.trim().toLowerCase()); + if (!a || !b) throw new Error(`Use ${fieldName} like 'asset_a->asset_b'`); return [a, b]; } +export function formatPairFilter(pairFilter) { + if (!pairFilter) return null; + return `${pairFilter[0]}->${pairFilter[1]}`; +} + +export function resolvePairFilter({ + argv = [], + env = process.env, + defaultPairFilter = DEFAULT_NEAR_INTENTS_PAIR_FILTER, +} = {}) { + const cliPairFilter = parsePairFilter(argv); + if (cliPairFilter) { + return { + pairFilter: cliPairFilter, + pair: formatPairFilter(cliPairFilter), + source: 'argv', + }; + } + + const envPairFilter = parsePairFilterValue(env.NEAR_INTENTS_PAIR_FILTER, { + fieldName: 'NEAR_INTENTS_PAIR_FILTER', + }); + if (envPairFilter) { + return { + pairFilter: envPairFilter, + pair: formatPairFilter(envPairFilter), + source: 'env', + }; + } + + const defaultResolved = parsePairFilterValue(defaultPairFilter, { + fieldName: 'default pair filter', + }); + return { + pairFilter: defaultResolved, + pair: formatPairFilter(defaultResolved), + source: 'default', + }; +} + +export function createPairFilterController({ + argv = [], + env = process.env, + logger = null, + defaultPairFilter = DEFAULT_NEAR_INTENTS_PAIR_FILTER, + pairFilterFile = env.NEAR_INTENTS_PAIR_FILTER_FILE, + reloadEveryMs = env.NEAR_INTENTS_PAIR_FILTER_RELOAD_MS, +} = {}) { + const resolved = resolvePairFilter({ argv, env, defaultPairFilter }); + let currentPairFilter = resolved.pairFilter; + let currentPair = resolved.pair; + let lastLoadedFileValue = null; + let source = resolved.source; + + const normalizedPairFilterFile = String(pairFilterFile || '').trim() || null; + const normalizedReloadEveryMs = parseReloadMs(reloadEveryMs); + + if (normalizedPairFilterFile) { + const initialFileValue = readPairFilterFile(normalizedPairFilterFile); + if (initialFileValue != null) { + const initialFilePairFilter = parsePairFilterValue(initialFileValue, { + fieldName: 'NEAR_INTENTS_PAIR_FILTER_FILE', + }); + currentPairFilter = initialFilePairFilter; + currentPair = formatPairFilter(initialFilePairFilter); + lastLoadedFileValue = initialFileValue; + source = 'file'; + } + } + + logger?.info('pair_filter_configured', { + pair: currentPair, + details: { + source, + pair_filter_file: normalizedPairFilterFile, + }, + }); + + const timer = normalizedPairFilterFile + ? setInterval(() => { + const nextValue = readPairFilterFile(normalizedPairFilterFile); + if (nextValue == null || nextValue === lastLoadedFileValue) return; + + try { + const nextPairFilter = parsePairFilterValue(nextValue, { + fieldName: 'NEAR_INTENTS_PAIR_FILTER_FILE', + }); + currentPairFilter = nextPairFilter; + currentPair = formatPairFilter(nextPairFilter); + lastLoadedFileValue = nextValue; + logger?.info('pair_filter_reloaded', { + pair: currentPair, + details: { + pair_filter_file: normalizedPairFilterFile, + }, + }); + } catch (error) { + logger?.error('pair_filter_reload_failed', { + pair: currentPair, + details: { + pair_filter_file: normalizedPairFilterFile, + error: error.message, + }, + }); + } + }, normalizedReloadEveryMs) + : null; + + if (timer && typeof timer.unref === 'function') timer.unref(); + + return { + getPairFilter() { + return currentPairFilter; + }, + getPair() { + return currentPair; + }, + close() { + if (timer) clearInterval(timer); + }, + }; +} + export function matchesPairFilter(assetIn, assetOut, pairFilter) { if (!pairFilter) return true; const x = assetIn.toLowerCase(); const y = assetOut.toLowerCase(); return (x === pairFilter[0] && y === pairFilter[1]) || (x === pairFilter[1] && y === pairFilter[0]); } + +function readPairFilterFile(filePath) { + if (!fs.existsSync(filePath)) return null; + + const raw = fs.readFileSync(filePath, 'utf8') + .split(/\r?\n/) + .map((line) => line.trim()) + .find((line) => line && !line.startsWith('#')); + + return raw || null; +} + +function parseReloadMs(raw) { + const parsed = Number(raw); + return Number.isFinite(parsed) && parsed >= 1_000 ? parsed : 5_000; +} diff --git a/src/lib/config.mjs b/src/lib/config.mjs index 4e261cb..1b1e31c 100644 --- a/src/lib/config.mjs +++ b/src/lib/config.mjs @@ -1,7 +1,10 @@ import { loadDotenv } from './env.mjs'; +import { DEFAULT_NEAR_INTENTS_PAIR_FILTER } from '../core/pair-filter.mjs'; const DEFAULTS = { nearIntentsWsUrl: 'wss://solver-relay-v2.chaindefuser.com/ws', + nearIntentsPairFilter: DEFAULT_NEAR_INTENTS_PAIR_FILTER, + nearIntentsPairFilterReloadMs: 5_000, kafkaBrokers: ['127.0.0.1:9092'], kafkaClientId: 'unrip', kafkaTopicRawNearIntentsQuote: 'raw.near_intents.quote', @@ -11,6 +14,8 @@ const DEFAULTS = { kafkaConsumerGroupDummy: 'dummy-reactor-v1', kafkaConsumerGroupExecutor: 'dummy-executor-v1', executorStateDir: './var/executor-state', + projectName: 'unrip', + projectNamespace: 'unrip', }; function splitCsv(value) { @@ -32,6 +37,11 @@ export function loadConfig({ envPath = '.env' } = {}) { return { nearIntentsApiKey: process.env.NEAR_INTENTS_API_KEY || '', nearIntentsWsUrl: process.env.NEAR_INTENTS_WS_URL || DEFAULTS.nearIntentsWsUrl, + nearIntentsPairFilter: + process.env.NEAR_INTENTS_PAIR_FILTER || DEFAULTS.nearIntentsPairFilter, + nearIntentsPairFilterFile: process.env.NEAR_INTENTS_PAIR_FILTER_FILE || '', + nearIntentsPairFilterReloadMs: + parseNumber(process.env.NEAR_INTENTS_PAIR_FILTER_RELOAD_MS, DEFAULTS.nearIntentsPairFilterReloadMs), kafkaBrokers: splitCsv(process.env.KAFKA_BROKERS).length ? splitCsv(process.env.KAFKA_BROKERS) : DEFAULTS.kafkaBrokers, @@ -50,5 +60,13 @@ export function loadConfig({ envPath = '.env' } = {}) { process.env.KAFKA_CONSUMER_GROUP_EXECUTOR || DEFAULTS.kafkaConsumerGroupExecutor, executorStateDir: process.env.EXECUTOR_STATE_DIR || DEFAULTS.executorStateDir, + projectName: process.env.PROJECT_NAME || DEFAULTS.projectName, + projectNamespace: + process.env.PROJECT_NAMESPACE || process.env.PROJECT_NAME || DEFAULTS.projectNamespace, }; } + +function parseNumber(value, fallback) { + const parsed = Number(value); + return Number.isFinite(parsed) ? parsed : fallback; +} diff --git a/src/venues/near-intents/ws.mjs b/src/venues/near-intents/ws.mjs index f18ef54..221e584 100644 --- a/src/venues/near-intents/ws.mjs +++ b/src/venues/near-intents/ws.mjs @@ -1,5 +1,5 @@ import { matchesPairFilter } from '../../core/pair-filter.mjs'; -import { logStatus, startIdleHeartbeat } from '../../core/log.mjs'; +import { serializeError } from '../../core/log.mjs'; import { assertNormalizedSwapDemand } from '../../core/schemas.mjs'; import { buildNearIntentsQuoteEnvelope, buildNearIntentsRawEnvelope } from './normalize.mjs'; @@ -11,16 +11,18 @@ export async function startNearIntentsWs({ apiKey, wsUrl = DEFAULT_WS_URL, pairFilter, + getPairFilter = () => pairFilter, producer, rawTopic, normalizedTopic, + logger, + namespace = 'unrip', onPublish = defaultOnPublish, }) { if (!apiKey) throw new Error('Missing NEAR_INTENTS_API_KEY'); let quoteSubscriptionId = null; let quoteStatusSubscriptionId = null; - let lastStatusAt = Date.now(); let publishedCount = 0; let publishLocked = false; @@ -30,13 +32,14 @@ export async function startNearIntentsWs({ }); ws.addEventListener('open', () => { - logStatus('near-intents connected'); + logger?.info('connection_established', { + namespace, + }); ws.send(JSON.stringify({ jsonrpc: '2.0', id: QUOTE_SUB_ID, method: 'subscribe', params: ['quote'] })); ws.send(JSON.stringify({ jsonrpc: '2.0', id: QUOTE_STATUS_SUB_ID, method: 'subscribe', params: ['quote_status'] })); }); ws.addEventListener('message', async (event) => { - lastStatusAt = Date.now(); const text = typeof event.data === 'string' ? event.data : Buffer.from(event.data).toString('utf8'); let payload; @@ -73,7 +76,9 @@ export async function startNearIntentsWs({ const assetIn = envelope.payload?.asset_in; const assetOut = envelope.payload?.asset_out; if (!assetIn || !assetOut) return; - if (!matchesPairFilter(assetIn, assetOut, pairFilter)) return; + + const activePairFilter = getPairFilter(); + if (!matchesPairFilter(assetIn, assetOut, activePairFilter)) return; publishLocked = true; try { @@ -82,28 +87,41 @@ export async function startNearIntentsWs({ publishedCount += 1; onPublish(envelope, publishedCount); } catch (error) { - logStatus(`kafka publish failed: ${error.message || 'unknown error'}`); + logger?.error('publish_failed', { + namespace, + topic: normalizedTopic, + pair: `${assetIn}->${assetOut}`, + details: { + raw_topic: rawTopic, + error: serializeError(error), + quote_id: envelope.payload?.quote_id, + }, + }); } finally { publishLocked = false; } }); ws.addEventListener('close', () => { - logStatus('near-intents disconnected; reconnecting in 2s'); + logger?.warn('connection_lost', { + namespace, + details: { + reconnect_in_ms: 2_000, + }, + }); setTimeout(connect, 2000); }); ws.addEventListener('error', (err) => { - logStatus(`near-intents socket error: ${err.message || 'unknown error'}`); + logger?.error('socket_error', { + namespace, + details: { + error: serializeError(err), + }, + }); }); } - startIdleHeartbeat({ - label: 'near-intents', - getLastActivityAt: () => lastStatusAt, - getStatus: () => `published=${publishedCount}`, - }); - connect(); }