当前位置:   article > 正文

golang学习之go连接Kafka_go kafka

go kafka

一、kafka是什么

1、Kafka 本质上是⼀个消息队列,一个高吞吐量、持久性、分布式的消息系统。
2、包含生产者(producer)和消费者(consumer),每个consumer属于一个特定的消费者组(Consumer Group)。
3、生产者生产消息(message)写入到kafka服务器(broker,kafka集群的节点),消费者从kafka服务器(broker)读取消息。
4、消息可分为不同的类型即不同的主题(topic)。
5、同一主题(topic)的消息可以分散存储到不同的服务器节点(partition)上,一个分区(partition)只能由一个消费者组内的一个消费者消费。
6、每个partition可以有多个副本,一个Leader和若干个Follower,Leader发生故障时,会选取某个Follower成为新的Leader。

二、kafka的安装

kafka集群管理依赖zookeeper的支持,kafka、zookeeper运行需要java环境。我的kafka安装在了windows wsl环境下。

1、jdk安装

1.1、https://www.oracle.com/cn/java/technologies/downloads/下载需要的jdk.
1.2、解压下载的jdk

tar -zxvf jdk-19_linux-x64_bin.tar.gz
  • 1

1.3、配置jdk环境变量,/etc/profile影响所有用户,.bashrc影响当前用户。

vi /etc/profile
export JAVA_HOME=/mnt/d/workspace/wsl/java/jdk-19.0.1
export PATH=${JAVA_HOME}/bin:$PATH
  • 1
  • 2
  • 3

1.4、测试jdk是否安装成功

java -version
java version "19.0.1" 2022-10-18
Java(TM) SE Runtime Environment (build 19.0.1+10-21)
Java HotSpot(TM) 64-Bit Server VM (build 19.0.1+10-21, mixed mode, sharing)
  • 1
  • 2
  • 3
  • 4

2、zookeeper单机安装

2.1、https://zookeeper.apache.org/releases.html zookeeper下载地址
2.2、解压zookeeper包 apache-zookeeper-3.8.0-bin.tar.gz

tar -zxvf apache-zookeeper-3.8.0-bin.tar.gz
  • 1

2.3、配置zookeeper
解压后zookeeper配置文件名称默认为zoo_sample.cfg需要修改为zoo.cfg

mv zoo_sample.cfg zoo.cfg
  • 1

2.4、zookeeper启动
进入到zookeeper安装目录,输入启动命令

bin/zkServer.sh start
  • 1

输出如下信息说明启动成功。

Starting zookeeper ... STARTED
  • 1

2.5、查看zookeeper运行信息
输入查看命令

bin/zkServer.sh status
  • 1

输出zookeeper运行信息

ZooKeeper JMX enabled by default
Using config: /mnt/d/ProgramFiles/apache-zookeeper-3.8.0-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: standalone
  • 1
  • 2
  • 3
  • 4

2.6、zookeeper停止运行

bin/zkServer.sh stop
  • 1

3、kafka单机安装

3.1、kafka包下载https://kafka.apache.org/downloads.html
3.2、解压kafka包

tar -zxvf kafka_2.12-3.3.1.tgz
  • 1

3.3、kafka配置
3.3.0、kafka配置文件 config/server.properties
3.3.1、kafka数据日志目录:log.dirs=***
3.3.2、zookeeper连接地址:zookeeper.connect=localhost:2181
3.3.3、节点id集群时使用:broker.id=0
3.3.4、kafka服务监听的ip和端口:listeners=PLAINTEXT://172.24.198.152:9092。我的kafka安装在了windows wsl内,172.24.198.152是我的wsl地址可以通过如下命令查看:

命令: ip addr |grep eth0
输出: inet 172.24.198.152/20 brd .......
  • 1
  • 2

3.4、kafka启动

bin/kafka-server-start.sh config/server.properties
  • 1

3.5、创建topic

bin/kafka-topics.sh --create --topic my_topic
  • 1

3.6、kafka停止

bin/kafka-server-stop.sh
  • 1

三、go连接kafka

1、go kafka安装

go get github.com/segmentio/kafka-go
  • 1

2、生产者:官方github examples producer-api

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"

	kafka "github.com/segmentio/kafka-go"
)

func producerHandler(kafkaWriter *kafka.Writer) func(http.ResponseWriter, *http.Request) {
	return http.HandlerFunc(func(wrt http.ResponseWriter, req *http.Request) {
		body, err := ioutil.ReadAll(req.Body)
		if err != nil {
			log.Fatalln(err)
		}
		msg := kafka.Message{
			Key:   []byte(fmt.Sprintf("address-%s", req.RemoteAddr)),
			Value: body,
		}
		err = kafkaWriter.WriteMessages(req.Context(), msg)

		if err != nil {
			wrt.Write([]byte(err.Error()))
			log.Fatalln(err)
		}
	})
}

func getKafkaWriter(kafkaURL, topic string) *kafka.Writer {
	return &kafka.Writer{
		Addr:     kafka.TCP(kafkaURL),
		Topic:    topic,
		Balancer: &kafka.LeastBytes{},
	}
}

func main() {
	// get kafka writer using environment variables.
	//kafkaURL := os.Getenv("kafkaURL")
	kafkaURL := "172.24.198.152:9092"

	//topic := os.Getenv("topic")
	topic := "my_topic"

	kafkaWriter := getKafkaWriter(kafkaURL, topic)

	defer kafkaWriter.Close()

	// Add handle func for producer.
	http.HandleFunc("/", producerHandler(kafkaWriter))

	// Run the web server.
	fmt.Println("start producer-api ... !!")
	log.Fatal(http.ListenAndServe(":8081", nil))
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

3、消费者:官方github examples consumer-logger

package main

import (
	"context"
	"fmt"
	"log"
	"strings"

	kafka "github.com/segmentio/kafka-go"
)

func getKafkaReader(kafkaURL, topic, groupID string) *kafka.Reader {
	brokers := strings.Split(kafkaURL, ",")
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers:  brokers,
		GroupID:  groupID,
		Topic:    topic,
		MinBytes: 10e3, // 10KB
		MaxBytes: 10e6, // 10MB
	})
}

func main() {
	// get kafka reader using environment variables.
	//kafkaURL := os.Getenv("kafkaURL")
	kafkaURL := "172.24.198.152:9092"
	//topic := os.Getenv("topic")
	topic := "my_topic"
	//groupID := os.Getenv("groupID")
	groupID := ""

	reader := getKafkaReader(kafkaURL, topic, groupID)

	defer reader.Close()

	fmt.Println("start consuming ... !!")
	for {
		m, err := reader.ReadMessage(context.Background())
		if err != nil {
			log.Fatalln(err)
		}
		fmt.Printf("message at topic:%v partition:%v offset:%v	%s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
	}
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/600218
推荐阅读
相关标签
  

闽ICP备14008679号