1. 当初の需要特定の MySQL ライブラリ内の特定のテーブルの元の完全なデータと増分データをリアルタイムで同期する必要があり、対応する変更と削除も同期する必要があります。 データの同期は邪魔にならないようにする必要があります。つまり、ビジネス手順を変更したり、ビジネス側に過度のパフォーマンス圧力をかけたりしてはなりません。 アプリケーション シナリオ: データ ETL 同期とビジネス サーバーへの負荷の軽減。 2. 解決策3. 運河の導入と設置Canal は、純粋な Java で開発された Alibaba のオープンソース プロジェクトです。データベースの増分ログ分析に基づいて、増分データのサブスクリプションと消費を提供し、現在は主に MySQL をサポートしています (mariaDB もサポートしています)。 動作原理: MySQL マスタースレーブレプリケーションの実装 大まかに見ると、レプリケーションは次の 3 つのステップに分かれます。
運河の仕組み原理は比較的単純です。
建築例:
インスタンスモジュール:
インストール1. MySQLとKafka環境の準備 2. canal をダウンロード: wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz 3. 解凍: tar -zxvf canal.deployer-1.1.3.tar.gz 4. confディレクトリ内のファイルパラメータを設定する canal.properties を設定します。 conf/example に入り、instance.properties を設定します。 5. 起動: bin/startup.sh 6. ログの表示: 4. 検証1. 対応するKafkaコンシューマーを開発する パッケージ org.kafka; java.util.Arrays をインポートします。 java.util.Properties をインポートします。 org.apache.kafka.clients.consumer.ConsumerRecord をインポートします。 org.apache.kafka.clients.consumer.ConsumerRecords をインポートします。 org.apache.kafka.clients.consumer.KafkaConsumer をインポートします。 org.apache.kafka.common.serialization.StringDeserializer をインポートします。 /** * * タイトル: KafkaConsumerTest * 説明: * kafka コンシューマーデモ * バージョン:1.0.0 * @著者 パンCM * @日付 2018年1月26日 */ パブリッククラス KafkaConsumerTest は Runnable を実装します { プライベート最終 KafkaConsumer<String, String> コンシューマー; プライベート ConsumerRecords<String, String> msgList; プライベート最終文字列トピック; プライベート静的最終文字列 GROUPID = "groupA"; パブリック KafkaConsumerTest(文字列トピック名) { プロパティ props = new Properties(); props.put("bootstrap.servers", "192.168.7.193:9092"); props.put("group.id", GROUPID); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("セッションタイムアウト.ms", "30000"); props.put("auto.offset.reset", "最新"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); this.consumer = 新しい KafkaConsumer<String, String>(props); this.topic = トピック名; this.consumer.subscribe(Arrays.asList(トピック)); } @オーバーライド パブリックボイド実行() { int メッセージ番号 = 1; System.out.println("---------消費開始---------"); 試す { のために (; ; ) { メッセージリスト = consumer.poll(1000); if (null != msgList && msgList.count() > 0) { (ConsumerRecord<String, String> レコード: msgList) { // 100 件のレコードを消費した後に印刷しますが、印刷されたデータはこのパターンに従わない可能性があります System.out.println(messageNo + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset()); // 文字列 v = decodeUnicode(record.value()); // System.out.println(v); // 1000 メッセージが消費されたら終了 if (messageNo % 1000 == 0) { 壊す; } メッセージNo++; } } それ以外 { スレッド.sleep(11); } } } キャッチ (InterruptedException e) { e.printStackTrace(); ついに コンシューマーを閉じます。 } } パブリック静的void main(String args[]) { KafkaConsumerTest test1 = 新しい KafkaConsumerTest("サンプルデータ"); スレッド thread1 = new Thread(test1); スレッド1を開始します。 } /* * 中国語をユニコードに変換*/ パブリック静的文字列 gbEncoding(最終的な文字列 gbString) { char[] utfBytes = gbString.toCharArray(); 文字列 unicodeBytes = ""; (int i = 0; i < utfBytes.length; i++) の場合 { 文字列 hexB = Integer.toHexString(utfBytes[i]); (hexB.length() <= 2)の場合{ hexB = "00" + hexB; } ユニコードバイト = ユニコードバイト + "\\u" + hexB; } unicodeBytes を返します。 } /* * 中国語への Unicode エンコード */ パブリック静的文字列decodeUnicode(final String dataStr) { 開始 = 0; int 終了 = 0; 最終的なStringBufferバッファ = 新しいStringBuffer(); (開始 > -1) の間 { 終了 = dataStr.indexOf("\\u", 開始 + 2); 文字列 charStr = ""; 終了 == -1 の場合 { charStr = dataStr.substring(start + 2, dataStr.length()); } それ以外 { charStr = dataStr.substring(開始 + 2、終了); } char letter = (char) Integer.parseInt(charStr, 16); // 16進整数文字列を解析します。 buffer.append(新しいCharacter(letter).toString()); 開始 = 終了; } buffer.toString() を返します。 } } 2. テーブルbak1にデータを追加する テーブル `bak1` を作成します ( `vin` varchar(20) NOT NULL, `p1` ダブルデフォルト NULL、 `p2` ダブルデフォルト NULL、 `p3` ダブルデフォルト NULL、 `p4` ダブルデフォルト NULL、 `p5` ダブルデフォルト NULL、 `p6` ダブルデフォルト NULL、 `p7` ダブルデフォルト NULL、 `p8` ダブルデフォルト NULL、 `p9` ダブルデフォルト NULL、 `p0` ダブル デフォルト NULL ) エンジン=InnoDB デフォルト文字セット=utf8mb4 表示テーブル bak1 を作成します。 insert into bak1 select '李雷abcv', `p1`、 `p2`、 `p3`、 `p4`、 `p5`、 `p6`、 `p7`、 `p8`、 `p9`、 moci 制限 10 からの `p0` 3. 出力結果を表示します。 これで、特定の MySQL テーブルの完全データと増分データをメッセージ キューに同期するソリューションに関するこの記事は終了です。特定の MySQL テーブルでデータを同期する方法の詳細については、123WORDPRESS.COM の以前の記事を検索するか、次の関連記事を引き続き参照してください。今後とも 123WORDPRESS.COM をよろしくお願いいたします。 以下もご興味があるかもしれません:
|
<<: Tencent Cloud Server での Jenkins の設定方法の詳細
>>: メッセージ ボタンに数量バッジを追加する HTML コード
1. Linuxカーネルドライバモジュールの仕組み静的ロードでは、ドライバモジュールをカーネルにコン...
この記事ではjQueryを使用して、階段のスライド効果を実装し、フロアをスクロールし、フロアボタンを...
1. フロート: 主な目的は、テキストを画像の周囲に折り返す効果を実現することです。また、複数列レイ...
目次1. 準備2. MySQL暗号化関数方式2.1 MySQL 暗号化2.2 MYSQL 復号化3....
この記事では、タグイベントを動的に追加するためのjQueryの具体的なコードを参考までに紹介します。...
以下のコードはすべて <head>...</head> の間にあり、具体的な...
html-webpack-pluginプラグインを使用してページを開始すると、htmlページをメモリ...
テーブルページを作成するときに、td に設定された幅が無効になることがあります。td の幅は常に内部...
Linux でファイルを編集した後、保存して終了するにはどうすればよいですか?保存して終了するコマン...
序文この記事は主に、MySQL で浮動小数点型を文字型に変換するときに発生する問題を紹介します。これ...
多くの場合、ホームページを作成するときに、Web ページ ヘッダー属性の設定を無視します。 Web ...
文字列関数文字ascii(str)のASCIIコード値をチェックし、strが空の文字列の場合は0を返...
序文:先週の日曜日、先輩から3ページ作るのを手伝って欲しいと頼まれました。データのやり取りなどはなく...
目次背景DHCPの設定DHCP ファイル (動的ホスト構成プロトコル) の編集tftp 設定sysl...
目次Vue でのモデルバインド表示の if の v-text の説明v-html: v-オンv-if...