nodejs で worker_threads を使用して新しいスレッドを作成する方法

nodejs で worker_threads を使用して新しいスレッドを作成する方法

導入

前の記事で述べたように、NodeJS には 2 種類のスレッドがあります。1 つは、ユーザー リクエストに応答し、さまざまなコールバックを処理するために使用されるイベント ループです。もう 1 つは、さまざまな時間のかかる操作を処理するために使用されるワーカー プールです。

nodejs の公式 Web サイトには、nodejs ローカル ワーカー プールを使用できる webworker-threads というライブラリが記載されています。

残念ながら、webworker-threads の最終更新は 2 年前であり、最新の nodejs 12 ではまったく使用できません。

webworker-threads の作成者は、web-worker と呼ばれる新しいライブラリを推奨しました。

Web-worker は、Node.js の worker_threads 上に構築されています。この記事では、worker_threads と Web-worker の使用について詳しく説明します。

ワーカースレッド

worker_threads モジュールのソース コードは lib/worker_threads.js から取得されます。これはワーカー スレッドを参照し、新しいスレッドを開始して JavaScript プログラムを並列に実行できます。

nodejs 自体の非同期 IO は非常に強力であるため、worker_threads は主に IO 操作ではなく CPU を集中的に使用する操作を処理するために使用されます。

worker_threads には 5 つのメイン属性、3 つのクラス、3 つのメインメソッドがあります。次に一つずつ説明していきます。

メインスレッド

isMainThread は、コードがメイン スレッドで実行されているかどうかを判断するために使用されます。使用例を見てみましょう。

const { Worker, isMainThread } = require('worker_threads');

if (isMainThread) {
 console.log('メインスレッド内');
 新しいワーカー(__filename);
} それ以外 {
 console.log('ワーカースレッド内');
 console.log(isMainThread); // 'false' を出力します。
}

上記の例では、worker_threads モジュールから Worker と isMainThread を導入しました。Worker はワーカー スレッドのメイン クラスであり、後で詳しく説明します。ここでは、Worker を使用してワーカー スレッドを作成します。

メッセージチャネル

MessageChannel は非同期の双方向通信チャネルを表します。 MessageChannel にはメソッドはありません。MessageChannel は主に両端の MessagePorts を接続するために使用されます。

クラスMessageChannel {
  読み取り専用ポート1: MessagePort;
  読み取り専用ポート2: MessagePort;
 }

new MessageChannel() を使用すると、2 つの MessagePort が自動的に作成されます。

const { MessageChannel } = require('worker_threads');

const { port1, port2 } = 新しい MessageChannel();
port1.on('メッセージ', (メッセージ) => console.log('受信したメッセージ', メッセージ));
port2.postMessage({ foo: 'bar' });
// 出力: `port1.on('message')` リスナーから { foo: 'bar' } を受信しました

MessageChannel を介して、MessagePort 間で通信できます。

親ポートとメッセージポート

parentPort は MessagePort 型であり、主にワーカー スレッドとメイン スレッド間のメッセージのやり取りに使用されます。

parentPort.postMessage() 経由で送信されたメッセージは、worker.on('message') 経由でメイン スレッドで利用できるようになります。

メイン スレッドで worker.postMessage() 経由で送信されたメッセージは、parentPort.on('message') 経由でワーカー スレッドで受信されます。

MessagePort の定義を見てみましょう。

クラスMessagePortはEventEmitterを拡張します{
  閉じる(): void;
  postMessage(値: 任意、転送リスト?: Array<ArrayBuffer | MessagePort>): void;
  ref(): void;
  参照を解除します。
  開始(): void;

  addListener(イベント: "close", リスナー: () => void): this;
  addListener(イベント: "message", リスナー: (値: any) => void): this;
  addListener(イベント: 文字列 | シンボル、リスナー: (...引数: any[]) => void): this;

  発行(イベント: "close"): ブール値;
  出力(イベント: "メッセージ", 値: 任意): boolean;
  出力(イベント: 文字列 | シンボル、...引数: any[]): boolean;

  on(イベント: "close", リスナー: () => void): this;
  on(イベント: "message", リスナー: (値: any) => void): this;
  on(イベント: 文字列 | シンボル、リスナー: (...引数: any[]) => void): this;

  一度(イベント: "close", リスナー: () => void): this;
  once(イベント: "message", リスナー: (値: any) => void): this;
  once(イベント: 文字列 | シンボル、リスナー: (...引数: any[]) => void): this;

  prependListener(イベント: "close", リスナー: () => void): this;
  prependListener(イベント: "message", リスナー: (値: any) => void): this;
  prependListener(イベント: 文字列 | シンボル、リスナー: (...引数: any[]) => void): this;

  prependOnceListener(イベント: "close", リスナー: () => void): this;
  prependOnceListener(イベント: "message", リスナー: (値: any) => void): this;
  prependOnceListener(イベント: 文字列 | シンボル、リスナー: (...引数: any[]) => void): this;

  リスナーを削除します(イベント: "close", リスナー: () => void): this;
  リスナーを削除します(イベント: "message", リスナー: (値: any) => void): this;
  リスナーを削除します(イベント: 文字列 | シンボル、リスナー: (...引数: any[]) => void): this;

  off(イベント: "close", リスナー: () => void): this;
  off(イベント: "message", リスナー: (値: any) => void): this;
  off(イベント: 文字列 | シンボル、リスナー: (...引数: any[]) => void): this;
 }

MessagePort は、非同期双方向通信チャネルの一方の端を表す EventEmitter から継承されます。このチャネルは MessageChannel と呼ばれ、MessagePort は MessageChannel を介して通信します。

MessagePort を使用して、構造データ、メモリ領域、または他の MessagePort を転送できます。

ソース コードから、MessagePort に close と message の 2 つのイベントがあることがわかります。

チャネルのいずれかの端が切断されると、close イベントがトリガーされ、port.postMessage が呼び出されると、message イベントがトリガーされます。例を見てみましょう。

const { MessageChannel } = require('worker_threads');
const { port1, port2 } = 新しい MessageChannel();

// 印刷:
// フーバー
// 閉店しました!
port2.on('message', (message) => console.log(message));
port2.on('close', () => console.log('closed!'));

port1.postMessage('foobar');
ポート1を閉じます。

port.on('message') は、実際にはメッセージ イベントのリスナーを追加します。Port は、リスナーを手動で追加するための addListener メソッドも提供します。

port.on('message') は自動的に port.start() メソッドをトリガーし、ポートの開始を示します。

ポートにリスナーがある場合、ポートには参照があることを意味します。参照が存在する場合、プログラムは終了しません。 port.unref メソッドを呼び出すことで、この参照をキャンセルできます。

次に、ポートを介してメッセージを送信する方法を見てみましょう。

port.postMessage(値[, 転送リスト])

postMessage は 2 つのパラメータを受け入れることができ、最初のパラメータは JavaScript オブジェクトである value です。 2 番目のパラメータは transferList です。

まず、1 つのパラメータが渡される場合を見てみましょう。

const { MessageChannel } = require('worker_threads');
const { port1, port2 } = 新しい MessageChannel();

port1.on('message', (message) => console.log(message));

const 円形データ = {};
円形データ.foo = 円形データ;
// 出力: { foo: [Circular] }
port2.postMessage(円形データ);

通常、postMessage によって送信されるオブジェクトは値のコピーですが、transferList を指定すると、transferList 内のオブジェクトはチャネルの受信側に転送され、オブジェクトを送信する場合と同様に、送信側には存在しなくなります。

transferList はリストであり、リスト内のオブジェクトは ArrayBuffer、MessagePort、FileHandle になります。

値に SharedArrayBuffer オブジェクトが含まれている場合、そのオブジェクトを transferList に含めることはできません。

2 つのパラメータを持つ例を見てみましょう。

const { MessageChannel } = require('worker_threads');
const { port1, port2 } = 新しい MessageChannel();

port1.on('message', (message) => console.log(message));

const uint8Array = 新しい Uint8Array([ 1, 2, 3, 4 ]);
// uint8Array のコピーを投稿します:
port2.postMessage(uint8Array);

port2.postMessage(uint8Array, [ uint8Array.buffer ]);

//port2.postMessage(uint8Array);

上記の例では次のように出力されます:

Uint8Array(4) [ 1, 2, 3, 4 ]
Uint8Array(4) [ 1, 2, 3, 4 ]

最初の postMessage はコピーであり、2 番目の postMessage は Uint8Array の基になるバッファの転送です。

port2.postMessage(uint8Array) を再度呼び出すと、次のエラーが発生します。

DOMException [DataCloneError]: ArrayBuffer が分離されており、複製できませんでした。

バッファは TypedArray の基礎となるストレージ構造です。バッファが転送されると、以前の TypedArray は使用できなくなります。

譲渡不可としてマーク

この問題を回避するには、バッファを転送不可としてマークするために markAsUntransferable を呼び出します。markAsUntransferable の例を見てみましょう。

const { MessageChannel、markAsUntransferable } = require('worker_threads');

const pooledBuffer = 新しいArrayBuffer(8);
const typedArray1 = 新しい Uint8Array(pooledBuffer);
const typedArray2 = 新しい Float64Array(pooledBuffer);

転送不可としてマークします(pooledBuffer);

const { port1 } = 新しい MessageChannel();
port1.postMessage(typedArray1、[typedArray1.buffer]);

コンソールにログ出力します。
コンソールにログ出力します。

シェア_ENV

SHARE_ENV はワーカーコンストラクタに渡される環境変数です。この変数を設定することで、メインスレッドとワーカースレッド間で共有環境変数を読み書きできるようになります。

const { ワーカー、SHARE_ENV } = require('worker_threads');
新しいワーカー('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV })
 .on('終了', () => {
 console.log(process.env.SET_IN_WORKER); // 'foo' を出力します。
 });

ワーカーデータ

postMessage() に加えて、メイン スレッドのワーカー コンストラクターに workerData を渡すことによって、メイン スレッドからワーカーにデータを渡すこともできます。

const { Worker、isMainThread、workerData } = require('worker_threads');

if (isMainThread) {
 const worker = new Worker(__filename, { workerData: 'Hello, world!' });
} それ以外 {
 console.log(workerData); // 'Hello, world!' を出力します。
}

労働者階級

まず労働者の定義を見てみましょう。

 クラスWorkerはEventEmitterを拡張します{
  読み取り専用 stdin: 書き込み可能 | null;
  読み取り専用 stdout: 読み取り可能;
  読み取り専用 stderr: 読み取り可能;
  読み取り専用スレッドID: 数値;
  読み取り専用リソース制限?: ResourceLimits;

  コンストラクター(ファイル名: 文字列 | URL、オプション: WorkerOptions);

  postMessage(値: 任意、転送リスト?: Array<ArrayBuffer | MessagePort>): void;
  ref(): void;
  参照を解除します。

  終了(): Promise<数値>;

  getHeapSnapshot(): Promise<Readable>;

  addListener(イベント: "error", リスナー: (err: Error) => void): this;
  addListener(イベント: "exit", リスナー: (exitCode: number) => void): this;
  addListener(イベント: "message", リスナー: (値: any) => void): this;
  リスナーを追加します(イベント: "online", リスナー: () => void): this;
  addListener(イベント: 文字列 | シンボル、リスナー: (...引数: any[]) => void): this;

  ... 
 }

Worker は EventEmitter を継承し、エラー、終了、メッセージ、オンラインの 4 つの重要なイベントが含まれます。

ワーカーは独立した JavaScript 実行スレッドを表します。ファイル名または URL を渡すことでワーカーを構築できます。

各ワーカーには、ワーカーの作成時に互いに関連付けられた組み込み MessagePort のペアがあります。ワーカーは、この組み込み MessagePort のペアを使用して親スレッドと通信します。

parentPort.postMessage() 経由で送信されたメッセージは、worker.on('message') 経由でメイン スレッドで利用できるようになります。

メイン スレッドで worker.postMessage() 経由で送信されたメッセージは、parentPort.on('message') 経由でワーカー スレッドで受信されます。

もちろん、MessageChannel オブジェクトを明示的に作成し、MessagePort をメッセージとして他のスレッドに渡すこともできます。例を見てみましょう。

const assert = require('assert');
定数{
 ワーカー、メッセージチャネル、メッセージポート、isMainThread、親ポート
} = 'worker_threads' が必要です。
if (isMainThread) {
 定数ワーカー = 新しいワーカー (__filename);
 定数サブチャネル = 新しいメッセージチャネル();
 ワーカー.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]);
 subChannel.port2.on('メッセージ', (値) => {
 console.log('受信:', 値);
 });
} それ以外 {
 parentPort.once('メッセージ', (値) => {
 assert(value.hereIsYourPort は MessagePort のインスタンスです);
 value.hereIsYourPort.postMessage('ワーカースレッドがこのメッセージを送信しています');
 value.hereIsYourPort.close();
 });
}

上記の例では、worker と parentPort のメッセージング機能を利用して、明示的な MessageChannel で MessagePort を渡しました。

その後、メッセージは MessagePort を通じて配信されます。

受信メッセージオンポート

ポートの on('message') メソッドに加えて、receiveMessageOnPort を使用してメッセージを手動で受信することもできます。

const { MessageChannel、receiveMessageOnPort } = require('worker_threads');
const { port1, port2 } = 新しい MessageChannel();
port1.postMessage({ hello: 'world' });

console.log(ポート2でメッセージを受信);
// 出力: { message: { hello: 'world' } }
console.log(ポート2でメッセージを受信);
// 出力: undefined

メッセージポートをコンテキストへ移動

まず、Node.js のコンテキストの概念を理解しましょう。VM からコンテキストを作成できます。これは分離されたコンテキスト環境であり、さまざまなオペレーティング環境のセキュリティを確保します。コンテキストの例を見てみましょう。

定数 vm = require('vm');

定数x = 1;

定数コンテキスト = { x: 2 };
vm.createContext(context); // コンテキスト分離オブジェクト。

定数コード = 'x += 40; var y = 17;';
// `x` と `y` はコンテキスト内のグローバル変数です。
// 最初は、context.x の値が 2 であるため、x の値は 2 になります。
vm.runInContext(コード、コンテキスト);

コンソールログ(コンテキストx); // 42
コンソールログ(context.y); // 17

console.log(x); // 1; yは定義されていません。

ワーカーでは、MessagePort を別のコンテキストに移動できます。

ワーカー.メッセージポートをコンテキストに移動する(ポート、コンテキスト化されたサンドボックス)

このメソッドは 2 つのパラメータを受け取ります。最初のパラメータは移動する MessagePort であり、2 番目のパラメータは vm.createContext() によって作成されたコンテキスト オブジェクトです。

worker_threads スレッドプール

上記では単一のワーカー スレッドの使用について説明しましたが、プログラムでは 1 つのスレッドでは不十分な場合が多く、ワーカー スレッド オブジェクトを維持するためにスレッド プールを作成する必要があります。

Nodejs は、非同期リソースの拡張として AsyncResource クラスを提供します。

AsyncResource クラスは async_hooks モジュールにあります。

次に、AsyncResource クラスを使用してワーカー スレッド プールを作成する方法を見てみましょう。

2 つの数値を加算するタスクがあり、スクリプト名が task_processor.js であるとします。

const { 親ポート } = require('worker_threads');
parentPort.on('メッセージ', (タスク) => {
 親ポートにメッセージを投稿します。
});

ワーカー プールの実装は次のとおりです。

const { AsyncResource } = require('async_hooks');
const { EventEmitter } = require('events');
定数パス = require('path');
const { ワーカー } = require('worker_threads');

const kTaskInfo = シンボル('kTaskInfo');
const kWorkerFreedEvent = シンボル('kWorkerFreedEvent');

WorkerPoolTask​​InfoクラスはAsyncResourceを拡張します{
 コンストラクタ(コールバック) {
 スーパー('ワーカープールタスク情報');
 this.callback = コールバック;
 }

 完了(エラー、結果) {
 this.runInAsyncScope(this.callback、null、err、結果);
 this.emitDestroy(); // `TaskInfo` は 1 回だけ使用されます。
 }
}

クラス WorkerPool は EventEmitter を拡張します {
 コンストラクター(スレッド数) {
 素晴らしい();
 this.numThreads = numThreads;
 this.workers = [];
 this.freeWorkers = [];

 (i = 0 とします; i < numThreads; i++)
  新しいワーカーを追加します。
 }

 新しいワーカーを追加します(){
 定数ワーカー = 新しいワーカー (path.resolve(__dirname, 'task_processor.js'));
 ワーカー.on('メッセージ', (結果) => {
  // 成功した場合: `runTask` に渡されたコールバックを呼び出します。
  // Worker に関連付けられた `TaskInfo` を削除し、空きとしてマークします
  // また。
  ワーカー[kTaskInfo].done(null、結果);
  ワーカー[kTaskInfo] = null;
  this.freeWorkers.push(ワーカー);
  this.emit(kWorkerFreedEvent);
 });
 ワーカー.on('エラー', (err) => {
  // キャッチされない例外の場合: 渡されたコールバックを呼び出します
  // エラーのある `runTask`。
  if (ワーカー[kTaskInfo])
  ワーカー[kTaskInfo].done(err, null);
  それ以外
  this.emit('エラー', err);
  // リストからワーカーを削除し、新しいワーカーを開始して置き換えます。
  // 現在のもの。
  this.workers.splice(this.workers.indexOf(ワーカー), 1);
  新しいワーカーを追加します。
 });
 this.workers.push(ワーカー);
 this.freeWorkers.push(ワーカー);
 this.emit(kWorkerFreedEvent);
 }

 runTask(タスク、コールバック) {
 if (this.freeWorkers.length === 0) {
  // 空きスレッドがないので、ワーカー スレッドが空くまで待機します。
  this.once(kWorkerFreedEvent, () => this.runTask(task, callback));
  戻る;
 }

 ワーカーを解放します。
 ワーカー[kTaskInfo] = 新しい WorkerPoolTask​​Info(コールバック);
 ワーカー.postMessage(タスク);
 }

 近い() {
 (this.workers の const ワーカー) に対して、worker.terminate();
 }
}

モジュールをエクスポートします。

ワーカーの新しい kTaskInfo プロパティを作成し、非同期コールバックを WorkerPoolTask​​Info にカプセル化して、worker.kTaskInfo に割り当てます。

次にworkerPoolを使用します。

ワーカープールを require('./worker_pool.js');
os が必要です。

定数プール = 新しい WorkerPool(os.cpus().length);

終了 = 0 にします。
(i = 0; i < 10; i++ とします) {
 pool.runTask({ a: 42, b: 100 }, (err, 結果) => {
 console.log(i, err, 結果);
 (++終了 === 10)の場合
  プールを閉じます。
 });
}

これで、nodejs で worker_threads を使用して新しいスレッドを作成する方法についての記事は終了です。nodejs で worker_threads を使用してスレッドを作成する方法の詳細については、123WORDPRESS.COM の以前の記事を検索するか、次の関連記事を引き続き参照してください。今後とも 123WORDPRESS.COM をよろしくお願いいたします。

以下もご興味があるかもしれません:
  • プロセス解析を使用する Javascript Web Worker
  • Yii2 と Workerman の Websocket の例を組み合わせた詳細な説明
  • JavaScript での Web ワーカー マルチスレッド API の研究
  • Node.js のワーカー スレッドの詳細な理解
  • 複数ページ通信を実現する JavaScript の sharedWorker の詳細な例
  • Javascript ワーカー サブスレッド コード例
  • JavaScript でのワーカー イベント API の理解
  • JS で webWorker を使用する方法

<<:  MySQL countの詳細な説明と関数のサンプルコード

>>:  Win7 の VMware 仮想マシンに Linux7.2 をインストールするインターネット アクセス構成チュートリアル

推薦する

Vue codemirrorはオンラインコードコンパイラの効果を実現します

序文Web 上でオンライン コード コンパイルの効果を実現したい場合は、 CodeMirrorを再度...

MySQLのトランザクションとデータ一貫性処理の問題を分析する

この記事では、セキュリティ、使用方法、同時処理などを通じて、MySQL トランザクションとデータの一...

Vue ルーター vue-router 詳細説明ガイド

中国語ドキュメント: https://router.vuejs.org/zh/ Vue Router...

JavaScript のデシェイクとスロットリングの例

目次安定スロットル: 手ぶれ防止: 一定時間内に最後のタスクのみを実行します。スロットル: 一定期間...

Docker の NFS-Ganesha イメージを使用して NFS サーバーを構築する詳細なプロセス

目次1. NFS-Ganeshaの紹介2. NFS-Ganeshaの設定3. NFS-Ganesha...

Dockerfileを使用してDockerイメージを構築する

目次Dockerfileを使用してDockerイメージを構築する1. Dockerfile とは何で...

jQueryはネストされたタブ機能を実装します

この記事では、ネストされたタブ機能を実装するためのjQueryの具体的なコードを参考までに紹介します...

CentOS で LibreOffice を使用してドキュメント形式を変換する方法

プロジェクト要件では、アップロードされたドキュメントの前処理が必要です。ユーザーが doc 形式でド...

Linux (Centos7) での redis5 クラスターの構築と使用方法の詳細な説明

目次1. 簡単な説明2. クラスターを作成する手順2.1. ディレクトリを作成する2.2. ソースコ...

Flex モバイルレイアウトにおけるシングルラインレイアウトとダブルラインレイアウトの違いと使い方

レイアウトにul>liを使用した単一行レイアウトを以下に示します。 <ul class=...

Angularが予期しない例外エラーを処理する方法の詳細な説明

前面に書かれたコードがどれだけ適切に記述されていても、すべての可能性のある例外を完全に処理することは...

Mysql は非集計列を選択できません

1. はじめに最近ブログをアップグレードし、記事ページの下部に前の記事と次の記事に直接ジャンプできる...

Vue3 での watchEffect の使用に関する簡単な分析

序文誰もが vue2 の watch API に精通している必要があります。vue2 の vue イ...

MySql はデータを正常に挿入しますが、[Err] 1055 エラーが報告されます。解決策

1. 質問:最近、挿入操作を行っています。MySQLのバージョンは5.7です。挿入は成功しますが、エ...

MySQL が uuid または snowflake id を主キーとして使用することを推奨しない理由の詳細な分析

前書き: MySQL でテーブルを設計する場合、MySQL では UUID や非連続かつ非繰り返しの...