
Flink (6): Writing a Custom Data Sink


Preface

This post walks through a demo that sinks data from a Kafka source into MySQL.

Preparation

Let's start with a demo in which Flink reads data from a Kafka topic. First, make sure Flink and Kafka are installed.

Start Flink, ZooKeeper, and Kafka.

Good, everything is up!
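For reference, with the default local configs the startup commands look roughly like this; the install paths are assumptions, so adjust them to wherever you unpacked each distribution:

# ZooKeeper (the one that ships with the Kafka distribution)
$KAFKA_HOME/bin/zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties
# the Kafka broker
$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
# a local standalone Flink cluster
$FLINK_HOME/bin/start-cluster.sh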

Creating the database table

DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(25) COLLATE utf8_bin DEFAULT NULL,
  `password` varchar(25) COLLATE utf8_bin DEFAULT NULL,
  `age` int(10) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

Entity class

Student.java

package com.zhisheng.flink.model;

/**
 * Desc:
 * weixin: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class Student {
    public int id;
    public String name;
    public String password;
    public int age;

    public Student() {
    }

    public Student(int id, String name, String password, int age) {
        this.id = id;
        this.name = name;
        this.password = password;
        this.age = age;
    }

    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", password='" + password + '\'' +
                ", age=" + age +
                '}';
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}

Utility class

This utility class sends test data to the Kafka topic student:

import com.alibaba.fastjson.JSON;
import com.zhisheng.flink.model.Student;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Writes data to Kafka.
 * You can run the main method to produce some test records.
 * weixin: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class KafkaUtils2 {
    public static final String broker_list = "localhost:9092";
    public static final String topic = "student"; // must be the same Kafka topic the Flink job consumes

    public static void writeToKafka() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 1; i <= 100; i++) {
            Student student = new Student(i, "zhisheng" + i, "password" + i, 18 + i);
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, null, JSON.toJSONString(student));
            producer.send(record);
            System.out.println("Sent record: " + JSON.toJSONString(student));
        }
        producer.flush();
    }

    public static void main(String[] args) throws InterruptedException {
        writeToKafka();
    }
}

SinkToMySQL

This class is the sink function: it extends RichSinkFunction and overrides its methods, inserting each record into MySQL in the invoke method.

package com.zhisheng.flink.sink;

import com.zhisheng.flink.model.Student;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Desc:
 * weixin: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class SinkToMySQL extends RichSinkFunction<Student> {
    PreparedStatement ps;
    private Connection connection;

    /**
     * open() establishes the connection once, so we don't have to open and
     * release a connection on every invoke() call
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        // table name matches the `student` table created by the DDL above
        String sql = "insert into student(id, name, password, age) values(?, ?, ?, ?)";
        ps = this.connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        // release resources: close the statement first, then the connection
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * invoke() is called once for every record to be inserted
     *
     * @param value
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(Student value, Context context) throws Exception {
        // bind the fields and execute the insert
        ps.setInt(1, value.getId());
        ps.setString(2, value.getName());
        ps.setString(3, value.getPassword());
        ps.setInt(4, value.getAge());
        ps.executeUpdate();
    }

    private static Connection getConnection() {
        Connection con = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "root123456");
        } catch (Exception e) {
            System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
        }
        return con;
    }
}

Flink program

Here the source reads from Kafka. Once Flink receives the data (JSON) from Kafka, it parses each message into a Student object with Alibaba's fastjson, then addSink attaches the SinkToMySQL we just wrote, and the data ends up in MySQL.

package com.zhisheng.flink;

import com.alibaba.fastjson.JSON;
import com.zhisheng.flink.model.Student;
import com.zhisheng.flink.sink.SinkToMySQL;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

/**
 * Desc:
 * weixin: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class Main3 {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "metric-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        SingleOutputStreamOperator<Student> student = env.addSource(new FlinkKafkaConsumer011<>(
                "student",   // must match the topic the utility class above writes to
                new SimpleStringSchema(),
                props)).setParallelism(1)
                .map(string -> JSON.parseObject(string, Student.class)); // parse the JSON string into a Student object with fastjson
        student.addSink(new SinkToMySQL()); // sink the data to MySQL
        env.execute("Flink add sink");
    }
}

Results

Run the Flink program first, then run the KafkaUtils2.java utility class, and that's it.

If the data was inserted successfully, let's check the database:

The database now contains the 100 records we sent through Kafka, which proves that our SinkToMySQL works.
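A quick sanity check is to run a couple of queries against the table we created earlier (assuming the sink wrote to the same test database and the table started out empty):

select count(*) from student;   -- should return 100 after one run of KafkaUtils2
select * from student limit 10; -- inspect a few of the inserted rows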

Original post: http://www.54tianzhisheng.cn/2018/10/28/flink-sources/
