Kafka depends on Zookeeper, so before running Kafka we need to install and run Zookeeper. Kafka uses Zookeeper to manage cluster configuration, elect leaders, and rebalance when a Consumer Group's membership changes.
I won't go over the installation itself; there are plenty of tutorials for that. To integrate Kafka with Spring, add the spring-kafka dependency to pom.xml (no version needed, since Spring Boot manages it):
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
With the dependency in place, we configure the producer and the consumer, in that order:
Producer:
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    @Bean
    public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Consumer:
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServer;

    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Bean
    public Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
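Both classes read their values from application.properties. A minimal example consistent with the startup log shown below; the String serializer/deserializer classes and the local broker address are assumptions for this demo:

spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=test
spring.kafka.consumer.auto-offset-reset=earliest

With the configuration in place, a simple listener receives messages from topic.demo: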
@Component
@Slf4j
public class TestConsumer {

    @KafkaListener(id = "demo", topics = "topic.demo")
    public void listen(ConsumerRecord<?, ?> record) {
        log.info("topic = {}, offset = {}, value = {}", record.topic(), record.offset(), record.value());
    }
}
Start the project (remember: Zookeeper and Kafka must be started first). In the output you can see the consumer configuration; every consumer instance logs these values:
[org.apache.kafka.common.config.AbstractConfig.logAll:180] [INFO] ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [127.0.0.1:9092]
    check.crcs = true
    client.id = consumer-1
    connections.max.idle.ms = 540000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = test
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
Then come the messages confirming that the consumer has started successfully and joined the group:
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator$GroupCoordinatorResponseHandler.onSuccess:555] [INFO] Discovered coordinator XU:9092 (id: 2147483647 rack: null) for group test.
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare:333] [INFO] Revoking previously assigned partitions [] for group test
[org.springframework.kafka.listener.AbstractMessageListenerContainer$2.onPartitionsRevoked:242] [INFO] partitions revoked:[]
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest:381] [INFO] (Re-)joining group test
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess:349] [INFO] Successfully joined group test with generation 5
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete:225] [INFO] Setting newly assigned partitions [topic.demo-0] for group test
[org.springframework.kafka.listener.AbstractMessageListenerContainer$2.onPartitionsAssigned:247] [INFO] partitions assigned:[topic.demo-0]
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestKafka {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    public void testDemo() throws InterruptedException {
        kafkaTemplate.send("topic.demo", "this is my first demo");
        // send() is asynchronous; sleep briefly so the listener can
        // receive and log the record before the test JVM exits
        Thread.sleep(5000);
    }
}
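A more deterministic variant is to block on the future that send() returns instead of sleeping. A sketch, assuming the spring-kafka generation this post appears to use, where send() returns a ListenableFuture<SendResult<K, V>> (newer versions return a CompletableFuture):

@Test
public void testDemoBlocking() throws Exception {
    // Block until the broker acknowledges the record (10s timeout)
    SendResult<String, String> result =
            kafkaTemplate.send("topic.demo", "this is my first demo").get(10, TimeUnit.SECONDS);
    // RecordMetadata tells us where the record landed
    System.out.println("partition = " + result.getRecordMetadata().partition()
            + ", offset = " + result.getRecordMetadata().offset());
}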
When KafkaTemplate sends to a topic that doesn't exist yet, the topic is created automatically (via the broker's auto-create behavior), so we never had to create topic.demo ourselves. The catch is that a topic created this way always ends up with only one partition and no replicas.
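To control the partition count and replication factor, declare the topic explicitly. A minimal sketch, assuming a spring-kafka version that supports KafkaAdmin/NewTopic (2.x and later; under Spring Boot a KafkaAdmin bean is auto-configured, so a NewTopic bean is enough). The partition and replica numbers are illustrative, not from the original post:

@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic demoTopic() {
        // org.apache.kafka.clients.admin.NewTopic(name, numPartitions, replicationFactor)
        return new NewTopic("topic.demo", 3, (short) 1);
    }
}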
Run the test, and the listener logs the record:
[INFO] topic = topic.demo, offset = 2, value = this is my first demo
Kafka Tools is a visual client for Kafka that makes it easy to inspect topic and queue information, consumer details, and Kafka node status. After installing and opening it, you will see the main interface, where you can add a Cluster pointing at your broker.