背景 データ ウェアハウス モデリングでは、何ら処理されていない元のビジネス レイヤー データは ODS (Operational Data Store) データと呼ばれます。インターネット企業では、一般的な ODS データには、業務ログ データ (Log) と業務 DB データ (DB) が含まれます。ビジネス DB データの場合、MySQL などのリレーショナル データベースからビジネス データを収集し、それを Hive にインポートすることが、データ ウェアハウス作成の重要な部分です。 MySQL データを Hive に正確かつ効率的に同期するにはどうすればよいですか?一般的に使用されるソリューションは、データをバッチで取得してロードすることです。つまり、MySQL に直接接続してテーブルからデータを選択し、それを中間ストレージとしてローカル ファイルに保存し、最後にファイルを Hive テーブルにロードします。このソリューションの利点は実装が簡単なことですが、ビジネスが発展するにつれて、その欠点が徐々に明らかになってきます。
これらの問題を完全に解決するために、当社は徐々に CDC (Change Data Capture) + Merge 技術ソリューション、つまりリアルタイム Binlog 収集 + Binlog のオフライン処理によるビジネス データの復元ソリューションに移行しました。 Binlog は MySQL のバイナリ ログであり、MySQL 内のすべてのデータの変更を記録します。MySQL クラスター自体のマスター スレーブ同期は Binlog に基づいています。 この記事では、主に、Binlog のリアルタイム収集と、Binlog のオフライン処理によるビジネス データの復元という 2 つの側面から、DB データをデータ ウェアハウスに正確かつ効率的に入力する方法を紹介します。 全体的なアーキテクチャ 全体的なアーキテクチャは上の図に示されています。リアルタイムの Binlog 収集に関しては、MySQL から Binlog をリアルタイムで取得し、適切な解析を完了する Alibaba のオープンソース プロジェクト Canal を採用しました。 Binlog が収集されると、ダウンストリームでの使用のために Kafka に一時的に保存されます。全体のリアルタイム取得部分は図中の赤い矢印で示されています。 図の黒い矢印で示されているように、オフライン Binlog 処理では、次の手順で Hive 上の MySQL テーブルを復元します。
背景で紹介したバッチデータ取得およびロードソリューションが遭遇したさまざまな問題を振り返ってみましょう。なぜこのソリューションは上記の問題を解決できるのでしょうか?
Binlogリアルタイム収集 Binlog のリアルタイム収集には、2 つの主要モジュールが含まれます。1 つは CanalManager で、主に収集タスクの割り当て、アラームの監視、メタデータ管理、外部の依存システムとのドッキングを担当します。もう 1 つは Canal と CanalClient で、実際に収集タスクを実行します。 ユーザーが特定の DB の Binlog 収集リクエストを送信すると、CanalManager はまず DBA プラットフォームの関連インターフェイスを呼び出して、DB が配置されている MySQL インスタンスに関する関連情報を取得し、Binlog 収集に最適なマシンを選択します。次に、コレクション インスタンス (Canal インスタンス) を適切な Canal サーバー (CanalServer) に配布します。特定の CanalServer を選択する場合、CanalManager は負荷分散やデータセンター間の転送などの要素を考慮し、同じリージョン内の負荷と転送が低いマシンを優先します。 CanalServer はコレクション要求を受信すると、ZooKeeper にコレクション情報を登録します。登録内容には以下が含まれます。
これには 2 つの目的があります。
Binlog へのサブスクリプションは MySQL DB の粒度に基づいており、DB の Binlog は Kafka トピックに対応します。基盤となる実装では、MySQL インスタンスの下にあるすべてのサブスクライブされた DB が同じ Canal インスタンスによって処理されます。これは、Binlog が MySQL インスタンスの粒度で生成されるためです。 CanalServer はサブスクライブ解除された Binlog データを破棄し、CanalClient は受信した Binlog を DB の粒度に応じて Kafka に配布します。 MySQLデータをオフラインで復元する Binlog の収集が完了したら、次のステップは Binlog を使用してビジネス データを復元することです。解決すべき最初の問題は、Kafka から Hive に Binlog を同期することです。 カフカ2ハイブ Kafka2Hive タスク全体の管理は、他の ETL と同様に、タスク プリミティブの表現やスケジューリング メカニズムを含め、Meituan データ プラットフォームの ETL フレームワークの下で実行されます。基盤レイヤーは LinkedIn のオープンソース プロジェクト Camus を使用し、実際の Kafka2Hive データ転送作業を完了するためにターゲットを絞った二次開発を実行します。 カミュの二次的発展 Kafka に保存される Binlog にはスキーマがありませんが、Hive テーブルにはスキーマが必要であり、そのパーティション、フィールドなどの設計によってダウンストリームでの効率的な消費が容易になる必要があります。 Camus に加えられる最初の変更は、Kafka 上の Binlog をターゲット スキーマに準拠する形式に解析することです。 Camus の 2 番目の変換は、Meituan の ETL フレームワークによって決定されました。当社のタスク スケジューリング システムでは、現在、同じスケジューリング キュー内のタスクの上流および下流の依存関係のみを分析しており、スケジューリング キュー間で依存関係を確立することはできません。 MySQL2Hive のプロセス全体において、Kafka2Hive タスクは 1 時間に 1 回実行する必要があります (時間単位のキュー)。また、Merge タスクは 1 日に 1 回実行する必要があります (日単位のキュー)。マージ タスクの起動は、1 時間ごとの Kafka2Hive タスクの完了に厳密に依存する必要があります。 この問題を解決するために、Checkdone タスクを導入しました。 Checkdone タスクは毎日のタスクであり、主に Kafka2Hive が前日に正常に完了したかどうかを確認する役割を担います。正常に完了すると、Checkdone タスクが正常に実行され、下流の Merge タスクを正しく開始できるようになります。 Checkdone 検出ロジック Checkdone はどのように検出しますか?各 Kafka2Hive タスクがデータ転送を正常に完了すると、Camus は対応する HDFS ディレクトリにタスクの開始時刻を記録する役割を担います。 Checkdone は前日のすべてのタイムスタンプをスキャンします。最大のタイムスタンプが 0:00 を超える場合、前日の Kafka2Hive タスクが正常に完了し、Checkdone による検出が完了したことを意味します。 さらに、Camus 自体は Kafka を読み取って HDFS ファイルを書き込むプロセスのみを完了するため、下流でクエリを実行する前に Hive パーティションの読み込みも完了する必要があります。したがって、Kafka2Hive タスク全体の最後のステップは、Hive パーティションをロードすることです。このようにして、タスク全体が正常に実行されたと見なされます。 各 Kafka2Hive タスクは、特定のトピックを読み取り、Binlog データを original_binlog ライブラリの下のテーブル (前の図の original_binlog.db) に書き込む役割を担います。このテーブルには、MySQL DB に対応するすべての Binlog が格納されます。 上図は、Kafka2Hive が完了した後の HDFS 上のファイルのディレクトリ構造を示しています。 MySQL DB が user と呼ばれる場合、対応する Binlog は original_binlog.user テーブルに保存されます。 ready ディレクトリには、その日に正常に実行されたすべての Kafka2Hive タスクの開始時刻が Checkdone で使用するために毎日保存されます。各テーブルの Binlog はパーティションに編成されます。たとえば、userinfo テーブルの Binlog はパーティション table_name=userinfo に保存されます。各 table_name の第 1 レベル パーティションの下で、第 2 レベル パーティションは dt によって編成されます。図の xxx.lzo ファイルと xxx.lzo.index ファイルには、lzo 圧縮された Binlog データが保存されます。 マージ Binlog が正常に保存されたら、次のステップは Binlog に基づいて MySQL データを復元することです。マージ プロセスでは、2 つの処理が実行されます。まず、その日に生成された Binlog データを Delta テーブルに保存し、次に既存の在庫データとの主キー ベースのマージを実行します。デルタ テーブル内のデータは、その日の最新のデータです。 1 日にデータが複数回変更された場合、デルタ テーブルには最後の変更後のデータのみが保存されます。 Delta データと株価データをマージする場合、それらが同じデータであるかどうかを判断するために一意のキーが必要です。在庫テーブルとデルタ テーブルの両方に同じデータが表示される場合は、データが更新されたことを意味し、デルタ テーブル内のデータが最終結果として選択されます。それ以外の場合は、変更は発生していないことを意味し、元の在庫テーブルのデータが最終結果として保持されます。マージの結果データは、図の origindb.table である元のテーブルに挿入上書きされます。 マージプロセスの例 次の例は、マージ プロセスを示しています。 データ テーブルには、id と value の 2 つの列があり、id が主キーです。デルタ データを抽出する場合、同じデータが複数回更新された場合は、最後に更新されたものだけが選択されます。したがって、id=1 のデータの場合、Delta テーブルには最後に更新された値 value=120 が記録されます。デルタ データと既存のデータがマージされた後、最終結果では、新しいデータ (id=4) が挿入され、2 つのデータ (id=1 と id=2) が更新され、1 つのデータは変更されません (id=3)。 デフォルトでは、この重複検出の一意キーとして MySQL テーブルの主キーを使用します。実際の状況に応じて、MySQL とは異なる一意キーを企業が設定することもできます。 上記では、Binlog ベースのデータ収集と ODS データ復元の全体的なアーキテクチャについて紹介しました。以下では、主に2つの側面から私たちが解決した実際のビジネス上の課題を紹介します。 実践1: シャーディングのサポート ビジネス規模が拡大するにつれて、MySQL にはますます多くのシャード データベースとテーブルが含まれるようになり、多くの企業ではシャード テーブルの数が数千に上ります。通常、データ開発者は分析のためにこのデータを集約する必要があります。各シャード テーブルを手動で同期してから Hive に集約する必要がある場合、そのコストを受け入れることは困難です。したがって、ODS レイヤーでサブテーブルの集計を完了する必要があります。 まず、リアルタイムで Binlog を収集する場合、異なる DB からの Binlog を同じ Kafka Topic に書き込むことをサポートします。 Binlog 収集を申請する場合、ユーザーは同じビジネス ロジックの下にある複数の物理 DB を同時に選択できます。 Binlog コレクション レイヤーで集約すると、すべてのシャードの Binlog が同じ Hive テーブルに書き込まれるため、ダウンストリームではマージを実行するときに 1 つの Hive テーブルのみを読み取る必要があります。 次に、マージ タスクの構成では正規表現の一致がサポートされます。ビジネス パーティション テーブルの命名規則に準拠する正規表現を構成することにより、マージ タスクはどの MySQL テーブル Binlog を集計する必要があるかを理解し、対応するパーティションのデータを選択して実行できるようになります。 このように、2 つのレベルでの作業により、ODS レイヤーでのサブデータベースとサブテーブルのマージが完了します。 ここでは技術的な最適化が行われています。Kafka2Hive を実行する際に、ビジネス テーブルのパーティション分割ルールに従ってテーブル名を処理し、物理テーブル名を論理テーブル名に変換しました。たとえば、テーブル名 userinfo123 は userinfo に変換され、その Binlog データは original_binlog.user テーブルの table_name=userinfo パーティションに保存されます。これは、小さな HDFS ファイルと Hive パーティションが多すぎることによって引き起こされる根本的な負荷を防ぐために行われます。 練習2: 削除イベントのサポート 削除操作は MySQL では非常に一般的です。Hive は削除をサポートしていないため、MySQL で削除されたデータを Hive でも削除したい場合は、「回避的な」方法を使用する必要があります。 削除イベントを処理する必要があるマージ プロセスでは、次の 2 つの手順が使用されます。
見通し データ ウェアハウス作成の基盤として、Meituan のデータ プラットフォームが提供する Binlog ベースの MySQL2Hive サービスは、基本的に Meituan 内のすべての業務ラインをカバーしています。現在、ほとんどの企業のデータ同期のニーズを満たし、DB データの正確で効率的なウェアハウスを実現できます。今後の開発では、CanalManager の単一点問題の解決に注力し、データセンター間災害復旧アーキテクチャを構築して、より安定的にビジネス展開をサポートしていきます。 この記事では、主にこのサービスのアーキテクチャを、Binlog ストリーミング収集と Binlog ベースの ODS データ復元という 2 つの側面から紹介し、実際に遭遇したいくつかの典型的な問題と解決策を紹介します。これが他の開発者にとって参考になることを願っています。どなたでもお気軽にご連絡ください。 要約する 以上がこの記事の全内容です。この記事の内容が皆様の勉強や仕事に何らかの参考学習価値をもたらすことを願います。123WORDPRESS.COM をご愛顧いただき、誠にありがとうございます。これについてもっと知りたい場合は、次のリンクをご覧ください。 以下もご興味があるかもしれません:
|
<<: CentOS7 インストール Zabbix 4.0 チュートリアル (イラストとテキスト)
この記事では、主に次のような Vue ドラッグ アンド ドロップの簡単な実装を紹介します。レンダリン...
最近、モバイルページを制作する際には、レイアウトにインラインブロック要素がよく使われますが、インライ...
日常業務では、スペースのないファイルに遭遇することがよくあります。これにより、削除操作がはるかに簡単...
Linux での MySQL データベースのマスター/スレーブ同期構成の利点は、この方法をバックアッ...
この記事の例はすべて小さなプログラムで書かれていますが、実装される機能には影響しません。 wxmlル...
目次1. 再帰とは何ですか? 2. 再帰を使って数学の問題を解く1. 1 * 2 * 3 * 4 …...
一般的に言えば、HTML ドキュメント内で極端に大きな <ol> リストに遭遇する可能性...
MySQL 全文検索中国語ソリューション最近、会社のプロジェクトで、データベースで中国語を検索する機...
パノラマビュー効果を見てみましょう: 住所を表示スクリーンショット: 体験してみると、周囲の環境がぐ...
説明 ソリューションVMware 15 仮想マシン ブリッジ モードではインターネットにアクセスでき...
デモコマンドをカスタマイズするVue カスタム ディレクティブの構文は次のとおりです。 Vue.di...
私はtengineを使用しています。インストールディレクトリは/usr/local/tengineで...
目次font-faceでフォントを正しく宣言するフォントをプリロードするフォントをホストするにはli...
まずは効果を見てみましょう:この効果は非常に華やかに見えますが、原理は複雑ではありません。1 枚の花...
<br />前回のWebデザインチュートリアル:Webデザインチュートリアル(3):デザ...