赞
踩
下篇:Go实现LogCollect:海量日志收集系统【下篇——开发LogTransfer】
项目架构图:
当公司发展的越来越大,业务越来越复杂时,每个业务系统都有自己的日志。此时我们就应该将不同业务线的日志进行实时收集,存储到一个日志收集中心,最后再通过web页面展示出来。
- 解决方案:
- 把机器上的日志实时收集,统一的存储到中心系统
- 对这些日志建立索引,通过搜索即可以找到对应日志
- 提供界面友好的web界面,通过web即可以完成日志搜索
该系统可能会出现的问题:
①方案选择:
- 早期的ELK(Elasticsearch,Logstash, Kibana)到现在的EFK(Elasticsearch,Filebeat 或 Fluentd, Kibana)。ELK在每台服务器上部署logstash,比较重量级,所以演化成客户端部署filebeat的EFK,由filebeat收集向logstash中写数据,最后落地到elasticsearch,通过kibana界面进行日志检索。其中Logstash主要用于收集、解析、转换
- 优:现成的解决方案,可以直接拿来使用
- 缺:运维成本高,每增加一个日志收集项都需要手动修改配置;无法准确获取logstash的状态,无法做到定制化开发与维护
方案设计:
各个组件说明:
- Log Agent:日志收集客户端,用来收集服务器上的日志
- Kafka:高吞吐量的分布式消息队列
- Elasticsearch:开源搜索引擎框架,提供基于HTTP RESTful的web接口
- Flink、Spark:分布式计算框架,能够对大量数据进行分布式处理
vim docker-compose.yml
docker-compose.yml:
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:6.2.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Set KAFKA_ADVERTISED_LISTENERS to your own host machine's IP
      # (e.g. the author's local Mac is 192.168.0.101).
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.101:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper
# Change into the directory containing docker-compose.yml, then bring
# the stack up in the background.
docker-compose up -d
# Check the result; a state of "Up" means the deployment succeeded.
docker-compose ps
# 1. Create the topic the log agent will publish to.
docker-compose exec kafka kafka-topics --create --topic nginx_log --partitions 1 --replication-factor 1 --bootstrap-server 192.168.0.101:9092
# 2. List existing topics (queried via zookeeper).
docker-compose exec kafka kafka-topics --list --zookeeper zookeeper:2181
//golang中操作kafka的库
go get github.com/IBM/sarama
// Demo producer: publishes a fixed test message to the "nginx_log"
// topic in a loop until the process is killed.
package main

import (
	"fmt"
	"time"

	"github.com/IBM/sarama"
)

func main() {
	// Producer config: require leader AND followers to ack each message,
	// pick a random partition, and report successes on the channel so the
	// synchronous SendMessage can return partition/offset.
	cfg := sarama.NewConfig()
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	cfg.Producer.Partitioner = sarama.NewRandomPartitioner
	cfg.Producer.Return.Successes = true

	// Connect to the local broker.
	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		fmt.Println("producer close, err:", err)
		return
	}
	defer producer.Close()

	for {
		// Build one message and send it synchronously.
		msg := &sarama.ProducerMessage{
			Topic: "nginx_log",
			Value: sarama.StringEncoder("this is a good test, my message is good"),
		}
		pid, offset, err := producer.SendMessage(msg)
		if err != nil {
			fmt.Println("send message failed,", err)
			return
		}
		fmt.Printf("pid:%v offset:%v\n", pid, offset)
		time.Sleep(10 * time.Millisecond)
	}
}
- 根据log_agent.conf的LogAgent配置,初始化LogAgent参数,确认LogAgent工作日志(log_agent.log)的存放位置
- tail读取nginx_log.log日志信息,将读取到的信息通过kafka连接发送到kafka中
- kafka消费对应的信息
. ├─conf │ log_agent.conf │ ├─kafka │ kafka.go │ ├─consumer │ consumer.go │ ├─logs │ log_agent.log │ ├─main │ config.go │ log.go │ main.go │ server.go │ ├─tailf │ tail.go │ go.mod └─ go.sum
[logs]
log_level = debug
log_path = /Users/xxx/GolandProjects/LogAgent/log/log_agent.log
[collect]
log_path = /Users/xxx/GolandProjects/LogAgent/nginx_log.log
topic = nginx_log
chan_size = 100
[kafka]
server_addr = localhost:9092
用于消费发送到kafka分区中的数据
// Demo consumer: reads every partition of the "nginx_log" topic from
// the newest offset and prints each message.
package main

import (
	"fmt"

	"github.com/IBM/sarama"
)

// kafka consumer
func main() {
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		fmt.Printf("fail to start consumer, err:%v\n", err)
		return
	}
	// Fetch all partition IDs for the topic.
	partitionList, err := consumer.Partitions("nginx_log")
	if err != nil {
		fmt.Printf("fail to get list of partition:err%v\n", err)
		return
	}
	fmt.Println(partitionList)

	// BUG FIX: the original `for partition := range partitionList`
	// iterated over slice *indices*, not the partition IDs themselves —
	// wrong whenever partition IDs are not 0..n-1. Iterate the values.
	for _, partition := range partitionList {
		// One partition consumer per partition.
		pc, err := consumer.ConsumePartition("nginx_log", partition, sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
			return
		}
		// Demo only: these defers never fire because main blocks on select{}.
		defer pc.AsyncClose()

		// Consume each partition asynchronously; pass pc as a named
		// parameter (the original declared an unnamed parameter and
		// relied on capturing pc).
		go func(pc sarama.PartitionConsumer) {
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
			}
		}(pc)
	}
	// Block forever so the consumer goroutines keep running (demo use).
	select {}
}
// Package kafka wraps a shared sarama synchronous producer used by the
// log agent to ship collected log lines to Kafka.
package kafka

import (
	"fmt"

	"github.com/IBM/sarama"
	"github.com/astaxie/beego/logs"
)

var (
	// client is the process-wide synchronous producer, set by InitKafka.
	client sarama.SyncProducer
)

// InitKafka connects a synchronous producer to the broker at addr.
// It must be called once, before any call to SendToKafka.
func InitKafka(addr string) (err error) {
	// Producer configuration.
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // leader and followers must both ack
	config.Producer.Partitioner = sarama.NewRandomPartitioner // random partition per message
	config.Producer.Return.Successes = true                   // successes reported so SendMessage can block

	// Create the producer.
	client, err = sarama.NewSyncProducer([]string{addr}, config)
	if err != nil {
		// FIX: corrected typo 初识化 -> 初始化 in the log text.
		logs.Error("初始化Kafka producer失败:", err)
		return
	}
	logs.Debug("初始化Kafka producer成功,地址为:", addr)
	return
}

// SendToKafka publishes data to the given topic and returns the
// producer error, if any.
func SendToKafka(data, topic string) (err error) {
	// FIX: guard against use before InitKafka — calling SendMessage on a
	// nil producer would panic.
	if client == nil {
		err = fmt.Errorf("kafka producer 未初始化, 请先调用 InitKafka")
		return
	}
	msg := &sarama.ProducerMessage{}
	msg.Topic = topic
	msg.Value = sarama.StringEncoder(data)

	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		logs.Error("发送信息失败, err:%v, data:%v, topic:%v", err, data, topic)
		return
	}
	// FIX: the original logged "read success" for a *send* operation.
	logs.Debug("send success, pid:%v, offset:%v, topic:%v\n", pid, offset, topic)
	return
}
package main

import (
	"LogAgent/tailf"
	"errors"
	"fmt"

	"github.com/astaxie/beego/config"
)

var (
	// logConfig holds the parsed agent configuration; set by loadInitConf.
	logConfig *Config
)

// Config is the agent's runtime configuration, read from log_agent.conf.
type Config struct {
	logLevel    string               // agent's own log level (logs::log_level)
	logPath     string               // agent's own log file (logs::log_path)
	chanSize    int                  // tailf message channel capacity (collect::chan_size)
	KafkaAddr   string               // broker address (kafka::server_addr)
	CollectConf []tailf.CollectConf  // files to collect and their topics
}

// loadCollectConf reads the [collect] section and appends one
// CollectConf entry to the global config.
func loadCollectConf(conf config.Configer) (err error) {
	var c tailf.CollectConf
	c.LogPath = conf.String("collect::log_path")
	if len(c.LogPath) == 0 {
		err = errors.New("无效的 collect::log_path ")
		return
	}
	c.Topic = conf.String("collect::topic")
	if len(c.Topic) == 0 {
		err = errors.New("无效的 collect::topic ")
		return
	}
	logConfig.CollectConf = append(logConfig.CollectConf, c)
	return
}

// loadInitConf parses the config file (confType is e.g. "ini") and
// populates the global logConfig, applying defaults for optional keys.
func loadInitConf(confType, filename string) (err error) {
	conf, err := config.NewConfig(confType, filename)
	if err != nil {
		fmt.Printf("初始化配置文件出错:%v\n", err)
		return
	}
	logConfig = &Config{}

	// Log level, default "debug".
	logConfig.logLevel = conf.String("logs::log_level")
	if len(logConfig.logLevel) == 0 {
		logConfig.logLevel = "debug"
	}
	// Agent log path, with a hard-coded fallback.
	logConfig.logPath = conf.String("logs::log_path")
	if len(logConfig.logPath) == 0 {
		logConfig.logPath = "/Users/xxx/GolandProjects/LogAgent/log/log_agent.log"
	}
	// Channel size, default 100.
	logConfig.chanSize, err = conf.Int("collect::chan_size")
	if err != nil {
		logConfig.chanSize = 100
		// BUG FIX: clear the named return after applying the default;
		// the original left err set, relying on later assignments to
		// overwrite it before returning.
		err = nil
	}
	// Kafka address is mandatory.
	logConfig.KafkaAddr = conf.String("kafka::server_addr")
	if len(logConfig.KafkaAddr) == 0 {
		// FIX: corrected typo 初识化 -> 初始化.
		err = fmt.Errorf("初始化Kafka失败")
		return
	}
	err = loadCollectConf(conf)
	if err != nil {
		fmt.Printf("导入日志收集配置错误:%v", err)
		return
	}
	return
}
package main

import (
	"encoding/json"
	"fmt"

	"github.com/astaxie/beego/logs"
)

// convertLogLevel maps a config string to a beego/logs level constant;
// unknown values fall back to debug.
func convertLogLevel(level string) int {
	switch level {
	case "debug":
		return logs.LevelDebug
	case "warn":
		return logs.LevelWarn
	case "info":
		return logs.LevelInfo
	case "trace":
		return logs.LevelTrace
	case "error":
		// FIX: "error" was previously unmapped and silently fell back
		// to debug.
		return logs.LevelError
	}
	return logs.LevelDebug
}

// initLogger configures the beego file logger from the global
// logConfig (path and level).
func initLogger() (err error) {
	config := make(map[string]interface{})
	config["filename"] = logConfig.logPath
	config["level"] = convertLogLevel(logConfig.logLevel)
	configStr, err := json.Marshal(config)
	if err != nil {
		fmt.Println("初始化日志, 序列化失败:", err)
		return
	}
	// FIX: propagate SetLogger's error instead of discarding it with `_ =`.
	err = logs.SetLogger(logs.AdapterFile, string(configStr))
	return
}
package main

import (
	"LogAgent/kafka"
	"LogAgent/tailf"
	"fmt"

	"github.com/astaxie/beego/logs"
)

// main wires the agent together: config -> logger -> tailf -> kafka ->
// run loop.
func main() {
	fmt.Println("开始")

	// Read the LogAgent configuration file.
	filename := "/Users/xxx/GolandProjects/LogAgent/conf/log_agent.conf"
	err := loadInitConf("ini", filename)
	if err != nil {
		fmt.Printf("导入配置文件错误:%v\n", err)
		// FIX: include the cause in the panic message; the original also
		// had an unreachable `return` after panic.
		panic(fmt.Sprintf("导入配置文件错误:%v", err))
	}

	// Initialize the agent's own logger.
	err = initLogger()
	if err != nil {
		fmt.Printf("导入日志文件错误:%v\n", err)
		panic(fmt.Sprintf("导入日志文件错误:%v", err))
	}
	logs.Debug("导入日志成功%v", logConfig)

	// Initialize tailf (paths of the files to collect, channel size).
	err = tailf.InitTail(logConfig.CollectConf, logConfig.chanSize)
	if err != nil {
		logs.Error("初始化tailf失败:", err)
		return
	}
	logs.Debug("初始化tailf成功!")

	// Initialize the Kafka producer.
	err = kafka.InitKafka(logConfig.KafkaAddr)
	if err != nil {
		// FIX: corrected typo 初识化 -> 初始化.
		logs.Error("初始化kafka producer失败:", err)
		return
	}
	logs.Debug("初始化Kafka成功!")

	// Run the send loop (blocks until it fails).
	err = serverRun()
	if err != nil {
		logs.Error("serverRun failed:", err)
	}
	logs.Info("程序退出")
}
package main

import (
	"LogAgent/kafka"
	"LogAgent/tailf"
	"fmt"

	"github.com/astaxie/beego/logs"
	"time"
)

// serverRun pumps log lines from the tail manager into Kafka forever,
// backing off one second after a failed send.
func serverRun() (err error) {
	for {
		msg := tailf.GetOneLine()
		err = sendToKafka(msg)
		if err != nil {
			logs.Error("发送消息到Kafka 失败, err:%v", err)
			time.Sleep(time.Second)
			continue
		}
	}
}

// sendToKafka echoes one tailed line to the terminal and forwards it
// to Kafka, returning the producer error.
func sendToKafka(msg *tailf.TextMsg) (err error) {
	fmt.Printf("读取 msg:%s, topic:%s\n", msg.Msg, msg.Topic) // 将消息打印在终端
	// BUG FIX: the original discarded SendToKafka's error with `_ =` and
	// always returned nil, so serverRun's retry/log branch was dead code.
	err = kafka.SendToKafka(msg.Msg, msg.Topic)
	return
}
// Package tailf tails the configured log files and exposes their lines
// through a buffered channel for the agent's send loop.
package tailf

import (
	"fmt"
	"time"

	"github.com/astaxie/beego/logs"
	"github.com/hpcloud/tail"
)

// CollectConf describes one file to collect: its path and the Kafka
// topic its lines are published to. It lives in this package so other
// packages can reference it without an import cycle.
type CollectConf struct {
	LogPath string
	Topic   string
}

// TailObj pairs one running tail handle with the conf that created it.
type TailObj struct {
	tail *tail.Tail
	conf CollectConf
}

// TextMsg is one collected log line plus its destination topic.
type TextMsg struct {
	Msg   string
	Topic string
}

// TailObjMgr owns every tail object and the channel their lines feed.
type TailObjMgr struct {
	tailsObjs []*TailObj
	msgChan   chan *TextMsg
}

var (
	// tailObjMgr is the process-wide manager, set by InitTail.
	tailObjMgr *TailObjMgr
)

// GetOneLine blocks until the next collected line is available.
func GetOneLine() (msg *TextMsg) {
	msg = <-tailObjMgr.msgChan
	return
}

// InitTail starts one tail goroutine per configured file and sizes the
// shared message channel to chanSize.
func InitTail(conf []CollectConf, chanSize int) (err error) {
	if len(conf) == 0 {
		err = fmt.Errorf("无效的log collect conf:%v", conf)
		return
	}
	tailObjMgr = &TailObjMgr{
		msgChan: make(chan *TextMsg, chanSize),
	}
	for _, v := range conf {
		fmt.Println(v)
		tails, errTail := tail.TailFile(v.LogPath, tail.Config{
			ReOpen:    true,
			Follow:    true,
			Location:  &tail.SeekInfo{Offset: 0, Whence: 0}, // read from start of file
			MustExist: false,
			Poll:      true,
		})
		if errTail != nil {
			err = errTail
			fmt.Println("tail 操作文件错误:", err)
			return
		}
		obj := &TailObj{
			conf: v,
			tail: tails,
		}
		tailObjMgr.tailsObjs = append(tailObjMgr.tailsObjs, obj)
		go readFromTail(obj)
	}
	return
}

// readFromTail pumps lines from one tail handle into the shared channel.
func readFromTail(tailObj *TailObj) {
	// Idiom fix: `for true` -> bare `for`.
	for {
		msg, ok := <-tailObj.tail.Lines
		if !ok {
			logs.Warn("Tail file close reopen, filename:%s\n", tailObj.tail.Filename)
			time.Sleep(100 * time.Millisecond)
			continue
		}
		textMsg := &TextMsg{
			Msg:   msg.Text,
			Topic: tailObj.conf.Topic,
		}
		tailObjMgr.msgChan <- textMsg
	}
}
消费结果:
tailf读取nginx_log.log文件中的日志信息,并发送到kafka,由kafka的消费者来进行消费
如果发现无法访问到docker中的kafka了,可能是因为你物理主机的ip更换了。docker-compose down暂停部署,然后重新修改docker-compose.yml中kafka绑定的物理主机IP即可,然后docker-compose up -d 重新部署。
# Create a dedicated bridge network so the three etcd containers can
# reach each other by container name.
docker network create etcd-network
# Node 1: bootstraps a brand-new cluster; client port 2379 and peer
# port 2380 are published on the host.
# NOTE(review): advertising 0.0.0.0 as a client/peer URL looks dubious —
# etcd normally advertises a routable address (e.g. http://etcd1:2380);
# verify the cluster actually becomes healthy after startup.
docker run -d --name etcd1 --network etcd-network -p 2379:2379 -p 2380:2380 quay.io/coreos/etcd:v3.4.13 etcd \
--name etcd1 \
--advertise-client-urls http://0.0.0.0:2379 \
--listen-client-urls http://0.0.0.0:2379 \
--initial-advertise-peer-urls http://0.0.0.0:2380 \
--listen-peer-urls http://0.0.0.0:2380 \
--initial-cluster-token etcd-cluster-1 \
--initial-cluster etcd1=http://0.0.0.0:2380 \
--initial-cluster-state new
# Node 2: joins the existing cluster (state "existing"); host ports
# 22379/22380 map to the container's 2379/2380.
# NOTE(review): inside the docker network peers talk on 2380, so the
# 0.0.0.0:22380 entries here likely need to be etcd2:2380 — confirm.
docker run -d --name etcd2 --network etcd-network -p 22379:2379 -p 22380:2380 quay.io/coreos/etcd:v3.4.13 etcd \
--name etcd2 \
--advertise-client-urls http://0.0.0.0:22379 \
--listen-client-urls http://0.0.0.0:22379 \
--initial-advertise-peer-urls http://0.0.0.0:22380 \
--listen-peer-urls http://0.0.0.0:22380 \
--initial-cluster-token etcd-cluster-1 \
--initial-cluster etcd1=http://etcd1:2380,etcd2=http://0.0.0.0:22380 \
--initial-cluster-state existing
# Node 3: joins the now two-member cluster; host ports 32379/32380.
docker run -d --name etcd3 --network etcd-network -p 32379:2379 -p 32380:2380 quay.io/coreos/etcd:v3.4.13 etcd \
--name etcd3 \
--advertise-client-urls http://0.0.0.0:32379 \
--listen-client-urls http://0.0.0.0:32379 \
--initial-advertise-peer-urls http://0.0.0.0:32380 \
--listen-peer-urls http://0.0.0.0:32380 \
--initial-cluster-token etcd-cluster-1 \
--initial-cluster etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://0.0.0.0:32380 \
--initial-cluster-state existing
这样,我们就成功在Docker中搭建了一个由3个etcd节点组成的集群,并分别暴露了端口2379、22379和32379。您可以使用docker ps命令来查看正在运行的容器,使用docker logs <container_name>命令来查看每个etcd容器的日志
. │ go.mod │ go.sum │ │ ├─conf │ log_agent.conf │ ├─kafka │ kafka.go │ ├─logs │ log_agent.log │ ├─main │ config.go │ etcd.go │ ip.go │ log.go │ main.go │ server.go │ ├─tailf │ tail.go │ └─tools └─SetConf main.go
// Tool that writes the agent's collection config into etcd and reads it
// back for verification.
package main

import (
	"LogAgent/tailf"
	"context"
	"encoding/json"
	"fmt"
	"go.etcd.io/etcd/client/v3"
	"time"
)

// EtcdKey is the etcd key under which this agent's collect config lives.
const (
	EtcdKey = "/backend/logagent/config/192.168.0.101"
)

// SetLogConfToEtcd marshals two hard-coded collect entries to JSON,
// stores them under EtcdKey, then fetches and prints the stored value.
func SetLogConfToEtcd() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379", "localhost:22379", "localhost:32379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		fmt.Println("connect failed, err:", err)
		return
	}
	fmt.Println("connect succ")
	defer cli.Close()

	// The two files this agent should collect.
	confEntries := []tailf.CollectConf{
		{
			LogPath: "/Users/xxx/GolandProjects/LogAgent/mysql_log.log",
			Topic:   "mysql_log",
		},
		{
			LogPath: "/Users/xxx/GolandProjects/LogAgent/nginx_log.log",
			Topic:   "nginx_log",
		},
	}

	// Serialize to JSON for storage.
	payload, err := json.Marshal(confEntries)
	if err != nil {
		fmt.Println("json failed, ", err)
		return
	}

	// Store under the agent's key with a one-second deadline.
	putCtx, putCancel := context.WithTimeout(context.Background(), time.Second)
	_, err = cli.Put(putCtx, EtcdKey, string(payload))
	putCancel()
	if err != nil {
		fmt.Println("put failed, err:", err)
		return
	}

	// Read it back to confirm the write.
	getCtx, getCancel := context.WithTimeout(context.Background(), time.Second)
	resp, err := cli.Get(getCtx, EtcdKey)
	getCancel()
	if err != nil {
		fmt.Println("get failed, err:", err)
		return
	}
	for _, kv := range resp.Kvs {
		fmt.Printf("%s : %s\n", kv.Key, kv.Value)
	}
}

func main() {
	SetLogConfToEtcd()
}
注意
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/469100
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。