当前位置:   article > 正文

kafka接口操作topic_接口包括topic

接口包括topic

翻译主要的创建参数备忘,忒TM烦人的多

  1. import com.google.common.collect.ImmutableList;
  2. import org.apache.kafka.clients.admin.*;
  3. import org.apache.kafka.common.KafkaFuture;
  4. import org.apache.kafka.common.config.TopicConfig;
  5. import org.apache.log4j.Logger;
  6. import java.util.*;
  7. import java.util.concurrent.ExecutionException;
  8. public class KafkaClient implements AutoCloseable{
  9. private KafkaAdminClient client;
  10. private int timeout=6000;
  11. private final static Logger logger=Logger.getLogger(KafkaClient.class);
  12. public KafkaClient(KafkaSettings settings){
  13. Map<String,Object> map=new HashMap<>();
  14. String[] hosts=settings.getArray(KafkaSettings.PREFIX_ALL,AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,",");
  15. map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList(hosts));
  16. client=(KafkaAdminClient)KafkaAdminClient.create(map);
  17. if (logger.isInfoEnabled()){
  18. logger.info("Kafka集群已成功连接");
  19. }
  20. }
  21. public KafkaClient(String[] hosts){
  22. Map<String,Object> map=new HashMap<>();
  23. map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList(hosts));
  24. client=(KafkaAdminClient)KafkaAdminClient.create(map);
  25. if (logger.isInfoEnabled()){
  26. logger.info("Kafka集群已成功连接");
  27. }
  28. }
  29. /**
  30. * 创建一个新的topic
  31. * @param name topic名
  32. * @param numPartitions 分区数
  33. * @param replicationFactor 副本数
  34. * @return 是否创建成功
  35. * @throws
  36. */
  37. public boolean createTopic(String name, int numPartitions, short replicationFactor) throws ExecutionException, InterruptedException {
  38. Map<String,String> topicConfig=new HashMap<>();
  39. /** 旧日志段的保留测率,删除或压缩,此时选择删除 */
  40. topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG,TopicConfig.CLEANUP_POLICY_DELETE);
  41. /** 过期数据的压缩方式,如果上面选项为压缩的话才有效 */
  42. //topicConfig.put(TopicConfig.COMPRESSION_TYPE_CONFIG,"snappy");
  43. /**
  44. * The amount of time to retain delete tombstone markers for log compacted topics.
  45. * This setting also gives a bound on the time in which a consumer must complete a
  46. * read if they begin from offset 0 to ensure that they get a valid snapshot of the
  47. * final stage (otherwise delete tombstones may be collected before they complete their scan).
  48. * 默认1天
  49. * */
  50. topicConfig.put(TopicConfig.DELETE_RETENTION_MS_CONFIG,"86400000");
  51. /** 文件在文件系统上被删除前的保留时间,默认为60秒 */
  52. topicConfig.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG,"60000");
  53. /** 将数据强制刷入日志的条数间隔 */
  54. //topicConfig.put(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG,"9223372036854775807");
  55. /** 将数据强制刷入日志的时间间隔 */
  56. //topicConfig.put(TopicConfig.FLUSH_MS_CONFIG,"9223372036854775807");
  57. /** offset设置 */
  58. //topicConfig.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG,"4096");
  59. /** 每个批量消息最大字节数 */
  60. //topicConfig.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG,"1000012");
  61. /** 记录标记时间与kafka本机时间允许的最大间隔,超过此值的将被拒绝 */
  62. //topicConfig.put(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG,"9223372036854775807");
  63. /** 标记时间类型,是创建时间还是日志时间 CreateTime/LogAppendTime */
  64. //topicConfig.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG,"CreateTime");
  65. /** 如果日志压缩设置为可用的话,设置日志压缩器清理日志的频率。默认情况下,压缩比率超过50%时会避免清理日志。
  66. 此比率限制重复日志浪费的最大空间,设置为50%,意味着最多50%的日志是重复的。更高的比率设置意味着更少、更高效
  67. 的清理,但会浪费更多的磁盘空间。*/
  68. //topicConfig.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG,"0.5");
  69. /** 消息在日志中保持未压缩状态的最短时间,只对已压缩的日志有效 */
  70. //topicConfig.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG,"0");
  71. /** 当一个producer的ack设置为all(或者-1)时,此项设置的意思是认为新记录写入成功时需要的最少副本写入成功数量。
  72. 如果此最小数量没有达到,则producer抛出一个异常(NotEnoughReplicas 或者NotEnoughReplicasAfterAppend)。
  73. 你可以同时使用min.insync.replicas 和ack来加强数据持久话的保障。一个典型的情况是把一个topic的副本数量设置为3,
  74. min.insync.replicas的数量设置为2,producer的ack模式设置为all,这样当没有足够的副本没有写入数据时,producer会抛出一个异常。*/
  75. topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,"1");
  76. /** 如果设置为true,会在新日志段创建时预分配磁盘空间 */
  77. topicConfig.put(TopicConfig.PREALLOCATE_CONFIG,"true");
  78. /** 当保留策略为删除(delete)时,此设置控制在删除就日志段来清理磁盘空间前,保存日志段的partition能增长到的最大尺寸。
  79. * 默认情况下没有尺寸大小限制,只有时间限制。。由于此项指定的是partition层次的限制,它的数量乘以分区数才是topic层面保留的数量。 */
  80. // topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG,"-1");
  81. /**
  82. * 当保留策略为删除(delete)时,此设置用于控制删除旧日志段以清理磁盘空间前,日志保留的最长时间。默认为7天。
  83. * 这是consumer在多久内必须读取数据的一个服务等级协议(SLA)。
  84. * */
  85. topicConfig.put(TopicConfig.RETENTION_MS_CONFIG,"604800000");
  86. /**
  87. * 此项用于控制日志段的大小,日志的清理和持久话总是同时发生,所以大的日志段代表更少的文件数量和更小的操作粒度。
  88. * */
  89. topicConfig.put(TopicConfig.SEGMENT_BYTES_CONFIG,"1073741824");
  90. /**
  91. * 此项用于控制映射数据记录offsets到文件位置的索引的大小。我们会给索引文件预先分配空间,然后在日志滚动时收缩它。
  92. * 一般情况下你不需要改动这个设置。
  93. * */
  94. //topicConfig.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG,"10485760");
  95. /** 从预订的段滚动时间中减去最大的随机抖动,避免段滚动时的惊群(thundering herds) */
  96. //topicConfig.put(TopicConfig.SEGMENT_JITTER_MS_CONFIG,"0");
  97. /** 此项用户控制kafka强制日志滚动时间,在此时间后,即使段文件没有满,也会强制滚动,以保证持久化操作能删除或压缩就数据。默认7天 */
  98. topicConfig.put(TopicConfig.SEGMENT_MS_CONFIG,"604800000");
  99. /**
  100. * 是否把一个不在isr中的副本被选举为leader作为最后手段,即使这样做会带来数据损失
  101. * */
  102. topicConfig.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG,"false");
  103. NewTopic newTopic=new NewTopic(name,numPartitions,replicationFactor);
  104. newTopic.configs(topicConfig);
  105. CreateTopicsOptions options=new CreateTopicsOptions();
  106. options.timeoutMs(timeout);
  107. CreateTopicsResult result=client.createTopics(ImmutableList.of(newTopic),options);
  108. for(Map.Entry<String,KafkaFuture<Void>> e : result.values().entrySet()){
  109. KafkaFuture<Void> future= e.getValue();
  110. future.get();
  111. boolean success=!future.isCompletedExceptionally();
  112. if(logger.isInfoEnabled()&&success){
  113. logger.info("已成功创建Kafka topic "+name+" ,分区 "+numPartitions+" ,副本 "+replicationFactor);
  114. }
  115. return success;
  116. }
  117. return false;
  118. }
  119. /**
  120. * 当topic不存在时,主动创建
  121. * @param name topic名
  122. * @param numPartitions 分区数
  123. * @param replicationFactor 副本数
  124. * @return 是否可以使用此topic,如果为true,可能是新创建或已存在
  125. * @throws ExecutionException
  126. * @throws InterruptedException
  127. */
  128. public boolean createTopicIfNotExists(String name, int numPartitions, short replicationFactor) throws ExecutionException, InterruptedException {
  129. if(!listTopics().contains(name)){
  130. return createTopic(name,numPartitions,replicationFactor);
  131. }
  132. if(logger.isInfoEnabled()){
  133. logger.info("Kafka topic "+name+" 已存在");
  134. }
  135. return true;
  136. }
  137. /**
  138. * 列出所有非内部topic
  139. * @return
  140. * @throws ExecutionException
  141. * @throws InterruptedException
  142. */
  143. public Set<String> listTopics() throws ExecutionException, InterruptedException {
  144. ListTopicsOptions options=new ListTopicsOptions();
  145. //设置超时时间
  146. options.timeoutMs(timeout);
  147. //不列出kafka内部topic
  148. options.listInternal(false);
  149. ListTopicsResult result=client.listTopics(options);
  150. Set<String> topics= result.names().get();
  151. if(logger.isDebugEnabled()){
  152. logger.debug("发现"+topics.size()+"个Kafka topic : "+topics.toString());
  153. }
  154. return topics;
  155. }
  156. /**
  157. * 检查topic是否存在
  158. * @param topics 待检查的topic
  159. * @return topic是否存在
  160. * @throws ExecutionException
  161. * @throws InterruptedException
  162. */
  163. public List<PairObject<String,Boolean>> checkExists(String[] topics) throws ExecutionException, InterruptedException {
  164. Set<String> topicSet= listTopics();
  165. List<PairObject<String,Boolean>> exists=new ArrayList<>();
  166. for (String topic :topics){
  167. exists.add(new PairObject<>(topic,topicSet.contains(topic)));
  168. }
  169. return exists;
  170. }
  171. /**
  172. * 删除指定topic(如果broker那没有设置允许删除topic的话,此调用会持续等待最终超时返回)
  173. * @param topics 待删除的topic
  174. * @return 删除是否成功
  175. * @throws ExecutionException
  176. * @throws InterruptedException
  177. */
  178. public List<PairObject<String,Boolean>> deleteTopic(String[] topics) throws ExecutionException, InterruptedException {
  179. DeleteTopicsOptions options=new DeleteTopicsOptions();
  180. options.timeoutMs(timeout);
  181. DeleteTopicsResult deleteTopicsResult=client.deleteTopics(Arrays.asList(topics),options);
  182. List<PairObject<String,Boolean>> result=new ArrayList<>();
  183. for(Map.Entry<String,KafkaFuture<Void>> e : deleteTopicsResult.values().entrySet()){
  184. String topic=e.getKey();
  185. KafkaFuture<Void> future=e.getValue();
  186. future.get();
  187. result.add(new PairObject<>(topic,!future.isCompletedExceptionally()));
  188. }
  189. return result;
  190. }
  191. @Override
  192. public void close() throws Exception {
  193. if (client!=null){
  194. client.close();
  195. }
  196. }
  197. public void setTimeout(int timeout) {
  198. this.timeout = timeout;
  199. }
  200. }


声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/894047
推荐阅读
相关标签
  

闽ICP备14008679号