Spark SQL の 4 つの一般的なデータ ソースの詳細な説明

Spark SQL の 4 つの一般的なデータ ソースの詳細な説明

汎用ロード/書き込みメソッド

オプションを手動で指定する

Spark SQL の DataFrame インターフェースは、複数のデータ ソースに対する操作をサポートします。 DataFrame は RDD と同じように操作でき、一時テーブルとして登録することもできます。 DataFrame を一時テーブルとして登録した後、DataFrame に対して SQL クエリを実行できます。

Spark SQL のデフォルトのデータ ソースは Parquet 形式です。データ ソースが Parquet ファイルの場合、Spark SQL はすべての操作を簡単に実行できます。

デフォルトのデータ ソース形式を変更するには、構成項目 spark.sql.sources.default を変更します。

scala> val df = spark.read.load("hdfs://hadoop001:9000/namesAndAges.parquet")
df: org.apache.spark.sql.DataFrame = [年齢: bigint、名前: 文字列]
scala> df.select("name").write.save("names.parquet")

データ ソース形式が parquet ファイルでない場合は、データ ソース形式を手動で指定する必要があります。データ ソース形式には、完全な名前を指定する必要があります (例: org.apache.spark.sql.parquet)。データ ソース形式が組み込み形式の場合は、データ形式を指定するために、略語 json、parquet、jdbc、orc、libsvm、csv、text のみを指定する必要があります。

データの一般的な読み込みには SparkSession が提供する read.load メソッドを使用し、データの保存には write と save を使用できます。

scala> val peopleDF = spark.read.format("json").load("hdfs://hadoop001:9000/people.json")
peopleDF: org.apache.spark.sql.DataFrame = [年齢: bigint, 名前: 文字列]
scala> peopleDF.write.format("parquet").save("hdfs://hadoop001:9000/namesAndAges.parquet")
スカラ>

さらに、ファイルに対して直接 SQL を実行することもできます。

val sqlDF = spark.sql("SELECT * FROM parquet.`hdfs://hadoop001:9000/namesAndAges.parquet`")
sqlDF.表示()

ファイル保存オプション

SaveMode はストレージ操作を実行するために使用できます。SaveMode はデータ処理モードを定義します。これらの保存モードではロックが使用されず、アトミック操作ではないことに注意することが重要です。また、上書き方式を使用すると、新しいデータが出力される前に元のデータが削除されます。 SaveMode の詳細については、次の表を参照してください。

スカラ/Javaあらゆる言語意味
SaveMode.ErrorIfExists (デフォルト) 「エラー」(デフォルト)ファイルが存在する場合はエラーが報告されます。
保存モード.追加「追加」追加
保存モード.上書き「上書き」オーバーライド
保存モード.無視"無視する"データが存在する場合は無視する

寄木細工のファイル

Parquetの読み取りと書き込み

Parquet 形式は Hadoop エコシステムでよく使用され、Spark SQL のすべてのデータ型もサポートします。 Spark SQL は、Parquet 形式のファイルを直接読み取って保存する方法を提供します。

// 最も一般的なタイプのエンコーダーは、spark.implicits をインポートすることで自動的に提供されます。
spark.implicits._ をインポートします。
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
// DataFramesはスキーマ情報を維持しながらParquetファイルとして保存できます
peopleDF.write.parquet("hdfs://hadoop001:9000/people.parquet")
// 上記で作成した parquet ファイルを読み込む
// Parquetファイルは自己記述型なのでスキーマは保持されます
// Parquet ファイルをロードした結果も DataFrame になります
parquetFileDF を spark.read.parquet("hdfs://hadoop001:9000/people.parquet") に変更します。
// Parquet ファイルは一時ビューの作成にも使用でき、その後 SQL ステートメントで使用できます。
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("parquetFile から name を選択し、age を 13 から 19 の間で指定します")
namesDF.map(attributes => "名前: " + 属性(0)).show()
// +------------+
// | 値|
// +------------+
// |名前: ジャスティン|
// +------------+

パーティション情報の解析

テーブルをパーティション分割することは、データを最適化する 1 つの方法です。パーティション化されたテーブルでは、データはパーティション列を使用して異なるディレクトリに保存されます。 Parquet データ ソースは、パーティション情報を自動的に検出して解決できるようになりました。たとえば、人口データを性別と国の列に分割するには、次のディレクトリ構造を使用します。

パス
└──に
└── テーブル
├── 性別=男性
│ ├──……
│ │
│ ├── 国=米国
│ │ └── データ.parquet
│ ├── 国=CN
│ │ └── データ.parquet
│ └──……
└── 性別=女性
├──……
│
├── 国=米国
│ └── データ.parquet
├── 国=CN
│ └── データ.parquet
└──……

SQLContext.read.parqueにpath/to/tableを渡すことによって

または SQLContext.read.load の場合、Spark SQL はパーティション情報を自動的に解決します。

返される DataFrame のスキーマは次のとおりです。

根
|-- 名前: 文字列 (nullable = true)
|-- age: long (nullable = true)
|-- 性別: 文字列 (nullable = true)
|-- 国: 文字列 (nullable = true)

データのパーティション列のデータ型は自動的に解析されることに注意してください。現在、数値型と文字列型がサポートされています。パーティション タイプを自動的に解析するためのパラメータは次のとおりです。

spark.sql.sources.partitionColumnTypeInference が有効

デフォルト値は true です。

この機能を無効にしたい場合は、このパラメータを無効に設定するだけです。このとき、パーティション列のデータ形式はデフォルトで文字列型に設定され、型の解析は実行されなくなります。

スキーマのマージ

ProtocolBuffer、Avro、Thrift と同様に、Parquet もスキーマの進化をサポートしています。ユーザーは最初に単純なスキーマを定義し、その後スキーマに列の説明を徐々に追加することができます。このようにして、ユーザーは、スキーマは異なるが互いに互換性のある複数の Parquet ファイルを取得できます。 Parquet データ ソースは、この状況を自動的に検出し、これらのファイルのスキーマをマージするようになりました。

スキーマのマージはコストのかかる操作であり、ほとんどの場合必要ないため、Spark SQL では 1.5.0 以降、この機能がデフォルトで無効になっています。この機能を有効にするには、次の 2 つの方法があります。

データ ソースが Parquet ファイルの場合は、データ ソース オプション mergeSchema を true に設定します。

グローバル SQL オプションを設定するには:

spark.sql.parquet.mergeSchema は true です。

// この例では、前の例の sqlContext が使用されます。
// これは RDD を暗黙的に DataFrame に変換するために使用されます。
spark.implicits._ をインポートします。
// パーティションディレクトリに保存される単純な DataFrame を作成します
val df1 = sc.makeRDD(1 から 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("hdfs://hadoop001:9000/data/test_table/key=1")
// 新しいパーティションディレクトリに別の DataFrame を作成します。
// 新しい列を追加し、既存の列を削除します
val df2 = sc.makeRDD(6 から 10).map(i => (i, i * 3)).toDF("シングル", "トリプル")
df2.write.parquet("hdfs://hadoop001:9000/data/test_table/key=2")
// パーティションテーブルを読み込む
val df3 = spark.read.option("mergeSchema", "true").parquet("hdfs://hadoop001:9000/data/test_table")
df3.printSchema()
// 最終的なスキーマは、Parquet ファイル内の 3 つの列すべてで構成されます
// パーティション ディレクトリ パスにパーティション列が表示されます。
//根
// |-- 単一: int (nullable = true)
// |-- double: int (nullable = true)
// |-- トリプル: int (nullable = true)
// |-- キー: int (nullable = true)

Hive データ ソース

Apache Hive は Hadoop の SQL エンジンであり、Spark SQL は Hive サポートの有無にかかわらずコンパイルできます。 Hive をサポートする Spark SQL は、Hive テーブル アクセス、UDF (ユーザー定義関数)、Hive クエリ言語 (HiveQL/HQL) などをサポートできます。強調する必要がある点の 1 つは、Spark SQL に Hive ライブラリを含める場合、事前に Hive をインストールする必要がないことです。一般的に、これらの機能を使用できるようにするには、Hive サポート付きで Spark SQL をコンパイルするのが最適です。 Spark のバイナリ バージョンをダウンロードした場合は、Hive サポートが追加された状態でコンパイルされているはずです。

デプロイされた Hive に Spark SQL を接続するには、hive-site.xml を Spark 構成ファイル ディレクトリ ($SPARK_HOME/conf) にコピーする必要があります。 Hive がデプロイされていない場合でも Spark SQL を実行できます。

Hive をデプロイしていない場合、Spark SQL は現在の作業ディレクトリに metastore_db と呼ばれる独自の Hive メタデータ ウェアハウスを作成することに注意してください。さらに、HiveQL で CREATE TABLE (CREATE EXTERNAL TABLE ではない) ステートメントを使用してテーブルを作成しようとすると、テーブルはデフォルトのファイル システム (クラスパスに hdfs-site.xml が構成されている場合は HDFS、そうでない場合はローカル ファイル システム) の /user/hive/warehouse ディレクトリに配置されます。

java.io.File をインポートする
org.apache.spark.sql.Row をインポートします。
org.apache.spark.sql.SparkSession をインポートします。
ケースクラス Record(キー: Int、値: String)
// warehouseLocation は管理対象データベースとテーブルのデフォルトの場所を指します
val warehouseLocation = 新しいファイル("spark-warehouse").getAbsolutePath
val スパーク = SparkSession
。ビルダー()
.appName("Spark Hive の例")
.config("spark.sql.warehouse.dir", 倉庫の場所)
.enableHiveSupport()
.getOrCreate()
spark.implicits._ をインポートします。
spark.sqlをインポートする
sql("テーブルが存在しない場合は作成します src (キー INT、値 STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' を TABLE src にロード")
// クエリはHiveQLで表現されます
sql("SELECT * FROM src").show()
// +---+-------+
// |キー| 値|
// +---+-------+
//|238|val_238|
//| 86| val_86|
//|311|val_311|
// ...
// 集計クエリもサポートされています。
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |カウント(1)|
// +--------+
// | 500 |
// +--------+
// SQL クエリの結果自体は DataFrame であり、すべての通常の関数をサポートします。
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
// DataFrame 内の項目は Row 型なので、序数で各列にアクセスできます。
val stringsDS = sqlDF.map {
case Row(キー: Int, 値: String) => s"キー: $key, 値: $value"
}
文字列DS.show()
//+--------------------+
// | 値|
// +--------------------+
// |キー: 0、値: val_0|
// |キー: 0、値: val_0|
// |キー: 0、値: val_0|
// ...
// DataFrames を使用して、SparkSession 内に一時ビューを作成することもできます。
val recordsDF = spark.createDataFrame((1 から 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("レコード")
// クエリは、DataFrame データを Hive に保存されているデータと結合できます。
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |キー| 値|キー| 値|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
// | 5| val_5| 5| val_5|
// ...

組み込みHiveアプリケーション

組み込みの Hive を使用する場合は、何もする必要はなく、直接使用するだけです。 –conf: 設定:

spark.sql.warehouse.dir=

注: Spark 2.0 以降で内部 Hive を使用している場合、データ ウェアハウスのアドレスを指定するために spark.sql.warehouse.dir が使用されます。パスとして HDFS を使用する必要がある場合は、core-site.xml と hdfs-site.xml を Spark conf ディレクトリに追加する必要があります。そうしないと、マスター ノードのウェアハウス ディレクトリのみが作成され、クエリ中にファイルが見つからないという問題が発生します。この場合、HDFS を使用し、メタストアを削除して、クラスターを再起動する必要があります。

外部 Hive アプリケーション

外部にデプロイされた Hive に接続する場合は、次の手順を完了する必要があります。

a Hive 内の hive-site.xml ファイルを Spark インストール ディレクトリの下の conf ディレクトリにコピーまたはソフト リンクします。

b Spark シェルを開き、Hive メタベースにアクセスするための JDBC クライアントを用意します。

$ bin/spark-shell --master spark://hadoop001:7077 --jars mysql-connector-java-5.1.27-bin.jar

JSONデータセット

Spark SQL は、JSON データセットの構造を自動的に推測し、Dataset[Row] としてロードできます。SparkSession.read.json() を使用して、Dataset[String] または JSON ファイルをロードできます。この JSON ファイルは従来の JSON ファイルではないことに注意してください。各行は JSON 文字列である必要があります。

{"名前":"マイケル"}
{"名前":"アンディ", "年齢":30}
{"名前":"ジャスティン", "年齢":19}
// プリミティブ型(Int、Stringなど)と積型(ケースクラス)のエンコーダは
// データセットを作成するときにこれをインポートすることでサポートされます。
spark.implicits._ をインポートします。
// JSON データセットはパスによって指し示されます。
// パスは単一のテキストファイルまたはテキストファイルを格納するディレクトリのいずれかになります
val パス = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(パス)
// 推論されたスキーマはprintSchema()メソッドを使用して視覚化できます
peopleDF.printSchema()
//根
// |-- age: long (nullable = true)
// |-- name: 文字列 (nullable = true)
// DataFrameを使用して一時ビューを作成します
peopleDF.createOrReplaceTempView("人")
// SQL文はSparkが提供するSQLメソッドを使って実行できる
val teenagerNamesDF = spark.sql("年齢が13から19の間であるpeopleからnameを選択")
ティーンエイジャー名DF.show()
// +------+
// | 名前|
// +------+
// |ジャスティン|
// +------+
// あるいは、JSONデータセットのDataFrameを作成することもできます。
// 文字列ごとに1つのJSONオブジェクトを格納するDataset[String]
val otherPeopleDataset = spark.createDataset(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
他のユーザー.表示()
// +--------------+----+
// | 住所|名前|
// +--------------+----+
// |[オハイオ州コロンバス]| 陰|
// +--------------+----+

ODBC ドライバ

Spark SQL は、JDBC を介してリレーショナル データベースからデータを読み取ることで DataFrame を作成できます。DataFrame で一連の計算を行った後、データをリレーショナル データベースに書き戻すことができます。

関連するデータベース ドライバーを Spark クラス パスに配置する必要があることに注意してください。

$ bin/spark-shell --master spark://hadoop001:7077 --jars mysql-connector-java-5.1.27-bin.jar
// 注: JDBC の読み込みと保存は、load/save メソッドまたは jdbc メソッドのいずれかを使用して実行できます。
// JDBCソースからデータをロードする
val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop001:3306/rdd").option("dbtable", "rddtable").option("user", "root").option("password", "hive").load()
val connectionProperties = 新しい Properties()
connectionProperties.put("user", "root")
connectionProperties.put("パスワード", "ハイブ")
val jdbcDF2 = spark.read
.jdbc("jdbc:mysql://hadoop001:3306/rdd", "rddtable", 接続プロパティ)
// データをJDBCソースに保存する
jdbcDF.書き込み
.format("jdbc")
.option("url", "jdbc:mysql://hadoop001:3306/rdd")
.option("dbtable", "rddtable2")
.option("ユーザー", "ルート")
.option("パスワード", "ハイブ")
。保存()
jdbcDF2.書き込み
.jdbc("jdbc:mysql://hadoop001:3306/mysql", "db", 接続プロパティ)
// 書き込み時にテーブル列のデータ型を指定する
jdbcDF.書き込み
.option("createTableColumnTypes", "名前 CHAR(64), コメント VARCHAR(1024)")
.jdbc("jdbc:mysql://hadoop001:3306/mysql", "db", 接続プロパティ)

以上がこの記事の全内容です。皆様の勉強のお役に立てれば幸いです。また、123WORDPRESS.COM を応援していただければ幸いです。

以下もご興味があるかもしれません:
  • Spark SQL 2.4.8 データフレームを操作する2つの方法
  • SparkSessionとsparkSQLを作成する詳細なプロセス
  • IDEA 開発と SparkSQL の構成、およびシンプルなユースケース コード
  • Spark SQLの全体的な実装ロジックの分析
  • pyspark の MySQL データベースの読み取りと書き込みの実装
  • SparkSQLはHiveデータを読み取り、ローカルアイデアを詳細に実行します
  • Apache HudiはSpark SQLを統合してhideテーブルを操作する

<<:  JavaScript での AOP プログラミングの基本実装

>>:  CentOS に Nginx をインストールする方法

推薦する

VUEウォッチリスナーの基本的な使い方の詳しい説明

目次1. 次のコードはwatchの簡単な使用法です2. 即時監視3. ハンドラメソッド4. 深い属性...

この記事では、Vueのフロントエンドページングとバックエンドページングを実装する方法を説明します。

目次1: フロントエンドの手書きページング(データ量が少ない場合) 2: バックエンドのページング、...

VUE を使用して Ali Iconfont ライブラリをオンラインで呼び出す方法

序文何年も前、私はサーバー側の初心者でしたが、業界の競争が激しくなるにつれて、フロントエンドの初心者...

webpack イメージを base64 に変換する例

url-loader をダウンロード 糸を追加 -D URLローダー モジュール: { ルール: {...

ネイティブ JS でスネーク ゲームを書く

この記事では、参考までに、JSでスネークゲームを書くための具体的なコードを紹介します。具体的な内容は...

Vue の双方向イベントバインディング v-model の原理についての簡単な説明

目次説明する:要約する補充するDOM を直接変更して操作する js や jQuery とは異なり、V...

MySQL の連結で複数の一重引用符と三重引用符を使用する際の問題

文字列を動的に連結する場合、文字連結を使用することが多いです。次のような連結の引用符の意味がわかりま...

IMG での UserMap の使用例

usemap は <img> タグの属性であり、使用するイメージ マップの名前を指定する...

Docker ベースの Etcd 分散デプロイメントの方法と手順

1. 環境整備1.1 基本環境NTP設定: 省略 #時間の一貫性を確保するためにNTPサービスを設定...

TSオブジェクトのスプレッド演算子とレスト演算子の詳細な説明

目次概要オブジェクトの残り属性オブジェクトの拡張プロパティオブジェクトの浅いコピーを作成するkeyo...

Debian Dockerコンテナにcrontabスケジュールタスクを追加する

現在、DockerイメージのほとんどはDebianベースです # cat /etc/issue De...

MySQLとOracleの違いのまとめ(機能性能の比較、選択、使用時のSQLなど)

1. 同時実行性同時実行性は OLTP データベースの最も重要な機能ですが、同時実行性にはリソース...

webpackでHMRを手動で実装するいくつかの方法

目次1. はじめに2. GitHub 3. 基本構成プロジェクトディレクトリパッケージ.json c...

Vue イベントの $event パラメータ = イベント値の場合

テンプレート <el-table :data="データリスト"> &...