赞
踩
通过使用 Flink DataStream Connectors 数据流连接器连接到 RabbitMq 消息队列中间件,并提供数据流输入与输出操作;
示例环境
- java.version: 1.8.x
- flink.version: 1.11.1
- rabbitMq:3.5.7
示例数据源 (项目码云下载)
示例模块 (pom.xml)
Flink 系例 之 DataStream Connectors 与 示例模块
数据流输入
DataStreamSource.java
- package com.flink.examples.rabbitmq;
-
- import org.apache.flink.api.common.serialization.SimpleStringSchema;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
- import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
-
- /**
- * @Description 从MQ中获取数据并输出到DataStream流中
- */publicclassDataStreamSource{
-
- /**
- * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/rabbitmq.html
- */publicstatic void main(String[] args) throwsException {
- finalStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
- .setHost("127.0.0.1")
- .setPort(5672)
- .setUserName("admin")
- .setPassword("admin")
- .setVirtualHost("datastream")
- .build();
-
- finalDataStream<String> stream = env
- .addSource(new RMQSource<String>( connectionConfig, "test", true, new SimpleStringSchema()))
- .setParallelism(1);
-
- stream.print();
- env.execute("flink rabbitMq source");
- }
- }

数据流输出
DataStreamSink.java
- package com.flink.examples.rabbitmq;
-
- import org.apache.flink.api.common.serialization.SimpleStringSchema;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
- import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
-
- /**
- * @Description 将DataStream流中的数据输出到rabbitMq队列中
- */publicclassDataStreamSink{
-
- /**
- * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/rabbitmq.html
- */publicstaticvoidmain(String[] args)throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
- .setHost("127.0.0.1")
- .setPort(5672)
- .setUserName("admin")
- .setPassword("admin")
- .setVirtualHost("datastream")
- .build();
-
- String [] words = new String[]{"props","student","build","name","execute"};
- final DataStream<String> stream = env.fromElements(words);
- stream.addSink(new RMQSink<String>(connectionConfig,"test",new SimpleStringSchema()));
- env.execute("flink rabbitMq sink");
- }
- }

数据展示
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。