refactor: isolate unrip project into projects folder
This commit is contained in:
commit
03ce6546a4
34 changed files with 2148 additions and 0 deletions
6
.dockerignore
Normal file
6
.dockerignore
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
node_modules
|
||||
npm-debug.log
|
||||
.git
|
||||
.gitignore
|
||||
.env
|
||||
var
|
||||
39
.env.example
Normal file
39
.env.example
Normal file
|
|
@ -0,0 +1,39 @@
|
|||
# Local dev / container runtime values
|
||||
NEAR_INTENTS_API_KEY=replace_me
|
||||
NEAR_INTENTS_WS_URL=wss://solver-relay-v2.chaindefuser.com/ws
|
||||
KAFKA_BROKERS=redpanda:9092
|
||||
KAFKA_CLIENT_ID=unrip
|
||||
KAFKA_TOPIC_RAW_NEAR_INTENTS_QUOTE=raw.near_intents.quote
|
||||
KAFKA_TOPIC_NORM_SWAP_DEMAND=norm.swap_demand
|
||||
KAFKA_TOPIC_CMD_EXECUTE_TRADE=cmd.execute_trade
|
||||
KAFKA_TOPIC_EXEC_TRADE_RESULT=exec.trade_result
|
||||
KAFKA_CONSUMER_GROUP_DUMMY=dummy-reactor-v1
|
||||
KAFKA_CONSUMER_GROUP_EXECUTOR=dummy-executor-v1
|
||||
EXECUTOR_STATE_DIR=/var/lib/unrip/executor-state
|
||||
|
||||
# Repo-driven Hetzner bootstrap values live separately from the app .env.
|
||||
# Copy scripts/hetzner/bootstrap-secrets.env.example to
|
||||
# scripts/hetzner/bootstrap-secrets.env, configure non-secret values plus *_PASS
|
||||
# mappings to your pass store, then:
|
||||
# source scripts/hetzner/bootstrap-secrets.env
|
||||
# bash scripts/hetzner/bootstrap.sh
|
||||
#
|
||||
# Canonical operator flow uses `pass` for sensitive values; explicit env vars still
|
||||
# override pass-backed lookups for CI/testing.
|
||||
#
|
||||
# Expected bootstrap inputs now include:
|
||||
# - HCLOUD_TOKEN_PASS or HCLOUD_TOKEN
|
||||
# - SSH_PUBLIC_KEY_PATH
|
||||
# - PUBLIC_DOMAIN
|
||||
# - BASE_DOMAIN
|
||||
# - LETSENCRYPT_EMAIL
|
||||
# - REGISTRY_USERNAME
|
||||
# - REGISTRY_PASSWORD_PASS or REGISTRY_PASSWORD
|
||||
# - NEAR_INTENTS_API_KEY_PASS or NEAR_INTENTS_API_KEY
|
||||
# - FORGEJO_ADMIN_USERNAME
|
||||
# - FORGEJO_ADMIN_EMAIL
|
||||
# - FORGEJO_ADMIN_PASSWORD_PASS or FORGEJO_ADMIN_PASSWORD
|
||||
# - optional DNS provider creds via *_PASS or direct env vars
|
||||
#
|
||||
# Future k3s deployment should source the app values from Kubernetes Secret/ConfigMap.
|
||||
# Hetzner provisioning is workstation-driven after Terraform; cloud-init no longer clones this repo onto the node.
|
||||
10
Dockerfile
Normal file
10
Dockerfile
Normal file
|
|
@ -0,0 +1,10 @@
|
|||
FROM node:22-bookworm-slim
|
||||
WORKDIR /app
|
||||
|
||||
COPY package.json package-lock.json ./
|
||||
RUN npm ci --omit=dev
|
||||
|
||||
COPY . .
|
||||
|
||||
ENV NODE_ENV=production
|
||||
CMD ["node", "src/apps/dummy-consumer.mjs"]
|
||||
66
README.md
Normal file
66
README.md
Normal file
|
|
@ -0,0 +1,66 @@
|
|||
# unrip project
|
||||
|
||||
This directory contains the trading-system project code and project-specific deployment assets.
|
||||
It is shaped so it can later become its own repository with minimal reshuffling.
|
||||
|
||||
## Contents
|
||||
|
||||
- `src/` — application code
|
||||
- `package.json` / `package-lock.json` — Node package manifest
|
||||
- `Dockerfile` / `.dockerignore` — app container build
|
||||
- `.env.example` — local app runtime example
|
||||
- `compose.yml` — local development stack
|
||||
- `deploy/k8s/base/` — project-specific Kubernetes manifests
|
||||
- `deploy/redpanda/rpk-topics.txt` — project topic reference
|
||||
- `docs/` — project-specific design and contract docs
|
||||
|
||||
## Local development
|
||||
|
||||
```bash
|
||||
cd projects/unrip
|
||||
npm install
|
||||
cp .env.example .env
|
||||
# edit .env
|
||||
|
||||
docker compose up -d --build
|
||||
```
|
||||
|
||||
Useful commands:
|
||||
|
||||
```bash
|
||||
docker compose ps
|
||||
docker compose logs -f
|
||||
docker compose logs -f near-intents-ingest dummy-reactor dummy-executor dummy-consumer
|
||||
npm run near-intents:ingest
|
||||
npm run dummy-reactor
|
||||
npm run dummy-executor
|
||||
npm run dummy-consumer
|
||||
```
|
||||
|
||||
## App image
|
||||
|
||||
The app image is now built from this directory.
|
||||
|
||||
Examples:
|
||||
|
||||
```bash
|
||||
cd projects/unrip
|
||||
docker build -t unrip:dev .
|
||||
```
|
||||
|
||||
## Kubernetes manifests
|
||||
|
||||
Project manifests live under:
|
||||
|
||||
- `projects/unrip/deploy/k8s/base/`
|
||||
|
||||
They are consumed by the shared Hetzner overlay and bootstrap flow from the repo root.
|
||||
The shared platform remains outside this directory.
|
||||
|
||||
## Shared platform docs
|
||||
|
||||
For cluster/platform/bootstrap details, see the repo-root docs:
|
||||
- `docs/hetzner-k3s-bootstrap.md`
|
||||
- `docs/hetzner-self-hosted-ci-runbook.md`
|
||||
- `docs/k8s-observability.md`
|
||||
- `deploy/k8s/README.md`
|
||||
81
compose.yml
Normal file
81
compose.yml
Normal file
|
|
@ -0,0 +1,81 @@
|
|||
# Local/dev runtime reference. Hetzner production bootstrap now starts from Terraform + cloud-init + k3s.
|
||||
services:
|
||||
redpanda:
|
||||
image: docker.redpanda.com/redpandadata/redpanda:v24.3.9
|
||||
command:
|
||||
- redpanda
|
||||
- start
|
||||
- --overprovisioned
|
||||
- --smp
|
||||
- "1"
|
||||
- --memory
|
||||
- "1G"
|
||||
- --reserve-memory
|
||||
- "0M"
|
||||
- --node-id
|
||||
- "0"
|
||||
- --check=false
|
||||
- --kafka-addr
|
||||
- internal://0.0.0.0:9092,external://0.0.0.0:19092
|
||||
- --advertise-kafka-addr
|
||||
- internal://redpanda:9092,external://127.0.0.1:19092
|
||||
- --pandaproxy-addr
|
||||
- internal://0.0.0.0:8082
|
||||
- --advertise-pandaproxy-addr
|
||||
- internal://redpanda:8082
|
||||
ports:
|
||||
- "127.0.0.1:19092:19092"
|
||||
volumes:
|
||||
- redpanda-data:/var/lib/redpanda/data
|
||||
healthcheck:
|
||||
test: ["CMD-SHELL", "rpk cluster health | grep -q 'Healthy: *true'"]
|
||||
interval: 10s
|
||||
timeout: 5s
|
||||
retries: 10
|
||||
start_period: 20s
|
||||
|
||||
near-intents-ingest:
|
||||
build: .
|
||||
command: ["node", "src/apps/near-intents-ingest.mjs"]
|
||||
env_file:
|
||||
- .env
|
||||
depends_on:
|
||||
redpanda:
|
||||
condition: service_healthy
|
||||
restart: unless-stopped
|
||||
|
||||
dummy-reactor:
|
||||
build: .
|
||||
command: ["node", "src/apps/dummy-reactor.mjs"]
|
||||
env_file:
|
||||
- .env
|
||||
depends_on:
|
||||
redpanda:
|
||||
condition: service_healthy
|
||||
restart: unless-stopped
|
||||
|
||||
dummy-executor:
|
||||
build: .
|
||||
command: ["node", "src/apps/dummy-executor.mjs"]
|
||||
env_file:
|
||||
- .env
|
||||
depends_on:
|
||||
redpanda:
|
||||
condition: service_healthy
|
||||
restart: unless-stopped
|
||||
volumes:
|
||||
- executor-state:/var/lib/unrip/executor-state
|
||||
|
||||
dummy-consumer:
|
||||
build: .
|
||||
command: ["node", "src/apps/dummy-consumer.mjs"]
|
||||
env_file:
|
||||
- .env
|
||||
depends_on:
|
||||
redpanda:
|
||||
condition: service_healthy
|
||||
restart: unless-stopped
|
||||
|
||||
volumes:
|
||||
redpanda-data:
|
||||
executor-state:
|
||||
34
deploy/k8s/base/bootstrap-job.yaml
Normal file
34
deploy/k8s/base/bootstrap-job.yaml
Normal file
|
|
@ -0,0 +1,34 @@
|
|||
apiVersion: batch/v1
|
||||
kind: Job
|
||||
metadata:
|
||||
name: redpanda-topic-bootstrap
|
||||
namespace: unrip
|
||||
spec:
|
||||
backoffLimit: 6
|
||||
template:
|
||||
spec:
|
||||
restartPolicy: OnFailure
|
||||
containers:
|
||||
- name: bootstrap-topics
|
||||
image: docker.redpanda.com/redpandadata/redpanda:v24.3.9
|
||||
command: ["/bin/sh", "-lc"]
|
||||
args:
|
||||
- |
|
||||
set -eu
|
||||
BROKERS="redpanda.unrip.svc.cluster.local:9092"
|
||||
TOPICS="raw.near_intents.quote norm.swap_demand cmd.execute_trade exec.trade_result"
|
||||
|
||||
echo "waiting for Redpanda at ${BROKERS}"
|
||||
until rpk cluster info --brokers "$BROKERS" >/dev/null 2>&1; do
|
||||
sleep 2
|
||||
done
|
||||
|
||||
for topic in $TOPICS; do
|
||||
if rpk topic describe "$topic" --brokers "$BROKERS" >/dev/null 2>&1; then
|
||||
echo "topic already exists: $topic"
|
||||
continue
|
||||
fi
|
||||
|
||||
echo "creating topic: $topic"
|
||||
rpk topic create --brokers "$BROKERS" --partitions 1 --replicas 1 "$topic"
|
||||
done
|
||||
7
deploy/k8s/base/kustomization.yaml
Normal file
7
deploy/k8s/base/kustomization.yaml
Normal file
|
|
@ -0,0 +1,7 @@
|
|||
apiVersion: kustomize.config.k8s.io/v1beta1
|
||||
kind: Kustomization
|
||||
resources:
|
||||
- namespace.yaml
|
||||
- redpanda.yaml
|
||||
- unrip.yaml
|
||||
- bootstrap-job.yaml
|
||||
7
deploy/k8s/base/namespace.yaml
Normal file
7
deploy/k8s/base/namespace.yaml
Normal file
|
|
@ -0,0 +1,7 @@
|
|||
apiVersion: v1
|
||||
kind: Namespace
|
||||
metadata:
|
||||
name: unrip
|
||||
labels:
|
||||
app.kubernetes.io/part-of: unrip
|
||||
project.pi.io/type: project
|
||||
91
deploy/k8s/base/redpanda.yaml
Normal file
91
deploy/k8s/base/redpanda.yaml
Normal file
|
|
@ -0,0 +1,91 @@
|
|||
apiVersion: v1
|
||||
kind: PersistentVolumeClaim
|
||||
metadata:
|
||||
name: redpanda-data
|
||||
namespace: unrip
|
||||
spec:
|
||||
accessModes: ["ReadWriteOnce"]
|
||||
resources:
|
||||
requests:
|
||||
storage: 20Gi
|
||||
---
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: redpanda
|
||||
namespace: unrip
|
||||
spec:
|
||||
replicas: 1
|
||||
selector:
|
||||
matchLabels:
|
||||
app: redpanda
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app: redpanda
|
||||
app.kubernetes.io/part-of: unrip
|
||||
spec:
|
||||
containers:
|
||||
- name: redpanda
|
||||
image: docker.redpanda.com/redpandadata/redpanda:v24.3.9
|
||||
args:
|
||||
- redpanda
|
||||
- start
|
||||
- --overprovisioned
|
||||
- --smp
|
||||
- "1"
|
||||
- --memory
|
||||
- "1G"
|
||||
- --reserve-memory
|
||||
- "0M"
|
||||
- --node-id
|
||||
- "0"
|
||||
- --check=false
|
||||
- --set
|
||||
- redpanda.auto_create_topics_enabled=false
|
||||
- --kafka-addr
|
||||
- internal://0.0.0.0:9092
|
||||
- --advertise-kafka-addr
|
||||
- internal://redpanda.unrip.svc.cluster.local:9092
|
||||
- --pandaproxy-addr
|
||||
- internal://0.0.0.0:8082
|
||||
- --advertise-pandaproxy-addr
|
||||
- internal://redpanda.unrip.svc.cluster.local:8082
|
||||
ports:
|
||||
- name: kafka
|
||||
containerPort: 9092
|
||||
- name: proxy
|
||||
containerPort: 8082
|
||||
readinessProbe:
|
||||
tcpSocket:
|
||||
port: 9092
|
||||
initialDelaySeconds: 10
|
||||
periodSeconds: 10
|
||||
livenessProbe:
|
||||
tcpSocket:
|
||||
port: 9092
|
||||
initialDelaySeconds: 30
|
||||
periodSeconds: 15
|
||||
volumeMounts:
|
||||
- name: redpanda-data
|
||||
mountPath: /var/lib/redpanda/data
|
||||
volumes:
|
||||
- name: redpanda-data
|
||||
persistentVolumeClaim:
|
||||
claimName: redpanda-data
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: redpanda
|
||||
namespace: unrip
|
||||
spec:
|
||||
selector:
|
||||
app: redpanda
|
||||
ports:
|
||||
- name: kafka
|
||||
port: 9092
|
||||
targetPort: 9092
|
||||
- name: proxy
|
||||
port: 8082
|
||||
targetPort: 8082
|
||||
152
deploy/k8s/base/unrip.yaml
Normal file
152
deploy/k8s/base/unrip.yaml
Normal file
|
|
@ -0,0 +1,152 @@
|
|||
apiVersion: v1
|
||||
kind: ConfigMap
|
||||
metadata:
|
||||
name: unrip-config
|
||||
namespace: unrip
|
||||
data:
|
||||
NEAR_INTENTS_WS_URL: wss://solver-relay-v2.chaindefuser.com/ws
|
||||
KAFKA_BROKERS: redpanda.unrip.svc.cluster.local:9092
|
||||
KAFKA_CLIENT_ID: unrip
|
||||
KAFKA_TOPIC_RAW_NEAR_INTENTS_QUOTE: raw.near_intents.quote
|
||||
KAFKA_TOPIC_NORM_SWAP_DEMAND: norm.swap_demand
|
||||
KAFKA_TOPIC_CMD_EXECUTE_TRADE: cmd.execute_trade
|
||||
KAFKA_TOPIC_EXEC_TRADE_RESULT: exec.trade_result
|
||||
KAFKA_CONSUMER_GROUP_DUMMY: dummy-reactor-v1
|
||||
KAFKA_CONSUMER_GROUP_EXECUTOR: dummy-executor-v1
|
||||
EXECUTOR_STATE_DIR: /var/lib/unrip/executor-state
|
||||
PROJECT_NAME: unrip
|
||||
PROJECT_NAMESPACE: unrip
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: PersistentVolumeClaim
|
||||
metadata:
|
||||
name: executor-state
|
||||
namespace: unrip
|
||||
spec:
|
||||
accessModes: ["ReadWriteOnce"]
|
||||
resources:
|
||||
requests:
|
||||
storage: 5Gi
|
||||
---
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: near-intents-ingest
|
||||
namespace: unrip
|
||||
spec:
|
||||
replicas: 1
|
||||
selector:
|
||||
matchLabels:
|
||||
app: near-intents-ingest
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app: near-intents-ingest
|
||||
app.kubernetes.io/part-of: unrip
|
||||
spec:
|
||||
imagePullSecrets:
|
||||
- name: unrip-registry-creds
|
||||
containers:
|
||||
- name: app
|
||||
image: ghcr.io/example/unrip:bootstrap
|
||||
imagePullPolicy: IfNotPresent
|
||||
command: ["node", "src/apps/near-intents-ingest.mjs"]
|
||||
envFrom:
|
||||
- configMapRef:
|
||||
name: unrip-config
|
||||
- secretRef:
|
||||
name: unrip-secrets
|
||||
---
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: dummy-reactor
|
||||
namespace: unrip
|
||||
spec:
|
||||
replicas: 1
|
||||
selector:
|
||||
matchLabels:
|
||||
app: dummy-reactor
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app: dummy-reactor
|
||||
app.kubernetes.io/part-of: unrip
|
||||
spec:
|
||||
imagePullSecrets:
|
||||
- name: unrip-registry-creds
|
||||
containers:
|
||||
- name: app
|
||||
image: ghcr.io/example/unrip:bootstrap
|
||||
imagePullPolicy: IfNotPresent
|
||||
command: ["node", "src/apps/dummy-reactor.mjs"]
|
||||
envFrom:
|
||||
- configMapRef:
|
||||
name: unrip-config
|
||||
- secretRef:
|
||||
name: unrip-secrets
|
||||
---
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: dummy-executor
|
||||
namespace: unrip
|
||||
spec:
|
||||
replicas: 1
|
||||
selector:
|
||||
matchLabels:
|
||||
app: dummy-executor
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app: dummy-executor
|
||||
app.kubernetes.io/part-of: unrip
|
||||
spec:
|
||||
imagePullSecrets:
|
||||
- name: unrip-registry-creds
|
||||
containers:
|
||||
- name: app
|
||||
image: ghcr.io/example/unrip:bootstrap
|
||||
imagePullPolicy: IfNotPresent
|
||||
command: ["node", "src/apps/dummy-executor.mjs"]
|
||||
envFrom:
|
||||
- configMapRef:
|
||||
name: unrip-config
|
||||
- secretRef:
|
||||
name: unrip-secrets
|
||||
volumeMounts:
|
||||
- name: executor-state
|
||||
mountPath: /var/lib/unrip/executor-state
|
||||
volumes:
|
||||
- name: executor-state
|
||||
persistentVolumeClaim:
|
||||
claimName: executor-state
|
||||
---
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: dummy-consumer
|
||||
namespace: unrip
|
||||
spec:
|
||||
replicas: 1
|
||||
selector:
|
||||
matchLabels:
|
||||
app: dummy-consumer
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app: dummy-consumer
|
||||
app.kubernetes.io/part-of: unrip
|
||||
spec:
|
||||
imagePullSecrets:
|
||||
- name: unrip-registry-creds
|
||||
containers:
|
||||
- name: app
|
||||
image: ghcr.io/example/unrip:bootstrap
|
||||
imagePullPolicy: IfNotPresent
|
||||
command: ["node", "src/apps/dummy-consumer.mjs"]
|
||||
envFrom:
|
||||
- configMapRef:
|
||||
name: unrip-config
|
||||
- secretRef:
|
||||
name: unrip-secrets
|
||||
4
deploy/redpanda/rpk-topics.txt
Normal file
4
deploy/redpanda/rpk-topics.txt
Normal file
|
|
@ -0,0 +1,4 @@
|
|||
raw.near_intents.quote
|
||||
norm.swap_demand
|
||||
cmd.execute_trade
|
||||
exec.trade_result
|
||||
85
docs/contracts.md
Normal file
85
docs/contracts.md
Normal file
|
|
@ -0,0 +1,85 @@
|
|||
# Event contracts
|
||||
|
||||
## Envelope
|
||||
All bus messages use this envelope:
|
||||
|
||||
```json
|
||||
{
|
||||
"event_id": "string",
|
||||
"event_type": "string",
|
||||
"venue": "string",
|
||||
"source": "string|null",
|
||||
"schema_version": 1,
|
||||
"observed_at": "ISO-8601|null",
|
||||
"ingested_at": "ISO-8601",
|
||||
"payload": {},
|
||||
"raw": {}
|
||||
}
|
||||
```
|
||||
|
||||
## Topics
|
||||
Current canonical topic set:
|
||||
- `raw.near_intents.quote`
|
||||
- `norm.swap_demand`
|
||||
- `cmd.execute_trade`
|
||||
- `exec.trade_result`
|
||||
|
||||
In Kubernetes bootstrap, Redpanda topic creation is currently handled by the repo-managed bootstrap job applied with the manifest set.
|
||||
|
||||
## `raw.near_intents.quote`
|
||||
- `event_type`: `near_intents_quote_raw`
|
||||
- `payload.message`: original venue-native payload
|
||||
- `raw`: original venue-native payload
|
||||
|
||||
## `norm.swap_demand`
|
||||
- `event_type`: `swap_demand`
|
||||
- payload:
|
||||
- `quote_id`
|
||||
- `asset_in`
|
||||
- `asset_out`
|
||||
- `amount_in`
|
||||
- `amount_out`
|
||||
- `ttl_ms`
|
||||
|
||||
## `cmd.execute_trade`
|
||||
- `event_type`: `execute_trade`
|
||||
- payload:
|
||||
- `command_id`
|
||||
- `idempotency_key`
|
||||
- `execution_key`
|
||||
- `quote_id`
|
||||
- `asset_in`
|
||||
- `asset_out`
|
||||
- `amount_in`
|
||||
- `amount_out`
|
||||
- `reason`
|
||||
|
||||
## `exec.trade_result`
|
||||
- `event_type`: `trade_result`
|
||||
- payload:
|
||||
- `command_id`
|
||||
- `idempotency_key`
|
||||
- `execution_key`
|
||||
- `quote_id`
|
||||
- `status`
|
||||
- `result_code`
|
||||
- `note`
|
||||
|
||||
## Executor idempotency model
|
||||
- `command_id` is unique per trade command and currently deterministic as `cmd-${quote_id}`
|
||||
- `idempotency_key` is stable for semantic duplicate detection and currently `${venue}:${quote_id}`
|
||||
- `execution_key` is the stable partition key and currently `${venue}:${asset_in}->${asset_out}`
|
||||
- executor persists command state on durable storage before publishing a result
|
||||
- already-completed `command_id`s are skipped on replay or restart
|
||||
- if a command is seen again after a persisted `processing` state, the executor emits a recovered result path instead of blindly duplicating work
|
||||
|
||||
## Deployment and persistence implications
|
||||
These contracts are tied to deployment behavior:
|
||||
- executor duplicate suppression depends on durable persistence at `EXECUTOR_STATE_DIR`
|
||||
- local Compose mounts that path for development/runtime testing
|
||||
- the Hetzner single-node k3s path mounts persistent storage for the executor at `/var/lib/unrip/executor-state`
|
||||
- in the current single-node target, that persistence is node-backed and should be treated as required operational state
|
||||
|
||||
Operational consequence:
|
||||
- deleting the executor PVC or losing the node without migration discards idempotency history
|
||||
- that can allow already-seen commands to be treated as new after recovery
|
||||
198
docs/minimal-product.md
Normal file
198
docs/minimal-product.md
Normal file
|
|
@ -0,0 +1,198 @@
|
|||
# Minimal product: NEAR Intents demand monitor
|
||||
|
||||
## Goal
|
||||
Build the smallest useful event-driven product for crypto trading research:
|
||||
|
||||
- read **live user demand** from NEAR Intents
|
||||
- publish demand into a **central Kafka/Redpanda-compatible bus**
|
||||
- prove downstream consumption with a **dummy reactor**
|
||||
- avoid dashboards, execution, wallets, storage, auth workflows beyond the required API key, strategy code, and generic infra beyond the message bus itself
|
||||
|
||||
## Why this is the right first slice
|
||||
From the NEAR Intents docs, there are several possible data surfaces:
|
||||
|
||||
1. **Message Bus WebSocket `quote` subscription**
|
||||
- Endpoint: `wss://solver-relay-v2.chaindefuser.com/ws`
|
||||
- Real-time stream for quote requests
|
||||
- Subscription request shape:
|
||||
```json
|
||||
{
|
||||
"jsonrpc": "2.0",
|
||||
"id": 1,
|
||||
"method": "subscribe",
|
||||
"params": ["quote"]
|
||||
}
|
||||
```
|
||||
- Expected live frame shape is JSON-RPC-like but should be treated as flexible. The adapter should accept quote payloads when the useful fields appear either:
|
||||
- directly under `params`
|
||||
- directly under `result`
|
||||
- or at the top level of the message body
|
||||
- Fields of interest include:
|
||||
- `quote_id` (or equivalent request identifier)
|
||||
- `defuse_asset_identifier_in`
|
||||
- `defuse_asset_identifier_out`
|
||||
- `exact_amount_in` or `exact_amount_out`
|
||||
- `min_deadline_ms`
|
||||
- Subscription acknowledgements may also vary. They may arrive as an `id`-matched JSON-RPC response with a simple `result`, a structured `result`, or other non-quote control frame before the first quote event.
|
||||
- This is the closest public signal to **current demand**.
|
||||
|
||||
2. **Message Bus JSON-RPC `publish_intent` / `get_status`**
|
||||
- Endpoint: `https://solver-relay-v2.chaindefuser.com/rpc`
|
||||
- Useful for posting intents or checking a known `intent_hash`
|
||||
- Not a public firehose of all intents.
|
||||
|
||||
3. **Explorer API `/api/v0/transactions`**
|
||||
- Historical and analytics friendly
|
||||
- Requires JWT auth
|
||||
- Better for history, not best for a minimal live monitor
|
||||
|
||||
4. **Verifier contract intent payloads**
|
||||
- The on-chain swap expression is usually `token_diff`
|
||||
- Important for understanding settlement semantics
|
||||
- Not the easiest first live intake path for a lean bus-first system
|
||||
|
||||
## Product decision
|
||||
The minimal product should monitor **WebSocket `quote` events** and route them through a bus-first runtime.
|
||||
|
||||
### Why
|
||||
- closest live signal to user demand
|
||||
- directly reflects what users are requesting from solvers
|
||||
- enough to answer the first trading question: **what assets are being requested right now?**
|
||||
- decouples venue intake from downstream analysis through Kafka-compatible topics
|
||||
|
||||
### Important implementation note
|
||||
Current docs for the market-maker quickstart and live endpoint behavior indicate the Message Bus requires a **partner API key / JWT** in the `Authorization: Bearer ...` header.
|
||||
That means the best path is still the quote stream, but live operation is partner-gated.
|
||||
|
||||
### Important caveat
|
||||
A `quote` event is **pre-trade demand**, not guaranteed execution.
|
||||
That is fine for v0. The purpose is demand sensing, not settlement accounting.
|
||||
|
||||
## Runtime shape
|
||||
|
||||
```text
|
||||
NEAR Intents websocket
|
||||
|
|
||||
v
|
||||
src/apps/near-intents-ingest.mjs
|
||||
|
|
||||
+--> raw.near_intents.quote
|
||||
|
|
||||
+--> norm.swap_demand
|
||||
|
|
||||
v
|
||||
src/apps/dummy-consumer.mjs
|
||||
```
|
||||
|
||||
### Runtime contracts
|
||||
|
||||
#### Ingest app
|
||||
`src/apps/near-intents-ingest.mjs`:
|
||||
- loads env
|
||||
- parses optional `--pair 'asset_a->asset_b'`
|
||||
- starts the NEAR Intents websocket adapter
|
||||
- writes raw and normalized events to the configured broker
|
||||
|
||||
#### Dummy consumer
|
||||
`src/apps/dummy-consumer.mjs`:
|
||||
- subscribes to `norm.swap_demand`
|
||||
- logs observed pair and quote id
|
||||
- exists only to prove a downstream consumer contract
|
||||
|
||||
#### Bus config
|
||||
Default env-driven topics and group ids:
|
||||
- `KAFKA_TOPIC_RAW_NEAR_INTENTS_QUOTE=raw.near_intents.quote`
|
||||
- `KAFKA_TOPIC_NORM_SWAP_DEMAND=norm.swap_demand`
|
||||
- `KAFKA_CONSUMER_GROUP_DUMMY=dummy-reactor-v1`
|
||||
|
||||
Redpanda is a valid runtime target because the transport is Kafka-compatible.
|
||||
|
||||
## Internal model
|
||||
Normalize each quote event into a thin bus envelope:
|
||||
|
||||
Top-level envelope fields:
|
||||
- `venue`
|
||||
- `source`
|
||||
- `type`
|
||||
- `eventId`
|
||||
- `occurredAt`
|
||||
- `ingestedAt`
|
||||
- `assetIn`
|
||||
- `assetOut`
|
||||
- `raw`
|
||||
- `quote`
|
||||
|
||||
Nested `quote` fields:
|
||||
- `quoteId`
|
||||
- `assetIn`
|
||||
- `assetOut`
|
||||
- `amountIn`
|
||||
- `amountOut`
|
||||
- `ttlMs`
|
||||
|
||||
Field extraction must remain tolerant to known upstream aliases, and normalization should continue to operate on the merged `metadata + data` payload shape from the Message Bus event.
|
||||
The live adapter now intentionally accepts quote-like payloads from `params`, `result`, or the top-level message body, but only processes frames that actually look like quote data. Subscription acknowledgements and unrelated control frames should still be ignored.
|
||||
|
||||
## Filtering
|
||||
The ingest runtime supports an optional exact-pair filter:
|
||||
|
||||
```bash
|
||||
npm run near-intents:ingest -- --pair 'asset_a->asset_b'
|
||||
```
|
||||
|
||||
The filter is direction-agnostic, so the reversed asset order is also accepted.
|
||||
|
||||
## Scope boundaries
|
||||
### Must do
|
||||
- connect to the websocket
|
||||
- subscribe to `quote` and tolerate control frames
|
||||
- normalize quote events into one compact model
|
||||
- publish raw and normalized events to Kafka/Redpanda-compatible topics
|
||||
- allow a downstream consumer to react to normalized events
|
||||
- reconnect automatically on disconnect
|
||||
- document `npm` and `node` entrypoints
|
||||
|
||||
### Must not do
|
||||
- Python packaging or CLI guidance
|
||||
- TUI-specific product requirements
|
||||
- charts
|
||||
- account details
|
||||
- pnl
|
||||
- routing internals
|
||||
- market making controls
|
||||
- execution buttons
|
||||
- config panels
|
||||
- speculative infra beyond the current bus and dummy consumer
|
||||
|
||||
## Path to success
|
||||
1. Connect to WebSocket
|
||||
2. Subscribe to `quote`
|
||||
3. Normalize incoming events into one compact model
|
||||
4. Publish raw envelopes to `raw.near_intents.quote`
|
||||
5. Publish normalized envelopes to `norm.swap_demand`
|
||||
6. Start a dummy consumer on the normalized topic
|
||||
7. Reconnect automatically on disconnect
|
||||
8. Only after this works, consider:
|
||||
- `quote_status`-specific downstream handling
|
||||
- historical replay via Explorer API
|
||||
- token metadata enrichment
|
||||
- filtering and alerts beyond `--pair`
|
||||
|
||||
## Packaging alignment
|
||||
Current repository packaging and usage should stay aligned around the JavaScript runtime entrypoints:
|
||||
|
||||
- package scripts:
|
||||
- `npm run near-intents:ingest`
|
||||
- `npm run dummy-consumer`
|
||||
- `npm start` as a compatibility wrapper
|
||||
- direct app entrypoints:
|
||||
- `node src/apps/near-intents-ingest.mjs`
|
||||
- `node src/apps/dummy-consumer.mjs`
|
||||
|
||||
Documentation should treat the npm scripts and `src/apps/*` node entrypoints as canonical. Older single-file and Python/TUI instructions should remain removed to avoid runtime confusion.
|
||||
|
||||
## Sources
|
||||
- NEAR Intents Message Bus WebSocket docs: `subscribe` with `quote` / `quote_status`
|
||||
- NEAR Intents Message Bus RPC docs: `quote`, `publish_intent`, `get_status`
|
||||
- Verifier contract docs: `token_diff` intent type
|
||||
- Explorer API OpenAPI: authenticated historical transactions
|
||||
383
docs/next-session-architecture.md
Normal file
383
docs/next-session-architecture.md
Normal file
|
|
@ -0,0 +1,383 @@
|
|||
# Trading System Architecture Notes for Next Session
|
||||
|
||||
## Objective
|
||||
Build the first real version of the trading system as an event-driven, multi-service architecture.
|
||||
|
||||
Current implemented seed:
|
||||
- NEAR Intents ingest in Node.js
|
||||
- Kafka-compatible bus usage via `kafkajs`
|
||||
- dummy reactor / executor / result consumer loop
|
||||
|
||||
Next session should continue from this architecture, not revert to a monolith, local-only script, or TUI.
|
||||
|
||||
---
|
||||
|
||||
## Core Architecture
|
||||
All components are independent services.
|
||||
They communicate only through a central Kafka-compatible bus (Redpanda first, Kafka-compatible by design).
|
||||
|
||||
### Service classes
|
||||
- venue ingestors
|
||||
- normalizers
|
||||
- reactors / decision engines
|
||||
- executors
|
||||
- downstream consumers / monitors / archivers / replay tools
|
||||
|
||||
### Service communication rule
|
||||
No direct service-to-service calls for core trading flow.
|
||||
Use bus topics only.
|
||||
|
||||
---
|
||||
|
||||
## Venue-Oriented Structure
|
||||
The system should be organized by venue.
|
||||
Each venue can have different:
|
||||
- ingest/feed mechanics
|
||||
- normalization logic
|
||||
- execution mechanics
|
||||
|
||||
### Per-venue responsibilities
|
||||
- `ingest` = venue-native intake
|
||||
- `normalize` = convert venue-native payload into canonical internal event
|
||||
- `execute` = venue-specific action logic
|
||||
|
||||
Planned shape:
|
||||
```text
|
||||
src/
|
||||
apps/
|
||||
bus/
|
||||
core/
|
||||
venues/
|
||||
near-intents/
|
||||
ingest
|
||||
normalize
|
||||
execute
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Bus Choice
|
||||
Use **Redpanda** first, but stay fully **Kafka-compatible**.
|
||||
|
||||
### Reason
|
||||
Requirements:
|
||||
- high throughput
|
||||
- low latency
|
||||
- retention
|
||||
- replay
|
||||
- multiple producers/consumers
|
||||
- independent services
|
||||
- future scale-out
|
||||
- multi-language compatibility
|
||||
|
||||
### Constraint
|
||||
Do not use broker-specific features that make migration to Kafka difficult.
|
||||
Use standard Kafka clients and semantics.
|
||||
|
||||
---
|
||||
|
||||
## Data Model Principles
|
||||
Kafka/Redpanda is the operational event backbone.
|
||||
|
||||
### Event model rules
|
||||
- append-only
|
||||
- immutable events
|
||||
- versioned schemas
|
||||
- raw and normalized events both preserved
|
||||
|
||||
### Every event should include
|
||||
- `event_id`
|
||||
- `event_type`
|
||||
- `venue`
|
||||
- `observed_at` / `ingested_at`
|
||||
- `schema_version`
|
||||
- `payload`
|
||||
- optionally raw/original payload where appropriate
|
||||
|
||||
### Raw vs normalized
|
||||
Keep both.
|
||||
- raw topics = exact venue-native source truth
|
||||
- normalized topics = canonical research/trading inputs
|
||||
|
||||
This is required for:
|
||||
- replay
|
||||
- debugging
|
||||
- future backtesting
|
||||
- future Spark/batch processing
|
||||
|
||||
---
|
||||
|
||||
## Current/Planned Topic Flow
|
||||
Minimal 3-stage pipeline:
|
||||
|
||||
1. ingest publishes normalized demand
|
||||
2. reactor publishes trade command
|
||||
3. executor publishes trade result
|
||||
|
||||
### Topic classes
|
||||
- `raw.*` = raw venue-native events
|
||||
- `norm.*` = canonical normalized market events
|
||||
- `cmd.*` = execution commands
|
||||
- `exec.*` = execution outcomes
|
||||
- later `signal.*` if needed for reactor outputs before command stage
|
||||
|
||||
### Current minimal topics
|
||||
- `norm.swap_demand`
|
||||
- `cmd.execute_trade`
|
||||
- `exec.trade_result`
|
||||
|
||||
### NEAR Intents
|
||||
NEAR Intents source currently feeds quote-demand style events from solver-bus websocket.
|
||||
This is a venue ingest source, not the whole trading system.
|
||||
|
||||
---
|
||||
|
||||
## Execution Safety / Zero Downtime Requirements
|
||||
This is critical.
|
||||
|
||||
### Constraint
|
||||
Multiple executors must never duplicate the same trade/action during deploys, restarts, or rebalances.
|
||||
|
||||
### Must-have rules
|
||||
1. Every execution command must carry a unique `command_id`
|
||||
2. Commands must include deterministic idempotency information
|
||||
3. Executors must be idempotent
|
||||
4. Executors must belong to a consumer group per executor role
|
||||
5. Commands should be partitioned by a stable execution key where ordering matters
|
||||
6. Executor state must be persisted durably enough to detect duplicate command execution
|
||||
|
||||
### Kafka consumer groups are not sufficient alone
|
||||
They help assign work, but they do not guarantee no duplicate processing under restart/rebalance conditions.
|
||||
Idempotency is still required.
|
||||
|
||||
### Rolling updates / zero downtime
|
||||
Executors must support:
|
||||
- graceful shutdown
|
||||
- stop taking new work before exit
|
||||
- finish or safely recover in-flight work
|
||||
- commit offsets only after safe execution state transition
|
||||
|
||||
### Persistence implication
|
||||
Executor idempotency state is not optional metadata.
|
||||
It is operational state that must survive pod restarts.
|
||||
|
||||
Current single-node k3s direction:
|
||||
- executor state lives at `/var/lib/unrip/executor-state`
|
||||
- Kubernetes mounts that path through persistent storage
|
||||
- the Hetzner single-node overlay currently targets k3s `local-path` storage
|
||||
- node loss without storage migration means duplicate-suppression history is lost
|
||||
|
||||
---
|
||||
|
||||
## Deployment Target
|
||||
### First deployment phase
|
||||
- single machine on Hetzner
|
||||
- but still multiple independent services
|
||||
- no architecture shortcuts that prevent future clustering
|
||||
|
||||
### Future target
|
||||
- split across multiple machines
|
||||
- cluster capable
|
||||
- fault tolerant
|
||||
- multi-node
|
||||
- zero-downtime deploys
|
||||
|
||||
### Deployment rules from day 1
|
||||
- every component is a separate container/service
|
||||
- all config via env/config files
|
||||
- communication over network/bus only
|
||||
- persistent components use mounted volumes/PVCs
|
||||
- no manual SSH-based operational workflow
|
||||
|
||||
---
|
||||
|
||||
## Infrastructure / Ops Direction
|
||||
Target environment:
|
||||
- Hetzner
|
||||
- self-hosted CI/CD
|
||||
- provisioning by code
|
||||
- no GitHub dependency
|
||||
|
||||
### Desired stack direction
|
||||
- Terraform for Hetzner provisioning
|
||||
- Kubernetes-oriented target from the start
|
||||
- self-hosted Git + CI/CD
|
||||
- Kafka-compatible broker
|
||||
- object storage later for long-term archived event history
|
||||
|
||||
### Single-node first, future cluster later
|
||||
The first version may run on one machine, but deployment structure should already match a future distributed system.
|
||||
|
||||
### Current canonical operator path
|
||||
The repo now documents and partially implements this path as the primary deployment workflow:
|
||||
|
||||
#### Phase 0: workstation bootstrap
|
||||
1. A local operator workstation prepares bootstrap secrets in `scripts/hetzner/bootstrap-secrets.env`.
|
||||
2. The operator runs `bash scripts/hetzner/bootstrap.sh`.
|
||||
3. Terraform provisions the server, firewall, network, and cloud-init user-data.
|
||||
4. cloud-init installs k3s automatically and prepares persistence directories plus bootstrap artifacts.
|
||||
5. The workstation waits for the public k3s API endpoint to report ready.
|
||||
6. The workstation writes `.state/hetzner/kubeconfig.yaml`.
|
||||
7. The workstation injects initial Kubernetes Secrets for app and Forgejo bootstrap.
|
||||
8. The workstation applies repo-managed Kubernetes manifests under `deploy/k8s/`.
|
||||
9. The workstation performs the first image/bootstrap delivery attempt for the app workloads.
|
||||
10. The workstation verifies rollout status.
|
||||
|
||||
#### Phase 1: self-hosted handoff
|
||||
1. Forgejo becomes reachable in-cluster.
|
||||
2. The operator completes initial Forgejo admin/repo setup.
|
||||
3. This repo is pushed or mirrored into Forgejo.
|
||||
4. The Forgejo runner becomes the routine app deployment mechanism.
|
||||
5. Terraform remains the infra mutation entrypoint unless further automated later.
|
||||
|
||||
### Failure-recovery expectation
|
||||
The bootstrap path must be rerunnable from the workstation.
|
||||
Docs should keep treating recovery as:
|
||||
- fix local secrets/inputs
|
||||
- rerun the bootstrap script
|
||||
- inspect the cluster with the generated kubeconfig
|
||||
- destroy/recreate infra with `scripts/hetzner/destroy.sh` only when required
|
||||
|
||||
### Current repo-state caveats
|
||||
The direction is clear, but the implementation is still mid-transition:
|
||||
- the bootstrap script currently applies `deploy/k8s/base` directly rather than the Hetzner overlay
|
||||
- kubeconfig/auth handling is not yet fully production-hardened
|
||||
- first image delivery is still a bootstrap workaround rather than a final registry-native CI path
|
||||
- Forgejo admin bootstrap, repo creation, and Actions configuration still require operator steps
|
||||
- local Compose remains in the repo for development/testing, not as the canonical production path
|
||||
|
||||
### Minimal repo layout target
|
||||
```text
|
||||
deploy/
|
||||
hetzner/
|
||||
README.md
|
||||
k8s/
|
||||
base/
|
||||
overlays/
|
||||
hetzner-single-node/
|
||||
infra/
|
||||
terraform/
|
||||
hetzner/
|
||||
```
|
||||
|
||||
Guidelines:
|
||||
- `infra/terraform/hetzner/` owns VM, firewall, networking, and cloud-init rendering
|
||||
- `deploy/k8s/` owns Kubernetes-native manifests and overlays
|
||||
- app runtime manifests should remain Kubernetes-native so they can later move from single-node k3s to a larger cluster with minimal rewrite
|
||||
- secret material must not live in git in plaintext; bootstrap docs should describe workstation-driven injection or generated secret references
|
||||
|
||||
---
|
||||
|
||||
## Local Development / Testing Direction
|
||||
Do not assume manual multi-terminal operation long term.
|
||||
|
||||
### Requirement
|
||||
Need an orchestrated local/dev runtime.
|
||||
|
||||
### Local dev should preserve real boundaries
|
||||
- separate services
|
||||
- broker present
|
||||
- env/config driven
|
||||
- same event flow as production
|
||||
|
||||
### Current local/dev answer
|
||||
Compose is still acceptable for:
|
||||
- developer laptops
|
||||
- fast local iteration
|
||||
- debugging event flow
|
||||
- validating container boundaries before Kubernetes rollout
|
||||
|
||||
But Compose should remain explicitly secondary to the repo-driven Hetzner + k3s path for production operations.
|
||||
|
||||
### Testing layers
|
||||
1. unit tests for normalizers / schema logic / helpers
|
||||
2. integration tests against Kafka-compatible broker
|
||||
3. replay/simulation tests using retained event streams
|
||||
|
||||
---
|
||||
|
||||
## Spark Readiness
|
||||
Do not add Spark now.
|
||||
But keep the system Spark-compatible later by:
|
||||
- preserving raw events
|
||||
- preserving normalized events
|
||||
- using immutable append-only event streams
|
||||
- versioning schemas
|
||||
- separating operational event log from future analytical processing
|
||||
|
||||
Spark later would be for:
|
||||
- large-scale backtesting
|
||||
- feature generation
|
||||
- archive processing
|
||||
- multi-venue analytics
|
||||
|
||||
---
|
||||
|
||||
## Immediate Next Engineering Tasks
|
||||
Next session should focus on the following.
|
||||
|
||||
### 1. Clean current repo structure
|
||||
Remove duplicate/legacy paths and keep one canonical structure only.
|
||||
|
||||
### 2. Keep/complete the 3-stage loop
|
||||
- NEAR Intents ingest -> `norm.swap_demand`
|
||||
- dummy reactor -> `cmd.execute_trade`
|
||||
- dummy executor -> `exec.trade_result`
|
||||
- downstream result consumer
|
||||
|
||||
### 3. Define canonical schemas
|
||||
Define concrete event schemas for:
|
||||
- normalized swap demand
|
||||
- execute trade command
|
||||
- trade result
|
||||
|
||||
### 4. Define executor idempotency model
|
||||
Specify:
|
||||
- `command_id`
|
||||
- idempotency key rules
|
||||
- execution state transition rules
|
||||
- duplicate handling rules
|
||||
|
||||
### 5. Move toward production-shaped deployment
|
||||
Design for:
|
||||
- one service per container
|
||||
- single-node deployment first
|
||||
- future multi-node split without app rewrite
|
||||
|
||||
### 6. Harden provisioning/deployment path
|
||||
Next infra work should continue improving:
|
||||
- Hetzner provisioning by code
|
||||
- workstation bootstrap rerunnability
|
||||
- self-hosted CI/CD handoff
|
||||
- registry-native image delivery
|
||||
- overlay convergence for the Hetzner single-node target
|
||||
|
||||
Status update:
|
||||
- minimal Terraform exists under `infra/terraform/hetzner`
|
||||
- first boot is cloud-init driven and installs k3s automatically
|
||||
- bootstrap now starts from a local operator workstation rather than manual host login
|
||||
- Kubernetes assets exist under `deploy/k8s`
|
||||
- executor persistence boundaries are explicit for single-node k3s
|
||||
- self-hosted CI handoff is documented, but still requires follow-up hardening
|
||||
|
||||
---
|
||||
|
||||
## Non-Goals for Next Session
|
||||
- no dashboards
|
||||
- no UI/TUI
|
||||
- no monolith convenience architecture
|
||||
- no SQLite-first system of record
|
||||
- no direct coupling between ingest, decision, and execution
|
||||
- no temporary local-only shortcuts that block future cluster deployment
|
||||
|
||||
---
|
||||
|
||||
## Guiding Principle
|
||||
Build the single-node first version as if it is already a distributed system:
|
||||
- separate services
|
||||
- durable event bus
|
||||
- replayable events
|
||||
- explicit contracts
|
||||
- idempotent execution
|
||||
- production-compatible deployment boundaries
|
||||
- bootstrapable from scratch without manual SSH-based host setup
|
||||
144
docs/spec.md
Normal file
144
docs/spec.md
Normal file
|
|
@ -0,0 +1,144 @@
|
|||
# NEAR Intents demand monitor: bus-first source plan
|
||||
|
||||
## Why websocket quote requests are still the MVP demand signal
|
||||
|
||||
Public solver quote requests remain the closest thing to live demand because they appear when a user or integration asks the network for executable pricing. They are still the right upstream source, but the runtime architecture is now bus-first rather than terminal-first.
|
||||
|
||||
Why this source wins for a first monitor:
|
||||
|
||||
- **Most real-time:** quote requests arrive before settlement and usually before a completed trade is visible anywhere else.
|
||||
- **Closer to intent formation:** they reflect active user demand, not just historical outcomes.
|
||||
- **Operationally simple:** a single websocket feed can drive the ingest side without indexing chains, scraping dashboards, or correlating multiple APIs.
|
||||
- **Good enough for ranking demand:** even if quotes do not always become fills, repeated quote flow is still a strong indicator of what users are currently trying to do.
|
||||
|
||||
## Tradeoffs vs other sources
|
||||
|
||||
### Solver websocket quote requests
|
||||
|
||||
Pros:
|
||||
- lowest-latency view of current demand
|
||||
- directly tied to solver workflow
|
||||
- suitable for a streaming ingest adapter
|
||||
- can be normalized into pair, size, and frequency metrics immediately
|
||||
|
||||
Cons:
|
||||
- quote requests are **interest**, not guaranteed executed volume
|
||||
- public access may still be rate-limited, undocumented, or require credentials depending on environment
|
||||
- schema and availability may change faster than user-facing products
|
||||
|
||||
### Explorer
|
||||
|
||||
Explorer (`https://explorer.near-intents.org/`) is useful for validation and historical inspection, but it is usually a worse primary source for an MVP demand monitor.
|
||||
|
||||
Tradeoffs:
|
||||
- better for human inspection than low-latency streaming
|
||||
- likely shows processed/published activity instead of raw quote demand
|
||||
- may lag the actual request path
|
||||
- less convenient as a machine-first demand feed
|
||||
|
||||
### Status dashboard / published status
|
||||
|
||||
Status (`https://status.near-intents.org/posts/dashboard`) is useful for system health, not demand discovery.
|
||||
|
||||
Tradeoffs:
|
||||
- tells us whether the platform is up, degraded, or incident-affected
|
||||
- does **not** represent per-request user demand
|
||||
- coarse and aggregated by design
|
||||
|
||||
### Published intents / settled outcomes
|
||||
|
||||
Published or completed intents are higher-confidence signals, but lower-fidelity for immediate demand sensing.
|
||||
|
||||
Tradeoffs:
|
||||
- stronger evidence of actual execution
|
||||
- misses abandoned demand and pre-trade discovery
|
||||
- arrives later than quote traffic
|
||||
- may require more indexing and entity correlation work
|
||||
|
||||
## Runtime architecture
|
||||
|
||||
```text
|
||||
solver websocket quote stream
|
||||
|
|
||||
v
|
||||
src/apps/near-intents-ingest.mjs
|
||||
|
|
||||
+--> raw.near_intents.quote
|
||||
|
|
||||
+--> norm.swap_demand
|
||||
|
|
||||
v
|
||||
src/apps/dummy-consumer.mjs
|
||||
```
|
||||
|
||||
### Responsibilities
|
||||
|
||||
#### `src/apps/near-intents-ingest.mjs`
|
||||
- loads env from `.env`
|
||||
- parses optional `--pair 'asset_a->asset_b'`
|
||||
- connects to the NEAR Intents websocket
|
||||
- subscribes to `quote` and `quote_status`
|
||||
- publishes raw venue envelopes to `raw.near_intents.quote`
|
||||
- publishes normalized swap-demand envelopes to `norm.swap_demand`
|
||||
|
||||
#### `src/apps/dummy-consumer.mjs`
|
||||
- consumes normalized events from `norm.swap_demand`
|
||||
- logs observed demand as a placeholder for later strategy logic
|
||||
|
||||
#### Kafka / Redpanda layer
|
||||
- broker endpoint comes from `KAFKA_BROKERS`
|
||||
- Redpanda is supported through Kafka protocol compatibility
|
||||
- topics are configurable via env and default to:
|
||||
- `raw.near_intents.quote`
|
||||
- `norm.swap_demand`
|
||||
|
||||
## Assumptions and limitations
|
||||
|
||||
- The websocket is the best available **MVP** source, not a perfect truth source.
|
||||
- Demand is approximated by quote requests, not by settled intents.
|
||||
- Live endpoints require auth in practice; `NEAR_INTENTS_API_KEY` must be provided.
|
||||
- Request schemas may evolve; the parser should tolerate missing fields.
|
||||
- The current product is intentionally minimal: no database, no backfill, no reconciliation against chain state.
|
||||
- The dummy consumer proves the decoupled flow but is not a strategy engine.
|
||||
|
||||
## Run instructions
|
||||
|
||||
Install:
|
||||
|
||||
```bash
|
||||
npm install
|
||||
```
|
||||
|
||||
Start ingest:
|
||||
|
||||
```bash
|
||||
npm run near-intents:ingest
|
||||
```
|
||||
|
||||
Direct node entrypoint:
|
||||
|
||||
```bash
|
||||
node src/apps/near-intents-ingest.mjs
|
||||
```
|
||||
|
||||
Run with exact-pair filtering:
|
||||
|
||||
```bash
|
||||
npm run near-intents:ingest -- --pair 'asset_a->asset_b'
|
||||
```
|
||||
|
||||
Start dummy consumer:
|
||||
|
||||
```bash
|
||||
npm run dummy-consumer
|
||||
```
|
||||
|
||||
Direct node entrypoint:
|
||||
|
||||
```bash
|
||||
node src/apps/dummy-consumer.mjs
|
||||
```
|
||||
|
||||
## Decision summary
|
||||
|
||||
For an MVP whose job is to answer "what are users asking for right now?", solver websocket quote requests are still the best first source because they are the most direct, timely, and stream-friendly signal. The implementation now routes that signal through Kafka/Redpanda topics so ingestion and downstream reaction can evolve independently.
|
||||
1
index.mjs
Normal file
1
index.mjs
Normal file
|
|
@ -0,0 +1 @@
|
|||
import './src/apps/near-intents-ingest.mjs';
|
||||
24
package-lock.json
generated
Normal file
24
package-lock.json
generated
Normal file
|
|
@ -0,0 +1,24 @@
|
|||
{
|
||||
"name": "near-intents-monitor-poc",
|
||||
"version": "0.1.0",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "near-intents-monitor-poc",
|
||||
"version": "0.1.0",
|
||||
"dependencies": {
|
||||
"kafkajs": "^2.2.4"
|
||||
}
|
||||
},
|
||||
"node_modules/kafkajs": {
|
||||
"version": "2.2.4",
|
||||
"resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz",
|
||||
"integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==",
|
||||
"license": "MIT",
|
||||
"engines": {
|
||||
"node": ">=14.0.0"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
16
package.json
Normal file
16
package.json
Normal file
|
|
@ -0,0 +1,16 @@
|
|||
{
|
||||
"name": "near-intents-monitor-poc",
|
||||
"version": "0.1.0",
|
||||
"private": true,
|
||||
"type": "module",
|
||||
"scripts": {
|
||||
"near-intents:ingest": "node src/apps/near-intents-ingest.mjs",
|
||||
"dummy-reactor": "node src/apps/dummy-reactor.mjs",
|
||||
"dummy-executor": "node src/apps/dummy-executor.mjs",
|
||||
"dummy-consumer": "node src/apps/dummy-consumer.mjs",
|
||||
"start": "node index.mjs"
|
||||
},
|
||||
"dependencies": {
|
||||
"kafkajs": "^2.2.4"
|
||||
}
|
||||
}
|
||||
42
src/apps/dummy-consumer.mjs
Normal file
42
src/apps/dummy-consumer.mjs
Normal file
|
|
@ -0,0 +1,42 @@
|
|||
import process from 'node:process';
|
||||
|
||||
import { createConsumer } from '../bus/kafka/consumer.mjs';
|
||||
import { logStatus } from '../core/log.mjs';
|
||||
import { parseEventMessage } from '../core/event-envelope.mjs';
|
||||
import { assertTradeResult } from '../core/schemas.mjs';
|
||||
import { loadConfig } from '../lib/config.mjs';
|
||||
|
||||
const config = loadConfig();
|
||||
|
||||
const consumer = await createConsumer({
|
||||
groupId: `${config.kafkaConsumerGroupExecutor}-results-view`,
|
||||
brokers: config.kafkaBrokers,
|
||||
clientId: config.kafkaClientId,
|
||||
});
|
||||
await consumer.subscribe({ topic: config.kafkaTopicExecTradeResult, fromBeginning: false });
|
||||
logStatus(`result consumer subscribed to ${config.kafkaTopicExecTradeResult}`);
|
||||
|
||||
process.on('SIGINT', async () => {
|
||||
await consumer.disconnect();
|
||||
process.exit(0);
|
||||
});
|
||||
process.on('SIGTERM', async () => {
|
||||
await consumer.disconnect();
|
||||
process.exit(0);
|
||||
});
|
||||
|
||||
await consumer.run({
|
||||
eachMessage: async ({ message }) => {
|
||||
if (!message.value) return;
|
||||
let event;
|
||||
try {
|
||||
event = parseEventMessage(message.value.toString());
|
||||
} catch {
|
||||
logStatus('result consumer received non-JSON message; skipping');
|
||||
return;
|
||||
}
|
||||
assertTradeResult(event);
|
||||
const payload = event.payload;
|
||||
console.log(`[result] command_id=${payload.command_id} quote_id=${payload.quote_id} status=${payload.status} result_code=${payload.result_code || 'n/a'}`);
|
||||
},
|
||||
});
|
||||
93
src/apps/dummy-executor.mjs
Normal file
93
src/apps/dummy-executor.mjs
Normal file
|
|
@ -0,0 +1,93 @@
|
|||
import process from 'node:process';
|
||||
|
||||
import { createConsumer } from '../bus/kafka/consumer.mjs';
|
||||
import { createProducer } from '../bus/kafka/producer.mjs';
|
||||
import { buildEventEnvelope, parseEventMessage } from '../core/event-envelope.mjs';
|
||||
import { createExecutorStateStore } from '../core/executor-state-store.mjs';
|
||||
import { logStatus } from '../core/log.mjs';
|
||||
import { assertExecuteTradeCommand, assertTradeResult } from '../core/schemas.mjs';
|
||||
import { loadConfig } from '../lib/config.mjs';
|
||||
|
||||
const config = loadConfig();
|
||||
|
||||
const consumer = await createConsumer({
|
||||
groupId: config.kafkaConsumerGroupExecutor,
|
||||
brokers: config.kafkaBrokers,
|
||||
clientId: config.kafkaClientId,
|
||||
});
|
||||
const producer = await createProducer({
|
||||
brokers: config.kafkaBrokers,
|
||||
clientId: config.kafkaClientId,
|
||||
});
|
||||
|
||||
const stateStore = createExecutorStateStore({ stateDir: config.executorStateDir });
|
||||
|
||||
await consumer.subscribe({ topic: config.kafkaTopicCmdExecuteTrade, fromBeginning: false });
|
||||
logStatus(`dummy executor subscribed to ${config.kafkaTopicCmdExecuteTrade} as ${config.kafkaConsumerGroupExecutor}`);
|
||||
logStatus(`dummy executor will publish results to ${config.kafkaTopicExecTradeResult}; state_dir=${config.executorStateDir}`);
|
||||
|
||||
async function shutdown() {
|
||||
await consumer.disconnect();
|
||||
await producer.disconnect();
|
||||
process.exit(0);
|
||||
}
|
||||
process.on('SIGINT', shutdown);
|
||||
process.on('SIGTERM', shutdown);
|
||||
|
||||
await consumer.run({
|
||||
eachMessage: async ({ message }) => {
|
||||
if (!message.value) return;
|
||||
|
||||
let event;
|
||||
try {
|
||||
event = parseEventMessage(message.value.toString());
|
||||
} catch {
|
||||
logStatus('dummy executor received non-JSON message; skipping');
|
||||
return;
|
||||
}
|
||||
|
||||
assertExecuteTradeCommand(event);
|
||||
|
||||
const payload = event.payload;
|
||||
const commandId = payload.command_id;
|
||||
const existing = stateStore.get(commandId);
|
||||
if (existing?.status === 'completed') {
|
||||
logStatus(`dummy executor skipping duplicate command_id=${commandId}`);
|
||||
return;
|
||||
}
|
||||
|
||||
stateStore.markProcessing(commandId, {
|
||||
idempotency_key: payload.idempotency_key,
|
||||
execution_key: payload.execution_key,
|
||||
quote_id: payload.quote_id,
|
||||
});
|
||||
|
||||
const pair = `${payload.asset_in} -> ${payload.asset_out}`;
|
||||
const result = buildEventEnvelope({
|
||||
source: 'dummy-executor',
|
||||
venue: event.venue || 'near-intents',
|
||||
eventType: 'trade_result',
|
||||
eventId: `exec-${commandId}`,
|
||||
observedAt: event.observed_at,
|
||||
payload: {
|
||||
command_id: commandId,
|
||||
idempotency_key: payload.idempotency_key,
|
||||
execution_key: payload.execution_key,
|
||||
quote_id: payload.quote_id,
|
||||
status: 'simulated_sent',
|
||||
result_code: existing?.status === 'processing' ? 'recovered_inflight' : 'sent',
|
||||
note: 'dummy executor placeholder result',
|
||||
},
|
||||
});
|
||||
assertTradeResult(result);
|
||||
|
||||
await producer.sendJson(config.kafkaTopicExecTradeResult, result, { key: payload.execution_key });
|
||||
stateStore.markCompleted(commandId, {
|
||||
idempotency_key: payload.idempotency_key,
|
||||
execution_key: payload.execution_key,
|
||||
quote_id: payload.quote_id,
|
||||
result_event_id: result.event_id,
|
||||
});
|
||||
console.log(`[dummy-executor] result emitted ${pair} quote_id=${payload.quote_id} command_id=${commandId} status=simulated_sent`);
|
||||
},
|
||||
});
|
||||
75
src/apps/dummy-reactor.mjs
Normal file
75
src/apps/dummy-reactor.mjs
Normal file
|
|
@ -0,0 +1,75 @@
|
|||
import process from 'node:process';
|
||||
|
||||
import { createConsumer } from '../bus/kafka/consumer.mjs';
|
||||
import { createProducer } from '../bus/kafka/producer.mjs';
|
||||
import { logStatus } from '../core/log.mjs';
|
||||
import { loadConfig } from '../lib/config.mjs';
|
||||
import { buildEventEnvelope, parseEventMessage } from '../core/event-envelope.mjs';
|
||||
import { assertExecuteTradeCommand, assertNormalizedSwapDemand } from '../core/schemas.mjs';
|
||||
|
||||
const config = loadConfig();
|
||||
|
||||
const consumer = await createConsumer({
|
||||
groupId: config.kafkaConsumerGroupDummy,
|
||||
brokers: config.kafkaBrokers,
|
||||
clientId: config.kafkaClientId,
|
||||
});
|
||||
const producer = await createProducer({
|
||||
brokers: config.kafkaBrokers,
|
||||
clientId: config.kafkaClientId,
|
||||
});
|
||||
|
||||
await consumer.subscribe({ topic: config.kafkaTopicNormSwapDemand, fromBeginning: false });
|
||||
logStatus(`dummy reactor subscribed to ${config.kafkaTopicNormSwapDemand} as ${config.kafkaConsumerGroupDummy}`);
|
||||
logStatus(`dummy reactor will publish commands to ${config.kafkaTopicCmdExecuteTrade}`);
|
||||
|
||||
async function shutdown() {
|
||||
await consumer.disconnect();
|
||||
await producer.disconnect();
|
||||
process.exit(0);
|
||||
}
|
||||
process.on('SIGINT', shutdown);
|
||||
process.on('SIGTERM', shutdown);
|
||||
|
||||
await consumer.run({
|
||||
eachMessage: async ({ message }) => {
|
||||
if (!message.value) return;
|
||||
|
||||
let event;
|
||||
try {
|
||||
event = parseEventMessage(message.value.toString());
|
||||
} catch {
|
||||
logStatus('dummy reactor received non-JSON message; skipping');
|
||||
return;
|
||||
}
|
||||
|
||||
assertNormalizedSwapDemand(event);
|
||||
|
||||
const payload = event.payload;
|
||||
const pair = `${payload.asset_in} -> ${payload.asset_out}`;
|
||||
const quoteId = payload.quote_id;
|
||||
const commandId = `cmd-${quoteId}`;
|
||||
const command = buildEventEnvelope({
|
||||
source: 'dummy-reactor',
|
||||
venue: event.venue || 'near-intents',
|
||||
eventType: 'execute_trade',
|
||||
eventId: commandId,
|
||||
observedAt: event.observed_at,
|
||||
payload: {
|
||||
command_id: commandId,
|
||||
idempotency_key: `${event.venue || 'near-intents'}:${quoteId}`,
|
||||
execution_key: `${event.venue || 'near-intents'}:${payload.asset_in}->${payload.asset_out}`,
|
||||
quote_id: quoteId,
|
||||
asset_in: payload.asset_in,
|
||||
asset_out: payload.asset_out,
|
||||
amount_in: payload.amount_in,
|
||||
amount_out: payload.amount_out,
|
||||
reason: 'dummy reactor placeholder decision',
|
||||
},
|
||||
});
|
||||
assertExecuteTradeCommand(command);
|
||||
|
||||
await producer.sendJson(config.kafkaTopicCmdExecuteTrade, command, { key: command.payload.execution_key });
|
||||
console.log(`[dummy-reactor] command emitted ${pair} quote_id=${quoteId} command_id=${commandId}`);
|
||||
},
|
||||
});
|
||||
41
src/apps/near-intents-ingest.mjs
Normal file
41
src/apps/near-intents-ingest.mjs
Normal file
|
|
@ -0,0 +1,41 @@
|
|||
import process from 'node:process';
|
||||
|
||||
import { createProducer } from '../bus/kafka/producer.mjs';
|
||||
import { logStatus } from '../core/log.mjs';
|
||||
import { parsePairFilter } from '../core/pair-filter.mjs';
|
||||
import { loadConfig } from '../lib/config.mjs';
|
||||
import { startNearIntentsWs } from '../venues/near-intents/ws.mjs';
|
||||
|
||||
const config = loadConfig();
|
||||
const pairFilter = parsePairFilter(process.argv.slice(2));
|
||||
|
||||
if (!config.nearIntentsApiKey) {
|
||||
console.error('Missing NEAR_INTENTS_API_KEY in env or .env');
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const producer = await createProducer({
|
||||
brokers: config.kafkaBrokers,
|
||||
clientId: config.kafkaClientId,
|
||||
});
|
||||
logStatus(`kafka producer connected; raw_topic=${config.kafkaTopicRawNearIntentsQuote}; normalized_topic=${config.kafkaTopicNormSwapDemand}`);
|
||||
if (pairFilter) logStatus(`pair filter enabled: ${pairFilter[0]} <-> ${pairFilter[1]}`);
|
||||
|
||||
process.on('SIGINT', async () => {
|
||||
await producer.disconnect();
|
||||
process.exit(0);
|
||||
});
|
||||
|
||||
process.on('SIGTERM', async () => {
|
||||
await producer.disconnect();
|
||||
process.exit(0);
|
||||
});
|
||||
|
||||
await startNearIntentsWs({
|
||||
apiKey: config.nearIntentsApiKey,
|
||||
wsUrl: config.nearIntentsWsUrl,
|
||||
pairFilter,
|
||||
producer,
|
||||
rawTopic: config.kafkaTopicRawNearIntentsQuote,
|
||||
normalizedTopic: config.kafkaTopicNormSwapDemand,
|
||||
});
|
||||
16
src/bus/kafka/consumer.mjs
Normal file
16
src/bus/kafka/consumer.mjs
Normal file
|
|
@ -0,0 +1,16 @@
|
|||
import { Kafka } from 'kafkajs';
|
||||
|
||||
function createKafka({ brokers = ['127.0.0.1:9092'], clientId = 'unrip' } = {}) {
|
||||
return new Kafka({ clientId, brokers });
|
||||
}
|
||||
|
||||
export async function createConsumer({ groupId, ...options }) {
|
||||
const consumer = createKafka(options).consumer({ groupId });
|
||||
await consumer.connect();
|
||||
|
||||
return {
|
||||
subscribe: (options) => consumer.subscribe(options),
|
||||
run: (options) => consumer.run(options),
|
||||
disconnect: () => consumer.disconnect(),
|
||||
};
|
||||
}
|
||||
21
src/bus/kafka/producer.mjs
Normal file
21
src/bus/kafka/producer.mjs
Normal file
|
|
@ -0,0 +1,21 @@
|
|||
import { Kafka } from 'kafkajs';
|
||||
|
||||
function createKafka({ brokers = ['127.0.0.1:9092'], clientId = 'unrip' } = {}) {
|
||||
return new Kafka({ clientId, brokers });
|
||||
}
|
||||
|
||||
export async function createProducer(options = {}) {
|
||||
const producer = createKafka(options).producer();
|
||||
await producer.connect();
|
||||
return {
|
||||
async sendJson(topic, event, { key = event?.event_id ?? event?.key ?? null } = {}) {
|
||||
await producer.send({
|
||||
topic,
|
||||
messages: [{ key, value: JSON.stringify(event) }],
|
||||
});
|
||||
},
|
||||
async disconnect() {
|
||||
await producer.disconnect();
|
||||
},
|
||||
};
|
||||
}
|
||||
41
src/core/event-envelope.mjs
Normal file
41
src/core/event-envelope.mjs
Normal file
|
|
@ -0,0 +1,41 @@
|
|||
import crypto from 'node:crypto';
|
||||
|
||||
export function buildEventEnvelope({
|
||||
eventType,
|
||||
venue,
|
||||
payload,
|
||||
source,
|
||||
eventId = crypto.randomUUID(),
|
||||
schemaVersion = 1,
|
||||
observedAt = null,
|
||||
ingestedAt = new Date(),
|
||||
raw = null,
|
||||
}) {
|
||||
if (!eventType) throw new Error('Missing eventType');
|
||||
if (!venue) throw new Error('Missing venue');
|
||||
if (payload == null) throw new Error('Missing payload');
|
||||
|
||||
return {
|
||||
event_id: String(eventId),
|
||||
event_type: String(eventType),
|
||||
venue: String(venue),
|
||||
source: source ? String(source) : null,
|
||||
schema_version: Number(schemaVersion),
|
||||
observed_at: toIsoStringOrNull(observedAt),
|
||||
ingested_at: toIsoStringOrNull(ingestedAt) ?? new Date().toISOString(),
|
||||
payload,
|
||||
raw,
|
||||
};
|
||||
}
|
||||
|
||||
export function parseEventMessage(value) {
|
||||
const event = typeof value === 'string' ? JSON.parse(value) : value;
|
||||
if (!event || typeof event !== 'object') throw new Error('Event must be an object');
|
||||
return event;
|
||||
}
|
||||
|
||||
function toIsoStringOrNull(value) {
|
||||
if (value == null) return null;
|
||||
const date = value instanceof Date ? value : new Date(value);
|
||||
return Number.isNaN(date.getTime()) ? null : date.toISOString();
|
||||
}
|
||||
49
src/core/executor-state-store.mjs
Normal file
49
src/core/executor-state-store.mjs
Normal file
|
|
@ -0,0 +1,49 @@
|
|||
import fs from 'node:fs';
|
||||
import path from 'node:path';
|
||||
|
||||
export function createExecutorStateStore({ stateDir, fileName = 'commands.json' }) {
|
||||
fs.mkdirSync(stateDir, { recursive: true });
|
||||
const filePath = path.join(stateDir, fileName);
|
||||
const state = loadState(filePath);
|
||||
|
||||
return {
|
||||
get(commandId) {
|
||||
return state[commandId] || null;
|
||||
},
|
||||
markProcessing(commandId, metadata) {
|
||||
state[commandId] = {
|
||||
...(state[commandId] || {}),
|
||||
...metadata,
|
||||
status: 'processing',
|
||||
updated_at: new Date().toISOString(),
|
||||
};
|
||||
persistState(filePath, state);
|
||||
return state[commandId];
|
||||
},
|
||||
markCompleted(commandId, metadata) {
|
||||
state[commandId] = {
|
||||
...(state[commandId] || {}),
|
||||
...metadata,
|
||||
status: 'completed',
|
||||
updated_at: new Date().toISOString(),
|
||||
};
|
||||
persistState(filePath, state);
|
||||
return state[commandId];
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function loadState(filePath) {
|
||||
if (!fs.existsSync(filePath)) return {};
|
||||
try {
|
||||
return JSON.parse(fs.readFileSync(filePath, 'utf8'));
|
||||
} catch {
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
function persistState(filePath, state) {
|
||||
const tempPath = `${filePath}.tmp`;
|
||||
fs.writeFileSync(tempPath, JSON.stringify(state, null, 2));
|
||||
fs.renameSync(tempPath, filePath);
|
||||
}
|
||||
31
src/core/log.mjs
Normal file
31
src/core/log.mjs
Normal file
|
|
@ -0,0 +1,31 @@
|
|||
export function logStatus(message) {
|
||||
const time = new Date().toISOString();
|
||||
console.error(`[${time}] ${message}`);
|
||||
}
|
||||
|
||||
export function startIdleHeartbeat({
|
||||
label,
|
||||
getLastActivityAt,
|
||||
getStatus,
|
||||
idleAfterMs = 30_000,
|
||||
checkEveryMs = 5_000,
|
||||
}) {
|
||||
let lastHeartbeatAt = 0;
|
||||
|
||||
const timer = setInterval(() => {
|
||||
const lastActivityAt = getLastActivityAt();
|
||||
const idleForMs = Date.now() - lastActivityAt;
|
||||
|
||||
if (idleForMs < idleAfterMs) return;
|
||||
if (Date.now() - lastHeartbeatAt < idleAfterMs) return;
|
||||
|
||||
const seconds = Math.floor(idleForMs / 1000);
|
||||
const suffix = getStatus ? `; ${getStatus()}` : '';
|
||||
logStatus(`${label} idle ${seconds}s${suffix}`);
|
||||
lastHeartbeatAt = Date.now();
|
||||
}, checkEveryMs);
|
||||
|
||||
if (typeof timer.unref === 'function') timer.unref();
|
||||
|
||||
return () => clearInterval(timer);
|
||||
}
|
||||
17
src/core/pair-filter.mjs
Normal file
17
src/core/pair-filter.mjs
Normal file
|
|
@ -0,0 +1,17 @@
|
|||
export function parsePairFilter(argv) {
|
||||
const idx = argv.indexOf('--pair');
|
||||
if (idx === -1) return null;
|
||||
const raw = argv[idx + 1];
|
||||
if (!raw || !raw.includes('->')) {
|
||||
throw new Error("Use --pair 'asset_a->asset_b'");
|
||||
}
|
||||
const [a, b] = raw.split('->').map((x) => x.trim().toLowerCase());
|
||||
return [a, b];
|
||||
}
|
||||
|
||||
export function matchesPairFilter(assetIn, assetOut, pairFilter) {
|
||||
if (!pairFilter) return true;
|
||||
const x = assetIn.toLowerCase();
|
||||
const y = assetOut.toLowerCase();
|
||||
return (x === pairFilter[0] && y === pairFilter[1]) || (x === pairFilter[1] && y === pairFilter[0]);
|
||||
}
|
||||
63
src/core/schemas.mjs
Normal file
63
src/core/schemas.mjs
Normal file
|
|
@ -0,0 +1,63 @@
|
|||
function requireString(value, field) {
|
||||
if (typeof value !== 'string' || value.length === 0) throw new Error(`Missing ${field}`);
|
||||
}
|
||||
|
||||
function requireObject(value, field) {
|
||||
if (!value || typeof value !== 'object' || Array.isArray(value)) throw new Error(`Missing ${field}`);
|
||||
}
|
||||
|
||||
export function assertEventEnvelope(event) {
|
||||
requireObject(event, 'event');
|
||||
requireString(event.event_id, 'event.event_id');
|
||||
requireString(event.event_type, 'event.event_type');
|
||||
requireString(event.venue, 'event.venue');
|
||||
if (event.source != null) requireString(event.source, 'event.source');
|
||||
if (typeof event.schema_version !== 'number') throw new Error('Missing event.schema_version');
|
||||
requireString(event.ingested_at, 'event.ingested_at');
|
||||
requireObject(event.payload, 'event.payload');
|
||||
return event;
|
||||
}
|
||||
|
||||
export function assertNormalizedSwapDemand(event) {
|
||||
assertEventEnvelope(event);
|
||||
if (event.event_type !== 'swap_demand') throw new Error(`Unexpected event_type: ${event.event_type}`);
|
||||
|
||||
const payload = event.payload;
|
||||
requireString(payload.quote_id, 'payload.quote_id');
|
||||
requireString(payload.asset_in, 'payload.asset_in');
|
||||
requireString(payload.asset_out, 'payload.asset_out');
|
||||
if (payload.amount_in != null) requireString(payload.amount_in, 'payload.amount_in');
|
||||
if (payload.amount_out != null) requireString(payload.amount_out, 'payload.amount_out');
|
||||
if (payload.ttl_ms != null) requireString(payload.ttl_ms, 'payload.ttl_ms');
|
||||
return event;
|
||||
}
|
||||
|
||||
export function assertExecuteTradeCommand(event) {
|
||||
assertEventEnvelope(event);
|
||||
if (event.event_type !== 'execute_trade') throw new Error(`Unexpected event_type: ${event.event_type}`);
|
||||
|
||||
const payload = event.payload;
|
||||
requireString(payload.command_id, 'payload.command_id');
|
||||
requireString(payload.idempotency_key, 'payload.idempotency_key');
|
||||
requireString(payload.execution_key, 'payload.execution_key');
|
||||
requireString(payload.quote_id, 'payload.quote_id');
|
||||
requireString(payload.asset_in, 'payload.asset_in');
|
||||
requireString(payload.asset_out, 'payload.asset_out');
|
||||
if (payload.amount_in != null) requireString(payload.amount_in, 'payload.amount_in');
|
||||
if (payload.amount_out != null) requireString(payload.amount_out, 'payload.amount_out');
|
||||
return event;
|
||||
}
|
||||
|
||||
export function assertTradeResult(event) {
|
||||
assertEventEnvelope(event);
|
||||
if (event.event_type !== 'trade_result') throw new Error(`Unexpected event_type: ${event.event_type}`);
|
||||
|
||||
const payload = event.payload;
|
||||
requireString(payload.command_id, 'payload.command_id');
|
||||
requireString(payload.idempotency_key, 'payload.idempotency_key');
|
||||
requireString(payload.execution_key, 'payload.execution_key');
|
||||
requireString(payload.quote_id, 'payload.quote_id');
|
||||
requireString(payload.status, 'payload.status');
|
||||
if (payload.result_code != null) requireString(payload.result_code, 'payload.result_code');
|
||||
return event;
|
||||
}
|
||||
54
src/lib/config.mjs
Normal file
54
src/lib/config.mjs
Normal file
|
|
@ -0,0 +1,54 @@
|
|||
import { loadDotenv } from './env.mjs';
|
||||
|
||||
const DEFAULTS = {
|
||||
nearIntentsWsUrl: 'wss://solver-relay-v2.chaindefuser.com/ws',
|
||||
kafkaBrokers: ['127.0.0.1:9092'],
|
||||
kafkaClientId: 'unrip',
|
||||
kafkaTopicRawNearIntentsQuote: 'raw.near_intents.quote',
|
||||
kafkaTopicNormSwapDemand: 'norm.swap_demand',
|
||||
kafkaTopicCmdExecuteTrade: 'cmd.execute_trade',
|
||||
kafkaTopicExecTradeResult: 'exec.trade_result',
|
||||
kafkaConsumerGroupDummy: 'dummy-reactor-v1',
|
||||
kafkaConsumerGroupExecutor: 'dummy-executor-v1',
|
||||
executorStateDir: './var/executor-state',
|
||||
};
|
||||
|
||||
function splitCsv(value) {
|
||||
return String(value || '')
|
||||
.split(',')
|
||||
.map((part) => part.trim())
|
||||
.filter(Boolean);
|
||||
}
|
||||
|
||||
export function loadConfig({ envPath = '.env' } = {}) {
|
||||
// Runtime config stays environment-first so the same app build works for:
|
||||
// - local `.env` development
|
||||
// - Docker/Compose
|
||||
// - Kubernetes Secret/ConfigMap injection during Hetzner bootstrap
|
||||
// This is what lets the local workstation bootstrap provision infra and then
|
||||
// deploy the exact same image into k3s without app-level config rewrites.
|
||||
loadDotenv(envPath);
|
||||
|
||||
return {
|
||||
nearIntentsApiKey: process.env.NEAR_INTENTS_API_KEY || '',
|
||||
nearIntentsWsUrl: process.env.NEAR_INTENTS_WS_URL || DEFAULTS.nearIntentsWsUrl,
|
||||
kafkaBrokers: splitCsv(process.env.KAFKA_BROKERS).length
|
||||
? splitCsv(process.env.KAFKA_BROKERS)
|
||||
: DEFAULTS.kafkaBrokers,
|
||||
kafkaClientId: process.env.KAFKA_CLIENT_ID || DEFAULTS.kafkaClientId,
|
||||
kafkaTopicRawNearIntentsQuote:
|
||||
process.env.KAFKA_TOPIC_RAW_NEAR_INTENTS_QUOTE || DEFAULTS.kafkaTopicRawNearIntentsQuote,
|
||||
kafkaTopicNormSwapDemand:
|
||||
process.env.KAFKA_TOPIC_NORM_SWAP_DEMAND || DEFAULTS.kafkaTopicNormSwapDemand,
|
||||
kafkaTopicCmdExecuteTrade:
|
||||
process.env.KAFKA_TOPIC_CMD_EXECUTE_TRADE || DEFAULTS.kafkaTopicCmdExecuteTrade,
|
||||
kafkaTopicExecTradeResult:
|
||||
process.env.KAFKA_TOPIC_EXEC_TRADE_RESULT || DEFAULTS.kafkaTopicExecTradeResult,
|
||||
kafkaConsumerGroupDummy:
|
||||
process.env.KAFKA_CONSUMER_GROUP_DUMMY || DEFAULTS.kafkaConsumerGroupDummy,
|
||||
kafkaConsumerGroupExecutor:
|
||||
process.env.KAFKA_CONSUMER_GROUP_EXECUTOR || DEFAULTS.kafkaConsumerGroupExecutor,
|
||||
executorStateDir:
|
||||
process.env.EXECUTOR_STATE_DIR || DEFAULTS.executorStateDir,
|
||||
};
|
||||
}
|
||||
17
src/lib/env.mjs
Normal file
17
src/lib/env.mjs
Normal file
|
|
@ -0,0 +1,17 @@
|
|||
import fs from 'node:fs';
|
||||
|
||||
// `.env` loading is a local/dev convenience only.
|
||||
// In the repo-driven Hetzner+k3s bootstrap flow, Kubernetes injects runtime
|
||||
// environment variables from Secrets/ConfigMaps and already-present process.env
|
||||
// values always win over anything on disk.
|
||||
export function loadDotenv(path = '.env') {
|
||||
if (!fs.existsSync(path)) return;
|
||||
const lines = fs.readFileSync(path, 'utf8').split(/\r?\n/);
|
||||
for (const raw of lines) {
|
||||
const line = raw.trim();
|
||||
if (!line || line.startsWith('#') || !line.includes('=')) continue;
|
||||
const [key, ...rest] = line.split('=');
|
||||
const value = rest.join('=').trim().replace(/^['"]|['"]$/g, '');
|
||||
if (!(key.trim() in process.env)) process.env[key.trim()] = value;
|
||||
}
|
||||
}
|
||||
5
src/venues/near-intents/ingest.mjs
Normal file
5
src/venues/near-intents/ingest.mjs
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
import { startNearIntentsWs } from './ws.mjs';
|
||||
|
||||
export function startNearIntentsIngest(options) {
|
||||
return startNearIntentsWs(options);
|
||||
}
|
||||
68
src/venues/near-intents/normalize.mjs
Normal file
68
src/venues/near-intents/normalize.mjs
Normal file
|
|
@ -0,0 +1,68 @@
|
|||
import { buildEventEnvelope } from '../../core/event-envelope.mjs';
|
||||
|
||||
export function buildNearIntentsRawEnvelope(message, { ingestedAt = new Date() } = {}) {
|
||||
const raw = isRecord(message) ? message : {};
|
||||
const quoteId = first(raw, ['quote_id', 'quoteRequestId', 'request_id', 'id', 'quote_hash']);
|
||||
const occurredAt = first(raw, ['created_at', 'createdAt', 'timestamp', 'ts']);
|
||||
|
||||
return buildEventEnvelope({
|
||||
source: 'near-intents.ws',
|
||||
venue: 'near-intents',
|
||||
eventType: 'near_intents_quote_raw',
|
||||
eventId: quoteId || `near-intents-raw-${ingestedAt.getTime()}`,
|
||||
observedAt: occurredAt,
|
||||
ingestedAt,
|
||||
payload: { message: raw },
|
||||
raw,
|
||||
});
|
||||
}
|
||||
|
||||
export function buildNearIntentsQuoteEnvelope(message, { ingestedAt = new Date() } = {}) {
|
||||
const raw = isRecord(message) ? message : {};
|
||||
const payload = normalizeNearIntentsQuote(raw);
|
||||
if (!payload) return null;
|
||||
|
||||
const occurredAt = first(raw, ['created_at', 'createdAt', 'timestamp', 'ts']);
|
||||
|
||||
return buildEventEnvelope({
|
||||
source: 'near-intents.ws',
|
||||
venue: 'near-intents',
|
||||
eventType: 'swap_demand',
|
||||
eventId: payload.quote_id,
|
||||
observedAt: occurredAt,
|
||||
ingestedAt,
|
||||
payload,
|
||||
raw,
|
||||
});
|
||||
}
|
||||
|
||||
export function normalizeNearIntentsQuote(message) {
|
||||
const quoteId = first(message, ['quote_id', 'quoteRequestId', 'request_id', 'id']);
|
||||
const assetIn = first(message, ['defuse_asset_identifier_in', 'sellToken', 'asset_in']);
|
||||
const assetOut = first(message, ['defuse_asset_identifier_out', 'buyToken', 'asset_out']);
|
||||
if (!quoteId || !assetIn || !assetOut) return null;
|
||||
|
||||
return {
|
||||
quote_id: String(quoteId),
|
||||
asset_in: String(assetIn),
|
||||
asset_out: String(assetOut),
|
||||
amount_in: stringify(first(message, ['exact_amount_in', 'sellAmount', 'amount_in'])),
|
||||
amount_out: stringify(first(message, ['exact_amount_out', 'buyAmount', 'amount_out', 'expectedOut', 'quoted_amount_out'])),
|
||||
ttl_ms: stringify(first(message, ['min_deadline_ms', 'ttl_ms', 'deadline_ms'])),
|
||||
};
|
||||
}
|
||||
|
||||
function first(obj, keys) {
|
||||
for (const key of keys) {
|
||||
if (obj[key] != null) return obj[key];
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function stringify(value) {
|
||||
return value == null ? null : String(value);
|
||||
}
|
||||
|
||||
function isRecord(value) {
|
||||
return !!value && typeof value === 'object' && !Array.isArray(value);
|
||||
}
|
||||
167
src/venues/near-intents/ws.mjs
Normal file
167
src/venues/near-intents/ws.mjs
Normal file
|
|
@ -0,0 +1,167 @@
|
|||
import { matchesPairFilter } from '../../core/pair-filter.mjs';
|
||||
import { logStatus, startIdleHeartbeat } from '../../core/log.mjs';
|
||||
import { assertNormalizedSwapDemand } from '../../core/schemas.mjs';
|
||||
import { buildNearIntentsQuoteEnvelope, buildNearIntentsRawEnvelope } from './normalize.mjs';
|
||||
|
||||
const DEFAULT_WS_URL = 'wss://solver-relay-v2.chaindefuser.com/ws';
|
||||
const QUOTE_SUB_ID = 1;
|
||||
const QUOTE_STATUS_SUB_ID = 2;
|
||||
|
||||
export async function startNearIntentsWs({
|
||||
apiKey,
|
||||
wsUrl = DEFAULT_WS_URL,
|
||||
pairFilter,
|
||||
producer,
|
||||
rawTopic,
|
||||
normalizedTopic,
|
||||
onPublish = defaultOnPublish,
|
||||
}) {
|
||||
if (!apiKey) throw new Error('Missing NEAR_INTENTS_API_KEY');
|
||||
|
||||
let quoteSubscriptionId = null;
|
||||
let quoteStatusSubscriptionId = null;
|
||||
let lastStatusAt = Date.now();
|
||||
let publishedCount = 0;
|
||||
let publishLocked = false;
|
||||
|
||||
function connect() {
|
||||
const ws = new WebSocket(wsUrl, {
|
||||
headers: { Authorization: `Bearer ${apiKey}` },
|
||||
});
|
||||
|
||||
ws.addEventListener('open', () => {
|
||||
logStatus('near-intents connected');
|
||||
ws.send(JSON.stringify({ jsonrpc: '2.0', id: QUOTE_SUB_ID, method: 'subscribe', params: ['quote'] }));
|
||||
ws.send(JSON.stringify({ jsonrpc: '2.0', id: QUOTE_STATUS_SUB_ID, method: 'subscribe', params: ['quote_status'] }));
|
||||
});
|
||||
|
||||
ws.addEventListener('message', async (event) => {
|
||||
lastStatusAt = Date.now();
|
||||
const text = typeof event.data === 'string' ? event.data : Buffer.from(event.data).toString('utf8');
|
||||
|
||||
let payload;
|
||||
try {
|
||||
payload = JSON.parse(text);
|
||||
} catch {
|
||||
return;
|
||||
}
|
||||
|
||||
if (payload?.id === QUOTE_SUB_ID) {
|
||||
quoteSubscriptionId = extractSubscriptionId(payload.result);
|
||||
return;
|
||||
}
|
||||
|
||||
if (payload?.id === QUOTE_STATUS_SUB_ID) {
|
||||
quoteStatusSubscriptionId = extractSubscriptionId(payload.result);
|
||||
return;
|
||||
}
|
||||
|
||||
const eventFrame = extractQuoteEventFrame(payload);
|
||||
if (!eventFrame) return;
|
||||
|
||||
const { subscription, merged } = eventFrame;
|
||||
|
||||
if (quoteStatusSubscriptionId && subscription === quoteStatusSubscriptionId) return;
|
||||
if (quoteSubscriptionId && subscription && subscription !== quoteSubscriptionId) return;
|
||||
if (publishLocked) return;
|
||||
|
||||
const rawEnvelope = buildNearIntentsRawEnvelope(merged);
|
||||
const envelope = buildNearIntentsQuoteEnvelope(merged);
|
||||
if (!envelope) return;
|
||||
assertNormalizedSwapDemand(envelope);
|
||||
|
||||
const assetIn = envelope.payload?.asset_in;
|
||||
const assetOut = envelope.payload?.asset_out;
|
||||
if (!assetIn || !assetOut) return;
|
||||
if (!matchesPairFilter(assetIn, assetOut, pairFilter)) return;
|
||||
|
||||
publishLocked = true;
|
||||
try {
|
||||
await producer.sendJson(rawTopic, rawEnvelope, { key: rawEnvelope.event_id });
|
||||
await producer.sendJson(normalizedTopic, envelope, { key: envelope.payload.quote_id });
|
||||
publishedCount += 1;
|
||||
onPublish(envelope, publishedCount);
|
||||
} catch (error) {
|
||||
logStatus(`kafka publish failed: ${error.message || 'unknown error'}`);
|
||||
} finally {
|
||||
publishLocked = false;
|
||||
}
|
||||
});
|
||||
|
||||
ws.addEventListener('close', () => {
|
||||
logStatus('near-intents disconnected; reconnecting in 2s');
|
||||
setTimeout(connect, 2000);
|
||||
});
|
||||
|
||||
ws.addEventListener('error', (err) => {
|
||||
logStatus(`near-intents socket error: ${err.message || 'unknown error'}`);
|
||||
});
|
||||
}
|
||||
|
||||
startIdleHeartbeat({
|
||||
label: 'near-intents',
|
||||
getLastActivityAt: () => lastStatusAt,
|
||||
getStatus: () => `published=${publishedCount}`,
|
||||
});
|
||||
|
||||
connect();
|
||||
}
|
||||
|
||||
function extractSubscriptionId(result) {
|
||||
if (typeof result === 'string') return result;
|
||||
if (result && typeof result === 'object') {
|
||||
return result.subscription || result.subscription_id || result.subscriber_id || null;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function extractQuoteEventFrame(payload) {
|
||||
const candidates = [];
|
||||
|
||||
if (payload?.method === 'event' && payload?.params) {
|
||||
candidates.push(payload.params);
|
||||
}
|
||||
|
||||
if (payload?.result && typeof payload.result === 'object') {
|
||||
candidates.push(payload.result);
|
||||
}
|
||||
|
||||
if (payload && typeof payload === 'object') {
|
||||
candidates.push(payload);
|
||||
}
|
||||
|
||||
for (const candidate of candidates) {
|
||||
const data = candidate?.data;
|
||||
const metadata = candidate?.metadata;
|
||||
const merged = isRecord(data) || isRecord(metadata)
|
||||
? { ...(isRecord(metadata) ? metadata : {}), ...(isRecord(data) ? data : {}) }
|
||||
: candidate;
|
||||
|
||||
if (!isRecord(merged)) continue;
|
||||
if (!looksLikeQuotePayload(merged)) continue;
|
||||
|
||||
return {
|
||||
subscription: candidate?.subscription ?? null,
|
||||
merged,
|
||||
};
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
function looksLikeQuotePayload(payload) {
|
||||
return Boolean(
|
||||
payload.quote_hash
|
||||
|| payload.quote_id
|
||||
|| payload.defuse_asset_identifier_in
|
||||
|| payload.defuse_asset_identifier_out
|
||||
|| payload.asset_in
|
||||
|| payload.asset_out,
|
||||
);
|
||||
}
|
||||
|
||||
function isRecord(value) {
|
||||
return Boolean(value) && typeof value === 'object' && !Array.isArray(value);
|
||||
}
|
||||
|
||||
function defaultOnPublish() {}
|
||||
Loading…
Add table
Reference in a new issue