1. 当初の需要特定の MySQL ライブラリ内の特定のテーブルの元の完全なデータと増分データをリアルタイムで同期する必要があり、対応する変更と削除も同期する必要があります。 データの同期は邪魔にならないようにする必要があります。つまり、ビジネス手順を変更したり、ビジネス側に過度のパフォーマンス圧力をかけたりしてはなりません。 アプリケーション シナリオ: データ ETL 同期とビジネス サーバーへの負荷の軽減。 2. 解決策3. 運河の導入と設置Canal は、純粋な Java で開発された Alibaba のオープンソース プロジェクトです。データベースの増分ログ分析に基づいて、増分データのサブスクリプションと消費を提供し、現在は主に MySQL をサポートしています (mariaDB もサポートしています)。 動作原理: MySQL マスタースレーブレプリケーションの実装 大まかに見ると、レプリケーションは次の 3 つのステップに分かれます。
運河の仕組み原理は比較的単純です。
建築例:
インスタンスモジュール:
インストール1. MySQLとKafka環境の準備 2. canal をダウンロード: wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz 3. 解凍: tar -zxvf canal.deployer-1.1.3.tar.gz 4. confディレクトリ内のファイルパラメータを設定する canal.properties を設定します。 conf/example に入り、instance.properties を設定します。 5. 起動: bin/startup.sh 6. ログの表示: 4. 検証1. 対応するKafkaコンシューマーを開発する パッケージ org.kafka; java.util.Arrays をインポートします。 java.util.Properties をインポートします。 org.apache.kafka.clients.consumer.ConsumerRecord をインポートします。 org.apache.kafka.clients.consumer.ConsumerRecords をインポートします。 org.apache.kafka.clients.consumer.KafkaConsumer をインポートします。 org.apache.kafka.common.serialization.StringDeserializer をインポートします。 /** * * タイトル: KafkaConsumerTest * 説明: * kafka コンシューマーデモ * バージョン:1.0.0 * @著者 パンCM * @日付 2018年1月26日 */ パブリッククラス KafkaConsumerTest は Runnable を実装します { プライベート最終 KafkaConsumer<String, String> コンシューマー; プライベート ConsumerRecords<String, String> msgList; プライベート最終文字列トピック; プライベート静的最終文字列 GROUPID = "groupA"; パブリック KafkaConsumerTest(文字列トピック名) { プロパティ props = new Properties(); props.put("bootstrap.servers", "192.168.7.193:9092"); props.put("group.id", GROUPID); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("セッションタイムアウト.ms", "30000"); props.put("auto.offset.reset", "最新"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); this.consumer = 新しい KafkaConsumer<String, String>(props); this.topic = トピック名; this.consumer.subscribe(Arrays.asList(トピック)); } @オーバーライド パブリックボイド実行() { int メッセージ番号 = 1; System.out.println("---------消費開始---------"); 試す { のために (; ; ) { メッセージリスト = consumer.poll(1000); if (null != msgList && msgList.count() > 0) { (ConsumerRecord<String, String> レコード: msgList) { // 100 件のレコードを消費した後に印刷しますが、印刷されたデータはこのパターンに従わない可能性があります System.out.println(messageNo + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset()); // 文字列 v = decodeUnicode(record.value()); // System.out.println(v); // 1000 メッセージが消費されたら終了 if (messageNo % 1000 == 0) { 壊す; } メッセージNo++; } } それ以外 { スレッド.sleep(11); } } } キャッチ (InterruptedException e) { e.printStackTrace(); ついに コンシューマーを閉じます。 } } パブリック静的void main(String args[]) { KafkaConsumerTest test1 = 新しい KafkaConsumerTest("サンプルデータ"); スレッド thread1 = new Thread(test1); スレッド1を開始します。 } /* * 中国語をユニコードに変換*/ パブリック静的文字列 gbEncoding(最終的な文字列 gbString) { char[] utfBytes = gbString.toCharArray(); 文字列 unicodeBytes = ""; (int i = 0; i < utfBytes.length; i++) の場合 { 文字列 hexB = Integer.toHexString(utfBytes[i]); (hexB.length() <= 2)の場合{ hexB = "00" + hexB; } ユニコードバイト = ユニコードバイト + "\\u" + hexB; } unicodeBytes を返します。 } /* * 中国語への Unicode エンコード */ パブリック静的文字列decodeUnicode(final String dataStr) { 開始 = 0; int 終了 = 0; 最終的なStringBufferバッファ = 新しいStringBuffer(); (開始 > -1) の間 { 終了 = dataStr.indexOf("\\u", 開始 + 2); 文字列 charStr = ""; 終了 == -1 の場合 { charStr = dataStr.substring(start + 2, dataStr.length()); } それ以外 { charStr = dataStr.substring(開始 + 2、終了); } char letter = (char) Integer.parseInt(charStr, 16); // 16進整数文字列を解析します。 buffer.append(新しいCharacter(letter).toString()); 開始 = 終了; } buffer.toString() を返します。 } } 2. テーブルbak1にデータを追加する テーブル `bak1` を作成します ( `vin` varchar(20) NOT NULL, `p1` ダブルデフォルト NULL、 `p2` ダブルデフォルト NULL、 `p3` ダブルデフォルト NULL、 `p4` ダブルデフォルト NULL、 `p5` ダブルデフォルト NULL、 `p6` ダブルデフォルト NULL、 `p7` ダブルデフォルト NULL、 `p8` ダブルデフォルト NULL、 `p9` ダブルデフォルト NULL、 `p0` ダブル デフォルト NULL ) エンジン=InnoDB デフォルト文字セット=utf8mb4 表示テーブル bak1 を作成します。 insert into bak1 select '李雷abcv', `p1`、 `p2`、 `p3`、 `p4`、 `p5`、 `p6`、 `p7`、 `p8`、 `p9`、 moci 制限 10 からの `p0` 3. 出力結果を表示します。 これで、特定の MySQL テーブルの完全データと増分データをメッセージ キューに同期するソリューションに関するこの記事は終了です。特定の MySQL テーブルでデータを同期する方法の詳細については、123WORDPRESS.COM の以前の記事を検索するか、次の関連記事を引き続き参照してください。今後とも 123WORDPRESS.COM をよろしくお願いいたします。 以下もご興味があるかもしれません:
|
<<: Tencent Cloud Server での Jenkins の設定方法の詳細
>>: メッセージ ボタンに数量バッジを追加する HTML コード
突然、ドキュメントの保存と共同作業のためのプライベート サービスを構築する必要がありました。多くの場...
mysql explain コマンドは、MySQL がインデックスを使用して選択ステートメントを処理...
目次1 Nginxのインストール2 Nginxの設定3 ホストファイルを変更する4 テストNginx...
序文ファイル システムは、データの保存方法と復元方法を整理する役割を担います。 いずれにせよ、時間の...
CSS レイアウトとスタイルに関する質問: 水平方向の中央揃えと最大幅の制限のバランスをとる方法最近...
プロジェクトの要件は、日付と時刻を選択し、現在の時刻以降の時刻のみを選択し、最小レベルを分単位で無効...
目次1. これは2. この点を修正する1. call() メソッド2. apply() メソッド要約...
<textarea></textarea> は、複数行を入力できるテキスト ...
序文この記事では、主にライブラリ内のすべてのテーブルを返すMysql8.0ドライバgetTables...
序文私は多くのブログを読み、弊社の DBA を含む多くの人々が、MySql では列に null が含...
序文: MySQL で最も一般的に使用されるアーキテクチャは、マスター スレーブ レプリケーションで...
問題を見つけるまず問題を見てみましょう。ミニプログラムでは、Vant のダイアログ コンポーネント ...
目次ドキュメント オブジェクト モデル (DOM) DOM と JavaScript DOMツリーの...
序文最近、オンラインでデータが誤って操作されました。データベースが直接変更されたため、それを回復する...
背景Dockerでは、同じイメージを使用して4つのコンテナを作成します。ネットワークはブリッジモード...