MySQLとElasticsearch間のデータ非対称性問題の解決策

MySQLとElasticsearch間のデータ非対称性問題の解決策

MySQLとElasticsearch間のデータ非対称性問題の解決策

jdbc-input-plugin は elasticsearch へのデータベース追加と増分書き込みのみを実装できますが、jdbc ソース側のデータベースではデータベースの削除または更新操作が頻繁に実行される可能性があります。これにより、データベースと検索エンジンのデータベースの間に非対称性が生じます。

もちろん、開発チームがあれば、削除や更新時に検索エンジンの動作を同期するプログラムを作成することもできます。この機能がない場合は、次の方法を試してください。

これはデータテーブルの記事です。mtimeフィールドはON UPDATE CURRENT_TIMESTAMPとして定義されているため、mtimeの更新ごとに時間が変わります。

mysql> desc 記事;
+-------------+--------------+------+-----+--------------------------------+-------+
| フィールド | タイプ | Null | キー | デフォルト | 追加 |
+-------------+--------------+------+-----+--------------------------------+-------+
| id | int(11) | いいえ | | 0 | |
| タイトル | 中テキスト | いいえ | | NULL | |
| 説明 | 中テキスト | はい | | NULL | |
| 著者 | varchar(100) | はい | | NULL | |
| ソース | varchar(100) | はい | | NULL | |
| コンテンツ | ロングテキスト | はい | | NULL | |
| ステータス | enum('Y','N')| いいえ | | 'N' | |
| ctime | タイムスタンプ | NO | | CURRENT_TIMESTAMP | |
| mtime | タイムスタンプ | YES | | ON UPDATE CURRENT_TIMESTAMP | |
+-------------+--------------+------+-----+--------------------------------+-------+
セット内の行数は 7 です (0.00 秒)

Logstash が mtime のクエリルールを追加

jdbc {
  jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
  jdbc_user => "cms"
  jdbc_password => "パスワード"
  スケジュール => "* * * * *" #時間指定の cron 式。ここでは 1 分ごとに実行されます。ステートメント => "select * from article where mtime > :sql_last_value"
  列の値を使用する => true
  追跡列 => "mtime"
  追跡列タイプ => "タイムスタンプ" 
  record_last_run => 真
  last_run_metadata_path => "/var/tmp/article-mtime.last"
 }

データベースの削除または無効化ステータス = 'N' の問題を解決するために使用されるごみ箱テーブルを作成します。

テーブル `elasticsearch_trash` を作成します (
 `id` int(11) NULLではない、
 `ctime` タイムスタンプ NULL DEFAULT CURRENT_TIMESTAMP,
 主キー (`id`)
) エンジン=InnoDB デフォルト文字セット=utf8

記事テーブルのトリガーを作成する

各行の `article` の更新前に `article_BEFORE_UPDATE` トリガーを作成する DEFINER=`dba`@`%`
始める
 -- ここでのロジックは、記事のステータスが N になったときに、検索エンジン内の対応するデータを削除する必要があるという問題を解決することです。
 NEW.ステータス = 'N' の場合
 elasticsearch_trash(id) に値(OLD.id) を挿入します。
 終了の場合;
 -- ここでのロジックは、ステータスが Y に変更されると、elasticsearch_trash メソッドに記事 ID がまだ存在するため、誤って削除されてしまうというものです。したがって、ごみ箱内のリサイクル記録を削除する必要があります。
  NEW.ステータス = 'Y' の場合
 elasticsearch_trash から id = OLD.id を削除します。
 終了の場合;
終わり

各行の `article` の削除前に `article_BEFORE_DELETE` トリガーを CREATE DEFINER=`dba`@`%` で作成します
始める
 -- ここでのロジックは、記事が削除されると、その記事は検索エンジンのごみ箱に入れられるというものです。
 elasticsearch_trash(id) に値(OLD.id) を挿入します。
終わり

次に、elasticsearch_trash データ テーブルからデータを取得するために 1 分ごとに実行される単純なシェルを作成し、次に curl コマンドを使用して elasticsearch RESTful インターフェイスを呼び出して取得したデータを削除する必要があります。

関連するプログラムを開発することもできます。以下は、Spring Boot のスケジュールされたタスクの例です。

実在物

パッケージ cn.netkiller.api.domain.elasticsearch;

java.util.Date をインポートします。

javax.persistence.Column をインポートします。
javax.persistence.Entity をインポートします。
javax.persistence.Id をインポートします。
javax.persistence.Table をインポートします。

@実在物
@テーブル
パブリッククラスElasticsearchTrash {
 @ID
 プライベート int id;

 @Column(columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP")
 プライベート日付 ctime;

 パブリック int getId() {
 ID を返します。
 }

 パブリック void setId(int id) {
 id は、
 }

 パブリック日付 getCtime() {
 ctime を返します。
 }

 パブリック void setCtime(Date ctime) {
 this.ctime = ctime;
 }

}

倉庫

パッケージ cn.netkiller.api.repository.elasticsearch;

org.springframework.data.repository.CrudRepository をインポートします。

com.example.api.domain.elasticsearch.ElasticsearchTrash をインポートします。

パブリックインターフェース ElasticsearchTrashRepository は CrudRepository<ElasticsearchTrash, Integer> を拡張します{


}

スケジュールされたタスク

パッケージ cn.netkiller.api.schedule;

org.elasticsearch.action.delete.DeleteResponse をインポートします。
org.elasticsearch.client.transport.TransportClient をインポートします。
org.elasticsearch.rest.RestStatus をインポートします。
org.slf4j.Logger をインポートします。
org.slf4j.LoggerFactory をインポートします。
org.springframework.beans.factory.annotation.Autowired をインポートします。
org.springframework.scheduling.annotation.Scheduled をインポートします。
org.springframework.stereotype.Component をインポートします。

com.example.api.domain.elasticsearch.ElasticsearchTrash をインポートします。
com.example.api.repository.elasticsearch.ElasticsearchTrashRepository をインポートします。

@成分
パブリッククラス ScheduledTasks {
 プライベート静的最終 Logger ロガー = LoggerFactory.getLogger(ScheduledTasks.class);

 オートワイヤード
 プライベート TransportClient クライアント。

 オートワイヤード
 プライベート ElasticsearchTrashRepository alasticsearchTrashRepository;

 パブリックスケジュールタスク() {
 }

 @Scheduled(fixedRate = 1000 * 60) // スケジュールされたタスクを60秒ごとに実行します public void cleanTrash() {
 (ElasticsearchTrash elasticsearchTrash : alasticsearchTrashRepository.findAll()) の場合 {
  DeleteResponse 応答 = client.prepareDelete("information", "article", elasticsearchTrash.getId() + "").get();
  レストステータスステータス = response.status();
  logger.info("{} {} を削除", elasticsearchTrash.getId(), status.toString());
  ステータス == RestStatus.OK || ステータス == RestStatus.NOT_FOUND) {
  alasticsearchTrashRepository を削除します。
  }
 }
 }
}

Spring Boot はメインプログラムを起動します。

パッケージ cn.netkiller.api;

org.springframework.boot.SpringApplication をインポートします。
org.springframework.boot.autoconfigure.SpringBootApplication をインポートします。
org.springframework.scheduling.annotation.EnableScheduling をインポートします。

@SpringBootアプリケーション
@スケジュールを有効にする
パブリッククラスアプリケーション{

 パブリック静的voidメイン(String[] args) {
 SpringApplication.run(Application.class、引数);
 }
}
 

上記は、MySQL と Elasticsearch 間のデータ非対称性問題の解決策の説明です。ご質問がある場合は、メッセージを残すか、このサイトのコミュニティで議論してください。お読みいただきありがとうございます。お役に立てれば幸いです。このサイトをサポートしていただきありがとうございます。

以下もご興味があるかもしれません:
  • Windows での MySQL 5.6 のインストールと設定 (スクリーンショットと詳細な手順付き)
  • Mysql 文字列インターセプション関数 SUBSTRING の使用方法
  • MySQL の日付データ型と時刻型の使用法の概要
  • MySQL ユーザーの作成と認証方法
  • MySQL CASE WHEN ステートメントの使用手順
  • mysql update ステートメントの詳細な使用方法
  • MySQL のヒント: PID ファイルを更新せずにサーバーが終了する問題の解決方法

<<:  Nodejs エラー処理プロセス記録

>>:  Nginx try_files ディレクティブの使用例

推薦する

設定ファイルを書いてMyBatisを簡単に使う方法

設定ファイルを書いてMyBatisを簡単に使う方法マイバティス3.xここでは MyBatis につい...

ユーザーエクスペリエンスの構築

<br />おそらく、あなたは会社に入社したばかりで、その会社が「ユーザビリティ」に関す...

Docker で複数のアプリケーション サイトをプロキシするために Nginx を使用する方法

序文エージェントの役割は何ですか? - 複数のドメイン名が同じサーバーに解決される- 1つのサーバー...

WindowsにOpenSSHをインストールし、SSHキーを生成してLinuxサーバーにログインします。

SSH の正式名称は Secure SHell です。 SSH を使用すると、送信されるすべてのデ...

Linux 仮想ホストで SourceGuardian (sg11) 暗号化コンポーネントを有効にする詳細な手順

注: sg11 弊社では Linux システム仮想ホストのセルフインストールのみサポートしております...

CentOS 7.4 64 ビット版に MySQL 8.0 をインストールして設定するための詳細な手順

ステップ1: MySQL YUMソースを取得するMySQLの公式サイトにアクセスして、RPMパッケー...

WeChatアプレットが複数行テキストのスクロール効果を実現

この記事の例では、WeChatアプレットで複数行のテキストスクロールを実装するための具体的なコードを...

Windows サーバー ポートを開きます (例としてポート 8080 を使用します)

ポートとは何ですか?私たちが通常参照するポートは、物理的な意味でのポートではなく、具体的には TCP...

リフレッシュリダイレクトを実現する HTML ヘッドタグメタ

コードをコピーコードは次のとおりです。 <html> <ヘッド> <m...

Vuex でゲッターとアクションを使用するための追加手順

予備的注釈1.Vue2.xとVue3.xの違い: Vue 3.x にはヘルパー関数はありません。 V...

MySQLデータベースを作成し、中国語の文字をサポートする方法

まずMySQLの公式ドキュメントを見てみましょう: 5.7 {データベース | スキーマ} を作成 ...

dockerでsshd操作を有効にする

まず、docker に openssh-server をインストールします。インストールが完了したら...

MySQL Limitクエリのパフォーマンスを向上させる方法

MySQL データベース操作では、一部のクエリを実行するときにデータベース エンジンが完全なテーブル...

nginxを使用して取得したIPアドレスが127.0.0.1である問題を解決する

IPツールを取得 lombok.extern.slf4j.Slf4j をインポートします。 org....