当前位置:   article > 正文

SpringBoot整合Canal+RabbitMQ监听数据变更(对rabbit进行模块封装)

SpringBoot整合Canal+RabbitMQ监听数据变更(对rabbit进行模块封装)

SpringBoot+Canal(监听MySQL的binlog)+RabbitMQ(处理保存变更记录)

在SpringBoot中采用一种与业务代码解耦合的方式,来实现数据的变更记录,记录的内容是新数据,如果是更新操作还得有旧数据内容。
使用Canal来监听MySQL的binlog变化可以实现这个需求,可是在监听到变化后需要马上保存变更记录,除非再做一些逻辑处理,于是又结合了RabbitMQ来处理保存变更记录的操作。

  • 启动MySQL环境,并开启binlog
  • 启动Canal环境,为其创建一个MySQL账号,然后以Slave的形式连接MySQL
  • Canal服务模式设为TCP,用Java编写客户端代码,监听MySQL的binlog修改
  • Canal服务模式设为RabbitMQ,启动RabbitMQ环境,配置Canal和RabbitMQ的连接,用消息队列去接收binlog修改事件

预先在model实体中准备

短信实体

@Data
@ApiModel(description = "短信实体")
public class MsmVo{
	
	@ApiModelProperty(value="phone")
	private String phone;

	@ApiModelProperty(value = "短信模板code")
	private  String templateCode;

	@ApiModelProperty(value="短信模板参数")
	private Map<String,Object> param;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

排班实体

@Data
@ApiModel(description = "OrderMqVo")
public class OrderMqVo{
	
	@ApiModelProperty(value="可预约数")
	private Integer reserverdNumber

	@ApiModelProperty(value = "剩余预约数")
	private Integer availableNumber;

	@ApiModelProperty(value = "排班id")
	private String scheduleId;

	@ApiModelProperty(value = "短信实体")
	private MsmVo msmVo;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

一、安装RabbitMQ

docker pull rabbitmq:nanagemnet
docker run -d -p 5672:5672 -p 12672:15672 --name rabbitmq rabbitmq:nanagement
  • 1
  • 2

访问:http://IP:15672
在这里插入图片描述

二、rabbit-util模块封装

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency>
	<groupId>com.alibaba</groupId>
	<artifactId>fastjson</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

创建一个RabbitService用来发送消息

@Service
public class RabbitService{
	
	@Autowired
	private RabbitTemplate rabbitTemplate;

	//发送消息
	public boolean sendMessage(String exchange,String routingKey,Object message){
		rabbitTemplate.convertAndSend(exchange,routingKey,message);
		return true;
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

创建mq消息转化器

@Configuration
public class MQConfig{
	
	@Bean
	public MessageConverter messageConverter(){
		return new Jackson2JsonMessageConverter();
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

添加常量配置类

public class MqConst{
	
	//预约下单
	public static final String EXCHANGE_DIRECT_ORDER = "exchange.direct.order";
	public static final String ROUTING_ORDER = "order";
	//队列
	public static final String QUEUE_ORDER = "queue.order";

	//短信
	public static final String EXCHANGE_DIRECT_MSM = "exchange.direct.msm";
	public static final String ROUTING_MSM_ITEM = "msm.item";
	pulib static final String Queue_MSM_item = "queue.msm.item";
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

三、短信模块service-sms

将二中的模块依赖引入

<dependency>
	<groupId>com.michael</groupId>
	<artifactId>rabbit_util</artifactId>
	<version>xxx</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

配置文件application.properties

spring.rabbitmq.host=192.168.44.168
spring.rabbitmq.port=5672
spring.rabbit.uername=guest
spring.rabbitmq.password=guest
  • 1
  • 2
  • 3
  • 4

Service发送消息

public interface MsmService{
	//发送手机验证码
	boolean send(String phone,String code);
	
	//MQ使用发送短信的接口
	boolean send(MsmVo msmVo);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
@Service
public class MsmServiceImpl implements MsmService{

	@Override
	public boolean send(String phone,String code){
		//判断手机号是否为空
		if(StringUtils.isEmpty(phone)){
			return false;
		}

		//整合阿里云相关参数,短信服务
		DefaultProfile profile = DefaultProfile.getProfile(ConstantPropertiesUtils.REGION_Id,
			ConstantPropertiesUtils.ACCESS_KEY_ID,
			ConstantPropertiesUtils.SECRET
		);
		IAcsClient client = new DefaultAcsClient(profile);
		CommonRequest request = new CommonRequest();

		request.setMethod(MethodType.POST);
		request.setDomain("dysmsapi.aliyuncs.com");
		request.setVersion("2018-08-08");
		request.setAction("SendSms");

		//手机号
		request.putQueryParameter("PhoneNumbers",phone);
		//签名名称
		request.putQueryParameter("SignName","我的网站");
		//模板
		request.putQueryParameter("TemplateCode","SMS_180051135");
		//验证码使用json格式{"code":"123456"}
		Map<String,Object> param = new HashMap();
		param.put("code",code);
		request.putQueryParameter("TemplateParam",JSONObject.toJSONString(param));
		
		//调用方法进行短信发送
		try{
			CommonResponse response = client.getCommonResponse(request);
			System.out.println(response.getData());
			return response.getHttpResponse().isSuccess();
		}catch(ServerException e){
			e.printStackTrace();
		}catch(ClientException e){
			e.printStackTrace();
		}
		return false;
	}
	
	@Override
	public boolean send(MsmVo msmVo){
		
		if(!StringUtils.isEmpty(msmVO.getPhone())){
			String code = (String)msmVo.getParam().get("code");
			boolean isSend = this.send(msmVo.getPhone(),code);
			return isSend;
		}
		return false;
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

创建mq监控器

@Component
public class MsmReceiver{
	
	@Autowired
	private MsmService msmService;

	//监听
	@RabbitListener(bindings = @QueueBinding(
		value = @Queue(value = MqConst.QUEUE_MSM_ITEM,durable = "true"),
		exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_MSM),
		key = {MqConst.ROUTING_MSM_ITEM}
	))
	public void send(MsmVo msmVo,Message message,Channel channel){
		msmService.ssend(msmVo);
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

四、业务类

生成订单之后,发送短信并更新数量

①、业务模块中引入依赖

rabbit-util

②、添加配置

spring.rabbitmq.host=192.168.44.165
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
  • 1
  • 2
  • 3
  • 4

③、service接口以及实现类

@Override
public void update(Schedule schedule){
	schedule.setUpdata(new Date());
	scheduleRepository.save(schedule);
}
  • 1
  • 2
  • 3
  • 4
  • 5

④、receiver包中创建MQ监听器

@Component
public class HospitalReceiver{
	
	@Autowired
	private ScheduleService scheduleService;

	@Autowired
	private RabbitService rabbitService;

	//监听
	@RabbitListener(
		bindings = @QueueBinding(
			value = @Queue(value = MqConst.QUEUE_ORDER,durable = "true"),
			exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_ORDER),
			key = {MqConst.ROUTING_ORDER}
		)
	)
	public void receiver(OrderMqVo orderMqVo,Message message,Channel channle) throws IOException{
		//下单成功,更新数据
		Schedule schedule = scheduleService.getScheduleId(orderMqVo.getScheduleId());
		schedule.setReservedNumber(orderMqVo.getReservedNumber());
		schedule.setAvailableNumber(orderMqVo.getAvailableNumber);
		scheduleService.update(schedule);

		//发送短信
		MsmVo msmVo = orderMqVo.getMsmVo();
		if(null != msmVo){
			rebbitService.sendMessage(MqConst.QUEUE_MSM_ITEM,MqConst.ROUTING_MSM_ITEM,msmVo);
		}
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/445295
推荐阅读
相关标签
  

闽ICP备14008679号