Skip to content

Commit 4c74266

Browse files
wiechulashahor02
authored andcommitted
Add option to publish after a certain amount of processed TFs
1 parent 8b6f9d3 commit 4c74266

4 files changed

Lines changed: 58 additions & 24 deletions

File tree

Detectors/TPC/workflow/include/TPCWorkflow/CalDetMergerPublisherSpec.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ namespace o2
2222
namespace tpc
2323
{
2424

25-
o2::framework::DataProcessorSpec getCalDetMergerPublisherSpec(bool skipCCDB);
25+
o2::framework::DataProcessorSpec getCalDetMergerPublisherSpec(uint32_t lanes, bool skipCCDB, bool dumpAfterComplete = false);
2626

2727
} // namespace tpc
2828
} // namespace o2

Detectors/TPC/workflow/include/TPCWorkflow/TPCCalibPedestalSpec.h

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ namespace tpc
4646
class TPCCalibPedestalDevice : public o2::framework::Task
4747
{
4848
public:
49-
TPCCalibPedestalDevice(int lane, const std::vector<int>& sectors) : mLane{lane}, mSectors(sectors) {}
49+
TPCCalibPedestalDevice(uint32_t lane, const std::vector<int>& sectors, uint32_t publishAfterTFs) : mLane{lane}, mSectors(sectors), mPublishAfter(publishAfterTFs) {}
5050

5151
void init(o2::framework::InitContext& ic) final
5252
{
@@ -87,11 +87,20 @@ class TPCCalibPedestalDevice : public o2::framework::Task
8787
calib_processing_helper::processRawData(pc.inputs(), reader, mUseOldSubspec, mSectors);
8888

8989
mCalibPedestal.incrementNEvents();
90-
LOGP(info, "Number of processed events: {} ({})", mCalibPedestal.getNumberOfProcessedEvents(), mMaxEvents);
90+
const auto nTFs = mCalibPedestal.getNumberOfProcessedEvents();
91+
LOGP(info, "Number of processed TFs: {} ({})", nTFs, mMaxEvents);
9192

92-
if ((mCalibPedestal.getNumberOfProcessedEvents() >= mMaxEvents) && !mCalibDumped) {
93-
LOGP(info, "Maximm number of events reached ({}), no more processing will be done", mMaxEvents);
93+
if ((mPublishAfter && (nTFs % mPublishAfter) == 0)) {
94+
LOGP(info, "Publishing after {} TFs", nTFs);
95+
mCalibPedestal.analyse();
96+
dumpCalibData();
97+
sendOutput(pc.outputs());
98+
}
99+
100+
if (mMaxEvents && (mCalibPedestal.getNumberOfProcessedEvents() >= mMaxEvents) && !mCalibDumped) {
101+
LOGP(info, "Maximm number of TFs reached ({}), no more processing will be done", mMaxEvents);
94102
mReadyToQuit = true;
103+
mCalibPedestal.analyse();
95104
dumpCalibData();
96105
if (mForceQuit) {
97106
pc.services().get<ControlService>().endOfStream();
@@ -105,6 +114,7 @@ class TPCCalibPedestalDevice : public o2::framework::Task
105114
void endOfStream(o2::framework::EndOfStreamContext& ec) final
106115
{
107116
LOGP(info, "endOfStream");
117+
mCalibPedestal.analyse();
108118
dumpCalibData();
109119
sendOutput(ec.outputs());
110120
ec.services().get<ControlService>().readyToQuit(QuitRequest::Me);
@@ -113,8 +123,9 @@ class TPCCalibPedestalDevice : public o2::framework::Task
113123
private:
114124
CalibPedestal mCalibPedestal;
115125
rawreader::RawReaderCRUManager mRawReader;
116-
uint32_t mMaxEvents{100}; ///< maximum number of events to process
117-
int mLane{0}; ///< lane number of processor
126+
uint32_t mMaxEvents{0}; ///< maximum number of events to process
127+
uint32_t mPublishAfter{0}; ///< number of events after which to dump the calibration
128+
uint32_t mLane{0}; ///< lane number of processor
118129
std::vector<int> mSectors{}; ///< sectors to process in this instance
119130
bool mReadyToQuit{false}; ///< if processor is ready to quit
120131
bool mCalibDumped{false}; ///< if calibration object already dumped
@@ -134,7 +145,7 @@ class TPCCalibPedestalDevice : public o2::framework::Task
134145
auto image = o2::utils::MemFileHelper::createFileImage(cal, typeid(*cal), cal->getName(), "data");
135146
int type = int(dataType[i]);
136147

137-
header::DataHeader::SubSpecificationType subSpec{(header::DataHeader::SubSpecificationType)i};
148+
header::DataHeader::SubSpecificationType subSpec{(header::DataHeader::SubSpecificationType)((mLane << 4) + i)};
138149
output.snapshot(Output{clbUtils::gDataOriginCLB, "TPCCLBPART", subSpec}, *image.get());
139150
output.snapshot(Output{clbUtils::gDataOriginCLB, "TPCCLBPARTINFO", subSpec}, type);
140151
}
@@ -145,14 +156,13 @@ class TPCCalibPedestalDevice : public o2::framework::Task
145156
{
146157
if (mDirectFileDump && !mCalibDumped) {
147158
LOGP(info, "Dumping output");
148-
mCalibPedestal.analyse();
149159
mCalibPedestal.dumpToFile(fmt::format("pedestals_{:02}.root", mLane));
150160
mCalibDumped = true;
151161
}
152162
}
153163
};
154164

155-
DataProcessorSpec getTPCCalibPedestalSpec(const std::string inputSpec, int ilane = 0, std::vector<int> sectors = {})
165+
DataProcessorSpec getTPCCalibPedestalSpec(const std::string inputSpec, uint32_t ilane = 0, std::vector<int> sectors = {}, uint32_t publishAfterTFs = 0)
156166
{
157167
std::vector<o2::framework::OutputSpec> outputs;
158168
outputs.emplace_back(ConcreteDataTypeMatcher{clbUtils::gDataOriginCLB, "TPCCLBPART"});
@@ -163,9 +173,9 @@ DataProcessorSpec getTPCCalibPedestalSpec(const std::string inputSpec, int ilane
163173
id.data(),
164174
select(inputSpec.data()),
165175
outputs,
166-
AlgorithmSpec{adaptFromTask<TPCCalibPedestalDevice>(ilane, sectors)},
176+
AlgorithmSpec{adaptFromTask<TPCCalibPedestalDevice>(ilane, sectors, publishAfterTFs)},
167177
Options{
168-
{"max-events", VariantType::Int, 100, {"maximum number of events to process"}},
178+
{"max-events", VariantType::Int, 0, {"maximum number of events to process"}},
169179
{"use-old-subspec", VariantType::Bool, false, {"use old subsecifiation definition"}},
170180
{"force-quit", VariantType::Bool, false, {"force quit after max-events have been reached"}},
171181
{"direct-file-dump", VariantType::Bool, false, {"directly dump calibration to file"}},

Detectors/TPC/workflow/src/CalDetMergerPublisherSpec.cxx

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
/// @brief TPC CalDet merger and CCDB publisher
1313
/// @author Jens Wiechula, Jens.Wiechula@ikf.uni-frankfurt.de
1414

15+
#include <bitset>
1516
#include <unordered_map>
1617
#include <vector>
1718
#include <string>
@@ -45,7 +46,7 @@ class CalDetMergerPublisherSpec : public o2::framework::Task
4546
using CcdbObjectInfo = o2::ccdb::CcdbObjectInfo;
4647

4748
public:
48-
CalDetMergerPublisherSpec(bool skipCCDB) : mSkipCCDB(skipCCDB) {}
49+
CalDetMergerPublisherSpec(uint32_t lanes, bool skipCCDB, bool dumpAfterComplete = false) : mLanesToExpect(lanes), mSkipCCDB(skipCCDB), mPublishAfterComplete(dumpAfterComplete) {}
4950

5051
void init(o2::framework::InitContext& ic) final
5152
{
@@ -58,10 +59,13 @@ class CalDetMergerPublisherSpec : public o2::framework::Task
5859
int nSlots = pc.inputs().getNofParts(0);
5960
assert(pc.inputs().getNofParts(1) == nSlots);
6061

61-
LOGP(info, "CalDetMergerPublisherSpec run");
6262
for (int isl = 0; isl < nSlots; isl++) {
6363
const auto type = pc.inputs().get<int>("clbInfo", isl);
6464
const auto pld = pc.inputs().get<gsl::span<char>>("clbPayload", isl); // this is actually an image of TMemFile
65+
const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(pc.inputs().get("clbInfo", isl));
66+
const auto subSpec = dh->subSpecification;
67+
const int lane = subSpec >> 4;
68+
const int calibType = subSpec & 0xf;
6569

6670
//const auto& path = wrp->getPath();
6771
TMemFile f("file", (char*)&pld[0], pld.size(), "READ");
@@ -77,13 +81,28 @@ class CalDetMergerPublisherSpec : public o2::framework::Task
7781
}
7882
f.Close();
7983

80-
LOGP(info, "getting slot {}", isl);
84+
LOGP(info, "getting slot {}, subspec {:#8x}, lane {}, type {}", isl, subSpec, lane, calibType);
85+
//if (mReceivedLanes.test(lane)) {
86+
//LOGP(warning, "lane {} received multiple times", lane);
87+
//}
88+
mReceivedLanes.set(lane);
89+
}
90+
91+
if (mReceivedLanes.count() == mLanesToExpect) {
92+
LOGP(info, "data of all lanes received");
93+
if (mPublishAfterComplete) {
94+
LOGP(info, "publishing after all data was received");
95+
dumpCalibData();
96+
sendOutput(pc.outputs());
97+
mCalibDumped = false;
98+
}
99+
mReceivedLanes.reset();
81100
}
82101
}
83102

84103
void endOfStream(o2::framework::EndOfStreamContext& ec) final
85104
{
86-
LOGP(info, "CalDetMergerPublisherSpec endOfStream");
105+
LOGP(info, "endOfStream");
87106

88107
dumpCalibData();
89108
sendOutput(ec.outputs());
@@ -92,9 +111,12 @@ class CalDetMergerPublisherSpec : public o2::framework::Task
92111

93112
private:
94113
using dataType = o2::tpc::CalDet<float>;
114+
std::bitset<128> mReceivedLanes; ///< counter for received lanes
95115
std::unordered_map<int, dataType> mMergedCalDets; ///< calibration data to merge
116+
uint32_t mLanesToExpect{0}; ///< number of expected lanes sending data
96117
bool mForceQuit{false}; ///< for quit after processing finished
97118
bool mDirectFileDump{false}; ///< directly dump the calibration data to file
119+
bool mPublishAfterComplete{false}; ///< dump calibration directly after data from all lanes received
98120
bool mCalibDumped{false}; ///< if calibration object already dumped
99121
bool mSkipCCDB{false}; ///< skip sending of calibration data
100122

@@ -129,7 +151,7 @@ class CalDetMergerPublisherSpec : public o2::framework::Task
129151
void dumpCalibData()
130152
{
131153
if (mDirectFileDump && !mCalibDumped) {
132-
LOGP(info, "CalDetMergerPublisherSpec Dumping output");
154+
LOGP(info, "Dumping output to file");
133155
TFile f("merged_CalDet.root", "recreate");
134156
for (auto& [type, object] : mMergedCalDets) {
135157
f.WriteObject(&object, object.getName().data());
@@ -139,7 +161,7 @@ class CalDetMergerPublisherSpec : public o2::framework::Task
139161
}
140162
};
141163

142-
o2::framework::DataProcessorSpec o2::tpc::getCalDetMergerPublisherSpec(bool skipCCDB)
164+
o2::framework::DataProcessorSpec o2::tpc::getCalDetMergerPublisherSpec(uint32_t lanes, bool skipCCDB, bool dumpAfterComplete)
143165
{
144166
std::vector<OutputSpec> outputs;
145167
if (!skipCCDB) {
@@ -157,7 +179,7 @@ o2::framework::DataProcessorSpec o2::tpc::getCalDetMergerPublisherSpec(bool skip
157179
id.data(),
158180
inputs,
159181
outputs,
160-
AlgorithmSpec{adaptFromTask<CalDetMergerPublisherSpec>(skipCCDB)},
182+
AlgorithmSpec{adaptFromTask<CalDetMergerPublisherSpec>(lanes, skipCCDB, dumpAfterComplete)},
161183
Options{
162184
{"force-quit", VariantType::Bool, false, {"force quit after max-events have been reached"}},
163185
{"direct-file-dump", VariantType::Bool, false, {"directly dump calibration to file"}},

Detectors/TPC/workflow/src/tpc-calib-pedestal.cxx

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,11 @@ void customize(std::vector<o2::framework::CompletionPolicy>& policies)
4646
void customize(std::vector<ConfigParamSpec>& workflowOptions)
4747
{
4848
std::string sectorDefault = "0-" + std::to_string(o2::tpc::Sector::MAXSECTOR - 1);
49-
int defaultlanes = 1; //std::max(1u, std::thread::hardware_concurrency() / 2);
49+
int defaultlanes = std::max(1u, std::thread::hardware_concurrency() / 2);
5050

5151
std::vector<ConfigParamSpec> options{
5252
{"input-spec", VariantType::String, "A:TPC/RAWDATA", {"selection string input specs"}},
53+
{"publish-after-tfs", VariantType::Int, 0, {"number of time frames after which to force publishing the objects"}},
5354
{"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings (e.g.: 'TPCCalibPedestal.FirstTimeBin=10;...')"}},
5455
{"configFile", VariantType::String, "", {"configuration file for configurable parameters"}},
5556
{"no-write-ccdb", VariantType::Bool, false, {"skip sending the calibration output to CCDB"}},
@@ -75,10 +76,11 @@ WorkflowSpec defineDataProcessing(ConfigContext const& config)
7576

7677
const std::string inputSpec = config.options().get<std::string>("input-spec");
7778
const auto skipCCDB = config.options().get<bool>("no-write-ccdb");
79+
const auto publishAfterTFs = (uint32_t)config.options().get<int>("publish-after-tfs");
7880

7981
const auto tpcsectors = o2::RangeTokenizer::tokenize<int>(config.options().get<std::string>("sectors"));
80-
const auto nSectors = (int)tpcsectors.size();
81-
const auto nLanes = std::min(config.options().get<int>("lanes"), nSectors);
82+
const auto nSectors = (uint32_t)tpcsectors.size();
83+
const auto nLanes = std::min((uint32_t)config.options().get<int>("lanes"), nSectors);
8284
const auto sectorsPerLane = nSectors / nLanes + ((nSectors % nLanes) != 0);
8385

8486
WorkflowSpec workflow;
@@ -94,10 +96,10 @@ WorkflowSpec defineDataProcessing(ConfigContext const& config)
9496
}
9597
auto last = std::min(tpcsectors.end(), first + sectorsPerLane);
9698
std::vector<int> range(first, last);
97-
workflow.emplace_back(getTPCCalibPedestalSpec(inputSpec, ilane, range));
99+
workflow.emplace_back(getTPCCalibPedestalSpec(inputSpec, ilane, range, publishAfterTFs));
98100
}
99101

100-
workflow.emplace_back(getCalDetMergerPublisherSpec(skipCCDB));
102+
workflow.emplace_back(getCalDetMergerPublisherSpec(nLanes, skipCCDB, publishAfterTFs > 0));
101103

102104
return workflow;
103105
}

0 commit comments

Comments
 (0)