当前位置:   article > 正文

Python 消费Kafka手动提交 批量存入Elasticsearch_kafka 读取topic 消费数据后如何提交 python

kafka 读取topic 消费数据后如何提交 python

一、第三方包选择

pip install kafka-python(注意:包名是 kafka-python,不要安装已停止维护的 kafka 包,虽然导入时仍写 from kafka import ...),对比了 kafka-python 和 pykafka,还是选择 kafka-python,消费速度更快
pip install elasticsearch==7.12.0(ES版本)

二、创建es连接对象

  1. from elasticsearch import Elasticsearch
  2. from elasticsearch.helpers import bulk
  3. class Create_ES(object):
  4. _instance = None
  5. def __new__(cls, *args, **kwargs):
  6. if cls._instance is None:
  7. cls._instance = super().__new__(cls)
  8. return cls._instance
  9. def __init__(self, hosts):
  10. try:
  11. self.es = Elasticsearch([{'host':host, 'port':9200}])
  12. except Exception as e:
  13. print('Connect ES Fail db:{} error:{}'.format(hosts, str(e)))
  14. def get_conn(self):
  15. return self.es
  16. def set_multi_data(self, datas):
  17. '''批量插入数据'''
  18. success = bulk(self.es, datas, raise_on_error=True)
  19. return success

三、消费kafka数据

  1. from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
  2. from . import Create_ES
  3. class AppKfkConsumer(object):
  4. def __init__(self):
  5. self.server = 'localhost:9092'
  6. self.topic = KAFKA_TOPIC
  7. self.consumer = None
  8. self.tp = None
  9. self.consumer_timeout_ms = 5000 # 设置消费超时时间,
  10. self.type = 'members'
  11. self.group_id = 'test1' # 设置消费group_id,避免重复消费
  12. self.es_index = 'index' # es的index
  13. def get_connect(self):
  14. self.consumer = KafkaConsumer(
  15. group_id=self.group_id,
  16. auto_offset_reset='earliest', # 从最早的数据开始消费
  17. bootstrap_servers=self.server,
  18. enable_auto_commit=False, # 关闭自动提交
  19. consumer_timeout_ms=self.consumer_timeout_ms
  20. )
  21. self.tp = TopicPartition(topic=self.topic, partition=0) # 设置我们要消费的分区
  22. self.consumer.assign([self.tp]) # 由consumer对象分配分区
  23. def beginConsumer(self):
  24. now_offset = 0 # 当前偏移量
  25. es_conn = Create_ES()
  26. Actions = []
  27. while True:
  28. for message in self.consumer:
  29. now_offset = message.offset # 获取当前偏移量
  30. data = eval(message.value.decode()) # 解析数据
  31. action = {
  32. "_index": self.es_index,
  33. "_type": self.type,
  34. "_source": data
  35. }
  36. Actions.append(action)
  37. if len(Actions) >= 50000:
  38. result = es_conn.set_multi_data(Actions) # 批量插入数据
  39. Actions = []
  40. # 提交偏移量,now_offset+1的原因是因为我发现如果不加1,下次消费会从上次消费最后一条数据开始,重复消费
  41. self.consumer.commit(offsets={tp:(OffsetAndMetadata(now_offset+1, None))})
  42. if len(Actions) > 0:
  43. result = es_conn.set_multi_data(Actions)
  44. Actions = []
  45. self.consumer.commit(offsets={tp:(OffsetAndMetadata(now_offset+1, None))})
  46. def delconnect(self):
  47. self.consumer.close()
  48. # 执行任务
  49. ks = AppKfkConsumer()
  50. ks.get_connect()
  51. ks.beginConsumer()

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小桥流水78/article/detail/893239
推荐阅读
相关标签
  

闽ICP备14008679号