# Elasticsearch runs on the JVM, so install Java first
[root@localhost ~]# java -version          # check whether Java is already installed
[root@localhost ~]# yum search java        # list the available Java packages
[root@localhost ~]# yum install java-17-openjdk-headless.x86_64   # install java-17-openjdk-headless
[root@localhost ~]# java -version
openjdk version "17.0.6" 2023-01-17 LTS
OpenJDK Runtime Environment (Red_Hat-17.0.6.0.10-3.el9) (build 17.0.6+10-LTS)
OpenJDK 64-Bit Server VM (Red_Hat-17.0.6.0.10-3.el9) (build 17.0.6+10-LTS, mixed mode, sharing)

# Install Elasticsearch
[root@localhost home]# wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.12.2-linux-x86_64.tar.gz   # download Elasticsearch; the package is also available from https://www.elastic.co/cn/elasticsearch
[root@localhost home]# tar -zxvf elasticsearch-8.12.2-linux-x86_64.tar.gz   # extract

# Start Elasticsearch
[root@localhost home]# cd /home/elasticsearch-8.12.2/
[root@localhost elasticsearch-8.12.2]# ./bin/elasticsearch
CompileCommand: exclude org/apache/lucene/util/MSBRadixSorter.computeCommonPrefixLengthAndBuildHistogram bool exclude = true
CompileCommand: exclude org/apache/lucene/util/RadixSelector.computeCommonPrefixLengthAndBuildHistogram bool exclude = true
Feb 23, 2024 11:25:43 AM sun.util.locale.provider.LocaleProviderAdapter <clinit>
WARNING: COMPAT locale provider will be removed in a future release
[2024-02-23T11:25:44,075][ERROR][o.e.b.Elasticsearch      ] [localhost] fatal exception while booting Elasticsearch
java.lang.RuntimeException: can not run elasticsearch as root
        at org.elasticsearch.server@8.12.2/org.elasticsearch.bootstrap.Elasticsearch.initializeNatives(Elasticsearch.java:282)
        at org.elasticsearch.server@8.12.2/org.elasticsearch.bootstrap.Elasticsearch.initPhase2(Elasticsearch.java:167)
        at org.elasticsearch.server@8.12.2/org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:72)
See logs for more details.
ERROR: Elasticsearch did not exit normally - check the logs at /home/elasticsearch-8.12.2/logs/elasticsearch.log
ERROR: Elasticsearch exited unexpectedly, with exit code 1

# The log at /home/elasticsearch-8.12.2/logs/elasticsearch.log shows that Elasticsearch refuses to run as root, so create a dedicated user
[root@localhost home]# useradd es          # add a user
[root@localhost home]# passwd es           # set its password
Changing password for user es.
New password:
BAD PASSWORD: The password is shorter than 8 characters
Retype new password:
passwd: all authentication tokens updated successfully.
[root@localhost home]# chown -R es:es /home/elasticsearch-8.12.2   # change the owner and group of the installation
[root@localhost home]# chmod -R 777 /home/elasticsearch-8.12.2     # relax the permissions
[root@localhost home]# su es                                       # switch to the es user
[es@localhost elasticsearch-8.12.2]$ ./bin/elasticsearch           # start Elasticsearch (-d runs it in the background)
[2024-02-23T14:17:51,625][WARN ][o.e.h.n.Netty4HttpServerTransport] [localhost] received plaintext http traffic on an https channel, closing connection Netty4HttpChannel{localAddress=/192.168.243.134:9200, remoteAddress=/192.168.243.1:62306}
# Opening http://ip:9200 in a browser produces this warning because Elasticsearch 8 enables SSL and authentication by default; set xpack.security.enabled: false in config/elasticsearch.yml
[es@localhost elasticsearch-8.12.2]$ ./bin/elasticsearch           # restart
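For a throwaway single-node test like this one, the relevant part of config/elasticsearch.yml ends up roughly like the sketch below. Only xpack.security.enabled comes from the walkthrough above; the network.host and discovery.type lines are assumptions added here so the node is reachable from another machine.

# config/elasticsearch.yml — minimal single-node test setup, not for production
xpack.security.enabled: false        # turn off authentication so plain http://ip:9200 works
xpack.security.http.ssl:
  enabled: false                     # also disable the auto-generated HTTP TLS block if it is present
network.host: 0.0.0.0                # listen on all interfaces instead of only localhost (assumption)
discovery.type: single-node          # skip multi-node bootstrap checks for a one-node test (assumption)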
For installing MySQL 8 itself, see the separate post:
https://editor.csdn.net/md/?articleId=135905811
1. Download and install Logstash
[root@localhost home]# wget https://artifacts.elastic.co/downloads/logstash/logstash-8.12.2-linux-x86_64.tar.gz   # use the same version as Elasticsearch
[root@localhost home]# tar -zxvf logstash-8.12.2-linux-x86_64.tar.gz   # extract
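A quick sanity check that the extracted files and the bundled JDK work before touching any configuration:

[root@localhost home]# /home/logstash-8.12.2/bin/logstash --version   # should report logstash 8.12.2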
2. Logstash configuration
logstash.yml
logstash.yml ships with every setting commented out, so the defaults shown below apply until a line is uncommented. Settings can be written in hierarchical YAML form (pipeline: > batch: > size: 125) or as flat keys (pipeline.batch.size: 125). The settings worth knowing:

# ------------ Node identity and data path ------------
# node.name: test                    # node name, must be unique in a cluster; defaults to the hostname
# path.data:                         # directory used by Logstash and its plugins for persistent data

# ------------ Pipeline ------------
# pipeline.id: main                  # pipeline id, defaults to main
# pipeline.workers: 2                # workers running the filter+output stages in parallel; defaults to the number of CPU cores
# pipeline.batch.size: 125           # maximum events a single worker collects from inputs before running filters+outputs
# pipeline.batch.delay: 50           # milliseconds to wait for new events before dispatching an undersized batch
# pipeline.unsafe_shutdown: false    # when true, force Logstash to exit during shutdown even with in-flight events in memory; by default it refuses to quit until all events reach the outputs, so enabling this can lose data
# pipeline.ordered: auto             # "auto" enables ordering only when pipeline.workers is 1, "true" enforces ordering (and blocks multi-worker startup), "false" disables it
# pipeline.ecs_compatibility: v8     # default ECS compatibility mode for plugins (disabled, v1 or v8)

# ------------ Pipeline configuration ------------
# path.config:                       # path to the pipeline configuration for the main pipeline
# config.string:                     # pipeline configuration given inline as a string
# config.test_and_exit: false        # when true, validate the configuration and exit (dry run)
# config.reload.automatic: false     # periodically check whether the configuration changed and reload the pipeline
# config.reload.interval: 3s         # how often to check for configuration changes
# config.debug: false                # log the fully compiled configuration (requires log.level: debug)
# config.support_escapes: false      # process escaped characters such as \n and \" in config strings

# ------------ HTTP API ------------
# api.enabled: true                  # enable the HTTP API
# api.http.host: 127.0.0.1           # API bind address
# api.http.port: 9600-9700           # API port, a single port or an inclusive range
# api.environment: "production"      # "environment" value returned by the API
# api.ssl.enabled: false             # enable SSL for the API
# api.ssl.keystore.path: /path/to/keystore.jks   # keystore path (p12 or jks)
# api.ssl.keystore.password: "y0uRp4$$w0rD"      # keystore password
# api.auth.type: none                # none or basic
# api.auth.basic.username: "logstash-user"
# api.auth.basic.password: "s3cUreP4$$w0rD"
# api.auth.basic.password_policy.mode: WARN      # WARN or ERROR when the password misses the policy (min. 8 chars with digit, upper and lower case)

# ------------ Modules and Elastic Cloud ------------
# modules:                           # module definitions, must be an array; variable names use the form var.PLUGIN_TYPE.PLUGIN_NAME.KEY
# cloud.id: <identifier>             # overrides var.elasticsearch.hosts and var.kibana.host
# cloud.auth: elastic:<password>     # overrides var.elasticsearch/kibana username and password

# ------------ Queuing ------------
# queue.type: memory                 # internal queuing model: "memory" (default) or "persisted" for disk-based acked queueing
# path.queue:                        # with queue.type: persisted, where queue data files are stored; defaults to path.data/queue
# queue.page_capacity: 64mb          # with queue.type: persisted, size of each page data file
# queue.max_events: 0                # with queue.type: persisted, maximum unread events in the queue (0 = unlimited)
# queue.max_bytes: 1024mb            # with queue.type: persisted, total queue capacity in bytes; whichever of max_bytes/max_events is hit first applies
# queue.checkpoint.acks: 1024        # with queue.type: persisted, acked events before forcing a checkpoint (0 = unlimited)
# queue.checkpoint.writes: 1024      # with queue.type: persisted, written events before forcing a checkpoint (0 = unlimited)
# queue.checkpoint.interval: 1000    # with queue.type: persisted, interval in ms between forced checkpoints on the head page (0 = none)

# ------------ Dead-letter queue ------------
# dead_letter_queue.enable: false            # enable the dead-letter queue
# dead_letter_queue.max_bytes: 1024mb        # maximum size per DLQ; entries beyond it are dropped
# dead_letter_queue.flush_interval: 5000     # ms to wait before flushing a DLQ file when no further eligible events arrive
# dead_letter_queue.storage_policy: drop_newer   # drop_newer rejects new events once the limit is hit; drop_older evicts the oldest pages
# dead_letter_queue.retain.age: 1d           # how long DLQ events stay valid before they may be deleted (units d/h/m/s); unset = no age policy
# path.dead_letter_queue:                    # DLQ storage path, defaults to path.data/dead_letter_queue

# ------------ Logging and other settings ------------
# log.level: info                    # fatal, error, warn, info (default), debug or trace
# path.logs:                         # log path
# allow_superuser: false             # allow or block running Logstash as the superuser (default: true)
# path.plugins: []                   # where to find custom plugins
# pipeline.separate_logs: false      # write each pipeline's log lines to its own file

# ------------ X-Pack (not in the OSS build) ------------
# The xpack.monitoring.*, xpack.management.* and xpack.geoip.downloader.* keys configure monitoring,
# centralized pipeline management and GeoIP database downloads; they stay commented out here.
3. pipelines.yml configuration
pipelines.yml is a list of dictionaries, one entry per pipeline loaded by Logstash; any setting omitted here falls back to the value in logstash.yml, and when declaring multiple pipelines each entry must have its own pipeline.id. Besides pipeline.id, an entry can carry config.string or path.config plus per-pipeline overrides of pipeline.workers, pipeline.batch.size, pipeline.batch.delay, pipeline.ordered, queue.* and dead_letter_queue.* (same meanings and defaults as in logstash.yml above). The stock example of two pipelines:

# - pipeline.id: test
#   pipeline.workers: 1
#   pipeline.batch.size: 1
#   config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } }"
# - pipeline.id: another_test
#   queue.type: persisted
#   path.config: "/tmp/logstash/*.config"
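For the MySQL-to-Elasticsearch pipeline built below, a minimal entry would be enough. This is only a sketch: it assumes the jdbc pipeline definition is saved as /home/logstash-8.12.2/config/logstash.conf, the path used later when starting Logstash with -f (note that starting with -f bypasses pipelines.yml entirely).

- pipeline.id: mysql-to-es           # hypothetical pipeline name
  path.config: "/home/logstash-8.12.2/config/logstash.conf"
  pipeline.workers: 1                # a single jdbc input does not benefit from more workers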
Synchronization approaches:
1. logstash
2. go-mysql-elasticsearch
3. canal (Alibaba)
I. logstash
1. Install the mysql-connector-java driver (its version must match your MySQL version)
[root@localhost home]# wget https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-j-8.0.33-1.el9.noarch.rpm
--2024-02-23 16:01:15--  https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-j-8.0.33-1.el9.noarch.rpm
Resolving downloads.mysql.com (downloads.mysql.com)... 23.66.135.36, 2600:1406:3c00:18b::2e31, 2600:1406:3c00:189::2e31
Connecting to downloads.mysql.com (downloads.mysql.com)|23.66.135.36|:443... connected.
HTTP request sent, awaiting response... 302 Moved Temporarily
Location: https://cdn.mysql.com/archives/mysql-connector-java-8.0/mysql-connector-j-8.0.33-1.el9.noarch.rpm [following]
--2024-02-23 16:01:16--  https://cdn.mysql.com/archives/mysql-connector-java-8.0/mysql-connector-j-8.0.33-1.el9.noarch.rpm
Resolving cdn.mysql.com (cdn.mysql.com)... 23.42.93.135, 2600:1406:3a00:293::1d68, 2600:1406:3a00:282::1d68
Connecting to cdn.mysql.com (cdn.mysql.com)|23.42.93.135|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2425346 (2.3M) [application/x-redhat-package-manager]
Saving to: 'mysql-connector-j-8.0.33-1.el9.noarch.rpm'
mysql-connector-j-8.0.33-1.el9.noarch.rpm   100%[==========================================>]   2.31M   736KB/s    in 3.2s
2024-02-23 16:01:21 (736 KB/s) - 'mysql-connector-j-8.0.33-1.el9.noarch.rpm' saved [2425346/2425346]

[root@localhost home]# rpm -ivh mysql-connector-j-8.0.33-1.el9.noarch.rpm   # install
warning: mysql-connector-j-8.0.33-1.el9.noarch.rpm: Header V4 RSA/SHA256 Signature, key ID 3a79bd29: NOKEY
error: Failed dependencies:
        java-headless >= 1:1.8.0 is needed by mysql-connector-j-1:8.0.33-1.el9.noarch
[root@localhost home]# yum -y install java-headless   # install the missing java-headless dependency (OpenJDK)
[root@localhost home]# rpm -ivh mysql-connector-j-8.0.33-1.el9.noarch.rpm
warning: mysql-connector-j-8.0.33-1.el9.noarch.rpm: Header V4 RSA/SHA256 Signature, key ID 3a79bd29: NOKEY
Verifying...                          ################################# [100%]
Preparing...                          ################################# [100%]
Updating / installing...
   1:mysql-connector-j-1:8.0.33-1.el9 ################################# [100%]
[root@localhost home]#
# Installation is done; by default the driver jar is placed at /usr/share/java/mysql-connector-java.jar
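If you are unsure which jar path to reference later in logstash.conf, listing the package contents confirms it:

[root@localhost home]# rpm -ql mysql-connector-j | grep '\.jar$'   # show where the driver jar was installed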
2. Configure logstash.conf
# Single table
input {
  jdbc {
    # MySQL connection string
    jdbc_connection_string => "jdbc:mysql://192.168.243.134:3306/wine"
    # user name and password
    jdbc_user => "root"
    jdbc_password => "root"
    # driver class
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    # driver jar
    jdbc_driver_library => "/usr/share/java/mysql-connector-j.jar"
    # enable paging
    jdbc_paging_enabled => true
    jdbc_page_size => "1000"
    # whether to clear the record in last_run_metadata_path; if true, every run queries all rows from scratch
    clean_run => false
    # whether to track the value of a specific column; if record_last_run is true you can name the column yourself
    # by setting this to true, otherwise the timestamp of the last run is tracked
    use_column_value => true
    # required when use_column_value is true: the column to track; it must be increasing,
    # for example an id or a data timestamp from the database
    tracking_column => "unix_ts_in_secs"
    # whether to record the last run; if true, the last value of tracking_column is saved
    # to the file given by last_run_metadata_path
    record_last_run => "true"
    # type of the tracked column
    tracking_column_type => "numeric"
    # polling schedule
    schedule => "*/5 * * * * *"
    # SQL to execute
    statement => "SELECT * FROM wine_address"
    # type tag used to route events to an index
    type => "es_table"
  }
}
filter {
  ruby {
    code => "event.set('@timestamp',event.get('timestamp'))"
  }
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs","timestamp"]
  }
}
output {
  if [type]=="es_table" {
    elasticsearch {
      hosts => ["192.168.243.134:9200"]      # ES address and port
      index => "es_table_idx"                # index name
      document_id => "%{[@metadata][_id]}"
    }
  }
}

# Multiple tables
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://192.168.243.134:3306/wine"
    jdbc_user => "root"
    jdbc_password => "123456"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_driver_library => "/usr/share/java/mysql-connector-j.jar"
    jdbc_paging_enabled => true
    jdbc_page_size => "1000"
    clean_run => false
    use_column_value => true
    tracking_column => "unix_ts_in_secs"
    record_last_run => "true"
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT * FROM wine_address"
    type => "es_table"
  }
  jdbc {
    jdbc_connection_string => "jdbc:mysql://192.168.243.134:3306/wine"
    jdbc_user => "root"
    jdbc_password => "123456"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_driver_library => "/usr/share/java/mysql-connector-j.jar"
    jdbc_paging_enabled => true
    jdbc_page_size => "1000"
    clean_run => false
    use_column_value => true
    tracking_column => "unix_ts_in_secs"
    record_last_run => "true"
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT * FROM wine_area"
    type => "es_table1"
  }
}
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs","timestamp"]
  }
}
output {
  if [type]=="es_table" {
    elasticsearch {
      hosts => ["192.168.243.134:9200"]
      index => "es_table_idx"
      document_id => "%{address_id}"
    }
  }
  if [type]=="es_table1" {
    elasticsearch {
      hosts => ["192.168.243.134:9200"]
      index => "es_table_idx1"
      document_id => "%{area_id}"
    }
  }
}
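As written, both statements re-read the entire table on every run, so the tracking settings never actually filter anything. A sketch of an incremental query follows, assuming a hypothetical update-timestamp column (here called update_time) on wine_address that is exposed as the tracked unix_ts_in_secs value; :sql_last_value is the placeholder the jdbc input replaces with the last saved tracking value:

    # hypothetical incremental statement, assuming wine_address has an update_time column
    statement => "SELECT *, UNIX_TIMESTAMP(update_time) AS unix_ts_in_secs FROM wine_address WHERE UNIX_TIMESTAMP(update_time) > :sql_last_value AND update_time < NOW() ORDER BY update_time ASC"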
3. Start Logstash
[root@localhost logstash-8.12.2]# ./bin/logstash -f /home/logstash-8.12.2/config/logstash.conf &
4. Insert test data
INSERT INTO `wine`.`wine_address` ( `address_id`, `member_id`, `area_id`, `city_id`, `province_id`, `area_info`, `address`, `mob_phone`, `reciver_name`, `is_default`, `dis_mode`, `add_time` )
VALUES( 5, 5, 5, 5, 5, '测试', '测试', 14512456789, '10', 0, '1', 0 );
5. View the data
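Once the schedule has fired, the synced documents can be checked directly against Elasticsearch (assuming security is still disabled as above):

[root@localhost ~]# curl 'http://192.168.243.134:9200/es_table_idx/_search?pretty'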
II. go-mysql-elasticsearch
1. Install the Go environment
[root@localhost /]# wget https://golang.google.cn/dl/go1.15.4.linux-amd64.tar.gz   # download
[root@localhost /]# tar -zxvf go1.15.4.linux-amd64.tar.gz   # extract
[root@localhost /]# mv go /usr/local/
[root@localhost river]# vim /etc/profile   # add the environment variables
export GOROOT=/usr/local/go
export GOPATH=/root/go
export PATH=$PATH:$GOROOT/bin:$GOPATH/bin
[root@localhost /]# source /etc/profile   # reload the profile
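A quick check that the toolchain is now on the PATH:

[root@localhost /]# go version   # should print something like: go version go1.15.4 linux/amd64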
2. Download go-mysql-elasticsearch
[root@localhost /]# go get github.com/siddontang/go-mysql-elasticsearch   # download
[root@localhost river]# cd $GOPATH/src/github.com/siddontang/go-mysql-elasticsearch
[root@localhost go-mysql-elasticsearch]# make   # build; after a successful build an executable named go-mysql-elasticsearch appears under go-mysql-elasticsearch/bin
3. Configuration ($GOPATH/src/github.com/siddontang/go-mysql-elasticsearch/etc/river.toml)
my_addr = "192.168.243.134:3306"     # MySQL address
my_user = "root"                     # MySQL user
my_pass = "123456"                   # MySQL password

es_addr = "192.168.243.134:9200"     # Elasticsearch address
es_user = ""                         # Elasticsearch user
es_pass = ""                         # Elasticsearch password

data_dir = "/root/go/src/github.com/siddontang/go-mysql-elasticsearch/data"   # directory for stored state

stat_addr = "192.168.243.134:12800"  # internal stats address and port
stat_path = "/metrics"

server_id = 1001
flavor = "mysql"
mysqldump = "mysqldump "
bulk_size = 128
flush_bulk_time = "200ms"
skip_no_pk_table = false

[[source]]
schema = "wine"                      # database name
tables = ["wine_role"]               # table names

[[rule]]
schema = "wine"                      # database name
table = "wine_role"                  # table name
index = ""                           # ES index to write to, corresponds to schema
type = ""                            # ES type to write to, corresponds to table
filter = ["id", "name"]              # only sync these columns
4. Start
# Note: the project's documentation states it supports MySQL versions below 8 and Elasticsearch versions below 6
[root@localhost go-mysql-elasticsearch]# ./bin/go-mysql-elasticsearch -config=./etc/river.toml
5. View the Elasticsearch data (for example with the elasticsearch-head browser extension for Chrome)
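Without a browser plugin, the indices written by go-mysql-elasticsearch can also be listed from the shell:

[root@localhost ~]# curl 'http://192.168.243.134:9200/_cat/indices?v'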
III. canal (https://github.com/alibaba/canal/releases)
1. Download and install canal
# Download version 1.1.7, which supports MySQL 8.0 (this setup runs MySQL 8.0.36)
[root@localhost home]# wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.adapter-1.1.7.tar.gz    # the canal client adapter (acts as the canal client)
[root@localhost home]# wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.admin-1.1.7.tar.gz      # the canal web console
[root@localhost home]# wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz   # the canal server

# Create the directories
[root@localhost home]# mkdir canal
[root@localhost home]# cd canal
[root@localhost canal]# mkdir canal-adapter
[root@localhost canal]# mkdir canal-admin
[root@localhost canal]# mkdir canal-server

# Extract
[root@localhost home]# tar -zxvf canal.adapter-1.1.7.tar.gz -C ./canal/canal-adapter/
[root@localhost home]# tar -zxvf canal.admin-1.1.7.tar.gz -C ./canal/canal-admin/
[root@localhost home]# tar -zxvf canal.deployer-1.1.7.tar.gz -C ./canal/canal-server/

# Adjust permissions
[root@localhost home]# chmod -R 777 canal
2. Configuration
# canal-server/conf/canal.properties — only the settings relevant to this setup; everything else keeps the stock defaults
canal.ip = 127.0.0.1                     # tcp bind ip
canal.register.ip =                      # ip registered in zookeeper
canal.port = 11111                       # port the adapter connects to
canal.metrics.pull.port = 11112
# canal.user = canal                     # optional canal instance user/password
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
#canal.admin.manager = 127.0.0.1:8089    # canal-admin settings, unused here
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
canal.zkServers =
canal.serverMode = tcp                   # tcp, kafka, rocketMQ, rabbitMQ or pulsarMQ; the adapter reads over tcp here
canal.destinations = example             # instance list; matches the conf/example directory
canal.conf.dir = ../conf
canal.auto.scan = true
canal.instance.tsdb.enable = true
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
# (the memory buffer, binlog filter, parser and MQ/Kafka/RocketMQ/RabbitMQ/Pulsar sections keep their stock default values)

# canal-server/conf/example/instance.properties
canal.instance.mysql.slaveId=200                      # mysql serverId used by canal; v1.0.26+ can auto-generate it
canal.instance.gtidon=false                           # enable gtid
canal.instance.master.address=127.0.0.1:3306          # MySQL address and port
canal.instance.master.journal.name=mysql-bin.000045   # binlog file to start syncing from — use the name reported by your own server (SHOW MASTER STATUS)
canal.instance.master.position=237                    # binlog position to start from
canal.instance.master.timestamp=1709112558000         # timestamp to start from
canal.instance.master.gtid=                           # gtid to start from
canal.instance.tsdb.enable=true
canal.instance.dbUsername=root                        # database user
canal.instance.dbPassword=123456                      # database password
canal.instance.connectionCharset = UTF-8
canal.instance.enableDruid=false
canal.instance.filter.regex=.*\\..*                   # tables to sync (all tables here)
canal.instance.filter.black.regex=mysql\\.slave_.*    # do not sync the mysql system tables
canal.mq.topic=example
canal.mq.partition=0
canal.instance.multi.stream.on=false

# canal-adapter/conf/application.yml
server:
  port: 8081                         # adapter port
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp                          # how the adapter reads from the canal server: tcp, kafka, rocketMQ or rabbitMQ
  flatMessage: true                  # deliver data as flat JSON messages; only relevant in kafka/rocketMQ mode
  zookeeperHosts:                    # zookeeper address in cluster mode; ignored when canalServerHost is set
  syncBatchSize: 1000                # batch size
  retries: -1                        # retry count; -1 blocks until it succeeds
  timeout:                           # fetch timeout
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 192.168.243.134:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # (the kafka/rocketMQ/rabbitMQ consumer blocks keep their stock defaults)
  srcDataSources:                    # where the data comes from
    defaultDS:
      url: jdbc:mysql://192.168.243.134:3306/wine?useUnicode=true
      username: root
      password: 123456
  canalAdapters:                     # where the data goes
  - instance: example                # canal instance name or MQ topic name (matches canal destination / mq topic)
    groups:                          # adapter groups
    - groupId: g1                    # group id
      outerAdapters:                 # adapter list
      - name: logger
      # (the commented-out rdb/hbase/kudu/phoenix adapter examples from the stock file are unchanged)
      - name: es8                    # this setup runs Elasticsearch 8
        hosts: http://192.168.243.134:9200   # rest mode needs the http:// prefix; transport mode would use port 9300
        properties:
          mode: rest                 # transport or rest
          # security.auth: test:123456   # only used for rest mode
          cluster.name: elasticsearch    # cluster name

# canal-adapter/conf/es8/mytest_user.yml
dataSourceKey: defaultDS             # key of the source data source, matching srcDataSources in application.yml
destination: example                 # canal instance name
groupId: g1                          # group id
esMapping:
  _index: mytest_user                # target index
  _id: _id                           # document id; if omitted, the pk option below must be set, otherwise ES assigns ids automatically
  _type: _doc                        # type
  upsert: true                       # insert the document when it does not exist yet
  # pk: id
  sql: "select a.album_id as _id, a.album_name, a.class_name, a.mark,a.goods_sku,a.add_time,a.merchant_id from wine_album a"
  # objFields:
  #   _labels: array:;
  etlCondition: "where a.add_time>={}"
  commitBatch: 3000                  # batch commit size
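canal works by acting as a MySQL replica and parsing the binlog, so the source database must have the binlog enabled in ROW format (the default on MySQL 8) and the configured account needs replication privileges. A sketch of the usual prerequisites from canal's quick-start documentation; the dedicated canal user below is optional here because instance.properties above simply reuses root:

# /etc/my.cnf on the MySQL server
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1          # must differ from canal.instance.mysql.slaveId

-- optional dedicated account for canal (run in MySQL)
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;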
3. Start
[root@localhost canal-server]# ./bin/startup.sh
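The deployer only launches the canal server; the adapter logs shown below assume canal-adapter was started the same way from its own directory:

[root@localhost canal-adapter]# ./bin/startup.sh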
4. Check the logs
# canal server log (/home/canal/canal-server/logs)
[root@localhost canal]# tail -5f canal.log
2024-02-28 14:47:50.369 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2024-02-28 14:47:50.375 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2024-02-28 14:47:50.383 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2024-02-28 14:47:50.414 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[127.0.0.1(127.0.0.1):11111]
2024-02-28 14:47:51.318 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

# example instance log
[root@localhost example]# tail -5f example.log
2024-02-28 14:47:51.284 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2024-02-28 14:47:51.289 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2024-02-28 14:47:51.342 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2024-02-28 14:47:51.342 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2024-02-28 14:47:51.749 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000044,position=4,serverId=1,gtid=<null>,timestamp=1709092749000] cost : 403ms , the next step is binlog dump

# adapter log (/home/canal/canal-adapter/logs/adapter)
[root@localhost logs]# tail -10f ./adapter/adapter.log
2024-02-28 18:56:58.798 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: logger succeed
2024-02-28 18:56:59.017 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## Start loading es mapping config ...
2024-02-28 18:56:59.028 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## ES mapping config loaded
2024-02-28 18:56:59.242 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es8 succeed
2024-02-28 18:56:59.248 [main] INFO  c.alibaba.otter.canal.connector.core.spi.ExtensionLoader - extension classpath dir: /home/canal/canal-adapter/plugin
2024-02-28 18:56:59.267 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: example-g1 succeed
2024-02-28 18:56:59.267 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
2024-02-28 18:56:59.267 [Thread-3] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: example <=============
2024-02-28 18:56:59.273 [main] INFO  c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 2.381 seconds (JVM running for 2.951)
2024-02-28 18:56:59.356 [Thread-3] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============
2024-02-28 18:58:32.556 [pool-3-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"album_id":4,"album_name":"2","class_name":6,"mark":"2","goods_sku":"2","add_time":2,"merchant_id":2}],"database":"wine","destination":"example","es":1709117912000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["album_id"],"sql":"","table":"wine_album","ts":1709117912427,"type":"INSERT"}
2024-02-28 18:58:32.805 [pool-3-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"album_id":4,"album_name":"2","class_name":6,"mark":"2","goods_sku":"2","add_time":2,"merchant_id":2}],"database":"wine","destination":"example","es":1709117912000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["album_id"],"sql":"","table":"wine_album","ts":1709117912427,"type":"INSERT"}
Affected indexes: mytest_user
5. View the ES index data
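The synced rows land in the index configured in mytest_user.yml and can be checked with a quick query:

[root@localhost ~]# curl 'http://192.168.243.134:9200/mytest_user/_search?pretty'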