当前位置:   article > 正文

flink rabbitmq 读取和写入mysql_rabbit 读数据库表

rabbit 读数据库表

经过一天的奋斗终于成功了。。。。。。

1.maven

  1. <!--flink依赖-->
  2. <dependency>
  3. <groupId>org.apache.flink</groupId>
  4. <artifactId>flink-java</artifactId>
  5. <version>1.6.1</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.flink</groupId>
  9. <artifactId>flink-streaming-java_2.11</artifactId>
  10. <version>1.6.1</version>
  11. <!--<scope>provided</scope>-->
  12. </dependency>
  13. <!--rabbitmq-->
  14. <dependency>
  15. <groupId>org.apache.flink</groupId>
  16. <artifactId>flink-connector-rabbitmq_2.12</artifactId>
  17. <version>1.8.0</version>
  18. </dependency>
  19. <!--gson-->
  20. <dependency>
  21. <groupId>com.google.code.gson</groupId>
  22. <artifactId>gson</artifactId>
  23. <version>2.8.0</version>
  24. </dependency>
  25. <dependency>
  26. <groupId>com.alibaba</groupId>
  27. <artifactId>druid</artifactId>
  28. <version>1.1.9</version>
  29. </dependency>
  30. <!--mysql数据库驱动 -->
  31. <dependency>
  32. <groupId>mysql</groupId>
  33. <artifactId>mysql-connector-java</artifactId>
  34. <version>5.1.38</version>
  35. </dependency>

2.flink的代码

  1. import org.apache.flink.streaming.api.datastream.DataStream;
  2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  3. import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
  4. import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
  5. import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
  6. public class TestMysql {
  7. public static void main(String[] args) throws Exception{
  8. //1.获取flink的运行环境
  9. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  10. RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
  11. .setHost("192.168.6.11") //地址
  12. .setPort(5672)
  13. .setUserName("admin") //别用默认的,自己创建一个用户,注意用户权限
  14. .setPassword("admin")
  15. .setVirtualHost("/")
  16. .build();
  17. //2.连接socket获取输入的数据(数据源Data Source)
  18. //2.1. rabbitmq连接的配置,2.rabbitmq的队列名,消费的队列名
  19. DataStream<String> dataStreamSource = env.addSource(new RMQSource<String>(connectionConfig,
  20. "tutu",true, new SimpleStringSchema()));
  21. dataStreamSource.print(); //输出Source的信息
  22. //3.数据转换
  23. //MapFunction:第一个参数是你接收的数据的类型
  24. //MapFunction:第二个参数是返回数据的类型
  25. DataStream<ApisMessageVo> requtapisMessageVo = dataStreamSource.map(new
  26. MapFunction<String, ApisMessageVo>() {
  27. @Override
  28. public ApisMessageVo map(String value) throws Exception {
  29. Gson gson = new Gson();
  30. //反序列化,拿到实体(我这里接受的是实体,如果你只是string 直接return就完事了)
  31. ApisMessageVo apisMessageVo = gson.fromJson(value, ApisMessageVo.class);
  32. return apisMessageVo;
  33. }
  34. });
  35. //4.sink输出
  36. requtapisMessageVo.addSink(new SinkMysql());
  37. //5.这一行代码一定要实现,否则程序不执行
  38. env.execute("Socket window coun1t");
  39. }
  40. }

2.1SinkMysql 

  1. import org.apache.flink.configuration.Configuration;
  2. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
  3. import java.sql.*;
  4. import java.util.List;
  5. import java.util.UUID;
  6. /**
  7. * 插入操作
  8. */
  9. public class SinkMysql extends RichSinkFunction<ApisMessageVo> {
  10. DruidDataSource druidDataSource; //阿里的连接池
  11. private PreparedStatement ps;
  12. private Connection connection;
  13. /**
  14. * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接
  15. * @param parameters
  16. * @throws Exception
  17. */
  18. @Override
  19. public void open(Configuration parameters) throws Exception {
  20. super.open(parameters);
  21. druidDataSource = new DruidDataSource();
  22. connection = getConnection(druidDataSource); //连接池
  23. }
  24. @Override
  25. public void close() throws Exception {
  26. super.close();
  27. //关闭连接和释放资源
  28. if (connection != null) {
  29. connection.close();
  30. }
  31. if (ps != null) {
  32. ps.close();
  33. }
  34. }
  35. /**
  36. * 每条数据的插入都要调用一次 invoke() 方法
  37. *
  38. * @param value
  39. * @param context
  40. * @throws Exception
  41. */
  42. @Override
  43. public void invoke(ApisMessageVo value, Context context) throws Exception {
  44. //获取对象,换成自己对象,或者什么string,随意。
  45. ps =connection.prepareStatement(""insert into
  46. iapi_passenger_contact_information(" +
  47. " ID ,\n" +
  48. " PCommunicationType ,\n" +
  49. " PCommunicationAddress ) values(?,?,?);");
  50. ps.setString(1,UUID.randomUUID().toString().replaceAll("-",""));
  51. ps.setString(2,value.getpCommunicationType());
  52. ps.setString(3,value.getpCommunicationAddress());
  53. ps.executeUpdate(); //结果集
  54. System.out.println("成功");
  55. }
  56. private static Connection getConnection(DruidDataSource druidDataSource) {
  57. druidDataSource.setDriverClassName("com.mysql.jdbc.Driver");
  58. //注意,替换成自己本地的 mysql 数据库地址和用户名、密码
  59. druidDataSource.setUrl("jdbc:mysql://192.168.6.22:3306/pics");
  60. druidDataSource.setUsername("root");
  61. druidDataSource.setPassword("1qaz!QAZ");
  62. //设置连接池的一些参数
  63. //1.数据库连接池初始化的连接个数
  64. druidDataSource.setInitialSize(50);
  65. //2.指定最大的连接数,同一时刻可以同时向数据库申请的连接数
  66. druidDataSource.setMaxActive(200);
  67. //3.指定小连接数:在数据库连接池空闲状态下,连接池中保存的最少的空闲连接数
  68. druidDataSource.setMinIdle(30);
  69. Connection con = null;
  70. try {
  71. con = druidDataSource.getConnection();
  72. System.out.println("创建连接池:" + con);
  73. } catch (Exception e) {
  74. System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
  75. }
  76. return con;
  77. }
  78. }

3.rabbitmq  生产者(测试用到得)

         这是第二项目,方便测试。

  1. <!--rocketmq-->
  2. <dependency>
  3. <groupId>org.apache.rocketmq</groupId>
  4. <artifactId>rocketmq-client</artifactId>
  5. <version>4.2.0</version>
  6. </dependency>
  7. <!--gson-->
  8. <dependency>
  9. <groupId>com.google.code.gson</groupId>
  10. <artifactId>gson</artifactId>
  11. <version>2.8.0</version>
  12. </dependency>
  1. import java.io.IOException;
  2. import java.util.concurrent.TimeoutException;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. public class Producer {
  7. public final static String QUEUE_NAME="tutu"; //队列的名字
  8. public static void main(String[] args) throws IOException, TimeoutException {
  9. //创建连接工厂
  10. ConnectionFactory factory = new ConnectionFactory();
  11. //设置RabbitMQ相关信息
  12. factory.setHost("192.168.6.11");
  13. factory.setUsername("admin");
  14. factory.setPassword("admin");
  15. factory.setPort(5672);
  16. Connection connection = factory.newConnection();
  17. //如果是string就不需要序列化了
  18. ApisMessageVo apisMessageVo=new ApisMessageVo();
  19. //set 赛数据,就不写了。
  20. String str= parser.gson.toJson(vo); //序列化
  21. Channel channel = connection.createChannel();
  22. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  23. while(true) {
  24. channel.basicPublish("", QUEUE_NAME, null, str.toString().getBytes("utf-8"));
  25. }
  26. //关闭通道和连接
  27. //channel.close();
  28. //connection.close();
  29. }
  30. }

写的比较简单,不懂得可以交流一下。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/木道寻08/article/detail/819973
推荐阅读
相关标签
  

闽ICP备14008679号