import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCount {

  // Case class the job aggregates on; keyBy("word") and sum("count") refer to these field names
  case class WordWithCount(word: String, count: Long)

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val res: DataStream[String] = env.socketTextStream("localhost", 9998, '\n')
    val wordCounts = res
      .flatMap { w => w.split(",") }
      .map(w => WordWithCount(w, 1))
      .keyBy("word")
      .timeWindow(Time.seconds(5), Time.seconds(1))
      .sum("count")
    wordCounts.print()
    env.execute("SocketWindowWordCount")
  }
}
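To try this locally, start a socket server before launching the job, e.g. with netcat (`nc -lk 9998`), and type comma-separated lines such as `hello,flink,hello`. Every second the job prints the per-word counts over the last 5 seconds, since the window is a 5-second sliding window with a 1-second slide.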
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}

object TextWindowWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the parallelism
    env.setParallelism(2)
    val values = env.readTextFile("/Users/zhangzhiqiang/Documents/test_project/flinkdemo/data")
    values.print()
    val res = values.flatMap(_.split(","))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .sum(1)
    res.writeAsCsv("/Users/zhangzhiqiang/Documents/test_project/flinkdemo/data2", FileSystem.WriteMode.NO_OVERWRITE)
    env.execute()
  }
}
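Note that with WriteMode.NO_OVERWRITE the job fails if the data2 path already exists, and because the parallelism is 2 the sink is written as a directory containing one output file per parallel subtask rather than a single CSV file.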
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object ReadCsvFile {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Map each row directly to a Student (the case class acts as the schema)
    val values = env.readCsvFile[Student]("/Users/zhangzhiqiang/Documents/test_project/flinkdemo/data/1.csv")
    values.print()
  }
}
The Student class:
case class Student(name: String, age: Int, sex: String, id: String)
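For reference, a 1.csv matching this schema could look like the following (hypothetical sample data; readCsvFile expects comma-separated fields in the declared field order):

tom,18,male,1
lily,20,female,2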
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object ReadTableCsvFile {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    val input = env.readCsvFile[Student]("/Users/zhangzhiqiang/Documents/test_project/flinkdemo/data/1.csv")
    input.print()
    tableEnv.registerDataSet("student", input)
    val result = tableEnv.sqlQuery("select * from student")
    result.printSchema()
    result.toDataSet[Student].print()
  }
}
Or apply some transformations, for example:
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object ReadTableCsvFile2 {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Get the table environment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    // Read the data
    val input = env.readCsvFile[Student]("/Users/zhangzhiqiang/Documents/test_project/flinkdemo/data/1.csv")
    input.print()
    // Convert the DataSet into a Table
    val table = tableEnv.fromDataSet(input)
    // Register the Table
    tableEnv.registerTable("student", table)
    // SQL query
    val result = tableEnv.sqlQuery("select name,age,sex from student")
    result.printSchema()
    // Convert the result back to a DataSet and print it
    result.toDataSet[People].print()
  }
}
The People class:
case class People(name: String, age: Int, sex: String)
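The toDataSet[People] conversion works here because the query projects exactly the fields name, age and sex, which match the People case class in name and type; projecting a different set of columns would require a correspondingly shaped target type.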
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object KafkaWordCountStreaming {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "test")
    val stream = env.addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), properties))
    stream.print()
    val result = stream.flatMap(x => x.split(","))
      .map(x => (x, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(10))
      .sum(1)
    result.print()
    env.execute("KafkaWordCountStreaming")
  }
}
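To feed the topic while the job is running, you can use the console producer that ships with Kafka, e.g. `kafka-console-producer.sh --broker-list localhost:9092 --topic test`, and type comma-separated words.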
import java.util.Properties

import com.google.gson.Gson
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object KafkaJsonStreaming {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val p = new Properties()
    p.setProperty("bootstrap.servers", "localhost:9092")
    p.setProperty("group.id", "test")
    val stream = env.addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), p))
    stream.print()
    val result = stream.map { x =>
      val g = new Gson()
      g.fromJson(x, classOf[People])
    }
    result.print()
    env.execute("KafkaJsonStreaming")
  }
}
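Each Kafka message is expected to be a JSON object whose fields match People, for example a (hypothetical) record like {"name":"tom","age":18,"sex":"male"}; Gson maps the JSON keys onto the case class fields by name.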
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}

object KafkaToKafkaStreaming {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val p = new Properties()
    p.setProperty("bootstrap.servers", "localhost:9092")
    p.setProperty("group.id", "test")
    val input = env.addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), p))
    input.print()
    val p2 = new Properties()
    p2.setProperty("bootstrap.servers", "localhost:9092")
    // zookeeper.connect and group.id are consumer-side settings; the Kafka 0.10 producer ignores them
    p2.setProperty("zookeeper.connect", "localhost:2181")
    p2.setProperty("group.id", "test")
    input.addSink(new FlinkKafkaProducer010[String]("test", new SimpleStringSchema(), p2))
    env.execute("KafkaToKafkaStreaming")
  }
}