序文 クラスメートが MLSQL Stack のストリーミング サポートを調査しています。そこで、フローデバッグは実はかなり難しいと言いました。実践を通じて、次の3点を達成したいと考えています。
これら 3 つのポイントを実装した後、デバッグがはるかに簡単になったことがわかりました。 プロセス まず、Kafka へのデータの書き込みを容易にするために kaf_write.mlsql ファイルを作成しました。 abc=''' を設定します { "x": 100, "y": 200, "z": 200 ,"dataType":"グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} '''; jsonStr.`abc` を table1 として読み込みます。 table1 から to_json(struct(*)) を値として table2 として選択します。 追加したtable2をkafka.`wow`として保存します。 kafka.bootstrap.servers="127.0.0.1:9092"; こうすることで、実行するたびにデータを Kafka に書き込むことができます。 次に、書き込みが完了したら、データが実際に書き込まれているかどうか、またそれがどのようになっているかを確認する必要があります。
この文は、Kafka から 10 個の Kafka データをサンプリングすることを意味します。Kafka のアドレスは 127.0.0.1:9092 で、トピックは wow です。実行結果は次のとおりです。 問題ありません。次に、非常にシンプルなストリーミング プログラムを作成しました。 -- ストリーム名は一意である必要があります。 streamName を「streamExample」に設定します。 -- kafkaTool を使用して kafka からスキーマを推測します !kafkaTool registerSchema 2 件のレコードが "127.0.0.1:9092" から取得されました。すごいですね。 kafka.`wow` オプションをロードする kafka.bootstrap.servers="127.0.0.1:9092" newkafkatable1 として; newkafkatable1 から * を選択 表21のように; -- ターミナルコンソールの代わりに Web コンソールに印刷します。 保存追加テーブル21 webConsole として。 オプションモード="追加" 期間="15" および checkpointLocation="/tmp/s-cpl4"; 結果は次のとおりです。 ターミナルでリアルタイムの効果を確認することもできます。 補充する もちろん、MLSQL スタックにはストリーミングのための優れた機能が他にも 2 つあります。1 つ目は、ストリーミング イベントに HTTP プロトコル コールバックを設定し、バッチ SQL を使用してストリーミング結果を処理し、最終的にデータベースに保存できることです。次のスクリプトを参照してください。 -- ストリーム名は一意である必要があります。 streamName を「streamExample」に設定します。 -- いくつかのデータを模擬します。 データ='''を設定 {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":0,"タイムスタンプ":"2008-01-24 18:01:01.001","タイムスタンプタイプ":0} {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":1,"タイムスタンプ":"2008-01-24 18:01:01.002","タイムスタンプタイプ":0} {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":2,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0} {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":3,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0} {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":4,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0} {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":5,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0} '''; -- データをテーブルとしてロードする jsonStr.`data` をデータソースとして読み込みます。 --テーブルをストリームソースとして変換する mockStream.`datasource` オプションをロードする ステップサイズ範囲="0-3" newkafkatable1 として; -- 集約 newkafkatable1 から、cast(value as string) を k として選択します。 表21のように; !callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated"; -- 結果をコンソールに出力します。 保存追加テーブル21 慣習として。 オプションモード="追加" 期間="15" そしてsourceTable="jack" コード=''' jack から count(*) を c として newjack として選択します。 append newjack を parquet.`/tmp/jack` として保存します。 ''' および checkpointLocation="/tmp/cpl15"; 要約する 以上がこの記事の全内容です。この記事の内容が皆様の勉強や仕事に何らかの参考学習価値をもたらすことを願います。123WORDPRESS.COM をご愛顧いただき、誠にありがとうございます。 以下もご興味があるかもしれません:
|
<<: Linux でファイルの作成時間を取得する方法と実践的なチュートリアル
検索パフォーマンスは最速から最遅まで次のとおりです (私が聞いたところによると)。 1 番目: ti...
目次序文ローカルストレージの使用シナリオ使用上の問題解決機能性有効期限を追加データ暗号化を追加する命...
以下の目標を達成するため: Mysql データベースは、一定の間隔 (2 時間または 1 日、カスタ...
フロントエンドの担当者であれば、面接でも仕事中でも、「CSS を使用して中央揃えにする」という効果に...
この記事では、モバイルモーダルボックス効果を実現するためのJavaScriptの具体的なコードを参考...
目次基本タイプあらゆるタイプ配列タプルインタフェース関数自己推論を入力する結合タイプ(1つ以上選択)...
目次1. 現状2. JSでCADグラフィックを作成および変更する2.1 サポートされているCADエン...
前回の記事でMySQLサービスが起動しない問題が解決したと分かった後、パスワードなしでrootユーザ...
Vm 内のハイパーリンク URL は、Get 要求のパラメータとして中国語と連結する必要があります。...
今日は、CSS を使用してアニメーションの再生と一時停止を制御する非常に簡単なトリックを紹介します。...
指導トピックウェブページ適用グレード高校2年生授業時間1 クラス教科書分析焦点: 静的および動的ウェ...
重要でないflex-basisテキストオーバーフローに省略記号を追加するという小さな機能に多くの問題...
この記事では、検証コード機能を実装するためのvue+spring bootの具体的なコードを例として...
任意のテキスト エディターを開き、次のコードをコピーして、たとえば SomeFilename.htm...
この記事では、画像テキストセグメンテーションを実装するためのNodeJSの具体的なコードを参考までに...