
PostgreSQL Kernel Extensions: an Elasticsearch Sync Plugin

A tool for loading PostgreSQL data into ES

Background

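Elasticsearch is a newer member of the open-source search platform family and a powerful tool for real-time data analytics. It is developing rapidly. It is built on Lucene, exposes a RESTful API, is distributed and designed for cloud deployment, supports real-time and full-text search, and is stable, reliable, scalable, and easy to install and use.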

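PostgreSQL is an open-source database that originated at the University of California, Berkeley. It has a long history, a highly extensible kernel, and users across many industries.
For a guide to PostgreSQL kernel extensions, see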
https://yq.aliyun.com/articles/55981

How a traditional database is kept in sync with the ES search engine

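Suppose some data in the database needs to be synced to ES and indexed there. The traditional approach makes the application responsible for the sync.
This adds development cost, and the data does not reach ES in real time.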

What PostgreSQL gains from working with ES

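The PostgreSQL extension pg-es-fdw uses PostgreSQL's foreign data wrapper (FDW) mechanism to read and write ES directly from inside the database, so the ES index can be kept up to date in real time.
No extra program is needed, and ES is updated as each row is written.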

Case study

Install PostgreSQL 9.5

Details omitted; PostgreSQL must be configured with --with-python.
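To confirm that an existing build was configured with --with-python, one option is to inspect the output of `pg_config --configure`, which lists the configure options as quoted tokens. The sketch below assumes `pg_config` is on PATH (for example /home/digoal/pgsql9.5/bin, as used later); `has_with_python` and `check_local_build` are hypothetical helper names.

```python
import shlex
import subprocess


def has_with_python(configure_output):
    """Return True if the pg_config --configure output contains --with-python."""
    # Output looks like: '--prefix=/home/digoal/pgsql9.5' '--with-python'
    tokens = shlex.split(configure_output)
    return "--with-python" in tokens


def check_local_build(pg_config="pg_config"):
    """Run pg_config --configure (assumed to be on PATH) and check its options."""
    out = subprocess.check_output([pg_config, "--configure"])
    return has_with_python(out.decode("utf-8"))
```

Parsing with `shlex.split` rather than a plain substring search avoids false matches inside other option values.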

Install ES on CentOS 7

  # yum install -y java-1.7.0-openjdk
  # rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
  # vi /etc/yum.repos.d/es.repo
    [elasticsearch-2.x]
    name=Elasticsearch repository for 2.x packages
    baseurl=https://packages.elastic.co/elasticsearch/2.x/centos
    gpgcheck=1
    gpgkey=https://packages.elastic.co/GPG-KEY-elasticsearch
    enabled=1
  # yum install -y elasticsearch
  # /bin/systemctl daemon-reload
  # /bin/systemctl enable elasticsearch.service
  # /bin/systemctl start elasticsearch.service
  # python --version
  Python 2.7.5
  # curl -X GET 'http://localhost:9200'
  {
    "name" : "Red Wolf",
    "cluster_name" : "elasticsearch",
    "version" : {
      "number" : "2.3.3",
      "build_hash" : "218bdf10790eef486ff2c41a3df5cfa32dadcfde",
      "build_timestamp" : "2016-05-17T15:40:04Z",
      "build_snapshot" : false,
      "lucene_version" : "5.5.0"
    },
    "tagline" : "You Know, for Search"
  }
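The `curl` check above can also be scripted. This sketch parses the JSON returned by `GET /` and reports the ES and Lucene versions; `es_version`, `is_2x` and `fetch_root` are hypothetical names, and the 2.x check only reflects the repository configured above.

```python
import json

try:
    from urllib.request import urlopen  # Python 3
except ImportError:
    from urllib2 import urlopen  # Python 2.7, as on the CentOS 7 host above


def fetch_root(url="http://localhost:9200"):
    """Return the raw JSON text of GET / (assumes ES is listening on url)."""
    return urlopen(url).read().decode("utf-8")


def es_version(root_response_text):
    """Extract (ES version, Lucene version) from the GET / response."""
    info = json.loads(root_response_text)
    version = info["version"]
    return version["number"], version["lucene_version"]


def is_2x(number):
    """True if the version string belongs to the 2.x series installed above."""
    return number.split(".")[0] == "2"
```

For example, `es_version(fetch_root())` against the node shown above would yield the "number" and "lucene_version" fields of that response.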

Python client

  # easy_install pip
  # pip install elasticsearch

PostgreSQL extension: multicorn

  # wget http://api.pgxn.org/dist/multicorn/1.3.2/multicorn-1.3.2.zip
  # unzip multicorn-1.3.2.zip
  # cd multicorn-1.3.2
  # export PATH=/home/digoal/pgsql9.5/bin:$PATH
  # make && make install
  # su - digoal
  $ psql
  postgres=# create extension multicorn ;
  CREATE EXTENSION

PostgreSQL extension: pg-es-fdw (a foreign data wrapper built on multicorn)

  # git clone https://github.com/Mikulas/pg-es-fdw /tmp/pg-es-fdw
  # cd /tmp/pg-es-fdw
  # export PATH=/home/digoal/pgsql9.5/bin:$PATH
  # python setup.py install
  # su - digoal
  $ psql

Usage example

Create an ES foreign server based on multicorn
  CREATE SERVER multicorn_es FOREIGN DATA WRAPPER multicorn
  OPTIONS (
    wrapper 'dite.ElasticsearchFDW'
  );
Create a test table
  CREATE TABLE articles (
    id serial PRIMARY KEY,
    title text NOT NULL,
    content text NOT NULL,
    created_at timestamp
  );
Create the foreign table
  CREATE FOREIGN TABLE articles_es (
    id bigint,
    title text,
    content text
  ) SERVER multicorn_es OPTIONS (host '127.0.0.1', port '9200', node 'test', index 'articles');
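The FDW in the appendix builds its REST paths as /<node>/<index>/..., so `node 'test'` ends up as the ES index name and `index 'articles'` as the document type; this is why the tests query localhost:9200/test/articles. The sketch below reproduces that mapping from the foreign table options. `es_paths` is a hypothetical helper; the appendix writes its search path as `_search&size=10000`, but a query string needs `?`, so this sketch uses `?size=10000`.

```python
def es_paths(options):
    """Map foreign-table OPTIONS to the base URL and REST paths used by the FDW.

    Defaults mirror ElasticsearchFDW.__init__ in the appendix.
    """
    host = options.get("host", "localhost")
    port = int(options.get("port", "9200"))
    node = options.get("node", "")    # first path segment: the ES index
    index = options.get("index", "")  # second path segment: the document type
    prefix = "/%s/%s" % (node, index)
    return {
        "base": "http://%s:%d" % (host, port),
        "count": prefix + "/_count",
        "search": prefix + "/_search?size=10000",
        "doc": lambda doc_id: "%s/%s" % (prefix, doc_id),
    }
```

For the articles_es options above, `es_paths(...)["doc"](1)` gives `/test/articles/1`, the path a PUT or DELETE for row id 1 would target.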
Create triggers

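On the base table, create trigger functions so that whenever rows are inserted, updated, or deleted, the change is automatically applied to the corresponding ES foreign table.
The sync goes through the FDW interface, which creates, updates, and deletes the documents in the ES index.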

  CREATE OR REPLACE FUNCTION index_article() RETURNS trigger AS $def$
  BEGIN
      INSERT INTO articles_es (id, title, content) VALUES
          (NEW.id, NEW.title, NEW.content);
      RETURN NEW;
  END;
  $def$ LANGUAGE plpgsql;

  CREATE OR REPLACE FUNCTION reindex_article() RETURNS trigger AS $def$
  BEGIN
      UPDATE articles_es SET
          title = NEW.title,
          content = NEW.content
      WHERE id = NEW.id;
      RETURN NEW;
  END;
  $def$ LANGUAGE plpgsql;

  CREATE OR REPLACE FUNCTION delete_article() RETURNS trigger AS $def$
  BEGIN
      DELETE FROM articles_es a WHERE a.id = OLD.id;
      RETURN OLD;
  END;
  $def$ LANGUAGE plpgsql;

  CREATE TRIGGER es_insert_article
      AFTER INSERT ON articles
      FOR EACH ROW EXECUTE PROCEDURE index_article();

  CREATE TRIGGER es_update_article
      AFTER UPDATE OF title, content ON articles
      FOR EACH ROW
      WHEN (OLD.* IS DISTINCT FROM NEW.*)
      EXECUTE PROCEDURE reindex_article();

  CREATE TRIGGER es_delete_article
      BEFORE DELETE ON articles
      FOR EACH ROW EXECUTE PROCEDURE delete_article();
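To make the trigger and FDW interplay concrete, the sketch below computes, without PostgreSQL or ES, which HTTP request a given row change would lead to. It mirrors the three triggers above and the FDW's insert/update/delete methods from the appendix (PUT /<node>/<index>/<id> with the non-id columns as the body, DELETE for removals). `es_request` and its `set_columns` parameter (the column names listed in the UPDATE's SET clause) are hypothetical.

```python
import json

SYNCED = ("title", "content")  # columns of articles_es besides id


def es_request(op, old=None, new=None, set_columns=(), node="test", index="articles"):
    """Return (method, path, body) for one row change, or None if nothing is sent."""
    prefix = "/%s/%s" % (node, index)
    if op == "INSERT":
        body = dict((c, new[c]) for c in SYNCED)
        return ("PUT", "%s/%s" % (prefix, new["id"]), json.dumps(body, sort_keys=True))
    if op == "UPDATE":
        # AFTER UPDATE OF title, content: fires only if one of them is a SET target.
        if not set(set_columns) & set(SYNCED):
            return None
        # WHEN (OLD.* IS DISTINCT FROM NEW.*): skip updates that change nothing.
        if old == new:
            return None
        body = dict((c, new[c]) for c in SYNCED)
        return ("PUT", "%s/%s" % (prefix, new["id"]), json.dumps(body, sort_keys=True))
    if op == "DELETE":
        return ("DELETE", "%s/%s" % (prefix, old["id"]), None)
    raise ValueError("unsupported operation: %s" % op)
```

Note that an update of only created_at never reaches ES, which is fine here because articles_es does not carry that column.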

Testing

  # Check ES and the base table before writing anything
  curl 'localhost:9200/test/articles/_search?q=*:*&pretty'
  psql -c 'SELECT * FROM articles'
  # Insert into the base table; the row is synced to ES automatically
  psql -c "INSERT INTO articles (title, content, created_at) VALUES ('foo', 'spike', Now());"
  psql -c 'SELECT * FROM articles'
  # Query ES to check that the row has been synced
  curl 'localhost:9200/test/articles/_search?q=*:*&pretty'
  # Update the base table; the change is synced to ES automatically
  psql -c "UPDATE articles SET content='yeay it updates\!' WHERE title='foo'"
  # Query ES to check that the document has been updated
  curl 'localhost:9200/test/articles/_search?q=*:*&pretty'
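When checking the `_search` output programmatically, it helps to know how the FDW turns hits back into rows: `id` comes from each hit's `_id`, and the other columns come from `_source`. The sketch below reproduces that logic from `execute()` in the appendix; `hits_to_rows` is a hypothetical name, and the response in the usage example is hand-written for illustration, not captured output.

```python
import json


def hits_to_rows(search_response_text, columns):
    """Convert an ES _search response into row dicts, as execute() does."""
    data = json.loads(search_response_text)
    rows = []
    for hit in data["hits"]["hits"]:
        row = {}
        for col in columns:
            if col == "id":
                row[col] = hit["_id"]  # the document id doubles as the row id
            elif col in hit["_source"]:
                row[col] = hit["_source"][col]
        rows.append(row)
    return rows
```

Because `_id` is a string in ES, the id arrives as text (e.g. "1") and PostgreSQL converts it to the bigint column type.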

References

https://www.elastic.co/guide/en/elasticsearch/reference/current/setup-repositories.html
http://www.vpsee.com/2014/05/install-and-play-with-elasticsearch/
https://github.com/Mikulas/pg-es-fdw
https://wiki.postgresql.org/wiki/Fdw
http://multicorn.org/
http://pgxn.org/dist/multicorn/
http://multicorn.readthedocs.io/en/latest/index.html

Summary

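  1. PostgreSQL's FDW interface lets users operate on external data sources directly from inside the database, so ES is just one example; many other data sources can be supported the same way.
    The following page lists the FDWs that already exist, covering a very wide range of data sources: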

https://wiki.postgresql.org/wiki/Fdw

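  2. multicorn adds a further abstraction layer on top of the FDW interface so that FDWs can be written in Python, which makes quick experimentation easy. If performance is not critical, multicorn alone is sufficient.
  3. How can developers write their own FDW? See the following:
    http://multicorn.readthedocs.io/en/latest/index.html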

https://yq.aliyun.com/articles/55981
https://www.postgresql.org/docs/9.6/static/fdwhandler.html
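As an illustration of the Python-level interface multicorn exposes, here is a hypothetical in-memory FDW that follows the method names and signatures used by the appendix's ElasticsearchFDW (`execute`, `insert`, `update`, `delete`, `rowid_column`). The fallback base class exists only so the sketch can be exercised outside PostgreSQL; it is not part of multicorn. Registering it would presumably follow the same CREATE SERVER pattern as above, with `wrapper` naming the module and class.

```python
try:
    from multicorn import ForeignDataWrapper
except ImportError:
    # Stand-in so the sketch runs outside PostgreSQL; NOT part of multicorn.
    class ForeignDataWrapper(object):
        def __init__(self, options, columns):
            pass


class DictFDW(ForeignDataWrapper):
    """Hypothetical writable FDW that keeps rows in a Python dict keyed by id."""

    def __init__(self, options, columns):
        super(DictFDW, self).__init__(options, columns)
        self.columns = columns
        self.rows = {}

    @property
    def rowid_column(self):
        # Column used to identify rows for UPDATE and DELETE.
        return "id"

    def execute(self, quals, columns):
        # quals are ignored; every row is returned and PostgreSQL filters them.
        for row in self.rows.values():
            yield dict((c, row.get(c)) for c in columns)

    def insert(self, new_values):
        self.rows[new_values["id"]] = dict(new_values)
        return new_values

    def update(self, rowid, new_values):
        self.rows[rowid] = dict(new_values)
        return new_values

    def delete(self, rowid):
        self.rows.pop(rowid, None)
```

Swapping the dict for HTTP calls is essentially what the appendix's ElasticsearchFDW does.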

Appendix: the ElasticsearchFDW class from pg-es-fdw

  ###
  ### Author: Mikulas Dite
  ### Time-stamp: <2015-06-09 21:54:14 dwa>
  from multicorn import ForeignDataWrapper
  from multicorn.utils import log_to_postgres as log2pg
  import httplib  # Python 2 standard library (http.client on Python 3)
  import json
  import logging


  class ElasticsearchFDW(ForeignDataWrapper):

      def __init__(self, options, columns):
          super(ElasticsearchFDW, self).__init__(options, columns)
          self.host = options.get('host', 'localhost')
          self.port = int(options.get('port', '9200'))
          # REST paths are built as /<node>/<index>/<id>, so 'node' is the
          # ES index name and 'index' is the document type.
          self.node = options.get('node', '')
          self.index = options.get('index', '')
          self.columns = columns

      def get_rel_size(self, quals, columns):
          """Helps the planner by returning costs.
          Returns a tuple of the form (nb_row, avg width)
          """
          conn = httplib.HTTPConnection(self.host, self.port)
          conn.request("GET", "/%s/%s/_count" % (self.node, self.index))
          resp = conn.getresponse()
          if resp.status != 200:
              return (0, 0)
          data = json.loads(resp.read())
          # log2pg('MARK RESPONSE: >>%d<<' % data['count'], logging.DEBUG)
          return (data['count'], len(columns) * 100)

      def execute(self, quals, columns):
          # quals are not pushed down: fetch up to 10000 documents and let
          # PostgreSQL apply the WHERE clause itself.
          conn = httplib.HTTPConnection(self.host, self.port)
          conn.request("GET", "/%s/%s/_search?size=10000" % (self.node, self.index))
          resp = conn.getresponse()
          if resp.status != 200:
              log2pg('Search failed: %s' % resp.read(), logging.ERROR)
              return
          data = json.loads(resp.read())
          for hit in data['hits']['hits']:
              row = {}
              for col in columns:
                  if col == 'id':
                      row[col] = hit['_id']
                  elif col in hit['_source']:
                      row[col] = hit['_source'][col]
              yield row

      @property
      def rowid_column(self):
          """Returns a column name which will act as a rowid column,
          for delete/update operations. This can be either an existing column
          name, or a made-up one.
          This column name should be subsequently present in every
          returned resultset.
          """
          return 'id'

      def es_index(self, id, values):
          """Create or replace document <id> with the given column values."""
          content = json.dumps(values)
          conn = httplib.HTTPConnection(self.host, self.port)
          conn.request("PUT", "/%s/%s/%s" % (self.node, self.index, id), content)
          resp = conn.getresponse()
          # ES returns 201 Created for a new document, 200 OK for a replacement.
          if resp.status not in (200, 201):
              log2pg('Failed to index: %s' % resp.read(), logging.ERROR)
              return
          return json.loads(resp.read())

      def insert(self, new_values):
          log2pg('MARK Insert Request - new values: %s' % new_values, logging.DEBUG)
          if 'id' not in new_values:
              # log2pg at ERROR level aborts the statement in PostgreSQL.
              log2pg('INSERT requires "id" column. Missing in: %s' % new_values, logging.ERROR)
          id = new_values.pop('id')
          return self.es_index(id, new_values)

      def update(self, id, new_values):
          new_values.pop('id', None)
          return self.es_index(id, new_values)

      def delete(self, id):
          conn = httplib.HTTPConnection(self.host, self.port)
          conn.request("DELETE", "/%s/%s/%s" % (self.node, self.index, id))
          resp = conn.getresponse()
          if resp.status != 200:
              log2pg('Failed to delete: %s' % resp.read(), logging.ERROR)
              return
          return json.loads(resp.read())

  ## Local Variables: ***
  ## mode:python ***
  ## coding: utf-8 ***
  ## End: ***