
Syncing MySQL Data to an Elasticsearch Cluster (keeping MySQL and Elasticsearch full-text search in sync)

Summary: This walkthrough keeps a MySQL database and Elasticsearch full-text search in sync. MySQL's binlog is configured to record row-level changes, a Python module reads that log, and Kafka's producer/consumer pattern is used to publish and subscribe to the change events, which are finally applied to Elasticsearch so the two stores stay consistent.


Video tutorials:

  1. MySQL and Elasticsearch sync, part 1: configuring the database binlog and reading it from Python
  2. MySQL and Elasticsearch sync, part 2: consuming the binlog with Kafka producers and consumers
  3. MySQL and Elasticsearch sync, part 3: syncing inserts, updates, and deletes into Elasticsearch

Blog posts:

  1. Python in practice: syncing Elasticsearch with MySQL (part 1)
  2. Python in practice: syncing Elasticsearch with MySQL (part 2)

Contents

P01 - Configuring the database binlog and reading it from Python
  Code listings: reader.py
  Run screenshots
P02 - Consuming the binlog with Kafka producers and consumers
  ZooKeeper installation
  Kafka installation
  Code listings: kafka_consumer.py, kafka_producer.py, kafka_producer_reader.py, reader_data.py
  Run screenshots
P03 - Syncing inserts, updates, and deletes into Elasticsearch
  Code listings: kafka_consumer.py, kafka_producer.py, kafka_producer_reader.py, reader_data.py
Appendix
  Video Word notes
  SQL script: readerbinlog.sql


P01 - Configuring the database binlog and reading it from Python

Log in to MySQL, check the binlog settings, switch the binlog format to ROW, then create a test database and table:

    mysql -u root -p

    show global variables like "%binlog%";
    show binlog events;
    set global binlog_format="ROW";

    create database readerBinlog default charset=utf8;
    use readerBinlog;
    create table mytable(id int(11), name varchar(20));
    insert into mytable values(1, "孙大圣");
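Note that SET GLOBAL binlog_format only lasts until the server restarts. To make the binlog settings permanent, and to give the server the non-zero server id that replication clients expect, they can go into the MySQL configuration file. A minimal sketch for my.cnf / my.ini; the option names are standard, but the file location depends on your installation:

    [mysqld]
    server-id     = 1           # any non-zero id, different from the reader's server_id
    log-bin       = mysql-bin   # enable the binary log
    binlog_format = ROW         # row-based logging so row values appear in the binlog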

mysql> use readerbinlog;
Database changed
mysql> select * from mytable;
+------+------+
| id   | name |
+------+------+
|    1 | sds  |
|    2 | zbj  |
+------+------+
2 rows in set (0.00 sec)

 

Install the Python binlog reader library:

    pip3 install mysql-replication

If the MySQL server has no server_id set, the reader fails with the following error (see also: 【MySQL】Server-id导致Slave_IO_Running: No主从复制故障, ITPUB博客):

    (1236, 'Misconfigured master - server id was not set')

Setting a server id on the MySQL side fixes it (this, too, only persists until restart unless it is added to the config file):

    mysql> SET GLOBAL server_id=3028;
    Query OK, 0 rows affected (0.00 sec)

Code listings

reader.py

    from pymysqlreplication import BinLogStreamReader
    from pymysqlreplication.row_event import (
        DeleteRowsEvent,
        UpdateRowsEvent,
        WriteRowsEvent,
    )

    MYSQL_SETTINGS = {
        "host": "localhost",
        "user": "root",
        "password": "root"
    }

    # Connect to MySQL as a replication client and stream row events
    # from the readerbinlog schema only.
    stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS,
                                server_id=2,
                                blocking=True,
                                only_schemas=["readerbinlog"],
                                only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent])
    print(stream)

    for binlogstream in stream:
        for row in binlogstream.rows:
            print("========================")
            print(row)
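What each printed row looks like depends on the event type, and those shapes are what the Kafka consumer in P03 relies on. A small sketch of telling them apart, extending the loop above (field names follow python-mysql-replication's row events; the values shown are just the sample table's columns):

    for binlogstream in stream:
        for row in binlogstream.rows:
            if isinstance(binlogstream, WriteRowsEvent):
                print("insert:", row["values"])            # {"id": ..., "name": ...}
            elif isinstance(binlogstream, UpdateRowsEvent):
                print("update:", row["before_values"], "->", row["after_values"])
            elif isinstance(binlogstream, DeleteRowsEvent):
                print("delete:", row["values"])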

Run screenshots

P02 - Consuming the binlog with Kafka producers and consumers

ZooKeeper installation

ZooKeeper download: Index of /zookeeper
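The original post does not show the ZooKeeper startup step; this Kafka setup needs a running ZooKeeper before the broker starts. A minimal sketch for a standalone Windows install, assuming the standard distribution layout (replace x.y.z with your version):

    cd apache-zookeeper-x.y.z-bin\conf
    copy zoo_sample.cfg zoo.cfg
    cd ..\bin
    zkServer.cmd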

Kafka installation

Kafka download: Apache Kafka

From Kafka's bin\windows directory, start the broker (running kafka-server-start with no arguments prints its usage), then test it with the console producer and consumer:

    cd windows
    dir

    kafka-server-start
    kafka-server-start ..\..\config\server.properties

    kafka-console-producer --broker-list localhost:9092 --topic test

    kafka-console-consumer --bootstrap-server localhost:9092 --topic test
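Depending on the broker configuration, the test topic may need to be created explicitly (many setups auto-create topics on first use). A sketch assuming a reasonably recent Kafka where kafka-topics accepts --bootstrap-server; older releases use --zookeeper localhost:2181 instead:

    kafka-topics --create --bootstrap-server localhost:9092 --topic test --partitions 1 --replication-factor 1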

Install the Python Kafka client:

    pip3 install kafka-python

Code listings

kafka_consumer.py

    from kafka import KafkaConsumer

    # Subscribe to the "message" topic and print every record as UTF-8 text
    consumer = KafkaConsumer("message", bootstrap_servers=["localhost:9092"])
    for mess in consumer:
        print(mess.value.decode("utf8"))

kafka_producer.py

    from kafka import KafkaProducer

    # Create the producer and publish one test message to the "message" topic
    producer = KafkaProducer(bootstrap_servers=["localhost:9092"])
    producer.send("message", "kafka信息".encode())
    producer.close()  # close() flushes any buffered messages before exiting

kafka_producer_reader.py

    import json

    from kafka import KafkaProducer
    from pymysqlreplication import BinLogStreamReader
    from pymysqlreplication.row_event import (
        DeleteRowsEvent,
        UpdateRowsEvent,
        WriteRowsEvent,
    )

    # Create the Kafka producer
    producer = KafkaProducer(bootstrap_servers=["localhost:9092"])

    MYSQL_SETTINGS = {
        "host": "localhost",
        "user": "root",
        "password": "root"
    }

    # Stream row events from the readerbinlog schema
    stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS,
                                server_id=4,
                                blocking=True,
                                only_schemas=["readerbinlog"],
                                only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent])

    for binlogstream in stream:
        for row in binlogstream.rows:
            # Serialize each row event and publish it to the "message" topic
            row_json = json.dumps(row, ensure_ascii=False)
            producer.send("message", row_json.encode())

    producer.close()
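One caveat: json.dumps raises a TypeError if a row ever contains values that deserialize to non-JSON types such as datetime or Decimal. The id/name test table never hits this, but a defensive variant (not in the original code) can pass a fallback serializer:

    row_json = json.dumps(row, ensure_ascii=False, default=str)  # stringify datetimes, Decimals, etc.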

reader_data.py

    import pymysql
    from elasticsearch import Elasticsearch


    def get_data():
        # Connect to the database
        conn = pymysql.connect(host="localhost", port=3306, user="root",
                               password="root", database="readerbinlog")
        # Open a cursor
        cursor = conn.cursor()
        # Fetch every record in the table
        sql = "select * from mytable"
        cursor.execute(sql)
        results = cursor.fetchall()
        # Return the rows read from the database
        return results


    def write_elasticsearch():
        # Elasticsearch's REST API defaults to port 9200 (9100 is typically the elasticsearch-head UI)
        es = Elasticsearch(['http://localhost:9200'])
        try:
            results = get_data()
            for row in results:
                print(row)
                res = {
                    "id": row[0],
                    "name": row[1]
                }
                es.index(index="westjourney", doc_type="test-type", id=row[0], body=res)
        except Exception as e:
            print(e)


    if __name__ == "__main__":
        # print(get_data())
        write_elasticsearch()
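A version note: the doc_type argument targets older Elasticsearch releases. On Elasticsearch 7+ mapping types were removed, so with a matching elasticsearch-py client the indexing call would drop it (a sketch assuming a 7.x setup):

    es.index(index="westjourney", id=row[0], body=res)  # no doc_type on ES 7+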

Run screenshots

P03 - Syncing inserts, updates, and deletes into Elasticsearch

Install the Python Elasticsearch client:

    pip3 install elasticsearch

The Elasticsearch Head plugin for Chrome is useful for browsing the index contents.
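Before wiring everything together, it is worth confirming Elasticsearch itself is reachable (a quick check assuming a default local install):

    curl http://localhost:9200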


Code listings

kafka_consumer.py

    import json

    from kafka import KafkaConsumer
    from elasticsearch import Elasticsearch

    consumer = KafkaConsumer("message", bootstrap_servers=["localhost:9092"])
    es = Elasticsearch()  # defaults to http://localhost:9200

    for mess in consumer:
        # The incoming message is a JSON string; decode it back into a dict
        result = json.loads(mess.value.decode("utf8"))
        event = result["event"]
        if event == "insert":
            result_values = result["values"]
            es.index(index="westjourney", doc_type="test-type", id=result_values["id"], body=result_values)
            print("Insert synced to Elasticsearch!")
        elif event == "update":
            # For updates, the body must wrap the changed fields in a "doc" key
            result_values = result["after_values"]
            es.update(index="westjourney", doc_type="test-type", id=result_values["id"], body={"doc": result_values})
            print("Update synced to Elasticsearch!")
        elif event == "delete":
            result_id = result["values"]["id"]
            es.delete(index="westjourney", doc_type="test-type", id=result_id)
            print("Delete synced to Elasticsearch!")
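To confirm that an event actually landed in the index, the document can be fetched back by id. A quick sanity check, assuming the same index and doc_type names as above:

    from elasticsearch import Elasticsearch

    es = Elasticsearch()
    doc = es.get(index="westjourney", doc_type="test-type", id=1)
    print(doc["_source"])  # e.g. {"id": 1, "name": "sds"}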

kafka_producer.py

    from kafka import KafkaProducer

    # Create the producer and publish one test message to the "message" topic
    producer = KafkaProducer(bootstrap_servers=["localhost:9092"])
    producer.send("message", "kafka信息".encode())
    producer.close()  # close() flushes any buffered messages before exiting

kafka_producer_reader.py

    import json

    from kafka import KafkaProducer
    from pymysqlreplication import BinLogStreamReader
    from pymysqlreplication.row_event import (
        DeleteRowsEvent,
        UpdateRowsEvent,
        WriteRowsEvent,
    )

    # Create the Kafka producer
    producer = KafkaProducer(bootstrap_servers=["localhost:9092"])

    MYSQL_SETTINGS = {
        "host": "localhost",
        "user": "root",
        "password": "root"
    }

    # Stream row events from the readerbinlog schema
    stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS,
                                server_id=4,
                                blocking=True,
                                only_schemas=["readerbinlog"],
                                only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent])

    for binlogstream in stream:
        for row in binlogstream.rows:
            # Tag each row with its event type so the consumer knows whether
            # to index, update, or delete the document in Elasticsearch
            if isinstance(binlogstream, WriteRowsEvent):
                row["event"] = "insert"
            elif isinstance(binlogstream, UpdateRowsEvent):
                row["event"] = "update"
            elif isinstance(binlogstream, DeleteRowsEvent):
                row["event"] = "delete"
            row_json = json.dumps(row, ensure_ascii=False)
            producer.send("message", row_json.encode())

    producer.close()

reader_data.py

    import pymysql
    from elasticsearch import Elasticsearch


    def get_data():
        # Connect to the database
        conn = pymysql.connect(host="localhost", port=3306, user="root",
                               password="root", database="readerbinlog")
        # Open a cursor
        cursor = conn.cursor()
        # Fetch every record in the table
        sql = "select * from mytable"
        cursor.execute(sql)
        results = cursor.fetchall()
        # Return the rows read from the database
        return results


    def write_elasticsearch():
        # Elasticsearch's REST API defaults to port 9200 (9100 is typically the elasticsearch-head UI)
        es = Elasticsearch(['http://localhost:9200'])
        try:
            results = get_data()
            for row in results:
                print(row)
                res = {
                    "id": row[0],
                    "name": row[1]
                }
                es.index(index="westjourney", doc_type="test-type", id=row[0], body=res)
        except Exception as e:
            print(e)


    if __name__ == "__main__":
        # print(get_data())
        write_elasticsearch()

Appendix

Video Word notes

  

 

SQL script: readerbinlog.sql

    /*
    SQLyog Ultimate v12.08 (64 bit)
    MySQL - 5.5.40-log : Database - readerbinlog
    *********************************************************************
    */
    /*!40101 SET NAMES utf8 */;
    /*!40101 SET SQL_MODE=''*/;
    /*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
    /*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
    /*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
    /*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;

    CREATE DATABASE /*!32312 IF NOT EXISTS*/`readerbinlog` /*!40100 DEFAULT CHARACTER SET utf8 */;
    USE `readerbinlog`;

    /* Table structure for table `mytable` */
    DROP TABLE IF EXISTS `mytable`;
    CREATE TABLE `mytable` (
      `id` int(11) DEFAULT NULL,
      `name` varchar(20) DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

    /* Data for the table `mytable` */
    insert into `mytable`(`id`,`name`) values (1,'sds'),(2,'zbj'),(3,'lsls'),(4,'shdjsh'),(5,'宋壹');

    /*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
    /*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
    /*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
    /*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

ヾ(◍°∇°◍)ノ゙ Keep it up~
