当前位置:   article > 正文

Flink之Kafka Connector_flink-connector-kafka

flink-connector-kafka

1、了解Kafka Connector

在Kafka Connector连接器中提供Source数据源和Sink接收器类,在Flink 1.12版本中提供基于新的接口消费Kafka数据:KafkaSource
在这里插入图片描述

2、使用Kafka Connector连接器,添加Maven 依赖:


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.13.1</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

3 、FlinkKafkaConsumer

3.1 跳转官网

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-sourcefunction

在这里插入图片描述

Flink 的 Kafka 消费者 - 提供从一个或多个 Kafka 主题读取的访问。FlinkKafkaConsumer

构造函数接受以下参数:

1. 订阅的主题:`topic`,一个Topic名称或一个列表(多个Topic)
2. 反序列化规则:`deserialization`
3. 消费者属性-集群地址:`bootstrap.servers`
4. 消费者属性-消费者组id(如果不设置,会有默认的,但是默认的不方便管理)`group.id`

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

当从Kafka消费数据时,需要指定反序列化实现类:将Kafka读取二进制数据,转换为String对象
在这里插入图片描述
Kafka Consumer消费数据,反序列化数据说明:
在这里插入图片描述

3.2代码实现

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * Flink从Kafka消费数据,指定topic名称和反序列化类
 */
public class ConnectorFlinkKafkaConsumerDemo {

	public static void main(String[] args) throws Exception{
		// 1. 执行环境-env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(3);

		// 2. 数据源-source
		// 2-1. 创建消费Kafka数据时属性
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "node1:9092");
		props.setProperty("group.id", "test");
		// 2-2. 构建FlinkKafkaConsumer实例对象
		FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(
			"flink-topic", //
			new SimpleStringSchema(), //
			props
		);
		// 2-3. 添加Source
		DataStream<String> kafkaStream = env.addSource(kafkaConsumer);

		// 3. 数据转换-transformation
		// 4. 数据接收器-sink
		kafkaStream.printToErr();

		// 5. 触发执行-execute
		env.execute("ConnectorFlinkKafkaConsumerDemo") ;
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

4、FlinkKafkaProducer

4.1官网解释

在这里插入图片描述1. topic 名称
2. 序列化:将Java对象转byte[]
3. Kafka Server地址信息
4. 容错语义
在这里插入图片描述

4.2 代码

需求:自定义Source数据源,产生交易订单数据,将其转换为JSON字符串,实时保存到Kafka topic

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * 将DataStream数据流中数据保存至Kafka Topic队列,使用FlinkKafkaProducer类完成
 */
public class ConnectorFlinkKafkaProducerDemo {
//创建实体类
	@Data
	@NoArgsConstructor
	@AllArgsConstructor
	static class Order {
		private String id;
		private Integer userId;
		private Double money;
		private Long orderTime;
	}

	/**
	 * 自定义数据源:每隔1秒产生1条交易订单数据
	 */
	private static class OrderSource extends RichParallelSourceFunction<Order> {
		// 定义标识变量,表示是否产生数据
		private boolean isRunning = true;

		// 模拟产生交易订单数据
		@Override
		public void run(SourceContext<Order> ctx) throws Exception {
			Random random = new Random() ;
			while (isRunning){
				// 构建交易订单数据
				// 构建交易订单数据
				Order order = new Order(
					UUID.randomUUID().toString(), //
					random.nextInt(10) + 1 , //
					(double)random.nextInt(100) ,//
					System.currentTimeMillis()
				);

				// 将数据输出
				ctx.collect(order);

				// 每隔1秒产生1条数据,线程休眠
				TimeUnit.SECONDS.sleep(1);
			}
		}

		@Override
		public void cancel() {
			isRunning = false ;
		}
	}


	/**
	 * 创建子类,实现接口,对数据进行序列化操作
	 */
	private static class KafkaStringSchema implements KafkaSerializationSchema<String>{
		@Override
		public ProducerRecord<byte[], byte[]> serialize(String jsonStr, @Nullable Long timestamp) {
			return new ProducerRecord<>("flink-topic", jsonStr.getBytes());
		}
	}

	public static void main(String[] args) throws Exception {
		// 1. 执行环境-env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(3) ;

		// 2. 数据源-source
		DataStreamSource<Order> orderDataStream = env.addSource(new OrderSource());
		//orderDataStream.printToErr();

		// 3. 数据转换-transformation

		// 将订单数据Order对象,转换为JSON字符串,存储到Kafka Topic队列
		SingleOutputStreamOperator<String> jsonDataStream = orderDataStream.map(new MapFunction<Order, String>() {
			@Override
			public String map(Order order) throws Exception {
				// 阿里巴巴库:fastJson,转换对象为json字符串
				return JSON.toJSONString(order);
			}
		});
		//jsonDataStream.printToErr();

		// 4. 数据终端-sink
		// 4-1. 写入数据时序列化
		KafkaSerializationSchema<String> serializationSchema = new KafkaStringSchema() ;
		// 4-2. 生成者属性设置
		Properties props = new Properties() ;
		props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
		// 4-3. 构建实例对象
		FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
			"flink-topic",
				serializationSchema,
				props,
				FlinkKafkaProducer.Semantic.EXACTLY_ONCE
		);
		// 4-4. 添加接收器
		jsonDataStream.addSink(kafkaProducer) ;

		// 5. 触发执行-execute
		env.execute("ConnectorFlinkKafkaProducerDemo");
	}

}  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/493555
推荐阅读
相关标签
  

闽ICP备14008679号