Compare commits

..

No commits in common. "860471f2676de36a6db6859c66102834cd86fb80" and "16e7b79978e0f32a9921209578c4bb5337b2e2ce" have entirely different histories.

42 changed files with 31 additions and 3979 deletions

View file

@ -37,8 +37,6 @@ STRATEGY_ENGINE_CONTROL_HOST=0.0.0.0
STRATEGY_ENGINE_CONTROL_PORT=8086
TRADE_EXECUTOR_CONTROL_HOST=0.0.0.0
TRADE_EXECUTOR_CONTROL_PORT=8087
OPS_SENTINEL_CONTROL_HOST=0.0.0.0
OPS_SENTINEL_CONTROL_PORT=8088
# Kafka backbone
KAFKA_BROKERS=redpanda:9092
@ -48,8 +46,6 @@ KAFKA_TOPIC_NORM_SWAP_DEMAND=norm.swap_demand
KAFKA_TOPIC_REF_MARKET_PRICE=ref.market_price
KAFKA_TOPIC_STATE_INTENT_INVENTORY=state.intent_inventory
KAFKA_TOPIC_OPS_LIQUIDITY_ACTION=ops.liquidity_action
KAFKA_TOPIC_OPS_FUNDING_OBSERVATION=ops.funding_observation
KAFKA_TOPIC_OPS_ALERT=ops.alert
KAFKA_TOPIC_DECISION_TRADE_DECISION=decision.trade_decision
KAFKA_TOPIC_CMD_EXECUTE_TRADE=cmd.execute_trade
KAFKA_TOPIC_EXEC_TRADE_RESULT=exec.trade_result
@ -57,7 +53,6 @@ KAFKA_CONSUMER_GROUP_HISTORY=history-writer-v1
KAFKA_CONSUMER_GROUP_INVENTORY=inventory-sync-v1
KAFKA_CONSUMER_GROUP_STRATEGY=strategy-engine-v1
KAFKA_CONSUMER_GROUP_EXECUTOR=trade-executor-v1
KAFKA_CONSUMER_GROUP_OPS_SENTINEL=ops-sentinel-v1
# PostgreSQL durable history store
POSTGRES_URL=postgresql://unrip:unrip@postgres:5432/unrip
@ -84,13 +79,3 @@ STRATEGY_INVENTORY_MAX_AGE_MS=30000
EXECUTOR_INITIAL_ARMED=false
EXECUTOR_RESPONSE_TIMEOUT_MS=10000
LIQUIDITY_WITHDRAWALS_FROZEN=true
# Pre-credit funding visibility and alerting
BTC_FUNDING_OBSERVER_ENABLED=true
BTC_FUNDING_OBSERVER_BASE_URL=https://mempool.space/api
FUNDING_OBSERVATION_STUCK_MS=3600000
OPS_SENTINEL_EVALUATION_MS=5000
OPS_SENTINEL_PRICE_STALE_MS=30000
OPS_SENTINEL_INVENTORY_STALE_MS=30000
OPS_SENTINEL_FUNDING_CREDIT_PENDING_MS=300000
OPS_SENTINEL_FUNDING_STUCK_MS=3600000

162
AGENTS.md
View file

@ -1,162 +0,0 @@
# unrip agent rules
This repository is run with a data-first trading-system workflow.
## Core rule
Every change must directly serve the currently approved turn.
## What a turn is
A turn is the currently approved slice of work. There are two lanes:
- Implementation turn:
- defined by `PROOF.md` and `IMPLEMENTATION.md`
- the job is to implement what `IMPLEMENTATION.md` says, validate it against `PROOF.md`, and keep working until the scoped fake paths are removed or explicitly recorded as remaining blockers
- Research turn:
- defined by `research/ACTIVE.md`
- the job is to produce evidence against the active research charter, not product shape
`BACKLOG.md` is not the turn.
`ARCHIVE.md` is not the turn.
They are planning and history artifacts.
## Read order by task
Use the smallest context needed for the current task.
### For implementation work
Read only:
1. `THESIS.md`
2. `PROOF.md`
3. `IMPLEMENTATION.md`
Do not read `BACKLOG.md`, `ARCHIVE.md`, or `research/ACTIVE.md` during an implementation turn unless the user explicitly asks to re-plan, open a new turn, review history, or switch to research.
### For research work
Read only:
1. `THESIS.md`
2. `research/ACTIVE.md`
Do not read `BACKLOG.md`, `ARCHIVE.md`, `PROOF.md`, or `IMPLEMENTATION.md` unless the user explicitly asks to re-plan, compare lanes, or promote research into implementation.
### For planning, re-planning, or archiving
Read:
1. `THESIS.md`
2. the live turn files for the lane being changed
3. `BACKLOG.md`
4. `ARCHIVE.md`
## Hard constraints
- Do not invent or adopt a new roadmap on your own.
- Do not expand scope beyond the active implementation proof or research charter.
- No backlog generation instead of implementation.
- No scaffolding ahead of demonstrated need.
- Quote collection and analytics are first-class from day one. They are not a later add-on.
- Do not present scaffolding, dashboards, placeholders, or mock flows as product progress.
- State assumptions before coding when the environment, venue, chain, or source behavior is uncertain.
- Declare what is still fake in every commit.
- Do not read `BACKLOG.md` or `ARCHIVE.md` during an implementation turn unless the user explicitly asks to re-plan, open a new turn, or inspect history.
- If you discover adjacent work, add it to `BACKLOG.md` instead of absorbing it into the current turn.
- Changes that widen risk require explicit user approval:
- live funds
- secret creation or rotation
- permanent infrastructure spend
- long-running external jobs
- destructive data migrations
- The long-term thesis may be proposed, but `THESIS.md` must not be rewritten without explicit user approval.
## Iteration archive rule
When the user says to plan the next iteration, next turn, implementation turn, or proof sprint:
- first preserve the finished turn before drafting new planning docs
- do not rewrite the live turn files until the current turn has been archived
- use the existing repo workflow and archive locations:
- implementation turn snapshots go in `archive/implementation/`
- research turn snapshots go in `archive/research/`
- prefer the tracked scripts:
- `python3 scripts/workflow/close_turn.py ...`
- `python3 scripts/workflow/open_turn.py ...`
Planning or archiving is the one time it is correct to read `BACKLOG.md` and `ARCHIVE.md`.
## Planning inputs
- `iteration`, `implementation turn`, `proof sprint`, and `next turn` mean the same planning slice unless the user says otherwise.
- Use `BACKLOG.md` only while planning, re-planning, or archiving. Do not read it while implementing or validating the active turn unless the user explicitly asks.
- Select backlog items and bugs that belong together under one proof topic instead of mixing unrelated work into the same slice.
- When an item is pulled into the live turn, remove it from `BACKLOG.md` and record the planning event in `ARCHIVE.md`.
## Planning quality bar
- `IMPLEMENTATION.md` must be detailed enough that coding does not depend on rediscovering the plan mid-turn.
- `IMPLEMENTATION.md` should cover the end-to-end system touched by the proof: ingest, pricing, inventory, persistence, strategy, execution, control surfaces, logging, validation, tests, failure modes, and important edge cases.
- `PROOF.md` must be specific and falsifiable: scope, non-goals, definition of done, validation evidence, failure conditions, and clear statements about what is real versus fake.
- Both planning documents should think through the whole operator workflow and full system path, not isolated file edits.
## Turn lanes
- Implementation lane:
- Governed by `PROOF.md` and `IMPLEMENTATION.md`.
- Must make the shared live and historical data path more real unless the turn is explicitly marked as pure ops.
- “Doing the turn” means implementing `IMPLEMENTATION.md`, validating against `PROOF.md`, fixing what fails, and continuing until the definition of done is met or a hard blocker is reached.
- Research lane:
- Governed by `research/ACTIVE.md`.
- Must name the hypothesis, dataset, metric, assumptions, and falsification condition.
- Research output is evidence, not product shape.
## Commit rules
Every non-merge commit message body must include:
- `Proof: ...`
- `Assumptions: ...`
- `Still fake: ...`
Install the tracked hook before relying on this:
```bash
bash scripts/workflow/install_hooks.sh
```
## Post-implementation loop
- After implementation and validation, expect the user to test the slice and suggest small fixes.
- Treat those small fixes as part of closing the same turn unless the user explicitly changes scope.
## Bug-fix rule
- If a bug is found, fix it, inspect analogous or affected locations, and apply the needed follow-up fixes there too.
- No bug fix is done without a regression test.
- If a meaningful automated test cannot be added, stop and explain why instead of claiming the fix is complete.
## Real progress vs fake progress
Real progress means the repository can do more of the active proof with validated evidence from real systems. In this repo that means things like:
- ingesting real NEAR Intents flow
- storing durable inventory, pricing, decision, and execution records
- producing a real decision from live data
- making a real execution attempt through repo-controlled code
- proving blocked-path safety with explicit evidence
Fake progress includes:
- docs-only motion without stronger runtime truth
- speculative architecture not required by the active proof
- dashboards or control surfaces without the underlying real path
- placeholders or mocks presented as finished work
- invented data, unverifiable claims, or abstractions not yet required
## Safety rules
- Prefer the smallest real implementation that proves the active turn works.
- Keep secrets out of git, docs, and chat history.
- Use self-hosted or directly controlled infrastructure by default.
- If something is fake or incomplete, say so plainly.
## Review standard
Before claiming a turn is done, be able to state:
- what became more real
- what was validated against real data or systems
- what is still fake
- what was deliberately not built
## Working rule
Within an approved turn, continue implementing and validating until:
- `PROOF.md` is satisfied for the active scope, or
- a hard blocker requires user input.
Do not silently open the next turn yourself.
Do not stop at “the structure exists.”
Do not stop while the active turn still depends on hidden dummy paths.
If something remains fake at the end of the turn, name it plainly.

View file

@ -1,16 +0,0 @@
# Archive Index
This file records turn openings, closures, and archived snapshots.
Legacy note:
- Work completed before `2026-04-01` predates this workflow and is not retroactively indexed here.
## Implementation Turns
- 2026-04-02: `first-non-mocked-tradeable-loop-for-one-pair` closed with status `passed`. A live active-pair quote flowed through funding, inventory sync, strategy, and real mainnet quote responses with durable history.
## Research Turns
## Planning Events
- 2026-04-01: workflow files initialized for thesis, implementation proof, backlog, archive, and research lane.
- 2026-04-01: active implementation proof rewritten from durable-history scaffolding to the first executable trade loop for one pair.
- 2026-04-02: opened implementation turn `pre-credit-funding-visibility-and-operator-alerts` from backlog items O003, O004.

View file

@ -1,36 +0,0 @@
# Backlog
This file is the candidate pool for future work. It is not the active plan.
Rules:
- Add ideas here when they do not belong in the current turn.
- Promotion from backlog to an active turn is a separate planning step.
- `scripts/workflow/add_backlog.py` can append new items with stable IDs.
## Implementation Candidates
- [I001] Hybrid reference-price service using Kraken stream and CoinGecko poll or fallback for the active pair inputs.
- [I002] PostgreSQL event store plus history writer for quotes, reference prices, decisions, commands, and execution results.
- [I003] Strategy engine that consumes `norm.swap_demand` plus reference prices and emits auditable decisions.
- [I004] Real Near Intents executor service using pre-funded internal inventory, with explicit arming and idempotent result reporting.
- [I005] Import local EUR/BTC history on disk into research tables or files for replay and baseline checks.
- [I006] Decision-to-command safety gate with explicit arm or disarm state, notional caps, and inventory freshness checks.
- [I007] Inventory-aware execution rule: implement both directions, but only fire the side backed by credited internal source-asset inventory.
- [I008] Inventory-sync service for NEAR Intents internal balances and pending funding state.
- [I009] Liquidity-manager service for deposit addresses, funding actions, and treasury visibility.
## Research Candidates
- [R001] Compare Kraken and CoinGecko drift and freshness for the assets needed to price the active pair.
- [R002] Test whether the active pair's implied rate diverges from external reference prices enough to justify execution after a simple 2% gross threshold.
- [R003] Measure whether stale quotes correlate with worse execution quality or higher reject rates.
- [R004] Import and inspect the local historical EUR/BTC data to see how it can seed replay and backtests.
- [R005] Measure how long treasury funding takes from external transfer to credited internal inventory.
## Ops Candidates
- [O001] PostgreSQL backup and retention plan for analytics and audit history.
- [O002] Signing-secret and NEAR Intents account management for real execution credentials.
## Bugs
- [B001] The previous storage-only turn did not reach a tradeable loop and pulled the workflow toward scaffolding.
- [B002] No reference price source exists, so the system cannot estimate edge.
- [B003] Dummy reactor and dummy executor prevent a non-mocked trade path.
- [B004] The prior plan assumed external hot wallets were on the trade hot path instead of pre-funded NEAR Intents inventory.

View file

@ -1,235 +0,0 @@
# Implementation Turn: pre-credit funding visibility and operator alerts
Status: open
Opened: 2026-04-02
## Goal
Make the already-live BTC/EURe loop operationally understandable between external funding and spendable credit, and make critical stale or failed states queryable as durable alert records instead of transient logs.
## Selected backlog items
- [O003] Alerts for stale reference prices, stale inventory state, stuck funding actions, and failed executor submissions.
- [O004] Pre-credit funding visibility for slow chains: watch configured deposit addresses at chain level, track inbound transfers through mempool and on-chain confirmation before bridge credit, persist that state separately from spendable inventory, and alert operators when funding is seen, delayed, or stuck.
## Design rule
Keep the already-proven execution and inventory truth path intact:
- verifier and bridge credit remain the only spendable truth
- pre-credit visibility is additive observability
- alerts are additive operability
## Event backbone
Retain Kafka as the backbone. Add only the minimal new topics required:
- `ops.funding_observation`
- `ops.alert`
These topics are append-only evidence streams, not control planes.
## Durable store
Extend PostgreSQL with new append-only families:
- `funding_observations`
- `ops_alerts`
If a current-state materialization is needed, derive it from append-only records or keep it as a clearly named snapshot table. Do not replace the append-only record.
## Service changes
### 1. `liquidity-manager`
Extend the existing treasury owner instead of inventing a broad new funding stack.
New responsibilities:
- retain active funding handles by chain and asset
- poll configured chain observers for those handles
- emit `ops.funding_observation`
- correlate chain observations with bridge `recent_deposits`
- expose latest observations and credit-correlation state through `/state`
Expected state shape additions:
- `funding_observations_by_handle`
- `latest_funding_observation_at`
- `uncredited_funding_total_by_asset`
- `credit_correlation`
Control additions:
- `POST /refresh-funding-observations`
- optional `POST /pause-funding-observer`
- optional `POST /resume-funding-observer`
Important implementation constraints:
- do not change withdrawal behavior
- do not reuse spendable inventory fields for pre-credit state
- keep BTC and EURe observation records in one shared schema
### 2. `inventory-sync`
Keep current spendable accounting intact.
Possible additions:
- read the latest funding observations
- expose a separate `pre_credit_inbound` or `funding_visibility` field in `/state`
Hard rule:
- `spendable`, `pending_inbound`, and strategy-facing credited truth must not become looser
### 3. `history-writer`
Consume and persist the new topics:
- `ops.funding_observation`
- `ops.alert`
Expose through `/state`:
- latest funding-observation write time
- latest alert write time
- counts or offsets for the new topics
Add query-friendly indexes for:
- `tx_hash`
- `funding_handle`
- `alert_code`
- `ingested_at`
### 4. `ops-sentinel` or equivalent alert evaluator
Add one small service only if needed to keep alert logic separate and testable.
Responsibilities:
- consume:
- `ref.market_price`
- `state.intent_inventory`
- `ops.liquidity_action`
- `ops.funding_observation`
- `exec.trade_result`
- evaluate policy windows for stale and stuck conditions
- emit `ops.alert` raise/clear transitions
- expose current alert state
Preferred alert model:
- stable `alert_code`
- `status`: `raised` or `cleared`
- `severity`
- `reason`
- `first_raised_at`
- `last_evaluated_at`
- correlation IDs when available
Minimal alert set for this turn:
- `reference_price_stale`
- `inventory_snapshot_stale`
- `funding_seen_unconfirmed`
- `funding_confirmed_credit_pending`
- `funding_stuck`
- `executor_submission_failed`
Do not add Slack, email, or paging integrations in this turn unless required to prove the path. Durable alert records plus HTTP state are sufficient.
## Chain observer plan
### BTC
Must be the first-class proof path.
Implementation expectations:
- configurable observer endpoint
- look up the configured BTC deposit address
- detect:
- mempool appearance when available
- confirmation count
- credited transition once bridge/verifier catches up
Assumption to keep explicit in code:
- a chain observer can disappear or lag independently of the bridge
### Gnosis / EURe
Nice-to-have within the same schema, but BTC is the proof-critical path.
If included this turn:
- watch the configured deposit address for EURe token transfers
- represent observation state in the same event model
## Record shapes
### `ops.funding_observation`
Required fields:
- `funding_observation_id`
- `account_id`
- `asset_id`
- `chain`
- `funding_handle`
- `source`
- `tx_hash`
- `status`
- `amount`
- `confirmations`
- `first_seen_at`
- `last_seen_at`
- `credited_at` when known
- `bridge_deposit_tx_hash` when correlated
### `ops.alert`
Required fields:
- `alert_event_id`
- `alert_code`
- `status`
- `severity`
- `reason`
- `service_scope`
- `pair` when relevant
- `asset_id` when relevant
- `tx_hash` when relevant
- `raised_at`
- `cleared_at`
- `details`
## Control surface expectations
### `liquidity-manager`
Must expose:
- active deposit handles
- latest pre-credit funding observations
- latest credit correlation
- whether the funding observer is healthy or paused
### alert evaluator
Must expose:
- current active alerts
- latest cleared alerts
- per-alert evaluation timestamps
- pause state
### `history-writer`
Must expose the new topic offsets and write status for funding observations and alerts.
## Tests
Required automated coverage:
- BTC funding observation remains non-spendable before credit
- alert transitions raise then clear on recovered stale state
- funding observation correlates to a later credited deposit without losing the original tx hash
- executor failure produces an alert event
If a meaningful automated test cannot be written for a subpath, stop and record why instead of hand-waving.
## Validation plan
- Safe induced stale-price alert:
- pause `market-reference-ingest`
- wait past freshness window
- observe `reference_price_stale`
- resume and observe clear
- Safe induced stale-inventory alert:
- pause `inventory-sync`
- wait past freshness window
- observe `inventory_snapshot_stale`
- resume and observe clear
- Funding visibility proof:
- use a real deposit address
- observe pre-credit chain state before bridge credit where timing allows
- later observe credit correlation
- Executor failure alert proof:
- use a controlled non-destructive failure mode such as temporary relay endpoint override in a safe environment or a replayable failure fixture
- verify `executor_submission_failed`
## Out of scope on purpose
- No new trading strategy
- No historical backtest engine
- No broad observability stack
- No polished dashboard frontend
- No automated treasury refills
## Still fake at turn open
- Pre-credit funding visibility is still missing from the live cluster.
- Alert state is still mostly implicit in service logs and manual inspection.
- There is no durable operator-facing record yet for "funds are on the way but not spendable."

154
PROOF.md
View file

@ -1,154 +0,0 @@
# Implementation Proof: pre-credit funding visibility and operator alerts
Status: open
Opened: 2026-04-02
## Target outcome
The next turn is complete only when `unrip` can show operators the gap between "funds sent" and "funds spendable" with durable evidence, and can surface actionable alert state for the live loop without requiring log-diving or manual SQL every time something stalls.
This turn does not expand the trade hot path. It makes the existing live system more explainable and more operable.
## Hypothesis
`unrip` becomes materially safer to operate once it can:
1. observe configured funding handles before NEAR Intents credit
2. persist chain-level funding observations separately from spendable inventory
3. link pre-credit observations to later bridge and verifier credit where possible
4. emit durable alert state for stale prices, stale inventory, stuck funding, and failed execution submissions
5. expose that state through the same small control surfaces and PostgreSQL audit trail as the rest of the system
If pre-credit funding remains invisible, or alert state still lives only in transient logs, the live loop is still too opaque for routine funded operation.
## Scope
- [O003] Alerts for stale reference prices, stale inventory state, stuck funding actions, and failed executor submissions.
- [O004] Pre-credit funding visibility for slow chains: watch configured deposit addresses at chain level, track inbound transfers through mempool and on-chain confirmation before bridge credit, persist that state separately from spendable inventory, and alert operators when funding is seen, delayed, or stuck.
## Non-goals
- No new venue, pair, or strategy logic.
- No dashboard or polished UI.
- No automatic treasury actions or auto-refunding.
- No attempt to treat chain-level observations as spendable inventory.
- No change to the live execution arming model.
## Source-of-truth rule
Spendable inventory remains the existing truth:
- bridge and verifier credit determine spendable balances
- chain-level observations are visibility only
The new pre-credit path must never be allowed to make a direction tradable earlier than the verifier does.
## Required runtime behavior
### Funding visibility
- The system must know the currently active funding handles for BTC and EURe.
- For configured chains, it must watch those handles before NEAR Intents credit appears.
- It must distinguish at least these states where applicable:
- `SEEN_UNCONFIRMED`
- `SEEN_CONFIRMED`
- `CREDIT_PENDING`
- `CREDITED`
- `FAILED_OR_STUCK`
- BTC is the must-prove chain because that is where live funding latency was operationally visible.
- Gnosis support may share the same event model even if its confirmation behavior is simpler.
### Alerts
- Alerts must be durable records, not only log lines.
- At minimum the system must raise and clear alert state for:
- stale reference price
- stale inventory snapshot
- funding seen but not credited within policy
- execution submission failure
- Alert transitions must be inspectable through HTTP state and PostgreSQL.
## Service expectations
### `liquidity-manager`
Must become the owner of chain-level funding observations because it already owns deposit handles and treasury state.
It must:
- refresh and retain active funding handles
- ingest chain-level funding observations
- reconcile them against bridge deposit state
- publish durable funding-observation records
- expose current per-handle funding state
It must not:
- mark funds spendable
- trade on pre-credit observations
### `inventory-sync`
May surface pre-credit funding context, but only under a clearly separate non-spendable field.
It must not:
- merge pre-credit observations into `spendable`
### `history-writer`
Must persist the new record families:
- funding observations
- alert events
- optionally current alert snapshots if the implementation separates events from state
### Alert evaluator
This may be a new small service or a tightly scoped extension of an existing one, but it must:
- evaluate staleness and stuck conditions from durable inputs
- emit durable alert events
- expose current alert state and the latest reasons
No broad orchestration or dashboard service should be introduced just to satisfy this proof.
## Required durable storage
PostgreSQL must store at least:
- funding observations before bridge credit
- alert events or alert snapshots
- enough timestamps and IDs to correlate:
- funding handle
- chain tx hash
- later bridge tx hash or deposit record
- resulting verifier credit snapshot when available
Kafka remains the event backbone.
## Required control surface
At minimum operators must be able to inspect:
- active funding handles
- latest pre-credit observations by handle
- confirmation depth or equivalent chain state when available
- whether a funding action is still pending credit
- current active alerts and their reasons
If a new alert service exists, it must expose:
- `GET /healthz`
- `GET /state`
- `POST /pause`
- `POST /resume`
## Definition of done
- The live cluster still runs the previously proven funded trade loop unchanged for spendable truth.
- At least one real funding handle is watched at chain level before bridge credit.
- For at least one real deposit path, the system records a pre-credit observation before or during confirmation and later records the credited state separately.
- PostgreSQL contains durable records for the pre-credit funding path and alert path.
- Operators can inspect current funding observations and current alerts through control APIs.
- A stale price or stale inventory condition can be induced safely and becomes a durable alert.
- A funding delay or manually injected stuck condition can be represented as a durable alert with explicit reason fields.
- A failed execution submission path is represented as an alert without inventing fake venue traffic.
- Tests cover:
- pre-credit observations staying non-spendable
- alert raise and clear transitions
- correlation of funding observation to later credit when identifiers are available
## Failure conditions
- Funding observations exist only in logs and are not queryable later.
- Pre-credit observations leak into spendable inventory or strategy gating.
- Alerts cannot be queried as current state.
- The only proof of stuck funding is a human manually watching a block explorer.
- The implementation adds a dashboard shell without stronger runtime truth.
## Current real
- The first funded BTC/EURe live loop is already proven:
- real quote ingest
- real reference pricing
- real credited inventory
- real strategy decisions
- real `quote_response` submissions
- durable event chain in PostgreSQL
- Portfolio metrics are now durably computed and exposed, but alerting and pre-credit funding visibility are still incomplete.

View file

@ -1,50 +0,0 @@
# unrip thesis
## Purpose
Build a data-first trading system whose first-class artifact is trustworthy market and execution history.
The bot is one consumer of that truth. Analytics and backtesting are not bolted on later; they are part of the product from the beginning.
## Product nucleus
The nucleus of the system is one shared truth pipeline:
1. observe live market and intent flow
2. persist raw and normalized events durably
3. replay the same history for analytics and backtests
4. score candidate actions from the same canonical data model
5. execute only behind explicit safety gates and full auditability
## Architectural invariants
- Live decisions and historical analysis must share the same canonical event model whenever practical.
- Raw events are kept alongside normalized and derived records.
- Every important decision should be reproducible from stored inputs and explicit assumptions.
- Execution must not outpace observability. If the system cannot explain what happened, it is not ready to trade.
- Quote collection and analytics are core product work, not support work.
## Near-term thesis
Near term, `unrip` should become a narrow but truthful trading-data and decision pipeline for one real pair on one venue.
That means:
- real upstream data
- durable storage beyond transient bus retention
- replayable history
- measurable candidate decisions
- no pretending that execution is safe before the data and analysis path is trustworthy
## Long-term thesis
Long term, the same system should support:
- automated trading on selected pairs
- analytics and backtesting from retained ground-truth data
- cross-chain and cross-asset execution routing
## Non-goals right now
- polished operator UI before the data loop is truthful
- broad multi-venue coverage before one core loop is real
- strategy claims without named assumptions and falsification criteria
- live trading with real funds before paper or tightly gated execution is trustworthy
## Approval boundaries
The agent may propose changes to the thesis, but the user must approve:
- changes to the core product definition
- changes that raise the risk class of the system
- changes that create lasting infra cost or operational burden

View file

@ -1,59 +0,0 @@
# Workflow
This repository uses a small tracked workflow layer instead of a large agent orchestration system.
## Files
- `THESIS.md`: stable product intent
- `PROOF.md`: active implementation proof
- `IMPLEMENTATION.md`: current implementation turn
- `research/ACTIVE.md`: active research charter
- `BACKLOG.md`: parked ideas and bugs
- `ARCHIVE.md`: turn history index
- `workflow/REVIEW_PROMPT.md`: adversarial review prompt
## Install the tracked git hook
```bash
bash scripts/workflow/install_hooks.sh
```
## Add a backlog item
```bash
python3 scripts/workflow/add_backlog.py --lane implementation --summary "Reference-price service for active pair inputs"
python3 scripts/workflow/add_backlog.py --lane research --summary "Test whether implied pair rate diverges from external reference prices after fees"
```
## Open a new implementation turn
```bash
python3 scripts/workflow/open_turn.py \
--lane implementation \
--title "first executable trade loop for one pair" \
--summary "Add reference pricing, strategy, durable audit history, and a real execution path for the active pair." \
--pick I001 \
--pick I002 \
--pick I003 \
--pick I004
```
Use `--commit` if you want the planning change committed automatically.
## Close the current turn and archive it
```bash
python3 scripts/workflow/close_turn.py \
--lane implementation \
--status passed \
--summary "A live active-pair quote flowed through pricing, decision, and a real execution attempt with durable audit records."
```
The script copies the live turn files into `archive/implementation/` or `archive/research/`, updates `ARCHIVE.md`, and can make the archive commit with `--commit`.
## Build a review bundle
```bash
bash scripts/workflow/review_diff.sh HEAD~1
```
That emits a Markdown bundle containing the diff plus the adversarial review prompt for a separate review-only agent session.

View file

@ -1 +0,0 @@

View file

@ -1,544 +0,0 @@
# Implementation Turn: first non-mocked tradeable loop for one pair
Status: open
Opened: 2026-04-01
## Goal
Build the first full live vertical slice that can actually attempt a trade:
1. observe a real NEAR Intents quote
2. enrich it with live external reference pricing
3. synchronize spendable inventory already credited inside NEAR Intents
4. evaluate it in a real strategy service
5. gate it by inventory and arm state
6. submit it through a real Near Intents executor
7. persist the full chain in PostgreSQL
This turn is explicitly not a storage side-quest. Storage, control surfaces, analytics support, and treasury funding visibility are included because they are required to make the trade loop trustworthy.
## Selected backlog items
- [I001] Hybrid reference-price service using Kraken stream and CoinGecko poll or fallback for the active pair inputs.
- [I002] PostgreSQL event store plus history writer for quotes, reference prices, decisions, commands, and execution results.
- [I003] Strategy engine that consumes `norm.swap_demand` plus reference prices and emits auditable decisions.
- [I004] Real Near Intents executor service using pre-funded internal inventory, with explicit arming and idempotent result reporting.
- [I006] Decision-to-command safety gate with explicit arm or disarm state, notional caps, and inventory freshness checks.
- [I007] Inventory-aware execution rule: implement both directions, but only fire the side backed by credited internal source-asset inventory.
- [I008] Inventory-sync service for NEAR Intents internal balances and pending funding state.
- [I009] Liquidity-manager service for deposit addresses, funding actions, and treasury visibility.
- [B002] No reference price source exists, so the system cannot estimate edge.
- [B003] Dummy reactor and dummy executor prevent a non-mocked trade path.
- [B004] The current plan assumed external hot wallets were on the hot trade path instead of pre-funded internal inventory.
## Architectural shape
### Event backbone
Kafka or Redpanda remains the backbone between services.
Required topic set for this turn:
- `raw.near_intents.quote`
- `norm.swap_demand`
- `ref.market_price`
- `state.intent_inventory`
- `ops.liquidity_action`
- `decision.trade_decision`
- `cmd.execute_trade`
- `exec.trade_result`
### Durable store
PostgreSQL is the first durable analytics and audit layer.
Why:
- enough write throughput for current scope
- strong queryability for replay, inspection, and debugging
- simple to operate
- better fit than inventing a warehouse right now
### Service split
The first real loop should be seven services:
- `near-intents-ingest`
- `market-reference-ingest`
- `inventory-sync`
- `liquidity-manager`
- `history-writer`
- `strategy-engine`
- `trade-executor`
## Service-by-service responsibilities
### 1. `near-intents-ingest`
Responsibilities:
- connect to the NEAR Intents quote stream
- filter or classify the active pair
- emit raw and normalized events
- expose runtime state
Inputs:
- NEAR Intents websocket
- pair filter config and runtime override
Outputs:
- `raw.near_intents.quote`
- `norm.swap_demand`
Control surface:
- `GET /healthz`
- `GET /state`
- `GET /pair-filter`
- `PUT /pair-filter`
- `POST /pair-filter/reset`
Important edge cases:
- websocket disconnect
- reconnect storm
- invalid JSON
- pair silent but connection healthy
### 2. `market-reference-ingest`
Responsibilities:
- subscribe to Kraken BTC/EUR pricing
- poll CoinGecko for fallback or cross-check pricing
- derive fair BTC/EURe price for both trade directions
- publish reference-price events
- expose latest source state and freshness
Inputs:
- Kraken stream
- CoinGecko HTTP API
Outputs:
- `ref.market_price`
Control surface:
- `GET /healthz`
- `GET /state`
- `POST /refresh`
- `POST /pause`
- `POST /resume`
Required state:
- latest Kraken price
- latest CoinGecko price
- derived fair rate
- freshness age
- source health flags
Important edge cases:
- Kraken disconnect
- CoinGecko timeout or rate limit
- both sources stale
- conflicting sources beyond tolerance
Required behavior:
- if Kraken is down but CoinGecko is fresh, degrade according to policy and record fallback usage
- if both are stale, mark the service unhealthy for decisioning
- the first implementation must make the EURe/EUR pricing assumption explicit:
- either treat EURe as 1:1 with EUR and record that plainly
- or use a separate sanity source and record the mapping logic
### 3. `inventory-sync`
Responsibilities:
- read current credited spendable inventory from NEAR Intents internal state
- distinguish credited balances from pending deposits and pending withdrawals
- publish current internal inventory state
- expose freshness and reconciliation state
Inputs:
- NEAR Intents inventory or verifier surfaces
- liquidity-manager state when relevant
Outputs:
- `state.intent_inventory`
Control surface:
- `GET /healthz`
- `GET /state`
- `POST /refresh`
- `POST /pause`
- `POST /resume`
Required state:
- spendable balances by asset
- pending inbound funding by asset
- pending outbound withdrawal by asset
- last sync time
- reconciliation status
Important edge cases:
- internal inventory surface unavailable
- external deposit seen but not yet credited internally
- credited balance lower than expected after funding
- stale inventory snapshot
Required behavior:
- only credited internal inventory counts as spendable
- pending treasury movements must remain non-spendable
- stale inventory state must be visible and actionable
### 4. `liquidity-manager`
Responsibilities:
- request and track deposit addresses or equivalent funding handles for treasury assets
- track treasury funding actions from external wallets into NEAR Intents
- track withdrawals and rebalance actions
- expose current funding pipeline state
- publish auditable liquidity action records
Inputs:
- treasury configuration
- external funding wallets
- NEAR Intents deposit or withdrawal surfaces
Outputs:
- `ops.liquidity_action`
Control surface:
- `GET /healthz`
- `GET /state`
- `POST /refresh`
- `POST /pause`
- `POST /resume`
- `POST /freeze-withdrawals`
Required state:
- active deposit addresses or funding handles
- recent funding attempts
- pending credits
- recent withdrawals
- rebalance state
Important edge cases:
- deposit address request failure
- external wallet funded but NEAR Intents credit delayed
- duplicate funding detection
- unsupported asset or chain mapping
Required behavior:
- treasury actions must remain visible and auditable
- funding must be decoupled from the per-trade hot path
### 5. `history-writer`
Responsibilities:
- consume the core topics
- write append-only rows into PostgreSQL
- preserve causality across quote, decision, and execution records
- expose lag and write state
Inputs:
- all core Kafka topics
Outputs:
- PostgreSQL rows
Control surface:
- `GET /healthz`
- `GET /state`
- `POST /pause`
- `POST /resume`
- `POST /drain`
Required stored record families:
- raw quotes
- normalized demand
- reference prices
- inventory snapshots
- liquidity actions
- trade decisions
- execute commands
- execution results
Important edge cases:
- PostgreSQL unavailable
- duplicate deliveries from Kafka
- offset commit mismatch after partial write
Required behavior:
- never claim success before the write is durable
- surface last committed offsets and last successful write time
### 6. `strategy-engine`
Responsibilities:
- join latest demand with latest price and inventory state
- compute implied quote rate
- compare it to fair rate
- apply freshness thresholds
- apply the initial 2% gross edge threshold
- apply max-notional and inventory checks
- emit auditable decision events
- emit `cmd.execute_trade` only when all gates pass and the system is armed
Inputs:
- `norm.swap_demand`
- `ref.market_price`
- `state.intent_inventory`
Outputs:
- `decision.trade_decision`
- `cmd.execute_trade`
Control surface:
- `GET /healthz`
- `GET /state`
- `POST /arm`
- `POST /disarm`
- `POST /pause`
- `POST /resume`
- `PUT /threshold`
- `PUT /limits`
Required decision state:
- arm state
- current threshold
- current notional cap
- latest decision
- latest rejected decision reason
- per-reason skip counters
Required decision record fields:
- `decision_id`
- `quote_id`
- `pair`
- `direction`
- `implied_rate`
- `reference_rate`
- `gross_edge_pct`
- `price_freshness_ms`
- `inventory_snapshot`
- `decision`
- `decision_reason`
Important edge cases:
- stale prices
- no price yet
- no inventory yet
- insufficient spendable inventory
- pending deposit exists but is not yet credited
- quote already expired
- repeated quote IDs
- one direction affordable and the other not
- fresh deploy starts armed by mistake
### 7. `trade-executor`
Responsibilities:
- consume `cmd.execute_trade`
- load the correct Near Intents signing authority
- perform the real Near Intents submission using pre-funded internal inventory
- preserve idempotency across retries and restarts
- emit `exec.trade_result`
Inputs:
- `cmd.execute_trade`
- signing secrets
- optional latest inventory state
Outputs:
- `exec.trade_result`
Control surface:
- `GET /healthz`
- `GET /state`
- `POST /arm`
- `POST /disarm`
- `POST /pause`
- `POST /resume`
- `POST /drain`
Required state:
- arm state
- last command seen
- last request sent
- last venue response
- in-flight command count
- completed command count
- error counters
Important edge cases:
- duplicate command delivery
- crash after submission but before result publish
- venue reject
- timeout with unknown outcome
- insufficient internal inventory detected late
- secret missing or invalid
- signer not authorized for the intended NEAR Intents account
Required behavior:
- never silently swallow a venue error
- always publish a result record for attempted commands
- make duplicate suppression durable
- never try to bridge or top up inventory during trade execution
- start disarmed by default on fresh deploy
## Persistence design
### Why PostgreSQL now
- current scale does not require a special time-series database
- rows are easier to inspect than a custom file format for this phase
- joins across decision, command, and result matter more right now than raw ingestion throughput
- treasury, inventory, and execution joins matter more than raw bus throughput at this stage
### Minimum tables or equivalent record families
- `raw_near_intents_quotes`
- `swap_demand_events`
- `market_price_events`
- `intent_inventory_snapshots`
- `liquidity_actions`
- `trade_decisions`
- `execute_trade_commands`
- `trade_execution_results`
### Query requirements
Must be able to answer:
- what was the latest fair price when this decision was made
- what spendable inventory existed when this decision was made
- what treasury funding actions were still pending
- why was this quote skipped
- what command was emitted for this quote
- what happened when the executor submitted it
- what credited internal inventory existed at the time
## Control and stop semantics
Every service must support inspection.
State endpoints must be sufficient to answer:
- is it healthy
- is it connected
- what is it currently using as input state
- what was the last successful action
- why is it blocked, if blocked
- whether the state is authoritative or stale
Stopping must be explicit:
- `pause` means stop taking new work but keep process alive
- `drain` means finish in-flight work then stop cleanly
- `disarm` means remain live and observable but refuse side effects
This matters because the operator must be able to halt strategy or execution without destroying the whole pipeline.
## Logging requirements
All services use structured JSON logs.
Stable top-level fields:
- `level`
- `service`
- `component`
- `event`
- `namespace`
- `venue`
- `topic`
- `pair`
Additional body fields when relevant:
- `quote_id`
- `decision_id`
- `command_id`
- `execution_id`
- `inventory_id`
- `liquidity_action_id`
- `gross_edge_pct`
- `price_freshness_ms`
Log when:
- source connection is lost or reestablished
- price source becomes stale
- inventory state becomes stale or recovers
- funding action is requested, seen, credited, delayed, failed, or frozen
- strategy rejects with a meaningful reason
- strategy arms or disarms
- executor arms or disarms
- command is submitted
- venue rejects or times out
- PostgreSQL disconnects or recovers
- control API changes state
Do not log every healthy message by default.
## Failure and failover behavior
### Reference pricing
- Kraken failure with fresh CoinGecko:
- allowed only if fallback policy says yes
- decision records must note fallback source use
- both sources stale:
- strategy blocks all execution
### Inventory state
- missing or stale inventory state:
- strategy may still emit a non-actionable rejected decision
- strategy must not emit `cmd.execute_trade`
- pending funding action:
- remains non-spendable
- must be visible in control state and durable records
- treasury action service down:
- already funded trading may continue if inventory-sync is fresh
- new funding or withdrawal operations must be blocked visibly
### Persistence
- PostgreSQL unavailable:
- history-writer unhealthy
- system must surface that audit history is impaired
- if the user wants strict mode, strategy or executor may be blocked until persistence returns
### Execution
- timeout with unknown venue outcome:
- emit explicit uncertain result
- preserve idempotency state for recovery
- restart after partial submission:
- executor must not blindly resubmit without checking prior state
- executor sees lower real inventory than strategy snapshot:
- emit explicit failure result
- do not attempt fallback treasury movement
## Testing and validation plan
### Unit tests
- implied-rate calculation for both directions
- explicit EURe/EUR pricing-basis test
- threshold checks
- stale-price blocking
- insufficient-inventory blocking
- pending-deposit-not-spendable blocking
- command emission only when armed
- idempotency transitions in executor
### Integration tests
- pricing service emits canonical reference-price events
- inventory-sync emits credited vs pending inventory state correctly
- liquidity-manager records funding actions and status transitions
- strategy consumes demand, pricing, and inventory and emits decision plus command as expected
- history-writer persists linked records across topics
- executor publishes result records for success and failure paths
### Runtime validation in cluster
- inspect each service `/state`
- verify live reference pricing updates
- verify live internal inventory state
- verify one treasury funding path from request or deposit tracking to credited inventory
- verify strategy and executor start disarmed on a fresh deploy
- observe a rejected decision due to block condition
- arm strategy and executor
- keep the first live max notional tiny, on the order of a few EURe, before any larger cap
- observe one command emission
- observe one real Near Intents execution attempt
- verify PostgreSQL contains the full linked chain
## Deliberately rejected for this turn
- dashboards
- broad multi-pair abstractions
- ML training infrastructure
- broad backtest framework
- polished operator UI
- warehouse or lakehouse design
## Expected deliverables
- `market-reference-ingest`
- `inventory-sync`
- `liquidity-manager`
- `history-writer`
- `strategy-engine`
- `trade-executor`
- PostgreSQL schema and migrations or equivalent setup
- updated Kafka topic creation and config
- shared control API pattern for all long-running services
- tests for strategy, inventory, persistence, and executor behavior
- docs for operations, arm or disarm flow, and inspection commands
## Validation target
This turn is only complete when the deployed system can, through repo-controlled services alone, take one live active-pair quote, price it, verify credited internal inventory, decide on it, gate it, submit a real Near Intents execution attempt, and preserve the full record chain in PostgreSQL.

View file

@ -1,356 +0,0 @@
# Implementation Proof: first non-mocked tradeable loop for one pair
Status: open
Opened: 2026-04-01
## Target outcome
The active turn is not complete when the repo merely observes demand or emits placeholder commands. It is complete only when the deployed system can make a real Near Intents trade attempt for the active BTC/Gnosis EURe pair using pre-funded inventory already credited inside NEAR Intents, with the full chain stored durably for later analytics and backtesting.
## Hypothesis
`unrip` becomes materially more real once one live NEAR swap-demand event can move through this full chain without mocks:
1. real quote and demand ingestion
2. live external reference pricing
3. synchronized spendable inventory state from NEAR Intents
4. auditable strategy decision
5. inventory and safety gating
6. real Near Intents execution attempt using credited internal inventory
7. durable storage of the full event chain
If any of those links is still dummy, manual-only, or opaque after the fact, the proof is not achieved.
## Active pair and trading rule
- Active pair:
- `nep141:btc.omft.near`
- `nep141:gnosis-0x420ca0f9b9b604ce0fd9c18ef134c705e5fa3430.omft.near`
- Both directions must be implemented in the same decision and execution pipeline:
- BTC -> EURe
- EURe -> BTC
- Runtime spendable inventory inside NEAR Intents decides which direction can actually fire.
- External Gnosis and BTC wallets are treasury and funding inputs, not the per-trade spend path.
- Pending deposits or withdrawals must not be treated as spendable inventory.
- Initial decision rule:
- use Kraken plus CoinGecko reference prices
- make the EURe/EUR pricing assumption explicit in code and decision records
- require a gross edge of at least 2%
- require fresh reference pricing
- require fresh internal inventory state
- require sufficient spendable source-asset inventory already credited inside NEAR Intents
- keep strategy and executor disarmed by default on deploy
- start with a very small max notional for the first live attempt
## Required services
### `near-intents-ingest`
Responsibilities:
- connect to the live NEAR Intents quote feed
- maintain the active pair filter
- publish raw venue quotes
- publish normalized swap-demand events
- expose current ingest state
Must not:
- make trading decisions
- execute trades
### `market-reference-ingest`
Responsibilities:
- maintain current reference pricing for BTC/EUR and mapped BTC/EURe fair value
- use Kraken as the primary fast path
- use CoinGecko as fallback or cross-check
- publish reference-price events
- expose latest price state, freshness, and source health
Must not:
- emit trade commands
### `inventory-sync`
Responsibilities:
- read current spendable inventory from the NEAR Intents internal ledger or equivalent supported inventory surface
- distinguish credited inventory from pending deposits and withdrawals
- publish or serve current internal inventory state
- expose freshness, last successful sync time, and reconciliation status
Must not:
- initiate treasury movements
- make trading decisions
### `liquidity-manager`
Responsibilities:
- request or manage deposit addresses for supported treasury assets and chains
- track funding actions from external treasury wallets into NEAR Intents
- track pending deposits, credited deposits, withdrawals, and rebalance actions
- expose the current funding and treasury state
- publish or record liquidity actions so they are auditable
Must not:
- execute trading strategy
- spend inventory on the trade hot path
### `history-writer`
Responsibilities:
- consume the core Kafka topics
- write append-only records into PostgreSQL
- preserve enough IDs and timestamps to reconstruct causality
- expose writer lag, last committed offsets, and write health
Must not:
- serve as the message bus
- make trading decisions
### `strategy-engine`
Responsibilities:
- consume normalized demand plus fresh reference prices
- consume fresh spendable inventory state
- compute implied rate from the quote
- compare quote rate against external reference pricing
- apply the initial 2% gross edge threshold
- apply freshness, arming, and notional checks
- apply inventory-aware direction gating
- emit auditable decision events
- emit `cmd.execute_trade` only when a decision is actionable
Must not:
- hold private keys
- execute venue actions directly
### `trade-executor`
Responsibilities:
- consume `cmd.execute_trade`
- load the correct Near Intents signing authority at runtime
- perform the actual Near Intents submission against pre-funded internal inventory
- preserve idempotency and duplicate suppression
- publish `exec.trade_result`
- expose current arm state, recent attempts, and last venue error
Must not:
- invent trading logic
- bypass Kafka and manual safety gates
- perform ad hoc bridging or treasury funding on the trade hot path
## Required durable storage
PostgreSQL is the first durable analytics and audit store.
It must store at least:
- raw quotes
- normalized demand
- reference-price snapshots
- internal inventory snapshots
- liquidity and funding actions
- strategy decisions
- trade commands
- execution results
- treasury status and reconciliation metadata
Why PostgreSQL first:
- fast enough for the current streaming volume
- queryable for inspection and analytics
- simple to operate in the current cluster
- good fit for append-only audit plus current-state views
Kafka remains the streaming backbone. PostgreSQL is not the hot transport path.
## Required control surface
Every long-running service in this turn must expose a small HTTP control surface. The point is not a polished UI. The point is operability.
At minimum every service must expose:
- `GET /healthz`
- `GET /state`
Services with runtime controls must also expose the relevant action endpoints:
### `near-intents-ingest`
- inspect:
- pair filter
- connection state
- frames received
- published counts
- control:
- update pair filter
- disable or reset pair filter
### `market-reference-ingest`
- inspect:
- latest Kraken price
- latest CoinGecko price
- derived fair price
- freshness age
- source health
- control:
- pause or resume polling or streaming
- trigger ad hoc refresh
### `inventory-sync`
- inspect:
- latest spendable balances by asset
- pending deposits and withdrawals
- last sync time
- reconciliation status
- control:
- trigger sync
- pause or resume sync
### `liquidity-manager`
- inspect:
- active deposit addresses
- recent funding actions
- pending deposits
- credited deposits
- recent withdrawals
- control:
- request or rotate deposit address where supported
- refresh treasury status
- pause or resume funding trackers
- disable withdrawals or rebalance actions
### `history-writer`
- inspect:
- last persisted offsets by topic
- last write time
- error count
- database connectivity
- control:
- pause writes
- resume writes
- drain and stop cleanly
### `strategy-engine`
- inspect:
- arm state
- active threshold
- latest reference snapshot used
- latest inventory snapshot used
- latest decisions and reasons
- skipped counts by reason
- control:
- arm or disarm
- pause or resume decisions
- change threshold
- set notional cap
### `trade-executor`
- inspect:
- arm state
- last command received
- last venue response
- last error
- in-flight and completed command counts
- control:
- arm or disarm execution
- pause consumption
- drain and stop cleanly
Stopping a service must mean a graceful stop or drain, not “kill the pod and hope.”
## Logging and observability requirements
All services must emit structured JSON logs with stable fields.
Stable fields:
- `level`
- `service`
- `component`
- `event`
- `namespace`
- `venue`
- `topic`
- `pair`
High-cardinality IDs belong in the body, not labels:
- `quote_id`
- `command_id`
- `decision_id`
- `execution_id`
What must be logged:
- connection loss and recovery
- source stale state
- inventory-sync stale state
- treasury funding action requested, credited, failed, or stuck
- invalid messages
- decision accepted and decision rejected with reason
- arm and disarm actions
- execution submitted, rejected, failed, recovered, completed
- PostgreSQL disconnect and recovery
- control API failures
What should not be logged:
- per-message noise without state change
- full payload spam by default
## Required edge cases and failure handling
- stale reference prices must block decisions
- stale or missing internal inventory state must block execution
- insufficient credited inventory must block the affected direction only
- pending deposits must not be counted as spendable inventory
- deposit address created but never funded must not be mistaken for liquidity
- external transfer observed but not yet credited in NEAR Intents must stay non-spendable
- credited inventory drift between inventory-sync and executor must hard-fail the command and publish a result
- Kraken down but CoinGecko healthy should degrade, not crash, if policy allows fallback
- both reference sources stale must block decisions
- PostgreSQL down must surface a hard health failure and stop claiming the system is trade-ready
- liquidity-manager failure must block new funding operations but must not silently stop already funded trading
- duplicate `cmd.execute_trade` must not cause duplicate venue submission
- executor restart after a partial submission must preserve idempotency behavior
- Near Intents API or RPC failures must become explicit `trade_result` failure records
- pair inactivity must not be mistaken for pipeline breakage
## Definition of done
- The deployed cluster is running all required services for this loop.
- Live Kraken and CoinGecko reference prices are flowing and inspectable.
- Spendable inventory inside NEAR Intents is flowing and inspectable.
- At least one treasury funding path is documented and verified:
- deposit address or supported funding path created
- funding action observed
- credited inventory visible in internal state
- PostgreSQL contains the full event chain for at least one real quote path.
- Strategy decisions are non-dummy and include explicit reason fields with edge, freshness, and inventory context.
- Both trade directions are implemented in the same decision and execution path.
- At least one direction can be armed based on available credited inventory.
- A real Near Intents execution attempt can be triggered through the repo-controlled path.
- The resulting venue response is captured durably and inspectably.
- Each service exposes the required health and state endpoints, and controlled pause or arm semantics where appropriate.
- Fresh deploys start disarmed by default and require explicit operator arming before side effects.
- Validation includes not only happy path but blocked-path evidence:
- stale reference blocked
- insufficient inventory blocked
- pending-deposit-not-spendable blocked
- disarmed executor blocked
## Current real
- Real NEAR Intents quote data is flowing into the cluster.
- The pair filter can be configured and changed at runtime.
- Raw and normalized events already exist in Redpanda topics.
- The app is deployable and observable in Kubernetes.
- Kafka already provides the event backbone.
## Current fake or incomplete
- Reference pricing does not exist yet.
- Spendable internal inventory sync does not exist yet.
- Treasury funding or deposit management does not exist yet.
- Strategy decisions are still dummy placeholders.
- Execution is still dummy.
- The current executor is not a real Near Intents adapter.
- PostgreSQL is not yet part of the loop.
- Most services do not yet have their own control surfaces.
- Real signing and treasury secret handling is not yet wired.
## Failure conditions
This proof fails if any of the following is true:
- a trade can only happen via manual shell steps or out-of-band operator intervention
- the system depends on ad hoc bridging or external-wallet spending on the trade hot path
- decision logic is still embedded in a dummy or placeholder service
- executor logic is still simulated
- a service cannot be inspected or paused cleanly at runtime
- the system cannot explain why a quote did or did not become a trade
- the stored history cannot reconstruct quote, pricing, decision, command, and result
- both directions are not implemented
- the system attempts to execute without credited internal source inventory
## Expected validation evidence
- live `ref.market_price` events and current state from the pricing control API
- live internal inventory state from the inventory-sync control API
- PostgreSQL rows linking quote, reference, inventory snapshot, decision, command, and execution result
- a blocked decision caused by stale price or insufficient inventory
- a funding action that becomes credited inventory
- a successful command emission from the strategy service with a recorded edge above 2%
- one real Near Intents execution attempt with stored request and response metadata

View file

@ -1 +0,0 @@

View file

@ -99,15 +99,6 @@ services:
condition: service_healthy
restart: unless-stopped
ops-sentinel:
build: .
command: ["node", "src/apps/ops-sentinel.mjs"]
env_file: [.env]
depends_on:
redpanda:
condition: service_healthy
restart: unless-stopped
strategy-engine:
build: .
command: ["node", "src/apps/strategy-engine.mjs"]

View file

@ -16,7 +16,7 @@ spec:
- |
set -eu
BROKERS="redpanda.unrip.svc.cluster.local:9092"
TOPICS="raw.near_intents.quote norm.swap_demand ref.market_price state.intent_inventory ops.liquidity_action ops.funding_observation ops.alert decision.trade_decision cmd.execute_trade exec.trade_result"
TOPICS="raw.near_intents.quote norm.swap_demand ref.market_price state.intent_inventory ops.liquidity_action decision.trade_decision cmd.execute_trade exec.trade_result"
RETENTION_MS="172800000"
RETENTION_BYTES="268435456"

View file

@ -38,8 +38,6 @@ data:
STRATEGY_ENGINE_CONTROL_PORT: "8086"
TRADE_EXECUTOR_CONTROL_HOST: 0.0.0.0
TRADE_EXECUTOR_CONTROL_PORT: "8087"
OPS_SENTINEL_CONTROL_HOST: 0.0.0.0
OPS_SENTINEL_CONTROL_PORT: "8088"
KAFKA_BROKERS: redpanda.unrip.svc.cluster.local:9092
KAFKA_CLIENT_ID: unrip
KAFKA_TOPIC_RAW_NEAR_INTENTS_QUOTE: raw.near_intents.quote
@ -47,8 +45,6 @@ data:
KAFKA_TOPIC_REF_MARKET_PRICE: ref.market_price
KAFKA_TOPIC_STATE_INTENT_INVENTORY: state.intent_inventory
KAFKA_TOPIC_OPS_LIQUIDITY_ACTION: ops.liquidity_action
KAFKA_TOPIC_OPS_FUNDING_OBSERVATION: ops.funding_observation
KAFKA_TOPIC_OPS_ALERT: ops.alert
KAFKA_TOPIC_DECISION_TRADE_DECISION: decision.trade_decision
KAFKA_TOPIC_CMD_EXECUTE_TRADE: cmd.execute_trade
KAFKA_TOPIC_EXEC_TRADE_RESULT: exec.trade_result
@ -56,7 +52,6 @@ data:
KAFKA_CONSUMER_GROUP_INVENTORY: inventory-sync-v1
KAFKA_CONSUMER_GROUP_STRATEGY: strategy-engine-v1
KAFKA_CONSUMER_GROUP_EXECUTOR: trade-executor-v1
KAFKA_CONSUMER_GROUP_OPS_SENTINEL: ops-sentinel-v1
EXECUTOR_STATE_DIR: /var/lib/unrip/executor-state
LIQUIDITY_STATE_DIR: /var/lib/unrip/liquidity-state
MARKET_REFERENCE_REFRESH_MS: "5000"
@ -74,14 +69,6 @@ data:
EXECUTOR_INITIAL_ARMED: "false"
EXECUTOR_RESPONSE_TIMEOUT_MS: "10000"
LIQUIDITY_WITHDRAWALS_FROZEN: "true"
BTC_FUNDING_OBSERVER_ENABLED: "true"
BTC_FUNDING_OBSERVER_BASE_URL: https://mempool.space/api
FUNDING_OBSERVATION_STUCK_MS: "3600000"
OPS_SENTINEL_EVALUATION_MS: "5000"
OPS_SENTINEL_PRICE_STALE_MS: "30000"
OPS_SENTINEL_INVENTORY_STALE_MS: "30000"
OPS_SENTINEL_FUNDING_CREDIT_PENDING_MS: "300000"
OPS_SENTINEL_FUNDING_STUCK_MS: "3600000"
---
apiVersion: v1
kind: PersistentVolumeClaim
@ -274,38 +261,6 @@ spec:
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: ops-sentinel
namespace: unrip
spec:
replicas: 1
selector:
matchLabels:
app: ops-sentinel
template:
metadata:
labels:
app: ops-sentinel
app.kubernetes.io/part-of: unrip
spec:
imagePullSecrets:
- name: unrip-registry-creds
containers:
- name: app
image: ghcr.io/example/unrip:bootstrap
imagePullPolicy: IfNotPresent
command: ["node", "src/apps/ops-sentinel.mjs"]
ports:
- name: control-api
containerPort: 8088
envFrom:
- configMapRef:
name: unrip-config
- secretRef:
name: unrip-secrets
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: strategy-engine
namespace: unrip

View file

@ -3,8 +3,6 @@ norm.swap_demand
ref.market_price
state.intent_inventory
ops.liquidity_action
ops.funding_observation
ops.alert
decision.trade_decision
cmd.execute_trade
exec.trade_result

View file

@ -9,7 +9,6 @@
"inventory:sync": "node src/apps/inventory-sync.mjs",
"liquidity:manager": "node src/apps/liquidity-manager.mjs",
"history:writer": "node src/apps/history-writer.mjs",
"ops:sentinel": "node src/apps/ops-sentinel.mjs",
"strategy:engine": "node src/apps/strategy-engine.mjs",
"trade:executor": "node src/apps/trade-executor.mjs",
"start": "node index.mjs",

View file

@ -1,14 +0,0 @@
# Research Turn
Status: idle
No approved research turn is active yet.
When opening one, capture:
- charter
- hypothesis
- dataset or source of truth
- metrics
- assumptions
- falsification condition
- expected artifact paths under `research/experiments/`

View file

@ -1,9 +0,0 @@
# Research Queue
This queue holds strategy and analysis questions that are not yet an active research turn.
## Candidate charters
- Determine which Kraken symbols and CoinGecko asset IDs give a trustworthy pricing basis for the active pair.
- Measure how often the active pair's implied rate diverges from external reference prices after a simple fee model.
- Test whether stale quotes correlate with worse downstream execution quality or higher reject rates.
- Import the local historical EUR/BTC data and decide how it should seed replay and backtesting.

View file

@ -10,7 +10,7 @@ FORGEJO_REMOTE_NAME="${FORGEJO_REMOTE_NAME:-forgejo}"
PROJECT_NAME="${PROJECT_NAME:-unrip}"
PROJECT_NAMESPACE="${PROJECT_NAMESPACE:-$PROJECT_NAME}"
PROJECT_DEPLOYMENTS="${PROJECT_DEPLOYMENTS:-near-intents-ingest,market-reference-ingest,liquidity-manager,inventory-sync,history-writer,ops-sentinel,strategy-engine,trade-executor}"
PROJECT_DEPLOYMENTS="${PROJECT_DEPLOYMENTS:-near-intents-ingest,market-reference-ingest,liquidity-manager,inventory-sync,history-writer,strategy-engine,trade-executor}"
PROJECT_REGISTRY_SECRET_NAME="${PROJECT_REGISTRY_SECRET_NAME:-${PROJECT_NAME}-registry-creds}"
APP_SECRET_NAME="${APP_SECRET_NAME:-${PROJECT_NAME}-secrets}"
SYNC_FORGEJO_REMOTE="${SYNC_FORGEJO_REMOTE:-1}"

View file

@ -1,43 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
from common import BACKLOG_PATH, BACKLOG_SECTION, insert_into_section, load_text, next_backlog_id, save_text
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Append an item to BACKLOG.md.")
parser.add_argument(
"--lane",
required=True,
choices=("implementation", "research", "ops", "bug"),
help="Backlog section to append to.",
)
parser.add_argument("--summary", required=True, help="One-line backlog summary.")
parser.add_argument(
"--priority",
default="soon",
choices=("now", "soon", "later"),
help="Coarse urgency label.",
)
parser.add_argument(
"--tags",
default="",
help="Comma-separated tags stored inline for grepability.",
)
return parser.parse_args()
def main() -> None:
args = parse_args()
item_id = next_backlog_id(args.lane)
tags = f" tags={args.tags}" if args.tags else ""
entry = f"- [{item_id}] ({args.priority}) {args.summary}{tags}"
updated = insert_into_section(load_text(BACKLOG_PATH), BACKLOG_SECTION[args.lane], entry)
save_text(BACKLOG_PATH, updated)
print(item_id)
if __name__ == "__main__":
main()

View file

@ -1,150 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import re
import shutil
from pathlib import Path
from common import (
ARCHIVE_PATH,
IMPLEMENTATION_ARCHIVE_DIR,
IMPLEMENTATION_PATH,
PROOF_PATH,
RESEARCH_ACTIVE_PATH,
RESEARCH_ARCHIVE_DIR,
append_archive_line,
git_commit,
load_text,
save_text,
slugify,
timestamp_slug,
today_iso,
)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Archive and close the current implementation or research turn.")
parser.add_argument("--lane", required=True, choices=("implementation", "research"))
parser.add_argument(
"--status",
required=True,
choices=("passed", "failed", "paused", "abandoned"),
help="Outcome of the turn.",
)
parser.add_argument("--summary", required=True, help="One-line closure summary.")
parser.add_argument(
"--commit",
action="store_true",
help="Commit the archive change automatically.",
)
return parser.parse_args()
def title_from_content(content: str, prefix: str) -> str:
match = re.search(rf"^# {re.escape(prefix)}: (.+)$", content, flags=re.MULTILINE)
if not match:
raise SystemExit(f"could not find title for {prefix.lower()}")
return match.group(1).strip()
def close_implementation_turn(status: str, summary: str) -> tuple[str, list[Path]]:
proof_content = load_text(PROOF_PATH)
implementation_content = load_text(IMPLEMENTATION_PATH)
title = title_from_content(proof_content, "Implementation Proof")
slug = slugify(title)
stamp = timestamp_slug()
IMPLEMENTATION_ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)
proof_archive = IMPLEMENTATION_ARCHIVE_DIR / f"{stamp}-{slug}-proof.md"
implementation_archive = IMPLEMENTATION_ARCHIVE_DIR / f"{stamp}-{slug}-implementation.md"
shutil.copyfile(PROOF_PATH, proof_archive)
shutil.copyfile(IMPLEMENTATION_PATH, implementation_archive)
save_text(
PROOF_PATH,
"""# Implementation Proof
Status: idle
No approved implementation proof is active yet.
""",
)
save_text(
IMPLEMENTATION_PATH,
"""# Implementation Turn
Status: idle
No approved implementation turn is active yet.
""",
)
append_archive_line(
"implementation",
f"- {today_iso()}: `{slug}` closed with status `{status}`. {summary}",
)
return title, [proof_archive, implementation_archive]
def close_research_turn(status: str, summary: str) -> tuple[str, list[Path]]:
research_content = load_text(RESEARCH_ACTIVE_PATH)
title = title_from_content(research_content, "Research Turn")
slug = slugify(title)
stamp = timestamp_slug()
RESEARCH_ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)
research_archive = RESEARCH_ARCHIVE_DIR / f"{stamp}-{slug}.md"
shutil.copyfile(RESEARCH_ACTIVE_PATH, research_archive)
save_text(
RESEARCH_ACTIVE_PATH,
"""# Research Turn
Status: idle
No approved research turn is active yet.
""",
)
append_archive_line(
"research",
f"- {today_iso()}: `{slug}` closed with status `{status}`. {summary}",
)
return title, [research_archive]
def main() -> None:
args = parse_args()
if args.lane == "implementation" and "Status: idle" in load_text(PROOF_PATH):
raise SystemExit("no active implementation turn to close")
if args.lane == "research" and "Status: idle" in load_text(RESEARCH_ACTIVE_PATH):
raise SystemExit("no active research turn to close")
if args.lane == "implementation":
title, archived_paths = close_implementation_turn(args.status, args.summary)
else:
title, archived_paths = close_research_turn(args.status, args.summary)
if args.commit:
paths = [ARCHIVE_PATH]
if args.lane == "implementation":
paths.extend([PROOF_PATH, IMPLEMENTATION_PATH])
else:
paths.append(RESEARCH_ACTIVE_PATH)
paths.extend(archived_paths)
git_commit(
f"""Archive {args.lane} turn: {title}
Proof: Preserve the completed {args.lane} turn and record its outcome in the tracked archive.
Assumptions: The archived files capture the relevant planning state for the completed turn.
Still fake: Archiving does not validate the work by itself; external evidence still governs whether the result is trustworthy.""",
paths=paths,
)
for archived_path in archived_paths:
print(archived_path.relative_to(RESEARCH_ACTIVE_PATH.parent.parent))
if __name__ == "__main__":
main()

View file

@ -1,154 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
import re
import subprocess
from datetime import datetime, timezone
from pathlib import Path
ROOT_DIR = Path(__file__).resolve().parents[2]
BACKLOG_PATH = ROOT_DIR / "BACKLOG.md"
ARCHIVE_PATH = ROOT_DIR / "ARCHIVE.md"
PROOF_PATH = ROOT_DIR / "PROOF.md"
IMPLEMENTATION_PATH = ROOT_DIR / "IMPLEMENTATION.md"
RESEARCH_ACTIVE_PATH = ROOT_DIR / "research/ACTIVE.md"
IMPLEMENTATION_ARCHIVE_DIR = ROOT_DIR / "archive/implementation"
RESEARCH_ARCHIVE_DIR = ROOT_DIR / "archive/research"
LANE_PREFIX = {
"implementation": "I",
"research": "R",
"ops": "O",
"bug": "B",
}
BACKLOG_SECTION = {
"implementation": "## Implementation Candidates",
"research": "## Research Candidates",
"ops": "## Ops Candidates",
"bug": "## Bugs",
}
ARCHIVE_SECTION = {
"implementation": "## Implementation Turns",
"research": "## Research Turns",
"planning": "## Planning Events",
}
def now_utc() -> datetime:
return datetime.now(timezone.utc)
def today_iso() -> str:
return now_utc().date().isoformat()
def timestamp_slug() -> str:
return now_utc().strftime("%Y%m%dT%H%M%SZ")
def slugify(value: str) -> str:
slug = re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-")
return slug or "turn"
def load_text(path: Path) -> str:
return path.read_text(encoding="utf-8")
def save_text(path: Path, content: str) -> None:
path.write_text(content.rstrip() + "\n", encoding="utf-8")
def insert_into_section(document: str, heading: str, block: str) -> str:
lines = document.splitlines()
try:
start = lines.index(heading)
except ValueError as exc:
raise SystemExit(f"heading not found: {heading}") from exc
end = len(lines)
for idx in range(start + 1, len(lines)):
if lines[idx].startswith("## "):
end = idx
break
insert_at = end
return "\n".join(lines[:insert_at] + [block] + lines[insert_at:]) + "\n"
def append_archive_line(section: str, line: str) -> None:
content = load_text(ARCHIVE_PATH)
updated = insert_into_section(content, ARCHIVE_SECTION[section], line)
save_text(ARCHIVE_PATH, updated)
def read_backlog_lines() -> list[str]:
return load_text(BACKLOG_PATH).splitlines()
def write_backlog_lines(lines: list[str]) -> None:
save_text(BACKLOG_PATH, "\n".join(lines))
def next_backlog_id(lane: str) -> str:
prefix = LANE_PREFIX[lane]
pattern = re.compile(rf"\[{re.escape(prefix)}(\d+)\]")
highest = 0
for line in read_backlog_lines():
match = pattern.search(line)
if match:
highest = max(highest, int(match.group(1)))
return f"{prefix}{highest + 1:03d}"
def backlog_entry_map() -> dict[str, str]:
entries: dict[str, str] = {}
pattern = re.compile(r"- \[([A-Z]\d+)\] (.+)")
for line in read_backlog_lines():
match = pattern.match(line)
if match:
entries[match.group(1)] = match.group(2)
return entries
def remove_backlog_ids(ids: list[str]) -> None:
id_set = set(ids)
pattern = re.compile(r"- \[([A-Z]\d+)\] ")
kept: list[str] = []
for line in read_backlog_lines():
match = pattern.match(line)
if match and match.group(1) in id_set:
continue
kept.append(line)
write_backlog_lines(kept)
def git_has_changes() -> bool:
result = subprocess.run(
["git", "-C", str(ROOT_DIR), "status", "--porcelain"],
check=True,
capture_output=True,
text=True,
)
return bool(result.stdout.strip())
def path_has_changes(paths: list[Path]) -> bool:
rel_paths = [str(path.relative_to(ROOT_DIR)) for path in paths]
result = subprocess.run(
["git", "-C", str(ROOT_DIR), "status", "--porcelain", "--", *rel_paths],
check=True,
capture_output=True,
text=True,
)
return bool(result.stdout.strip())
def git_commit(message: str, paths: list[Path]) -> None:
if not path_has_changes(paths):
return
rel_paths = [str(path.relative_to(ROOT_DIR)) for path in paths]
subprocess.run(["git", "-C", str(ROOT_DIR), "add", "--", *rel_paths], check=True)
subprocess.run(["git", "-C", str(ROOT_DIR), "commit", "-m", message], check=True)

View file

@ -1,9 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "$0")/../.." && pwd)"
chmod +x "$ROOT_DIR/.githooks/commit-msg"
git -C "$ROOT_DIR" config core.hooksPath .githooks
echo "Installed tracked git hooks for $ROOT_DIR"

View file

@ -1,176 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
from common import (
BACKLOG_PATH,
ARCHIVE_PATH,
IMPLEMENTATION_PATH,
PROOF_PATH,
RESEARCH_ACTIVE_PATH,
append_archive_line,
backlog_entry_map,
git_commit,
remove_backlog_ids,
save_text,
slugify,
today_iso,
)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Open a new implementation or research turn.")
parser.add_argument("--lane", required=True, choices=("implementation", "research"))
parser.add_argument("--title", required=True, help="Turn title.")
parser.add_argument("--summary", required=True, help="One-line proof or charter summary.")
parser.add_argument(
"--pick",
action="append",
default=[],
help="Backlog ID to pull into the turn. Repeat as needed.",
)
parser.add_argument(
"--commit",
action="store_true",
help="Commit the planning change automatically.",
)
parser.add_argument(
"--force",
action="store_true",
help="Replace an already-open turn instead of refusing.",
)
return parser.parse_args()
def render_selected(ids: list[str], items: dict[str, str]) -> str:
if not ids:
return "- none selected"
return "\n".join(f"- [{item_id}] {items[item_id]}" for item_id in ids)
def open_implementation_turn(title: str, summary: str, selected: str) -> None:
opened = today_iso()
save_text(
PROOF_PATH,
f"""# Implementation Proof: {title}
Status: open
Opened: {opened}
## Hypothesis
{summary}
## Scope
{selected}
## Non-goals
- unchanged from `THESIS.md` unless the user approves otherwise
## Definition of done
- current turn implementation is validated with direct evidence
- remaining fakes are listed plainly
""",
)
save_text(
IMPLEMENTATION_PATH,
f"""# Implementation Turn: {title}
Status: open
Opened: {opened}
## Goal
{summary}
## Selected backlog items
{selected}
## Notes
- Fill in the concrete implementation plan before coding if the live plan no longer matches the turn.
""",
)
def open_research_turn(title: str, summary: str, selected: str) -> None:
opened = today_iso()
save_text(
RESEARCH_ACTIVE_PATH,
f"""# Research Turn: {title}
Status: open
Opened: {opened}
## Charter
{summary}
## Selected backlog items
{selected}
## Hypothesis
TBD by the approved research turn.
## Dataset or source of truth
TBD by the approved research turn.
## Metrics
TBD by the approved research turn.
## Assumptions
TBD by the approved research turn.
## Falsification condition
TBD by the approved research turn.
""",
)
def main() -> None:
args = parse_args()
if args.lane == "implementation":
if "Status: open" in PROOF_PATH.read_text(encoding="utf-8") and not args.force:
raise SystemExit("implementation turn already open; close it first or pass --force")
else:
if "Status: open" in RESEARCH_ACTIVE_PATH.read_text(encoding="utf-8") and not args.force:
raise SystemExit("research turn already open; close it first or pass --force")
entries = backlog_entry_map()
missing = [item_id for item_id in args.pick if item_id not in entries]
if missing:
raise SystemExit(f"unknown backlog IDs: {', '.join(missing)}")
selected = render_selected(args.pick, entries)
if args.lane == "implementation":
open_implementation_turn(args.title, args.summary, selected)
else:
open_research_turn(args.title, args.summary, selected)
if args.pick:
remove_backlog_ids(args.pick)
append_archive_line(
"planning",
(
f"- {today_iso()}: opened {args.lane} turn `{slugify(args.title)}` "
f"from backlog items {', '.join(args.pick) if args.pick else 'none'}."
),
)
if args.commit:
commit_paths = [BACKLOG_PATH, ARCHIVE_PATH]
if args.lane == "implementation":
commit_paths.extend([PROOF_PATH, IMPLEMENTATION_PATH])
else:
commit_paths.append(RESEARCH_ACTIVE_PATH)
git_commit(
f"""Open {args.lane} turn: {args.title}
Proof: Establish the approved {args.lane} turn and move selected backlog items into active scope.
Assumptions: The selected backlog items are the approved scope for this turn.
Still fake: Opening a turn changes planning state only; the work itself is not implemented yet.""",
paths=commit_paths,
)
if __name__ == "__main__":
main()

View file

@ -1,37 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "$0")/../.." && pwd)"
PROMPT_FILE="$ROOT_DIR/workflow/REVIEW_PROMPT.md"
RANGE="${1:-HEAD~1}"
if [[ "${1:-}" == "--help" ]]; then
cat <<'EOF'
Usage: bash scripts/workflow/review_diff.sh [git-diff-range]
Examples:
bash scripts/workflow/review_diff.sh HEAD~1
bash scripts/workflow/review_diff.sh main...HEAD
EOF
exit 0
fi
cat <<EOF
# Review Bundle
## Diff range
\`$RANGE\`
## Diff
\`\`\`diff
EOF
git -C "$ROOT_DIR" diff "$RANGE"
cat <<'EOF'
```
## Prompt
EOF
cat "$PROMPT_FILE"

View file

@ -41,8 +41,6 @@ const topics = [
config.kafkaTopicRefMarketPrice,
config.kafkaTopicStateIntentInventory,
config.kafkaTopicOpsLiquidityAction,
config.kafkaTopicOpsFundingObservation,
config.kafkaTopicOpsAlert,
config.kafkaTopicDecisionTradeDecision,
config.kafkaTopicCmdExecuteTrade,
config.kafkaTopicExecTradeResult,
@ -62,8 +60,6 @@ const state = {
paused: false,
draining: false,
last_write_at: null,
last_funding_observation_write_at: null,
last_alert_write_at: null,
last_metrics_at: null,
last_error: null,
error_count: 0,
@ -96,12 +92,6 @@ await consumer.run({
partition,
offset: message.offset,
};
if (topic === config.kafkaTopicOpsFundingObservation) {
state.last_funding_observation_write_at = state.last_write_at;
}
if (topic === config.kafkaTopicOpsAlert) {
state.last_alert_write_at = state.last_write_at;
}
if (portfolioMetricTopics.has(topic)) {
try {
await refreshPortfolioMetrics();

View file

@ -4,14 +4,9 @@ import { createConsumer } from '../bus/kafka/consumer.mjs';
import { createProducer } from '../bus/kafka/producer.mjs';
import { startControlApi } from '../core/control-api.mjs';
import { buildEventEnvelope, parseEventMessage } from '../core/event-envelope.mjs';
import { buildFundingVisibility } from '../core/funding-observations.mjs';
import { buildInventorySnapshot } from '../core/inventory.mjs';
import { createLogger, serializeError } from '../core/log.mjs';
import {
assertFundingObservationEvent,
assertInventorySnapshotEvent,
assertLiquidityActionEvent,
} from '../core/schemas.mjs';
import { assertInventorySnapshotEvent, assertLiquidityActionEvent } from '../core/schemas.mjs';
import { loadConfig } from '../lib/config.mjs';
import { createNearBridgeClient } from '../venues/near-intents/bridge-client.mjs';
import { createVerifierClient } from '../venues/near-intents/verifier-client.mjs';
@ -53,13 +48,6 @@ const consumer = await createConsumer({
const state = {
paused: false,
tracked_withdrawals: {},
funding_observations: {},
funding_visibility: {
last_observed_at: null,
pre_credit_inbound: {},
by_asset: {},
by_handle: {},
},
last_snapshot: null,
last_sync_at: null,
last_error: null,
@ -67,42 +55,29 @@ const state = {
};
await consumer.subscribe({ topic: config.kafkaTopicOpsLiquidityAction, fromBeginning: true });
await consumer.subscribe({ topic: config.kafkaTopicOpsFundingObservation, fromBeginning: true });
await consumer.run({
eachMessage: async ({ topic, message }) => {
eachMessage: async ({ message }) => {
if (!message.value) return;
try {
const event = parseEventMessage(message.value.toString());
if (topic === config.kafkaTopicOpsLiquidityAction) {
assertLiquidityActionEvent(event);
if (event.payload.action_type === 'withdrawal_tracked'
|| event.payload.action_type === 'withdrawal_status_changed') {
const details = event.payload.details || {};
if (details.withdrawal_hash) {
state.tracked_withdrawals[details.withdrawal_hash] = {
withdrawal_hash: details.withdrawal_hash,
asset_id: details.asset_id,
chain: details.chain,
amount: String(details.amount || '0'),
status: details.status || event.payload.status,
address: details.address || null,
};
}
assertLiquidityActionEvent(event);
if (event.payload.action_type === 'withdrawal_tracked'
|| event.payload.action_type === 'withdrawal_status_changed') {
const details = event.payload.details || {};
if (details.withdrawal_hash) {
state.tracked_withdrawals[details.withdrawal_hash] = {
withdrawal_hash: details.withdrawal_hash,
asset_id: details.asset_id,
chain: details.chain,
amount: String(details.amount || '0'),
status: details.status || event.payload.status,
address: details.address || null,
};
}
return;
}
if (topic === config.kafkaTopicOpsFundingObservation) {
assertFundingObservationEvent(event);
state.funding_observations[event.payload.funding_observation_id] = event.payload;
state.funding_visibility = buildFundingVisibility(
Object.values(state.funding_observations),
{ now: new Date().toISOString() },
);
}
} catch (error) {
logger.error('inventory_side_input_consume_failed', {
topic,
logger.error('liquidity_action_consume_failed', {
topic: config.kafkaTopicOpsLiquidityAction,
details: {
error: serializeError(error),
},
@ -187,10 +162,6 @@ const controlApi = startControlApi({
return {
account_id: config.nearIntentsAccountId,
...state,
funding_visibility: buildFundingVisibility(
Object.values(state.funding_observations),
{ now: new Date().toISOString() },
),
};
},
},

View file

@ -3,20 +3,12 @@ import process from 'node:process';
import { createProducer } from '../bus/kafka/producer.mjs';
import { startControlApi } from '../core/control-api.mjs';
import { buildEventEnvelope } from '../core/event-envelope.mjs';
import {
buildFundingObservationKey,
correlateFundingObservation,
hasFundingObservationChanged,
matchBridgeDeposit,
summarizeFundingObservations,
} from '../core/funding-observations.mjs';
import { createJsonStateStore } from '../core/json-state-store.mjs';
import { normalizeLiquidityState } from '../core/liquidity-state.mjs';
import { buildBridgeWithdrawalPlan } from '../core/liquidity-withdrawals.mjs';
import { createLogger, serializeError } from '../core/log.mjs';
import { assertFundingObservationEvent, assertLiquidityActionEvent } from '../core/schemas.mjs';
import { assertLiquidityActionEvent } from '../core/schemas.mjs';
import { loadConfig } from '../lib/config.mjs';
import { createBtcAddressObserver } from '../observers/btc-address-observer.mjs';
import { createNearBridgeClient } from '../venues/near-intents/bridge-client.mjs';
import { createVerifierClient } from '../venues/near-intents/verifier-client.mjs';
@ -49,37 +41,21 @@ const verifierClient = createVerifierClient({
accountId: config.nearIntentsAccountId,
signerPrivateKey: config.nearIntentsSignerPrivateKey,
});
const btcAddressObserver = config.btcFundingObserverEnabled
? createBtcAddressObserver({
baseUrl: config.btcFundingObserverBaseUrl,
})
: null;
const store = createJsonStateStore({
stateDir: config.liquidityStateDir,
fileName: 'liquidity.json',
initialState: {
paused: false,
funding_observer_paused: false,
withdrawals_frozen: config.withdrawalsFrozen,
deposit_addresses: {},
deposits: {},
tracked_withdrawals: {},
supported_tokens: {},
funding_observations: {},
funding_observations_by_handle: {},
funding_visibility_by_asset: {},
uncredited_funding_total_by_asset: {},
credit_correlation: {},
observer_health: {},
last_refresh_at: null,
last_funding_observation_at: null,
funding_observer_last_refresh_at: null,
funding_observer_last_error: null,
last_error: null,
last_withdrawal_request: null,
last_withdrawal_result: null,
publish_count: 0,
funding_publish_count: 0,
},
});
@ -89,10 +65,6 @@ const assetsByChain = new Map([
[config.tradingEure.chain, config.tradingEure.assetId],
]);
const fundingObserverByChain = new Map(
btcAddressObserver ? [[config.tradingBtc.chain, btcAddressObserver]] : [],
);
async function refresh() {
const state = normalizeLiquidityState(store.getState(), {
withdrawalsFrozen: config.withdrawalsFrozen,
@ -113,7 +85,6 @@ async function refresh() {
state.last_refresh_at = new Date().toISOString();
state.last_error = null;
applyFundingObservationSummary(state, state.last_refresh_at);
store.setState(state);
} catch (error) {
state.last_error = serializeError(error);
@ -132,12 +103,11 @@ async function refreshChain(chain, state) {
accountId: config.nearIntentsAccountId,
chain,
});
const refreshedAt = new Date().toISOString();
const previousAddress = state.deposit_addresses[chain]?.address || null;
state.deposit_addresses[chain] = {
...(state.deposit_addresses[chain] || {}),
...depositAddress,
refreshed_at: refreshedAt,
refreshed_at: new Date().toISOString(),
};
if (previousAddress !== depositAddress.address) {
@ -154,8 +124,7 @@ async function refreshChain(chain, state) {
accountId: config.nearIntentsAccountId,
chain,
});
const bridgeDeposits = deposits?.deposits || [];
for (const deposit of bridgeDeposits) {
for (const deposit of deposits?.deposits || []) {
const key = `${chain}:${deposit.tx_hash || deposit.address}:${deposit.defuse_asset_identifier}`;
const assetId = mapDepositAssetId(deposit.defuse_asset_identifier, chain);
const normalized = {
@ -180,172 +149,6 @@ async function refreshChain(chain, state) {
}, state);
}
}
await refreshFundingObservations({
chain,
state,
fundingHandle: depositAddress.address,
bridgeDeposits,
});
}
async function refreshFundingObservations({ chain, state, fundingHandle, bridgeDeposits }) {
const refreshedAt = new Date().toISOString();
const observer = fundingObserverByChain.get(chain);
if (!fundingHandle) {
state.observer_health[chain] = {
chain,
healthy: false,
configured: false,
supported: Boolean(observer),
paused: state.funding_observer_paused,
source: observer ? 'configured' : 'unsupported',
refreshed_at: refreshedAt,
};
applyFundingObservationSummary(state, refreshedAt);
return;
}
if (!observer) {
state.observer_health[chain] = {
chain,
healthy: false,
configured: true,
supported: false,
paused: false,
handle: fundingHandle,
source: 'unsupported',
refreshed_at: refreshedAt,
};
applyFundingObservationSummary(state, refreshedAt);
return;
}
if (state.funding_observer_paused) {
state.observer_health[chain] = {
...(state.observer_health[chain] || {}),
chain,
healthy: true,
configured: true,
supported: true,
paused: true,
handle: fundingHandle,
refreshed_at: refreshedAt,
source: state.observer_health[chain]?.source || 'btc_mempool_space',
};
applyFundingObservationSummary(state, refreshedAt);
return;
}
try {
const observed = await observer.listTransactions({ address: fundingHandle });
for (const tx of observed.transactions) {
const key = buildFundingObservationKey({
chain,
fundingHandle,
txHash: tx.tx_hash,
});
const previous = state.funding_observations[key] || null;
const next = correlateFundingObservation({
existing: previous,
accountId: config.nearIntentsAccountId,
assetId: assetsByChain.get(chain),
chain,
fundingHandle,
source: tx.source || observed.source,
txHash: tx.tx_hash,
amount: tx.amount,
confirmations: tx.confirmations,
observedAt: tx.observed_at || observed.observed_at,
bridgeDeposit: matchBridgeDeposit({
txHash: tx.tx_hash,
fundingHandle,
bridgeDeposits,
}),
stuckAfterMs: config.fundingObservationStuckMs,
});
state.funding_observations[key] = next;
if (hasFundingObservationChanged(previous, next)) {
await publishFundingObservation(next, state);
}
}
for (const [key, previous] of Object.entries(state.funding_observations)) {
if (previous.chain !== chain || previous.funding_handle !== fundingHandle) continue;
const next = correlateFundingObservation({
existing: previous,
accountId: previous.account_id,
assetId: previous.asset_id,
chain: previous.chain,
fundingHandle: previous.funding_handle,
source: previous.source,
txHash: previous.tx_hash,
amount: previous.amount,
confirmations: previous.confirmations,
observedAt: refreshedAt,
bridgeDeposit: matchBridgeDeposit({
txHash: previous.tx_hash,
fundingHandle,
bridgeDeposits,
}),
stuckAfterMs: config.fundingObservationStuckMs,
});
state.funding_observations[key] = next;
if (hasFundingObservationChanged(previous, next)) {
await publishFundingObservation(next, state);
}
}
state.funding_observer_last_refresh_at = observed.observed_at || refreshedAt;
state.funding_observer_last_error = null;
state.observer_health[chain] = {
chain,
healthy: true,
configured: true,
supported: true,
paused: false,
handle: fundingHandle,
source: observed.source,
observed_count: observed.transactions.length,
refreshed_at: observed.observed_at || refreshedAt,
};
applyFundingObservationSummary(state, refreshedAt);
} catch (error) {
state.funding_observer_last_error = serializeError(error);
state.observer_health[chain] = {
chain,
healthy: false,
configured: true,
supported: true,
paused: false,
handle: fundingHandle,
source: 'btc_mempool_space',
refreshed_at: refreshedAt,
error: serializeError(error),
};
applyFundingObservationSummary(state, refreshedAt);
logger.error('funding_observation_refresh_failed', {
topic: config.kafkaTopicOpsFundingObservation,
details: {
chain,
funding_handle: fundingHandle,
error: serializeError(error),
},
});
}
}
function applyFundingObservationSummary(state, now = new Date().toISOString()) {
const summary = summarizeFundingObservations(
Object.values(state.funding_observations),
{ now },
);
state.funding_observations_by_handle = summary.funding_observations_by_handle;
state.funding_visibility_by_asset = summary.funding_visibility_by_asset;
state.latest_funding_observation_at = summary.latest_funding_observation_at;
state.uncredited_funding_total_by_asset = summary.uncredited_funding_total_by_asset;
state.credit_correlation = summary.credit_correlation;
}
async function refreshWithdrawal(tracked, state) {
@ -487,21 +290,6 @@ async function publishAction(payload, state) {
state.publish_count += 1;
}
async function publishFundingObservation(payload, state) {
const event = buildEventEnvelope({
source: 'liquidity-manager',
venue: 'near-intents',
eventType: 'funding_observation',
observedAt: payload.last_seen_at,
payload,
});
assertFundingObservationEvent(event);
await producer.sendJson(config.kafkaTopicOpsFundingObservation, event, {
key: payload.funding_observation_id,
});
state.funding_publish_count += 1;
}
const timer = setInterval(refresh, config.liquidityRefreshMs);
timer.unref?.();
await refresh();
@ -514,7 +302,14 @@ const controlApi = startControlApi({
namespace: config.projectNamespace,
stateProvider: {
getState() {
return buildPublicState();
return {
account_id: config.nearIntentsAccountId,
withdrawal_defaults: {
[config.tradingBtc.assetId]: config.tradingBtc.withdrawAddress || null,
[config.tradingEure.assetId]: config.tradingEure.withdrawAddress || null,
},
...store.getState(),
};
},
},
routes: [
@ -525,48 +320,7 @@ const controlApi = startControlApi({
await refresh();
return {
ok: true,
...buildPublicState(),
};
},
},
{
method: 'POST',
path: '/refresh-funding-observations',
handler: async () => {
await refresh();
return {
ok: true,
...buildPublicState(),
};
},
},
{
method: 'POST',
path: '/pause-funding-observer',
handler: () => {
const state = store.getState();
normalizeLiquidityState(state, {
withdrawalsFrozen: config.withdrawalsFrozen,
});
state.funding_observer_paused = true;
store.setState(state);
return { ok: true, funding_observer_paused: true };
},
},
{
method: 'POST',
path: '/resume-funding-observer',
handler: async () => {
const state = store.getState();
normalizeLiquidityState(state, {
withdrawalsFrozen: config.withdrawalsFrozen,
});
state.funding_observer_paused = false;
store.setState(state);
await refresh();
return {
ok: true,
...buildPublicState(),
...store.getState(),
};
},
},
@ -786,45 +540,3 @@ function inferWithdrawStatusCode(error) {
}
return 409;
}
function buildPublicState() {
const now = new Date().toISOString();
const state = normalizeLiquidityState(structuredClone(store.getState()), {
withdrawalsFrozen: config.withdrawalsFrozen,
});
applyFundingObservationSummary(state, now);
return {
account_id: config.nearIntentsAccountId,
withdrawal_defaults: {
[config.tradingBtc.assetId]: config.tradingBtc.withdrawAddress || null,
[config.tradingEure.assetId]: config.tradingEure.withdrawAddress || null,
},
...state,
observer_health: buildObserverHealth(state.observer_health, {
now,
fundingObserverPaused: state.funding_observer_paused,
}),
observer_age_ms: ageMs(state.funding_observer_last_refresh_at, now),
};
}
function buildObserverHealth(observerHealth, { now, fundingObserverPaused }) {
return Object.fromEntries(
Object.entries(observerHealth || {}).map(([chain, health]) => [
chain,
{
...health,
paused: fundingObserverPaused || health?.paused || false,
age_ms: ageMs(health?.refreshed_at, now),
},
]),
);
}
function ageMs(from, to) {
const left = Date.parse(from || '');
const right = Date.parse(to || '');
if (!Number.isFinite(left) || !Number.isFinite(right)) return null;
return Math.max(0, right - left);
}

View file

@ -1,220 +0,0 @@
import process from 'node:process';
import { createConsumer } from '../bus/kafka/consumer.mjs';
import { createProducer } from '../bus/kafka/producer.mjs';
import { startControlApi } from '../core/control-api.mjs';
import { createAlertEngine } from '../core/alert-engine.mjs';
import { buildEventEnvelope, parseEventMessage } from '../core/event-envelope.mjs';
import { createLogger, serializeError } from '../core/log.mjs';
import {
assertFundingObservationEvent,
assertInventorySnapshotEvent,
assertLiquidityActionEvent,
assertMarketPriceEvent,
assertOpsAlertEvent,
assertTradeResult,
} from '../core/schemas.mjs';
import { loadConfig } from '../lib/config.mjs';
const config = loadConfig();
const logger = createLogger({
service: 'ops-sentinel',
component: 'alerts',
namespace: config.projectNamespace,
});
const producer = await createProducer({
brokers: config.kafkaBrokers,
clientId: config.kafkaClientId,
logger,
});
const consumer = await createConsumer({
groupId: config.kafkaConsumerGroupOpsSentinel,
brokers: config.kafkaBrokers,
clientId: config.kafkaClientId,
logger,
});
const topics = [
config.kafkaTopicRefMarketPrice,
config.kafkaTopicStateIntentInventory,
config.kafkaTopicOpsLiquidityAction,
config.kafkaTopicOpsFundingObservation,
config.kafkaTopicExecTradeResult,
];
const state = {
paused: false,
last_error: null,
last_event_at: null,
publish_count: 0,
};
const alertEngine = createAlertEngine({
activePair: config.activePair,
priceStaleMs: config.opsSentinelPriceStaleMs,
inventoryStaleMs: config.opsSentinelInventoryStaleMs,
fundingCreditPendingMs: config.opsSentinelFundingCreditPendingMs,
fundingStuckMs: config.opsSentinelFundingStuckMs,
evaluationIntervalMs: config.opsSentinelEvaluationMs,
});
for (const topic of topics) {
await consumer.subscribe({ topic, fromBeginning: true });
}
await consumer.run({
eachMessage: async ({ topic, message }) => {
if (!message.value || state.paused) return;
try {
const event = parseEventMessage(message.value.toString());
const payload = normalizePayloadForAlert(topic, event);
const transitions = alertEngine.applyEvent(topic, payload);
state.last_error = null;
state.last_event_at = new Date().toISOString();
await publishTransitions(transitions);
} catch (error) {
state.last_error = serializeError(error);
logger.error('ops_sentinel_consume_failed', {
topic,
details: {
error: serializeError(error),
},
});
}
},
});
const timer = setInterval(() => {
if (state.paused) return;
const transitions = alertEngine.evaluate();
publishTransitions(transitions).catch((error) => {
state.last_error = serializeError(error);
logger.error('ops_sentinel_evaluate_failed', {
topic: config.kafkaTopicOpsAlert,
details: {
error: serializeError(error),
},
});
});
}, config.opsSentinelEvaluationMs);
timer.unref?.();
const controlApi = startControlApi({
host: config.opsSentinelControlHost,
port: config.opsSentinelControlPort,
logger: logger.child({ component: 'control-api' }),
service: 'ops-sentinel',
namespace: config.projectNamespace,
stateProvider: {
getState() {
return {
paused: state.paused,
publish_count: state.publish_count,
last_error: state.last_error,
last_event_at: state.last_event_at,
...alertEngine.getState(),
};
},
},
healthProvider: {
getHealth() {
return {
paused: state.paused,
last_event_at: state.last_event_at,
last_error: state.last_error,
};
},
},
routes: [
{
method: 'POST',
path: '/pause',
handler: () => {
state.paused = true;
consumer.pause(topics.map((topic) => ({ topic })));
return { ok: true, paused: true };
},
},
{
method: 'POST',
path: '/resume',
handler: () => {
state.paused = false;
consumer.resume(topics.map((topic) => ({ topic })));
return { ok: true, paused: false };
},
},
],
});
async function publishTransitions(transitions) {
for (const transition of transitions) {
const event = buildEventEnvelope({
source: 'ops-sentinel',
venue: 'unrip',
eventType: 'ops_alert',
observedAt: transition.last_evaluated_at,
payload: {
alert_event_id: `${transition.alert_code}-${transition.status}-${Date.now()}-${Math.random().toString(16).slice(2, 8)}`,
...transition,
},
});
assertOpsAlertEvent(event);
await producer.sendJson(config.kafkaTopicOpsAlert, event, {
key: `${transition.alert_code}:${transition.service_scope}:${transition.tx_hash || transition.pair || 'global'}`,
});
state.publish_count += 1;
}
}
function normalizePayloadForAlert(topic, event) {
switch (topic) {
case config.kafkaTopicRefMarketPrice:
assertMarketPriceEvent(event);
return {
...event.payload,
observed_at: event.observed_at,
ingested_at: event.ingested_at,
};
case config.kafkaTopicStateIntentInventory:
assertInventorySnapshotEvent(event);
return {
...event.payload,
observed_at: event.observed_at,
ingested_at: event.ingested_at,
};
case config.kafkaTopicOpsLiquidityAction:
assertLiquidityActionEvent(event);
return {
...event.payload,
observed_at: event.observed_at,
ingested_at: event.ingested_at,
};
case config.kafkaTopicOpsFundingObservation:
assertFundingObservationEvent(event);
return event.payload;
case config.kafkaTopicExecTradeResult:
assertTradeResult(event);
return {
...event.payload,
observed_at: event.observed_at,
ingested_at: event.ingested_at,
};
default:
throw new Error(`unsupported ops-sentinel topic: ${topic}`);
}
}
async function shutdown() {
clearInterval(timer);
await controlApi.close().catch(() => {});
await consumer.disconnect();
await producer.disconnect();
process.exit(0);
}
process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);

View file

@ -1,370 +0,0 @@
const DEFAULT_RECENT_LIMIT = 50;
export function createAlertEngine({
activePair,
priceStaleMs,
inventoryStaleMs,
fundingCreditPendingMs,
fundingStuckMs,
evaluationIntervalMs,
recentTransitionLimit = DEFAULT_RECENT_LIMIT,
}) {
const state = {
latest_price: null,
latest_inventory: null,
latest_liquidity_action: null,
latest_trade_result: null,
funding_observations: {},
active_alerts: {},
recent_transitions: [],
last_evaluated_at: null,
};
return {
applyEvent(topic, payload, now = new Date().toISOString()) {
switch (topic) {
case 'ref.market_price':
state.latest_price = payload;
break;
case 'state.intent_inventory':
state.latest_inventory = payload;
break;
case 'ops.liquidity_action':
state.latest_liquidity_action = payload;
break;
case 'ops.funding_observation':
if (payload?.funding_observation_id) {
state.funding_observations[payload.funding_observation_id] = payload;
}
break;
case 'exec.trade_result':
state.latest_trade_result = payload;
break;
default:
break;
}
return evaluateAlerts({
state,
activePair,
priceStaleMs,
inventoryStaleMs,
fundingCreditPendingMs,
fundingStuckMs,
recentTransitionLimit,
now,
});
},
evaluate(now = new Date().toISOString()) {
return evaluateAlerts({
state,
activePair,
priceStaleMs,
inventoryStaleMs,
fundingCreditPendingMs,
fundingStuckMs,
recentTransitionLimit,
now,
});
},
getState(now = new Date().toISOString()) {
return summarizeState({
state,
evaluationIntervalMs,
now,
});
},
};
}
function evaluateAlerts({
state,
activePair,
priceStaleMs,
inventoryStaleMs,
fundingCreditPendingMs,
fundingStuckMs,
recentTransitionLimit,
now,
}) {
const desired = new Map();
const nowValue = timestampValue(now);
const priceAgeMs = ageMs(state.latest_price?.observed_at || state.latest_price?.ingested_at, nowValue);
if (priceAgeMs == null || priceAgeMs > priceStaleMs) {
desired.set(
buildAlertKey({
alertCode: 'reference_price_stale',
serviceScope: 'market-reference-ingest',
pair: activePair,
}),
{
alert_code: 'reference_price_stale',
severity: 'warning',
reason: priceAgeMs == null
? 'no reference price has been observed'
: `reference price age ${priceAgeMs}ms exceeds ${priceStaleMs}ms`,
service_scope: 'market-reference-ingest',
pair: activePair,
asset_id: null,
tx_hash: null,
details: {
last_price_at: state.latest_price?.observed_at || state.latest_price?.ingested_at || null,
age_ms: priceAgeMs,
stale_after_ms: priceStaleMs,
},
},
);
}
const inventoryAgeMs = ageMs(
state.latest_inventory?.synced_at || state.latest_inventory?.ingested_at,
nowValue,
);
if (inventoryAgeMs == null || inventoryAgeMs > inventoryStaleMs) {
desired.set(
buildAlertKey({
alertCode: 'inventory_snapshot_stale',
serviceScope: 'inventory-sync',
}),
{
alert_code: 'inventory_snapshot_stale',
severity: 'warning',
reason: inventoryAgeMs == null
? 'no inventory snapshot has been observed'
: `inventory snapshot age ${inventoryAgeMs}ms exceeds ${inventoryStaleMs}ms`,
service_scope: 'inventory-sync',
pair: null,
asset_id: null,
tx_hash: null,
details: {
last_inventory_at: state.latest_inventory?.synced_at || state.latest_inventory?.ingested_at || null,
age_ms: inventoryAgeMs,
stale_after_ms: inventoryStaleMs,
},
},
);
}
for (const observation of Object.values(state.funding_observations)) {
const observationAgeMs = ageMs(observation.last_seen_at, nowValue);
const baseDetails = {
funding_observation_id: observation.funding_observation_id,
funding_handle: observation.funding_handle,
confirmations: observation.confirmations,
amount: observation.amount,
observation_status: observation.status,
first_seen_at: observation.first_seen_at,
last_seen_at: observation.last_seen_at,
age_ms: observationAgeMs,
bridge_deposit_tx_hash: observation.bridge_deposit_tx_hash,
bridge_status: observation.bridge_status,
};
if (observation.status === 'SEEN_UNCONFIRMED') {
desired.set(
buildAlertKey({
alertCode: 'funding_seen_unconfirmed',
serviceScope: 'liquidity-manager',
assetId: observation.asset_id,
txHash: observation.tx_hash,
}),
{
alert_code: 'funding_seen_unconfirmed',
severity: 'info',
reason: `funding tx ${observation.tx_hash} is visible before confirmations`,
service_scope: 'liquidity-manager',
pair: null,
asset_id: observation.asset_id,
tx_hash: observation.tx_hash,
details: baseDetails,
},
);
}
if (
observation.status === 'CREDIT_PENDING'
|| (observation.status === 'SEEN_CONFIRMED' && observationAgeMs != null && observationAgeMs >= fundingCreditPendingMs)
) {
desired.set(
buildAlertKey({
alertCode: 'funding_confirmed_credit_pending',
serviceScope: 'liquidity-manager',
assetId: observation.asset_id,
txHash: observation.tx_hash,
}),
{
alert_code: 'funding_confirmed_credit_pending',
severity: 'warning',
reason: `funding tx ${observation.tx_hash} is confirmed but not spendable yet`,
service_scope: 'liquidity-manager',
pair: null,
asset_id: observation.asset_id,
tx_hash: observation.tx_hash,
details: {
...baseDetails,
credit_pending_after_ms: fundingCreditPendingMs,
},
},
);
}
if (
observation.status === 'FAILED_OR_STUCK'
|| (
observation.status !== 'CREDITED'
&& observation.status !== 'SEEN_UNCONFIRMED'
&& observationAgeMs != null
&& fundingStuckMs != null
&& observationAgeMs >= fundingStuckMs
)
) {
desired.set(
buildAlertKey({
alertCode: 'funding_stuck',
serviceScope: 'liquidity-manager',
assetId: observation.asset_id,
txHash: observation.tx_hash,
}),
{
alert_code: 'funding_stuck',
severity: 'critical',
reason: `funding tx ${observation.tx_hash} is failed or stuck before credit`,
service_scope: 'liquidity-manager',
pair: null,
asset_id: observation.asset_id,
tx_hash: observation.tx_hash,
details: baseDetails,
},
);
}
}
const latestTradeResult = state.latest_trade_result;
if (latestTradeResult && isSubmissionFailure(latestTradeResult)) {
desired.set(
buildAlertKey({
alertCode: 'executor_submission_failed',
serviceScope: 'trade-executor',
pair: latestTradeResult.pair || activePair,
}),
{
alert_code: 'executor_submission_failed',
severity: 'critical',
reason: `executor submission failed for command ${latestTradeResult.command_id}`,
service_scope: 'trade-executor',
pair: latestTradeResult.pair || activePair,
asset_id: null,
tx_hash: null,
details: {
command_id: latestTradeResult.command_id,
quote_id: latestTradeResult.quote_id,
result_code: latestTradeResult.result_code,
error: latestTradeResult.error || null,
status: latestTradeResult.status,
},
},
);
}
const transitions = reconcileAlertState({
state,
desired,
now,
recentTransitionLimit,
});
state.last_evaluated_at = now;
return transitions;
}
function reconcileAlertState({ state, desired, now, recentTransitionLimit }) {
const transitions = [];
for (const [key, next] of desired.entries()) {
const existing = state.active_alerts[key];
if (!existing) {
const raised = {
...next,
status: 'raised',
first_raised_at: now,
raised_at: now,
cleared_at: null,
last_evaluated_at: now,
};
state.active_alerts[key] = raised;
transitions.push(raised);
continue;
}
state.active_alerts[key] = {
...existing,
...next,
status: 'raised',
raised_at: existing.raised_at || existing.first_raised_at || now,
first_raised_at: existing.first_raised_at || existing.raised_at || now,
cleared_at: null,
last_evaluated_at: now,
};
}
for (const [key, existing] of Object.entries(state.active_alerts)) {
if (desired.has(key)) continue;
const cleared = {
...existing,
status: 'cleared',
raised_at: existing.raised_at || existing.first_raised_at || now,
cleared_at: now,
last_evaluated_at: now,
};
delete state.active_alerts[key];
transitions.push(cleared);
}
if (transitions.length > 0) {
state.recent_transitions.unshift(...transitions);
state.recent_transitions = state.recent_transitions.slice(0, recentTransitionLimit);
}
return transitions;
}
function summarizeState({ state, evaluationIntervalMs, now }) {
const activeAlerts = Object.values(state.active_alerts)
.sort((left, right) => timestampValue(right.first_raised_at) - timestampValue(left.first_raised_at));
const nowValue = timestampValue(now);
return {
active_alerts: activeAlerts,
recent_transitions: state.recent_transitions,
last_evaluated_at: state.last_evaluated_at,
stale: ageMs(state.last_evaluated_at, nowValue) > (evaluationIntervalMs * 2),
latest_inputs: {
market_price_at: state.latest_price?.observed_at || state.latest_price?.ingested_at || null,
inventory_at: state.latest_inventory?.synced_at || state.latest_inventory?.ingested_at || null,
liquidity_action_at: state.latest_liquidity_action?.observed_at || null,
trade_result_at: state.latest_trade_result?.ingested_at || null,
funding_observation_count: Object.keys(state.funding_observations).length,
},
};
}
function buildAlertKey({ alertCode, serviceScope, pair = null, assetId = null, txHash = null }) {
return [alertCode, serviceScope, pair || '', assetId || '', txHash || ''].join('|');
}
function isSubmissionFailure(result) {
return result?.status === 'failed' || result?.result_code === 'submission_failed';
}
function ageMs(value, nowValue) {
const start = timestampValue(value);
if (!Number.isFinite(start) || !Number.isFinite(nowValue)) return null;
return Math.max(0, nowValue - start);
}
function timestampValue(value) {
if (!value) return NaN;
const parsed = Date.parse(value);
return Number.isFinite(parsed) ? parsed : NaN;
}

View file

@ -1,225 +0,0 @@
import crypto from 'node:crypto';
import { bigintAmount, mapToSortedObject } from './assets.mjs';
const CREDITED_BRIDGE_STATUSES = new Set(['COMPLETED', 'CREDITED', 'FINALIZED', 'SETTLED']);
const FAILED_BRIDGE_STATUSES = new Set(['FAILED', 'ERROR', 'REJECTED', 'EXPIRED']);
const UNCONFIRMED_STATUS = 'SEEN_UNCONFIRMED';
const CONFIRMED_STATUS = 'SEEN_CONFIRMED';
const CREDIT_PENDING_STATUS = 'CREDIT_PENDING';
const CREDITED_STATUS = 'CREDITED';
const FAILED_OR_STUCK_STATUS = 'FAILED_OR_STUCK';
export function buildFundingObservationId({ accountId, chain, fundingHandle, txHash }) {
return crypto
.createHash('sha256')
.update(`${accountId || ''}|${chain || ''}|${fundingHandle || ''}|${txHash || ''}`)
.digest('hex');
}
export function buildFundingObservationKey({ chain, fundingHandle, txHash }) {
return `${chain || 'unknown'}:${fundingHandle || 'unknown'}:${txHash || 'unknown'}`;
}
export function correlateFundingObservation({
existing = null,
accountId,
assetId,
chain,
fundingHandle,
source,
txHash,
amount,
confirmations = 0,
observedAt = new Date().toISOString(),
bridgeDeposit = null,
stuckAfterMs = 0,
}) {
const firstSeenAt = existing?.first_seen_at || observedAt;
const lastSeenAt = observedAt;
const normalizedConfirmations = normalizeConfirmations(confirmations);
const bridgeStatus = normalizeBridgeStatus(bridgeDeposit?.status);
const bridgeTxHash = bridgeDeposit?.tx_hash || null;
const ageMs = isoAgeMs(firstSeenAt, observedAt);
const fundingObservationId = existing?.funding_observation_id || buildFundingObservationId({
accountId,
chain,
fundingHandle,
txHash,
});
let status = normalizedConfirmations > 0 ? CONFIRMED_STATUS : UNCONFIRMED_STATUS;
if (bridgeStatus) {
if (CREDITED_BRIDGE_STATUSES.has(bridgeStatus)) status = CREDITED_STATUS;
else if (FAILED_BRIDGE_STATUSES.has(bridgeStatus)) status = FAILED_OR_STUCK_STATUS;
else status = CREDIT_PENDING_STATUS;
} else if (stuckAfterMs > 0 && ageMs >= stuckAfterMs && normalizedConfirmations > 0) {
status = FAILED_OR_STUCK_STATUS;
}
return {
funding_observation_id: fundingObservationId,
account_id: accountId,
asset_id: assetId,
chain,
funding_handle: fundingHandle,
source,
tx_hash: txHash,
status,
amount: String(amount || '0'),
confirmations: normalizedConfirmations,
first_seen_at: firstSeenAt,
last_seen_at: lastSeenAt,
credited_at: status === CREDITED_STATUS ? (existing?.credited_at || observedAt) : null,
bridge_deposit_tx_hash: bridgeTxHash,
bridge_status: bridgeStatus,
credit_correlation: bridgeStatus ? (bridgeDeposit?.tx_hash === txHash ? 'tx_hash' : 'handle') : null,
};
}
export function summarizeFundingObservations(observations, { now = new Date().toISOString() } = {}) {
const byHandle = {};
const byAsset = {};
const uncreditedFundingTotalByAsset = {};
const creditCorrelation = {};
let latestFundingObservationAt = null;
const sorted = [...observations].sort((left, right) => (
timestampValue(right.last_seen_at) - timestampValue(left.last_seen_at)
));
for (const observation of sorted) {
if (!observation?.funding_observation_id) continue;
const handleKey = observation.funding_handle || `${observation.chain}:${observation.asset_id}`;
const ageMs = isoAgeMs(observation.last_seen_at, now);
const enriched = {
...observation,
age_ms: ageMs,
spendable: false,
pre_credit: observation.status !== CREDITED_STATUS,
};
const handle = byHandle[handleKey] ||= {
funding_handle: observation.funding_handle,
chain: observation.chain,
asset_id: observation.asset_id,
source: observation.source,
latest_status: observation.status,
latest_observation_at: observation.last_seen_at,
pre_credit_total: '0',
observations: [],
};
handle.observations.push(enriched);
if (timestampValue(observation.last_seen_at) > timestampValue(handle.latest_observation_at)) {
handle.latest_status = observation.status;
handle.latest_observation_at = observation.last_seen_at;
handle.source = observation.source;
}
const asset = byAsset[observation.asset_id] ||= {
asset_id: observation.asset_id,
pre_credit_total: '0',
latest_observation_at: observation.last_seen_at,
latest_status: observation.status,
};
if (timestampValue(observation.last_seen_at) > timestampValue(asset.latest_observation_at)) {
asset.latest_observation_at = observation.last_seen_at;
asset.latest_status = observation.status;
}
creditCorrelation[observation.funding_observation_id] = {
funding_observation_id: observation.funding_observation_id,
tx_hash: observation.tx_hash,
bridge_deposit_tx_hash: observation.bridge_deposit_tx_hash,
status: observation.status,
credited_at: observation.credited_at,
bridge_status: observation.bridge_status,
};
if (observation.status !== CREDITED_STATUS) {
handle.pre_credit_total = addAmounts(handle.pre_credit_total, observation.amount);
asset.pre_credit_total = addAmounts(asset.pre_credit_total, observation.amount);
uncreditedFundingTotalByAsset[observation.asset_id] = addAmounts(
uncreditedFundingTotalByAsset[observation.asset_id] || '0',
observation.amount,
);
}
if (timestampValue(observation.last_seen_at) > timestampValue(latestFundingObservationAt)) {
latestFundingObservationAt = observation.last_seen_at;
}
}
return {
funding_observations_by_handle: mapToSortedObject(byHandle),
funding_visibility_by_asset: mapToSortedObject(byAsset),
latest_funding_observation_at: latestFundingObservationAt,
uncredited_funding_total_by_asset: mapToSortedObject(uncreditedFundingTotalByAsset),
credit_correlation: mapToSortedObject(creditCorrelation),
};
}
export function buildFundingVisibility(observations, { now = new Date().toISOString() } = {}) {
const summary = summarizeFundingObservations(observations, { now });
return {
last_observed_at: summary.latest_funding_observation_at,
pre_credit_inbound: summary.uncredited_funding_total_by_asset,
by_asset: summary.funding_visibility_by_asset,
by_handle: summary.funding_observations_by_handle,
};
}
export function hasFundingObservationChanged(previous, next) {
if (!previous) return true;
return (
previous.status !== next.status
|| previous.confirmations !== next.confirmations
|| previous.bridge_deposit_tx_hash !== next.bridge_deposit_tx_hash
|| previous.bridge_status !== next.bridge_status
|| previous.credited_at !== next.credited_at
);
}
export function matchBridgeDeposit({ txHash, fundingHandle, bridgeDeposits }) {
const txMatch = bridgeDeposits.find((deposit) => deposit?.tx_hash && deposit.tx_hash === txHash);
if (txMatch) return txMatch;
return bridgeDeposits.find((deposit) => (
!deposit?.tx_hash
&& (
deposit?.address
&& fundingHandle
&& deposit.address === fundingHandle
)
)) || null;
}
function normalizeBridgeStatus(status) {
return status ? String(status).toUpperCase() : null;
}
function normalizeConfirmations(value) {
const normalized = Number(value);
if (!Number.isFinite(normalized) || normalized < 0) return 0;
return Math.floor(normalized);
}
function addAmounts(left, right) {
return (bigintAmount(left) + bigintAmount(right)).toString();
}
function isoAgeMs(from, to) {
const start = timestampValue(from);
const end = timestampValue(to);
if (!Number.isFinite(start) || !Number.isFinite(end)) return null;
return Math.max(0, end - start);
}
function timestampValue(value) {
if (!value) return -Infinity;
const parsed = Date.parse(value);
return Number.isFinite(parsed) ? parsed : -Infinity;
}

View file

@ -1,11 +1,9 @@
import {
assertExecuteTradeCommand,
assertFundingObservationEvent,
assertInventorySnapshotEvent,
assertLiquidityActionEvent,
assertMarketPriceEvent,
assertNormalizedSwapDemand,
assertOpsAlertEvent,
assertTradeDecisionEvent,
assertTradeResult,
} from './schemas.mjs';
@ -75,32 +73,6 @@ export function routeHistoryRecord({ topic, event }) {
decision_key: event.payload.liquidity_action_id,
},
};
case 'ops.funding_observation':
assertFundingObservationEvent(event);
return {
table: 'funding_observations',
record: {
event_id: event.event_id,
observed_at: event.observed_at,
ingested_at: event.ingested_at,
quote_id: null,
pair: null,
decision_key: event.payload.funding_observation_id,
},
};
case 'ops.alert':
assertOpsAlertEvent(event);
return {
table: 'ops_alerts',
record: {
event_id: event.event_id,
observed_at: event.observed_at,
ingested_at: event.ingested_at,
quote_id: null,
pair: event.payload.pair || null,
decision_key: event.payload.alert_event_id,
},
};
case 'decision.trade_decision':
assertTradeDecisionEvent(event);
return {

View file

@ -3,16 +3,8 @@ export function normalizeLiquidityState(state, { withdrawalsFrozen }) {
state.deposits ||= {};
state.tracked_withdrawals ||= {};
state.supported_tokens ||= {};
state.funding_observations ||= {};
state.funding_observations_by_handle ||= {};
state.funding_visibility_by_asset ||= {};
state.uncredited_funding_total_by_asset ||= {};
state.credit_correlation ||= {};
state.observer_health ||= {};
state.publish_count ||= 0;
state.funding_publish_count ||= 0;
state.withdrawals_frozen ??= withdrawalsFrozen;
state.paused ??= false;
state.funding_observer_paused ??= false;
return state;
}

View file

@ -10,10 +10,6 @@ function requireOneOf(value, field, values) {
if (!values.includes(value)) throw new Error(`Unexpected ${field}: ${value}`);
}
function requireNumber(value, field) {
if (typeof value !== 'number' || !Number.isFinite(value)) throw new Error(`Missing ${field}`);
}
export function assertEventEnvelope(event) {
requireObject(event, 'event');
requireString(event.event_id, 'event.event_id');
@ -78,50 +74,6 @@ export function assertLiquidityActionEvent(event) {
return event;
}
export function assertFundingObservationEvent(event) {
assertEventEnvelope(event);
if (event.event_type !== 'funding_observation') {
throw new Error(`Unexpected event_type: ${event.event_type}`);
}
const payload = event.payload;
requireString(payload.funding_observation_id, 'payload.funding_observation_id');
requireString(payload.account_id, 'payload.account_id');
requireString(payload.asset_id, 'payload.asset_id');
requireString(payload.chain, 'payload.chain');
requireString(payload.funding_handle, 'payload.funding_handle');
requireString(payload.source, 'payload.source');
requireString(payload.tx_hash, 'payload.tx_hash');
requireString(payload.status, 'payload.status');
requireString(payload.amount, 'payload.amount');
requireNumber(payload.confirmations, 'payload.confirmations');
requireString(payload.first_seen_at, 'payload.first_seen_at');
requireString(payload.last_seen_at, 'payload.last_seen_at');
if (payload.credited_at != null) requireString(payload.credited_at, 'payload.credited_at');
if (payload.bridge_deposit_tx_hash != null) {
requireString(payload.bridge_deposit_tx_hash, 'payload.bridge_deposit_tx_hash');
}
return event;
}
export function assertOpsAlertEvent(event) {
assertEventEnvelope(event);
if (event.event_type !== 'ops_alert') throw new Error(`Unexpected event_type: ${event.event_type}`);
const payload = event.payload;
requireString(payload.alert_event_id, 'payload.alert_event_id');
requireString(payload.alert_code, 'payload.alert_code');
requireString(payload.status, 'payload.status');
requireOneOf(payload.status, 'payload.status', ['raised', 'cleared']);
requireString(payload.severity, 'payload.severity');
requireString(payload.reason, 'payload.reason');
requireString(payload.service_scope, 'payload.service_scope');
requireString(payload.raised_at, 'payload.raised_at');
if (payload.cleared_at != null) requireString(payload.cleared_at, 'payload.cleared_at');
requireObject(payload.details, 'payload.details');
return event;
}
export function assertTradeDecisionEvent(event) {
assertEventEnvelope(event);
if (event.event_type !== 'trade_decision') throw new Error(`Unexpected event_type: ${event.event_type}`);

View file

@ -18,7 +18,6 @@ const DEFAULTS = {
historyWriterControlPort: 8085,
strategyEngineControlPort: 8086,
tradeExecutorControlPort: 8087,
opsSentinelControlPort: 8088,
kafkaBrokers: ['127.0.0.1:9092'],
kafkaClientId: 'unrip',
kafkaTopicRawNearIntentsQuote: 'raw.near_intents.quote',
@ -26,8 +25,6 @@ const DEFAULTS = {
kafkaTopicRefMarketPrice: 'ref.market_price',
kafkaTopicStateIntentInventory: 'state.intent_inventory',
kafkaTopicOpsLiquidityAction: 'ops.liquidity_action',
kafkaTopicOpsFundingObservation: 'ops.funding_observation',
kafkaTopicOpsAlert: 'ops.alert',
kafkaTopicDecisionTradeDecision: 'decision.trade_decision',
kafkaTopicCmdExecuteTrade: 'cmd.execute_trade',
kafkaTopicExecTradeResult: 'exec.trade_result',
@ -35,7 +32,6 @@ const DEFAULTS = {
kafkaConsumerGroupInventory: 'inventory-sync-v1',
kafkaConsumerGroupStrategy: 'strategy-engine-v1',
kafkaConsumerGroupExecutor: 'trade-executor-v1',
kafkaConsumerGroupOpsSentinel: 'ops-sentinel-v1',
executorStateDir: './var/executor-state',
liquidityStateDir: './var/liquidity-state',
postgresUrl: 'postgresql://unrip:unrip@127.0.0.1:5432/unrip',
@ -67,11 +63,6 @@ const DEFAULTS = {
executorInitialArmed: false,
executorResponseTimeoutMs: 10_000,
withdrawalsFrozen: true,
btcFundingObserverEnabled: true,
btcFundingObserverBaseUrl: 'https://mempool.space/api',
fundingObservationStuckMs: 60 * 60 * 1000,
opsSentinelEvaluationMs: 5_000,
opsSentinelFundingCreditPendingMs: 5 * 60 * 1000,
};
function splitCsv(value) {
@ -180,12 +171,6 @@ export function loadConfig({ envPath = '.env' } = {}) {
process.env.HISTORY_WRITER_CONTROL_PORT,
DEFAULTS.historyWriterControlPort,
),
opsSentinelControlHost:
process.env.OPS_SENTINEL_CONTROL_HOST || DEFAULTS.nearIntentsControlHost,
opsSentinelControlPort: parseNumber(
process.env.OPS_SENTINEL_CONTROL_PORT,
DEFAULTS.opsSentinelControlPort,
),
strategyEngineControlHost:
process.env.STRATEGY_ENGINE_CONTROL_HOST || DEFAULTS.nearIntentsControlHost,
strategyEngineControlPort: parseNumber(
@ -212,10 +197,6 @@ export function loadConfig({ envPath = '.env' } = {}) {
process.env.KAFKA_TOPIC_STATE_INTENT_INVENTORY || DEFAULTS.kafkaTopicStateIntentInventory,
kafkaTopicOpsLiquidityAction:
process.env.KAFKA_TOPIC_OPS_LIQUIDITY_ACTION || DEFAULTS.kafkaTopicOpsLiquidityAction,
kafkaTopicOpsFundingObservation:
process.env.KAFKA_TOPIC_OPS_FUNDING_OBSERVATION || DEFAULTS.kafkaTopicOpsFundingObservation,
kafkaTopicOpsAlert:
process.env.KAFKA_TOPIC_OPS_ALERT || DEFAULTS.kafkaTopicOpsAlert,
kafkaTopicDecisionTradeDecision:
process.env.KAFKA_TOPIC_DECISION_TRADE_DECISION || DEFAULTS.kafkaTopicDecisionTradeDecision,
kafkaTopicCmdExecuteTrade:
@ -230,8 +211,6 @@ export function loadConfig({ envPath = '.env' } = {}) {
process.env.KAFKA_CONSUMER_GROUP_STRATEGY || DEFAULTS.kafkaConsumerGroupStrategy,
kafkaConsumerGroupExecutor:
process.env.KAFKA_CONSUMER_GROUP_EXECUTOR || DEFAULTS.kafkaConsumerGroupExecutor,
kafkaConsumerGroupOpsSentinel:
process.env.KAFKA_CONSUMER_GROUP_OPS_SENTINEL || DEFAULTS.kafkaConsumerGroupOpsSentinel,
executorStateDir: process.env.EXECUTOR_STATE_DIR || DEFAULTS.executorStateDir,
liquidityStateDir: process.env.LIQUIDITY_STATE_DIR || DEFAULTS.liquidityStateDir,
postgresUrl: process.env.POSTGRES_URL || DEFAULTS.postgresUrl,
@ -301,35 +280,5 @@ export function loadConfig({ envPath = '.env' } = {}) {
process.env.LIQUIDITY_WITHDRAWALS_FROZEN,
DEFAULTS.withdrawalsFrozen,
),
btcFundingObserverEnabled: parseBoolean(
process.env.BTC_FUNDING_OBSERVER_ENABLED,
DEFAULTS.btcFundingObserverEnabled,
),
btcFundingObserverBaseUrl:
process.env.BTC_FUNDING_OBSERVER_BASE_URL || DEFAULTS.btcFundingObserverBaseUrl,
fundingObservationStuckMs: parseNumber(
process.env.FUNDING_OBSERVATION_STUCK_MS,
DEFAULTS.fundingObservationStuckMs,
),
opsSentinelEvaluationMs: parseNumber(
process.env.OPS_SENTINEL_EVALUATION_MS,
DEFAULTS.opsSentinelEvaluationMs,
),
opsSentinelPriceStaleMs: parseNumber(
process.env.OPS_SENTINEL_PRICE_STALE_MS,
DEFAULTS.marketReferenceMaxAgeMs,
),
opsSentinelInventoryStaleMs: parseNumber(
process.env.OPS_SENTINEL_INVENTORY_STALE_MS,
DEFAULTS.strategyInventoryMaxAgeMs,
),
opsSentinelFundingCreditPendingMs: parseNumber(
process.env.OPS_SENTINEL_FUNDING_CREDIT_PENDING_MS,
DEFAULTS.opsSentinelFundingCreditPendingMs,
),
opsSentinelFundingStuckMs: parseNumber(
process.env.OPS_SENTINEL_FUNDING_STUCK_MS,
DEFAULTS.fundingObservationStuckMs,
),
};
}

View file

@ -6,8 +6,6 @@ const TABLES = [
'market_price_events',
'intent_inventory_snapshots',
'liquidity_actions',
'funding_observations',
'ops_alerts',
'trade_decisions',
'execute_trade_commands',
'trade_execution_results',
@ -53,47 +51,6 @@ export async function ensureHistorySchema(pool) {
`);
}
await ensureExpressionIndex(pool, {
name: 'funding_observations_tx_hash_idx',
table: 'funding_observations',
expression: "(payload->>'tx_hash')",
});
await ensureExpressionIndex(pool, {
name: 'funding_observations_handle_idx',
table: 'funding_observations',
expression: "(payload->>'funding_handle')",
});
await ensureExpressionIndex(pool, {
name: 'funding_observations_asset_id_idx',
table: 'funding_observations',
expression: "(payload->>'asset_id')",
});
await ensureExpressionIndex(pool, {
name: 'funding_observations_chain_idx',
table: 'funding_observations',
expression: "(payload->>'chain')",
});
await ensureExpressionIndex(pool, {
name: 'funding_observations_status_idx',
table: 'funding_observations',
expression: "(payload->>'status')",
});
await ensureExpressionIndex(pool, {
name: 'ops_alerts_alert_code_idx',
table: 'ops_alerts',
expression: "(payload->>'alert_code')",
});
await ensureExpressionIndex(pool, {
name: 'ops_alerts_status_idx',
table: 'ops_alerts',
expression: "(payload->>'status')",
});
await ensureExpressionIndex(pool, {
name: 'ops_alerts_asset_id_idx',
table: 'ops_alerts',
expression: "(payload->>'asset_id')",
});
await pool.query(`
CREATE TABLE IF NOT EXISTS ${PORTFOLIO_METRICS_TABLE} (
metric_id TEXT PRIMARY KEY,
@ -283,10 +240,3 @@ function normalizePortfolioMetricRow(row) {
payload: row.payload,
};
}
async function ensureExpressionIndex(pool, { name, table, expression }) {
await pool.query(`
CREATE INDEX IF NOT EXISTS ${name}
ON ${table} (${expression})
`);
}

View file

@ -1,95 +0,0 @@
import { fetchJson } from '../lib/http.mjs';
export function createBtcAddressObserver({
baseUrl,
source = 'btc_mempool_space',
}) {
const normalizedBaseUrl = String(baseUrl || '').replace(/\/+$/, '');
if (!normalizedBaseUrl) throw new Error('Missing BTC funding observer base URL');
return {
async listTransactions({ address }) {
if (!address) return {
source,
observed_at: new Date().toISOString(),
transactions: [],
};
const [tipHeight, latestTxs, mempoolTxs] = await Promise.all([
fetchTipHeight(normalizedBaseUrl),
fetchJson(`${normalizedBaseUrl}/address/${encodeURIComponent(address)}/txs`).catch(() => []),
fetchJson(`${normalizedBaseUrl}/address/${encodeURIComponent(address)}/txs/mempool`).catch(() => []),
]);
const transactions = dedupeTransactions([...(latestTxs || []), ...(mempoolTxs || [])])
.map((tx) => normalizeBtcTransaction(tx, { address, tipHeight, source }))
.filter(Boolean);
return {
source,
observed_at: new Date().toISOString(),
transactions,
};
},
};
}
async function fetchTipHeight(baseUrl) {
const response = await fetch(`${baseUrl}/blocks/tip/height`);
if (!response.ok) return null;
const text = await response.text();
const parsed = Number(text);
return Number.isFinite(parsed) ? parsed : null;
}
function dedupeTransactions(transactions) {
const seen = new Set();
const deduped = [];
for (const tx of transactions) {
const key = tx?.txid;
if (!key || seen.has(key)) continue;
seen.add(key);
deduped.push(tx);
}
return deduped;
}
function normalizeBtcTransaction(tx, { address, tipHeight, source }) {
if (!tx?.txid) return null;
const amount = sumOutputsToAddress(tx.vout, address);
if (amount === 0n) return null;
const confirmed = tx.status?.confirmed === true;
const confirmations = confirmed
? computeConfirmations(tipHeight, tx.status?.block_height)
: 0;
const observedAt = tx.status?.block_time
? new Date(Number(tx.status.block_time) * 1000).toISOString()
: new Date().toISOString();
return {
source,
tx_hash: tx.txid,
amount: amount.toString(),
confirmations,
observed_at: observedAt,
};
}
function sumOutputsToAddress(outputs, address) {
let total = 0n;
for (const output of outputs || []) {
if (output?.scriptpubkey_address !== address) continue;
total += BigInt(String(output.value || 0));
}
return total;
}
function computeConfirmations(tipHeight, blockHeight) {
const tip = Number(tipHeight);
const block = Number(blockHeight);
if (!Number.isFinite(tip) || !Number.isFinite(block)) return 1;
return Math.max(1, (tip - block) + 1);
}

View file

@ -1,103 +0,0 @@
import test from 'node:test';
import assert from 'node:assert/strict';
import { createAlertEngine } from '../src/core/alert-engine.mjs';
function createEngine() {
return createAlertEngine({
activePair: 'nep141:btc.omft.near->nep141:eure.omft.near',
priceStaleMs: 30_000,
inventoryStaleMs: 30_000,
fundingCreditPendingMs: 300_000,
fundingStuckMs: 3_600_000,
evaluationIntervalMs: 5_000,
});
}
test('alert engine raises and clears stale state transitions', () => {
const engine = createEngine();
let transitions = engine.applyEvent('ref.market_price', {
price_id: 'price-1',
pair: 'nep141:btc.omft.near->nep141:eure.omft.near',
eur_per_btc: '100000',
btc_per_eure: '0.00001',
observed_at: '2026-04-03T08:00:00.000Z',
}, '2026-04-03T08:00:00.000Z');
assert.equal(transitions.length, 1);
assert.equal(transitions[0].alert_code, 'inventory_snapshot_stale');
assert.equal(transitions[0].status, 'raised');
transitions = engine.applyEvent('state.intent_inventory', {
inventory_id: 'inventory-1',
account_id: 'solver.near',
reconciliation_status: 'ok',
spendable: { 'nep141:btc.omft.near': '1000' },
synced_at: '2026-04-03T08:00:00.000Z',
}, '2026-04-03T08:00:00.000Z');
assert.equal(transitions.length, 1);
assert.equal(transitions[0].alert_code, 'inventory_snapshot_stale');
assert.equal(transitions[0].status, 'cleared');
transitions = engine.evaluate('2026-04-03T08:00:31.000Z');
assert.equal(transitions.length, 2);
assert.deepEqual(
transitions.map((transition) => `${transition.alert_code}:${transition.status}`).sort(),
[
'inventory_snapshot_stale:raised',
'reference_price_stale:raised',
],
);
transitions = engine.applyEvent('ref.market_price', {
price_id: 'price-2',
pair: 'nep141:btc.omft.near->nep141:eure.omft.near',
eur_per_btc: '100100',
btc_per_eure: '0.00000999',
observed_at: '2026-04-03T08:00:32.000Z',
}, '2026-04-03T08:00:32.000Z');
assert.equal(transitions.length, 1);
assert.equal(transitions[0].alert_code, 'reference_price_stale');
assert.equal(transitions[0].status, 'cleared');
});
test('executor submission failure produces an alert event and clears on recovery', () => {
const engine = createEngine();
engine.applyEvent('ref.market_price', {
price_id: 'price-1',
pair: 'nep141:btc.omft.near->nep141:eure.omft.near',
eur_per_btc: '100000',
btc_per_eure: '0.00001',
observed_at: '2026-04-03T08:00:00.000Z',
}, '2026-04-03T08:00:00.000Z');
engine.applyEvent('state.intent_inventory', {
inventory_id: 'inventory-1',
account_id: 'solver.near',
reconciliation_status: 'ok',
spendable: { 'nep141:btc.omft.near': '1000' },
synced_at: '2026-04-03T08:00:00.000Z',
}, '2026-04-03T08:00:00.000Z');
let transitions = engine.applyEvent('exec.trade_result', {
command_id: 'cmd-1',
quote_id: 'quote-1',
pair: 'nep141:btc.omft.near->nep141:eure.omft.near',
status: 'failed',
result_code: 'submission_failed',
error: { message: 'relay timeout' },
}, '2026-04-03T08:00:10.000Z');
assert.equal(transitions.length, 1);
assert.equal(transitions[0].alert_code, 'executor_submission_failed');
assert.equal(transitions[0].status, 'raised');
transitions = engine.applyEvent('exec.trade_result', {
command_id: 'cmd-2',
quote_id: 'quote-2',
pair: 'nep141:btc.omft.near->nep141:eure.omft.near',
status: 'submitted',
result_code: 'quote_response_ok',
}, '2026-04-03T08:00:20.000Z');
assert.equal(transitions.length, 1);
assert.equal(transitions[0].alert_code, 'executor_submission_failed');
assert.equal(transitions[0].status, 'cleared');
});

View file

@ -1,110 +0,0 @@
import test from 'node:test';
import assert from 'node:assert/strict';
import {
buildFundingVisibility,
correlateFundingObservation,
} from '../src/core/funding-observations.mjs';
import { buildInventorySnapshot } from '../src/core/inventory.mjs';
import { routeHistoryRecord } from '../src/core/history-records.mjs';
test('pre-credit funding visibility remains non-spendable', () => {
const observation = correlateFundingObservation({
accountId: 'solver.near',
assetId: 'nep141:btc.omft.near',
chain: 'btc:mainnet',
fundingHandle: 'bc1qexample',
source: 'btc_mempool_space',
txHash: 'btc-tx-1',
amount: '1500',
confirmations: 0,
observedAt: '2026-04-03T08:00:00.000Z',
});
const inventory = buildInventorySnapshot({
accountId: 'solver.near',
balances: {
'nep141:btc.omft.near': '1000',
},
recentDeposits: [],
trackedWithdrawals: [],
assetRegistry: new Map([
['nep141:btc.omft.near', { decimals: 8 }],
]),
observedAt: '2026-04-03T08:01:00.000Z',
});
const visibility = buildFundingVisibility([observation], {
now: '2026-04-03T08:01:00.000Z',
});
assert.equal(observation.status, 'SEEN_UNCONFIRMED');
assert.equal(inventory.spendable['nep141:btc.omft.near'], '1000');
assert.equal(visibility.pre_credit_inbound['nep141:btc.omft.near'], '1500');
assert.equal(visibility.by_handle.bc1qexample.observations[0].spendable, false);
});
test('funding observation correlates to later credit without losing tx hash', () => {
const seen = correlateFundingObservation({
accountId: 'solver.near',
assetId: 'nep141:btc.omft.near',
chain: 'btc:mainnet',
fundingHandle: 'bc1qexample',
source: 'btc_mempool_space',
txHash: 'btc-tx-1',
amount: '1500',
confirmations: 2,
observedAt: '2026-04-03T08:00:00.000Z',
});
const credited = correlateFundingObservation({
existing: seen,
accountId: 'solver.near',
assetId: 'nep141:btc.omft.near',
chain: 'btc:mainnet',
fundingHandle: 'bc1qexample',
source: 'btc_mempool_space',
txHash: 'btc-tx-1',
amount: '1500',
confirmations: 3,
observedAt: '2026-04-03T08:10:00.000Z',
bridgeDeposit: {
tx_hash: 'btc-tx-1',
status: 'COMPLETED',
},
});
assert.equal(credited.status, 'CREDITED');
assert.equal(credited.tx_hash, 'btc-tx-1');
assert.equal(credited.bridge_deposit_tx_hash, 'btc-tx-1');
assert.equal(credited.funding_observation_id, seen.funding_observation_id);
assert.equal(credited.credited_at, '2026-04-03T08:10:00.000Z');
});
test('history writer routes funding observations into the funding table family', () => {
const routed = routeHistoryRecord({
topic: 'ops.funding_observation',
event: {
event_id: 'evt-funding-1',
event_type: 'funding_observation',
venue: 'near-intents',
schema_version: 1,
ingested_at: '2026-04-03T08:00:00.000Z',
payload: {
funding_observation_id: 'funding-1',
account_id: 'solver.near',
asset_id: 'nep141:btc.omft.near',
chain: 'btc:mainnet',
funding_handle: 'bc1qexample',
source: 'btc_mempool_space',
tx_hash: 'btc-tx-1',
status: 'SEEN_CONFIRMED',
amount: '1500',
confirmations: 2,
first_seen_at: '2026-04-03T08:00:00.000Z',
last_seen_at: '2026-04-03T08:05:00.000Z',
},
},
});
assert.equal(routed.table, 'funding_observations');
assert.equal(routed.record.decision_key, 'funding-1');
});

View file

@ -16,13 +16,6 @@ test('normalizeLiquidityState hydrates missing nested maps from persisted partia
assert.deepEqual(state.deposits, {});
assert.deepEqual(state.tracked_withdrawals, {});
assert.deepEqual(state.supported_tokens, {});
assert.deepEqual(state.funding_observations, {});
assert.deepEqual(state.funding_observations_by_handle, {});
assert.deepEqual(state.funding_visibility_by_asset, {});
assert.deepEqual(state.uncredited_funding_total_by_asset, {});
assert.deepEqual(state.credit_correlation, {});
assert.deepEqual(state.observer_health, {});
assert.equal(state.withdrawals_frozen, true);
assert.equal(state.paused, false);
assert.equal(state.funding_observer_paused, false);
});

View file

@ -1,28 +0,0 @@
# Adversarial review prompt
You are reviewing changes for a trading system.
Your job is to attack the work, not to praise it.
Look for:
1. Fake progress.
- Did the change make the system more real, or only more elaborate?
- Real means contact with live data, validated persistence, verified replay, or explicit evidence.
2. Smuggled scope.
- Did the diff stay inside the active proof or research charter?
3. Unstated assumptions.
- Prices, fees, latency, freshness, slippage, API guarantees, retention, and failure modes.
4. Placeholder dressed as real.
- Hardcoded values, mocks, TODOs, dead branches, or unverifiable claims.
5. Missing failure handling.
- Focus on real trading-system failures, not abstract defensive-programming trivia.
For every issue:
- state what it is
- state why it matters for this system
- state whether it should be fixed now, removed, or marked as still fake
Do not praise style.
Do not suggest adjacent features.
Do not expand scope.