Edit the configuration file:
vim /etc/elasticsearch/elasticsearch.yml
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
cluster.name: CLUSTER-NAME    # name of the cluster
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
node.name: node-1    # name of the node
#
# Add custom attributes to the node:
#
#node.attr.rack: r1
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
path.data: /var/lib/elasticsearch
#
# Path to log files:
#
path.logs: /var/log/elasticsearch
#
# ---------------------------------- Network -----------------------------------
#
# Set the bind address to a specific IP (IPv4 or IPv6):
#
network.host: xxx.xxx.xxx.xxx
#
# Set a custom port for HTTP:
#
http.port: 9200
#
# For more information, consult the network module documentation.
#
# --------------------------------- Discovery ----------------------------------
#
# Pass an initial list of hosts to perform discovery when this node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
#discovery.seed_hosts: ["host1", "host2"]
discovery.seed_hosts: ["127.0.0.1", "[::1]"]
#
# Bootstrap the cluster using an initial set of master-eligible nodes:
#
cluster.initial_master_nodes: ["node-1"]
#
# For more information, consult the discovery and cluster formation module documentation.
#
xpack.license.self_generated.type: basic    # set the license type to basic
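After saving the file, Elasticsearch has to be restarted for the new settings to take effect. A minimal check sketch, assuming a systemd-based package installation and that xxx.xxx.xxx.xxx is the address set in network.host:

# restart Elasticsearch so the new settings are picked up (assumes a systemd installation)
systemctl restart elasticsearch
# query cluster health; a "green" or "yellow" status means the node is answering requests
curl http://xxx.xxx.xxx.xxx:9200/_cluster/health?pretty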
Edit the configuration file:
vim /etc/kibana/kibana.yml
# Kibana is served by a back end server. This setting specifies the port to use.
server.port: 5601
#server.port: 80

# Specifies the address to which the Kibana server will bind. IP addresses and host names are both valid values.
# The default is 'localhost', which usually means remote machines will not be able to connect.
# To allow connections from remote users, set this parameter to a non-loopback address.
server.host: "xxx.xxx.xxx.xxx"    # address of the Kibana server

# Enables you to specify a path to mount Kibana at if you are running behind a proxy.
# Use the `server.rewriteBasePath` setting to tell Kibana if it should remove the basePath
# from requests it receives, and to prevent a deprecation warning at startup.
# This setting cannot end in a slash.
#server.basePath: ""

# Specifies whether Kibana should rewrite requests that are prefixed with
# `server.basePath` or require that they are rewritten by your reverse proxy.
# This setting was effectively always `false` before Kibana 6.3 and will
# default to `true` starting in Kibana 7.0.
#server.rewriteBasePath: false

# The maximum payload size in bytes for incoming server requests.
#server.maxPayloadBytes: 1048576

# The Kibana server's name. This is used for display purposes.
#server.name: "your-hostname"

# The URLs of the Elasticsearch instances to use for all your queries.
elasticsearch.hosts: ["http://xxx.xxx.xxx.xxx:9200"]    # address and port of the Elasticsearch server

# When this setting's value is true Kibana uses the hostname specified in the server.host
# setting. When the value of this setting is false, Kibana uses the hostname of the host
# that connects to this Kibana instance.
#elasticsearch.preserveHost: true

# Kibana uses an index in Elasticsearch to store saved searches, visualizations and
# dashboards. Kibana creates a new index if the index doesn't already exist.
kibana.index: ".kibana"

# The default application to load.
#kibana.defaultAppId: "home"

# If your Elasticsearch is protected with basic authentication, these settings provide
# the username and password that the Kibana server uses to perform maintenance on the Kibana
# index at startup. Your Kibana users still need to authenticate with Elasticsearch, which
# is proxied through the Kibana server.
#elasticsearch.username: "kibana_system"
#elasticsearch.password: "pass"

# Enables SSL and paths to the PEM-format SSL certificate and SSL key files, respectively.
# These settings enable SSL for outgoing requests from the Kibana server to the browser.
#server.ssl.enabled: false
#server.ssl.certificate: /path/to/your/server.crt
#server.ssl.key: /path/to/your/server.key

# Optional settings that provide the paths to the PEM-format SSL certificate and key files.
# These files are used to verify the identity of Kibana to Elasticsearch and are required when
# xpack.security.http.ssl.client_authentication in Elasticsearch is set to required.
#elasticsearch.ssl.certificate: /path/to/your/client.crt
#elasticsearch.ssl.key: /path/to/your/client.key

# Optional setting that enables you to specify a path to the PEM file for the certificate
# authority for your Elasticsearch instance.
#elasticsearch.ssl.certificateAuthorities: [ "/path/to/your/CA.pem" ]

# To disregard the validity of SSL certificates, change this setting's value to 'none'.
#elasticsearch.ssl.verificationMode: full

# Time in milliseconds to wait for Elasticsearch to respond to pings. Defaults to the value of
# the elasticsearch.requestTimeout setting.
#elasticsearch.pingTimeout: 1500

# Time in milliseconds to wait for responses from the back end or Elasticsearch. This value
# must be a positive integer.
#elasticsearch.requestTimeout: 30000

# List of Kibana client-side headers to send to Elasticsearch. To send *no* client-side
# headers, set this value to [] (an empty list).
#elasticsearch.requestHeadersWhitelist: [ authorization ]

# Header names and values that are sent to Elasticsearch. Any custom headers cannot be overwritten
# by client-side headers, regardless of the elasticsearch.requestHeadersWhitelist configuration.
#elasticsearch.customHeaders: {}

# Time in milliseconds for Elasticsearch to wait for responses from shards. Set to 0 to disable.
#elasticsearch.shardTimeout: 30000

# Time in milliseconds to wait for Elasticsearch at Kibana startup before retrying.
#elasticsearch.startupTimeout: 5000

# Logs queries sent to Elasticsearch. Requires logging.verbose set to true.
#elasticsearch.logQueries: false

# Specifies the path where Kibana creates the process ID file.
#pid.file: /var/run/kibana.pid

# Enables you specify a file where Kibana stores log output.
#logging.dest: stdout

# Set the value of this setting to true to suppress all logging output.
#logging.silent: false

# Set the value of this setting to true to suppress all logging output other than error messages.
#logging.quiet: false

# Set the value of this setting to true to log all events, including system usage information
# and all requests.
#logging.verbose: false

# Set the interval in milliseconds to sample system and process performance
# metrics. Minimum is 100ms. Defaults to 5000.
#ops.interval: 5000

# Specifies locale to be used for all localizable strings, dates and number formats.
# Supported languages are the following: English - en , by default , Chinese - zh-CN .
#i18n.locale: "en"
i18n.locale: "zh-CN"    # switch the UI language to Chinese
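After saving the file, restart Kibana and confirm that it is listening on the configured port. A minimal sketch, assuming a systemd-based package installation:

# restart Kibana so the new settings are picked up
systemctl restart kibana
# confirm Kibana is listening on the port set by server.port
ss -lntp | grep 5601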
Depending on the installation location, the configuration file is stored in the config folder under the Kafka root directory.
vim /usr/share/kafka_2.12-2.5.0/config/server.properties
############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://xxx.xxx.xxx.xxx:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://xxx.xxx.xxx.xxx:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################

# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=xxx.xxx.xxx.xxx:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
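Once the broker is configured and started, the topic that Logstash will later consume from (sg_toelk in this guide) can be created with the scripts shipped in the Kafka bin directory. A minimal sketch, assuming the same install path as above and a single broker (hence replication factor 1):

# create the topic that the Logstash pipeline will consume from
/usr/share/kafka_2.12-2.5.0/bin/kafka-topics.sh --create --bootstrap-server xxx.xxx.xxx.xxx:9092 --topic sg_toelk --partitions 1 --replication-factor 1
# list topics to confirm it exists
/usr/share/kafka_2.12-2.5.0/bin/kafka-topics.sh --list --bootstrap-server xxx.xxx.xxx.xxx:9092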
If Kafka is started directly, the terminal cannot be closed, and when connected remotely that session cannot be used for anything else. Therefore, a start command is needed that prints nothing back and keeps running after the terminal is closed.
Create the following startup script in any location and add it to the system startup (a sketch for registering it at boot follows the script below).
vim /home/scripts/kafkastart.sh
Enter the following content:
#!/bin/bash
nohup /usr/share/kafka_2.12-2.5.0/bin/kafka-server-start.sh /usr/share/kafka_2.12-2.5.0/config/server.properties &
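The script still needs to be made executable, and one simple way to run it at boot is to append it to /etc/rc.local; this is only one assumed option (a systemd unit would work just as well). A minimal sketch:

# make the startup script executable
chmod +x /home/scripts/kafkastart.sh
# append it to /etc/rc.local so it runs at boot (rc.local itself must be executable)
echo "/home/scripts/kafkastart.sh" >> /etc/rc.local
chmod +x /etc/rc.local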
vim /usr/share/kafka/config/zookeeper.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
#dataLogDir=/tmp/zookeeper/logs

# the port at which the clients will connect
clientPort=2181

# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0

# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080

#whitelist
4lw.commands.whitelist=*    # whitelist the four-letter-word commands; otherwise they will return an error
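Because 4lw.commands.whitelist is opened up above, the classic four-letter-word commands can be used to check the server once it has been started (the startup script follows below). A minimal sketch, assuming nc (netcat) is installed:

# "ruok" should answer "imok" when ZooKeeper is healthy
echo ruok | nc xxx.xxx.xxx.xxx 2181
# "stat" prints version, client connections and server mode
echo stat | nc xxx.xxx.xxx.xxx 2181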
As with Kafka, starting ZooKeeper also requires a special start command.
Create the startup script:
vim /home/scripts/zookeeperstart.sh
Enter the following content:
#!/bin/bash
nohup /usr/share/kafka_2.12-2.5.0/bin/zookeeper-server-start.sh /usr/share/kafka_2.12-2.5.0/config/zookeeper.properties &
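After running both scripts (ZooKeeper first, then Kafka), it is worth confirming that both processes are up and listening. A minimal sketch:

# ZooKeeper should be listening on 2181 and Kafka on 9092
ss -lntp | grep -E '2181|9092'
# jps (from the JDK) should list a QuorumPeerMain process and a Kafka process
jps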
The configuration file template is stored at the following location:
/etc/logstash/logstash-sample.conf
Copy this file to the conf.d folder:
cp /etc/logstash/logstash-sample.conf /etc/logstash/conf.d/logstash.conf
The default configuration file:
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

# receive messages from Beats
input {
  beats {
    port => 5044
  }
}

# output to Elasticsearch
output {
  elasticsearch {
    # address of Elasticsearch
    hosts => ["http://localhost:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"    # format of the index name
    #user => "elastic"
    #password => "changeme"
  }
}
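Before editing further, the pipeline file can be syntax-checked with Logstash's config test option. A minimal sketch, assuming the package installed the Logstash binary under /usr/share/logstash and its settings under /etc/logstash:

# validate the pipeline configuration without actually starting the pipeline
/usr/share/logstash/bin/logstash --path.settings /etc/logstash -f /etc/logstash/conf.d/logstash.conf --config.test_and_exit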
The following is the configuration file that receives messages from Kafka and then splits them:
input {
  # receive messages from Kafka
  kafka {
    bootstrap_servers => ["xxx.xxx.xxx.xxx:9092"]    # Kafka address; must match the value configured in Kafka exactly (do not use a hostname on one side and an IP on the other)
    topics => ["sg_toelk"]                           # name of the topic created in Kafka
    auto_offset_reset => "latest"
  }
}

filter {
  # split the "message" field
  # if no separator is specified, the message is split on the newline character "\n" by default
  # the source messages arrive from Kafka with multiple lines merged together; without splitting,
  # they would be mixed together in one Elasticsearch document and could not be aggregated or analyzed
  split {
    field => "message"
  }

  # process the message and add fields so it can be aggregated and analyzed in Elasticsearch
  # a regular expression is used
  grok {
    match => { "message" => "\"\[(?<Date>[0-9]{2}\/[a-zA-Z]{3}\/[0-9]{4}):(?<Time>[0-9]{2}:[0-9]{2}:[0-9]{2}) (?<Timezone>\+0800)\]\" \"ZCR02_ASG\"\ (?<name>[^\ ]+) (?<ClientIP>[0-9]{3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}) (?<DstURL>.*?\:\/\/[^ ]+?) (?<HttpPort>[0-9]{1,4}) (?<SentBytes>[0-9]+?) (?<RecvBytes>[0-9]+?) (?<Method>[a-zA-Z]+?) (?<ProxyAction>[^\ ]+?) (?<StatCode>[0-9]{3}) (?<UserAgent>[^\r]+)" }
  }

  # by default @timestamp holds the time the event was created, not the time the log line was generated,
  # so a conversion is needed: add a new field l_datetime and assign it the Date and Time values captured by grok, joined together
  mutate {
    add_field => { "l_datetime" => "%{Date} %{Time}" }
  }

  # use the date plugin to convert l_datetime into a date with the specified format,
  # and finally assign the result to @timestamp
  # remove_field can delete fields so they are not shown in the event
  date {
    match => ["l_datetime", "dd/MMM/yyy HH:mm:ss"]
    target => "@timestamp"
    # remove_field => ["datetime"]
  }
}

output {
  elasticsearch {
    # address of the Elasticsearch server
    hosts => ["xxx.xxx.xxx.xxx:9200"]
    # specify the index name with a date suffix so searches can be limited by time range
    index => "sg_toelk-%{+YYY.MM.dd}"
    # force the output format to JSON
    codec => "json"
  }
}
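Once Logstash is running with this pipeline, the whole chain can be exercised by hand: publish a test line to the sg_toelk topic with the console producer, then check that a daily index appears in Elasticsearch (a line that does not match the grok pattern is still indexed, just tagged with _grokparsefailure). A minimal sketch, with the paths and addresses assumed as above:

# send a test line into the topic that Logstash consumes
echo 'test message from console producer' | /usr/share/kafka_2.12-2.5.0/bin/kafka-console-producer.sh --broker-list xxx.xxx.xxx.xxx:9092 --topic sg_toelk
# list the indices created by the pipeline
curl 'http://xxx.xxx.xxx.xxx:9200/_cat/indices/sg_toelk-*?v'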
The grok plugin uses a regular expression to split the log into fields.
grok {
  match => { "message" => "\"\[(?<Date>[0-9]{2}\/[a-zA-Z]{3}\/[0-9]{4}):(?<Time>[0-9]{2}:[0-9]{2}:[0-9]{2}) (?<Timezone>\+0800)\]\" \"ZCR02_ASG\"\ (?<name>[^\ ]+) (?<ClientIP>[0-9]{3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}) (?<DstURL>.*?\:\/\/[^ ]+?) (?<HttpPort>[0-9]{1,4}) (?<SentBytes>[0-9]+?) (?<RecvBytes>[0-9]+?) (?<Method>[a-zA-Z]+?) (?<ProxyAction>[^\ ]+?) (?<StatCode>[0-9]{3}) (?<UserAgent>[^\r]+)" }
}
The content of each field is specified piece by piece, following the format of the log.
Comparison between the original log line and the regular expression:
"[24/Aug/2020:14:33:55 +0800]" "ProxyName" 张三 1.1.1.1 tcp://baidu.com:443/ 443 1412 53767 CONNECT TCP_TUNNELED 200 "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Rocket.Chat/2.15.3 Chrome/69.0.3497.128 Electron/4.1.5 Safari/537.36"
\"\[(?<Date>[0-9]{2}\/[a-zA-Z]{3}\/[0-9]{4}):(?<Time>[0-9]{2}:[0-9]{2}:[0-9]{2}) (?<Timezone>\+0800)\]\" \"ProxyName\"\ (?<name>[^\ ]+) (?<ClientIP>[0-9]{3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}) (?<DstURL>.*?\:\/\/[^ ]+?) (?<HttpPort>[0-9]{1,4}) (?<SentBytes>[0-9]+?) (?<RecvBytes>[0-9]+?) (?<Method>[a-zA-Z]+?) (?<ProxyAction>[^\ ]+?) (?<StatCode>[0-9]{3}) (?<UserAgent>[^\r]+)
Specifying a field
Syntax:
(?<FieldName>pattern)
(?<Date>[0-9]{2}\/[a-zA-Z]{3}\/[0-9]{4})
# The value matched by Date is 24/Aug/2020
This way, you can search in Elasticsearch for all log entries whose Date field is 24/Aug/2020.
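For example, once documents are indexed, a query against the daily indices can pull back just those entries; exactly how the Date field is analyzed depends on the index mapping, so this is only a sketch:

# search for log entries whose Date field matches 24/Aug/2020 (field analysis depends on the mapping)
curl -H 'Content-Type: application/json' 'http://xxx.xxx.xxx.xxx:9200/sg_toelk-*/_search?pretty' -d '{"query": {"match_phrase": {"Date": "24/Aug/2020"}}}'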