First, create a topic.
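For reference, a sketch of the create command, assuming the topic used below ("three") with 4 partitions and replication factor 3, as implied by the original replica list; adjust the bootstrap server to your own environment:
kafka-topics.sh --bootstrap-server master:9092 --create --topic three --partitions 4 --replication-factor 3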
Changing the partition replica assignment
Originally, the partition replicas were on brokers 1, 2, and 3.
Write the following plan into increase-replication-factor.json (the file referenced by the command below):
{ "version":1, "partitions":[ { "topic":"three","partition":0,"replicas":[2,1] }, { "topic":"three","partition":1,"replicas":[2,1] }, { "topic":"three","partition":2,"replicas":[1,2] }, { "topic":"three","partition":3,"replicas":[1,2] } ] }
Execute the plan:
kafka-reassign-partitions.sh --bootstrap-server master:9092 --reassignment-json-file increase-replication-factor.json --execute
Now the partition replicas are only on brokers 1 and 2, which can be confirmed by describing the topic (see below).
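A way to check the new assignment (same bootstrap server as above; --describe is a standard kafka-topics.sh option):
kafka-topics.sh --bootstrap-server master:9092 --describe --topic three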
Increasing the replication factor cannot be done through a command-line option; it can only be done by executing a reassignment plan:
{ "version":1, "partitions": [ { "topic":"four","partition":1,"replicas":[3,1,2] }, { "topic":"four","partition":2,"replicas":[3,1,2] }, { "topic":"four","partition":3,"replicas":[3,1,2] } ] }
Execute the plan (commands below).
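Assuming the plan above is again saved as increase-replication-factor.json (the file name here is only an assumption), it is executed and then checked the same way as before; --verify is a standard option of kafka-reassign-partitions.sh:
kafka-reassign-partitions.sh --bootstrap-server master:9092 --reassignment-json-file increase-replication-factor.json --execute
kafka-reassign-partitions.sh --bootstrap-server master:9092 --reassignment-json-file increase-replication-factor.json --verify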
The log segment files are stored in serialized form, so they can only be read with the dump tool:
kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log
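To also decode and print the record payloads, DumpLogSegments accepts a --print-data-log option:
kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --print-data-log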
Two consumption modes:
pull mode
Pull mode is what Kafka uses. Note: if Kafka has no data, the consumer can get stuck in a loop that keeps returning empty batches (a poll-loop variant that handles this case follows the example below).
push mode
Push mode is not used by Kafka, because the broker would dictate the delivery rate and could overwhelm slower consumers.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class CustomConsumer {
    public static void main(String[] args) {
        // configuration
        Properties prop = new Properties();
        // connect to the cluster
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.101:9092,192.168.10.102:9092");
        // deserializers
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // consumer group id
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        // create a consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(prop);

        // subscribe to the topic
        List<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        // consume data
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("key:" + record.key() + "\t" + "value:" + record.value());
            }
        }
    }
}
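When the topic has no data, the loop above simply keeps getting empty batches back (the pull-mode caveat mentioned earlier). A minimal variant of the poll loop that makes this case explicit, assuming the same kafkaConsumer as in the example above:

// consume data, skipping empty batches explicitly
while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
    if (records.isEmpty()) {
        // nothing to consume yet; poll's timeout already provides the wait
        continue;
    }
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("key:" + record.key() + "\tvalue:" + record.value());
    }
}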
Consuming a specific partition
// instead of subscribe(), assign the consumer directly to partition 0 of "first"
// (requires importing org.apache.kafka.common.TopicPartition)
List<TopicPartition> topics = new ArrayList<>();
topics.add(new TopicPartition("first", 0));
kafkaConsumer.assign(topics);
// specify the partition assignment strategy (org.apache.kafka.clients.consumer.RoundRobinAssignor)
prop.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
// enable automatic offset commits
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// set the auto-commit interval (milliseconds)
prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
Manual commit (a variant with a commit callback is sketched after this snippet)
// disable automatic offset commits (commit manually instead)
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// consume data
while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("key:" + record.key() + "\t" + "value:" + record.value());
    }
    // manually commit offsets after the batch has been processed; use one of the two calls:
    // asynchronous commit (does not block the poll loop)
    kafkaConsumer.commitAsync();
    // synchronous commit (blocks until the broker acknowledges the commit)
    kafkaConsumer.commitSync();
}
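commitAsync() returns immediately, so a failed commit would otherwise go unnoticed. A minimal sketch, assuming the same kafkaConsumer and poll loop as above, that passes a callback to log failures and finishes with one synchronous commit when the consumer shuts down:

try {
    while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("key:" + record.key() + "\tvalue:" + record.value());
        }
        // asynchronous commit with a callback, so commit failures are at least logged
        kafkaConsumer.commitAsync((offsets, exception) -> {
            if (exception != null) {
                System.err.println("async offset commit failed for " + offsets + ": " + exception);
            }
        });
    }
} finally {
    try {
        // one final synchronous commit so the last processed offsets are not lost
        kafkaConsumer.commitSync();
    } finally {
        kafkaConsumer.close();
    }
}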
Starting from a specified offset
// before seeking, make sure the consumer's partition assignment has actually been made
Set<TopicPartition> assignment = kafkaConsumer.assignment();
while (assignment.size() == 0) {
    kafkaConsumer.poll(Duration.ofSeconds(1));
    assignment = kafkaConsumer.assignment();
}
// consume from the specified offset (here, offset 1000) in every assigned partition
for (TopicPartition topicPartition : assignment) {
    kafkaConsumer.seek(topicPartition, 1000);
}
Starting from a specified time
// before seeking, make sure the consumer's partition assignment has actually been made
// (requires java.util.Map/HashMap and org.apache.kafka.clients.consumer.OffsetAndTimestamp imports)
Set<TopicPartition> assignment = kafkaConsumer.assignment();
while (assignment.size() == 0) {
    kafkaConsumer.poll(Duration.ofSeconds(1));
    assignment = kafkaConsumer.assignment();
}
// convert a timestamp (here: one day ago) into offsets
Map<TopicPartition, Long> map = new HashMap<>();
// build the map of partition -> target timestamp
for (TopicPartition topicPartition : assignment) {
    map.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}
Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = kafkaConsumer.offsetsForTimes(map);
// seek each partition to the offset found for that timestamp
for (TopicPartition topicPartition : assignment) {
    OffsetAndTimestamp offsetAndTimestamp = offsetAndTimestampMap.get(topicPartition);
    if (offsetAndTimestamp != null) { // null when the partition has no message at or after that time
        kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
    }
}
Kafka KRaft mode: under the config directory there is also a kraft directory.
Open the server.properties inside it.
process.roles indicates the roles this node takes on (broker, controller, or both);
node.id is similar to broker.id;
controller.quorum.voters lists the addresses of the controller quorum nodes in the cluster;
listeners / advertised.listeners define the ports exposed to clients;
log.dirs is where the log files are stored, which is in fact also where the data is stored. A sketch of these keys follows.
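A minimal sketch of what these keys look like in config/kraft/server.properties (the values below are illustrative only; adjust host names, node id, and paths to your own environment):
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@master:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://master:9092
controller.listener.names=CONTROLLER
log.dirs=/opt/soft/kafka2/datas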
Start Kafka
First, initialize the storage.
Generate a UUID:
bin/kafka-storage.sh random-uuid
Format the storage directory with the generated UUID:
bin/kafka-storage.sh format -t mjWD2tpuRranR4SyjRPOXA -c /opt/soft/kafka2/config/kraft/server.properties
Start the broker:
bin/kafka-server-start.sh -daemon config/kraft/server.properties
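To confirm the broker came up in KRaft mode, one simple check is to talk to it directly, for example by listing topics (the host name master is assumed here, as in the earlier examples):
bin/kafka-topics.sh --bootstrap-server master:9092 --list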
Flink as a Kafka producer:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class FlinkKafka {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // prepare the data source
        List<String> list = new ArrayList<>();
        list.add("hello");
        list.add("world");
        list.add("hadoop");
        list.add("java");
        list.add("flink");
        DataStreamSource<String> streamSource = env.fromCollection(list);

        // create a Kafka producer (sink)
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.101:9092,192.168.10.102:9092");
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("first", new SimpleStringSchema(), properties);

        // attach the sink
        streamSource.addSink(kafkaProducer);

        // run the job
        env.execute();
    }
}
Flink as a Kafka consumer:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class FlinkKafkaConsumer1 {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // create a Kafka consumer (source)
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.101:9092,192.168.10.102:9092,192.168.10.103:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test1");
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("five", new SimpleStringSchema(), properties);

        // attach the source and print what it reads
        env.addSource(kafkaConsumer).print();

        // run the job
        env.execute();
    }
}
Spring Boot producer
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    @Autowired
    KafkaTemplate<String, String> kafka;

    @GetMapping("/kafka")
    public String data(String msg) {
        // send the request parameter to the "five" topic
        kafka.send("five", msg);
        return "ok";
    }
}
Configuration (application.yml):

spring:
  kafka:
    bootstrap-servers: 192.168.10.101:9092,192.168.10.102:9092,192.168.10.103:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: test1
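With the application running (and the spring-kafka dependency on the classpath), the producer endpoint above can be exercised with a simple request; port 8080 is Spring Boot's default and is only an assumption here:
curl "http://localhost:8080/kafka?msg=hello"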
Spring Boot consumer
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

@Configuration
public class KafkaConsumer {

    // listen on the "five" topic and print each message
    @KafkaListener(topics = {"five"})
    public void kafkaListener(String msg) {
        System.out.println("Received Kafka message: " + msg);
    }
}