赞
踩
1)从Collection接入数据
/**
 * Reads sensor data from an in-memory collection and prints it.
 *
 * NOTE(review): no imports are visible in this listing; the Flink Scala API
 * normally needs `org.apache.flink.streaming.api.scala._` in scope for
 * `StreamExecutionEnvironment`, `DataStream` and the implicit type
 * information -- confirm against the surrounding file.
 */
object FromCollection {

  /**
   * Builds and runs the job: collection source -> print sink.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // 1. Environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. Source: a fixed list of sensor readings
    val stream: DataStream[SensorReading] = env.fromCollection(List(
      SensorReading("sensor_1", 1547718199, 35.80018327300259),
      SensorReading("sensor_6", 1547718201, 15.402984393403084),
      SensorReading("sensor_7", 1547718202, 6.720945201171228),
      SensorReading("sensor_10", 1547718205, 38.101067604893444)
    ))

    // 3. Transformation (none in this example)

    // 4. Sink: print every element, labelled "stream", with sink parallelism 1
    stream.print("stream").setParallelism(1)

    // 5. Execute: nothing runs until the job is submitted here
    env.execute("API Test")
  }
}

/**
 * A single sensor measurement.
 *
 * @param id          sensor ID
 * @param timeStamp   timestamp of the reading
 * @param temperature temperature reported by the sensor
 */
case class SensorReading(id: String, timeStamp: Long, temperature: Double)
2)从Text文本接入数据
/**
 * Reads lines from a text file and prints them.
 *
 * NOTE(review): no imports are visible in this listing; the Flink Scala API
 * normally needs `org.apache.flink.streaming.api.scala._` in scope for
 * `StreamExecutionEnvironment`, `DataStream` and the implicit type
 * information -- confirm against the surrounding file.
 */
object FromText {

  /**
   * Builds and runs the job: text-file source -> print sink.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // 1. Environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. Source: text file at the relative path "data/words", as a stream of Strings
    val stream: DataStream[String] = env.readTextFile("data/words")

    // 3. Transformation (none in this example)

    // 4. Sink: print every element, labelled "stream", with sink parallelism 1
    stream.print("stream").setParallelism(1)

    // 5. Execute: nothing runs until the job is submitted here
    env.execute("API Test")
  }
}
3)从Kafka接入数据
/**
 * Reads sensor data from an in-memory collection and prints it.
 *
 * NOTE(review): this listing appears under the "read from Kafka" heading but
 * contains no Kafka source -- it reads from a collection, exactly like the
 * collection example. The intended Kafka listing looks to be missing; confirm
 * with the original author.
 *
 * NOTE(review): no imports are visible in this listing; the Flink Scala API
 * normally needs `org.apache.flink.streaming.api.scala._` in scope for
 * `StreamExecutionEnvironment`, `DataStream` and the implicit type
 * information -- confirm against the surrounding file.
 */
object FromCollection {

  /**
   * Builds and runs the job: collection source -> print sink.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // 1. Environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. Source: a fixed list of sensor readings
    val stream: DataStream[SensorReading] = env.fromCollection(List(
      SensorReading("sensor_1", 1547718199, 35.80018327300259),
      SensorReading("sensor_6", 1547718201, 15.402984393403084),
      SensorReading("sensor_7", 1547718202, 6.720945201171228),
      SensorReading("sensor_10", 1547718205, 38.101067604893444)
    ))

    // 3. Transformation (none in this example)

    // 4. Sink: print every element, labelled "stream", with sink parallelism 1
    stream.print("stream").setParallelism(1)

    // 5. Execute: nothing runs until the job is submitted here
    env.execute("API Test")
  }
}

/**
 * A single sensor measurement.
 *
 * @param id          sensor ID
 * @param timeStamp   timestamp of the reading
 * @param temperature temperature reported by the sensor
 */
case class SensorReading(id: String, timeStamp: Long, temperature: Double)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。