赞
踩
https://kafka.apache.org/
wget https://mirror.bit.edu.cn/apache/kafka/2.7.0/kafka_2.12-2.7.0.tgz
tar zxvf kafka_2.12-2.7.0.tgz
cp -r kafka_2.12-2.7.0 kafka01
cp -r kafka_2.12-2.7.0 kafka02
cp -r kafka_2.12-2.7.0 kafka03
server.properties
# The id of the broker. This must be set to a unique integer for each broker.
# 分别修改为0,1,2
broker.id=0
# 分别修改为 9092,9093,9094
listeners=PLAINTEXT://:9093
# 修改为自己的ip:端口
zookeeper.connect=localhost:2181
# 日志地址改为 01/02/03
log.dirs=/tmp/kafka-logs01
记得要先启动zookeeper
cd kafka02
bin/kafka-server-start.sh -daemon config/server.properties
cd kafka03
bin/kafka-server-start.sh -daemon config/server.properties
cd kafka04
bin/kafka-server-start.sh -daemon config/server.properties
#关闭
bin/kafka-server-stop.sh
#显示有那些topic bin/kafka-topics.sh --list --bootstrap-server localhost:9094 #删除topic bin/kafka-topics.sh --delete --bootstrap-server localhost:9094 --topic test #创建topic bin/kafka-topics.sh --create --bootstrap-server localhost:9094 --replication-factor 1 --partitions 1 --topic test # replication-factor 备份 # partitions 分区最好和主题吧数量一致 bin/kafka-topics.sh --create --bootstrap-server localhost:9094 --replication-factor 1 --partitions 3 --topic city # 日志目录查看主题会发现city在每个目录都有,但是city只有一个目录下有 ls /tmp/kafka-logs
# 发送消息
bin/kafka-console-producer.sh --bootstrap-server localhost:9094 --topic -test
# 接收消息(接收启动之后发送的数据)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic -test
# 接收所有消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic -test --from-beginning
<!-- kafka依赖 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>2.7.0</version>
</dependency>
import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; import java.util.concurrent.ExecutionException; public class OneProducer { // 第一个泛型:当前生产者所生产消息的key // 第二个泛型:当前生产者所生产的消息本身 private KafkaProducer<Integer, String> producer; public OneProducer() { Properties properties = new Properties(); // 指定kafka集群 // properties.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094"); properties.put("bootstrap.servers", "localhost:9094"); // 指定key与value的序列化器 properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); this.producer = new KafkaProducer<Integer, String>(properties); } public void sendMsg() throws ExecutionException, InterruptedException { // 创建消息记录(包含主题、消息本身) (String topic, V value) //ProducerRecord<Integer, String> record = new ProducerRecord<>("city", "tianjin"); // 创建消息记录(包含主题、key、消息本身) (String topic, K key, V value) // ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", 1, "tianjin"); // 创建消息记录(包含主题、partition、key、消息本身) (String topic, Integer partition, K key, V value) ProducerRecord<Integer, String> record = new ProducerRecord<>("city", 0, 1, "tianjin"); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { exception.printStackTrace(); } else { System.out.println("The offset of the record we just sent is: " + metadata.offset()); } } }); // producer.send(record); } } public class OneProducerTest { public static void main(String[] args) throws IOException, ExecutionException, InterruptedException { OneProducer producer = new OneProducer(); producer.sendMsg(); System.in.read(); } }
import kafka.utils.ShutdownableThread; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Collections; import java.util.Properties; public class SomeConsumer extends ShutdownableThread { private KafkaConsumer<Integer, String> consumer; public SomeConsumer() { // 两个参数: // 1)指定当前消费者名称 // 2)指定消费过程是否会被中断 super("KafkaConsumerTest", false); Properties properties = new Properties(); String brokers = "localhost:9092,localhost:9093,localhost:9094"; // 指定kafka集群 properties.put("bootstrap.servers", brokers); // 指定消费者组ID properties.put("group.id", "cityGroup1"); // 开启自动提交,默认为true properties.put("enable.auto.commit", "true"); // 指定自动提交的超时时限,默认5s properties.put("auto.commit.interval.ms", "1000"); // 指定消费者被broker认定为挂掉的时限。若broker在此时间内未收到当前消费者发送的心跳,则broker // 认为消费者已经挂掉。默认为10s properties.put("session.timeout.ms", "30000"); // 指定两次心跳的时间间隔,默认为3s,一般不要超过session.timeout.ms的 1/3 properties.put("heartbeat.interval.ms", "10000"); // 当kafka中没有指定offset初值时,或指定的offset不存在时,从这里读取offset的值。其取值的意义为: // earliest:指定offset为第一条offset // latest: 指定offset为最后一条offset properties.put("auto.offset.reset", "earliest"); // 指定key与value的反序列化器 properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); this.consumer = new KafkaConsumer<Integer, String>(properties); } @Override public void doWork() { // 订阅消费主题 consumer.subscribe(Collections.singletonList("city")); // 从broker摘取消费。参数表示,若buffer中没有消费,消费者等待消费的时间。 // 0,表示没有消息什么也不返回 // >0,表示当时间到后仍没有消息,则返回空 ConsumerRecords<Integer, String> records = consumer.poll(1000); for(ConsumerRecord record : records) { System.out.println("topic = " + record.topic()); System.out.println("partition = " + record.partition()); System.out.println("key = " + record.key()); System.out.println("value = " + record.value()); } } } public class ConsumerTest { public static void main(String[] args) { SomeConsumer consumer = new SomeConsumer(); consumer.start(); } }
同步提交可能会导致重复消费:
1.强行kill线程,导致消费后的数据,offset没有提交(消费系统宕机、重启等)
2.设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费。
所以我们改变提交方式为手动提交:
// 开启手动提交
properties.put("enable.auto.commit", "false");
3.(重复消费最常见的原因):消费后的数据,当offset还没有提交时,partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-blance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。
4.当消费者重新分配partition的时候,可能出现从头开始消费的情况,导致重发问题。
5.当消费者消费的速度很慢的时候,可能在一个session周期内还未完成,导致心跳机制检测报告出问题。
6.并发很大,可能在规定的时间(session.time.out默认30s)内没有消费完,就会可能导致reblance重平衡,导致一部分offset自动提交失败,然后重平衡后重复消费
kafka可能重复消费的原因:https://my.oschina.net/wangzhiwubigdata/blog/4392190
import kafka.utils.ShutdownableThread; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Collections; import java.util.Properties; public class SyncManualConsumer extends ShutdownableThread { private KafkaConsumer<Integer, String> consumer; public SyncManualConsumer() { // 两个参数: // 1)指定当前消费者名称 // 2)指定消费过程是否会被中断 super("KafkaConsumerTest", false); Properties properties = new Properties(); String brokers = "kafkaOS1:9092,kafkaOS2:9092,kafkaOS3:9092"; // 指定kafka集群 properties.put("bootstrap.servers", brokers); // 指定消费者组ID properties.put("group.id", "cityGroup1"); // 开启手动提交 properties.put("enable.auto.commit", "false"); // 指定自动提交的超时时限,默认5s // properties.put("auto.commit.interval.ms", "1000"); // 指定一次提交10个offset properties.put("max.poll.records", 10); // 指定消费者被broker认定为挂掉的时限。若broker在此时间内未收到当前消费者发送的心跳,则broker // 认为消费者已经挂掉。默认为10s properties.put("session.timeout.ms", "30000"); // 指定两次心跳的时间间隔,默认为3s,一般不要超过session.timeout.ms的 1/3 properties.put("heartbeat.interval.ms", "10000"); // 当kafka中没有指定offset初值时,或指定的offset不存在时,从这里读取offset的值。其取值的意义为: // earliest:指定offset为第一条offset // latest: 指定offset为最后一条offset properties.put("auto.offset.reset", "earliest"); // 指定key与value的反序列化器 properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); this.consumer = new KafkaConsumer<Integer, String>(properties); } @Override public void doWork() { // 订阅消费主题 consumer.subscribe(Collections.singletonList("cities")); // 从broker摘取消费。参数表示,若buffer中没有消费,消费者等待消费的时间。 // 0,表示没有消息什么也不返回 // >0,表示当时间到后仍没有消息,则返回空 ConsumerRecords<Integer, String> records = consumer.poll(1000); for(ConsumerRecord record : records) { System.out.println("topic = " + record.topic()); System.out.println("partition = " + record.partition()); System.out.println("key = " + record.key()); System.out.println("value = " + record.value()); // 手动同步提交 consumer.commitSync(); } } }
同步提交方式是,消费者向 broker 提交 offset 后等待 broker 成功响应。若没有收到响
应,则会重新提交,直到获取到响应。而在这个等待过程中,消费者是阻塞的。其严重影响
了消费者的吞吐量。
异步提交方式是,消费者向 broker 提交 offset 后不用等待成功响应,所以其增加了消费者的吞
吐量。
@Override public void doWork() { // 订阅消费主题 consumer.subscribe(Collections.singletonList("cities")); // 从broker摘取消费。参数表示,若buffer中没有消费,消费者等待消费的时间。 // 0,表示没有消息什么也不返回 // >0,表示当时间到后仍没有消息,则返回空 ConsumerRecords<Integer, String> records = consumer.poll(1000); for(ConsumerRecord record : records) { System.out.println("topic = " + record.topic()); System.out.println("partition = " + record.partition()); System.out.println("key = " + record.key()); System.out.println("value = " + record.value()); // 手动异步提交 // consumer.commitAsync(); consumer.commitAsync((offsets, ex) -> { if(ex != null) { System.out.print("提交失败,offsets = " + offsets); System.out.println(", exception = " + ex); } }); } }
同异步提交,即同步提交与异步提交组合使用。一般情况下,若偶尔出现提交失败,其
也不会影响消费者的消费。因为后续提交最终会将这次提交失败的 offset 给提交了。
但异步提交会产生重复消费,为了防止重复消费,可以将同步提交与异常提交联合使用。
@Override public void doWork() { // 订阅消费主题 consumer.subscribe(Collections.singletonList("cities")); // 从broker摘取消费。参数表示,若buffer中没有消费,消费者等待消费的时间。 // 0,表示没有消息什么也不返回 // >0,表示当时间到后仍没有消息,则返回空 ConsumerRecords<Integer, String> records = consumer.poll(1000); for(ConsumerRecord record : records) { System.out.println("topic = " + record.topic()); System.out.println("partition = " + record.partition()); System.out.println("key = " + record.key()); System.out.println("value = " + record.value()); consumer.commitAsync((offsets, ex) -> { if(ex != null) { System.out.print("提交失败,offsets = " + offsets); System.out.println(", exception = " + ex); // 同步提交 consumer.commitSync(); } }); } }
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency>
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class SomeConsumer {
@KafkaListener(topics = "${kafka.topic}")
public void onMsg(String message) {
System.out.println("Kafka消费者接受到消息 " + message);
}
}
@RestController public class SomeProducer { @Autowired private KafkaTemplate<String, String> template; // 从配置文件读取自定义属性 @Value("${kafka.topic}") private String topic; // 由于是提交数据,所以使用Post方式 @PostMapping("/msg/send") public String sendMsg(@RequestParam("message") String message) { template.send(topic, message); return "send success"; } }
spring-boot-autoconfigure-2.1.3.RELEASE.jar!/META-INF/spring.factories
org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class,
KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {
...
}
org.springframework.boot.autoconfigure.kafka.KafkaProperties
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。