22#include " ittnotify.h"
33#include " ../ittnotify/ittnotify.h"
44#include " logger.h"
5+ #include " config.h"
56using namespace std ::chrono_literals;
67
78#include < iomanip> // put_time
@@ -165,11 +166,22 @@ std::future<bool> Threading::pushTask(mariadb::connection_ref con, std::function
165166 logMessageWithTime (" Threading::pushTask" );
166167 __itt_task_begin (domain, __itt_null, __itt_null, threading_pushTask);
167168 auto found = workers.find (con->account ());
168- if (found != workers.end () && !found->second ->exiting ) {
169- // Already have a worker on that account.
170- auto ret = found->second ->pushTask (std::move (task), intoWorkList);
171- __itt_task_end (domain);
172- return ret;
169+ if (found != workers.end ()) {
170+
171+ if (found->second .size () == Config::get ().getWorkerCount ()) {
172+ const auto it = std::min_element (found->second .begin (), found->second .end (), [](const std::shared_ptr<Worker>& l, const std::shared_ptr<Worker>& r) {
173+ if (l->exiting ) return false ;
174+ if (r->exiting ) return true ;
175+ return l->tasks .size () < r->tasks .size ();
176+ });
177+
178+ if (!(*it)->exiting ) {
179+ // Already have a worker on that account.
180+ auto ret = (*it)->pushTask (std::move (task), intoWorkList);
181+ __itt_task_end (domain);
182+ return ret;
183+ }
184+ }
173185 }
174186
175187
@@ -179,7 +191,7 @@ std::future<bool> Threading::pushTask(mariadb::connection_ref con, std::function
179191 newWorker->parentConnection = con;
180192 newWorker->workerConnection = mariadb::connection::create (con->account ());
181193 newWorker->myThread = std::make_shared<std::thread>([newWorker]() {newWorker->run (); });
182- workers[con->account ()] = newWorker;
194+ workers[con->account ()]. emplace_back ( newWorker) ;
183195 __itt_task_end (domain);
184196
185197 auto ret = newWorker->pushTask (std::move (task), intoWorkList);
@@ -195,33 +207,38 @@ void Threading::doCleanup() {
195207 __itt_task_begin (domain, __itt_null, __itt_null, threading_doCleanup);
196208
197209 // Will only be called from mainthread so noone can insert stuff now.
198- for (auto & [acc,worker] : workers) {
199- __itt_sync_prepare (&worker->taskLock );
200- std::unique_lock l (worker->taskLock );
201- __itt_sync_acquired (&worker->taskLock );
202- if (!worker->tasks .empty ()) continue ; // Is still working on tasks
203- logMessageWithTime (" worker cleanup check" );
204- auto lastTask = std::chrono::system_clock::from_time_t (worker->lastJob );
205- LOG_Thread (" Worker::cleanupCheck W" +std::to_string (reinterpret_cast <uintptr_t >(worker.get ())) + " LastTask " +
206- std::to_string (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now () - lastTask).count ())+" s"
207- );
208- if (lastTask + 60s < std::chrono::system_clock::now ()) { // no tasks for 60s, kill worker
209- LOG_Thread (" Worker::doCleanup W" +std::to_string (reinterpret_cast <uintptr_t >(worker.get ())));
210- // logMessageWithTime("worker do cleanup");
211- __itt_task_begin (domain, __itt_null, __itt_null, threading_doCleanup_workerDoCleanup);
212- worker->parentConnection .reset (); // no parent will mean it exits next iteration
213- __itt_sync_releasing (&worker->taskLock );
214- l.unlock ();
215- worker->hasTasksCondition .notify_all (); // force iteration
216- if (worker->myThread ->joinable ()) worker->myThread ->join (); // wait for thread to exit
217- workers.erase (acc);
218- __itt_task_end (domain);
219- LOG_Thread (" Worker::cleanupDone W" +std::to_string (reinterpret_cast <uintptr_t >(worker.get ())));
220-
221- __itt_task_end (domain);
222- return ;
210+ for (auto & [acc,workerlist] : workers) {
211+
212+ for (auto & worker : workerlist) {
213+ __itt_sync_prepare (&worker->taskLock );
214+ std::unique_lock l (worker->taskLock );
215+ __itt_sync_acquired (&worker->taskLock );
216+ if (!worker->tasks .empty ()) continue ; // Is still working on tasks
217+ logMessageWithTime (" worker cleanup check" );
218+ auto lastTask = std::chrono::system_clock::from_time_t (worker->lastJob );
219+ LOG_Thread (" Worker::cleanupCheck W" +std::to_string (reinterpret_cast <uintptr_t >(worker.get ())) + " LastTask " +
220+ std::to_string (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now () - lastTask).count ())+" s"
221+ );
222+ if (lastTask + 60s < std::chrono::system_clock::now ()) { // no tasks for 60s, kill worker
223+ LOG_Thread (" Worker::doCleanup W" +std::to_string (reinterpret_cast <uintptr_t >(worker.get ())));
224+ // logMessageWithTime("worker do cleanup");
225+ __itt_task_begin (domain, __itt_null, __itt_null, threading_doCleanup_workerDoCleanup);
226+ worker->parentConnection .reset (); // no parent will mean it exits next iteration
227+ __itt_sync_releasing (&worker->taskLock );
228+ l.unlock ();
229+ worker->hasTasksCondition .notify_all (); // force iteration
230+ if (worker->myThread ->joinable ()) worker->myThread ->join (); // wait for thread to exit
231+ workerlist.erase (std::remove (workerlist.begin (), workerlist.end (), worker), workerlist.end ());
232+ // workers.erase(acc);
233+ __itt_task_end (domain);
234+ LOG_Thread (" Worker::cleanupDone W" +std::to_string (reinterpret_cast <uintptr_t >(worker.get ())));
235+
236+ __itt_task_end (domain);
237+ return ;
238+ }
223239 }
224-
240+ if (workerlist.empty ())
241+ workers.erase (acc);
225242 }
226243 lastCleanup = std::chrono::system_clock::now ();
227244 __itt_task_end (domain);
@@ -231,7 +248,12 @@ bool Threading::isConnected(mariadb::account_ref acc) {
231248 auto found = workers.find (acc);
232249 if (found == workers.end ()) return false ;
233250 // #TODO potentially not threadsafe
234- return found->second ->workerConnection ->connected ();
251+
252+ for (auto & it : found->second ) {
253+ if (it->workerConnection ->connected ()) return true ;
254+ }
255+
256+ return false ;
235257}
236258
237259void Threading::pushAsyncWork (ref<GameDataDBAsyncResult> work) {
0 commit comments