jdbc-input-plugin은 elasticsearch에 대한 데이터베이스 추가 및 증분 쓰기만 구현할 수 있지만 종종 jdbc 소스 끝의 데이터베이스가 데이터베이스 삭제 또는 업데이트 작업을 수행할 수 있습니다. 결과적으로 데이터베이스와 검색 엔진 데이터베이스 간에 비대칭이 발생합니다. 이 기사에서는 주로 MySQL과 Elasticsearch 간의 데이터 비대칭 문제에 대한 관련 정보를 소개합니다. Elasticsearch 증분 쓰기의 경우 JDBC 소스 끝의 데이터베이스에서 데이터베이스 삭제 또는 업데이트 작업을 수행하는 경우가 많습니다. 여기에서 솔루션을 참조할 수 있습니다. 다음으로, 그것이 모두에게 도움이 되기를 바랍니다.
물론, 개발팀이 있다면 삭제나 업데이트 시 검색엔진 동작을 동기화하는 프로그램을 작성할 수도 있습니다. 이 능력이 없다면 다음 방법을 시도해 볼 수 있습니다.
여기 데이터 테이블 기사가 있습니다. mtime 필드는 ON UPDATE CURRENT_TIMESTAMP를 정의하므로 mtime을 업데이트하는 시간은 매번 변경됩니다.
mysql> desc article; +-------------+--------------+------+-----+--------------------------------+-------+ | Field | Type | Null | Key | Default | Extra | +-------------+--------------+------+-----+--------------------------------+-------+ | id | int(11) | NO | | 0 | | | title | mediumtext | NO | | NULL | | | description | mediumtext | YES | | NULL | | | author | varchar(100) | YES | | NULL | | | source | varchar(100) | YES | | NULL | | | content | longtext | YES | | NULL | | | status | enum('Y','N')| NO | | 'N' | | | ctime | timestamp | NO | | CURRENT_TIMESTAMP | | | mtime | timestamp | YES | | ON UPDATE CURRENT_TIMESTAMP | | +-------------+--------------+------+-----+--------------------------------+-------+ 7 rows in set (0.00 sec)
logstash는 mtime에 대한 쿼리 규칙을 추가합니다.
jdbc { jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://localhost:3306/cms" jdbc_user => "cms" jdbc_password => "password" schedule => "* * * * *" #定时cron的表达式,这里是每分钟执行一次 statement => "select * from article where mtime > :sql_last_value" use_column_value => true tracking_column => "mtime" tracking_column_type => "timestamp" record_last_run => true last_run_metadata_path => "/var/tmp/article-mtime.last" }
휴지통 테이블을 만듭니다. 이 사항은 데이터베이스 삭제 또는 비활성화 상태 = 'N' 문제를 해결하는 데 사용됩니다.
CREATE TABLE `elasticsearch_trash` ( `id` int(11) NOT NULL, `ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8
기사 테이블에 대한 트리거 만들기
CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW BEGIN -- 此处的逻辑是解决文章状态变为 N 的时候,需要将搜索引擎中对应的数据删除。 IF NEW.status = 'N' THEN insert into elasticsearch_trash(id) values(OLD.id); END IF; -- 此处逻辑是修改状态到 Y 的时候,方式elasticsearch_trash仍然存在该文章ID,导致误删除。所以需要删除回收站中得回收记录。 IF NEW.status = 'Y' THEN delete from elasticsearch_trash where id = OLD.id; END IF; END CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW BEGIN -- 此处逻辑是文章被删除同事将改文章放入搜索引擎回收站。 insert into elasticsearch_trash(id) values(OLD.id); END
다음으로 매 분마다 실행되고 elasticsearch_trash 데이터 테이블에서 데이터를 검색한 다음 컬 명령을 사용하여 호출하는 간단한 셸을 작성해야 합니다. elasticsearch 편안한 인터페이스 및 복구된 데이터 삭제.
관련 프로그램도 개발할 수 있습니다. Spring boot 예약 작업 예는 다음과 같습니다.
Entity
package cn.netkiller.api.domain.elasticsearch; import java.util.Date; import javax.persistence.Column; import javax.persistence.Entity; import javax.persistence.Id; import javax.persistence.Table; @Entity @Table public class ElasticsearchTrash { @Id private int id; @Column(columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP") private Date ctime; public int getId() { return id; } public void setId(int id) { this.id = id; } public Date getCtime() { return ctime; } public void setCtime(Date ctime) { this.ctime = ctime; } }
Warehouse
package cn.netkiller.api.repository.elasticsearch; import org.springframework.data.repository.CrudRepository; import com.example.api.domain.elasticsearch.ElasticsearchTrash; public interface ElasticsearchTrashRepository extends CrudRepository<ElasticsearchTrash, Integer>{ }
Timed task
package cn.netkiller.api.schedule; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.rest.RestStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import com.example.api.domain.elasticsearch.ElasticsearchTrash; import com.example.api.repository.elasticsearch.ElasticsearchTrashRepository; @Component public class ScheduledTasks { private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); @Autowired private TransportClient client; @Autowired private ElasticsearchTrashRepository alasticsearchTrashRepository; public ScheduledTasks() { } @Scheduled(fixedRate = 1000 * 60) // 60秒运行一次调度任务 public void cleanTrash() { for (ElasticsearchTrash elasticsearchTrash : alasticsearchTrashRepository.findAll()) { DeleteResponse response = client.prepareDelete("information", "article", elasticsearchTrash.getId() + "").get(); RestStatus status = response.status(); logger.info("delete {} {}", elasticsearchTrash.getId(), status.toString()); if (status == RestStatus.OK || status == RestStatus.NOT_FOUND) { alasticsearchTrashRepository.delete(elasticsearchTrash); } } } }
Spring 부트가 메인 프로그램을 시작합니다.
package cn.netkiller.api; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @EnableScheduling public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
관련 추천:
Elasticsearch란 무엇인가요? Elasticsearch는 어디에 사용될 수 있나요?
Elasticsearch 인덱스 및 문서 연산 예제 튜토리얼
봄에 Elasticsearch를 활용하는 자세한 예제 튜토리얼
위 내용은 MySQL과 Elasticsearch 간의 데이터 비대칭에 대한 자세한 예의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!