赞
踩
Flink 程序Sink(数据输出)操作(5)自定义RabbitMq-Sink
自定义sink需要继承RichSinkFunction
ex:
public static class Demo extends RichSinkFunction<IN> {}
自定义RabbitMQ sink必要依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-rabbitmq_2.12</artifactId>
<version>1.12.2</version>
</dependency>
如上依赖所示,其实已经是有rabbitMQ的连接器,但是,此连接器只能简单的Queue 模式,我们的业务需求可能不能直接使用Queue模式,比如需要发送到交换机中(Fanout、Driect)等。如果有这种场景呢,我们需要连接器的基础上再自定义RabbitMQ Sink了。
# 计算结果输出RabbitMQ配置
sink.rabbitmq.host=10.50.40.116
sink.rabbitmq.port=5673
sink.rabbitmq.username=admin
sink.rabbitmq.password=admin
sink.rabbitmq.exchange=vehicle-alarm-over-speeding
配置对应实体类
我们一会使用代码,读取我们的sink配置为对象,作为参数不断传递
@NoArgsConstructor @AllArgsConstructor @Builder @Data public class RabbitMqSinkProperties implements Serializable { /** * rabbitMQ ip */ private String host; /** * 端口 */ private int port; /** * 用户民 */ private String userName; /** * 密码 */ private String passWord; /** * 交换机名 */ private String exchange; }
RichSinkFunction
此类主要作用是作为一个RabbitMQ sink的中间模板,其中定义MQ sink 与RabbitMQ 的连接与关闭,交换机的类型指定等等。
我们某个具体的MQ sink 只需要继承此中间模板类,传输我们之前定义的配置对象,即可快速在invoke中完成对RabbitMQ数据的输出
package com.leilei.sink; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import lombok.extern.slf4j.Slf4j; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; /** * @author lei * @version 1.0 * @desc mq 公共模板类 我们我们如果有多个mq-sink 只需继承此类 即可 * @date 2021-03-15 16:41 */ @Slf4j public class DataRichSinkFunction<IN> extends RichSinkFunction<IN> { // 配置对象 后续我们在定义具体实体类时用子类触发父类构造调用 protected final RabbitMqSinkProperties rabbitMQSinkProperties; protected Connection connection; protected Channel channel; public DataRichSinkFunction(RabbitMqSinkProperties rabbitMQSinkProperties) { this.rabbitMQSinkProperties = rabbitMQSinkProperties; } /** * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接 * * @param parameters * @throws Exception */ @Override public void open(Configuration parameters) throws Exception { super.open(parameters); // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置RabbitMQ相关信息 factory.setHost(rabbitMQSinkProperties.getHost()); factory.setUsername(rabbitMQSinkProperties.getUserName()); factory.setPassword(rabbitMQSinkProperties.getPassWord()); factory.setPort(rabbitMQSinkProperties.getPort()); // 创建一个新的连接 connection = factory.newConnection(); // 创建一个通道 channel = connection.createChannel(); // 声明交换机 channel.exchangeDeclare(rabbitMQSinkProperties.getExchange(), BuiltinExchangeType.FANOUT, true); } /** * 关闭连接 flink程序从启动到销毁只会执行一次 * @throws Exception */ @Override public void close() throws Exception { super.close(); channel.close(); connection.close(); } }
由于公共MQSink模板类中已经对rabbitMq做了一个连接通道的开启和关闭,因此我们当前sink无需关系自身与Mq的连接与关闭,直接在invoke方法中,将数据输出到Mq即可
public class DemoSinkFunction extends DataRichSinkFunction<String> { public OverSpeedAlarmSinkFunction(RabbitMqSinkProperties rabbitMQSinkProperties) { /** * 调用父类(DataRichSinkFunction)构造,完成父类中属性填充 */ super(rabbitMQSinkProperties); } /** * 数据输出到 rabbitMQSinkProperties 指定的交换机中 * @param value * @param context * @throws Exception */ @Override public void invoke(String value, Context context) throws Exception { System.out.println(LocalDateTime.now() + "发送数据:" + value); channel.basicPublish(rabbitMQSinkProperties.getExchange(), "", null, value.getBytes(StandardCharsets.UTF_8)); } }
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.STREAMING); //自定义数据源加载 DataStreamSource<Location> source = env.addSource(new MyLocationSource()); //读取rabbitMq sink配置文件 String path = "RabbitMqSink.properties"; Props props = new Props(path); RabbitMqSinkProperties sinkProperties = RabbitMqSinkProperties.builder() .host(props.getStr("sink.rabbitmq.host")) .port(props.getInt("sink.rabbitmq.port")) .userName(props.getStr("sink.rabbitmq.username")) .passWord(props.getStr("sink.rabbitmq.password")) .exchange(props.getStr("sink.rabbitmq.exchange")) .build(); // TODO source进行数据处理 得到结果流 //将结果流用自定义的sink发送到rabbitmq stream.addSink(new DemoSinkFunction(sinkProperties));
从控制台打印以及RabbitMQ-WEB页面看到,我们计算的结果,成功发送到了RabbitMQ的自定义交换机中, RabbitMQ SInk 示例完成!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。