
Flink Quick-Start Examples

1. Flink reading a socket stream: a Word Count example
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCount {

  // Record type used for the window aggregation (missing from the original listing)
  case class WordWithCount(word: String, count: Long)

  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Read lines from a local socket, using '\n' as the record delimiter
    val res: DataStream[String] = env.socketTextStream("localhost", 9998, '\n')

    val wordCounts = res
      .flatMap { w => w.split(",") }                // split each line into words
      .map(w => WordWithCount(w, 1))                // wrap each word in a count record
      .keyBy("word")                                // group by the "word" field
      .timeWindow(Time.seconds(5), Time.seconds(1)) // 5 s sliding window, 1 s slide
      .sum("count")                                 // sum the counts per word and window

    wordCounts.print()

    env.execute("SocketWindowWordCount")
  }
}
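To try this locally, start a socket server before launching the job, for example with `nc -lk 9998`, and type comma-separated words into it. Because the job uses a 5-second sliding window with a 1-second slide, an updated count for each word is printed every second.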
2. Flink reading a text file: a Word Count example
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}

object TextWindowWordCount {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Set the parallelism
    env.setParallelism(2)

    // Read the file(s) under the given path as a stream of lines
    val values = env.readTextFile("/Users/zhangzhiqiang/Documents/test_project/flinkdemo/data")

    values.print()

    val res = values.flatMap(_.split(","))  // split each line into words
      .filter(_.nonEmpty)                   // drop empty strings
      .map((_, 1))                          // pair each word with a count of 1
      .keyBy(0)                             // key by the word (tuple field 0)
      .sum(1)                               // sum the counts (tuple field 1)

    // NO_OVERWRITE: fail if the output path already exists
    res.writeAsCsv("/Users/zhangzhiqiang/Documents/test_project/flinkdemo/data2", FileSystem.WriteMode.NO_OVERWRITE)

    env.execute()
  }
}
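Note that because the job runs with parallelism 2, `writeAsCsv` creates a directory containing one part file per parallel sink instance. If a single output file is preferred, the sink's parallelism can be forced down to 1; a minimal sketch:

// Write a single CSV file by running only the sink at parallelism 1
res.writeAsCsv("/Users/zhangzhiqiang/Documents/test_project/flinkdemo/data2", FileSystem.WriteMode.NO_OVERWRITE)
  .setParallelism(1)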
3. Flink reading a CSV file

import org.apache.flink.api.scala._

object ReadCsvFile {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Map each CSV row directly to a Student (the case class acts as the schema)
    val values = env.readCsvFile[Student]("/Users/zhangzhiqiang/Documents/test_project/flinkdemo/data/1.csv")
    values.print()
  }
}

The Student class:

case class Student(name: String, age: Int, sex: String, id: String)
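`readCsvFile` also accepts optional named parameters for common CSV variations; the exact parameter set may vary by Flink version, so treat the following as an illustrative sketch:

// Illustrative variant: explicit delimiter plus a skipped header row
val values = env.readCsvFile[Student](
  "/Users/zhangzhiqiang/Documents/test_project/flinkdemo/data/1.csv",
  fieldDelimiter = ",",   // field separator (the default)
  ignoreFirstLine = true  // skip a header row, if the file has one
)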
4. Flink reading a CSV file and querying it with Table SQL
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala._

object ReadTableCsvFile {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Create a batch table environment on top of the DataSet environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val input = env.readCsvFile[Student]("/Users/zhangzhiqiang/Documents/test_project/flinkdemo/data/1.csv")

    input.print()

    // Register the DataSet as a table named "student"
    tableEnv.registerDataSet("student", input)

    // Query it with plain SQL
    val result = tableEnv.sqlQuery("select * from student")

    result.printSchema()

    // Convert the Table back to a DataSet and print it
    result.toDataSet[Student].print()
  }
}

Or apply a transformation first, for example:

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object ReadTableCsvFile2 {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Get the table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Read the data
    val input = env.readCsvFile[Student]("/Users/zhangzhiqiang/Documents/test_project/flinkdemo/data/1.csv")

    input.print()

    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(input)

    // Register the Table
    tableEnv.registerTable("student", table)

    // The SQL query: project three of the four Student fields
    val result = tableEnv.sqlQuery("select name,age,sex from student")

    result.printSchema()

    // Convert the result back to a DataSet; People matches the projected columns
    result.toDataSet[People].print()
  }
}

The People class:

case class People(name: String, age: Int, sex: String)
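The same projection can also be written with the expression-style Table API instead of an SQL string; a minimal sketch using the same imports and the `table` object from above:

// Expression-style equivalent of "select name,age,sex from student"
val result2 = table.select('name, 'age, 'sex)
result2.toDataSet[People].print()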
5. Flink reading a Kafka stream: Word Count
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object KafkaWordCountStreaming {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka consumer configuration
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "test")
    // Consume the "test" topic, deserializing each record as a UTF-8 string
    val stream = env.addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), properties))

    stream.print()

    val result = stream.flatMap(x => x.split(","))   // split each record into words
      .map(x => (x, 1)).keyBy(0)                     // pair with a count, key by word
      .timeWindow(Time.seconds(10))                  // 10 s tumbling window
      .sum(1)

    result.print()

    env.execute("KafkaWordCountStreaming")
  }
}
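By default the consumer relies on Kafka's periodic auto-commit for offsets. With Flink checkpointing enabled, the connector instead commits offsets as part of each completed checkpoint, which gives more consistent restart behavior; a one-line addition after creating the environment:

// Take a checkpoint every 5 seconds; Kafka offsets are then committed on checkpoints
env.enableCheckpointing(5000)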
6. Flink reading a Kafka stream and transforming it before output
import java.util.Properties

import com.google.gson.Gson
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object KafkaJsonStreaming {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val p = new Properties()
    p.setProperty("bootstrap.servers", "localhost:9092")
    p.setProperty("group.id", "test")

    val stream = env.addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), p))

    stream.print()

    // Parse each JSON record into a People instance.
    // Gson is created inside the function because it is not serializable
    // and therefore cannot be captured in the closure.
    val result = stream.map { x =>
      val g = new Gson()
      g.fromJson(x, classOf[People])
    }

    result.print()

    env.execute("KafkaJsonStreaming")
  }
}
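Instead of parsing JSON in a downstream `map`, the parsing can be pushed into the source by handing the Kafka consumer a custom `DeserializationSchema`, so the stream is typed as `DataStream[People]` from the start. A sketch, assuming the same Gson dependency:

import java.nio.charset.StandardCharsets

import com.google.gson.Gson
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._

class PeopleDeserializationSchema extends DeserializationSchema[People] {

  // Gson is not serializable, so create it lazily on each task instance
  @transient private lazy val gson = new Gson()

  override def deserialize(message: Array[Byte]): People =
    gson.fromJson(new String(message, StandardCharsets.UTF_8), classOf[People])

  override def isEndOfStream(nextElement: People): Boolean = false

  override def getProducedType: TypeInformation[People] = createTypeInformation[People]
}

// Usage:
// val stream = env.addSource(new FlinkKafkaConsumer010[People]("test", new PeopleDeserializationSchema, p))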
7. Flink reading from Kafka and writing back to Kafka
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.streaming.api.scala._

object KafkaToKafkaStreaming {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Consumer configuration
    val p = new Properties()
    p.setProperty("bootstrap.servers", "localhost:9092")
    p.setProperty("group.id", "test")

    val input = env.addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), p))

    input.print()

    // Producer configuration; only bootstrap.servers is needed here
    // (zookeeper.connect and group.id are consumer-side settings)
    val p2 = new Properties()
    p2.setProperty("bootstrap.servers", "localhost:9092")

    input.addSink(new FlinkKafkaProducer010[String]("test", new SimpleStringSchema(), p2))

    env.execute("KafkaToKafkaStreaming")
  }
}
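Note that the snippet above writes records back into the same "test" topic it consumes, so every record would be re-read and re-written indefinitely. A more typical setup applies a transformation and writes to a second topic; a sketch, where "test-out" is a hypothetical output topic:

// Uppercase each record and send it to a separate (hypothetical) output topic
val transformed = input.map(_.toUpperCase)
transformed.addSink(new FlinkKafkaProducer010[String]("test-out", new SimpleStringSchema(), p2))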

The full project is available on GitHub.
