当前位置:   article > 正文

(三)kafka从入门到精通之使用场景_kafka什么用处

kafka什么用处

1、kafka简介

Kafka 是一种流处理平台,主要用于处理大量数据流,如实时事件、日志文件和传感器数据等。Kafka的目的是实现高吞吐量、低延迟和高可用性的数据处理。

Kafka提供了一个高度可扩展的架构,可以轻松地添加和删除节点,并且能够处理数百亿条消息/分区。Kafka的消息可以容错,即使某个节点失败,消息也会在集群中的其他节点上得到处理。

2、Kafka 的使用场景包括:

实时数据处理:

Kafka 非常适合处理实时事件,例如实时交易、实时搜索结果和实时推文等。Kafka 可以将数据快速地发布和订阅,从而实现实时处理。

日志文件处理:

Kafka 可以处理大量的日志文件,例如 Web 服务器日志、数据库日志和操作系统日志等。Kafka可以将日志文件的数据快速地发布和订阅,并且提供了多种聚合和分析日志数据的方法,例如使用 Apache Storm 或 ApacheFlink。

传感器数据处理:

Kafka 可以处理来自传感器的数据,例如温度、湿度和气压等传感器数据。Kafka可以将传感器数据快速地发布和订阅,并且可以将数据发送到分布式处理系统,例如 Apache Hadoop 或 ApacheSpark,进行处理。

流处理:

Kafka 可以作为流处理系统,例如 Apache Nifi 或 Apache Beam 的底层存储系统。Kafka可以将数据流快速地发布和订阅,并且可以支持多种流处理模式,例如按时间排序、字段过滤和路由规则。

分布式消息队列:

Kafka 可以作为分布式消息队列,用于多个应用程序之间的数据传输。Kafka提供了多种消息传输模式,例如点对点模式、多主节点模式和发布/订阅模式。

数据备份:

Kafka 可以作为数据备份系统,用于备份数据到多个节点上。Kafka 可以将数据发布到多个主题中,从而实现数据备份。

3、简单使用

Kafka 的使用非常简单,可以使用多种语言来编写客户端库和消费者。以下是使用 Java 客户端库的示例:

首先,您需要在项目中添加 Kafka 依赖项:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.8.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

然后,您需要编写一个生产者,以将消息发布到指定的主题中:

package com.yinfeng.test.demo.kafka;

import lombok.SneakyThrows;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * @author admin
 * @date 2023/7/2 19:02
 * @description
 */
public class KafkaProducerDemo {
    @SneakyThrows
    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka 集群地址
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        // 发送3条消息
        for (int i = 0; i < 3; i++) {
            ProducerRecord<String, String> record1 = new ProducerRecord<>("test", "key"+i, "hello"+i);
            producer.send(record1, (metadata, exception) -> {
                System.out.println("消息发送成功 topic="+metadata.topic()+", msg=>" + record1.value());
            });
        }

        // kafka异步发送,延时等待执行完成
        Thread.sleep(5000);

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

执行之后可在控制台看到3条消息已发送
在这里插入图片描述

最后,您需要编写一个消费者,以从指定的主题中接收消息:

package com.yinfeng.test.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * @author admin
 * @date 2023/7/2 19:02
 * @description
 */
public class KafkaConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka 集群地址
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my_group");
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());

        consumer.subscribe(Collections.singleton("test"));

        // 循环拉取消息
        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: " + record.value());
            }
        }

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

在这里插入图片描述

以上示例只是一个简单的例子,Kafka 还提供了更多的功能和配置选项,例如订阅多个主题、设置消息过滤器和消息压缩等。

4、总结

总的来说,Kafka 是一个非常强大的数据处理平台,可以用于实时数据处理、日志文件处理、传感器数据处理和流处理等场景。其使用简单、功能丰富,并且可以扩展到数百亿条消息/分区,适用于各种大规模的数据处理场景。

使用 Kafka 需要一定的学习和配置,但是一旦您熟悉了其使用方法,Kafka 将会成为您的一个得力工具,可以提高您的工作效率并为您的业务增添价值。 如果您对使用 Kafka 有疑问或遇到问题,可以查看 Kafka 的官方文档和社区,或者使用 Kafka 提供的丰富的客户端库和文档,包括 Apache Kafka、Apache Confluent Kafka 和 Confluent Kubernetes 等。

总之,Kafka 是一种高性能、可扩展、易于使用的数据处理平台,可以广泛应用于实时数据处理、日志文件处理、传感器数据处理和流处理等领域。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/583754
推荐阅读
相关标签
  

闽ICP备14008679号