概要: この記事では、Avro データをシリアル化して生成し、FlinkSQL を使用して解析する方法を説明します。 Avro 公式ドキュメント、http://avro.apache.org/docs/current/index.html。 Avroの紹介Avroはデータシリアル化システムです 提供内容:
技術的背景インターネットの急速な発展に伴い、クラウドコンピューティング、ビッグデータ、人工知能AI、モノのインターネットなどの最先端技術は、現代の主流のハイテクとなっています。電子商取引サイト、顔認識、自動運転車、スマートホーム、スマートシティなどは、人々の衣食住や交通を促進するだけでなく、さまざまなシステムプラットフォームによって常に大量のデータが収集、整理、分析されています。データの低遅延、高スループット、セキュリティを確保することが特に重要です。Apache Avro自体は、バイナリ転送用のスキーマを通じてシリアル化されており、一方では高速データ転送を、他方ではデータセキュリティを確保しています。Avroは現在、さまざまな業界でますます広く使用されています。Avroデータをどのように処理および解析するかは特に重要です。この記事では、シリアル化によってAvroデータを生成し、FlinkSQLを使用して解析する方法を説明します。 この記事は、Avro 解析のデモです。現在、FlinkSQL は単純な Avro データ解析にのみ適しており、複雑にネストされた Avro データはまだサポートされていません。 シーン紹介この記事では主に以下の3つのポイントを紹介します。
前提条件
手順1. 新しいAvro Mavenプロジェクトを作成し、POM依存関係を構成する pom ファイルの内容は次のとおりです。 <?xml バージョン="1.0" エンコーディング="UTF-8"?> <プロジェクト xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <モデルバージョン>4.0.0</モデルバージョン> <グループID>com.huawei.bigdata</グループID> <artifactId>アヴロデモ</artifactId> <バージョン>1.0-SNAPSHOT</バージョン> <依存関係> <依存関係> <グループ ID>org.apache.avro</グループ ID> <artifactId>アブロ</artifactId> <バージョン>1.8.1</バージョン> </依存関係> <依存関係> <groupId>ジュニット</groupId> <artifactId>junit</artifactId> <バージョン>4.12</バージョン> </依存関係> </依存関係> <ビルド> <プラグイン> <プラグイン> <グループ ID>org.apache.avro</グループ ID> <artifactId>Avro-Maven プラグイン</artifactId> <バージョン>1.8.1</バージョン> <処刑> <実行> <phase>ソースを生成する</phase> <目標> <goal>スキーマ</goal> </目標> <構成> <ソースディレクトリ>${project.basedir}/src/main/avro/</ソースディレクトリ> <出力ディレクトリ>${project.basedir}/src/main/java/</出力ディレクトリ> </構成> </実行> </処刑> </プラグイン> <プラグイン> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <構成> <ソース>1.6</ソース> <target>1.6</target> </構成> </プラグイン> </プラグイン> </ビルド> </プロジェクト> 注: 上記の pom ファイルは、自動的に生成されるクラスのパス、つまり ${project.basedir}/src/main/avro/ と ${project.basedir}/src/main/java/ を構成します。この構成の後、mvn コマンドを実行すると、プラグインはこのディレクトリの avsc スキーマからクラス ファイルを自動的に生成し、後者のディレクトリに配置します。 avro ディレクトリが生成されない場合は、手動で作成します。 2. スキーマを定義する JSON を使用して Avro のスキーマを定義します。スキーマは、プリミティブ型 (null、boolean、int、long、float、double、bytes、string) と複合型 (record、enum、array、map、union、fixed) で構成されます。たとえば、次の例では、ユーザー スキーマを定義し、メイン ディレクトリの下に avro ディレクトリを作成し、次に avro ディレクトリの下に新しいファイル user.avsc を作成します。 {"名前空間": "lancoo.ecbdc.pre", "タイプ": "レコード", "名前": "ユーザー", 「フィールド」: [ {"name": "名前", "type": "文字列"}, {"name": "favorite_number", "type": ["int", "null"]}, {"名前": "お気に入りの色", "タイプ": ["文字列", "null"]} ] } 3. スキーマをコンパイルする Mavenプロジェクトのコンパイルをクリックしてコンパイルすると、名前空間パスとUserクラスコードが自動的に作成されます。 4. シリアル化 生成されたデータをシリアル化するためのTestUserクラスを作成する ユーザー user1 = 新しい User(); user1.setName("アリッサ"); user1.setお気に入り番号(256); // お気に入りの列または null を残す // 代替コンストラクタ ユーザー user2 = new User("Ben", 7, "red"); // ビルダー経由で構築 ユーザー user3 = User.newBuilder() .setName("チャーリー") .setFavoriteColor("青") .setお気に入り番号(null) 。建てる(); // user1、user2、user3 をディスクにシリアル化します DatumWriter<User> userDatumWriter = 新しい SpecificDatumWriter<User>(User.class); DataFileWriter<ユーザー> dataFileWriter = 新しい DataFileWriter<ユーザー>(userDatumWriter); dataFileWriter.create(user1.getSchema(), 新しいファイル("user_generic.avro")); dataFileWriter.append(user1); dataFileWriter.append(user2); dataFileWriter.append(user3); データファイルライターを閉じます。 シリアル化プログラムを実行すると、プロジェクトと同じディレクトリにAvroデータが生成されます。 user_generic.avro の内容は次のとおりです。
5. デシリアライゼーション デシリアライゼーションコードによるAvroデータの解析 // ディスクからユーザーをデシリアライズする DatumReader<User> userDatumReader = 新しい SpecificDatumReader<User>(User.class); DataFileReader<ユーザー> dataFileReader = new DataFileReader<ユーザー>(新しいファイル("user_generic.avro"), userDatumReader); ユーザー user = null; (dataFileReader.hasNext()) が実行される間 { // ユーザーオブジェクトをnext()に渡して再利用します。これにより、 // 多くのオブジェクトを割り当ててガベージコレクションするファイル // 多くのアイテム。 ユーザー = dataFileReader.next(ユーザー); System.out.println(ユーザー); } デシリアライズコードを実行してuser_generic.avroを解析します。 Avro データの解析が成功しました。 6. user_generic.avroをhdfsパスにアップロードする hdfs dfs -mkdir -p /tmp/lztest/ hdfs dfs -put user_generic.avro /tmp/lztest/ 7. flinkserverを設定する Avro jarパッケージを準備する flink-sql-avro-*.jar と flink-sql-avro-confluent-registry-*.jar を flinkserver ライブラリに配置し、すべての flinkserver ノードで次のコマンドを実行します。 cp /opt/huawei/Bigdata/FusionInsight_Flink_8.1.2/install/FusionInsight-Flink-1.12.2/flink/opt/flink-sql-avro*.jar /opt/huawei/Bigdata/FusionInsight_Flink_8.1.3/install/FusionInsight-Flink-1.12.2/flink/lib chmod 500 flink-sql-avro*.jar chown omm:wheel flink-sql-avro*.jar 同時に、FlinkServer インスタンスを再起動し、再起動後に avro パッケージがアップロードされているかどうかを確認します。
8. FlinkSQLを書く テーブルtestHdfsを作成します( 名前文字列、 お気に入り番号 int, favorite_color 文字列 ) と( 'コネクタ' = 'ファイルシステム'、 'パス' = 'hdfs:///tmp/lztest/user_generic.avro', 'フォーマット' = 'avro' );テーブル KafkaTable の作成 ( 名前文字列、 お気に入り番号 int, favorite_color 文字列 ) と ( 'コネクタ' = 'kafka'、 'トピック' = 'testavro'、 'properties.bootstrap.servers' = '96.10.2.1:21005', 'properties.group.id' = 'testGroup'、 'scan.startup.mode' = '最新オフセット'、 'フォーマット' = 'avro' ); 挿入する Kafkaテーブル 選択 * から テストHdfs; タスクを保存して送信 9. 対応するトピックにデータがあるかどうかを確認する FlinkSQL は Avro データを正常に解析しました。 Apache Avro データの解析に関するこの記事はこれで終わりです。Apache Avro データに関するより関連性の高いコンテンツについては、123WORDPRESS.COM の過去の記事を検索するか、以下の関連記事を引き続き参照してください。今後とも 123WORDPRESS.COM をよろしくお願いいたします。 以下もご興味があるかもしれません:
|
>>: ホバー画像のポップアウトポップアップ効果を実現するための純粋な CSS のサンプルコード
商品を検索するときに、すべてのブランドまたは一部のブランドを表示するTaobaoの機能を真似してみま...
1. <a>タグを使用して完了します <a href="/user/te...
HTML でフォームの送信を無効にする方法は 2 つあります。 1. コントロールタグにreadon...
イメージが正常にビルドされると、Docker 環境があれば使用できますが、イメージを Docker ...
目次1. 変数意味のある名前を使う不必要なコンテキストを追加しないようにするハードコードされた値を避...
序文最近この問題に遭遇するまで、私は UTF-8 が文字セットの問題に対する普遍的な解決策だと考えて...
目次私たちが毎日実行している Linux システムとは何でしょうか? LinuxカーネルとGNUシス...
この記事では、Jingdongの詳細ページの画像の拡大を実現するためのjsの具体的なコードを紹介しま...
デフォルトでは、MyISAM テーブルはディスク上に .frm (テーブル構造ファイル)、.MYD ...
1. セマンティゼーションとは何ですか? Bing辞書の説明セマンティクス化とは、適切な HTML ...
最近、Djangoを学習しているときにデータベースを使用する必要があったため、MySQLで使用するた...
Linux でパーティションのファイル システム タイプを確認する方法。パーティションのファイル シ...
ウェブサイトの互換性のデバッグは本当に面倒です。今日のウェブサイト デザイナーは、以前よりもはるかに...
目次1. JavaScriptはシングルスレッドです1. 同期タスク2. 非同期タスク2. タスクキ...
1. ファイルを現在のディレクトリに解凍しますコマンド: tar -zxvf mysql....ta...