Kafka is a high-throughput, distributed message queue system that moves data from one system to others. It also stores data for a configurable retention period and supports retrieving data by offset or by timestamp.
Traffic peak shaving (requests spike suddenly and the backend cannot keep up)
Decoupling multiple systems (systems written in different languages do not need to implement clients to talk to each other directly)
Stream processing (data input and output source for big-data systems)
Log collection and processing (application → Kafka → Logstash → ES/HBase)
A topic does not guarantee message ordering, but a partition does guarantee the order in which data is written. A topic can have a single partition or many.
A consumer group is a set of machines/threads running the same logic. When they subscribe to the same topic, each member consumes a different set of partitions, which guarantees that the same message is not consumed more than once within one consumer group.
The consumer group and partition concepts exist mainly to increase concurrency on the consuming side and thereby raise consumption throughput.
Within one group, a consumer can consume multiple partitions, but each partition is consumed by only one consumer.
In general you can think of a consumer as a thread on a Kafka client machine; a consumer group may contain n Kafka client machines, each running m threads. When that group consumes a topic with n*m partitions, every thread consumes exactly one partition, which gives the best performance.
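As a quick illustration of group-based consumption, here is a minimal sketch using the plain Kafka client (the broker address, topic name and group id are placeholders, not from this article): every instance started with the same group.id gets its own share of the topic's partitions.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GroupConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // same group.id => partitions are shared
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // each record carries its partition and offset, so per-partition ordering is visible
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}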
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
The rest of this article implements unified interception of messages on the Kafka consumer side, plus dispatching of those messages to the appropriate business consumer.
public class GenericKafkaConsumerFactory<K, V> extends DefaultKafkaConsumerFactory<K, V> {

    public GenericKafkaConsumerFactory(Map<String, Object> configs, boolean broadcastGroup) {
        super(configs);
        // broadcastGroup exists for special business scenarios where a message on a topic must be
        // consumed by every instance of the same business group, e.g. asynchronously telling the
        // other pods of the same application to refresh their in-memory data. Appending the local
        // IP address to the group id turns each instance into a unique group, so all of them
        // receive the message.
        if (broadcastGroup) {
            Object groupId = configs.get(ConsumerConfig.GROUP_ID_CONFIG);
            Assert.notNull(groupId, "group.id is empty");
            groupId = groupId.toString() + "_" + NetUtils.getLocalHost();
            // rewrite the Kafka group id via reflection on the parent factory's configs map
            try {
                Field kafkaConfigs = DefaultKafkaConsumerFactory.class.getDeclaredField("configs");
                kafkaConfigs.setAccessible(true);
                @SuppressWarnings("unchecked")
                Map<String, Object> updateConfigs = (Map<String, Object>) kafkaConfigs.get(this);
                updateConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            } catch (Exception e) {
                log.error("failed to set the per-instance group id for the Kafka consumer", e);
            }
        }
    }
}
public class GenericKafkaMessageListenerContainer implements InitializingBean {

    GenericKafkaMessageListenerContainer kafkaMessageListenerContainer = null;
    List<ContainerProperties> containerPropertiesList = new ArrayList<>();
    GenericKafkaListener genericKafkaListener;
    protected List<KafkaConsumerConfig> consumerConfigs;
    DefaultKafkaConsumerFactory factory = null;

    @Override
    public void afterPropertiesSet() throws Exception {
        log.info("------- initializing Kafka listener containers");
        for (KafkaConsumerConfig consumerConfig : consumerConfigs) {
            // one listener container per topic to be watched
            ContainerProperties containerProperties = new ContainerProperties(consumerConfig.getTopic());
            KafkaMessageListenerContainer container = new KafkaMessageListenerContainer(factory, containerProperties);
            // register the business consumer config with the shared listener
            genericKafkaListener.addConsumerConfig(consumerConfig);
            container.setupMessageListener(genericKafkaListener);
            container.start();
        }
        log.info("------- Kafka listener containers initialized");
    }
}
public class KafkaConsumerConfig {
    private String topic;
    private String key;                  // filtering by message key is not supported yet
    private IMqKafkaBizConsumer consumer;
    private boolean ignore;              // if true, exceptions thrown by the consumer are swallowed instead of retried
    // getters/setters omitted (getTopic/getConsumer/isIgnore are used by the listener below)
}
public interface IMqKafkaBizConsumer<T> {

    /**
     * Consume one message from the given topic.
     *
     * @param data the deserialized message payload
     * @return true if the message was consumed successfully; false if consumption failed but the
     *         message should be dropped without further handling. Throwing an exception triggers
     *         the redelivery (retry) mechanism.
     */
    boolean consumber(String topic, T data, int partition, long offset, String key) throws Exception;
}
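A minimal sketch of a business consumer implementing this contract (the class name and handleOrderEvent helper are illustrative assumptions, not from the article): return true on success, return false to drop a message you do not care about, and throw to have the message redelivered.

public class OrderEventConsumer implements IMqKafkaBizConsumer<String> {

    @Override
    public boolean consumber(String topic, String data, int partition, long offset, String key) throws Exception {
        if (data == null || data.isEmpty()) {
            // malformed payload: return false to drop the message but keep consuming
            return false;
        }
        // placeholder for real business logic; any exception thrown here propagates to
        // GenericKafkaListener and triggers redelivery (unless ignore is configured)
        handleOrderEvent(data);
        return true; // consumed successfully
    }

    private void handleOrderEvent(String payload) {
        // hypothetical handler, assumed for illustration only
    }
}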
The listener implementation: whenever one of the watched topics receives a message, onMessage is called back. Using the topic-to-consumer mapping registered earlier, we look up the matching business consumer and hand the message over to it.
public class GenericKafkaListener<T> implements MessageListener<String, T> {

    protected Map<String, KafkaConsumerConfig> consumerConfigMap = new ConcurrentHashMap<>();

    // register a consumer config, keyed by topic
    public void addConsumerConfig(KafkaConsumerConfig kafkaConsumerConfig) {
        consumerConfigMap.put(kafkaConsumerConfig.getTopic(), kafkaConsumerConfig);
    }

    @Override
    public void onMessage(ConsumerRecord<String, T> record) {
        // handle the received message
        System.out.println("Received message " + record.topic() + " : " + record.value());
        String topic = record.topic();
        KafkaConsumerConfig consumerConfig = consumerConfigMap.get(topic);
        IMqKafkaBizConsumer consumer = consumerConfig.getConsumer();
        boolean success = false;
        try {
            // look up the business consumer for this topic and hand the message over to it
            success = consumer.consumber(record.topic(), record.value(), record.partition(), record.offset(), record.key());
            if (!success) {
                log.warn("[topic:" + record.topic() + "] consumption failed, but the business allows dropping the message: " + record.value().toString());
            }
        } catch (Exception e) {
            // decide whether errors may be skipped for this topic
            if (consumerConfig.isIgnore()) {
                return;
            } else {
                throw new RuntimeException(
                        "-1,[topic:" + record.topic() + "], will be retried: " + record.value().toString() + " kafka_error1:", e);
            }
        }
    }
}
<bean id="kafkaProducerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory"> <constructor-arg> <map> <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}"/> <entry key="retries" value="5"/> <entry key="batch.size" value="16384"/> <entry key="linger.ms" value="2"/> <entry key="buffer.memory" value="33554432"/> <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"></entry> <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"></entry> <!-- <entry key="use.sasl" value="${kafka.use.sasl:true}"/>--> <!-- <entry key="servers.sasl" value="${kafka.biz.servers.sasl}"/>--> </map> </constructor-arg> </bean> <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate"> <constructor-arg ref="kafkaProducerFactory"/> <constructor-arg name="autoFlush" value="true"/> </bean>
After that, inject the kafkaTemplate into your code and send messages through it:
kafkaTemplate.send(topic, message);
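If you also need the send result, here is a hedged sketch assuming a spring-kafka 2.x KafkaTemplate<String, String>, where send(...) returns a ListenableFuture<SendResult<...>>; the key and payload are placeholders, and log stands for whatever logger the calling class uses.

ListenableFuture<SendResult<String, String>> future =
        kafkaTemplate.send("topic2", "some-key", "{\"hello\":\"world\"}");

future.addCallback(
        result -> log.info("sent to partition {} offset {}",
                result.getRecordMetadata().partition(),
                result.getRecordMetadata().offset()),
        ex -> log.error("send failed", ex));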
<bean id="consumerFactory" class="com.nuwa.retransmission.infrastructure.kafka.GenericKafkaConsumerFactory" > <constructor-arg> <map> <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" /> <entry key="enable.auto.commit" value="false" /> <entry key="auto.commit.interval.ms" value="3000"/> <entry key="session.timeout.ms" value="60000"/> <entry key="request.timeout.ms" value="61000"/> <entry key="group.id" value="nuwa-retransmission"/> <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" /> <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" /> </map> </constructor-arg> <constructor-arg name="broadcastGroup" value="true"></constructor-arg> </bean> <bean id="messageListenerContainer" class="com.nuwa.retransmission.infrastructure.kafka.GenericKafkaMessageListenerContainer"> <property name="factory" ref="consumerFactory"/> <property name="kafkaTemplate" ref="kafkaTemplate"/> //配置Kafka统一监听器 <property name="genericKafkaListener" ref="genericKafkaListener"/> //消费端配置信息 <property name="consumerConfigs"> <list> <bean class="com.basic.mq.domain.KafkaConsumerConfig"> //监听的topic <property name="topic" value="topic2"/> //监听topic回调的consumer <property name="consumer"> <bean id="eventConsumer" class="com.nuwa.retransmission.service.impl.AppAndApiAuthSyncConsumer"/> </property> </bean> </list> </property> </bean> <bean id="genericKafkaListener" class="com.nuwa.retransmission.infrastructure.kafka.GenericKafkaListener"/>
Consumer-side code:
public class AppAndApiAuthSyncConsumer implements IMqKafkaBizConsumer<String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(AppAndApiAuthSyncConsumer.class);

    @Override
    public boolean consumber(String topic, String kafkaMqData, int partition, long offset, String key) {
        String oldName = Thread.currentThread().getName();
        Thread.currentThread().setName("Thread_AppAndApiAuthSyncConsumer_" + UUID.randomUUID());
        try {
            LOGGER.info("topic {}, receive {}", topic, kafkaMqData);
            return true;
        } finally {
            // restore the original thread name after handling the message
            Thread.currentThread().setName(oldName);
        }
    }
}