The Kafka connector provides both a Source (data source) and a Sink (data sink). Starting with Flink 1.12, there is also a new interface for consuming Kafka data, KafkaSource (a sketch of it follows the documentation link below). To use the connector, add the following Maven dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.13.1</version>
</dependency>
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-sourcefunction
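For comparison with the FlinkKafkaConsumer shown below, here is a minimal sketch of the newer KafkaSource builder API mentioned above. It is an illustration assuming Flink 1.13, reusing the node1:9092 / flink-topic / test values from this article's demos; the class name KafkaSourceDemo is ours:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Build the unified KafkaSource introduced in Flink 1.12
        KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("node1:9092")                 // cluster address
            .setTopics("flink-topic")                          // topic to subscribe to
            .setGroupId("test")                                // consumer group id
            .setStartingOffsets(OffsetsInitializer.latest())   // where to start reading
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

        // The new source API uses fromSource instead of addSource
        DataStream<String> stream = env.fromSource(
            source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        stream.printToErr();

        env.execute("KafkaSourceDemo");
    }
}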
Flink's Kafka consumer, FlinkKafkaConsumer, provides access for reading from one or more Kafka topics. Its constructor accepts the following parameters:
1. The topic(s) to subscribe to: `topic`, either a single topic name or a list of topics
2. The deserialization schema: `deserialization`
3. Consumer property for the cluster address: `bootstrap.servers`
4. Consumer property for the consumer group id: `group.id` (a default is generated if it is not set, but explicit ids are easier to manage)
When consuming data from Kafka, a deserialization implementation class must be specified to convert the binary data read from Kafka into String objects. Items 3 and 4 above are passed to the constructor through a Properties object, as the following consumer demo shows:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * Flink consumes data from Kafka, specifying the topic name and deserialization class.
 */
public class ConnectorFlinkKafkaConsumerDemo {

    public static void main(String[] args) throws Exception {
        // 1. Execution environment: env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // 2. Data source: source
        // 2-1. Properties for consuming from Kafka
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        props.setProperty("group.id", "test");

        // 2-2. Build the FlinkKafkaConsumer instance
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(
            "flink-topic",              // topic name
            new SimpleStringSchema(),   // deserialization schema
            props                       // consumer properties
        );

        // 2-3. Add the source
        DataStream<String> kafkaStream = env.addSource(kafkaConsumer);

        // 3. Transformation (none in this demo)

        // 4. Sink
        kafkaStream.printToErr();

        // 5. Trigger execution
        env.execute("ConnectorFlinkKafkaConsumerDemo");
    }
}
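SimpleStringSchema only exposes each record's value. As a sketch (assuming Flink 1.13; the class name KafkaRecordSchema is ours), a custom KafkaDeserializationSchema can also surface each record's partition and offset:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.nio.charset.StandardCharsets;

/**
 * Sketch: deserializes each Kafka record to "partition-offset:value",
 * keeping metadata that SimpleStringSchema discards.
 */
public class KafkaRecordSchema implements KafkaDeserializationSchema<String> {

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false; // unbounded stream, never ends
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) {
        String value = new String(record.value(), StandardCharsets.UTF_8);
        return record.partition() + "-" + record.offset() + ":" + value;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}

An instance of this class can be passed to the FlinkKafkaConsumer constructor in place of new SimpleStringSchema().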
To write a DataStream back to Kafka, the connector provides the FlinkKafkaProducer sink. Building it requires:
1. The topic name
2. A serialization schema: converts Java objects to byte[]
3. The Kafka server address information
4. The fault-tolerance semantic (see the note after this list)
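FlinkKafkaProducer.Semantic offers NONE, AT_LEAST_ONCE (the default), and EXACTLY_ONCE. One practical caveat, added here as a hedged note rather than taken from the original text: EXACTLY_ONCE relies on Kafka transactions, and Flink's default producer transaction timeout (1 hour) exceeds the broker's default transaction.max.timeout.ms (15 minutes), so writes can fail unless the producer-side timeout is lowered, for example:

// Assumption: the broker keeps the default transaction.max.timeout.ms (15 min);
// keep the producer's transaction timeout below that when using EXACTLY_ONCE.
props.setProperty("transaction.timeout.ms", "600000"); // 10 minutes

This property would go into the same Properties object that is passed to FlinkKafkaProducer in the demo below.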
Requirement: implement a custom Source that generates trade order data, converts each order into a JSON string, and saves it to a Kafka topic in real time.
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Saves the data of a DataStream to a Kafka topic using FlinkKafkaProducer.
 */
public class ConnectorFlinkKafkaProducerDemo {

    // Entity class for a trade order
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private String id;
        private Integer userId;
        private Double money;
        private Long orderTime;
    }

    /**
     * Custom data source: produces one trade order per second.
     */
    private static class OrderSource extends RichParallelSourceFunction<Order> {
        // Flag controlling whether the source keeps producing data
        private boolean isRunning = true;

        @Override
        public void run(SourceContext<Order> ctx) throws Exception {
            Random random = new Random();
            while (isRunning) {
                // Build a trade order
                Order order = new Order(
                    UUID.randomUUID().toString(),   // order id
                    random.nextInt(10) + 1,         // user id
                    (double) random.nextInt(100),   // amount
                    System.currentTimeMillis()      // order time
                );
                // Emit the order
                ctx.collect(order);
                // Sleep so that one record is produced per second
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    /**
     * Serialization schema: turns each JSON string into a Kafka ProducerRecord.
     */
    private static class KafkaStringSchema implements KafkaSerializationSchema<String> {
        @Override
        public ProducerRecord<byte[], byte[]> serialize(String jsonStr, @Nullable Long timestamp) {
            return new ProducerRecord<>("flink-topic", jsonStr.getBytes());
        }
    }

    public static void main(String[] args) throws Exception {
        // 1. Execution environment: env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // 2. Data source: source
        DataStreamSource<Order> orderDataStream = env.addSource(new OrderSource());
        //orderDataStream.printToErr();

        // 3. Transformation: convert each Order object to a JSON string
        SingleOutputStreamOperator<String> jsonDataStream = orderDataStream.map(new MapFunction<Order, String>() {
            @Override
            public String map(Order order) throws Exception {
                // fastjson (Alibaba library): convert the object to a JSON string
                return JSON.toJSONString(order);
            }
        });
        //jsonDataStream.printToErr();

        // 4. Sink
        // 4-1. Serialization schema used when writing
        KafkaSerializationSchema<String> serializationSchema = new KafkaStringSchema();
        // 4-2. Producer properties
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        // 4-3. Build the producer instance
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
            "flink-topic",
            serializationSchema,
            props,
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );
        // 4-4. Add the sink
        jsonDataStream.addSink(kafkaProducer);

        // 5. Trigger execution
        env.execute("ConnectorFlinkKafkaProducerDemo");
    }
}
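A final hedged note, not from the original article: with Semantic.EXACTLY_ONCE the producer only commits its Kafka transaction when a checkpoint completes, so the demo above should also enable checkpointing; otherwise consumers reading with read_committed isolation will not see the data. A minimal sketch:

// Assumption: a 5-second checkpoint interval; any interval works, it just
// bounds how often EXACTLY_ONCE transactions are committed.
env.enableCheckpointing(5000);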