From f37f118e746b3225d2bda91a3a322c1b95aa7103 Mon Sep 17 00:00:00 2001 From: raychen <815315825@qq.com> Date: Thu, 11 Jun 2026 15:11:43 +0800 Subject: [PATCH] =?UTF-8?q?feat(session):=20=E4=BD=BF=E7=94=A8=20active/hi?= =?UTF-8?q?storical=20=E5=8F=8C=E5=88=97=E8=A1=A8=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E4=BC=9A=E8=AF=9D=E5=8E=86=E5=8F=B2=E5=AD=98=E5=82=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 本次调整将会话历史从原先依赖 Event model_flags 区分模型可见性,改为使用 Session.events 与 Session.historical_events 分别存储 active 模型窗口和历史数据,减少内部遍历与过滤开销,并统一 Redis、SQL、InMemory 后端的持久化语义。 主要变更: - Session.events 现在只保留模型可见的 active event window。 - 新增 Session.historical_events,用于按需保存被 max_events、TTL 或 summary 移出 active window 的历史事件。 - 新增 SessionServiceConfig.store_historical_events,控制是否保存历史事件。 - 保留 Event.is_model_visible() / set_model_visible() 公共接口用于兼容,但 SDK 内部不再依赖该 flag 进行模型历史过滤。 - 调整 Session.add_event() / apply_event_filtering() 内部实现,新增内部 helper 返回本次被裁剪的事件,避免影响公共返回值兼容。 - Summary 压缩后使用 [summary_event, recent_events...] 作为新的 active window,旧 summary 会参与下一轮 re-summary,避免历史摘要丢失。 - Summarizer checker 只跳过首位 summary anchor,避免每次反向扫描 events,降低大窗口下的触发检查成本。 - RedisSessionService 写回裁剪后的 active events 与 historical_events,同时保留 app/user/session/temp state 的分层存储语义。 - SqlSessionService 将 active events 存在 events 表,将 historical_events 存在 sessions 表 JSON 字段;append 时仅在本次发生裁剪时删除对应旧 event rows,避免每次全量同步。 - InMemory、Redis、SQL 的 list_sessions() 均不返回 events 和 historical_events,避免列表接口返回大量历史数据。 - MemPalace、Mem0、SQL/Redis/InMemory memory service 适配新的 active events 语义,不再使用 is_model_visible() 过滤。 - 更新相关单测,覆盖 Redis/SQL active-historical 持久化、state 分层、summary re-compress、list_sessions 不返回 historical_events 等场景。 兼容性说明: - Event.is_model_visible() / set_model_visible() 仍保留,避免业务调用时报错。 - SDK 内部模型可见性以 Session.events / Session.historical_events 的归属为准。 - Redis/SQL 后端仅在未显式传入 session_config 时默认开启 store_historical_events;显式传入 SessionServiceConfig() 时仍按默认 False,不保存历史事件。 - SQL stale reload 场景保留现有 best-effort 行为,并通过注释说明并发写入下可能存在窗口不一致,需要更重的版本控制/锁机制才能彻底解决。 --- docs/mkdocs/en/memory.md | 1 - docs/mkdocs/zh/memory.md | 1 - .../memory_service_with_mempalace/README.md | 2 - .../run_agent.py | 1 - tests/agents/core/test_history_processor.py | 7 +- tests/memory/test_mempalace_memory_service.py | 28 +++- tests/sessions/test_base_session_service.py | 57 +++++--- .../test_in_memory_session_service.py | 1 + tests/sessions/test_redis_session_service.py | 44 +++++- tests/sessions/test_session.py | 118 ++++++++++------- tests/sessions/test_session_summarizer.py | 125 +++++++++++------- tests/sessions/test_sql_session_service.py | 36 ++++- tests/sessions/test_summarizer_checker.py | 94 +++++++++++++ tests/sessions/test_utils.py | 35 ++--- trpc_agent_sdk/agents/_langgraph_agent.py | 6 +- .../agents/core/_history_processor.py | 6 - trpc_agent_sdk/dsl/graph/_graph_agent.py | 4 - .../dsl/graph/_node_action/_agent.py | 4 +- .../memory/_in_memory_memory_service.py | 2 - .../memory/_redis_memory_service.py | 2 +- trpc_agent_sdk/memory/_sql_memory_service.py | 2 - trpc_agent_sdk/memory/mem0_memory_service.py | 4 +- .../memory/mempalace_memory_service.py | 5 +- .../server/a2a/_remote_a2a_agent.py | 2 - .../server/agents/claude/_claude_agent.py | 4 - .../sessions/_base_session_service.py | 36 ++++- .../sessions/_in_memory_session_service.py | 7 +- .../sessions/_redis_session_service.py | 27 ++-- trpc_agent_sdk/sessions/_session.py | 77 ++++++----- .../sessions/_session_summarizer.py | 50 ++++--- .../sessions/_sql_session_service.py | 64 ++++++++- .../sessions/_summarizer_checker.py | 32 ++++- .../sessions/_summarizer_manager.py | 20 +-- trpc_agent_sdk/sessions/_types.py | 2 + trpc_agent_sdk/sessions/_utils.py | 35 ++--- 35 files changed, 648 insertions(+), 293 deletions(-) diff --git a/docs/mkdocs/en/memory.md b/docs/mkdocs/en/memory.md index a826c226..aab85240 100644 --- a/docs/mkdocs/en/memory.md +++ b/docs/mkdocs/en/memory.md @@ -1248,7 +1248,6 @@ memory_service = MempalaceMemoryService( ), wing="my_app_user", room="conversations", - store_only_model_visible=True, ) ``` diff --git a/docs/mkdocs/zh/memory.md b/docs/mkdocs/zh/memory.md index 0432784a..7cfd11df 100644 --- a/docs/mkdocs/zh/memory.md +++ b/docs/mkdocs/zh/memory.md @@ -619,7 +619,6 @@ memory_service = MempalaceMemoryService( memory_service_config=memory_service_config, wing="my_app_user", # 可选:记忆命名空间;不传则默认由 save_key 推导 room="conversations", # 可选:记忆类别;默认 conversations - store_only_model_visible=True, ) ``` diff --git a/examples/memory_service_with_mempalace/README.md b/examples/memory_service_with_mempalace/README.md index 1318fece..f64664c7 100644 --- a/examples/memory_service_with_mempalace/README.md +++ b/examples/memory_service_with_mempalace/README.md @@ -62,7 +62,6 @@ memory_service = MempalaceMemoryService( memory_service_config=memory_service_config, wing="trpc-agent", room="conversations", - store_only_model_visible=True, ) ``` @@ -70,7 +69,6 @@ memory_service = MempalaceMemoryService( - `wing="trpc-agent"`:把示例记忆固定写入 `trpc-agent` 这个 wing。 - `room="conversations"`:把普通对话记忆写入 `conversations` room。 -- `store_only_model_visible=True`:只存模型可见的事件。 - `ttl_seconds=20`:超过 20 秒的记忆会被后台 cleanup 删除。 - `cleanup_interval_seconds=20`:每 20 秒执行一次清理。 diff --git a/examples/memory_service_with_mempalace/run_agent.py b/examples/memory_service_with_mempalace/run_agent.py index 6dbe3cc6..62e740a0 100644 --- a/examples/memory_service_with_mempalace/run_agent.py +++ b/examples/memory_service_with_mempalace/run_agent.py @@ -44,7 +44,6 @@ def create_memory_service(): memory_service_config=memory_service_config, wing="trpc-agent", room="conversations", - store_only_model_visible=True, ) return memory_service diff --git a/tests/agents/core/test_history_processor.py b/tests/agents/core/test_history_processor.py index ebbdebb8..9cfd52cc 100644 --- a/tests/agents/core/test_history_processor.py +++ b/tests/agents/core/test_history_processor.py @@ -116,7 +116,7 @@ def test_invocation_mode_filters_by_id(self, invocation_context): assert len(events) == 1 assert events[0].content.parts[0].text == "current" - def test_invocation_mode_includes_summary_events(self, invocation_context): + def test_invocation_mode_filters_summary_events_by_id(self, invocation_context): proc = HistoryProcessor(timeline_filter_mode=TimelineFilterMode.INVOCATION) summary_event = _make_event("system", "Previous conversation summary", invocation_id="summary") summary_event.set_summary_event(True) @@ -124,9 +124,8 @@ def test_invocation_mode_includes_summary_events(self, invocation_context): events = proc.filter_events(invocation_context, [summary_event, current_event]) - assert len(events) == 2 - assert events[0].is_summary_event() - assert events[1].content.parts[0].text == "current" + assert len(events) == 1 + assert events[0].content.parts[0].text == "current" # --------------------------------------------------------------------------- diff --git a/tests/memory/test_mempalace_memory_service.py b/tests/memory/test_mempalace_memory_service.py index d98137f0..d45a099e 100644 --- a/tests/memory/test_mempalace_memory_service.py +++ b/tests/memory/test_mempalace_memory_service.py @@ -91,7 +91,7 @@ def fake_store(session, events_to_store, wing, room): assert "remember this" in events_to_store[0][1] assert events_to_store[0][2] in svc._stored_drawer_ids - async def test_store_session_skips_invisible_events(self, monkeypatch): + async def test_store_session_ignores_model_visible_flag(self, monkeypatch): calls = [] def fake_store(session, events_to_store, wing, room): @@ -99,17 +99,35 @@ def fake_store(session, events_to_store, wing, room): return {drawer_id for _, _, drawer_id in events_to_store} visible_event = _make_event("visible") - invisible_event = _make_event("hidden") - invisible_event.set_model_visible(False) + flagged_event = _make_event("hidden") + flagged_event.set_model_visible(False) svc = MempalaceMemoryService(memory_service_config=_make_config()) monkeypatch.setattr(svc, "_store_events", fake_store) - await svc.store_session(_make_session(events=[visible_event, invisible_event])) + await svc.store_session(_make_session(events=[visible_event, flagged_event])) await svc.close() assert len(calls) == 1 - assert len(calls[0]) == 1 + assert len(calls[0]) == 2 assert "visible" in calls[0][0][1] + assert "hidden" in calls[0][1][1] + + async def test_store_only_model_visible_flag_is_compatibility_noop(self, monkeypatch): + calls = [] + + def fake_store(session, events_to_store, wing, room): + calls.append(events_to_store) + return {drawer_id for _, _, drawer_id in events_to_store} + + svc = MempalaceMemoryService(memory_service_config=_make_config(), store_only_model_visible=False) + monkeypatch.setattr(svc, "_store_events", fake_store) + + await svc.store_session(_make_session(events=[_make_event("active event")])) + await svc.close() + + assert len(calls) == 1 + assert len(calls[0]) == 1 + assert "active event" in calls[0][0][1] async def test_store_session_is_incremental(self, monkeypatch): calls = [] diff --git a/tests/sessions/test_base_session_service.py b/tests/sessions/test_base_session_service.py index f15d3732..bcb63037 100644 --- a/tests/sessions/test_base_session_service.py +++ b/tests/sessions/test_base_session_service.py @@ -141,6 +141,18 @@ async def test_append_event_empty_state_delta(self): await svc.append_event(session, event) assert len(session.events) == 1 + async def test_append_event_stores_filtered_events_when_configured(self): + config = SessionServiceConfig(max_events=2, store_historical_events=True) + svc = ConcreteSessionService(session_config=config) + session = _make_session() + + for i in range(6): + event = _make_event(author="user" if i == 2 else "agent", text=f"msg{i}") + await svc.append_event(session, event) + + assert [event.get_text() for event in session.events] == ["msg2", "msg5"] + assert [event.get_text() for event in session.historical_events] == ["msg0", "msg1", "msg3", "msg4"] + class TestBaseSessionServiceTrimTempDeltaState: """Test _trim_temp_delta_state method.""" @@ -172,10 +184,23 @@ def test_filter_by_num_recent_events(self): for i in range(10): author = "user" if i == 7 else "agent" session.events.append(_make_event(author=author, text=f"msg{i}")) - svc.filter_events(session) + filtered_session = svc.filter_events(session) + assert filtered_session is session + assert [event.get_text() for event in session.events] == ["msg7", "msg8", "msg9"] + + def test_filter_by_num_recent_events_with_copy(self): + config = SessionServiceConfig(num_recent_events=3) + svc = ConcreteSessionService(session_config=config) + session = _make_session() + for i in range(10): + author = "user" if i == 7 else "agent" + session.events.append(_make_event(author=author, text=f"msg{i}")) + + filtered_session = svc.filter_events(session, need_copy=True) + + assert filtered_session is not session assert len(session.events) == 10 - visible_events = [event for event in session.events if event.is_model_visible()] - assert [event.get_text() for event in visible_events] == ["msg7", "msg8", "msg9"] + assert [event.get_text() for event in filtered_session.events] == ["msg7", "msg8", "msg9"] def test_filter_by_event_ttl(self): config = SessionServiceConfig(event_ttl_seconds=5.0) @@ -190,18 +215,18 @@ def test_filter_by_event_ttl(self): new_event.timestamp = time.time() session.events.append(new_event) - svc.filter_events(session) - assert len(session.events) == 2 - visible_events = [event for event in session.events if event.is_model_visible()] - assert len(visible_events) == 1 - assert visible_events[0].get_text() == "new" + filtered_session = svc.filter_events(session) + assert filtered_session is session + assert len(session.events) == 1 + assert session.events[0].get_text() == "new" def test_filter_no_config(self): svc = ConcreteSessionService() session = _make_session() for i in range(5): session.events.append(_make_event(text=f"msg{i}")) - svc.filter_events(session) + filtered_session = svc.filter_events(session) + assert filtered_session is session assert len(session.events) == 5 def test_filter_ttl_removes_all_old(self): @@ -212,9 +237,9 @@ def test_filter_ttl_removes_all_old(self): e = _make_event(text=f"old{i}") e.timestamp = time.time() - 100 session.events.append(e) - svc.filter_events(session) - assert len(session.events) == 5 - assert all(not event.is_model_visible() for event in session.events) + filtered_session = svc.filter_events(session) + assert filtered_session is session + assert session.events == [] def test_filter_by_num_recent_events_preserves_summary_anchor(self): config = SessionServiceConfig(num_recent_events=3) @@ -227,11 +252,11 @@ def test_filter_by_num_recent_events_preserves_summary_anchor(self): for i in range(5): session.events.append(_make_event(text=f"agent{i}")) - svc.filter_events(session) + filtered_session = svc.filter_events(session) - visible_events = [event for event in session.events if event.is_model_visible()] - assert len(visible_events) == 1 - assert visible_events[0].is_summary_event() + assert filtered_session is session + assert len(session.events) == 1 + assert session.events[0].is_summary_event() class TestBaseSessionServiceSetSummarizerManager: diff --git a/tests/sessions/test_in_memory_session_service.py b/tests/sessions/test_in_memory_session_service.py index ddfaad7a..6b33bcd2 100644 --- a/tests/sessions/test_in_memory_session_service.py +++ b/tests/sessions/test_in_memory_session_service.py @@ -235,6 +235,7 @@ async def test_list_sessions_have_no_events(self): result = await svc.list_sessions(app_name="app", user_id="user") assert len(result.sessions) == 1 assert result.sessions[0].events == [] + assert result.sessions[0].historical_events == [] await svc.close() async def test_list_nonexistent_app(self): diff --git a/tests/sessions/test_redis_session_service.py b/tests/sessions/test_redis_session_service.py index f1ed6b92..963fe218 100644 --- a/tests/sessions/test_redis_session_service.py +++ b/tests/sessions/test_redis_session_service.py @@ -27,8 +27,12 @@ from trpc_agent_sdk.types import Content, EventActions, Part, State -def _make_config(ttl_seconds=0, cleanup_interval=0.0, enable_ttl=False): - config = SessionServiceConfig() +def _make_config(ttl_seconds=0, + cleanup_interval=0.0, + enable_ttl=False, + max_events=0, + store_historical_events=False): + config = SessionServiceConfig(max_events=max_events, store_historical_events=store_historical_events) if enable_ttl: config.ttl = SessionServiceConfig.create_ttl_config( enable=True, ttl_seconds=ttl_seconds, cleanup_interval_seconds=cleanup_interval) @@ -203,6 +207,7 @@ async def test_list_sessions_have_no_events(self): result = await svc.list_sessions(app_name="app", user_id="user") for s in result.sessions: assert s.events == [] + assert s.historical_events == [] await svc.close() @@ -256,6 +261,28 @@ async def test_append_with_state_delta(self): assert stored.state[f"{State.USER_PREFIX}user_key"] == "uv" await svc.close() + async def test_append_does_not_persist_merged_or_temp_state_in_session_json(self): + svc = _create_service() + session = await svc.create_session(app_name="app", user_id="user", session_id="s1") + event = _make_event(state_delta={ + "session_key": "sv", + f"{State.APP_PREFIX}app_key": "av", + f"{State.USER_PREFIX}user_key": "uv", + f"{State.TEMP_PREFIX}temp_key": "tv", + }) + + await svc.append_event(session, event) + + stored_json = svc._redis_storage._store["session:app:user:s1"] + raw_session = Session.model_validate_json(stored_json) + assert raw_session.state == {"session_key": "sv"} + stored = await svc.get_session(app_name="app", user_id="user", session_id="s1") + assert stored.state["session_key"] == "sv" + assert stored.state[f"{State.APP_PREFIX}app_key"] == "av" + assert stored.state[f"{State.USER_PREFIX}user_key"] == "uv" + assert f"{State.TEMP_PREFIX}temp_key" not in raw_session.state + await svc.close() + async def test_append_to_nonexistent_session(self): svc = _create_service() session = _make_session_obj(id="nonexistent") @@ -264,6 +291,19 @@ async def test_append_to_nonexistent_session(self): assert result is event await svc.close() + async def test_append_persists_filtered_active_and_historical_events(self): + config = _make_config(max_events=2, store_historical_events=True) + svc = _create_service(config=config) + session = await svc.create_session(app_name="app", user_id="user", session_id="s1") + + for i in range(4): + await svc.append_event(session, _make_event(author="user" if i == 2 else "agent", text=f"msg{i}")) + + stored = await svc.get_session(app_name="app", user_id="user", session_id="s1") + assert [event.get_text() for event in stored.events] == ["msg2", "msg3"] + assert [event.get_text() for event in stored.historical_events] == ["msg0", "msg1"] + await svc.close() + class TestRedisUpdateSession: async def test_update_existing(self): diff --git a/tests/sessions/test_session.py b/tests/sessions/test_session.py index d0411bf8..151fa829 100644 --- a/tests/sessions/test_session.py +++ b/tests/sessions/test_session.py @@ -81,12 +81,35 @@ def test_apply_event_filtering_max_events(self): # Apply filtering with max_events=5 session.apply_event_filtering(max_events=5) - # Filtering hides model-invisible events instead of deleting them. - visible_events = [event for event in session.events if event.is_model_visible()] - assert len(session.events) == 10 - assert len(visible_events) == 4 - assert visible_events[0].get_text() == "Message 6" - assert visible_events[-1].get_text() == "Message 9" + assert len(session.events) == 4 + assert session.events[0].get_text() == "Message 6" + assert session.events[-1].get_text() == "Message 9" + + def test_apply_event_filtering_can_store_filtered_events(self): + """Test filtered events can be moved into historical_events.""" + session = Session( + id="test-session", + app_name="test-app", + user_id="test-user", + save_key="test-key", + ) + + for i in range(6): + event = Event(author="user" if i == 2 else "agent", + content=Content(parts=[Part.from_text(text=f"Message {i}")])) + session.events.append(event) + + session.apply_event_filtering(max_events=2, store_filtered_events=True) + + assert [event.get_text() for event in session.events] == ["Message 2"] + assert [event.get_text() for event in session.historical_events] == [ + "Message 0", + "Message 1", + "Message 3", + "Message 4", + "Message 5", + ] + assert all(event.is_model_visible() for event in session.historical_events) def test_apply_event_filtering_ttl(self): """Test event filtering with TTL.""" @@ -119,12 +142,10 @@ def test_apply_event_filtering_ttl(self): # Apply TTL filtering with 2 seconds session.apply_event_filtering(event_ttl_seconds=2.0) - # TTL + user-anchor fallback keeps only the last user message visible. - visible_events = [event for event in session.events if event.is_model_visible()] - assert len(session.events) == 6 - assert len(visible_events) == 1 - assert visible_events[0].author == "user" - assert "Old user message 1" in visible_events[0].get_text() + # TTL + user-anchor fallback keeps only the last user message. + assert len(session.events) == 1 + assert session.events[0].author == "user" + assert "Old user message 1" in session.events[0].get_text() def test_apply_event_filtering_ttl_and_max_events(self): """Test event filtering with both TTL and max_events.""" @@ -154,12 +175,9 @@ def test_apply_event_filtering_ttl_and_max_events(self): # Apply both filters session.apply_event_filtering(event_ttl_seconds=5.0, max_events=5) - # Filtering hides model-invisible events instead of deleting them. - visible_events = [event for event in session.events if event.is_model_visible()] - assert len(session.events) == 20 - assert len(visible_events) == 4 - assert visible_events[0].get_text() == "Recent 6" - assert visible_events[-1].get_text() == "Recent 9" + assert len(session.events) == 4 + assert session.events[0].get_text() == "Recent 6" + assert session.events[-1].get_text() == "Recent 9" def test_apply_event_filtering_preserves_last_user_message(self): """Test that filtering preserves the last user message when all events are filtered.""" @@ -190,12 +208,10 @@ def test_apply_event_filtering_preserves_last_user_message(self): # Apply strict TTL filter that would remove all events session.apply_event_filtering(event_ttl_seconds=2.0) - # All events are old, but last user message remains model-visible. - visible_events = [event for event in session.events if event.is_model_visible()] - assert len(session.events) == 7 - assert len(visible_events) == 1 - assert visible_events[0].author == "user" - assert visible_events[0].get_text() == "Last user message" + # All events are old, but last user message remains as the anchor. + assert len(session.events) == 1 + assert session.events[0].author == "user" + assert session.events[0].get_text() == "Last user message" def test_apply_event_filtering_empty_events(self): """Test event filtering with no events.""" @@ -230,10 +246,7 @@ def test_apply_event_filtering_all_filtered_no_user_message(self): # Apply strict TTL filter session.apply_event_filtering(event_ttl_seconds=2.0) - # All events are hidden from model history; raw events remain. - visible_events = [event for event in session.events if event.is_model_visible()] - assert len(session.events) == 5 - assert len(visible_events) == 0 + assert session.events == [] def test_apply_event_filtering_case_insensitive_user(self): """Test that user detection is case-insensitive.""" @@ -260,12 +273,10 @@ def test_apply_event_filtering_case_insensitive_user(self): # Apply strict TTL filter session.apply_event_filtering(event_ttl_seconds=2.0) - # Last user message is preserved as model-visible (case-insensitive). - visible_events = [event for event in session.events if event.is_model_visible()] - assert len(session.events) == 5 - assert len(visible_events) == 1 - assert visible_events[0].author.lower() == "user" - assert visible_events[0].get_text() == "Message from uSeR" + # Last user message is preserved as the anchor (case-insensitive). + assert len(session.events) == 1 + assert session.events[0].author.lower() == "user" + assert session.events[0].get_text() == "Message from uSeR" def test_apply_event_filtering_max_events_less_than_one(self): """Test that max_events <= 0 is treated as no limit.""" @@ -325,12 +336,31 @@ def test_add_event_with_filtering(self): content=Content(parts=[Part.from_text(text=f"Message {i}")])) session.add_event(event, max_events=5) - # Raw events remain, while only the model-visible window is trimmed. - visible_events = [event for event in session.events if event.is_model_visible()] - assert len(session.events) == 10 - assert len(visible_events) == 4 - assert visible_events[0].get_text() == "Message 6" - assert visible_events[-1].get_text() == "Message 9" + assert len(session.events) == 4 + assert session.events[0].get_text() == "Message 6" + assert session.events[-1].get_text() == "Message 9" + + def test_add_event_with_filtering_can_store_filtered_events(self): + """Test add_event moves filtered events to historical_events when requested.""" + session = Session( + id="test-session", + app_name="test-app", + user_id="test-user", + save_key="test-key", + ) + + for i in range(6): + event = Event(author="user" if i == 2 else "agent", + content=Content(parts=[Part.from_text(text=f"Message {i}")])) + session.add_event(event, max_events=2, store_filtered_events=True) + + assert [event.get_text() for event in session.events] == ["Message 2", "Message 5"] + assert [event.get_text() for event in session.historical_events] == [ + "Message 0", + "Message 1", + "Message 3", + "Message 4", + ] def test_apply_event_filtering_keeps_first_user_message_and_after(self): """Test that filtering keeps the first user message and all events after it.""" @@ -364,8 +394,6 @@ def test_apply_event_filtering_keeps_first_user_message_and_after(self): session.apply_event_filtering(max_events=3) # When the retained tail has no user message, fallback keeps the last user message only. - visible_events = [event for event in session.events if event.is_model_visible()] - assert len(session.events) == 7 - assert len(visible_events) == 1 - assert visible_events[0].author == "user" - assert visible_events[0].get_text() == "User question" + assert len(session.events) == 1 + assert session.events[0].author == "user" + assert session.events[0].get_text() == "User question" diff --git a/tests/sessions/test_session_summarizer.py b/tests/sessions/test_session_summarizer.py index 97462cad..ef46db4b 100644 --- a/tests/sessions/test_session_summarizer.py +++ b/tests/sessions/test_session_summarizer.py @@ -347,15 +347,13 @@ async def mock_generate(request, stream=False, ctx=None): summary_text, result_events = await summarizer.create_session_summary_by_events( events, "s1", keep_recent_count=3) assert summary_text is not None - assert len(result_events) == 11 # preserve all original events + 1 summary - visible_events = [event for event in result_events if event.is_model_visible()] - assert len(visible_events) == 4 # 1 summary + 3 recent + assert len(result_events) == 4 # 1 summary + 3 recent assert any(event.is_summary_event() for event in result_events) summary_event = next(event for event in result_events if event.is_summary_event()) assert summary_event.author == "system" assert summary_event.content.role == "user" - async def test_summary_traces_back_to_invisible_user_before_first_visible_event(self): + async def test_summary_starts_from_first_user_turn_before_recent_events(self): model = _make_model_mock() llm_response = MagicMock() llm_response.content = Content(parts=[Part.from_text(text="summary text")]) @@ -367,15 +365,13 @@ async def mock_generate(request, stream=False, ctx=None): model.generate_async = mock_generate summarizer = SessionSummarizer(model=model, start_by_user_turn=True) - hidden_user = _make_event(author="user", text="hidden question") - hidden_user.set_model_visible(False) - old_answer = _make_event(author="agent", text="visible answer") + old_user = _make_event(author="user", text="old question") + old_answer = _make_event(author="agent", text="old answer") recent_user = _make_event(author="user", text="recent question") system_preamble = _make_event(author="system", text="system preamble") - system_preamble.set_model_visible(False) events = [ system_preamble, - hidden_user, + old_user, old_answer, recent_user, ] @@ -386,13 +382,12 @@ async def mock_generate(request, stream=False, ctx=None): assert summary_text == "summary text" assert result_events is events assert captured_prompts - assert "hidden question" in captured_prompts[0] - assert "visible answer" in captured_prompts[0] + assert "old question" in captured_prompts[0] + assert "old answer" in captured_prompts[0] assert "system preamble" not in captured_prompts[0] assert "recent question" not in captured_prompts[0] - assert old_answer.is_model_visible() is False - assert recent_user.is_model_visible() is True - assert any(event.is_summary_event() for event in result_events) + assert result_events[0].is_summary_event() + assert result_events[1] is recent_user async def test_summary_can_start_from_existing_summary_event(self): model = _make_model_mock() @@ -409,7 +404,6 @@ async def mock_generate(request, stream=False, ctx=None): existing_summary = _make_event(author="system", text="previous summary") existing_summary.set_summary_event(True) system_preamble = _make_event(author="system", text="system preamble") - system_preamble.set_model_visible(False) events = [ system_preamble, existing_summary, @@ -424,7 +418,7 @@ async def mock_generate(request, stream=False, ctx=None): assert "previous summary" in captured_prompts[0] assert "old answer" in captured_prompts[0] assert "system preamble" not in captured_prompts[0] - assert result_events[3].is_summary_event() + assert result_events[0].is_summary_event() async def test_summary_falls_back_to_first_visible_event_and_ignores_large_keep_recent(self): model = _make_model_mock() @@ -449,9 +443,8 @@ async def mock_generate(request, stream=False, ctx=None): assert summary_text == "summary text" assert "agent message 1" in captured_prompts[0] assert "agent message 2" in captured_prompts[0] - visible_events = [event for event in result_events if event.is_model_visible()] - assert len(visible_events) == 1 - assert visible_events[0].is_summary_event() + assert len(result_events) == 1 + assert result_events[0].is_summary_event() async def test_summary_inserted_before_recent_user_turn_and_hides_prior_events(self): model = _make_model_mock() @@ -466,8 +459,6 @@ async def mock_generate(request, stream=False, ctx=None): model.generate_async = mock_generate summarizer = SessionSummarizer(model=model, start_by_user_turn=True) events = [_make_event(author="user" if idx in (8, 80, 92) else "agent", text=f"msg {idx}") for idx in range(100)] - for idx, event in enumerate(events): - event.set_model_visible(10 <= idx < 99) summary_text, result_events = await summarizer.create_session_summary_by_events( events, "s1", keep_recent_count=10) @@ -476,9 +467,8 @@ async def mock_generate(request, stream=False, ctx=None): assert "msg 8" in captured_prompts[0] assert "msg 91" in captured_prompts[0] assert "msg 92" not in captured_prompts[0] - assert result_events[92].is_summary_event() - assert all(not event.is_model_visible() for event in result_events[:92]) - assert result_events[93].is_model_visible() + assert result_events[0].is_summary_event() + assert result_events[1].content.parts[0].text == "msg 92" async def test_summary_with_zero_keep_recent(self): model = _make_model_mock() @@ -494,11 +484,59 @@ async def mock_generate(request, stream=False, ctx=None): summary_text, result_events = await summarizer.create_session_summary_by_events( events, "s1", keep_recent_count=0) assert summary_text is not None - assert len(result_events) == 6 # preserve all original events + 1 summary - visible_events = [event for event in result_events if event.is_model_visible()] - assert len(visible_events) == 1 # only summary event remains model-visible - assert visible_events[0].is_summary_event() - assert visible_events[0].content.role == "user" + assert len(result_events) == 1 # only summary event remains active + assert result_events[0].is_summary_event() + assert result_events[0].content.role == "user" + + async def test_summary_can_store_historical_events_separately(self): + model = _make_model_mock() + llm_response = MagicMock() + llm_response.content = Content(parts=[Part.from_text(text="summary text")]) + + async def mock_generate(request, stream=False, ctx=None): + yield llm_response + + model.generate_async = mock_generate + summarizer = SessionSummarizer(model=model) + events = [_make_event(text=f"msg{i}") for i in range(5)] + historical_events = [] + + summary_text, result_events = await summarizer.create_session_summary_by_events( + events, "s1", keep_recent_count=2, historical_events=historical_events, store_historical_events=True) + + assert summary_text == "summary text" + assert len(result_events) == 3 + assert len(historical_events) == 3 + assert historical_events[0].content.parts[0].text == "msg0" + + async def test_resummary_compresses_existing_summary_anchor(self): + model = _make_model_mock() + llm_response = MagicMock() + llm_response.content = Content(parts=[Part.from_text(text="new summary")]) + captured_prompts = [] + + async def mock_generate(request, stream=False, ctx=None): + captured_prompts.append(request.contents[0].parts[0].text) + yield llm_response + + model.generate_async = mock_generate + summarizer = SessionSummarizer(model=model, start_by_user_turn=True) + previous_summary = _make_event(author="system", text="Previous conversation summary: old summary") + previous_summary.set_summary_event(True) + events = [previous_summary] + [_make_event(text=f"msg{i}") for i in range(100, 181)] + historical_events = [] + + summary_text, result_events = await summarizer.create_session_summary_by_events( + events, "s1", keep_recent_count=20, historical_events=historical_events, store_historical_events=True) + + assert summary_text == "new summary" + assert result_events[0].is_summary_event() + assert result_events[0] is not previous_summary + assert [event.get_text() for event in result_events[1:]] == [f"msg{i}" for i in range(161, 181)] + assert previous_summary in historical_events + assert "old summary" in captured_prompts[0] + assert "msg160" in captured_prompts[0] + assert "msg161" not in captured_prompts[0] async def test_summary_no_events(self): model = _make_model_mock() @@ -539,12 +577,10 @@ async def mock_generate(request, stream=False, ctx=None): session = _make_session(events=[_make_event(text=f"msg{i}") for i in range(10)]) result = await summarizer.create_session_summary(session) assert result is not None - assert len(session.events) == 11 # preserve all original events + 1 summary - visible_events = [event for event in session.events if event.is_model_visible()] - assert len(visible_events) == 3 # 1 summary + 2 recent + assert len(session.events) == 3 # 1 summary + 2 recent assert any(event.is_summary_event() for event in session.events) - async def test_summary_traces_back_to_invisible_user_before_visible_events(self): + async def test_summary_starts_from_first_user_turn_in_session(self): model = _make_model_mock() llm_response = MagicMock() llm_response.content = Content(parts=[Part.from_text(text="session summary")]) @@ -556,15 +592,13 @@ async def mock_generate(request, stream=False, ctx=None): model.generate_async = mock_generate summarizer = SessionSummarizer(model=model, keep_recent_count=1, start_by_user_turn=True) - hidden_user = _make_event(author="user", text="hidden question") - hidden_user.set_model_visible(False) - old_answer = _make_event(author="agent", text="visible answer") + old_user = _make_event(author="user", text="old question") + old_answer = _make_event(author="agent", text="old answer") recent_user = _make_event(author="user", text="recent question") system_preamble = _make_event(author="system", text="system preamble") - system_preamble.set_model_visible(False) session = _make_session(events=[ system_preamble, - hidden_user, + old_user, old_answer, recent_user, ]) @@ -573,13 +607,12 @@ async def mock_generate(request, stream=False, ctx=None): assert result == "session summary" assert captured_prompts - assert "hidden question" in captured_prompts[0] - assert "visible answer" in captured_prompts[0] + assert "old question" in captured_prompts[0] + assert "old answer" in captured_prompts[0] assert "system preamble" not in captured_prompts[0] assert "recent question" not in captured_prompts[0] - assert old_answer.is_model_visible() is False - assert recent_user.is_model_visible() is True - assert any(event.is_summary_event() for event in session.events) + assert session.events[0].is_summary_event() + assert session.events[1] is recent_user async def test_summary_without_visible_user_falls_back_to_first_visible_event(self): model = _make_model_mock() @@ -600,10 +633,8 @@ async def mock_generate(request, stream=False, ctx=None): result = await summarizer.create_session_summary(session) assert result == "session summary" - assert len(session.events) == 3 - visible_events = [event for event in session.events if event.is_model_visible()] - assert len(visible_events) == 1 - assert visible_events[0].is_summary_event() + assert len(session.events) == 1 + assert session.events[0].is_summary_event() async def test_summary_no_update_on_failure(self): model = _make_model_mock() diff --git a/tests/sessions/test_sql_session_service.py b/tests/sessions/test_sql_session_service.py index 6c727951..30b3df1e 100644 --- a/tests/sessions/test_sql_session_service.py +++ b/tests/sessions/test_sql_session_service.py @@ -33,8 +33,15 @@ from trpc_agent_sdk.types import Content, EventActions, FunctionCall, Part, State -def _make_config(ttl_seconds=0, cleanup_interval=0.0, enable_ttl=False, num_recent_events=0): - config = SessionServiceConfig(num_recent_events=num_recent_events) +def _make_config(ttl_seconds=0, + cleanup_interval=0.0, + enable_ttl=False, + num_recent_events=0, + max_events=0, + store_historical_events=False): + config = SessionServiceConfig(num_recent_events=num_recent_events, + max_events=max_events, + store_historical_events=store_historical_events) if enable_ttl: config.ttl = SessionServiceConfig.create_ttl_config( enable=True, ttl_seconds=ttl_seconds, cleanup_interval_seconds=cleanup_interval) @@ -276,6 +283,7 @@ async def test_list_sessions_have_no_events(self): result = await svc.list_sessions(app_name="app", user_id="user") for s in result.sessions: assert s.events == [] + assert s.historical_events == [] await svc.close() @@ -350,6 +358,30 @@ async def test_append_multiple_events(self): assert len(stored.events) == 5 await svc.close() + async def test_append_without_filtering_does_not_delete_event_rows(self): + svc = await _create_service() + session = await svc.create_session(app_name="app", user_id="user", session_id="s1") + delete_mock = AsyncMock(wraps=svc._sql_storage.delete) + svc._sql_storage.delete = delete_mock + + await svc.append_event(session, _make_event(text="msg0")) + + delete_mock.assert_not_awaited() + await svc.close() + + async def test_append_persists_historical_events(self): + config = _make_config(max_events=2, store_historical_events=True) + svc = await _create_service(config) + session = await svc.create_session(app_name="app", user_id="user", session_id="s1") + + for i in range(4): + await svc.append_event(session, _make_event(author="user" if i == 2 else "agent", text=f"msg{i}")) + + stored = await svc.get_session(app_name="app", user_id="user", session_id="s1") + assert [event.get_text() for event in stored.events] == ["msg2", "msg3"] + assert [event.get_text() for event in stored.historical_events] == ["msg0", "msg1"] + await svc.close() + # --------------------------------------------------------------------------- # SqlSessionService — update_session diff --git a/tests/sessions/test_summarizer_checker.py b/tests/sessions/test_summarizer_checker.py index 2eb87e2c..62613ed4 100644 --- a/tests/sessions/test_summarizer_checker.py +++ b/tests/sessions/test_summarizer_checker.py @@ -55,6 +55,13 @@ def _make_event_with_usage(total_tokens: int) -> Event: return event +def _make_summary_event() -> Event: + event = _make_event_with_text("Previous conversation summary") + event.set_summary_event(True) + event.set_model_visible(True) + return event + + def _make_event_with_text(text: str) -> Event: return Event( invocation_id="inv-1", @@ -90,6 +97,29 @@ def test_no_usage_metadata(self): session = _make_session(events=events) assert checker(session) is False + def test_ignores_leading_summary_anchor_tokens(self): + checker = set_summarizer_token_threshold(100) + summary_event = _make_summary_event() + summary_event.usage_metadata = MagicMock() + summary_event.usage_metadata.total_token_count = 1000 + new_event = _make_event_with_usage(10) + summary_event.timestamp = 2.0 + new_event.timestamp = 3.0 + session = _make_session(events=[summary_event, new_event]) + + assert checker(session) is False + + def test_counts_flagged_invisible_tokens_after_latest_summary(self): + checker = set_summarizer_token_threshold(100) + summary_event = _make_summary_event() + flagged_event = _make_event_with_usage(1000) + summary_event.timestamp = 1.0 + flagged_event.timestamp = 2.0 + flagged_event.set_model_visible(False) + session = _make_session(events=[summary_event, flagged_event]) + + assert checker(session) is True + class TestEventsCountThreshold: """Test set_summarizer_events_count_threshold.""" @@ -118,6 +148,29 @@ def test_default_threshold(self): session = _make_session(events=events) assert checker(session) is True + def test_counts_events_after_leading_summary_anchor(self): + checker = set_summarizer_events_count_threshold(2) + summary_event = _make_summary_event() + new_events = [_make_event_with_text("new1"), _make_event_with_text("new2")] + summary_event.timestamp = 20.0 + new_events[0].timestamp = 21.0 + new_events[1].timestamp = 22.0 + session = _make_session(events=[summary_event, *new_events]) + + assert checker(session) is False + + def test_non_leading_summary_is_counted_as_active_event(self): + checker = set_summarizer_events_count_threshold(1) + event_after_first_summary = _make_event_with_text("already summarized") + non_leading_summary = _make_summary_event() + new_event = _make_event_with_text("new") + event_after_first_summary.timestamp = 2.0 + non_leading_summary.timestamp = 3.0 + new_event.timestamp = 4.0 + session = _make_session(events=[event_after_first_summary, non_leading_summary, new_event]) + + assert checker(session) is True + class TestTimeIntervalThreshold: """Test set_summarizer_time_interval_threshold.""" @@ -143,6 +196,25 @@ def test_default_threshold(self): session = _make_session(events=[event]) assert checker(session) is False + def test_requires_events_after_leading_summary_anchor(self): + checker = set_summarizer_time_interval_threshold(10.0) + summary_event = _make_summary_event() + summary_event.timestamp = time.time() - 20.0 + session = _make_session(events=[summary_event]) + + assert checker(session) is False + + def test_counts_flagged_invisible_events_after_latest_summary(self): + checker = set_summarizer_time_interval_threshold(10.0) + summary_event = _make_summary_event() + flagged_event = _make_event_with_text("flagged") + summary_event.timestamp = time.time() - 20.0 + flagged_event.timestamp = time.time() - 20.0 + flagged_event.set_model_visible(False) + session = _make_session(events=[summary_event, flagged_event]) + + assert checker(session) is True + class TestImportantContentThreshold: """Test set_summarizer_important_content_threshold.""" @@ -182,6 +254,28 @@ def test_event_with_whitespace_only(self): session = _make_session(events=events) assert checker(session) is False + def test_ignores_important_content_in_leading_summary_anchor(self): + checker = set_summarizer_important_content_threshold(5) + summary_event = _make_event_with_text("This old content is important") + summary_event.set_summary_event(True) + new_short_event = _make_event_with_text("short") + summary_event.timestamp = 2.0 + new_short_event.timestamp = 3.0 + session = _make_session(events=[summary_event, new_short_event]) + + assert checker(session) is False + + def test_counts_flagged_invisible_important_content_after_latest_summary(self): + checker = set_summarizer_important_content_threshold(5) + summary_event = _make_summary_event() + flagged_important_event = _make_event_with_text("This flagged content is important") + summary_event.timestamp = 1.0 + flagged_important_event.timestamp = 2.0 + flagged_important_event.set_model_visible(False) + session = _make_session(events=[summary_event, flagged_important_event]) + + assert checker(session) is True + class TestConversationThreshold: """Test set_summarizer_conversation_threshold.""" diff --git a/tests/sessions/test_utils.py b/tests/sessions/test_utils.py index bda07ba1..f8a74b9b 100644 --- a/tests/sessions/test_utils.py +++ b/tests/sessions/test_utils.py @@ -181,7 +181,7 @@ def test_defaults_start_from_user_and_keep_recent(self): selected_events, insert_index = find_events_for_summary(events, keep_recent_count=1) - assert selected_events == events[:3] + assert selected_events == events[1:3] assert insert_index == 3 def test_first_visible_summary_event_starts_summary_window(self): @@ -209,16 +209,14 @@ def test_fallback_to_first_visible_event(self): assert selected_events == events[:1] assert insert_index == 1 - def test_traces_back_to_invisible_user_before_first_visible_event(self): - hidden_user = _make_event(author="user", text="hidden") - hidden_user.set_model_visible(False) + def test_starts_from_first_user_in_active_events(self): + old_user = _make_event(author="user", text="old question") events = [ - _make_event(author="system", text="hidden preamble"), - hidden_user, - _make_event(author="agent", text="visible answer"), - _make_event(author="agent", text="visible answer"), + _make_event(author="system", text="system preamble"), + old_user, + _make_event(author="agent", text="answer 1"), + _make_event(author="agent", text="answer 2"), ] - events[0].set_model_visible(False) selected_events, insert_index = find_events_for_summary(events, keep_recent_count=10) @@ -227,24 +225,20 @@ def test_traces_back_to_invisible_user_before_first_visible_event(self): def test_aligns_recent_window_to_next_user_turn(self): events = [_make_event(author="user" if idx in (8, 80, 92) else "agent", text=f"msg {idx}") for idx in range(100)] - for idx, event in enumerate(events): - event.set_model_visible(10 <= idx < 99) selected_events, insert_index = find_events_for_summary(events, keep_recent_count=10) assert selected_events == events[8:92] assert insert_index == 92 - def test_keep_recent_count_uses_model_visible_events(self): + def test_keep_recent_count_uses_active_events(self): events = [_make_event(author="agent", text=f"msg {idx}") for idx in range(20)] - for idx, event in enumerate(events): - event.set_model_visible(idx in (0, 1, 2, 10, 15, 19)) selected_events, insert_index = find_events_for_summary( events, keep_recent_count=3, start_by_user_turn=False) - assert selected_events == events[:10] - assert insert_index == 10 + assert selected_events == events[:17] + assert insert_index == 17 def test_ignores_keep_recent_when_it_would_empty_selection(self): events = [ @@ -266,14 +260,11 @@ def test_zero_keep_recent_selects_all_matching_events(self): selected_events, insert_index = find_events_for_summary(events, keep_recent_count=0) - assert selected_events == events + assert selected_events == events[1:] assert insert_index == len(events) - def test_no_visible_events(self): - event = _make_event(author="user", text="hidden") - event.set_model_visible(False) - - selected_events, insert_index = find_events_for_summary([event]) + def test_no_events(self): + selected_events, insert_index = find_events_for_summary([]) assert selected_events == [] assert insert_index == -1 diff --git a/trpc_agent_sdk/agents/_langgraph_agent.py b/trpc_agent_sdk/agents/_langgraph_agent.py index 446895fb..019832b7 100644 --- a/trpc_agent_sdk/agents/_langgraph_agent.py +++ b/trpc_agent_sdk/agents/_langgraph_agent.py @@ -420,7 +420,7 @@ def _extract_resume_command(self, events: list[Event]) -> Optional[Command]: # Must use checkpointer to resume if not self.graph.checkpointer or len(events) == 0: return None - last_event = next((event for event in reversed(events) if event.is_model_visible()), None) + last_event = events[-1] if not last_event: return None if last_event.author == "user" and last_event.content and last_event.content.parts: @@ -678,8 +678,6 @@ def _get_last_human_messages(self, events: list[Event]) -> list[HumanMessage]: """ messages = [] for event in reversed(events): - if not event.is_model_visible(): - continue if messages and event.author != "user": break if event.author == "user" and event.content and event.content.parts: @@ -722,8 +720,6 @@ def _get_conversation_with_agent(self, events: list[Event]) -> list[Union[HumanM messages = [] for event in events: - if not event.is_model_visible(): - continue if not event.content or not event.content.parts: continue diff --git a/trpc_agent_sdk/agents/core/_history_processor.py b/trpc_agent_sdk/agents/core/_history_processor.py index bd38ccda..f8b90f6d 100644 --- a/trpc_agent_sdk/agents/core/_history_processor.py +++ b/trpc_agent_sdk/agents/core/_history_processor.py @@ -114,9 +114,6 @@ def filter_events( filtered_events = [] for event in events: - # Step 0.5: Model visibility filtering - if not event.is_model_visible(): - continue # Step 1: Timeline filtering if not self._should_include_event_by_timeline(event, self.timeline_filter_mode, ctx): @@ -175,9 +172,6 @@ def _should_include_event_by_timeline( if timeline_filter_mode == TimelineFilterMode.ALL: return True - if event.is_summary_event(): - return True - # INVOCATION mode: Filter by invocation_id (which represents a single runner.run_async() call) if timeline_filter_mode == TimelineFilterMode.INVOCATION: if ctx and event.invocation_id: diff --git a/trpc_agent_sdk/dsl/graph/_graph_agent.py b/trpc_agent_sdk/dsl/graph/_graph_agent.py index 4e4ec5ce..12e05c1d 100644 --- a/trpc_agent_sdk/dsl/graph/_graph_agent.py +++ b/trpc_agent_sdk/dsl/graph/_graph_agent.py @@ -413,8 +413,6 @@ def _build_initial_state(self, ctx: InvocationContext) -> GraphState: user_input = "" user_input_event = None for event in reversed(ctx.session.events): - if not event.is_model_visible(): - continue if event.author == "user" and event.content and event.content.parts: for part in event.content.parts: if part.text: @@ -432,8 +430,6 @@ def _build_initial_state(self, ctx: InvocationContext) -> GraphState: messages = [] if not has_saved_checkpoint: for event in ctx.session.events: - if not event.is_model_visible(): - continue if event.content: # Skip the user input event - it will be added via STATE_KEY_USER_INPUT if event is user_input_event: diff --git a/trpc_agent_sdk/dsl/graph/_node_action/_agent.py b/trpc_agent_sdk/dsl/graph/_node_action/_agent.py index eacad52f..adf1e7a7 100644 --- a/trpc_agent_sdk/dsl/graph/_node_action/_agent.py +++ b/trpc_agent_sdk/dsl/graph/_node_action/_agent.py @@ -187,7 +187,7 @@ async def execute(self, state: State) -> dict[str, Any]: if text_parts: last_response = text_parts[-1] - if not event.visible or not event.is_model_visible(): + if not event.visible: if event.actions and event.actions.transfer_to_agent: raise ValueError("Agent transfer requested but invisible is not allowed.") continue @@ -221,7 +221,7 @@ async def execute(self, state: State) -> dict[str, Any]: await self._run_agent_event_callbacks(state, error_event) if hasattr(child_session, "events"): child_session.events.append(error_event.model_copy(deep=True)) - if error_event.visible or error_event.is_model_visible(): + if error_event.visible: self.writer.write_event(error_event) break diff --git a/trpc_agent_sdk/memory/_in_memory_memory_service.py b/trpc_agent_sdk/memory/_in_memory_memory_service.py index 992a9b2c..eb2f8496 100644 --- a/trpc_agent_sdk/memory/_in_memory_memory_service.py +++ b/trpc_agent_sdk/memory/_in_memory_memory_service.py @@ -111,8 +111,6 @@ async def search_memory(self, count = 0 for session_events in self._session_events[key].values(): for event_ttl in session_events: - if not event_ttl.event.is_model_visible(): - continue if not event_ttl.event.content or not event_ttl.event.content.parts: continue words_in_event = extract_words_lower(' '.join( diff --git a/trpc_agent_sdk/memory/_redis_memory_service.py b/trpc_agent_sdk/memory/_redis_memory_service.py index 04e3aae5..0c51dc05 100644 --- a/trpc_agent_sdk/memory/_redis_memory_service.py +++ b/trpc_agent_sdk/memory/_redis_memory_service.py @@ -85,7 +85,7 @@ async def search_memory(self, except Exception as ex: # pylint: disable=broad-except logger.error("Error parsing event JSON: %s", ex) continue - if not event or not event.content or not event.content.parts or not event.is_model_visible(): + if not event or not event.content or not event.content.parts: continue words_in_event = extract_words_lower(' '.join( [part.text for part in event.content.parts if part.text])) diff --git a/trpc_agent_sdk/memory/_sql_memory_service.py b/trpc_agent_sdk/memory/_sql_memory_service.py index 244d72a8..a92e91c6 100644 --- a/trpc_agent_sdk/memory/_sql_memory_service.py +++ b/trpc_agent_sdk/memory/_sql_memory_service.py @@ -198,8 +198,6 @@ async def store_session(self, session: Session, agent_context: Optional[AgentCon async with self._sql_storage.create_db_session() as sql_session: is_exist = False for event in session.events: - if not event.is_model_visible(): - continue if not event.content or not event.content.parts: continue content = sanitize_content_json(event.content.model_dump(exclude_none=True, mode="json")) diff --git a/trpc_agent_sdk/memory/mem0_memory_service.py b/trpc_agent_sdk/memory/mem0_memory_service.py index 2878109b..3507af84 100644 --- a/trpc_agent_sdk/memory/mem0_memory_service.py +++ b/trpc_agent_sdk/memory/mem0_memory_service.py @@ -147,9 +147,7 @@ async def store_session(self, session: Session, agent_context: Optional[AgentCon level-1 key: session.save_key -> user_id level-2 key: session.id -> metadata["session_id"] """ - valid_events = [ - event for event in session.events if event.content and event.content.parts and event.is_model_visible() - ] + valid_events = [event for event in session.events if event.content and event.content.parts] if not valid_events: return diff --git a/trpc_agent_sdk/memory/mempalace_memory_service.py b/trpc_agent_sdk/memory/mempalace_memory_service.py index 8070fd94..032f1efc 100644 --- a/trpc_agent_sdk/memory/mempalace_memory_service.py +++ b/trpc_agent_sdk/memory/mempalace_memory_service.py @@ -86,14 +86,13 @@ def __init__( wing: Optional[str] = None, room: str = _DEFAULT_ROOM, added_by: str = _DEFAULT_ADDED_BY, - store_only_model_visible: bool = True, + **kwargs: Any, ) -> None: super().__init__(memory_service_config=memory_service_config) self._config = config or MempalaceConfig() self._wing = wing self._room = room self._added_by = added_by - self._store_only_model_visible = store_only_model_visible self._pending_tasks: set[asyncio.Task[None]] = set() self._scheduled_drawer_ids: set[str] = set() self._stored_drawer_ids: set[str] = set() @@ -110,8 +109,6 @@ async def store_session(self, session: Session, agent_context: Optional[AgentCon events_to_store: list[tuple[Event, str, str]] = [] for event in session.events: - if self._store_only_model_visible and not event.is_model_visible(): - continue text = self._event_to_text(event) if not text: continue diff --git a/trpc_agent_sdk/server/a2a/_remote_a2a_agent.py b/trpc_agent_sdk/server/a2a/_remote_a2a_agent.py index 52d818c5..e76540c5 100644 --- a/trpc_agent_sdk/server/a2a/_remote_a2a_agent.py +++ b/trpc_agent_sdk/server/a2a/_remote_a2a_agent.py @@ -323,8 +323,6 @@ def _build_outgoing_message(self, ctx: InvocationContext) -> Optional[Message]: user_event = None for event in reversed(ctx.session.events): - if not event.is_model_visible(): - continue if event.author == "user" and event.content: user_event = event break diff --git a/trpc_agent_sdk/server/agents/claude/_claude_agent.py b/trpc_agent_sdk/server/agents/claude/_claude_agent.py index 93273996..d745c611 100644 --- a/trpc_agent_sdk/server/agents/claude/_claude_agent.py +++ b/trpc_agent_sdk/server/agents/claude/_claude_agent.py @@ -832,8 +832,6 @@ def _extract_latest_user_message(self, ctx: InvocationContext) -> Optional[str]: # Look through events in reverse to find latest user message for event in reversed(ctx.session.events): - if not event.is_model_visible(): - continue if event.author == "user" and event.content and event.content.parts: for part in event.content.parts: if part.text: @@ -904,8 +902,6 @@ def _build_prompt_with_history(self, ctx: InvocationContext) -> Optional[str]: # Iterate through events to build conversation history for event in ctx.session.events: - if not event.is_model_visible(): - continue if not event.content or not event.content.parts: continue diff --git a/trpc_agent_sdk/sessions/_base_session_service.py b/trpc_agent_sdk/sessions/_base_session_service.py index e962cda9..979523f4 100644 --- a/trpc_agent_sdk/sessions/_base_session_service.py +++ b/trpc_agent_sdk/sessions/_base_session_service.py @@ -66,6 +66,11 @@ def summarizer_manager(self) -> Optional[SummarizerSessionManager]: """Get the summarizer manager.""" return self._summarizer_manager + @property + def session_config(self) -> SessionServiceConfig: + """Get the session service configuration.""" + return self._session_config + def set_summarizer_manager(self, summarizer_manager: SummarizerSessionManager, force: bool = False) -> None: """Set the summarizer manager to use. @@ -82,15 +87,22 @@ async def append_event(self, session: Session, event: Event) -> Event: """Appends an event to a session object.""" if event.partial: return event + event, _ = self._append_event_to_session(session, event) + return event + + def _append_event_to_session(self, session: Session, event: Event) -> tuple[Event, list[Event]]: + """Append an event to the in-memory session and return filtered events.""" # Apply temp-scoped state to in-memory session before trimming event delta, # so same-invocation consumers can still read temp values. self._apply_temp_state(session, event) event = self._trim_temp_delta_state(event) self.__update_session_state(session, event) - session.add_event(event, - event_ttl_seconds=self._session_config.event_ttl_seconds, - max_events=self._session_config.max_events) - return event + filtered_events = session._add_event_and_get_filtered_events( + event, + event_ttl_seconds=self._session_config.event_ttl_seconds, + max_events=self._session_config.max_events, + store_filtered_events=self._session_config.store_historical_events) + return event, filtered_events def _apply_temp_state(self, session: Session, event: Event) -> None: """Apply temp-scoped state delta to in-memory session state only. @@ -179,12 +191,22 @@ async def get_session_summary(self, session: Session) -> Optional[str]: return summary.summary_text return None - def filter_events(self, session: Session) -> None: - """Filter events based on the session config.""" - session.apply_event_filtering( + def filter_events(self, session: Session, need_copy: bool = False) -> Session: + """Filter events based on the session config. + + Args: + session: Session to filter. + need_copy: Whether to filter a deep copy instead of mutating the input session. + """ + filtered_session = session.model_copy(deep=True) if need_copy else session + # This is a read-view trim. Do not pass store_historical_events here: + # repeated get_session calls must not move view-trimmed events into + # historical_events or turn a read into a persistent state change. + filtered_session.apply_event_filtering( event_ttl_seconds=self._session_config.event_ttl_seconds, max_events=self._session_config.num_recent_events, ) + return filtered_session @override async def close(self) -> None: diff --git a/trpc_agent_sdk/sessions/_in_memory_session_service.py b/trpc_agent_sdk/sessions/_in_memory_session_service.py index fc5f400f..a2c542ed 100644 --- a/trpc_agent_sdk/sessions/_in_memory_session_service.py +++ b/trpc_agent_sdk/sessions/_in_memory_session_service.py @@ -175,8 +175,7 @@ async def get_session( if session is None: return None - copied_session = copy.deepcopy(session) - self.filter_events(copied_session) + copied_session = self.filter_events(session, need_copy=True) app_state = self._get_app_state(app_name) user_state = self._get_user_state(app_name, user_id) @@ -199,6 +198,7 @@ async def list_sessions(self, *, app_name: str, user_id: str) -> ListSessionsRes copied_session = copy.deepcopy(session) copied_session.events = [] + copied_session.historical_events = [] app_state = self._get_app_state(app_name) user_state = self._get_user_state(app_name, user_id) copied_session = self._merge_state(app_state, user_state, copied_session) @@ -469,6 +469,9 @@ def _set_session(self, app_name: str, user_id: str, session_id: str, session: Se if user_id not in self._sessions[app_name]: self._sessions[app_name][user_id] = {} + if not self._session_config.store_historical_events: + session = session.model_copy(update={"historical_events": []}) + # Store session with TTL session_with_ttl = SessionWithTTL(session=session, ttl=self._session_config.ttl) session_with_ttl.update(session) diff --git a/trpc_agent_sdk/sessions/_redis_session_service.py b/trpc_agent_sdk/sessions/_redis_session_service.py index 0d6dc11d..01f77182 100644 --- a/trpc_agent_sdk/sessions/_redis_session_service.py +++ b/trpc_agent_sdk/sessions/_redis_session_service.py @@ -62,7 +62,11 @@ def __init__(self, session_config: Optional[SessionServiceConfig] = None, is_async: bool = False, **kwargs: Any): + is_default_config = session_config is None super().__init__(summarizer_manager=summarizer_manager, session_config=session_config) + if is_default_config: + # Default to store historical events for persistent backends. + self._session_config.store_historical_events = True # Redis needs default TTL configuration self._redis_storage = RedisStorage(is_async=is_async, redis_url=db_url, **kwargs) @@ -115,14 +119,14 @@ async def get_session( if not storage_session: return None - # Filter events based on configuration - self.filter_events(storage_session) + # Filter events for the returned view without mutating storage data. + session = self.filter_events(storage_session) # Get and merge state app_state = await self._get_app_state(redis_session, app_name) user_state = await self._get_user_state(redis_session, app_name, user_id) - return self._merge_state(app_state, user_state, storage_session) + return self._merge_state(app_state, user_state, session) @override async def list_sessions(self, *, app_name: str, user_id: str) -> ListSessionsResponse: @@ -144,6 +148,7 @@ async def list_sessions(self, *, app_name: str, user_id: str) -> ListSessionsRes if storage_session: # Clear events for list view storage_session.events = [] + storage_session.historical_events = [] # Merge state storage_session = self._merge_state(app_state, user_state, storage_session) sessions_without_events.append(storage_session) @@ -181,9 +186,6 @@ def _warning(message: str) -> None: if not storage_session: _warning("session not found in Redis") return event - # Add event to storage session - storage_session.events.append(event) - # Extract and apply state changes to appropriate storage buckets if event.actions and event.actions.state_delta: state_delta = extract_state_delta(event.actions.state_delta) @@ -200,7 +202,8 @@ def _warning(message: str) -> None: if state_delta.session_state: storage_session.state.update(state_delta.session_state) - # Update conversation count + storage_session.events = session.events + storage_session.historical_events = session.historical_events storage_session.conversation_count = session.conversation_count await self._set_session(redis_session, storage_session) @@ -316,7 +319,10 @@ async def _set_session(self, redis_session: RedisSession, session: Session) -> N session: Session to set """ key = session_key(session.app_name, session.user_id, session.id) - session_json = session.model_dump_json() + if self._session_config.store_historical_events: + session_json = session.model_dump_json() + else: + session_json = session.model_copy(update={"historical_events": []}).model_dump_json() # Use SET with TTL if TTL is configured, otherwise use SET command = RedisCommand(method='set', @@ -378,7 +384,10 @@ async def _get_session(self, redis_session: RedisSession, session_key: str) -> O storage_session_data = await self._redis_storage.execute_command(redis_session, command) if storage_session_data: await self._refresh_ttl(redis_session, session_key) - return Session.model_validate_json(storage_session_data) + session = Session.model_validate_json(storage_session_data) + if not self._session_config.store_historical_events: + session.historical_events = [] + return session return None def _merge_state(self, app_state: dict[str, Any], user_state: dict[str, Any], session: Session) -> Session: diff --git a/trpc_agent_sdk/sessions/_session.py b/trpc_agent_sdk/sessions/_session.py index 73a057f5..b0fd094f 100644 --- a/trpc_agent_sdk/sessions/_session.py +++ b/trpc_agent_sdk/sessions/_session.py @@ -34,20 +34,40 @@ class Session(SessionABC): """ events: List[Event] = Field(default_factory=list, description="The events of the session") """The events of the session, e.g. user input, model response, function call/response, etc.""" - - def add_event(self, event: Event, event_ttl_seconds: float = 0.0, max_events: int = 0) -> None: + historical_events: List[Event] = Field(default_factory=list, description="Events replaced by session summaries") + """Raw events that have been compressed into summaries and removed from the model-visible event window.""" + + def add_event(self, + event: Event, + event_ttl_seconds: float = 0.0, + max_events: int = 0, + store_filtered_events: bool = False) -> None: """Add an event to the session and update the last update time. Args: event: The event to add to the session. event_ttl_seconds: Time-to-live in seconds for events. If 0, no TTL filtering is applied. max_events: Maximum number of events to keep. If 0, no limit is applied. + store_filtered_events: Whether to move filtered events into historical_events. """ + self._add_event_and_get_filtered_events(event, event_ttl_seconds, max_events, store_filtered_events) + + def _add_event_and_get_filtered_events(self, + event: Event, + event_ttl_seconds: float = 0.0, + max_events: int = 0, + store_filtered_events: bool = False) -> list[Event]: + """Add an event and return events removed from the active event window.""" self.events.append(event) - self.apply_event_filtering(event_ttl_seconds, max_events) + filtered_events = self._apply_event_filtering_and_get_filtered_events(event_ttl_seconds, max_events, + store_filtered_events) self.last_update_time = event.timestamp + return filtered_events - def apply_event_filtering(self, event_ttl_seconds: float = 0.0, max_events: int = 0) -> None: + def apply_event_filtering(self, + event_ttl_seconds: float = 0.0, + max_events: int = 0, + store_filtered_events: bool = False) -> None: """Apply event filtering based on TTL and maximum event count. This method filters events in two steps: @@ -60,21 +80,23 @@ def apply_event_filtering(self, event_ttl_seconds: float = 0.0, max_events: int Args: event_ttl_seconds: Time-to-live in seconds for events. If 0, no TTL filtering is applied. max_events: Maximum number of events to keep. If 0, no limit is applied. + store_filtered_events: Whether to move filtered events into historical_events. """ + self._apply_event_filtering_and_get_filtered_events(event_ttl_seconds, max_events, store_filtered_events) + + def _apply_event_filtering_and_get_filtered_events(self, + event_ttl_seconds: float = 0.0, + max_events: int = 0, + store_filtered_events: bool = False) -> list[Event]: + """Apply event filtering and return events removed from the active event window.""" if not self.events: - return + return [] # If neither filter is configured, return early if event_ttl_seconds <= 0 and max_events <= 0: - return + return [] - # Apply filtering only to the currently model-visible events. Raw - # session events stay in place; events filtered out of this visible - # window are hidden from model history. - visible_events = [event for event in self.events if event.is_model_visible()] - if not visible_events: - return - retained_events = visible_events.copy() + retained_events = self.events.copy() # Step 1: Apply TTL filtering if configured if event_ttl_seconds > 0: @@ -91,31 +113,26 @@ def apply_event_filtering(self, event_ttl_seconds: float = 0.0, max_events: int retained_events = retained_events[i:] break else: - # Step 3: If all visible events were filtered out, retain the - # first user message that the original behavior would have - # re-inserted, but only from the already-visible subset. + # Step 3: If all events were filtered out, retain the latest + # summary/user anchor so the next model request still has a + # coherent starting point. retained_events = [] - for event in reversed(visible_events): + for event in reversed(self.events): if is_summary_anchor(event): retained_events.insert(0, event) break retained_ids = {id(event) for event in retained_events} - for event in visible_events: - if id(event) not in retained_ids: - event.set_model_visible(False) - - def get_first_visible_event_idx(self) -> int: - """Get the first visible event index in the session.""" - first_visible_idx = 0 - for idx, event in enumerate(self.events): - if event.is_model_visible(): - first_visible_idx = idx - break - return first_visible_idx + filtered_events = [event for event in self.events if id(event) not in retained_ids] + + if store_filtered_events: + self.historical_events.extend(filtered_events) + + self.events = retained_events + return filtered_events def insert_events(self, events: List[Event], idx: Optional[int] = None) -> None: """Insert events at the given index, replacing the existing events.""" if idx is None: - idx = self.get_first_visible_event_idx() + idx = 0 self.events[idx:idx] = events diff --git a/trpc_agent_sdk/sessions/_session_summarizer.py b/trpc_agent_sdk/sessions/_session_summarizer.py index 6251beec..da717110 100644 --- a/trpc_agent_sdk/sessions/_session_summarizer.py +++ b/trpc_agent_sdk/sessions/_session_summarizer.py @@ -355,11 +355,14 @@ def _create_summarization_prompt(self, conversation_text: str) -> str: """ return self._summarizer_prompt.format(conversation_text=conversation_text) - async def create_session_summary_by_events(self, - events: List[Event], - session_id: str, - keep_recent_count: int = 10, - ctx: InvocationContext | None = None) -> Optional[str]: + async def create_session_summary_by_events( + self, + events: List[Event], + session_id: str, + keep_recent_count: int = 10, + ctx: InvocationContext | None = None, + historical_events: Optional[List[Event]] = None, + store_historical_events: bool = False) -> tuple[Optional[str], List[Event]]: """Compress a session by summarizing old events. Args: @@ -367,20 +370,22 @@ async def create_session_summary_by_events(self, session_id: The session ID keep_recent_count: Number of recent events to keep after compression ctx: The invocation context + historical_events: Optional list to receive raw events replaced by the summary + store_historical_events: Whether to keep raw historical events Returns: Summary text if successful, None otherwise Events after compression """ try: - original_count = sum(1 for event in events if event.is_model_visible()) - old_visible_events, insert_index = find_events_for_summary(events, keep_recent_count, + original_count = len(events) + events_for_summary, insert_index = find_events_for_summary(events, keep_recent_count, self.__start_by_user_turn) - if not old_visible_events: + if not events_for_summary: return None, events # Generate summary of old events - summary_text = await self._compress_session_to_summary(old_visible_events, session_id, ctx) + summary_text = await self._compress_session_to_summary(events_for_summary, session_id, ctx) if summary_text: # Create summary event @@ -391,16 +396,15 @@ async def create_session_summary_by_events(self, role="user"), timestamp=time.time()) summary_event.set_summary_event(True) - summary_event.set_model_visible(True) - # Hide all events before the summary insertion point without dropping raw data. - for event in events[:insert_index]: - event.set_model_visible(False) + summarized_events = events[:insert_index] + if store_historical_events and historical_events is not None: + historical_events.extend(summarized_events) - # Insert summary before the recent complete conversation window. - events.insert(insert_index, summary_event) + # Keep only the summary and recent active events in the model-facing window. + events[:] = [summary_event] + events[insert_index:] - compressed_count = sum(1 for event in events if event.is_model_visible()) + compressed_count = len(events) logger.info("Compressed session %s: %s events -> %s events", session_id, original_count, compressed_count) @@ -409,19 +413,27 @@ async def create_session_summary_by_events(self, logger.error("Failed to compress session %s: %s", session_id, ex, exc_info=True) return None, events - async def create_session_summary(self, session: Session, ctx: InvocationContext | None = None) -> Optional[str]: + async def create_session_summary(self, + session: Session, + ctx: InvocationContext | None = None, + store_historical_events: bool = False) -> Optional[str]: """Compress a session by summarizing old events. Args: session: The session to compress ctx: The invocation context + store_historical_events: Whether to keep raw historical events Returns: Summary text if successful, None otherwise Events after compression """ - summary_text, _ = await self.create_session_summary_by_events(session.events, session.id, - self.__keep_recent_count, ctx) + summary_text, _ = await self.create_session_summary_by_events(session.events, + session.id, + self.__keep_recent_count, + ctx, + historical_events=session.historical_events, + store_historical_events=store_historical_events) return summary_text def get_summary_metadata(self) -> Dict[str, Any]: diff --git a/trpc_agent_sdk/sessions/_sql_session_service.py b/trpc_agent_sdk/sessions/_sql_session_service.py index 82dd41a7..d5ad5a08 100644 --- a/trpc_agent_sdk/sessions/_sql_session_service.py +++ b/trpc_agent_sdk/sessions/_sql_session_service.py @@ -41,6 +41,7 @@ from sqlalchemy import Text from sqlalchemy import func from sqlalchemy.ext.mutable import MutableDict +from sqlalchemy.ext.mutable import MutableList from sqlalchemy.inspection import inspect from sqlalchemy.orm import DeclarativeBase from sqlalchemy.orm import Mapped @@ -94,6 +95,16 @@ def _event_object_from_storage(value: Optional[str]) -> Optional[str]: return value or Event.model_fields["object"].default +def _events_to_storage(events: list[Event]) -> list[dict[str, Any]]: + """Serialize historical events into the sessions table JSON column.""" + return [event.model_dump(exclude_none=True, mode="json") for event in events] + + +def _events_from_storage(events: Optional[list[dict[str, Any]]]) -> list[Event]: + """Deserialize historical events from the sessions table JSON column.""" + return [Event.model_validate(event) for event in (events or [])] + + class SessionStorageBase(DeclarativeBase): """Base class for SqlSessionService tables only. @@ -139,6 +150,9 @@ class StorageSession(SessionStorageBase): ) state: Mapped[MutableDict[str, Any]] = mapped_column(MutableDict.as_mutable(DynamicJSON), default={}) + historical_events: Mapped[Optional[list[dict[str, Any]]]] = mapped_column(MutableList.as_mutable(DynamicJSON), + default=list, + nullable=True) conversation_count: Mapped[int] = mapped_column(Integer, default=0) create_time: Mapped[datetime] = mapped_column(PreciseTimestamp, default=func.now()) @@ -164,17 +178,21 @@ def to_session( self, state: dict[str, Any] | None = None, events: list[Event] | None = None, + historical_events: list[Event] | None = None, ) -> Session: if state is None: state = {} if events is None: events = [] + if historical_events is None: + historical_events = [] return Session( app_name=self.app_name, user_id=self.user_id, id=self.id, state=state, events=events, + historical_events=historical_events, conversation_count=self.conversation_count, last_update_time=self.update_timestamp_tz, save_key=user_key(self.app_name, self.user_id), @@ -374,7 +392,11 @@ def __init__(self, is_async: bool = False, session_config: Optional[SessionServiceConfig] = None, **kwargs: Any): + is_default_config = session_config is None super().__init__(summarizer_manager=summarizer_manager, session_config=session_config) + if is_default_config: + # Default to store historical events for persistent backends. + self._session_config.store_historical_events = True self._sql_storage = SqlStorage(is_async=is_async, db_url=db_url, metadata=SessionStorageBase.metadata, **kwargs) self.__cleanup_task: Optional[asyncio.Task] = None self.__cleanup_stop_event: Optional[asyncio.Event] = None @@ -422,7 +444,9 @@ async def create_session( StateStorageEntry(app_state_delta=app_state, user_state_delta=user_state, session_state=state_deltas.session_state)) - return storage_session.to_session(state=merged_state) + historical_events = (_events_from_storage(storage_session.historical_events) + if self._session_config.store_historical_events else []) + return storage_session.to_session(state=merged_state, historical_events=historical_events) @override async def get_session( @@ -459,9 +483,10 @@ async def get_session( session_state=storage_session.state)) events = [e.to_event() for e in reversed(storage_events)] - session = storage_session.to_session(state=merged_state, events=events) - self.filter_events(session) - return session + historical_events = (_events_from_storage(storage_session.historical_events) + if self._session_config.store_historical_events else []) + session = storage_session.to_session(state=merged_state, events=events, historical_events=historical_events) + return self.filter_events(session) @override async def list_sessions(self, *, app_name: str, user_id: str) -> ListSessionsResponse: @@ -507,7 +532,7 @@ async def append_event(self, session: Session, event: Event) -> Event: if event.partial: return event - await super().append_event(session=session, event=event) + event, filtered_events = self._append_event_to_session(session, event) app_name = session.app_name user_id = session.user_id @@ -525,6 +550,12 @@ async def append_event(self, session: Session, event: Event) -> Event: logger.warning( "Session %s is stale (time diff: %ss). Reloading session from database to get latest state.", session_id, time_diff) + # The event was already appended to the caller-provided + # session before this reload. If another writer concurrently + # changed the same session, filtered_events may no longer + # describe the exact database window. Full conflict resolution + # would require versioned writes/locking; keep the existing + # best-effort stale-session behavior here. await self._sql_storage.refresh(sql_session, storage_session) filters = [ SessionStorageEvent.app_name == app_name, SessionStorageEvent.session_id == session_id, @@ -539,6 +570,8 @@ async def append_event(self, session: Session, event: Event) -> Event: session.state = storage_session.state session.conversation_count = storage_session.conversation_count session.events = [e.to_event() for e in reversed(storage_events)] + session.historical_events = (_events_from_storage(storage_session.historical_events) + if self._session_config.store_historical_events else []) storage_session.conversation_count = session.conversation_count @@ -556,7 +589,23 @@ async def append_event(self, session: Session, event: Event) -> Event: session_state.update(state_entry.session_state) storage_session.state = session_state # type: ignore - await self._sql_storage.add(sql_session, SessionStorageEvent.from_event(session, event)) + if filtered_events: + filtered_event_ids = [filtered_event.id for filtered_event in filtered_events] + storage_session.historical_events = (_events_to_storage( + session.historical_events) if self._session_config.store_historical_events else []) # type: ignore + filters = [ + SessionStorageEvent.app_name == app_name, SessionStorageEvent.user_id == user_id, + SessionStorageEvent.session_id == session_id, + SessionStorageEvent.id.in_(filtered_event_ids) + ] + conditions = SqlCondition(filters=filters) + event_key = SqlKey(key=(app_name, user_id, session_id), storage_cls=SessionStorageEvent) + await self._sql_storage.delete(sql_session, event_key, conditions) + else: + filtered_event_ids = [] + + if event.id not in filtered_event_ids: + await self._sql_storage.add(sql_session, SessionStorageEvent.from_event(session, event)) await self._sql_storage.commit(sql_session) await self._sql_storage.refresh(sql_session, storage_session) @@ -589,6 +638,9 @@ async def update_session(self, session: Session) -> None: await self._sql_storage.add(sql_session, SessionStorageEvent.from_event(session, event)) storage_session.state = session.state # type: ignore + storage_session.historical_events = ( + _events_to_storage(session.historical_events) if self._session_config.store_historical_events else [] + ) # type: ignore storage_session.conversation_count = session.conversation_count await self._sql_storage.commit(sql_session) diff --git a/trpc_agent_sdk/sessions/_summarizer_checker.py b/trpc_agent_sdk/sessions/_summarizer_checker.py index bcce1bcc..37863cc9 100644 --- a/trpc_agent_sdk/sessions/_summarizer_checker.py +++ b/trpc_agent_sdk/sessions/_summarizer_checker.py @@ -11,6 +11,7 @@ from typing import Callable from typing import List +from trpc_agent_sdk.events import Event from trpc_agent_sdk.log import logger from ._session import Session @@ -18,6 +19,18 @@ CheckSummarizerFunction = Callable[[Session], bool] +def _leading_summary_event(session: Session) -> Event | None: + """Return the summary anchor when the active event window starts with one.""" + if session.events and session.events[0].is_summary_event(): + return session.events[0] + return None + + +def _events_after_summary_anchor(session: Session) -> list[Event]: + """Return active events after the leading summary anchor.""" + return session.events[1:] if _leading_summary_event(session) else session.events + + def set_summarizer_token_threshold(token_count: int) -> CheckSummarizerFunction: """Set the token threshold for summarizer. @@ -30,7 +43,9 @@ def set_summarizer_token_threshold(token_count: int) -> CheckSummarizerFunction: def _decorator(session: Session) -> bool: # Filter events with usage_metadata - events_with_metadata = [event for event in session.events if event.usage_metadata is not None] + events_with_metadata = [ + event for event in _events_after_summary_anchor(session) if event.usage_metadata is not None + ] # If no events have usage_metadata, log a warning and return False if not events_with_metadata: @@ -60,7 +75,7 @@ def set_summarizer_events_count_threshold(event_count: int = 30) -> CheckSummari def _decorator(session: Session) -> bool: # Check if we have enough events to warrant summarization - return len(session.events) > event_count + return len(_events_after_summary_anchor(session)) > event_count return _decorator @@ -76,8 +91,15 @@ def set_summarizer_time_interval_threshold(time_interval: float = 300.0) -> Chec """ def _decorator(session: Session) -> bool: - # Check if it's been long enough since the last summarization - return time.time() - session.events[-1].timestamp > time_interval + summary_event = _leading_summary_event(session) + events_after_summary = _events_after_summary_anchor(session) + if not events_after_summary: + return False + + if summary_event is not None: + return time.time() - summary_event.timestamp > time_interval + + return time.time() - events_after_summary[-1].timestamp > time_interval return _decorator @@ -94,7 +116,7 @@ def set_summarizer_important_content_threshold(important_content_count: int = 10 def _decorator(session: Session) -> bool: # Check if there's important content to summarize - for event in session.events: + for event in _events_after_summary_anchor(session): if event.content and event.content.parts: for part in event.content.parts: if part.text and len(part.text.strip()) > important_content_count: diff --git a/trpc_agent_sdk/sessions/_summarizer_manager.py b/trpc_agent_sdk/sessions/_summarizer_manager.py index e938d25b..2a07f58c 100644 --- a/trpc_agent_sdk/sessions/_summarizer_manager.py +++ b/trpc_agent_sdk/sessions/_summarizer_manager.py @@ -105,10 +105,16 @@ async def create_session_summary(self, if is_should_summarize: logger.debug("Summarizing session %s", session.id) - # Compress the session. Invisible events are treated as already - # compressed/deleted for summary metrics; raw events remain stored. - original_event_count = self._count_visible_events(session) - summary_text = await self._summarizer.create_session_summary(session, ctx) + # Compress the session so the active events list contains only + # model-visible summary/recent events. Raw events are retained only + # when the session service config requests it. + original_event_count = len(session.events) + base_config = getattr(self._base_service, "session_config", None) + store_historical_events = getattr(base_config, "store_historical_events", False) + if not isinstance(store_historical_events, bool): + store_historical_events = False + summary_text = await self._summarizer.create_session_summary( + session, ctx, store_historical_events=store_historical_events) if summary_text: app_name = session.app_name user_id = session.user_id @@ -120,17 +126,13 @@ async def create_session_summary(self, session_id=session.id, summary_text=summary_text, original_event_count=original_event_count, - compressed_event_count=self._count_visible_events(session), + compressed_event_count=len(session.events), summary_timestamp=time.time(), ) # Update the stored session if self._base_service: await self._base_service.update_session(session) - @staticmethod - def _count_visible_events(session: Session) -> int: - return sum(1 for event in session.events if event.is_model_visible()) - async def get_session_summary(self, session: Session) -> Optional[SessionSummary]: """Get a summary of a session. diff --git a/trpc_agent_sdk/sessions/_types.py b/trpc_agent_sdk/sessions/_types.py index e9d670ba..5d4c548f 100644 --- a/trpc_agent_sdk/sessions/_types.py +++ b/trpc_agent_sdk/sessions/_types.py @@ -25,6 +25,8 @@ class SessionServiceConfig(BaseModel): """Time-to-live in seconds for events. If 0, no TTL filtering is applied.""" num_recent_events: int = Field(default=0, description="Number of recent events to keep") """Number of recent events to keep. If 0, no recent events are kept.""" + store_historical_events: bool = Field(default=False, description="Whether to store historical events") + """Whether to keep events that were moved out of the active event window.""" ttl: Ttl = Field(default_factory=Ttl, description="TTL configuration") """TTL configuration.""" diff --git a/trpc_agent_sdk/sessions/_utils.py b/trpc_agent_sdk/sessions/_utils.py index 68002623..ff842018 100644 --- a/trpc_agent_sdk/sessions/_utils.py +++ b/trpc_agent_sdk/sessions/_utils.py @@ -86,39 +86,28 @@ def is_summary_anchor(event: Event) -> bool: def find_events_for_summary(events: list[Event], keep_recent_count: int = 10, start_by_user_turn: bool = True) -> tuple[list[Event], int]: - """Find events that should be summarized. + """Find active events that should be summarized without checking model-visible flags. - Args: - events: Source events to inspect. - keep_recent_count: Number of recent model-visible events to keep out of the summary window. - start_by_user_turn: Whether to align the summary window to user or summary events. - - Returns: - A tuple of selected events and the summary insertion index in the original events. - Returns ([], -1) when no model-visible events can be selected. + The optimized summarizer keeps only model-facing events in Session.events and + moves compressed raw events to Session.historical_events, so the active list + itself is the source of truth. """ - visible_event_indices = [idx for idx, event in enumerate(events) if event.is_model_visible()] - if not visible_event_indices: + if not events: return [], -1 - first_visible_index = visible_event_indices[0] - last_visible_index = visible_event_indices[-1] - start_index = first_visible_index - + start_index = 0 if start_by_user_turn and not is_summary_anchor(events[start_index]): - for idx in range(first_visible_index - 1, -1, -1): - if is_summary_anchor(events[idx]): + for idx, event in enumerate(events): + if is_summary_anchor(event): start_index = idx break - window_end_index = last_visible_index + 1 - visible_events_count = len(visible_event_indices) - if keep_recent_count <= 0 or keep_recent_count >= visible_events_count: - insert_index = window_end_index + if keep_recent_count <= 0 or keep_recent_count >= len(events): + insert_index = len(events) else: - insert_index = visible_event_indices[-keep_recent_count] + insert_index = len(events) - keep_recent_count if start_by_user_turn: - for idx in range(insert_index, window_end_index): + for idx in range(insert_index, len(events)): if is_summary_anchor(events[idx]): insert_index = idx break