MLSQL スタックでストリームのデバッグを簡単にする方法

MLSQL スタックでストリームのデバッグを簡単にする方法

序文

クラスメートが MLSQL Stack のストリーミング サポートを調査しています。そこで、フローデバッグは実はかなり難しいと言いました。実践を通じて、次の3点を達成したいと考えています。

  • いつでも最新の固定数の Kafka データを表示できる機能
  • デバッグ結果(シンク)はWebコンソールに出力できます
  • ストリーミング プログラムは JSON スキーマを自動的に推測できます (Spark では現時点ではこれができません)

これら 3 つのポイントを実装した後、デバッグがはるかに簡単になったことがわかりました。

プロセス

まず、Kafka へのデータの書き込みを容易にするために kaf_write.mlsql ファイルを作成しました。

abc=''' を設定します
{ "x": 100, "y": 200, "z": 200 ,"dataType":"グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
''';
jsonStr.`abc` を table1 として読み込みます。

table1 から to_json(struct(*)) を値として table2 として選択します。
追加したtable2をkafka.`wow`として保存します。 
kafka.bootstrap.servers="127.0.0.1:9092";

こうすることで、実行するたびにデータを Kafka に書き込むことができます。

次に、書き込みが完了したら、データが実際に書き込まれているかどうか、またそれがどのようになっているかを確認する必要があります。

!kafkaTool sampleData 「127.0.0.1:9092」からの 10 件のレコード、すごい!

この文は、Kafka から 10 個の Kafka データをサンプリングすることを意味します。Kafka のアドレスは 127.0.0.1:9092 で、トピックは wow です。実行結果は次のとおりです。

問題ありません。次に、非常にシンプルなストリーミング プログラムを作成しました。

-- ストリーム名は一意である必要があります。
streamName を「streamExample」に設定します。

-- kafkaTool を使用して kafka からスキーマを推測します
!kafkaTool registerSchema 2 件のレコードが "127.0.0.1:9092" から取得されました。すごいですね。


kafka.`wow` オプションをロードする 
kafka.bootstrap.servers="127.0.0.1:9092"
newkafkatable1 として;


newkafkatable1 から * を選択
表21のように;


-- ターミナルコンソールの代わりに Web コンソールに印刷します。
保存追加テーブル21 
webConsole として。 
オプションモード="追加"
期間="15"
および checkpointLocation="/tmp/s-cpl4";

結果は次のとおりです。

ターミナルでリアルタイムの効果を確認することもできます。

補充する

もちろん、MLSQL スタックにはストリーミングのための優れた機能が他にも 2 つあります。1 つ目は、ストリーミング イベントに HTTP プロトコル コールバックを設定し、バッチ SQL を使用してストリーミング結果を処理し、最終的にデータベースに保存できることです。次のスクリプトを参照してください。

-- ストリーム名は一意である必要があります。
streamName を「streamExample」に設定します。


-- いくつかのデータを模擬します。
データ='''を設定
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":0,"タイムスタンプ":"2008-01-24 18:01:01.001","タイムスタンプタイプ":0}
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":1,"タイムスタンプ":"2008-01-24 18:01:01.002","タイムスタンプタイプ":0}
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":2,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0}
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":3,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0}
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":4,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0}
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":5,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0}
''';

-- データをテーブルとしてロードする
jsonStr.`data` をデータソースとして読み込みます。

--テーブルをストリームソースとして変換する
mockStream.`datasource` オプションをロードする 
ステップサイズ範囲="0-3"
newkafkatable1 として;

-- 集約 
newkafkatable1 から、cast(value as string) を k として選択します。
表21のように;


!callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated";
-- 結果をコンソールに出力します。


保存追加テーブル21 
慣習として。 
オプションモード="追加"
期間="15"
そしてsourceTable="jack"
コード='''
jack から count(*) を c として newjack として選択します。
append newjack を parquet.`/tmp/jack` として保存します。 
'''
および checkpointLocation="/tmp/cpl15";

要約する

以上がこの記事の全内容です。この記事の内容が皆様の勉強や仕事に何らかの参考学習価値をもたらすことを願います。123WORDPRESS.COM をご愛顧いただき、誠にありがとうございます。

以下もご興味があるかもしれません:
  • Mysql LONGBLOB型はバイナリデータを格納します(変更+デバッグ+ソート)
  • Mysql LONGTEXT型は大きなファイル(バイナリも可能)を保存します(変更+デバッグ+ソート)
  • Mysql 中国語の挿入と中国語のクエリ (変更 + デバッグ)
  • 初心者向け PHP デバッグ環境の設定 (IIS+PHP+MYSQL)
  • MySQL UDFデバッグモードdebugviewの関連メソッド
  • MySQL のデバッグと最適化に関する 101 のヒントを共有する
  • GDBデバッグMySQL実戦ソースコードコンパイルとインストール

<<:  Linux でファイルの作成時間を取得する方法と実践的なチュートリアル

>>:  JavaScriptのスリープ関数の使用

推薦する

MySQLストレージフィールドタイプのクエリ効率についての簡単な理解

検索パフォーマンスは最速から最遅まで次のとおりです (私が聞いたところによると)。 1 番目: ti...

Typescriptを使用してローカルストレージをカプセル化する方法

目次序文ローカルストレージの使用シナリオ使用上の問題解決機能性有効期限を追加データ暗号化を追加する命...

Mysql はテーブル内の古いデータを定期的にクリアし、いくつかのデータを保持します (推奨)

以下の目標を達成するため: Mysql データベースは、一定の間隔 (2 時間または 1 日、カスタ...

CSS3は水平方向の中央揃え、垂直方向の中央揃え、水平方向と垂直方向の中央揃えのサンプルコードを実装しています。

フロントエンドの担当者であれば、面接でも仕事中でも、「CSS を使用して中央揃えにする」という効果に...

JavaScript でモバイル モーダル ボックスの効果を実現

この記事では、モバイルモーダルボックス効果を実現するためのJavaScriptの具体的なコードを参考...

1つの記事でTypeScriptのデータ型について学ぶ

目次基本タイプあらゆるタイプ配列タプルインタフェース関数自己推論を入力する結合タイプ(1つ以上選択)...

フロントエンドはJavaScriptを通じてCADグラフィックスの詳細を作成および変更します。

目次1. 現状2. JSでCADグラフィックを作成および変更する2.1 サポートされているCADエン...

MySQL 5.7 のルートパスワードログイン問題の解決策

前回の記事でMySQLサービスが起動しない問題が解決したと分かった後、パスワードなしでrootユーザ...

HTML ハイパーリンク内の中国語文字化けの分析と解決

Vm 内のハイパーリンク URL は、Get 要求のパラメータとして中国語と連結する必要があります。...

アニメーションの再生と一時停止を制御するための CSS のヒント (非常に実用的)

今日は、CSS を使用してアニメーションの再生と一時停止を制御する非常に簡単なトリックを紹介します。...

ウェブレッスンプラン、初心者向けレッスンプラン

指導トピックウェブページ適用グレード高校2年生授業時間1 クラス教科書分析焦点: 静的および動的ウェ...

CSS フレックスベースのテキストオーバーフロー問題の解決方法

重要でないflex-basisテキストオーバーフローに省略記号を追加するという小さな機能に多くの問題...

Vue+Spring Bootで検証コード機能を実現

この記事では、検証コード機能を実装するためのvue+spring bootの具体的なコードを例として...

IE をフリーズさせる HTML コード

任意のテキスト エディターを開き、次のコードをコピーして、たとえば SomeFilename.htm...

NodeJS は画像テキスト分割を実現します

この記事では、画像テキストセグメンテーションを実装するためのNodeJSの具体的なコードを参考までに...