当前位置:   article > 正文

Flink 系例 之 Connectors 连接 RabbitMq_flink sql connector rabbitmq

flink sql connector rabbitmq
通过使用 Flink DataStream Connectors 数据流连接器连接到 RabbitMq 消息队列中间件,并提供数据流输入与输出操作;

示例环境

  1. java.version: 1.8.x
  2. flink.version: 1.11.1
  3. rabbitMq:3.5.7

示例数据源 (项目码云下载)

Flink 系例 之 搭建开发环境与数据

示例模块 (pom.xml)

Flink 系例 之 DataStream Connectors 与 示例模块

数据流输入

DataStreamSource.java

  1. package com.flink.examples.rabbitmq;
  2. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  3. import org.apache.flink.streaming.api.datastream.DataStream;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
  6. import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
  7. /**
  8. * @Description 从MQ中获取数据并输出到DataStream流中
  9. */publicclassDataStreamSource{
  10. /**
  11. * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/rabbitmq.html
  12. */publicstatic void main(String[] args) throwsException {
  13. finalStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  14. RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
  15. .setHost("127.0.0.1")
  16. .setPort(5672)
  17. .setUserName("admin")
  18. .setPassword("admin")
  19. .setVirtualHost("datastream")
  20. .build();
  21. finalDataStream<String> stream = env
  22. .addSource(new RMQSource<String>( connectionConfig, "test", true, new SimpleStringSchema()))
  23. .setParallelism(1);
  24. stream.print();
  25. env.execute("flink rabbitMq source");
  26. }
  27. }

数据流输出

DataStreamSink.java

  1. package com.flink.examples.rabbitmq;
  2. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  3. import org.apache.flink.streaming.api.datastream.DataStream;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
  6. import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
  7. /**
  8. * @Description 将DataStream流中的数据输出到rabbitMq队列中
  9. */publicclassDataStreamSink{
  10. /**
  11. * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/rabbitmq.html
  12. */publicstaticvoidmain(String[] args)throws Exception {
  13. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  14. RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
  15. .setHost("127.0.0.1")
  16. .setPort(5672)
  17. .setUserName("admin")
  18. .setPassword("admin")
  19. .setVirtualHost("datastream")
  20. .build();
  21. String [] words = new String[]{"props","student","build","name","execute"};
  22. final DataStream<String> stream = env.fromElements(words);
  23. stream.addSink(new RMQSink<String>(connectionConfig,"test",new SimpleStringSchema()));
  24. env.execute("flink rabbitMq sink");
  25. }
  26. }

数据展示

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号