JVM 上の高性能データ形式ライブラリ パッケージである Apache Arrow の紹介とアーキテクチャ (Gkatziouras)

JVM 上の高性能データ形式ライブラリ パッケージである Apache Arrow の紹介とアーキテクチャ (Gkatziouras)

Apache Arrow は、BigQuery を含むさまざまなビッグデータ ツールで使用される一般的な形式で、フラット データと階層型データの両方のストレージ形式です。これは、アプリケーションを高速化するためのメモリを大量に消費する方法です。

データ処理およびデータ サイエンスの分野でよく使用されるライブラリ: Apache Arrow。 Arrow は、Apache Parquet、Apache Spark、pandas などのオープンソース プロジェクトや、多くの商用またはクローズド ソース サービスで使用されています。以下の機能を提供します:

  • インメモリコンピューティング
  • 標準化された列指向ストレージ形式
  • プロセスとノード間のデータ交換のためのIPCとRPCフレームワーク

Arrow が登場する前はどのように機能していたかを見てみましょう。

Spark が Parquet ファイルからデータを読み取るには、データを Parquet 形式で読み取って逆シリアル化する必要があることがわかります。これには、データをメモリにロードして完全なコピーを作成する必要があります。まず、データをメモリ内バッファに読み込み、次に Parquet の変換メソッドを使用してデータ (文字列や数値など) をプログラミング言語の表現に変換します。これは、Parquet が Python プログラミング言語とは異なる方法で数値を表現するために必要です。

これは、さまざまな理由からパフォーマンスにとって大きな問題となります。

  • データをコピーし、それに対して変換手順を実行しています。データはさまざまな形式であるため、計算を実行する前にすべてのデータを読み取って変換する必要があります。
  • ロードするデータはメモリに収まる必要があります。 RAM は 8GB しかないのに、データは 10GB ですか?あなたは本当に不運ですね!

それでは、Apache Arrow がこれをどのように改善するかを見てみましょう。

Arrow は、データをコピーして変換するのではなく、データを直接読み取って操作する方法を理解します。この目的のために、Arrow コミュニティは、シリアル化されたデータに対して直接機能する新しいファイル形式と操作を定義しました。このデータ形式は、メモリにロードしてデータを変換/逆シリアル化することなく、ディスクから直接読み取ることができます。もちろん、データの一部は引き続き RAM にロードされますが、データがメモリに収まる必要はありません。 Arrow はファイル メモリ マッピング機能を使用して、必要な分だけ可能な限り多くのデータをメモリにロードします。

Apache Arrow は次の言語をサポートしています。

  • C++
  • C#
  • 行く
  • ジャワ
  • JavaScript
  • さび
  • Python (C++ ライブラリ経由)
  • Ruby (C++ ライブラリ経由)
  • R (C++ ライブラリ経由)
  • MATLAB (C++ ライブラリ経由)。

矢印機能

Arrow は、何よりもまず、インメモリ コンピューティング用の列データ構造を提供するライブラリです。任意のデータを解凍して Arrow 列データ構造にデコードし、デコードされたデータに対してインメモリ分析を実行できます。 Arrow 列形式には、ランダム アクセスが O(1) であり、各値セルがメモリ内で前のセルおよび次のセルに隣接しているため、反復処理が非常に効率的であるという優れた特性があります。

Apache Arrow は、メッセージングやプロセス間通信に使用できる Arrow 列配列のコレクション (「レコード バッチ」と呼ばれる) を配置するためのバイナリ「シリアル化」プロトコルを定義します。プロトコルはディスク上を含むどこにでも配置でき、後でメモリマップしたり、メモリに読み込んで別の場所に送信したりできます。

Arrow プロトコルは、逆シリアル化なしで Arrow データのブロックを「マップ」できるように設計されているため、ディスク上の Arrow プロトコル データの分析を実行すると、メモリ マッピングが使用され、実質的にコストはかかりません。このプロトコルは、Spark SQL と Python 間のデータのストリーミング、Spark SQL データのチャンクに対する pandas 関数の実行など、さまざまな目的で使用されます。これらは「pandas udfs」と呼ばれます。

Arrow はメモリ用に設計されています (ただし、ディスク上に配置してからメモリ マップすることもできます)。これらは相互に互換性があり、アプリケーションで一緒に使用されるように設計されていますが、競合製品の Apache Parquet ファイルはディスク ストレージ用に設計されています。

利点: Apache Arrow は、CPU や GPU などの最新のハードウェア上で効率的な分析操作が行えるように編成された、フラットおよび階層型データ用の言語に依存しない列型ストレージ形式を定義します。 Arrow メモリ形式は、シリアル化のオーバーヘッドなしで超高速データ アクセスを実現するゼロ コピー読み取りもサポートします。

Java 用 Apache Arrow

ライブラリをインポートします。

<依存関係>
    <グループ ID>org.apache.arrow</グループ ID>
    <artifactId>矢印メモリネットティ</artifactId>
    <バージョン>${arrow.version}</バージョン>
</依存関係>
<依存関係>
    <グループ ID>org.apache.arrow</グループ ID>
    <artifactId>矢印ベクトル</artifactId>
    <バージョン>${arrow.version}</バージョン>
</依存関係>

始める前に、Arrow の読み取り/書き込み操作ではバイト バッファーが使用されることを理解することが重要です。読み取りや書き込みなどの操作は、バイトの連続的な交換です。効率を向上させるために、Arrow には固定サイズまたは自動拡張機能を備えたバッファ アロケータが付属しています。割り当て管理をサポートするライブラリは、arrow-memory-netty と arrow-memory-unsafe です。ここではnettyを使用します。

Arrow を使用してデータを保存するには、プログラムで定義できるスキーマが必要です。

パッケージ com.gkatzioura.ar​​row;

java.io.IOException をインポートします。

java.util.List をインポートします。

org.apache.arrow.vector.types.pojo.ArrowType をインポートします。

org.apache.arrow.vector.types.pojo.Field をインポートします。

org.apache.arrow.vector.types.pojo.FieldType をインポートします。

org.apache.arrow.vector.types.pojo.Schema をインポートします。

パブリッククラスSchemaFactory {

パブリック静的スキーマ DEFAULT_SCHEMA = createDefault();

パブリック静的スキーマcreateDefault() {

var strField = new Field("col1", FieldType.nullable(new ArrowType.Utf8()), null);

var intField = new Field("col2", FieldType.nullable(new ArrowType.Int(32, true)), null);

新しいスキーマ(List.of(strField, intField))を返します。

}

パブリック静的スキーマ schemaWithChildren() {

var amount = new Field("amount", FieldType.nullable(new ArrowType.Decimal(19,4,128)), null);

var currency = new Field("currency",FieldType.nullable(new ArrowType.Utf8()), null);

var itemField = new Field("item", FieldType.nullable(new ArrowType.Utf8()), List.of(amount,currency));

新しいスキーマ(List.of(itemField))を返します。

}

パブリック静的SchemafromJson(String jsonString) {

試す {

Schema.fromJSON(jsonString) を返します。

} キャッチ (IOException e) {

新しい ArrowExampleException(e) をスローします。

}

}

}

解析可能な JSON 表現もあります:

{
  「フィールド」: [ {
    "名前" : "col1",
    「null可能」: true、
    "タイプ" : {
      "名前" : "utf8"
    },
    "子供たち" : [ ]
  }, {
    "名前" : "col2",
    「null可能」: true、
    "タイプ" : {
      "名前" : "int",
      「ビット幅」: 32,
      "isSigned" : 真
    },
    "子供たち" : [ ]
  } ]
}

さらに、Avro と同様に、フィールドに複雑なスキーマや埋め込み値を設計できます。

パブリック静的スキーマ schemaWithChildren() {
    var amount = new Field("amount", FieldType.nullable(new ArrowType.Decimal(19,4,128)), null);
    var currency = new Field("currency",FieldType.nullable(new ArrowType.Utf8()), null);
    var itemField = new Field("item", FieldType.nullable(new ArrowType.Utf8()), List.of(amount,currency));
 
    新しいスキーマ(List.of(itemField))を返します。
}

上記のスキーマに基づいて、クラスの DTO を作成します。

パッケージ com.gkatzioura.ar​​row;
 
lombok.Builder をインポートします。
lombok.Data をインポートします。
 
@データ
@ビルダー
パブリッククラスDefaultArrowEntry {
 
    プライベート文字列 col1;
    プライベート整数col2;
 
}

私たちの目標は、これらの Java オブジェクトを Arrow バイト ストリームに変換することです。

1. アロケータを使用してDirectByteBufferを作成する

これらのバッファはオフヒープです。使用されているメモリを解放する必要がありますが、ライブラリ ユーザーの場合は、アロケータで close() 操作を実行することによってこれが行われます。私たちの場合、クラスは Closeable インターフェイスを実装し、アロケータのクローズ操作を実行します。

ストリーミング API を使用すると、データは Arrow 形式を使用して送信された OutPutStream にストリーミングされます。

パッケージ com.gkatzioura.ar​​row;
 
java.io.Closeable をインポートします。
java.io.IOException をインポートします。
java.nio.channels.WritableByteChannel をインポートします。
java.util.List をインポートします。
 
org.apache.arrow.memory.RootAllocator をインポートします。
org.apache.arrow.vector.IntVector をインポートします。
org.apache.arrow.vector.VarCharVector をインポートします。
org.apache.arrow.vector.VectorSchemaRoot をインポートします。
org.apache.arrow.vector.dictionary.DictionaryProvider をインポートします。
org.apache.arrow.vector.ipc.ArrowStreamWriter をインポートします。
org.apache.arrow.vector.util.Text をインポートします。
 
static com.gkatzioura.ar​​row.SchemaFactory.DEFAULT_SCHEMA をインポートします。
 
パブリッククラスDefaultEntriesWriterはCloseableを実装します{
 
    プライベート最終 RootAllocator rootAllocator;
    private final VectorSchemaRoot vectorSchemaRoot; //ベクトルアロケータの作成:
 
    パブリックDefaultEntriesWriter() {
        rootAllocator = 新しい RootAllocator();
        ルートアロケータを作成します。
    }
 
    パブリック void write(List<DefaultArrowEntry> defaultArrowEntries、int batchSize、WritableByteChannel out) {
        バッチサイズ <= 0 の場合
            バッチサイズ = defaultArrowEntries.size();
        }
 
        DictionaryProvider.MapDictionaryProvider dictProvider = 新しい DictionaryProvider.MapDictionaryProvider();
        試してください(ArrowStreamWriterライター = 新しいArrowStreamWriter(vectorSchemaRoot, dictProvider, out)) {
            ライターを起動します。
 
            VarCharVector の childVector1 = (VarCharVector) vectorSchemaRoot.getVector(0);
            IntVector の childVector2 = (IntVector) vectorSchemaRoot.getVector(1);
            子ベクター1.リセット();
            子ベクター2.リセット();
 
            ブール値 exactBatches = defaultArrowEntries.size()%batchSize == 0;
            int バッチカウンタ = 0;
 
            for(int i=0; i < defaultArrowEntries.size(); i++) {
                childVector1.setSafe(batchCounter、新しい Text(defaultArrowEntries.get(i).getCol1()));
                childVector2.setSafe(batchCounter、defaultArrowEntries.get(i).getCol2());
 
                バッチカウンタ++;
 
                バッチカウンタ == バッチサイズの場合
                    バッチサイズをセットします。
                    ライター.writeBatch();
                    バッチカウンタ = 0;
                }
            }
 
            if(!exactBatches) {
                vectorSchemaRoot.setRowCount(バッチカウンタ);
                ライター.writeBatch();
            }
 
            ライター.end();
        } キャッチ (IOException e) {
            新しい ArrowExampleException(e) をスローします。
        }
    }
 
    @オーバーライド
    パブリック void close() は IOException をスローします {
        ベクトルスキーマルートを閉じます。
        ルートアロケータを閉じます。
    }
 
}

Arrow でのバッチ処理のサポートを示すために、関数に単純なバッチ処理アルゴリズムが実装されています。この例では、データをバッチで書き込むことを検討します。

上記のコードが何をするのかを詳しく見てみましょう。

ベクトルアロケータの作成:

パブリックDefaultEntriesToBytesConverter() {
    rootAllocator = 新しい RootAllocator();
    ルートアロケータを作成します。
}

ストリームに書き込むときに、Arrowストリームライターが実装され、起動されます。

ArrowStreamWriter ライター = 新しい ArrowStreamWriter(vectorSchemaRoot、dictProvider、Channels.newChannel(out));
ライターを起動します。

ベクトルにデータを入力してリセットしますが、事前に割り当てられたバッファはそのまま残します。

VarCharVector の childVector1 = (VarCharVector) vectorSchemaRoot.getVector(0);
IntVector の childVector2 = (IntVector) vectorSchemaRoot.getVector(1);
子ベクター1.リセット();
子ベクター2.リセット();

データを書き込むときは、setSafe 操作を使用します。さらに多くのバッファを割り当てる必要がある場合にこれを実行する必要があります。この例では、書き込みごとにこれが実行されますが、必要な操作とバッファ サイズを考慮すると回避できます。

childVector1.setSafe(i, 新しい Text(defaultArrowEntries.get(i).getCol1()));
childVector2.setSafe(i, defaultArrowEntries.get(i).getCol2());

次に、バッチをストリームに書き込みます。

バッチサイズをセットします。
ライター.writeBatch();

最後になりましたが、著者は次のように締めくくっています。

@オーバーライド
パブリック void close() は IOException をスローします {
    ベクトルスキーマルートを閉じます。
    ルートアロケータを閉じます。
}

以上が、JVM (Gkatziouras) 上の高性能データフォーマットライブラリパッケージである Apache Arrow の紹介とアーキテクチャの詳細な内容です。Apache Arrow の入門に関する詳細については、123WORDPRESS.COM の他の関連記事にも注目してください。

以下もご興味があるかもしれません:
  • JVM 入門: クラス ローディングとバイトコード テクノロジ (クラス ローディングとクラス ローダー)
  • JVM の概要: メモリ構造 (ヒープ、メソッド領域)
  • JVM 入門 - JVM の概要

<<:  グリッド共通レイアウトの実装

>>:  MySQLのSeconds_Behind_Masterの詳細な説明

推薦する

ノードをMySQLデータベースに接続する際に発生する問題と解決策

今日、MySQL の新しいバージョン (8.0.21) をインストールしましたが、ノード フレームワ...

MySQL の条件文で 1 つの情報しか読み取れない問題に対する 2 つの解決策

今日、私の同僚が MYSQL クエリ ステートメントの作成時に非常に奇妙な問題に遭遇しました。MyS...

Vue2で配列の変更を検出できない理由と解決策

目次回避策Vue2.0 で 2 つの配列の変更を監視できないのはなぜですか?ソースコード分析ヴュー3...

React で遅延読み込みを使用して最初の画面の読み込み時間を短縮する方法

目次使用インストールルーティングでどのように使用しますか?読み込み速度の比較最近、中間およびバックエ...

js の parseInt() の奇妙な動作の調査と修正

背景: parseInt(0.006) または parseInt(0.0006) は 0 という値を...

CSS3 ボタン境界アニメーションの実装

まず効果を見てみましょう: html <a href="#"> &l...

Linuxのlocateコマンドの使い方

01. コマンドの概要実際には、locate コマンドは find -name の別の書き方ですが、...

...

MySQLクエリ時にフィールドにデフォルト値を割り当てる方法

必要フィールドをクエリする場合、フィールドに同じ値を指定する必要があります。この値はハードコードする...

MySQL 正規表現 (regexp と rlike) の検索機能の例分析

この記事では、例を使用して MySQL 正規表現 (regexp および rlike) の検索機能を...

LinuxにComposerをインストールする方法

1. インストールスクリプト(composer-setup.php)を現在のディレクトリにダウンロー...

Vue で円形プログレスバーを実装する例

データ表示は、常にあらゆる職業の人々が求めているものです。特にフロントエンド開発業界では、データを表...

MySQL 5.7.17 圧縮パッケージのインストールと設定方法のグラフィックチュートリアル

インターネット上にはMySQL 5.7.17のインストールチュートリアルがほとんどなく不十分なので、...

MySQLクエリインターセプトの詳細な分析

目次1. クエリの最適化1. MySQLチューニングの概要2. 小さなテーブルが大きなテーブルを動か...

MySQL 8.0 Windows zip パッケージ版の詳細なインストール手順

MySQL 8.0 Windows zipのインストール手順は次のように紹介されています。準備する:...