当前位置:   article > 正文

Kafka 初体验_connect-file-3.5.2.jar

connect-file-3.5.2.jar

Kafka 快速入门

参考:Apache Kafka

在这里插入图片描述

抽不到卡芙卡麻麻我快s了啊啊啊啊,没有卡芙卡麻麻在身边,身上仿佛有蚂蚁在爬啊(阴暗)(扭曲)(爬行)(大口喘气)
受不了了,只能通过学习《Kafka 快速上手》 来平复下心情了……

1. 安装 Kafka

前置条件:Kafka 依赖于 Java 和 ZooKeeper执行,因此需要先安装这两者。

1)在Ubuntu上安装Java

Kafka要求安装Java 8+,这里安装OpenJDK 11

运行下面的命令,更新软件包索引,并且安装OpenJDK 11 JDK 软件包:

sudo apt update
sudo apt install openjdk-11-jdk
  • 1
  • 2

通过检查 Java 版本来验证:

java -version
  • 1

输出:

OpenJDK Runtime Environment (build 11.0.20+8-post-Ubuntu-1ubuntu122.04)
OpenJDK 64-Bit Server VM (build 11.0.20+8-post-Ubuntu-1ubuntu122.04, mixed mode, sharing)
  • 1
  • 2

此时,说明已经成功安装好了 Java。

补充:

  • 设置默认版本

如果在 Ubuntu 系统上安装了多个 Java 版本,可以输入下面的命令,检测哪个版本被设置成了默认值:

java -version
  • 1

想要修改默认的版本,使用update-alternatives命令:

sudo update-alternatives --config java
  • 1

输出像下面这样:

有 2 个候选项可用于替换 java (提供 /usr/bin/java)。

  选择       路径                                          优先级  状态
------------------------------------------------------------
* 0            /usr/lib/jvm/java-11-openjdk-amd64/bin/java      1111      自动模式
  1            /usr/lib/jvm/java-11-openjdk-amd64/bin/java      1111      手动模式
  2            /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java   1081      手动模式

要维持当前值[*]请按<回车键>,或者键入选择的编号:
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

所有已经安装的 Java 版本将会列出来。输入想要设置为默认值的序号,并且按"Enter”即可。

注意

kafka在运行脚本时,需要识别 JAVA_HOME环境变量。因此这里需要进行配置。

首先使用update-alternatives找到 Java 安装路径:

sudo update-alternatives --config java
  • 1

如上可见,OpenJDK 11安装在/usr/lib/jvm/java-11-openjdk-amd64/bin/java

打开/etc/environment文件:

sudo vim /etc/environment
  • 1

在文件的末尾添加:

JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64"
  • 1

注意不是 /usr/lib/jvm/java-11-openjdk-amd64/bin/java

想要让修改在当前 shell 生效,可以登出系统,再登入系统,或者运行下面的命令:

source /etc/environment
  • 1

最好重启一下,否则只在当前终端生效

验证 JAVA_HOME 环境变量被正确设置:

echo $JAVA_HOME
  • 1

可以看到 Java 安装路径:

/usr/lib/jvm/java-11-openjdk-amd64
  • 1

2)安装ZooKeeper

下载网址:Apache ZooKeeper

选择稳定版本:

Apache ZooKeeper 3.8.2 (latest stable release):Apache ZooKeeper 3.8.2(asc, sha512)

解压:

tar -zxf apache-zookeeper-3.8.2-bin.tar.gz
  • 1

3)安装Kafka

下载最新版本的Kafka并解压

https://www.apache.org/dyn/closer.cgi?path=/kafka/3.5.0/kafka_2.13-3.5.0.tgz

tar -xzf kafka_2.13-3.5.0.tgz
  • 1

2. 启动 Kafka 环境

首先需要启动ZooKeeper服务。在启动ZooKeeper服务前,需要有一个配置文件,如下,在conf/下创建zoo.cfg文件:

tickTime=2000
dataDir=/home/robin/Project/zookeeper/zookeeper
clientPort=2181
  • 1
  • 2
  • 3

该文件可以命名为任何名称,这里将其命名为conf/zoo.cfg。设定dataDir 的值以指定目录(需为空目录)。以下是每个字段的含义:

  • tickTime :ZooKeeper 使用的基本时间单位(以毫秒为单位)。它用于进行心跳,最小会话超时将是tickTime 的两倍。
  • dataDir :存储内存数据库快照的位置,除非另有指定,否则存储数据库更新的事务日志。
  • clientPort : 监听客户端连接的端口

启动ZooKeeper服务:

$ cd apache-zookeeper-3.8.2-bin/
$ bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /home/robin/Project/apache-zookeeper-3.8.2-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
  • 1
  • 2
  • 3
  • 4
  • 5

注:这里最开始是启动失败的。

通过查看logs中的out文件,可以看到提示类似这样的错误(图源网络,自己的忘记截图了)

在这里插入图片描述

即端口8080被占用。

通过文章安装Zookeeper出现Unable to start AdminServer,existing abnormally问题解决方法_星星点灯_hard的博客-CSDN博客中的解决方法,在zoo.cfg中添加一条配置语句

admin.serverPort=8001
  • 1

避免与8080端口冲突。之后再重新启动即成功。

打开另一个终端并执行以下语句以开启kafka broker 服务

$ bin/kafka-server-start.sh config/server.properties 
  • 1

会打印很多INFO 信息。

一旦所有的服务都成功启动,就有了一个基本的Kafka环境,可以运行并准备使用了。

3. 创建主题以存储事件

在编写第一个事件(即消息)之前,必须创建一个主题(topic)。

打开另一个终端,执行:

$ cd Project/kafka_2.13-3.5.0/
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
Created topic quickstart-events.
  • 1
  • 2
  • 3

所有Kafka的命令行工具都有额外的选项:运行Kafka -topics.sh命令,不带任何参数来显示使用信息。

例如,还可以显示新主题的分区计数等详细信息:

$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events	TopicId: c4dKHqF9SNCnFT_0WP5G_Q	PartitionCount: 1	ReplicationFactor: 1	Configs: 
	Topic: quickstart-events	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
  • 1
  • 2
  • 3

4. 将事件写入主题中

Kafka客户端通过网络与Kafka brokers(代理)通信,以写(或读)事件。一旦收到,代理将以持久和容错的方式存储事件。

运行console producer客户端,将一些事件写入主题。默认情况下,输入的每一行都将导致将一个单独的事件写入主题。

$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>This is my first event
>This is my second event
  • 1
  • 2
  • 3

可以随时使用Ctrl-C停止生产者客户端。

5. 读取事件

打开另一个终端会话并运行console consumer客户端以读取刚刚创建的事件:

$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
  • 1
  • 2
  • 3

可以随时使用Ctrl-C停止消费者客户端。

在上一个终端即生产者客户端继续输入

> This is my third event
  • 1

可在消费者客户端同步看到对应消息。

注意事件被持久地存储在Kafka中,它们可以被你想要的尽可能多的消费者读取多次。您可以很容易地通过打开另一个终端会话并再次重新运行前面的命令来验证这一点。

6. 使用 Kafka Connect 将数据作为事件流导入/导出

您可能在关系数据库或传统消息传递系统等现有系统中拥有大量数据,以及有已经使用这些系统的许多应用程序。Kafka Connect 允许您不断地将数据从外部系统摄取到 Kafka 中,反之亦然。它是一个运行 Connectors 的可扩展工具,它实现与外部系统交互的自定义逻辑。因此,将现有系统与 Kafka 集成非常容易。为了使这个过程更加容易,有数百个这样的连接器可供使用。

下面使用简单的连接器运行 Kafka Connect,将数据从文件导入到 Kafka 主题,并将数据从 Kafka 主题导出到文件。

首先,确保将connect-file-3.5.0.jar添加到Connect worker配置中的plugin.path属性。即在 connect-standalone.properties文件中添加(或修改)plugin.path属性

plugin.path=libs/connect-file-3.5.0.jar
  • 1

在本示例中指定连接器包时使用一个相对路径(相对于kafka root目录),并将连接器的包视为一个uber jar。然而,值得注意的是,对于生产部署,使用绝对路径总是可取的。

其中connect-file-3.5.0.jar是在libs文件夹中的jar包

$ ls libs | grep connect-file
connect-file-3.5.0.jar
  • 1
  • 2

然后,首先创建一些种子数据进行测试:在kafka root目录(即kafka_2.13-3.5.0/下执行以下语句:

echo -e "foo\nbar" > test.txt
  • 1

则会在此目录下创建test.txt文件,其内容为:

foo
bar
  • 1
  • 2

接下来,启动以独立模式运行的两个连接器,这意味着它们在单个本地专用进程中运行。

$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
  • 1

其中,提供了三个配置文件作为参数。第一个往往是Kafka Connect进程的配置,包含常见的配置,比如Kafka要连接的代理和数据的序列化格式。其余的配置文件分别指定要创建的连接器。这些文件包括唯一的连接器名称、要实例化的连接器类以及连接器所需的任何其他配置。

在connect-file-source.properties中指定了以下属性:

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
  • 1
  • 2
  • 3
  • 4
  • 5

可见其指定了读取文件为kafka root目录下的test.txt(也是相对路径),从该文件读取数据到kafka

而在connect-file-sink.properties中指定了以下属性:

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
  • 1
  • 2
  • 3
  • 4
  • 5

可见其指定了从kafka读取数据并输出到test.sink.txt文件中

且两者都指定了topic为connect-test

执行以上语句后,kafka会使用 默认本地集群配置 (应该是指bin/kafka-server-start.sh config/server.properties 中的配置文件,即要先启动kafka服务才可以)并创建两个连接器:第一个是源连接器,它从输入文件中读取行并将每一行数据生成(produce)到Kafka topic(这里是connect-test,会自动被创建),第二个是接收连接器,它从Kafka topic中读取消息并将每个消息作为输出文件中的一行生成。

查看kafka root目录下生成的test.sink.txt文件,可看到与 test.txt文件一样的内容:

$ cat test.sink.txt 
foo
bar
  • 1
  • 2
  • 3

注意数据会被保存在 Kafka topic connect-test中,因此可以运行一个 console consumer来查看数据:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
  • 1
  • 2
  • 3

连接器会持续处理数据,因此可以将数据添加到文件中,并看到它在管道中移动:

echo Another line>> test.txt
  • 1

可以看到这一行出现在console consumer输出和接收文件中。

{"schema":{"type":"string","optional":false},"payload":"Another line"}
  • 1

7. 用 Kafka Streams 处理事件

暂略

8. 终止 Kafka 环境

使用Ctrl-C停止终端中还在执行的任务,如produce和consumer客户端,Kafka broker和ZooKeeper服务等。

如果想删除本地Kafka环境的任何数据,包括在以上过程中创建的任何事件,运行该命令:

$ rm -rf /tmp/kafka-logs /home/robin/Project/zookeeper/zookeeper
  • 1

Others

To learn more, we suggest the following next steps:

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/670985
推荐阅读
相关标签
  

闽ICP备14008679号