Skip to content

Commit f46c257

Browse files
committed
fix:修改reindex 脚本
1 parent 111d9d9 commit f46c257

1 file changed

Lines changed: 79 additions & 25 deletions

File tree

src/backend/bisheng/script/base_telemetry_events_reindex.py

Lines changed: 79 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,72 @@
3737
}
3838
}
3939

40+
import time
41+
42+
43+
def wait_for_task(
44+
es,
45+
task_id: str,
46+
poll_interval: int = 10,
47+
timeout: int = 3600,
48+
):
49+
"""
50+
轮询 ES task 状态,直到完成或超时
51+
"""
52+
start_time = time.time()
53+
54+
while True:
55+
56+
task_info = es.tasks.get(task_id=task_id)
57+
completed = task_info.get("completed", False)
58+
59+
if completed:
60+
response = task_info.get("response", {})
61+
failures = response.get("failures", [])
62+
total = response.get("total", 0)
63+
created = response.get("created", 0)
64+
updated = response.get("updated", 0)
65+
66+
print(
67+
f"[REINDEX DONE] total={total}, created={created}, updated={updated}"
68+
)
69+
70+
if failures:
71+
raise RuntimeError(f"Reindex failures: {failures}")
72+
73+
return response
74+
75+
if time.time() - start_time > timeout:
76+
raise TimeoutError(f"Reindex task timeout: {task_id}")
77+
78+
status = task_info.get("task", {}).get("status", {})
79+
print(
80+
f"[REINDEX RUNNING] "
81+
f"total={status.get('total', 0)} "
82+
f"created={status.get('created', 0)} "
83+
f"updated={status.get('updated', 0)}"
84+
)
85+
86+
time.sleep(poll_interval)
87+
88+
89+
def count_docs(es, index):
90+
return es.count(index=index)["count"]
91+
92+
4093
if __name__ == '__main__':
4194
es_conn = get_statistics_es_connection_sync()
42-
43-
# 临时索引名称
44-
temp_index_name = "base_telemetry_events_temp_reindex"
95+
temp_index_name = "base_telemetry_events_v1"
4596
original_index_name = "base_telemetry_events"
4697

98+
# 1. 记录原始数据量
99+
try:
100+
source_count = count_docs(es_conn, original_index_name)
101+
print(f"Original doc count: {source_count}")
102+
except:
103+
source_count = 0
104+
print("Original index might not exist.")
105+
47106
# 创建临时索引
48107
if not es_conn.indices.exists(index=temp_index_name):
49108
es_conn.indices.create(index=temp_index_name, body=INDEX_MAPPING)
@@ -59,32 +118,27 @@
59118
}
60119
}
61120

62-
es_conn.reindex(body=reindex_body, wait_for_completion=True, request_timeout=3600)
63-
print(f"Reindexed data from {original_index_name} to {temp_index_name}")
121+
resp = es_conn.options(request_timeout=3600).reindex(
122+
body=reindex_body,
123+
wait_for_completion=False
124+
)
64125

65-
# 删除原始索引
66-
es_conn.indices.delete(index=original_index_name)
126+
task_id = resp["task"]
127+
print(f"Reindex started, task_id={task_id}")
67128

68-
print(f"Deleted original index: {original_index_name}")
129+
wait_for_task(
130+
es_conn,
131+
task_id=task_id,
132+
poll_interval=5,
133+
timeout=3600
134+
)
69135

70-
# 创建新的原始索引
71-
es_conn.indices.create(index=original_index_name, body=INDEX_MAPPING)
72-
print(f"Created new index: {original_index_name}")
136+
# 删除原始索引
137+
es_conn.indices.delete(index=original_index_name)
73138

74-
# 使用_reindex API将数据从临时索引迁移回原始索引
75-
reindex_back_body = {
76-
"source": {
77-
"index": temp_index_name
78-
},
79-
"dest": {
80-
"index": original_index_name
81-
}
82-
}
139+
# 将临时索引重命名为原始索引名
140+
es_conn.indices.put_alias(index=temp_index_name, name=original_index_name)
83141

84-
es_conn.reindex(body=reindex_back_body, wait_for_completion=True, request_timeout=3600)
85142

86-
# 删除临时索引
87-
es_conn.indices.delete(index=temp_index_name)
143+
print(f"Reindexed data to {original_index_name} successfully.")
88144

89-
print(f"Reindexed data back to {original_index_name} and deleted temporary index: {temp_index_name}")
90-
print("Reindexing process completed successfully.")

0 commit comments

Comments
 (0)