赞
踩
pip install kafka-python(导入名为 kafka;对比了 kafka-python 和 pykafka,最终选择 kafka-python,因为消费速度更快)
pip install elasticsearch==7.12.0(客户端版本需与 ES 服务端版本保持一致)
- from elasticsearch import Elasticsearch
- from elasticsearch.helpers import bulk
-
class Create_ES(object):
    """Process-wide singleton holding one Elasticsearch client.

    Every ``Create_ES(...)`` call returns the same instance.  The client is
    built on the first successful initialisation and reused by later calls.
    """

    _instance = None

    def __new__(cls, *args, **kwargs):
        # Allocate the instance only once; later calls get the cached object.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, hosts='localhost', port=9200):
        """Build the Elasticsearch client unless one already exists.

        Args:
            hosts: a single host name/IP string, or an iterable of them.
            port: port used for every host (was hard-coded to 9200).
        """
        # Python calls __init__ on every Create_ES(...) even though __new__
        # returns the cached instance; do not rebuild an existing client.
        if getattr(self, 'es', None) is not None:
            return
        try:
            self.es = Elasticsearch(self._normalize_hosts(hosts, port))
        except Exception as e:
            # Best-effort as before (report and continue), but keep ``es``
            # defined so callers see None instead of an AttributeError.
            self.es = None
            print('Connect ES Fail db:{} error:{}'.format(hosts, str(e)))

    @staticmethod
    def _normalize_hosts(hosts, port=9200):
        """Return the ``[{'host': ..., 'port': ...}, ...]`` list for the client."""
        if isinstance(hosts, str):
            hosts = [hosts]
        return [{'host': host, 'port': port} for host in hosts]

    def get_conn(self):
        """Return the underlying Elasticsearch client (None if creation failed)."""
        return self.es

    def set_multi_data(self, datas):
        """Bulk-insert ``datas`` (an iterable of bulk actions).

        Returns whatever ``elasticsearch.helpers.bulk`` returns; with
        ``raise_on_error=True`` a failing document raises instead.
        """
        return bulk(self.es, datas, raise_on_error=True)
- from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
- from . import Create_ES
-
class AppKfkConsumer(object):
    """Read one Kafka topic partition and bulk-index the records into ES.

    Offsets are committed manually and only after the corresponding batch
    has been written to Elasticsearch.
    """

    def __init__(self):
        self.server = 'localhost:9092'
        self.topic = KAFKA_TOPIC  # module-level constant defined outside this view
        self.consumer = None
        self.tp = None
        # Iteration over the consumer stops after this many ms without a
        # message; the buffered batch is flushed at that point.
        self.consumer_timeout_ms = 5000
        # Written as "_type" in each bulk action.
        # NOTE(review): mapping types are deprecated in ES 7 — confirm the
        # target index accepts this type.
        self.type = 'members'
        # Consumer group under which offsets are committed, so a restart
        # resumes from the last commit instead of re-reading everything.
        self.group_id = 'test1'
        self.es_index = 'index'  # target ES index
        self.es_hosts = 'localhost'  # passed to Create_ES
        self.partition = 0  # the single partition this consumer reads
        self.batch_size = 50000  # flush to ES once this many actions are buffered

    def get_connect(self):
        """Create the KafkaConsumer and manually assign the configured partition."""
        self.consumer = KafkaConsumer(
            group_id=self.group_id,
            auto_offset_reset='earliest',  # no committed offset -> start at the oldest message
            bootstrap_servers=self.server,
            enable_auto_commit=False,  # offsets are committed manually after each ES write
            consumer_timeout_ms=self.consumer_timeout_ms,
        )
        self.tp = TopicPartition(topic=self.topic, partition=self.partition)
        # Manual assignment: only this partition is consumed.
        self.consumer.assign([self.tp])

    def _to_action(self, raw_value):
        """Decode one Kafka message value (bytes) into an ES bulk-index action."""
        from ast import literal_eval

        # The payload comes from an external producer.  ``eval`` would run any
        # code embedded in it; ``literal_eval`` only accepts plain literals
        # (dicts, lists, strings, numbers, True/False/None).
        # NOTE(review): assumes producers send Python-literal dicts — if they
        # send JSON, switch to json.loads.
        data = literal_eval(raw_value.decode())
        return {
            "_index": self.es_index,
            "_type": self.type,
            "_source": data,
        }

    def _flush(self, es_conn, actions, last_offset):
        """Bulk-write ``actions`` to ES, then commit past ``last_offset``."""
        es_conn.set_multi_data(actions)
        # Kafka's committed offset is the offset of the *next* message to
        # read, so commit last_offset + 1.  Committing last_offset itself
        # makes the last message of the batch be consumed again on restart.
        self.consumer.commit(offsets={self.tp: OffsetAndMetadata(last_offset + 1, None)})

    def beginConsumer(self):
        """Consume forever, indexing messages into ES in batches.

        A batch is flushed when it reaches ``batch_size`` actions, or when the
        consumer iterator stops after ``consumer_timeout_ms`` of inactivity.
        Never returns normally.  An exception from Kafka or ES propagates
        without committing the unflushed batch.
        """
        es_conn = Create_ES(self.es_hosts)
        actions = []
        last_offset = 0
        while True:
            for message in self.consumer:
                last_offset = message.offset
                actions.append(self._to_action(message.value))
                if len(actions) >= self.batch_size:
                    self._flush(es_conn, actions, last_offset)
                    actions = []
            # The iterator ended because nothing arrived within
            # consumer_timeout_ms: write out whatever is still buffered.
            if actions:
                self._flush(es_conn, actions, last_offset)
                actions = []

    def delconnect(self):
        """Close the Kafka consumer if get_connect() created one."""
        if self.consumer is not None:
            self.consumer.close()
-
# Run the task: consume until an exception stops it, then close the consumer.
ks = AppKfkConsumer()
ks.get_connect()
try:
    ks.beginConsumer()
finally:
    # beginConsumer() loops forever, so this only runs on an exception
    # (e.g. Ctrl+C); close the Kafka consumer so it does not leak.
    ks.delconnect()
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。