当前位置:   article > 正文

kafka生产者异步发送、同步发送、回调异步发送,是什么情况?_kafka同步发送和异步发送

kafka同步发送和异步发送

Kafka是一种分布式流处理平台,它是一种高吞吐量、可扩展、可持久化的消息队列系统,用于处理和存储实时流式数据。

Kafka基于发布-订阅模式,采用了分布式、多副本、分区的架构。它允许生产者将数据以消息的形式发送到Kafka集群的一个或多个主题(topic)中,而消费者可以从这些主题中订阅并获取数据。每个主题可以被分为多个分区(partition),每个分区可以在多个服务器上进行副本(replica)以实现高可用性和容错性。

Kafka具有以下几个主要特点

  • 高吞吐量:Kafka采用了顺序磁盘写入和批量发送等优化策略,能够支持大规模数据的高吞吐量处理。
  • 可扩展性:Kafka的分布式架构和分区机制使得它能够轻松地水平扩展,以适应日益增长的数据流量。
  • 持久性:Kafka将消息持久化存储在磁盘上,保证数据不会丢失。
  • 容错性:Kafka通过将数据分散到多个服务器上的多个副本来实现容错,即使某个节点故障,仍然能够继续正常运行。
  • 实时处理:Kafka支持实时处理和流式计算,能够将大规模数据流实时传输给不同的数据处理系统。

Kafka在很多场景中都有广泛应用,特别是在大数据领域和实时数据处理方面,如日志收集、数据管道、实时流处理、在线分析等。

在这里插入图片描述

一、异步发送

Kafka的生产者异步发送指的是在发送消息到Kafka集群时,并不等待服务器的响应,而是继续发送下一个消息。这样可以提高发送消息的吞吐量。以下是使用Java编写的Kafka生产者异步发送的示例代码:

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaProducerAsyncExample {

    public static void main(String[] args) {
        // 设置Kafka生产者的配置属性
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 创建Kafka生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // 发送多条消息
        for (int i = 0; i < 10; i++) {
            // 创建消息对象
            ProducerRecord<String, String> record = new ProducerRecord<>("topicName", "key" + i, "value" + i);

            // 异步发送消息
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.err.println("发送消息失败:" + exception.getMessage());
                    } else {
                        System.out.println("消息发送成功,topic:" + metadata.topic() +
                                ",partition:" + metadata.partition() + ",offset:" + metadata.offset());
                    }
                }
            });
        }

        // 关闭Kafka生产者
        producer.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

以上代码创建了一个Kafka生产者,并通过异步发送方法send()发送多条消息到指定的topic。在回调函数onCompletion()中处理发送结果。

在这里插入图片描述

二、同步发送

Kafka的生产者同步发送指的是在发送消息到Kafka集群后,等待服务器的响应并确认消息是否发送成功,然后再继续发送下一个消息。以下是使用Java编写的Kafka生产者同步发送的示例代码:

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerSyncExample {
    public static void main(String[] args) {
        // 设置Kafka生产者的配置属性
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 创建Kafka生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // 发送多条消息
        for (int i = 0; i < 10; i++) {
            // 创建消息对象
            ProducerRecord<String, String> record = new ProducerRecord<>("topicName", "key" + i, "value" + i);
            try {
                // 同步发送消息
                RecordMetadata metadata = producer.send(record).get();
                System.out.println("消息发送成功, topic:" + metadata.topic() + ", partition:" + metadata.partition() + ", offset:" + metadata.offset());
            } catch (Exception e) {
                System.err.println("发送消息失败:" + e.getMessage());
            }
        }

        // 关闭Kafka生产者
        producer.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

以上代码创建了一个Kafka生产者,并通过同步发送方法send()发送多条消息到指定的topic。使用.get()方法等待服务器的确认响应。在捕获异常时处理发送失败的情况。

在这里插入图片描述

三、回调异步发送

Kafka的生产者回调异步发送是指在发送消息到Kafka集群后,通过回调函数来处理发送结果。回调函数会在消息成功发送或发送失败时被调用。以下是使用Java编写的Kafka生产者回调异步发送的示例代码:

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerCallbackExample {
    public static void main(String[] args) {
        // 设置Kafka生产者的配置属性
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 创建Kafka生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // 发送多条消息
        for (int i = 0; i < 10; i++) {
            // 创建消息对象
            ProducerRecord<String, String> record = new ProducerRecord<>("topicName", "key" + i, "value" + i);

            // 异步发送消息,通过回调函数处理发送结果
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.err.println("发送消息失败:" + exception.getMessage());
                    } else {
                        System.out.println("消息发送成功,topic:" + metadata.topic() + ", partition:" + metadata.partition() + ", offset:" + metadata.offset());
                    }
                }
            });
        }

        // 关闭Kafka生产者
        producer.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

以上代码创建了一个Kafka生产者,并通过回调函数处理异步发送的消息。在回调函数onCompletion()中处理发送结果。如果发送成功,打印出消息发送的相关信息;如果发送失败,打印出发送失败的原因。

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/839417
推荐阅读
相关标签
  

闽ICP备14008679号