Instrument executor submission latency
All checks were successful
deploy / deploy (push) Successful in 45s
All checks were successful
deploy / deploy (push) Successful in 45s
Proof: targeted executor and dashboard tests pass; npm test passes 239/239; operator dashboard bundle builds; git diff --check passes. Assumptions: timing fields are observational only and do not change quote response policy, signing, concurrency, arming, or live funds behavior. Still fake: venue-native terminal fill ids and fee-complete realized PnL remain unavailable.
This commit is contained in:
parent
1a071c5b10
commit
a4db57182c
7 changed files with 146 additions and 21 deletions
|
|
@ -1,4 +1,5 @@
|
|||
import process from 'node:process';
|
||||
import { performance } from 'node:perf_hooks';
|
||||
|
||||
import { createConsumer } from '../bus/kafka/consumer.mjs';
|
||||
import { createProducer } from '../bus/kafka/producer.mjs';
|
||||
|
|
@ -179,6 +180,7 @@ await consumer.run({
|
|||
|
||||
async function handleCommand(event) {
|
||||
const payload = event.payload;
|
||||
const timing = startExecutorTiming(event);
|
||||
state.last_command = payload;
|
||||
|
||||
const existing = stateStore.get(payload.command_id);
|
||||
|
|
@ -194,22 +196,22 @@ async function handleCommand(event) {
|
|||
}
|
||||
|
||||
if (isMakerSuppressedForOwnRequest()) {
|
||||
await publishResult(payload, {
|
||||
await publishResult(payload, withExecutorTiming({
|
||||
status: 'rejected',
|
||||
result_code: 'own_request_preflight_in_progress',
|
||||
note: 'Own request preflight is suppressing maker quote responses to avoid self-matching.',
|
||||
});
|
||||
}, timing));
|
||||
return;
|
||||
}
|
||||
|
||||
if (state.paused) return;
|
||||
|
||||
if (!state.armed) {
|
||||
await publishResult(payload, {
|
||||
await publishResult(payload, withExecutorTiming({
|
||||
status: 'rejected',
|
||||
result_code: 'executor_disarmed',
|
||||
note: 'executor is disarmed',
|
||||
});
|
||||
}, timing));
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -222,7 +224,7 @@ async function handleCommand(event) {
|
|||
message: expiry.reason,
|
||||
},
|
||||
});
|
||||
await publishResult(payload, {
|
||||
await publishResult(payload, withExecutorTiming({
|
||||
status: 'rejected',
|
||||
result_code: 'stale_execute_command',
|
||||
note: 'execute command deadline elapsed before relay submission',
|
||||
|
|
@ -230,7 +232,7 @@ async function handleCommand(event) {
|
|||
command_deadline_ms: expiry.deadline_ms == null ? null : String(expiry.deadline_ms),
|
||||
command_deadline_at: expiry.deadline_at,
|
||||
stale_reason: expiry.reason,
|
||||
});
|
||||
}, timing));
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -242,27 +244,46 @@ async function handleCommand(event) {
|
|||
state.in_flight_count += 1;
|
||||
|
||||
try {
|
||||
const currentSaltHex = await verifierClient.currentSalt();
|
||||
const submission = buildQuoteResponseSubmission({
|
||||
const saltStartMs = performance.now();
|
||||
let currentSaltHex;
|
||||
try {
|
||||
currentSaltHex = await verifierClient.currentSalt();
|
||||
} finally {
|
||||
recordExecutorTiming(timing, 'current_salt_ms', saltStartMs);
|
||||
}
|
||||
|
||||
const signStartMs = performance.now();
|
||||
let submission;
|
||||
try {
|
||||
submission = buildQuoteResponseSubmission({
|
||||
command: payload,
|
||||
signerAccountId: config.nearIntentsAccountId,
|
||||
signer,
|
||||
verifierContract: config.nearVerifierContract,
|
||||
currentSaltHex,
|
||||
});
|
||||
} finally {
|
||||
recordExecutorTiming(timing, 'sign_ms', signStartMs);
|
||||
}
|
||||
state.last_request = submission;
|
||||
|
||||
const response = await relayClient.request('quote_response', [submission], {
|
||||
const relayStartMs = performance.now();
|
||||
let response;
|
||||
try {
|
||||
response = await relayClient.request('quote_response', [submission], {
|
||||
timeoutMs: config.executorResponseTimeoutMs,
|
||||
});
|
||||
} finally {
|
||||
recordExecutorTiming(timing, 'relay_response_ms', relayStartMs);
|
||||
}
|
||||
state.last_venue_response = response;
|
||||
state.last_error = null;
|
||||
|
||||
await publishResult(payload, {
|
||||
await publishResult(payload, withExecutorTiming({
|
||||
status: 'submitted',
|
||||
result_code: response === 'OK' ? 'quote_response_ok' : 'quote_response_ack',
|
||||
venue_response: response,
|
||||
});
|
||||
}, timing));
|
||||
stateStore.markSubmitted(payload.command_id, {
|
||||
quote_id: payload.quote_id,
|
||||
result: response,
|
||||
|
|
@ -274,11 +295,11 @@ async function handleCommand(event) {
|
|||
quote_id: payload.quote_id,
|
||||
error: serializeError(error),
|
||||
});
|
||||
await publishResult(payload, {
|
||||
await publishResult(payload, withExecutorTiming({
|
||||
status: 'failed',
|
||||
result_code: 'submission_failed',
|
||||
error: serializeError(error),
|
||||
});
|
||||
}, timing));
|
||||
} finally {
|
||||
state.in_flight_count = Math.max(0, state.in_flight_count - 1);
|
||||
if (state.draining && state.in_flight_count === 0) {
|
||||
|
|
@ -287,6 +308,52 @@ async function handleCommand(event) {
|
|||
}
|
||||
}
|
||||
|
||||
function startExecutorTiming(event) {
|
||||
const receivedAt = new Date();
|
||||
const commandEventAtMs = Date.parse(
|
||||
event?.ingested_at
|
||||
|| event?.observed_at
|
||||
|| event?.payload?.decision_at
|
||||
|| '',
|
||||
);
|
||||
return {
|
||||
received_at: receivedAt.toISOString(),
|
||||
started_at_ms: performance.now(),
|
||||
command_event_age_ms: Number.isFinite(commandEventAtMs)
|
||||
? roundTimingMs(receivedAt.getTime() - commandEventAtMs)
|
||||
: null,
|
||||
};
|
||||
}
|
||||
|
||||
function recordExecutorTiming(timing, field, startedAtMs) {
|
||||
if (!timing) return;
|
||||
timing[field] = roundTimingMs(performance.now() - startedAtMs);
|
||||
}
|
||||
|
||||
function withExecutorTiming(payload, timing) {
|
||||
return {
|
||||
...payload,
|
||||
executor_timing: finishExecutorTiming(timing),
|
||||
};
|
||||
}
|
||||
|
||||
function finishExecutorTiming(timing) {
|
||||
if (!timing) return null;
|
||||
return {
|
||||
received_at: timing.received_at,
|
||||
command_event_age_ms: timing.command_event_age_ms,
|
||||
current_salt_ms: timing.current_salt_ms ?? null,
|
||||
sign_ms: timing.sign_ms ?? null,
|
||||
relay_response_ms: timing.relay_response_ms ?? null,
|
||||
executor_total_ms: roundTimingMs(performance.now() - timing.started_at_ms),
|
||||
};
|
||||
}
|
||||
|
||||
function roundTimingMs(value) {
|
||||
const number = Number(value);
|
||||
return Number.isFinite(number) ? Math.round(number * 1000) / 1000 : null;
|
||||
}
|
||||
|
||||
async function publishResult(command, extraPayload) {
|
||||
const event = buildEventEnvelope({
|
||||
source: 'trade-executor',
|
||||
|
|
|
|||
|
|
@ -2415,6 +2415,7 @@ function normalizeLiveExecutionResult(payload, event) {
|
|||
venue_response: payload.venue_response || null,
|
||||
error_message: payload.error?.message || null,
|
||||
note: payload.note || null,
|
||||
timing: payload.executor_timing || null,
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -3617,6 +3617,7 @@ function normalizeExecutionResultRow(row) {
|
|||
venue_response: resultPayload.venue_response || null,
|
||||
error_message: resultPayload.error?.message || null,
|
||||
note: resultPayload.note || null,
|
||||
timing: resultPayload.executor_timing || null,
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -34,6 +34,29 @@ function formatRelativeAge(value, now) {
|
|||
return age === 'Unavailable' ? 'Age unavailable' : `${age} ago`;
|
||||
}
|
||||
|
||||
function formatTimingMs(value) {
|
||||
const number = Number(value);
|
||||
if (!Number.isFinite(number)) return null;
|
||||
return `${number < 10 ? number.toFixed(1) : number.toFixed(0)} ms`;
|
||||
}
|
||||
|
||||
function formatExecutionTiming(timing) {
|
||||
if (!timing) return null;
|
||||
const parts = [
|
||||
['total', timing.executor_total_ms],
|
||||
['cmd age', timing.command_event_age_ms],
|
||||
['salt', timing.current_salt_ms],
|
||||
['sign', timing.sign_ms],
|
||||
['relay', timing.relay_response_ms],
|
||||
]
|
||||
.map(([label, value]) => {
|
||||
const formatted = formatTimingMs(value);
|
||||
return formatted ? `${label} ${formatted}` : null;
|
||||
})
|
||||
.filter(Boolean);
|
||||
return parts.length ? `Timing: ${parts.join(', ')}` : null;
|
||||
}
|
||||
|
||||
function IdentifierRow({ label, value }) {
|
||||
if (!value) return <div className="status-subtle">{`${label}: unavailable`}</div>;
|
||||
|
||||
|
|
@ -112,6 +135,8 @@ function StageCard({ title, at, status, children }) {
|
|||
}
|
||||
|
||||
function LifecycleDetails({ item }) {
|
||||
const executionTiming = formatExecutionTiming(item.execution?.timing);
|
||||
|
||||
return (
|
||||
<div className="lifecycle-detail-panel">
|
||||
<div className="lifecycle-stage-grid">
|
||||
|
|
@ -135,6 +160,7 @@ function LifecycleDetails({ item }) {
|
|||
<div>{item.execution?.result_code || 'No executor result code stored'}</div>
|
||||
{item.execution?.error_message ? <div className="status-subtle">{item.execution.error_message}</div> : null}
|
||||
{!item.execution?.error_message && item.execution?.note ? <div className="status-subtle">{item.execution.note}</div> : null}
|
||||
{executionTiming ? <div className="status-subtle">{executionTiming}</div> : null}
|
||||
{item.execution?.status === 'submitted' ? (
|
||||
<div className="status-subtle">Submitted means the relay accepted the response; it does not prove a trade.</div>
|
||||
) : null}
|
||||
|
|
|
|||
|
|
@ -29,6 +29,11 @@ test('strategy page owns consolidated quote lifecycle and successful trade table
|
|||
assert.match(strategySource, /setDisplayItems\(items \|\| \[\]\)/);
|
||||
assert.match(strategySource, /item\.execution\?\.error_message/);
|
||||
assert.match(strategySource, /item\.execution\?\.note/);
|
||||
assert.match(strategySource, /formatExecutionTiming/);
|
||||
assert.match(strategySource, /item\.execution\?\.timing/);
|
||||
assert.match(strategySource, /current_salt_ms/);
|
||||
assert.match(strategySource, /relay_response_ms/);
|
||||
assert.match(strategySource, /Timing:/);
|
||||
assert.match(strategySource, /item\.execution\?\.status === 'submitted'/);
|
||||
assert.match(strategySource, /Submitted means the relay accepted the response; it does not prove a trade\./);
|
||||
assert.doesNotMatch(strategySource, /Actionable|actionable/);
|
||||
|
|
|
|||
|
|
@ -326,6 +326,13 @@ test('live quote updates stay capped and publish lifecycle rows without refresh'
|
|||
pair: config.activePair,
|
||||
status: 'submitted',
|
||||
result_code: 'quote_response_ok',
|
||||
executor_timing: {
|
||||
command_event_age_ms: 3,
|
||||
current_salt_ms: 42,
|
||||
sign_ms: 1.5,
|
||||
relay_response_ms: 111,
|
||||
executor_total_ms: 157,
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
|
|
@ -336,6 +343,13 @@ test('live quote updates stay capped and publish lifecycle rows without refresh'
|
|||
assert.ok(submittedUpdates.find((update) => update.type === 'status_bar.updated'));
|
||||
assert.equal(submittedLifecycleUpdate.recent_lifecycle_rows[0].quote_id, 'quote-10');
|
||||
assert.equal(submittedLifecycleUpdate.recent_lifecycle_rows[0].lifecycle_state, 'submitted');
|
||||
assert.deepEqual(submittedLifecycleUpdate.recent_lifecycle_rows[0].execution.timing, {
|
||||
command_event_age_ms: 3,
|
||||
current_salt_ms: 42,
|
||||
sign_ms: 1.5,
|
||||
relay_response_ms: 111,
|
||||
executor_total_ms: 157,
|
||||
});
|
||||
assert.doesNotMatch(
|
||||
`${submittedLifecycleUpdate.recent_lifecycle_rows[0].lifecycle_label} ${submittedLifecycleUpdate.recent_lifecycle_rows[0].reason_text}`,
|
||||
/completed|successful trade|asset delta/i,
|
||||
|
|
|
|||
|
|
@ -25,3 +25,14 @@ test('trade executor exposes summarized durable command state', () => {
|
|||
assert.match(source, /stateStore\.getSummary\(\{ limit: 50 \}\)/);
|
||||
assert.doesNotMatch(source, /durable_state:\s*stateStore\.getState\(\)/);
|
||||
});
|
||||
|
||||
test('trade executor records hot path timing in result payloads', () => {
|
||||
assert.match(source, /node:perf_hooks/);
|
||||
assert.match(source, /startExecutorTiming\(event\)/);
|
||||
assert.match(source, /executor_timing/);
|
||||
assert.match(source, /current_salt_ms/);
|
||||
assert.match(source, /sign_ms/);
|
||||
assert.match(source, /relay_response_ms/);
|
||||
assert.match(source, /executor_total_ms/);
|
||||
assert.match(source, /withExecutorTiming\(\{[\s\S]*?result_code: 'submission_failed'/);
|
||||
});
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue