赞
踩
经过一天的奋斗终于成功了。。。。。。
- <!--flink依赖-->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>1.6.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.11</artifactId>
- <version>1.6.1</version>
- <!--<scope>provided</scope>-->
- </dependency>
-
- <!--rabbitmq-->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-rabbitmq_2.12</artifactId>
- <version>1.8.0</version>
- </dependency>
- <!--gson-->
- <dependency>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- <version>2.8.0</version>
- </dependency>
-
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>druid</artifactId>
- <version>1.1.9</version>
- </dependency>
-
- <!--mysql数据库驱动 -->
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>5.1.38</version>
- </dependency>
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
- import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
- import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-
- public class TestMysql {
- public static void main(String[] args) throws Exception{
- //1.获取flink的运行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
- .setHost("192.168.6.11") //地址
- .setPort(5672)
- .setUserName("admin") //别用默认的,自己创建一个用户,注意用户权限
- .setPassword("admin")
- .setVirtualHost("/")
- .build();
-
- //2.连接socket获取输入的数据(数据源Data Source)
- //2.1. rabbitmq连接的配置,2.rabbitmq的队列名,消费的队列名
- DataStream<String> dataStreamSource = env.addSource(new RMQSource<String>(connectionConfig,
- "tutu",true, new SimpleStringSchema()));
- dataStreamSource.print(); //输出Source的信息
-
-
- //3.数据转换
- //MapFunction:第一个参数是你接收的数据的类型
- //MapFunction:第二个参数是返回数据的类型
- DataStream<ApisMessageVo> requtapisMessageVo = dataStreamSource.map(new
- MapFunction<String, ApisMessageVo>() {
- @Override
- public ApisMessageVo map(String value) throws Exception {
- Gson gson = new Gson();
- //反序列化,拿到实体(我这里接受的是实体,如果你只是string 直接return就完事了)
- ApisMessageVo apisMessageVo = gson.fromJson(value, ApisMessageVo.class);
- return apisMessageVo;
- }
- });
-
- //4.sink输出
- requtapisMessageVo.addSink(new SinkMysql());
- //5.这一行代码一定要实现,否则程序不执行
- env.execute("Socket window coun1t");
- }
- }
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
- import java.sql.*;
- import java.util.List;
- import java.util.UUID;
- /**
- * 插入操作
- */
- public class SinkMysql extends RichSinkFunction<ApisMessageVo> {
- DruidDataSource druidDataSource; //阿里的连接池
- private PreparedStatement ps;
- private Connection connection;
-
-
- /**
- * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接
- * @param parameters
- * @throws Exception
- */
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- druidDataSource = new DruidDataSource();
- connection = getConnection(druidDataSource); //连接池
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- //关闭连接和释放资源
- if (connection != null) {
- connection.close();
- }
- if (ps != null) {
- ps.close();
- }
- }
- /**
- * 每条数据的插入都要调用一次 invoke() 方法
- *
- * @param value
- * @param context
- * @throws Exception
- */
- @Override
- public void invoke(ApisMessageVo value, Context context) throws Exception {
- //获取对象,换成自己对象,或者什么string,随意。
-
- ps =connection.prepareStatement(""insert into
- iapi_passenger_contact_information(" +
- " ID ,\n" +
- " PCommunicationType ,\n" +
- " PCommunicationAddress ) values(?,?,?);");
- ps.setString(1,UUID.randomUUID().toString().replaceAll("-",""));
- ps.setString(2,value.getpCommunicationType());
- ps.setString(3,value.getpCommunicationAddress());
- ps.executeUpdate(); //结果集
- System.out.println("成功");
- }
- private static Connection getConnection(DruidDataSource druidDataSource) {
- druidDataSource.setDriverClassName("com.mysql.jdbc.Driver");
- //注意,替换成自己本地的 mysql 数据库地址和用户名、密码
- druidDataSource.setUrl("jdbc:mysql://192.168.6.22:3306/pics");
- druidDataSource.setUsername("root");
- druidDataSource.setPassword("1qaz!QAZ");
- //设置连接池的一些参数
- //1.数据库连接池初始化的连接个数
- druidDataSource.setInitialSize(50);
- //2.指定最大的连接数,同一时刻可以同时向数据库申请的连接数
- druidDataSource.setMaxActive(200);
- //3.指定小连接数:在数据库连接池空闲状态下,连接池中保存的最少的空闲连接数
- druidDataSource.setMinIdle(30);
- Connection con = null;
- try {
- con = druidDataSource.getConnection();
- System.out.println("创建连接池:" + con);
- } catch (Exception e) {
- System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
- }
- return con;
- }
- }
这是第二项目,方便测试。
- <!--rocketmq-->
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>4.2.0</version>
- </dependency>
- <!--gson-->
- <dependency>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- <version>2.8.0</version>
- </dependency>
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- public class Producer {
- public final static String QUEUE_NAME="tutu"; //队列的名字
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
-
- //设置RabbitMQ相关信息
- factory.setHost("192.168.6.11");
- factory.setUsername("admin");
- factory.setPassword("admin");
- factory.setPort(5672);
-
- Connection connection = factory.newConnection();
- //如果是string就不需要序列化了
- ApisMessageVo apisMessageVo=new ApisMessageVo();
- //set 赛数据,就不写了。
-
- String str= parser.gson.toJson(vo); //序列化
-
- Channel channel = connection.createChannel();
- channel.queueDeclare(QUEUE_NAME, true, false, false, null);
- while(true) {
- channel.basicPublish("", QUEUE_NAME, null, str.toString().getBytes("utf-8"));
- }
- //关闭通道和连接
- //channel.close();
- //connection.close();
- }
- }
写的比较简单,不懂得可以交流一下。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。