赞
踩
我们在IntelliJ中新建一个工程SinkToRabbitMQ。
Archetype填入:org.apache.flink:flink-quickstart-java
版本填入与Flink的版本:1.19.1
在pom.xml中新增RabbitMQ连接器
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-rabbitmq</artifactId>
<version>3.0.1-1.17</version>
</dependency>
这段代码将产生两个字符串数据,后续这些数据会被写入到RabbitMQ的队列中。
List<String> data = new ArrayList<>();
data.add("Hello, World!");
data.add("Hello, Flink!");
DataStream<String> stream = env.fromCollection(data);
不同于《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》中创建RMQSource用来接收RabbitMQ队列中数据,这次我们创建RMQSink用来发布数据。
String sinkQueueName = "data.to.rbtmq"; // name of the queue to send data to String host = "172.21.112.140"; // IP of the rabbitmq server int port = 5672; String username = "admin"; String password = "fangliang"; String virtualHost = "/"; int parallelism = 1; RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder() .setHost(host) .setPort(port) .setUserName(username) .setPassword(password) .setVirtualHost(virtualHost) .build(); RMQSink<String> stringRMQSink = new RMQSink<>(rmqConnectionConfig, sinkQueueName, new SimpleStringSchema()); stream.addSink(stringRMQSink).name(username + "'s sink to " + sinkQueueName).setParallelism(parallelism);
打包、提交并运行任务
然后在RabbitMQ的后台可以看到收到两条消息
其内容也是我们之前在代码中生成的内容
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。