赞
踩
事件驱动(Event-driven)
跟随当前时间点上出现的事件,调动可用资源,执行相关任务,使不断出现的问题得以解决
基于流处理
一切皆由流组成,离线数据是有界的流;实时数据是一个没有界限的流。(有界流、无界流)
分层API
其他特点:
pom依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>1.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>1.12.1</version>
</dependency>
代码实现
// 创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 从数据源中读取数据
String inputPath = "/tmp/Flink_Tutorial/src/main/resources/hello.txt";0
//流处理
DataStream<String> inputDataStream = env.readTextFile(inputPath);
// 基于数据流进行转换计算
DataStream<Tuple2<String,Integer>> resultStream = inputDataStream.flatMap(new WordCount.MyFlatMapper())
.keyBy(item->item.f0)
.sum(1);
// 输出处理结果
resultSet.print();
// 执行流处理任务
env.execute();
//批处理
DataSet<String> inputDataSet = env.readTextFile(inputPath);
// 对数据集进行处理
DataSet<Tuple2<String, Integer>> resultSet = inputDataSet.flatMap(new MyFlatMapper())
.groupBy(0)//批处理
.sum(1);
// 自定义类,实现FlatMapFunction接口
public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> out) throws Exception {
// 按空格分词
String[] words = s.split(" ");
// 遍历所有word,包成二元组输出
for (String str : words) {
out.collect(new Tuple2<>(str, 1));
}
}
}
独立模式。
启动命令:bin/flink run -c <入口类> -p <并行度> <jar包路径> <启动参数>
访问 http://localhost:8081 可以对 flink 集群和任务进行监控管理。
以 Yarn 模式部署 Flink 任务时,要求 Flink 是有 Hadoop 支持的版本,Hadoop环境需要保证版本在 2.2 以上,并且集群中安装有 HDFS 服务。
Flink提供了两种在yarn上运行的模式,分别为Session-Cluster和Per-Job-Cluster模式。
在 yarn 中初始化一个 flink 集群,这个 flink 集群会常驻在 yarn 集群中,除非手工停止;并开辟指定的资源,用于任务的提交执行,申请的资源永远保持不变,如果资源满了,下一个作业就无法提交,得等其他作业执行完成后,释放了资源,下个作业才会正常提交。适合规模小执行时间短的作业。
**每提交一个Job都会创建一个新的 flink 集群,任务之间互相独立,互不影响,任务执行完成之后创建的集群也会消失。**适合规模大长时间运行的作业。
Flink运行时架构主要包括四个不同的组件,它们会在运行流处理应用程序时协同工作:
也叫Master,接收Job作业,协调任务调度、资源分配、分布式计算,协调checkpoints,错误调度等。
JobManager会把JobGraph转换成一个物理层面的数据流图,即“执行图”(ExecutionGraph,包含了所有可以并发执行的任务)
JobManager会向资源管理器(ResourceManager)请求执行任务必要的资源(TaskManager的slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager上,并在运行过程中,负责所有需要中央协调的操作。
Flink集群至少需要一个 JobManager,部署多个JobManager时,只有一个leader,其他都是standby模式
主要负责管理TaskManager的slot资源,包括接收JobManager的资源申请、分配空闲的slot给JobManager、终止空闲的TaskManger等。
如果ResourceManager没有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。
Flink为不同的环境和资源管理工具提供了不同资源管理器,比如YARN、Mesos、K8s,以及standalone部署。
也叫Worker,Flink中的工作进程。每一个TaskManager都有一定数量的插槽(slots)。插槽平分TaskManager的内存,其数量限制了TaskManager能够执行的任务数量。
启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。
在执行过程中,同一个Job的不同TaskManager间可以交换数据。
可以跨作业运行,它为应用提交提供了REST接口。
当一个应用被提交执行时,分发器就会启动并将应用移交给一个JobManager。由于是REST接口,所以Dispatcher可以作为集群的一个HTTP接入点,这样就能够不受防火墙阻挡。Dispatcher也会启动一个Web UI,用来方便地展示和监控作业执行的信息。
Dispatcher在架构中可能并不是必需的,这取决于应用提交运行的方式。
以下是yarn模式:
同一个SlotGroup的算子能共享同一个slot。
同一个算子(并行度不为1)的子算子不能共享slot。
在代码中通过算子的.slotSharingGroup("组名")
可以指定算子所在的Slot组名。
默认的SlotGroup就是"default"。
每个算子的SlotGroup默认和上一个算子相同。
利用SlotGroup控制一个slot保存作业的整个管道的好处:
- 省去跨slot、跨TaskManager的通信损耗(降低了并行度)
- 执行健壮性更高,若某些slot执行出异常也能有其他slot补上。
- 避免不同slot的CPU资源分配不均,有些slot分配到的子任务非CPU密集型,有些则CPU密集型,如果每个slot只完成自己的子任务,将出现某些slot太闲,某些slot过忙的现象。
实际运行Job时所需的Slot总数 = 每个Slot组中的最大并行度。
一个特定算子的 子任务(subtask)的个数被称之为其并行度(parallelism)。
①对单独的每个算子进行设置并行度,
②用env设置全局的并行度
③在启动命令中设置
④在flink配置文件中设置
优先级①>②>③>④
开发环境的并行度默认就是计算机的CPU逻辑核数
在运行时,Flink上运行的程序会被映射成“逻辑数据流”(dataflows),它包含了这三部分: Source 、Transformation 和 Sink
dataflow类似于任意的有向无环图(DAG),每一个dataflow以一个或多个sources开始以一个或多个sinks结束。
在大部分情况下,程序中的转换运算(transformations)跟dataflow中的算子(operator)是一一对应的关系
Flink 中的执行图可以分成四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图。
StreamGraph:也被称为逻辑流图,由Flink程序直接映射而成,是根据用户通过Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。
JobGraph:是对StreamGraph优化后生成的,提交给JobManager 的数据结构。主要的优化为,将多个符合条件的节点chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。
ExecutionGraph:JobManager 根据JobGraph 生成的,是JobGraph的并行化版本。
物理执行图:不是一个具体的数据结构,是JobManager 根据ExecutionGraph 对Job 进行调度后,在各个TaskManager 上部署Task 后形成的“图”。
由于不同的算子可能具有不同的并行度,算子之间传输数据的形式可以是 one-to-one (forwarding) 的模式也可以是redistributing 的模式
flink程序提交给JobManager前会将StreamGraph转换为JobGraph,涉及到任务链的生成,即将多个算子通过本地转发(local forward)的方式进行连接。
算子并行度相同、并且是 one-to-one 操作
创建一个执行环境,表示当前执行程序的上下文。
有三种创建方式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123,"YOURPATH//WordCount.jar");
DataStream<SensorReading> sensorDataStream = env.fromCollection(
Arrays.asList(
new SensorReading("sensor_1", 1547718199L, 35.8),
new SensorReading("sensor_6", 1547718201L, 15.4),
new SensorReading("sensor_7", 1547718202L, 6.7),
new SensorReading("sensor_10", 1547718205L, 38.1)
)
);
env.readTextFile("YOUR_FILE_PATH ");
env.addSource( new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));
需要实现SourceFunction 或者继承SourceFunction的富函数RichSourceFunction
:env.addSource( new SourceFunction(){xxx});
DataStream<Tuple2<Integer, Integer>> inputStream= env.addSource(new RandomFibonacciSource());
private static class RandomFibonacciSource implements SourceFunction<Tuple2<Integer, Integer>> {
private static final long serialVersionUID = 1L;
private Random rnd = new Random();
private int counter = 0;
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
while (isRunning && counter < BOUND) {
int first = rnd.nextInt(BOUND / 2 - 1) + 1;
int second = rnd.nextInt(BOUND / 2 - 1) + 1;
ctx.collect(new Tuple2<>(first, second));
counter++;
Thread.sleep(50L);
}
}
@Override
public void cancel() {
isRunning = false;
}
}
把数组流中的每一个值,使用所提供的函数执行一遍,得到元素个数相同的数组流。
DataStream<Integer> mapStram = dataStream.map(new MapFunction<String, Integer>() {
public Integer map(String value) throws Exception {
return value.length();
}
});
把数组流中的每一个值,使用所提供的函数执行一遍,执行结果也是个数组流,并将这些数组拍平合并,得到元素不一定相同的数组流。
DataStream<String> flatMapStream = dataStream.flatMap(new FlatMapFunction<String,
String>() {
public void flatMap(String value, Collector<String> out) throws Exception {
String[] fields = value.split(",");
for( String field: fields )
out.collect(field);
}
});
DataStream<Interger> filterStream = dataStream.filter(new FilterFunction<String>(){
public boolean filter(String value) throws Exception {
return value == 1;
}
});
DataStream -> KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的。
1、KeyBy会重新分区; 2、不同的key有可能分到一起,因为是通过hash原理实现的;
这些算子可以针对KeyedStream的每一个支流做聚合。
都是对支流聚合后取最大值,区别在于,max()只取当前比较的字段的最大值,例如一条数据有a b c三个字段,max(a)会将当前数据的a字段更新为最大值,bc不变,然后返回;而maxBy(a),会将当前数据替换为a字段最大的那条数据,也就是bc也可能变化
KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素 和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果。
需要实现ReduceFunction
函数式接口。如:
keyedStream.reduce((curSensor,newSensor)->new SensorReading(curSensor.getId(),newSensor.getTimestamp(), Math.max(curSensor.getTemperature(), newSensor.getTemperature())));
其中curSensor表示上一条数据reduce的结果,newSensor当前数据
注:新版Flink已经不存在Split和Select这两个API了(至少Flink1.12.1没有!)
// 1. 分流,按照温度值30度为界分为两条流
SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
@Override
public Iterable<String> select(SensorReading value) {
return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
}
});
DataStream<SensorReading> highTempStream = splitStream.select("high");
DataStream<SensorReading> allTempStream = splitStream.select("high", "low");
// 2. 将高温流转换成二元组类型
DataStream<Tuple2<String, Double>> warningStream = highTempStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
@Override
public Tuple2<String, Double> map(SensorReading value) throws Exception {
return new Tuple2<>(value.getId(), value.getTemperature());
}
});
// 2. 合流 connect
ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams = warningStream.connect(lowTempStream);
DataStream<Object> resultStream = connectedStreams.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
@Override
public Object map1(Tuple2<String, Double> value) throws Exception {
return new Tuple3<>(value.f0, value.f1, "high temp warning");
}
@Override
public Object map2(SensorReading value) throws Exception {
return new Tuple2<>(value.getId(), "normal");
}
});
DataStream -> DataStream:对两个或者两个以上的DataStream进行Union操作,产生一个包含多有DataStream元素的新DataStream。
Flink使用类型信息的概念来表示数据类型,并为每个数据类型生成特定的序列化器、反序列化器和比较器。
Flink还具有一个类型提取系统,该系统分析函数的输入和返回类型,以自动获取类型信息,从而获得序列化器和反序列化器。但是,在某些情况下,例如lambda函数或泛型类型,需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。
Flink支持Java和Scala中所有常见数据类型。使用最广泛的类型有以下几种。
Flink支持所有的Java和Scala基础数据类型,Int, Double, Long, String, …
java的元组类型由Flink的包提供,默认提供Tuple0~Tuple25
java的POJO这里要求必须提供无参构造函数
成员变量要求都是public(或者private但是提供get、set方法)
User Defined Function即用户自定义函数。
Flink暴露了所有UDF函数的接口(实现方式为接口或者抽象类)。例如MapFunction, FilterFunction, ProcessFunction等等。
也就是说,用户可以自定义函数内容,以实现流操作。
可以自定义参数传进去。示例:
DataStream<String> tweets = env.readTextFile("INPUT_FILE ");
DataStream<String> flinkTweets = tweets.filter(new KeyWordFilter("flink"));
public static class KeyWordFilter implements FilterFunction<String> {
private String keyWord;
KeyWordFilter(String keyWord) {
this.keyWord = keyWord;
}
@Override public boolean filter(String value) throws Exception {
return value.contains(this.keyWord);
}
}
DataStream<String> tweets = env.readTextFile("INPUT_FILE");
DataStream<String> flinkTweets = tweets.filter( tweet -> tweet.contains("flink") );
“富函数”是DataStream API提供的一个函数类的接口,所有Flink函数类都有其Rich版本。
它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
Rich Function有一个生命周期的概念。典型的生命周期方法有:
// 实现自定义富函数类(RichMapFunction是一个抽象类)
public static class MyMapper extends RichMapFunction<SensorReading, Tuple2<String, Integer>> {
@Override
public Tuple2<String, Integer> map(SensorReading value) throws Exception {
// RichFunction可以获取State状态
// getRuntimeContext().getState();
return new Tuple2<>(value.getId(), getRuntimeContext().getIndexOfThisSubtask());
}
@Override
public void open(Configuration parameters) throws Exception {
// 初始化工作,一般是定义状态,或者建立数据库连接
System.out.println("open");
}
@Override
public void close() throws Exception {
// 一般是关闭连接和清空状态的收尾操作
System.out.println("close");
}
}
}
keyBy():基于key的hash值进行分区
broadcase():广播,将消息广播给所有下游算子
shuffle():洗牌,将消息随机发送给下游算子
forward():直通,只在当前分区做计算
rebalance():以轮询的方式均匀分发给下游算子
rescale():类似rebalance,但是是分组的rebalance,如上游两个算子,下游四个算子,则将下游分为两组,上游的算子分别对一组进行轮询分发
global():固定将数据发送给下游的第一个分区
partitioncustom():自定义分区
public class TransformTest6_Partition {
public static void main(String[] args) throws Exception{
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度 = 4
env.setParallelism(4);
// 从文件读取数据
DataStream<String> inputStream = env.readTextFile("/tmp/Flink_Tutorial/src/main/resources/sensor.txt");
// 转换成SensorReading类型
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// SingleOutputStreamOperator多并行度默认就rebalance,轮询方式分配
dataStream.print("input");
// 1. shuffle (并非批处理中的获取一批后才打乱,这里每次获取到直接打乱且分区)
DataStream<String> shuffleStream = inputStream.shuffle();
shuffleStream.print("shuffle");
// 2. keyBy (Hash,然后取模)
dataStream.keyBy(SensorReading::getId).print("keyBy");
// 3. global (直接发送给第一个分区,少数特殊情况才用)
dataStream.global().print("global");
env.execute();
}
}
flink的所有对外的输出操作都要利用 Sink 完成:stream.addSink(new MySink(xxxx))
官方提供了一部分的框架的 sink,用户也可以自定义实现 sink。
实现RichSinkFunction
接口
dataStream.addSink(new MyJdbcSink());
// 实现自定义的SinkFunction
public static class MyJdbcSink extends RichSinkFunction<SensorReading> {
// 声明连接和预编译语句
Connection connection = null;
PreparedStatement insertStmt = null;
PreparedStatement updateStmt = null;
@Override
public void open(Configuration parameters) throws Exception {
connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink_test?useUnicode=true&serverTimezone=Asia/Shanghai&characterEncoding=UTF-8&useSSL=false", "root", "example");
insertStmt = connection.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)");
updateStmt = connection.prepareStatement("update sensor_temp set temp = ? where id = ?");
}
// 每来一条数据,调用连接,执行sql
@Override
public void invoke(SensorReading value, Context context) throws Exception {
// 直接执行更新语句,如果没有更新那么就插入
updateStmt.setDouble(1, value.getTemperature());
updateStmt.setString(2, value.getId());
updateStmt.execute();
if (updateStmt.getUpdateCount() == 0) {
insertStmt.setString(1, value.getId());
insertStmt.setDouble(2, value.getTemperature());
insertStmt.execute();
}
}
@Override
public void close() throws Exception {
insertStmt.close();
updateStmt.close();
connection.close();
}
}
dataStream.addSink( new FlinkKafkaProducer<String>("localhost:9092", "sinktest", new SimpleStringSchema()));
// 定义jedis连接配置(我这里连接的是docker的redis)
FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
.setHost("localhost")
.setPort(6379)
.setPassword("123456")
.setDatabase(0)
.build();
dataStream.addSink(new RedisSink<>(config, new MyRedisMapper()));
// 自定义RedisMapper
public static class MyRedisMapper implements RedisMapper<SensorReading> {
// 定义保存数据到redis的命令,存成Hash表,hset sensor_temp id temperature
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");
}
@Override
public String getKeyFromData(SensorReading data) {
return data.getId();
}
@Override
public String getValueFromData(SensorReading data) {
return data.getTemperature().toString();
}
}
dataStream.addSink( new ElasticsearchSink.Builder<SensorReading>(httpHosts, new MyEsSinkFunction()).build());
// 实现自定义的ES写入操作
public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading> {
@Override
public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
// 定义写入的数据source
HashMap<String, String> dataSource = new HashMap<>();
dataSource.put("id", element.getId());
dataSource.put("temp", element.getTemperature().toString());
dataSource.put("ts", element.getTimestamp().toString());
// 创建请求,作为向es发起的写入命令(ES7统一type就是_doc,不再允许指定type)
IndexRequest indexRequest = Requests.indexRequest()
.index("sensor")
.source(dataSource);
// 用index发送请求
indexer.add(indexRequest);
}
}
window是一种切割无限数据为有限块进行处理的手段,将一个无限的stream拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。
无限数据集是指一种不断增长的本质上无限的数据集。
Flink 默认的时间窗口根据 Processing Time 进行窗口的划分。
时间窗口(Time Window):按时间生成窗口
滚动时间窗口(Tumbling Windows):按固定窗口长度对数据做切割,可以看做是滑动窗口的一种特殊情况(即窗口大小和滑动间隔相等)
特点:时间对齐,窗口长度固定,没有重叠。
适用场景:适合做 BI 统计等(做每个时间段的聚合计算)。
滑动时间窗口(Sliding Windows):每隔一个滑动间隔时间,对过去固定窗口长度的数据做切割
会话窗口(Session Windows):由一系列事件组合一个指定时间长度的timeout间隙组成,也就是一段时间没有接收到新数据就会生成新的窗口,配置 session 间隔以定义非活跃周期的长度,当收不到数据的时间达到session间隔,则关闭当前session,后续数据会分配到新session窗口
计数窗口(Count Window):按数据数量生成窗口
window()
方法.timeWindow()
和.countWindow()
方法,用于定义时间窗口和计数窗口。.windowAll()
,其他窗口定义方法必须在keyBy之后才能使用滚动窗口:.timeWindow( Time.seconds(15) )
滑动窗口:.timeWindow( Time.seconds(15), Time.seconds(5) ) ,参数一个是window_size,一个是sliding_size。
DataStream<Tuple2<String, Double>> minTempPerWindowStream = dataStream
.map(xxx)
.keyBy(data -> data.f0)
.timeWindow( Time.seconds(15) )
.minBy(1);
时间间隔可以通过Time.milliseconds(x)
,Time.seconds(x)
,Time.minutes(x)
等其中的一个来指定。
滚动窗口:.countWindow( long)
滑动窗口:.countWindow( long, long ) ,参数一个是window_size,一个是sliding_size。
CountWindow 的 window_size 指的是相同 Key 的元素的个数,不是输入的所有元素的总数。
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
function例子:http://www.javashuo.com/article/p-ftiqugmc-eu.html
window function 定义了要对窗口中收集的数据做的计算操作,在window开窗操作后调用
主要可以分为两类:
增量聚合函数:每条数据到来就进行计算,保持一个简单的状态。典型的增量聚合函数有ReduceFunction, AggregateFunction。
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
,保持一个简单的状态。典型的增量聚合函数有ReduceFunction, AggregateFunction。
全窗口函数:先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。WindowFunction和ProcessWindowFunction
// 1. 增量聚合函数 (这里简单统计每个key组里传感器信息的总数)
DataStream<Integer> resultStream = dataStream.keyBy("id")
// .timeWindow(Time.seconds(15)) // 已经不建议使用@Deprecated
.window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
.aggregate(new AggregateFunction<SensorReading, Integer, Integer>() {
// 新建的累加器
@Override
public Integer createAccumulator() {
return 0;
}
// 每个数据在上次的基础上累加
@Override
public Integer add(SensorReading value, Integer accumulator) {
return accumulator + 1;
}
// 返回结果值
@Override
public Integer getResult(Integer accumulator) {
return accumulator;
}
// 分区合并结果(TimeWindow一般用不到,SessionWindow可能需要考虑合并)
@Override
public Integer merge(Integer a, Integer b) {
return a + b;
}
});
resultStream.print("result");
env.execute();
new AggregateFunction<UserActionLog, Tuple2<Long,Long>, Double>() {
// 一、初始值
// 定义累加器初始值
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L,0L);
}
// 二、累加
// 定义累加器如何基于输入数据进行累加
@Override
public Tuple2<Long, Long> add(UserActionLog value, Tuple2<Long, Long> accumulator) {
accumulator.f0 += 1;
accumulator.f1 += value.getProductPrice();
return accumulator;
}
// 三、合并
// 定义累加器如何和State中的累加器进行合并
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> acc1, Tuple2<Long, Long> acc2) {
acc1.f0+=acc2.f0;
acc1.f1+=acc2.f1;
return acc1;
}
// 四、输出
// 定义如何输出数据
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return accumulator.f1 / (accumulator.f0 * 1.0);
}
}
.trigger() —— 触发器
定义 window 什么时候关闭,触发计算并输出结果
.evitor() —— 移除器
定义移除某些数据的逻辑
.allowedLateness() —— 允许处理迟到的数据
.sideOutputLateData() —— 将迟到的数据放入侧输出流
.getSideOutput() —— 获取侧输出流
flink默认使用ProcessingTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Flink对于迟到数据有三层保障,先来后到的保障顺序是:
前言:流处理从事件产生,到流经source,再到operator,有可能由于网络、分布式等原因,导致Flink接收到的事件的先后顺序与事件的Event Time顺序不同,此时若使用了Event Time语义,则不能明确数据是否全部到位,但又不能无限期的等下去,于是有了Watermark。
Watermark = maxEventTime-延迟时间t
,最常见的引用方式如下:
dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
@Override
public long extractTimestamp(SensorReading element) {
return element.getTimestamp() * 1000L;
}
});
BoundedOutOfOrdernessTimestampExtractor是AssignerWithPeriodicWatermarks
的实现类,还有一个接口AssignerWithPunctuatedWatermarks
,这两个接口都可以自定义如何从事件数据中抽取时间戳。
AssignerWithPeriodicWatermarks
ExecutionConfig.setAutoWatermarkInterval()
方法进行设置AssignerWithPunctuatedWatermarks
一般认为Watermark的设置代码,在里Source步骤越近的地方越合适。
即使如此,依然可能会有些事件数据在 Watermark 之后到达,这时可以用Late Elements 处理。
dataStream
.keyBy(SensorReading::getId)
.map(new MyMapper());
// 自定义map富函数,测试 键控状态
public static class MyMapper extends RichMapFunction<SensorReading,Integer>{
private ValueState<Integer> valueState;
// 其它类型状态的声明
private ListState<String> myListState;
private MapState<String, Double> myMapState;
private ReducingState<SensorReading> myReducingState;
@Override
public void open(Configuration parameters) throws Exception {
valueState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("my-int", Integer.class));
myListState = getRuntimeContext().getListState(new ListStateDescriptor<String>("my-list", String.class));
myMapState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Double>("my-map", String.class, Double.class));
// myReducingState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<SensorReading>())
}
// 这里就简单的统计每个 传感器的 信息数量
@Override
public Integer map(SensorReading value) throws Exception {
// 其它状态API调用
// list state
for(String str: myListState.get()){
System.out.println(str);
}
myListState.add("hello");
// map state
myMapState.get("1");
myMapState.put("2", 12.3);
myMapState.remove("2");
// reducing state
// myReducingState.add(value);
myMapState.clear();
Integer count = valueState.value();
// 第一次获取是null,需要判断
count = count==null?0:count;
++count;
valueState.update(count);
return count;
}
}
// 1. 状态后端配置
env.setStateBackend(new MemoryStateBackend());
env.setStateBackend(new FsStateBackend("checkpointDataUri"));
// 这个需要另外导入依赖
env.setStateBackend(new RocksDBStateBackend("checkpointDataUri"));
一致性实际上是"正确性级别"的另一种说法,即成功处理故障并恢复之后得到的结果,与没有发生故障时得到的结果相比,到底有多正确。
at-most-once: 最多一次,这其实是没有正确性保障的委婉说法——故障发生之后,计数结果可能丢失。
at-least-once: 至少一次,这表示计数程序在发生故障后可能多算,但是绝不会少算。
exactly-once: 精确一次,保证在发生故障后得到的计数结果与正确值一致。
最先保证 exactly-once 的系统(Storm Trident 和 Spark Streaming)在性能和表现力这两个方面付出了很大的代价。它们按批处理,保证对每一批的处理要么全部成功,要么全部失败。这就导致在得到结果前,必须等待一批记录处理结束。因此,用户经常不得不使用两个流处理框架(一个用来保证 exactly-once,另一个用来对每个元素做低延迟处理),结果使基础设施更加复杂。
Flink 的一个重大价值在于, 它既保证了 exactly-once ,也具有低延迟和高吞吐力 的处理能力
端到端的保障指的是在整个数据处理管道上结果都是正确的。在每个组件都提供自身的保障情况下,整个处理管道上端到端的保障会受制于保障最弱的那个组件。
2PC 对外部 sink 系统的要求
• 外部 sink 系统必须提供事务支持,或者 sink 任务必须能够模拟外部系统上的事务
• 在 checkpoint 的间隔期间里,必须能够开启一个事务并接受数据写入
• 在收到 checkpoint 完成的通知之前,事务必须是“等待提交”的状态。在故障恢复的情况下,这可能需要一些时间。如果这个时候sink系统关闭事务(例如超时了),那么未提交的数据就会丢失
• sink 任务必须能够在进程失败后恢复事务
• 提交事务必须是幂等操作
Flink 检查点的核心作用是确保状态正确,即使遇到程序中断,也要正确。
checkpoint使 Flink 可以保证 exactly-once,又不需要牺牲性能。
checkpoint其实就是**所有任务的状态在某个时间点的一份拷贝;**这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时候
在JobManager中也有个Chechpoint的指针,指向了仓库的状态快照的一个拓扑图,为以后的数据故障恢复做准备
Flink 检查点算法的正式名称是异步分界线快照(asynchronous barrier snapshotting),有个核心概念Barrier(屏障),由算子处理,但是不参与计算,而是会触发与检查点相关的行为:
状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端( state backend)
状态后端主要负责两件事:本地状态管理,以及将检查点(checkPoint)状态写入远程存储
CheckPoint为自动保存,SavePoint为手动保存
public class StateTest4_FaultTolerance {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 1. 状态后端配置
env.setStateBackend(new MemoryStateBackend());
env.setStateBackend(new FsStateBackend(""));
// 这个需要另外导入依赖
env.setStateBackend(new RocksDBStateBackend(""));
// 2. 检查点配置 (每300ms让jobManager进行一次checkpoint检查)
env.enableCheckpointing(300);
// 高级选项
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//Checkpoint的处理超时时间
env.getCheckpointConfig().setCheckpointTimeout(60000L);
// 最大允许同时处理几个Checkpoint(比如上一个处理到一半,这里又收到一个待处理的Checkpoint事件)
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
// 与上面setMaxConcurrentCheckpoints(2) 冲突,这个时间间隔是 当前checkpoint的处理完成时间与接收最新一个checkpoint之间的时间间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L);
// 如果同时开启了savepoint且有更新的备份,是否倾向于使用更老的自动备份checkpoint来恢复,默认false
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
// 最多能容忍几次checkpoint处理失败(默认0,即checkpoint处理失败,就当作程序执行异常)
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
// 3. 重启策略配置
// 固定延迟重启(最多尝试3次,每次间隔10s)
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L));
// 失败率重启(在10分钟内最多尝试3次,每次至少间隔1分钟)
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(10), Time.minutes(1)));
// socket文本流
DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
// 转换成SensorReading类型
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
dataStream.print();
env.execute();
}
}
Process Function 用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的window 函数和转换算子无法实现),可以访问到事件的时间戳信息和水位线信息
使用方法:.process( xxx )
所有的Process Function都有open()
、close()
和getRuntimeContext()
等方法。
Flink提供了8个Process Function:
用于KeyedStream,会处理流的每一个元素,输出为0个、1个或者多个元素。
特有方法:
processElement(I value, Context ctx, Collector<O> out)
,流中的每一个元素都会调用这个方法,调用结果将会放在Collector数据类型中输出。Context可以访问元素的时间戳,元素的 key ,以及TimerService 时间服务。 Context 还可以将结果输出到别的流(side outputs)。onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)
,是一个回调函数。当之前注册的定时器触发时调用。参数timestamp 为定时器所设定的触发的时间戳。Collector 为输出结果的集合。OnTimerContext和processElement的Context 参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)。dataStream.keyBy("id").process( new MyProcess() ).print();
// 实现自定义的处理函数
public static class MyProcess extends KeyedProcessFunction<Tuple, SensorReading, Integer> {
ValueState<Long> tsTimerState;
@Override
public void open(Configuration parameters) throws Exception {
tsTimerState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("ts-timer", Long.class));
}
@Override
public void processElement(SensorReading value, Context ctx, Collector<Integer> out) throws Exception {
out.collect(value.getId().length());
// context
// Timestamp of the element currently being processed or timestamp of a firing timer.
ctx.timestamp();
// Get key of the element being processed.
ctx.getCurrentKey();
// ctx.output();
ctx.timerService().currentProcessingTime();
ctx.timerService().currentWatermark();
// 在5处理时间的5秒延迟后触发
ctx.timerService().registerProcessingTimeTimer( ctx.timerService().currentProcessingTime() + 5000L);
tsTimerState.update(ctx.timerService().currentProcessingTime() + 1000L);
// ctx.timerService().registerEventTimeTimer((value.getTimestamp() + 10) * 1000L);
// 删除指定时间触发的定时器
// ctx.timerService().deleteProcessingTimeTimer(tsTimerState.value());
}
//定时器
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
System.out.println(timestamp + " 定时器触发");
ctx.getCurrentKey();
// ctx.output();
ctx.timeDomain();
}
@Override
public void close() throws Exception {
tsTimerState.clear();
}
}
Context 和OnTimerContext 所持有的TimerService 对象拥有以下方法:
long currentProcessingTime()
返回当前处理时间long currentWatermark()
返回当前watermark 的时间戳void registerProcessingTimeTimer( long timestamp)
会注册当前key的processing time的定时器。当processing time 到达定时时间时,触发timer。void deleteProcessingTimeTimer(long timestamp)
删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。void deleteEventTimeTimer(long timestamp)
删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。当定时器timer 触发时,会执行回调函数onTimer()。注意定时器timer 只能在keyed streams 上面使用。
场景:温度>=30放入高温流输出,反之放入低温流输出
java代码
package processfunction;
import apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
* @author : Ashiamd email: ashiamd@foxmail.com
* @date : 2021/2/3 2:07 AM
*/
public class ProcessTest3_SideOuptCase {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度 = 1
env.setParallelism(1);
// 从本地socket读取数据
DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
// 转换成SensorReading类型
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// 定义一个OutputTag,用来表示侧输出流低温流
// An OutputTag must always be an anonymous inner class
// so that Flink can derive a TypeInformation for the generic type parameter.
OutputTag<SensorReading> lowTempTag = new OutputTag<SensorReading>("lowTemp"){};
// 测试ProcessFunction,自定义侧输出流实现分流操作
SingleOutputStreamOperator<SensorReading> highTempStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>() {
@Override
public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {
// 判断温度,大于30度,高温流输出到主流;小于低温流输出到侧输出流
if (value.getTemperature() > 30) {
out.collect(value);
} else {
ctx.output(lowTempTag, value);
}
}
});
highTempStream.print("high-temp");
highTempStream.getSideOutput(lowTempTag).print("low-temp");
env.execute();
}
}
processElement1()
和processElement2()
。// 2. 合流 connect
ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams = warningStream.connect(lowTempStream);
DataStream<Object> resultStream = connectedStreams.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
@Override
public Object map1(Tuple2<String, Double> value) throws Exception {
return new Tuple3<>(value.f0, value.f1, "high temp warning");
}
@Override
public Object map2(SensorReading value) throws Exception {
return new Tuple2<>(value.getId(), "normal");
}
});
package apitest.tableapi;
import apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
* @author : Ashiamd email: ashiamd@foxmail.com
* @date : 2021/2/3 5:47 AM
*/
public class TableTest1_Example {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 1. 读取数据
DataStream<String> inputStream = env.readTextFile("/tmp/Flink_Tutorial/src/main/resources/sensor.txt");
// 2. 转换成POJO
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// 3. 创建表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 4. 基于流创建一张表
Table dataTable = tableEnv.fromDataStream(dataStream);
// 5. 调用table API进行转换操作
Table resultTable = dataTable.select("id, temperature")
.where("id = 'sensor_1'");
// 6. 执行SQL
tableEnv.createTemporaryView("sensor", dataTable);
String sql = "select id, temperature from sensor where id = 'sensor_1'";
Table resultSqlTable = tableEnv.sqlQuery(sql);
// 7. 将动态表转换为流进行输出
tableEnv.toAppendStream(resultTable, Row.class).print("result");
tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");
env.execute();
}
}
Table API 和 SQL 的程序结构,与流式处理的程序结构十分类似
StreamTableEnvironment tableEnv = ... // 创建表的执行环境
// 创建一张表,用于读取数据
tableEnv.connect(...).createTemporaryTable("inputTable");
// 注册一张表,用于把计算结果输出
tableEnv.connect(...).createTemporaryTable("outputTable");
// 通过 Table API 查询算子,得到一张结果表
Table result = tableEnv.from("inputTable").select(...);
// 通过 SQL 查询语句,得到一张结果表
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...");
// 将结果表写入输出表中
result.insertInto("outputTable");
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
两种方式:基于老版本planner、基于Blink。
新版本blink,真正把批处理、流处理都以DataStream实现。
package apitest.tableapi;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
/**
* @author : Ashiamd email: ashiamd@foxmail.com
* @date : 2021/2/3 3:56 PM
*/
public class TableTest2_CommonApi {
public static void main(String[] args) {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度为1
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 1.1 基于老版本planner的流处理
EnvironmentSettings oldStreamSettings = EnvironmentSettings.newInstance()
.useOldPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment oldStreamTableEnv = StreamTableEnvironment.create(env,oldStreamSettings);
// 1.2 基于老版本planner的批处理
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment oldBatchTableEnv = BatchTableEnvironment.create(batchEnv);
// 1.3 基于Blink的流处理
EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment blinkStreamTableEnv = StreamTableEnvironment.create(env,blinkStreamSettings);
// 1.4 基于Blink的批处理
EnvironmentSettings blinkBatchSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();
TableEnvironment blinkBatchTableEnv = TableEnvironment.create(blinkBatchSettings);
}
}
TableEnvironment可以调用connect()
方法,连接外部系统,并调用.createTemporaryTable()
方法,在Catalog中注册表
tableEnv
.connect(...) // 定义表的数据来源,和外部系统建立连接
.withFormat(...) // 定义数据格式化方法
.withSchema(...) // 定义表结构
.createTemporaryTable("MyTable"); // 创建临时表
package apitest.tableapi;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
/**
* @author : Ashiamd email: ashiamd@foxmail.com
* @date : 2021/2/3 3:56 PM
*/
public class TableTest2_CommonApi {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度为1
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 2. 表的创建:连接外部系统,读取数据
// 2.1 读取文件
String filePath = "/tmp/Flink_Tutorial/src/main/resources/sensor.txt";
tableEnv.connect(new FileSystem().path(filePath)) // 定义到文件系统的连接
.withFormat(new Csv()) // 定义以csv格式进行数据格式化
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE())
) // 定义表结构
.createTemporaryTable("inputTable"); // 创建临时表
Table inputTable = tableEnv.from("inputTable");
inputTable.printSchema();
tableEnv.toAppendStream(inputTable, Row.class).print();
env.execute();
}
}
Table API是集成在Scala和Java语言内的查询API
Table API基于代表"表"的Table类,并提供一整套操作处理的方法API;这些方法会返回一个新的Table对象,表示对输入表应用转换操作的结果
有些关系型转换操作,可以由多个方法调用组成,构成链式调用结构
Table sensorTable = tableEnv.from("inputTable");
Table resultTable = sensorTable
.select("id","temperature")
.filter("id = 'sensor_1'");
// 3.1 Table API
// 简单转换
Table resultTable = inputTable.select("id, temp")
.filter("id === 'sensor_6'");
// 聚合统计
Table aggTable = inputTable.groupBy("id")
.select("id, id.count as count, temp.avg as avgTemp");
Table resultSqlTable = tableEnv.sqlQuery("select id, temperature from sensorTable where id ='sensor_1'");
// 3.2 SQL
tableEnv.sqlQuery("select id, temp from inputTable where id = 'senosr_6'");
Table sqlAggTable = tableEnv.sqlQuery("select id, count(id) as cnt, avg(temp) as avgTemp from inputTable group by id");
表的输出,是通过将数据写入 TableSink 来实现的
TableSink 是一个通用接口,可以支持不同的文件格式、存储数据库和消息队列
输出表最直接的方法,就是通过 Table.insertInto() 方法将一个 Table 写入注册过的 TableSink 中
tableEnv.connect(...).createTemporaryTable("outputTable");
Table resultSqlTable = ...
resultTable.insertInto("outputTable");
输出到文件有局限,只能是批处理,且只能是追加写,不能是更新式的随机写。
tableEnv.connect(
new FileSystem().path("output.txt")
) // 定义到文件系统的连接
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("temp", DataTypes.Double())
).createTemporaryTable("outputTable") ; // 创建临时表
resultTable.insertInto("outputTable"); // 输出表
package apitest.tableapi;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
/**
* @author : Ashiamd email: ashiamd@foxmail.com
* @date : 2021/2/3 6:33 PM
*/
public class TableTest4_KafkaPipeLine {
public static void main(String[] args) throws Exception {
// 1. 创建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 2. 连接Kafka,读取数据
tableEnv.connect(new Kafka()
.version("universal")
.topic("sensor")
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
)
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE())
)
.createTemporaryTable("inputTable");
// 3. 查询转换
// 简单转换
Table sensorTable = tableEnv.from("inputTable");
Table resultTable = sensorTable.select("id, temp")
.filter("id === 'sensor_6'");
// 聚合统计
Table aggTable = sensorTable.groupBy("id")
.select("id, id.count as count, temp.avg as avgTemp");
// 4. 建立kafka连接,输出到不同的topic下
tableEnv.connect(new Kafka()
.version("universal")
.topic("sinktest")
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
)
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
// .field("timestamp", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE())
)
.createTemporaryTable("outputTable");
resultTable.insertInto("outputTable");
tableEnv.execute("");
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。