序文 クラスメートが MLSQL Stack のストリーミング サポートを調査しています。そこで、フローデバッグは実はかなり難しいと言いました。実践を通じて、次の3点を達成したいと考えています。
これら 3 つのポイントを実装した後、デバッグがはるかに簡単になったことがわかりました。 プロセス まず、Kafka へのデータの書き込みを容易にするために kaf_write.mlsql ファイルを作成しました。 abc=''' を設定します { "x": 100, "y": 200, "z": 200 ,"dataType":"グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} '''; jsonStr.`abc` を table1 として読み込みます。 table1 から to_json(struct(*)) を値として table2 として選択します。 追加したtable2をkafka.`wow`として保存します。 kafka.bootstrap.servers="127.0.0.1:9092"; こうすることで、実行するたびにデータを Kafka に書き込むことができます。 次に、書き込みが完了したら、データが実際に書き込まれているかどうか、またそれがどのようになっているかを確認する必要があります。
この文は、Kafka から 10 個の Kafka データをサンプリングすることを意味します。Kafka のアドレスは 127.0.0.1:9092 で、トピックは wow です。実行結果は次のとおりです。 問題ありません。次に、非常にシンプルなストリーミング プログラムを作成しました。 -- ストリーム名は一意である必要があります。 streamName を「streamExample」に設定します。 -- kafkaTool を使用して kafka からスキーマを推測します !kafkaTool registerSchema 2 件のレコードが "127.0.0.1:9092" から取得されました。すごいですね。 kafka.`wow` オプションをロードする kafka.bootstrap.servers="127.0.0.1:9092" newkafkatable1 として; newkafkatable1 から * を選択 表21のように; -- ターミナルコンソールの代わりに Web コンソールに印刷します。 保存追加テーブル21 webConsole として。 オプションモード="追加" 期間="15" および checkpointLocation="/tmp/s-cpl4"; 結果は次のとおりです。 ターミナルでリアルタイムの効果を確認することもできます。 補充する もちろん、MLSQL スタックにはストリーミングのための優れた機能が他にも 2 つあります。1 つ目は、ストリーミング イベントに HTTP プロトコル コールバックを設定し、バッチ SQL を使用してストリーミング結果を処理し、最終的にデータベースに保存できることです。次のスクリプトを参照してください。 -- ストリーム名は一意である必要があります。 streamName を「streamExample」に設定します。 -- いくつかのデータを模擬します。 データ='''を設定 {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":0,"タイムスタンプ":"2008-01-24 18:01:01.001","タイムスタンプタイプ":0} {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":1,"タイムスタンプ":"2008-01-24 18:01:01.002","タイムスタンプタイプ":0} {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":2,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0} {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":3,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0} {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":4,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0} {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":5,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0} '''; -- データをテーブルとしてロードする jsonStr.`data` をデータソースとして読み込みます。 --テーブルをストリームソースとして変換する mockStream.`datasource` オプションをロードする ステップサイズ範囲="0-3" newkafkatable1 として; -- 集約 newkafkatable1 から、cast(value as string) を k として選択します。 表21のように; !callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated"; -- 結果をコンソールに出力します。 保存追加テーブル21 慣習として。 オプションモード="追加" 期間="15" そしてsourceTable="jack" コード=''' jack から count(*) を c として newjack として選択します。 append newjack を parquet.`/tmp/jack` として保存します。 ''' および checkpointLocation="/tmp/cpl15"; 要約する 以上がこの記事の全内容です。この記事の内容が皆様の勉強や仕事に何らかの参考学習価値をもたらすことを願います。123WORDPRESS.COM をご愛顧いただき、誠にありがとうございます。 以下もご興味があるかもしれません:
|
<<: Linux でファイルの作成時間を取得する方法と実践的なチュートリアル
CSS3 では、transform 関数を使用して、テキストや画像の回転、拡大縮小、傾斜、移動という...
メタタグは、HTML言語のヘッド領域にある補助タグです。HTML文書のヘッダーにあるヘッドタグとタイ...
この記事の例では、ショッピングカート機能を実装するためのvuexの具体的なコードを参考までに共有して...
目次序文実装のアイデア実装コード成果を達成する序文これは、テーブルを動的に追加する例です。[追加] ...
目次序文シナリオ分析要約する序文数日前、友人がWeChatで私に連絡してきて、マシンがダウンタイムか...
よく食べて十分に休息を取るというのは簡単なことのように思えますが、実際に実行するのはそれほど簡単では...
1. コンセプト1. ホットバックアップとバックアップの違いホット バックアップは高可用性 (HA)...
システムをインストールした後、毎回いくつかのソフトウェアを再インストールする必要があります。ソフトウ...
初心者は、いくつかの HTML タグを理解することで HTML を学習できます。この入門書は、初心者...
以前、Amap API を非同期にロードする方法を紹介しました。今回は、vue-amap の使用方法...
ラムダ式ラムダ式 (クロージャとも呼ばれる) は、Java 8 のリリースを推進した最も重要な新機能...
目次データ列を分離するプレフィックスインデックスとインデックスの選択性データ列を分離するMySQL ...
序文Nginxの組み込みモジュールは、同時リクエスト数の制限とリクエストのソースの制限をサポートして...
概要Binlog2sql は、Python で開発されたオープンソースの MySQL Binlog ...
インターネット上には、正しい方法であっても、使用しても正しい結果が得られない方法が数多くあります。正...