Elasticsearch和MySQL之间的数据同步问题怎么解决

数据库   发布日期:2025年03月03日   浏览次数:274

这篇文章主要介绍“Elasticsearch和MySQL之间的数据同步问题怎么解决”,在日常操作中,相信很多人在Elasticsearch和MySQL之间的数据同步问题怎么解决问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Elasticsearch和MySQL之间的数据同步问题怎么解决”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

Elasticsearch中的数据是来自于Mysql数据库的,因此当数据库中的数据进行增删改后,Elasticsearch中的数据,索引也必须跟着做出改变。而对于管理服务(MySQL)和搜索服务(Elasticsearch)往往会在不同的微服务上。

可以通过微服务之间的同步调用来解决数据同步问题,虽然实现起来比较简单,但是在搜索服务中引入管理服务时,业务的耦合度相对来说是比较高的。因此可以通过消息队列的形式来异步通知管理服务的改变。这样做的有优点是耦合度较低,但是依赖于消息队列的耦合度。

首先在两块微服务中引入RabbitMQ的依赖:

引入依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

创建常量类,将交换机和队列名称设置为常量,使用时方便取名。

  1. public class MqConstants {
  2. /**
  3. * 交换机名称
  4. */
  5. public final static String HOTEL_EXCHANGE = "hotel.topic";
  6. /**
  7. * 监听新增和修改的队列
  8. */
  9. public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
  10. /**
  11. * 监听删除的队列
  12. */
  13. public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
  14. /**
  15. * 新增或修改的路由键
  16. */
  17. public final static String HOTEL_INSERT_KEY = "hotel.insert";
  18. /**
  19. * 删除的路由键
  20. */
  21. public final static String HOTEL_DELETE_KEY = "hotel.delete";
  22. }

创建配置类,声明交换机,并将路由键,队列与交换机之间互相绑定:

  1. @Configuration
  2. public class MqConfig {
  3. @Bean
  4. public TopicExchange topicExchange(){
  5. return new TopicExchange(MqConstants.HOTEL_EXCHANGE,true,false);
  6. }
  7. @Bean
  8. public Queue insertQueue(){
  9. return new Queue(MqConstants.HOTEL_INSERT_QUEUE,true);
  10. }
  11. @Bean
  12. public Queue deleteQueue(){
  13. return new Queue(MqConstants.HOTEL_DELETE_QUEUE,true);
  14. }
  15. @Bean
  16. public Binding insertQueueBinding(){
  17. return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
  18. }
  19. @Bean
  20. public Binding deleteQueueBinding(){
  21. return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
  22. }
  23. }

编写controller层,将酒店数据保存到数据库中,并将消息发送到消息队列里:

  1. @Autowired
  2. private IHotelService hotelService;
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. @PutMapping
  6. public void save(@RequestBody Hotel hotel){
  7. hotelService.save(hotel);
  8. rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
  9. }
  10. @DeleteMapping("/{id}")
  11. public void deleteById(@PathVariable("id") Long id){
  12. hotelService.removeById(id);
  13. rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id);
  14. }

在service中加入两个方法,根据id增加和根据id删除:

  1. void insertById(Long id);
  2. void deleteById(Long id);

利用Ctrl+Alt+B的快捷键,在service的实现类里面重写方法进行实现:

  1. @Override
  2. public void insertById(Long id) {
  3. try {
  4. Hotel hotel = getById(id);
  5. HotelDoc hotelDoc = new HotelDoc(hotel);
  6. // 1.准备Request
  7. IndexRequest request = new IndexRequest("hotel").id(hotelDoc.getId().toString());
  8. // 2.准备请求参数DSL,其实就是文档的JSON字符串
  9. request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
  10. // 3.发送请求
  11. client.index(request, RequestOptions.DEFAULT);
  12. } catch (IOException e) {
  13. throw new RuntimeException(e);
  14. }
  15. }
  16. @Override
  17. public void deleteById(Long id) {
  18. try {
  19. DeleteRequest request = new DeleteRequest("hotel", id.toString());
  20. // 2.发送请求
  21. client.delete(request, RequestOptions.DEFAULT);
  22. } catch (IOException e) {
  23. throw new RuntimeException(e);
  24. }
  25. }

这样即可完成Elasticsearch和MySQL之间的数据同步。

以上就是Elasticsearch和MySQL之间的数据同步问题怎么解决的详细内容,更多关于Elasticsearch和MySQL之间的数据同步问题怎么解决的资料请关注九品源码其它相关文章!