当前位置:   article > 正文

Flink(Scala版)消费Kafka数据存入Mysql_flink保存至mysql scala

flink保存至mysql scala

题目要求:1.使用Flume采集指定日志文件,并将采集到的数据存入kafka
2.将存入kafka的消息,使用Flink进行处理并存入Mysql

第一步 :编写Flume文档

在Flume目录下进入job目录,编写flume-kafka.conf文件
在这里插入图片描述

a.sources = s1
a.channels = c1
a.sinks = k1

a.sources.s1.type = exec			
a.sources.s1.command = tail -F /usr/apps/tmp/redis.log		

a.channels.c1.type = memory
a.channels.c1.capacity = 1000
a.channels.c1.transactionCapacity = 100

a.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a.sinks.k1.kafka.topic = ssm					
a.sinks.k1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a.sinks.k1.kafka.flumeBatchSize = 20
a.sinks.k1.kafka.producer.acks = 1
a.sinks.k1.kafka.producer.linger.ms = 1
a.sinks.k1.kafka.producer.compression.type = snappy

a.sources.s1.channels = c1
a.sinks.k1.channel = c1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

对上文个别语句的解释:
a.sources.s1.type = exec //表示监听文件
a.sources.s1.command =tail -F /usr/apps/tmp/redis.log //监听文件的绝对路径
a.sinks.k1.kafka.topic = ssm //存入kafka的主题中,主题名为:ssm
监听的文件自行创建,保证和文档内写的相同就可以。
注意:此处不需要使用kafka提前创建主题,kafka会自动创建
关于Kafka的命令:kafka命令大全

第二步:开启Flume采集

1、jps 检查Kafka和ZK进程是否都开启了!
在这里插入图片描述
确认开启之后,开始Flume采集.
采集命令:
bin/flume-ng agent -c conf/ -n a -f job/flume-kafka.conf -Dflume.root.logger=INFO,console
Flume命令以及详细过程,请参考以下文章:Flume命令详解参考文章

第三步:编写Flink程序

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties

case class UVPV(user_id: String, times: Long)

object KafkaToMysql_test1 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "192.168.38.147:9092")
    prop.setProperty("group.id", "flink")
    val kafkaSource = env.addSource(new FlinkKafkaConsumer011[String]("ssm", new SimpleStringSchema(), prop))
    val result = kafkaSource.map(data => {
      val arr = data.split(",")
      UVPV(arr(0), arr(1).toLong)
    })
    result.addSink(new MyJdbcSinkFunction())
    env.execute()
  }
}

class MyJdbcSinkFunction() extends RichSinkFunction[UVPV]{

  //定义连接,预编译语句
  var conn:Connection=_
  var insertStmt:PreparedStatement=_
  var updateStmt:PreparedStatement=_
  override def open(parameters: Configuration): Unit = {
    conn=DriverManager.getConnection("jdbc:mysql://ip地址:3306/数据库名称","用户名","密码")
    insertStmt=conn.prepareStatement("insert into uvpv(user_id,times) values (?,?)")
    updateStmt=conn.prepareStatement("update uvpv set times=? where user_id=?")
  }
  override def invoke(value: UVPV, context: SinkFunction.Context[_]): Unit = {
    updateStmt.setLong(1,value.times)
    updateStmt.setString(2,value.user_id)
    updateStmt.execute()

    if (updateStmt.getUpdateCount==0){
      insertStmt.setString(1,value.user_id)
      insertStmt.setLong(2,value.times)
      insertStmt.execute()
    }
  }
  override def close(): Unit = {

    insertStmt.close()
    updateStmt.close()
    conn.close()
  }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

因为我用的Flink版本过低(1.10.2)并没有提供JdbcSink的包,所以需要自定义SinkFunction来连接Mysql进行交互。
jdbc如果看不懂的可以自行百度,或者说去小破站搜教程即可。

第四步 创建Mysql数据库和表

mysql -uroot -p000
create database test;
use test;
create table uvpv(user_id varchar(255) not null ,times int not null);
  • 1
  • 2
  • 3
  • 4

捋清楚执行顺序:

Flume采集–>Flink执行程序–>向Flume监听的文件加入数据—>Mysql查询表
第一步
第二步

第三步
第四步

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/606397
推荐阅读
相关标签
  

闽ICP备14008679号