几天前,Node.js版本10.5.0发布,其中包含的主要功能之一是添加了(实验)线程支持。
这很有趣,特别是来自一种总以不需要的线程感到自豪的语言,这要归功于它非常棒的异步I / O。那么为什么我们需要Node中的线程呢?
简洁的答案是:过去的Node中唯一的区域中表现突出的问题:处理繁重的CPU密集型计算。这主要是为什么Node.js在人工智能,机器学习,数据科学等领域不够强大的原因。有很多努力正在解决这个问题,但我们仍然没有像部署微服务时那样表现出色。
因此,我将尝试将官方文档提供的技术文档简化为更实用,更简单的示例。希望这足以让你开始。
那么我们如何使用新的线程模块呢?
首先,您将需要一个名为“worker_threads”的模块。
请注意,只有 --experimental-worker
在执行脚本时使用该标志才能使用,否则将不会找到该模块。
注意flag是指worker而不是线程,这是他们在整个文档中被引用的方式:工作者线程或简单的worker。
如果您以前使用过multi-processing,您会发现这种方法有很多相似之处,但如果您没有,请不要担心,我会尽可能多地解释。
你可以用他们做什么?
就像我之前提到的,工作线程是指CPU密集型任务,使用它们进行I / O会浪费资源,因为根据官方文档,Node提供的处理异步I / O的内部机制要多得多比使用工作线程更有效率,所以...不要麻烦。
我们从一个简单的例子开始,介绍如何创建一个 worker
并使用它。
实例1
const { Worker, isMainThread, workerData } = require('worker_threads');
let currentVal = 0;
let intervals = [100,1000, 500]
function counter(id, i){
console.log("[", id, "]", i)
return i;
}
if(isMainThread) {
console.log("this is the main thread")
for(let i = 0; i < 2; i++) {
let w = new Worker(__filename, {workerData: i});
}
setInterval((a) => currentVal = counter(a,currentVal + 1), intervals[2], "MainThread");
} else {
console.log("this isn't")
setInterval((a) => currentVal = counter(a,currentVal + 1), intervals[workerData], workerData);
}
上面的例子将简单地输出一组显示递增计数器的行,它们将使用不同的速度增加它们的值。
让我们分解一下:
IF语句中的代码创建2个工作线程[worker threads],由于__filename 参数传递,它们的代码从同一个文件中获取。工作线程[Workers]现在需要完整的文件路径,他们不能处理相对路径,所以这就是为什么使用这个值。
这两名worker将作为全局参数发送一个值,其形式为workerData,您在第二个参数中看到的属性。然后可以通过具有相同名称的常量访问该值(请参阅常量是如何在文件的第一行中创建的,并在以后的最后一行中使用)。
这个例子是你可以用这个模块做的最基本的事情之一,但它不是很有趣,是吗?我们来看另一个例子。
实例2
让我们现在尝试做一些“繁重”的计算,同时在主线程中做一些异步的东西。
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
const request = require("request");
if(isMainThread) {
console.log("This is the main thread")
let w = new Worker(__filename, {workerData: null});
w.on('message', (msg) => { //A message from the worker!
console.log("First value is: ", msg.val);
console.log("Took: ", (msg.timeDiff / 1000), " seconds");
})
w.on('error', console.error);
w.on('exit', (code) => {
if(code != 0)
console.error(new Error(`Worker stopped with exit code ${code}`))
});
request.get('http://www.google.com', (err, resp) => {
if(err) {
return console.error(err);
}
console.log("Total bytes received: ", resp.body.length);
})
} else { //the worker's code
function random(min, max) {
return Math.random() * (max - min) + min
}
const sorter = require("./test2-worker");
const start = Date.now()
let bigList = Array(1000000).fill().map( (_) => random(1,10000))
sorter.sort(bigList);
parentPort.postMessage({ val: sorter.firstValue, timeDiff: Date.now() - start});
}
<!--test2-worker-->
module.exports = {
firstValue: null,
sort: function(list) {
let sorted = list.sort();
this.firstValue = sorted[0]
}
}
这一次,我们访问Google.com的首页,同时对随机生成的100万个数字进行排序。这花费了几秒钟的时间,所以对我们来说,这是完美的表现。我们还将测量工作线程执行排序所需的时间,并且我们将把该值(以及第一个排序后的值)发送到主线程,在那里我们将显示结果。
这个例子的主要内容是线程之间的通信。
Workers可以通过该 on方法在主线程
中 接收消息
。我们可以听到的事件是代码中显示的事件。该message每当我们使用从实际线程发送消息触发事件 parentPort.postMessag
e的方法。您也可以使用相同的方法在您的Workers实例上向线程的代码发送消息,并使用 该parentPort对象捕获它们。
现在让我们看看一个非常相似的例子,但是使用更简洁的代码,给你一个关于如何构建工作线程代码的最终想法。
例3:将它们放在一起
作为最后一个例子,我将继续使用相同的功能,但向您展示如何将其清理干净并拥有更易维护的版本。
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
const request = require("request");
function startWorker(path, cb) {
let w = new Worker(path, {workerData: null});
w.on('message', (msg) => {
cb(null, msg)
})
w.on('error', cb);
w.on('exit', (code) => {
if(code != 0)
console.error(new Error(`Worker stopped with exit code ${code}`))
});
return w;
}
console.log("this is the main thread")
let myWorker = startWorker(__dirname + '/workerCode.js', (err, result) => {
if(err) return console.error(err);
console.log("[[Heavy computation function finished]]")
console.log("First value is: ", result.val);
console.log("Took: ", (result.timeDiff / 1000), " seconds");
})
const start = Date.now();
request.get('http://www.google.com', (err, resp) => {
if(err) {
return console.error(err);
}
console.log("Total bytes received: ", resp.body.length);
//myWorker.postMessage({finished: true, timeDiff: Date.now() - start}) //you could send messages to your workers like this
})
你的线程代码可以在另一个文件中,比如:
const { parentPort } = require('worker_threads');
function random(min, max) {
return Math.random() * (max - min) + min
}
const sorter = require("./test2-worker");
const start = Date.now()
let bigList = Array(1000000).fill().map( (_) => random(1,10000))
/**
//you can receive messages from the main thread this way:
parentPort.on('message', (msg) => {
console.log("Main thread finished on: ", (msg.timeDiff / 1000), " seconds...");
})
*/
sorter.sort(bigList);
parentPort.postMessage({ val: sorter.firstValue, timeDiff: Date.now() - start});
我们看到:
主线程和工作线程现在将他们的代码放在不同的文件中。这更容易维护和扩展。
该startWorker函数返回新的实例,如果您愿意,您可以稍后向其发送消息。
如果主线程的代码实际上是主线程(我们删除了主要的IF语句),则不再需要担心。
您可以在worder的代码中看到如何从主线程接收消息,从而实现双向异步通信。
这将是这篇文章的重点,我希望你已经足够了解如何开始玩这个新模块。请记住:
这仍然是高度实验性的,这些东西在未来的版本中可能会改变;
去阅读评论和文档,那里有更多关于这方面的信息,我只关注实现它的基本步骤。
玩的开心!报告错误并提出改进建议,这才刚刚开始!
欢迎关注IT实战联盟
注意:本文归作者所有,未经作者允许,不得转载