当前位置:   article > 正文

kafka 详解、安装、配置_kafka map 配置方法

kafka map 配置方法

kafka
--------------
    分布式流处理平台。
    在系统之间构建实时数据流管道。
    以topic分类对记录进行存储
    每个记录包含key-value+timestamp
    每秒钟百万消息吞吐量。
 
 
    producer            //消息生产者
    consumer            //消息消费者
    consumer group      //消费者组
    kafka server        //broker,kafka服务器
    topic               //主题,副本数,分区.
    zookeeper           //hadoop namenoade + RM HA | hbase | kafka
 
 
安装kafka
----------------
    0.选择s202 ~ s204三台主机安装kafka
    1.准备zk
        略
    2.jdk
        略
    3.tar文件
    4.环境变量
        略
    5.配置kafka
        [kafka/config/server.properties]
        ...
        broker.id=202
        ...
        listeners=PLAINTEXT://:9092
        ...
        log.dirs=/home/centos/kafka/logs
        ...
        zookeeper.connect=s201:2181,s202:2181,s203:2181
     
    6.分发server.properties,同时修改每个文件的broker.id
     
    7.启动kafka服务器
        a)先启动zk
        b)启动kafka
            [s202 ~ s204]
            $>bin/kafka-server-start.sh config/server.properties
 
        c)验证kafka服务器是否启动
            $>netstat -anop | grep 9092
     
    8.创建主题 
        $>bin/kafka-topics.sh --create --zookeeper s201:2181 --replication-factor 3 --partitions 3 --topic test
 
    9.查看主题列表
        $>bin/kafka-topics.sh --list --zookeeper s201:2181
 
    10.启动控制台生产者
        $>bin/kafka-console-producer.sh --broker-list s202:9092 --topic test
 
    11.启动控制台消费者
        $>bin/kafka-console-consumer.sh --bootstrap-server s202:9092 --topic test --from-beginning --zookeeper s202:2181
 
    12.在生产者控制台输入hello world
 
 
kafka集群在zk的配置
-----------------------
    /controller         ===> {"version":1,"brokerid":202,"timestamp":"1490926369148"
     
    /controller_epoch   ===> 1
 
    /brokers
    /brokers/ids
    /brokers/ids/202    ===> {"jmx_port":-1,"timestamp":"1490926370304","endpoints":["PLAINTEXT://s202:9092"],"host":"s202","version":3,"port":9092}
    /brokers/ids/203
    /brokers/ids/204    
 
 
    /brokers/topics/test/partitions/0/state ===>{"controller_epoch":1,"leader":203,"version":1,"leader_epoch":0,"isr":[203,204,202]}
    /brokers/topics/test/partitions/1/state ===>...
    /brokers/topics/test/partitions/2/state ===>...
 
    /brokers/seqid      ===> null
 
    /admin
    /admin/delete_topics/test       ===>标记删除的主题
 
    /isr_change_notification
 
    /consumers/xxxx/
    /config
 
容错
----------------------
 
 
创建主题
-------------------
    repliation_factor 2 partitions 5
 
    $>kafka-topic.sh --zookeeper s202:2181 --replication_factor 3 --partitions 4 --create --topic test3
 
    2 x 5  = 10     //是个文件夹
 
    [s202]
    test2-1         //
    test2-2         //
    test2-3         //
 
    [s203]
    test2-0
    test2-2
    test2-3
    test2-4
 
    [s204]
    test2-0
    test2-1
    test2-4
 
重新布局分区和副本,手动再平衡
--------------------------------
    $>kafka-topics.sh --alter --zookeeper s202:2181 --topic test2 --replica-assignment 203:204,203:204,203:204,203:204,203:204
 
副本
--------------
     broker存放消息以消息达到顺序存放。生产和消费都是副本感知的。
     支持到n-1故障。每个分区都有leader,follow.
     leader挂掉时,消息分区写入到本地log或者,向生产者发送消息确认回执之前,生产者向新的leader发送消息。
 
     新leader的选举是通过isr进行,第一个注册的follower成为leader。
 
kafka支持副本模式
---------------------
    [同步复制]
        1.producer联系zk识别leader
        2.向leader发送消息
        3.leadr收到消息写入到本地log
        4.follower从leader pull消息
        5.follower向本地写入log
        6.follower向leader发送ack消息
        7.leader收到所有follower的ack消息
        8.leader向producer回传ack
                     
     
    [异步副本]
        和同步复制的区别在与leader写入本地log之后,
        直接向client回传ack消息,不需要等待所有follower复制完成。
 
 
通过java API实现消息生产者,发送消息
---------------------------------------
    package com.it18zhang.kafkademo.test;
 
    import org.junit.Test;
 
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
 
    import java.util.HashMap;
    import java.util.Properties;
 
    /**
     * Created by Administrator on 2017/3/31.
     */
    public class TestProducer {
        @Test
        public void testSend(){
            Properties props = new Properties();
            //broker列表
            props.put("metadata.broker.list", "s202:9092");
            //串行化
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            //
            props.put("request.required.acks", "1");
 
            //创建生产者配置对象
            ProducerConfig config = new ProducerConfig(props);
 
            //创建生产者
            Producer<String, String> producer = new Producer<String, String>(config);
 
            KeyedMessage<String, String> msg = new KeyedMessage<String, String>("test3","100" ,"hello world tomas100");
            producer.send(msg);
            System.out.println("send over!");
        }
    }
 
 
消息消费者
--------------------
    /**
     * 消费者
     */
    @Test
    public void testConumser(){
        //
        Properties props = new Properties();
        props.put("zookeeper.connect", "s202:2181");
        props.put("group.id", "g3");
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        //创建消费者配置对象
        ConsumerConfig config = new ConsumerConfig(props);
        //
        Map<String, Integer> map = new HashMap<String, Integer>();
        map.put("test3", new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> msgs = Consumer.createJavaConsumerConnector(new ConsumerConfig(props)).createMessageStreams(map);
        List<KafkaStream<byte[], byte[]>> msgList = msgs.get("test3");
        for(KafkaStream<byte[],byte[]> stream : msgList){
            ConsumerIterator<byte[],byte[]> it = stream.iterator();
            while(it.hasNext()){
                byte[] message = it.next().message();
                System.out.println(new String(message));
            }
        }
    }
 
flume集成kafka
-------------------
    1.KafkaSink
        [生产者]
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1
 
        a1.sources.r1.type=netcat
        a1.sources.r1.bind=localhost
        a1.sources.r1.port=8888
 
        a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
        a1.sinks.k1.kafka.topic = test3
        a1.sinks.k1.kafka.bootstrap.servers = s202:9092
        a1.sinks.k1.kafka.flumeBatchSize = 20
        a1.sinks.k1.kafka.producer.acks = 1
 
        a1.channels.c1.type=memory
 
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
 
    2.KafkaSource
        [消费者]
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1
 
        a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
        a1.sources.r1.batchSize = 5000
        a1.sources.r1.batchDurationMillis = 2000
        a1.sources.r1.kafka.bootstrap.servers = s202:9092
        a1.sources.r1.kafka.topics = test3
        a1.sources.r1.kafka.consumer.group.id = g4
 
        a1.sinks.k1.type = logger
 
        a1.channels.c1.type=memory
 
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
                 
    3.Channel
        生产者 + 消费者
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1
 
        a1.sources.r1.type = avro
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 8888
 
        a1.sinks.k1.type = logger
 
        a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
        a1.channels.c1.kafka.bootstrap.servers = s202:9092
        a1.channels.c1.kafka.topic = test3
        a1.channels.c1.kafka.consumer.group.id = g6
 
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/597117
推荐阅读
相关标签
  

闽ICP备14008679号