赞
踩
- jdbc {
- type => "jdbc"
- jdbc_connection_string => "jdbc:mysql://IP:3306/数据库名?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai&autoReconnect=true"
- # 数据库连接账号密码;
- jdbc_user => "账号"
- jdbc_password => "密码"
- # MySQL依赖包路径;
- jdbc_driver_library => "/usr/share/logstash/config/messages/mysql-connector-java-8.0.21.jar"
- # the name of the driver class for mysql
- jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
- #jdbc_driver_class => "com.mysql.jdbc.Driver"
- jdbc_default_timezone => "Asia/Shanghai"
- statement => "SELECT * FROM sys_log_test"
- #schedule => "* * * * *"
- codec => json_lines
- }
备注:以上配置为全量同步表数据,logstash在启动后自动开始全量同步,并且要注意数据库驱动包,如果是docker部署的logstash,需要把驱动包挂载或拷贝到容器中
- tcp {
- host => "0.0.0.0"
- port => 4560
- mode => "server"
- tags => ["tags"]
- codec => json_lines
- }
- #kafka输入源配置
- kafka {
- type => "kafka"
- #kafka集群地址
- bootstrap_servers => "10.7.11.191:9092"
- enable_auto_commit => true
- auto_commit_interval_ms => "1000"
-
- #sasl_mechanism => "GSSAPI"
- #security_protocol => "SASL_PLAINTEXT"
- #sasl_kerberos_service_name => "kafka"
- #auto_commit_interval_ms => "1000"
- #auto_offset_reset => "latest"
- #从kafka中哪个topic读取数据,这里的topic名要与filebeat中使用的topic保持一致
- topics => ["test191"]
- #group_id => "sink-logstash-test"
- #kafka消费者组中每个消费者的ID,默认值是“logstash”
- #client_id => "node01-test-i"
- #logstash的消费线程,一般一个线程对应kafka中的一个partition(分区),同一组logstash的consumer_threads之和应该不大于一个topic的partition,超过了就是资源的浪费,一般的建议是相等。
- #consumer_threads => 20
- #由于beat传输数据给kafka集群的时候,会附加很多tag,默认情况下,logstash就会将这串tag也认为是message的一部分。这样不利于后期的数据处理。所有需要添加codec处理。得到原本的message数据。
- codec => json
- #jaas_path => "/path/logstash/config/jaas.conf"
- #kerberos_config => "/path/logstash/config/krb5.conf"
- }
备注:kafka配置codec一定要注意json格式,不是json_lines;其他相关配置我都注释掉了,根据需要自己配置。
此文件可以不配置也不影响,jass.conf文件内容:
- KafkaClient {
- com.sun.security.auth.module.Krb5LoginModule required
- useKeyTab=true
- storeKey=true
- useTicketCache=true
- serviceName="kafka"
- keyTab="xxx.keytab"
- principal="xxx";
- };
-
-
- output {
-
- if [type] == "jdbc" {
- elasticsearch {
- hosts => "IP:9200"
- index => "logstash-jdbc"
- document_id => "%{id}"
- }
- }else if [type] == "kafka" {
- elasticsearch {
- hosts => "IP:9200"
- index => "logstash-kafka-%{+YYY.MM.dd}"
- }
- }else{
- elasticsearch {
- hosts => "IP:9200"
- index => "logstash-%{+YYY.MM.dd}"
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。