AsyncWorker 不支持在子线程中多次回调 JS 传过来的回调函数,所以,嗯,撸起袖子就是干。
为什么要重写
在使用 node-addon-api 开发原生模块时,涉及到异步回调 JS 函数的操作一般都是用 Napi::AsyncWorker
类去做,但是 Napi::AsyncWorker
并不支持在子线程中多次执行 JS 传过来的回调函数,在某些场景下(比如 JS 要知道异步操作的进度并更新进度条)就有了一定的限制。
为了能够满足这些场景下的需求,就必须要在异步耗时的操作中多次执行 JS 回调。node-addon-api 官方提供了 Napi::ThreadSafeFunction
这个类来满足在多个线程中执行 JS 回调的需求,但是它仅仅是 napi_threadsafe_function
的 C++ Wrapper,没有与 Napi::AsyncWorker
配合得很好,所以还是决定参考 Napi::AsyncWorker
的实现另外写一个 ThreadSafeAsyncWorker
。
类声明
注释很详细。
1 | // ThreadSafeAsyncWorker.h |
关键实现
构造函数
两个构造函数,带进度回调和不带进度回调。
1 | // 不带进度回调的构造函数,只创建 napi_async_work |
子线程执行与完成
需要注意执行是跑在子线程里的,完成是跑在主线程里的。
1 | inline void ThreadSafeAsyncWorker::OnExecute(napi_env /*DO_NOT_USE*/, void* this_pointer) { |
析构函数
析构函数释放底层的 NAPI 对象。
1 | inline ThreadSafeAsyncWorker::~ThreadSafeAsyncWorker() { |
自定义线程安全数据处理
这个函数负责调用 OnProgress,调用者重写 OnProgress,在 OnProgress 中处理数据并多次回调 JS。需要注意如果没有进度回调函数这个函数什么也不做,如果 data 指向的内存是动态分配的,会无法释放造成内存泄漏。
1 | inline void ThreadSafeAsyncWorker::CallJS(napi_env env, napi_value jsCallback, void* this_pointer, void* data) { |
Worker 的入队与取消
这没啥,就是调用 NAPI。
1 | inline void ThreadSafeAsyncWorker::Queue() { |
默认的事件处理
这几个函数可以由调用者重写,用于相关的业务逻辑。
1 | inline void ThreadSafeAsyncWorker::OnProgress(void* data) { |
子线程中可用的函数
SetError 用来指定错误信息,如果错误信息不为空字符串,子线程结束后会触发 OnError。
EmitProgress 用来触发 CallJS,CallJS 里会调用 OnProgress,同上使用时需要注意 data 指针内存泄漏。
1 | inline void ThreadSafeAsyncWorker::SetError(const std::string& error) { |
禁止实例自杀
new 了指针以后默认不需要调用者 delete,可以用 SuppressDestruct 关掉这个特性。
1 | inline void ThreadSafeAsyncWorker::SuppressDestruct() { |
用法
写一个类继承
ThreadSafeAsyncWorker
,实现 Execute,重写 OnProgress,OnOK,OnError1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65// MyAsyncWorker.h
using namespace Napi;
class MyAsyncWorker : public ThreadSafeAsyncWorker {
public:
MyAsyncWorker(const std::string&, Napi::Function&, Napi::Function&);
~MyAsyncWorker();
void Execute();
void OnProgress(void* data);
void OnOK();
void OnError(const Napi::Error&);
private:
// 保存一些数据
size_t _length;
};
MyAsyncWorker::MyAsyncWorker(
size_t l,
Napi::Function& callback,
Napi::Function& progressCallback): ThreadSafeAsyncWorker(callback, progressCallback), _length(l) {/* 构造 */}
MyAsyncWorker::~MyAsyncWorker() {/* 析构 */}
void MyAsyncWorker::Execute() {
// 异步耗时操作,这里是子线程
// ...
for (size_t i = 0; i < _length; i++) {
// ...
double* progress = new double(number);
EmitProgress((void*)progress);
// ...
}
// 有错
// SetError("有错");
// ...
}
void MyAsyncWorker::OnProgress(void* data) {
// EmitProgress() 后会来这里(不是同步的)
Napi::Env env = Env();
HandleScope scope(env);
double* value = (double*)data;
Object res = Object::New(env);
res["percentage"] = Number::New(env, value);
// 相当于 JS 的 progressCallback({ percentage: value })
ProgressCallback().Call({ res });
delete value; // 用完了记得释放
}
void MyAsyncWorker::OnOK() {
HandleScope scope(Env());
// 相当于 JS 的 callback(null)
Callback().Call({ Env().Null() });
}
void MyAsyncWorker::OnError(const Napi::Error& err) {
HandleScope scope(Env());
// 相当于 JS 的 callback(err)
Callback().Call({ err.Value() });
}new 出实例来调用 Queue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 相当于
// function threadSafeAsyncCall (length, onComplete, onProgress) {
//
// }
static Value threadSafeAsyncCall(const CallbackInfo& info) {
// ...
MyAsyncWorker *w = new MyAsyncWorker(info[0].As<Number>().Uint32Value(), info[1].As<Function>(), info[2].As<Function>());
w->Queue();
// ...
}
static Object _index(Env env, Object exports) {
// module.exports = threadSafeAsyncCall
return Function::New(env, threadSafeAsyncCall, "threadSafeAsyncCall");
}
NODE_API_MODULE(NODE_GYP_MODULE_NAME, _index)用 JS 来调用
1
2
3
4
5
6const threadSafeAsyncCall = require('./build/Release/${node-gyp模块名}.node')
threadSafeAsyncCall(100000, function (err) {
// 跑完了
}, function (percentage) {
// 执行中
})