特定の MySQL テーブルの完全データと増分データをメッセージ キューに同期する - ソリューション

特定の MySQL テーブルの完全データと増分データをメッセージ キューに同期する - ソリューション

1. 当初の需要

特定の MySQL ライブラリ内の特定のテーブルの元の完全なデータと増分データをリアルタイムで同期する必要があり、対応する変更と削除も同期する必要があります。

データの同期は邪魔にならないようにする必要があります。つまり、ビジネス手順を変更したり、ビジネス側に過度のパフォーマンス圧力をかけたりしてはなりません。

アプリケーション シナリオ: データ ETL 同期とビジネス サーバーへの負荷の軽減。

2. 解決策

3. 運河の導入と設置

Canal は、純粋な Java で開発された Alibaba のオープンソース プロジェクトです。データベースの増分ログ分析に基づいて、増分データのサブスクリプションと消費を提供し、現在は主に MySQL をサポートしています (mariaDB もサポートしています)。

動作原理: MySQL マスタースレーブレプリケーションの実装

大まかに見ると、レプリケーションは次の 3 つのステップに分かれます。

  1. マスターはバイナリ ログに変更を記録します (これらの記録はバイナリ ログ イベントと呼ばれ、show binlog events で表示できます)。
  2. スレーブはマスターのバイナリ ログ イベントをリレー ログにコピーします。
  3. スレーブはリレー ログ内のイベントをやり直し、データを自身のものに反映するように変更します。

運河の仕組み

原理は比較的単純です。

  1. Canal は MySQL スレーブの対話型プロトコルをシミュレートし、MySQL スレーブのふりをして、ダンプ プロトコルを MySQL マスターに送信します。
  2. MySQL マスターはダンプ要求を受信し、バイナリ ログをスレーブ (チャネル) にプッシュし始めます。
  3. Canalはバイナリログオブジェクト(元々はバイトストリーム)を解析します

建築

例:

  • サーバーは、JVMに対応するチャネル実行インスタンスを表します。
  • インスタンスはデータ キューに対応します (1 つのサーバーは 1..n 個のインスタンスに対応します)

インスタンスモジュール:

  • eventParser (データ ソース アクセス、スレーブ プロトコルとマスターの相互作用のシミュレーション、プロトコル解析)
  • eventSink (パーサーおよびストア コネクタ、データのフィルタリング、処理、および配信を実行)
  • eventStore (データストレージ)
  • metaManager (増分サブスクリプションおよび消費情報マネージャー)

インストール

1. MySQLとKafka環境の準備

2. canal をダウンロード: wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz

3. 解凍: tar -zxvf canal.deployer-1.1.3.tar.gz

4. confディレクトリ内のファイルパラメータを設定する

canal.properties を設定します。

conf/example に入り、instance.properties を設定します。

5. 起動: bin/startup.sh

6. ログの表示:

4. 検証

1. 対応するKafkaコンシューマーを開発する

パッケージ org.kafka;

java.util.Arrays をインポートします。
java.util.Properties をインポートします。
org.apache.kafka.clients.consumer.ConsumerRecord をインポートします。
org.apache.kafka.clients.consumer.ConsumerRecords をインポートします。
org.apache.kafka.clients.consumer.KafkaConsumer をインポートします。
org.apache.kafka.common.serialization.StringDeserializer をインポートします。


/**
 *
 * タイトル: KafkaConsumerTest
 * 説明:
 * kafka コンシューマーデモ
 * バージョン:1.0.0
 * @著者 パンCM
 * @日付 2018年1月26日 */
パブリッククラス KafkaConsumerTest は Runnable を実装します {

    プライベート最終 KafkaConsumer<String, String> コンシューマー;
    プライベート ConsumerRecords<String, String> msgList;
    プライベート最終文字列トピック;
    プライベート静的最終文字列 GROUPID = "groupA";

    パブリック KafkaConsumerTest(文字列トピック名) {
        プロパティ props = new Properties();
        props.put("bootstrap.servers", "192.168.7.193:9092");
        props.put("group.id", GROUPID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("セッションタイムアウト.ms", "30000");
        props.put("auto.offset.reset", "最新");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = 新しい KafkaConsumer<String, String>(props);
        this.topic = トピック名;
        this.consumer.subscribe(Arrays.asList(トピック));
    }

    @オーバーライド
    パブリックボイド実行() {
        int メッセージ番号 = 1;
        System.out.println("---------消費開始---------");
        試す {
            のために (; ; ) {
                メッセージリスト = consumer.poll(1000);
                if (null != msgList && msgList.count() > 0) {
                    (ConsumerRecord<String, String> レコード: msgList) {
                        // 100 件のレコードを消費した後に印刷しますが、印刷されたデータはこのパターンに従わない可能性があります System.out.println(messageNo + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset());


// 文字列 v = decodeUnicode(record.value());

// System.out.println(v);

                        // 1000 メッセージが消費されたら終了 if (messageNo % 1000 == 0) {
                            壊す;
                        }
                        メッセージNo++;
                    }
                } それ以外 {
                    スレッド.sleep(11);
                }
            }
        } キャッチ (InterruptedException e) {
            e.printStackTrace();
        ついに
            コンシューマーを閉じます。
        }
    }

    パブリック静的void main(String args[]) {
        KafkaConsumerTest test1 = 新しい KafkaConsumerTest("サンプルデータ");
        スレッド thread1 = new Thread(test1);
        スレッド1を開始します。
    }


    /*
     * 中国語をユニコードに変換*/
    パブリック静的文字列 gbEncoding(最終的な文字列 gbString) {
        char[] utfBytes = gbString.toCharArray();
        文字列 unicodeBytes = "";
        (int i = 0; i < utfBytes.length; i++) の場合 {
            文字列 hexB = Integer.toHexString(utfBytes[i]);
            (hexB.length() <= 2)の場合{
                hexB = "00" + hexB;
            }
            ユニコードバイト = ユニコードバイト + "\\u" + hexB;
        }
        unicodeBytes を返します。
    }

    /*
     * 中国語への Unicode エンコード */
    パブリック静的文字列decodeUnicode(final String dataStr) {
        開始 = 0;
        int 終了 = 0;
        最終的なStringBufferバッファ = 新しいStringBuffer();
        (開始 > -1) の間 {
            終了 = dataStr.indexOf("\\u", 開始 + 2);
            文字列 charStr = "";
            終了 == -1 の場合 {
                charStr = dataStr.substring(start + 2, dataStr.length());
            } それ以外 {
                charStr = dataStr.substring(開始 + 2、終了);
            }
            char letter = (char) Integer.parseInt(charStr, 16); // 16進整数文字列を解析します。
            buffer.append(新しいCharacter(letter).toString());
            開始 = 終了;
        }
        buffer.toString() を返します。

    }
}

2. テーブルbak1にデータを追加する

テーブル `bak1` を作成します (
  `vin` varchar(20) NOT NULL,
  `p1` ダブルデフォルト NULL、
  `p2` ダブルデフォルト NULL、
  `p3` ダブルデフォルト NULL、
  `p4` ダブルデフォルト NULL、
  `p5` ダブルデフォルト NULL、
  `p6` ダブルデフォルト NULL、
  `p7` ダブルデフォルト NULL、
  `p8` ダブルデフォルト NULL、
  `p9` ダブルデフォルト NULL、
  `p0` ダブル デフォルト NULL
) エンジン=InnoDB デフォルト文字セット=utf8mb4

表示テーブル bak1 を作成します。

insert into bak1 select '李雷abcv',
  `p1`、
  `p2`、
  `p3`、
  `p4`、
  `p5`、
  `p6`、
  `p7`、
  `p8`、
  `p9`、
  moci 制限 10 からの `p0`

3. 出力結果を表示します。

これで、特定の MySQL テーブルの完全データと増分データをメッセージ キューに同期するソリューションに関するこの記事は終了です。特定の MySQL テーブルでデータを同期する方法の詳細については、123WORDPRESS.COM の以前の記事を検索するか、次の関連記事を引き続き参照してください。今後とも 123WORDPRESS.COM をよろしくお願いいたします。

以下もご興味があるかもしれません:
  • MySQLからElasticsearchにデータを同期する方法の詳細な説明
  • Python を使用して MySQL データを ElasticSearch に同期する方法のチュートリアル
  • node.js を使用して MongoDB データを MySQL に同期する手順
  • MySQL5.6 マスタースレーブレプリケーション (mysql データ同期構成)
  • MySQLマスタースレーブデータ同期遅延の削減の詳細な説明
  • 2つのテーブル間でデータを同期するためのmysqlトリガー
  • MySQLデータ同期におけるSlave_IO_Running:Noの問題の解決方法のまとめ
  • MySQLのバックアップと移行データの同期方法
  • MYSQL5 マスタースレーブデータ同期設定方法
  • MySQLデータを同期する方法

<<:  Tencent Cloud Server での Jenkins の設定方法の詳細

>>:  メッセージ ボタンに数量バッジを追加する HTML コード

推薦する

Dockerはnextcloudを使用してプライベートBaiduクラウドディスクを構築します

突然、ドキュメントの保存と共同作業のためのプライベート サービスを構築する必要がありました。多くの場...

Mysql Explainコマンドの使用と分析

mysql explain コマンドは、MySQL がインデックスを使用して選択ステートメントを処理...

Nginx イントラネット スタンドアロン リバース プロキシの実装

目次1 Nginxのインストール2 Nginxの設定3 ホストファイルを変更する4 テストNginx...

「fsck」を使用して Linux のファイルシステムエラーを修正する方法

序文ファイル システムは、データの保存方法と復元方法を整理する役割を担います。 いずれにせよ、時間の...

CSS 水平方向の中央揃えと最大幅の制限

CSS レイアウトとスタイルに関する質問: 水平方向の中央揃えと最大幅の制限のバランスをとる方法最近...

要素内の TimePicker は時間の一部を無効にします (分単位で無効)

プロジェクトの要件は、日付と時刻を選択し、現在の時刻以降の時刻のみを選択し、最小レベルを分単位で無効...

JavaScript プロトタイプオブジェクトの this ポイント問題の詳細な説明

目次1. これは2. この点を修正する1. call() メソッド2. apply() メソッド要約...

HTML のテキストエリア タグ

<textarea></textarea> は、複数行を入力できるテキスト ...

MySQL 8.0 バージョンで getTables がすべてのデータベース テーブルを返す問題の簡単な分析

序文この記事では、主にライブラリ内のすべてのテーブルを返すMysql8.0ドライバgetTables...

MySQL のインデックスにおける NULL の影響についての詳細な説明

序文私は多くのブログを読み、弊社の DBA を含む多くの人々が、MySql では列に null が含...

MySQL のマスター スレーブ レプリケーション オプションをオンラインで変更する方法

序文: MySQL で最も一般的に使用されるアーキテクチャは、マスター スレーブ レプリケーションで...

WeChatアプレットの入力ジッター問題を解決する方法

問題を見つけるまず問題を見てみましょう。ミニプログラムでは、Vant のダイアログ コンポーネント ...

JavaScriptがDOMツリーの構築にどのように影響するかについて詳しく説明します。

目次ドキュメント オブジェクト モデル (DOM) DOM と JavaScript DOMツリーの...

MySQL でデータ復旧に binlog を使用する方法

序文最近、オンラインでデータが誤って操作されました。データベースが直接変更されたため、それを回復する...

複数の Docker コンテナが同じポート番号を持たない場合の解決策

背景Dockerでは、同じイメージを使用して4つのコンテナを作成します。ネットワークはブリッジモード...