Rabbitmq heartbea ハートビート検出メカニズムの原理の分析

Rabbitmq heartbea ハートビート検出メカニズムの原理の分析

序文

RabbitMQ を使用する場合、一定期間クライアントと RabbitMQ サーバーの間でトラフィックがない場合、サーバーはクライアントとの TCP 接続を切断します。

サーバー上に次のようなログが表示されます。

クライアントからのハートビートが失われました。タイムアウト: xxs

この間隔がハートビート間隔です。

ハートビートは通常、通信の相手側がアクティブであるかどうか (ソケット接続を正常に閉じることができなかったために発生した異常なクラッシュ) を検出するために使用されます。基本的な原理は、対応するソケット接続上のデータ送受信が正常かどうかを検出することです。一定時間内にデータの送受信が行われない場合、ハートビート検出パケットが相手側に送信されます。一定時間内に応答がない場合は、ハートビートがタイムアウトした、つまり相手側が異常クラッシュした可能性があるとみなされます。

RabbitMQも例外ではありません。クライアントとサーバーの間でHeatbeatを使用することで、相手側が正常かどうか、つまりクライアントとサーバー間のTCPリンクが正常かどうかを検出します。

RabbitMQハートビートについて

1. ハートビート検出時間間隔は、設定ファイル rabbitmq.config に設定項目 {heartbeat,Timeout} を追加することで設定できます。ここで、Timeout は時間間隔を秒単位で指定します。また、クライアント側でもハートビート時間を設定することができます。

サーバーが設定されていない場合

デフォルトのプロキシハートビート時間:

RabbitMQ 3.2.2: 580秒
RabbitMQ 3.5.5: 60秒

2. 公式の推奨ではハートビートを無効にしないことになっており、推奨されるハートビート時間は 60 秒です。

3. ハートビートは、ハートビート タイムアウト/2 秒ごとに送信されます。サーバーが 2 回受信しない場合、TCP 接続は切断され、以前の接続は無効になり、クライアントは再接続する必要があります。

4. Java、.NET、Erlangクライアントを使用する場合、サーバーとクライアントはハートビート時間をネゴシエートします。

どちらかの値が 0 の場合、大きい方の値が使用されます。

それ以外の場合は、2つのうち小さい方を使用してください

両方の値が 0 の場合、ハートビートが無効になっていることを意味し、サーバーとクライアントはこの TCP 接続を維持し、切断されません。

注: Python クライアントでこれを直接 0 に設定すると、ハートビート機能が無効になります。

Python クライアントでハートビートを無効にする方法:

py3:ConnectionParameters で heartbeat_interval=0 を設定するだけです。

py2:ConnectionParameters で heartbeat=0 を設定するだけです。

5. 接続上のすべてのトラフィック (送信された有効なデータ、確認など) は、ハートビート フレームを含む有効なハートビートとしてカウントされます。

6. 誰かがオンラインでこの質問をしているのを見ました:

なぜサーバーがクラッシュするのでしょうか? ハートビート検出メカニズムでは、サーバー側は切断されますが、クライアント側は TCP 切断を検出できません。クライアントが TCP 接続の切断を検出できないことをテストしました。クライアントがこの TCP に対して操作を行った場合にのみ検出できます。もちろん、切断された TCP 接続に対して操作を実行すると、エラーが発生します (メッセージの送信など)。

輸入ナキウサギ 
インポート時間 

クレジット = pika.PlainCredentials(ユーザー名 = 'cloud'、パスワード = 'cloud')
接続 = pika.BlockingConnection(pika.ConnectionParameters(
  ホスト='10.32.1.12'、資格情報=credit))
チャネル = connection.channel() 
真の場合:
  connect_close = 接続が閉じられました
  connect_open = 接続が開いている
  チャネルが閉じている = チャネルが閉じている
  チャンネルオープン = チャンネルがオープンしている
  
  print("接続が閉じられました ", connect_close)
  print("接続はis_open"、connect_open)
  print("チャンネルは閉じています ", channel_close)
  print("チャンネルは開いている ", channel_open)
  印刷("")
  時間.睡眠(5)

7. 一部の RabbitMQ クライアント (Bunny、Java、.NET、Objective-C、Swift) は、ネットワーク障害後に接続を自動的に復元するメカニズムを提供しますが、pika は接続の異常を検出して接続を再確立することしかできません。

サンプルコード: 接続例外を検出して接続を再作成します。

輸入ナキウサギ

真の場合:
  試す:
    接続 = pika.BlockingConnection()
    チャネル = connection.channel()
    チャネル.basic_consume('テスト'、on_message_callback)
    チャネルの開始_消費()
  # ブローカーによって接続が閉じられた場合は回復しない
  pika.exceptions.ConnectionClosedByBroker を除く:
    壊す
  # チャネルエラー時に回復しない
  pika.exceptions.AMQPChannelError を除く:
    壊す
  # その他の接続エラーが発生した場合は回復する
  pika.exceptions.AMQPConnectionError を除く:
    続く

retry などの操作再試行ライブラリを使用することもできます。

再試行からインポート再試行

@retry(pika.exceptions.AMQPConnectionError、遅延=5、ジッター=(1、3))
def 消費():
  接続 = pika.BlockingConnection()
  チャネル = connection.channel()
  チャネル.basic_consume('テスト'、on_message_callback)
  試す:
    チャネルの開始_消費()
  # サーバーによって閉じられた接続を回復しない
  pika.exceptions.ConnectionClosedByBroker を除く:
    合格
スタブ()

ハートビートの実装

クライアントから connection.tune-ok シグナルを受信した後、rabbitmq はハートビート検出を有効にします。rabbitmq はハートビート検出のために各 TCP 接続ごとに 2 つのプロセスを作成します。1 つのプロセスは TCP 接続でデータが送信されているかどうかを定期的に検出します (ここでの送信とは、rabbitmq がクライアントにデータを送信することを意味します)。一定期間クライアントにデータが送信されない場合は、ハートビート パケットがクライアントに送信され、次にループで次の検出が実行されます。もう 1 つのプロセスは TCP 接続でデータが受信されているかどうかを定期的に検出します。一定期間データが受信されない場合は、ハートビートがタイムアウトしたと判断され、TCP 接続は最終的に閉じられます。さらに、RabbitMQ のフロー制御メカニズムによりハートビート検出が一時停止される場合もありますが、これについてはここでは説明しません。

関連するソースコード:

開始(SupPid、Sock、SendTimeoutSec、
   SendFun、ReceiveTimeoutSec、ReceiveFun) ->
  %%データ送信検出プロセス {ok, Sender} = start_heartbeater(SendTimeoutSec, SupPid, Sock,
                   SendFun、heartbeat_sender、
                   start_heartbeat_sender)、
  %%データ受信検出プロセス {ok, Receiver} = start_heartbeater(ReceiveTimeoutSec, SupPid,
                    ソックス、レシーブファン、
                    ハートビートレシーバー、
                    start_heartbeat_receiver)、
  {送信者、受信者}。

start_heartbeat_sender(Sock、TimeoutSec、SendFun) ->
  %% 'div 2' は、待つ必要がないようにするためにあります
  %% ハートビートを送信する前に約2 * TimeoutSec
  %% 境界ケース
  ハートビーター({Sock, TimeoutSec * 1000 div 2, send_oct, 0,
         fun () -> SendFun()、続行 end})。

start_heartbeat_receiver(Sock、TimeoutSec、ReceiveFun) ->
  %% 一定間隔ごとに受信データをチェックし、一定時間後にタイムアウトします
  %% 2回のチェックで変化なし。結果としてタイムアウトになります
  %% 最後のデータが取得されてから2~3間隔の間
  %% 受け取った
  ハートビーター({Sock, TimeoutSec * 1000, recv_oct, 1,
        fun () -> ReceiveFun()、stop end})。

ハートビーター({Sock, TimeoutMillisec, 
       StatName、しきい値、ハンドラー} = パラメータ、
      デブ、
      {StatVal, SameCount} = 状態) ->
  再帰 = fun (State1) -> heartbeater(Params, Deb, State1) 終了、
  受け取る
    ...
  %% TimeoutMillisec 後の時間検出 ->
    ケースrabbit_net:getstat(Sock, [StatName]) の
      {ok、[{StatName, NewStatVal}]} ->
        %% 送受信されたデータは、NewStatVal =/= StatVal の場合に変更されました ->
            %%検出を再開する Recurse({NewStatVal, 0});
          %% 指定回数に達しなかった場合は送信は0、受信は1となります
          同じカウント<しきい値->
            %%Count に 1 を加算して再度チェックしますRecurse({NewStatVal, SameCount + 1});
          %%ハートビートタイムアウト true ->
            %% 送信検出タイムアウトの場合、クライアントにハートビートパケットを送信します %% 受信検出タイムアウトの場合、親プロセスにタイムアウト通知を送信します %% 親プロセスはTCPシャットダウンおよびその他の操作をトリガーします case Handler() の
              %%受信検出タイムアウト停止 -> ok;
              %%送信検出タイムアウト継続 -> Recurse({NewStatVal, 0})
            終わり;
      ...

送受信検出時には、inetモジュールのgetstatを使用してソケットの統計情報を表示します。

recv_oct: ソケットで受信したバイト数を表示する

send_oct: ソケットで送信されたバイト数を表示します

inet の詳細については、http://www.erlang.org/doc/man/inet.html を参照してください。

以上がこの記事の全内容です。皆様の勉強のお役に立てれば幸いです。また、123WORDPRESS.COM を応援していただければ幸いです。

以下もご興味があるかもしれません:
  • Docker デプロイメント RabbitMQ コンテナ実装プロセス分析
  • SpringBoot で RabbitMQ を使用する詳細なチュートリアル
  • C# は RabbitMq キューを使用します (サンプル、ワーク、ファンアウト、ダイレクトなどのモードを簡単に使用できます)
  • CentOS で yum を使用して rabbitmq-server をインストールする方法
  • SpringBoot+RabbitMQを使用したメッセージ送受信の実装例
  • Python rabbitMQ でプロダクション モデルとコンシューマー モデルを実装する方法
  • DockerにRabbitMQを素早くインストールする方法
  • Springboot + rabbitmq でメッセージ確認メカニズムを実装する方法 (落とし穴の経験)
  • C# は RabbitMQ を呼び出してメッセージ キューのサンプル コードを実装します。
  • JavaでRabbitMqメッセージミドルウェアを構築するプロセスの詳細な説明

<<:  js で下線とキャメルケースの変換を実装する (複数の方法)

>>:  最も完全な 50 の MySQL データベース クエリ演習

推薦する

mysql8.0.20 のダウンロードとインストールおよび発生した問題 (図とテキスト)

1.ブラウザでmysqlを検索してダウンロードしてインストールしますアドレス: https://d...

WeChatミニプログラムはuni-appを通じて世界中に共有されます

実際の使用では、ミニプログラムを友人や友人サークルと共有する必要があることが多く、通常は一度に 1 ...

Nginx で WordPress を設定する方法

以前、私は自分で WordPress を構築していましたが、当時はサードパーティの仮想ホストを使用し...

Tomcat のプレースホルダーによるポート設定方法 (パラメータ指定方式)

仕事で必要になったため、インターネットで多くの情報を見つけましたが、それらはすべてコピーアンドペース...

canvas.toDataURL image/png エラー処理方法の推奨

問題の背景:再生中のビデオのスクリーンショットを撮る必要があります。ビデオはビデオタグを使用して再生...

JavaScript Canvas は動的なワイヤーフレーム効果を描画します

この記事では、JavaScript Canvasの動的なワイヤーフレーム効果を描画する具体的なコード...

モバイルレイアウト用の動的REMの実装

ダイナミックレム1. まず、現在の長さの単位を紹介しましょうpx em Mの幅 / 漢字の幅 1em...

docker で Apollo をデプロイする詳細なチュートリアル

1. はじめにここでは apollo について詳しく説明しません。公式サイト https://git...

Xmeter APIインターフェーステストツールの使用状況の分析

XMeter API は、以下のサービスを含む、JMeter に基づくワンストップのオンライン イン...

MySQL 派生テーブル(Derived Table)の簡単な使用例分析

この記事では、例を使用して、MySQL 派生テーブルの簡単な使用方法を説明します。ご参考までに、詳細...

Element-uiはテーブル内のセルを直接クリックして編集します

目次成果を達成する実装コード最近、会社でelementUIを使い始めたため、開発の過程でテーブルのセ...

Javascript実践におけるコマンドモードの詳しい説明

目次意味構造例カスタムショートカットキー元に戻すとやり直し録音と再生マクロ要約する意味リクエストをオ...

MySQLデータベース移行におけるデータ文字化けの問題を解決する

リーダーの指示のもと、Java プロジェクトを引き継ぎ、リファクタリングを行う必要がありました。同時...

MySQL の完全なデータベース バックアップ データを使用して単一のテーブル データを復元する方法

序文データベースをバックアップするときは、データベース全体のバックアップを使用します。ただし、何らか...

年末ですが、MySQL パスワードは安全ですか?

序文:年末です。データベースを検査する時期ではないでしょうか?一般的に、検査では、パスワードの複雑さ...