赞
踩
- <!--导入kafka客户端依赖-->
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>2.4.1</version>
- </dependency>
注意:序列化&反序列化,可参考使用第三方库,本例只用基础的数据类型作为k/v
-
-
- // 公共参数
- private static String bootServer = "kafka1:9092,kafka2:9092,kafka3:9092";
- private static String TOPIC_NAME = "kafkaQQRun";
- private static String CONSUMER_GROUP_NAME = "consumer1";
-
- public void log(String fmt, Object... objs) {
- if (objs != null && objs.length > 0) {
- String s = String.format(fmt, objs);
- System.out.println(s);
- } else {
- System.out.println(fmt);
- }
- }
-
- @Test
- public void producer1() throws ExecutionException, InterruptedException {
-
- // 设置参数
- Properties props = new Properties();
- // 指定服务器配置【ip:端口】
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootServer);
-
- // key/value 序列化
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-
- // 创建生产消息的客户端,传入参数
- Producer<Long, String> producer = new KafkaProducer<Long, String>(props);
-
- for (int i = 0; i < 100; i++) {
- // 创建消息;key:作用是决定了往哪个分区上发,value:具体要发送的消息内容
- ProducerRecord<Long, String> message = new ProducerRecord<>(
- TOPIC_NAME, (long) i, "helloKafka");
-
- // 同步发送消息
- RecordMetadata metadata = producer.send(message).get();
- log("send sync:topic:%s, partition:%d, key:%d, value:%s",
- metadata.topic(), metadata.partition(), metadata.offset(),
- message.key(), message.value());
- }
-
- for (int i = 0; i < 10000; i++) {
- ProducerRecord<Long, String> message = new ProducerRecord<>(TOPIC_NAME, 10000000l + i, "v2");
- producer.send(message, new Callback() {
- @Override
- public void onCompletion(RecordMetadata recordMetadata, Exception e) {
- log("send async complete , topic:%s, partition:%d, offset:%d, key:%d, value:%s",
- recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(),
- message.key(), message.value());
- }
- });
- }
-
- Thread.currentThread().join();
- }

- @Test
- public void consumer1() {
-
- // 设置配置
- Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootServer);
-
- // 消费分组名
- props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
- // key/value反序列化
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- // 是否自动提交offset, default:true
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-
- // 创建一个消费者
- KafkaConsumer<Long, String> consumer = new KafkaConsumer<Long, String>(props);
- // 消费者订阅主题
- consumer.subscribe(Arrays.asList(TOPIC_NAME));
-
- while (true) {
- // poll() 长轮询拉取消息的
- ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofMillis(1000));
- for (ConsumerRecord<Long, String> r : messages) {
- log("receive:partition:%d, offset:%d, key:%d, value:%s", r.partition(), r.offset(), r.key(), r.value());
- }
-
- int count = messages.count();
- // 阻塞手动提交
- if (count > 0) {
- // consumer.commitAsync();
- // log("commitAsync finish records:%d", count);
-
- // 异步提交
- consumer.commitAsync(new OffsetCommitCallback() {
- @Override
- public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
- for (TopicPartition p : offsets.keySet()) {
- OffsetAndMetadata d = offsets.get(p);
- log("commitAsync complete, topic:%s, partition:%d, offset:%d", p.topic(), p.partition(), d.offset());
- }
- log("commitAsync complete records:%d", count);
- }
- });
- }
- }
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。