RabbitMQ系列【17】RabbitOperations接口详解
admin
2024-03-03 08:54:55
0

有道无术,术尚可求,有术无道,止于术。

文章目录

    • 前言
    • send
    • convertAndSend
    • convertSendAndReceive
    • execute
    • invoke
    • waitForConfirms
    • getConnectionFactory
    • Other

前言

在上一篇,我们介绍了RabbitTemplate 实现AmqpTemplate接口的所有方法,接下来学习下其实现的另外一个接口RabbitOperations

AmqpTemplate是对AMQP协议的支持,完成了基本的发送、接收消息,而RabbitOperations是对RabbitMQ的直接集成,提供了更细致的操作。

send

send 方法,主要是添加了一个CorrelationData参数。

CorrelationData用于发布确认、退回模式时进行数据封装,该对象会返回ACK以及原因,开启了退回模式时,还会返回退回信息。

public class CorrelationData implements Correlation {// 异步执行的结果,Confirm表示返回结果的类型private final SettableListenableFuture future = new SettableListenableFuture();// 唯一ID,如果未提供id将自动设置为唯一值。private volatile String id;// 退回时返回信息private volatile ReturnedMessage returnedMessage;
}
	// 发送消息,传递CorrelationData 对象default void send(String routingKey, Message message, CorrelationData correlationData)throws AmqpException {throw new UnsupportedOperationException("This implementation does not support this method");}// 指定交换机、路由、传递CorrelationData 对象void send(String exchange, String routingKey, Message message, CorrelationData correlationData)throws AmqpException;

convertAndSend

convertAndSendsend方法的基础上,可以直接发送JAVA 对象,并可以添加一个MessagePostProcessor 消息处理器。

	// 使用自定义路由KEY。发送消息到默认交换机,并携带CorrelationData void convertAndSend(String routingKey, Object message, CorrelationData correlationData) throws AmqpException;void convertAndSend(String exchange, String routingKey, Object message, CorrelationData correlationData)throws AmqpException;void convertAndSend(Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData)throws AmqpException;void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor,CorrelationData correlationData) throws AmqpException;void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor,CorrelationData correlationData) throws AmqpException;void correlationConvertAndSend(Object message, CorrelationData correlationData) throws AmqpException;

convertSendAndReceive

convertSendAndReceiveAmqpTemplate接口中发送并接收消息一样,是RPC模式,区别是多了个CorrelationData 参数。

	@NullableObject convertSendAndReceive(Object message, CorrelationData correlationData) throws AmqpException;@NullableObject convertSendAndReceive(String routingKey, Object message, CorrelationData correlationData)throws AmqpException;@NullableObject convertSendAndReceive(String exchange, String routingKey, Object message,CorrelationData correlationData) throws AmqpException;@NullableObject convertSendAndReceive(Object message, MessagePostProcessor messagePostProcessor,CorrelationData correlationData) throws AmqpException;@NullableObject convertSendAndReceive(String routingKey, Object message,MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws AmqpException;@NullableObject convertSendAndReceive(String exchange, String routingKey, Object message,MessagePostProcessor messagePostProcessor, CorrelationData correlationData)throws AmqpException;@Nullable T convertSendAndReceiveAsType(Object message, CorrelationData correlationData,ParameterizedTypeReference responseType) throws AmqpException;@Nullable T convertSendAndReceiveAsType(String routingKey, Object message, CorrelationData correlationData,ParameterizedTypeReference responseType) throws AmqpException;@Nullabledefault  T convertSendAndReceiveAsType(String exchange, String routingKey, Object message,@Nullable CorrelationData correlationData, ParameterizedTypeReference responseType)throws AmqpException {return convertSendAndReceiveAsType(exchange, routingKey, message, null, correlationData, responseType);}@Nullable T convertSendAndReceiveAsType(Object message, MessagePostProcessor messagePostProcessor,CorrelationData correlationData, ParameterizedTypeReference responseType) throws AmqpException;@Nullable T convertSendAndReceiveAsType(String routingKey, Object message,MessagePostProcessor messagePostProcessor, CorrelationData correlationData,ParameterizedTypeReference responseType) throws AmqpException;@Nullable T convertSendAndReceiveAsType(String exchange, String routingKey, Object message,@Nullable MessagePostProcessor messagePostProcessor,@Nullable CorrelationData correlationData,ParameterizedTypeReference responseType) throws AmqpException;

execute

execute方法可以获取原生Channel执行操作,需要一个ChannelCallback参数。

	@Nullable T execute(ChannelCallback action) throws AmqpException;

ChannelCallback是一个函数式接口,使用该接口,可以获取RabbitMQChannel,执行任意操作,并返回结果。

@FunctionalInterface
public interface ChannelCallback {/*** @param channel 通道* @return 返回结果*/@NullableT doInRabbit(Channel var1) throws Exception;
}

示例:

        ChannelCallback stringChannelCallback = new ChannelCallback() {@Overridepublic Boolean doInRabbit(Channel channel) throws Exception {// 调用Channel 发送消息channel.basicPublish(MqBizConfig.BIZ_EXCHANGE,MqBizConfig.BIZ_ROUTE_KEY,null,"消息".getBytes());System.out.println("doInRabbit");return true;}};Boolean execute = rabbitTemplate.execute(stringChannelCallback);System.out.println("结果:"+execute);

invoke

invoke方法需要一个OperationsCallback参数,在该对象的doInRabbit()方法中,任何操作都使用相同的专用通道,该通道将在结束时关闭(不会返回到缓存)。这种使用方式就叫做范围内操作。

	@Nullabledefault  T invoke(OperationsCallback action) throws AmqpException {ret@Nullable T invoke(OperationsCallback action, @Nullable com.rabbitmq.client.ConfirmCallback acks,@Nullable com.rabbitmq.client.ConfirmCallback nacks);
}

OperationsCallback操作回调,可以获取RabbitOperations 执行操作,并返回结果。

	@FunctionalInterfaceinterface OperationsCallback {/*** @param operations RabbitOperations.* @return 结果.*/@NullableT doInRabbit(RabbitOperations operations);
}

waitForConfirms

waitForConfirmswaitForConfirmsOrDie都是等待确认,但是必须在invoke方法中使用,

	// 等待确认 boolean waitForConfirms(long timeout) throws AmqpException;// 等待确认,异常后信道被关闭,生产者发布不能继续发布消息void waitForConfirmsOrDie(long timeout) throws AmqpException;

getConnectionFactory

返回此操作的连接工厂。

	ConnectionFactory getConnectionFactory();

Other

startstop没有实现的方法,只是为了向后兼容。

	@Overridedefault void start() {// No-op - implemented for backward compatibility}@Overridedefault void stop() {// No-op - implemented for backward compatibility}@Overridedefault boolean isRunning() {return false;}

相关内容

热门资讯

“我真的撑不住了”,2000万... 5月14日、15日两天,知名搞笑博主“大连老湿王博文”,分别在微信公众号和小红书上发表长文,宣布断更...
原创 9... 邱 林 没有想到的是,日本对中东地区石油依赖度竟高达96%,其中,阿联酋占43%,沙特阿拉伯占39%...
华金策略:A股短期可能难大调整... 来源:市场资讯 来源:华金证券 投资要点 复盘历史,驱动TMT行情结束的核心因素是外部事件和政策偏空...
5月18日突然大跌,金价行情拐... 刚刷完5月18日凌晨的金价数据,伦敦金现直接暴跌113.8美元,报4537.83美元/盎司,单日跌幅...
深化资本与产业协同 打造AI智... 央广网北京5月18日消息(记者 郭彦伟)“这款熊猫医生AI机器人主要能帮助大家实现生命体征检测、AI...
实地调研深圳融资市场 细数贷款... 在当下经济发展节奏较快的深圳,各行各业的资金周转需求愈发普遍,从个体日常大额支出、家庭置业规划,到个...
上市公司交出近三年最好成绩单 ... 上市公司是经济高质量发展的重要微观基础,稳中向好的成绩单有力印证中国经济的强大韧性与活力。从上市公司...
接连吃罚单!这家券商债券业务“... 5月15日,国都证券及其债券从业人员收到了北京证监局发出的5份行政处罚。 罚单显示,因在公司债券承销...
原创 美... 特朗普本次的中国之行,其深远影响将直接牵动美国今年中期选举的最终走向,因此,他此番远渡重洋,无疑是怀...
AI高景气与盈利持续兑现 机构... 存储芯片指数日K线图   范雨露 制图 上周,全球主要股指普遍回调,A股市场同样冲高回落,创业板指创...
2026天津房交会暨“新房市集... 近日,2026天津房交会暨“新房市集”活动在津一·PARK正式启幕。此次房交会由天津市房地产市场服务...
原创 【... 各位朋友,最近是不是感觉金店门口的“今日金价”牌子,数字变得有点“刺眼”?没错,黄金它……真的跌了,...
原创 推... 俄罗斯财长安东·西卢安诺夫接受自家媒体采访,透露了两条重磅消息。 第一个:中俄双边贸易中,本币结算率...
兆易创新盘中涨停续创历史新高 ... 5月18日早盘,兆易创新盘中涨停,股价续创历史新高,报412.87元/股,成交金额超130亿元,A+...
原创 价... 过去三年价格战硝烟弥漫,汽车价格一降再降。 然而曾经杀得眼红的车企们,如今集体踩下刹车,汽车售价不降...
4月居民贷款大幅缩水近8000... 一边是楼市延续修复态势,“小阳春”行情持续演绎,重点城市二手房成交量大幅攀升;另一边是居民信贷数据的...
金价暴涨里的“套保”迷影,山东... 山东黄金冶炼业务。图源:企业官网 本报(chinatimes.net.cn)记者张蓓 黄指南 深圳报...
扬帆出海获佳绩!盐田区携手黄金... 2026年5月8日至10日 在马来西亚槟城举办的 “2026马来西亚黄金珠宝展销会”上 深圳市盐田区...
政策底与情绪顶:5月18日-2... 文/金透社 万捷 2026年5月第三周(5月11日-15日),A股市场走出了鲜明的分化格局。上证指数...
证监会重罚欺诈发行,广发证券被... 4.63亿元。 这是2026年5月,证监会对清越科技、元道通信两家公司欺诈发行、财务造假的罚款总额。...