Kafka元数据拉取流程
admin
2024-02-26 17:31:07
0

文章目录

  • 元数据采用的数据结构
  • KafkaProducer初始化时的拉取流程
  • 消息发送时如何拉取元数据
    • 1.更新拉取标志位
    • 2.唤醒Sender线程,异步拉取
    • 3.同步阻塞,等待拉取结果
  • 总结

元数据采用的数据结构

public final class Cluster {private final boolean isBootstrapConfigured;// 一个Node就代表一个Brokerprivate final List nodes;// 尚未被授权访问的Kafka列表,Kafka是支持权限访问的private final Set unauthorizedTopics;// 映射关系为:“某个Topic下的某个Partition:Partition的详细信息”// TopicPartition指的是Topic1中的Partition1,PartitionInfo为具体某个Partition的详细信息private final Map partitionsByTopicPartition;// 映射关系为:“某个Topic:这个Topic下的Partition列表”private final Map> partitionsByTopic;// 映射关系为:“某个Topic:这个Topic下的可用的Partition列表”private final Map> availablePartitionsByTopic;// 映射关系为:“某个Broker ID:这个Broker上的所有Partition”// 某个Broker上有哪些Partition(可能来自不同的Topic)private final Map> partitionsByNode;// broker.id映射到Node的数据结构,映射关系为:“broker.id:Node”private final Map nodesById;
}

KafkaProducer初始化时的拉取流程

在KafkaProducer初始化时,会构造出集群元数据组件Metadata,且在初始化方法里有一次Metadata#update()方法调用。

// 构造核心组件:Metadata;用于去Broker集群拉取元数据(有哪些Topic,对应哪些Partition,其中哪个是leader、哪个是follower)
// 想往Broker发送一条ProducerRecord,就必须知道目标Topic,有哪几个Partition,其中Partition Leader在哪个Broker上
// 在KafkaProducer初始化时拉取一次元数据;后面每隔一段时间(metadata.max.age.ms,默认:5min)会刷新元数据;发送消息时如果元数不在本地,还得通过Metadata发送请求
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));// 省略部分代码......// 会把我们配置的Kafka Broker地址作为参数传入
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());

在方法调用中,会传递2个参数,分别是Cluster实例对象和当前时间戳。对于Cluster实例的创建,是利用我们配置的Broker地址,将其包装成Node,并add到List< Node >中。最后利用List< Node >构造出Cluster实例对象。

public static Cluster bootstrap(List addresses) {List nodes = new ArrayList<>();int nodeId = -1;// 遍历传进来的Kafka Broker地址for (InetSocketAddress address : addresses)// 将Broker地址包装成Node后,添加到List集合中nodes.add(new Node(nodeId--, address.getHostString(), address.getPort()));// 利用List包装Cluster实例并返回return new Cluster(true, nodes, new ArrayList(0), Collections.emptySet());
}

之后再执行update()方法时,会将构造好的Cluster实例和当前时间戳传入。

/*** KafkaProducer初始化时,只是将将配置的Broker地址包装成Node后,添加到List集合中,利用List集合创建Cluster实例。* 传进来的参数就是Cluster实例。* 在KafkaProducer初始化时,并没有真正的去某个Broker上拉取元数据,只是将配置的Broker地址转换成了Node,* 以List的形式存到了Cluster实例中** 后面拉取元数据成功后处理响应时再调用该方法,就是更新Cluster了!*/
public synchronized void update(Cluster cluster, long now) {// 将“是否需要update元数据”的标记设为false,即不需要updatethis.needUpdate = false;// 将“最近的刷新时间”和“成功刷新时间”都设为nowthis.lastRefreshMs = now;this.lastSuccessfulRefreshMs = now;// 每次成功update元数据后,就会对version加1this.version += 1;// 拉取元数据使用的监听器for (Listener listener: listeners)listener.onMetadataUpdate(cluster);// Do this after notifying listeners as subscribed topics' list can be changed by listeners// needMetadataForAllTopics(默认false)表示:将所有Topic的元数据都刷新一次// 于是将刚刚包装好的Cluster实例赋值给this.clusterthis.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;// 由于本方法由synchronized修饰,是线程安全的,所以Thread-1抢到了锁,执行该方法,Thread-2就得wait进入休眠状态// 此时调用notifyAll()方法就会唤醒(处于休眠状态的)Thread-2,Thread-2就又能争抢锁了notifyAll();log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}

可以看出,用来标记元数据是否需要拉取的标志位默认为false,还把管理元数据的版本号自增,此时很明显就不会去拉取元数据。
由于update()方法由synchronized修饰,所以在多线程并发执行时,同一时刻只会有一个线程抢占到锁(其他线程进入休眠等待状态),进而执行“更新元数据”操作。等本方法执行完毕后,就会通过notifyAll()唤醒其他处于休眠状态的线程。

消息发送时如何拉取元数据

在调用Kafka API的doSend()方法生产消息时,会(按需、以同步阻塞的方式)拉取元数据

// 以同步阻塞等待的方式(传参:同步阻塞的最大时间),去连接Broker拉取元数据:如果想往Topic发送消息,必须知道元数据,这样才能通过Partitioner选择一个Partition,
// 然后才能跟这个Partition对应的Leader建立连接、发送消息。其中调用本方法最多能够阻塞等待时间是:max.block.ms
// 返回的是“为了拉取元数据,总共花费的时间” = 元数据拉取的时间 + 一些边边角角
long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);

得到的就是元数据拉取流程所花费的时间

private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {// Metadata组件中已经缓存、加载过元数据的Topic,会放到Set集合中。// 第一次发送消息到某个Topic,Set集合没有这个Topic,那就准备拉取if (!this.metadata.containsTopic(topic))this.metadata.add(topic);// metadata.fetch()得到的就是Cluster实例,这里是判断Cluster中的Map>是否有这个Topic// 说明这个Topic的元数据信息在Cluster Map中能查到(已经被缓存了),无需等待拉取if (metadata.fetch().partitionsForTopic(topic) != null)// 元数据拉取过程中的阻塞等待时间 = 0return 0;long begin = time.milliseconds();// 最多能阻塞等待的时间,默认:60slong remainingWaitMs = maxWaitMs;// 只要Cluster实例中的Map>集合中没有这个Topic,就得触发“元数据拉取操作”while (metadata.fetch().partitionsForTopic(topic) == null) {log.trace("Requesting metadata update for topic {}.", topic);// step 1:将Sender线程拉取元数据的标志位,设为trueint version = metadata.requestUpdate();// step 2:唤醒Sender线程,底层就是唤醒NetworkClient(让它不要阻塞等待了),准备异步拉取元数据sender.wakeup();// step 3:Metadata准备以同步阻塞的方式,等待元数据的拉取结果metadata.awaitUpdate(version, remainingWaitMs);// 整个“元数据异步拉取而同步等待”所花费的时间 = 当前时间戳 - 元数据拉取前夕的时间戳long elapsed = time.milliseconds() - begin;// 如果等待元数据拉取所花费的时间大于默认的60s,抛出超时异常if (elapsed >= maxWaitMs)throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");// 某个Topic尚未被授权访问,抛异常if (metadata.fetch().unauthorizedTopics().contains(topic))throw new TopicAuthorizationException(topic);// 剩余等待时间 = 默认的60s - 同步阻塞所花费的时间remainingWaitMs = maxWaitMs - elapsed;}// 以同步阻塞的方式等待元数据拉取成功所花费的时间 = 元数据拉取所花费的时间 + 一些边边角角return time.milliseconds() - begin;
}

首先要判断目标Topic的元数据是否已经缓存,如果没有,那就准备拉取。

1.更新拉取标志位

首先将Metadata组件中是否需要拉取元数据的标志位,设为true,表示现在需要拉取元数据。

public synchronized int requestUpdate() {// 是否需要拉取元数据的标志位,设为truethis.needUpdate = true;// 返回“拉取元数据”过程中用到的版本号return this.version;
}

2.唤醒Sender线程,异步拉取

元数据拉取工作是由Sender负责完成的,底层就是唤醒NetworkClient,让它不要阻塞等待了,准备异步拉取元数据

当Sender线程运行时,会触发执行它的run()方法。

void run(long now) {// 如果某些做好发送准备的Partition的元数据都还没拉取到(不知道Leader是谁),就标识一下if (result.unknownLeadersExist)// 将“需要拉取元数据的标志位”更新为:truethis.metadata.requestUpdate();// 省略部分代码...// 万能poll()方法this.client.poll(pollTimeout, now);
}

其中如果哪个Partition的Leader还不知道是谁,就强制刷新一次元数据。最后调用万能poll方法拉取元数据

public List poll(long timeout, long now) {// MetadataUpdater组件是专门用来更新元数据的,调用MetadataUpdater#maybeUpdate()拉取元数据// 内部会构建专门用于向Broker发送请求的MetadataRequestlong metadataTimeout = metadataUpdater.maybeUpdate(now);// 省略部分代码...List responses = new ArrayList<>();handleCompletedSends(responses, updatedNow);// 发送出去的MetadataRequest,收到了响应,现在处理这些响应handleCompletedReceives(responses, updatedNow);// 省略部分代码...return responses;
}

底层通过MetadataUpdater组件完成拉取动作,本质就是创建拉取元数据的请求–MetadataRequest,将其封装成ClientRequest,最后由Selector将其发送出去

private void maybeUpdate(long now, Node node) {if (node == null) {log.debug("Give up sending metadata request since no node is available");this.lastNoNodeAvailableMs = now;return;}String nodeConnectionId = node.idString();if (canSendRequest(nodeConnectionId)) {this.metadataFetchInProgress = true;// 首先创建好拉取元数据要发送的请求:MetadataRequestMetadataRequest metadataRequest;if (metadata.needMetadataForAllTopics())metadataRequest = MetadataRequest.allTopics();elsemetadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));// 将拉取元数据的请求,封装成ClientRequestClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());// 核心:真正发送请求调用的是request()方法doSend(clientRequest, now);} else if (connectionStates.canConnect(nodeConnectionId, now)) {log.debug("Initialize connection to node {} for sending metadata request", node.id());initiateConnect(node, now);} else { this.lastNoNodeAvailableMs = now;}
}

然后将这个请求发送出去,走的是基于Java NIO封装的KafkaChannel将其发送到

/*** 真正发送请求的方法*/
private void doSend(ClientRequest request, long now) {request.setSendTimeMs(now);this.inFlightRequests.add(request);// 通过Selectable组件发起请求,该组件是Kafka中专用于网络I/O操作的selector.send(request.request());
}

万能poll()方法将“元数据拉取”的这个ClientRequest 发送出去后,总归是能接收到响应的。于是,调用handleCompletedReceives()方法处理响应

/***  处理Broker对MetadataRequest的响应*/
private void handleCompletedReceives(List responses, long now) {for (NetworkReceive receive : this.selector.completedReceives()) {String source = receive.source();ClientRequest req = inFlightRequests.completeNext(source);Struct body = parseResponse(receive.payload(), req.request().header());// 如果这个请求是一个metadata request,那就立即处理,并返回trueif (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))responses.add(new ClientResponse(req, now, false, body));}
}@Override
public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {short apiKey = req.request().header().apiKey();if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {handleResponse(req.request().header(), body, now);return true;}return false;
}/*** 处理响应
*/
private void handleResponse(RequestHeader header, Struct body, long now) {this.metadataFetchInProgress = false;MetadataResponse response = new MetadataResponse(body);// 从MetadataResponse中取出最新拉取到的元数据Cluster cluster = response.cluster();// check if any topics metadata failed to get updatedMap errors = response.errors();if (!errors.isEmpty())log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), errors);// don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being// created which means we will get errors and no nodes until it existsif (cluster.nodes().size() > 0) {// 拉取元数据的请求,最终会得到响应。// 现在就是要将响应的Cluster交给Metadata更新,内部会调用notifyAll方法唤醒当初阻塞等待拉取结果的主线程this.metadata.update(cluster, now);} else {log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());this.metadata.failedUpdate(now);}
}

从获取到的响应MetadataResponse 中,取出最新拉取到的元数据Cluster,将其更新到Metadata组件中去,最后notifyAll()唤醒当初因为wait(60s)而阻塞的线程

/*** 向Broker发请求拉取元数据,得到响应之后,会从响应中取出最新拉取到的Cluster,将其赋值给this.cluster。* 然后notifyAll()唤醒当初因为wait(60s)而阻塞的线程*/
public synchronized void update(Cluster cluster, long now) {// 将“是否需要update元数据”的标记设为false,即现在不需要updatethis.needUpdate = false;// 将“最近的刷新时间”和“成功刷新时间”都设为nowthis.lastRefreshMs = now;this.lastSuccessfulRefreshMs = now;// 每次成功update元数据后,就会对version加1this.version += 1;// 拉取元数据使用的监听器for (Listener listener: listeners)listener.onMetadataUpdate(cluster);// Do this after notifying listeners as subscribed topics' list can be changed by listeners// needMetadataForAllTopics(默认false)表示:将所有Topic的元数据都刷新一次// 于是将刚刚包装好的Cluster实例赋值给this.clusterthis.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;// 由于本方法由synchronized修饰,是线程安全的,所以Thread-1抢到了锁,执行该方法,Thread-2就得wait进入休眠状态// 此时调用notifyAll()方法就会唤醒(处于休眠状态的)Thread-2,Thread-2就又能争抢锁了notifyAll();log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}

3.同步阻塞,等待拉取结果

通过wait()方法让所在线程以同步阻塞的方式,等待元数据拉取结果。while循环的判定条件是以元数据版本号version为准,只要元数据拉取成功,必然会更新version,此时也就能跳出while循环了。

public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {if (maxWaitMs < 0) {throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");}long begin = System.currentTimeMillis();// 最多能够阻塞等待的时间long remainingWaitMs = maxWaitMs;// while循环等待元数据拉取结果,啥时候拉取成功了,version就会自增+1,就能跳出while循环while (this.version <= lastVersion) {// 最多能够阻塞等待的时间也正常if (remainingWaitMs != 0)// wait释放锁,让业务逻辑所在的线程阻塞等待最长60swait(remainingWaitMs);// 已经因为阻塞等待而耗费的时间long elapsed = System.currentTimeMillis() - begin;// 如果等待元数据拉取结果的的时间超过了默认的60s,就抛出异常if (elapsed >= maxWaitMs)throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");// 否则,表示元数据拉取过程并未超时,计算出剩余还需要阻塞等待的时间 = 默认的60s - 已经花费的时间remainingWaitMs = maxWaitMs - elapsed;}
}

如果在默认的阻塞等待时间内,成功拉取到了集群元数据,那就计算出整个拉取流程的耗费时间并return。

一旦拉取超时,那就得抛出TimeoutException。异常抛出会中断上述while循环,异常信息会传递到waitOnMetadata()方法,于是waitOnMetadata()方法就会抛出InterruptedException。

最外层的doSend方法捕获到InterruptedException异常后,会专门对其进行处理:

catch (InterruptedException e) {// 如果拉取元数据的过程超过了60s,就会将TimeoutException抛出来,在这里catch住,并通过onSendError回调交给开发者this.errors.record();if (this.interceptors != null)this.interceptors.onSendError(record, tp, e);throw new InterruptException(e);
} 

总结

在KafkaProducer初始化时,并不会拉取集群元数据,仅仅是将Broker包装成了Node,并add到了List中用来构建Cluster实例。

发送消息时加载元数据,之所以采用“同步阻塞等待 + 异步拉取”的方式,是因为既不想无脑的同步阻塞在那,也不想无限制的等待异步结果。如果60s内拉取成功,wait的线程就会唤醒,正常走以后的逻辑;如果60s内没拉取到,那就主动抛异常让最外层捕获、视情况处理…

相关内容

热门资讯

4月份银行理财规模环比增加2.... 钛媒体App 5月16日消息,银行理财市场在4月份迎来规模与收益的双增长。据华源证券廖志明团队发布的...
【光明日报】黑龙江:免签红利释... 5月10日早上7时,一辆国际大巴缓缓停靠在黑龙江绥芬河公路口岸入境大厅前。游客们提着大包小裹,依次走...
又一跨国高端化工合作项目落子乐... 5月15日,福华化学携手瑞士特种化学品企业科莱恩打造的创新型高端磷系无卤阻燃剂项目(以下简称“福华科...
鸿蒙智行:已拥有1951家销售... IT之家 5 月 15 日消息,鸿蒙智行智界 V9 发布会正在进行,官方透露目前已拥有 1951 家...
黄金、白银,直线大跌! 5月15日晚间,贵金属价格突然大跌! 截至记者发稿时,现货黄金跌超2%,暂报4553美元/盎司附近。...
央视《焦点访谈》聚焦!万兴科技... 深圳商报·读创客户端首席记者 谢惠茜 5月14日,中央电视台《焦点访谈》推出专题节目《扩能提质强服务...
东方嘉富人寿董事长履职半年被换... 文|达摩财经 东方嘉富人寿再度进行人事调整。 5月13日,东方嘉富人寿发布公告称,自2026年4月...
重返西决!文班19+6卡斯尔3... 【搜狐体育战报】北京时间5月16日NBA季后赛,客场作战的马刺以139-109击败森林狼,总比分4-...
原创 美... 十万亿美债为什么还没有崩盘?或许答案在于,中国的存在让局势与众不同。现在的美债就像一张看似脆弱的网,...
原创 茅... 最近打开股票软件看白酒板块,是不是心里拔凉拔凉的? 茅台又回到1300元区间了,五粮液跌破90元,洋...
茅台宣布涨价 5月15日深夜,“i茅台”APP发布公告称,按照随行就市、供需适配、量价平衡、相对平稳的原则,贵州茅...
最高涨200元!茅台官宣4款产... 贵州茅台(600519.SH)凌晨宣布涨价几款产品。 茅台数字营销平台“i茅台”今日(5月16日)发...
面向地方国资产融转型全链条,X... 5月15日,XOD创新投融资模式3.0产品发布会在广州举办。该产品主要面向地方国资产融协同创新转型提...
2026Q1:10家上市商超9... 截至4月30日,所有A股上市公司2026年Q1财报全部出炉,传统商超也晒出自己的成绩单。10家披露的...
入主盟科药业失利后,拟借款2.... 来源:时代周报-时代在线 继去年试图通过定增入主盟科药业(688373.SH)失败后,海鲸药业再度出...
同比激增86%、规模突破760... 图片来源:界面图库 界面新闻记者 | 孙艺真 今年以来,证券行业融资补血热潮持续升温。前5个月...
促进青年消费,扶持青年创业,上... 5月14日,上海市政协团青界别、经济界跨界别活动在市政协全过程人民民主实践点举行。 今年初,团市委立...
苹果股价昨日创收盘新高,站上3... IT之家 5 月 16 日消息,苹果公司股价昨日(5 月 15 日)收于 300.23 美元,首次站...
杭州首批配售型保障房正式入市 杭州首批配售型保障房正式入市 价格约为周边商品房5折,18日开始报名 不能入市交易,可由政府指定机构...
“后巴菲特时代”,伯克希尔调仓... 当地时间5月15日,伯克希尔披露了2026年一季度美股持仓报告。这是伯克希尔在巴菲特卸任CEO并由阿...