赞
踩
AliLinux是基于CentOS的。
1、java 环境
2、mysql环境
3、kafka环境
4、flink环境
5、dinky环境
这些环境,本想直接dnf安装在宿主机上,思来想去,还是用docker方便学习,也方便统一管理和使用
sudo dnf update && sudo dnf upgrade
Docker 是一个广泛使用的容器化平台,提供了一系列的命令用于管理容器、镜像和容器化应用程序。以下是 Docker 的一些常用命令及其简要介绍: docker run: 创建并运行一个新的容器。可以指定所使用的镜像、容器名称、端口映射、环境变量等选项。 docker start: 启动已经被创建但处于停止状态的容器。 docker stop: 停止正在运行的容器。 docker restart: 重启容器。 docker rm: 删除一个或多个容器。 docker ps: 列出正在运行的容器。 docker images: 列出本地存在的镜像。 docker pull: 从远程仓库下载镜像。 docker push: 将本地的镜像推送到远程仓库。 docker exec: 在运行中的容器中执行命令。 docker logs: 查看容器的日志输出。 docker inspect: 获取容器或镜像的详细信息。 docker build: 根据 Dockerfile 构建一个镜像。 docker-compose: 使用 Compose 文件定义和管理多个容器的应用程序。 docker network: 管理 Docker 网络,如创建自定义网络、连接容器到网络等。 这只是 Docker 命令的一小部分,Docker 还提供了许多其他有用的命令和选项,用于管理容器、镜像、卷、网络和其他资源。你可以通过运行 docker --help 或者 docker [command] --help 来获取更详细的命令帮助文档。
sudo dnf config-manager --add-repo=https://download.docker.com/linux/centos/docker-ce.repo
需要注意不同的linux的版本,因为还有很多linux版本,比如redhat,fedora,centos等。
sudo dnf install docker-ce docker-ce-cli containerd.io
因此,运行sudo dnf install docker-ce docker-ce-cli containerd.io命令将会下载并安装Docker引擎、Docker命令行工具和containerd容器运行时,以便你可以在Linux系统上使用Docker来构建和管理容器化应用程序。
sudo systemctl start docker
docker --version
本人的环境
[root@iZbp1bvzo2rsslr2bubzlnZ ~]# docker --version
Docker version 25.0.3, build 4debf41
1、java 环境
2、mysql环境
3、kafka环境
4、flink环境
5、dinky环境
docker pull openjdk:11
docker run -d -it --name java-11 openjdk:11
命令解释
命令 docker run -d -it --name java-11 openjdk:11 的含义如下:
docker run: 运行一个新的容器。
-d: 在后台(detached)模式下运行容器,即容器在后台执行,不会阻塞终端。
-it: 分配一个伪终端(pseudo-TTY),并保持标准输入(stdin)打开。这允许你与容器进行交互。
--name java-11: 容器名称,这里是 "java-11"。
openjdk:11: 指定要运行的镜像,这里是 OpenJDK 11。
综合起来,该命令的作用是在后台运行一个基于 OpenJDK 11 的容器,并分配一个伪终端,使用户能够与容器进行交互。容器的名称被设置为 "java-11"。
docker exec -it java-11 /bin/bash
命令解释
docker exec docker的执行命令
-it: 分配一个伪终端(pseudo-TTY),并保持标准输入(stdin)打开。这允许你与容器进行交互。
-- java-11 :容器名称
/bin/bash : 进入命令
java -version
本机显示
root@d4e8342175d9:/# java -version
openjdk version "11.0.16" 2022-07-19
OpenJDK Runtime Environment 18.9 (build 11.0.16+8)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.16+8, mixed mode, sharing)
docker pull mysql
docker run -d --name mysql -e MYSQL_ROOT_PASSWORD=123456 -p 3306:3306 mysql
docker ps
如果看到名为 “mysql” 的容器在运行,则表示 MySQL 容器已成功启动。
docker exec -it mysql mysql -uroot -p12345
命令解释
docker run -d --name mysql -e MYSQL_ROOT_PASSWORD=123456 -p 3306:3306 mysql
这个命令的意思是: docker run: 运行一个新的容器。 -d: 在后台(detached)模式下运行容器,即容器在后台执行,不会阻塞终端。 --name mysql: 为容器指定一个名称,这里是 "mysql"。 -e MYSQL_ROOT_PASSWORD=123456: 设置 MySQL 容器的环境变量,其中 123456 是你自己设置的 MySQL root 用户的密码。 -p 3306:3306: 将容器的 3306 端口映射到主机的 3306 端口,允许从主机上的应用程序连接到容器中运行的 MySQL 服务。 mysql: 指定要运行的镜像,这里是 MySQL 镜像。 综合起来,该命令的作用是在后台运行一个名为 "mysql-container" 的容器,使用提供的 MySQL 镜像。容器将暴露 3306 端口,允许从主机上的应用程序连接到 MySQL 服务。MySQL root 用户的密码将通过环境变量传递给容器。 这个命令将创建并运行一个 MySQL 容器,你可以使用 MySQL 客户端工具连接到容器中的 MySQL 服务,并使用设置的密码进行身份验证。
https://blog.csdn.net/m0_64210833/article/details/134199061
kafka依赖Zookeeper,当然也可以用内置的kraft。
安装前提条件
1.安装Zookeeper
1.1运行ZooKeeper容器
2.运行Kafka容器
2.1启动Kafka容器
3.验证
3.1进入Kafka容器
3.2查看容器状态
3.3查看容器日志
3.4重新启动容器
3.5创建测试主题
docker pull bitnami/zookeeper
使用以下命令运行一个ZooKeeper的Docker容器:
docker run -d --restart=always -e ALLOW_ANONYMOUS_LOGIN=yes --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime bitnami/zookeeper:latest
这个命令会启动一个名为“zookeeper”的Docker容器,并且映射它的2181端口到你的机器的2181端口。
接下来,你需要运行Kafka的Docker容器,并且配置它连接到你刚刚启动的ZooKeeper实例。如果你还没有Kafka的Docker镜像,你可以使用如下命令拉取:
docker pull bitnami/kafka
docker run -d --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=150.158.119.225:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://150.158.119.225:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime bitnami/kafka:latest
docker run: 这是Docker命令的起始点,用于创建和运行一个新的容器。 -d: 这个选项告诉Docker在后台(detached mode)运行容器,这样容器将在后台运行而不会占用当前终端。 --log-driver json-file: 这个选项指定了容器的日志驱动程序。在这种情况下,它使用json-file驱动程序,将容器的日志输出到文件中。 --log-opt max-size=100m: 这个选项设置日志文件的最大大小为100兆字节(MB)。当日志文件大小达到该限制时,Docker将创建一个新的日志文件。 --log-opt max-file=2: 这个选项设置日志文件的最大数量为2个。当达到最大数量时,Docker会循环使用这些日志文件,最早的日志文件会被覆盖。 --name kafka: 这个选项为容器指定一个名称"kafka",以便后续对容器进行引用。 -p 9092:9092: 这个选项将主机的端口9092映射到容器的端口9092。Kafka通常使用9092端口进行客户端连接。 -e KAFKA_BROKER_ID=0: 这个选项设置Kafka的Broker ID为0。每个Kafka Broker都应具有唯一的Broker ID。 -e KAFKA_ZOOKEEPER_CONNECT=150.158.119:2181/kafka: 这个选项设置Kafka连接到的Zookeeper的地址和端口。在这种情况下,Kafka将连接到地址为150.158.119的Zookeeper实例的2181端口,并使用"/kafka"作为Kafka在Zookeeper中的根目录。 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://150.158.119:9092: 这个选项设置Kafka的广告侦听器(advertised listeners)。它指定Kafka广告的监听器的地址和端口。在这种情况下,Kafka将使用地址为150.158.119的主机的9092端口作为广告的监听地址。 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092: 这个选项设置Kafka的监听器。它指定Kafka监听的地址和端口。在这种情况下,Kafka将监听所有可用的网络接口(0.0.0.0)上的9092端口。 -v /etc/localtime:/etc/localtime: 这个选项将主机系统的时区设置挂载到容器内的/etc/localtime文件,以确保容器和主机具有相同的时区设置。 bitnami/kafka:latest: 这是要使用的Docker镜像的名称和标签。在这种情况下,它使用Bitnami提供的Kafka镜像,并使用"latest"标签来获取最新的版本。 参考网址https://www.jianshu.com/p/26495e334613 其实kafka客户端访问kafka是分两步走: 第一步,不管什么方式,客户端只要能连接到KAFKA_LISTENERS标识的地址,成功完成必要的认证后,就可以得到一个brokers返回地址。 第二步,通过返回的brokers重新建立和kafka的连接,生成producer/consumer。这个返回的brokers就是KAFKA_ADVERTISED_LISTENERS的值。
注意,如果要连外网,需要修改
–env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
localhost改为 当前公网ip 118.31.250.73 (当前ali买的服务器的的公网IP)
别用我的ip
docker exec -it kafka /bin/bash
cd /opt/bitnami/kafka/bin
注意:可能出现报错:Error response from daemon: Container 62b9c056c0aa9d6ba917690abae1c6fe16c750a96fe428cdaa43f4c692a146ca is not running
说明kafka并没有运行。
在Kafka容器中,运行以下命令创建一个测试主题:
./kafka-topics.sh --create --topic test-kafka --bootstrap-server localhost:9092
打开一个生产者,来输入测试主题的消息(也需要进入kafka容器哈):
./kafka-console-producer.sh --topic test-kafka --bootstrap-server localhost:9092
然后你可以输入一些消息(比如输入hello按下enter)。
在另一个终端窗口中,打开一个消费者来读取测试主题的消息(也需要进入kafka容器哈):
./kafka-console-consumer.sh --topic test-kafka --from-beginning --bootstrap-server localhost:9092
如果一切设置正确,你应该能在消费者终端中看到你在生产者终端输入的消息。
这就完成了使用Docker运行ZooKeeper和Kafka,并进行基本验证的过程。
在生产者页面输入测试内容:
{"id":1,"name":"arvin"}
待更新。
kafka-console-ui
https://github.com/xxd763795151/kafka-console-ui
docker pull wdkang/kafka-console-ui
docker run -d -p 7766:7766 -v $PWD/data:/app/data -v $PWD/log:/app/log wdkang/kafka-console-ui
http://外网ip:7766
http://150.158.119.225:7766
docker pull flink
docker network create flink-network
# 创建 JobManager
docker run \
-itd \
--name=jobmanager \
--publish 8081:8081 \
--network flink-network \
--env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
flink:latest jobmanager
# 创建 TaskManager
docker run \
-itd \
--name=taskmanager \
--network flink-network \
--env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
flink:latest taskmanager
http://localhost:8081/
访问 http://150.158.119.225/:8081/
默认的Slots num是1,我们可以修改为5:
修改的目录是jobmanager和taskmanager的/opt/flink/conf
的flink-conf.yaml
文件:
修改taskmanager.numberOfTaskSlots:
即可。
注意:默认的docker容器中没有vi/vim命令,可以使用docker cp命令,复制出来修改,然后在复制回去,如下:
docker cp taskmanager:/opt/flink/conf/flink-conf.yaml .
docker cp flink-conf.yaml taskmanager:/opt/flink/conf/
Docker安装kafka 3.5
并且通过python,简单写一个生产者
Python生产、消费Kafka
顾名思义,用于连接flinksql和kafka。
进入flink
docker exec -it jobmanager /bin/bash
进入 flink的bin目录
cd /opt/flink/bin
查看flink版本:
flink --version
根据自己的flink版本,下载对应的 flink-sql-connector-kafka jar包
https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka
因为我是1.18.0,所以选择下图的版本包:
将下载的jar包,分别在jobmanager,taskmanager /opt/flink/lib
目录下,注意,是两个都要放,如下图:
可以使用docker cp test.txt jobmanager:/opt/flink/lib
命令,用户宿主机和docker容器文件传输。把test.txt换成对应的jar包即可
docker cp test.txt jobmanager:/opt/flink/lib
docker cp test.txt taskmanager:/opt/flink/lib
java结合日志
kafka.send("GatewayLog", JSONUtil.toJsonStr(gatewayLog));
GatewayLog是topic
yaml的服务配置
spring:
kafka:
bootstrap-servers: "10.10.10.155:9092"
consumer:
group-id: "teleGatewayGroup"
我在本地生成了一条log,将使用flinksql处理这个数据。
进入jobmanager中,执行
cd /opt/flink/bin
sql-client.sh
Flink SQL执行以下语句:
CREATE TABLE GatewayLog ( platform VARCHAR, serviceId VARCHAR, targetServer VARCHAR, requestPath VARCHAR, requestMethod VARCHAR, schema VARCHAR, requestContentType VARCHAR, headers VARCHAR, requestBody VARCHAR, ip VARCHAR, startTime TIMESTAMP, endTime VARCHAR, executeTime VARCHAR, status VARCHAR, nickName VARCHAR, account VARCHAR, accountType VARCHAR, serviceName VARCHAR, orgCode VARCHAR ) WITH ( 'connector' = 'kafka', 'topic' = 'GatewayLog', 'properties.bootstrap.servers' = '150.158.119.225:9092', 'properties.group.id' = 'flinKGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ); select * from GatewayLog;
可以看到Flink在消费kafka数据,如下图:
中间缺少很多包。
flink-connector-kafka
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/3.1.0-1.18
依赖的kafka-clients
https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/3.6.1
然后在Linux需要看权限问题。
chmod -R 777 /lib
把文件夹都改成777 所有人。
然后执行
sql最好先改成varchar,变成成功。
最后select * from table
执行成功。
dinky环境
不用管。
安装flink
CREATE CATALOG kfksc WITH ( 'type' = 'inmemory' ); USE CATALOG kfksc; CREATE DATABASE topic; USE topic; CREATE TABLE kfksc.topic.GatewayLog ( platform VARCHAR, serviceId VARCHAR, targetServer VARCHAR, requestPath VARCHAR, requestMethod VARCHAR, schema VARCHAR, requestContentType VARCHAR, headers VARCHAR, requestBody VARCHAR, ip VARCHAR, startTime TIMESTAMP, endTime TIMESTAMP, executeTime BIGINT, status INT, nickName VARCHAR, account VARCHAR, accountType VARCHAR, serviceName VARCHAR, orgCode VARCHAR ) WITH ( 'connector' = 'kafka', 'topic' = 'GatewayLog', 'properties.bootstrap.servers' = '150.158.119.225:9092', 'properties.group.id' = 'flinKGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ); show tables; select * from GatewayLog;
https://blog.csdn.net/qq_35515661/article/details/134312204
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。