当前位置:   article > 正文

Kafka消息处理

Kafka消息处理
1、proto文件转为pb
protoc commonapis.proto --python_out=.
  • 1
2、kafka的连接
import kafka
c = kafka.KafkaConsumer(topic,
                        bootstrap_servers=host, # 多个host用列表
                        auto_offset_reset='latest', # 消费最新
                        group_id=group_id # 设置组防止消费丢或重
                        ) 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
3、消费kafka数据
msg1 = next(c)  # 阻塞式单条消费
msg2 = c.poll(timeout_ms=1000, max_records=1) # 多条消费可设置超时时间
  • 1
  • 2
4、数据的反序列化
import *_pb2 as pb
struct_msg = pb.struct()  # proto中定义的结构体<struct>
struct_msg.ParseFromString(msg1.value)  # msg1反序列化
struct_msg.ParseFromString(list(msg2.values())[0][0].value)  
# msg2结构比msg1处理复杂点 (数据会回填到struct_msg)
  • 1
  • 2
  • 3
  • 4
  • 5
5、反序列化后dict化
from google.protobuf.json_fo
    声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/空白诗007/article/detail/982108
    推荐阅读
    相关标签
      

    闽ICP备14008679号