JVM 上の高性能データ形式ライブラリ パッケージである Apache Arrow の紹介とアーキテクチャ (Gkatziouras)

JVM 上の高性能データ形式ライブラリ パッケージである Apache Arrow の紹介とアーキテクチャ (Gkatziouras)

Apache Arrow は、BigQuery を含むさまざまなビッグデータ ツールで使用される一般的な形式で、フラット データと階層型データの両方のストレージ形式です。これは、アプリケーションを高速化するためのメモリを大量に消費する方法です。

データ処理およびデータ サイエンスの分野でよく使用されるライブラリ: Apache Arrow。 Arrow は、Apache Parquet、Apache Spark、pandas などのオープンソース プロジェクトや、多くの商用またはクローズド ソース サービスで使用されています。以下の機能を提供します:

  • インメモリコンピューティング
  • 標準化された列指向ストレージ形式
  • プロセスとノード間のデータ交換のためのIPCとRPCフレームワーク

Arrow が登場する前はどのように機能していたかを見てみましょう。

Spark が Parquet ファイルからデータを読み取るには、データを Parquet 形式で読み取って逆シリアル化する必要があることがわかります。これには、データをメモリにロードして完全なコピーを作成する必要があります。まず、データをメモリ内バッファに読み込み、次に Parquet の変換メソッドを使用してデータ (文字列や数値など) をプログラミング言語の表現に変換します。これは、Parquet が Python プログラミング言語とは異なる方法で数値を表現するために必要です。

これは、さまざまな理由からパフォーマンスにとって大きな問題となります。

  • データをコピーし、それに対して変換手順を実行しています。データはさまざまな形式であるため、計算を実行する前にすべてのデータを読み取って変換する必要があります。
  • ロードするデータはメモリに収まる必要があります。 RAM は 8GB しかないのに、データは 10GB ですか?あなたは本当に不運ですね!

それでは、Apache Arrow がこれをどのように改善するかを見てみましょう。

Arrow は、データをコピーして変換するのではなく、データを直接読み取って操作する方法を理解します。この目的のために、Arrow コミュニティは、シリアル化されたデータに対して直接機能する新しいファイル形式と操作を定義しました。このデータ形式は、メモリにロードしてデータを変換/逆シリアル化することなく、ディスクから直接読み取ることができます。もちろん、データの一部は引き続き RAM にロードされますが、データがメモリに収まる必要はありません。 Arrow はファイル メモリ マッピング機能を使用して、必要な分だけ可能な限り多くのデータをメモリにロードします。

Apache Arrow は次の言語をサポートしています。

  • C++
  • C#
  • 行く
  • ジャワ
  • JavaScript
  • さび
  • Python (C++ ライブラリ経由)
  • Ruby (C++ ライブラリ経由)
  • R (C++ ライブラリ経由)
  • MATLAB (C++ ライブラリ経由)。

矢印機能

Arrow は、何よりもまず、インメモリ コンピューティング用の列データ構造を提供するライブラリです。任意のデータを解凍して Arrow 列データ構造にデコードし、デコードされたデータに対してインメモリ分析を実行できます。 Arrow 列形式には、ランダム アクセスが O(1) であり、各値セルがメモリ内で前のセルおよび次のセルに隣接しているため、反復処理が非常に効率的であるという優れた特性があります。

Apache Arrow は、メッセージングやプロセス間通信に使用できる Arrow 列配列のコレクション (「レコード バッチ」と呼ばれる) を配置するためのバイナリ「シリアル化」プロトコルを定義します。プロトコルはディスク上を含むどこにでも配置でき、後でメモリマップしたり、メモリに読み込んで別の場所に送信したりできます。

Arrow プロトコルは、逆シリアル化なしで Arrow データのブロックを「マップ」できるように設計されているため、ディスク上の Arrow プロトコル データの分析を実行すると、メモリ マッピングが使用され、実質的にコストはかかりません。このプロトコルは、Spark SQL と Python 間のデータのストリーミング、Spark SQL データのチャンクに対する pandas 関数の実行など、さまざまな目的で使用されます。これらは「pandas udfs」と呼ばれます。

Arrow はメモリ用に設計されています (ただし、ディスク上に配置してからメモリ マップすることもできます)。これらは相互に互換性があり、アプリケーションで一緒に使用されるように設計されていますが、競合製品の Apache Parquet ファイルはディスク ストレージ用に設計されています。

利点: Apache Arrow は、CPU や GPU などの最新のハードウェア上で効率的な分析操作が行えるように編成された、フラットおよび階層型データ用の言語に依存しない列型ストレージ形式を定義します。 Arrow メモリ形式は、シリアル化のオーバーヘッドなしで超高速データ アクセスを実現するゼロ コピー読み取りもサポートします。

Java 用 Apache Arrow

ライブラリをインポートします。

<依存関係>
    <グループ ID>org.apache.arrow</グループ ID>
    <artifactId>矢印メモリネットティ</artifactId>
    <バージョン>${arrow.version}</バージョン>
</依存関係>
<依存関係>
    <グループ ID>org.apache.arrow</グループ ID>
    <artifactId>矢印ベクトル</artifactId>
    <バージョン>${arrow.version}</バージョン>
</依存関係>

始める前に、Arrow の読み取り/書き込み操作ではバイト バッファーが使用されることを理解することが重要です。読み取りや書き込みなどの操作は、バイトの連続的な交換です。効率を向上させるために、Arrow には固定サイズまたは自動拡張機能を備えたバッファ アロケータが付属しています。割り当て管理をサポートするライブラリは、arrow-memory-netty と arrow-memory-unsafe です。ここではnettyを使用します。

Arrow を使用してデータを保存するには、プログラムで定義できるスキーマが必要です。

パッケージ com.gkatzioura.ar​​row;

java.io.IOException をインポートします。

java.util.List をインポートします。

org.apache.arrow.vector.types.pojo.ArrowType をインポートします。

org.apache.arrow.vector.types.pojo.Field をインポートします。

org.apache.arrow.vector.types.pojo.FieldType をインポートします。

org.apache.arrow.vector.types.pojo.Schema をインポートします。

パブリッククラスSchemaFactory {

パブリック静的スキーマ DEFAULT_SCHEMA = createDefault();

パブリック静的スキーマcreateDefault() {

var strField = new Field("col1", FieldType.nullable(new ArrowType.Utf8()), null);

var intField = new Field("col2", FieldType.nullable(new ArrowType.Int(32, true)), null);

新しいスキーマ(List.of(strField, intField))を返します。

}

パブリック静的スキーマ schemaWithChildren() {

var amount = new Field("amount", FieldType.nullable(new ArrowType.Decimal(19,4,128)), null);

var currency = new Field("currency",FieldType.nullable(new ArrowType.Utf8()), null);

var itemField = new Field("item", FieldType.nullable(new ArrowType.Utf8()), List.of(amount,currency));

新しいスキーマ(List.of(itemField))を返します。

}

パブリック静的SchemafromJson(String jsonString) {

試す {

Schema.fromJSON(jsonString) を返します。

} キャッチ (IOException e) {

新しい ArrowExampleException(e) をスローします。

}

}

}

解析可能な JSON 表現もあります:

{
  「フィールド」: [ {
    "名前" : "col1",
    「null可能」: true、
    "タイプ" : {
      "名前" : "utf8"
    },
    "子供たち" : [ ]
  }, {
    "名前" : "col2",
    「null可能」: true、
    "タイプ" : {
      "名前" : "int",
      「ビット幅」: 32,
      "isSigned" : 真
    },
    "子供たち" : [ ]
  } ]
}

さらに、Avro と同様に、フィールドに複雑なスキーマや埋め込み値を設計できます。

パブリック静的スキーマ schemaWithChildren() {
    var amount = new Field("amount", FieldType.nullable(new ArrowType.Decimal(19,4,128)), null);
    var currency = new Field("currency",FieldType.nullable(new ArrowType.Utf8()), null);
    var itemField = new Field("item", FieldType.nullable(new ArrowType.Utf8()), List.of(amount,currency));
 
    新しいスキーマ(List.of(itemField))を返します。
}

上記のスキーマに基づいて、クラスの DTO を作成します。

パッケージ com.gkatzioura.ar​​row;
 
lombok.Builder をインポートします。
lombok.Data をインポートします。
 
@データ
@ビルダー
パブリッククラスDefaultArrowEntry {
 
    プライベート文字列 col1;
    プライベート整数col2;
 
}

私たちの目標は、これらの Java オブジェクトを Arrow バイト ストリームに変換することです。

1. アロケータを使用してDirectByteBufferを作成する

これらのバッファはオフヒープです。使用されているメモリを解放する必要がありますが、ライブラリ ユーザーの場合は、アロケータで close() 操作を実行することによってこれが行われます。私たちの場合、クラスは Closeable インターフェイスを実装し、アロケータのクローズ操作を実行します。

ストリーミング API を使用すると、データは Arrow 形式を使用して送信された OutPutStream にストリーミングされます。

パッケージ com.gkatzioura.ar​​row;
 
java.io.Closeable をインポートします。
java.io.IOException をインポートします。
java.nio.channels.WritableByteChannel をインポートします。
java.util.List をインポートします。
 
org.apache.arrow.memory.RootAllocator をインポートします。
org.apache.arrow.vector.IntVector をインポートします。
org.apache.arrow.vector.VarCharVector をインポートします。
org.apache.arrow.vector.VectorSchemaRoot をインポートします。
org.apache.arrow.vector.dictionary.DictionaryProvider をインポートします。
org.apache.arrow.vector.ipc.ArrowStreamWriter をインポートします。
org.apache.arrow.vector.util.Text をインポートします。
 
static com.gkatzioura.ar​​row.SchemaFactory.DEFAULT_SCHEMA をインポートします。
 
パブリッククラスDefaultEntriesWriterはCloseableを実装します{
 
    プライベート最終 RootAllocator rootAllocator;
    private final VectorSchemaRoot vectorSchemaRoot; //ベクトルアロケータの作成:
 
    パブリックDefaultEntriesWriter() {
        rootAllocator = 新しい RootAllocator();
        ルートアロケータを作成します。
    }
 
    パブリック void write(List<DefaultArrowEntry> defaultArrowEntries、int batchSize、WritableByteChannel out) {
        バッチサイズ <= 0 の場合
            バッチサイズ = defaultArrowEntries.size();
        }
 
        DictionaryProvider.MapDictionaryProvider dictProvider = 新しい DictionaryProvider.MapDictionaryProvider();
        試してください(ArrowStreamWriterライター = 新しいArrowStreamWriter(vectorSchemaRoot, dictProvider, out)) {
            ライターを起動します。
 
            VarCharVector の childVector1 = (VarCharVector) vectorSchemaRoot.getVector(0);
            IntVector の childVector2 = (IntVector) vectorSchemaRoot.getVector(1);
            子ベクター1.リセット();
            子ベクター2.リセット();
 
            ブール値 exactBatches = defaultArrowEntries.size()%batchSize == 0;
            int バッチカウンタ = 0;
 
            for(int i=0; i < defaultArrowEntries.size(); i++) {
                childVector1.setSafe(batchCounter、新しい Text(defaultArrowEntries.get(i).getCol1()));
                childVector2.setSafe(batchCounter、defaultArrowEntries.get(i).getCol2());
 
                バッチカウンタ++;
 
                バッチカウンタ == バッチサイズの場合
                    バッチサイズをセットします。
                    ライター.writeBatch();
                    バッチカウンタ = 0;
                }
            }
 
            if(!exactBatches) {
                vectorSchemaRoot.setRowCount(バッチカウンタ);
                ライター.writeBatch();
            }
 
            ライター.end();
        } キャッチ (IOException e) {
            新しい ArrowExampleException(e) をスローします。
        }
    }
 
    @オーバーライド
    パブリック void close() は IOException をスローします {
        ベクトルスキーマルートを閉じます。
        ルートアロケータを閉じます。
    }
 
}

Arrow でのバッチ処理のサポートを示すために、関数に単純なバッチ処理アルゴリズムが実装されています。この例では、データをバッチで書き込むことを検討します。

上記のコードが何をするのかを詳しく見てみましょう。

ベクトルアロケータの作成:

パブリックDefaultEntriesToBytesConverter() {
    rootAllocator = 新しい RootAllocator();
    ルートアロケータを作成します。
}

ストリームに書き込むときに、Arrowストリームライターが実装され、起動されます。

ArrowStreamWriter ライター = 新しい ArrowStreamWriter(vectorSchemaRoot、dictProvider、Channels.newChannel(out));
ライターを起動します。

ベクトルにデータを入力してリセットしますが、事前に割り当てられたバッファはそのまま残します。

VarCharVector の childVector1 = (VarCharVector) vectorSchemaRoot.getVector(0);
IntVector の childVector2 = (IntVector) vectorSchemaRoot.getVector(1);
子ベクター1.リセット();
子ベクター2.リセット();

データを書き込むときは、setSafe 操作を使用します。さらに多くのバッファを割り当てる必要がある場合にこれを実行する必要があります。この例では、書き込みごとにこれが実行されますが、必要な操作とバッファ サイズを考慮すると回避できます。

childVector1.setSafe(i, 新しい Text(defaultArrowEntries.get(i).getCol1()));
childVector2.setSafe(i, defaultArrowEntries.get(i).getCol2());

次に、バッチをストリームに書き込みます。

バッチサイズをセットします。
ライター.writeBatch();

最後になりましたが、著者は次のように締めくくっています。

@オーバーライド
パブリック void close() は IOException をスローします {
    ベクトルスキーマルートを閉じます。
    ルートアロケータを閉じます。
}

以上が、JVM (Gkatziouras) 上の高性能データフォーマットライブラリパッケージである Apache Arrow の紹介とアーキテクチャの詳細な内容です。Apache Arrow の入門に関する詳細については、123WORDPRESS.COM の他の関連記事にも注目してください。

以下もご興味があるかもしれません:
  • JVM 入門: クラス ローディングとバイトコード テクノロジ (クラス ローディングとクラス ローダー)
  • JVM の概要: メモリ構造 (ヒープ、メソッド領域)
  • JVM 入門 - JVM の概要

<<:  グリッド共通レイアウトの実装

>>:  MySQLのSeconds_Behind_Masterの詳細な説明

推薦する

HTML/CSS での空白処理とページ内の空白を保持する方法

HTML の空白ルールHTML では、コンテンツ内の複数のスペースは通常 1 つとみなされ、連続する...

HTML と CSS に関する基本的なメモ (フロントエンドでは必読)

HTMLに触れた当初はレイアウトにいつもテーブルを使っていましたが、とても面倒で見た目も悪かったの...

Mysql Workbench クエリ mysql データベース メソッド

Mysql Workbench はオープンソースのデータベース クライアントです。このオープンソース...

スライダー間隔コンポーネントのネイティブ js 実装

この記事の例では、スライダー間隔コンポーネントを実装するためのjsの具体的なコードを参考までに共有し...

mysql のインデックスと FROM_UNIXTIME に関する問題

ゼロ、背景今週の木曜日にたくさんのアラートを受け取りました。DBA に確認を依頼したところ、遅いクエ...

MySQLで適切なインデックスを選択する方法

まずは栗を見てみましょう EXPLAIN select * from employees where...

nginx ウェブサイト サービスのアンチホットリンクを設定する方法 (推奨)

1. ホットリンクの原則1.1 Webページの準備Web ソース ホスト (192.168.153...

Mac OS10.12 に mysql5.7.18 をインストールするチュートリアル

ウェブ全体を検索して、さまざまな落とし穴を見つけましたが、問題は解決しませんでした。ついに自分でも分...

Dockerデータを完全にクリーンアップする方法

目次定期的に剪定するミラーエビクションコンテナのクリーンアップネットワークソート体積の蒸発完全にクリ...

MySQL の暗号化と復号化の例

MySQL の暗号化と復号化の例データの暗号化と復号化はセキュリティ分野で非常に重要です。プログラマ...

Linux7 ベースの Hadoop のインストールと構成の詳細なグラフィック説明

上記のように材料を準備します(ps: hadoop-3.1.2-srcはhadoop-3.1.2に変...

JS を使用して航空機戦争の小さなゲームを実装する

この記事の例では、参考のために航空機戦争ゲームを実装するためのJSの具体的なコードを共有しています。...

Windows Server 2016 で Flash を有効にする方法

最近、VMware Horizo​​n を導入してテストしましたが、そのコンソールにはデフォルトで ...

$remote_addr に基づく nginx フロントエンド配布方法の詳細な説明

要件は次のとおりです。ドメイン名の下に複数のサーバーがあります。現在、特定の地域をテストしています。...

「さらに表示」ボタンによる複数行テキストの切り捨てに関する考察

最近、たまたまこの小さな要件に遭遇しました。昔、JS を使用してこれを処理したことを覚えていますが、...