canal实现当数据库改变时,同步数据到redis
思路
canal感知sql的改变,作为消息的提供者将消息(图⽚的postion属性,指图⽚位于⽹页的位置)放到rabbitmq的队列,nginx作为消息的消费者,获取消息,并通过Lua脚本更新数据行到水穷处 坐看云起时是什么意思
第⼀步,将消息放到消息队列
启动类上加上 @EnableCanalClient //声明当前服务是canal的客户端
配置⽂件
canal.ample.host=192.168.200.128
歌颂祖国的现代诗canal.ample.port=11111
canal.ample.batchSize=1000
spring.rabbitmq.host=192.168.200.128
编写rabbitmq的配置类
@Configuration
public class RabbitMQConfig {
//定义队列的名称
public static final String AD_UPDATE_QUEUE = "ad_update_queue";
//声明队列
哪个品牌电动车最好@Bean
public Queue queue(){
return new Queue(AD_UPDATE_QUEUE);
}
}
注意Queue的包时springframework的⾥的
编写监听类,监听canal的消息,并发送到mq
植物的活化石1 @CanalEventListener//声明当前的类是canal的监听类
2public class BusinessListener {
3
4 @Autowired
5private RabbitTemplate rabbitTemplate;
6
7/**
8 *
9 * @param eventType 当前操作数据库的类型
10 * @param rowData 当前操作数据库的数据 changgou_business
11*/
12 @ListenPoint(schema = "changgou_business", table = {"tb_ad"})//声明监听哪个库的哪个表
13public void adUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
14 println("⼴告数据发⽣变化");
15
16//修改前数据
17//BeforeColumnsList().forEach((c)-> System.out.println("改变前的数据:"+c.getName()+"::"+c.getValue()));
18 /* for(CanalEntry.Column column: BeforeColumnsList()) {
19 Name().equals("position")){
20 System.out.println("发送消息到mq ad_update_queue:"+Value());
21 vertAndSend("","ad_update_queue",Value()); //发送消息到mq
22 break;
23 }
24 } */
25
26//修改后数据
27//AfterColumnsList().forEach((c) -> System.out.println("改变后的数据"+c.getName()+"::"+c.getValue()));
28for(CanalEntry.Column column: AfterColumnsList()) {
Name().equals("position")){
30 System.out.println("发送消息到mq ad_update_queue:"+Value());
31//发送消息到mq 没有交换机,路由Key直接写队列名
32 vertAndSend("","ad_update_queue",Value());
33break;
34 }
35 }
36 }
37 }
这样,当数据库发⽣改变,canal就会通知这个类,这个类就会把position这个消息放到消息队列⾥牡蛎怎么吃
第⼆步,监听消息队列,如果有消息,就通知nginx调⽤lua更新redis
在消息的消费者模块引⼊坐标
<!--⽤于从消息队列获取数据-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--⽤于远程调⽤nginx-->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>3.9.0</version>
</dependency>
创建监听者,代码如下
@Component
public class AdListener {
@RabbitListener(queues = "ad_update_queue")
public void receiveMessage(String message){
System.out.println("接收到的消息未:" + message);
//发起远程调⽤nginx,进⾏更新
OkHttpClient okHttpClient = new OkHttpClient();
String url = "192.168.200.128/ad_update?position"+message;
Request request = new Request.Builder().url(url).build();
Call call = wCall(request);
@Override
public void onFailure(Call call, IOException e) {
//如果请求失败
e.printStackTrace();
}
怎样做宫保鸡丁@Override
public void onResponse(Call call, Response response) throws IOException {
//如果请求成功
System.out.println("请求成功:"+ ssage());
}
});
}
}
商城项⽬中还有商品⾏家同步到es的实例
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论