ZooKeeper must be running before Kafka is started. If ZooKeeper is already installed and running, skip that step.
1. One server
2. Pseudo-distributed Kafka with brokers 1, 2, and 3
3. Create the Kafka directory structure
kafkalog_1/2/3 hold the log directories of the three broker instances
4. Extract the Kafka archive here and rename the directory to kafka_2.11
5. Configure environment variables: vim /etc/profile
Add: export KAFKA_HOME=/zxy/apps/kafka/kafka_2.11
export PATH=$PATH:$ZK_HOME/bin:$KAFKA_HOME/bin
Reload: source /etc/profile
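A quick sanity check that the variable is visible in the current shell (nothing Kafka-specific, just the shell):
echo $KAFKA_HOME
# should print /zxy/apps/kafka/kafka_2.11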
[root@hadoop_zxy config]#
cp server.properties ./server_1.properties
cp server.properties ./server_2.properties
cp server.properties ./server_3.properties
## Main settings to modify or add. In a pseudo-distributed install every broker needs its own broker.id, and each broker must listen on a different port (see the sketch after this block for brokers 2 and 3).
broker.id=1
port=9092
listeners=PLAINTEXT://hadoop_zxy:9092
log.dirs=/zxy/apps/kafka/kafkalog_1
zookeeper.connect=hadoop_zxy:2181,hadoop_zxy:2182,hadoop_zxy:2183
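server_1.properties is shown above; server_2.properties and server_3.properties differ only in the broker id, the listener port, and the log directory. A minimal sketch of the other two files follows. The port numbers 9093 and 9094 are an assumption (the original only shows 9092); any free ports work as long as port and listeners agree.
# server_2.properties (port assumed)
broker.id=2
port=9093
listeners=PLAINTEXT://hadoop_zxy:9093
log.dirs=/zxy/apps/kafka/kafkalog_2
zookeeper.connect=hadoop_zxy:2181,hadoop_zxy:2182,hadoop_zxy:2183

# server_3.properties (port assumed)
broker.id=3
port=9094
listeners=PLAINTEXT://hadoop_zxy:9094
log.dirs=/zxy/apps/kafka/kafkalog_3
zookeeper.connect=hadoop_zxy:2181,hadoop_zxy:2182,hadoop_zxy:2183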
# Start one Kafka broker per configuration file
[root@hadoop_zxy kafka_2.11]#
bin/kafka-server-start.sh -daemon config/server_1.properties
bin/kafka-server-start.sh -daemon config/server_2.properties
bin/kafka-server-start.sh -daemon config/server_3.properties
[root@hadoop_zxy kafka_2.11]# jps
5104 Kafka
5936 ZooKeeperMain
29858 QuorumPeerMain
29715 QuorumPeerMain
12069 Jps
4678 Kafka
29977 QuorumPeerMain
5564 Kafka
[root@hadoop_zxy kafka_2.11]#
[root@hadoop_zxy bin]# sh zookeeper-shell.sh hadoop_zxy:2181
Connecting to hadoop_zxy:2181
Welcome to ZooKeeper!
JLine support is disabled
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
ls /
[cluster, controller, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
ls /brokers
[ids, topics, seqid]
ls /brokers/ids
[1, 2, 3]
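Each id under /brokers/ids is an ephemeral node holding that broker's registration data (host, port, endpoints). Still inside the same ZooKeeper shell, broker 1 can be inspected as an example:
get /brokers/ids/1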
[root@hadoop_zxy kafka_2.11]# bin/kafka-topics.sh --create --zookeeper hadoop_zxy:2181 --replication-factor 2 --partitions 3 --topic test
Created topic "test".
[root@hadoop_zxy kafka_2.11]#
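To verify how the three partitions and two replicas were spread across the three brokers, the same tool can describe the topic just created:
[root@hadoop_zxy kafka_2.11]# bin/kafka-topics.sh --describe --zookeeper hadoop_zxy:2181 --topic test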
Producer
[root@hadoop_zxy kafka_2.11]# bin/kafka-console-producer.sh --broker-list hadoop_zxy:9092 --topic test
>zxy
>
Consumer
[root@hadoop_zxy kafka_2.11]# sh $KAFKA_HOME/bin/kafka-console-consumer.sh --topic test --bootstrap-server hadoop_zxy:9092
zxy
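By default the console consumer starts from the latest offset. To replay messages already stored in the topic, add the standard --from-beginning flag:
[root@hadoop_zxy kafka_2.11]# sh $KAFKA_HOME/bin/kafka-console-consumer.sh --topic test --bootstrap-server hadoop_zxy:9092 --from-beginning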
Start Kafka
[root@hadoop_zxy scripts]# sh start-kafka.sh k1
[root@hadoop_zxy scripts]# sh start-kafka.sh k2
[root@hadoop_zxy scripts]# sh start-kafka.sh k3
[root@hadoop_zxy scripts]# jps
20545 Kafka
29858 QuorumPeerMain
29715 QuorumPeerMain
20197 Kafka
29977 QuorumPeerMain
20574 Jps
19806 Kafka
[root@hadoop_zxy scripts]#
Start the producer
[root@hadoop_zxy scripts]# sh start-producer.sh test
>zxy
Start the consumer
[root@hadoop_zxy scripts]# sh start-consumer.sh test
zxy
start-kafka.sh starts the ZooKeeper and Kafka services. Because the startup order must not change, start ZooKeeper manually first and only then start Kafka.
start-producer.sh starts a Kafka console producer.
start-consumer.sh starts a Kafka console consumer.
#!/bin/bash
# filename: start-kafka.sh
# author:   zxy
# date:     2021-07-19

# Kafka installation path
KAFKA_HOME=/data/apps/kafka_2.11-2.4.1

# Accept the sub-command
CMD=$1

## Help function
usage() {
    echo "usage:"
    echo "start-kafka.sh zookeeper/kafka/stopz/stopk"
    echo "description:"
    echo "  zookeeper: start the ZooKeeper service"
    echo "  kafka:     start the Kafka service"
    echo "  stopz:     stop the ZooKeeper service"
    echo "  stopk:     stop the Kafka service"
    exit 0
}

if [ "${CMD}" == "zookeeper" ]; then
    # Start the ZooKeeper service that ships with Kafka
    sh $KAFKA_HOME/bin/zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties
elif [ "${CMD}" == "kafka" ]; then
    # Start the Kafka broker
    sh $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
elif [ "${CMD}" == "stopz" ]; then
    sh $KAFKA_HOME/bin/zookeeper-server-stop.sh
elif [ "${CMD}" == "stopk" ]; then
    sh $KAFKA_HOME/bin/kafka-server-stop.sh
    ps aux | grep kafka | gawk '{print $2}' | xargs -n1 kill -9
else
    usage
fi
#!/bin/bash
# filename: start-producer.sh
# author:   zxy
# date:     2021-07-19

# Kafka installation path
KAFKA_HOME=/data/apps/kafka_2.11-2.4.1

# Accept the topic name
CMD=$1

## Help function
usage() {
    echo "usage:"
    echo "sh start-producer.sh topicName"
}

# Show usage when no topic is given
if [ -z "${CMD}" ]; then
    usage
    exit 1
fi

sh $KAFKA_HOME/bin/kafka-console-producer.sh --topic ${CMD} --broker-list hadoop:9092
#!/bin/bash
# filename: start-consumer.sh
# author:   zxy
# date:     2021-07-19

# Kafka installation path
KAFKA_HOME=/data/apps/kafka_2.11-2.4.1

# Accept the topic name
CMD=$1

## Help function
usage() {
    echo "usage:"
    echo "sh start-consumer.sh topicName"
}

# Show usage when no topic is given
if [ -z "${CMD}" ]; then
    usage
    exit 1
fi

sh $KAFKA_HOME/bin/kafka-console-consumer.sh --topic ${CMD} --bootstrap-server hadoop:9092
package com.zxy.bigdata.spark.streaming.day2

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.zookeeper.CreateMode

import scala.collection.{JavaConversions, mutable}

object Demo2_Offset_Zookeeper extends LoggerTrait {

  // The ZooKeeper client (Curator), already started
  val client = {
    val client = CuratorFrameworkFactory.builder()
      .connectString("hadoop:2181")
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .build()
    client.start()
    client
  }

  // Base path under which offsets are stored in ZooKeeper
  val BASEPATH = "/kafka/consumers/offsets"

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("Demo2_Offset_Zookeeper")
      .setMaster("local[*]")
    val duration = Seconds(2)
    val ssc = new StreamingContext(sparkConf, duration)

    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "hadoop:9092",
      "group.id" -> "hzbigdata2103",
      "auto.offset.reset" -> "smallest"
    )
    val topics: Set[String] = "kafka-zk".split(",").toSet

    val messages: InputDStream[(String, String)] = createMsg(ssc, kafkaParams, topics)

    messages.foreachRDD((rdd, time) => {
      if (!rdd.isEmpty()) {
        println("-" * 30)
        println(s"Time : ${time}")
        println(s"####### RDD Count : ${rdd.count()}")
        // Persist the offsets of this batch
        saveOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, kafkaParams("group.id"))
        println("-" * 30)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Consume Kafka starting from the stored offsets.
   * 1. Read the offsets of the given topics from ZooKeeper.
   * 2. Let Kafka start consuming from those offsets.
   * 3. If no offsets are found, this consumer group reads the topic for the
   *    first time: create the ZooKeeper nodes and consume from the beginning.
   */
  def createMsg(ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String]): InputDStream[(String, String)] = {
    // 1. Read the stored offsets for the topics from ZooKeeper
    val fromOffsets: Map[TopicAndPartition, Long] = getFromOffsets(topics, kafkaParams("group.id"))
    // 2. If nothing was stored, fromOffsets is an empty Map()
    var messages: InputDStream[(String, String)] = null
    // 3. Decide how to read from Kafka based on the offset map
    if (fromOffsets.isEmpty) {
      // No offsets found: read from the beginning
      messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    } else {
      // Offsets found: resume from the stored positions
      val messageHandler = (msgHandler: MessageAndMetadata[String, String]) => (msgHandler.key(), msgHandler.message())
      messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
    }
    messages
  }

  /**
   * Read the offsets of the given topics for the given consumer group from ZooKeeper.
   * Kafka's own layout: /kafka/consumers/${group.id}/offsets/${topic}/${partition} --> offset
   * Layout used here:   /kafka/consumers/offsets/${topic}/${group.id}/${partition} --> offset
   *
   * ZooKeeper access goes through Curator.
   */
  def getFromOffsets(topics: Set[String], groupId: String): Map[TopicAndPartition, Long] = {
    // 1. Mutable map that collects the result
    val offsets: mutable.Map[TopicAndPartition, Long] = mutable.Map[TopicAndPartition, Long]()
    // 2. Walk over the requested topics
    topics.foreach(topic => {
      val path = s"${BASEPATH}/${topic}/${groupId}"
      // 3. Make sure the path exists
      checkExists(path)
      // 4. Walk over the partitions stored under the path
      JavaConversions.asScalaBuffer(client.getChildren.forPath(path)).foreach(partition => {
        val fullPath = s"${path}/${partition}"                            // full node path
        val offset = new String(client.getData.forPath(fullPath)).toLong  // stored offset
        offsets.put(TopicAndPartition(topic, partition.toInt), offset)
      })
    })
    // 5. Return an immutable copy of the result
    offsets.toMap
  }

  /**
   * Check whether the path exists; if it does, do nothing, otherwise create it.
   */
  def checkExists(path: String) = {
    if (client.checkExists().forPath(path) == null) {
      // The path does not exist yet, so create it
      client.create()
        .creatingParentsIfNeeded()
        .withMode(CreateMode.PERSISTENT)
        .forPath(path)
    }
  }

  /**
   * Persist the offsets of a batch to ZooKeeper.
   */
  def saveOffsets(offsetRanges: Array[OffsetRange], groupId: String) = {
    for (offsetRange <- offsetRanges) {
      val topic = offsetRange.topic
      val partition = offsetRange.partition
      // untilOffset is exclusive, so it is already the next offset to consume
      val offset = offsetRange.untilOffset
      val path = s"${BASEPATH}/${topic}/${groupId}/${partition}"
      checkExists(path)
      client.setData().forPath(path, offset.toString.getBytes)
      println(s"${topic} -> ${partition} -> ${offsetRange.fromOffset} -> ${offset}")
    }
  }
}
##1. Create a new topic
[root@hadoop bin]# kafka-topics.sh --create --topic kafka-zk --partitions 3 --replication-factor 1 --zookeeper hadoop:2181/kafka
##2. Start a producer
[root@hadoop bin]# kafka-console-producer.sh --topic kafka-zk --broker-list hadoop:9092
##3. Run the code
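After the job has processed at least one batch, the offsets it stored can be checked directly with the ZooKeeper shell. The path follows the BASEPATH layout used in the code, /kafka/consumers/offsets/${topic}/${group.id}/${partition}; partition 0 is used here only as an example:
[root@hadoop bin]# sh zookeeper-shell.sh hadoop:2181
ls /kafka/consumers/offsets/kafka-zk/hzbigdata2103
get /kafka/consumers/offsets/kafka-zk/hzbigdata2103/0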
<!-- HBase -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
package com.zxy;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.junit.Test;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class HBaseConnectionPool {

    // A very simple connection pool backed by a LinkedList
    private static LinkedList<Connection> pool = new LinkedList<Connection>();

    static {
        try {
            Configuration config = HBaseConfiguration.create();
            config.addResource(HBaseConfiguration.class.getClassLoader().getResourceAsStream("hbase-site.xml"));
            for (int i = 0; i < 5; i++) {
                pool.push(ConnectionFactory.createConnection(config));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Connections are always taken from the pool.
     */
    public static Connection getConnection() {
        while (pool.isEmpty()) {
            try {
                System.out.println("pool is empty, please wait for a moment");
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return pool.poll();
    }

    /**
     * Return a connection to the pool once it is no longer needed.
     */
    public static void realse(Connection connection) {
        if (connection != null) pool.push(connection);
    }

    /**
     * Save a single cell to HBase.
     */
    public static void set(Connection connection, TableName tableName, byte[] rowkey, byte[] columnFamily, byte[] column, byte[] value) {
        try {
            Table table = connection.getTable(tableName);
            Put put = new Put(rowkey);
            put.addColumn(columnFamily, column, value);
            table.put(put);
            table.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Read the value of one column of one row from the given table.
     */
    public static String getColValue(Connection connection, TableName tableName, byte[] rowkey, byte[] columnFamily, byte[] column) {
        try {
            Table table = connection.getTable(tableName);
            Result result = table.get(new Get(rowkey));
            return new String(result.getValue(columnFamily, column));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * rowkey:  topic-group
     * returns: partition -> offset
     */
    public static Map<Integer, Long> getColValue(Connection connection, TableName tableName, byte[] rowkey) {
        Map<Integer, Long> partition2Offset = new HashMap<>();
        try {
            Table table = connection.getTable(tableName);
            Scan scan = new Scan();
            RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(rowkey));
            scan.setFilter(rowFilter);
            ResultScanner scanner = table.getScanner(scan);
            for (Result result : scanner) {
                List<Cell> cells = result.listCells();
                for (Cell cell : cells) {
                    // column qualifier: partition
                    Integer partition = Integer.parseInt(new String(CellUtil.cloneQualifier(cell)));
                    // cell value: offset
                    Long offset = Long.parseLong(new String(CellUtil.cloneValue(cell)));
                    partition2Offset.put(partition, offset);
                }
            }
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return partition2Offset;
    }

    @Test
    public void test() {
        Connection connection = HBaseConnectionPool.getConnection();
        TableName tableName = TableName.valueOf("hbase-spark");
        // HBaseConnectionPool.set(connection, tableName, "kafka-zk".getBytes(), "cf".getBytes(), "0".getBytes(), "0".getBytes());
        Map<Integer, Long> map = HBaseConnectionPool.getColValue(connection, tableName, "kafka-zk".getBytes());
        System.out.println(map.size());
        for (Map.Entry<Integer, Long> entry : map.entrySet()) {
            System.out.println(entry.getKey() + ":" + entry.getValue());
        }
    }
}
<?xml version="1.0"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <!-- /** * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ --> <configuration> <property> <name>hbase.cluster.distributed</name> <value>true</value> </property> <property> <name>hbase.rootdir</name> <value>hdfs://hadoop:9000/hbase</value> </property> <property> <name>hbase.zookeeper.quorum</name> <value>hadoop</value> </property> </configuration>
package com.zxy.bigdata.spark.streaming.day2

import com.zxy.HBaseConnectionPool
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.{lang, util}
import scala.collection.{JavaConversions, mutable}

object Demo3_Offset_HBase extends LoggerTrait {

  def main(args: Array[String]): Unit = {
    if (args == null || args.length != 3) {
      println(
        """
          |usage <brokerList> <groupId> <topicstr>
          |""".stripMargin)
      System.exit(-1)
    }
    val Array(brokerList, groupId, topicstr) = args

    val sparkConf = new SparkConf()
      .setAppName("Demo3_Offset_HBase")
      .setMaster("local[*]")
    val duration = Seconds(2)
    val ssc = new StreamingContext(sparkConf, duration)

    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> brokerList,
      "group.id" -> groupId,
      "auto.offset.reset" -> "smallest"
    )
    val topics: Set[String] = topicstr.split(",").toSet

    val messages: InputDStream[(String, String)] = createMsg(ssc, kafkaParams, topics)

    messages.foreachRDD((rdd, time) => {
      if (!rdd.isEmpty()) {
        println("-" * 30)
        println(s"Time : ${time}")
        println(s"####### RDD Count : ${rdd.count()}")
        // Persist the offsets of this batch
        saveOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, kafkaParams("group.id"))
        println("-" * 30)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Consume Kafka starting from the stored offsets.
   * 1. Read the offsets of the given topics from HBase.
   * 2. Let Kafka start consuming from those offsets.
   * 3. If no offsets are found, this consumer group reads the topic for the
   *    first time, so consume the topic from the beginning.
   */
  def createMsg(ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String]): InputDStream[(String, String)] = {
    // 1. Read the stored offsets for the topics from HBase
    val fromOffsets: Map[TopicAndPartition, Long] = getFromOffsets(topics, kafkaParams("group.id"))
    // 2. If nothing was stored, fromOffsets is an empty Map()
    var messages: InputDStream[(String, String)] = null
    // 3. Decide how to read from Kafka based on the offset map
    if (fromOffsets.isEmpty) {
      // No offsets found: read from the beginning
      messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    } else {
      // Offsets found: resume from the stored positions
      val messageHandler = (msgHandler: MessageAndMetadata[String, String]) => (msgHandler.key(), msgHandler.message())
      messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
    }
    messages
  }

  /**
   * Read the offsets of the given topics for the given consumer group from HBase.
   * Row key: ${topic}-${groupId}
   * Column:  cf:${partition} --> offset
   */
  def getFromOffsets(topics: Set[String], groupId: String): Map[TopicAndPartition, Long] = {
    // 1. Mutable map that collects the result
    val offsets: mutable.Map[TopicAndPartition, Long] = mutable.Map[TopicAndPartition, Long]()
    // 2. Walk over the requested topics and read their rows from HBase
    val connection: Connection = HBaseConnectionPool.getConnection
    val tableName: TableName = TableName.valueOf("hbase-spark")
    topics.foreach(topic => {
      val rk = Bytes.toBytes(s"${topic}-${groupId}")
      val partition2Offsets: util.Map[Integer, lang.Long] = HBaseConnectionPool.getColValue(connection, tableName, rk)
      JavaConversions.mapAsScalaMap(partition2Offsets).foreach {
        case (partition, offset) => offsets.put(TopicAndPartition(topic, partition), offset)
      }
    })
    HBaseConnectionPool.realse(connection)
    // 3. Return an immutable copy of the result
    offsets.toMap
  }

  /**
   * Persist the offsets of a batch to HBase.
   */
  def saveOffsets(offsetRanges: Array[OffsetRange], groupId: String) = {
    val connection: Connection = HBaseConnectionPool.getConnection
    val tableName: TableName = TableName.valueOf("hbase-spark")
    val cf: Array[Byte] = Bytes.toBytes("cf")
    for (offsetRange <- offsetRanges) {
      val topic = offsetRange.topic
      val partition = offsetRange.partition
      // untilOffset is exclusive, so it is already the next offset to consume
      val offset = offsetRange.untilOffset
      // the row key must match the one read in getFromOffsets: ${topic}-${groupId}
      val rk = s"${topic}-${groupId}".getBytes()
      HBaseConnectionPool.set(connection, tableName, rk, cf, partition.toString.getBytes(), offset.toString.getBytes())
      println(s"${topic} -> ${partition} -> ${offsetRange.fromOffset} -> ${offset}")
    }
    HBaseConnectionPool.realse(connection)
  }
}
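Demo3_Offset_HBase expects three arguments: the broker list, the consumer group id, and a comma-separated topic list. Because the code sets the master to local[*] it can be run straight from the IDE; a rough spark-submit sketch is shown below, where the jar name is a placeholder and the group id is simply the one used earlier in Demo2:
spark-submit \
  --class com.zxy.bigdata.spark.streaming.day2.Demo3_Offset_HBase \
  spark-streaming-demo.jar \
  hadoop:9092 hzbigdata2103 kafka-zk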
hbase(main):002:0> create 'hbase-spark','cf'
0 row(s) in 1.4630 seconds
=> Hbase::Table - hbase-spark
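Once the streaming job has committed a few batches, the stored offsets can be checked from the HBase shell; as written by the code above, each row key is ${topic}-${groupId} and every cf:${partition} column holds the latest offset:
hbase(main):003:0> scan 'hbase-spark'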
## Open ports 16000 and 16020 (the default HBase HMaster and RegionServer ports)
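Assuming the host runs firewalld (an assumption, not stated in the original), the two ports can be opened like this:
firewall-cmd --zone=public --add-port=16000/tcp --permanent
firewall-cmd --zone=public --add-port=16020/tcp --permanent
firewall-cmd --reload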