NodeJS 10.5.0 中的线程:实用介绍

我是小傅哥 2018-06-27 14:42:38 ⋅ 647 阅读

几天前,Node.js版本10.5.0发布,其中包含的主要功能之一是添加了(实验)线程支持。

这很有趣,特别是来自一种总以不需要的线程感到自豪的语言,这要归功于它非常棒的异步I / O。那么为什么我们需要Node中的线程呢?

简洁的答案是:过去的Node中唯一的区域中表现突出的问题:处理繁重的CPU密集型计算这主要是为什么Node.js在人工智能,机器学习,数据科学等领域不够强大的原因。有很多努力正在解决这个问题,但我们仍然没有像部署微服务时那样表现出色。

因此,我将尝试将官方文档提供的技术文档简化为更实用,更简单的示例。希望这足以让你开始。

那么我们如何使用新的线程模块呢?

首先,您将需要一个名为“worker_threads”的模块。

请注意,只有 --experimental-worker 在执行脚本时使用该标志才能使用,否则将不会找到该模块。

注意flag是指worker而不是线程,这是他们在整个文档中被引用的方式:工作者线程或简单的worker。

如果您以前使用过multi-processing,您会发现这种方法有很多相似之处,但如果您没有,请不要担心,我会尽可能多地解释。

你可以用他们做什么?

就像我之前提到的,工作线程是指CPU密集型任务,使用它们进行I / O会浪费资源,因为根据官方文档,Node提供的处理异步I / O的内部机制要多得多比使用工作线程更有效率,所以...不要麻烦。

我们从一个简单的例子开始,介绍如何创建一个 worker并使用它。

实例1

 
  1. const { Worker, isMainThread,  workerData } = require('worker_threads');

  2. let currentVal = 0;

  3. let intervals = [100,1000, 500]

  4. function counter(id, i){

  5.    console.log("[", id, "]", i)

  6.    return i;

  7. }

  8. if(isMainThread) {

  9.    console.log("this is the main thread")

  10.    for(let i = 0; i < 2; i++) {

  11.        let w = new Worker(__filename, {workerData: i});

  12.    }

  13.    setInterval((a) => currentVal = counter(a,currentVal + 1), intervals[2], "MainThread");

  14. } else {

  15.    console.log("this isn't")

  16.    setInterval((a) => currentVal = counter(a,currentVal + 1), intervals[workerData], workerData);

  17. }

上面的例子将简单地输出一组显示递增计数器的行,它们将使用不同的速度增加它们的值。

让我们分解一下:

  1. IF语句中的代码创建2个工作线程[worker threads],由于__filename 参数传递,它们的代码从同一个文件中获取。工作线程[Workers]现在需要完整的文件路径,他们不能处理相对路径,所以这就是为什么使用这个值。

  2. 这两名worker将作为全局参数发送一个值,其形式为workerData,您在第二个参数中看到的属性。然后可以通过具有相同名称的常量访问该值(请参阅常量是如何在文件的第一行中创建的,并在以后的最后一行中使用)。

这个例子是你可以用这个模块做的最基本的事情之一,但它不是很有趣,是吗?我们来看另一个例子。

实例2

让我们现在尝试做一些“繁重”的计算,同时在主线程中做一些异步的东西。

 
  1. const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

  2. const request = require("request");

  3. if(isMainThread) {

  4.    console.log("This is the main thread")

  5.    let w = new Worker(__filename, {workerData: null});

  6.    w.on('message', (msg) => { //A message from the worker!

  7.        console.log("First value is: ", msg.val);

  8.        console.log("Took: ", (msg.timeDiff / 1000), " seconds");

  9.    })

  10.    w.on('error', console.error);

  11.    w.on('exit', (code) => {

  12.        if(code != 0)

  13.            console.error(new Error(`Worker stopped with exit code ${code}`))

  14.   });

  15.    request.get('http://www.google.com', (err, resp) => {

  16.        if(err) {

  17.            return console.error(err);

  18.        }

  19.    console.log("Total bytes received: ", resp.body.length);

  20.    })

  21. } else { //the worker's code

  22.    function random(min, max) {

  23.        return Math.random() * (max - min) + min

  24.    }

  25.    const sorter = require("./test2-worker");

  26.    const start = Date.now()

  27.    let bigList = Array(1000000).fill().map( (_) => random(1,10000))

  28.    sorter.sort(bigList);

  29.   parentPort.postMessage({ val: sorter.firstValue, timeDiff: Date.now() - start});

  30. }

 
  1. <!--test2-worker-->

  2. module.exports = {

  3.    firstValue: null,

  4.    sort: function(list) {

  5.        let sorted = list.sort();

  6.        this.firstValue = sorted[0]

  7.    }

  8. }

这一次,我们访问Google.com的首页,同时对随机生成的100万个数字进行排序。这花费了几秒钟的时间,所以对我们来说,这是完美的表现。我们还将测量工作线程执行排序所需的时间,并且我们将把该值(以及第一个排序后的值)发送到主线程,在那里我们将显示结果。

这个例子的主要内容是线程之间的通信。

Workers可以通过该 on方法在主线程接收消息。我们可以听到的事件是代码中显示的事件。该message每当我们使用从实际线程发送消息触发事件 parentPort.postMessage的方法。您也可以使用相同的方法在您的Workers实例上向线程的代码发送消息,并使用 parentPort对象捕获它们。

现在让我们看看一个非常相似的例子,但是使用更简洁的代码,给你一个关于如何构建工作线程代码的最终想法。

例3:将它们放在一起

作为最后一个例子,我将继续使用相同的功能,但向您展示如何将其清理干净并拥有更易维护的版本。

 
  1. const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

  2. const request = require("request");

  3. function startWorker(path, cb) {

  4.    let w = new Worker(path, {workerData: null});

  5.    w.on('message', (msg) => {

  6.        cb(null, msg)

  7.    })

  8.    w.on('error', cb);

  9.    w.on('exit', (code) => {

  10.        if(code != 0)

  11.            console.error(new Error(`Worker stopped with exit code ${code}`))

  12.   });

  13.    return w;

  14. }

  15. console.log("this is the main thread")

  16. let myWorker = startWorker(__dirname + '/workerCode.js', (err, result) => {

  17.    if(err) return console.error(err);

  18.    console.log("[[Heavy computation function finished]]")

  19.    console.log("First value is: ", result.val);

  20.    console.log("Took: ", (result.timeDiff / 1000), " seconds");

  21. })

  22. const start = Date.now();

  23. request.get('http://www.google.com', (err, resp) => {

  24.    if(err) {

  25.        return console.error(err);

  26.    }

  27.    console.log("Total bytes received: ", resp.body.length);

  28.    //myWorker.postMessage({finished: true, timeDiff: Date.now() - start}) //you could send messages to your workers like this

  29. })

你的线程代码可以在另一个文件中,比如:

 
  1. const {  parentPort } = require('worker_threads');

  2. function random(min, max) {

  3.    return Math.random() * (max - min) + min

  4. }

  5. const sorter = require("./test2-worker");

  6. const start = Date.now()

  7. let bigList = Array(1000000).fill().map( (_) => random(1,10000))

  8. /**

  9. //you can receive messages from the main thread this way:

  10. parentPort.on('message', (msg) => {

  11.    console.log("Main thread finished on: ", (msg.timeDiff / 1000), " seconds...");

  12. })

  13. */

  14. sorter.sort(bigList);

  15. parentPort.postMessage({ val: sorter.firstValue, timeDiff: Date.now() - start});

我们看到:

  1. 主线程和工作线程现在将他们的代码放在不同的文件中。这更容易维护和扩展。

  2. 该startWorker函数返回新的实例,如果您愿意,您可以稍后向其发送消息。

  3. 如果主线程的代码实际上是主线程(我们删除了主要的IF语句),则不再需要担心。

  4. 您可以在worder的代码中看到如何从主线程接收消息,从而实现双向异步通信。

这将是这篇文章的重点,我希望你已经足够了解如何开始玩这个新模块。请记住:

  1. 这仍然是高度实验性的,这些东西在未来的版本中可能会改变;

  2. 去阅读评论和文档,那里有更多关于这方面的信息,我只关注实现它的基本步骤。

  3. 玩的开心!报告错误并提出改进建议,这才刚刚开始!

欢迎关注IT实战联盟



全部评论: 0

    我有话说:

    Node异步式I/O和异步式编程(三)

     Node.js 最大特点就是异步式 I/O(或者非阻塞I/O)与事件紧密结合编程模式。 第一部分: I/O 1.阻塞I/O与非阻塞I/O概念 1.1阻塞I/O(同步I/O) 线

    面试官:HashMap为什么是线不安全

    一直以来只是知道HashMap是线不安全,但是到底HashMap为什么线不安全?

    Node.js v15.13.0 发布

    Node.js 15.13.0 正式发布。Node.js 是能够在服务器端运行 JavaScript 开放源代码、跨平台 JavaScript 

    推荐一款功能强大,开源免费H5可视化编辑器

    H5-Dooring 是一款功能强大,开源免费H5可视化页面配置解决方案,致力于提供一套简单方便、专业可靠、无限可能H5落地页最佳实践。技术栈以react为主, 后台采用nodejs开发. 预览

    Nodejs实用技巧之-Exceljs

    今天我们就在此介绍下exceljs基本使用,应该可以满足我们大部分需求。

    Node模块之fs模块(六)

    屏幕快照 2017-08-08 上午10.53.21.png 第一部分 概述 Node.js 提供一组类似UNIX(POSIX)标准文件操作API,Node.js操作文件模块是fs(File

    Apache Tomcat 8.5.59、9.0.39和10.0.0-M9发布

    Apache Tomcat 8.5.59, 9.0.39 和 10.0.0-M9 已发布。 8.5.x 已取代 8.0.x,并增加了从 Tomcat 9.0.x 吸收新功能。与 8.5.58

    「轻阅读」图解 Java 线生命周期

    Java 线生命周期都包含哪些状态?生命周期各个状态都是什么含义?

    JavaWeb实战篇:视图化了解多线Wait、NotifyAll使用案例

    内容简介本节针对于我们常听说而不常使用 wait 和 notify 方法做个生产者和消费者案例效果图多线......

    Redis多线演进

    Redis作为一个基于内存缓存系统,一直以高性能著称,因没有上下文切换以及无锁操作,即使在单线处理情况下,读速度仍可达到11万次/s,写速度达到8.1万次/s。但是,单线设计也给Redis

    DDDplus 1.0.2 发布,轻量级业务台开发框架

    DDDplus 简介 一套轻量级业务台开发框架,以DDD思想为本,致力于业务资产可沉淀可传承,全方位解决复杂业务场景扩展问题,实现台核心要素,赋能台建设。 融合了前台复杂生态协作方法论

    Node实战篇:Nodejs 链接 Mariadb 实例

    MariaDB数据库管理系统是MySQL一个分支,主要由开源社区在维护,采用GPL授权许可 MariaDB目的是完全兼容MySQL

    Node实战篇:Express--jade模板引擎(七)

    Jade(Pug) — Node Template Engine,一个高性能模板引擎,专为 Node 而做......

    Node 模块之 util URL queryString path(八)

    第一部分 util util是一个Node.js核心模块,util模块设计主要目的是为了满足Node内部API需求。其中包括:格式化字符串、对象序列化、实现对象继承等常用方法。要使用util

    ECharts 5.0.1 发布,JavaScript 实现交互式图表可视化库

    Apache ECharts (incubating) 5.0.1 已发布,ECharts 是一个使用 JavaScript 实现开源可视化库,可以流畅运行在 PC 和移动设备上,兼容

    Node.js 15.6.0 发布

    Node.js 15.6.0 发布,Node.js 是一个基于 Chrome V8 引擎 JavaScript 运行时。 此版本主要更新内容包括: child_process: 添加

    Node实战篇:Express路由(三)

    Express 是一个基于 Node.js 平台极简、灵活 web 应用开发框架,它提供一系列强大特性,帮助你创建各种 Web 和移动设备应用。

    Apache Tomcat 10.0.4、9.0.44 和 8.5.64 发布

    Apache Tomcat 三个分支发布了更新,分别是 10.0.4、9.0.44 和 8.5.64。 Tomcat 10.0.x 系列目标平台是 Jakarta EE 9。官方表示,Tomcat

    2018年8个技巧来构建更好Node.js应用程序

    2018年8个技巧来构建更好Node.js应用程序