当前位置:   article > 正文

confluent-kafka消费者_from confluent_kafka import consumer

from confluent_kafka import consumer
from confluent_kafka import Consumer

class Customer:
    def __init__(self) -> None:
        self.consumer = Consumer({'bootstrap.servers': 'localhost:9092','group.id':"customertest",
        'enable.auto.commit': False,
        "heartbeat.interval.ms":3000,
        # 'session.timeout.ms': 30000,
        # 'max.poll.interval.ms':30000,
        # 'auto.offset.reset': 'latest'}
        'auto.offset.reset': 'earliest',
        'message.max.bytes': 8388608,
        'compression.type': 'gzip'})
        self.consumer.subscribe(['filelack'])

    def getMessage(self):
        try:
            while 1:
                part = self.consumer.assignment()
                msg = self.consumer.poll(1)
                if msg is None:
                    continue
                else:
                    if not msg.error() is None:
                        print (msg.error())
                    else:
                        message = msg.value()
                        print(msg.partition(), msg.offset())
        except Exception as e:
            ...
        finally:
            self.close()


    def close(self):
        self.consumer.close()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

https://docs.confluent.io/clients-confluent-kafka-python/current/overview.html#id1

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/543440
推荐阅读
相关标签
  

闽ICP备14008679号