概要: この記事では、Avro データをシリアル化して生成し、FlinkSQL を使用して解析する方法を説明します。 Avro 公式ドキュメント、http://avro.apache.org/docs/current/index.html。 Avroの紹介Avroはデータシリアル化システムです 提供内容:
技術的背景インターネットの急速な発展に伴い、クラウドコンピューティング、ビッグデータ、人工知能AI、モノのインターネットなどの最先端技術は、現代の主流のハイテクとなっています。電子商取引サイト、顔認識、自動運転車、スマートホーム、スマートシティなどは、人々の衣食住や交通を促進するだけでなく、さまざまなシステムプラットフォームによって常に大量のデータが収集、整理、分析されています。データの低遅延、高スループット、セキュリティを確保することが特に重要です。Apache Avro自体は、バイナリ転送用のスキーマを通じてシリアル化されており、一方では高速データ転送を、他方ではデータセキュリティを確保しています。Avroは現在、さまざまな業界でますます広く使用されています。Avroデータをどのように処理および解析するかは特に重要です。この記事では、シリアル化によってAvroデータを生成し、FlinkSQLを使用して解析する方法を説明します。 この記事は、Avro 解析のデモです。現在、FlinkSQL は単純な Avro データ解析にのみ適しており、複雑にネストされた Avro データはまだサポートされていません。 シーン紹介この記事では主に以下の3つのポイントを紹介します。
前提条件
手順1. 新しいAvro Mavenプロジェクトを作成し、POM依存関係を構成する pom ファイルの内容は次のとおりです。 <?xml バージョン="1.0" エンコーディング="UTF-8"?> <プロジェクト xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <モデルバージョン>4.0.0</モデルバージョン> <グループID>com.huawei.bigdata</グループID> <artifactId>アヴロデモ</artifactId> <バージョン>1.0-SNAPSHOT</バージョン> <依存関係> <依存関係> <グループ ID>org.apache.avro</グループ ID> <artifactId>アブロ</artifactId> <バージョン>1.8.1</バージョン> </依存関係> <依存関係> <groupId>ジュニット</groupId> <artifactId>junit</artifactId> <バージョン>4.12</バージョン> </依存関係> </依存関係> <ビルド> <プラグイン> <プラグイン> <グループ ID>org.apache.avro</グループ ID> <artifactId>Avro-Maven プラグイン</artifactId> <バージョン>1.8.1</バージョン> <処刑> <実行> <phase>ソースを生成する</phase> <目標> <goal>スキーマ</goal> </目標> <構成> <ソースディレクトリ>${project.basedir}/src/main/avro/</ソースディレクトリ> <出力ディレクトリ>${project.basedir}/src/main/java/</出力ディレクトリ> </構成> </実行> </処刑> </プラグイン> <プラグイン> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <構成> <ソース>1.6</ソース> <target>1.6</target> </構成> </プラグイン> </プラグイン> </ビルド> </プロジェクト> 注: 上記の pom ファイルは、自動的に生成されるクラスのパス、つまり ${project.basedir}/src/main/avro/ と ${project.basedir}/src/main/java/ を構成します。この構成の後、mvn コマンドを実行すると、プラグインはこのディレクトリの avsc スキーマからクラス ファイルを自動的に生成し、後者のディレクトリに配置します。 avro ディレクトリが生成されない場合は、手動で作成します。 2. スキーマを定義する JSON を使用して Avro のスキーマを定義します。スキーマは、プリミティブ型 (null、boolean、int、long、float、double、bytes、string) と複合型 (record、enum、array、map、union、fixed) で構成されます。たとえば、次の例では、ユーザー スキーマを定義し、メイン ディレクトリの下に avro ディレクトリを作成し、次に avro ディレクトリの下に新しいファイル user.avsc を作成します。 {"名前空間": "lancoo.ecbdc.pre", "タイプ": "レコード", "名前": "ユーザー", 「フィールド」: [ {"name": "名前", "type": "文字列"}, {"name": "favorite_number", "type": ["int", "null"]}, {"名前": "お気に入りの色", "タイプ": ["文字列", "null"]} ] } 3. スキーマをコンパイルする Mavenプロジェクトのコンパイルをクリックしてコンパイルすると、名前空間パスとUserクラスコードが自動的に作成されます。 4. シリアル化 生成されたデータをシリアル化するためのTestUserクラスを作成する ユーザー user1 = 新しい User(); user1.setName("アリッサ"); user1.setお気に入り番号(256); // お気に入りの列または null を残す // 代替コンストラクタ ユーザー user2 = new User("Ben", 7, "red"); // ビルダー経由で構築 ユーザー user3 = User.newBuilder() .setName("チャーリー") .setFavoriteColor("青") .setお気に入り番号(null) 。建てる(); // user1、user2、user3 をディスクにシリアル化します DatumWriter<User> userDatumWriter = 新しい SpecificDatumWriter<User>(User.class); DataFileWriter<ユーザー> dataFileWriter = 新しい DataFileWriter<ユーザー>(userDatumWriter); dataFileWriter.create(user1.getSchema(), 新しいファイル("user_generic.avro")); dataFileWriter.append(user1); dataFileWriter.append(user2); dataFileWriter.append(user3); データファイルライターを閉じます。 シリアル化プログラムを実行すると、プロジェクトと同じディレクトリにAvroデータが生成されます。 user_generic.avro の内容は次のとおりです。
5. デシリアライゼーション デシリアライゼーションコードによるAvroデータの解析 // ディスクからユーザーをデシリアライズする DatumReader<User> userDatumReader = 新しい SpecificDatumReader<User>(User.class); DataFileReader<ユーザー> dataFileReader = new DataFileReader<ユーザー>(新しいファイル("user_generic.avro"), userDatumReader); ユーザー user = null; (dataFileReader.hasNext()) が実行される間 { // ユーザーオブジェクトをnext()に渡して再利用します。これにより、 // 多くのオブジェクトを割り当ててガベージコレクションするファイル // 多くのアイテム。 ユーザー = dataFileReader.next(ユーザー); System.out.println(ユーザー); } デシリアライズコードを実行してuser_generic.avroを解析します。 Avro データの解析が成功しました。 6. user_generic.avroをhdfsパスにアップロードする hdfs dfs -mkdir -p /tmp/lztest/ hdfs dfs -put user_generic.avro /tmp/lztest/ 7. flinkserverを設定する Avro jarパッケージを準備する flink-sql-avro-*.jar と flink-sql-avro-confluent-registry-*.jar を flinkserver ライブラリに配置し、すべての flinkserver ノードで次のコマンドを実行します。 cp /opt/huawei/Bigdata/FusionInsight_Flink_8.1.2/install/FusionInsight-Flink-1.12.2/flink/opt/flink-sql-avro*.jar /opt/huawei/Bigdata/FusionInsight_Flink_8.1.3/install/FusionInsight-Flink-1.12.2/flink/lib chmod 500 flink-sql-avro*.jar chown omm:wheel flink-sql-avro*.jar 同時に、FlinkServer インスタンスを再起動し、再起動後に avro パッケージがアップロードされているかどうかを確認します。
8. FlinkSQLを書く テーブルtestHdfsを作成します( 名前文字列、 お気に入り番号 int, favorite_color 文字列 ) と( 'コネクタ' = 'ファイルシステム'、 'パス' = 'hdfs:///tmp/lztest/user_generic.avro', 'フォーマット' = 'avro' );テーブル KafkaTable の作成 ( 名前文字列、 お気に入り番号 int, favorite_color 文字列 ) と ( 'コネクタ' = 'kafka'、 'トピック' = 'testavro'、 'properties.bootstrap.servers' = '96.10.2.1:21005', 'properties.group.id' = 'testGroup'、 'scan.startup.mode' = '最新オフセット'、 'フォーマット' = 'avro' ); 挿入する Kafkaテーブル 選択 * から テストHdfs; タスクを保存して送信 9. 対応するトピックにデータがあるかどうかを確認する FlinkSQL は Avro データを正常に解析しました。 Apache Avro データの解析に関するこの記事はこれで終わりです。Apache Avro データに関するより関連性の高いコンテンツについては、123WORDPRESS.COM の過去の記事を検索するか、以下の関連記事を引き続き参照してください。今後とも 123WORDPRESS.COM をよろしくお願いいたします。 以下もご興味があるかもしれません:
|
>>: ホバー画像のポップアウトポップアップ効果を実現するための純粋な CSS のサンプルコード
目次1. 基本2. ノード、ツリー、仮想DOM 1. 仮想DOM 3. createElementパ...
目次1. vuexとは何か2. インストールと導入3. vuexの使用4. プロセスの紹介5. 突然...
Web デザインを学習する過程で、html と htm の関係など、遭遇した多くの問題について深く...
序文MySQL データのインポートとエクスポートは mysqldump コマンドで解決できることは誰...
1. ユーザーとパスワードの作成方法1. MySQLデータベースに入る mysql> mysq...
テーブルフィールドを追加する テーブルtable1を変更し、トランザクタvarchar(10)をNu...
導入MySQL データベースを使用する場合、int を主キーとして使用し、自動インクリメントに設定す...
目次コンポーネント - タイムラインカスタムノードスタイルカスタムタイムスタンプコンポーネント - ...
仮想マシンをインストールするときに、「VMware ワークステーションはデバイス/資格情報ガードと互...
スタイル シートは、ドキュメントの表示方法、発音方法、または入力方法を記述します。XSL 言語は、X...
Portainer は、ステータス表示パネル、アプリケーション テンプレートの迅速な展開、コンテナ ...
MySQL 開発チームは、2019 年 10 月 14 日に MySQL 8.0.18 GA バージ...
コードをコピーコードは次のとおりです。 <ヘッド> <meta http-equi...
この記事では、参考までに、計算機を実装するためのWeChatアプレットの具体的なコードを紹介します。...
多くの場合、 Web デザインが完成した後でデザイナーの無知が露呈し、批判されることがあります。彼ら...