当前位置:   article > 正文

kafka总结_卡夫卡写总结

卡夫卡写总结

1.介绍

Kafka:是一个消息队列,
流平台有三个关键功能:
发布和订阅记录流,类似于消息队列或企业消息传递系统。
以容错、持久的方式存储记录流。
当记录发生时,处理记录流。
Kafka通常用于两大类应用:
构建可靠地在系统或应用程序之间获取数据的实时流数据管道。
构建转换数据流或对数据流作出反应的实时流应用程序。
概念:
Kafka作为集群运行在一个或多个服务器上,可以跨越多个数据中心。
Kafka集群存储的流记录在类别中称为topic.
每个记录由一个键、一个值和一个时间戳组成。
Kafka有五个核心API:
生产者API允许应用程序将记录流发布到一个或多个Kafka主题。
消费者API允许应用程序订阅一个或多个主题并处理向其生成的记录流。
流API允许应用程序充当流处理器,从一个或多个主题消耗输入流,并产生输出流到一个或多个输出主题,从而有效地将输入流转换为输出流。
连接器API允许构建和运行可重用的生产者或使用者,将Kafka主题连接到现有的应用程序或数据系统。例如,连接到关系数据库的连接器可能捕获对表的每一项更改。
adminApi:管理员的API;
在这里插入图片描述

2. Why要学习kafka

火车排队买票,有以下几种情况
为什么要排队;那是因为卖票窗口的速度太慢,同时买的票人又多;(去买票的时候不用排队)
买票的人排队有一个最长的时间限制;(最多等一个小时,不买了)
在一个小时以内,我接受买票成功
永远等

在这里插入图片描述

2.1 主题和日志

主题是将记录发布到的类别或提要名称。Kafka中的主题总是多订阅者;也就是说,一个topic可以有零、一个或多个订阅写入它的数据的使用者。(kafka有多个生产者和消费者)
每个分区都有偏移量(offset)
kafka里面的记录默认保留两天,超过两天,就会删除,(可以配置)
日志中的分区有几种用途。
它们允许日志扩展到适合于单个服务器的大小之外。每个单独的分区必须适合承载它的服务器,但是一个主题可能有许多分区,因此它可以处理任意数量的数据。
它们充当并行性的单位-更详细地介绍一下
每个分区有一个服务器充当“领导者”,零个或多个服务器充当“追随者”。领导者处理分区的所有读写请求,而跟随者则被动地复制领导。如果领导失败,其中一个追随者将自动成为新的领导。每个服务器都充当一些分区的领导者和其他分区的跟随者,因此负载在集群中是很好的平衡。
在这里插入图片描述

2.2 生产者

在这里插入图片描述

2.3 消费者:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

2.4.卡夫卡作为一种信息传递系统

在这里插入图片描述

2.5 卡夫卡作为一个存储系统

在这里插入图片描述

2.6.用于流处理的Kafka

在这里插入图片描述

2.7.把碎片放在一起

在这里插入图片描述

3.实战

3.1单机版

因为kafka自带zookeeper,所以不用启动
随机选择一台服务器把kafka传上去:(以node2为例)
解压

tar -xzf kafka_2.12-2.5.0.tgz
  • 1

重命名

mv kafka_2.12-2.5.0 kafka
  • 1

删除tar包

rm -rf kafka_2.12-2.5.0.tgz
  • 1

启动:

bin/zookeeper-server-start.sh config/zookeeper.properties
  • 1

在这里插入图片描述

后台启动:

nohup bin/zookeeper-server-start.sh config/zookeeper.properties  &
  • 1

创建一个主题:

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic kafka-hw
  • 1

在这里插入图片描述

查看主题列表:

 bin/kafka-topics.sh --list --bootstrap-server localhost:9092
  • 1

在这里插入图片描述

描述主题

bin/kafka-topics.sh --describe --bootstrap-server localhost:9092  --topic kafka-hw
  • 1

在这里插入图片描述
创建一个生产者:

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic kafka-hw
  • 1

创建一个消费者:
1.默认只有我启动的之后的数据才可以读到

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-hw
  • 1

2.从头开始读

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-hw  --from-beginning
  • 1

在这里插入图片描述

3.2高可用

先启动本地zookeeper
在四台机器上分别都装上kafka(把node2上的kafka复制到三台电脑上)
在这里插入图片描述
然后在四台服务器上要修改配置文件:(config/server.properties)

每一台kafa的broderid都不能一样node1–node4对应1–4

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
  • 1
  • 2

日志的目录千万不要写到tmp下面;

# A comma separated list of directories under which to store log files
log.dirs=/data/kafka/data/kafka-logs
  • 1
  • 2

zookeepr;要配置上自己的zookeeper集群

zookeeper.connect=node1:2181,node2:2181,node3:2181
  • 1

在这里插入图片描述
在四台电脑上同时启动4个kafka
nohup bin/kafka-server-start.sh config/server.properties &
在这里插入图片描述
在这里插入图片描述
查看日志:

more nohup.out
  • 1

在这里插入图片描述
创建一个主题

bin/kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181 --replication-factor 1 --partitions 1 --topic test
  • 1

在这里插入图片描述
查看主题

bin/kafka-topics.sh --list --zookeeper node-1:2181,node-2:2181,node-3:2181
  • 1
bin/kafka-topics.sh --list --bootstrap-server node-1:9092,node-2:9092,node-3:9092,node-4:9092
  • 1

发送一些消息(生产者)

bin/kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092,node4:9092 --topic test-clus
  • 1

在启动一个消费者

bin/kafka-console-consumer.sh --bootstrap-server node7-1:9092,node7-2:9092,node7-3:9092,node7-4:9092 --topic test
  • 1

从头开始读

bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092,node4:9092 --topic test-clus --from-beginning
  • 1

描述主题:

bin/kafka-topics.sh --describe --zookeeper node1:2181,node2:2181,node3:2181 --topic test-clus
  • 1

删除一个topic

bin/kafka-topics.sh --delete --zookeeper node7-1:2181,node7-2:2181,node7-3:2181
  • 1

3.3 其他实战

创建主题—多分区多副本

bin/kafka-topics.sh --create --zookeeper node-1:2181,node-2:2181,node-3:2181 --replication-factor 3 --partitions 2 --topic multi-test
  • 1

描述主题

bin/kafka-topics.sh --describe --zookeeper node-1:2181,node-2:2181,node-3:2181 --topic multi-test 
  • 1

3.4 测试高可用

创建的主题一定得是多个分区,多个副本数
描述主题:

bin/kafka-topics.sh --describe --zookeeper node-1:2181,node-2:2181,node-3:2181 --topic multi-test 
  • 1

启动生产者

bin/kafka-console-producer.sh --broker-list node-1:9092,node-2:9092,node-3:9092,node-4:9092 --topic multi-test
  • 1

启动消费者

bin/kafka-console-consumer.sh --bootstrap-server node-1:9092,node-2:9092,node-3:9092,node-4:9092 --topic multi-test --from-beginning
  • 1

杀死领导者,leader会变成另外一个,不影响我们的使用
在这里插入图片描述

3.5 Kafka的导入导出操作

输出一个字符串到文件中;>(覆盖)>>(追加)

echo -e 'aaaaaaaaaaaaaa\nbbbbbbbbbbbbbb' > ~/kafka.txt
  • 1

在这里插入图片描述
配置文件:(config/connect-file-source.properties)

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

配置文件:(config/connect-file-sink.properties)

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

配置文件:(connect-standalone.properties)

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include 
# any combination of: 
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples: 
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
#plugin.path=

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

启动命令

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
  • 1

在这里插入图片描述
注意:由于配置文件中的文件名和路径已经写死了,要把文件改修改为一样的在这里插入图片描述
目的:数据都是来自生产者(sh文件);变成了一个文本文件;kafka中的数据导出为一个文件;(导入和导出)

3.6 代码连接kafka

创建主题:

 bin/kafka-topics.sh --create \
    --bootstrap-server node-1:9092,node-2:9092,node-3:9092,node-4:9092 \
    --replication-factor 3 \
    --partitions 2 \
    --topic streams-plaintext-input
  • 1
  • 2
  • 3
  • 4
  • 5

查看主题

bin/kafka-topics.sh --list --zookeeper node-1:2181,node-2:2181,node-3:2181
  • 1

描述主题

bin/kafka-topics.sh --describe --zookeeper node-1:2181,node-2:2181,node-3:2181 --topic streams-plaintext-input
  • 1

启动示例程序

bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
  • 1

在这里插入图片描述
启动一个生产者

bin/kafka-console-producer.sh --broker-list node-1:9092,node-2:9092,node-3:9092,node-4:9092 --topic streams-plaintext-input
  • 1

在这里插入图片描述
启动消费者

bin/kafka-console-consumer.sh --bootstrap-server node-1:9092,node-2:9092,node-3:9092,node-4:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

在这里插入图片描述
示例代码:

package org.apache.kafka.streams.examples.wordcount;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public final class WordCountDemo {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node-1:9092,node-2:9092,node-3:9092,node-4:9092");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
        // Note: To re-run the demo, you need to use the offset reset tool:
        // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        final StreamsBuilder builder = new StreamsBuilder();

        final KStream<String, String> source = builder.stream("streams-plaintext-input");

        final KTable<String, Long> counts = source
            .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
            .groupBy((key, value) -> value)
            .count();

        // need to override value serde to Long type
        counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/933064
推荐阅读
相关标签
  

闽ICP备14008679号