赞
踩
借鉴:https://blog.csdn.net/hetry_liang/article/details/110938156
https://blog.csdn.net/u013433591/article/details/128475325
1.生产者代码:
package com.njbdqn.mykafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class MyProducer {
    public static void main(String[] args) {
        // Producer configuration.
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "192.168.153.200:9092");
        // "all": wait for the full ISR to acknowledge each record (strongest durability).
        prop.put("acks", "all");
        prop.put("retries", "0");
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources guarantees close() — and therefore a flush of any
        // buffered records — even if send() throws. The original only closed
        // the producer on the happy path, leaking it on an exception.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(prop)) {
            // Produce 10 records to topic "msg01".
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> rec = new ProducerRecord<>("msg01", "" + i, "test" + i);
                // NOTE: send() is asynchronous (fire-and-forget here); the original
                // comment claimed it was synchronous. For a truly synchronous send,
                // block on the returned Future: producer.send(rec).get().
                producer.send(rec);
            }
        }
    }
}
2.消费者代码:
package com.njbdqn.mykafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class MyConsumer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "192.168.153.200:9092");
        prop.put("group.id", "henry5");
        // String form for consistency with the other configuration values.
        prop.put("enable.auto.commit", "true");
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // With no committed offset for this group, start from the earliest
        // available record in the partition.
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        // Subscribe to the single topic "msg01" (the original comment claimed
        // "all topics", which is not what subscribe(List) does).
        consumer.subscribe(Arrays.asList("msg01"));
        while (true) {
            // poll() blocks for AT MOST one second and returns a (possibly empty)
            // BATCH of records — the argument is a timeout, not a fixed interval,
            // and it is not "one row per second" as the original comment claimed.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            // Iterating an empty batch is a no-op, so the original isEmpty()
            // guard was redundant and has been removed.
            for (ConsumerRecord<String, String> rec : records) {
                System.out.println(rec.value());
            }
        }
    }
}
3.多线程消费代码:(实现Runnable接口的消费类,实例中消费的topic或者分区不同)
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class MultiThreadConsumer {
    public static void main(String[] args) {
        String brokers = "localhost:9092";
        String topic = "topic_t40";
        String groupId = "app_q";
        int consumers = 2;
        // One KafkaConsumer per thread — KafkaConsumer is NOT thread-safe, so each
        // Runnable owns its own instance. (A thread pool would also work here.)
        for (int i = 0; i < consumers; i++) {
            final ConsumerRunnable consumer = new ConsumerRunnable(brokers, groupId, topic, "thread" + i);
            new Thread(consumer).start();
        }
    }

    static class ConsumerRunnable implements Runnable {
        private final KafkaConsumer<String, String> consumer;
        // volatile so a stop request from another thread is visible to the poll
        // loop. The original declared this flag but provided no way to ever set
        // it false, making the "graceful stop" dead code.
        private volatile boolean isRunning = true;
        private final String threadName;

        public ConsumerRunnable(String brokers, String groupId, String topic, String threadName) {
            Properties props = new Properties();
            props.put("bootstrap.servers", brokers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topic));
            this.threadName = threadName;
        }

        /**
         * Requests a graceful stop; the run loop exits after the in-flight
         * poll returns (at most ~1 second later) and closes the consumer.
         */
        public void shutdown() {
            isRunning = false;
        }

        @Override
        public void run() {
            try {
                while (isRunning) {
                    // poll() waits up to one second for a batch of records.
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    records.forEach(record ->
                            System.out.println(this.threadName + " : Message received " + record.value() + ", partition " + record.partition()));
                }
            } finally {
                // Always release sockets/buffers and leave the consumer group,
                // whatever terminates the loop.
                consumer.close();
            }
        }
    }
}
1.集群下生产者代码中的Properties:
// Producer configured for a multi-broker Kafka cluster.
Properties properties = new Properties();
// Connect to the cluster: bootstrap.servers may list several brokers for fault tolerance
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.25.129:9092,192.168.25.128:9092");
// Serializer classes for record keys and values (key.serializer / value.serializer)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
2.生产者发送数据的协议
Kafka 自行设计了一套构建在 TCP 之上的二进制通信协议(而非复用 HTTP 等通用协议),以减少协议开销、提高吞吐。
Kafka:单机写入约百万条/s、原生不支持定时(延迟)消息、自 0.11 版本起支持事务消息(跨分区原子写入)、消费端通过带超时的 poll 拉取消息
RocketMQ:单机写入7万/s、支持定时消息、分布式事务、使用长轮询
C ( Consistency )表示强一致性
A ( Availability )表示可用性
P ( Partition Tolerance )表示分区容错性
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。