当前位置:   article > 正文

Kafka2.5.0源码关于 KafkaAdminClient 中 listTopic( ) 的源码详解

Kafka2.5.0源码关于 KafkaAdminClient 中 listTopic( ) 的源码详解

Kafka2.5.0源码关于 KafkaAdminClient 中 listTopic( ) 的源码详解

参考链接:https://blog.csdn.net/u013256816/article/details/79996056
@Override
public ListTopicsResult listTopics(final ListTopicsOptions options) {
    final KafkaFutureImpl<Map<String, TopicListing>> topicListingFuture = new KafkaFutureImpl<>();
    final long now = time.milliseconds();
    runnable.call(new Call("listTopics", calcDeadlineMs(now, options.timeoutMs()),
        new LeastLoadedNodeProvider()) {
        @Override
        MetadataRequest.Builder createRequest(int timeoutMs) {
            return MetadataRequest.Builder.allTopics();
        }
        @Override
        void handleResponse(AbstractResponse abstractResponse) {
            MetadataResponse response = (MetadataResponse) abstractResponse;
            Map<String, TopicListing> topicListing = new HashMap<>();
            for (MetadataResponse.TopicMetadata topicMetadata : response.topicMetadata()) {
                String topicName = topicMetadata.topic();
                boolean isInternal = topicMetadata.isInternal();
                if (!topicMetadata.isInternal() || options.shouldListInternal())
                    topicListing.put(topicName, new TopicListing(topicName, isInternal));
            }
            topicListingFuture.complete(topicListing);
        }
        @Override
        void handleFailure(Throwable throwable) {
            topicListingFuture.completeExceptionally(throwable);
        }
    }, now);
    return new ListTopicsResult(topicListingFuture);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

listTopics()方法接收一个ListTopicsOptions类型的参数,KafkaAdminClient中基本所有的应用类方法都有一个类似XXXOptions类型的参数,这个类型一般只包含timeoutMs这个成员变量,用来设定请求的超时时间,如果没有指定则使用默认的request.timeout.ms参数值,即30000ms。

最后关于那个管理topic,传入XXXOptions,就是调用方法的话,不传参options参数的话,会调用Admin接口中listTopic方法中一个参数的那个方法,它又返回了两个参数的listtopic()方法,第二参数会自动创建无参构造函数创建对象option,如果要传入XXXOptions的话,就是传入一个时间的参数,设定请求超时时间。

default DeleteTopicsResult deleteTopics(Collection<String> topics) {
    return this.deleteTopics(topics, new DeleteTopicsOptions());
}
  • 1
  • 2
  • 3
DeleteTopicsResult deleteTopics(Collection<String> var1, DeleteTopicsOptions var2);
  • 1

上述的这个最后还是追到了KafkaAdminClient里的deleteTopics了。所以之后写的小demo是对的。

不过ListTopicsOptions扩展了一个成员变量listInternal,用来指明是否需要罗列内部Topic,比如在Kafka解析之topic创建(1)中提及的“__consumer_offsets”和“transaction_state”就是两个内部Topic。ListTopicsOptions的代码如所示:

@InterfaceStability.Evolving
public class ListTopicsOptions extends AbstractOptions<ListTopicsOptions> {
    private boolean listInternal = false;
    public ListTopicsOptions timeoutMs(Integer timeoutMs) {
        this.timeoutMs = timeoutMs;
        return this;
    }
    public ListTopicsOptions listInternal(boolean listInternal) {
        this.listInternal = listInternal;
        return this;
    }
    public boolean shouldListInternal() {
        return listInternal;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

接下去继续讲解listTopics()方法,其返回值为ListTopicResult类型。与ListTopicsOptions对应,KafkaAdminClient中基本所有的应用类方法都有一个类似XXXResult类型的返回值,其内部一般包含一个KafkaFuture,用于异步发送请求之后等待操作结果。KafkaFuture实现了Java中的Future接口,用来支持链式调用以及其他异步编程模型。

@InterfaceStability.Evolving
public class ListTopicsResult {
    final KafkaFuture<Map<String, TopicListing>> future;
    ListTopicsResult(KafkaFuture<Map<String, TopicListing>> future) {
        this.future = future;
    }
    public KafkaFuture<Map<String, TopicListing>> namesToListings() {
        return future;
    }
    public KafkaFuture<Collection<TopicListing>> listings() {
        return future.thenApply(namesToDescriptions -> namesToDescriptions.values());
    }
    public KafkaFuture<Set<String>> names() {
        return future.thenApply(namesToListings -> namesToListings.keySet());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

Future类位于java.util.concurrent包下,它是一个接口:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

在Future接口中声明了5个方法,下面依次解释每个方法的作用:

  • cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。
  • isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
  • isDone方法表示任务是否已经完成,若任务完成,则返回true;
  • get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
  • get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。

KafkaFuture和KafkaFutureImpl的源码其实我没有看的太懂,里面涉及类的我看着比较奇怪。所以后来我去再看了一下java8的Future的实现类FutureTask看了一遍,走了一遍逻辑再回来看,还是不太懂KafkaFuture和KafkaFutureImpl的源码里面设计到的BiConsumer,BaseFunction这些是做什么的,一头雾水,后面可以继续再看一下

import java.util.concurrent.locks.LockSupport;
import sun.misc.Unsafe;
public class FutureTask<V> implements RunnableFuture<V> {
    private volatile int state;//任务状态
    /**
    * state、runner、waiters三个成员变量是volatile修饰的,也就是说这几个变量的具有内存可见性,当变量的值发生变化后,系统会立即将最新的值刷新回主内存,即使在不同的线程中,看到的变量的值也是一样的,都是最新的。
    */
    private static final int NEW = 0;
    private static final int COMPLETING = 1;
    private static final int NORMAL = 2;
    private static final int EXCEPTIONAL = 3;
    private static final int CANCELLED = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED = 6;
    private Callable<V> callable;//是FutureTask具体要执行的任务,由外部传入
    private Object outcome;//任务执行完后结果
    private volatile Thread runner;//真正执行任务的线程worker
    private volatile FutureTask.WaitNode waiters;//等待任务的头节点,waiters是一个等待节点,而是是最顶层节点,类似头节点。FutureTask中的等待队列主要作用用于当多线程同时请求get方法获取结果时,这些线程可能要同时阻塞,因此将这些线程的阻塞信息保存在了等待节点中,并构成一个栈的等待结构
    //原子操作的UNSAFE类以
    private static final Unsafe UNSAFE;
    //state、runner、waiters的偏移量,在静态代码块中就初始化的
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;
    //返回结果
    private V report(int var1) throws ExecutionException {
        Object var2 = this.outcome;
        if (var1 == 2) {
            return var2;
        } else if (var1 >= 4) {
            throw new CancellationException();
        } else {
            throw new ExecutionException((Throwable)var2);
        }
    }
    //构造函数1
    /**
    * 不同情况下的状态转变:
	正常执行:NEW -> COMPLETING -> NORMAL
	出现异常: NEW -> COMPLETING -> EXCEPTIONAL
	取消:NEW -> CANCELLED
	线程中断:NEW -> INTERRUPTING -> INTERRUPTED
    */
    public FutureTask(Callable<V> var1) {
        if (var1 == null) {
            throw new NullPointerException();
        } else {
            this.callable = var1;
            this.state = 0;//任务开始状态是New
        }
    }
    //构造函数2
    public FutureTask(Runnable var1, V var2) {
        this.callable = Executors.callable(var1, var2);
        this.state = 0;
    }
    //是否取消了
    public boolean isCancelled() {
        return this.state >= 4;
    }
    //是否完成了
    public boolean isDone() {
        return this.state != 0;
    }
    //取消任务
    /***
    *如果是cancle(false),将state设置为CANCELLED,然后结束。如果是cancle(true),表明是由于线程中断,将state设置为INTERRUPTING,然后走try代码块,调用当前线程的interrupt(),设置线程的中断标志位true。这里只是给个中断的标记,并不真正终止线程,所以任务还是要继续。最后将将state设置为INTERRUPTED,INTERRUPTING在这里也只是个瞬时的过渡状态。
    **/
    public boolean cancel(boolean var1) {
        if (this.state == 0 && UNSAFE.compareAndSwapInt(this, stateOffset, 0, var1 ? 5 : 4)) {
            try {
                if (var1) {//状态为Completing
                    try {
                        Thread var2 = this.runner;
                        if (var2 != null) {
                            var2.interrupt();
                        }
                    } finally {
                        UNSAFE.putOrderedInt(this, stateOffset, 6);
                    }
                }
            } finally {
                this.finishCompletion();
            }
            return true;
        } else {
            return false;
        }
    }
    //获取结果
    public V get() throws InterruptedException, ExecutionException {
        int var1 = this.state;
        if (var1 <= 1) {//任务执行状态为new或completing时,等待任务执行完成
            var1 = this.awaitDone(false, 0L);//等待任务执行完成,
              /*不是NEW或COMPLETING时,可以任务执行的获取结果,       
       *当调用cancel方法将抛出CancellationException,       
       *当任务执行中出现异常时将抛出ExecutionException       
       */
        }
        return this.report(var1);//得到结果
    }
    //在等待时间内获取结果
    public V get(long var1, TimeUnit var3) throws InterruptedException, ExecutionException, TimeoutException {
        if (var3 == null) {
            throw new NullPointerException();
        } else {
            int var4 = this.state;
            if (var4 <= 1 && (var4 = this.awaitDone(true, var3.toNanos(var1))) <= 1) {
                throw new TimeoutException();
            } else {
                return this.report(var4);
            }
        }
    }
    //
    protected void done() {
    }
    //正常情况下设置结果
    protected void set(V var1) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, 0, 1)) {//如果状态是0 new,就把状态改成1 completing
            this.outcome = var1;//保存正常执行call()返回的结果
            UNSAFE.putOrderedInt(this, stateOffset, 2);//状态改为2 Normal
            this.finishCompletion();
        }

    }
    //执行中出现异常时设置异常
    protected void setException(Throwable var1) {
        //如果状态是0 new,就把状态改成1 completing
        if (UNSAFE.compareAndSwapInt(this, stateOffset, 0, 1)) {
            this.outcome = var1;//保存异常结果
            UNSAFE.putOrderedInt(this, stateOffset, 3);//把状态改成3 Exception
            this.finishCompletion();
        }

    }
    //线程的run
    public void run() {
        //状态机不为0,不为new,表示执行完成或者任务被取消,直接返回
        //状态机为0 new,同时将runner设置为当前线程,保证同一时刻只有一个线程执行run方法,如果设置失败也直接返回
        if (this.state == 0 && UNSAFE.compareAndSwapObject(this, runnerOffset, (Object)null, Thread.currentThread())) {
            boolean var9 = false;
            try {
                var9 = true;
                Callable var1 = this.callable;//任务
                if (var1 != null) {
                    if (this.state == 0) { //去除任务检测不为空,并且再次检查状态为new
                        Object var2;//是结果
                        boolean var3;//表示是否正常执行
                        try {
                            //直接调用Callable的call()方法,也就是我们的逻辑代码,得到需要的结果
                            var2 = var1.call();
                            var3 = true;//代码执行到这里,说明没有异常,正常执行了,
                        } catch (Throwable var10) {//出现异常时的处理
                            var2 = null;//结果为空
                            var3 = false;
                            this.setException(var10);//任务保存异常信息,而不直接抛出
                        }
                        if (var3) {//如果正常执行
                            this.set(var2);//设置结果
                            var9 = false;
                        } else {
                            var9 = false;
                        }
                    } else {
                        var9 = false;
                    }
                } else {
                    var9 = false;
                }
            } finally {
                if (var9) {
                    this.runner = null;//这里清空了线程变量,与方法开头判断对应
                    int var6 = this.state; //任务状态
                    if (var6 >= 5) {/*如果被设置为了中断状态则进行中断处理*/                  this.handlePossibleCancellationInterrupt(var6);
                    }

                }
            }
            this.runner = null;
            int var12 = this.state;
            if (var12 >= 5) {
                this.handlePossibleCancellationInterrupt(var12);
            }
        }
    }
    //这个在schedule线程池中由用到,有时间可以研究再分享
    protected boolean runAndReset() {
        if (this.state == 0 && UNSAFE.compareAndSwapObject(this, runnerOffset, (Object)null, Thread.currentThread())) {
            boolean var1 = false;
            int var2 = this.state;
            try {
                Callable var3 = this.callable;
                if (var3 != null && var2 == 0) {
                    try {
                        var3.call();
                        var1 = true;
                    } catch (Throwable var8) {
                        this.setException(var8);
                    }
                }
            } finally {
                this.runner = null;
                var2 = this.state;
                if (var2 >= 5) {
                    this.handlePossibleCancellationInterrupt(var2);
                }
            }
            return var1 && var2 == 0;
        } else {
            return false;
        }
    }
    //处理可能的取消中断
    private void handlePossibleCancellationInterrupt(int var1) {
        if (var1 == 5) {
            while(this.state == 5) {
                Thread.yield();
            }
        }
    }
    //
    private void finishCompletion() {
        while(true) {
            FutureTask.WaitNode var1;
            if ((var1 = this.waiters) != null) {
                if (!UNSAFE.compareAndSwapObject(this, waitersOffset, var1, (Object)null)) {
                    continue;
                }
                while(true) {
                    Thread var2 = var1.thread;
                    if (var2 != null) {
                        var1.thread = null;
                        LockSupport.unpark(var2);
                    }
                    FutureTask.WaitNode var3 = var1.next;
                    if (var3 == null) {
                        break;
                    }
                    var1.next = null;
                    var1 = var3;
                }
            }
            this.done();
            this.callable = null;
            return;
        }
    }
    //等待任务执行完成,线程阻塞等待方法
    /**
    * var1: 是不是用等待时间
    * var2:等待的时间
    **/
    /**
    * 如果线程中断了,删除节点,并抛出异常。
如果字体大于  COMPLETING ,说明任务完成了,返回结果。
如果等于 COMPLETING,说明任务快要完成了,自旋一会。
如果 q 是 null,说明这是第一次进入,创建一个新的节点。保存当前线程引用。
如果还没有修改过 waiters 变量,就使用 CAS  修改当前 waiters 为当前节点,这里是一个栈的结构。
根据时间策略挂起当前线程。
当线程醒来后,继续上面的判断,正常情况下,返回数据。
    **/
    private int awaitDone(boolean var1, long var2) throws InterruptedException {
        long var4 = var1 ? System.nanoTime() + var2 : 0L;//计算阻塞超时时间
        FutureTask.WaitNode var6 = null;//等待节点,头结点
        boolean var7 = false;//排队队列,默认不阻塞
        while(!Thread.interrupted()) {//如果阻塞线没有被中断
            int var8 = this.state;//当前状态
            if (var8 > 1) {//任务完成
                if (var6 != null) //如果阻塞队列有节点
                    var6.thread = null;//线程为空
                }
                return var8;//直接返回结果
            }
            if (var8 == 1) {//如果任务执行完成,但还差最后一步最终完成,则让出CPU给任务执行线程继续执行
                Thread.yield();
            } else if (var6 == null) {
                var6 = new FutureTask.WaitNode();//新进来的线程当前线程var6 等待添加节点
            } else if (!var7) {//如果没有加入等待队列,就将自身加入链表的头部
                var7 = UNSAFE.compareAndSwapObject(this, waitersOffset, var6.next = this.waiters, var6);//上一步节点创建完,还没有将其条件到waiters栈中,因此在下一个循环就会执行此处进行入栈操作,并将当前线程的等待节点置于栈顶
            } else if (var1) {
                //如果设置了阻塞超时时间,则进行检查是否达到阻塞超时时间,达到了则删除当前线程的等待节点并退出循环返回,否则继续阻塞
                //超时处理
                var2 = var4 - System.nanoTime();
                if (var2 <= 0L) {
                    this.removeWaiter(var6);
                    return this.state;
                }
                //是当前线程进入等待
                LockSupport.parkNanos(this, var2);
            } else {
                //非超时阻塞
                LockSupport.park(this);
            }
        }
        this.removeWaiter(var6);
        throw new InterruptedException();
    }
    //移除等待节点
    private void removeWaiter(FutureTask.WaitNode var1) {
        if (var1 != null) {
            var1.thread = null;
            label29:
            while(true) {
                FutureTask.WaitNode var2 = null;
                FutureTask.WaitNode var4;
                for(FutureTask.WaitNode var3 = this.waiters; var3 != null; var3 = var4) {
                    var4 = var3.next;
                    if (var3.thread != null) {
                        var2 = var3;
                    } else if (var2 != null) {
                        var2.next = var4;
                        if (var2.thread == null) {
                            continue label29;
                        }
                    } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, var3, var4)) {
                        continue label29;
                    }
                }
                return;
            }
        }
         /*state、runner、waiters的偏移量offset,在静态块中就初始化的,偏移量代表该变量的相对于对象的指针的相对位置,通过这个就可以得到对象中该变量的值。*/
    static {
        try {
            UNSAFE = Unsafe.getUnsafe();
            Class var0 = FutureTask.class;
            stateOffset = UNSAFE.objectFieldOffset(var0.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset(var0.getDeclaredField("runner"));
            waitersOffset = UNSAFE.objectFieldOffset(var0.getDeclaredField("waiters"));
        } catch (Exception var1) {
            throw new Error(var1);
        }
    }
    //等待节点的定义
    static final class WaitNode {
        volatile Thread thread = Thread.currentThread();
        volatile FutureTask.WaitNode next;
        WaitNode() {
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269
  • 270
  • 271
  • 272
  • 273
  • 274
  • 275
  • 276
  • 277
  • 278
  • 279
  • 280
  • 281
  • 282
  • 283
  • 284
  • 285
  • 286
  • 287
  • 288
  • 289
  • 290
  • 291
  • 292
  • 293
  • 294
  • 295
  • 296
  • 297
  • 298
  • 299
  • 300
  • 301
  • 302
  • 303
  • 304
  • 305
  • 306
  • 307
  • 308
  • 309
  • 310
  • 311
  • 312
  • 313
  • 314
  • 315
  • 316
  • 317
  • 318
  • 319
  • 320
  • 321
  • 322
  • 323
  • 324
  • 325
  • 326
  • 327
  • 328
  • 329
  • 330
  • 331
  • 332
  • 333
  • 334
  • 335
  • 336
  • 337
  • 338
  • 339
  • 340
  • 341
  • 342

KafkaFuture和KafkaFutureImpl的源码暂时不理解,但知道它是返回异步调用任务的类,主要再看回ListTopic源码中的主要的部分,

runnable.call(new Call("listTopics", calcDeadlineMs(now, options.timeoutMs()),
            new LeastLoadedNodeProvider()) {
            @Override
            MetadataRequest.Builder createRequest(int timeoutMs) {
                return MetadataRequest.Builder.allTopics();
            }
            @Override
            void handleResponse(AbstractResponse abstractResponse) {
                MetadataResponse response = (MetadataResponse) abstractResponse;
                Map<String, TopicListing> topicListing = new HashMap<>();
                for (MetadataResponse.TopicMetadata topicMetadata : response.topicMetadata()) {
                    String topicName = topicMetadata.topic();
                    boolean isInternal = topicMetadata.isInternal();
                    if (!topicMetadata.isInternal() || options.shouldListInternal())
                        topicListing.put(topicName, new TopicListing(topicName, isInternal));
                }
                topicListingFuture.complete(topicListing);
            }
            @Override
            void handleFailure(Throwable throwable) {
                topicListingFuture.completeExceptionally(throwable);
            }
        }, now);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

这一大段其实就是runnable对象调用call方法,call方法两个参数,一个是Call类的实例化对象,一个是时间

其中runnable对象的类型是AdminClientRunnable,它是KafkaAdminClient负责处理与服务端交互请求的服务线程。

看一下call方法

void call(Call call, long now) {
    //在关闭操作期间,这是我们将使所有挂起的操作超时并强制RPC线程退出的时间。如果管理客户端未关闭,则该值为0。
    if (hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) {//无效的关机时间,表示尚未执行关机。
        log.debug("The AdminClient is not accepting new calls. Timing out {}.", call);
        call.fail(Long.MAX_VALUE, new TimeoutException("The AdminClient thread is not accepting new calls."));
    } else {
        enqueue(call, now);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
void enqueue(Call call, long now) {
    if (log.isDebugEnabled()) {
        log.debug("Queueing {} with a timeout {} ms from now.", call, call.deadlineMs - now);
    }
    boolean accepted = false;//设定的是不能接收Call请求
    synchronized (this) {
        if (newCalls != null) {
            newCalls.add(call);//加入newCall队列当中,这个不是优先队列实现的是直接用的LinkedList,private List<Call> newCalls = new LinkedList<>();
            accepted = true; //可以接收Call请求
        }
    }
    if (accepted) {//如果可以接收call请求
        client.wakeup(); // 如果client被阻塞,请唤醒,这个client是KafkaClient接口的函数,具体有两个实现类NetworkClient,MockClient,这两个类重写了wakeup()方法,最后调用了Object里的notify()
    } else {//如果不能接收call请求,打印一些相应的信息
        log.debug("The AdminClient thread has exited. Timing out {}.", call);
        call.fail(Long.MAX_VALUE, new TimeoutException("The AdminClient thread has exited."));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

总体来讲,AdminClientRunnable中的call方法用作入队一个Call请求,进而对其处理。Call请求代表与服务端的一次请求交互,比如listTopics和createTopics都是一次Call请求,AdminClientRunnable线程负责处理这些Call请求。

再说回来call方法的第一个参数,Call类的实例化对象,来看看Call类

Call(String callName, long deadlineMs, NodeProvider nodeProvider) {
    this(false, callName, deadlineMs, nodeProvider);
}
  • 1
  • 2
  • 3

Call类是一个抽象类,构造方法接收三个参数:本次请求的名称callName、超时时间deadlineMs、以及节点提供器nodeProvider。nodeProvider是NodeProvider类型,用来提供本次请求所交互的Broker节点。

Call类中还有3个抽象方法:createRequest()、handleResponse()、handleFailure(),分别用来创建请求、处理回执和处理失败。

abstract AbstractRequest.Builder createRequest(int timeoutMs);//创建请求

abstract void handleResponse(AbstractResponse abstractResponse);//处理call请求的响应

abstract void handleFailure(Throwable throwable);//处理失败
  • 1
  • 2
  • 3
  • 4
  • 5

对于我们今天研究的 listTopics()方法而言,主要的处理逻辑封装在createRequest()、handleResponse()、handleFailure()这三个方法之中了。

先看 createRequest()

@Override
MetadataRequest.Builder createRequest(int timeoutMs) {
    return MetadataRequest.Builder.allTopics();
}
  • 1
  • 2
  • 3
  • 4

发送MetadataRequest请求,调用的是allTopics()的方法

public static Builder allTopics() {//Builder是MetadataRequest一个内部类,
    return new Builder(ALL_TOPICS_REQUEST_DATA);
}
  • 1
  • 2
  • 3

而 new Builder(ALL_TOPICS_REQUEST_DATA);

public Builder(MetadataRequestData data) {//MetadataRequest.Builder的构造函数
    super(ApiKeys.METADATA);
    this.data = data;
}
  • 1
  • 2
  • 3
  • 4
private static final MetadataRequestData ALL_TOPICS_REQUEST_DATA = new MetadataRequestData().
  • 1
METADATA(3, "Metadata", MetadataRequestData.SCHEMAS, MetadataResponseData.SCHEMAS),
  • 1

所以 createRequest,就是发送了一个MetadataRequest请求里的alltopic内容,这个最后到APiKeys.METADATA的这个数据,但是我没有明白这个ApiKeys类,后续怎么使用,怎么跟前面联系在一起的的这个逻辑。我知道的是kafka2.5.0这个类这里面一共有48种请求类型,后续我还要研究一下。

再看 handleResponse()

@Override
void handleResponse(AbstractResponse abstractResponse) {
    MetadataResponse response = (MetadataResponse) abstractResponse;//处理一个MetadataResponse
    Map<String, TopicListing> topicListing = new HashMap<>();//创建一个Topiclisting的Map容器
    for (MetadataResponse.TopicMetadata topicMetadata : response.topicMetadata()) {//对每一个创建好的topic都去做响应
        String topicName = topicMetadata.topic();//得到topic的name
        boolean isInternal = topicMetadata.isInternal();//包不包含内部的topic,像__consumer_offsets之类的的内部topic
        if (!topicMetadata.isInternal() || options.shouldListInternal())//如果不是内部的topic,或者内部的可以被列出来,
            topicListing.put(topicName, new TopicListing(topicName, isInternal));//将TopicName放进topicListing集合里
    }
    topicListingFuture.complete(topicListing);//异步处理结果响应是否完成
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

最后是 handleFailure(),处理失败了怎么办

@Override
void handleFailure(Throwable throwable) {
    topicListingFuture.completeExceptionally(throwable);//抛出异常
}
  • 1
  • 2
  • 3
  • 4

其实,KafkaAdminClient的其他关于topic的操作,都是这样,先定义XXXOptions,再是XXResult,然后调用call方法,call中定义Call的实例对象,然后实现Call类中的3个抽象方法,把相应的逻辑写进去就可以。

最后我们使用它KafkaAdmiClient里封装好的管理topic的方法,写了一个小程序

public boolean createTopic(String topic, int partitionNum, short replicationFactor){
    ArrayList<NewTopic> topics = new ArrayList<NewTopic>();
    NewTopic newTopic = new NewTopic(topic, partitionNum, replicationFactor);
    topics.add(newTopic);//把topic加进去
    CreateTopicsResult createTopicsResult = kafkaAdminClient.createTopics(topics);//调用KafkaAdminClient的方法
    boolean success = false;
    try {
        createTopicsResult.all().get();// 阻塞线程,直到所有主题创建成功
        success = true;
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
    return success;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
public boolean deleteTopic(String topic){
    DeleteTopicsResult deleteTopicsResult = kafkaAdminClient.deleteTopics(Arrays.asList(topic));
    boolean success = false;
    try {
        deleteTopicsResult.all().get(); // 阻塞线程,直到所有主题删除成功
        success = true;
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
    return success;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
public List<String> findAllTopic(){
    ListTopicsResult listTopicsResult = kafkaAdminClient.listTopics();
    List<String> lst = new ArrayList<String>();
    try {
        Set<String> s = listTopicsResult.names().get();
        for(String str: s){
            lst.add(str);
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
    return lst;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
public class test {
    public static void main(String[] args) {
        KafkaAdminClientTopicOP kafkaAdminClientTest = new KafkaAdminClientTopicOP("localhost:9092");
//        kafkaAdminClientTest.findAllTopic();
        kafkaAdminClientTest.createTopic("ideatopic_1", 2, (short) 1);
        kafkaAdminClientTest.createTopic("ideatopic_2", 2, (short) 1);
        kafkaAdminClientTest.createTopic("ideatopic_3", 2, (short) 1);
        kafkaAdminClientTest.createTopic("ideatopic_4", 2, (short) 1);
//        kafkaAdminClientTest.findAllTopic();
        kafkaAdminClientTest.deleteTopic("ideatopic_4");
        List<String> lst = kafkaAdminClientTest.findAllTopic();
        for (String s : lst)
            System.out.println(s);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

启动kafka服务,结果,可以通过KafkaAdminClient对topic进行管理
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/288983
推荐阅读
相关标签
  

闽ICP备14008679号