forked from JianLIUhep/RCTutils
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathrct.py
More file actions
143 lines (116 loc) · 5.92 KB
/
rct.py
File metadata and controls
143 lines (116 loc) · 5.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import requests
import csv
import json
from datetime import datetime, timezone
import urllib3
import argparse
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def fetch_data_pass_ids(api_base_url, token):
    """Fetch all data passes and return a dict mapping data-pass names to IDs.

    Args:
        api_base_url: Base URL of the run/data-pass REST API.
        token: API access token, sent as a query parameter.

    Returns:
        dict mapping each data pass ``name`` to its ``id``.

    Raises:
        requests.HTTPError: if the API responds with an error status
            (e.g. an expired token), instead of silently parsing the
            error payload.
    """
    url = f"{api_base_url}/dataPasses?token={token}"
    # NOTE(review): verify=False disables TLS certificate validation — kept
    # for parity with the rest of the script (self-signed certs presumably);
    # confirm this is intended. timeout added so a stalled server cannot
    # hang the script forever.
    response = requests.get(url, verify=False, timeout=60)
    response.raise_for_status()
    data_passes = response.json().get('data', [])
    return {dp['name']: dp['id'] for dp in data_passes}
def fetch_runs(api_base_url, data_pass_id, token):
    """Fetch the runs belonging to a data pass and annotate each with its detectors.

    Args:
        api_base_url: Base URL of the run REST API.
        data_pass_id: Numeric ID of the data pass to filter runs by.
        token: API access token, sent as a query parameter.

    Returns:
        The list of run dicts from the API, each augmented with a
        ``detectors_involved`` list of detector names.

    Raises:
        requests.HTTPError: if the API responds with an error status.
    """
    url = f"{api_base_url}/runs?filter[dataPassIds][]={data_pass_id}&token={token}"
    response = requests.get(url, verify=False, timeout=60)
    response.raise_for_status()
    runs = response.json().get('data', [])
    for run in runs:
        # 'detectors' is a comma-separated string. It may be absent OR an
        # explicit null (dict.get's default does not apply to nulls, so the
        # original `.split` would crash on None), and names may carry
        # surrounding whitespace — normalise both so the membership test in
        # main() ("detector_name not in involved_detectors") is reliable.
        raw_detectors = run.get('detectors') or ''
        run['detectors_involved'] = [d.strip() for d in raw_detectors.split(',')]
    return runs
def fetch_detector_flags(flag_api_url, data_pass_id, run_number, detector_id, token):
    """Fetch QC flags for one (data pass, run, detector) combination.

    Args:
        flag_api_url: Base URL of the flag REST API endpoint.
        data_pass_id: Numeric ID of the data pass.
        run_number: Run number to query flags for.
        detector_id: Numeric DPL detector ID.
        token: API access token, sent as a query parameter.

    Returns:
        ``["Not Available"]`` when the API reports no flags at all,
        otherwise only the flags that have a non-empty ``effectivePeriods``
        list (flags whose periods were fully overridden are dropped).

    Raises:
        requests.HTTPError: if the API responds with an error status.
    """
    url = (f"{flag_api_url}?dataPassId={data_pass_id}&runNumber={run_number}"
           f"&dplDetectorId={detector_id}&token={token}")
    response = requests.get(url, verify=False, timeout=60)
    response.raise_for_status()
    flags = response.json().get('data', [])
    if not flags:
        return ["Not Available"]
    return [flag for flag in flags if flag.get("effectivePeriods")]
def format_flags(flags, convert_time=False):
    """Render a list of QC flags as a single " | "-joined string.

    Args:
        flags: A list of flag dicts (each with ``flagType.method`` and
            ``effectivePeriods``), a JSON string encoding such a list, or
            one of the sentinel lists ``["Not Available"]`` / ``["Not Present"]``.
        convert_time: When True, render the epoch-millisecond ``from``/``to``
            timestamps as UTC ``YYYY-MM-DD HH:MM:SS``; otherwise keep them raw.

    Returns:
        A human-readable string, or the sentinel string unchanged.

    Raises:
        ValueError: if ``flags`` is not a list of dicts (or JSON thereof).
    """
    if flags == ["Not Available"]:
        return "Not Available"
    if flags == ["Not Present"]:
        return "Not Present"
    # Accept a JSON-encoded payload as well as an already-parsed list.
    if isinstance(flags, str):
        try:
            flags = json.loads(flags)
        except json.JSONDecodeError:
            raise ValueError("Invalid format: flags must be a list of dictionaries or a valid JSON string.")
    if not isinstance(flags, list) or not all(isinstance(flag, dict) for flag in flags):
        raise ValueError("Invalid format: flags must be a list of dictionaries.")
    formatted_flags = []
    for flag in flags:
        for period in flag["effectivePeriods"]:
            if convert_time:
                # Timestamps are epoch milliseconds. datetime.utcfromtimestamp
                # is deprecated since Python 3.12; the timezone-aware
                # equivalent below produces identical output (and finally uses
                # the `timezone` import at the top of the file).
                from_time = datetime.fromtimestamp(period["from"] / 1000, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
                to_time = datetime.fromtimestamp(period["to"] / 1000, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
            else:
                # Keep raw millisecond timestamps.
                from_time = period["from"]
                to_time = period["to"]
            formatted_flags.append(f"{flag['flagType']['method']} (from: {from_time} to: {to_time})")
    return " | ".join(formatted_flags)
def _filter_runs_by_range(runs, run_range):
    """Keep only runs whose runNumber lies in [lo, hi]; a None bound is open."""
    lo, hi = run_range
    if lo is not None:
        runs = [run for run in runs if run['runNumber'] >= lo]
    if hi is not None:
        runs = [run for run in runs if run['runNumber'] <= hi]
    return runs


def _csv_filename(data_pass_name, run_range):
    """Build the output CSV filename from the data pass name and optional run range."""
    safe_name = data_pass_name.replace(' ', '_').replace('/', '_')
    lo, hi = run_range
    if lo is not None and hi is not None:
        return f'Runs_{safe_name}_{lo}_{hi}.csv'
    if lo is not None:
        return f'Runs_{safe_name}_from_{lo}.csv'
    if hi is not None:
        return f'Runs_{safe_name}_to_{hi}.csv'
    return f'Runs_{safe_name}.csv'


def main(config_file, convert_time):
    """Export per-run detector QC flags to one CSV per configured data pass.

    Args:
        config_file: Path to a JSON configuration file with keys
            'run_api_url', 'flag_api_url', 'token', 'dataPassNames'
            (name -> {"run_range": [lo, hi]}) and 'detector_ids'
            (detector name -> DPL detector ID).
        convert_time: When True, flag timestamps are rendered as UTC
            datetimes instead of raw epoch milliseconds.
    """
    with open(config_file, 'r') as file:
        config = json.load(file)

    api_base_url = config['run_api_url']
    flag_api_url = config['flag_api_url']
    token = config['token']
    data_pass_names = config['dataPassNames']

    # Resolve human-readable data pass names to numeric IDs once up front.
    data_pass_ids = fetch_data_pass_ids(api_base_url, token)

    for data_pass_name, data_pass_info in data_pass_names.items():
        data_pass_id = data_pass_ids.get(data_pass_name)
        if not data_pass_id:
            print(f"No data pass ID found for {data_pass_name}. Check if your token is still valid; the token validity is 1 week only.")
            continue

        run_range = data_pass_info.get("run_range", [None, None])
        runs = _filter_runs_by_range(fetch_runs(api_base_url, data_pass_id, token), run_range)
        csv_filename = _csv_filename(data_pass_name, run_range)

        with open(csv_filename, 'w', newline='') as file:
            writer = csv.writer(file)
            # One column per configured detector, in config order.
            writer.writerow(['Run Number'] + list(config['detector_ids'].keys()))
            for run in runs:
                run_number = run['runNumber']
                row = [run_number]
                involved_detectors = run['detectors_involved']
                for detector_name, detector_id in config['detector_ids'].items():
                    if detector_name not in involved_detectors:
                        # Detector did not participate in this run — nothing to fetch.
                        row.append("Not present")
                    else:
                        flags = fetch_detector_flags(flag_api_url, data_pass_id, run_number, detector_id, token)
                        row.append(format_flags(flags, convert_time=convert_time))
                writer.writerow(row)
        print(f"Data has been written to {csv_filename}")
if __name__ == "__main__":
    # Command-line entry point: a required config-file path plus an optional
    # switch that renders flag timestamps as UTC datetimes instead of raw
    # epoch milliseconds.
    cli = argparse.ArgumentParser(description="Fetch run and detector data based on configuration.")
    cli.add_argument("config_file", help="Path to the JSON configuration file")
    cli.add_argument("--convert-time", action="store_true", help="Convert timestamps to human-readable datetime")
    cli_args = cli.parse_args()
    main(cli_args.config_file, cli_args.convert_time)