当前位置:   article > 正文

kafka安装以及go使用kafka_go-kafka创建topic

go-kafka创建topic

Kafka入门教程 Golang实现Kafka消息发送、接收

https://blog.csdn.net/tflasd1157/article/details/81985722

 

 

 

安装

参考https://blog.csdn.net/panchang199266/article/details/82113453

Step 1: 下载代码

你可以登录Apache kafka 官方下载。
http://kafka.apache.org/downloads.html
备注:2.11-1.1.0版本才与JDK1.7兼容,否则更高版本需要JDK1.8


Step 2: 启动服务

运行kafka需要使用Zookeeper,所以你需要先启动Zookeeper,如果你没有Zookeeper,你可以使用kafka自带打包和配置好的Zookeeper(PS:在kafka包里)。

  1. /后台启动(推荐)
  2. ./zookeeper-server-start.sh ../config/zookeeper.properties 1>/dev/null 2>&1 &

现在启动kafka

vim config/server1.properties

  1. broker.id=0
  2. listeners=PLAINTEXT://192.168.48.131:9092
  3. log.dirs=kafka-logs
  4. zookeeper.connect=localhost:2181

//后台启动kafka
./kafka-server-start.sh ../config/server1.properties 1>/dev/null 2>&1 &
 

./kafka-topics.sh --create --zookeeper localhost:2181 --config max.message.bytes=12800000 --config flush.messages=1 --replication-factor 1 --partitions 1 --topic test
 

  1. 命令解析:
  2. --create: 指定创建topic动作
  3. --topic:指定新建topic的名称
  4. --zookeeper: 指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样
  5. --config:指定当前topic上有效的参数值,参数列表参考文档为: http://kafka.apache.org/082/documentation.html#brokerconfigs
  6. --partitions:指定当前创建的kafka分区数量,默认为1
  7. --replication-factor:指定每个分区的复制因子个数,默认1

创建好之后,可以通过运行以下命令,查看已创建的topic信息:

./kafka-topics.sh --list --zookeeper localhost:2181

./kafka-topics.sh --describe --zookeeper localhost:2181  --topic test

  1. 命令解析:
  2. --describe: 指定是展示详细信息命令
  3. --zookeeper: 指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样
  4. --topic:指定需要展示数据的topic名称

发送消息

[root@administrator bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
>this is a message
>this is another message
//按`Ctrl+C`终止输入
消费消息

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

如果你有2台不同的终端上运行上述命令,那么当你在运行生产者时,消费者就能消费到生产者发送的消息。

 

 6: 设置多个broker集群(单机伪集群的配置)

到目前,我们只是单一的运行一个broker,没什么意思。对于Kafka,一个broker仅仅只是一个集群的大小,所有让我们多设几个broker。

  首先为每个broker创建一个配置文件:

  1. cp config/server.properties config/server-1.properties
  2. cp config/server.properties config/server-2.properties

修改添加 3个config/server.properties 

  1. config/server1.properties:
  2. broker.id=0
  3. listeners=PLAINTEXT://192.168.10.130:9092
  4. log.dirs=kafka-logs
  5. zookeeper.connect=localhost:2181
  6. config/server-1.properties:
  7. broker.id=1
  8. listeners=PLAINTEXT://192.168.10.130:9093
  9. log.dirs=kafka-logs-1
  10. zookeeper.connect=localhost:2181
  11. config/server-2.properties:
  12. broker.id=2
  13. listeners=PLAINTEXT://192.168.10.130:9094
  14. log.dirs=kafka-logs-2
  15. zookeeper.connect=localhost:2181

备注2:当使用java客户端访问远程的kafka时,一定要把集群中所有的端口打开,否则会连接超时

/sbin/iptables -I INPUT -p tcp --dport 9092 -j ACCEPT
/sbin/iptables -I INPUT -p tcp --dport 9093 -j ACCEPT
/sbin/iptables -I INPUT -p tcp --dport 9094 -j ACCEPT
/etc/rc.d/init.d/iptables save

broker.id是集群中每个节点的唯一且永久的名称,我们修改端口和日志目录是因为我们现在在同一台机器上运行,我们要防止broker在同一端口上注册和覆盖对方的数据。

  我们已经运行了zookeeper和刚才的一个kafka节点,所有我们只需要在启动2个新的kafka节点。

./kafka-server-start.sh ../config/server-1.properties 1>/dev/null 2>&1 &
./kafka-server-start.sh ../config/server-2.properties 1>/dev/null 2>&1 &
现在,我们创建一个新topic,把备份设置为:3

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
好了,现在我们已经有了一个集群了,我们怎么知道每个集群在做什么呢?运行命令“describe topics”

>./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

//所有分区的摘要
Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:
//提供一个分区信息,因为我们只有一个分区,所以只有一行。
Topic: my-replicated-topic    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0

“leader”:该节点负责该分区的所有的读和写,每个节点的leader都是随机选择的。
“replicas”:备份的节点列表,无论该节点是否是leader或者目前是否还活着,只是显示。
“isr”:“同步备份”的节点列表,也就是活着的节点并且正在同步leader
  其中Replicas和Isr中的1,2,0就对应着3个broker他们的broker.id属性!

  我们运行这个命令,看看一开始我们创建的那个节点:
> ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:
Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0

这并不奇怪,刚才创建的主题没有Replicas,并且在服务器“0”上,我们创建它的时候,集群中只有一个服务器,所以是“0”。

Step 7: 测试集群的容错能力

7.1发布消息到集群

  1. [root@administrator bin]# ./kafka-console-producer.sh --broker-list 192.168.10.130:9092 --topic my-replicated-topic
  2. >cluster message 1
  3. >cluster message 2
  4. //Ctrl+C终止产生消息

7.2消费消息

[root@administrator bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.10.130:9093 --from-beginning --topic my-replicated-topic
cluster message 1
cluster message 2
//Ctrl+C终止消费消息

7.3干掉leader,测试集群容错
  首先查询谁是leader

> ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
//所有分区的摘要
Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:
//提供一个分区信息,因为我们只有一个分区,所以只有一行。
Topic: my-replicated-topic    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0

可以看到Leader的broker.id为1,找到对应的Broker

[root@administrator bin]# jps -m
5130 Kafka ../config/server.properties
4861 QuorumPeerMain ../config/zookeeper.properties
1231 Bootstrap start start
7420 Kafka ../config/server-2.properties
7111 Kafka ../config/server-1.properties
9139 Jps -m

通过以上查询到Leader的PID(Kafka ../config/server-1.properties)为7111,杀掉该进程

//杀掉该进程
kill -9 7111
//再查询一下,确认新的Leader已经产生,新的Leader为broker.id=0
[root@administrator bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3    Configs:
//备份节点之一成为新的leader,而broker1已经不在同步备份集合里了
Topic: my-replicated-topic      Partition: 0    Leader: 0       Replicas: 1,0,2 Isr: 0,2

7.4再次消费消息,确认消息没有丢失

[root@administrator bin]# ./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
cluster message 1
cluster message 2
消息依然存在,故障转移成功!!

从kafka读取数据后 数据会自动删除吗

不会,kafka中数据的删除跟有没有消费者消费完全无关。数据的删除,只跟kafka broker上面上面的这两个配置有关:

1

2

log.retention.hours=48 #数据最多保存48小时

log.retention.bytes=1073741824 #数据最多1G

默认

    log.retention.bytes = -1
    log.retention.hours = 168

 

 

 

go调用kafka

参考https://blog.csdn.net/kdpujie/article/details/79093595

https://blog.csdn.net/tflasd1157/article/details/81985722

消费者接收

  1. package main
  2. import (
  3. "github.com/Shopify/sarama"
  4. "fmt"
  5. )
  6. var Address = []string{"192.168.48.131:9092","192.168.48.131:9093","192.168.48.131:9094"}
  7. func main(){
  8. //配置
  9. config := sarama.NewConfig()
  10. //接收失败通知
  11. config.Consumer.Return.Errors = true
  12. //设置使用的kafka版本,如果低于V0_10_0_0版本,消息中的timestrap没有作用.需要消费和生产同时配置
  13. config.Version = sarama.V0_11_0_0
  14. //新建一个消费者
  15. consumer, e := sarama.NewConsumer(Address, config)
  16. if e != nil {
  17. panic("error get consumer")
  18. }
  19. defer consumer.Close()
  20. //根据消费者获取指定的主题分区的消费者,Offset这里指定为获取最新的消息.
  21. partitionConsumer, err := consumer.ConsumePartition("my-replicated-topic", 0, sarama.OffsetNewest)
  22. if err != nil {
  23. fmt.Println("error get partition consumer", err)
  24. }
  25. defer partitionConsumer.Close()
  26. //循环等待接受消息.
  27. for {
  28. select {
  29. //接收消息通道和错误通道的内容.
  30. case msg := <-partitionConsumer.Messages():
  31. fmt.Println("msg offset: ", msg.Offset, " partition: ", msg.Partition, " timestrap: ",
  32. msg.Timestamp.Format("2006-Jan-02 15:04"), " value: ", string(msg.Value))
  33. case err := <-partitionConsumer.Errors():
  34. fmt.Println(err.Err)
  35. }
  36. }
  37. }

 

二. 生产者

1. 同步消息模式

  1. import (
  2. "github.com/Shopify/sarama"
  3. "time"
  4. "log"
  5. "fmt"
  6. "os"
  7. "os/signal"
  8. "sync"
  9. )
  10. var Address = []string{"10.130.138.164:9092","10.130.138.164:9093","10.130.138.164:9094"}
  11. func main() {
  12. syncProducer(Address)
  13. //asyncProducer1(Address)
  14. }
  15. //同步消息模式
  16. func syncProducer(address []string) {
  17. config := sarama.NewConfig()
  18. config.Producer.Return.Successes = true
  19. config.Producer.Timeout = 5 * time.Second
  20. p, err := sarama.NewSyncProducer(address, config)
  21. if err != nil {
  22. log.Printf("sarama.NewSyncProducer err, message=%s \n", err)
  23. return
  24. }
  25. defer p.Close()
  26. topic := "test"
  27. srcValue := "sync: this is a message. index=%d"
  28. for i:=0; i<10; i++ {
  29. value := fmt.Sprintf(srcValue, i)
  30. msg := &sarama.ProducerMessage{
  31. Topic:topic,
  32. Value:sarama.ByteEncoder(value),
  33. }
  34. part, offset, err := p.SendMessage(msg)
  35. if err != nil {
  36. log.Printf("send message(%s) err=%s \n", value, err)
  37. }else {
  38. fmt.Fprintf(os.Stdout, value + "发送成功,partition=%d, offset=%d \n", part, offset)
  39. }
  40. time.Sleep(2*time.Second)
  41. }
  42. }

 

http://bbs.itheima.com/forum.php?mod=viewthread&tid=406586&highlight=go

公司决定使用kafka来作为新一代的消息队列来使用,于是开始对kafka的机制,原理,go客户端的使用,各种了解了一番,过程中也遇到了不少的坑,特地写出来,和大家分享,也供自己参考,加深印象。
首先,kafka的设计思想,各个角色比如broker,producer,consumer,partition等等还有与它们相关的配置,这里就先不作介绍了,官方文档都有,文章后面也会提到。
附上kafka官方文档链接: 
http://kafka.apachecn.org/documentation.html
客户端选择: 
go连接kafka的客户端不多,综合对比了下,决定使用sarama 
“go get github.com/Shopify/sarama”
生产者:

func SaramaProducer()  {

    config := sarama.NewConfig()
    //等待服务器所有副本都保存成功后的响应
    config.Producer.RequiredAcks = sarama.WaitForAll
    //随机向partition发送消息
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    //是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用.
    config.Producer.Return.Successes = true
    config.Producer.Return.Errors = true
    //设置使用的kafka版本,如果低于V0_10_0_0版本,消息中的timestrap没有作用.需要消费和生产同时配置
    //注意,版本设置不对的话,kafka会返回很奇怪的错误,并且无法成功发送消息
    config.Version = sarama.V0_10_0_1

    fmt.Println("start make producer")
    //使用配置,新建一个异步生产者
    producer, e := sarama.NewAsyncProducer([]string{"182.61.9.153:6667","182.61.9.154:6667","182.61.9.155:6667"}, config)
    if e != nil {
        fmt.Println(e)
        return
    }
    defer producer.AsyncClose()

    //循环判断哪个通道发送过来数据.
    fmt.Println("start goroutine")
    go func(p sarama.AsyncProducer) {
        for{
            select {
            case  <-p.Successes():
                //fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition)
            case fail := <-p.Errors():
                fmt.Println("err: ", fail.Err)
            }
        }
    }(producer)

    var value string
    for i:=0;;i++ {
        time.Sleep(500*time.Millisecond)
        time11:=time.Now()
        value = "this is a message 0606 "+time11.Format("15:04:05")

        // 发送的消息,主题。
        // 注意:这里的msg必须得是新构建的变量,不然你会发现发送过去的消息内容都是一样的,因为批次发送消息的关系。
        msg := &sarama.ProducerMessage{
            Topic: "0606_test",
        }

        //将字符串转化为字节数组
        msg.Value = sarama.ByteEncoder(value)
        //fmt.Println(value)

        //使用通道发送
        producer.Input() <- msg
    }
}

这里使用的是异步producer,kafka的producer有个特点是,批次发送,这么做的好处就是,可以提高吞吐量,所以我们在看几个主流的消息队列性能测试对比的时候,kafka的吞吐量是遥遥领先的。
producer还有个特性是,就是每发送一次消息,都会要求broker返回一个消息回执,即ack。如果ack没有收到,producer会进行重发,如果设置了重发次数的话。这个ack有三种模式:

// The level of acknowledgement reliability needed from the broker (defaults
// to WaitForLocal). Equivalent to the `request.required.acks` setting of the JVM producer.
// 等同于jvm kafka中的`request.required.acks`
        RequiredAcks RequiredAcks

type RequiredAcks int16
const (
// 第一个模式,NoResponse doesn't send any response, the TCP ACK is all you get.
    NoResponse RequiredAcks = 0
//第二个模式, WaitForLocal waits for only the local commit to succeed before responding.
    WaitForLocal RequiredAcks = 1
// 第三个模式,WaitForAll waits for all in-sync replicas to commit before responding.
// The minimum number of in-sync replicas is configured on the broker via
// the `min.insync.replicas` configuration key.
    WaitForAll RequiredAcks = -1
)

如果RequiredAcks设置为0,在这种情况下,服务器是否收到请求是没法保证的,并且参数retries(重发)也不会生效(因为客户端无法获得失败信息)。既然提到了重发,可以看一下下面sarama的重发定义:

Retry struct {
            // The total number of times to retry sending a message (default 3).
            // Similar to the `message.send.max.retries` setting of the JVM producer.
            Max int
            // How long to wait for the cluster to settle between retries
            // (default 100ms). Similar to the `retry.backoff.ms` setting of the
            // JVM producer.
            Backoff time.Duration
        }

消费者:

func SaramaConsumer()  {

    fmt.Println("start consume")
    config := sarama.NewConfig()

    //提交offset的间隔时间,每秒提交一次给kafka
    config.Consumer.Offsets.CommitInterval = 1 * time.Second

    //设置使用的kafka版本,如果低于V0_10_0_0版本,消息中的timestrap没有作用.需要消费和生产同时配置
    config.Version = sarama.V0_10_0_1

//consumer新建的时候会新建一个client,这个client归属于这个consumer,并且这个client不能用作其他的consumer
    consumer, err := sarama.NewConsumer([]string{"182.61.9.153:6667","182.61.9.154:6667","182.61.9.155:6667"}, config)
    if err != nil {
        panic(err)
    }

//新建一个client,为了后面offsetManager做准备
    client, err := sarama.NewClient([]string{"182.61.9.153:6667","182.61.9.154:6667","182.61.9.155:6667"}, config)
    if err != nil {
        panic("client create error")
    }
    defer client.Close()

//新建offsetManager,为了能够手动控制offset
    offsetManager,err:=sarama.NewOffsetManagerFromClient("group111",client)
    if err != nil {
        panic("offsetManager create error")
    }
    defer offsetManager.Close()

//创建一个第2分区的offsetManager,每个partition都维护了自己的offset
    partitionOffsetManager,err:=offsetManager.ManagePartition("0606_test",2)
    if err != nil {
        panic("partitionOffsetManager create error")
    }
    defer partitionOffsetManager.Close()


    fmt.Println("consumer init success")

    defer func() {
        if err := consumer.Close(); err != nil {
            log.Fatalln(err)
        }
    }()

    //sarama提供了一些额外的方法,以便我们获取broker那边的情况
    topics,_:=consumer.Topics()
    fmt.Println(topics)
    partitions,_:=consumer.Partitions("0606_test")
    fmt.Println(partitions)

//第一次的offset从kafka获取(发送OffsetFetchRequest),之后从本地获取,由MarkOffset()得来
    nextOffset,_:=partitionOffsetManager.NextOffset()
    fmt.Println(nextOffset)

//创建一个分区consumer,从上次提交的offset开始进行消费
    partitionConsumer, err := consumer.ConsumePartition("0606_test", 2, nextOffset+1)
    if err != nil {
        panic(err)
    }

    defer func() {
        if err := partitionConsumer.Close(); err != nil {
            log.Fatalln(err)
        }
    }()

    // Trap SIGINT to trigger a shutdown.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    fmt.Println("start consume really")

ConsumerLoop:
    for {
        select {
        case msg := <-partitionConsumer.Messages():
            log.Printf("Consumed message offset %d\n message:%s", msg.Offset,string(msg.Value))
            //拿到下一个offset
            nextOffset,offsetString:=partitionOffsetManager.NextOffset()
            fmt.Println(nextOffset+1,"...",offsetString)
            //提交offset,默认提交到本地缓存,每秒钟往broker提交一次(可以设置)
            partitionOffsetManager.MarkOffset(nextOffset+1,"modified metadata")

        case <-signals:
            break ConsumerLoop
        }
    }
}

至此,一个初步的consumer构建好了,很多关于consumer的内容见上面代码的注释。可以根据consumer.Partitions(“topic”)来获取这个topic的所有分区,然后为每个分区构建一个consumer,然后进行消费。

这样挺麻烦的,也不够优雅,其实kafka的consumer还有个很重要的机制,就是consumer group,可惜sarama并不支持,不过有另一个开源的库,叫做”github.com/bsm/sarama-cluster”,它是在sarama上加了一层封装,支持了consumer group,这个我会在下一篇文章中写到

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/529531
推荐阅读
相关标签
  

闽ICP备14008679号