Kafka cluster setup
1. Upload the archive to any node.
2. Unzip it and configure the environment variables on every node (a sketch of these settings follows the startup steps below).
3. Modify config/server.properties:
   1. broker.id=0 (every node must have a different broker.id)
   2. zookeeper.connect=master:2181,node1:2181,node2:2181
   3. log.dirs=/usr/local/soft/kafka_2.11-1.0.0/data (where the messages are stored)
4. Copy the installation to the other nodes:
   scp -r kafka_2.11-1.0.0 node2:`pwd`
   scp -r kafka_2.11-1.0.0 node1:`pwd`
5. Change broker.id on each node: master=0, node1=1, node2=2.
6. Start the cluster (Kafka does not depend on Hadoop, but it does depend on ZooKeeper):
   1. Start ZooKeeper on every node:
      zkServer.sh start
      Check its status:
      zkServer.sh status
   2. Start a broker on every node (Kafka has a decentralized architecture; -daemon runs it in the background):
      kafka-server-start.sh -daemon /usr/local/soft/kafka_2.11-1.0.0/config/server.properties
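As referenced in step 2, a minimal sketch of the environment variables and the resulting per-node configuration. The use of /etc/profile and the variable names KAFKA_HOME/PATH are assumptions; the paths, broker ids and ZooKeeper addresses come from the steps above.
   # appended to /etc/profile on every node (assumed location and variable names)
   export KAFKA_HOME=/usr/local/soft/kafka_2.11-1.0.0
   export PATH=$PATH:$KAFKA_HOME/bin
   source /etc/profile

   # config/server.properties (broker.id is 0 on master, 1 on node1, 2 on node2)
   broker.id=0
   zookeeper.connect=master:2181,node1:2181,node2:2181
   log.dirs=/usr/local/soft/kafka_2.11-1.0.0/data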
Common commands
1. Create a topic
   --replication-factor: the number of replicas for each partition
   --partitions: the number of partitions, set according to the data volume
   In pseudo-distributed mode one replica is enough.
   kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 3 --topic test_topic1
2. Describe a topic
   kafka-topics.sh --describe --zookeeper master:2181 --topic test_topic1
3. List all topics
   kafka-topics.sh --list --zookeeper master:2181
4. Start a console producer
   kafka-console-producer.sh --broker-list master:9092 --topic test_topic1
5. Start a console consumer (--from-beginning consumes from the beginning; without it, only data produced after the consumer starts is consumed)
   kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic test_topic1
Resetting Kafka
1. Stop Kafka:
   kill -9 <Kafka broker PID>
2. Delete the metadata in ZooKeeper:
   zkCli.sh
   Delete everything related to Kafka:
   ls /
   rmr /config
   rmr /brokers
3. Delete Kafka's data directory (on every node):
   rm -rf /usr/local/soft/kafka_2.11-1.0.0/data
4. Restart:
   kafka-server-start.sh -daemon /usr/local/soft/kafka_2.11-1.0.0/config/server.properties
The Maven pom.xml used for the Kafka and Flink examples that follow:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>ShuJia01</artifactId>
        <groupId>ShuJia</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>kafka</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.12</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.11.12</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- Scala Compiler -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
package com.shujia.source

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object Demo3KafkaProducer {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.5.201:9092")
    properties.setProperty("group.id", "test")

    // Create the Flink Kafka consumer
    val flinkKafkaConsumer = new FlinkKafkaConsumer[String]("test_topic1", new SimpleStringSchema(), properties)

    // flinkKafkaConsumer.setStartFromEarliest()      // start from the earliest record possible
    // flinkKafkaConsumer.setStartFromLatest()        // start from the latest record
    // flinkKafkaConsumer.setStartFromTimestamp(...)  // start from the given timestamp (milliseconds)

    /**
     * The default is setStartFromGroupOffsets():
     * if the consumer group already has committed offsets, consumption continues from them;
     * if it has none, the auto.offset.reset setting decides (latest by default).
     * Here we explicitly start from the earliest record instead.
     */
    flinkKafkaConsumer.setStartFromEarliest()

    val kafkaDS: DataStream[String] = env.addSource(flinkKafkaConsumer)

    kafkaDS.print()

    env.execute()
  }
}
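Despite its name, Demo3KafkaProducer above is a Flink consumer that reads from Kafka. For completeness, a minimal sketch of the write side with the same connector might look like the following; the object name Demo4FlinkKafkaSink, the topic flink_out_topic and the sample elements are assumptions made for this sketch, not part of the original examples.

package com.shujia.source

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

object Demo4FlinkKafkaSink {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.5.201:9092")

    // Some data to write; in practice this would be any DataStream[String]
    val ds: DataStream[String] = env.fromElements("java", "scala", "flink")

    // FlinkKafkaProducer serializes each String with SimpleStringSchema and writes it to the topic.
    // "flink_out_topic" is a hypothetical topic name used only for this sketch.
    val sink = new FlinkKafkaProducer[String]("flink_out_topic", new SimpleStringSchema(), properties)

    ds.addSink(sink)

    env.execute()
  }
}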
package com.shujia

import java.util
import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}

object Demo3Comsumer {
  def main(args: Array[String]): Unit = {
    // 1. Create the consumer
    val properties = new Properties()
    // Kafka broker address
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("group.id", "asdasdd")

    /**
     * auto.offset.reset:
     * earliest - if the partitions have committed offsets, consume from them; otherwise consume from the beginning
     * latest   - if the partitions have committed offsets, consume from them; otherwise consume only newly produced data
     * none     - if every partition has a committed offset, consume from it; if any partition has none, throw an exception
     */
    // Read from the earliest data
    properties.put("auto.offset.reset", "earliest")

    val consumer = new KafkaConsumer[String, String](properties)
    println("connection created")

    // Subscribe to the topic
    val topics = new util.ArrayList[String]()
    topics.add("student2")
    consumer.subscribe(topics)

    while (true) {
      // Poll for data
      val records: ConsumerRecords[String, String] = consumer.poll(1000)
      println("consuming data")

      // Iterate over all records returned by this poll
      val iterator: util.Iterator[ConsumerRecord[String, String]] = records.iterator()
      while (iterator.hasNext) {
        // Read one record
        val record: ConsumerRecord[String, String] = iterator.next()
        val topic: String = record.topic()
        val partition: Int = record.partition()
        val offset: Long = record.offset()
        val key: String = record.key()
        val value: String = record.value()
        // Defaults to the producer's system time when the record was produced
        val ts: Long = record.timestamp()

        println(s"$topic\t$partition\t$offset\t$key\t$value\t$ts")
      }
    }

    // Unreachable because of the infinite loop above; kept from the original example
    consumer.close()
  }
}
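The consumer above relies on automatic offset commits (enable.auto.commit defaults to true). A minimal sketch of committing offsets manually is shown below; the object name Demo3ManualCommit is hypothetical, while the broker, group id and topic are carried over from the example above.

package com.shujia

import java.util
import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer

object Demo3ManualCommit {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("group.id", "asdasdd")
    // Turn off automatic offset commits so offsets only advance after processing succeeds
    properties.put("enable.auto.commit", "false")

    val consumer = new KafkaConsumer[String, String](properties)
    val topics = new util.ArrayList[String]()
    topics.add("student2")
    consumer.subscribe(topics)

    while (true) {
      val records = consumer.poll(1000)
      val iterator = records.iterator()
      while (iterator.hasNext) {
        println(iterator.next().value())
      }
      // Synchronously commit the offsets of everything returned by this poll
      consumer.commitSync()
    }
  }
}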
package com.shujia

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object Demo1kafkaproducer {
  def main(args: Array[String]): Unit = {
    /**
     * 1. Create the Kafka connection
     *    (build the producer)
     */
    val properties = new Properties()
    // Kafka broker address
    properties.setProperty("bootstrap.servers", "master:9092")
    // Key and value serializer classes
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    // The producer
    val producer = new KafkaProducer[String, String](properties)

    // Produce data
    // If the topic does not exist it is created automatically with 1 partition and 1 replica
    val record = new ProducerRecord[String, String]("test1", "java")
    producer.send(record)

    // Flush the data to Kafka
    producer.flush()

    // Close the connection
    producer.close()
  }
}
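The record above is sent without a key, so the producer spreads records across partitions. A minimal sketch of sending a keyed record and checking the result with a callback follows; the object name Demo1KeyedSend and the key "user1" are made up for illustration, the broker and topic come from the example above.

package com.shujia

import java.util.Properties

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

object Demo1KeyedSend {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](properties)

    // Records with the same key always land in the same partition (the default partitioner hashes the key).
    // "user1" is just an illustrative key.
    val record = new ProducerRecord[String, String]("test1", "user1", "java")

    // The callback reports where the record landed, or the error if the send failed
    producer.send(record, new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        if (exception != null) {
          exception.printStackTrace()
        } else {
          println(s"topic=${metadata.topic()} partition=${metadata.partition()} offset=${metadata.offset()}")
        }
      }
    })

    producer.flush()
    producer.close()
  }
}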
package com.shujia

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.io.Source

object Demo2Studentkafka {
  def main(args: Array[String]): Unit = {
    /**
     * 1. Create the Kafka connection
     *    (build the producer)
     */
    val properties = new Properties()
    // Kafka broker address
    properties.setProperty("bootstrap.servers", "master:9092")
    // Key and value serializer classes
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    // The producer
    val producer = new KafkaProducer[String, String](properties)

    // Read the student table
    Source
      .fromFile("data/students.txt")
      .getLines()
      .foreach(student => {
        // Send students of the same class to the same partition
        val clazz: String = student.split(",")(4)
        val partition: Int = math.abs(clazz.hashCode) % 2

        // Send the data to Kafka; the topic was created with:
        // kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 2 --topic student2
        val record = new ProducerRecord[String, String]("student2", partition, null, student)
        producer.send(record)
        producer.flush()
      })

    // Close the connection
    producer.close()
  }
}
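Computing the partition by hand works, but the same grouping can also be achieved by using the class as the record key and letting Kafka's default partitioner hash it. A sketch of that variant, replacing the record construction inside the foreach above (same student2 topic, same clazz and student values):

        // Use the class as the key instead of computing a partition number.
        // The default partitioner hashes the key, so one class still maps to one partition.
        val keyedRecord = new ProducerRecord[String, String]("student2", clazz, student)
        producer.send(keyedRecord)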
Thanks for reading. I'm 啊帅和和, a senior majoring in big data. Wishing you happiness.