当前位置:   article > 正文

Java连接kafka

at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java)

1、maven依赖:

  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  3. <modelVersion>4.0.0</modelVersion>
  4. <groupId>com.xxx.test</groupId>
  5. <artifactId>xxx</artifactId>
  6. <version>1.0-SNAPSHOT</version>
  7. <inceptionYear>2008</inceptionYear>
  8. <properties>
  9. <scala.version>2.11.12</scala.version>
  10. <kafka.version>0.10.2.0</kafka.version>
  11. <!--<kafka.version>1.1.0</kafka.version>-->
  12. </properties>
  13. <repositories>
  14. <repository>
  15. <id>scala-tools.org</id>
  16. <name>Scala-Tools Maven2 Repository</name>
  17. <url>http://scala-tools.org/repo-releases</url>
  18. </repository>
  19. </repositories>
  20. <pluginRepositories>
  21. <pluginRepository>
  22. <id>scala-tools.org</id>
  23. <name>Scala-Tools Maven2 Repository</name>
  24. <url>http://scala-tools.org/repo-releases</url>
  25. </pluginRepository>
  26. </pluginRepositories>
  27. <dependencies>
  28. <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
  29. <dependency>
  30. <groupId>org.apache.kafka</groupId>
  31. <artifactId>kafka_2.11</artifactId> <!-- Scala suffix must match scala.version (2.11.x); kafka_2.12 is built for Scala 2.12 -->
  32. <version>${kafka.version}</version>
  33. </dependency>
  34. <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
  35. <dependency>
  36. <groupId>org.apache.kafka</groupId>
  37. <artifactId>kafka-clients</artifactId> <!-- provides the org.apache.kafka.clients.consumer classes used by the Java example -->
  38. <version>${kafka.version}</version>
  39. </dependency>
  40. </dependencies>
  41. <build>
  42. <sourceDirectory>src/main/scala</sourceDirectory>
  43. <testSourceDirectory>src/test/scala</testSourceDirectory>
  44. <plugins>
  45. <plugin>
  46. <groupId>org.scala-tools</groupId>
  47. <artifactId>maven-scala-plugin</artifactId>
  48. <executions>
  49. <execution>
  50. <goals>
  51. <goal>compile</goal>
  52. <goal>testCompile</goal>
  53. </goals>
  54. </execution>
  55. </executions>
  56. <configuration>
  57. <scalaVersion>${scala.version}</scalaVersion>
  58. <args>
  59. <arg>-target:jvm-1.8</arg>
  60. </args>
  61. </configuration>
  62. </plugin>
  63. <plugin>
  64. <groupId>org.apache.maven.plugins</groupId>
  65. <artifactId>maven-eclipse-plugin</artifactId>
  66. <configuration>
  67. <downloadSources>true</downloadSources>
  68. <buildcommands>
  69. <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
  70. </buildcommands>
  71. <additionalProjectnatures>
  72. <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
  73. </additionalProjectnatures>
  74. <classpathContainers>
  75. <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
  76. <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
  77. </classpathContainers>
  78. </configuration>
  79. </plugin>
  80. <plugin>
  81. <artifactId>maven-assembly-plugin</artifactId>
  82. <configuration>
  83. <appendAssemblyId>false</appendAssemblyId>
  84. <descriptorRefs>
  85. <descriptorRef>jar-with-dependencies</descriptorRef>
  86. </descriptorRefs>
  87. <archive>
  88. <manifest>
  89. <!-- Set the fully-qualified name of the class that contains the main method here -->
  90. <mainClass></mainClass>
  91. </manifest>
  92. </archive>
  93. </configuration>
  94. <executions>
  95. <execution>
  96. <id>make-assembly</id>
  97. <phase>package</phase>
  98. <goals>
  99. <goal>single</goal> <!-- the old "assembly" goal was removed in maven-assembly-plugin 3.x; "single" is the goal intended for lifecycle binding -->
  100. </goals>
  101. </execution>
  102. </executions>
  103. </plugin>
  104. </plugins>
  105. </build>
  106. <reporting>
  107. <plugins>
  108. <plugin>
  109. <groupId>org.scala-tools</groupId>
  110. <artifactId>maven-scala-plugin</artifactId>
  111. <configuration>
  112. <scalaVersion>${scala.version}</scalaVersion>
  113. </configuration>
  114. </plugin>
  115. </plugins>
  116. </reporting>
  117. </project>

  

2、Java代码:

  1. import org.apache.kafka.clients.consumer.ConsumerConfig;
  2. import org.apache.kafka.clients.consumer.ConsumerRecord;
  3. import org.apache.kafka.clients.consumer.ConsumerRecords;
  4. import org.apache.kafka.clients.consumer.KafkaConsumer;
  5. import java.util.Collections;
  6. import java.util.Properties;
  7. /**
  8.  * Minimal Kafka consumer demo: subscribes to one topic, polls once and prints the records received.
  9.  */
  10. public class MyConsumer {
  11.     private static final String TOPIC = "test_topic";
  12.     private static final String KAFKA_SERVER_URL = "10.31.7.200";
  13.     private static final String KAFKA_SERVER_PORT = "9092";
  14.     public static final String HOST_NAME = KAFKA_SERVER_URL;
  15.     /**
  16.      * Builds a consumer for the broker at KAFKA_SERVER_URL:KAFKA_SERVER_PORT in group "DemoConsumer",
  17.      * with auto-commit enabled (interval 1000 ms) and a 30000 ms session timeout.
  18.      *
  19.      * @return a new consumer that is not yet subscribed to any topic; the caller must close it
  20.      */
  21.     public static KafkaConsumer<Integer, String> getConsumer() {
  22.         Properties props = new Properties();
  23.         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_URL + ":" + KAFKA_SERVER_PORT);
  24.         props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
  25.         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
  26.         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
  27.         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
  28.         // NOTE(review): "host.name" does not appear to be a ConsumerConfig key; kept because the
  29.         // article reports it was needed to get rid of a TimeoutException - confirm.
  30.         props.put("host.name", HOST_NAME);
  31.         // Both settings must name a class implementing
  32.         // org.apache.kafka.common.serialization.Deserializer. StringSerializer or
  33.         // org.apache.kafka.connect.json.JsonConverter make the KafkaConsumer constructor fail, and
  34.         // IntegerDeserializer failed on this topic with "Size of data received by IntegerDeserializer is not 4".
  35.         // NOTE(review): keys are therefore decoded as String although the declared key type is Integer;
  36.         // the declared type is left unchanged so existing callers still compile - confirm and align.
  37.         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  38.         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  39.         return new KafkaConsumer<>(props);
  40.     }
  41.     /**
  42.      * Subscribes to TOPIC, polls once (waiting up to 1000 ms) and prints every record received.
  43.      *
  44.      * @param args unused
  45.      */
  46.     public static void main(String[] args) {
  47.         // try-with-resources closes the consumer on exit, so it leaves the group and gets a chance
  48.         // to commit the offsets of the records it has just read instead of simply abandoning them.
  49.         try (KafkaConsumer<Integer, String> consumer = getConsumer()) {
  50.             consumer.subscribe(Collections.singletonList(TOPIC));
  51.             ConsumerRecords<Integer, String> records = consumer.poll(1000);
  52.             for (ConsumerRecord<Integer, String> record : records) {
  53.                 System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
  54.             }
  55.         }
  56.     }
  57. }

 3、可能的报错:

  出现下面的超时报错时,需要设置 props.put("host.name", ip);
  org.apache.kafka.common.errors.TimeoutException

 

这个报错需要检查 k/v 的反序列化类:它必须是 org.apache.kafka.common.serialization.Deserializer 的实现类,并且要与消息写入时使用的序列化格式一致(例如数据不是 4 字节的整数,却用 IntegerDeserializer 解析,就会出现下面的报错)
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition event_topic-0 at offset 161529729
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4

1、需要设置 value.deserializer 的值
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "value.deserializer" which has no default value.
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:436)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:56)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:63)
at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:426)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:597)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:579)
at com.fumi.test.MyConsumer2.get(MyConsumer2.java:46)
at com.fumi.test.MyConsumer2.main(MyConsumer2.java:56)

2、k/v 的反序列化类 org.apache.kafka.connect.json.JsonConverter 没有实现 org.apache.kafka.common.serialization.Deserializer 接口,所以报错了。
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:597)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:579)
at com.fumi.test.MyConsumer2.get(MyConsumer2.java:46)
at com.fumi.test.MyConsumer2.main(MyConsumer2.java:56)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.connect.json.JsonConverter is not an instance of org.apache.kafka.common.serialization.Deserializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:205)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:645)
... 4 more


这个报错也是类似的原因:StringSerializer 是序列化器(Serializer),没有实现 Deserializer 接口,消费端应改用 StringDeserializer
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:597)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:579)
at com.fumi.test.MyConsumer2.get(MyConsumer2.java:46)
at com.fumi.test.MyConsumer2.main(MyConsumer2.java:56)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.common.serialization.Deserializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:205)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:637)
... 4 more

 

转载于:https://www.cnblogs.com/fillPv/p/9258757.html

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/958861
推荐阅读
相关标签
  

闽ICP备14008679号