当前位置:   article > 正文

Elasticsearch:使用 MongoDB connector 同步数据到 Elasticsearch

Elasticsearch:使用 MongoDB connector 同步数据到 Elasticsearch

MongoDB 是一个基于分布式文件存储的数据库。由 C++ 语言编写。旨在为 WEB 应用提供可扩展的高性能数据存储解决方案。MongoDB 是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的。Elasticsearch 是一个高效强大的大数据搜索引擎。它的 Speed, Scale 及 Relevance 是很多数据库不具有的。通过 Elasticsearch 的高效搜索引擎,我们可以快速地搜索相关的内容,我们也可以使用 Kibana 所提供的可视化为数据提供洞察。在今天的文章中,我将介绍如何把 MongoDB 里的数据同步到 Elasticsearch。我将展示如何使用 MongoDB connector 来实现这个功能。

在今天的练习中,我将使用如下的架构:

如上所示,我们把 MongoDB 里的数据集通过 MongoDB 连接器同步到 Elasticsearch 中。在今天的展示中,我将使用 Elastic Stack 8.13.2 来进行展示。

使用 MongoDB connector 同步数据到 Elasticsearch

安装

MongoDB

我们可以按照链接来进行安装。我们在 Ubuntu OS 22.04 上进行安装。

在终端中,如果尚未安装,安装 gnupg 和 curl:

sudo apt-get install gnupg curl

要导入 MongoDB 的公共 GPG 密钥,请运行以下命令:

  1. curl -fsSL https://www.mongodb.org/static/pgp/server-7.0.asc | \
  2. sudo gpg -o /usr/share/keyrings/mongodb-server-7.0.gpg \
  3. --dearmor

创建一个 MongoDB 的列表文件。为你的 Ubuntu 版本创建列表文件 /etc/apt/sources.list.d/mongodb-org-7.0.list

echo "deb [ arch=amd64,arm64 signed-by=/usr/share/keyrings/mongodb-server-7.0.gpg ] https://repo.mongodb.org/apt/ubuntu jammy/mongodb-org/7.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-7.0.list

发出以下命令以重新加载本地软件包数据库:

sudo apt-get update

你可以安装 MongoDB 的最新稳定版本,也可以安装特定版本的 MongoDB。

sudo apt-get install -y mongodb-org

我们需要针对 MongoDB 进行配置:

/etc/mongod.conf

我们可以使用如下的命令来启动 mongodb:

sudo service mongod start

我们可以使用如下的命令来查看 mongodb 的状态:

service mongod status

我们可以使用如下的命令来创建一个数据库:

mongosh mongodb://192.168.0.9:27017
  1. parallels@ubuntu2204:~$ mongosh 192.168.0.9
  2. Current Mongosh Log ID: 661fc4d5b790b167823b9273
  3. Connecting to: mongodb://192.168.0.9:27017/?directConnection=true&appName=mongosh+2.2.4
  4. Using MongoDB: 7.0.8
  5. Using Mongosh: 2.2.4
  6. For mongosh info see: https://docs.mongodb.com/mongodb-shell/
  7. ------
  8. The server generated these startup warnings when booting
  9. 2024-04-17T20:40:52.680+08:00: Using the XFS filesystem is strongly recommended with the WiredTiger storage engine. See http://dochub.mongodb.org/core/prodnotes-filesystem
  10. 2024-04-17T20:40:53.345+08:00: Access control is not enabled for the database. Read and write access to data and configuration is unrestricted
  11. 2024-04-17T20:40:53.345+08:00: vm.max_map_count is too low
  12. ------
  13. test> use mydatabase
  14. switched to db mydatabase
  15. mydatabase> show dbs
  16. admin 40.00 KiB
  17. config 12.00 KiB
  18. local 40.00 KiB
  19. mydatabase> db.mycol.insert({"name":"liuxg"})
  20. DeprecationWarning: Collection.insert() is deprecated. Use insertOne, insertMany, or bulkWrite.
  21. {
  22. acknowledged: true,
  23. insertedIds: { '0': ObjectId('661fc5a3b790b167823b9274') }
  24. }
  25. mydatabase> show dbs
  26. admin 40.00 KiB
  27. config 12.00 KiB
  28. local 40.00 KiB
  29. mydatabase 8.00 KiB
  30. mydatabase>
  1. mydatabase> db.posts.insertOne({
  2. ... title: "Post Title 1",
  3. ... })
  4. {
  5. acknowledged: true,
  6. insertedId: ObjectId('6627ac96b790b167823b9275')
  7. }
  8. mydatabase> show dbs
  9. admin 40.00 KiB
  10. config 72.00 KiB
  11. local 40.00 KiB
  12. mydatabase 48.00 KiB
  1. mydatabase> show dbs
  2. admin 40.00 KiB
  3. config 72.00 KiB
  4. local 40.00 KiB
  5. mydatabase 48.00 KiB
  6. mydatabase> show collections
  7. mycol
  8. posts
  9. mydatabase> show tables
  10. mycol
  11. posts
  12. mydatabase> db.posts.find()
  13. [
  14. { _id: ObjectId('6627ac96b790b167823b9275'), title: 'Post Title 1' }
  15. ]

在默认的情况下,MongoDB 的安装是没有任何安全的。我们可以参考链接来配置 MongoDB 的安全。我们必须使用如下的步骤来进行:

  1. 创建管理用户。这个需要针对 admin 数据库来操作
  2. 启动安全,并为自己的数据库创建用户

我们首先为 admin 数据库添加一个用户:liuxg/password:

  1. use admin
  2. db.createUser(
  3. {
  4. user: "liuxg",
  5. pwd: "password",
  6. roles: [ { role: "userAdminAnyDatabase", db: "admin" }, "readWriteAnyDatabase" ]
  7. }
  8. )

如果你想在运行时设置密码,你可以使用如下的命令:

  1. use admin
  2. db.createUser(
  3. {
  4. user: "liuxg",
  5. pwd: passwordPrompt(),
  6. roles: [ { role: "userAdminAnyDatabase", db: "admin" }, "readWriteAnyDatabase" ]
  7. }
  8. )
  1. $ mongosh mongodb://192.168.0.9:27017
  2. Current Mongosh Log ID: 6629cb4422076463783b9273
  3. Connecting to: mongodb://192.168.0.9:27017/?directConnection=true&appName=mongosh+2.2.4
  4. Using MongoDB: 7.0.8
  5. Using Mongosh: 2.2.4
  6. For mongosh info see: https://docs.mongodb.com/mongodb-shell/
  7. ------
  8. The server generated these startup warnings when booting
  9. 2024-04-25T11:17:14.139+08:00: Using the XFS filesystem is strongly recommended with the WiredTiger storage engine. See http://dochub.mongodb.org/core/prodnotes-filesystem
  10. 2024-04-25T11:17:14.980+08:00: Access control is not enabled for the database. Read and write access to data and configuration is unrestricted
  11. 2024-04-25T11:17:14.980+08:00: vm.max_map_count is too low
  12. ------
  13. test> use admin
  14. switched to db admin
  15. admin> db.createUser(
  16. ... {
  17. ... user: "liuxg",
  18. ... pwd: "password",
  19. ... roles: [ { role: "userAdminAnyDatabase", db: "admin" }, "readWriteAnyDatabase" ]
  20. ... }
  21. ... )
  22. { ok: 1 }

我们可以通过 db.getUsers() 来查看已经创建的 users:

  1. test> use admin
  2. switched to db admin
  3. admin> db.getUsers()
  4. MongoServerError[Unauthorized]: Command usersInfo requires authentication
  5. admin> db.auth("liuxg", "password")
  6. { ok: 1 }
  7. admin> db.getUsers()
  8. {
  9. users: [
  10. {
  11. _id: 'admin.liuxg',
  12. userId: UUID('5ba3e137-17d1-4d7e-a20e-136ef7e6b978'),
  13. user: 'liuxg',
  14. db: 'admin',
  15. roles: [
  16. { role: 'userAdminAnyDatabase', db: 'admin' },
  17. { role: 'readWriteAnyDatabase', db: 'admin' }
  18. ],
  19. mechanisms: [ 'SCRAM-SHA-1', 'SCRAM-SHA-256' ]
  20. },
  21. {
  22. _id: 'admin.myUserAdmin',
  23. userId: UUID('b1bc8fa1-1974-4db9-b05a-1f5680f1fb88'),
  24. user: 'myUserAdmin',
  25. db: 'admin',
  26. roles: [
  27. { role: 'userAdminAnyDatabase', db: 'admin' },
  28. { role: 'readWriteAnyDatabase', db: 'admin' }
  29. ],
  30. mechanisms: [ 'SCRAM-SHA-1', 'SCRAM-SHA-256' ]
  31. },
  32. {
  33. _id: 'admin.siteUserAdmin',
  34. userId: UUID('1e84fcc6-f3b9-451f-9330-afee8de2da8c'),
  35. user: 'siteUserAdmin',
  36. db: 'admin',
  37. roles: [ { role: 'userAdminAnyDatabase', db: 'admin' } ],
  38. mechanisms: [ 'SCRAM-SHA-1', 'SCRAM-SHA-256' ]
  39. },
  40. {
  41. _id: 'admin.useradmin',
  42. userId: UUID('4b82fb97-0f3b-4b6c-aca8-381043383d95'),
  43. user: 'useradmin',
  44. db: 'admin',
  45. roles: [ { role: 'userAdminAnyDatabase', db: 'admin' } ],
  46. mechanisms: [ 'SCRAM-SHA-1', 'SCRAM-SHA-256' ]
  47. }
  48. ],
  49. ok: 1
  50. }

我们接下来退出来。我们需要配置 MongoDB。我们找到配置文件 /etc/mongodb.conf

/etc/mongodb.conf

我们需要启动安全。修改完毕后,我们再次使用如下的命令来启动 MongodDB:

  1. parallels@ubuntu2204:~$ sudo vi /etc/mongod.conf
  2. parallels@ubuntu2204:~$ sudo service mongod restart
  3. parallels@ubuntu2204:~$ service mongod status
  4. ● mongod.service - MongoDB Database Server
  5. Loaded: loaded (/lib/systemd/system/mongod.service; disabled; vendor preset: enabled)
  6. Active: active (running) since Thu 2024-04-25 11:21:10 CST; 2s ago
  7. Docs: https://docs.mongodb.org/manual
  8. Main PID: 58182 (mongod)
  9. Memory: 169.9M
  10. CPU: 897ms
  11. CGroup: /system.slice/mongod.service
  12. └─58182 /usr/bin/mongod --config /etc/mongod.conf

我们再次使用如下的命令来进行 CLI。我们在下面为 mydatabase 创建一个新的用户 elastic:

  1. use mydatabase
  2. db.createUser(
  3. {
  4. user: "elastic",
  5. pwd: "123456",
  6. roles: [ { role: "readWrite", db: "mydatabase" } ]
  7. }
  8. )
  1. parallels@ubuntu2204:~$ mongosh mongodb://192.168.0.9:27017
  2. Current Mongosh Log ID: 6629cc5699b3c8d57d3b9273
  3. Connecting to: mongodb://192.168.0.9:27017/?directConnection=true&appName=mongosh+2.2.4
  4. Using MongoDB: 7.0.8
  5. Using Mongosh: 2.2.4
  6. For mongosh info see: https://docs.mongodb.com/mongodb-shell/
  7. test> use mydatabase
  8. switched to db mydatabase
  9. mydatabase> db.posts.find({})
  10. MongoServerError[Unauthorized]: Command find requires authentication
  11. mydatabase> db.auth("liuxg", "password")
  12. MongoServerError[AuthenticationFailed]: Authentication failed.
  13. mydatabase> use admin
  14. switched to db admin
  15. admin> db.posts.find({})
  16. MongoServerError[Unauthorized]: Command find requires authentication
  17. admin> use admin
  18. already on db admin
  19. admin> db.auth("liuxg", "password")
  20. { ok: 1 }
  21. admin> use mydatabase
  22. switched to db mydatabase
  23. mydatabase> db.createUser(
  24. ... {
  25. ... user: "elastic",
  26. ... pwd: "123456",
  27. ... roles: [ { role: "readWrite", db: "mydatabase" } ]
  28. ... }
  29. ... )

如果我们没有使用 db.auth 来进行验证,那么我们不能针对 mydatabase 进行任何的操作。我们验证成功后,就可以为 mydatabase 进行创建用户。如上所示,我们创建了一个叫做 elastic/123456 的用户。

提示:我们可以使用 db.dropUser("<Username>") 来删除一个用户。 

我们也可以通过 elastic/123456 来进行验证:

  1. parallels@ubuntu2204:~$ mongosh mongodb://192.168.0.9:27017
  2. Current Mongosh Log ID: 663a2db6f8be5e933f99ea71
  3. Connecting to: mongodb://192.168.0.9:27017/?directConnection=true&appName=mongosh+2.2.5
  4. Using MongoDB: 7.0.8
  5. Using Mongosh: 2.2.5
  6. For mongosh info see: https://docs.mongodb.com/mongodb-shell/
  7. test> use mydatabase
  8. switched to db mydatabase
  9. mydatabase> db.auth("elastic", "password")
  10. MongoServerError[AuthenticationFailed]: Authentication failed.
  11. mydatabase> db.auth("elastic", "123456")
  12. { ok: 1 }
  13. mydatabase> show tables
  14. mycol
  15. posts

Java 安装

你需要安装 Java。版本在 Java 8 或者 Java 11。我们可以参考链接来查找需要的 Java 版本。

Elasticsearch

我们可参考我之前的文章 “如何在 Linux,MacOS 及 Windows 上进行安装 Elasticsearch” 来安装 Elasticsearch。特别地,我们需要按照 Elastic Stack 8.x 的安装指南来进行安装。

在 Elasticsearch 终端输出中,找到 elastic 用户的密码和 Kibana 的注册令牌。 这些是在 Elasticsearch 第一次启动时打印的。

我们记下这个密码,并在下面的配置中进行使用。同时它也会生成相应的证书文件:

  1. $ pwd
  2. /Users/liuxg/elastic/elasticsearch-8.13.2/config/certs
  3. $ ls
  4. http.p12 http_ca.crt transport.p12

保存密码、注册令牌和证书路径名。 你将在后面的步骤中需要它们。如果你对这些操作还不是很熟的话,请参考我之前的文章 “Elastic Stack 8.0 安装 - 保护你的 Elastic Stack 现在比以往任何时候都简单”。

安装 Kibana

我们接下来安装 Kibana。我们可以参考我之前的文章 “如何在 Linux,MacOS 及 Windows 上安装 Elastic 栈中的 Kibana” 来进行我们的安装。特别地,我们需要安装 Kibana 8.2 版本。如果你还不清楚如何安装 Kibana 8.2,那么请阅读我之前的文章 “Elastic Stack 8.0 安装 - 保护你的 Elastic Stack 现在比以往任何时候都简单”。在启动 Kibana 之前,我们可以修改 Kibana 的配置文件如下。添加如下的句子到 config/kibana.yml 中去:

config/kibana.yml

enterpriseSearch.host: http://localhost:3002

Enterprise search 安装

我们在地址 Download Elastic Enterprise Search | Elastic 找到我们需要的版本进行下载。并按照页面上相应的指令来进行按照。如果你想针对你以前的版本进行安装的话,请参阅地址 https://www.elastic.co/downloads/past-releases#app-search

等我们下载完 Enterprise Search 的安装包,我们可以使用如下的命令来进行解压缩:

  1. $ pwd
  2. /Users/liuxg/elastic
  3. $ ls
  4. elastic-agent-8.13.2-darwin-aarch64.tar.gz kibana-8.13.2
  5. elasticsearch-8.13.2 kibana-8.13.2-darwin-aarch64.tar.gz
  6. elasticsearch-8.13.2-darwin-aarch64.tar.gz logstash-8.13.2-darwin-aarch64.tar.gz
  7. enterprise-search-8.13.2.tar.gz metricbeat-8.13.2-darwin-aarch64.tar.gz
  8. filebeat-8.13.2-darwin-aarch64.tar.gz
  9. $ tar xzf enterprise-search-8.13.2.tar.gz
  10. $ cd enterprise-search-8.13.2
  11. $ ls
  12. LICENSE NOTICE.txt README.md bin config lib metricbeat

如上所示,它含有一个叫做 config 的目录。我们在启动  Enterprise Search 之前,必须做一些相应的配置。我们需要修改 config/enterprise-search.yml 文件。在这个文件中添加如下的内容:

config/enterprise-search.yml

  1. allow_es_settings_modification: true
  2. # secret_management.encryption_keys: ['q3t6w9z$C&F)J@McQfTjWnZr4u7x!A%D']
  3. elasticsearch.username: elastic
  4. elasticsearch.password: "VDMlz5QnM_0g-349fFq7"
  5. elasticsearch.host: https://127.0.0.1:9200
  6. elasticsearch.ssl.enabled: true
  7. elasticsearch.ssl.certificate_authority: /Users/liuxg/elastic/elasticsearch-8.13.2/config/certs/http_ca.crt
  8. kibana.external_url: http://localhost:5601

在上面,请注意 elasticsearch.password 是我们在 Elasticsearch 安装过程中生成的密码。elasticsearch.ssl.certificate_authority 必须根据自己的 Elasticsearch 安装路径中生成的证书进行配置。在上面的配置中,如果我们还没有配置 secret_management.encryption_keys。我们可以使用上面的配置先运行,然后让系统帮我们生成。在配置上面的密码时,我们需要添加上引号。我发现在密码中含有 * 字符会有错误的信息。我们使用如下的命令来启动:

bin/enterprise-search

  1. allow_es_settings_modification: true
  2. # secret_management.encryption_keys: ['q3t6w9z$C&F)J@McQfTjWnZr4u7x!A%D']
  3. secret_management.encryption_keys: [8641aa589cf5d6a90250e25a875eeda20052f049999e3f31f9265c7444ad703a]
  4. elasticsearch.username: elastic
  5. elasticsearch.password: "VDMlz5QnM_0g-349fFq7"
  6. elasticsearch.host: https://127.0.0.1:9200
  7. elasticsearch.ssl.enabled: true
  8. elasticsearch.ssl.certificate_authority: /Users/liuxg/elastic/elasticsearch-8.13.2/config/certs/http_ca.crt
  9. kibana.external_url: http://localhost:5601

我们再次运行:

bin/enterprise-search

  1. allow_es_settings_modification: true
  2. # secret_management.encryption_keys: ['q3t6w9z$C&F)J@McQfTjWnZr4u7x!A%D']
  3. secret_management.encryption_keys: [8641aa589cf5d6a90250e25a875eeda20052f049999e3f31f9265c7444ad703a]
  4. elasticsearch.username: elastic
  5. elasticsearch.password: "VDMlz5QnM_0g-349fFq7"
  6. elasticsearch.host: https://127.0.0.1:9200
  7. elasticsearch.ssl.enabled: true
  8. elasticsearch.ssl.certificate_authority: /Users/liuxg/elastic/elasticsearch-8.13.2/config/certs/http_ca.crt
  9. kibana.external_url: http://localhost:5601
  10. secret_session_key: f5b111b4272b3593c08599a4dfd236676e2b74ece5c805eef19e528d4c5191e950b62c5499384d0690ffdef52d31e03a5cd23c49718531aa3a2f59aa80ff40d2
  11. feature_flag.elasticsearch_search_api: true

为了能够使得我们能够在 App Search 中使用 Elasticsearch 搜索,我们必须设置
feature_flag.elasticsearch_search_api: true。 我们再次重新启动 enterprise search:

再次重新启动:

bin/enterprise-search

这次启动后,我们再也不会看到任何的配置输出了。这样我们的 enterprise search 就配置好了。

至此我们的安装已经完成了。

同步数据到 Elasticsearch

我们登录到 Kibana,然后转到 Search -> Content -> Indices -> Create a new index

我们接下来创建一个 Connector:

接下来,我们需要按照 Kibana 中的指示来进行操作:

我们首先需要申请一个 API key:

我们需要克隆 elastic/connectors 仓库:

git clone https://github.com/elastic/connectors

我们进入到该仓库代码的根目录下,并把 config.yml.example 文件拷贝到 config.yml 文件中。我们也同时拷贝 Elasticsearch 的证书到这个目录下:

  1. $ pwd
  2. /Users/liuxg/python/connectors
  3. $ ls
  4. Dockerfile catalog-info.yaml requirements
  5. Dockerfile.ftest config.yml.example resources
  6. LICENSE connectors scripts
  7. MANIFEST.in docs setup.py
  8. Makefile logo-enterprise-search.png tests
  9. NOTICE.txt pyrightconfig.json
  10. README.md pytest.ini
  11. $ cp config.yml.example config.yml
  12. $ cp ~/elastic/elasticsearch-8.13.2/config/certs/http_ca.crt .

config.yml

  1. connectors:
  2. -
  3. connector_id: "7V37Do8Bjfz0jsfahzL3"
  4. service_type: "mongodb"
  5. elasticsearch:
  6. host: "https://192.168.0.3:9200"
  7. api_key: "N2wzX0RvOEJqZnowanNmYWhqS0k6eVZtSDJMVFFRRS1UZjFacWI2eUhPUQ=="
  8. ssl: true
  9. username: "elastic"
  10. password: "VDMlz5QnM_0g-349fFq7"
  11. ca_certs: "/usr/share/certs/http_ca.crt"
  12. service:
  13. idling: 30
  14. heartbeat: 300
  15. max_errors: 20
  16. max_errors_span: 600
  17. max_concurrent_content_syncs: 1
  18. max_concurrent_access_control_syncs: 1
  19. job_cleanup_interval: 300
  20. log_level: INFO
  21. sources:
  22. mongodb: connectors.sources.mongo:MongoDataSource

你需要根据自己的配置进行相应的修改。请注意上面的 connector_id 需要根据上面界面生成的值进行相应的修改。

在运行下面的 docker 之前,我们可以使用如下的命令来检查 docker 能否访问 MongdoDB 所在的 IP 地址:

docker run --rm -it --network host alpine ping -c 4 192.168.0.9 )

根据链接的描述,我们有两种方法来进行部署。我们选择使用 docker 来进行部署。我们在刚才创建的 config.yml 目录下进行运行:

  1. docker run \
  2. -v ~/python/connectors:/config \
  3. --volume="$PWD/http_ca.crt:/usr/share/certs/http_ca.crt:ro" \
  4. --rm \
  5. --tty -i \
  6. --network host \
  7. docker.elastic.co/enterprise-search/elastic-connectors:8.13.0.0-SNAPSHOT \
  8. /app/bin/elastic-ingest \
  9. -c /config/config.yml

运行完上面的命令后,我们可以在 Kibana 中看到:

上面显示我们的 connector 已经成功地连接了。

我们也可以使用自己数据库上的账号:

在进行下面的同步之前,我们可以先创建一个索引 test,当然也可以使用之前创建的 mydatabase:

一旦配置完毕,我们可以看到如下的画面:

点击上面的 Sync 按钮:

等 Sync 完毕后,我们可以在 Docker 的 terminal 中看到如下的画面:

我们可以到 Kibana 中进行查看文件:

GET test/_search

我们可以看到 test 现在含有通过过来的文档。

好了,现在我们已经完成了从 MongoDB 到 Elasticsearch 的同步了。我们可以很方便地把 MongoDB 的数据通过 Connector 同步到 Elasticsearch。希望这篇文章对大家的开发有所帮助。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/556937
推荐阅读
相关标签
  

闽ICP备14008679号