Writing data to Kafka: if the topic does not exist, it is automatically created with 1 replica and 1 partition.
package com.shujia.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class Demo01KafkaProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "master:9092,node1:9092,node2:9092");
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Create the Kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // Write data to Kafka; if the topic does not exist, it is auto-created with 1 replica and 1 partition
        producer.send(new ProducerRecord<String, String>("topic02", "1500100001,施笑槐,22,女,文科六班"));
        producer.flush();
    }
}
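Note that the auto-created topic only gets 1 partition and 1 replica. If more are needed, the topic can be created up front with the Kafka CLI; the sketch below is only an example, and the partition and replica counts are illustrative:

kafka-topics.sh --bootstrap-server master:9092,node1:9092,node2:9092 --create --topic topic02 --partitions 3 --replication-factor 2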
Result after running the program twice:
package com.shujia.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

public class Demo02KafkaStuProducer {
    // Write 1000 student records into Kafka
    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "master:9092,node1:9092,node2:9092");
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Create the Kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // Read the file line by line and send each line to the topic
        BufferedReader br = new BufferedReader(new FileReader("kafka/data/stu/students.txt"));
        String line;
        while ((line = br.readLine()) != null) {
            producer.send(new ProducerRecord<>("students1000", line));
        }
        producer.flush();
    }
}
Result:
package com.shujia.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.ArrayList;
import java.util.Properties;

public class Demo03KafkaConsumer {
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "master:9092,node1:9092,node2:9092");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        /*
         * Rules for the consumer group's starting offset:
         *   earliest - equivalent to --from-beginning, consume from the start
         *   latest   - consume only the newest data
         */
        properties.setProperty("auto.offset.reset", "earliest");
        // Set the consumer group id. A given group only reads each record once:
        // running the program again with the same group id returns no data (as the output shows)
        properties.setProperty("group.id", "grp03");

        // Create the Kafka consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        ArrayList<String> topic = new ArrayList<>();
        topic.add("students1000");
        // Subscribe to the topic(s) to consume
        consumer.subscribe(topic);

        // Loop forever so the consumer behaves like an unbounded stream and keeps reading
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.headers());
                System.out.println(record.offset());
                System.out.println(record.timestamp());
                // The topic students1000 has only one partition, so only one partition shows up in the output
                // System.out.println(record.partition());
                // The records were sent without a key, so this prints null
                System.out.println(record.key());
                System.out.println(record.value());
            }
            // Sleep for a while so the loop does not run too fast before all the data has been fetched
            Thread.sleep(5000);
        }
    }
}
Counting the number of students in each class:
package tfTest;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo01KafkaSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> kafkaSource = KafkaSource
                .<String>builder()
                .setBootstrapServers("master:9092,node1:9092,node2:9092")
                .setGroupId("group001")
                .setTopics("students1000")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStreamSource<String> kafkaDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource");

        kafkaDS
                .map(line -> Tuple2.of(line.split(",")[4], 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t2 -> t2.f0)
                .sum(1)
                .print();

        env.execute();

        /*
         * Output (the stream is unbounded, so it keeps receiving and processing data):
         * 13> (理科五班,17)
         * 13> (理科四班,17)
         * 13> (理科四班,18)
         * 13> (理科四班,19)
         * 13> (理科四班,20)
         * 13> (理科五班,18)
         * 13> (理科四班,21)
         * 13> (理科四班,22)
         * 13> (理科五班,19)
         * 13> (理科四班,23)
         */
    }
}
Setting the delivery semantics for writes:
1. AT_LEAST_ONCE: each record is written at least once; better performance, but duplicate records are possible.
2. EXACTLY_ONCE: each record is written exactly once, no more and no less; incurs some performance cost.
package com.shujia.flink.kafka;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo02KafkaSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> carDS = env.readTextFile("flink/data/cars_sample.json");

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("master:9092,node1:9092,node2:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema
                                .builder()
                                .setTopic("cars_json") // auto-created if it does not exist
                                // Serialization schema for the record value
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                /*
                 * Delivery semantics for the sink:
                 * 1. AT_LEAST_ONCE: each record is written at least once; better performance, but duplicates are possible
                 * 2. EXACTLY_ONCE: each record is written exactly once, no more, no less; costs some performance
                 */
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        carDS.sinkTo(sink);

        env.execute();
    }
}
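To check that the records actually reached the topic, they can be read back with the console consumer (the same tool used later in this post; the broker list matches the examples above):

kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic cars_json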
Notes:
Convert the records read from the topic into a custom Car object so they are easier to work with later: SingleOutputStreamOperator<Car> carDS = carStrDS.map(carStr -> JSON.parseObject(carStr, Car.class));
The third argument of env.fromSource (the source name) is mainly used in Flink for logging, monitoring, metadata management, and code readability: DataStreamSource<String> carStrDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "cars");
The aggregation itself is done in reduce.
package com.shujia.flink.kafka;

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo03CarsAvgSpeed {
    public static void main(String[] args) throws Exception {
        // Compute the average speed per road in real time from the Kafka cars data
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Build the Kafka source
        KafkaSource<String> kafkaSource = KafkaSource
                .<String>builder()
                .setBootstrapServers("master:9092,node1:9092,node2:9092")
                .setGroupId("grp001")
                .setTopics("cars_json")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Sample record:
        // {"car":"皖AQLXL2","city_code":"340100","county_code":"340111","card":117331031812010,"camera_id":"01012","orientation":"西","road_id":34406326,"time":1614731906,"speed":47.86}
        // The third argument (the source name) is mainly used in Flink for logging, monitoring, metadata management and code readability
        DataStreamSource<String> carStrDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "cars");

        // TODO Convert the records read from the topic into a custom Car object so they are easier to work with
        SingleOutputStreamOperator<Car> carDS = carStrDS.map(carStr -> JSON.parseObject(carStr, Car.class));

        carDS
                // Keep only road_id and speed from the Car object, plus a constant 1 used later for averaging
                .map(car -> Tuple3.of(car.road_id, car.speed, 1), Types.TUPLE(Types.LONG, Types.DOUBLE, Types.INT))
                .keyBy(t3 -> t3.f0, Types.LONG)
                // Aggregate the speed sum and the count per road
                .reduce(new ReduceFunction<Tuple3<Long, Double, Integer>>() {
                    @Override
                    public Tuple3<Long, Double, Integer> reduce(Tuple3<Long, Double, Integer> value1, Tuple3<Long, Double, Integer> value2) throws Exception {
                        return Tuple3.of(value1.f0, value1.f1 + value2.f1, value1.f2 + value2.f2);
                    }
                })
                // After aggregation, compute the average speed per road
                .map(t3 -> Tuple2.of(t3.f0, t3.f1 / t3.f2), Types.TUPLE(Types.LONG, Types.DOUBLE))
                .print();

        env.execute();
    }
}

// Define a Car class; getters, setters and constructors are generated by the annotations.
// Prerequisites: add the lombok dependency and install the lombok plugin.
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
class Car {
    String car;
    Integer city_code;
    Integer county_code;
    Long card;
    String camera_id;
    String orientation;
    Long road_id;
    Long time;
    Double speed;
}
**After a job stops, its result state can be kept on HDFS.** When the job is started again, the results computed from new input are combined with the results of the previous run (this is how failure recovery is achieved).
A checkpoint saves the result state of the operators after they have run:
package com.shujia.flink.state;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo02CheckPoint {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing, starting a checkpoint every 5000 ms
        env.enableCheckpointing(5000);

        // Advanced options:
        // Set the mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Leave at least 500 ms between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // A checkpoint must finish within one minute or it is discarded
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Tolerate two consecutive checkpoint failures
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
        // Allow only one checkpoint at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Use externalized checkpoints so that checkpoints are retained after the job is cancelled
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Enable experimental unaligned checkpoints
        env.getCheckpointConfig().enableUnalignedCheckpoints();
        // Where checkpoints are stored, usually an HDFS path; port 9000 can be found in the HDFS UI.
        // Only the most recent checkpoint is kept under this path; earlier ones are cleaned up.
        env.getCheckpointConfig().setCheckpointStorage("hdfs://master:9000/flink/checkpoint");

        // Flink operators already keep state while computing, but without checkpointing that state is never persisted
        env
                // Read data from an open socket
                .socketTextStream("master", 8888)
                .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t2 -> t2.f0)
                .sum(1)
                .print();

        env.execute();
    }
}
When starting the job again, the checkpoint storage path has to be specified:
hdfs://master:9000/flink/checkpoint/d882f9b4d726d7462573a3bee8ab4fcb/chk-14
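A hedged sketch of restarting from that retained checkpoint with the Flink CLI's -s option (the jar name is illustrative; the main class is the demo above):

flink run -s hdfs://master:9000/flink/checkpoint/d882f9b4d726d7462573a3bee8ab4fcb/chk-14 -c com.shujia.flink.state.Demo02CheckPoint flink-1.0.jar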
State is usually stored after a keyBy computation. Persisting that state requires checkpointing to be enabled, which can also be done in the configuration file (so it does not have to be turned on in code every time; see the sketch below).
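A minimal sketch of what that configuration could look like in flink-conf.yaml (the keys follow Flink 1.15-era naming; the interval and path values are illustrative):

execution.checkpointing.interval: 5000
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
state.checkpoints.dir: hdfs://master:9000/flink/checkpoint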
Using state
The different kinds of state:
ValueState: single-value state, with two methods: update to update the state and value to read it
ListState: multi-value state
MapState: key-value state
ReducingState: state that is aggregated as it is updated; in the end still a single value
AggregatingState: state that is aggregated as it is updated; in the end still a single value
package com.shujia.flink.state;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class Demo03ValueState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> ds01 = env.socketTextStream("master", 8888);
        // Checkpointing is enabled in the configuration file, so it does not need to be set on env here

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordDS = ds01.flatMap((line, out) -> {
            for (String word : line.split(",")) {
                out.collect(Tuple2.of(word, 1));
            }
        }, Types.TUPLE(Types.STRING, Types.INT));

        KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordDS.keyBy(t2 -> t2.f0, Types.STRING);

        // process can also be called on a keyed stream
        keyedDS
                .process(new KeyedProcessFunction<String, Tuple2<String, Integer>, String>() {
                    // A ValueState holds a single value per key, with two methods: update and value.
                    // Flink maintains a separate state instance for every key of the keyBy.
                    // Other state types: ListState (multiple values), MapState (key/value pairs),
                    // ReducingState and AggregatingState (aggregated, single value in the end).
                    ValueState<Integer> valueState;

                    // Runs only once, when the KeyedProcessFunction is created
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // Initialize the state through the Flink runtime context
                        RuntimeContext context = getRuntimeContext();
                        ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("count", Types.INT);
                        valueState = context.getState(descriptor);
                    }

                    // Runs once per record
                    @Override
                    public void processElement(Tuple2<String, Integer> value, KeyedProcessFunction<String, Tuple2<String, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
                        Integer cnt = valueState.value();
                        int count = 1;
                        // value() returns null the first time a word is seen
                        if (cnt != null) {
                            count = cnt + 1;
                        }
                        valueState.update(count);
                        out.collect(value.f0 + "," + count);
                    }
                }).print();

        env.execute();
    }
}
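The demo above only shows ValueState. As a minimal sketch under the same setup (the class name, state names, and stored types are illustrative), ListState and MapState are declared and used the same way inside a KeyedProcessFunction:

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative sketch: keeps every word seen per key in a ListState and a per-word counter in a MapState
class ListAndMapStateSketch extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {

    ListState<String> listState;          // several values per key
    MapState<String, Integer> mapState;   // key/value pairs per key

    @Override
    public void open(Configuration parameters) throws Exception {
        RuntimeContext context = getRuntimeContext();
        listState = context.getListState(new ListStateDescriptor<>("words", Types.STRING));
        mapState = context.getMapState(new MapStateDescriptor<>("wordCounts", Types.STRING, Types.INT));
    }

    @Override
    public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
        // ListState: append the value and iterate everything stored for the current key
        listState.add(value.f0);
        int seen = 0;
        for (String ignored : listState.get()) {
            seen++;
        }

        // MapState: read, update and write back a per-word counter
        Integer cnt = mapState.get(value.f0);
        int count = cnt == null ? value.f1 : cnt + value.f1;
        mapState.put(value.f0, count);

        out.collect(value.f0 + ",count=" + count + ",elementsForKey=" + seen);
    }
}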
After the job stops, to continue from the results of the previous run, the checkpoint storage path must be specified when the job is started again:
When re-run, the job resumes from the specified checkpoint.
Fraud detection on one person's transaction stream: if a transaction below 1 yuan is immediately followed by a transaction above 500, it is flagged as a potential fraud risk.
package com.shujia.flink.state;

import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class Demo04FraudCheck {
    public static void main(String[] args) throws Exception {
        // Fraud detection on one person's transactions: if a transaction below 1 yuan is immediately
        // followed by a transaction above 500, flag it as a potential fraud risk
        /*
         * Sample input:
         * 1,1000
         * 1,500
         * 1,200
         * 1,0.1
         * 1,1000
         * 1,0.1
         * 1,300
         */
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> transDS = env.socketTextStream("master", 8888);

        SingleOutputStreamOperator<MyTrans> myTransDS = transDS.map(line -> {
            String[] split = line.split(",");
            return new MyTrans(split[0], Double.parseDouble(split[1]));
        });

        myTransDS
                .keyBy(MyTrans::getId) // key by account id, one state per person
                .process(new KeyedProcessFunction<String, MyTrans, String>() {
                    // State holding whether the previous transaction was below 1 yuan
                    ValueState<Boolean> flagState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        RuntimeContext context = getRuntimeContext();
                        flagState = context.getState(new ValueStateDescriptor<Boolean>("flag", Types.BOOLEAN));
                    }

                    @Override
                    public void processElement(MyTrans value, KeyedProcessFunction<String, MyTrans, String>.Context ctx, Collector<String> out) throws Exception {
                        // Read the state left by the previous record: true means the previous transaction was below 1,
                        // so the current record must be checked against the 500 threshold;
                        // false means only the "below 1" check is needed for the current record
                        Boolean flag = flagState.value();
                        if (flag == null) {
                            // defaults to false
                            flag = false;
                        }

                        // Amount of the current transaction
                        Double trans = value.getTrans();

                        // If the previous transaction was below 1 and the current one is above 500, raise the alert
                        if (flag) {
                            if (trans > 500) {
                                System.out.println("存在交易风险");
                            }
                            // The flag only refers to the immediately preceding record, so clear it after the check
                            flagState.update(false);
                        }

                        // Remember for the next record whether the current transaction is below 1
                        if (trans < 1) {
                            flagState.update(true);
                        }
                    }
                });

        // Flink does not need an action operator to trigger the job; it is driven by events
        // (the job runs whenever the stream changes or the monitored file changes)
        env.execute();
    }
}

@Data
@AllArgsConstructor
class MyTrans {
    String id;
    Double trans;
}
Kafka transactions are all-or-nothing: the send operations either all take effect once the transaction is committed, or none of them do.
# When writing to Kafka inside a transaction, the consumer option --isolation-level <String>
# defaults to read_uncommitted, i.e. all records are read, including uncommitted ones.
# To read only committed records, use read_committed.
kafka-console-consumer.sh --isolation-level read_committed --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic trans_topic
kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic trans_topic
package com.shujia.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class Demo03KafkaTransaction {
    public static void main(String[] args) throws InterruptedException {
        // Write to Kafka inside a transaction
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "master:9092,node1:9092,node2:9092");
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Required for transactions; omitting it causes an error
        properties.setProperty("transactional.id", "trans01");

        // Create the Kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // Start a transaction: all sends take effect together, or none of them do
        producer.initTransactions();
        producer.beginTransaction();

        // Write data to Kafka; if the topic does not exist it is auto-created with 1 replica and 1 partition
        producer.send(new ProducerRecord<String, String>("trans_topic", "1500100001,施笑槐,22,女,文科六班"));
        producer.send(new ProducerRecord<String, String>("trans_topic", "1500100002,施笑槐,22,女,文科六班"));
        producer.send(new ProducerRecord<String, String>("trans_topic", "1500100003,施笑槐,22,女,文科六班"));

        Thread.sleep(10000);

        producer.send(new ProducerRecord<String, String>("trans_topic", "1500100004,施笑槐,22,女,文科六班"));
        producer.send(new ProducerRecord<String, String>("trans_topic", "1500100005,施笑槐,22,女,文科六班"));
        producer.flush();

        // The write only counts as complete once the transaction is committed
        producer.commitTransaction();
    }
}
Ensuring data is neither duplicated nor lost
Flink acts as the consumer and processor: it reads data from Kafka and saves the consumed offsets together with the computed results in checkpoints, so the job can recover from failures.
To run the job on the cluster, remember to add the flink-sql-connector-kafka-1.15.4.jar dependency to the $FLINK_HOME/lib directory.
package com.shujia.flink.state;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo05ConsumeKafkaExactlyOnce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint interval
        env.enableCheckpointing(15000);

        // To run on the cluster, add flink-sql-connector-kafka-1.15.4.jar to $FLINK_HOME/lib
        KafkaSource<String> kafkaSource = KafkaSource
                .<String>builder()
                .setBootstrapServers("master:9092,node1:9092,node2:9092")
                .setGroupId("grp001") // can be anything on the first run, but must match the previous run when recovering
                .setTopics("words001") // TODO reading fails if the topic does not exist (topics are not auto-created on read)
                // When recovering from a checkpoint after a failure, Flink automatically switches this to
                // committedOffsets, i.e. it resumes from where the failed run stopped
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Turn the KafkaSource into an unbounded DataStream.
        // The source uses as many parallel readers as the topic has partitions.
        DataStreamSource<String> kafkaDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource");

        // Word count
        kafkaDS
                .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t2 -> t2.f0)
                .sum(1)
                .print();

        env.execute();
    }
}
Flink reads data from Kafka, processes it, and writes the results back to Kafka.
In this case, Flink wraps the checkpoint (saving the consumed offsets and the computed results to HDFS) and the write of the results to Kafka into a single transaction, which is what guarantees exactly-once.
Note: the following error is thrown at runtime:
org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
Cause:
transaction.max.timeout.ms: the maximum transaction timeout allowed by Kafka, 15 minutes by default, i.e. the broker allows transactions of at most 15 minutes. Flink's KafkaSink uses a default transaction timeout of 1 hour. If the two are not aligned, the conflict above occurs.
transaction.timeout.ms: sets the Kafka sink's transaction timeout; any value below 15 minutes works.
package com.shujia.flink.state;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class Demo06SinkKafkaExactlyOnce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint interval
        env.enableCheckpointing(15000);

        KafkaSource<String> kafkaSource = KafkaSource
                .<String>builder()
                .setBootstrapServers("master:9092,node1:9092,node2:9092")
                .setGroupId("grp001") // can be anything on the first run, but must match the previous run when recovering
                .setTopics("words001") // reading fails if the topic does not exist
                // When recovering from a checkpoint, Flink automatically switches this to committedOffsets,
                // i.e. it resumes from where the failed run stopped
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        /*
         * Turn the KafkaSource into an unbounded DataStream.
         * The source uses as many parallel readers as the topic has partitions.
         * Read from Kafka, process the data, then write the result into another Kafka topic.
         */
        DataStreamSource<String> kafkaDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource");

        Properties prop = new Properties();
        /*
         * Without this, the job fails with:
         *   org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException:
         *   Unexpected error in InitProducerIdResponse;
         *   The transaction timeout is larger than the maximum value allowed by the broker
         *   (as configured by transaction.max.timeout.ms).
         *
         * transaction.max.timeout.ms: the maximum transaction timeout Kafka allows, 15 minutes by default,
         * while Flink's KafkaSink uses a default transaction timeout of 1 hour. If the two are not aligned,
         * the conflict above occurs.
         *
         * transaction.timeout.ms: the Kafka sink's transaction timeout; anything below 15 minutes works.
         */
        prop.setProperty("transaction.timeout.ms", 15 * 1000 + "");

        KafkaSink<String> sink = KafkaSink
                .<String>builder()
                .setBootstrapServers("master:9092,node1:9092,node2:9092")
                .setKafkaProducerConfig(prop)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema
                                .builder()
                                .setTopic("word_cnt01") // auto-created on write if it does not exist
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                /*
                 * Delivery semantics for the sink:
                 * 1. AT_LEAST_ONCE: each record is written at least once; better performance, but duplicates are possible
                 * 2. EXACTLY_ONCE: each record is written exactly once, no more, no less; costs some performance
                 */
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .build();

        // Word count
        kafkaDS
                .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t2 -> t2.f0)
                .sum(1)
                // The result tuple must be converted to a String before it can be written to Kafka
                .map(t2 -> t2.f0 + "," + t2.f1)
                .sinkTo(sink);

        env.execute();
    }
}
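Since the sink writes transactionally under EXACTLY_ONCE, the results only become visible to downstream consumers after a checkpoint completes and the transaction is committed. This can be observed with a read_committed console consumer, using the same command form as earlier:

kafka-console-consumer.sh --isolation-level read_committed --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic word_cnt01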