MLSQL スタックでストリームのデバッグを簡単にする方法

MLSQL スタックでストリームのデバッグを簡単にする方法

序文

クラスメートが MLSQL Stack のストリーミング サポートを調査しています。そこで、フローデバッグは実はかなり難しいと言いました。実践を通じて、次の3点を達成したいと考えています。

  • いつでも最新の固定数の Kafka データを表示できる機能
  • デバッグ結果(シンク)はWebコンソールに出力できます
  • ストリーミング プログラムは JSON スキーマを自動的に推測できます (Spark では現時点ではこれができません)

これら 3 つのポイントを実装した後、デバッグがはるかに簡単になったことがわかりました。

プロセス

まず、Kafka へのデータの書き込みを容易にするために kaf_write.mlsql ファイルを作成しました。

abc=''' を設定します
{ "x": 100, "y": 200, "z": 200 ,"dataType":"グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
''';
jsonStr.`abc` を table1 として読み込みます。

table1 から to_json(struct(*)) を値として table2 として選択します。
追加したtable2をkafka.`wow`として保存します。 
kafka.bootstrap.servers="127.0.0.1:9092";

こうすることで、実行するたびにデータを Kafka に書き込むことができます。

次に、書き込みが完了したら、データが実際に書き込まれているかどうか、またそれがどのようになっているかを確認する必要があります。

!kafkaTool sampleData 「127.0.0.1:9092」からの 10 件のレコード、すごい!

この文は、Kafka から 10 個の Kafka データをサンプリングすることを意味します。Kafka のアドレスは 127.0.0.1:9092 で、トピックは wow です。実行結果は次のとおりです。

問題ありません。次に、非常にシンプルなストリーミング プログラムを作成しました。

-- ストリーム名は一意である必要があります。
streamName を「streamExample」に設定します。

-- kafkaTool を使用して kafka からスキーマを推測します
!kafkaTool registerSchema 2 件のレコードが "127.0.0.1:9092" から取得されました。すごいですね。


kafka.`wow` オプションをロードする 
kafka.bootstrap.servers="127.0.0.1:9092"
newkafkatable1 として;


newkafkatable1 から * を選択
表21のように;


-- ターミナルコンソールの代わりに Web コンソールに印刷します。
保存追加テーブル21 
webConsole として。 
オプションモード="追加"
期間="15"
および checkpointLocation="/tmp/s-cpl4";

結果は次のとおりです。

ターミナルでリアルタイムの効果を確認することもできます。

補充する

もちろん、MLSQL スタックにはストリーミングのための優れた機能が他にも 2 つあります。1 つ目は、ストリーミング イベントに HTTP プロトコル コールバックを設定し、バッチ SQL を使用してストリーミング結果を処理し、最終的にデータベースに保存できることです。次のスクリプトを参照してください。

-- ストリーム名は一意である必要があります。
streamName を「streamExample」に設定します。


-- いくつかのデータを模擬します。
データ='''を設定
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":0,"タイムスタンプ":"2008-01-24 18:01:01.001","タイムスタンプタイプ":0}
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":1,"タイムスタンプ":"2008-01-24 18:01:01.002","タイムスタンプタイプ":0}
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":2,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0}
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":3,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0}
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":4,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0}
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":5,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0}
''';

-- データをテーブルとしてロードする
jsonStr.`data` をデータソースとして読み込みます。

--テーブルをストリームソースとして変換する
mockStream.`datasource` オプションをロードする 
ステップサイズ範囲="0-3"
newkafkatable1 として;

-- 集約 
newkafkatable1 から、cast(value as string) を k として選択します。
表21のように;


!callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated";
-- 結果をコンソールに出力します。


保存追加テーブル21 
慣習として。 
オプションモード="追加"
期間="15"
そしてsourceTable="jack"
コード='''
jack から count(*) を c として newjack として選択します。
append newjack を parquet.`/tmp/jack` として保存します。 
'''
および checkpointLocation="/tmp/cpl15";

要約する

以上がこの記事の全内容です。この記事の内容が皆様の勉強や仕事に何らかの参考学習価値をもたらすことを願います。123WORDPRESS.COM をご愛顧いただき、誠にありがとうございます。

以下もご興味があるかもしれません:
  • Mysql LONGBLOB型はバイナリデータを格納します(変更+デバッグ+ソート)
  • Mysql LONGTEXT型は大きなファイル(バイナリも可能)を保存します(変更+デバッグ+ソート)
  • Mysql 中国語の挿入と中国語のクエリ (変更 + デバッグ)
  • 初心者向け PHP デバッグ環境の設定 (IIS+PHP+MYSQL)
  • MySQL UDFデバッグモードdebugviewの関連メソッド
  • MySQL のデバッグと最適化に関する 101 のヒントを共有する
  • GDBデバッグMySQL実戦ソースコードコンパイルとインストール

<<:  Linux でファイルの作成時間を取得する方法と実践的なチュートリアル

>>:  JavaScriptのスリープ関数の使用

推薦する

IDEA を使用して Web プロジェクトを作成し、Tomcat に公開する方法

目次ウェブ開発1. Web開発の概要Tomcatのインストールと設定Tomcatをインストールする2...

mysql5.7.19 winx64 解凍版のインストールと設定のチュートリアル

mysql 5.7.19 winx64解凍版のインストールチュートリアルを収録しました。具体的な内容...

JavaScriptのプリミティブ値とラッパーオブジェクトの詳細な紹介

目次序文文章プリミティブ型プリミティブ値ラッパーオブジェクト物体コンストラクタ通常機能(関数)プリミ...

Django がローカル MySQL データベースに接続する手順 (pycharm)

ステップ1:setting.pyでデータベースを変更する # データベースを構成する DATABAS...

Nginx の場所と proxy_pass パスの設定の問題の概要

目次1. Nginxロケーションの基本設定1.1 Nginx 設定ファイル1.2 Pythonスクリ...

よくある CSS のヒントと経験談 11 選

1. 画像の下にある数ピクセルの空白を削除するにはどうすればよいですか?コードをコピーコードは次のと...

MySQL の group by に関する簡単な説明

目次1. はじめに2. ユーザーテーブルを準備する2.1 グループ化ルール2.2 グループの使用2....

YUMを使用してdockerをインストールする方法

次の図に示すように: Centos 7.0以上であれば問題ありません。現在のシステム カーネル バー...

HTML チュートリアル: DOCTYPE の省略形

HTML コードを書くとき、最初の行は DOCTYPE にする必要がありますが、DOCTYPE は通...

ウェブページ作成に役立つコード

<br />ホームページの右側にあるスクロールバーを削除するにはどうすればよいですか? ...

詳細なアイデアを備えたシンプルな計算機の HTML 実装

コードをコピーコードは次のとおりです。 <!DOCTYPE html> <html...

mysql 5.7.18 winx64 無料インストール設定方法

1. ダウンロード2. 減圧3. パス環境変数を追加し、mysqlが配置されているbinディレクトリ...

InnoDB ロック (レコード、ギャップ、Next-Key ロック) の詳細な説明

レコード ロックは、単一のインデックス レコードをロックします。レコード ロックは常にインデックスを...

MySQLを監視するためのbinlogログ解析ツールの詳しい説明:Canal

Canal は、Java を使用して開発された Alibaba のオープンソース プロジェクトです...

Linux CentOS インストール JDK および Tomcat チュートリアル

まずJDKをダウンロードします。ここではjdk-8u181-linux-x64.tar.gzを使用し...