Skip to content

Commit 7c0de62

Browse files
committed
fix: sync mid table repeat data
1 parent b2f8f47 commit 7c0de62

4 files changed

Lines changed: 22 additions & 6 deletions

File tree

src/backend/bisheng/script/sync_increment_table.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from bisheng.api.services.workflow import WorkFlowService
44
from bisheng.knowledge.domain.services.knowledge_service import KnowledgeService
55
from bisheng.user.domain.services.user import UserService
6+
from bisheng.worker import sync_mid_user_interact_dtl
67
from bisheng.worker.telemetry.mid_table import sync_mid_user_increment, sync_mid_knowledge_increment, \
78
sync_mid_app_increment
89

@@ -46,7 +47,14 @@ def sync_app_increment_table_all():
4647
sync_mid_app_increment(start_date, end_date)
4748

4849

50+
def sync_user_interact_dtl_all():
51+
first_date = datetime(2025, 12, 1).isoformat()
52+
end_date = datetime.now().isoformat()
53+
sync_mid_user_interact_dtl(first_date, end_date)
54+
55+
4956
if __name__ == '__main__':
5057
sync_user_increment_table_all()
5158
sync_knowledge_increment_table_all()
5259
sync_app_increment_table_all()
60+
sync_user_interact_dtl_all()

src/backend/bisheng/telemetry/domain/mid_table/base.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
from typing import Dict, Any, List
1+
from typing import Dict, Any, List, Optional
22

33
from elasticsearch import Elasticsearch, AsyncElasticsearch, exceptions as es_exceptions, helpers
44
from loguru import logger
5-
from pydantic import BaseModel
5+
from pydantic import BaseModel, Field
66

77
from bisheng.common.schemas.telemetry.base_telemetry_schema import UserContext
88
from bisheng.common.services import telemetry_service
@@ -32,6 +32,7 @@
3232

3333
class BaseRecord(UserContext):
3434
timestamp: int
35+
es_id: Optional[str] = Field(default=None)
3536

3637

3738
class BaseMidTable(BaseModel):
@@ -96,14 +97,16 @@ def get_latest_record_time_sync(self) -> int | None:
9697
return latest_time
9798
return None
9899

99-
def insert_records_sync(self, records: List[BaseModel]) -> None:
100+
def insert_records_sync(self, records: List[BaseRecord]) -> None:
100101
""" 批量插入记录 """
101102
actions = []
102103
for rec in records:
103104
action = {
104105
"_index": self._index_name,
105-
"_source": rec.model_dump()
106+
"_source": rec.model_dump(exclude={"es_id"}),
106107
}
108+
if rec.es_id is not None:
109+
action["_id"] = rec.es_id
107110
actions.append(action)
108111
helpers.bulk(self._es_client_sync, actions)
109112

src/backend/bisheng/telemetry/domain/mid_table/user_interact.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ def get_records_by_time_range_sync(self, start_time: int, end_time: int, page: i
2525
"bool": {
2626
"filter": [
2727
{"term": {"event_type": {"value": BaseTelemetryTypeEnum.MESSAGE_FEEDBACK.value}}},
28-
{"range": {"timestamp": {"gte": start_time, "lt": end_time}}}
28+
{"range": {"timestamp": {"gte": start_time * 1000, "lt": end_time * 1000}}}
2929
]
3030
}
3131
}

src/backend/bisheng/worker/telemetry/mid_table.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ def sync_mid_user_increment(start_date: str = None, end_date: str = None):
6969
records = []
7070
for user in user_list:
7171
records.append(UserIncrementRecord(
72+
es_id=f"user_{user.user_id}",
7273
user_id=user.user_id,
7374
user_name=user.user_name,
7475
user_group_infos=[UserGroupInfo(user_group_id=group.id, user_group_name=group.group_name)
@@ -121,6 +122,7 @@ def sync_mid_app_increment(start_date: str = None, end_date: str = None):
121122
for app in app_list:
122123
user = user_map.get(app['user_id'], None)
123124
records.append(AppIncrementRecord(
125+
es_id=f"app_{app['id']}",
124126
user_id=app['user_id'],
125127
user_name=user.user_name if user else "",
126128
user_group_infos=[UserGroupInfo(user_group_id=group.id, user_group_name=group.group_name)
@@ -167,6 +169,7 @@ def sync_mid_knowledge_increment(start_date: str = None, end_date: str = None):
167169
for knowledge in knowledge_list:
168170
user = user_map.get(knowledge.user_id, None)
169171
records.append(KnowledgeIncrementRecord(
172+
es_id=f"knowledge_{knowledge.id}",
170173
user_id=knowledge.user_id,
171174
user_name=user.user_name if user else "",
172175
user_group_infos=[UserGroupInfo(user_group_id=group.id, user_group_name=group.group_name)
@@ -204,16 +207,18 @@ def sync_mid_user_interact_dtl(start_date: str = None, end_date: str = None):
204207
break
205208
records = []
206209
for record in result:
210+
es_id = record['_id']
207211
record = record['_source']
208212
records.append(UserInteractRecord(
213+
es_id=es_id,
209214
user_id=record['user_context']['user_id'],
210215
user_name=record['user_context']['user_name'],
211216
user_group_infos=[UserGroupInfo(user_group_id=group['user_group_id'],
212217
user_group_name=group['user_group_name'])
213218
for group in record['user_context'].get('user_group_infos', [])],
214219
user_role_infos=[UserRoleInfo(role_id=role['role_id'],
215220
role_name=role['role_name'],
216-
group_id=role['group_id'])
221+
group_id=role.get('group_id', 0))
217222
for role in record['user_context'].get('user_role_infos', [])],
218223
event_id=record['event_id'],
219224
timestamp=record['timestamp'],

0 commit comments

Comments
 (0)