赞
踩
@Override public ListTopicsResult listTopics(final ListTopicsOptions options) { final KafkaFutureImpl<Map<String, TopicListing>> topicListingFuture = new KafkaFutureImpl<>(); final long now = time.milliseconds(); runnable.call(new Call("listTopics", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { @Override MetadataRequest.Builder createRequest(int timeoutMs) { return MetadataRequest.Builder.allTopics(); } @Override void handleResponse(AbstractResponse abstractResponse) { MetadataResponse response = (MetadataResponse) abstractResponse; Map<String, TopicListing> topicListing = new HashMap<>(); for (MetadataResponse.TopicMetadata topicMetadata : response.topicMetadata()) { String topicName = topicMetadata.topic(); boolean isInternal = topicMetadata.isInternal(); if (!topicMetadata.isInternal() || options.shouldListInternal()) topicListing.put(topicName, new TopicListing(topicName, isInternal)); } topicListingFuture.complete(topicListing); } @Override void handleFailure(Throwable throwable) { topicListingFuture.completeExceptionally(throwable); } }, now); return new ListTopicsResult(topicListingFuture); }
listTopics()方法接收一个ListTopicsOptions类型的参数,KafkaAdminClient中基本所有的应用类方法都有一个类似XXXOptions类型的参数,这个类型一般只包含timeoutMs这个成员变量,用来设定请求的超时时间,如果没有指定则使用默认的request.timeout.ms参数值,即30000ms。
最后关于那个管理topic,传入XXXOptions,就是调用方法的话,不传参options参数的话,会调用Admin接口中listTopic方法中一个参数的那个方法,它又返回了两个参数的listtopic()方法,第二参数会自动创建无参构造函数创建对象option,如果要传入XXXOptions的话,就是传入一个时间的参数,设定请求超时时间。
default DeleteTopicsResult deleteTopics(Collection<String> topics) {
return this.deleteTopics(topics, new DeleteTopicsOptions());
}
DeleteTopicsResult deleteTopics(Collection<String> var1, DeleteTopicsOptions var2);
上述的这个最后还是追到了KafkaAdminClient里的deleteTopics了。所以之后写的小demo是对的。
不过ListTopicsOptions扩展了一个成员变量listInternal,用来指明是否需要罗列内部Topic,比如在Kafka解析之topic创建(1)中提及的“__consumer_offsets”和“transaction_state”就是两个内部Topic。ListTopicsOptions的代码如所示:
@InterfaceStability.Evolving
public class ListTopicsOptions extends AbstractOptions<ListTopicsOptions> {
private boolean listInternal = false;
public ListTopicsOptions timeoutMs(Integer timeoutMs) {
this.timeoutMs = timeoutMs;
return this;
}
public ListTopicsOptions listInternal(boolean listInternal) {
this.listInternal = listInternal;
return this;
}
public boolean shouldListInternal() {
return listInternal;
}
}
接下去继续讲解listTopics()方法,其返回值为ListTopicResult类型。与ListTopicsOptions对应,KafkaAdminClient中基本所有的应用类方法都有一个类似XXXResult类型的返回值,其内部一般包含一个KafkaFuture,用于异步发送请求之后等待操作结果。KafkaFuture实现了Java中的Future接口,用来支持链式调用以及其他异步编程模型。
@InterfaceStability.Evolving public class ListTopicsResult { final KafkaFuture<Map<String, TopicListing>> future; ListTopicsResult(KafkaFuture<Map<String, TopicListing>> future) { this.future = future; } public KafkaFuture<Map<String, TopicListing>> namesToListings() { return future; } public KafkaFuture<Collection<TopicListing>> listings() { return future.thenApply(namesToDescriptions -> namesToDescriptions.values()); } public KafkaFuture<Set<String>> names() { return future.thenApply(namesToListings -> namesToListings.keySet()); } }
Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。
Future类位于java.util.concurrent包下,它是一个接口:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
在Future接口中声明了5个方法,下面依次解释每个方法的作用:
KafkaFuture和KafkaFutureImpl的源码其实我没有看的太懂,里面涉及类的我看着比较奇怪。所以后来我去再看了一下java8的Future的实现类FutureTask看了一遍,走了一遍逻辑再回来看,还是不太懂KafkaFuture和KafkaFutureImpl的源码里面设计到的BiConsumer,BaseFunction这些是做什么的,一头雾水,后面可以继续再看一下
import java.util.concurrent.locks.LockSupport; import sun.misc.Unsafe; public class FutureTask<V> implements RunnableFuture<V> { private volatile int state;//任务状态 /** * state、runner、waiters三个成员变量是volatile修饰的,也就是说这几个变量的具有内存可见性,当变量的值发生变化后,系统会立即将最新的值刷新回主内存,即使在不同的线程中,看到的变量的值也是一样的,都是最新的。 */ private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; private Callable<V> callable;//是FutureTask具体要执行的任务,由外部传入 private Object outcome;//任务执行完后结果 private volatile Thread runner;//真正执行任务的线程worker private volatile FutureTask.WaitNode waiters;//等待任务的头节点,waiters是一个等待节点,而是是最顶层节点,类似头节点。FutureTask中的等待队列主要作用用于当多线程同时请求get方法获取结果时,这些线程可能要同时阻塞,因此将这些线程的阻塞信息保存在了等待节点中,并构成一个栈的等待结构 //原子操作的UNSAFE类以 private static final Unsafe UNSAFE; //state、runner、waiters的偏移量,在静态代码块中就初始化的 private static final long stateOffset; private static final long runnerOffset; private static final long waitersOffset; //返回结果 private V report(int var1) throws ExecutionException { Object var2 = this.outcome; if (var1 == 2) { return var2; } else if (var1 >= 4) { throw new CancellationException(); } else { throw new ExecutionException((Throwable)var2); } } //构造函数1 /** * 不同情况下的状态转变: 正常执行:NEW -> COMPLETING -> NORMAL 出现异常: NEW -> COMPLETING -> EXCEPTIONAL 取消:NEW -> CANCELLED 线程中断:NEW -> INTERRUPTING -> INTERRUPTED */ public FutureTask(Callable<V> var1) { if (var1 == null) { throw new NullPointerException(); } else { this.callable = var1; this.state = 0;//任务开始状态是New } } //构造函数2 public FutureTask(Runnable var1, V var2) { this.callable = Executors.callable(var1, var2); this.state = 0; } //是否取消了 public boolean isCancelled() { return this.state >= 4; } //是否完成了 public boolean isDone() { return this.state != 0; } //取消任务 /*** *如果是cancle(false),将state设置为CANCELLED,然后结束。如果是cancle(true),表明是由于线程中断,将state设置为INTERRUPTING,然后走try代码块,调用当前线程的interrupt(),设置线程的中断标志位true。这里只是给个中断的标记,并不真正终止线程,所以任务还是要继续。最后将将state设置为INTERRUPTED,INTERRUPTING在这里也只是个瞬时的过渡状态。 **/ public boolean cancel(boolean var1) { if (this.state == 0 && UNSAFE.compareAndSwapInt(this, stateOffset, 0, var1 ? 5 : 4)) { try { if (var1) {//状态为Completing try { Thread var2 = this.runner; if (var2 != null) { var2.interrupt(); } } finally { UNSAFE.putOrderedInt(this, stateOffset, 6); } } } finally { this.finishCompletion(); } return true; } else { return false; } } //获取结果 public V get() throws InterruptedException, ExecutionException { int var1 = this.state; if (var1 <= 1) {//任务执行状态为new或completing时,等待任务执行完成 var1 = this.awaitDone(false, 0L);//等待任务执行完成, /*不是NEW或COMPLETING时,可以任务执行的获取结果, *当调用cancel方法将抛出CancellationException, *当任务执行中出现异常时将抛出ExecutionException */ } return this.report(var1);//得到结果 } //在等待时间内获取结果 public V get(long var1, TimeUnit var3) throws InterruptedException, ExecutionException, TimeoutException { if (var3 == null) { throw new NullPointerException(); } else { int var4 = this.state; if (var4 <= 1 && (var4 = this.awaitDone(true, var3.toNanos(var1))) <= 1) { throw new TimeoutException(); } else { return this.report(var4); } } } // protected void done() { } //正常情况下设置结果 protected void set(V var1) { if (UNSAFE.compareAndSwapInt(this, stateOffset, 0, 1)) {//如果状态是0 new,就把状态改成1 completing this.outcome = var1;//保存正常执行call()返回的结果 UNSAFE.putOrderedInt(this, stateOffset, 2);//状态改为2 Normal this.finishCompletion(); } } //执行中出现异常时设置异常 protected void setException(Throwable var1) { //如果状态是0 new,就把状态改成1 completing if (UNSAFE.compareAndSwapInt(this, stateOffset, 0, 1)) { this.outcome = var1;//保存异常结果 UNSAFE.putOrderedInt(this, stateOffset, 3);//把状态改成3 Exception this.finishCompletion(); } } //线程的run public void run() { //状态机不为0,不为new,表示执行完成或者任务被取消,直接返回 //状态机为0 new,同时将runner设置为当前线程,保证同一时刻只有一个线程执行run方法,如果设置失败也直接返回 if (this.state == 0 && UNSAFE.compareAndSwapObject(this, runnerOffset, (Object)null, Thread.currentThread())) { boolean var9 = false; try { var9 = true; Callable var1 = this.callable;//任务 if (var1 != null) { if (this.state == 0) { //去除任务检测不为空,并且再次检查状态为new Object var2;//是结果 boolean var3;//表示是否正常执行 try { //直接调用Callable的call()方法,也就是我们的逻辑代码,得到需要的结果 var2 = var1.call(); var3 = true;//代码执行到这里,说明没有异常,正常执行了, } catch (Throwable var10) {//出现异常时的处理 var2 = null;//结果为空 var3 = false; this.setException(var10);//任务保存异常信息,而不直接抛出 } if (var3) {//如果正常执行 this.set(var2);//设置结果 var9 = false; } else { var9 = false; } } else { var9 = false; } } else { var9 = false; } } finally { if (var9) { this.runner = null;//这里清空了线程变量,与方法开头判断对应 int var6 = this.state; //任务状态 if (var6 >= 5) {/*如果被设置为了中断状态则进行中断处理*/ this.handlePossibleCancellationInterrupt(var6); } } } this.runner = null; int var12 = this.state; if (var12 >= 5) { this.handlePossibleCancellationInterrupt(var12); } } } //这个在schedule线程池中由用到,有时间可以研究再分享 protected boolean runAndReset() { if (this.state == 0 && UNSAFE.compareAndSwapObject(this, runnerOffset, (Object)null, Thread.currentThread())) { boolean var1 = false; int var2 = this.state; try { Callable var3 = this.callable; if (var3 != null && var2 == 0) { try { var3.call(); var1 = true; } catch (Throwable var8) { this.setException(var8); } } } finally { this.runner = null; var2 = this.state; if (var2 >= 5) { this.handlePossibleCancellationInterrupt(var2); } } return var1 && var2 == 0; } else { return false; } } //处理可能的取消中断 private void handlePossibleCancellationInterrupt(int var1) { if (var1 == 5) { while(this.state == 5) { Thread.yield(); } } } // private void finishCompletion() { while(true) { FutureTask.WaitNode var1; if ((var1 = this.waiters) != null) { if (!UNSAFE.compareAndSwapObject(this, waitersOffset, var1, (Object)null)) { continue; } while(true) { Thread var2 = var1.thread; if (var2 != null) { var1.thread = null; LockSupport.unpark(var2); } FutureTask.WaitNode var3 = var1.next; if (var3 == null) { break; } var1.next = null; var1 = var3; } } this.done(); this.callable = null; return; } } //等待任务执行完成,线程阻塞等待方法 /** * var1: 是不是用等待时间 * var2:等待的时间 **/ /** * 如果线程中断了,删除节点,并抛出异常。 如果字体大于 COMPLETING ,说明任务完成了,返回结果。 如果等于 COMPLETING,说明任务快要完成了,自旋一会。 如果 q 是 null,说明这是第一次进入,创建一个新的节点。保存当前线程引用。 如果还没有修改过 waiters 变量,就使用 CAS 修改当前 waiters 为当前节点,这里是一个栈的结构。 根据时间策略挂起当前线程。 当线程醒来后,继续上面的判断,正常情况下,返回数据。 **/ private int awaitDone(boolean var1, long var2) throws InterruptedException { long var4 = var1 ? System.nanoTime() + var2 : 0L;//计算阻塞超时时间 FutureTask.WaitNode var6 = null;//等待节点,头结点 boolean var7 = false;//排队队列,默认不阻塞 while(!Thread.interrupted()) {//如果阻塞线没有被中断 int var8 = this.state;//当前状态 if (var8 > 1) {//任务完成 if (var6 != null) //如果阻塞队列有节点 var6.thread = null;//线程为空 } return var8;//直接返回结果 } if (var8 == 1) {//如果任务执行完成,但还差最后一步最终完成,则让出CPU给任务执行线程继续执行 Thread.yield(); } else if (var6 == null) { var6 = new FutureTask.WaitNode();//新进来的线程当前线程var6 等待添加节点 } else if (!var7) {//如果没有加入等待队列,就将自身加入链表的头部 var7 = UNSAFE.compareAndSwapObject(this, waitersOffset, var6.next = this.waiters, var6);//上一步节点创建完,还没有将其条件到waiters栈中,因此在下一个循环就会执行此处进行入栈操作,并将当前线程的等待节点置于栈顶 } else if (var1) { //如果设置了阻塞超时时间,则进行检查是否达到阻塞超时时间,达到了则删除当前线程的等待节点并退出循环返回,否则继续阻塞 //超时处理 var2 = var4 - System.nanoTime(); if (var2 <= 0L) { this.removeWaiter(var6); return this.state; } //是当前线程进入等待 LockSupport.parkNanos(this, var2); } else { //非超时阻塞 LockSupport.park(this); } } this.removeWaiter(var6); throw new InterruptedException(); } //移除等待节点 private void removeWaiter(FutureTask.WaitNode var1) { if (var1 != null) { var1.thread = null; label29: while(true) { FutureTask.WaitNode var2 = null; FutureTask.WaitNode var4; for(FutureTask.WaitNode var3 = this.waiters; var3 != null; var3 = var4) { var4 = var3.next; if (var3.thread != null) { var2 = var3; } else if (var2 != null) { var2.next = var4; if (var2.thread == null) { continue label29; } } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, var3, var4)) { continue label29; } } return; } } /*state、runner、waiters的偏移量offset,在静态块中就初始化的,偏移量代表该变量的相对于对象的指针的相对位置,通过这个就可以得到对象中该变量的值。*/ static { try { UNSAFE = Unsafe.getUnsafe(); Class var0 = FutureTask.class; stateOffset = UNSAFE.objectFieldOffset(var0.getDeclaredField("state")); runnerOffset = UNSAFE.objectFieldOffset(var0.getDeclaredField("runner")); waitersOffset = UNSAFE.objectFieldOffset(var0.getDeclaredField("waiters")); } catch (Exception var1) { throw new Error(var1); } } //等待节点的定义 static final class WaitNode { volatile Thread thread = Thread.currentThread(); volatile FutureTask.WaitNode next; WaitNode() { } } }
KafkaFuture和KafkaFutureImpl的源码暂时不理解,但知道它是返回异步调用任务的类,主要再看回ListTopic源码中的主要的部分,
runnable.call(new Call("listTopics", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { @Override MetadataRequest.Builder createRequest(int timeoutMs) { return MetadataRequest.Builder.allTopics(); } @Override void handleResponse(AbstractResponse abstractResponse) { MetadataResponse response = (MetadataResponse) abstractResponse; Map<String, TopicListing> topicListing = new HashMap<>(); for (MetadataResponse.TopicMetadata topicMetadata : response.topicMetadata()) { String topicName = topicMetadata.topic(); boolean isInternal = topicMetadata.isInternal(); if (!topicMetadata.isInternal() || options.shouldListInternal()) topicListing.put(topicName, new TopicListing(topicName, isInternal)); } topicListingFuture.complete(topicListing); } @Override void handleFailure(Throwable throwable) { topicListingFuture.completeExceptionally(throwable); } }, now);
这一大段其实就是runnable对象调用call方法,call方法两个参数,一个是Call类的实例化对象,一个是时间
其中runnable对象的类型是AdminClientRunnable,它是KafkaAdminClient负责处理与服务端交互请求的服务线程。
看一下call方法
void call(Call call, long now) {
//在关闭操作期间,这是我们将使所有挂起的操作超时并强制RPC线程退出的时间。如果管理客户端未关闭,则该值为0。
if (hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) {//无效的关机时间,表示尚未执行关机。
log.debug("The AdminClient is not accepting new calls. Timing out {}.", call);
call.fail(Long.MAX_VALUE, new TimeoutException("The AdminClient thread is not accepting new calls."));
} else {
enqueue(call, now);
}
}
void enqueue(Call call, long now) { if (log.isDebugEnabled()) { log.debug("Queueing {} with a timeout {} ms from now.", call, call.deadlineMs - now); } boolean accepted = false;//设定的是不能接收Call请求 synchronized (this) { if (newCalls != null) { newCalls.add(call);//加入newCall队列当中,这个不是优先队列实现的是直接用的LinkedList,private List<Call> newCalls = new LinkedList<>(); accepted = true; //可以接收Call请求 } } if (accepted) {//如果可以接收call请求 client.wakeup(); // 如果client被阻塞,请唤醒,这个client是KafkaClient接口的函数,具体有两个实现类NetworkClient,MockClient,这两个类重写了wakeup()方法,最后调用了Object里的notify() } else {//如果不能接收call请求,打印一些相应的信息 log.debug("The AdminClient thread has exited. Timing out {}.", call); call.fail(Long.MAX_VALUE, new TimeoutException("The AdminClient thread has exited.")); } }
总体来讲,AdminClientRunnable中的call方法用作入队一个Call请求,进而对其处理。Call请求代表与服务端的一次请求交互,比如listTopics和createTopics都是一次Call请求,AdminClientRunnable线程负责处理这些Call请求。
再说回来call方法的第一个参数,Call类的实例化对象,来看看Call类
Call(String callName, long deadlineMs, NodeProvider nodeProvider) {
this(false, callName, deadlineMs, nodeProvider);
}
Call类是一个抽象类,构造方法接收三个参数:本次请求的名称callName、超时时间deadlineMs、以及节点提供器nodeProvider。nodeProvider是NodeProvider类型,用来提供本次请求所交互的Broker节点。
Call类中还有3个抽象方法:createRequest()、handleResponse()、handleFailure(),分别用来创建请求、处理回执和处理失败。
abstract AbstractRequest.Builder createRequest(int timeoutMs);//创建请求
abstract void handleResponse(AbstractResponse abstractResponse);//处理call请求的响应
abstract void handleFailure(Throwable throwable);//处理失败
对于我们今天研究的 listTopics()方法而言,主要的处理逻辑封装在createRequest()、handleResponse()、handleFailure()这三个方法之中了。
先看 createRequest()
@Override
MetadataRequest.Builder createRequest(int timeoutMs) {
return MetadataRequest.Builder.allTopics();
}
发送MetadataRequest请求,调用的是allTopics()的方法
public static Builder allTopics() {//Builder是MetadataRequest一个内部类,
return new Builder(ALL_TOPICS_REQUEST_DATA);
}
而 new Builder(ALL_TOPICS_REQUEST_DATA);
public Builder(MetadataRequestData data) {//MetadataRequest.Builder的构造函数
super(ApiKeys.METADATA);
this.data = data;
}
private static final MetadataRequestData ALL_TOPICS_REQUEST_DATA = new MetadataRequestData().
METADATA(3, "Metadata", MetadataRequestData.SCHEMAS, MetadataResponseData.SCHEMAS),
所以 createRequest,就是发送了一个MetadataRequest请求里的alltopic内容,这个最后到APiKeys.METADATA的这个数据,但是我没有明白这个ApiKeys类,后续怎么使用,怎么跟前面联系在一起的的这个逻辑。我知道的是kafka2.5.0这个类这里面一共有48种请求类型,后续我还要研究一下。
再看 handleResponse()
@Override
void handleResponse(AbstractResponse abstractResponse) {
MetadataResponse response = (MetadataResponse) abstractResponse;//处理一个MetadataResponse
Map<String, TopicListing> topicListing = new HashMap<>();//创建一个Topiclisting的Map容器
for (MetadataResponse.TopicMetadata topicMetadata : response.topicMetadata()) {//对每一个创建好的topic都去做响应
String topicName = topicMetadata.topic();//得到topic的name
boolean isInternal = topicMetadata.isInternal();//包不包含内部的topic,像__consumer_offsets之类的的内部topic
if (!topicMetadata.isInternal() || options.shouldListInternal())//如果不是内部的topic,或者内部的可以被列出来,
topicListing.put(topicName, new TopicListing(topicName, isInternal));//将TopicName放进topicListing集合里
}
topicListingFuture.complete(topicListing);//异步处理结果响应是否完成
}
最后是 handleFailure(),处理失败了怎么办
@Override
void handleFailure(Throwable throwable) {
topicListingFuture.completeExceptionally(throwable);//抛出异常
}
其实,KafkaAdminClient的其他关于topic的操作,都是这样,先定义XXXOptions,再是XXResult,然后调用call方法,call中定义Call的实例对象,然后实现Call类中的3个抽象方法,把相应的逻辑写进去就可以。
最后我们使用它KafkaAdmiClient里封装好的管理topic的方法,写了一个小程序
public boolean createTopic(String topic, int partitionNum, short replicationFactor){ ArrayList<NewTopic> topics = new ArrayList<NewTopic>(); NewTopic newTopic = new NewTopic(topic, partitionNum, replicationFactor); topics.add(newTopic);//把topic加进去 CreateTopicsResult createTopicsResult = kafkaAdminClient.createTopics(topics);//调用KafkaAdminClient的方法 boolean success = false; try { createTopicsResult.all().get();// 阻塞线程,直到所有主题创建成功 success = true; } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return success; }
public boolean deleteTopic(String topic){
DeleteTopicsResult deleteTopicsResult = kafkaAdminClient.deleteTopics(Arrays.asList(topic));
boolean success = false;
try {
deleteTopicsResult.all().get(); // 阻塞线程,直到所有主题删除成功
success = true;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return success;
}
public List<String> findAllTopic(){
ListTopicsResult listTopicsResult = kafkaAdminClient.listTopics();
List<String> lst = new ArrayList<String>();
try {
Set<String> s = listTopicsResult.names().get();
for(String str: s){
lst.add(str);
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return lst;
}
public class test {
public static void main(String[] args) {
KafkaAdminClientTopicOP kafkaAdminClientTest = new KafkaAdminClientTopicOP("localhost:9092");
// kafkaAdminClientTest.findAllTopic();
kafkaAdminClientTest.createTopic("ideatopic_1", 2, (short) 1);
kafkaAdminClientTest.createTopic("ideatopic_2", 2, (short) 1);
kafkaAdminClientTest.createTopic("ideatopic_3", 2, (short) 1);
kafkaAdminClientTest.createTopic("ideatopic_4", 2, (short) 1);
// kafkaAdminClientTest.findAllTopic();
kafkaAdminClientTest.deleteTopic("ideatopic_4");
List<String> lst = kafkaAdminClientTest.findAllTopic();
for (String s : lst)
System.out.println(s);
}
}
启动kafka服务,结果,可以通过KafkaAdminClient对topic进行管理
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。