当前位置:   article > 正文

logstash配置input为jdbc,tcp,kafka_logstash配置kafka

logstash配置kafka

1.配置jdbc在logstash配置文件input块中填写以下内容

  1. jdbc {
  2. type => "jdbc"
  3. jdbc_connection_string => "jdbc:mysql://IP:3306/数据库名?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai&autoReconnect=true"
  4. # 数据库连接账号密码;
  5. jdbc_user => "账号"
  6. jdbc_password => "密码"
  7. # MySQL依赖包路径;
  8. jdbc_driver_library => "/usr/share/logstash/config/messages/mysql-connector-java-8.0.21.jar"
  9. # the name of the driver class for mysql
  10. jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
  11. #jdbc_driver_class => "com.mysql.jdbc.Driver"
  12. jdbc_default_timezone => "Asia/Shanghai"
  13. statement => "SELECT * FROM sys_log_test"
  14. #schedule => "* * * * *"
  15. codec => json_lines
  16. }

备注:以上配置为全量同步表数据,logstash在启动后自动开始全量同步,并且要注意数据库驱动包,如果是docker部署的logstash,需要把驱动包挂载或拷贝到容器中

2.配置tcp在logstash配置文件input块中填写以下内容

  1. tcp {
  2. host => "0.0.0.0"
  3. port => 4560
  4. mode => "server"
  5. tags => ["tags"]
  6. codec => json_lines
  7. }

3.配置kafka在logstash配置文件input块中填写以下内容  

  1. #kafka输入源配置
  2. kafka {
  3. type => "kafka"
  4. #kafka集群地址
  5. bootstrap_servers => "10.7.11.191:9092"
  6. enable_auto_commit => true
  7. auto_commit_interval_ms => "1000"
  8. #sasl_mechanism => "GSSAPI"
  9. #security_protocol => "SASL_PLAINTEXT"
  10. #sasl_kerberos_service_name => "kafka"
  11. #auto_commit_interval_ms => "1000"
  12. #auto_offset_reset => "latest"
  13. #从kafka中哪个topic读取数据,这里的topic名要与filebeat中使用的topic保持一致
  14. topics => ["test191"]
  15. #group_id => "sink-logstash-test"
  16. #kafka消费者组中每个消费者的ID,默认值是“logstash”
  17. #client_id => "node01-test-i"
  18. #logstash的消费线程,一般一个线程对应kafka中的一个partition(分区),同一组logstash的consumer_threads之和应该不大于一个topic的partition,超过了就是资源的浪费,一般的建议是相等。
  19. #consumer_threads => 20
  20. #由于beat传输数据给kafka集群的时候,会附加很多tag,默认情况下,logstash就会将这串tag也认为是message的一部分。这样不利于后期的数据处理。所有需要添加codec处理。得到原本的message数据。
  21. codec => json
  22. #jaas_path => "/path/logstash/config/jaas.conf"
  23. #kerberos_config => "/path/logstash/config/krb5.conf"
  24. }

备注:kafka配置codec一定要注意json格式,不是json_lines;其他相关配置我都注释掉了,根据需要自己配置。

此文件可以不配置也不影响,jass.conf文件内容:

  1. KafkaClient {
  2. com.sun.security.auth.module.Krb5LoginModule required
  3. useKeyTab=true
  4. storeKey=true
  5. useTicketCache=true
  6. serviceName="kafka"
  7. keyTab="xxx.keytab"
  8. principal="xxx";
  9. };

配置对应的logstash输出到elasticsearch

  1. output {
  2. if [type] == "jdbc" {
  3. elasticsearch {
  4. hosts => "IP:9200"
  5. index => "logstash-jdbc"
  6. document_id => "%{id}"
  7. }
  8. }else if [type] == "kafka" {
  9. elasticsearch {
  10. hosts => "IP:9200"
  11. index => "logstash-kafka-%{+YYY.MM.dd}"
  12. }
  13. }else{
  14. elasticsearch {
  15. hosts => "IP:9200"
  16. index => "logstash-%{+YYY.MM.dd}"
  17. }
  18. }
  19. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/580407
推荐阅读
相关标签
  

闽ICP备14008679号