Skip to content

Commit 05d755d

Browse files
committed
DPL: fix threading issue in ServiceRegistry
Protect against the monitoring thread trying to grab the ws driver before it is initialized.
1 parent 14566b5 commit 05d755d

2 files changed

Lines changed: 38 additions & 9 deletions

File tree

Framework/Core/include/Framework/ServiceRegistry.h

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -97,13 +97,18 @@ struct ServiceRegistry {
9797
/// Callbacks for services to be executed on exit
9898
std::vector<ServiceExitHandle> mPreExitHandles;
9999

100+
/// To hide exception throwing from QC
101+
void throwError(RuntimeErrorRef const& ref) const;
102+
100103
public:
101104
using hash_type = decltype(TypeIdHelpers::uniqueId<void>());
102105
ServiceRegistry();
103106

104107
ServiceRegistry(ServiceRegistry const& other)
105108
{
106-
mServicesKey = other.mServicesKey;
109+
for (size_t i = 0; i < MAX_SERVICES; ++i) {
110+
mServicesKey[i].store(other.mServicesKey[i].load());
111+
}
107112
mServicesValue = other.mServicesValue;
108113
mServicesMeta = other.mServicesMeta;
109114
for (size_t i = 0; i < other.mServicesBooked.size(); ++i) {
@@ -113,7 +118,9 @@ struct ServiceRegistry {
113118

114119
ServiceRegistry& operator=(ServiceRegistry const& other)
115120
{
116-
mServicesKey = other.mServicesKey;
121+
for (size_t i = 0; i < MAX_SERVICES; ++i) {
122+
mServicesKey[i].store(other.mServicesKey[i].load());
123+
}
117124
mServicesValue = other.mServicesValue;
118125
mServicesMeta = other.mServicesMeta;
119126
for (size_t i = 0; i < other.mServicesBooked.size(); ++i) {
@@ -168,9 +175,8 @@ struct ServiceRegistry {
168175
int getPos(uint32_t typeHash, uint64_t threadId) const
169176
{
170177
auto threadHashId = (typeHash ^ threadId) & MAX_SERVICES_MASK;
171-
std::atomic_thread_fence(std::memory_order_acquire);
172178
for (uint8_t i = 0; i < MAX_DISTANCE; ++i) {
173-
if (mServicesKey[i + threadHashId] == typeHash) {
179+
if (mServicesKey[i + threadHashId].load() == typeHash) {
174180
return i + threadHashId;
175181
}
176182
}
@@ -192,7 +198,14 @@ struct ServiceRegistry {
192198
// find it with getPos, but the value can still
193199
// be nullptr.
194200
auto pos = getPos(typeHash, threadId);
201+
if (pos != -1 && mServicesMeta[pos].kind == ServiceKind::Stream && mServicesMeta[pos].threadId != threadId) {
202+
throwError(runtime_error_f("Inconsistent registry for thread %d. Expected %d", threadId, mServicesMeta[pos].threadId));
203+
O2_BUILTIN_UNREACHABLE();
204+
}
205+
195206
if (pos != -1) {
207+
mServicesKey[pos].load();
208+
std::atomic_thread_fence(std::memory_order_acquire);
196209
void* ptr = mServicesValue[pos];
197210
if (ptr) {
198211
return ptr;
@@ -204,9 +217,17 @@ struct ServiceRegistry {
204217
if (threadId != 0) {
205218
int pos = getPos(typeHash, 0);
206219
if (pos != -1 && kind != ServiceKind::Stream) {
220+
mServicesKey[pos].load();
221+
std::atomic_thread_fence(std::memory_order_acquire);
207222
registerService(typeHash, mServicesValue[pos], kind, threadId, name);
208223
}
209-
return mServicesValue[pos];
224+
if (pos != -1) {
225+
mServicesKey[pos].load();
226+
std::atomic_thread_fence(std::memory_order_acquire);
227+
return mServicesValue[pos];
228+
} else {
229+
throwError(runtime_error_f("Unable to find requested service %s", name));
230+
}
210231
}
211232
// If we are here it means we never registered a
212233
// service for the 0 thread (i.e. the main thread).
@@ -222,7 +243,7 @@ struct ServiceRegistry {
222243
}
223244

224245
mutable std::vector<ServiceSpec> mSpecs;
225-
mutable std::array<uint32_t, MAX_SERVICES + MAX_DISTANCE> mServicesKey;
246+
mutable std::array<std::atomic<uint32_t>, MAX_SERVICES + MAX_DISTANCE> mServicesKey;
226247
mutable std::array<void*, MAX_SERVICES + MAX_DISTANCE> mServicesValue;
227248
mutable std::array<ServiceMeta, MAX_SERVICES + MAX_DISTANCE> mServicesMeta;
228249
mutable std::array<std::atomic<bool>, MAX_SERVICES + MAX_DISTANCE> mServicesBooked;
@@ -289,8 +310,8 @@ struct ServiceRegistry {
289310
return *reinterpret_cast<T*>(ptr);
290311
}
291312
}
292-
throw runtime_error_f("Unable to find service of kind %s. Make sure you use const / non-const correctly.",
293-
typeid(T).name());
313+
throwError(runtime_error_f("Unable to find service of kind %s. Make sure you use const / non-const correctly.", typeid(T).name()));
314+
O2_BUILTIN_UNREACHABLE();
294315
}
295316
};
296317

Framework/Core/src/ServiceRegistry.cxx

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@ namespace o2::framework
1717

1818
ServiceRegistry::ServiceRegistry()
1919
{
20-
mServicesKey.fill(0L);
20+
for (size_t i = 0; i < MAX_SERVICES; ++i) {
21+
mServicesKey[i].store(0L);
22+
}
23+
2124
mServicesValue.fill(nullptr);
2225
for (size_t i = 0; i < mServicesBooked.size(); ++i) {
2326
mServicesBooked[i] = false;
@@ -165,4 +168,9 @@ void ServiceRegistry::preExitCallbacks()
165168
}
166169
}
167170

171+
/// Throw @a ref as an exception.
/// Out-of-line definition of the helper declared in ServiceRegistry.h; the
/// inline code in that header calls this instead of using a `throw`
/// expression directly.
/// @param ref the runtime error reference to throw (thrown by value).
void ServiceRegistry::throwError(RuntimeErrorRef const& ref) const
172+
{
173+
throw ref;
174+
}
175+
168176
} // namespace o2::framework

0 commit comments

Comments
 (0)