" + metadata.topic() + " 偏移量为:" + metadata.offset());System.out.println("消息发送失败 " + exception.getMessage());在target下的classes下添加生产者配置文件client.properties,ssl密钥的路径按照你自己的路径写。clickhouse连接带有sasl_ssl的kafka的配置参考我的另一篇。// 创建生产者对象。_java">
赞
踩
clickhouse连接带有sasl_ssl的kafka的配置参考我的另一篇
一.Java配置
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.io.IOException; import java.util.Properties; public class kafka_producer { public static void main(String[] args) throws IOException { // 创建配置类 Properties properties = new Properties(); // 加载生产者配置文件 System.out.println("222"); properties.load(kafka_producer.class.getClassLoader().getResourceAsStream("client.properties")); // 创建生产者对象 System.out.println("333"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put("bootstrap.servers", "172.29.128.71:9093"); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); ProducerRecord<String, String> producerRecord = new ProducerRecord<>("sasl_ssl", "javatest", "java2"); producer.send(producerRecord, (metadata, exception) -> { if (exception == null) { System.out.println("消息发送至 --> " + metadata.topic() + " 偏移量为:" + metadata.offset()); } else { System.out.println("消息发送失败 " + exception.getMessage()); } }); producer.close(); } }
在target下的classes下添加生产者配置文件client.properties,ssl密钥的路径按照你自己的路径写
security.protocol=SASL_SSL sasl.mechanism=PLAIN ssl.truststore.location=C://Users//22160//Desktop//PROJECT_JAVA_2023-04-06//untitled//src//conf//server.truststore.jks ssl.truststore.password=123456 ssl.keystore.location=C://Users//22160//Desktop//PROJECT_JAVA_2023-04-06//untitled//src//conf//server.keystore.jks ssl.keystore.password=123456 ssl.key.password=123456 sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret"; ssl.endpoint.identification.algorithm=
运行成功
查看是否消费到数据
消费成功
查看clickhouse里的物化视图是否收到数据
数据成功写到click house
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。