RabbitMQ Java开发教程(三)—官方原版
创始人
2025-05-30 02:38:42
0

一、高级连接选项

1、使用者操作线程池

使用者线程(请参阅下面的接收)是 在新的执行程序服务线程池中自动分配 默认情况下。如果需要更大的控制,请在 newConnection() 方法上提供 ExecutorService,以便此线程池 改用。下面是一个示例,其中较大的线程池是 供应量超过正常分配量:

ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);

执行程序和执行程序服务类 在 java.util.并发包中。

当连接关闭时,默认的 ExecutorService 将是 shutdown(),但用户提供的 ExecutorService(如上面的 es)不会是 shutdown()。 提供自定义执行器服务的客户端必须确保 它最终会关闭(通过调用其 shutdown() 方法),否则池的线程可能会阻止 JVM 终止。

同一个执行器服务可以在多个连接之间共享, 或在重新连接时串行重复使用,但在关闭()后无法使用。

只有在有证据的情况下才应考虑使用此功能 消费者回调的处理存在严重瓶颈。 如果没有执行的消费者回调,或者很少执行,则默认 分配绰绰有余。开销最初是最小的,并且 分配的总线程资源是有限制的,即使使用者突发 活动可能偶尔发生。

2、使用地址解析程序接口进行服务发现

可以使用地址解析器的实现来更改端点解析算法 连接时使用:

Connection conn = factory.newConnection(addressResolver);

地址解析程序接口如下所示:

public interface AddressResolver {List
getAddresses() throws IOException;}

就像端点列表一样, 返回的第一个地址将首先尝试,然后 如果客户端无法连接到第一个,则为第二个,依此类推。

如果还提供了执行人服务(使用 formfactory.newConnection(es, addressResolver)) 线程池是 与(第一个)成功连接相关联。

地址解析器是实现的理想场所 自定义服务发现逻辑,这在动态中特别有用 基础设施。结合自动恢复, 客户端可以自动连接到甚至没有启动的节点 刚开始的时候。关联性和负载平衡是其他 自定义地址解析程序可能有用的方案。

Java 客户端附带以下实现 (有关详细信息,请参阅 javadoc):

  • DnsRecordIpAddressResolver:给定名称 ,返回其 IP 地址(分辨率针对 平台 DNS 服务器)。这对于简单的 基于 DNS 的负载平衡或故障转移。

  • DnsSrvRecordAddressResolver:给定名称 ,返回主机名/端口对。搜索是 作为 DNS SRV 请求实现。这可能很有用 使用像HashiCorp Consul这样的服务注册表时。

3、检测信号超时

要在 Java 客户机中配置检测信号超时,请在之前使用 ConnectionFactory#setRequestHeartbeat 进行设置。 创建连接:

ConnectionFactory cf = new ConnectionFactory();// set the heartbeat timeout to 60 seconds
cf.setRequestedHeartbeat(60);

请注意,如果 RabbitMQ 服务器具有非零检测信号超时 配置(这是默认值),客户端只能降低该值,而不能增加该值。

二、自定义线程工厂

Google App Engine (GAE) 等环境可能会受到限制 直接线程实例化。要在此类环境中使用 RabbitMQ Java 客户端, 有必要配置一个自定义线程工厂,该 实例化线程的适当方法,例如 GAE 的 ThreadManager。

下面是谷歌应用引擎的示例。

import com.google.appengine.api.ThreadManager;ConnectionFactory cf = new ConnectionFactory();
cf.setThreadFactory(ThreadManager.backgroundThreadFactory());

三、支持 Java 非阻塞 IO

Java 客户端 4.0 版带来了对 Java 非阻塞的支持 IO(又名Java NIO)。蔚来不应该比阻塞IO更快, 它只是允许更轻松地控制资源(在本例中为线程)。

在默认阻塞 IO 模式下,每个连接都使用线程来读取 从网络套接字。使用NIO模式,您可以控制数量 从网络套接字读取和写入的线程。

如果您的 Java 进程使用多个连接(数十个或数百个),请使用 NIO 模式。 与默认阻塞模式相比,应使用更少的线程。随着 设置适当数量的线程,您不应该 遇到任何性能下降,尤其是在连接 没那么忙。

必须显式启用 NIO:

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.useNio();

NIO 模式可以通过 NioParams 类进行配置:

connectionFactory.setNioParams(new NioParams().setNbIoThreads(4));

NIO 模式使用合理的默认值,但您可能需要根据 到您自己的工作负载。一些设置是:IO 的总数 使用的线程、缓冲区的大小、用于 IO 循环的服务执行器, 内存中写入队列的参数(写入请求在之前排队 在网络上发送)。请阅读 Javadoc 了解详细信息和默认值。

四、从网络故障中自动恢复

1、连接恢复

客户端和 RabbitMQ 节点之间的网络连接可能会失败。 RabbitMQ Java 客户端支持自动恢复连接 和拓扑(队列、交换、绑定和使用者)。

许多应用程序的自动恢复过程遵循以下步骤:

  • 重新连接

  • 恢复连接侦听器

  • 重新打开通道

  • 恢复频道侦听器

  • 恢复通道基本qos设置、发布者确认和事务设置

拓扑恢复包括对每个通道执行的以下操作

  • 重新申报交易所(预定义交易所除外)

  • 重新声明队列

  • 恢复所有绑定

  • 恢复所有消费者

从 Java 客户机版本 4.0.0 开始,已启用自动恢复 默认情况下(因此拓扑恢复也是如此)。

拓扑恢复依赖于实体(队列、交换、 绑定,消费者)。例如,当在连接上声明队列时,它将被添加到缓存中。 何时删除或计划删除(例如,因为它是自动删除的) 它将被删除。此模型具有下面介绍的一些限制。

要禁用或启用自动连接恢复,请使用 factory.setAutomaticRecoveryEnabled(boolean)方法。以下代码片段演示如何显式 启用自动恢复(例如,对于 4.0.0 之前的 Java 客户机):

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
factory.setAutomaticRecoveryEnabled(true);
// connection that will recover automatically
Connection conn = factory.newConnection();

如果恢复由于异常而失败(例如 RabbitMQ 节点 仍然无法访问),它将在固定时间间隔后重试(默认 为 5 秒)。可以配置间隔:

ConnectionFactory factory = new ConnectionFactory();
// attempt recovery every 10 seconds
factory.setNetworkRecoveryInterval(10000);

提供地址列表时,列表将被打乱,并且 一个接一个地尝试所有地址:

ConnectionFactory factory = new ConnectionFactory();Address[] addresses = {new Address("192.168.1.4"), new Address("192.168.1.5")};
factory.newConnection(addresses);

2、何时会触发连接恢复?

自动连接恢复(如果启用)将由以下事件触发:

  • 在连接的 I/O 循环中引发 I/O 异常

  • 套接字读取操作超时

  • 检测到丢失的服务器检测信号

  • 在连接的 I/O 循环中引发任何其他意外异常

以先发生者为准。

如果客户端与 RabbitMQ 节点的初始连接失败,则自动连接 复苏不会启动。应用程序开发人员负责重试 此类连接,记录失败的尝试,对数量实施限制 重试次数等。这是一个非常基本的例子:

ConnectionFactory factory = new ConnectionFactory();
// configure various connection settingstry {Connection conn = factory.newConnection();
} catch (java.net.ConnectException e) {Thread.sleep(5000);// apply retry logic
}

当应用程序通过 Connection.Close 方法关闭连接时, 不会启动连接恢复。

通道级异常不会像往常那样触发任何类型的恢复 指示应用程序中的语义问题(例如,尝试从 不存在的队列)。

3、恢复侦听器

可以在可恢复的连接上注册一个或多个恢复侦听器 和频道。启用连接恢复后,ConnectionFactory#newConnection 和 Connection#createChannel 返回的连接实现 com.rabbitmq.client.Recoverable,提供两种方法 相当描述性的名称:

  • addRecoveryListener

  • removeRecoveryListener

请注意,您当前需要将连接和通道转换为可恢复才能使用这些方法。

4、对发布的影响

连接断开时使用Channel.basicPublish发布的消息将丢失。连接恢复后,客户端不会将它们排入队列以进行传递。为了确保已发布的消息到达RabbitMQ应用程序需要使用Publisher Confirms并说明连接故障。

5、拓扑恢复

拓扑恢复涉及交换、队列、绑定的恢复 和消费者。当自动恢复 启用。默认情况下,拓扑恢复在新式版本的客户端中处于启用状态。

如果需要,可以显式禁用拓扑恢复:

ConnectionFactory factory = new ConnectionFactory();Connection conn = factory.newConnection();
// enable automatic recovery (e.g. Java client prior 4.0.0)
factory.setAutomaticRecoveryEnabled(true);
// disable topology recovery
factory.setTopologyRecoveryEnabled(false);

6、故障检测和恢复限制

自动连接恢复有许多限制和有意 应用程序开发人员需要注意的设计决策。

拓扑恢复依赖于实体(队列、交换、 绑定,消费者)。例如,当在连接上声明队列时,它将被添加到缓存中。 何时删除或计划删除(例如,因为它是自动删除的) 它将被删除。这使得在不同 频道没有意外结果。这也意味着消费者标签(特定于渠道的标识符) 在使用自动连接恢复的连接上的所有通道中必须是唯一的。

当连接断开或丢失时,需要时间来检测。 因此,有一个时间窗口,其中 库和应用程序不知道有效 连接失败。在此期间发布的任何消息 时间范围被序列化并写入 TCP 套接字 照常。他们交付给经纪人只能是 通过发布者保证 确认:在AMQP 0-9-1中发布完全是 异步设计。

当 启用自动恢复的连接,恢复 在可配置的延迟后开始,5 秒后 违约。此设计假设即使很多 网络故障是暂时性的,通常很短 活着,他们不会瞬间消失。有延迟 还避免了服务器端资源之间的固有争用条件 清理(例如独占或自动删除队列删除) 以及对同一资源上新打开的连接执行的操作。

默认情况下,连接恢复尝试将以相同的时间间隔继续,直到 已成功打开新连接。 可以通过向 ConnectionFactory#setRecoveryDelayHandler 提供 RecoveryDelayHandler 实现实例来使恢复延迟成为动态的。 使用动态计算延迟间隔的实现应避免 过低的值(例如小于 2 秒的值)。

当连接处于恢复状态时,任何 在其频道上尝试发布将被拒绝 有一个例外。客户端当前未执行 此类传出消息的任何内部缓冲。是的 应用程序开发人员有责任跟踪此类情况 消息,并在恢复成功后重新发布它们。发布者确认是协议扩展 这应该由无法承受消息丢失的发布者使用。

当通道因以下原因而关闭时,连接恢复不会启动 通道级异常。此类异常通常表示应用程序级别 问题。图书馆无法就何时做出明智的决定 案子。

即使在连接恢复启动后,也不会恢复已关闭的频道。 这包括显式关闭的通道和通道级异常 以上案例。

7、手动确认和自动恢复

使用手动确认时,可能会 消息之间与 RabbitMQ 节点的网络连接失败 交付和确认。连接恢复后, RabbitMQ 将重置所有通道上的交付标签。

这意味着使用旧交付标记的 basic.ack、basic.nack 和 basic.reject 将导致通道异常。为了避免这种情况, RabbitMQ Java 客户端跟踪并更新交付标签,使其单调 在恢复之间增长。

Channel.basicAck、Channel.basicNack 和 Channel.basicReject 然后翻译调整 将标签传递到RabbitMQ使用的标签中。

不会发送带有过时递送标签的确认。应用 使用手动确认和自动恢复必须 能够处理重新交付。

8、通道生命周期和拓扑恢复

自动连接恢复应尽可能透明 对于应用程序开发人员来说,这就是通道实例的原因 即使多个连接失败并在后台恢复,也保持不变。 从技术上讲,当自动恢复处于打开状态时,通道实例 充当代理或装饰者:他们将 AMQP 业务委托给 实际的AMQP通道实现并围绕它实现一些恢复机制。 这就是为什么您不应该在频道创建一些资源后关闭频道的原因 这些资源的队列、交换、绑定或拓扑恢复 稍后将失败,因为通道已关闭。相反,请保持创建渠道处于打开状态 在应用程序的生命周期内。

相关内容

热门资讯

四川五日游报团指南及详细行程,... 四川,这片位于中国西南的神奇土地,以其独特的自然风光、丰富的文化遗产和诱人的美食而闻名遐迩。从成都的...
原创 中... 在2025年4月初,时任美国总统的特朗普正式启动了针对世界各国的关税战,旨在通过实施经济制裁来促进美...
牛市主升浪开启了?别急!珍惜布... 本周,A股市场上行,主要宽基指数都收获了或多或少的周涨幅,其中,科创50、微盘股涨幅居前。板块方面,...
公募二季报两大看点!港股配置逼... 本报(chinatimes.net.cn)记者栗鹏菲 叶青 北京报道 2025年公募基金二季报披露收...
长和出售港口磋商期或延长 随着可能出现的各方介入及交易结构变化,此次长和港口出售交易如继续进行,其复杂性会提升 文 |《财经》...
中航重机涨0.17%,成交额4... 来源:新浪证券-红岸工作室 7月25日,中航重机涨0.17%,成交额4.14亿元,换手率1.52%,...
重仓电子和新能源行业 【深圳商报讯】(记者 陈燕青)基金二季报出炉,公募二季度依然重仓电子、新能源、食品饮料等行业。公募排...
大婚之后,大笔减持!昔日全球首... 当地时间7月25日,亚马逊公司提交至美国证券交易委员会的文件显示,前全球首富、亚马逊创始人杰夫·贝索...
创源股份涨2.32%,成交额3... 来源:新浪证券-红岸工作室 7月25日,创源股份涨2.32%,成交额3.50亿元,换手率8.32%,...
筹备登陆韩国综合股价指数!大韩... 近日,大韩造船(Daehan Shipbuilding)的首次公开募股(IPO)发行价最终确定为每股...
山东政商要情(7.21—7.2... 记者 王惠 1,2025年上半年山东GDP50046亿元 增长5.6% 7月21日,山东省统计局、国...
《法学基本概念导论》| 专研法... 导言 本书是对权利、义务、法律主体、法律规范、法律渊源、法律行为等法学基本概念(juristic f...
上海AI新动向:世界AI合作组... 在今日的天气状况下,上海迎来了阴到多云的天气,偶尔还有阵雨光顾,气温徘徊在27至31摄氏度之间,给市...
山鹰国际跌1.52%,成交额2... 来源:新浪证券-红岸工作室 7月25日,山鹰国际跌1.52%,成交额2.50亿元,换手率2.33%,...
马斯克擎天柱解决不了无「手」难... 新智元报道 编辑:英智 【新智元导读】马斯克说人形机器人是特斯拉的未来,可今年5000台的目标才刚...
开封警方回应网传“释永信相关警... 7月27日,开封市公安局官方微博回复网友评论时表示:“(网传释永信相关)通报是假的,请不要再传播,目...
创新业务模式 提升开放水平 近日,在东营综合保税区食用油分装生产车间,工人们正在进行进口豆油灌装作业。 近年来,东营综合保税区...
中国资本市场学会成立!吴清当选... 来源:证监会发布 2025年7月26日,中国资本市场学会成立大会暨第一届第一次会员代表大会在上...
本周外盘看点丨美联储最新决议来... 来源:第一财经 欧美二季度GDP表现如何,特朗普关税谈判“大限”到来。 上周国际市场风云变幻,美国...
生态环境部逯世泽:全国碳市场量... 21世纪经济报道记者雷椰 李德尚玉 北京报道 7月26日,由冶金工业规划研究院主办,中国节能协会冶金...