赞
踩
Flink 提供了专门操作 RabbitMQ 的连接器，使用起来更方便，只需配置连接信息即可快速实现数据的读取与输出。但该连接器目前仅支持 Queue（队列）模式，如需使用交换机（Exchange）模式，仍需要自定义 RabbitMQ 数据源与数据输出。
<!-- Flink RabbitMQ connector, version 1.12.2. The "_2.12" artifact suffix is presumably the Scala binary version; keep the version aligned with the Flink runtime in use (confirm against the project's other Flink dependencies). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-rabbitmq_2.12</artifactId>
<version>1.12.2</version>
</dependency>
package com.leilei; import com.alibaba.fastjson.JSON; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.rabbitmq.RMQSink; import org.apache.flink.streaming.connectors.rabbitmq.RMQSource; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; import java.nio.charset.StandardCharsets; /** * @author lei * @version 1.0 * @date 2021/3/14 15:27 * @desc flink 连接器 rabbitmq */ public class FlinkConnectorsRabbitMq { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); final RMQConnectionConfig rabbitConfig = new RMQConnectionConfig.Builder() .setHost("xxx") .setUserName("admin") .setPassword("xx") .setPort(5672) .setVirtualHost("/") .build(); //rabbit connectors 加载数据 DataStream<String> stream = env .addSource(new RMQSource<>( // mq 连接配置 rabbitConfig, // 队列名 "vehicle-location", true, // 反序列化方式 new SimpleStringSchema(StandardCharsets.UTF_8))) .setParallelism(1); stream.print(); //rabbit connectors 数据输出 stream.addSink(new RMQSink<>( rabbitConfig, // 输出到哪个队列 "over-speeding-alarm", // 序列化方式 new SimpleStringSchema(StandardCharsets.UTF_8))); try { env.execute(); } catch (Exception e) { e.printStackTrace(); } } @Data @NoArgsConstructor @AllArgsConstructor public static class Location { private String id; private String licensePlate; private String plateColor; private Integer speed; private Integer 
limitSpeed; private Long deviceTime; private String zone; } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。