赞
踩
1:人脸识别系统被调失败,导致图片上传失败
2:延迟高,需要人脸识别系统处理完成后,再返回给客户端,即使用户并不需要立即知道结果
3:图片上传系统与人脸识别系统之间互相调用,需要做耦合
192.168.164.11 node1
192.168.164.12 node2
192.168.164.13 node3
#具体ip根据自己设置的机器ip一致
安装包存放的目录:/export/software
安装程序存放的目录:/export/servers
数据目录:/export/data
日志目录:/export/logs #虽然可能没有使用
创建各级目录命令:
mkdir -p /export/servers/
mkdir -p /export/software/
mkdir -p /export/data/
mkdir -p /export/logs/
#到如下目录
cd /home
# 使用rpm安装JDK
rpm -ivh jdk-8u261-linux-x64.rpm
# 默认的安装路径是/usr/java/jdk1.8.0_261-amd64
# 配置JAVA_HOME
vi /etc/profile
# 文件最后添加两行
export JAVA_HOME=/usr/java/jdk1.8.0_261-amd64
export PATH=$PATH:$JAVA_HOME/bin
# 退出vi,使配置生效
source /etc/profile
#然后输入命令进行测试:
java -version
# 解压到/opt目录 tar -zxf zookeeper-3.4.14.tar.gz -C /opt # -C 解压到指定目录里面,好像没有-c(小c的命令) # 配置 cd /opt/zookeeper-3.4.14/conf # 配置文件重命名后生效 cp zoo_sample.cfg zoo.cfg #编辑 vi zoo.cfg # 设置数据目录 dataDir=/var/lagou/zookeeper/data # 添加 server.1=node1:2881:3881 server.2=node2:2881:3881 server.3=node3:2881:3881 # 退出vim(vi) mkdir -p /var/lagou/zookeeper/data echo 1 > /var/lagou/zookeeper/data/myid #其他两台机器,分别是node2给2,node3给3 #他是覆盖写入的,也就是说,相当于里面的值全部都被单独的1覆盖了,所以文件里面的值就是1,自己可以进行测试 #可以这样理解:首先清空文件,然后输入1 # 配置环境变量 vi /etc/profile # 添加 export ZOOKEEPER_PREFIX=/opt/zookeeper-3.4.14 export PATH=$PATH:$ZOOKEEPER_PREFIX/bin export ZOO_LOG_DIR=/var/lagou/zookeeper/log # 退出vi,让配置生效 source /etc/profile
#到对应解压后的zookeeper-3.4.14文件里面的bin里面执行如下
#实际上因为设置了环境变量的原因(也是设置操作了对应bin里面的),所以可以直接在任意地方执行如下:
# 在三台Linux上启动Zookeeper
zkServer.sh start
# 在三台Linux上查看Zookeeper的状态
zkServer.sh status
#因为环境变量的原因,可以不用加上"./"了
#这里就与windows有一点不同,需要明确的指定才可当作命令,而windows却已命令为主
#使用 rz 命令将安装包上传至 /export/software
#切换目录上传安装包
cd /export/software
rz
# 选择对应安装包上传即可
#解压安装包到指定目录下
tar -zxvf kafka_2.11-1.0.0.tgz -C /export/servers/
cd /export/servers/
#重命名(因为名称太长了)
mv kafka_2.11-1.0.0 kafka
cd /export/servers/kafka/config/ vi server.properties #主要修改一下6个地方: broker.id 需要保证每一台kafka都有一个独立的broker log.dirs 数据存放的目录 zookeeper.connect zookeeper的连接地址信息 delete.topic.enable 是否可以直接删除topic host.name 主机的名称 #修改: listeners=PLAINTEXT://node1:9092 #这里记得修改成这样 #上面就是6个主要的地方 #broker.id 标识了kafka集群中一个唯一broker broker.id=0 #这里记得要修改,因为如果相同,那么后启动的会报错,即会导致启动失败 #好像可以设置负数 ,但只能是-1,否则也会报错,因为默认的就是-1 #但是操作时,默认却是从上一个id开始(删除数据文件,然后重新创建即可),没有默认为-1 #但是有些时候,却未必是-1,而是从1001开始,多次的启动,会加1,整个集群都是如此 #无论是否删除数据目录,因为是zookeeper操作的(大概是吧,可以百度查看) #注意即可 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 # 存放生产者生产的数据的目录,数据一般以topic的方式(该方式后面会说明,一般是"主题_分区id"作为名称)存放 log.dirs=/export/data/kafka #这里记得要修改 num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 #1024*1024*1024=1073741824,即1GB,1073741824B=1048576KB=1024MB=1GB log.retention.check.interval.ms=300000 # zk的信息,代表操作zookeeper,否则不会操作,但通常规定需要指定,否则可能启动不了 zookeeper.connect=node1:2181,node2:2181,node3:2181 #这里记得要修改,这里的逗号位置并不需要死磕 #虽然以前死磕过,比如89章博客Spring Cloud和91章博客ES里面就说明过 #因为没有意义,通常没有说明的,是不能加上空格的,可能也可以,但我们统一认为不加空格,无论是否对错 #因为别人修改一下源代码,那么这里的解释可能就会不正确了,即逗号位置没有意义(统一认为不加空格) #大多数情况下也是如此,注意即可 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 #下面两个需要添加 delete.topic.enable=true host.name=node1 #注意:如果不想修改,可以清空文件,复制这里的数据也可以 #虽然可能会有问题,但一般都没有,除非上面漏写或者写错了
cd /export/servers
scp -r kafka/ node2:$PWD #这里的-r代表可以操作目录,如果这里没有-r,则代表只能操作文件
#你自己去进行测试即可
scp -r kafka/ node3:$PWD
#ip为11的服务器:
broker.id=0
host.name=node1
listeners=PLAINTEXT://node1:9092
#ip为12的服务器:
broker.id=1
host.name=node2
listeners=PLAINTEXT://node2:9092
#ip为13的服务器:
broker.id=2
host.name=node3
listeners=PLAINTEXT://node3:9092
mkdir -p /export/data/kafka
cd /export/servers/kafka/bin
#前台启动(到对应的控制台地方了,可以使用ctrl+c退出)
./kafka-server-start.sh /export/servers/kafka/config/server.properties
#后台启动
nohup ./kafka-server-start.sh /export/servers/kafka/config/server.properties 2>&1 &
#启动后,实际上虽然没有出现对应的命令行,但相当于在命令行,可以直接执行jps或者随意的输入什么来显示
#注意:可以启动一台broker,单机版,也可以同时启动三台broker,组成一个kafka集群版
#kafka停止
./kafka-server-stop.sh
#启动zookeeper客户端,在程序里一般称为得到客户端,因为这里使用自己的来使用,而程序是使用他得到的客户端来使用
zkCli.sh
#输入如下:
ls /brokers/ids
#返回了[0, 1, 2]
#可以发现的确注册了
#具体的ls目录,一般来说如果没有其他的操作的话
#只有zookeeper这一个节点(这个节点不是服务器,而是目录的结构说明)
#如果出现其他的,那么基本是操作了zookeeper,或者zookeeper版本的问题
#curl 是一种命令行工具,作用是发出网络请求,然后获取数据
curl -L https://github.com/docker/compose/releases/download/1.8.0/run.sh > /usr/local/bin/docker-compose
#-L 跟随链接重定向,他一般返回的是获取对应文件的数据
#当然,如果是ip加端口,一般是默认其操作的数据,再81章博客内容的最后返回的那个数据就是
#chmod(change mode)命令是控制用户对文件的权限的命令
chmod +x /usr/local/bin/docker-compose #放在这里是为了可以随时的使用的,而不用到对应的目录里面使用"./"执行
#查看版本(顺便会进行拉取操作)
docker-compose --version #因为在/usr/local/bin/里面,所以可以这样执行
#当然不只是查询版本的操作,比如后面的操作创建容器的操作,具体看后面就知道了
#拉取Zookeeper镜像
docker pull zookeeper:3.4
#拉取kafka镜像
docker pull wurstmeister/kafka
#拉取kafka-manager镜像
docker pull sheepkiller/kafka-manager:latest
#既然是镜像,那么一定是弄好了环境,我们只需要启动即可,比如zookeeper镜像里面就有jdk环境
#创建
docker network create --driver bridge --subnet 192.168.0.0/24 --gateway 192.168.0.1 kafka
#查看
docker network ls
#新建网段之后,比如后面创建容器的命令docker-compose,可能会出现:
WARNING: IPv4 forwarding is disabled. Networking will not work.
#解决方式:
#第一步:在宿主机上执行:
echo "net.ipv4.ip_forward=1" >>/usr/lib/sysctl.d/00-system.conf
#第二步:重启network和docker服务:
systemctl restart network && systemctl restart docker
#之后再次的操作即可,如果还没有解决,那么就去百度吧
version: '2' #指定 compose 文件的版本 services: #通过镜像安装容器的配置 zoo1: #一般要与容器名称一样,当然也可以不同,即这里基本可以随便写 #只要没有与这个位置名称相同即可,即唯一即可,这样就可以认为是一个整体的操作配置 #如果有相同的名称,那么谁的配置文件在后面,那么就以谁为主,即覆盖了前面的了,自己测试就知道了 #比如若这里的名称是zoo2,那么就没有zoo1这个容器,因为后面的zoo2覆盖了,即只会出现创建zoo3和zoo2这两个容器 image: zookeeper:3.4 #使用的镜像 restart: always #当Docker重启时,该容器重启 hostname: zoo1 #类似于之前在基于Linux虚拟机Kafka集群中hosts文件的对应值,可以说是主机别名 #当然并不是一定要与容器名称一致,基本可以随便写 container_name: zoo1 #容器的名称 ports: - 2184:2181 #端口映射 environment: #集群环境 ZOO_MY_ID: 1 #当前Zookeeper实例的id #集群节点 ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888 networks: #使用的网络配置 kafka: ipv4_address: 192.168.0.11 zoo2: image: zookeeper:3.4 restart: always hostname: zoo2 container_name: zoo2 ports: - 2185:2181 environment: ZOO_MY_ID: 2 ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=0.0.0.0:2888:3888 server.3=zoo3:2888:3888 networks: kafka: ipv4_address: 192.168.0.12 zoo3: image: zookeeper:3.4 restart: always hostname: zoo3 container_name: zoo3 ports: - 2186:2181 environment: ZOO_MY_ID: 3 ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=0.0.0.0:2888:3888 networks: kafka: ipv4_address: 192.168.0.13 networks: kafka: external: name: kafka
version: '2' services: kafka1: image: wurstmeister/kafka #image restart: always hostname: kafka1 container_name: kafka1 privileged: true ports: - 9092:9092 environment: #集群环境配置 KAFKA_ADVERTISED_HOST_NAME: kafka1 KAFKA_LISTENERS: PLAINTEXT://kafka1:9092 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092 KAFKA_ADVERTISED_PORT: 9092 KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181 external_links: # 配置Zookeeper集群的地址,上面的zoo1,zoo2,zoo3就是下面的信息,下面去找对应容器名称 #所以这里是上面集群的操作zookeeper的前提 - zoo1 - zoo2 - zoo3 networks: kafka: ipv4_address: 192.168.0.14 kafka2: image: wurstmeister/kafka restart: always hostname: kafka2 container_name: kafka2 privileged: true ports: - 9093:9093 environment: KAFKA_ADVERTISED_HOST_NAME: kafka2 KAFKA_LISTENERS: PLAINTEXT://kafka2:9093 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9093 KAFKA_ADVERTISED_PORT: 9093 KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181 external_links: - zoo1 - zoo2 - zoo3 networks: kafka: ipv4_address: 192.168.0.15 kafka3: image: wurstmeister/kafka restart: always hostname: kafka3 container_name: kafka3 privileged: true ports: - 9094:9094 environment: KAFKA_ADVERTISED_HOST_NAME: kafka3 KAFKA_LISTENERS: PLAINTEXT://kafka3:9094 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9094 KAFKA_ADVERTISED_PORT: 9094 KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181 external_links: - zoo1 - zoo2 - zoo3 networks: kafka: ipv4_address: 192.168.0.16 networks: kafka: external: name: kafka
version: '2' services: kafka-manager: image: sheepkiller/kafka-manager:latest restart: always container_name: kafka-manager hostname: kafka-manager ports: - 9000:9000 environment: #可以管理zookeeper集群和kafka集群 #单独写一个也行,因为是集群,如果删除,那么访问时(可以访问),对应的界面可能会出现错误提示 #当然,如果有不存在的,即无论是否有正确的,只要有不存在的,那么也是有错误提示,他们基本是同一个错误提示 ZK_HOSTS: zoo1:2181,zoo2:2181,zoo3:2181 #这里如果都是9092或者单独,其实也可以,因为只要有一个正确的kafka即可(可能都不需要指定,即这里可以删除) #因zookeeper里面包含了kafka集群的信息,自然也包含了地址等等,即可以不用写kafka的配置,即可以删除 KAFKA_BROKERS: kafka1:9092,kafka2:9093,kafka3:9094 APPLICATION_SECRET: letmein KM_ARGS: -Djava.net.preferIPv4Stack=true networks: kafka: ipv4_address: 192.168.0.17 networks: kafka: external: name: kafka
#登录到Kafka容器
#进入一个kafka集群,这里的9218e985e160指定的是容器id,当然指定容器名称也可以
#自己选择一个kafka集群的其中一个
docker exec -it 9218e985e160 /bin/bash
#切换到bin目录
cd opt/kafka/bin/
#执行创建test主题
kafka-topics.sh --create --zookeeper zoo1:2181 --replication-factor 3 --partitions 1 --topic test
#如果test存在,一般执行时会提示错误信息,可以创建testt,将上面的test修改成testt即可
#--create:新建命令
#--zookeeper:Zookeeper节点,一个或多个,因为是集群,创建主题一般会进行操作zookeeper集群,如注册(添加)信息
#其中的zoo1,代表是操作zoo1名称的容器
#--replication-factor:指定副本,每个分区有三个副本
#--partitions:表示几个分区
kafka-topics.sh --create --zookeeper zoo1:2181 --replication-factor 3 --partitions 3 --topic testtt
kafka-topics.sh --list --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 #当然,因为集群,选择一个也可以
kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9093,kafka3:9094 --topic test #上面可以进入控制台,但也只针对于test主题来操作的,其中也可以只写一个节点(kafka1:9092),而不用都写上 #因为他们是集群,只需要指定一个即可,一般是集群的,通常只需要指定一个,有些会自动导致操作集群的可以不写 #如前面docker-compose-manager.yml文件里面的kafka集群配置那里,就可以不写,即可以删除 #然后输入如下: This is a message This is another message dd #ctrl+c退出命令行 #控制台,客户端,命令行,他们可以是一个意思 #无论是否有对应专属的命令窗口,只要可以直接操作具体数据,那么都是一个意思 #或者也可以认为没有专属的会默认到专属窗口里面去,我们一般以这个为主 #那么无论你是使用java得到客户端还是直接使用客户端,都只是该专属窗口的多种打开方式,即客户端可以认为是专属窗口 #就如mysql一样,打开多个窗口,但都可以操作数据
kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094 --topic test --from-beginning
#显示:
This is a message
This is another message
dd
#使用ctrl+c退出显示,一般使用后,会出现显示了多少行的信息出来
#--broker-list与--bootstrap-server是一样的操作,但针对于命令来说不一样,其中--broker-list是旧版本命令
#即虽然是实现一个功能
#但是对于kafka-console-producer.sh可以是--broker-list和--bootstrap-server
#而kafka-console-consumer.sh只能是--bootstrap-server,否则会提示命令错误
#查看topic主题详情,Zookeeper节点写一个和全部写,效果一致
kafka-topics.sh --describe --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --topic test
kafka-topics.sh --zookeeper zoo1:2181 --alter --topic test --partitions 8
kafka-topics.sh --zookeeper zoo1:2181 --alter --topic test --config flush.messages=1
#实际上命令的"--xxx"可以不分先后(在54章有类似的说明,比如"-"),自己测试即可
#当然固定后面要有数据的,不能分开
kafka-topics.sh --zookeeper zoo1:2181 --alter --topic test --delete-config flush.messages
#删除后的配置,一般就是根据默认值了,实际上只是没有属性操作赋值而已,所以是操作默认值
kafka-topics.sh --zookeeper zoo1:2181 --delete --topic test
192.168.164.20 kafka1
192.168.164.20 kafka2
192.168.164.20 kafka3
#后面操作时,根据名称自己加上对应端口即可,可以看上图就知道了:kafka1是9092,kafka2是9093,kafka3是9094
#这样主要是为了规范以及好的分别(分别处理,即区分),当然,你直接的写地址也可以
<dependencies> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <!--有KafkaProducer类(消息生产者类)可以操作--> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.0</version> </dependency> </dependencies> <build> <plugins> <!-- java编译插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.2</version> <configuration> <source>1.8</source><!--开发中的jdk版本--> <target>1.8</target><!--开发后的,即class文件的jdk版本--> <encoding>UTF-8</encoding><!--整个代码使用UTF-8编码--> </configuration> </plugin> </plugins> </build>
package com.lagou; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException; /** * 消息的生成者 */ public class Producer { public static void main(String[] args) throws ExecutionException, InterruptedException { //要构造消息生产者的对象,需要有关于kafka集群等的配置,可以从Properties文件里加载 //或者说成也可以从Properties对象中加载 //kafka生产者按照固定的key取出对应的value Properties properties = new Properties(); //指定集群节点,在前面的命令中,我们知道 //对于kafka-console-producer.sh可以是--broker-list和--bootstrap-server properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.164.20:9092,192.168.164.20:9093,192.168.164.20:9094"); //只要有一个即可,因为是集群,若有正确的,那么无论是否有不正确的都可以,与zookeeper不同 //zookeeper操作选择,当然并不绝对 //发送消息,网络传输,需要对key和value指定对应的序列化类 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //创建消息生产者类的对象,一般通过网络直接操作信息 //通常会操作网络序列化传递(比如dubbo,rabbitmq) //当然上面的传递,一般都理解为序列化(序列化:数据的传递规则,比如AMQP,http,tcp的等等) //并不是java说的类的序列化,当然他也可以说是,因为也是一种规则 //当然AMQP协议相当于我们定义的(虽然用在rabbitmq上),基本上http或者tcp是固定 //所以可以理解成相当于我们自己操作一个规则来操作数据,然后通过http或者tcp传输一样的意思 //通常我们将网络数据的传输操作,称为序列化,自己之间的就叫传递,因为网络之间的传输一般是有一定规则的 //而自己之间通常没有规则 KafkaProducer<String, String> producer = new KafkaProducer<String,String>(properties); //定义主题 String topic = "lagou"; //发出100条消息 for(int i = 1; i<=100; i++){ //设置消息的内容 String msg = "hello," + i; //构建一个消息对象:指定主题和消息 ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic,msg); //发送,即通过上面的操作,我们可以直接的给主题发送消息 //你可以理解为我们首先进入客户端,然后发送消息,但是这是死板的理解 //不同的连接操作可能不同,实际上只要数据对应,自然也是发送 //所以我们也可以这样理解,窗口只是给定初始数据或者单纯的平台 //具体的执行才是发送,只是这里一步到位(直接发送)而已 //这里是创建主题的地方(方法) //如果主题存在,那么直接给该主题发送消息 //否则若不存在,会帮你创建该主题(名称是你传入的名称),这里就是lagou //然后给该主题发送消息(创建的主题,操作一个分区,一个副本) producer.send(producerRecord); /* 也可以这样,来确认是否发送成功 RecordMetadata recordMetadata = producer.send(producerRecord).get(); System.out.println(recordMetadata); 如果返回了消息,则代表发送成功,否则如果发送失败的话,就会抛出异常,或者返回null,一般是抛出异常 或者这样 Future<RecordMetadata> send = producer.send(producerRecord); RecordMetadata recordMetadata = send.get(); System.out.println(recordMetadata); */ System.out.println("消息发送成功,msg:"+msg); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } //关闭消息生产者对象,因为他是连接的,要不然怎么能执行语句呢 //或者说网络的连接的数据通道可以认为是文件的连接,虽然并不是,只是一种理解而已 producer.close(); } }
package com.lagou; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Collections; import java.util.Properties; /** * */ public class consumer { public static void main(String[] args) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.164.20:9092,192.168.164.20:9093,192.168.164.20:9094"); //只要有一个即可,因为是集群,若有正确的,那么无论是否有不正确的都可以,与zookeeper不同 //zookeeper操作选择,当然并不绝对 //接收消息,网络传输,需要对key和value指定对应的反序列化类 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); //指定分组的名称 properties.put(ConsumerConfig.GROUP_ID_CONFIG,"lagou_group1"); //这里你可能会有疑问,为什么之前没有这样的操作呢,在这之前,首先问一个问题 //如果两个消费者同时执行(假设没有分组) //那么取得的数据是否可能是一样的或者可能少了消费数据(偏移量的问题) //答:可能都是同样的数据或者可能少了消费数据,他们都是取得生产者的数据,但实际上必须指定分组 //而使用分组,那么如果消费者都是在一个分组里面,那么你消费时,是分组给你消费的消息 //也就是说,无论这些消费者是启动多少,都是分组去拿数据然后给你的 //这也就使得了,数据不会取得一样的,因为分组只有一个,且当成消费者 //那么自然一个消费者的数据自然是完整的获取生产者的数据的,而不会出现获取相同的数据 //相当于即多个消费者抢占分组获取的消息,分组是保留到集群的,所以固定一个,否则也就相当于只有多个消费者了 //虽然这里说明成是分组给我们消息,实际上分成两种情况 //第一种,如果是一个消费者,那么相当于他去其他分区拿消息,以分组来说,该消费者可以就看出分组 //如果是第二种(上面说明的就是),那么相当于他们各自占用分区拿数据 //后面的消费者负载均衡机制里,会说明这种情况 //即第二种以分组来说,可以理解成,分组将分区的数据划分,而不是一个一个的划分 //上面说明两个消费者可以取得一个数据,那么也就认为不同的组 //可以取得相同的数据,这是必然的,因为按照实际情况来说,用户自然是很多的 //所以分组简单来说就是:约束消息的分发,而让消息只能发给一个消费者 //所以为了规定不会获取同一个消息,一般都操作了分组,否则不指定分组的话,一般会报错 //那么在命令的操作中,是否操作了分组,答:操作了分组 //一般命令那里的分组有默认的(这里java操作没有,所以需要指定) //可能是""(空串),或者是test-consumer-group //也有可能是随机的值(不会操作已经存在的),所以可以不用指定(我们操作命令时也并没有指定) //那么由于分组信息在集群中,所以你消费的数据,就不能再次的消费了 //即这里执行后,再次的执行,没有数据,但是命令操作有,因为不是同一个分组 //一般是分组偏移量的原因,该偏移量是对主题的总体偏移量 //虽然一个分区的是有序的,但是对于他们来说,虽然偏移量对自己都有相同的,但是整体却没有 /* 即可以这样的理解: ========================= = == == = 上面有三个分区,但是他们自己也自带一个偏移量,但是正是因为这样,所以在多个分区操作时 可能获取数据对应的偏移量可能上一个是100,下一个可能是30(到另外一个分区的偏移量了) 或者出现上一个是30,下一个也是30,另外一个正好也是30的偏移量 所以对于整体来说是没有的相同的(虽然偏移量相同) 但是我们也可以看到,当生产者启动,然后消费者启动消费时 对应的偏移量是首先先全部操作完毕,也就是说,三个分区,打印信息时 出现三个相同偏移量,然后再下一个,根据这个理解 我们可以继续解释前面的发送消息到分区域以及获取分区消息的策略,大概是偏移量平均 即首先先弄好所有的偏移量,然后才能下一个偏移量 但先给分区那个基本就是随机了(可能也有策略,如前面说的随机且有序) 获取消息也是如此,先获取同一个偏移量(具体是那个分区,可能也有策略,如前面说的随机且有序) 然后才能下一个偏移量,当然上面的偏移量可能也是操作策略,这里简称"偏移量策略" 这里是程序的打印,接下来我们可以通过命令的打印,可以发现,与程序的不同,通过研究发现 之所以不同是因为他先操作已经消费的数据,也就是说,如果是已经存在的数据,那么就有如下 他是选择一个分区,将所有的消息打印出现,而不是一个偏移量 只是这个选择分区也是根据策略的,如前面说的随机且有序 但他并不是根据相同偏移量来的(与偏移量策略不同),简称"直接策略" 继续通过测试,可以发现,无论是程序还是命令 如果是实时的获取,那么就是按照偏移量策略,否则就是直接策略 即很明显他们之间肯定是有不同的设置,具体可以百度策略 因为我们主要操作程序,所以获取消息的策略(不是选择分区的策略) 以偏移量策略为主,直接策略为辅 */ //消息消费者对象 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); //定义主题 String topic = "lagou"; //订阅消息 consumer.subscribe(Collections.singletonList(topic)); while(true){ System.out.println(1); //打印多次次,因为获取值,只会获取一个值 //获取消息的方法是一个阻断式方法,只要没有消息就会一直等待,如果有消息立即读取消息 //他获取主题数据时,如果没有主题,那么他会帮忙创建(默认也是一个分区,一个副本) //名称是你传入的名称,这里就是lagou ConsumerRecords<String, String> poll = consumer.poll(500);//500就是超时时间 //如果在超时时间内,没有消息,那么返回数据 //即没有值的数据,自然下面的集合不会得到,因为是空的集合 //自然读取的是一个消息(包含了除了对应对应偏移量从0开始的消息外,还有其他信息,比如主题,偏移量等等) //集合的长度,如果没有数据(即空集合),则是0,否则一般有数据,也只是一个数据,则是1 System.out.println(poll.count()); for(ConsumerRecord<String,String> consumer1 : poll){ System.out.println("主题:"+consumer1.topic()); System.out.println("偏移量:"+consumer1.offset()); System.out.println("消息:"+consumer1.value()); } System.out.println("========"); } //因为上面是无限循环,所以这里不用写如下: //consumer.close(); //写了会报错提示,除非上面的代码不无限,这是idea的大致检查的作用 } }
//ack=0 //producer无需等待来自broker的确认而继续发送下一批消息 //这种情况下数据传输效率最高,但是数据可靠性确是最低的 properties.put(ProducerConfig.ACKS_CONFIG,"0"); //ack=1 //producer只要收到一个分区副本成功写入的通知就认为推送消息成功了 //这里有一个地方需要注意,这个副本必须是leader副本 //只有leader副本成功写入了,producer才会认为消息发送成功 //也就是说,我们在前面发送消息都是操作leader副本(或者说是主题存在的服务器,因为主题一般在主服务器) //然后他给我们操作到其他分区,那么可能给分区消息时,可能会先给主,其他的可能是随机且有序的 //当然,一般都是整体的 //而读取并没有这样的解释,读取是操作整体 properties.put(ProducerConfig.ACKS_CONFIG,"1"); //默认这个 //ack=-1 //简单来说就是,producer只有收到分区内所有副本的成功写入的通知才认为推送消息成功了 properties.put(ProducerConfig.ACKS_CONFIG,"-1"); //这里的副本,都是机器,分为:follower副本和leader副本,即kafka集群的节点 //上面写在生产者代码里面,使用对应的配置前面,这是基本的操作,使得生效
Kafka动作 | 看书动作 |
---|---|
消费消息 | 看书 |
offset位移 | 书签 |
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); #然后再消费代码后面加上提交位移的代码,具体可以百度,比如consumer.commitAsync();,异步提交 #commitSync()是同步提交 #同步自然是根据代码顺序来执行的,我没有执行完,后面的代码不能操作 #而异步则不会,后面的代码照常操作 #上面写在消费者代码里面,使用对应的配置前面,这是基本的操作,使得生效 #那么有个问题,消费者除了消息丢失外,还有其他的问题吗,答:有 #虽然是自动提交,但是再kafka中,自动这个词,可能是有延迟的,那么假设为5秒提交一次(虽然一般都默认是5秒) #那么在这个中间,是没有提交了,当同样的组的消费者消费时 #可能在这个间隙里面,会消费掉,也就使得消费了同一个消息了 #当然,如果你并没有设置手动的提交 #那么由于真实偏移量(我们操作消费的消息是操作下标,而不是起始,这个叫做下标偏移量)不变 #即虽然你消费时,是往后消费的 #但是其他的同组消费者或者本身再次的消费 #是以偏移量消费,而不是你的下标偏移量消费,具体的解释就是,只要你不提交偏移量 #那么其他的消费者就不会以对应的你没有提交的偏移量开始 #所以偏移量虽然说成是下标,实际上真正的意思却是可以认为是起始下标 #上面你可以通过测试来完成,其中自动的是有延迟的,可以等待多执行一下,一般上面的解释没有错误,因为我就是测试过的 #除非我的测试有误差
public interface Partitioner extends Configurable, Closeable { /** * Compute the partition for the given record. * * @param topic The topic name * @param key The key to partition on (or null if no key) * @param keyBytes The serialized key to partition on( or null if no key) * @param value The value to partition on or null * @param valueBytes The serialized value to partition on or null * @param cluster The current cluster metadata */ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster); /** * This is called when partitioner is closed. */ public void close(); }
//设置这个就使用该类地址的方法,如果该类没有实现Partitioner接口,那么启动时会报错,自己测试便可
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.kafka.utils.PartitionerUtil");
//com.kafka.utils.PartitionerUtil这个就是自己定义的实现了Partitioner接口的类的地址
//那么就会使用他的方法,而不会使用默认的实现类DefaultPartitioner.partition()的方法了
//若自定义的方法里面并不操作分区策略,那么还是会操作默认的实现类DefaultPartitioner.partition()的方法了
//即自定义方法的操作分区策略会导致会不会操作默认的实现类DefaultPartitioner的分区策略,如果有操作分区策略
//即也就不会调用DefaultPartitioner.partition()方法,如果没有操作分区策略
//那么任然会调用DefaultPartitioner.partition()方法来操作分区策略
//当然,你可以验证是否执行了自定义的partition()方法,即可以在里面输入System.out.println(2);
//看看发送信息的打印信息
//你可以看到,在发送的代码中send方法里面操作了这个自定义的partition()方法,看打印先后就知道了
/** * Creates a record to be sent to a specified topic and partition * * @param topic The topic the record will be appended to * @param partition The partition to which the record should be sent * @param key The key that will be included in the record * @param value The record contents */ //之前发送消息的方法,new ProducerRecord<>(topic,msg);,即ProducerRecord的其他构造方法 public ProducerRecord(String topic, Integer partition, K key, V value) { this(topic, partition, null, key, value, null); } //其中partition代表分区编号,在kafka的界面客户端可以看到,从0开始,如果是三个分区,那么一般就是0,1,2
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //获取该topic的分区列表 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); //获得分区的个数 int numPartitions = partitions.size(); //如果key值为null if (keyBytes == null) { //如果没有指定key,那么就是轮询 //维护一个key为topic的ConcurrentHashMap,并通过CAS操作的方式对value值执行递增+1操作 int nextValue = this.nextValue(topic); //获取该topic的可用分区列表 List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { //如果可用分区大于0 //执行求余操作,保证消息落在可用分区上 int part = Utils.toPositive(nextValue) % availablePartitions.size(); return ((PartitionInfo)availablePartitions.get(part)).partition(); } else { // 没有可用分区的话,就给出一个不可用分区 return Utils.toPositive(nextValue) % numPartitions; } } else { //不过指定了key,key肯定就不为null // 通过计算key的hash,确定消息分区 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
//指定分组的名称
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"lagou_group1");
/*
单位B<KB<MB<GB<TB<PB<EB<ZB<YB<BB
9223372036854775807B =
9223372036854775808B-1B =
9007199254740992KB-1B =
8796093022208MB-1B =
8589934592GB-1B =
8388608TB-1B =
8192PB-1B =
8EB-1B
*/
# 启动Zookeeper
zkServer.sh start
#启动Kafka
nohup ./kafka-server-start.sh /export/servers/kafka/config/server.properties 2>&1 &
#将安装包上传至 node01服务器的/export/software路径下,然后解压
cd /export/software/
unzip kafka-eagle.zip #如果没有unzip命令,下载yum install unzip即可
cd kafka-eagle/kafka-eagle-web/target/
tar -zxf kafka-eagle-web-2.0.1-bin.tar.gz -C /export/servers
cd /export/servers
CREATE DATABASE eagle CHARACTER SET utf8;
cd /export/servers/kafka-eagle-web-2.0.1/conf
vi system-config.properties
#内容如下:(记得修改如下)
kafka.eagle.zk.cluster.alias=cluster1 #指定集群的别名,指定了那么就会操作下面别名的配置
#没有的则不会操作,或者对应没有使用别名的配置直接跳过,所以你写上其他的基本不会有什么错误
#比如就在某一行写上dd也没有问题,但最好不要,可能不同的版本会有操作
cluster1.zk.list=node1:2181,node2:2181,node3:2181
#因为kafka在zookeeper里面,那么可以通过zookeeper来操作对应的kafka集群信息,自然只需要指定zookeeper集群即可
#数据库的地址连接
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://192.168.164:128:3306/eagle
kafka.eagle.username=root
kafka.eagle.password=QiDian@666 #写上自己的密码
select host from user where user='root';
update user set host='%' where user='root';
flush privileges;
vi /etc/profile
#内容如下:
export KE_HOME=/export/servers/kafka-eagle-web-2.0.1
export PATH=:$KE_HOME/bin:$PATH
#让修改立即生效,执行
source /etc/profile
cd /export/servers/kafka-eagle-web-2.0.1/bin
chmod u+x ke.sh
./ke.sh start
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。