public final class Cluster {private final boolean isBootstrapConfigured;// 一个Node就代表一个Brokerprivate final List nodes;// 尚未被授权访问的Kafka列表,Kafka是支持权限访问的private final Set unauthorizedTopics;// 映射关系为:“某个Topic下的某个Partition:Partition的详细信息”// TopicPartition指的是Topic1中的Partition1,PartitionInfo为具体某个Partition的详细信息private final Map partitionsByTopicPartition;// 映射关系为:“某个Topic:这个Topic下的Partition列表”private final Map> partitionsByTopic;// 映射关系为:“某个Topic:这个Topic下的可用的Partition列表”private final Map> availablePartitionsByTopic;// 映射关系为:“某个Broker ID:这个Broker上的所有Partition”// 某个Broker上有哪些Partition(可能来自不同的Topic)private final Map> partitionsByNode;// broker.id映射到Node的数据结构,映射关系为:“broker.id:Node”private final Map nodesById;
}
在KafkaProducer初始化时,会构造出集群元数据组件Metadata,且在初始化方法里有一次Metadata#update()方法调用。
// 构造核心组件:Metadata;用于去Broker集群拉取元数据(有哪些Topic,对应哪些Partition,其中哪个是leader、哪个是follower)
// 想往Broker发送一条ProducerRecord,就必须知道目标Topic,有哪几个Partition,其中Partition Leader在哪个Broker上
// 在KafkaProducer初始化时拉取一次元数据;后面每隔一段时间(metadata.max.age.ms,默认:5min)会刷新元数据;发送消息时如果元数不在本地,还得通过Metadata发送请求
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));// 省略部分代码......// 会把我们配置的Kafka Broker地址作为参数传入
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
在方法调用中,会传递2个参数,分别是Cluster实例对象和当前时间戳。对于Cluster实例的创建,是利用我们配置的Broker地址,将其包装成Node,并add到List< Node >中。最后利用List< Node >构造出Cluster实例对象。
public static Cluster bootstrap(List addresses) {List nodes = new ArrayList<>();int nodeId = -1;// 遍历传进来的Kafka Broker地址for (InetSocketAddress address : addresses)// 将Broker地址包装成Node后,添加到List集合中nodes.add(new Node(nodeId--, address.getHostString(), address.getPort()));// 利用List包装Cluster实例并返回return new Cluster(true, nodes, new ArrayList(0), Collections.emptySet());
}
之后再执行update()方法时,会将构造好的Cluster实例和当前时间戳传入。
/*** KafkaProducer初始化时,只是将将配置的Broker地址包装成Node后,添加到List集合中,利用List集合创建Cluster实例。* 传进来的参数就是Cluster实例。* 在KafkaProducer初始化时,并没有真正的去某个Broker上拉取元数据,只是将配置的Broker地址转换成了Node,* 以List的形式存到了Cluster实例中** 后面拉取元数据成功后处理响应时再调用该方法,就是更新Cluster了!*/
public synchronized void update(Cluster cluster, long now) {// 将“是否需要update元数据”的标记设为false,即不需要updatethis.needUpdate = false;// 将“最近的刷新时间”和“成功刷新时间”都设为nowthis.lastRefreshMs = now;this.lastSuccessfulRefreshMs = now;// 每次成功update元数据后,就会对version加1this.version += 1;// 拉取元数据使用的监听器for (Listener listener: listeners)listener.onMetadataUpdate(cluster);// Do this after notifying listeners as subscribed topics' list can be changed by listeners// needMetadataForAllTopics(默认false)表示:将所有Topic的元数据都刷新一次// 于是将刚刚包装好的Cluster实例赋值给this.clusterthis.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;// 由于本方法由synchronized修饰,是线程安全的,所以Thread-1抢到了锁,执行该方法,Thread-2就得wait进入休眠状态// 此时调用notifyAll()方法就会唤醒(处于休眠状态的)Thread-2,Thread-2就又能争抢锁了notifyAll();log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}
可以看出,用来标记元数据是否需要拉取的标志位默认为false,还把管理元数据的版本号自增,此时很明显就不会去拉取元数据。
由于update()方法由synchronized修饰,所以在多线程并发执行时,同一时刻只会有一个线程抢占到锁(其他线程进入休眠等待状态),进而执行“更新元数据”操作。等本方法执行完毕后,就会通过notifyAll()唤醒其他处于休眠状态的线程。
在调用Kafka API的doSend()方法生产消息时,会(按需、以同步阻塞的方式)拉取元数据
// 以同步阻塞等待的方式(传参:同步阻塞的最大时间),去连接Broker拉取元数据:如果想往Topic发送消息,必须知道元数据,这样才能通过Partitioner选择一个Partition,
// 然后才能跟这个Partition对应的Leader建立连接、发送消息。其中调用本方法最多能够阻塞等待时间是:max.block.ms
// 返回的是“为了拉取元数据,总共花费的时间” = 元数据拉取的时间 + 一些边边角角
long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
得到的就是元数据拉取流程所花费的时间
private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {// Metadata组件中已经缓存、加载过元数据的Topic,会放到Set集合中。// 第一次发送消息到某个Topic,Set集合没有这个Topic,那就准备拉取if (!this.metadata.containsTopic(topic))this.metadata.add(topic);// metadata.fetch()得到的就是Cluster实例,这里是判断Cluster中的Map>是否有这个Topic// 说明这个Topic的元数据信息在Cluster Map中能查到(已经被缓存了),无需等待拉取if (metadata.fetch().partitionsForTopic(topic) != null)// 元数据拉取过程中的阻塞等待时间 = 0return 0;long begin = time.milliseconds();// 最多能阻塞等待的时间,默认:60slong remainingWaitMs = maxWaitMs;// 只要Cluster实例中的Map>集合中没有这个Topic,就得触发“元数据拉取操作”while (metadata.fetch().partitionsForTopic(topic) == null) {log.trace("Requesting metadata update for topic {}.", topic);// step 1:将Sender线程拉取元数据的标志位,设为trueint version = metadata.requestUpdate();// step 2:唤醒Sender线程,底层就是唤醒NetworkClient(让它不要阻塞等待了),准备异步拉取元数据sender.wakeup();// step 3:Metadata准备以同步阻塞的方式,等待元数据的拉取结果metadata.awaitUpdate(version, remainingWaitMs);// 整个“元数据异步拉取而同步等待”所花费的时间 = 当前时间戳 - 元数据拉取前夕的时间戳long elapsed = time.milliseconds() - begin;// 如果等待元数据拉取所花费的时间大于默认的60s,抛出超时异常if (elapsed >= maxWaitMs)throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");// 某个Topic尚未被授权访问,抛异常if (metadata.fetch().unauthorizedTopics().contains(topic))throw new TopicAuthorizationException(topic);// 剩余等待时间 = 默认的60s - 同步阻塞所花费的时间remainingWaitMs = maxWaitMs - elapsed;}// 以同步阻塞的方式等待元数据拉取成功所花费的时间 = 元数据拉取所花费的时间 + 一些边边角角return time.milliseconds() - begin;
}
首先要判断目标Topic的元数据是否已经缓存,如果没有,那就准备拉取。
首先将Metadata组件中是否需要拉取元数据的标志位,设为true,表示现在需要拉取元数据。
public synchronized int requestUpdate() {// 是否需要拉取元数据的标志位,设为truethis.needUpdate = true;// 返回“拉取元数据”过程中用到的版本号return this.version;
}
元数据拉取工作是由Sender负责完成的,底层就是唤醒NetworkClient,让它不要阻塞等待了,准备异步拉取元数据
当Sender线程运行时,会触发执行它的run()方法。
void run(long now) {// 如果某些做好发送准备的Partition的元数据都还没拉取到(不知道Leader是谁),就标识一下if (result.unknownLeadersExist)// 将“需要拉取元数据的标志位”更新为:truethis.metadata.requestUpdate();// 省略部分代码...// 万能poll()方法this.client.poll(pollTimeout, now);
}
其中如果哪个Partition的Leader还不知道是谁,就强制刷新一次元数据。最后调用万能poll方法拉取元数据
public List poll(long timeout, long now) {// MetadataUpdater组件是专门用来更新元数据的,调用MetadataUpdater#maybeUpdate()拉取元数据// 内部会构建专门用于向Broker发送请求的MetadataRequestlong metadataTimeout = metadataUpdater.maybeUpdate(now);// 省略部分代码...List responses = new ArrayList<>();handleCompletedSends(responses, updatedNow);// 发送出去的MetadataRequest,收到了响应,现在处理这些响应handleCompletedReceives(responses, updatedNow);// 省略部分代码...return responses;
}
底层通过MetadataUpdater组件完成拉取动作,本质就是创建拉取元数据的请求–MetadataRequest,将其封装成ClientRequest,最后由Selector将其发送出去
private void maybeUpdate(long now, Node node) {if (node == null) {log.debug("Give up sending metadata request since no node is available");this.lastNoNodeAvailableMs = now;return;}String nodeConnectionId = node.idString();if (canSendRequest(nodeConnectionId)) {this.metadataFetchInProgress = true;// 首先创建好拉取元数据要发送的请求:MetadataRequestMetadataRequest metadataRequest;if (metadata.needMetadataForAllTopics())metadataRequest = MetadataRequest.allTopics();elsemetadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));// 将拉取元数据的请求,封装成ClientRequestClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());// 核心:真正发送请求调用的是request()方法doSend(clientRequest, now);} else if (connectionStates.canConnect(nodeConnectionId, now)) {log.debug("Initialize connection to node {} for sending metadata request", node.id());initiateConnect(node, now);} else { this.lastNoNodeAvailableMs = now;}
}
然后将这个请求发送出去,走的是基于Java NIO封装的KafkaChannel将其发送到
/*** 真正发送请求的方法*/
private void doSend(ClientRequest request, long now) {request.setSendTimeMs(now);this.inFlightRequests.add(request);// 通过Selectable组件发起请求,该组件是Kafka中专用于网络I/O操作的selector.send(request.request());
}
万能poll()方法将“元数据拉取”的这个ClientRequest 发送出去后,总归是能接收到响应的。于是,调用handleCompletedReceives()方法处理响应
/*** 处理Broker对MetadataRequest的响应*/
private void handleCompletedReceives(List responses, long now) {for (NetworkReceive receive : this.selector.completedReceives()) {String source = receive.source();ClientRequest req = inFlightRequests.completeNext(source);Struct body = parseResponse(receive.payload(), req.request().header());// 如果这个请求是一个metadata request,那就立即处理,并返回trueif (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))responses.add(new ClientResponse(req, now, false, body));}
}@Override
public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {short apiKey = req.request().header().apiKey();if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {handleResponse(req.request().header(), body, now);return true;}return false;
}/*** 处理响应
*/
private void handleResponse(RequestHeader header, Struct body, long now) {this.metadataFetchInProgress = false;MetadataResponse response = new MetadataResponse(body);// 从MetadataResponse中取出最新拉取到的元数据Cluster cluster = response.cluster();// check if any topics metadata failed to get updatedMap errors = response.errors();if (!errors.isEmpty())log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), errors);// don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being// created which means we will get errors and no nodes until it existsif (cluster.nodes().size() > 0) {// 拉取元数据的请求,最终会得到响应。// 现在就是要将响应的Cluster交给Metadata更新,内部会调用notifyAll方法唤醒当初阻塞等待拉取结果的主线程this.metadata.update(cluster, now);} else {log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());this.metadata.failedUpdate(now);}
}
从获取到的响应MetadataResponse 中,取出最新拉取到的元数据Cluster,将其更新到Metadata组件中去,最后notifyAll()唤醒当初因为wait(60s)而阻塞的线程
/*** 向Broker发请求拉取元数据,得到响应之后,会从响应中取出最新拉取到的Cluster,将其赋值给this.cluster。* 然后notifyAll()唤醒当初因为wait(60s)而阻塞的线程*/
public synchronized void update(Cluster cluster, long now) {// 将“是否需要update元数据”的标记设为false,即现在不需要updatethis.needUpdate = false;// 将“最近的刷新时间”和“成功刷新时间”都设为nowthis.lastRefreshMs = now;this.lastSuccessfulRefreshMs = now;// 每次成功update元数据后,就会对version加1this.version += 1;// 拉取元数据使用的监听器for (Listener listener: listeners)listener.onMetadataUpdate(cluster);// Do this after notifying listeners as subscribed topics' list can be changed by listeners// needMetadataForAllTopics(默认false)表示:将所有Topic的元数据都刷新一次// 于是将刚刚包装好的Cluster实例赋值给this.clusterthis.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;// 由于本方法由synchronized修饰,是线程安全的,所以Thread-1抢到了锁,执行该方法,Thread-2就得wait进入休眠状态// 此时调用notifyAll()方法就会唤醒(处于休眠状态的)Thread-2,Thread-2就又能争抢锁了notifyAll();log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}
通过wait()方法让所在线程以同步阻塞的方式,等待元数据拉取结果。while循环的判定条件是以元数据版本号version为准,只要元数据拉取成功,必然会更新version,此时也就能跳出while循环了。
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {if (maxWaitMs < 0) {throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");}long begin = System.currentTimeMillis();// 最多能够阻塞等待的时间long remainingWaitMs = maxWaitMs;// while循环等待元数据拉取结果,啥时候拉取成功了,version就会自增+1,就能跳出while循环while (this.version <= lastVersion) {// 最多能够阻塞等待的时间也正常if (remainingWaitMs != 0)// wait释放锁,让业务逻辑所在的线程阻塞等待最长60swait(remainingWaitMs);// 已经因为阻塞等待而耗费的时间long elapsed = System.currentTimeMillis() - begin;// 如果等待元数据拉取结果的的时间超过了默认的60s,就抛出异常if (elapsed >= maxWaitMs)throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");// 否则,表示元数据拉取过程并未超时,计算出剩余还需要阻塞等待的时间 = 默认的60s - 已经花费的时间remainingWaitMs = maxWaitMs - elapsed;}
}
如果在默认的阻塞等待时间内,成功拉取到了集群元数据,那就计算出整个拉取流程的耗费时间并return。
一旦拉取超时,那就得抛出TimeoutException。异常抛出会中断上述while循环,异常信息会传递到waitOnMetadata()方法,于是waitOnMetadata()方法就会抛出InterruptedException。
最外层的doSend方法捕获到InterruptedException异常后,会专门对其进行处理:
catch (InterruptedException e) {// 如果拉取元数据的过程超过了60s,就会将TimeoutException抛出来,在这里catch住,并通过onSendError回调交给开发者this.errors.record();if (this.interceptors != null)this.interceptors.onSendError(record, tp, e);throw new InterruptException(e);
}
在KafkaProducer初始化时,并不会拉取集群元数据,仅仅是将Broker包装成了Node,并add到了List中用来构建Cluster实例。
发送消息时加载元数据,之所以采用“同步阻塞等待 + 异步拉取”的方式,是因为既不想无脑的同步阻塞在那,也不想无限制的等待异步结果。如果60s内拉取成功,wait的线程就会唤醒,正常走以后的逻辑;如果60s内没拉取到,那就主动抛异常让最外层捕获、视情况处理…