Spark RDD をデータフレームに変換し、それを MySQL に書き込む例

Spark RDD をデータフレームに変換し、それを MySQL に書き込む例

DataframeはSpark 1.3.0で導入された新しいAPIで、Sparkで大規模な構造化データを処理できるようになります。従来のRDD変換方式よりも使いやすく、計算性能も2倍高速化されていると言われています。 Sparkは、オフラインバッチ処理またはリアルタイムコンピューティングでRDDをDataFrameに変換し、簡単なSQLコマンドでデータを操作できます。SQLに精通している人にとっては、変換とフィルタリングのプロセスは非常に便利で、より高レベルのアプリケーションを実現することもできます。たとえば、リアルタイムでは、Kafkaのトピック名とSQLステートメントが渡され、バックグラウンドで構成されたコンテンツフィールドを読み取ってクラスに反映し、入力SQLと出力SQLを使用してリアルタイムデータを計算します。この場合、Spark Streamingを知らない人でも、リアルタイムコンピューティングのメリットを簡単に享受できます。

次の例は、ローカルファイルをRDDに読み込み、それを暗黙的にDataFrameに変換してデータをクエリし、最後にMySQLテーブルに追加形式で書き込むプロセスです。Scalaコードの例は次のとおりです。

java.sql.Timestamp をインポートする
org.apache.spark.sql.{SaveMode, SQLContext} をインポートします。
org.apache.spark.{SparkContext, SparkConf} をインポートします。
オブジェクト DataFrameSql {
 case class memberbase(data_date:Long,memberid:String,createtime:Timestamp,sp:Int)はSerializableを拡張します{
 def toString をオーバーライドします: String="%d\t%s\t%s\t%d".format(data_date,memberid,createtime,sp)
 }
 def main(args:Array[String]): 単位 = {
 val conf = 新しい SparkConf()
 conf.setMaster("local[2]")
// ----------------------
 //パラメータ spark.sql.autoBroadcastJoinThreshold はテーブルをブロードキャストするかどうかを設定します。デフォルトは 10M で、無効にするには -1 に設定します //spark.sql.codegen は SQL を Java バイトコードにプリコンパイルするかどうかを設定します。長い SQL や頻繁な SQL は最適化効果があります //spark.sql.inMemoryColumnarStorage.batchSize は一度に処理される行数です。oom に注意してください
 //spark.sql.inMemoryColumnarStorage.compressed は、メモリ内の列ストレージを圧縮する必要があるかどうかを設定します// ----------------------
 conf.set("spark.sql.shuffle.partitions","20") //デフォルトのパーティションは200です conf.setAppName("dataframe test")
 val sc = 新しい SparkContext(conf)
 val sqc = 新しいSQLContext(sc)
 val ac = sc.accumulator(0,"失敗数")
 val ファイル = sc.textFile("src\\main\\resources\\000000_0")
 val log = file.map(行 => 行.split(" ")).filter(行 =>
  if (line.length != 4) { // 単純なフィルターを実行する ac.add(1)
  間違い
  } それ以外の場合は true)
  .map(行 => memberbase(行(0).toLong, 行(1),Timestamp.valueOf(行(2)), 行(3).toInt))
 // 方法 1: 暗黙的な変換を使用する import sqc.implicits._
 val dftemp = log.toDF() // 変換/*
  方法 2: createDataFrame メソッドを使用して、内部的にリフレクションを使用してフィールドとその型を取得します。val dftemp = sqc.createDataFrame(log)
  */
 val df = dftemp.registerTempTable("memberbaseinfo")
 /*val sqlcommand = "date_format(createtime,'yyyy-MM')をmmとして、count(1)をnumsとして選択" +
  "memberbaseinfo から date_format(createtime,'yyyy-MM') でグループ化" +
  「数字の降順、mm の昇順で並べ替え」*/
 val sqlcommand="memberbaseinfo から * を選択"
 val sel = sqc.sql(sqlコマンド)
 val prop = 新しい java.util.Properties
 prop.setProperty("ユーザー","etl")
 prop.setProperty("パスワード","xxx")
 //DataFrameWriterを呼び出してmysqlにデータを書き込む
 val dataResult = sqc.sql(sqlcommand).write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/test","t_spark_dataframe_test",prop) // テーブルが存在しない可能性があります println(ac.name.get+" "+ac.value)
 sc.stop()
 }
}

上記コードの textFile 内のサンプル データは次のとおりです。データは Hive から取得されます。フィールド情報は、パーティション番号、ユーザー ID、登録時間、およびサードパーティ番号です。

20160309 45386477 2012-06-12 20:13:15 901438
20160309 45390977 2012-06-12 22:38:06 901036
20160309 45446677 2012-06-14 21:57:39 901438
20160309 45464977 2012-06-15 13:42:55 901438
20160309 45572377 2012-06-18 14:55:03 902606
20160309 45620577 2012-06-20 00:21:09 902606
20160309 45628377 2012-06-20 10:48:05 901181
20160309 45628877 2012-06-20 11:10:15 902606
20160309 45667777 2012-06-21 18:58:34 902524
20160309 45680177 2012-06-22 01:49:55 
20160309 45687077 2012-06-22 11:23:22 902607

ここでのフィールド タイプのマッピング、つまり、公式 Web サイトのスクリーンショットに示されているように、ケース クラスからデータフレームへのマッピングに注意してください。

詳細については、公式ドキュメント「Spark SQLおよびDataFrameガイド」を参照してください。

Spark RDD をデータフレームに変換して MySQL に書き込む上記の例が、私が皆さんと共有したいことのすべてです。これが皆さんの参考になれば幸いです。また、123WORDPRESS.COM をサポートしていただければ幸いです。

以下もご興味があるかもしれません:
  • SparkSQLはIDEAを使用して、DataFrameとDataSetをすぐに使い始めることができます。
  • DataFrame: SparkSql を介して Scala クラスを DataFrame に変換する方法
  • pyspark.sql.DataFrame と pandas.DataFrame 間の変換例
  • DataFrameとSparkSqlの値の誤解についての簡単な説明
  • Spark SQL 2.4.8 データフレームを操作する2つの方法

<<:  DockerでSpring Bootアプリケーションを実行する方法

>>:  Vueフィルターの使い方

推薦する

mysql5.7.21 utf8 エンコーディングの問題と Mac 環境での解決方法

1. 目標: mysql の character_set_server の値を latin1 から ...

HTML 5 プレビュー

<br />オリジナル: http://www.alistapart.com/artic...

Vueはカスタムツリーコンポーネントを再帰的に実装します

この記事では、カスタムツリーコンポーネントを再帰的に実装するVueの具体的なコードを参考までに共有し...

ElementUI の this.$notify.close() 呼び出しが機能しない問題の解決方法

目次要件の説明問題の説明問題分析問題解決質問の拡張要件の説明このプロジェクトでは、まずユーザーが質問...

React スキャフォールディングのパスエイリアスを設定する方法

この記事を書いている時点でのReactのバージョンは16.13.1です1 npm run eject...

JavaScript 戦略パターンを使用してフォームを検証する方法

目次概要戦略パターンを使用しないフォーム検証戦略パターンを使用して最適化する戦略パターンの利点要約す...

ネイティブ js でカスタム難易度のマインスイーパ ゲームを実装する

この記事の例では、マインスイーパゲームを実装するためのjsの具体的なコードを参考までに共有しています...

MySQLデータベースのマスタースレーブレプリケーションの原理と機能の分析

目次1. データベースのマスター/スレーブ分類: 2. MySQL マスタースレーブの紹介3. マス...

Nginx ロケーション設定(ロケーションのマッチング順序)の詳細な説明

ロケーションは「位置指定」を意味し、主にさまざまな位置指定のための URI に基づいています。これは...

Docker MySQLコンテナデータベースへの変更が有効にならない問題を解決する

公式の MySQL イメージを使用するには、構成ファイル、DB データ ファイル ディレクトリなどの...

js 実行コンテキストとスコープの概要

目次序文文章1. JavaScriptコードの実行プロセスに関連する概念2. 実行コンテキストと実行...

Vue h関数の使い方の詳しい説明

目次1. 理解2. 使用1. h() パラメータ2. 使い方が簡単3. カウンターケースを実装する4...

MySQLデータ内の多数の改行と復帰に対する解決策

目次問題を見つける1. 改行と復帰を削除する方法2. SELECTクエリで「改行と復帰」を無視する方...

CentOS プラットフォーム上で LAMP 環境を素早く構築する方法

この記事では、例を使用して、CentOS プラットフォーム上で LAMP 環境を迅速に構築する方法に...