赞
踩
<!-- Maven dependencies on Apache Kafka 2.5.0: kafka_2.12, kafka-clients and kafka-streams. NOTE(review): the Java samples shown here import only org.apache.kafka.clients.*; confirm whether kafka_2.12 and kafka-streams are actually needed. -->
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.5.0</version> </dependency>
package com.tyhh.test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

/**
 * Sends ten string records to {@code my-topic} without authentication and prints the topic,
 * partition and offset reported for each acknowledged record.
 *
 * @version v1.0
 */
public class KafkaProducerTest {

    public static void main(String[] args) {
        String topic = "my-topic";

        Properties props = new Properties();
        // Broker addresses used for the initial connection.
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        // A send only counts as successful once all replicas have written the message.
        props.put("acks", "all");
        // 0 = failed sends are not retried.
        props.put("retries", 0);
        // Batch size in bytes.
        props.put("batch.size", 16384);
        // Wait up to 1 ms before sending so records can be batched.
        props.put("linger.ms", 1);
        // Size of the producer's send buffer in bytes.
        props.put("buffer.memory", 33554432);
        // Keys and values are serialized as strings.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer even when send() throws; close() also waits
        // for outstanding sends, so no explicit flush() is needed.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(topic, "topic_key_" + i, "topic_value_" + i);
                Future<RecordMetadata> metadataFuture = producer.send(record);
                try {
                    // Block until the broker acknowledges (or rejects) this record.
                    RecordMetadata recordMetadata = metadataFuture.get();
                    System.out.println("发送成功!");
                    System.out.println("topic:" + recordMetadata.topic());
                    System.out.println("partition:" + recordMetadata.partition());
                    System.out.println("offset:" + recordMetadata.offset());
                } catch (Exception e) {
                    // Report the failure and continue with the next record.
                    System.out.println("发送失败!");
                    e.printStackTrace();
                }
            }
        }
    }
}
package com.tyhh.test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

/**
 * Sends ten string records to {@code my-topic}, authenticating with SASL/PLAIN over
 * SASL_PLAINTEXT. The JAAS configuration is supplied inline through {@code sasl.jaas.config}.
 *
 * @version v1.0
 */
public class KafkaProducerTest {

    public static void main(String[] args) {
        // NOTE(review): sample credentials hard-coded for the demo; load them from a secure
        // source in real code.
        String user = "admin";
        String password = "admin";
        String topic = "my-topic";

        Properties props = new Properties();
        // Broker addresses used for the initial connection.
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        // A send only counts as successful once all replicas have written the message.
        props.put("acks", "all");
        // 0 = failed sends are not retried.
        props.put("retries", 0);
        // Batch size in bytes.
        props.put("batch.size", 16384);
        // Wait up to 1 ms before sending so records can be batched.
        props.put("linger.ms", 1);
        // Size of the producer's send buffer in bytes.
        props.put("buffer.memory", 33554432);
        // Keys and values are serialized as strings.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // SASL authentication (no TLS: the protocol is SASL_PLAINTEXT). These settings must go
        // into the same Properties object that is handed to the producer.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule "
                        + "required username=\"" + user + "\" password=\"" + password + "\";");

        // try-with-resources closes the producer even when send() throws; close() also waits
        // for outstanding sends, so no explicit flush() is needed.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(topic, "topic_key_" + i, "topic_value_" + i);
                Future<RecordMetadata> metadataFuture = producer.send(record);
                try {
                    // Block until the broker acknowledges (or rejects) this record.
                    RecordMetadata recordMetadata = metadataFuture.get();
                    System.out.println("发送成功!");
                    System.out.println("topic:" + recordMetadata.topic());
                    System.out.println("partition:" + recordMetadata.partition());
                    System.out.println("offset:" + recordMetadata.offset());
                } catch (Exception e) {
                    // Report the failure and continue with the next record.
                    System.out.println("发送失败!");
                    e.printStackTrace();
                }
            }
        }
    }
}
kafka_client_jaas_plain配置文件信息:
// JAAS login context named "KafkaClient": Kafka's PlainLoginModule, flagged "required",
// with a static username/password pair.
// NOTE(review): "admin"/"admin" look like sample credentials - replace before real use.
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin";
};
具体代码实现:
package com.tyhh.test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

/**
 * Sends ten string records to {@code my-topic}, authenticating with SASL/PLAIN. Credentials
 * come from an external JAAS file registered through the
 * {@code java.security.auth.login.config} system property.
 *
 * @version v1.0
 */
public class KafkaProducerTest {

    public static void main(String[] args) {
        // Point the JVM at the JAAS file holding the SASL credentials; this must happen before
        // the producer is created.
        System.setProperty("java.security.auth.login.config",
                "./kafka_ssl_conf/kafka_client_jaas_plain.conf");

        String topic = "my-topic";

        Properties props = new Properties();
        // Broker addresses used for the initial connection.
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        // A send only counts as successful once all replicas have written the message.
        props.put("acks", "all");
        // 0 = failed sends are not retried.
        props.put("retries", 0);
        // Batch size in bytes.
        props.put("batch.size", 16384);
        // Wait up to 1 ms before sending so records can be batched.
        props.put("linger.ms", 1);
        // Size of the producer's send buffer in bytes.
        props.put("buffer.memory", 33554432);
        // Keys and values are serialized as strings.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // The JAAS file only supplies credentials. Without these two settings the client
        // keeps the default PLAINTEXT protocol and never performs SASL authentication.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");

        // try-with-resources closes the producer even when send() throws; close() also waits
        // for outstanding sends, so no explicit flush() is needed.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(topic, "topic_key_" + i, "topic_value_" + i);
                Future<RecordMetadata> metadataFuture = producer.send(record);
                try {
                    // Block until the broker acknowledges (or rejects) this record.
                    RecordMetadata recordMetadata = metadataFuture.get();
                    System.out.println("发送成功!");
                    System.out.println("topic:" + recordMetadata.topic());
                    System.out.println("partition:" + recordMetadata.partition());
                    System.out.println("offset:" + recordMetadata.offset());
                } catch (Exception e) {
                    // Report the failure and continue with the next record.
                    System.out.println("发送失败!");
                    e.printStackTrace();
                }
            }
        }
    }
}
package com.tyhh.test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

/**
 * Sends ten string records to {@code my-topic}, authenticating with SASL/SCRAM-SHA-512 over
 * SASL_PLAINTEXT. The JAAS configuration is supplied inline through {@code sasl.jaas.config}.
 *
 * @version v1.0
 */
public class KafkaProducerTest {

    public static void main(String[] args) {
        // NOTE(review): sample credentials hard-coded for the demo; load them from a secure
        // source in real code.
        String user = "admin";
        String password = "admin";
        String topic = "my-topic";

        Properties props = new Properties();
        // Broker addresses used for the initial connection.
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        // A send only counts as successful once all replicas have written the message.
        props.put("acks", "all");
        // 0 = failed sends are not retried.
        props.put("retries", 0);
        // Batch size in bytes.
        props.put("batch.size", 16384);
        // Wait up to 1 ms before sending so records can be batched.
        props.put("linger.ms", 1);
        // Size of the producer's send buffer in bytes.
        props.put("buffer.memory", 33554432);
        // Keys and values are serialized as strings.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // SASL authentication (no TLS: the protocol is SASL_PLAINTEXT).
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule "
                        + "required username=\"" + user + "\" password=\"" + password + "\";");

        // try-with-resources closes the producer even when send() throws; close() also waits
        // for outstanding sends, so no explicit flush() is needed.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(topic, "topic_key_" + i, "topic_value_" + i);
                Future<RecordMetadata> metadataFuture = producer.send(record);
                try {
                    // Block until the broker acknowledges (or rejects) this record.
                    RecordMetadata recordMetadata = metadataFuture.get();
                    System.out.println("发送成功!");
                    System.out.println("topic:" + recordMetadata.topic());
                    System.out.println("partition:" + recordMetadata.partition());
                    System.out.println("offset:" + recordMetadata.offset());
                } catch (Exception e) {
                    // Report the failure and continue with the next record.
                    System.out.println("发送失败!");
                    e.printStackTrace();
                }
            }
        }
    }
}
kafka_client_jaas_scram配置文件信息:
// JAAS login context named "KafkaClient": Kafka's ScramLoginModule, flagged "required",
// with a static username/password pair.
// NOTE(review): "admin"/"admin" look like sample credentials - replace before real use.
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin";
};
具体代码实现:
package com.tyhh.test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

/**
 * Sends ten string records to {@code my-topic}, authenticating with SASL/SCRAM-SHA-512.
 * Credentials come from an external JAAS file registered through the
 * {@code java.security.auth.login.config} system property.
 *
 * @version v1.0
 */
public class KafkaProducerTest {

    public static void main(String[] args) {
        // Point the JVM at the JAAS file holding the SASL credentials; this must happen before
        // the producer is created.
        System.setProperty("java.security.auth.login.config",
                "./kafka_ssl_conf/kafka_client_jaas_scram.conf");

        String topic = "my-topic";

        Properties props = new Properties();
        // Broker addresses used for the initial connection.
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        // A send only counts as successful once all replicas have written the message.
        props.put("acks", "all");
        // 0 = failed sends are not retried.
        props.put("retries", 0);
        // Batch size in bytes.
        props.put("batch.size", 16384);
        // Wait up to 1 ms before sending so records can be batched.
        props.put("linger.ms", 1);
        // Size of the producer's send buffer in bytes.
        props.put("buffer.memory", 33554432);
        // Keys and values are serialized as strings.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // The JAAS file only supplies credentials. Without these two settings the client
        // keeps the default PLAINTEXT protocol and never performs SASL authentication.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");

        // try-with-resources closes the producer even when send() throws; close() also waits
        // for outstanding sends, so no explicit flush() is needed.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(topic, "topic_key_" + i, "topic_value_" + i);
                Future<RecordMetadata> metadataFuture = producer.send(record);
                try {
                    // Block until the broker acknowledges (or rejects) this record.
                    RecordMetadata recordMetadata = metadataFuture.get();
                    System.out.println("发送成功!");
                    System.out.println("topic:" + recordMetadata.topic());
                    System.out.println("partition:" + recordMetadata.partition());
                    System.out.println("offset:" + recordMetadata.offset());
                } catch (Exception e) {
                    // Report the failure and continue with the next record.
                    System.out.println("发送失败!");
                    e.printStackTrace();
                }
            }
        }
    }
}
package com.tyhh.test;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Subscribes to {@code my-topic} as group {@code my-group} without authentication and prints
 * every received record until the process is terminated.
 *
 * @version v1.0
 */
public class KafkaConsumerTest {

    public static void main(String[] args) {
        String topic = "my-topic";
        String groupId = "my-group";
        String autoCommit = "true";
        String offsetReset = "earliest";

        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("group.id", groupId);
        // Whether offsets are committed automatically.
        props.put("enable.auto.commit", autoCommit);
        props.put("auto.offset.reset", offsetReset);
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // Keys and values are deserialized as strings. Both settings are mandatory: the
        // consumer constructor rejects a configuration without value.deserializer.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources closes the consumer if the poll loop ends with an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                // poll(Duration) replaces the deprecated poll(long) overload.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition= %d, offset = %d, key = %s, value = %s\n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
package com.tyhh.test;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Subscribes to {@code my-topic} as group {@code my-group}, authenticating with SASL/PLAIN
 * over SASL_PLAINTEXT, and prints every received record until the process is terminated.
 * The JAAS configuration is supplied inline through {@code sasl.jaas.config}.
 *
 * @version v1.0
 */
public class KafkaConsumerTest {

    public static void main(String[] args) {
        // NOTE(review): sample credentials hard-coded for the demo; load them from a secure
        // source in real code.
        String user = "admin";
        String password = "admin";
        String topic = "my-topic";
        String groupId = "my-group";
        String autoCommit = "true";
        String offsetReset = "earliest";

        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("group.id", groupId);
        // Whether offsets are committed automatically.
        props.put("enable.auto.commit", autoCommit);
        props.put("auto.offset.reset", offsetReset);
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // Keys and values are deserialized as strings. Both settings are mandatory: the
        // consumer constructor rejects a configuration without value.deserializer.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // SASL authentication (no TLS: the protocol is SASL_PLAINTEXT). These settings must go
        // into the same Properties object that is handed to the consumer.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule "
                        + "required username=\"" + user + "\" password=\"" + password + "\";");

        // try-with-resources closes the consumer if the poll loop ends with an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                // poll(Duration) replaces the deprecated poll(long) overload.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition= %d, offset = %d, key = %s, value = %s\n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
kafka_client_jaas_plain配置文件信息:
// JAAS login context named "KafkaClient": Kafka's PlainLoginModule, flagged "required",
// with a static username/password pair.
// NOTE(review): "admin"/"admin" look like sample credentials - replace before real use.
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin";
};
具体代码实现:
package com.tyhh.test;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Subscribes to {@code my-topic} as group {@code my-group}, authenticating with SASL/PLAIN,
 * and prints every received record until the process is terminated. Credentials come from an
 * external JAAS file registered through the {@code java.security.auth.login.config} system
 * property.
 *
 * @version v1.0
 */
public class KafkaConsumerTest {

    public static void main(String[] args) {
        // Point the JVM at the JAAS file holding the SASL credentials; this must happen before
        // the consumer is created.
        System.setProperty("java.security.auth.login.config",
                "./kafka_ssl_conf/kafka_client_jaas_plain.conf");

        String topic = "my-topic";
        String groupId = "my-group";
        String autoCommit = "true";
        String offsetReset = "earliest";

        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("group.id", groupId);
        // Whether offsets are committed automatically.
        props.put("enable.auto.commit", autoCommit);
        props.put("auto.offset.reset", offsetReset);
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // Keys and values are deserialized as strings. Both settings are mandatory: the
        // consumer constructor rejects a configuration without value.deserializer.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // The JAAS file only supplies credentials. Without these two settings the client
        // keeps the default PLAINTEXT protocol and never performs SASL authentication.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");

        // try-with-resources closes the consumer if the poll loop ends with an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                // poll(Duration) replaces the deprecated poll(long) overload.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition= %d, offset = %d, key = %s, value = %s\n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
package com.tyhh.test;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Subscribes to {@code my-topic} as group {@code my-group}, authenticating with
 * SASL/SCRAM-SHA-512 over SASL_PLAINTEXT, and prints every received record until the process
 * is terminated. The JAAS configuration is supplied inline through {@code sasl.jaas.config}.
 *
 * @version v1.0
 */
public class KafkaConsumerTest {

    public static void main(String[] args) {
        // NOTE(review): sample credentials hard-coded for the demo; load them from a secure
        // source in real code.
        String user = "admin";
        String password = "admin";
        String topic = "my-topic";
        String groupId = "my-group";
        String autoCommit = "true";
        String offsetReset = "earliest";

        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("group.id", groupId);
        // Whether offsets are committed automatically.
        props.put("enable.auto.commit", autoCommit);
        props.put("auto.offset.reset", offsetReset);
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // Keys and values are deserialized as strings.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // SASL authentication (no TLS: the protocol is SASL_PLAINTEXT).
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required username='"
                        + user + "' password='" + password + "';");

        // try-with-resources closes the consumer if the poll loop ends with an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                // poll(Duration) replaces the deprecated poll(long) overload.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition= %d, offset = %d, key = %s, value = %s\n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
kafka_client_jaas_scram配置文件信息:
// JAAS login context named "KafkaClient": Kafka's ScramLoginModule, flagged "required",
// with a static username/password pair.
// NOTE(review): "admin"/"admin" look like sample credentials - replace before real use.
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin";
};
具体代码实现:
package com.tyhh.test;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Subscribes to {@code my-topic} as group {@code my-group}, authenticating with
 * SASL/SCRAM-SHA-512, and prints every received record until the process is terminated.
 * Credentials come from an external JAAS file registered through the
 * {@code java.security.auth.login.config} system property.
 *
 * @version v1.0
 */
public class KafkaConsumerTest {

    public static void main(String[] args) {
        // Point the JVM at the JAAS file holding the SASL credentials; this must happen before
        // the consumer is created.
        System.setProperty("java.security.auth.login.config",
                "./kafka_ssl_conf/kafka_client_jaas_scram.conf");

        String topic = "my-topic";
        String groupId = "my-group";
        String autoCommit = "true";
        String offsetReset = "earliest";

        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("group.id", groupId);
        // Whether offsets are committed automatically.
        props.put("enable.auto.commit", autoCommit);
        props.put("auto.offset.reset", offsetReset);
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // Keys and values are deserialized as strings. Both settings are mandatory: the
        // consumer constructor rejects a configuration without value.deserializer.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // The JAAS file only supplies credentials. Without these two settings the client
        // keeps the default PLAINTEXT protocol and never performs SASL authentication.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");

        // try-with-resources closes the consumer if the poll loop ends with an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                // poll(Duration) replaces the deprecated poll(long) overload.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition= %d, offset = %d, key = %s, value = %s\n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。