当前位置:   article > 正文

Rust消费kafka

Rust消费kafka
  1. use futures::stream::StreamExt; // 引入 StreamExt 以使用 next() 方法
  2. use rdkafka::config::ClientConfig;
  3. use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
  4. use rdkafka::error::KafkaResult;
  5. use rdkafka::message::{Message};
  6. async fn run_consumer() -> KafkaResult<()> {
  7. let consumer: StreamConsumer = ClientConfig::new()
  8. .set("group.id", "test_group")
  9. .set("bootstrap.servers", "localhost:9092")
  10. .set("enable.auto.commit", "true")
  11. .set("session.timeout.ms", "6000")
  12. .set("auto.offset.reset", "earliest")
  13. .create()
  14. .expect("Consumer creation failed");
  15. consumer.subscribe(&["test_topic"]).expect("Can't subscribe to specified topics");
  16. let mut message_stream = consumer.stream();
  17. while let Some(message) = message_stream.next().await {
  18. match message {
  19. Ok(m) => {
  20. match m.payload_view::<str>() {
  21. Some(Ok(payload)) => {
  22. println!("Key: '{:?}', Payload: '{}'", m.key(), payload);
  23. }
  24. Some(Err(e)) => {
  25. eprintln!("Error while deserializing message payload: {:?}", e);
  26. }
  27. None => {
  28. println!("Key: '{:?}', Payload: <empty>", m.key());
  29. }
  30. }
  31. consumer.commit_message(&m, CommitMode::Async)?;
  32. }
  33. Err(e) => eprintln!("Kafka error: {}", e),
  34. }
  35. }
  36. Ok(())
  37. }
  38. fn main() {
  39. let runtime = tokio::runtime::Runtime::new().unwrap();
  40. runtime.block_on(run_consumer()).unwrap();
  41. }

[dependencies]

rdkafka = "0.36.2"

tokio = { version = "1.36.0", features = ["full"] }

futures = "0.3.30"

[target.x86_64-unknown-linux-musl]

linker = "x86_64-linux-musl-gcc"

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/78472
推荐阅读
相关标签
  

闽ICP备14008679号