リアルタイムコンピューティングフレームワークFlinkクラスタの構築と動作メカニズムについての簡単な説明

リアルタイムコンピューティングフレームワークFlinkクラスタの構築と動作メカニズムについての簡単な説明

1. Flinkの概要

1.1 基本的な紹介

主な機能には、バッチとストリームの統合、正確な状態管理、イベント時間のサポート、および正確に 1 回の状態一貫性の保証が含まれます。 Flink は、YARN、Mesos、Kubernetes などのさまざまなリソース管理フレームワークで実行できるだけでなく、ベアメタル クラスターでの独立したデプロイメントもサポートします。高可用性オプションを有効にすると、単一障害点がなくなります。

ここで説明する概念は 2 つあります。

  • 境界: データ集約戦略または条件として理解できる、無制限および制限付きデータ フロー。
  • ステータス: 実行順序に依存関係があるかどうか、つまり次の実行が前の実行の結果に依存するかどうか。

1.2 アプリケーションシナリオ

データ駆動型

イベント駆動型アプリケーションでは、リモート データベースをクエリする必要がありません。ローカル データ アクセスにより、スループットの向上とレイテンシの低減が可能になります。不正防止のケースを例にとると、DataDriven は処理ルール モデルを DatastreamAPI に書き込み、ロジック全体を Flink エンジンに抽象化します。イベントまたはデータが流入すると、対応するルール モデルがトリガーされます。ルール内の条件がトリガーされると、DataDriven はそれをすばやく処理し、ビジネス アプリケーションに通知します。

データ分析

バッチ分析と比較して、ストリーミング分析では定期的なデータインポートとクエリプロセスが不要になるため、イベントからインジケーターを取得する際のレイテンシが低くなります。さらに、バッチクエリは定期的なインポートや入力境界によって生じる人工的なデータ境界に対処する必要がありますが、ストリーミングクエリではこの問題を考慮する必要がありません。Flink は継続的なストリーミング分析とバッチ分析の両方に優れたサポートを提供し、データをリアルタイムで処理および分析します。リアルタイムの大画面やリアルタイムレポートなどのシナリオで広く使用されています。

データパイプライン

定期的な ETL タスクと比較して、継続的なデータ パイプラインは、データを宛先に移動するレイテンシを大幅に削減できます。たとえば、上流の StreamETL に基づいて、リアルタイムのデータ クリーニングまたは拡張を実行し、下流にリアルタイム データ ウェアハウスを構築して、データ クエリの適時性を確保し、高効率のデータ クエリ リンクを形成できます。このシナリオは、メディア ストリームの推奨や検索エンジンで非常に一般的です。

2. 環境の展開

2.1. インストールパッケージの管理

[root@hop01 opt]# tar -zxvf flink-1.7.0-bin-hadoop27-scala_2.11.tgz

[root@hop02 opt]# mv flink-1.7.0 flink1.7

2.2 クラスタ構成

管理ノード

[root@hop01 opt]# cd /opt/flink1.7/conf

[root@hop01 conf]# vim flink-conf.yaml

ジョブマネージャ.rpc.アドレス: hop01

分散ノード

[root@hop01 conf]# vim スレーブ

ホップ02

ホップ03

2 つの構成はすべてのクラスター ノードに同期されます。

2.3. 開始と停止

クラスタを起動します

/opt/flink1.7/bin/stop-cluster.sh

起動ログ:

[root@hop01 conf]# /opt/flink1.7/bin/start-cluster.sh

クラスターを開始しています。

ホスト hop01 でスタンドアロンセッション デーモンを起動しています。

ホスト hop02 で taskexecutor デーモンを起動しています。

ホスト hop03 で taskexecutor デーモンを起動しています。

2.4 ウェブインターフェース

アクセス: http://hop01:8081/

3. 開発参入事例

3.1 データスクリプト

各ノードにデータ スクリプトを配布します。

/var/flink/test/word.txt

3.2. 基本的な依存関係の紹介

以下は Java で記述された基本的なケースです。

<依存関係>
    <依存関係>
        <グループ ID>org.apache.flink</グループ ID>
        <artifactId>flink-java</artifactId>
        <バージョン>1.7.0</バージョン>
    </依存関係>
    <依存関係>
        <グループ ID>org.apache.flink</グループ ID>
        <artifactId>flink-ストリーミング-java_2.11</artifactId>
        <バージョン>1.7.0</バージョン>
    </依存関係>
</依存関係>

3.3. ファイルデータの読み取り

ここでは、ファイル内のデータを直接読み取り、プログラムフローを通じて各単語の出現回数を分析します。

パブリッククラス WordCount {
    パブリック静的void main(String[] args)は例外をスローします{
        // ファイルデータを読み取ります readFile();
    }

    パブリック静的void readFile()は例外をスローします{
        // 1. 実行環境を作成する ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();

        // 2. データファイルを読み取る String filePath = "/var/flink/test/word.txt";
        DataSet<String> inputFile = environment.readTextFile(filePath);

        // 3. グループ化して合計する DataSet<Tuple2<String, Integer>> wordDataSet = inputFile.flatMap(new WordFlatMapFunction(
        )).groupBy(0).sum(1);

        // 4. 処理結果を印刷する wordDataSet.print();
    }

    //データの読み取りと切り取り方法 static class WordFlatMapFunction implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @オーバーライド
        パブリック void flatMap(String 入力、Collector<Tuple2<String, Integer>> コレクター){
            文字列[] wordArr = input.split(",");
            for (文字列 word : wordArr) {
                コレクター.collect(新しいTuple2<>(word, 1));
            }
        }
    }
} 

3.4. ポートデータの読み取り

hop01 サービスにポートを作成し、ポートへのデータ送信をシミュレートします。

[root@hop01 ~]# nc -lk 5566

C++、Java

Flink プログラムを使用して、ポートのデータ コンテンツを読み取って分析します。

パブリッククラス WordCount {
    パブリック静的void main(String[] args)は例外をスローします{
        // ポートデータを読み取る readPort();
    }

    パブリック静的void readPort()は例外をスローします{
        // 1. 実行環境を作成する StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. ソケット データ ポートを読み取ります。DataStreamSource<String> inputStream = environment.socketTextStream("hop01", 5566);

        // 3. データの読み取りと切り取り方法 SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputStream.flatMap(
                新しい FlatMapFunction<String, Tuple2<String, Integer>>()
        {
            @オーバーライド
            パブリック void flatMap(String 入力、Collector<Tuple2<String, Integer>> コレクター) {
                文字列[] wordArr = input.split(",");
                for (文字列 word : wordArr) {
                    コレクター.collect(新しいTuple2<>(word, 1));
                }
            }
        }).keyBy(0).sum(1);

        // 4. 分析結果を印刷する resultDataStream.print();

        // 5. 環境の起動 environment.execute();
    }
}

IV. 動作メカニズム

4.1、Flinkクライアント

クライアントは、データ ストリームを準備して JobManager ノードに送信するために使用されます。その後、特定のニーズに応じて、クライアントは直接切断するか、接続状態を維持してタスク処理の結果を待つことができます。

4.2 ジョブマネージャー

Flink クラスターでは、JobManger ノードと少なくとも 1 つの TaskManager ノードが開始されます。JobManager は、クライアントから送信されたタスクを受信すると、タスクを調整して特定の TaskManager ノードに送信し、実行します。TaskManager ノードは、ハートビートと処理情報を JobManager に送信します。

4.3 タスクマネージャー

スロットは、TaskManager における最小のリソース スケジューリング単位です。スロットの数は起動時に設定されます。各スロットは、タスクを開始し、JobManager ノードによってデプロイされたタスクを受信し、特定の分析と処理を実行できます。

5. ソースコードアドレス

GitHub アドレス

https://github.com/cicadasmile/big-data-parent

GitEE アドレス

https://gitee.com/cicadasmile/big-data-parent

上記は、リアルタイムコンピューティングフレームワークFlinkクラスターの構築と動作メカニズムの詳細についての簡単な説明です。リアルタイムコンピューティングフレームワークFlinkクラスターの構築と動作メカニズムの詳細については、123WORDPRESS.COMの他の関連記事に注目してください。

以下もご興味があるかもしれません:
  • ビッグデータ処理エンジンFlinkのメモリ管理を詳しく解説
  • Apache FlinkCEP でタイムアウトステータス監視を実装するための詳細な手順
  • Flink はどのようなデータ型をサポートしていますか?
  • Flink WordCount プロセス分析を実装するための Java ラムダ式
  • ビッグデータ HelloWorld-Flink は WordCount を実装します
  • Flinkのコア原則を分析し、コア抽象化を実装する

<<:  MySQL がデータの削除と挿入に非常に時間がかかる問題の解決策

>>:  Vue3 がデータ監視を実装するためにプロキシを使用する理由の分析

推薦する

Dockerはコード検出プラットフォームSonarQubeを構築し、Mavenプロジェクトのプロセスを検出します

1 はじめに優れたコーディング習慣は優れたプログラマーが備えるべき資質ですが、コードの品質を保証する...

dockercompose を使用して springboot-mysql-nginx アプリケーションをビルドする

前回の記事では、Docker を使用して、コンパイルされた jar パッケージをイメージに組み込む ...

安全な構成のためにDockerでTLSを有効にする手順

序文以前、Docker の 2375 Remote API を有効にしていました。会社のセキュリティ...

ネイティブ JavaScript を使用して計算機のサンプル コードを開発する

計算機の主な機能は数値計算を実行することです。計算機機能の Web インスタンスを開発すると、js ...

Vue の基本的な手順の例のグラフィック説明

目次1. v-on指令1. 基本的な使い方2. 糖衣構文3. イベントパラメータ4. イベント修飾子...

ウェブ画像形式としてPNG、JPG、GIFを選択して使用する方法

では、GIF、PNG、JPG のどの形式を候補形式として選択すればよいのでしょうか。また、どの画像形...

mysqlは2つ以上のフィールドがNULLであるレコードを見つける問題を解決します

コアコード /*-------------------------------- 2つ以上のフィール...

Vueでルーティング権限を動的に設定する主なアイデア

以前、インターネット上で動的ルーティング設定をいくつか見たことがありましたが、現在のプロジェクトとは...

Vueライフサイクルの詳細な説明

目次ライフサイクルを理解する理由ライフサイクルとはライフサイクルフック関数作成され、マウントされたフ...

Vueカスタムコンポーネントはイベント修飾子を使用してピットレコードを踏む

序文今日、自作のコンポーネントを使っていたところ、突然、長い間忘れていたバブリングイベントに遭遇しま...

MySQL を暗号化および復号化するいくつかの方法 (要約)

目次前面に書かれた双方向暗号化エンコード/デコードAES_ENCRYPT/AES_DECRYPT D...

Vue でスクロールバーのスタイルを変更する方法

目次まず、スクロール バーのスタイルを変更するには、疑似要素-webkit-scrollbarを使用...

CSSでイメージマッピングを実装する方法

1. はじめにイメージマップを使用すると、画像の領域をホットスポットとして指定できます。この領域にマ...

Vue ミックスインの使い方の詳しい説明

目次Vue ミックスインの使用ミックスインでのデータアクセスミックスイン/index.jsホーム.v...

MySQL 8.0.19 では、間違ったパスワードを 3 回入力するとアカウントがロックされるようになりました (例)

MySQL 8.0.19 では、間違ったパスワードを 3 回入力するとアカウントがロックされるよう...