Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 42 additions & 7 deletions application/rebalance_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,43 @@ def run_strategy_cycle(
run_period=run_period,
)
if is_duplicate_live_run(existing_run):
duplicate_stage = str(existing_run.get("stage") or "NO_ACTION")
duplicate_skipped_orders = [
{
"reason": "duplicate_live_strategy_run",
"run_period": run_period,
}
]
strategy_run_persisted = False
strategy_run_persistence_error = None
duplicate_state = build_strategy_run_state(
stage=duplicate_stage,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Use a distinct history stage for duplicate runs

When a duplicate request is processed in the same second as the terminal run it is skipping, persisting it with the original duplicate_stage can overwrite the original history object: persist_strategy_run_state names history entries from run_period, stage, and a seconds-resolution timestamp. In that case the audit entry containing the actual submitted orders can be replaced by this duplicate payload with submitted_orders: [], leaving only the heartbeat in history; use a duplicate-specific stage/suffix or otherwise unique history key for skipped duplicates.

Useful? React with 👍 / 👎.

account=masked_account,
strategy_profile=strategy_runtime.profile,
strategy_display_name=strategy_runtime.display_name,
run_period=run_period,
dry_run_only=settings.dry_run_only,
live_trading_enabled=settings.live_trading_enabled,
session_reused=bool(getattr(client, "session_reused", False)),
portfolio_snapshot=plan.get("portfolio", {}),
evaluation_metadata=getattr(evaluation, "metadata", None),
plan=plan,
skipped_orders=duplicate_skipped_orders,
action_done=False,
now=now,
)
duplicate_state["idempotency_skipped"] = True
duplicate_state["existing_strategy_run_stage"] = existing_run.get("stage")
duplicate_state["existing_strategy_run_as_of"] = existing_run.get("as_of")
try:
strategy_run_persisted = persist_strategy_run_state(
store=store,
state=duplicate_state,
now=now,
)
except Exception as exc:
strategy_run_persisted = False
strategy_run_persistence_error = f"{type(exc).__name__}: {exc}"
result = {
"ok": True,
"api_kind": "unofficial-reverse-engineered",
Expand All @@ -383,20 +420,18 @@ def run_strategy_cycle(
"live_trading_enabled": settings.live_trading_enabled,
"session_reused": bool(getattr(client, "session_reused", False)),
"strategy_run_period": run_period,
"strategy_run_persisted": False,
"strategy_run_stage": duplicate_stage,
"strategy_run_persisted": strategy_run_persisted,
"idempotency_skipped": True,
"existing_strategy_run_stage": existing_run.get("stage"),
"existing_strategy_run_as_of": existing_run.get("as_of"),
"submitted_orders": [],
"skipped_orders": [
{
"reason": "duplicate_live_strategy_run",
"run_period": run_period,
}
],
"skipped_orders": duplicate_skipped_orders,
"action_done": False,
**empty_strategy_plugin_alert_report_fields(),
}
if strategy_run_persistence_error:
result["strategy_run_persistence_error"] = strategy_run_persistence_error
return attach_strategy_plugin_result(
result,
signals=strategy_plugin_signals,
Expand Down
38 changes: 35 additions & 3 deletions tests/test_rebalance_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ def write_json(self, key, payload):
return True


def _latest_strategy_run_payloads(store: FakeStateStore) -> list[dict]:
return [payload for key, payload in store.writes if key.endswith("latest.json")]


def test_notification_i18n_keys_are_aligned():
assert set(I18N["zh"]) == set(I18N["en"])
assert build_translator("zh")("account_label", account="****1234") == "🆔 账户: ****1234"
Expand Down Expand Up @@ -465,8 +469,18 @@ def fake_client_factory(*args, **kwargs):

assert result["idempotency_skipped"] is True
assert result["action_done"] is False
assert result["strategy_run_stage"] == "SUBMITTED"
assert result["strategy_run_persisted"] is True
assert observed["client"].orders == []
assert store.writes == []
latest_payloads = _latest_strategy_run_payloads(store)
assert len(latest_payloads) == 1
assert latest_payloads[0]["stage"] == "SUBMITTED"
assert latest_payloads[0]["idempotency_skipped"] is True
assert latest_payloads[0]["existing_strategy_run_stage"] == "SUBMITTED"
assert latest_payloads[0]["skipped_orders"] == [
{"reason": "duplicate_live_strategy_run", "run_period": "2026-05"}
]
assert len(store.writes) == 2


def test_run_strategy_cycle_skips_duplicate_live_monthly_no_action(monkeypatch):
Expand Down Expand Up @@ -510,8 +524,18 @@ def fake_client_factory(*args, **kwargs):
assert result["idempotency_skipped"] is True
assert result["existing_strategy_run_stage"] == "NO_ACTION"
assert result["action_done"] is False
assert result["strategy_run_stage"] == "NO_ACTION"
assert result["strategy_run_persisted"] is True
assert observed["client"].orders == []
assert store.writes == []
latest_payloads = _latest_strategy_run_payloads(store)
assert len(latest_payloads) == 1
assert latest_payloads[0]["stage"] == "NO_ACTION"
assert latest_payloads[0]["idempotency_skipped"] is True
assert latest_payloads[0]["existing_strategy_run_stage"] == "NO_ACTION"
assert latest_payloads[0]["skipped_orders"] == [
{"reason": "duplicate_live_strategy_run", "run_period": "2026-05"}
]
assert len(store.writes) == 2


def test_run_strategy_cycle_persists_live_execution_blocked_without_terminal_stage(monkeypatch):
Expand Down Expand Up @@ -610,7 +634,15 @@ def evaluate(self, **inputs):

assert second_result["idempotency_skipped"] is True
assert second_result["existing_strategy_run_stage"] == "FUNDING_BLOCKED"
assert len(store.writes) == write_count
assert second_result["strategy_run_stage"] == "FUNDING_BLOCKED"
assert second_result["strategy_run_persisted"] is True
assert len(store.writes) == write_count + 2
latest_payloads = _latest_strategy_run_payloads(store)
duplicate_payload = latest_payloads[-1]
assert duplicate_payload["stage"] == "FUNDING_BLOCKED"
assert duplicate_payload["idempotency_skipped"] is True
assert duplicate_payload["existing_strategy_run_stage"] == "FUNDING_BLOCKED"
assert duplicate_payload["skipped_orders"][0]["reason"] == "duplicate_live_strategy_run"


def test_run_strategy_cycle_persists_live_partial_submission_as_non_terminal(monkeypatch):
Expand Down