Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。这类动作(网页浏览、搜索和其他用户行为)是现代网络上许多社交功能的关键要素。由于吞吐量的要求,这些数据通常通过日志处理和日志聚合的方式来解决。对于像Hadoop一样的日志数据和离线分析系统,如果同时又有实时处理的要求,Kafka是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,同时通过集群来提供实时的消费。








  1. #data directory: holds the node information plus the values kept for Kafka brokers and consumers
  2. dataDir=/data/zookeeper
  3. # the port at which the clients will connect
  4. #(i.e. the port this server occupies)
  5. clientPort=2181
  6. # disable the per-ip limit on the number of connections since this is a non-production config
  7. maxClientCnxns=0
  8. #length of one tick in milliseconds; the basic time unit for the heartbeats used to decide whether a node is alive
  9. tickTime=2000
  10. syncLimit=2
  11. #number of ticks a follower is allowed for its initial connect-and-sync to the leader (5 ticks here); not a retry count
  12. initLimit=5
  13. #ensemble (cluster) members; NOTE(review): the host values are blank in this listing and must be filled in
  14. server.1=
  15. server.2=
  16. server.3=


  # Kafka broker configuration.
  # Comments are kept on their own lines: java.util.Properties treats text after
  # a value (including "#...") as part of the value, not as a comment.

  # Unique id of this broker; no two brokers in the cluster may share it.
  broker.id=0
  # Port the broker listens on.
  port=9092
  num.network.threads=2
  # Number of threads performing I/O (persisting data) concurrently.
  num.io.threads=8
  # Socket send buffer size in bytes (data sent out, e.g. to consumers).
  socket.send.buffer.bytes=1048576
  # Socket receive buffer size in bytes (data received by the broker).
  socket.receive.buffer.bytes=1048576
  # Maximum size in bytes of a single request the broker will accept.
  socket.request.max.bytes=104857600
  # Directory in which the broker stores its log files.
  log.dirs=/tmp/kafka-logs
  # Default number of partitions per topic.
  num.partitions=2
  log.retention.hours=168
  log.segment.bytes=536870912
  log.retention.check.interval.ms=60000
  log.cleaner.enable=false
  # ZooKeeper ensemble connection string (comma-separated host:port entries).
  # NOTE(review): the two entries after localhost:2181 are blank in this listing; fill in or remove them.
  zookeeper.connect=localhost:2181,,
  zookeeper.connection.timeout.ms=1000000
  # Hostname of this broker. NOTE(review): blank in this listing.
  host.name=
  # With this enabled, deleting a topic also deletes its data.
  delete.topic.enable=true


# Start the ZooKeeper server with the ZooKeeper properties file.
./bin/zookeeper-server-start.sh ./conf/zookeeper.properties


# Start the Kafka broker with the broker properties file.
./bin/kafka-server-start.sh ./conf/server.properties


# Create topic "test-topic" with 2 partitions and a replication factor of 2.
./bin/kafka-topics.sh --create --zookeeper Master:2181 --replication-factor 2 --partitions 2 --topic test-topic


# List all topics (the --zookeeper option needs the ZooKeeper address).
./bin/kafka-topics.sh --list --zookeeper Master:2181
# Show the partition details of one topic (note the space before --topic).
./bin/kafka-topics.sh --describe --zookeeper Master:2181 --topic test-topic



  1. package org.apache.flume.plugins;
  2. /**
  3. * KAFKA Flume Sink (Kafka 0.8 Beta, Flume 1.5).
  4. * User:
  5. * Date: 2016/03/28
  6. * Time: PM 4:32
  7. */
  8. import java.util.Properties;
  9. import kafka.javaapi.producer.Producer;
  10. import kafka.producer.KeyedMessage;
  11. import kafka.producer.ProducerConfig;
  12. import org.apache.commons.lang.StringUtils;
  13. import org.apache.flume.*;
  14. import org.apache.flume.conf.Configurable;
  15. import org.apache.flume.event.EventHelper;
  16. import org.apache.flume.sink.AbstractSink;
  17. import org.slf4j.Logger;
  18. import org.slf4j.LoggerFactory;
  19. import com.google.common.base.Preconditions;
  20. import com.google.common.collect.ImmutableMap;
  21. /**
  22. * kafka sink.
  23. */
  24. public class KafkaSink extends AbstractSink implements Configurable {
  25. /**
  26. * 日志记录
  27. */
  28. private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSink.class);
  29. /**
  30. * 参数
  31. */
  32. private Properties parameters;
  33. /**
  34. * 生产者
  35. */
  36. private Producer<String, String> producer;
  37. /**
  38. * The Context.上下文
  39. */
  40. private Context context;
  41. /**
  42. * Configure void. 参数设置
  43. *
  44. * @param context
  45. * the context
  46. */
  47. @Override
  48. public void configure(Context context) {
  49. this.context = context;
  50. ImmutableMap<String, String> props = context.getParameters();
  51. parameters = new Properties();
  52. for (String key : props.keySet()) {
  53. String value = props.get(key);
  54. this.parameters.put(key, value);
  55. }
  56. }
  57. /**
  58. * Start void.
  59. */
  60. @Override
  61. public synchronized void start() {
  62. super.start();
  63. ProducerConfig config = new ProducerConfig(this.parameters);
  64. this.producer = new Producer<String, String>(config);
  65. }
  66. /**
  67. * Process status.
  68. *
  69. * @return the status
  70. * @throws EventDeliveryException
  71. * the event delivery exception
  72. */
  73. @Override
  74. public Status process() throws EventDeliveryException {
  75. Status status = null;
  76. // Start transaction
  77. Channel ch = getChannel();
  78. Transaction txn = ch.getTransaction();
  79. txn.begin();
  80. try {
  81. // This try clause includes whatever Channel operations you want to do
  82. Event event = ch.take();
  83. String partitionKey = (String) parameters.get(KafkaFlumeConstans.PARTITION_KEY_NAME);
  84. String encoding = StringUtils.defaultIfEmpty(
  85. (String) this.parameters.get(KafkaFlumeConstans.ENCODING_KEY_NAME),
  86. KafkaFlumeConstans.DEFAULT_ENCODING);
  87. String topic = Preconditions.checkNotNull(
  88. (String) this.parameters.get(KafkaFlumeConstans.CUSTOME_TOPIC_KEY_NAME),
  89. "custom.topic.name is required");
  90. String eventData = new String(event.getBody(), encoding);
  91. KeyedMessage<String, String> data;
  92. // if partition key does'nt exist
  93. if (StringUtils.isEmpty(partitionKey)) {
  94. data = new KeyedMessage<String, String>(topic, eventData);
  95. } else {
  96. data = new KeyedMessage<String, String>(topic, partitionKey, eventData);
  97. }
  98. if (LOGGER.isInfoEnabled()) {
  99. LOGGER.info("Send Message to Kafka : [" + eventData + "] -- [" + EventHelper.dumpEvent(event) + "]");
  100. }
  101. producer.send(data);
  102. txn.commit();
  103. status = Status.READY;
  104. } catch (Throwable t) {
  105. txn.rollback();
  106. status = Status.BACKOFF;
  107. // re-throw all Errors
  108. if (t instanceof Error) {
  109. throw (Error) t;
  110. }
  111. } finally {
  112. txn.close();
  113. }
  114. return status;
  115. }
  116. /**
  117. * Stop void.
  118. */
  119. @Override
  120. public void stop() {
  121. producer.close();
  122. }
  123. }


  1. @Override
  2. public int partition(Object key, int numPartitions) {
  3. try {
  4. int partitionNum = Integer.parseInt((String) key);
  5. return Math.abs(Integer.parseInt((String) key) % numPartitions);
  6. } catch (Exception e) {
  7. return Math.abs(key.hashCode() % numPartitions);
  8. }
  9. }



  1. package com.test.kafka;
  /**
   * Constants shared by the Kafka example clients: ZooKeeper connection string,
   * consumer group ids, topic names and client tuning values.
   *
   * <p>Created by root on 16-3-13.
   */
  public final class KafkaProperties {
      /**
       * ZooKeeper connection string.
       * NOTE(review): the host entries are blank in this listing; fill in the real hosts.
       */
      public static final String zkConnect = ",,";
      /** Consumer group ids. */
      public static final String groupId = "group1";
      public static final String groupId2 = "group2";
      /** Topic names. */
      public static final String topic = "kafkaToptic";
      public static final String topic2 = "topic2";
      public static final String topic3 = "topic3";
      /** Producer buffer size (64 * 1024). */
      public static final int kafkaProduceBufferSize = 64 * 1024;
      public static final int connectionTimeOut = 20000;
      public static final int reconnectInterval = 10000;

      /** Constants holder; not meant to be instantiated. */
      private KafkaProperties() {
      }
  }

  1. package com.test.kafka;
  2. import kafka.consumer.ConsumerConfig;
  3. import kafka.consumer.ConsumerIterator;
  4. import kafka.consumer.KafkaStream;
  5. import kafka.javaapi.consumer.ConsumerConnector;
  6. import java.util.HashMap;
  7. import java.util.List;
  8. import java.util.Map;
  9. import java.util.Properties;
  10. /**
  11. * Created by root on 16-3-14.
  12. */
  13. public class KafkaConsumer2 extends Thread {
  14. private final String topic;
  15. private final ConsumerConnector consumer;
  16. public KafkaConsumer2(String topic){
  17. consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
  18. this.topic = topic;
  19. }
  20. private static ConsumerConfig createConsumerConfig(){
  21. Properties props = new Properties();
  22. props.put("zookeeper.connect",KafkaProperties.zkConnect);
  23. props.put("group.id",KafkaProperties.groupId2);
  24. props.put("zookeeper.session.timeout.ms","40000");
  25. props.put("zookeeper.sync.time.ms","200");
  26. props.put("auto.commit.interval.ms","1000");
  27. return new ConsumerConfig(props);
  28. }
  29. @Override
  30. public void run() {
  31. Map<String,Integer> topicCountMap = new HashMap<String,Integer>();
  32. topicCountMap.put(topic, new Integer(1));
  33. Map<String,List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
  34. KafkaStream<byte[],byte[]> stream = consumerMap.get(topic).get(0);
  35. ConsumerIterator<byte[],byte[]> it = stream.iterator();
  36. while(it.hasNext()){
  37. System.out.println("receiced:"+new String(it.next().message()));
  38. try {
  39. sleep(3000);
  40. }catch(InterruptedException e){
  41. e.printStackTrace();
  42. }
  43. }
  44. }
  45. }





  # Flume agent configuration (agent name: client2)
  client2.sources = source1
  client2.sinks = sink1 sink2
  client2.channels = channel1
  # Define a sink group for load balancing.
  # The group must be declared under the same agent name (client2) as the sinks it contains.
  client2.sinkgroups = g1
  client2.sinkgroups.g1.sinks = sink1 sink2
  client2.sinkgroups.g1.processor.type = load_balance
  client2.sinkgroups.g1.processor.backoff = true
  client2.sinkgroups.g1.processor.selector = round_robin
  # Describe source
  client2.sources.source1.type = avro
  client2.sources.source1.channels = channel1
  client2.sources.source1.port = 6666
  # NOTE(review): the bind address is blank in this listing; fill in the host/IP to listen on.
  client2.sources.source1.bind =
  # Describe sink1
  #client2.sinks.sink1.type = logger
  client2.sinks.sink1.type = org.apache.flume.plugins.KafkaSink
  # NOTE(review): broker and ZooKeeper host entries are blank in this listing (",,"); fill them in.
  client2.sinks.sink1.metadata.broker.list=,,
  client2.sinks.sink1.zk.connect=,,
  client2.sinks.sink1.partition.key=0
  client2.sinks.sink1.partitioner.class=org.apache.flume.plugins.SinglePartition
  client2.sinks.sink1.serializer.class=kafka.serializer.StringEncoder
  client2.sinks.sink1.request.required.acks=1
  client2.sinks.sink1.max.message.size=1000000
  client2.sinks.sink1.producer.type=async
  client2.sinks.sink1.custom.encoding=UTF-8
  client2.sinks.sink1.custom.topic.name=kafkaToptic
  # Describe sink2
  #client2.sinks.sink2.type = logger
  client2.sinks.sink2.type = org.apache.flume.plugins.KafkaSink
  # NOTE(review): broker and ZooKeeper host entries are blank in this listing (",,"); fill them in.
  client2.sinks.sink2.metadata.broker.list=,,
  client2.sinks.sink2.zk.connect=,,
  client2.sinks.sink2.partition.key=0
  client2.sinks.sink2.partitioner.class=org.apache.flume.plugins.SinglePartition
  client2.sinks.sink2.serializer.class=kafka.serializer.StringEncoder
  client2.sinks.sink2.request.required.acks=1
  client2.sinks.sink2.max.message.size=1000000
  client2.sinks.sink2.producer.type=async
  client2.sinks.sink2.custom.encoding=UTF-8
  client2.sinks.sink2.custom.topic.name=kafkaToptic
  # Describe channel
  client2.channels.channel1.type = memory
  client2.channels.channel1.capacity = 10000
  client2.channels.channel1.transactionCapacity = 1000
  # Bind every sink to the channel (the source is bound via sources.source1.channels in the source section)
  client2.sinks.sink1.channel = channel1
  client2.sinks.sink2.channel = channel1


