99static constexpr auto DEFAULT_THREAD_COUNT = 10U ;
1010static constexpr auto DEFAULT_CALL_COUNT = 2 ;
1111
12-
13- static struct {
14- bool logCall = true ; // Uses JS console.log to output when the TSFN is
15- // processing the NonBlockingCall().
16- bool logThread = false ; // Uses native std::cout to output when the thread's
17- // NonBlockingCall() request has finished.
18- } DefaultOptions; // Options from Start()
12+ static struct {
13+ bool logCall = true ; // Uses JS console.log to output when the TSFN is
14+ // processing the NonBlockingCall().
15+ bool logThread = false ; // Uses native std::cout to output when the thread's
16+ // NonBlockingCall() request has finished.
17+ } DefaultOptions; // Options from Start()
1918
2019/* *
2120 * @brief Macro used specifically to support the dual CI test / documentation
@@ -44,59 +43,26 @@ class TSFNWrap;
4443// Context of the TSFN.
4544using Context = TSFNWrap;
4645
47- using CompletionHandler = std::function<void ()>;
48-
4946// Data passed (as pointer) to [Non]BlockingCall
50- struct DataType {
51- // Promise::Deferred;
52- // CompletionHandler handler;
53- std::future<int > deferred;
54- };
47+ using DataType = std::unique_ptr<std::promise<int >>;
5548
5649// CallJs callback function
5750static void CallJs (Napi::Env env, Napi::Function /* jsCallback*/ ,
5851 Context *context, DataType *data) {
59- if (env != nullptr ) {
60- if (data != nullptr ) {
61- // data->Resolve(context->Value());
62- }
63- }
6452 if (data != nullptr ) {
65- delete data;
53+ if (env != nullptr ) {
54+ (*data)->set_value (clock ());
55+ } else {
56+ (*data)->set_exception (std::make_exception_ptr (
57+ std::runtime_error (" TSFN has been finalized." )));
58+ }
6659 }
60+ // We do NOT delete data as it is a unique_ptr held by the calling thread.
6761}
6862
6963// Full type of the ThreadSafeFunctionEx
7064using TSFN = ThreadSafeFunctionEx<Context, DataType, CallJs>;
7165
72- struct FinalizerDataType {
73- std::vector<std::thread> threads;
74- std::unique_ptr<Promise::Deferred> deferred;
75- // struct {
76- // bool logCall = true; // Uses JS console.log to output when the TSFN is
77- // // processing the NonBlockingCall().
78- // bool logThread = false; // Uses native std::cout to output when the thread's
79- // // NonBlockingCall() request has finished.
80- // } options; // Options from Start()
81- };
82-
83- static void threadEntry (size_t threadId, TSFN tsfn, uint32_t callCount,
84- bool logThread) {
85- using namespace std ::chrono_literals;
86- for (auto i = 0U ; i < callCount; ++i) {
87- // auto callData = new DataType();
88- // tsfn.NonBlockingCall(callData);
89- // auto result = callData->deferred.get();
90- // if (logThread) {
91- // std::cout << "Thread " << threadId << " got result " << result << "\n";
92- // }
93-
94- // std::this_thread::sleep_for(50ms * threadId);
95- }
96- std::cout << " Thread " << threadId << " finished\n " ;
97- tsfn.Release ();
98- }
99-
10066using base = tsfnutil::TSFNWrapBase<TSFNWrap, Context, TSFN>;
10167
10268// A JS-accessible wrap that holds the TSFN.
@@ -119,20 +85,48 @@ class TSFNWrap : public base {
11985 }
12086 }
12187 ~TSFNWrap () {
122- for (auto & thread : finalizerData->threads ) {
88+ for (auto & thread : finalizerData->threads ) {
12389 if (thread.joinable ()) {
12490 thread.join ();
12591 }
12692 }
12793 }
12894
95+ struct FinalizerDataType {
96+ std::vector<std::thread> threads;
97+ std::unique_ptr<Promise::Deferred> deferred;
98+ };
99+
100+ // The finalizer data is shared, because we want to join the threads if our
101+ // TSFNWrap object gets garbage-collected and there are still active threads.
102+ using SharedFinalizerDataType = std::shared_ptr<FinalizerDataType>;
103+
104+ static void threadEntry (size_t threadId, TSFN tsfn, uint32_t callCount,
105+ bool logThread) {
106+ using namespace std ::chrono_literals;
107+ for (auto i = 0U ; i < callCount; ++i) {
108+ auto promise = std::make_unique<std::promise<int >>();
109+ tsfn.NonBlockingCall (&promise);
110+ auto future = promise->get_future ();
111+ auto result = future.get ();
112+ if (logThread) {
113+ std::cout << " Thread " << threadId << " got result " << result << " \n " ;
114+ }
115+ }
116+ if (logThread) {
117+ std::cout << " Thread " << threadId << " finished\n " ;
118+ }
119+ tsfn.Release ();
120+ }
121+
129122 static std::array<ClassPropertyDescriptor<TSFNWrap>, 3 > InstanceMethods () {
130123 return {InstanceMethod (" call" , &TSFNWrap::Call),
131124 InstanceMethod (" start" , &TSFNWrap::Start),
132125 InstanceMethod (" release" , &TSFNWrap::Release)};
133126 }
134127
135128 bool cppExceptions = false ;
129+ bool logThread;
136130 std::shared_ptr<FinalizerDataType> finalizerData;
137131
138132 Napi::Value Start (const CallbackInfo &info) {
@@ -149,15 +143,9 @@ class TSFNWrap : public base {
149143 // The JS-provided callback to execute for each call (if provided)
150144 Function callback;
151145
152- // std::unique_ptr<FinalizerDataType> finalizerData =
153- // std::make_unique<FinalizerDataType>();
154-
155- // finalizerData = std::shared_ptr<FinalizerDataType>(FinalizerDataType{ std::vector<std::thread>() , Promise::Deferred::New(env) });
156-
157146 finalizerData = std::make_shared<FinalizerDataType>();
158147
159-
160- bool logThread = DefaultOptions.logThread ;
148+ logThread = DefaultOptions.logThread ;
161149 bool logCall = DefaultOptions.logCall ;
162150
163151 if (info.Length () > 0 && info[0 ].IsObject ()) {
@@ -220,7 +208,6 @@ class TSFNWrap : public base {
220208 }
221209 }
222210
223-
224211 // Apply default arguments
225212 if (callCounts.size () == 0 ) {
226213 for (auto i = 0U ; i < DEFAULT_THREAD_COUNT; ++i) {
@@ -230,50 +217,48 @@ class TSFNWrap : public base {
230217
231218 const auto threadCount = callCounts.size ();
232219
233- auto *finalizerDataPtr = new std::shared_ptr<FinalizerDataType> (finalizerData);
220+ auto *finalizerDataPtr = new SharedFinalizerDataType (finalizerData);
234221
235222 _tsfn = TSFN::New (
236223 env, // napi_env env,
237224 TSFN::DefaultFunctionFactory (env), // const Function& callback,
238225 Value (), // const Object& resource,
239226 " Test" , // ResourceString resourceName,
240227 0 , // size_t maxQueueSize,
241- threadCount + 1 , // size_t initialThreadCount, +1 for Node thread
242- this , // Context* context,
243- Finalizer, // Finalizer finalizer
228+ threadCount + 1 , // size_t initialThreadCount, +1 for Node thread
229+ this , // Context* context,
230+ Finalizer, // Finalizer finalizer
244231 finalizerDataPtr // FinalizerDataType* data
245232 );
246233
247234 for (auto threadId = 0U ; threadId < threadCount; ++threadId) {
248- finalizerData->threads .push_back (
249- std::thread (threadEntry, threadId, _tsfn, callCounts[threadId],
250- logThread));
235+ finalizerData->threads .push_back (std::thread (
236+ threadEntry, threadId, _tsfn, callCounts[threadId], logThread));
251237 }
252238
253-
254239 return String::New (env, " started" );
255240 };
256241
257- // TSFN finalizer. Resolves the Promise returned by `Release()` above.
258- static void Finalizer (Napi::Env env, std::shared_ptr<FinalizerDataType> *finalizeData,
242+ // TSFN finalizer. Joins the threads and resolves the Promise returned by
243+ // `Release()` above.
244+ static void Finalizer (Napi::Env env, SharedFinalizerDataType *finalizeData,
259245 Context *ctx) {
260- // for (auto thread : finalizeData->threads) {
261246
247+ if (ctx->logThread ) {
248+ std::cout << " Finalizer joining threads\n " ;
249+ }
262250 for (auto &thread : (*finalizeData)->threads ) {
263- std::cout << " Finalizer joining thread\n " ;
264251 if (thread.joinable ()) {
265252 thread.join ();
266253 }
267254 }
255+ ctx->clearTSFN ();
256+ if (ctx->logThread ) {
257+ std::cout << " Finished\n " ;
258+ }
268259
260+ (*finalizeData)->deferred ->Resolve (Boolean::New (env, true ));
269261 delete finalizeData;
270-
271- // }
272- // if (deferred->get()) {
273- // (*deferred)->Resolve(Boolean::New(env, true));
274- // deferred->release();
275-
276- // }
277262 }
278263
279264 Napi::Value Release (const CallbackInfo &info) {
@@ -282,21 +267,24 @@ class TSFNWrap : public base {
282267 }
283268 // return finalizerData->deferred.Promise();
284269 auto env = info.Env ();
285- finalizerData->deferred .reset (new Promise::Deferred (Promise::Deferred::New (env)));
270+ finalizerData->deferred .reset (
271+ new Promise::Deferred (Promise::Deferred::New (env)));
286272 _tsfn.Release ();
287273 return finalizerData->deferred ->Promise ();
288274 };
289275
290276 Napi::Value Call (const CallbackInfo &info) {
291277 // auto *callData = new DataType(info.Env());
292- // _tsfn.NonBlockingCall(callData);
293- // return callData->Promise();
278+ // _tsfn.NonBlockingCall(callData); return callData->Promise();
294279 return info.Env ().Undefined ();
295280 };
296281
297282 Napi::Value GetContext (const CallbackInfo &) {
298283 return _tsfn.GetContext ()->Value ();
299284 };
285+
286+ // This does not run on the node thread.
287+ void clearTSFN () { _tsfn = TSFN (); }
300288};
301289} // namespace example
302290
0 commit comments