1 つの記事で Apache Avro データを解析する

1 つの記事で Apache Avro データを解析する

概要: この記事では、Avro データをシリアル化して生成し、FlinkSQL を使用して解析する方法を説明します。

Avro 公式ドキュメント、http://avro.apache.org/docs/current/index.html。

Avroの紹介

Avroはデータシリアル化システムです

提供内容:

  • 豊富なデータ構造
  • コンパクトで高速なバイナリデータ形式
  • 永続的なデータを保存するためのファイル形式
  • リモート プロシージャ コール (RPC) システム
  • 動的言語とのシンプルな対話。データ ファイルの読み取りと書き込み用のコードを生成する必要はなく、RPC プロトコルを使用したり実装したりする必要もありません。コード生成は最適化の一形態ですが、静的言語の場合にのみ意味があります。

技術的背景

インターネットの急速な発展に伴い、クラウドコンピューティング、ビッグデータ、人工知能AI、モノのインターネットなどの最先端技術は、現代の主流のハイテクとなっています。電子商取引サイト、顔認識、自動運転車、スマートホーム、スマートシティなどは、人々の衣食住や交通を促進するだけでなく、さまざまなシステムプラットフォームによって常に大量のデータが収集、整理、分析されています。データの低遅延、高スループット、セキュリティを確保することが特に重要です。Apache Avro自体は、バイナリ転送用のスキーマを通じてシリアル化されており、一方では高速データ転送を、他方ではデータセキュリティを確保しています。Avroは現在、さまざまな業界でますます広く使用されています。Avroデータをどのように処理および解析するかは特に重要です。この記事では、シリアル化によってAvroデータを生成し、FlinkSQLを使用して解析する方法を説明します。

この記事は、Avro 解析のデモです。現在、FlinkSQL は単純な Avro データ解析にのみ適しており、複雑にネストされた Avro データはまだサポートされていません。

シーン紹介

この記事では主に以下の3つのポイントを紹介します。

  • Avro データをシリアル化して生成する方法
  • Avro データをデシリアライズして解析する方法
  • FlinkSQL を使用して Avro データを解析する方法

前提条件

  • Avroの詳細については、Apache Avro公式サイトのクイックスタートガイドを参照してください。
  • Avro アプリケーションのシナリオを理解する

手順

1. 新しいAvro Mavenプロジェクトを作成し、POM依存関係を構成する

pom ファイルの内容は次のとおりです。

<?xml バージョン="1.0" エンコーディング="UTF-8"?>
<プロジェクト xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <モデルバージョン>4.0.0</モデルバージョン>

    <グループID>com.huawei.bigdata</グループID>
    <artifactId>アヴロデモ</artifactId>
    <バージョン>1.0-SNAPSHOT</バージョン>
    <依存関係>
        <依存関係>
            <グループ ID>org.apache.avro</グループ ID>
            <artifactId>アブロ</artifactId>
            <バージョン>1.8.1</バージョン>
        </依存関係>
        <依存関係>
            <groupId>ジュニット</groupId>
            <artifactId>junit</artifactId>
            <バージョン>4.12</バージョン>
        </依存関係>
    </依存関係>

    <ビルド>
        <プラグイン>
            <プラグイン>
                <グループ ID>org.apache.avro</グループ ID>
                <artifactId>Avro-Maven プラグイン</artifactId>
                <バージョン>1.8.1</バージョン>
                <処刑>
                    <実行>
                        <phase>ソースを生成する</phase>
                        <目標>
                            <goal>スキーマ</goal>
                        </目標>
                        <構成>
                            <ソースディレクトリ>${project.basedir}/src/main/avro/</ソースディレクトリ>
                            <出力ディレクトリ>${project.basedir}/src/main/java/</出力ディレクトリ>
                        </構成>
                    </実行>
                </処刑>
            </プラグイン>
            <プラグイン>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <構成>
                    <ソース>1.6</ソース>
                    <target>1.6</target>
                </構成>
            </プラグイン>
        </プラグイン>
    </ビルド>

</プロジェクト>

注: 上記の pom ファイルは、自動的に生成されるクラスのパス、つまり ${project.basedir}/src/main/avro/ と ${project.basedir}/src/main/java/ を構成します。この構成の後、mvn コマンドを実行すると、プラグインはこのディレクトリの avsc スキーマからクラス ファイルを自動的に生成し、後者のディレクトリに配置します。 avro ディレクトリが生成されない場合は、手動で作成します。

2. スキーマを定義する

JSON を使用して Avro のスキーマを定義します。スキーマは、プリミティブ型 (null、boolean、int、long、float、double、bytes、string) と複合型 (record、enum、array、map、union、fixed) で構成されます。たとえば、次の例では、ユーザー スキーマを定義し、メイン ディレクトリの下に avro ディレクトリを作成し、次に avro ディレクトリの下に新しいファイル user.avsc を作成します。

{"名前空間": "lancoo.ecbdc.pre",
 "タイプ": "レコード",
 "名前": "ユーザー",
 「フィールド」: [
     {"name": "名前", "type": "文字列"},
     {"name": "favorite_number", "type": ["int", "null"]},
     {"名前": "お気に入りの色", "タイプ": ["文字列", "null"]}
 ]
} 

3. スキーマをコンパイルする

Mavenプロジェクトのコンパイルをクリックしてコンパイルすると、名前空間パスとUserクラスコードが自動的に作成されます。

4. シリアル化

生成されたデータをシリアル化するためのTestUserクラスを作成する

ユーザー user1 = 新しい User();
user1.setName("アリッサ");
user1.setお気に入り番号(256);
// お気に入りの列または null を残す

// 代替コンストラクタ
ユーザー user2 = new User("Ben", 7, "red");

// ビルダー経由で構築
ユーザー user3 = User.newBuilder()
        .setName("チャーリー")
        .setFavoriteColor("青")
        .setお気に入り番号(null)
        。建てる();

// user1、user2、user3 をディスクにシリアル化します
DatumWriter<User> userDatumWriter = 新しい SpecificDatumWriter<User>(User.class);
DataFileWriter<ユーザー> dataFileWriter = 新しい DataFileWriter<ユーザー>(userDatumWriter);
dataFileWriter.create(user1.getSchema(), 新しいファイル("user_generic.avro"));
dataFileWriter.append(user1);
dataFileWriter.append(user2);
dataFileWriter.append(user3);
データファイルライターを閉じます。

シリアル化プログラムを実行すると、プロジェクトと同じディレクトリにAvroデータが生成されます。

user_generic.avro の内容は次のとおりです。

Objavro.schema�{"type":"record","name":"User","namespace":"lancoo.ecbdc.pre","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}

5. デシリアライゼーション

デシリアライゼーションコードによるAvroデータの解析

// ディスクからユーザーをデシリアライズする
DatumReader<User> userDatumReader = 新しい SpecificDatumReader<User>(User.class);
DataFileReader<ユーザー> dataFileReader = new DataFileReader<ユーザー>(新しいファイル("user_generic.avro"), userDatumReader);
ユーザー user = null;
(dataFileReader.hasNext()) が実行される間 {
    // ユーザーオブジェクトをnext()に渡して再利用します。これにより、
    // 多くのオブジェクトを割り当ててガベージコレクションするファイル
    // 多くのアイテム。
    ユーザー = dataFileReader.next(ユーザー);
    System.out.println(ユーザー);
}

デシリアライズコードを実行してuser_generic.avroを解析します。

Avro データの解析が成功しました。

6. user_generic.avroをhdfsパスにアップロードする

hdfs dfs -mkdir -p /tmp/lztest/
hdfs dfs -put user_generic.avro /tmp/lztest/ 

7. flinkserverを設定する

Avro jarパッケージを準備する

flink-sql-avro-*.jar と flink-sql-avro-confluent-registry-*.jar を flinkserver ライブラリに配置し、すべての flinkserver ノードで次のコマンドを実行します。

cp /opt/huawei/Bigdata/FusionInsight_Flink_8.1.2/install/FusionInsight-Flink-1.12.2/flink/opt/flink-sql-avro*.jar /opt/huawei/Bigdata/FusionInsight_Flink_8.1.3/install/FusionInsight-Flink-1.12.2/flink/lib
chmod 500 flink-sql-avro*.jar
chown omm:wheel flink-sql-avro*.jar 

同時に、FlinkServer インスタンスを再起動し、再起動後に avro パッケージがアップロードされているかどうかを確認します。

hdfs dfs -ls /FusionInsight_FlinkServer/8.1.2-312005/lib

8. FlinkSQLを書く

テーブルtestHdfsを作成します(
  名前文字列、
  お気に入り番号 int,
  favorite_color 文字列
) と(
  'コネクタ' = 'ファイルシステム'、
  'パス' = 'hdfs:///tmp/lztest/user_generic.avro',
  'フォーマット' = 'avro'
);テーブル KafkaTable の作成 (
  名前文字列、
  お気に入り番号 int,
  favorite_color 文字列
) と (
  'コネクタ' = 'kafka'、
  'トピック' = 'testavro'、
  'properties.bootstrap.servers' = '96.10.2.1:21005',
  'properties.group.id' = 'testGroup'、
  'scan.startup.mode' = '最新オフセット'、
  'フォーマット' = 'avro'
);
挿入する
  Kafkaテーブル
選択
  *
から
  テストHdfs; 

タスクを保存して送信

9. 対応するトピックにデータがあるかどうかを確認する

FlinkSQL は Avro データを正常に解析しました。

Apache Avro データの解析に関するこの記事はこれで終わりです。Apache Avro データに関するより関連性の高いコンテンツについては、123WORDPRESS.COM の過去の記事を検索するか、以下の関連記事を引き続き参照してください。今後とも 123WORDPRESS.COM をよろしくお願いいたします。

以下もご興味があるかもしれません:
  • Apache Superset を使用して ClickHouse データを視覚化する 2 つの方法

<<:  SQL重複排除方法の概要

>>:  ホバー画像のポップアウトポップアップ効果を実現するための純粋な CSS のサンプルコード

推薦する

レンダリング関数と JSX の詳細

目次1. 基本2. ノード、ツリー、仮想DOM 1. 仮想DOM 3. createElementパ...

Vue の基本入門: Vuex のインストールと使用

目次1. vuexとは何か2. インストールと導入3. vuexの使用4. プロセスの紹介5. 突然...

HTM と HTML の違いは何ですか? HTM と HTML の違いは何ですか?

Web デザインを学習する過程で、html と htm の関係など、遭遇した多くの問題について深く...

DockerコンテナでのMySQLデータのインポート/エクスポートの詳細な説明

序文MySQL データのインポートとエクスポートは mysqldump コマンドで解決できることは誰...

MySQLでユーザーを作成し、権限を管理する方法

1. ユーザーとパスワードの作成方法1. MySQLデータベースに入る mysql> mysq...

主キーを追加または変更するMySQL SQL文操作

テーブルフィールドを追加する テーブルtable1を変更し、トランザクタvarchar(10)をNu...

MySQLのint主キーの自己増分の問題を解決する

導入MySQL データベースを使用する場合、int を主キーとして使用し、自動インクリメントに設定す...

要素タイムラインの実装

目次コンポーネント - タイムラインカスタムノードスタイルカスタムタイムスタンプコンポーネント - ...

VMware Workstationはデバイス/資格情報ガードと互換性がありません

仮想マシンをインストールするときに、「VMware ワークステーションはデバイス/資格情報ガードと互...

W3C チュートリアル (7): W3C XSL アクティビティ

スタイル シートは、ドキュメントの表示方法、発音方法、または入力方法を記述します。XSL 言語は、X...

Docker 用ビジュアル UI 管理ツール Portainer のインストールと使用方法の分析

Portainer は、ステータス表示パネル、アプリケーション テンプレートの迅速な展開、コンテナ ...

MySQL 8.0 の新機能: ハッシュ結合

MySQL 開発チームは、2019 年 10 月 14 日に MySQL 8.0.18 GA バージ...

モバイルデバイス Web 開発における HTML ヘッドの書き方

コードをコピーコードは次のとおりです。 <ヘッド> <meta http-equi...

WeChat アプレット計算機の例

この記事では、参考までに、計算機を実装するためのWeChatアプレットの具体的なコードを紹介します。...

デザイナーはコーディングを学ぶ必要がありますか?

多くの場合、 Web デザインが完成した後でデザイナーの無知が露呈し、批判されることがあります。彼ら...