序文 クラスメートが MLSQL Stack のストリーミング サポートを調査しています。そこで、フローデバッグは実はかなり難しいと言いました。実践を通じて、次の3点を達成したいと考えています。
これら 3 つのポイントを実装した後、デバッグがはるかに簡単になったことがわかりました。 プロセス まず、Kafka へのデータの書き込みを容易にするために kaf_write.mlsql ファイルを作成しました。 abc=''' を設定します { "x": 100, "y": 200, "z": 200 ,"dataType":"グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} '''; jsonStr.`abc` を table1 として読み込みます。 table1 から to_json(struct(*)) を値として table2 として選択します。 追加したtable2をkafka.`wow`として保存します。 kafka.bootstrap.servers="127.0.0.1:9092"; こうすることで、実行するたびにデータを Kafka に書き込むことができます。 次に、書き込みが完了したら、データが実際に書き込まれているかどうか、またそれがどのようになっているかを確認する必要があります。
この文は、Kafka から 10 個の Kafka データをサンプリングすることを意味します。Kafka のアドレスは 127.0.0.1:9092 で、トピックは wow です。実行結果は次のとおりです。 問題ありません。次に、非常にシンプルなストリーミング プログラムを作成しました。 -- ストリーム名は一意である必要があります。 streamName を「streamExample」に設定します。 -- kafkaTool を使用して kafka からスキーマを推測します !kafkaTool registerSchema 2 件のレコードが "127.0.0.1:9092" から取得されました。すごいですね。 kafka.`wow` オプションをロードする kafka.bootstrap.servers="127.0.0.1:9092" newkafkatable1 として; newkafkatable1 から * を選択 表21のように; -- ターミナルコンソールの代わりに Web コンソールに印刷します。 保存追加テーブル21 webConsole として。 オプションモード="追加" 期間="15" および checkpointLocation="/tmp/s-cpl4"; 結果は次のとおりです。 ターミナルでリアルタイムの効果を確認することもできます。 補充する もちろん、MLSQL スタックにはストリーミングのための優れた機能が他にも 2 つあります。1 つ目は、ストリーミング イベントに HTTP プロトコル コールバックを設定し、バッチ SQL を使用してストリーミング結果を処理し、最終的にデータベースに保存できることです。次のスクリプトを参照してください。 -- ストリーム名は一意である必要があります。 streamName を「streamExample」に設定します。 -- いくつかのデータを模擬します。 データ='''を設定 {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":0,"タイムスタンプ":"2008-01-24 18:01:01.001","タイムスタンプタイプ":0} {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":1,"タイムスタンプ":"2008-01-24 18:01:01.002","タイムスタンプタイプ":0} {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":2,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0} {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":3,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0} {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":4,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0} {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":5,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0} '''; -- データをテーブルとしてロードする jsonStr.`data` をデータソースとして読み込みます。 --テーブルをストリームソースとして変換する mockStream.`datasource` オプションをロードする ステップサイズ範囲="0-3" newkafkatable1 として; -- 集約 newkafkatable1 から、cast(value as string) を k として選択します。 表21のように; !callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated"; -- 結果をコンソールに出力します。 保存追加テーブル21 慣習として。 オプションモード="追加" 期間="15" そしてsourceTable="jack" コード=''' jack から count(*) を c として newjack として選択します。 append newjack を parquet.`/tmp/jack` として保存します。 ''' および checkpointLocation="/tmp/cpl15"; 要約する 以上がこの記事の全内容です。この記事の内容が皆様の勉強や仕事に何らかの参考学習価値をもたらすことを願います。123WORDPRESS.COM をご愛顧いただき、誠にありがとうございます。 以下もご興味があるかもしれません:
|
<<: Linux でファイルの作成時間を取得する方法と実践的なチュートリアル
目次1. 解凍コマンド1.1 構文1.2 オプション2. 例1. 解凍コマンドunzip コマンドは...
MySQL では、IF()、IFNULL()、NULLIF()、および ISNULL() 関数を使用...
目次序文: 1. データ移行について2. 移行計画と留意点要約:序文:日常業務では、テーブル、データ...
Linux の優れた点は、マルチユーザー、マルチタスク システムにあります。 Linux では通常、...
はじめに: Web ページを作成するときに、画像をアップロードする必要がある場合がよくあります。画像...
1. 時間差関数(TIMESTAMPDIFF、DATEDIFF) MySQLを使用して時間差を計算...
目次概要解決策 1: クロージャ解決策2: 構造を分割する解決策3:解決策4: setTimeout...
目次1. ESXiをインストールする2. ESXiをセットアップする3. ESXiを起動するESXi...
前回の記事では、MySQL 5.7でルートパスワードを忘れた場合と、MySQL 5.7でルートパスワ...
序文MySQLの勉強を始めたばかりで、公式サイトから最新バージョン5.7.14をダウンロードしました...
1. MySQL アーキテクチャストレージ エンジンを紹介する前に、まずは MySQL アーキテクチ...
目次まず多次元配列の平坦化についてお話しましょう方法 1: flat()方法 2: 空の文字列を連結...
この記事では、MySQL 5.7.17 winx64解凍版のインストールと設定方法を紹介します。具体...
React では、this.state を使用して状態を直接変更しても、コンポーネントは再レンダリン...
MySQLへのリモートアクセスを有効にするデフォルトでは、MySQL ユーザーにはリモート アクセス...