Skip to content

Commit 2bfba6e

Browse files
committed
Better threading
1 parent 4fc61f3 commit 2bfba6e

6 files changed

Lines changed: 284 additions & 49 deletions

File tree

src/connection.cpp

Lines changed: 46 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44
#include "res.h"
55
#include "mariadb++/exceptions.hpp"
66
#include <winsock2.h>
7+
#include "threading.h"
78

89
using namespace intercept::client;
9-
extern auto_array<ref<GameDataDBAsyncResult>> asyncWork;
1010

1111
class GameDataDBConnection : public game_data {
1212

@@ -109,11 +109,11 @@ class callstack_item_WaitForQueryResult : public vm_context::callstack_item {
109109

110110

111111
game_instruction* next(int& d1, const game_state* s) override {
112-
if (res->data->res.wait_for(std::chrono::nanoseconds(0)) == std::future_status::ready) {
112+
if (res->data->fut.wait_for(std::chrono::nanoseconds(0)) == std::future_status::ready) {
113113

114114
//push result onto stack.
115115
auto gd_res = new GameDataDBResult();
116-
gd_res->res = res->data->res.get();
116+
gd_res->res = res->data->res;
117117
s->current_context->scriptStack[_stackEndAtStart] = game_value(gd_res);
118118
d1 = 2; //done
119119
} else {
@@ -143,21 +143,21 @@ game_value Connection::cmd_execute(uintptr_t g , game_value_parameter con, game_
143143
auto session = con.get_as<GameDataDBConnection>()->session;
144144
auto query = qu.get_as<GameDataDBQuery>();
145145

146-
auto statement = session->create_statement(query->getQueryString());
146+
if (!gs->current_context->scheduled) {
147147

148-
uint32_t idx = 0;
149-
for (auto& it : query->boundValues) {
150-
151-
switch (it.type_enum()) {
148+
auto statement = session->create_statement(query->getQueryString());
149+
150+
uint32_t idx = 0;
151+
for (auto& it : query->boundValues) {
152+
153+
switch (it.type_enum()) {
152154
case game_data_type::SCALAR: statement->set_float(idx++, static_cast<float>(it)); break;
153155
case game_data_type::BOOL: statement->set_boolean(idx++, static_cast<bool>(it)); break;
154156
case game_data_type::STRING: statement->set_string(idx++, static_cast<r_string>(it)); break;
155-
default: ;
157+
default:;
158+
}
156159
}
157-
}
158-
159160

160-
if (!gs->current_context->scheduled) {
161161
auto res = statement->query();
162162

163163
auto gd_res = new GameDataDBResult();
@@ -171,12 +171,34 @@ game_value Connection::cmd_execute(uintptr_t g , game_value_parameter con, game_
171171

172172
auto gd_res = new GameDataDBAsyncResult();
173173
gd_res->data = std::make_shared<GameDataDBAsyncResult::dataT>();
174-
gd_res->data->res =
175-
std::async([statement]() -> mariadb::result_set_ref {
176-
return statement->query();
177-
});
178174

179175

176+
177+
gd_res->data->fut = Threading::get().pushTask(session,
178+
[stmt = query->getQueryString(), boundV = query->boundValues, result = gd_res->data](mariadb::connection_ref con) -> bool
179+
{
180+
try {
181+
auto statement = con->create_statement(stmt);
182+
uint32_t idx = 0;
183+
for (auto& it : boundV) {
184+
185+
switch (it.type_enum()) {
186+
case game_data_type::SCALAR: statement->set_float(idx++, static_cast<float>(it)); break;
187+
case game_data_type::BOOL: statement->set_boolean(idx++, static_cast<bool>(it)); break;
188+
case game_data_type::STRING: statement->set_string(idx++, static_cast<r_string>(it)); break;
189+
}
190+
}
191+
result->res = statement->query();
192+
return true;
193+
}
194+
catch (mariadb::exception::connection& x) {
195+
__debugbreak();
196+
197+
198+
return false;
199+
}
200+
});
201+
180202
auto newItem = new callstack_item_WaitForQueryResult(gd_res);
181203
newItem->_parent = cs.back();
182204
newItem->_stackEndAtStart = gs->current_context->scriptStack.size()-2;
@@ -193,12 +215,11 @@ game_value Connection::cmd_executeAsync(uintptr_t, game_value_parameter con, gam
193215

194216
auto gd_res = new GameDataDBAsyncResult();
195217
gd_res->data = std::make_shared<GameDataDBAsyncResult::dataT>();
196-
gd_res->data->res =
197-
std::async(std::launch::async,[session, stmt = query->getQueryString(), boundV = query->boundValues]() -> mariadb::result_set_ref
218+
gd_res->data->fut = Threading::get().pushTask(session,
219+
[stmt = query->getQueryString(), boundV = query->boundValues, result = gd_res->data](mariadb::connection_ref con) -> bool
198220
{
199-
200221
try {
201-
auto statement = session->create_statement(stmt);
222+
auto statement = con->create_statement(stmt);
202223
uint32_t idx = 0;
203224
for (auto& it : boundV) {
204225

@@ -208,18 +229,16 @@ game_value Connection::cmd_executeAsync(uintptr_t, game_value_parameter con, gam
208229
case game_data_type::STRING: statement->set_string(idx++, static_cast<r_string>(it)); break;
209230
}
210231
}
211-
return statement->query();
232+
result->res = statement->query();
233+
return true;
212234
} catch (mariadb::exception::connection& x) {
213235
__debugbreak();
214236

215237

216-
return {};
238+
return false;
217239
}
218-
219-
220-
221-
});
222-
asyncWork.emplace_back(gd_res);
240+
}, true);
241+
Threading::get().pushAsyncWork(gd_res);
223242
return gd_res;
224243
}
225244

src/main.cpp

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@
44
#include "query.h"
55
#include "connection.h"
66
#include "yaml-cpp/exceptions.h"
7+
#include "threading.h"
78

89
int intercept::api_version() { //This is required for the plugin to work.
910
return 1;
1011
}
11-
auto_array<ref<GameDataDBAsyncResult>> asyncWork;
12+
1213
void intercept::register_interfaces() {
1314

1415
}
@@ -39,26 +40,20 @@ void intercept::pre_init() {
3940

4041
intercept::sqf::system_chat("Intercept database has been loaded");
4142
}
42-
43+
void logMessageWithTime(std::string msg);
4344
void intercept::on_frame() {
45+
if (!Threading::get().hasCompletedAsyncWork) return;
4446

45-
std::vector<ref<GameDataDBAsyncResult>> done;
46-
47-
48-
auto p = std::stable_partition(asyncWork.begin(), asyncWork.end(),
49-
[&](const auto& x) { return x->data->res.wait_for(std::chrono::nanoseconds(0)) == std::future_status::ready; });
50-
// range insert with move
51-
done.insert(done.end(), std::make_move_iterator(p),
52-
std::make_move_iterator(asyncWork.end()));
53-
// erase the moved-from elements.
54-
asyncWork.erase(p, asyncWork.end());
55-
56-
for (auto& it : done) {
47+
std::unique_lock l(Threading::get().asyncWorkMutex);
48+
for (auto& it : Threading::get().completedAsyncTasks) {
49+
logMessageWithTime("task callback");
5750
auto gd_res = new GameDataDBResult();
58-
gd_res->res = it->data->res.get();
51+
gd_res->res = it->data->res;
5952

6053
sqf::call(it->data->callback, { gd_res, it->data->callbackArgs });
6154
}
55+
Threading::get().hasCompletedAsyncWork = false;
56+
Threading::get().completedAsyncTasks.clear();
6257
}
6358

6459

src/res.cpp

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
#include "res.h"
22
#include <mariadb++/result_set.hpp>
3+
#include "threading.h"
34

45
using namespace intercept::client;
5-
extern auto_array<ref<GameDataDBAsyncResult>> asyncWork;
66

77
game_data* createGameDataDBResult(param_archive* ar) {
88
auto x = new GameDataDBResult();
@@ -78,17 +78,23 @@ game_value Result::cmd_bindCallback(uintptr_t, game_value_parameter left, game_v
7878
game_value Result::cmd_waitForResult(uintptr_t, game_value_parameter right) {
7979
auto& res = right.get_as<GameDataDBAsyncResult>();
8080

81-
res->data->res.wait();
81+
res->data->fut.wait();
8282

83-
for (int i = 0; i < asyncWork.size(); ++i) {
84-
if (asyncWork[i] == res) {
85-
asyncWork.erase(i);
83+
std::unique_lock l(Threading::get().asyncWorkMutex);
84+
auto& tasks = Threading::get().completedAsyncTasks;
85+
for (int i = 0; i < tasks.size(); ++i) {
86+
if (tasks[i] == res) {
87+
tasks.erase(tasks.begin()+i);
8688
break;
8789
}
8890
}
91+
if (tasks.empty())
92+
Threading::get().hasCompletedAsyncWork = false;
93+
l.unlock();
94+
8995

9096
auto gd_res = new GameDataDBResult();
91-
gd_res->res = res->data->res.get();
97+
gd_res->res = res->data->res;
9298
sqf::call(res->data->callback, { gd_res, res->data->callbackArgs });
9399
return gd_res;
94100
}

src/res.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ class GameDataDBAsyncResult : public game_data {
109109
return serialization_return::no_error;
110110
}
111111
struct dataT {
112-
std::future<mariadb::result_set_ref> res;
112+
std::future<bool> fut;
113+
mariadb::result_set_ref res;
113114
game_value callback;
114115
game_value callbackArgs;
115116
};

src/threading.cpp

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
#include "threading.h"
2+
using namespace std::chrono_literals;
3+
4+
#include <iomanip> // put_time
5+
#include <windows.h>
6+
void logMessageWithTime(std::string msg) {
7+
auto now = std::chrono::system_clock::now();
8+
auto in_time_t = std::chrono::system_clock::to_time_t(now);
9+
10+
std::chrono::milliseconds ms = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch());
11+
std::size_t fractional_seconds = ms.count() % 1000;
12+
13+
14+
15+
std::stringstream ss;
16+
ss << msg << " " << std::put_time(std::localtime(&in_time_t), "%H:%M:%S.") << fractional_seconds << "\n";
17+
18+
OutputDebugStringA(ss.str().c_str());
19+
}
20+
21+
22+
23+
void Worker::run() {
24+
while (true) {
25+
std::unique_lock l(taskLock);
26+
auto parentCon = parentConnection.lock();
27+
28+
if (!parentCon) {//If parent got deleted (went out of scope in SQF land) we want to exit too
29+
exiting = true; //#TODO going out of scope doesn't work
30+
while (!tasks.empty()) {
31+
std::shared_ptr<Task> task = tasks.front();
32+
tasks.pop();
33+
task->prom.set_value(false);
34+
}
35+
return;
36+
}
37+
38+
if (tasks.empty()) {
39+
hasTasksCondition.wait(l);
40+
}
41+
42+
if (!tasks.empty()) {
43+
std::shared_ptr<Task> task = tasks.front();
44+
tasks.pop();
45+
lastJob = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
46+
l.unlock();
47+
logMessageWithTime("do task");
48+
task->prom.set_value(task->job(workerConnection));
49+
50+
if (task->isInWorkList) {
51+
Threading::get().updateAsyncWorkLists();
52+
}
53+
54+
55+
logMessageWithTime("task done");
56+
}
57+
}
58+
59+
}
60+
61+
std::future<bool> Worker::pushTask(std::function<bool(mariadb::connection_ref)> task, bool intoWorkList) {
62+
logMessageWithTime("Worker::pushTask");
63+
auto newTask = std::make_shared<Task>();
64+
newTask->worker = shared_from_this();
65+
newTask->job = std::move(task);
66+
newTask->isInWorkList = intoWorkList;
67+
auto fut = newTask->prom.get_future();
68+
69+
70+
std::unique_lock l(taskLock);
71+
if (exiting) {
72+
newTask->prom.set_value(false);
73+
return fut;
74+
}
75+
tasks.emplace(newTask);
76+
l.unlock();
77+
78+
hasTasksCondition.notify_all();
79+
logMessageWithTime("Worker::pushTask ret");
80+
81+
return fut;
82+
}
83+
84+
std::future<bool> Threading::pushTask(mariadb::connection_ref con, std::function<bool(mariadb::connection_ref)> task, bool intoWorkList) {
85+
logMessageWithTime("Threading::pushTask");
86+
auto found = workers.find(con->account());
87+
if (found != workers.end()) {
88+
//Already have a worker on that account.
89+
return found->second->pushTask(std::move(task), intoWorkList);
90+
}
91+
logMessageWithTime("create worker");
92+
auto newWorker = std::make_shared<Worker>();
93+
newWorker->parentConnection = con;
94+
newWorker->workerConnection = mariadb::connection::create(con->account());
95+
newWorker->myThread = std::make_shared<std::thread>([newWorker]() {newWorker->run(); });
96+
workers[con->account()] = newWorker;
97+
98+
return newWorker->pushTask(std::move(task), intoWorkList);
99+
}
100+
101+
102+
void Threading::doCleanup() {
103+
if (lastCleanup + 30s > std::chrono::system_clock::now()) return;
104+
logMessageWithTime("Threading::doCleanup");
105+
106+
107+
//Will only be called from mainthread so noone can insert stuff now.
108+
for (auto& [acc,worker] : workers) {
109+
std::unique_lock l(worker->taskLock);
110+
if (!worker->tasks.empty()) continue; //Is still working on tasks
111+
logMessageWithTime("worker cleanup check");
112+
auto lastTask = std::chrono::system_clock::from_time_t(worker->lastJob);
113+
if (lastTask + 60s < std::chrono::system_clock::now()) { //no tasks for 60s, kill worker
114+
logMessageWithTime("worker do cleanup");
115+
worker->parentConnection.reset(); //no parent will mean it exits next iteration
116+
worker->hasTasksCondition.notify_all(); //force iteration
117+
if (worker->myThread->joinable()) worker->myThread->join(); //wait for thread to exit
118+
logMessageWithTime("worker cleaned up");
119+
}
120+
workers.erase(acc);
121+
return;
122+
}
123+
lastCleanup = std::chrono::system_clock::now();
124+
}
125+
126+
void Threading::pushAsyncWork(ref<GameDataDBAsyncResult> work) {
127+
std::unique_lock l(asyncWorkMutex);
128+
asyncWork.emplace_back(work);
129+
130+
}
131+
132+
void Threading::updateAsyncWorkLists() {
133+
std::unique_lock l(asyncWorkMutex);
134+
135+
136+
137+
auto p = std::stable_partition(asyncWork.begin(), asyncWork.end(),
138+
[&](const auto& x) {
139+
auto res = x->data->fut.wait_for(std::chrono::nanoseconds(0));
140+
141+
return res != std::future_status::ready; });
142+
// range insert with move
143+
completedAsyncTasks.insert(completedAsyncTasks.end(), std::make_move_iterator(p),
144+
std::make_move_iterator(asyncWork.end()));
145+
// erase the moved-from elements.
146+
asyncWork.erase(p, asyncWork.end());
147+
//#TODO we can just delete completed tasks that don't have callback or callbackargs
148+
//better do it here than in mainthread
149+
hasCompletedAsyncWork = !completedAsyncTasks.empty();
150+
151+
}

0 commit comments

Comments
 (0)