MLSQL スタックでストリームのデバッグを簡単にする方法

MLSQL スタックでストリームのデバッグを簡単にする方法

序文

クラスメートが MLSQL Stack のストリーミング サポートを調査しています。そこで、フローデバッグは実はかなり難しいと言いました。実践を通じて、次の3点を達成したいと考えています。

  • いつでも最新の固定数の Kafka データを表示できる機能
  • デバッグ結果(シンク)はWebコンソールに出力できます
  • ストリーミング プログラムは JSON スキーマを自動的に推測できます (Spark では現時点ではこれができません)

これら 3 つのポイントを実装した後、デバッグがはるかに簡単になったことがわかりました。

プロセス

まず、Kafka へのデータの書き込みを容易にするために kaf_write.mlsql ファイルを作成しました。

abc=''' を設定します
{ "x": 100, "y": 200, "z": 200 ,"dataType":"グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
''';
jsonStr.`abc` を table1 として読み込みます。

table1 から to_json(struct(*)) を値として table2 として選択します。
追加したtable2をkafka.`wow`として保存します。 
kafka.bootstrap.servers="127.0.0.1:9092";

こうすることで、実行するたびにデータを Kafka に書き込むことができます。

次に、書き込みが完了したら、データが実際に書き込まれているかどうか、またそれがどのようになっているかを確認する必要があります。

!kafkaTool sampleData 「127.0.0.1:9092」からの 10 件のレコード、すごい!

この文は、Kafka から 10 個の Kafka データをサンプリングすることを意味します。Kafka のアドレスは 127.0.0.1:9092 で、トピックは wow です。実行結果は次のとおりです。

問題ありません。次に、非常にシンプルなストリーミング プログラムを作成しました。

-- ストリーム名は一意である必要があります。
streamName を「streamExample」に設定します。

-- kafkaTool を使用して kafka からスキーマを推測します
!kafkaTool registerSchema 2 件のレコードが "127.0.0.1:9092" から取得されました。すごいですね。


kafka.`wow` オプションをロードする 
kafka.bootstrap.servers="127.0.0.1:9092"
newkafkatable1 として;


newkafkatable1 から * を選択
表21のように;


-- ターミナルコンソールの代わりに Web コンソールに印刷します。
保存追加テーブル21 
webConsole として。 
オプションモード="追加"
期間="15"
および checkpointLocation="/tmp/s-cpl4";

結果は次のとおりです。

ターミナルでリアルタイムの効果を確認することもできます。

補充する

もちろん、MLSQL スタックにはストリーミングのための優れた機能が他にも 2 つあります。1 つ目は、ストリーミング イベントに HTTP プロトコル コールバックを設定し、バッチ SQL を使用してストリーミング結果を処理し、最終的にデータベースに保存できることです。次のスクリプトを参照してください。

-- ストリーム名は一意である必要があります。
streamName を「streamExample」に設定します。


-- いくつかのデータを模擬します。
データ='''を設定
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":0,"タイムスタンプ":"2008-01-24 18:01:01.001","タイムスタンプタイプ":0}
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":1,"タイムスタンプ":"2008-01-24 18:01:01.002","タイムスタンプタイプ":0}
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":2,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0}
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":3,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0}
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":4,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0}
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":5,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0}
''';

-- データをテーブルとしてロードする
jsonStr.`data` をデータソースとして読み込みます。

--テーブルをストリームソースとして変換する
mockStream.`datasource` オプションをロードする 
ステップサイズ範囲="0-3"
newkafkatable1 として;

-- 集約 
newkafkatable1 から、cast(value as string) を k として選択します。
表21のように;


!callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated";
-- 結果をコンソールに出力します。


保存追加テーブル21 
慣習として。 
オプションモード="追加"
期間="15"
そしてsourceTable="jack"
コード='''
jack から count(*) を c として newjack として選択します。
append newjack を parquet.`/tmp/jack` として保存します。 
'''
および checkpointLocation="/tmp/cpl15";

要約する

以上がこの記事の全内容です。この記事の内容が皆様の勉強や仕事に何らかの参考学習価値をもたらすことを願います。123WORDPRESS.COM をご愛顧いただき、誠にありがとうございます。

以下もご興味があるかもしれません:
  • Mysql LONGBLOB型はバイナリデータを格納します(変更+デバッグ+ソート)
  • Mysql LONGTEXT型は大きなファイル(バイナリも可能)を保存します(変更+デバッグ+ソート)
  • Mysql 中国語の挿入と中国語のクエリ (変更 + デバッグ)
  • 初心者向け PHP デバッグ環境の設定 (IIS+PHP+MYSQL)
  • MySQL UDFデバッグモードdebugviewの関連メソッド
  • MySQL のデバッグと最適化に関する 101 のヒントを共有する
  • GDBデバッグMySQL実戦ソースコードコンパイルとインストール

<<:  Linux でファイルの作成時間を取得する方法と実践的なチュートリアル

>>:  JavaScriptのスリープ関数の使用

推薦する

Linuxコマンドunzipの詳しい説明

目次1. 解凍コマンド1.1 構文1.2 オプション2. 例1. 解凍コマンドunzip コマンドは...

MySQL における IF()、IFNULL()、NULLIF()、および ISNULL() 関数の使用に関する詳細な説明

MySQL では、IF()、IFNULL()、NULLIF()、および ISNULL() 関数を使用...

MySQLデータ移行の概要

目次序文: 1. データ移行について2. 移行計画と留意点要約:序文:日常業務では、テーブル、データ...

Linuxのファイル権限の詳細な紹介

Linux の優れた点は、マルチユーザー、マルチタスク システムにあります。 Linux では通常、...

写真のプレビューとアップロード機能を実現するhtml+css+js

はじめに: Web ページを作成するときに、画像をアップロードする必要がある場合がよくあります。画像...

MySQL 時間差関数 (TIMESTAMPDIFF、DATEDIFF)、日付変換計算関数 (date_add、day、date_format、str_to_date)

1. 時間差関数(TIMESTAMPDIFF、DATEDIFF) MySQLを使用して時間差を計算...

JS for ループで setTimeout を使用する 4 つのソリューション

目次概要解決策 1: クロージャ解決策2: 構造を分割する解決策3:解決策4: setTimeout...

VMware ESXi のインストールと使用記録(ダウンロード付き)

目次1. ESXiをインストールする2. ESXiをセットアップする3. ESXiを起動するESXi...

Mysql5.7 のルートパスワードを忘れた場合の対処法 (シンプルで効果的な方法)

前回の記事では、MySQL 5.7でルートパスワードを忘れた場合と、MySQL 5.7でルートパスワ...

Windows 上の MySQL バージョン 5.7 でエンコードを UTF-8 に変更する方法

序文MySQLの勉強を始めたばかりで、公式サイトから最新バージョン5.7.14をダウンロードしました...

MySQL ストレージエンジンの簡単な紹介

1. MySQL アーキテクチャストレージ エンジンを紹介する前に、まずは MySQL アーキテクチ...

jsは多次元配列を1次元配列に変換し、それを並べ替えます

目次まず多次元配列の平坦化についてお話しましょう方法 1: flat()方法 2: 空の文字列を連結...

MySQL 5.7.17 winx64 解凍版のインストールと設定方法のグラフィックチュートリアル

この記事では、MySQL 5.7.17 winx64解凍版のインストールと設定方法を紹介します。具体...

ReactでのsetStateの使用と同期と非同期の使用

React では、this.state を使用して状態を直接変更しても、コンポーネントは再レンダリン...

Ubuntu 16.04 mysql5.7.17 リモートポート 3306 を開く

MySQLへのリモートアクセスを有効にするデフォルトでは、MySQL ユーザーにはリモート アクセス...