赞
踩
cat >> /etc/hosts << EOF
192.168.2.128 node-1
192.168.2.129 node-2
192.168.2.130 node-3
EOF
yum install java-1.8.0-openjdk -y
java -version
openjdk version "1.8.0_262"
mkdir -p /soft/src
cd /soft/src
下载kafka(kafka中自带zookeeper)
wget https://mirrors.bfsu.edu.cn/apache/kafka/2.8.0/kafka_2.12-2.8.0.tgz
tar xf kafka_2.12-2.8.0.tgz
mv kafka_2.12-2.8.0 kafka
scp -r kafka root@node-2:/soft/src/
scp -r kafka root@node-3:/soft/src/
配置zookeeper集群。 cd kafka/config/ [root@node-2 config]# egrep -v '^#|^$' zookeeper.properties dataDir=/soft/src/kafka/zookeeper clientPort=2181 maxClientCnxns=0 initLimit=10 syncLimit=5 server.1=node-1:2888:3888 server.2=node-2:2889:3889 server.3=node-3:2890:3890 scp /soft/src/kafka/config/zookeeper.properties root@node-2:/soft/src/kafka/config scp /soft/src/kafka/config/zookeeper.properties root@node-3:/soft/src/kafka/config ssh root@node-2 'mkdir /soft/src/kafka/zookeeper' ssh root@node-2 'echo 2 > /soft/src/kafka/zookeeper/myid' ssh root@node-3 'mkdir /soft/src/kafka/zookeeper' ssh root@node-3 'echo 3 > /soft/src/kafka/zookeeper/myid' 启动zookeeper /soft/src/kafka/bin/zookeeper-server-start.sh -daemon /soft/src/kafka/config/zookeeper.properties 验证是否启动,有时候没启动也不报错 ps -ef|grep zookeeper.properties
配置kafka集群 [root@localhost config]$egrep -v '^#|^$' server.properties #### 注意,broker.id后面只能写数字,写注释一定会报错 broker.id=1 #三个节点不能一样 listeners=PLAINTEXT://:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/soft/src/kafka/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=192.168.106.7:2181,192.168.106.8:2181,192.168.106.9:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 scp /soft/src/kafka/config/server.properties root@node-2:/soft/src/ scp /soft/src/kafka/config/server.properties root@node-3:/soft/src/ 启动kafka /soft/src/kafka/bin/kafka-server-start.sh -daemon /soft/src/kafka/config/server.properties ps -ef|grep server.properties
创建topic /soft/src/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.2.128:2181,192.168.2.129:2181,192.168.2.130:2181 --replication-factor 3 --partitions 1 --topic test-topic 注意:如果只有一个kafka,--replication-factor 这个为 1 查看topic /soft/src/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.2.128:2181,192.168.2.129:2181,192.168.2.130:2181 生产消息 /soft/src/kafka/bin/kafka-console-producer.sh --broker-list 192.168.2.128:9092,192.168.2.129:9092,192.168.2.130:9092 --topic test-topic 消费消息 /soft/src/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.128:9092,192.168.2.129:9092,192.168.2.130:9092 --topic test-topic --from-beginning 查看topic消费进度 /soft/src/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 172.16.0.15:9092 --topic prod-report --time -1 删除topic ./kafka-topics.sh --delete --zookeeper 172.16.0.15:2181 --topic prod-report
cd /soft/src wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.13.1-linux-x86_64.tar.gz tar xf filebeat-7.13.1-linux-x86_64.tar.gz cd filebeat-7.13.1-linux-x86_64/ ./filebeat modules enable nginx 配置filebeat_nginx.yml (记得注释kafka version,不然报错) vim filebeat_nginx.yml filebeat.modules: - module: nginx access: enabled: true var.paths: ["/var/log/nginx/access.log*"] error: enabled: true var.paths: ["/var/log/nginx/error.log*"] #----------------------------------Kafka output--------------------------------# output.kafka: enabled: true hosts: ['xxx:9092', 'xxx:9092', 'xxx:9092'] topic: 'test-topic' required_acks: 1 #default compression: gzip #default max_message_bytes: 1000000 #default codec.format: string: '%{[message]}' 启动filebeat nohub ./filebeat -e -c filebeat_nginx.yml & 然后访问nginx,再启动kafka消费查看有日志输出
vim filebeat-test.yml filebeat.inputs: - type: log enabled: true paths: - /biz-code/logs/merchant/*.log multiline: pattern: '^\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}' negate: true match: after fields: log_topics: merchant logtype: merchant-log - type: log enabled: true paths: - /biz-code/logs/report/*.log multiline: pattern: '^\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}' negate: true match: after fields: log_topics: report logtype: report-log output.kafka: enabled: true hosts: ["172.16.0.15:9092"] topic: '%{[fields][log_topics]}'
注意:在腾讯云服务器和阿里云服务器上面做这一步的时候会有问题 2020-12-02T19:16:44.589+0800 INFO [publisher] pipeline/retry.go:219 retryer: send unwait signal to consumer 2020-12-02T19:16:44.590+0800 INFO [publisher] pipeline/retry.go:223 done 2020-12-02T19:16:44.590+0800 INFO [publisher] pipeline/retry.go:213 retryer: send wait signal to consumer 2020-12-02T19:16:44.590+0800 INFO [publisher] pipeline/retry.go:217 done 2020-12-02T19:16:44.590+0800 INFO [publisher] pipeline/retry.go:219 retryer: send unwait signal to consumer 2020-12-02T19:16:44.591+0800 INFO [publisher] pipeline/retry.go:223 done 2020-12-02T19:16:53.325+0800 INFO [publisher] pipeline/retry.go:219 retryer: send unwait signal to consumer 2020-12-02T19:16:53.325+0800 INFO [publisher] pipeline/retry.go:223 done 2020-12-02T19:17:05.584+0800 INFO [publisher] pipeline/retry.go:219 retryer: send unwait signal to consumer 2020-12-02T19:17:05.584+0800 INFO [publisher] pipeline/retry.go:223 done 它会一直循环这个报错,通过在配置文件中加入 #logging.level: debug 来进行调试,发现 `Kafka publish failed with: dial tcp: lookup VM-0-15-centos on 1.6.82.98:53: no such host` 这个报错 解决:在filebeat主机hosts中加入kafka主机的ip+主机名 vim /etc/hosts 1.1.1.1 VM-0-15-centos
然后就可以看到日志出现在kafka中
cd /soft/src wget https://artifacts.elastic.co/downloads/logstash/logstash-7.9.2.tar.gz tar xf logstash-7.9.2.tar.gz cd logstash/config vim nginx.conf input{ kafka { bootstrap_servers => ["192.168.2.128:9092,192.168.2.129:9092,192.168.2.130:9092"] auto_offset_reset => "latest" consumer_threads => 3 decorate_events => true topics => ["test-topic"] codec => "json" } } output { elasticsearch { hosts => ["42.193.12.10:9200"] index => "kafkalog-%{+YYYY.MM.dd}" # 这里定义的index就是kibana里面显示的索引名称 } } 启动,等es安装完了再启动 cd ../bin nohup ./logstash -f ../config/nginx.conf >> logstash.log & 检查 ps -ef|grep logstash
cd config/ vim pipelines.yml - pipeline.id: report path.config: "/soft/src/logstash/config/report.conf" - pipeline.id: merchant path.config: "/soft/src/logstash/config/merchant.conf" vim report.conf input{ kafka { bootstrap_servers => ["172.16.0.15:9092"] auto_offset_reset => "latest" consumer_threads => 3 decorate_events => true topics => ["report"] codec => "json" } } filter { json { source => "message" } } output { elasticsearch { hosts => ["172.16.0.15:9222"] index => "report-%{+YYYY-MM-dd}" } } vim merchant.conf input{ kafka { bootstrap_servers => ["172.16.0.15:9092"] auto_offset_reset => "latest" consumer_threads => 3 decorate_events => true topics => ["merchant"] codec => "json" } } filter { json { source => "message" } } output { elasticsearch { hosts => ["172.16.0.15:9222"] index => "merchant-%{+YYYY-MM-dd}" } } cd /soft/src/logstash/bin ./logstash > logstash.log
docker run -p 9200:9200 -p 9330:9300 -itd -e "discovery.type=single-node" --name es \
-v /es_data:/usr/share/elasticsearch/data \
docker.elastic.co/elasticsearch/elasticsearch:7.9.2
这里可能es容器会起不来,报错 `AccessDeniedException[/usr/share/elasticsearch/data/nodes];`
解决:chmod 777 /es_data
docker run -p 5601:5601 -it -d --link es -e ELASTICSEARCH_URL=http://localhost:9200 \
--name kibana kibana:7.9.2
docker exec -it kibana /bin/bash
vi config/kibana.yml
elasticsearch.hosts: [ "http://42.193.12.10:9200/" ]
docker restart kibana
访问:http://42.193.12.10:5601/
这里就是logstash里面定义的index
这里展示的就是filebeat收集的nginx日志了
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。