赞
踩
- use futures::stream::StreamExt; // 引入 StreamExt 以使用 next() 方法
- use rdkafka::config::ClientConfig;
- use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
- use rdkafka::error::KafkaResult;
- use rdkafka::message::{Message};
-
- /// Consumes messages from `test_topic` and prints each key and payload to stdout.
- ///
- /// Builds a `StreamConsumer` for the broker at `localhost:9092` in consumer group
- /// `test_group`, subscribes to `test_topic`, and processes the message stream
- /// until it ends. After each successfully received message its offset is
- /// committed asynchronously.
- ///
- /// # Errors
- ///
- /// Returns the underlying Kafka error if the consumer cannot be created, if the
- /// subscription fails, or if committing a message fails. Errors yielded by the
- /// stream itself are only logged to stderr; the loop keeps running.
- async fn run_consumer() -> KafkaResult<()> {
-     // NOTE(review): auto-commit is enabled here and every message is also
-     // committed explicitly below — confirm that both are intended.
-     let consumer: StreamConsumer = ClientConfig::new()
-         .set("group.id", "test_group")
-         .set("bootstrap.servers", "localhost:9092")
-         .set("enable.auto.commit", "true")
-         .set("session.timeout.ms", "6000")
-         .set("auto.offset.reset", "earliest")
-         .create()?;
-
-     // Propagate subscription failures to the caller instead of panicking.
-     consumer.subscribe(&["test_topic"])?;
-
-     let mut message_stream = consumer.stream();
-
-     while let Some(message) = message_stream.next().await {
-         match message {
-             Ok(m) => {
-                 // The payload is optional and may not be valid UTF-8.
-                 match m.payload_view::<str>() {
-                     Some(Ok(payload)) => {
-                         println!("Key: '{:?}', Payload: '{}'", m.key(), payload);
-                     }
-                     Some(Err(e)) => {
-                         eprintln!("Error while deserializing message payload: {:?}", e);
-                     }
-                     None => {
-                         println!("Key: '{:?}', Payload: <empty>", m.key());
-                     }
-                 }
-                 consumer.commit_message(&m, CommitMode::Async)?;
-             }
-             Err(e) => eprintln!("Kafka error: {}", e),
-         }
-     }
-
-     Ok(())
- }
-
- /// Program entry point: creates a Tokio runtime and blocks on `run_consumer`.
- ///
- /// A failure to create the runtime, or an error returned by the consumer, is
- /// printed to stderr and the process exits with status 1 rather than panicking
- /// through `unwrap`.
- fn main() {
-     let runtime = match tokio::runtime::Runtime::new() {
-         Ok(runtime) => runtime,
-         Err(e) => {
-             eprintln!("Failed to create Tokio runtime: {}", e);
-             std::process::exit(1);
-         }
-     };
-
-     if let Err(e) = runtime.block_on(run_consumer()) {
-         eprintln!("Consumer terminated with error: {}", e);
-         std::process::exit(1);
-     }
- }
[dependencies]
rdkafka = "0.36.2"
tokio = { version = "1.36.0", features = ["full"] }
futures = "0.3.30"
[target.x86_64-unknown-linux-musl]
linker = "x86_64-linux-musl-gcc"
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。