序文 多くの場合、ユーザーが自分のデータに対して実行する操作に基づいて何かを行う必要があります。 たとえば、ユーザーがアカウントを削除した場合、私たちはそのユーザーを叱責し、再度アカウントに戻ってきてもらうよう懇願するテキストメッセージを送信します。 同様の機能はビジネス ロジック層に実装でき、ユーザーの削除要求を受け取った後にこの操作を実行できますが、データベース binlog は別の操作方法を提供します。 binlog を監視するには、2 つの手順が必要です。最初の手順は、もちろん MySQL でこの機能を有効にすることです。2 番目の手順は、ログを読み取るプログラムを作成することです。 mysql は binlog をオンにします。 まず、MySQL の binlog は通常は開かれていないため、次のものが必要です。 mysql 構成ファイル my.cnf を見つけます。場所はオペレーティング システムによって異なる場合があります。自分で見つけることができます。 これに次の内容を追加します。 [mysqld] サーバーID = 1 ログ bin = mysql bin binlog 形式 = ROW 次に、mysql を再起動します。 /ウブントゥ サービスmysqlの再起動 // マック mysql.server の再起動 正常に有効化されているかどうかを監視する mysql コマンドラインを入力して実行します: '%log_bin%' のような変数を表示します。 結果が以下の通りであれば成功です。 書き込まれているバイナリログのステータスを表示します。 binlog を読み取るコード 依存関係の導入 私たちはオープンソースの実装をいくつか使用しています。奇妙な理由から、mysql-binlog-connector-java パッケージ (公式 github リポジトリ) [github.com/shyiko/mysq…] を選択しました。具体的な依存関係は次のとおりです。 <!-- https://mvnrepository.com/artifact/com.github.shyiko/mysql-binlog-connector-java --> <依存関係> <groupId>com.github.shyiko</groupId> <artifactId>mysql-binlog-connector-java</artifactId> <バージョン>0.17.0</バージョン> </依存関係> もちろん、binlog 処理用のオープンソース実装は多数あり、Alibaba の cancl もその 1 つであり、これを使用することもできます。 デモを書く 公式リポジトリの readme に従って、簡単にデモを書いてみましょう。 パブリック静的voidメイン(String[] args) { BinaryLogClient クライアント = new BinaryLogClient("ホスト名", 3306, "ユーザー名", "パスワード"); イベントデシリアライザーeventDeserializer = new EventDeserializer(); イベントデシリアライザー.setCompatibilityMode( イベントデシリアライザー.互換性モード.DATE_AND_TIME_AS_LONG、 イベントデシリアライザー.互換性モード.CHAR_AND_BINARY_AS_BYTE_ARRAY ); クライアントにイベントデシリアライザーを設定します。 クライアント.registerEventListener(新しいBinaryLogClient.EventListener() { @オーバーライド パブリック void onEvent(イベント イベント) { //やるべきこと 何かを行う(); ロガー情報(イベント.toString()); } }); クライアントに接続します。 } これは、公式チュートリアルに完全に従って記述されています。onEvent に独自のビジネス ロジックを記述できます。私はテスト中なので、すべてのイベントを出力します。 その後、MySQL に手動でログインし、それぞれ追加、変更、削除の操作を実行しました。監視されたログは次のとおりです。
独自のビジネスに合わせて、より優れたカスタマイズされたツールクラスをカプセル化します 最初はコードを投稿するつもりでしたが、、、コードがどんどん長くなっていったのでGitHubにアップロードすることにしました。ここでは実装の一部だけ投稿します。コード転送ポータル 実装のアイデア
したがって、実装のアイデアはおおよそ次のようになります。
初期化コード: パブリックMysqlBinLogListener(Conf conf) { BinaryLogClient クライアント = new BinaryLogClient(conf.host、conf.port、conf.username、conf.passwd); イベントデシリアライザーeventDeserializer = new EventDeserializer(); イベントデシリアライザー.setCompatibilityMode( イベントデシリアライザー.互換性モード.DATE_AND_TIME_AS_LONG、 イベントデシリアライザー.互換性モード.CHAR_AND_BINARY_AS_BYTE_ARRAY ); クライアントにイベントデシリアライザーを設定します。 this.parseClient = クライアント; this.queue = 新しいArrayBlockingQueue<>(1024); conf を次のように変更します。 リスナー = 新しい ConcurrentHashMap<>(); dbTableCols = 新しいConcurrentHashMap<>(); this.consumer = Executors.newFixedThreadPool(consumerThreads); } 登録コード: パブリック void regListener(String db、String table、BinLogListener listener) 例外をスローします { 文字列 dbTable = getdbTable(db, table); クラス.forName("com.mysql.jdbc.Driver"); // 現在登録されているテーブルの列情報を保存します。Connection connection = DriverManager.getConnection("jdbc:mysql://" + conf.host + ":" + conf.port, conf.username, conf.passwd); Map<String, Colum> cols = getColMap(connection, db, table); dbTableCols.put(dbTable、列); //現在登録されているリスナーを保存する リスト<BinLogListener> リスト = listeners.getOrDefault(dbTable, 新しい ArrayList<>()); リストに追加します(リスナー)。 listeners.put(dbTable、リスト); } このステップでは、リスナーを登録しながらテーブルのスキーマ情報を取得し、それをマップに保存して、後続のデータ処理を容易にします。 リスニングコード: @オーバーライド パブリック void onEvent(イベント イベント) { イベントタイプ eventType = event.getHeader().getEventType(); イベントタイプ == イベントタイプ.TABLE_MAP) { テーブルマップイベントデータ tableData = event.getData(); 文字列 db = tableData.getDatabase(); 文字列テーブル = tableData.getTable(); dbTable = getdbTable(db、テーブル); } // 追加、削除、更新の3つの操作のみを処理します if (isWrite(eventType) || isUpdate(eventType) || isDelete(eventType)) { if (isWrite(イベントタイプ)) { WriteRowsEventData データ = event.getData(); (Serializable[] 行: data.getRows()) の場合 { dbTableCols.containsKey(dbTable) の場合 { LogItem e = LogItem.itemFromInsert(行、dbTableCols.get(dbTable)); e.setDbTable(dbTable); キューに追加します。 } } } } } 面倒なので、ここでは追加操作の処理のみを実装し、他の操作は書いていません。 消費コード: パブリック void parse() は IOException をスローします { parseClient.registerEventListener(これを); (int i = 0; i < consumerThreads; i++) の場合 { 消費者.送信(() -> { (真)の間{ キューサイズ() > 0 の場合 試す { ログアイテム項目 = queue.take(); 文字列 dbtable = item.getDbTable(); リスナー.get(dbtable).forEach(l -> { l.onEvent(アイテム); }); } キャッチ (InterruptedException e) { e.printStackTrace(); } } スレッドをスリープ状態にします(1000); } }); } parseClient.connect(); } 消費するときは、キューからアイテムを取得し、対応する 1 つ以上のリスナーを取得してそれぞれアイテムを消費します。 テストコード: パブリック静的void main(String[] args)は例外をスローします{ Conf conf = 新しい Conf(); conf.host = "ホスト名"; conf.port = 3306; conf.username = conf.passwd = "hhsgsb"; MysqlBinLogListener を新しい MysqlBinLogListener(conf); mysqlBinLogListener.parseArgsAndRun(引数); mysqlBinLogListener.regListener("pf", "学生", 項目 -> { System.out.println(新しい文字列((byte[])item.getAfter().get("name"))); logger.info("{} に挿入、値 = {}", item.getDbTable(), item.getAfter()); }); mysqlBinLogListener.regListener("pf", "teacher", item -> System.out.println("teacher ====")); mysqlBinLogListener.parse(); } この短いコードでは、2 つのリスナーが登録されており、それぞれ学生テーブルと教師テーブルをリッスンし、それぞれを印刷します。テスト後、教師テーブルにデータを挿入するときに、定義されたビジネス ロジックを個別に実行できます。 注: ここでのツール クラスは、多くの例外処理が行われていないため直接使用できません。関数は挿入ステートメントのみをリッスンしますが、これは実装の参照として使用できます。 参考文献
要約する 以上がこの記事の全内容です。この記事の内容が皆様の勉強や仕事に何らかの参考学習価値をもたらすことを願います。123WORDPRESS.COM をご愛顧いただき、誠にありがとうございます。 以下もご興味があるかもしれません:
|
>>: VMware 15 を使用して仮想マシンをインストールし、CentOS 8 を使用する詳細な手順
「脳が多数の領域間の関係を処理できるように、入力は論理的なグループに分割する必要があります。」 – ...
Nginxのクロスドメイン設定は次のようには機能しません サーバー{ 聞く 80; server_n...
MySQL 5.7.8 では json フィールドが導入されました。このタイプのフィールドは使用頻度...
yum install httpd php mariadb-server –yランプの動作環境を設定...
1 はじめにKong は単純な製品ではありません。この記事で言及されている Kong は主に Kon...
ul liの前のアイコン1をキャンセルしますクリア値1値を1に設定ラベル中央値1をクリアラベルの中央...
序文私たちのビジネスがまだ初期段階にあり、同時実行の度合いが比較的低い場合、数年間はデッドロックの問...
目次序文v-model の修飾子:怠け者トリム番号さまざまな入力タイプやその他の要素での v-mod...
1. 現在インストールされているPHPパッケージを確認するyum list installed |...
目次導入従来のトランジションアニメーションCSS トランジションアニメーションjsアニメーション従来...
1. はじめにMySQL グループ レプリケーション (略して MGR) は文字通り MySQL グ...
多くの場合、画像をコンテナのサイズに合わせて調整する必要があります。 1. imgタグ方式幅と高さを...
Tomcatが同時リクエストを処理する方法を理解することで、スレッドプール、ロック、キュー、および...
ディレクトリを作成する cd /usr/local/docker/ jenkins-docker を...
数日前に CentOS8 がリリースされました。8 の最初のバージョンですが、今日は VM12 に ...