赞
踩
Kafka 0-8 Receiver模式和Direct模式都不适合当前版本不适用,本次学习采用Kafka 0-10 Direct模式,并通过第三方存储zookeeper来手动管理offset
Spark Streaming 获取Kafka的数据有两种方式:Receiver
和 Direct
。
Receiver 是通过Zookeeper连接Kafka队列获取数据。
需要一个专门的Executor去接收数据,然后发送给其他的Executor做计算。这样会存在一个问题,就是接收数据的Executor和计算的Executor处理速度会有差别,特别是在接收数据的Executor速度大于计算的Executor速度,会导致计算数据的节点内存溢出。早期版本中提供此方式,当前版本不推荐
Direct 是直接连接Kafka的节点获取数据。
是由计算的Executor来主动消费Kafka的数据,速度由自身控制,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。这种方式会周期性地查询Kafka,来获得每个 topic + partition 的最新的 offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。
推荐使用Direct方式
offset的三种管理方式:
1、自动提交offset:
enable.auto.commit=true
一但consumer挂掉,就会导致数据丢失或重复消费。
offset不可控。
2、Kafka自身的offset管理:
(属于At-least-once语义,如果做好了幂等性,可以使用这种方式):在Kafka 0.10+版本中,offset的默认存储由ZooKeeper移动到了一个自带的topic中,名为__consumer_offsets。
Spark Streaming也专门提供了commitAsync() API用于提交offset。
需要将参数修改为enable.auto.commit=false
。
在我实际测试中发现,这种offset的管理方式,不会丢失数据,但会出现重复消费。停掉streaming应用程序再次启动后,会再次消费停掉前最后的一个批次数据,应该是由于offset是异步提交的方式导致,offset更新不及时引起的。因此需要做好数据的幂等性。(修改源码将异步改为同步,应该是可以做到Exactly-once语义的)
示例
在Kafka 0.10+版本中,offset的默认存储由ZooKeeper移动到了一个自带的topic中,名__consumer_offsets。所以我们读写offset的对象正是这个topic,Spark Streaming也专门提供了commitAsync() API用于提交offset。实际上,一切都已经封装好了,直接调用相关API即可。
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// 确保结果都已经正确且幂等地输出了
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
3、自定义offset:
(推荐,采用这种方式,可以做到At-least-once语义):可以将offset存放在第三方储中,包括RDBMS、Redis、ZK、ES等。若消费数据存储在带事务的组件上,则强烈推荐将offset存储在一起,借助事务实现 Exactly-once 语义。
ZooKeeper
在Spark Streaming连接Kafka应用中使用Zookeeper来存储offsets也是一种比较可靠的方式。
在这个方案中,Spark Streaming任务在启动时会去Zookeeper中读取每个分区的offsets。如果有新的分区出现,那么他的offset将会设置在最开始的位置。在每批数据处理完之后,用户需要可以选择存储已处理数据的一个offset或者最后一个offset。此外,新消费者将使用跟旧的Kafka 消费者API一样的格式将offset保存在ZooKeeper中。因此,任何追踪或监控Zookeeper中Kafka Offset的工具仍然生效的。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.2.2</version>
</dependency>
package com.demo;
import kafka.utils.ZKGroupTopicDirs;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.streaming.kafka010.*;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.Seq;
import java.io.Serializable;
import java.util.*;
class ZkKafkaOffsetManager implements Serializable {
private final Logger logger = LoggerFactory.getLogger(ZkKafkaOffsetManager.class);
private final transient ZkUtils zkUtils;
public ZkKafkaOffsetManager(String zkUrl) {
Tuple2<ZkClient, ZkConnection> zkClientAndConn = ZkUtils.createZkClientAndConnection(zkUrl, 30000, 30000);
zkUtils = new ZkUtils(zkClientAndConn._1, zkClientAndConn._2, false);
}
public Map<TopicPartition, Long> readOffsets(Seq<String> topics, String groupId) {
Map<TopicPartition, Long> offsets = new HashMap<>();
scala.collection.mutable.Map<String, Seq<Object>> partitionsForTopics = zkUtils.getPartitionsForTopics(topics);
Iterator<Tuple2<String, Seq<Object>>> partitionsIterator = partitionsForTopics.iterator();
while (partitionsIterator.hasNext()) {
Tuple2<String, Seq<Object>> partitions = partitionsIterator.next();
String topic = partitions._1;
ZKGroupTopicDirs groupTopicDirs = new ZKGroupTopicDirs(groupId, topic);
Iterator<Object> iterator = partitions._2.iterator();
while (iterator.hasNext()) {
Object partition = iterator.next();
String path = groupTopicDirs.consumerOffsetDir() + "/" + partition;
System.out.println("read path:" + path);
try {
Tuple2<String, Stat> data = zkUtils.readData(path);
if (data != null) {
offsets.put(new TopicPartition(topic, (Integer) partition), Long.parseLong(data._1));
logger.info(
"Read offset - topic={}, partition={}, offset={}, path={}",
topic, partition, data._1, path);
}
} catch (Exception e) {
offsets.put(new TopicPartition(topic, (Integer) partition), 0L);
logger.info(
"Read offset - not exist: {}, topic={}, partition={}, path={}",
e.getMessage(), topic, partition, path);
}
}
}
return offsets;
}
public void saveOffsets(Seq<OffsetRange> offsetRanges, String groupId) {
Iterator<OffsetRange> iterable = offsetRanges.iterator();
while (iterable.hasNext()) {
OffsetRange range = iterable.next();
ZKGroupTopicDirs groupTopicDirs = new ZKGroupTopicDirs(groupId, range.topic());
String path = groupTopicDirs.consumerOffsetDir() + "/" + range.partition();
List<ACL> list = new ArrayList<>(ZooDefs.Ids.OPEN_ACL_UNSAFE);
zkUtils.updatePersistentPath(path, String.valueOf(range.untilOffset()), list);
logger.info(
"Save offset - topic={}, partition={}, offset={}, path={}",
range.topic(), range.partition(), range.untilOffset(), path);
}
}
}
这样,offset就会被存储在ZK的/consumers/[groupId]/offsets/[topic]/[partition]路径下。当初始化DirectStream时,调用readOffsets()方法获得offset。当数据处理完成后,调用saveOffsets()方法来更新ZK中的值。
注意
不能对stream对象做transformation操作之后的结果进行强制转换(会直接报ClassCastException),因为RDD与DStream的类型都改变了。只有RDD或DStream的包含类型为ConsumerRecord才行。
package com.demo;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import java.util.*;
import java.util.regex.Pattern;
public class Demo {
public static void main(String[] args) throws InterruptedException {
SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingFromkafka");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "192.168.32.101:9092,192.168.32.102:9092,192.168.32.103:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "sparkStreaming");
kafkaParams.put("fetch.message.max.bytes", "104857600");
String topics = "my_topic-1";
String groupName = "group-test-1";
Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
Seq<String> topicsSeq = JavaConverters.asScalaIteratorConverter(topicsSet.iterator()).asScala().toSeq();
Pattern plv1 = Pattern.compile("\n");
Pattern plv2 = Pattern.compile("\\|");
String ljoin = "4|41||";
int keys[] = tools.sary2int("1,2,3".split(","));
int flds[] = tools.sary2int("0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19".split(","));
ZkKafkaOffsetManager zkKafkaOffsetManager = new ZkKafkaOffsetManager("bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka");
Map<TopicPartition, Long> offsets = zkKafkaOffsetManager.readOffsets(topicsSeq, groupName);
JavaInputDStream<ConsumerRecord<Object, Object>> stream = KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topicsSet, kafkaParams, offsets)
);
stream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<Object, Object>>>) consumerRecordJavaRDD -> {
OffsetRange[] offsetRangesArr = ((HasOffsetRanges) consumerRecordJavaRDD.rdd()).offsetRanges();
Seq<OffsetRange> OffsetRange = JavaConverters.asScalaIteratorConverter(Arrays.asList(offsetRangesArr).iterator()).asScala().toSeq();
zkKafkaOffsetManager.saveOffsets(OffsetRange, groupName);
});
JavaDStream<String[]> rdd =
stream.flatMap((f) -> Arrays.asList(plv1.split(f.value().toString())).iterator())
.map((f) -> plv2.split(ljoin + f));
JavaPairDStream<String, String[]> rdd2 = rdd.window(Durations.seconds(Long.parseLong("10")), Durations.seconds(Long.parseLong("10"))).mapToPair((f) -> {
String[] akey = tools.combine(f, keys);
String[] value = tools.combine(f, flds);
return new Tuple2<>(String.join(",", akey), value);
});
rdd2.foreachRDD((VoidFunction<JavaPairRDD<String, String[]>>) stringJavaPairRDD -> stringJavaPairRDD.foreach((VoidFunction<Tuple2<String, String[]>>) stringTuple2 -> {
System.out.println("key==" + stringTuple2._1);
System.out.println("value==" + Arrays.toString(stringTuple2._2));
}));
jssc.start();
jssc.awaitTermination();
jssc.close();
}
}
参考文章:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。