赞
踩
drop table if exists mq_config;
/*==============================================================*/
/* Table: mq_config */
/*==============================================================*/
create table mq_config
(
mq_id varchar(200) not null comment '交换机id',
exchange_type char(1) comment '交换机类型(0:DIRECT直连交换机; 1:TOPIC主题交换机; 2:FANOUT扇形交换机; 3:HEADERS头交换机)',
exchange_name varchar(200) comment '交换机名称',
queue_name varchar(200) comment '队列名称',
binding varchar(200) comment '绑定关系',
delay_type char(1) comment '是否死信队列(0:是;1:否)',
version bigint(20) default 0 comment '乐观锁',
del_flag char(1) default '0' comment '删除标志(0:存在; 1:删除)',
status char(1) default '0' comment '记录状态(0:在用; 1:停用)',
create_by varchar(64) comment '创建人',
create_time timestamp(0) default CURRENT_TIMESTAMP comment '创建时间',
update_by varchar(64) comment '修改人',
update_time datetime(0) default CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP comment '修改时间',
remark varchar(500) comment '备注',
primary key (mq_id)
);
alter table mq_config comment 'mq配置表';
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!-- 连接池 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.10</version>
</dependency>
<!-- swagger依赖 -->
<dependency>
<groupId>com.github.xiaoymin</groupId>
<artifactId>knife4j-micro-spring-boot-starter</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>com.github.xiaoymin</groupId>
<artifactId>knife4j-spring-boot-starter</artifactId>
<version>3.0.3</version>
</dependency>
<!-- mybatis-plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
# 应用名称
server:
port: 8080
spring:
datasource:
# 配置druid数据库连接池
druid:
#配置当前数据源类型
type: com.alibaba.druid.pool.DruidDataSource
# 配置MySQL的驱动程序类
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/schedule?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
username: root
password: root
# 配置监控统计拦截的filters,stat是sql监控,wall是防火墙(如果不添加则监控无效),添加log4j需要引入jar包
filters: stat,wall
# 连接池最大活跃连接数
max-active: 100
# 连接池初始化连接数量
initial-size: 1
# 配置获取连接等待超时的时间
max-wait: 60000
# 连接池最小空闲数
min-idle: 1
# 指定空闲连接检查、废弃连接清理、空闲连接池大小调整之间的操作时间间隔
time-between-eviction-runs-millis: 60000
# 指定一个空闲连接最少空闲多久后可被清除
min-evictable-idle-time-millis: 300000
# 连接是否有效的查询语句
validation-query: select 'x'
test-while-idle: true
test-on-borrow: false
test-on-return: false
#打开 PSCache,并且指定每个连接上 PSCache 的大小
pool-prepared-statements: true
max-open-prepared-statements: 50
max-pool-prepared-statement-per-connection-size: 20
# 配置 DruidStatFilter
web-stat-filter:
enabled: true #\u662F\u5426\u542F\u7528StatFilter\u9ED8\u8BA4\u503Ctrue
# 排除一些不必要的url,比如.js,/jslib/等
exclusions: "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*"
# 过滤规则
url-pattern: /*
# 配置 DruidStatViewServlet
stat-view-servlet:
#手动重置监控数据
enabled: true
url-pattern: /druid/*
# IP白名单,没有配置或者为空,则允许所有访问
allow:
#IP黑名单,若白名单也存在,则优先使用
deny:
# 配置druid登录用户名、密码
login-username: admin
login-password: admin
# HTML 中 Reset All 按钮
reset-enable: true
rabbitmq:
host: 10.168.1.200
port: 5672
virtual-host: test
username: admin
password: admin
mybatis-plus:
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MqVo implements Serializable {
private static final long serialVersionUID = -3630888028848412302L;
/**
* 交换机类型(0:DIRECT直连交换机; 1:TOPIC主题交换机; 2:FANOUT扇形交换机; 3:HEADERS头交换机)
*/
@ApiModelProperty(value = "交换机类型(0:DIRECT直连交换机; 1:TOPIC主题交换机; 2:FANOUT扇形交换机; 3:HEADERS头交换机)")
private String exchangeType;
/**
* 交换机名称
*/
@ApiModelProperty(value = "交换机名称")
private String exchangeName;
/**
* 队列名称
*/
@ApiModelProperty(value = "队列名称")
private String queueName;
/**
* 绑定关系
*/
@ApiModelProperty(value = "绑定关系")
private String binding;
/**
* 是否死信队列(0:是;1:否)
*/
@ApiModelProperty(value = "是否死信队列(0:是;1:否)")
private String delayType;
/**
* 操作类型(0:新增; 1:删除)
*/
@ApiModelProperty(value = "操作类型(0:新增; 1:删除)")
private int type;
}
@Slf4j
@Component
public class MqUtils {
/**
* 获取工厂配置类
*/
@Resource
private ConnectionFactory connectionFactory;
/**
* 新增消息对列
*/
public void mqOperate(MqVo mqVo) {
//交换机类型
String exchangeType = mqVo.getExchangeType();
log.info("exchangeType -> {}", exchangeType);
//队列名称
String queueName = mqVo.getQueueName();
log.info("queueName -> {}", queueName);
//交换机名称
String exchangeName = mqVo.getExchangeName();
log.info("exchangeName -> {}", exchangeName);
//绑定关系
String binding = mqVo.getBinding();
log.info("binding -> {}", binding);
//是否死信队列(0:是;1:否)
String delayType = mqVo.getDelayType();
log.info("delayType -> {}", delayType);
//操作类型
int status = mqVo.getType();
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
if (status == 0) {
//新增队列
rabbitAdmin.declareQueue(new Queue(queueName));
//新增交换机
rabbitAdmin.declareExchange(getExchange(exchangeType, exchangeName, delayType));
//新增绑定关系
rabbitAdmin.declareBinding(new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, binding, null));
} else {
//删除队列
rabbitAdmin.deleteQueue(queueName);
//删除交换机
rabbitAdmin.deleteExchange(exchangeName);
//删除绑定关系
rabbitAdmin.removeBinding(new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, binding, null));
}
close();
}
/**
* 交换机生成方法
* @param exchangeType 交换机类型
* @param exchangeName 交换机名称
* @param delayType 是否死信队列(0:是;1:否)
* @return Exchange
*/
private Exchange getExchange(String exchangeType, String exchangeName, String delayType) {
Exchange exchange = new DirectExchange(exchangeName);
String zero = "0";
switch (exchangeType) {
case "0":
if (zero.equals(delayType)) {
Map<String, Object> map = new HashMap<>(1);
map.put("x-delayed-type", "direct");
exchange = new CustomExchange(exchangeName, "x-delayed-message", true, false, map);
} else {
exchange = new DirectExchange(exchangeName);
}
break;
case "1":
if (zero.equals(delayType)) {
//待补充
} else {
exchange = new TopicExchange(exchangeName);
}
break;
case "2":
if (zero.equals(delayType)) {
//待补充
} else {
exchange = new FanoutExchange(exchangeName);
}
break;
case "3":
if (zero.equals(delayType)) {
//待补充
} else {
exchange = new HeadersExchange(exchangeName);
}
break;
default:
break;
}
return exchange;
}
/**
* 关闭连接
*/
public void close(){
try (Connection connection = connectionFactory.createConnection()){
try (Channel channel = connection.createChannel(true)){
com.rabbitmq.client.Connection connection1 = channel.getConnection();
connection1.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
@Slf4j
@Service("mqConfigService")
public class MqConfigServiceImpl extends ServiceImpl<MqConfigDao, MqConfig> implements MqConfigService {
@Resource
private MqUtils mqUtils;
@Override
public boolean add(MqConfig mqConfig) {
//写入数据库
boolean save = save(mqConfig);
//判断是否写入成功
if (save) {
//判断是否启用消息队列
status(mqConfig);
}
return save;
}
@Override
public boolean update(MqConfig mqConfig) {
//查询数据库现存信息
MqConfig byId = getById(mqConfig.getMqId());
//修改数据
boolean b = updateById(mqConfig);
//判断是否修改成功
if (b) {
//删除消息队列
addOrDelMq(byId, 1);
//判断是否启用消息队列
status(mqConfig);
}
return b;
}
/**
* 状态位为开启时,消息队列创建方法封装
* @param mqConfig
*/
private void status(MqConfig mqConfig) {
if ("0".equals(mqConfig.getStatus())) {
//创建消息队列
addOrDelMq(mqConfig, 0);
}
}
/**
* 新增队列方法封装
* @param mqConfig
*/
public void addOrDelMq(MqConfig mqConfig, int type){
mqUtils.mqOperate(
MqVo
.builder()
.exchangeType(mqConfig.getExchangeType())
.queueName(mqConfig.getExchangeName())
.exchangeName(mqConfig.getExchangeName())
.binding(mqConfig.getBinding())
.delayType(mqConfig.getDelayType())
.type(type)
.build()
);
}
/**
* 删除消息队列
* @param mqIds
* @return
*/
@Override
public boolean delete(String[] mqIds) {
Boolean b = false;
for (String mqId : mqIds) {
//查询消息队列信息
MqConfig mqConfig = getById(mqId);
//删除消息队列
b = removeById(mqId);
//判断消息队列是否删除成功
if (b) {
//删除消息队列
addOrDelMq(mqConfig, 1);
}
}
return b;
}
}
@RestController
@RequestMapping("mqConfig")
public class MqConfigController {
/**
* 服务对象
*/
@Resource
private MqConfigService mqConfigService;
/**
* 新增mq配置
*
* @param mqConfig 实体
* @return 新增是否成功
*/
@ApiOperation(value = "新增mq配置")
@PostMapping(value = "add", produces = "application/json;charset=utf-8")
public ApiResult<Boolean> add(MqConfig mqConfig) {
return ApiResult.ok("添加成功", mqConfigService.add(mqConfig));
}
/**
* 修改mq配置
*
* @param mqConfig 实体
* @return 修改是否成功
*/
@ApiOperation(value = "修改mq配置")
@PutMapping(value = "update", produces = "application/json;charset=utf-8")
public ApiResult<Boolean> update(MqConfig mqConfig) {
return ApiResult.ok("修改成功", mqConfigService.update(mqConfig));
}
/**
* 删除mq配置
*
* @param mqIds 主键
* @return 删除是否成功
*/
@ApiOperation(value = "删除mq配置")
@DeleteMapping(value = "deleteById", produces = "application/json;charset=utf-8")
public ApiResult<Boolean> deleteById(String[] mqIds) {
return ApiResult.ok("删除成功", mqConfigService.delete(mqIds));
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。