@@ -255,7 +255,7 @@ DataRelayer::RelayChoice
255255 assert (numInputTypes * slot.index < cache.size ());
256256 for (size_t ai = slot.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
257257 cache[ai].clear ();
258- cachedStateMetrics[ai] = 0 ;
258+ cachedStateMetrics[ai] = CacheEntryStatus::EMPTY ;
259259 }
260260 };
261261
@@ -268,7 +268,7 @@ DataRelayer::RelayChoice
268268 &metrics](TimesliceId timeslice, int input, TimesliceSlot slot) {
269269 auto cacheIdx = numInputTypes * slot.index + input;
270270 std::vector<PartRef>& parts = cache[cacheIdx].parts ;
271- cachedStateMetrics[cacheIdx] = 1 ;
271+ cachedStateMetrics[cacheIdx] = CacheEntryStatus::PENDING ;
272272 // TODO: make sure that multiple parts can only be added within the same call of
273273 // DataRelayer::relay
274274 PartRef entry{std::move (header), std::move (payload)};
@@ -488,6 +488,25 @@ void DataRelayer::getReadyToProcess(std::vector<DataRelayer::RecordAction>& comp
488488 }
489489}
490490
491+ void DataRelayer::updateCacheStatus (TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus)
492+ {
493+ std::scoped_lock<LockableBase (std::recursive_mutex)> lock (mMutex );
494+ const auto numInputTypes = mDistinctRoutesIndex .size ();
495+ auto & index = mTimesliceIndex ;
496+
497+ auto markInputDone = [&cachedStateMetrics = mCachedStateMetrics ,
498+ &index, &numInputTypes](TimesliceSlot s, size_t arg, CacheEntryStatus oldStatus, CacheEntryStatus newStatus) {
499+ auto cacheId = s.index * numInputTypes + arg;
500+ if (cachedStateMetrics[cacheId] == oldStatus) {
501+ cachedStateMetrics[cacheId] = newStatus;
502+ }
503+ };
504+
505+ for (size_t ai = 0 , ae = numInputTypes; ai != ae; ++ai) {
506+ markInputDone (slot, ai, oldStatus, newStatus);
507+ }
508+ }
509+
491510std::vector<o2::framework::MessageSet> DataRelayer::getInputsForTimeslice (TimesliceSlot slot)
492511{
493512 std::scoped_lock<LockableBase (std::recursive_mutex)> lock (mMutex );
@@ -512,7 +531,7 @@ std::vector<o2::framework::MessageSet> DataRelayer::getInputsForTimeslice(Timesl
512531 &cachedStateMetrics = mCachedStateMetrics ,
513532 &cache, &index, &numInputTypes, &metrics](TimesliceSlot s, size_t arg) {
514533 auto cacheId = s.index * numInputTypes + arg;
515- cachedStateMetrics[cacheId] = 2 ;
534+ cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING ;
516535 // TODO: in the original implementation of the cache, there have been only two messages per entry,
517536 // check if the 2 above corresponds to the number of messages.
518537 if (cache[cacheId].size () > 0 ) {
@@ -634,7 +653,12 @@ void DataRelayer::sendContextState()
634653 mMetrics , sVariablesMetricsNames );
635654 }
636655 for (size_t si = 0 ; si < mCachedStateMetrics .size (); ++si) {
637- mMetrics .send ({mCachedStateMetrics [si], sMetricsNames [si]});
656+ mMetrics .send ({static_cast <int >(mCachedStateMetrics [si]), sMetricsNames [si]});
657+ // Anything which is done is actually already empty,
658+ // so after we report it we mark it as such.
659+ if (mCachedStateMetrics [si] == CacheEntryStatus::DONE) {
660+ mCachedStateMetrics [si] = CacheEntryStatus::EMPTY;
661+ }
638662 }
639663}
640664
0 commit comments