特定の MySQL テーブルの完全データと増分データをメッセージ キューに同期する - ソリューション

特定の MySQL テーブルの完全データと増分データをメッセージ キューに同期する - ソリューション

1. 当初の需要

特定の MySQL ライブラリ内の特定のテーブルの元の完全なデータと増分データをリアルタイムで同期する必要があり、対応する変更と削除も同期する必要があります。

データの同期は邪魔にならないようにする必要があります。つまり、ビジネス手順を変更したり、ビジネス側に過度のパフォーマンス圧力をかけたりしてはなりません。

アプリケーション シナリオ: データ ETL 同期とビジネス サーバーへの負荷の軽減。

2. 解決策

3. 運河の導入と設置

Canal は、純粋な Java で開発された Alibaba のオープンソース プロジェクトです。データベースの増分ログ分析に基づいて、増分データのサブスクリプションと消費を提供し、現在は主に MySQL をサポートしています (mariaDB もサポートしています)。

動作原理: MySQL マスタースレーブレプリケーションの実装

大まかに見ると、レプリケーションは次の 3 つのステップに分かれます。

  1. マスターはバイナリ ログに変更を記録します (これらの記録はバイナリ ログ イベントと呼ばれ、show binlog events で表示できます)。
  2. スレーブはマスターのバイナリ ログ イベントをリレー ログにコピーします。
  3. スレーブはリレー ログ内のイベントをやり直し、データを自身のものに反映するように変更します。

運河の仕組み

原理は比較的単純です。

  1. Canal は MySQL スレーブの対話型プロトコルをシミュレートし、MySQL スレーブのふりをして、ダンプ プロトコルを MySQL マスターに送信します。
  2. MySQL マスターはダンプ要求を受信し、バイナリ ログをスレーブ (チャネル) にプッシュし始めます。
  3. Canalはバイナリログオブジェクト(元々はバイトストリーム)を解析します

建築

例:

  • サーバーは、JVMに対応するチャネル実行インスタンスを表します。
  • インスタンスはデータ キューに対応します (1 つのサーバーは 1..n 個のインスタンスに対応します)

インスタンスモジュール:

  • eventParser (データ ソース アクセス、スレーブ プロトコルとマスターの相互作用のシミュレーション、プロトコル解析)
  • eventSink (パーサーおよびストア コネクタ、データのフィルタリング、処理、および配信を実行)
  • eventStore (データストレージ)
  • metaManager (増分サブスクリプションおよび消費情報マネージャー)

インストール

1. MySQLとKafka環境の準備

2. canal をダウンロード: wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz

3. 解凍: tar -zxvf canal.deployer-1.1.3.tar.gz

4. confディレクトリ内のファイルパラメータを設定する

canal.properties を設定します。

conf/example に入り、instance.properties を設定します。

5. 起動: bin/startup.sh

6. ログの表示:

4. 検証

1. 対応するKafkaコンシューマーを開発する

パッケージ org.kafka;

java.util.Arrays をインポートします。
java.util.Properties をインポートします。
org.apache.kafka.clients.consumer.ConsumerRecord をインポートします。
org.apache.kafka.clients.consumer.ConsumerRecords をインポートします。
org.apache.kafka.clients.consumer.KafkaConsumer をインポートします。
org.apache.kafka.common.serialization.StringDeserializer をインポートします。


/**
 *
 * タイトル: KafkaConsumerTest
 * 説明:
 * kafka コンシューマーデモ
 * バージョン:1.0.0
 * @著者 パンCM
 * @日付 2018年1月26日 */
パブリッククラス KafkaConsumerTest は Runnable を実装します {

    プライベート最終 KafkaConsumer<String, String> コンシューマー;
    プライベート ConsumerRecords<String, String> msgList;
    プライベート最終文字列トピック;
    プライベート静的最終文字列 GROUPID = "groupA";

    パブリック KafkaConsumerTest(文字列トピック名) {
        プロパティ props = new Properties();
        props.put("bootstrap.servers", "192.168.7.193:9092");
        props.put("group.id", GROUPID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("セッションタイムアウト.ms", "30000");
        props.put("auto.offset.reset", "最新");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = 新しい KafkaConsumer<String, String>(props);
        this.topic = トピック名;
        this.consumer.subscribe(Arrays.asList(トピック));
    }

    @オーバーライド
    パブリックボイド実行() {
        int メッセージ番号 = 1;
        System.out.println("---------消費開始---------");
        試す {
            のために (; ; ) {
                メッセージリスト = consumer.poll(1000);
                if (null != msgList && msgList.count() > 0) {
                    (ConsumerRecord<String, String> レコード: msgList) {
                        // 100 件のレコードを消費した後に印刷しますが、印刷されたデータはこのパターンに従わない可能性があります System.out.println(messageNo + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset());


// 文字列 v = decodeUnicode(record.value());

// System.out.println(v);

                        // 1000 メッセージが消費されたら終了 if (messageNo % 1000 == 0) {
                            壊す;
                        }
                        メッセージNo++;
                    }
                } それ以外 {
                    スレッド.sleep(11);
                }
            }
        } キャッチ (InterruptedException e) {
            e.printStackTrace();
        ついに
            コンシューマーを閉じます。
        }
    }

    パブリック静的void main(String args[]) {
        KafkaConsumerTest test1 = 新しい KafkaConsumerTest("サンプルデータ");
        スレッド thread1 = new Thread(test1);
        スレッド1を開始します。
    }


    /*
     * 中国語をユニコードに変換*/
    パブリック静的文字列 gbEncoding(最終的な文字列 gbString) {
        char[] utfBytes = gbString.toCharArray();
        文字列 unicodeBytes = "";
        (int i = 0; i < utfBytes.length; i++) の場合 {
            文字列 hexB = Integer.toHexString(utfBytes[i]);
            (hexB.length() <= 2)の場合{
                hexB = "00" + hexB;
            }
            ユニコードバイト = ユニコードバイト + "\\u" + hexB;
        }
        unicodeBytes を返します。
    }

    /*
     * 中国語への Unicode エンコード */
    パブリック静的文字列decodeUnicode(final String dataStr) {
        開始 = 0;
        int 終了 = 0;
        最終的なStringBufferバッファ = 新しいStringBuffer();
        (開始 > -1) の間 {
            終了 = dataStr.indexOf("\\u", 開始 + 2);
            文字列 charStr = "";
            終了 == -1 の場合 {
                charStr = dataStr.substring(start + 2, dataStr.length());
            } それ以外 {
                charStr = dataStr.substring(開始 + 2、終了);
            }
            char letter = (char) Integer.parseInt(charStr, 16); // 16進整数文字列を解析します。
            buffer.append(新しいCharacter(letter).toString());
            開始 = 終了;
        }
        buffer.toString() を返します。

    }
}

2. テーブルbak1にデータを追加する

テーブル `bak1` を作成します (
  `vin` varchar(20) NOT NULL,
  `p1` ダブルデフォルト NULL、
  `p2` ダブルデフォルト NULL、
  `p3` ダブルデフォルト NULL、
  `p4` ダブルデフォルト NULL、
  `p5` ダブルデフォルト NULL、
  `p6` ダブルデフォルト NULL、
  `p7` ダブルデフォルト NULL、
  `p8` ダブルデフォルト NULL、
  `p9` ダブルデフォルト NULL、
  `p0` ダブル デフォルト NULL
) エンジン=InnoDB デフォルト文字セット=utf8mb4

表示テーブル bak1 を作成します。

insert into bak1 select '李雷abcv',
  `p1`、
  `p2`、
  `p3`、
  `p4`、
  `p5`、
  `p6`、
  `p7`、
  `p8`、
  `p9`、
  moci 制限 10 からの `p0`

3. 出力結果を表示します。

これで、特定の MySQL テーブルの完全データと増分データをメッセージ キューに同期するソリューションに関するこの記事は終了です。特定の MySQL テーブルでデータを同期する方法の詳細については、123WORDPRESS.COM の以前の記事を検索するか、次の関連記事を引き続き参照してください。今後とも 123WORDPRESS.COM をよろしくお願いいたします。

以下もご興味があるかもしれません:
  • MySQLからElasticsearchにデータを同期する方法の詳細な説明
  • Python を使用して MySQL データを ElasticSearch に同期する方法のチュートリアル
  • node.js を使用して MongoDB データを MySQL に同期する手順
  • MySQL5.6 マスタースレーブレプリケーション (mysql データ同期構成)
  • MySQLマスタースレーブデータ同期遅延の削減の詳細な説明
  • 2つのテーブル間でデータを同期するためのmysqlトリガー
  • MySQLデータ同期におけるSlave_IO_Running:Noの問題の解決方法のまとめ
  • MySQLのバックアップと移行データの同期方法
  • MYSQL5 マスタースレーブデータ同期設定方法
  • MySQLデータを同期する方法

<<:  Tencent Cloud Server での Jenkins の設定方法の詳細

>>:  メッセージ ボタンに数量バッジを追加する HTML コード

推薦する

Linux カーネル デバイス ドライバー Linux カーネル 基本メモの概要

1. Linuxカーネルドライバモジュールの仕組み静的ロードでは、ドライバモジュールをカーネルにコン...

床スクロール効果を実現する js

この記事ではjQueryを使用して、階段のスライド効果を実装し、フロアをスクロールし、フロアボタンを...

概要ページでのフロートとクリアフロート

1. フロート: 主な目的は、テキストを画像の周囲に折り返す効果を実現することです。また、複数列レイ...

MySQL データベースで機密データの暗号化と復号化を実装する方法

目次1. 準備2. MySQL暗号化関数方式2.1 MySQL 暗号化2.2 MYSQL 復号化3....

jQueryは動的タグイベントを実装します

この記事では、タグイベントを動的に追加するためのjQueryの具体的なコードを参考までに紹介します。...

HTMLページのヘッダーコードは完全に明確です

以下のコードはすべて <head>...</head> の間にあり、具体的な...

HTML webpackプラグインの使用に関する簡単な分析

html-webpack-pluginプラグインを使用してページを開始すると、htmlページをメモリ...

テーブルセルの幅tdの設定は無効であり、内部コンテンツによって常に引き伸ばされます

テーブルページを作成するときに、td に設定された幅が無効になることがあります。td の幅は常に内部...

Linux でのファイルの編集、保存、終了の実践的な説明

Linux でファイルを編集した後、保存して終了するにはどうすればよいですか?保存して終了するコマン...

MySQL で浮動小数点データを文字データに変換するときに起こりうる問題の詳細な説明

序文この記事は主に、MySQL で浮動小数点型を文字型に変換するときに発生する問題を紹介します。これ...

ウェブデザインにおけるキーワード設計手法の紹介

多くの場合、ホームページを作成するときに、Web ページ ヘッダー属性の設定を無視します。 Web ...

MySql 組み込み関数の自習知識ポイントまとめ

文字列関数文字ascii(str)のASCIIコード値をチェックし、strが空の文字列の場合は0を返...

発生したブラウザの互換性の問題と解決策(推奨)について

序文:先週の日曜日、先輩から3ページ作るのを手伝って欲しいと頼まれました。データのやり取りなどはなく...

PXEを使用してLinuxシステムを自動的に展開する方法

目次背景DHCPの設定DHCP ファイル (動的ホスト構成プロトコル) の編集tftp 設定sysl...

vue.js でよく使われる v 命令の解析

目次Vue でのモデルバインド表示の if の v-text の説明v-html: v-オンv-if...