jdbc-input-plugin は、elasticsearch に対するデータベースの追加と増分書き込みのみを実装できますが、多くの場合、jdbc ソース側のデータベースはデータベースの削除または更新操作を実行する可能性があります。その結果、データベースと検索エンジンのデータベースの間には非対称性が生じます。この記事では主に、MySQL と Elasticsearch の間のデータの非対称性の問題の解決策に関する関連情報を紹介します。elasticsearch のインクリメンタル書き込みでは、jdbc ソース側のデータベースでデータベースの削除または更新操作が実行されることがよくあります。解決策が必要な方は、こちらを参照してください。次に、それが皆さんのお役に立てれば幸いです。
もちろん、開発チームがいる場合は、削除または更新時に検索エンジンの動作を同期するプログラムを作成できます。この機能がない場合は、次の方法を試してください。
これはデータテーブルの記事です。mtimeフィールドはON UPDATE CURRENT_TIMESTAMPを定義しているため、mtimeの更新時刻は毎回変わります
mysql> desc article; +-------------+--------------+------+-----+--------------------------------+-------+ | Field | Type | Null | Key | Default | Extra | +-------------+--------------+------+-----+--------------------------------+-------+ | id | int(11) | NO | | 0 | | | title | mediumtext | NO | | NULL | | | description | mediumtext | YES | | NULL | | | author | varchar(100) | YES | | NULL | | | source | varchar(100) | YES | | NULL | | | content | longtext | YES | | NULL | | | status | enum('Y','N')| NO | | 'N' | | | ctime | timestamp | NO | | CURRENT_TIMESTAMP | | | mtime | timestamp | YES | | ON UPDATE CURRENT_TIMESTAMP | | +-------------+--------------+------+-----+--------------------------------+-------+ 7 rows in set (0.00 sec)
logstashはmtimeのクエリルールを追加します
jdbc { jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://localhost:3306/cms" jdbc_user => "cms" jdbc_password => "password" schedule => "* * * * *" #定时cron的表达式,这里是每分钟执行一次 statement => "select * from article where mtime > :sql_last_value" use_column_value => true tracking_column => "mtime" tracking_column_type => "timestamp" record_last_run => true last_run_metadata_path => "/var/tmp/article-mtime.last" }
ごみ箱テーブルを作成します、本件 データベースの削除またはステータス = 'N' の無効化の問題を解決するために使用されます。
CREATE TABLE `elasticsearch_trash` ( `id` int(11) NOT NULL, `ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8
article テーブルのトリガーを作成します
CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW BEGIN -- 此处的逻辑是解决文章状态变为 N 的时候,需要将搜索引擎中对应的数据删除。 IF NEW.status = 'N' THEN insert into elasticsearch_trash(id) values(OLD.id); END IF; -- 此处逻辑是修改状态到 Y 的时候,方式elasticsearch_trash仍然存在该文章ID,导致误删除。所以需要删除回收站中得回收记录。 IF NEW.status = 'Y' THEN delete from elasticsearch_trash where id = OLD.id; END IF; END CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW BEGIN -- 此处逻辑是文章被删除同事将改文章放入搜索引擎回收站。 insert into elasticsearch_trash(id) values(OLD.id); END
次に、1 分ごとに実行し、elasticsearch_trash データ テーブルからデータを取得し、curl コマンドを使用してelasticsearch の Restful インターフェイスと、回復されたデータの削除。
関連プログラムを開発することもできます。これは Spring Boot のスケジュールされたタスクの例です。
Entity
package cn.netkiller.api.domain.elasticsearch; import java.util.Date; import javax.persistence.Column; import javax.persistence.Entity; import javax.persistence.Id; import javax.persistence.Table; @Entity @Table public class ElasticsearchTrash { @Id private int id; @Column(columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP") private Date ctime; public int getId() { return id; } public void setId(int id) { this.id = id; } public Date getCtime() { return ctime; } public void setCtime(Date ctime) { this.ctime = ctime; } }
Warehouse
package cn.netkiller.api.repository.elasticsearch; import org.springframework.data.repository.CrudRepository; import com.example.api.domain.elasticsearch.ElasticsearchTrash; public interface ElasticsearchTrashRepository extends CrudRepository<ElasticsearchTrash, Integer>{ }
Timed task
package cn.netkiller.api.schedule; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.rest.RestStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import com.example.api.domain.elasticsearch.ElasticsearchTrash; import com.example.api.repository.elasticsearch.ElasticsearchTrashRepository; @Component public class ScheduledTasks { private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); @Autowired private TransportClient client; @Autowired private ElasticsearchTrashRepository alasticsearchTrashRepository; public ScheduledTasks() { } @Scheduled(fixedRate = 1000 * 60) // 60秒运行一次调度任务 public void cleanTrash() { for (ElasticsearchTrash elasticsearchTrash : alasticsearchTrashRepository.findAll()) { DeleteResponse response = client.prepareDelete("information", "article", elasticsearchTrash.getId() + "").get(); RestStatus status = response.status(); logger.info("delete {} {}", elasticsearchTrash.getId(), status.toString()); if (status == RestStatus.OK || status == RestStatus.NOT_FOUND) { alasticsearchTrashRepository.delete(elasticsearchTrash); } } } }
Spring Bootによりメインプログラムが開始されます。
package cn.netkiller.api; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @EnableScheduling public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
関連する推奨事項:
Elasticsearch とは何ですか? Elasticsearch はどこで使用できますか?
Elasticsearchインデックスとドキュメント操作のサンプルチュートリアル
SpringでElasticsearchを使用する詳細なサンプルチュートリアル
以上がMySQL と Elasticsearch の間のデータの非対称性の詳細な例の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。