序文 クラスメートが MLSQL Stack のストリーミング サポートを調査しています。そこで、フローデバッグは実はかなり難しいと言いました。実践を通じて、次の3点を達成したいと考えています。
これら 3 つのポイントを実装した後、デバッグがはるかに簡単になったことがわかりました。 プロセス まず、Kafka へのデータの書き込みを容易にするために kaf_write.mlsql ファイルを作成しました。 abc=''' を設定します { "x": 100, "y": 200, "z": 200 ,"dataType":"グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} '''; jsonStr.`abc` を table1 として読み込みます。 table1 から to_json(struct(*)) を値として table2 として選択します。 追加したtable2をkafka.`wow`として保存します。 kafka.bootstrap.servers="127.0.0.1:9092"; こうすることで、実行するたびにデータを Kafka に書き込むことができます。 次に、書き込みが完了したら、データが実際に書き込まれているかどうか、またそれがどのようになっているかを確認する必要があります。
この文は、Kafka から 10 個の Kafka データをサンプリングすることを意味します。Kafka のアドレスは 127.0.0.1:9092 で、トピックは wow です。実行結果は次のとおりです。 問題ありません。次に、非常にシンプルなストリーミング プログラムを作成しました。 -- ストリーム名は一意である必要があります。 streamName を「streamExample」に設定します。 -- kafkaTool を使用して kafka からスキーマを推測します !kafkaTool registerSchema 2 件のレコードが "127.0.0.1:9092" から取得されました。すごいですね。 kafka.`wow` オプションをロードする kafka.bootstrap.servers="127.0.0.1:9092" newkafkatable1 として; newkafkatable1 から * を選択 表21のように; -- ターミナルコンソールの代わりに Web コンソールに印刷します。 保存追加テーブル21 webConsole として。 オプションモード="追加" 期間="15" および checkpointLocation="/tmp/s-cpl4"; 結果は次のとおりです。 ターミナルでリアルタイムの効果を確認することもできます。 補充する もちろん、MLSQL スタックにはストリーミングのための優れた機能が他にも 2 つあります。1 つ目は、ストリーミング イベントに HTTP プロトコル コールバックを設定し、バッチ SQL を使用してストリーミング結果を処理し、最終的にデータベースに保存できることです。次のスクリプトを参照してください。 -- ストリーム名は一意である必要があります。 streamName を「streamExample」に設定します。 -- いくつかのデータを模擬します。 データ='''を設定 {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":0,"タイムスタンプ":"2008-01-24 18:01:01.001","タイムスタンプタイプ":0} {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":1,"タイムスタンプ":"2008-01-24 18:01:01.002","タイムスタンプタイプ":0} {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":2,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0} {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":3,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0} {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":4,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0} {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":5,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0} '''; -- データをテーブルとしてロードする jsonStr.`data` をデータソースとして読み込みます。 --テーブルをストリームソースとして変換する mockStream.`datasource` オプションをロードする ステップサイズ範囲="0-3" newkafkatable1 として; -- 集約 newkafkatable1 から、cast(value as string) を k として選択します。 表21のように; !callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated"; -- 結果をコンソールに出力します。 保存追加テーブル21 慣習として。 オプションモード="追加" 期間="15" そしてsourceTable="jack" コード=''' jack から count(*) を c として newjack として選択します。 append newjack を parquet.`/tmp/jack` として保存します。 ''' および checkpointLocation="/tmp/cpl15"; 要約する 以上がこの記事の全内容です。この記事の内容が皆様の勉強や仕事に何らかの参考学習価値をもたらすことを願います。123WORDPRESS.COM をご愛顧いただき、誠にありがとうございます。 以下もご興味があるかもしれません:
|
<<: Linux でファイルの作成時間を取得する方法と実践的なチュートリアル
目次ウェブ開発1. Web開発の概要Tomcatのインストールと設定Tomcatをインストールする2...
mysql 5.7.19 winx64解凍版のインストールチュートリアルを収録しました。具体的な内容...
目次序文文章プリミティブ型プリミティブ値ラッパーオブジェクト物体コンストラクタ通常機能(関数)プリミ...
ステップ1:setting.pyでデータベースを変更する # データベースを構成する DATABAS...
目次1. Nginxロケーションの基本設定1.1 Nginx 設定ファイル1.2 Pythonスクリ...
1. 画像の下にある数ピクセルの空白を削除するにはどうすればよいですか?コードをコピーコードは次のと...
目次1. はじめに2. ユーザーテーブルを準備する2.1 グループ化ルール2.2 グループの使用2....
次の図に示すように: Centos 7.0以上であれば問題ありません。現在のシステム カーネル バー...
HTML コードを書くとき、最初の行は DOCTYPE にする必要がありますが、DOCTYPE は通...
<br />ホームページの右側にあるスクロールバーを削除するにはどうすればよいですか? ...
コードをコピーコードは次のとおりです。 <!DOCTYPE html> <html...
1. ダウンロード2. 減圧3. パス環境変数を追加し、mysqlが配置されているbinディレクトリ...
レコード ロックは、単一のインデックス レコードをロックします。レコード ロックは常にインデックスを...
Canal は、Java を使用して開発された Alibaba のオープンソース プロジェクトです...
まずJDKをダウンロードします。ここではjdk-8u181-linux-x64.tar.gzを使用し...