
Kafka Integration: Learning Notes

What is Kafka?

Kafka is a high-throughput distributed publish-subscribe messaging system that can handle all the activity stream data of a consumer-scale website. This kind of activity (page views, searches, and other user actions) is a key ingredient of many social features on the modern web. Because of the throughput involved, such data has traditionally been handled through log processing and log aggregation. For systems like Hadoop that are built for log data and offline analytics but are constrained when real-time processing is required, Kafka is a practical solution: it aims to unify online and offline message processing through Hadoop's parallel loading mechanism, and to provide real-time consumption across a cluster.

What are Kafka's advantages?

1. It is a highly fault-tolerant publish-subscribe messaging system (its data is persisted to disk and can be read repeatedly).

2. It is efficient and offers very high throughput.

3. It is highly scalable; a ZooKeeper cluster manages the dynamic joining and leaving of Consumer and Broker nodes.

4. It supports transporting data as real-time streams.


How do you use Kafka?

1. First, configure the ZooKeeper cluster. Here we edit zookeeper.properties:

  # Directory where ZooKeeper stores its data (node information used by the Kafka brokers and consumers)
  dataDir=/data/zookeeper
  # the port at which the clients will connect
  clientPort=2181
  # disable the per-ip limit on the number of connections since this is a non-production config
  maxClientCnxns=0
  # basic time unit (ms); heartbeats use it to decide whether a node is alive
  tickTime=2000
  syncLimit=2
  # followers may take at most initLimit ticks to connect to the leader at startup
  initLimit=5
  # cluster members
  server.1=192.168.196.138:2888:3888
  server.2=192.168.196.140:2888:3888
  server.3=192.168.196.139:2888:3888
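
One detail the file above does not include: each server in the ensemble also needs a myid file under dataDir containing its own number (1, 2, or 3) matching the server.N entries. Once all three nodes are up, you can verify the ensemble is reachable from code; the snippet below is a minimal sketch using the standard ZooKeeper Java client (not part of the original post; the connect string simply reuses the addresses configured above).

import org.apache.zookeeper.ZooKeeper;

/**
 * Minimal connectivity check against the ZooKeeper ensemble configured above.
 */
public class ZkConnectivityCheck {
    public static void main(String[] args) throws Exception {
        String connect = "192.168.196.138:2181,192.168.196.139:2181,192.168.196.140:2181";
        // A no-op watcher is enough for a simple liveness check.
        ZooKeeper zk = new ZooKeeper(connect, 20000, event -> { });
        // Listing the root znode proves a session was established.
        System.out.println("children of /: " + zk.getChildren("/", false));
        zk.close();
    }
}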

2. Next, configure the Kafka broker nodes by editing the server.properties file under conf:

  broker.id=0  # unique id of this broker; must be different on every node in the cluster
  port=9092  # port the broker listens on
  num.network.threads=2
  num.io.threads=8  # number of threads performing disk I/O for persisting data
  socket.send.buffer.bytes=1048576  # send buffer used by the socket server
  socket.receive.buffer.bytes=1048576  # receive buffer used by the socket server
  socket.request.max.bytes=104857600  # maximum size of a request the broker will accept
  log.dirs=/tmp/kafka-logs  # directory where Kafka stores its commit log (message data)
  num.partitions=2  # default number of partitions per topic
  log.retention.hours=168
  log.segment.bytes=536870912
  log.retention.check.interval.ms=60000
  log.cleaner.enable=false
  zookeeper.connect=localhost:2181,192.168.196.140:2181,192.168.196.139:2181  # ZooKeeper cluster connection string
  zookeeper.connection.timeout.ms=1000000
  host.name=192.168.196.138  # hostname of this broker
  delete.topic.enable=true  # when enabled, deleting a topic also deletes its data

3. Start the ZooKeeper cluster:

./bin/zookeeper-server-start.sh ./conf/zookeeper.properties

4. Start the Kafka broker nodes:

./bin/kafka-server-start.sh ./conf/server.properties

5. Create a topic:

./bin/kafka-topics.sh --create --zookeeper Master:2181 --replication-factor 2 --partitions 2 --topic test-topic

6. You can then list the topics that exist in the Kafka cluster:

./bin/kafka-topics.sh --list --zookeeper 192.168.196.138:2181
./bin/kafka-topics.sh --describe --zookeeper Master:2181 --topic test-topic  # inspect the details of a specific topic and its partitions

7. Transport data from a Flume source into Kafka

The KafkaSink code is shown below. The jars it depends on also need to be copied into Flume's lib directory; the original post provided them as a Baidu Cloud archive, and after unpacking it everything inside its lib folder should be copied into Flume's lib.

package org.apache.flume.plugins;

/**
 * KAFKA Flume Sink (Kafka 0.8 Beta, Flume 1.5).
 * User:
 * Date: 2016/03/28
 * Time: PM 4:32
 */
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.commons.lang.StringUtils;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventHelper;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;

/**
 * Kafka sink.
 */
public class KafkaSink extends AbstractSink implements Configurable {

    /** Logger. */
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSink.class);

    /** Sink parameters, taken from the Flume agent configuration. */
    private Properties parameters;

    /** Kafka producer. */
    private Producer<String, String> producer;

    /** The Flume context. */
    private Context context;

    /**
     * Configure the sink: copy all properties out of the Flume context.
     *
     * @param context the context
     */
    @Override
    public void configure(Context context) {
        this.context = context;
        ImmutableMap<String, String> props = context.getParameters();
        parameters = new Properties();
        for (String key : props.keySet()) {
            String value = props.get(key);
            this.parameters.put(key, value);
        }
    }

    /**
     * Start the sink and build the Kafka producer from the configured properties.
     */
    @Override
    public synchronized void start() {
        super.start();
        ProducerConfig config = new ProducerConfig(this.parameters);
        this.producer = new Producer<String, String>(config);
    }

    /**
     * Take one event from the channel and publish it to Kafka.
     *
     * @return the status
     * @throws EventDeliveryException the event delivery exception
     */
    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        // Start transaction
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
            // This try clause includes whatever Channel operations you want to do
            Event event = ch.take();
            if (event == null) {
                // Nothing in the channel right now; back off instead of sending an empty message.
                txn.commit();
                return Status.BACKOFF;
            }
            String partitionKey = (String) parameters.get(KafkaFlumeConstans.PARTITION_KEY_NAME);
            String encoding = StringUtils.defaultIfEmpty(
                    (String) this.parameters.get(KafkaFlumeConstans.ENCODING_KEY_NAME),
                    KafkaFlumeConstans.DEFAULT_ENCODING);
            String topic = Preconditions.checkNotNull(
                    (String) this.parameters.get(KafkaFlumeConstans.CUSTOME_TOPIC_KEY_NAME),
                    "custom.topic.name is required");
            String eventData = new String(event.getBody(), encoding);

            KeyedMessage<String, String> data;
            // If no partition key is configured, let Kafka choose the partition.
            if (StringUtils.isEmpty(partitionKey)) {
                data = new KeyedMessage<String, String>(topic, eventData);
            } else {
                data = new KeyedMessage<String, String>(topic, partitionKey, eventData);
            }

            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Send Message to Kafka : [" + eventData + "] -- [" + EventHelper.dumpEvent(event) + "]");
            }
            producer.send(data);
            txn.commit();
            status = Status.READY;
        } catch (Throwable t) {
            txn.rollback();
            status = Status.BACKOFF;
            // re-throw all Errors
            if (t instanceof Error) {
                throw (Error) t;
            }
        } finally {
            txn.close();
        }
        return status;
    }

    /**
     * Stop the sink and release the producer.
     */
    @Override
    public void stop() {
        producer.close();
    }
}
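
The sink references a KafkaFlumeConstans class that the post does not include. Based on the property names used in the sink and in the Flume configuration further down (partition.key, custom.encoding, custom.topic.name), a minimal sketch of that constants class might look like the following; treat the exact key strings as assumptions.

package org.apache.flume.plugins;

/**
 * Property keys that KafkaSink reads from the Flume agent configuration.
 * Reconstructed from the names used in this post, not the original source.
 */
public class KafkaFlumeConstans {
    public static final String PARTITION_KEY_NAME = "partition.key";
    public static final String ENCODING_KEY_NAME = "custom.encoding";
    public static final String CUSTOME_TOPIC_KEY_NAME = "custom.topic.name";
    public static final String DEFAULT_ENCODING = "UTF-8";
}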


You also need a partitioning strategy; a simple one is SinglePartition:

@Override
public int partition(Object key, int numPartitions) {
    try {
        // Numeric keys are mapped directly to a partition.
        int partitionNum = Integer.parseInt((String) key);
        return Math.abs(partitionNum % numPartitions);
    } catch (Exception e) {
        // Fall back to hashing for non-numeric keys.
        return Math.abs(key.hashCode() % numPartitions);
    }
}
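
The snippet above is only the partition method. In the 0.8-era producer API this method belongs to a class implementing kafka.producer.Partitioner, and the producer instantiates that class reflectively through a constructor taking VerifiableProperties. A full class might look like the sketch below (the class and package name are chosen to match the partitioner.class value used in the Flume configuration later in this post; treat the details as assumptions for your Kafka version).

package org.apache.flume.plugins;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

/**
 * Simple partitioner: numeric keys map directly to a partition,
 * anything else falls back to hashCode().
 */
public class SinglePartition implements Partitioner {

    // The old producer passes its properties to this constructor when it
    // creates the partitioner reflectively.
    public SinglePartition(VerifiableProperties props) {
    }

    @Override
    public int partition(Object key, int numPartitions) {
        try {
            int partitionNum = Integer.parseInt((String) key);
            return Math.abs(partitionNum % numPartitions);
        } catch (Exception e) {
            return Math.abs(key.hashCode() % numPartitions);
        }
    }
}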


8. Write a Kafka consumer to consume the data in the cluster's topic.

1. Static configuration constants used by the consumer:

package com.test.kafka;

/**
 * Created by root on 16-3-13.
 */
public class KafkaProperties {
    public final static String zkConnect = "192.168.196.138:2181,192.168.196.139:2181,192.168.196.140:2181";
    public final static String groupId = "group1";
    public final static String groupId2 = "group2";
    public final static String topic = "kafkaToptic";
    public final static int kafkaProduceBufferSize = 64 * 1024;
    public final static int connectionTimeOut = 20000;
    public final static int reconnectInterval = 10000;
    public final static String topic2 = "topic2";
    public final static String topic3 = "topic3";
}
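
Alongside these constants, a small standalone producer (not shown in the original post) is handy for pushing test messages into the topic without going through Flume. The sketch below uses the same 0.8 javaapi producer as the KafkaSink; the broker list is an assumption based on the cluster addresses used throughout this post.

package com.test.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Minimal test producer for the topic defined in KafkaProperties.
 */
public class KafkaProducerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumed broker list, matching the cluster addresses used in this post.
        props.put("metadata.broker.list",
                "192.168.196.138:9092,192.168.196.139:9092,192.168.196.140:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        for (int i = 0; i < 10; i++) {
            producer.send(new KeyedMessage<String, String>(KafkaProperties.topic, "test message " + i));
        }
        producer.close();
    }
}
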
2. The consumer implementation:

package com.test.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * Created by root on 16-3-14.
 */
public class KafkaConsumer2 extends Thread {

    private final String topic;
    private final ConsumerConnector consumer;

    public KafkaConsumer2(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId2);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println("received: " + new String(it.next().message()));
            try {
                sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
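
Because KafkaConsumer2 extends Thread, running it only requires constructing it with the topic name and starting it. A minimal runner (not part of the original post) might look like this:

package com.test.kafka;

/**
 * Starts the high-level consumer defined above.
 */
public class ConsumerRunner {
    public static void main(String[] args) {
        // Consume the same topic the Flume sink publishes to.
        new KafkaConsumer2(KafkaProperties.topic).start();
    }
}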

You also need to bring in the Kafka-related dependencies, which can be pulled in directly with Maven; I will add the exact dependency list in a later post.

My architecture for large-scale log collection

I built a platform for collecting logs at scale. Flume collects real-time logs from the service platform and then filters and replicates them: a regex configuration keeps only JSON-formatted logs, and each event is copied into two streams, one written to HDFS and one sent into Kafka as a real-time stream. Load balancing is applied on both the HDFS path and the Kafka path. Below are an overall architecture diagram and a detailed data-path diagram.






Implementing the architecture

One of the two paths in this architecture was already described in my Flume post, so it is not repeated here; this section only walks through the Kafka path. The sections above already show how the consumer consumes, so all that remains is to configure the Flume side so the data channel is connected and data flows reliably. Starting the Flume clients is covered in detail in the Flume post, so it is not repeated here either.
Next, the MasterClient2.conf configuration that connects Flume to Kafka is as follows:
  # Flume agent configuration for client2
  client2.sources = source1
  client2.sinks = sink1 sink2
  client2.channels = channel1
  # Define a sink group for load balancing
  client2.sinkgroups = g1
  client2.sinkgroups.g1.sinks = sink1 sink2
  client2.sinkgroups.g1.processor.type = load_balance
  client2.sinkgroups.g1.processor.backoff = true
  client2.sinkgroups.g1.processor.selector = round_robin
  # Describe the source
  client2.sources.source1.type = avro
  client2.sources.source1.channels = channel1
  client2.sources.source1.port = 6666
  client2.sources.source1.bind = 0.0.0.0
  # Describe sink1
  #client2.sinks.sink1.type = logger
  client2.sinks.sink1.type = org.apache.flume.plugins.KafkaSink
  client2.sinks.sink1.metadata.broker.list=192.168.196.138:9092,192.168.196.139:9092,192.168.196.140:9092
  client2.sinks.sink1.zk.connect=192.168.196.138:2181,192.168.196.139:2181,192.168.196.140:2181
  client2.sinks.sink1.partition.key=0
  client2.sinks.sink1.partitioner.class=org.apache.flume.plugins.SinglePartition
  client2.sinks.sink1.serializer.class=kafka.serializer.StringEncoder
  client2.sinks.sink1.request.required.acks=1
  client2.sinks.sink1.max.message.size=1000000
  client2.sinks.sink1.producer.type=async
  client2.sinks.sink1.custom.encoding=UTF-8
  client2.sinks.sink1.custom.topic.name=kafkaToptic
  # Describe sink2
  #client2.sinks.sink2.type = logger
  client2.sinks.sink2.type = org.apache.flume.plugins.KafkaSink
  client2.sinks.sink2.metadata.broker.list=192.168.196.138:9092,192.168.196.139:9092,192.168.196.140:9092
  client2.sinks.sink2.zk.connect=192.168.196.138:2181,192.168.196.139:2181,192.168.196.140:2181
  client2.sinks.sink2.partition.key=0
  client2.sinks.sink2.partitioner.class=org.apache.flume.plugins.SinglePartition
  client2.sinks.sink2.serializer.class=kafka.serializer.StringEncoder
  client2.sinks.sink2.request.required.acks=1
  client2.sinks.sink2.max.message.size=1000000
  client2.sinks.sink2.producer.type=async
  client2.sinks.sink2.custom.encoding=UTF-8
  client2.sinks.sink2.custom.topic.name=kafkaToptic
  # Describe the channel
  client2.channels.channel1.type = memory
  client2.channels.channel1.capacity = 10000
  client2.channels.channel1.transactionCapacity = 1000
  # Bind the source and sinks to the channel
  client2.sources.source1.channels = channel1
  client2.sinks.sink1.channel = channel1
  client2.sinks.sink2.channel = channel1

Then start the three Flume clients, start the log simulator, and launch the consumer program; you can then watch the logs flow through. On a single cluster node with 2 GB of RAM, a 20 GB disk, and 1 CPU core, the end-to-end transfer latency is roughly 0.8 seconds.

