Skip to content

Commit deabcec

Browse files
committed
feat:添加base_telemetry_events reindex脚本
1 parent 57104ee commit deabcec

2 files changed

Lines changed: 96 additions & 0 deletions

File tree

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
from bisheng.core.search.elasticsearch.manager import get_statistics_es_connection_sync
2+
3+
INDEX_MAPPING = {
4+
"mappings": { # Defining the indexed Mapping
5+
"properties": {
6+
"event_id": {"type": "keyword"},
7+
"event_type": {"type": "keyword"},
8+
"trace_id": {"type": "keyword"},
9+
"timestamp": {"type": "date", "format": "strict_date_optional_time||epoch_second"},
10+
"user_context": {
11+
"type": "object",
12+
"properties": {
13+
"user_id": {"type": "integer"},
14+
"user_name": {"type": "keyword"},
15+
"user_group_infos": {
16+
"type": "object",
17+
"properties": {
18+
"user_group_id": {"type": "integer"},
19+
"user_group_name": {"type": "keyword"}
20+
}
21+
},
22+
"user_role_infos": {
23+
"type": "object",
24+
"properties": {
25+
"role_id": {"type": "integer"},
26+
"role_name": {"type": "keyword"},
27+
"group_id": {"type": "integer"},
28+
}
29+
}
30+
}
31+
},
32+
"event_data": {
33+
"type": "object",
34+
"dynamic": True
35+
}
36+
}
37+
}
38+
}
39+
40+
if __name__ == '__main__':
41+
es_conn = get_statistics_es_connection_sync()
42+
43+
# 临时索引名称
44+
temp_index_name = "base_telemetry_events_temp_reindex"
45+
original_index_name = "base_telemetry_events"
46+
47+
# 创建临时索引
48+
if not es_conn.indices.exists(index=temp_index_name):
49+
es_conn.indices.create(index=temp_index_name, body=INDEX_MAPPING)
50+
print(f"Created temporary index: {temp_index_name}")
51+
52+
# 使用Elasticsearch的_reindex API进行数据迁移
53+
reindex_body = {
54+
"source": {
55+
"index": original_index_name
56+
},
57+
"dest": {
58+
"index": temp_index_name
59+
}
60+
}
61+
62+
es_conn.reindex(body=reindex_body, wait_for_completion=True, request_timeout=3600)
63+
print(f"Reindexed data from {original_index_name} to {temp_index_name}")
64+
65+
# 删除原始索引
66+
es_conn.indices.delete(index=original_index_name)
67+
68+
print(f"Deleted original index: {original_index_name}")
69+
70+
# 创建新的原始索引
71+
es_conn.indices.create(index=original_index_name, body=INDEX_MAPPING)
72+
print(f"Created new index: {original_index_name}")
73+
74+
# 使用_reindex API将数据从临时索引迁移回原始索引
75+
reindex_back_body = {
76+
"source": {
77+
"index": temp_index_name
78+
},
79+
"dest": {
80+
"index": original_index_name
81+
}
82+
}
83+
84+
es_conn.reindex(body=reindex_back_body, wait_for_completion=True, request_timeout=3600)
85+
86+
# 删除临时索引
87+
es_conn.indices.delete(index=temp_index_name)
88+
89+
print(f"Reindexed data back to {original_index_name} and deleted temporary index: {temp_index_name}")
90+
print("Reindexing process completed successfully.")
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
#!/bin/bash
2+
3+
export PYTHONPATH="./"
4+
echo "Reindexing telemetry events..."
5+
python bisheng/script/base_telemetry_events_reindex.py
6+
echo "Reindexing completed."

0 commit comments

Comments
 (0)