Reduce ingest scope and bootstrap app deploy

This commit is contained in:
philipp 2026-04-01 00:09:10 +02:00
parent 086ec01597
commit 24a5002d1d
18 changed files with 1134 additions and 187 deletions

View file

@ -1,6 +1,7 @@
# Local dev / container runtime values
NEAR_INTENTS_API_KEY=replace_me
NEAR_INTENTS_WS_URL=wss://solver-relay-v2.chaindefuser.com/ws
NEAR_INTENTS_PAIR_FILTER=nep141:btc.omft.near->nep141:gnosis-0x420ca0f9b9b604ce0fd9c18ef134c705e5fa3430.omf
KAFKA_BROKERS=redpanda:9092
KAFKA_CLIENT_ID=unrip
KAFKA_TOPIC_RAW_NEAR_INTENTS_QUOTE=raw.near_intents.quote
@ -11,29 +12,12 @@ KAFKA_CONSUMER_GROUP_DUMMY=dummy-reactor-v1
KAFKA_CONSUMER_GROUP_EXECUTOR=dummy-executor-v1
EXECUTOR_STATE_DIR=/var/lib/unrip/executor-state
# Repo-driven Hetzner bootstrap values live separately from the app .env.
# Copy scripts/hetzner/bootstrap-secrets.env.example to
# scripts/hetzner/bootstrap-secrets.env, configure non-secret values plus *_PASS
# mappings to your pass store, then:
# source scripts/hetzner/bootstrap-secrets.env
# bash scripts/hetzner/bootstrap.sh
#
# Canonical operator flow uses `pass` for sensitive values; explicit env vars still
# override pass-backed lookups for CI/testing.
#
# Expected bootstrap inputs now include:
# - HCLOUD_TOKEN_PASS or HCLOUD_TOKEN
# - SSH_PUBLIC_KEY_PATH
# - PUBLIC_DOMAIN
# - BASE_DOMAIN
# - LETSENCRYPT_EMAIL
# - REGISTRY_USERNAME
# - REGISTRY_PASSWORD_PASS or REGISTRY_PASSWORD
# - NEAR_INTENTS_API_KEY_PASS or NEAR_INTENTS_API_KEY
# - FORGEJO_ADMIN_USERNAME
# - FORGEJO_ADMIN_EMAIL
# - FORGEJO_ADMIN_PASSWORD_PASS or FORGEJO_ADMIN_PASSWORD
# - optional DNS provider creds via *_PASS or direct env vars
# Platform bootstrap values live in the separate infra/platform repo, not in
# this application repo. In the current local split that repo is `../unrip3`.
# Configure and run bootstrap there, then deploy this repo using the app-side
# workflow described in `docs/deployment.md`.
#
# Future k3s deployment should source the app values from Kubernetes Secret/ConfigMap.
# Hetzner provisioning is workstation-driven after Terraform; cloud-init no longer clones this repo onto the node.
# This repo expects app-side cluster secrets such as:
# - `unrip-secrets` for `NEAR_INTENTS_API_KEY`
# - `unrip-registry-creds` for image pulls and in-cluster Kaniko builds

View file

@ -20,7 +20,25 @@ jobs:
steps:
- name: Install tooling
run: |
apk add --no-cache git kubectl
if command -v git >/dev/null 2>&1 && command -v kubectl >/dev/null 2>&1; then
exit 0
fi
if command -v apk >/dev/null 2>&1; then
apk add --no-cache git kubectl
exit 0
fi
if command -v apt-get >/dev/null 2>&1; then
apt-get update
apt-get install -y git curl ca-certificates
curl -fsSLo /usr/local/bin/kubectl "https://dl.k8s.io/release/$(curl -fsSL https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
chmod +x /usr/local/bin/kubectl
exit 0
fi
echo "missing git/kubectl and no supported package manager found" >&2
exit 1
- name: Load kubeconfig
run: |
@ -40,7 +58,7 @@ jobs:
- name: Resolve deployment settings
run: |
IMAGE="$REGISTRY_HOST/$PROJECT_NAME:$IMAGE_TAG"
BUILD_JOB="image-build-${GITHUB_SHA:0:12}"
BUILD_JOB="image-build-$(printf '%s' "$GITHUB_SHA" | cut -c1-12)"
{
echo "IMAGE=$IMAGE"
echo "BUILD_JOB=$BUILD_JOB"
@ -115,9 +133,7 @@ jobs:
- name: Roll deployments to new image
run: |
IFS=',' read -r -a DEPLOYMENTS <<< "$PROJECT_DEPLOYMENTS"
for deployment in "${DEPLOYMENTS[@]}"; do
printf '%s\n' "$PROJECT_DEPLOYMENTS" | tr ',' '\n' | while IFS= read -r deployment; do
deployment="$(echo "$deployment" | xargs)"
[ -n "$deployment" ] || continue

View file

@ -55,5 +55,51 @@ The shared cluster/platform resources live in the separate infra repository.
## Deployment
This repo includes `.forgejo/workflows/deploy.yml`.
On push to `main`, Forgejo builds the image from this repo root, pushes it to the shared registry, applies `deploy/k8s/base`, and rolls the app deployments in the `unrip` namespace.
This repo is the app-side deployment repo. The shared Hetzner/k3s bootstrap,
Forgejo runner, registry, and other platform services live in the separate
platform repo.
See `docs/deployment.md` for the full operator path.
### One-time app bootstrap
Bootstrap the app namespace, secrets, and Forgejo repo settings from this repo:
```bash
bash scripts/deploy/bootstrap.sh
```
By default, the script uses the adjacent platform checkout at `../unrip3` for:
- `kubeconfig.yaml`
- `kubeconfig.incluster.yaml`
- registry credentials
- the `NEAR_INTENTS_API_KEY` fallback from `../unrip3/.env`
If you are not using that local split, provide the values yourself via env vars
such as `KUBECONFIG_PATH`, `CI_KUBECONFIG_PATH`, `REGISTRY_HOST`,
`REGISTRY_USERNAME`, `REGISTRY_PASSWORD`, `NEAR_INTENTS_API_KEY`, and either
`FORGEJO_TOKEN` or `FORGEJO_ADMIN_USERNAME` / `FORGEJO_ADMIN_PASSWORD`.
### Routine deploy
After bootstrap, deployment is just a push to Forgejo `main`:
```bash
git push forgejo main
```
`.forgejo/workflows/deploy.yml` then:
- applies `deploy/k8s/base`
- builds the image from this repo root inside the cluster with Kaniko
- pushes it to the shared registry
- rolls the `unrip` deployments
### Observe rollout
```bash
KUBECONFIG=../unrip3/.state/hetzner/kubeconfig.yaml kubectl -n unrip get deploy,pods,job
KUBECONFIG=../unrip3/.state/hetzner/kubeconfig.yaml kubectl -n unrip rollout status deploy/near-intents-ingest
KUBECONFIG=../unrip3/.state/hetzner/kubeconfig.yaml kubectl -n unrip rollout status deploy/dummy-reactor
KUBECONFIG=../unrip3/.state/hetzner/kubeconfig.yaml kubectl -n unrip rollout status deploy/dummy-executor
KUBECONFIG=../unrip3/.state/hetzner/kubeconfig.yaml kubectl -n unrip rollout status deploy/dummy-consumer
```

View file

@ -5,6 +5,7 @@ metadata:
namespace: unrip
data:
NEAR_INTENTS_WS_URL: wss://solver-relay-v2.chaindefuser.com/ws
NEAR_INTENTS_PAIR_FILTER: nep141:btc.omft.near->nep141:gnosis-0x420ca0f9b9b604ce0fd9c18ef134c705e5fa3430.omf
KAFKA_BROKERS: redpanda.unrip.svc.cluster.local:9092
KAFKA_CLIENT_ID: unrip
KAFKA_TOPIC_RAW_NEAR_INTENTS_QUOTE: raw.near_intents.quote

188
docs/deployment.md Normal file
View file

@ -0,0 +1,188 @@
# Deployment
This repository owns the app-side deployment assets for `unrip`:
- application source
- Docker build inputs
- Kubernetes manifests under `deploy/k8s/base/`
- the Forgejo workflow under `.forgejo/workflows/deploy.yml`
The shared platform bootstrap lives in the separate infra/platform repository.
In the current local split, that repo is available as `../unrip3`.
## Ownership split
Platform repo responsibilities:
- Hetzner/OpenTofu provisioning
- cloud-init and k3s bootstrap
- shared registry
- Forgejo and the Forgejo runner
- cert-manager, Traefik, observability, and other cluster services
This repo responsibilities:
- app image build context
- app namespace manifests
- app runtime secret names
- app deployment workflow
## Deployment model
The intended production path is:
1. bootstrap the shared platform from the platform repo
2. create the `unrip` app secrets in the cluster
3. push this repo to Forgejo
4. let `.forgejo/workflows/deploy.yml` build and roll the app in-cluster
The image build happens inside Kubernetes via Kaniko. The operator pushes Git,
not Docker images.
## Platform prerequisite
Before deploying this repo, the platform repo should already have completed its
Hetzner bootstrap flow. From the current local split, the relevant operator docs
live in:
- `../unrip3/deploy/hetzner/README.md`
- `../unrip3/deploy/k8s/README.md`
- `../unrip3/docs/hetzner-self-hosted-ci-runbook.md`
After platform bootstrap, you should have:
- a reachable Forgejo instance
- a reachable shared registry
- a running Forgejo runner in the cluster
- `../unrip3/.state/hetzner/kubeconfig.yaml`
- `../unrip3/.state/hetzner/kubeconfig.incluster.yaml`
## One-time app namespace setup
Apply the namespace first:
```bash
kubectl apply -f deploy/k8s/base/namespace.yaml
```
Create the app runtime secret required by `near-intents-ingest`:
```bash
kubectl -n unrip create secret generic unrip-secrets \
--from-literal=NEAR_INTENTS_API_KEY=replace_me
```
Create the registry auth secret used both for image pulls and the in-cluster
Kaniko build job:
```bash
kubectl -n unrip create secret docker-registry unrip-registry-creds \
--docker-server=registry.<your-domain> \
--docker-username="$REGISTRY_USERNAME" \
--docker-password="$REGISTRY_PASSWORD"
```
Then apply the app manifests:
```bash
kubectl apply -k deploy/k8s/base
```
Notes:
- `deploy/k8s/base/unrip.yaml` references `unrip-secrets` and
`unrip-registry-creds`; they are not created by this repo.
- the checked-in image `ghcr.io/example/unrip:bootstrap` is only a placeholder.
The first real image tag is set by the Forgejo workflow or by a manual
`kubectl set image`.
## Required Forgejo repo settings
Required repo secret:
- `KUBECONFIG_B64`
Required repo variable:
- `REGISTRY_HOST`
Recommended repo variables when you do not want to rely on workflow defaults:
- `PROJECT_NAME`
- `PROJECT_NAMESPACE`
- `PROJECT_DEPLOYMENTS`
- `PROJECT_REGISTRY_SECRET_NAME`
Recommended values for this repo:
- `REGISTRY_HOST=registry.<your-domain>`
- `PROJECT_NAME=unrip`
- `PROJECT_NAMESPACE=unrip`
- `PROJECT_DEPLOYMENTS=near-intents-ingest,dummy-reactor,dummy-executor,dummy-consumer`
- `PROJECT_REGISTRY_SECRET_NAME=unrip-registry-creds`
`KUBECONFIG_B64` should be the base64-encoded contents of the in-cluster
kubeconfig produced by the platform repo, not the public workstation kubeconfig:
```bash
base64 -w0 ../unrip3/.state/hetzner/kubeconfig.incluster.yaml
```
The current workflow does not read `REGISTRY_USERNAME` or `REGISTRY_PASSWORD`
from Forgejo directly. Those values are still needed on the operator side when
creating the `unrip-registry-creds` Kubernetes secret.
## Seed the repo into Forgejo
Create the repository in Forgejo, add a `forgejo` remote for this repo, and
push `main`. For example:
```bash
git remote add forgejo https://git.<your-domain>/<owner>/$(basename "$PWD").git
git push forgejo HEAD:refs/heads/main
```
## Routine deploy flow
Once the repo exists in Forgejo and the repo settings above are configured:
```bash
git push forgejo main
```
On push to `main`, `.forgejo/workflows/deploy.yml` does the following:
1. loads kubeconfig from `KUBECONFIG_B64`
2. applies `deploy/k8s/base`
3. creates an in-cluster Kaniko Job in the `unrip` namespace
4. builds and pushes `REGISTRY_HOST/unrip:<git-sha>`
5. updates the four app deployments and waits for rollout
## Observe rollout
```bash
kubectl -n unrip get deploy,pods,pvc,job
kubectl -n unrip rollout status deploy/near-intents-ingest
kubectl -n unrip rollout status deploy/dummy-reactor
kubectl -n unrip rollout status deploy/dummy-executor
kubectl -n unrip rollout status deploy/dummy-consumer
```
Useful logs:
```bash
kubectl -n unrip logs deploy/near-intents-ingest -f
kubectl -n unrip logs deploy/dummy-reactor -f
kubectl -n unrip logs deploy/dummy-executor -f
kubectl -n unrip logs deploy/dummy-consumer -f
kubectl -n unrip logs job/redpanda-topic-bootstrap
```
## Local development
Local iteration remains separate from the production path:
```bash
cp .env.example .env
docker compose up -d --build
```
That path is for local testing. Production rollout is Forgejo + Kubernetes.

View file

@ -209,41 +209,46 @@ Target environment:
The first version may run on one machine, but deployment structure should already match a future distributed system.
### Current canonical operator path
The repo now documents and partially implements this path as the primary deployment workflow:
After the repo split, the primary deployment path is shared across two repos:
- the separate platform repo owns Hetzner/OpenTofu, cloud-init, k3s bootstrap,
Forgejo, the runner, and the shared registry
- this repo owns the app image, app manifests, and the app rollout workflow
#### Phase 0: workstation bootstrap
1. A local operator workstation prepares bootstrap secrets in `scripts/hetzner/bootstrap-secrets.env`.
2. The operator runs `bash scripts/hetzner/bootstrap.sh`.
#### Phase 0: platform bootstrap (platform repo)
1. A local operator workstation prepares platform bootstrap secrets in the
platform repo.
2. The operator runs the platform repo bootstrap flow.
3. Terraform provisions the server, firewall, network, and cloud-init user-data.
4. cloud-init installs k3s automatically and prepares persistence directories plus bootstrap artifacts.
5. The workstation waits for the public k3s API endpoint to report ready.
6. The workstation writes `.state/hetzner/kubeconfig.yaml`.
7. The workstation injects initial Kubernetes Secrets for app and Forgejo bootstrap.
8. The workstation applies repo-managed Kubernetes manifests under `deploy/k8s/`.
9. The workstation performs the first image/bootstrap delivery attempt for the app workloads.
10. The workstation verifies rollout status.
4. cloud-init installs k3s automatically and prepares persistence directories
plus bootstrap artifacts.
5. The workstation writes the public and in-cluster kubeconfigs.
6. The workstation injects the shared platform secrets and applies the shared
platform manifests.
7. Forgejo and the runner come online in the cluster.
#### Phase 1: self-hosted handoff
1. Forgejo becomes reachable in-cluster.
2. The operator completes initial Forgejo admin/repo setup.
3. This repo is pushed or mirrored into Forgejo.
4. The Forgejo runner becomes the routine app deployment mechanism.
5. Terraform remains the infra mutation entrypoint unless further automated later.
#### Phase 1: app repo handoff (this repo)
1. The operator creates this app repo's Kubernetes secrets such as
`unrip-secrets` and `unrip-registry-creds`.
2. This repo is pushed or mirrored into Forgejo.
3. On push to `main`, the Forgejo runner applies `deploy/k8s/base`, builds the
app image in-cluster with Kaniko, and updates the `unrip` deployments.
4. Terraform remains the infra mutation entrypoint, but app rollout is owned by
this repo's workflow.
### Failure-recovery expectation
The bootstrap path must be rerunnable from the workstation.
The overall bootstrap path must be rerunnable from the workstation.
Docs should keep treating recovery as:
- fix local secrets/inputs
- rerun the bootstrap script
- rerun the platform repo bootstrap script
- inspect the cluster with the generated kubeconfig
- destroy/recreate infra with `scripts/hetzner/destroy.sh` only when required
- destroy/recreate infra from the platform repo only when required
### Current repo-state caveats
The direction is clear, but the implementation is still mid-transition:
- the bootstrap script currently applies `deploy/k8s/base` directly rather than the Hetzner overlay
- kubeconfig/auth handling is not yet fully production-hardened
- first image delivery is still a bootstrap workaround rather than a final registry-native CI path
- Forgejo admin bootstrap, repo creation, and Actions configuration still require operator steps
The direction is clear, but the implementation still has caveats:
- this repo now assumes a pre-bootstrapped platform repo/cluster
- kubeconfig/auth handling is only as strong as the external Forgejo secret management
- base manifests still carry a placeholder bootstrap image until CI rolls the first real tag
- app secrets and registry pull credentials still require operator setup
- local Compose remains in the repo for development/testing, not as the canonical production path
### Minimal repo layout target

183
scripts/deploy/bootstrap.sh Executable file
View file

@ -0,0 +1,183 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR=$(cd "$(dirname "$0")/../.." && pwd)
PLATFORM_REPO_DIR="${PLATFORM_REPO_DIR:-$ROOT_DIR/../unrip3}"
BOOTSTRAP_ENV_FILE="${BOOTSTRAP_ENV_FILE:-$PLATFORM_REPO_DIR/.state/hetzner/bootstrap-secrets.resolved.env}"
PLATFORM_APP_ENV_FILE="${PLATFORM_APP_ENV_FILE:-$PLATFORM_REPO_DIR/.env}"
APP_ENV_FILE="${APP_ENV_FILE:-$ROOT_DIR/.env}"
FORGEJO_REMOTE_NAME="${FORGEJO_REMOTE_NAME:-forgejo}"
PROJECT_NAME="${PROJECT_NAME:-unrip}"
PROJECT_NAMESPACE="${PROJECT_NAMESPACE:-$PROJECT_NAME}"
PROJECT_DEPLOYMENTS="${PROJECT_DEPLOYMENTS:-near-intents-ingest,dummy-reactor,dummy-executor,dummy-consumer}"
PROJECT_REGISTRY_SECRET_NAME="${PROJECT_REGISTRY_SECRET_NAME:-${PROJECT_NAME}-registry-creds}"
APP_SECRET_NAME="${APP_SECRET_NAME:-${PROJECT_NAME}-secrets}"
require() {
command -v "$1" >/dev/null 2>&1 || {
echo "missing required command: $1" >&2
exit 1
}
}
load_env_defaults() {
local file="$1"
[[ -f "$file" ]] || return 0
eval "$(
python3 - "$file" <<'PY'
import os
import shlex
import sys
for raw in open(sys.argv[1], 'r', encoding='utf-8'):
line = raw.strip()
if not line or line.startswith('#'):
continue
if line.startswith('export '):
line = line[len('export '):]
if '=' not in line:
continue
key, value = line.split('=', 1)
key = key.strip()
if key in os.environ:
continue
print(f'export {key}={shlex.quote(value)}')
PY
)"
}
require git
require kubectl
require python3
require base64
load_env_defaults "$BOOTSTRAP_ENV_FILE"
load_env_defaults "$PLATFORM_APP_ENV_FILE"
load_env_defaults "$APP_ENV_FILE"
REMOTE_URL="${FORGEJO_REMOTE_URL:-$(git -C "$ROOT_DIR" remote get-url "$FORGEJO_REMOTE_NAME")}"
eval "$(
python3 - "$REMOTE_URL" <<'PY'
import sys
from urllib.parse import urlparse
remote = sys.argv[1]
parsed = urlparse(remote)
if parsed.scheme not in {'http', 'https'}:
raise SystemExit('forgejo remote must use http(s) for automatic bootstrap')
path = parsed.path.rstrip('/')
if path.endswith('.git'):
path = path[:-4]
parts = [part for part in path.split('/') if part]
if len(parts) < 2:
raise SystemExit('could not parse repo owner/name from forgejo remote')
base_url = f'{parsed.scheme}://{parsed.hostname}'
if parsed.port:
base_url += f':{parsed.port}'
if parsed.username and parsed.password:
print(f'export FORGEJO_API_PASSWORD={parsed.password}')
if parsed.username:
print(f'export FORGEJO_API_USERNAME={parsed.username}')
print(f'export FORGEJO_URL={base_url}')
print(f'export FORGEJO_REPO_OWNER={parts[-2]}')
print(f'export FORGEJO_REPO_NAME={parts[-1]}')
PY
)"
: "${KUBECONFIG_PATH:=${KUBECONFIG:-$PLATFORM_REPO_DIR/.state/hetzner/kubeconfig.yaml}}"
: "${CI_KUBECONFIG_PATH:=$PLATFORM_REPO_DIR/.state/hetzner/kubeconfig.incluster.yaml}"
: "${FORGEJO_URL:?set FORGEJO_URL or configure a forgejo remote}"
: "${FORGEJO_REPO_OWNER:?set FORGEJO_REPO_OWNER}"
: "${FORGEJO_REPO_NAME:?set FORGEJO_REPO_NAME}"
: "${FORGEJO_ADMIN_USERNAME:=${FORGEJO_API_USERNAME:-}}"
if [[ -z "${FORGEJO_TOKEN:-}" ]]; then
: "${FORGEJO_ADMIN_USERNAME:=${FORGEJO_API_USERNAME:-}}"
: "${FORGEJO_ADMIN_USERNAME:?set FORGEJO_TOKEN or FORGEJO_ADMIN_USERNAME}"
: "${FORGEJO_ADMIN_PASSWORD:=${FORGEJO_API_PASSWORD:-}}"
: "${FORGEJO_ADMIN_PASSWORD:?set FORGEJO_TOKEN or FORGEJO_ADMIN_PASSWORD}"
fi
if [[ ! -f "$KUBECONFIG_PATH" ]]; then
echo "missing kubeconfig: $KUBECONFIG_PATH" >&2
exit 1
fi
if [[ ! -f "$CI_KUBECONFIG_PATH" ]]; then
echo "missing in-cluster kubeconfig: $CI_KUBECONFIG_PATH" >&2
exit 1
fi
export KUBECONFIG="$KUBECONFIG_PATH"
if [[ -z "${REGISTRY_HOST:-}" ]]; then
REGISTRY_HOST="${REGISTRY_DOMAIN:-}"
fi
if [[ -z "${REGISTRY_HOST:-}" ]]; then
REGISTRY_HOST="$(kubectl get ingress -n registry registry -o jsonpath='{.spec.rules[0].host}' 2>/dev/null || true)"
fi
if [[ -z "${REGISTRY_USERNAME:-}" ]]; then
REGISTRY_USERNAME="$(kubectl get secret registry-secrets -n registry -o jsonpath='{.data.htpasswd}' 2>/dev/null | base64 -d | cut -d: -f1 || true)"
fi
: "${REGISTRY_HOST:?set REGISTRY_HOST or configure the registry ingress first}"
: "${REGISTRY_USERNAME:?set REGISTRY_USERNAME or bootstrap the shared registry first}"
: "${REGISTRY_PASSWORD:?set REGISTRY_PASSWORD}"
: "${NEAR_INTENTS_API_KEY:?set NEAR_INTENTS_API_KEY}"
echo "bootstrapping namespace $PROJECT_NAMESPACE"
kubectl apply -f "$ROOT_DIR/deploy/k8s/base/namespace.yaml"
echo "upserting runtime secret $APP_SECRET_NAME"
kubectl -n "$PROJECT_NAMESPACE" create secret generic "$APP_SECRET_NAME" \
--from-literal=NEAR_INTENTS_API_KEY="$NEAR_INTENTS_API_KEY" \
--dry-run=client -o yaml | kubectl apply -f -
echo "upserting registry pull/push secret $PROJECT_REGISTRY_SECRET_NAME"
kubectl -n "$PROJECT_NAMESPACE" create secret docker-registry "$PROJECT_REGISTRY_SECRET_NAME" \
--docker-server="$REGISTRY_HOST" \
--docker-username="$REGISTRY_USERNAME" \
--docker-password="$REGISTRY_PASSWORD" \
--dry-run=client -o yaml | kubectl apply -f -
echo "applying app manifests"
kubectl apply -k "$ROOT_DIR/deploy/k8s/base"
echo "upserting Forgejo repo settings"
forgejo_args=()
if [[ -n "${FORGEJO_TOKEN:-}" ]]; then
forgejo_args+=(--token "$FORGEJO_TOKEN")
fi
if [[ -n "${FORGEJO_ADMIN_USERNAME:-}" ]]; then
forgejo_args+=(--admin-username "$FORGEJO_ADMIN_USERNAME")
fi
if [[ -n "${FORGEJO_ADMIN_PASSWORD:-}" ]]; then
forgejo_args+=(--admin-password "$FORGEJO_ADMIN_PASSWORD")
fi
python3 "$ROOT_DIR/scripts/deploy/forgejo_repo_bootstrap.py" \
--forgejo-url "$FORGEJO_URL" \
--repo-owner "$FORGEJO_REPO_OWNER" \
--repo-name "$FORGEJO_REPO_NAME" \
--ci-kubeconfig "$CI_KUBECONFIG_PATH" \
--registry-host "$REGISTRY_HOST" \
--project-name "$PROJECT_NAME" \
--project-namespace "$PROJECT_NAMESPACE" \
--project-deployments "$PROJECT_DEPLOYMENTS" \
--project-registry-secret-name "$PROJECT_REGISTRY_SECRET_NAME" \
"${forgejo_args[@]}"
cat <<EOF
bootstrap complete
next:
git commit -am "..."
git push $FORGEJO_REMOTE_NAME main
EOF

View file

@ -0,0 +1,138 @@
#!/usr/bin/env python3
import argparse
import base64
import json
import ssl
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
class ForgejoClient:
def __init__(self, base_url: str, username: str | None = None, password: str | None = None, token: str | None = None):
self.base_url = base_url.rstrip('/')
self.username = username or ''
self.headers = {
'Accept': 'application/json',
'Content-Type': 'application/json',
}
if token:
self.headers['Authorization'] = f'token {token}'
elif username is not None and password is not None:
credentials = base64.b64encode(f'{username}:{password}'.encode()).decode()
self.headers['Authorization'] = f'Basic {credentials}'
else:
raise ValueError('ForgejoClient requires either token auth or username/password auth')
self.ssl_context = ssl.create_default_context()
def request(self, method: str, path: str, payload=None, expected=(200, 201, 204)):
url = f'{self.base_url}{path}'
data = None
if payload is not None:
data = json.dumps(payload).encode()
req = urllib.request.Request(url, data=data, method=method)
for key, value in self.headers.items():
req.add_header(key, value)
try:
with urllib.request.urlopen(req, context=self.ssl_context) as response:
body = response.read().decode() if response.length != 0 else ''
if response.status not in expected:
raise RuntimeError(f'{method} {path} returned {response.status}: {body}')
return json.loads(body) if body else None
except urllib.error.HTTPError as exc:
body = exc.read().decode()
if exc.code not in expected:
raise RuntimeError(f'{method} {path} returned {exc.code}: {body}') from exc
return json.loads(body) if body else None
def get_repo(self, owner: str, repo: str):
try:
return self.request('GET', f'/api/v1/repos/{owner}/{repo}')
except RuntimeError as exc:
if ' returned 404:' in str(exc):
return None
raise
def create_repo(self, owner: str, name: str, private: bool):
payload = {
'name': name,
'private': private,
'auto_init': False,
'default_branch': 'main',
}
if owner == self.username:
return self.request('POST', '/api/v1/user/repos', payload, expected=(201,))
return self.request('POST', f'/api/v1/orgs/{urllib.parse.quote(owner)}/repos', payload, expected=(201,))
def upsert_variable(self, owner: str, repo: str, name: str, value: str):
try:
self.request(
'POST',
f'/api/v1/repos/{owner}/{repo}/actions/variables/{urllib.parse.quote(name)}',
{'value': value},
expected=(201, 204),
)
except RuntimeError as exc:
if ' returned 409:' not in str(exc) and ' returned 422:' not in str(exc):
raise
self.request(
'PUT',
f'/api/v1/repos/{owner}/{repo}/actions/variables/{urllib.parse.quote(name)}',
{'value': value},
expected=(201, 204),
)
def upsert_secret(self, owner: str, repo: str, name: str, value: str):
self.request(
'PUT',
f'/api/v1/repos/{owner}/{repo}/actions/secrets/{urllib.parse.quote(name)}',
{'data': value},
expected=(201, 204),
)
def main():
parser = argparse.ArgumentParser(description='Bootstrap Forgejo repo secrets/variables for app deployment')
parser.add_argument('--forgejo-url', required=True)
parser.add_argument('--admin-username')
parser.add_argument('--admin-password')
parser.add_argument('--token')
parser.add_argument('--repo-owner', required=True)
parser.add_argument('--repo-name', required=True)
parser.add_argument('--repo-private', action='store_true', default=False)
parser.add_argument('--ci-kubeconfig', required=True)
parser.add_argument('--registry-host', required=True)
parser.add_argument('--project-name', required=True)
parser.add_argument('--project-namespace', required=True)
parser.add_argument('--project-deployments', required=True)
parser.add_argument('--project-registry-secret-name', required=True)
args = parser.parse_args()
client = ForgejoClient(args.forgejo_url, args.admin_username, args.admin_password, args.token)
repo = client.get_repo(args.repo_owner, args.repo_name)
if repo is None:
created = client.create_repo(args.repo_owner, args.repo_name, args.repo_private)
print(f'created repo {created["full_name"]}')
else:
print(f'repo already exists: {repo["full_name"]}')
kubeconfig_b64 = base64.b64encode(Path(args.ci_kubeconfig).read_bytes()).decode()
client.upsert_secret(args.repo_owner, args.repo_name, 'KUBECONFIG_B64', kubeconfig_b64)
print('upserted repo action secret KUBECONFIG_B64')
client.upsert_variable(args.repo_owner, args.repo_name, 'REGISTRY_HOST', args.registry_host)
client.upsert_variable(args.repo_owner, args.repo_name, 'PROJECT_NAME', args.project_name)
client.upsert_variable(args.repo_owner, args.repo_name, 'PROJECT_NAMESPACE', args.project_namespace)
client.upsert_variable(args.repo_owner, args.repo_name, 'PROJECT_DEPLOYMENTS', args.project_deployments)
client.upsert_variable(
args.repo_owner,
args.repo_name,
'PROJECT_REGISTRY_SECRET_NAME',
args.project_registry_secret_name,
)
print('upserted repo action variables')
if __name__ == '__main__':
main()

View file

@ -1,20 +1,25 @@
import process from 'node:process';
import { createConsumer } from '../bus/kafka/consumer.mjs';
import { logStatus } from '../core/log.mjs';
import { createLogger, serializeError } from '../core/log.mjs';
import { parseEventMessage } from '../core/event-envelope.mjs';
import { assertTradeResult } from '../core/schemas.mjs';
import { loadConfig } from '../lib/config.mjs';
const config = loadConfig();
const logger = createLogger({
service: 'dummy-consumer',
component: 'consumer',
namespace: config.projectNamespace,
});
const consumer = await createConsumer({
groupId: `${config.kafkaConsumerGroupExecutor}-results-view`,
brokers: config.kafkaBrokers,
clientId: config.kafkaClientId,
logger,
});
await consumer.subscribe({ topic: config.kafkaTopicExecTradeResult, fromBeginning: false });
logStatus(`result consumer subscribed to ${config.kafkaTopicExecTradeResult}`);
process.on('SIGINT', async () => {
await consumer.disconnect();
@ -28,15 +33,29 @@ process.on('SIGTERM', async () => {
await consumer.run({
eachMessage: async ({ message }) => {
if (!message.value) return;
let event;
try {
event = parseEventMessage(message.value.toString());
} catch {
logStatus('result consumer received non-JSON message; skipping');
logger.warn('invalid_json_message', {
topic: config.kafkaTopicExecTradeResult,
});
return;
}
assertTradeResult(event);
const payload = event.payload;
console.log(`[result] command_id=${payload.command_id} quote_id=${payload.quote_id} status=${payload.status} result_code=${payload.result_code || 'n/a'}`);
try {
assertTradeResult(event);
} catch (error) {
logger.error('message_processing_failed', {
venue: event.venue || 'near-intents',
topic: config.kafkaTopicExecTradeResult,
details: {
error: serializeError(error),
command_id: event.payload?.command_id,
quote_id: event.payload?.quote_id,
},
});
}
},
});

View file

@ -4,27 +4,32 @@ import { createConsumer } from '../bus/kafka/consumer.mjs';
import { createProducer } from '../bus/kafka/producer.mjs';
import { buildEventEnvelope, parseEventMessage } from '../core/event-envelope.mjs';
import { createExecutorStateStore } from '../core/executor-state-store.mjs';
import { logStatus } from '../core/log.mjs';
import { createLogger, serializeError } from '../core/log.mjs';
import { assertExecuteTradeCommand, assertTradeResult } from '../core/schemas.mjs';
import { loadConfig } from '../lib/config.mjs';
const config = loadConfig();
const logger = createLogger({
service: 'dummy-executor',
component: 'executor',
namespace: config.projectNamespace,
});
const consumer = await createConsumer({
groupId: config.kafkaConsumerGroupExecutor,
brokers: config.kafkaBrokers,
clientId: config.kafkaClientId,
logger,
});
const producer = await createProducer({
brokers: config.kafkaBrokers,
clientId: config.kafkaClientId,
logger,
});
const stateStore = createExecutorStateStore({ stateDir: config.executorStateDir });
await consumer.subscribe({ topic: config.kafkaTopicCmdExecuteTrade, fromBeginning: false });
logStatus(`dummy executor subscribed to ${config.kafkaTopicCmdExecuteTrade} as ${config.kafkaConsumerGroupExecutor}`);
logStatus(`dummy executor will publish results to ${config.kafkaTopicExecTradeResult}; state_dir=${config.executorStateDir}`);
async function shutdown() {
await consumer.disconnect();
@ -42,52 +47,89 @@ await consumer.run({
try {
event = parseEventMessage(message.value.toString());
} catch {
logStatus('dummy executor received non-JSON message; skipping');
logger.warn('invalid_json_message', {
topic: config.kafkaTopicCmdExecuteTrade,
});
return;
}
assertExecuteTradeCommand(event);
try {
assertExecuteTradeCommand(event);
const payload = event.payload;
const commandId = payload.command_id;
const existing = stateStore.get(commandId);
if (existing?.status === 'completed') {
logStatus(`dummy executor skipping duplicate command_id=${commandId}`);
return;
}
const payload = event.payload;
const commandId = payload.command_id;
const pair = `${payload.asset_in}->${payload.asset_out}`;
const existing = stateStore.get(commandId);
if (existing?.status === 'completed') {
logger.warn('duplicate_command_skipped', {
venue: event.venue || 'near-intents',
topic: config.kafkaTopicCmdExecuteTrade,
pair,
details: {
command_id: commandId,
quote_id: payload.quote_id,
},
});
return;
}
stateStore.markProcessing(commandId, {
idempotency_key: payload.idempotency_key,
execution_key: payload.execution_key,
quote_id: payload.quote_id,
});
const pair = `${payload.asset_in} -> ${payload.asset_out}`;
const result = buildEventEnvelope({
source: 'dummy-executor',
venue: event.venue || 'near-intents',
eventType: 'trade_result',
eventId: `exec-${commandId}`,
observedAt: event.observed_at,
payload: {
command_id: commandId,
stateStore.markProcessing(commandId, {
idempotency_key: payload.idempotency_key,
execution_key: payload.execution_key,
quote_id: payload.quote_id,
status: 'simulated_sent',
result_code: existing?.status === 'processing' ? 'recovered_inflight' : 'sent',
note: 'dummy executor placeholder result',
},
});
assertTradeResult(result);
});
await producer.sendJson(config.kafkaTopicExecTradeResult, result, { key: payload.execution_key });
stateStore.markCompleted(commandId, {
idempotency_key: payload.idempotency_key,
execution_key: payload.execution_key,
quote_id: payload.quote_id,
result_event_id: result.event_id,
});
console.log(`[dummy-executor] result emitted ${pair} quote_id=${payload.quote_id} command_id=${commandId} status=simulated_sent`);
const recoveredInflight = existing?.status === 'processing';
const result = buildEventEnvelope({
source: 'dummy-executor',
venue: event.venue || 'near-intents',
eventType: 'trade_result',
eventId: `exec-${commandId}`,
observedAt: event.observed_at,
payload: {
command_id: commandId,
idempotency_key: payload.idempotency_key,
execution_key: payload.execution_key,
quote_id: payload.quote_id,
status: 'simulated_sent',
result_code: recoveredInflight ? 'recovered_inflight' : 'sent',
note: 'dummy executor placeholder result',
},
});
assertTradeResult(result);
await producer.sendJson(config.kafkaTopicExecTradeResult, result, { key: payload.execution_key });
stateStore.markCompleted(commandId, {
idempotency_key: payload.idempotency_key,
execution_key: payload.execution_key,
quote_id: payload.quote_id,
result_event_id: result.event_id,
});
if (recoveredInflight) {
logger.warn('inflight_command_recovered', {
venue: event.venue || 'near-intents',
topic: config.kafkaTopicExecTradeResult,
pair,
details: {
command_id: commandId,
quote_id: payload.quote_id,
},
});
}
} catch (error) {
logger.error('message_processing_failed', {
venue: event.venue || 'near-intents',
topic: config.kafkaTopicCmdExecuteTrade,
pair: event.payload?.asset_in && event.payload?.asset_out
? `${event.payload.asset_in}->${event.payload.asset_out}`
: undefined,
details: {
error: serializeError(error),
command_id: event.payload?.command_id,
quote_id: event.payload?.quote_id,
},
});
}
},
});

View file

@ -2,26 +2,31 @@ import process from 'node:process';
import { createConsumer } from '../bus/kafka/consumer.mjs';
import { createProducer } from '../bus/kafka/producer.mjs';
import { logStatus } from '../core/log.mjs';
import { createLogger, serializeError } from '../core/log.mjs';
import { loadConfig } from '../lib/config.mjs';
import { buildEventEnvelope, parseEventMessage } from '../core/event-envelope.mjs';
import { assertExecuteTradeCommand, assertNormalizedSwapDemand } from '../core/schemas.mjs';
const config = loadConfig();
const logger = createLogger({
service: 'dummy-reactor',
component: 'reactor',
namespace: config.projectNamespace,
});
const consumer = await createConsumer({
groupId: config.kafkaConsumerGroupDummy,
brokers: config.kafkaBrokers,
clientId: config.kafkaClientId,
logger,
});
const producer = await createProducer({
brokers: config.kafkaBrokers,
clientId: config.kafkaClientId,
logger,
});
await consumer.subscribe({ topic: config.kafkaTopicNormSwapDemand, fromBeginning: false });
logStatus(`dummy reactor subscribed to ${config.kafkaTopicNormSwapDemand} as ${config.kafkaConsumerGroupDummy}`);
logStatus(`dummy reactor will publish commands to ${config.kafkaTopicCmdExecuteTrade}`);
async function shutdown() {
await consumer.disconnect();
@ -39,37 +44,53 @@ await consumer.run({
try {
event = parseEventMessage(message.value.toString());
} catch {
logStatus('dummy reactor received non-JSON message; skipping');
logger.warn('invalid_json_message', {
topic: config.kafkaTopicNormSwapDemand,
});
return;
}
assertNormalizedSwapDemand(event);
try {
assertNormalizedSwapDemand(event);
const payload = event.payload;
const pair = `${payload.asset_in} -> ${payload.asset_out}`;
const quoteId = payload.quote_id;
const commandId = `cmd-${quoteId}`;
const command = buildEventEnvelope({
source: 'dummy-reactor',
venue: event.venue || 'near-intents',
eventType: 'execute_trade',
eventId: commandId,
observedAt: event.observed_at,
payload: {
command_id: commandId,
idempotency_key: `${event.venue || 'near-intents'}:${quoteId}`,
execution_key: `${event.venue || 'near-intents'}:${payload.asset_in}->${payload.asset_out}`,
quote_id: quoteId,
asset_in: payload.asset_in,
asset_out: payload.asset_out,
amount_in: payload.amount_in,
amount_out: payload.amount_out,
reason: 'dummy reactor placeholder decision',
},
});
assertExecuteTradeCommand(command);
const payload = event.payload;
const pair = `${payload.asset_in}->${payload.asset_out}`;
const quoteId = payload.quote_id;
const commandId = `cmd-${quoteId}`;
const command = buildEventEnvelope({
source: 'dummy-reactor',
venue: event.venue || 'near-intents',
eventType: 'execute_trade',
eventId: commandId,
observedAt: event.observed_at,
payload: {
command_id: commandId,
idempotency_key: `${event.venue || 'near-intents'}:${quoteId}`,
execution_key: `${event.venue || 'near-intents'}:${payload.asset_in}->${payload.asset_out}`,
quote_id: quoteId,
asset_in: payload.asset_in,
asset_out: payload.asset_out,
amount_in: payload.amount_in,
amount_out: payload.amount_out,
reason: 'dummy reactor placeholder decision',
},
});
assertExecuteTradeCommand(command);
await producer.sendJson(config.kafkaTopicCmdExecuteTrade, command, { key: command.payload.execution_key });
console.log(`[dummy-reactor] command emitted ${pair} quote_id=${quoteId} command_id=${commandId}`);
await producer.sendJson(config.kafkaTopicCmdExecuteTrade, command, { key: command.payload.execution_key });
return;
} catch (error) {
logger.error('message_processing_failed', {
venue: event.venue || 'near-intents',
topic: config.kafkaTopicNormSwapDemand,
pair: event.payload?.asset_in && event.payload?.asset_out
? `${event.payload.asset_in}->${event.payload.asset_out}`
: undefined,
details: {
error: serializeError(error),
quote_id: event.payload?.quote_id,
},
});
}
},
});

View file

@ -1,32 +1,53 @@
import process from 'node:process';
import { createProducer } from '../bus/kafka/producer.mjs';
import { logStatus } from '../core/log.mjs';
import { parsePairFilter } from '../core/pair-filter.mjs';
import { createLogger } from '../core/log.mjs';
import { createPairFilterController } from '../core/pair-filter.mjs';
import { loadConfig } from '../lib/config.mjs';
import { startNearIntentsWs } from '../venues/near-intents/ws.mjs';
const config = loadConfig();
const pairFilter = parsePairFilter(process.argv.slice(2));
const logger = createLogger({
service: 'near-intents-ingest',
component: 'ingest',
namespace: config.projectNamespace,
});
const pairFilterController = createPairFilterController({
argv: process.argv.slice(2),
env: process.env,
defaultPairFilter: config.nearIntentsPairFilter,
pairFilterFile: config.nearIntentsPairFilterFile,
reloadEveryMs: config.nearIntentsPairFilterReloadMs,
logger: logger.child({
component: 'filter',
venue: 'near-intents',
}),
});
if (!config.nearIntentsApiKey) {
console.error('Missing NEAR_INTENTS_API_KEY in env or .env');
logger.error('missing_api_key', {
venue: 'near-intents',
details: {
variable: 'NEAR_INTENTS_API_KEY',
},
});
process.exit(1);
}
const producer = await createProducer({
brokers: config.kafkaBrokers,
clientId: config.kafkaClientId,
logger,
});
logStatus(`kafka producer connected; raw_topic=${config.kafkaTopicRawNearIntentsQuote}; normalized_topic=${config.kafkaTopicNormSwapDemand}`);
if (pairFilter) logStatus(`pair filter enabled: ${pairFilter[0]} <-> ${pairFilter[1]}`);
process.on('SIGINT', async () => {
pairFilterController.close();
await producer.disconnect();
process.exit(0);
});
process.on('SIGTERM', async () => {
pairFilterController.close();
await producer.disconnect();
process.exit(0);
});
@ -34,8 +55,13 @@ process.on('SIGTERM', async () => {
await startNearIntentsWs({
apiKey: config.nearIntentsApiKey,
wsUrl: config.nearIntentsWsUrl,
pairFilter,
getPairFilter: () => pairFilterController.getPairFilter(),
producer,
rawTopic: config.kafkaTopicRawNearIntentsQuote,
normalizedTopic: config.kafkaTopicNormSwapDemand,
namespace: config.projectNamespace,
logger: logger.child({
component: 'ws',
venue: 'near-intents',
}),
});

View file

@ -1,11 +1,43 @@
import { Kafka } from 'kafkajs';
import { serializeError } from '../../core/log.mjs';
function createKafka({ brokers = ['127.0.0.1:9092'], clientId = 'unrip' } = {}) {
return new Kafka({ clientId, brokers });
}
export async function createConsumer({ groupId, ...options }) {
export async function createConsumer({ groupId, logger, ...options }) {
const consumer = createKafka(options).consumer({ groupId });
const kafkaLogger = logger ? logger.child({ component: 'kafka' }) : null;
consumer.on(consumer.events.CONNECT, () => {
kafkaLogger?.info('kafka_connected', {
details: {
client_type: 'consumer',
group_id: groupId,
},
});
});
consumer.on(consumer.events.DISCONNECT, () => {
kafkaLogger?.warn('kafka_disconnected', {
details: {
client_type: 'consumer',
group_id: groupId,
},
});
});
consumer.on(consumer.events.CRASH, ({ payload }) => {
kafkaLogger?.error('kafka_consumer_crashed', {
details: {
client_type: 'consumer',
group_id: groupId,
restart: payload?.restart ?? null,
error: serializeError(payload?.error),
},
});
});
await consumer.connect();
return {

View file

@ -1,11 +1,41 @@
import { Kafka } from 'kafkajs';
import { serializeError } from '../../core/log.mjs';
function createKafka({ brokers = ['127.0.0.1:9092'], clientId = 'unrip' } = {}) {
return new Kafka({ clientId, brokers });
}
export async function createProducer(options = {}) {
export async function createProducer({ logger, ...options } = {}) {
const producer = createKafka(options).producer();
const kafkaLogger = logger ? logger.child({ component: 'kafka' }) : null;
producer.on(producer.events.CONNECT, () => {
kafkaLogger?.info('kafka_connected', {
details: {
client_type: 'producer',
},
});
});
producer.on(producer.events.DISCONNECT, () => {
kafkaLogger?.warn('kafka_disconnected', {
details: {
client_type: 'producer',
},
});
});
producer.on(producer.events.REQUEST_TIMEOUT, ({ payload }) => {
kafkaLogger?.error('kafka_request_timeout', {
details: {
client_type: 'producer',
broker: payload?.broker ?? null,
client_id: payload?.clientId ?? null,
error: serializeError(payload?.error),
},
});
});
await producer.connect();
return {
async sendJson(topic, event, { key = event?.event_id ?? event?.key ?? null } = {}) {

View file

@ -1,31 +1,55 @@
export function logStatus(message) {
const time = new Date().toISOString();
console.error(`[${time}] ${message}`);
import process from 'node:process';
export function logJson(event) {
process.stdout.write(`${JSON.stringify(compact({
ts: new Date().toISOString(),
...event,
}))}\n`);
}
export function startIdleHeartbeat({
label,
getLastActivityAt,
getStatus,
idleAfterMs = 30_000,
checkEveryMs = 5_000,
}) {
let lastHeartbeatAt = 0;
export function createLogger(bindings = {}) {
const base = compact(bindings);
const timer = setInterval(() => {
const lastActivityAt = getLastActivityAt();
const idleForMs = Date.now() - lastActivityAt;
if (idleForMs < idleAfterMs) return;
if (Date.now() - lastHeartbeatAt < idleAfterMs) return;
const seconds = Math.floor(idleForMs / 1000);
const suffix = getStatus ? `; ${getStatus()}` : '';
logStatus(`${label} idle ${seconds}s${suffix}`);
lastHeartbeatAt = Date.now();
}, checkEveryMs);
if (typeof timer.unref === 'function') timer.unref();
return () => clearInterval(timer);
return {
child(extraBindings = {}) {
return createLogger({ ...base, ...extraBindings });
},
info(event, fields = {}) {
logJson({ level: 'info', ...base, event, ...compact(fields) });
},
warn(event, fields = {}) {
logJson({ level: 'warn', ...base, event, ...compact(fields) });
},
error(event, fields = {}) {
logJson({ level: 'error', ...base, event, ...compact(fields) });
},
};
}
export function serializeError(error) {
if (!error) return null;
if (error instanceof Error) {
return compact({
name: error.name,
message: error.message,
stack: error.stack,
code: error.code,
});
}
if (typeof error === 'object') return compact(error);
return { message: String(error) };
}
function compact(value) {
if (Array.isArray(value)) {
return value.map((item) => compact(item));
}
if (!value || typeof value !== 'object') return value;
const entries = Object.entries(value)
.filter(([, entry]) => entry !== undefined)
.map(([key, entry]) => [key, compact(entry)]);
return Object.fromEntries(entries);
}

View file

@ -1,17 +1,173 @@
import fs from 'node:fs';
export const DEFAULT_NEAR_INTENTS_PAIR_FILTER =
'nep141:btc.omft.near->nep141:gnosis-0x420ca0f9b9b604ce0fd9c18ef134c705e5fa3430.omf';
export function parsePairFilter(argv) {
const idx = argv.indexOf('--pair');
if (idx === -1) return null;
const raw = argv[idx + 1];
if (!raw || !raw.includes('->')) {
throw new Error("Use --pair 'asset_a->asset_b'");
return parsePairFilterValue(argv[idx + 1], { fieldName: '--pair' });
}
export function parsePairFilterValue(raw, { fieldName = 'pair filter' } = {}) {
const normalized = String(raw || '').trim();
if (!normalized) return null;
if (['all', '*', 'off', 'none', 'disabled'].includes(normalized.toLowerCase())) {
return null;
}
const [a, b] = raw.split('->').map((x) => x.trim().toLowerCase());
if (!normalized.includes('->')) {
throw new Error(`Use ${fieldName} like 'asset_a->asset_b'`);
}
const [a, b] = normalized.split('->').map((value) => value.trim().toLowerCase());
if (!a || !b) throw new Error(`Use ${fieldName} like 'asset_a->asset_b'`);
return [a, b];
}
export function formatPairFilter(pairFilter) {
if (!pairFilter) return null;
return `${pairFilter[0]}->${pairFilter[1]}`;
}
export function resolvePairFilter({
argv = [],
env = process.env,
defaultPairFilter = DEFAULT_NEAR_INTENTS_PAIR_FILTER,
} = {}) {
const cliPairFilter = parsePairFilter(argv);
if (cliPairFilter) {
return {
pairFilter: cliPairFilter,
pair: formatPairFilter(cliPairFilter),
source: 'argv',
};
}
const envPairFilter = parsePairFilterValue(env.NEAR_INTENTS_PAIR_FILTER, {
fieldName: 'NEAR_INTENTS_PAIR_FILTER',
});
if (envPairFilter) {
return {
pairFilter: envPairFilter,
pair: formatPairFilter(envPairFilter),
source: 'env',
};
}
const defaultResolved = parsePairFilterValue(defaultPairFilter, {
fieldName: 'default pair filter',
});
return {
pairFilter: defaultResolved,
pair: formatPairFilter(defaultResolved),
source: 'default',
};
}
export function createPairFilterController({
argv = [],
env = process.env,
logger = null,
defaultPairFilter = DEFAULT_NEAR_INTENTS_PAIR_FILTER,
pairFilterFile = env.NEAR_INTENTS_PAIR_FILTER_FILE,
reloadEveryMs = env.NEAR_INTENTS_PAIR_FILTER_RELOAD_MS,
} = {}) {
const resolved = resolvePairFilter({ argv, env, defaultPairFilter });
let currentPairFilter = resolved.pairFilter;
let currentPair = resolved.pair;
let lastLoadedFileValue = null;
let source = resolved.source;
const normalizedPairFilterFile = String(pairFilterFile || '').trim() || null;
const normalizedReloadEveryMs = parseReloadMs(reloadEveryMs);
if (normalizedPairFilterFile) {
const initialFileValue = readPairFilterFile(normalizedPairFilterFile);
if (initialFileValue != null) {
const initialFilePairFilter = parsePairFilterValue(initialFileValue, {
fieldName: 'NEAR_INTENTS_PAIR_FILTER_FILE',
});
currentPairFilter = initialFilePairFilter;
currentPair = formatPairFilter(initialFilePairFilter);
lastLoadedFileValue = initialFileValue;
source = 'file';
}
}
logger?.info('pair_filter_configured', {
pair: currentPair,
details: {
source,
pair_filter_file: normalizedPairFilterFile,
},
});
const timer = normalizedPairFilterFile
? setInterval(() => {
const nextValue = readPairFilterFile(normalizedPairFilterFile);
if (nextValue == null || nextValue === lastLoadedFileValue) return;
try {
const nextPairFilter = parsePairFilterValue(nextValue, {
fieldName: 'NEAR_INTENTS_PAIR_FILTER_FILE',
});
currentPairFilter = nextPairFilter;
currentPair = formatPairFilter(nextPairFilter);
lastLoadedFileValue = nextValue;
logger?.info('pair_filter_reloaded', {
pair: currentPair,
details: {
pair_filter_file: normalizedPairFilterFile,
},
});
} catch (error) {
logger?.error('pair_filter_reload_failed', {
pair: currentPair,
details: {
pair_filter_file: normalizedPairFilterFile,
error: error.message,
},
});
}
}, normalizedReloadEveryMs)
: null;
if (timer && typeof timer.unref === 'function') timer.unref();
return {
getPairFilter() {
return currentPairFilter;
},
getPair() {
return currentPair;
},
close() {
if (timer) clearInterval(timer);
},
};
}
export function matchesPairFilter(assetIn, assetOut, pairFilter) {
if (!pairFilter) return true;
const x = assetIn.toLowerCase();
const y = assetOut.toLowerCase();
return (x === pairFilter[0] && y === pairFilter[1]) || (x === pairFilter[1] && y === pairFilter[0]);
}
function readPairFilterFile(filePath) {
if (!fs.existsSync(filePath)) return null;
const raw = fs.readFileSync(filePath, 'utf8')
.split(/\r?\n/)
.map((line) => line.trim())
.find((line) => line && !line.startsWith('#'));
return raw || null;
}
function parseReloadMs(raw) {
const parsed = Number(raw);
return Number.isFinite(parsed) && parsed >= 1_000 ? parsed : 5_000;
}

View file

@ -1,7 +1,10 @@
import { loadDotenv } from './env.mjs';
import { DEFAULT_NEAR_INTENTS_PAIR_FILTER } from '../core/pair-filter.mjs';
const DEFAULTS = {
nearIntentsWsUrl: 'wss://solver-relay-v2.chaindefuser.com/ws',
nearIntentsPairFilter: DEFAULT_NEAR_INTENTS_PAIR_FILTER,
nearIntentsPairFilterReloadMs: 5_000,
kafkaBrokers: ['127.0.0.1:9092'],
kafkaClientId: 'unrip',
kafkaTopicRawNearIntentsQuote: 'raw.near_intents.quote',
@ -11,6 +14,8 @@ const DEFAULTS = {
kafkaConsumerGroupDummy: 'dummy-reactor-v1',
kafkaConsumerGroupExecutor: 'dummy-executor-v1',
executorStateDir: './var/executor-state',
projectName: 'unrip',
projectNamespace: 'unrip',
};
function splitCsv(value) {
@ -32,6 +37,11 @@ export function loadConfig({ envPath = '.env' } = {}) {
return {
nearIntentsApiKey: process.env.NEAR_INTENTS_API_KEY || '',
nearIntentsWsUrl: process.env.NEAR_INTENTS_WS_URL || DEFAULTS.nearIntentsWsUrl,
nearIntentsPairFilter:
process.env.NEAR_INTENTS_PAIR_FILTER || DEFAULTS.nearIntentsPairFilter,
nearIntentsPairFilterFile: process.env.NEAR_INTENTS_PAIR_FILTER_FILE || '',
nearIntentsPairFilterReloadMs:
parseNumber(process.env.NEAR_INTENTS_PAIR_FILTER_RELOAD_MS, DEFAULTS.nearIntentsPairFilterReloadMs),
kafkaBrokers: splitCsv(process.env.KAFKA_BROKERS).length
? splitCsv(process.env.KAFKA_BROKERS)
: DEFAULTS.kafkaBrokers,
@ -50,5 +60,13 @@ export function loadConfig({ envPath = '.env' } = {}) {
process.env.KAFKA_CONSUMER_GROUP_EXECUTOR || DEFAULTS.kafkaConsumerGroupExecutor,
executorStateDir:
process.env.EXECUTOR_STATE_DIR || DEFAULTS.executorStateDir,
projectName: process.env.PROJECT_NAME || DEFAULTS.projectName,
projectNamespace:
process.env.PROJECT_NAMESPACE || process.env.PROJECT_NAME || DEFAULTS.projectNamespace,
};
}
function parseNumber(value, fallback) {
const parsed = Number(value);
return Number.isFinite(parsed) ? parsed : fallback;
}

View file

@ -1,5 +1,5 @@
import { matchesPairFilter } from '../../core/pair-filter.mjs';
import { logStatus, startIdleHeartbeat } from '../../core/log.mjs';
import { serializeError } from '../../core/log.mjs';
import { assertNormalizedSwapDemand } from '../../core/schemas.mjs';
import { buildNearIntentsQuoteEnvelope, buildNearIntentsRawEnvelope } from './normalize.mjs';
@ -11,16 +11,18 @@ export async function startNearIntentsWs({
apiKey,
wsUrl = DEFAULT_WS_URL,
pairFilter,
getPairFilter = () => pairFilter,
producer,
rawTopic,
normalizedTopic,
logger,
namespace = 'unrip',
onPublish = defaultOnPublish,
}) {
if (!apiKey) throw new Error('Missing NEAR_INTENTS_API_KEY');
let quoteSubscriptionId = null;
let quoteStatusSubscriptionId = null;
let lastStatusAt = Date.now();
let publishedCount = 0;
let publishLocked = false;
@ -30,13 +32,14 @@ export async function startNearIntentsWs({
});
ws.addEventListener('open', () => {
logStatus('near-intents connected');
logger?.info('connection_established', {
namespace,
});
ws.send(JSON.stringify({ jsonrpc: '2.0', id: QUOTE_SUB_ID, method: 'subscribe', params: ['quote'] }));
ws.send(JSON.stringify({ jsonrpc: '2.0', id: QUOTE_STATUS_SUB_ID, method: 'subscribe', params: ['quote_status'] }));
});
ws.addEventListener('message', async (event) => {
lastStatusAt = Date.now();
const text = typeof event.data === 'string' ? event.data : Buffer.from(event.data).toString('utf8');
let payload;
@ -73,7 +76,9 @@ export async function startNearIntentsWs({
const assetIn = envelope.payload?.asset_in;
const assetOut = envelope.payload?.asset_out;
if (!assetIn || !assetOut) return;
if (!matchesPairFilter(assetIn, assetOut, pairFilter)) return;
const activePairFilter = getPairFilter();
if (!matchesPairFilter(assetIn, assetOut, activePairFilter)) return;
publishLocked = true;
try {
@ -82,28 +87,41 @@ export async function startNearIntentsWs({
publishedCount += 1;
onPublish(envelope, publishedCount);
} catch (error) {
logStatus(`kafka publish failed: ${error.message || 'unknown error'}`);
logger?.error('publish_failed', {
namespace,
topic: normalizedTopic,
pair: `${assetIn}->${assetOut}`,
details: {
raw_topic: rawTopic,
error: serializeError(error),
quote_id: envelope.payload?.quote_id,
},
});
} finally {
publishLocked = false;
}
});
ws.addEventListener('close', () => {
logStatus('near-intents disconnected; reconnecting in 2s');
logger?.warn('connection_lost', {
namespace,
details: {
reconnect_in_ms: 2_000,
},
});
setTimeout(connect, 2000);
});
ws.addEventListener('error', (err) => {
logStatus(`near-intents socket error: ${err.message || 'unknown error'}`);
logger?.error('socket_error', {
namespace,
details: {
error: serializeError(err),
},
});
});
}
startIdleHeartbeat({
label: 'near-intents',
getLastActivityAt: () => lastStatusAt,
getStatus: () => `published=${publishedCount}`,
});
connect();
}