赞
踩
// Calls readTextFile with an empty path and prints the result.
// NOTE(review): the empty string is presumably a placeholder for a real file path — confirm.
env.readTextFile("").print()
[root@master ~]# kafka-topics.sh --create --topic raytek --partitions 3 --replication-factor 3 --zookeeper master:2181
Created topic "raytek".
[root@master ~]# kafka-topics.sh --list --zookeeper master:2181
__consumer_offsets
gamelog
myItems_topic5
raytek
test
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

/** Consumes string records from the Kafka topic `raytek` and prints each one. */
object DataFromKafKa {

  /**
   * Builds the job (Kafka source -> print) and submits it for execution.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka connection settings
    val props = new Properties()
    props.setProperty("bootstrap.servers", "master:9092,slave1:9092,slave2:9092")
    // NOTE(review): zookeeper.connect is presumably only needed by old (0.8) consumers — confirm before removing.
    props.setProperty("zookeeper.connect", "master:2181,slave1:2181,slave2:2181")
    props.setProperty("group.id", "group01")
    props.setProperty("auto.offset.reset", "latest")

    // Kafka source: each record value is deserialized as a plain string
    env
      .addSource(new FlinkKafkaConsumer[String]("raytek", new SimpleStringSchema(), props))
      .print()

    // Trigger execution; the job is named after this object's class
    env.execute(this.getClass.getSimpleName)
  }
}
编写完成后运行程序，等待生产者生产数据：
[root@master config]# kafka-console-producer.sh --topic raytek --broker-list master:9092
>raytek_3,36.389,john,1582641123,北京西站-地铁站
raytek_1,36.3,jack,1582641121,北京西站-北广场
raytek_3,36.389,john,1582641123,北京西站-地铁站
raytek_9,37.3,tom,1582641124,北京西站-公交车站
raytek_2,35.4,leon,1582641127,北京西站北广场3号停车场
raytek_3,36.4,kate,1582641128,北京西站-东5号停车场
raytek_3,36.389,alice,1582641129,北京西站南广场-公交站
import com.yuanhanhan.entity.Raytek
import org.apache.flink.streaming.api.functions.source.SourceFunction

import scala.io.Source

/**
 * Custom source that reads `input/raytek.log` line by line and emits a
 * [[Raytek]] for every record whose temperature is greater than 37.
 *
 * Each line is split on `,` and read as
 * `id, temperature, name, timestamp, location` (fields are trimmed).
 */
class MySource extends SourceFunction[Raytek] {

  /** Number of lines processed so far (emitted or filtered out). */
  var cnt = 0

  /**
   * Run flag checked by the read loop. Marked `@volatile` because `cancel()`
   * sets it while `run` is looping, possibly from a different thread.
   */
  @volatile var isRunning = true

  /**
   * Reads the log file and forwards the records with temperature above 37.
   *
   * Stops when the file is exhausted or when `cancel()` clears `isRunning`.
   * A line with fewer than five fields or a non-numeric temperature/timestamp
   * makes the parse throw, as before.
   *
   * @param sourceContext context used to emit records downstream
   */
  override def run(sourceContext: SourceFunction.SourceContext[Raytek]): Unit = {
    val source = Source.fromFile("input/raytek.log")
    try {
      // Iterate lazily instead of indexing into a List (which is O(n) per access);
      // skip the lines already counted in cnt.
      val lines = source.getLines().drop(cnt)
      while (isRunning && lines.hasNext) {
        val arr = lines.next().split(",")
        val id = arr(0).trim
        val temperature = arr(1).trim.toDouble
        val name = arr(2).trim
        val timestamp = arr(3).trim.toLong
        val location = arr(4).trim
        val instance = Raytek(id, temperature, name, timestamp, location)
        if (temperature > 37) {
          sourceContext.collect(instance)
        }
        cnt += 1
      }
    } finally {
      // Always release the file handle, including on cancel or parse failure.
      source.close()
    }
  }

  /** Asks the read loop in `run` to stop after the current line. */
  override def cancel(): Unit = {
    isRunning = false
  }
}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/** Runs a job that reads from the custom source `MySource` and prints each record. */
object ReadDataFromMySource {

  /**
   * Builds the job (MySource -> print) and submits it for execution.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Brings the Flink Scala API implicits into scope for addSource
    import org.apache.flink.api.scala._

    // Read from the custom source and print every emitted record
    env.addSource(new MySource).print()

    // Trigger execution; the job is named after this object's class
    env.execute(this.getClass.getSimpleName)
  }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。