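With principle but no technique, technique can still be learned; with technique but no principle, one stops at technique.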
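In the previous post we covered every method that RabbitTemplate implements from the AmqpTemplate interface. This post looks at the other interface it implements, RabbitOperations.
AmqpTemplate supports the AMQP protocol and covers basic sending and receiving of messages, while RabbitOperations integrates directly with RabbitMQ and offers finer-grained operations.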
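The send methods mainly add a CorrelationData parameter.
CorrelationData wraps the data used by publisher confirms and returns. Its future yields the ack/nack result together with the reason, and when returns are enabled it also carries the returned message.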
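public class CorrelationData implements Correlation {

    // Result of the asynchronous publish; Confirm is the type of the result
    private final SettableListenableFuture<Confirm> future = new SettableListenableFuture<>();

    // Unique id; if no id is supplied, a unique value is set automatically
    private volatile String id;

    // The returned message, populated when the message is returned
    private volatile ReturnedMessage returnedMessage;
}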
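To make the role of the future concrete, here is a minimal JDK-only sketch of the pattern CorrelationData follows: an id paired with a future that is completed later, when the broker confirms. `SketchConfirm`, `SketchCorrelation`, `SketchPublisher`, and `onBrokerConfirm` are hypothetical stand-ins written for this illustration, not Spring AMQP classes, and `CompletableFuture` is assumed in place of `SettableListenableFuture`.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the confirm result: ack flag plus an optional reason.
final class SketchConfirm {
    final boolean ack;
    final String reason;

    SketchConfirm(boolean ack, String reason) {
        this.ack = ack;
        this.reason = reason;
    }
}

// Hypothetical stand-in for CorrelationData: an id plus a future completed on confirm.
final class SketchCorrelation {
    final String id;
    final CompletableFuture<SketchConfirm> future = new CompletableFuture<>();

    SketchCorrelation(String id) {
        // As described above: generate a unique id when none is supplied.
        this.id = (id != null) ? id : UUID.randomUUID().toString();
    }
}

// Hypothetical publisher side: remembers pending correlations until the broker answers.
final class SketchPublisher {
    private final Map<String, SketchCorrelation> pending = new ConcurrentHashMap<>();

    void send(String body, SketchCorrelation correlation) {
        pending.put(correlation.id, correlation);
        // A real template would publish `body` to the broker here.
    }

    // Called when the (simulated) broker acks or nacks the message with this id.
    void onBrokerConfirm(String id, boolean ack, String reason) {
        SketchCorrelation correlation = pending.remove(id);
        if (correlation != null) {
            correlation.future.complete(new SketchConfirm(ack, reason));
        }
    }
}

final class CorrelationSketchDemo {
    public static void main(String[] args) {
        SketchPublisher publisher = new SketchPublisher();
        SketchCorrelation correlation = new SketchCorrelation("order-1");
        publisher.send("hello", correlation);
        publisher.onBrokerConfirm("order-1", true, null);
        SketchConfirm confirm = correlation.future.join();
        System.out.println("ack=" + confirm.ack);
    }
}
```

The caller keeps a reference to the correlation object it passed to send and later reads the outcome from its future, which is the same shape of usage the real CorrelationData is designed for.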
// Send a message, passing a CorrelationData object
default void send(String routingKey, Message message, CorrelationData correlationData)
        throws AmqpException {
    throw new UnsupportedOperationException("This implementation does not support this method");
}

// Send to the given exchange and routing key, passing a CorrelationData object
void send(String exchange, String routingKey, Message message, CorrelationData correlationData)
        throws AmqpException;
Building on send, the convertAndSend methods send a Java object directly and can also take a MessagePostProcessor to adjust the message before it is sent.
// Send to the default exchange with a custom routing key, carrying CorrelationData
void convertAndSend(String routingKey, Object message, CorrelationData correlationData)
        throws AmqpException;

void convertAndSend(String exchange, String routingKey, Object message, CorrelationData correlationData)
        throws AmqpException;

void convertAndSend(Object message, MessagePostProcessor messagePostProcessor,
        CorrelationData correlationData) throws AmqpException;

void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor,
        CorrelationData correlationData) throws AmqpException;

void convertAndSend(String exchange, String routingKey, Object message,
        MessagePostProcessor messagePostProcessor, CorrelationData correlationData)
        throws AmqpException;

void correlationConvertAndSend(Object message, CorrelationData correlationData) throws AmqpException;
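The order of steps behind convertAndSend (convert the object, then post-process the message, then send) can be sketched with plain JDK types. This is only an illustrative model: `PipelineMessage` and `PipelineSketch` are hypothetical names, and `String.valueOf` plus UTF-8 encoding is an assumed stand-in for whatever message converter is actually configured.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical message: a byte[] body plus mutable headers.
final class PipelineMessage {
    final byte[] body;
    final Map<String, Object> headers = new HashMap<>();

    PipelineMessage(byte[] body) {
        this.body = body;
    }
}

final class PipelineSketch {

    // Step 1: convert the payload to a message (assumed converter: String.valueOf + UTF-8).
    // Step 2: let the post-processor adjust the message just before it would be sent.
    static PipelineMessage convertAndPostProcess(Object payload,
            UnaryOperator<PipelineMessage> postProcessor) {
        byte[] body = String.valueOf(payload).getBytes(StandardCharsets.UTF_8);
        PipelineMessage message = new PipelineMessage(body);
        return postProcessor.apply(message);
    }

    public static void main(String[] args) {
        PipelineMessage out = convertAndPostProcess(42, m -> {
            m.headers.put("tenant", "acme"); // hypothetical header set by the post-processor
            return m;
        });
        System.out.println(new String(out.body, StandardCharsets.UTF_8) + " " + out.headers);
    }
}
```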
The convertSendAndReceive methods work like the send-and-receive methods in AmqpTemplate: they follow the RPC pattern. The only difference is the extra CorrelationData parameter.
@Nullable
Object convertSendAndReceive(Object message, CorrelationData correlationData) throws AmqpException;

@Nullable
Object convertSendAndReceive(String routingKey, Object message, CorrelationData correlationData)
        throws AmqpException;

@Nullable
Object convertSendAndReceive(String exchange, String routingKey, Object message,
        CorrelationData correlationData) throws AmqpException;

@Nullable
Object convertSendAndReceive(Object message, MessagePostProcessor messagePostProcessor,
        CorrelationData correlationData) throws AmqpException;

@Nullable
Object convertSendAndReceive(String routingKey, Object message,
        MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws AmqpException;

@Nullable
Object convertSendAndReceive(String exchange, String routingKey, Object message,
        MessagePostProcessor messagePostProcessor, CorrelationData correlationData)
        throws AmqpException;

@Nullable
<T> T convertSendAndReceiveAsType(Object message, CorrelationData correlationData,
        ParameterizedTypeReference<T> responseType) throws AmqpException;

@Nullable
<T> T convertSendAndReceiveAsType(String routingKey, Object message, CorrelationData correlationData,
        ParameterizedTypeReference<T> responseType) throws AmqpException;

@Nullable
default <T> T convertSendAndReceiveAsType(String exchange, String routingKey, Object message,
        @Nullable CorrelationData correlationData, ParameterizedTypeReference<T> responseType)
        throws AmqpException {
    return convertSendAndReceiveAsType(exchange, routingKey, message, null, correlationData, responseType);
}

@Nullable
<T> T convertSendAndReceiveAsType(Object message, MessagePostProcessor messagePostProcessor,
        CorrelationData correlationData, ParameterizedTypeReference<T> responseType) throws AmqpException;

@Nullable
<T> T convertSendAndReceiveAsType(String routingKey, Object message,
        MessagePostProcessor messagePostProcessor, CorrelationData correlationData,
        ParameterizedTypeReference<T> responseType) throws AmqpException;

@Nullable
<T> T convertSendAndReceiveAsType(String exchange, String routingKey, Object message,
        @Nullable MessagePostProcessor messagePostProcessor,
        @Nullable CorrelationData correlationData,
        ParameterizedTypeReference<T> responseType) throws AmqpException;
The execute method gives access to the native Channel for running operations; it takes a ChannelCallback parameter.
@Nullable
<T> T execute(ChannelCallback<T> action) throws AmqpException;
ChannelCallback is a functional interface. Through it you obtain the RabbitMQ Channel, perform any operation on it, and return a result.
@FunctionalInterface
public interface ChannelCallback<T> {

    /**
     * @param channel the channel
     * @return the result
     */
    @Nullable
    T doInRabbit(Channel channel) throws Exception;
}
Example:
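import java.nio.charset.StandardCharsets;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.core.ChannelCallback;

ChannelCallback<Boolean> publishCallback = new ChannelCallback<Boolean>() {
    @Override
    public Boolean doInRabbit(Channel channel) throws Exception {
        // Publish a message directly through the native Channel
        channel.basicPublish(MqBizConfig.BIZ_EXCHANGE, MqBizConfig.BIZ_ROUTE_KEY, null,
                "message".getBytes(StandardCharsets.UTF_8));
        System.out.println("doInRabbit");
        return true;
    }
};
Boolean result = rabbitTemplate.execute(publishCallback);
System.out.println("Result: " + result);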
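The invoke method takes an OperationsCallback. Inside its doInRabbit() method, every operation runs on the same dedicated channel, which is closed when the callback finishes (it is not returned to the cache). This style of use is called a scoped operation.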
@Nullable
default <T> T invoke(OperationsCallback<T> action) throws AmqpException {
    return invoke(action, null, null);
}

@Nullable
<T> T invoke(OperationsCallback<T> action, @Nullable com.rabbitmq.client.ConfirmCallback acks,
        @Nullable com.rabbitmq.client.ConfirmCallback nacks);
OperationsCallback is the operations callback: it receives a RabbitOperations, performs operations through it, and returns a result.
@FunctionalInterface
interface OperationsCallback<T> {

    /**
     * @param operations the RabbitOperations
     * @return the result
     */
    @Nullable
    T doInRabbit(RabbitOperations operations);
}
waitForConfirms and waitForConfirmsOrDie both wait for publisher confirms, and both must be called inside the invoke method.
// Wait for confirms
boolean waitForConfirms(long timeout) throws AmqpException;

// Wait for confirms; on failure the channel is closed and the producer cannot keep publishing on it
void waitForConfirmsOrDie(long timeout) throws AmqpException;
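The scoping rule can be pictured with a small JDK-only model: invoke opens one dedicated channel, runs every operation in the callback on it, and closes it at the end, which is why waiting for confirms only makes sense inside the callback. Everything here (`ScopedChannel`, `ScopedOperations`, `ScopedTemplate`, and the broker that acks instantly) is a hypothetical simplification for illustration, not Spring AMQP code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical channel: records publishes and refuses work once closed.
final class ScopedChannel {
    final List<String> published = new ArrayList<>();
    private boolean open = true;

    void publish(String body) {
        requireOpen();
        published.add(body);
    }

    // Simulated wait: the broker in this model acks everything immediately.
    boolean waitForConfirms() {
        requireOpen();
        return true;
    }

    void close() {
        open = false;
    }

    boolean isOpen() {
        return open;
    }

    private void requireOpen() {
        if (!open) {
            throw new IllegalStateException("channel is closed");
        }
    }
}

// Hypothetical operations view handed to the callback; bound to exactly one channel.
final class ScopedOperations {
    private final ScopedChannel channel;

    ScopedOperations(ScopedChannel channel) {
        this.channel = channel;
    }

    void convertAndSend(String body) {
        channel.publish(body);
    }

    boolean waitForConfirms() {
        return channel.waitForConfirms();
    }
}

final class ScopedTemplate {
    ScopedChannel lastChannel; // exposed only so the demo can inspect the channel afterwards

    <T> T invoke(Function<ScopedOperations, T> action) {
        ScopedChannel dedicated = new ScopedChannel();
        lastChannel = dedicated;
        try {
            return action.apply(new ScopedOperations(dedicated));
        } finally {
            dedicated.close(); // closed at the end, not returned to a cache
        }
    }
}

final class ScopedInvokeDemo {
    public static void main(String[] args) {
        ScopedTemplate template = new ScopedTemplate();
        Boolean confirmed = template.invoke(ops -> {
            ops.convertAndSend("first");
            ops.convertAndSend("second");
            return ops.waitForConfirms();
        });
        System.out.println("confirmed=" + confirmed
                + ", channelOpen=" + template.lastChannel.isOpen());
    }
}
```

In this model, both sends and the confirm wait share the one channel created by invoke, and any attempt to use that channel after invoke returns fails because it has already been closed.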
getConnectionFactory returns the connection factory used by these operations.
ConnectionFactory getConnectionFactory();
start and stop are no-op default methods that exist only for backward compatibility; isRunning returns false by default.
@Override
default void start() {
    // No-op - implemented for backward compatibility
}

@Override
default void stop() {
    // No-op - implemented for backward compatibility
}

@Override
default boolean isRunning() {
    return false;
}