
前言: 最近没事浏览Spring官网,简单写一些相关的笔记,这篇文章整理Spring AMQP相关内容。文章并不包含所有技术点,只是记录有收获
目录
Spring AMQP提供了一个扮演核心角色的“模板”,定义操作的接口是AmqpTemplate , 接口操作涵盖了发送和接收消息的一般行为 . 因此他包含了发送和接收消息的所有基本操作. 模板接口的每一个实现都依赖特定的客户端类库, 目前只有RabbitTemplate


参考源码:
package org.springframework.amqp.core;
public interface AmqpTemplate {
/**
* 使用默认路由密钥向默认交换机发送消息
*/
void send(Message message) throws AmqpException;
/**
* 使用特定路由密钥向默认交换机发送消息
*/
void send(String routingKey, Message message) throws AmqpException;
/**
* 使用特定路由密钥向特定交换机发送消息
*/
void send(String exchange, String routingKey, Message message) throws AmqpException;
//代码略
}
AmqpTemplate指定AMQP操作的提供了发送消息的方法,其中最后一个方法有三个参数,它是显示发送消息的方法,他允许在运行时提供AMQP交换器 名称和路由关键字来发送消息,而最后的参数是实际常见的Message 。使用方法如下
amqpTemplate.send("marketData.topic", "quotes.nasdaq.THING1",
new Message("12.34".getBytes(), someProperties));
如果是使用同给一个交换器(exchange)可以通过设置(set)的方法设置交换器,然后再发送消息,例如
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));
如果amqpTemplate 设置了交换器(exchange)和路由键(routingKey)属性,那么只需要接收消息参数即可
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));
AmqpTemplate提供了将Java对象转换为消息并发送的方法

参考源码如下:
package org.springframework.amqp.core;
public interface AmqpTemplate {
//将Java对象转换为消息并发送给默认交换机
void convertAndSend(Object message) throws AmqpException;
//将Java对象转换为Amqp消息,并将其发送到具有特定路由的默认交换机
void convertAndSend(String routingKey, Object message) throws AmqpException;
//将Java对象转换为Amqp消息,并使用特定的路由密钥将其发送到特定的交换机。
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
//将Java对象转换为Amqp消息,并使用默认路由密钥将其发送到默认交换机
void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
//将Java对象转换为Amqp消息,并使用特定的路由密钥将其发送到默认交换机
void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor)
throws AmqpException;
//将Java对象转换为Amqp消息,并使用特定的路由密钥将其发送到特定的交换机。
void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor)
throws AmqpException;
//代码略
}
从接口AmqpTemplate 的实现类RabbitTemplate可以看到Object对象通过MessageConverter对象转换为成Message对象
package org.springframework.amqp.rabbit.core;
public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count
implements BeanFactoryAware, RabbitOperations, ChannelAwareMessageListener,
ListenerContainerAware, PublisherCallbackChannel.Listener, BeanNameAware, DisposableBean {
//默认转换对象为SimpleMessageConverter
private MessageConverter messageConverter = new SimpleMessageConverter();
@Override
public void convertAndSend(String exchange, String routingKey, final Object message,
final MessagePostProcessor messagePostProcessor,
@Nullable CorrelationData correlationData) throws AmqpException {
Message messageToSend = convertMessageIfNecessary(message);
messageToSend = messagePostProcessor.postProcessMessage(messageToSend, correlationData,
nullSafeExchange(exchange), nullSafeRoutingKey(routingKey));
send(exchange, routingKey, messageToSend, correlationData);
}
protected Message convertMessageIfNecessary(final Object object) {
if (object instanceof Message) {
return (Message) object;
}
return getRequiredMessageConverter().toMessage(object, new MessageProperties());
}eturn converter;
}
private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
MessageConverter converter = getMessageConverter();
if (converter == null) {
throw new AmqpIllegalStateException(
"No 'messageConverter' specified. Check configuration of RabbitTemplate.");
}
return converter;
}
//代码略
}
显示设置交换器(exchange)和路由键(routingKey)属性是比较推荐的使用方法,应为代码更清晰易读。 当不显示设置两个属性的时候也会有默认值。
AmqpTemplate默认交换器和默认路由键都是空String类型。因为AMQP规范将默认交换器定义为没有名称。 所有队列都自动绑定到默认交换器(direct exchange).
//参考RabbitTemplate 源码
package org.springframework.amqp.rabbit.core
public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count
implements BeanFactoryAware, RabbitOperations, ChannelAwareMessageListener,
ListenerContainerAware, PublisherCallbackChannel.Listener, BeanNameAware, DisposableBean {
private static final String DEFAULT_EXCHANGE = "";
private static final String DEFAULT_ROUTING_KEY = "";
@Override
public void send(Message message) throws AmqpException {
send(this.exchange, this.routingKey, message);
}
@Override
public void send(String routingKey, Message message) throws AmqpException {
send(this.exchange, routingKey, message);
}
//省略其他代码
}
例如可以创建一个模板,用于将消息发送到某个队列
// 没有设置交换器就是默认交换器
RabbitTemplate template = new RabbitTemplate();
// 消息将会发送到名称为queue.helloWorld队列中
template.setRoutingKey("queue.helloWorld");
// 执行发送消息
template.send(new Message("Hello World".getBytes(), someProperties));
AmqpTemplate接口使用的Message参数,可以使用MessageBuilder和MessagePropertiesBuilder对象快速构建
MessageBuilder
package org.springframework.amqp.core;
public final class MessageBuilder extends MessageBuilderSupport<Message> {
//由构建器创建的消息正文将直接引用“body”
public static MessageBuilder withBody(byte[] body) {}
//由构建器创建的消息正文将是新数组中“body”的副本,通过Array.copyOf复制
public static MessageBuilder withClonedBody(byte[] body) {}
//由构建器创建的消息正文将是一个新数组,包含“body”的字节范围,通过Array.copyOf复制
public static MessageBuilder withBody(byte[] body, int from, int to) {}
//构建器创建的消息将具有一个包含参数的正文副本的新数组的主体。
//参数的属性被复制到一个新的MessageProperties对象
public static MessageBuilder fromClonedMessage(Message message) {}
//建设者创建的消息将具有一个直接引用参数体的主体。
//参数的属性被复制到一个新的MessageProperties对象
public static MessageBuilder fromMessage(Message message) {}
//构建返回Message 对象
@Override
public Message build() {
return new Message(this.body, this.buildProperties());
}
}
通过MessageBuilder构建message
Message message = MessageBuilder.withBody("foo".getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
MessagePropertiesBuilder
package org.springframework.amqp.core;
public final class MessagePropertiesBuilder extends MessageBuilderSupport<MessageProperties> {
public static MessagePropertiesBuilder newInstance() {}
public static MessagePropertiesBuilder fromProperties(MessageProperties properties) {}
public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) {}
//构建返回MessageProperties对象
@Override
public MessageProperties build() {
return this.buildProperties();
}
}
通过MessagePropertiesBuilder构建MessageProperties 然后在通过MessageBuilder构建message
MessageProperties props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
Message message = MessageBuilder.withBody("foo".getBytes())
.andProperties(props)
.build();
接收消息有两种方式,一种是通过轮询的方法来轮询单个消息,另一种是是通过监听器来异步接收消息。异步消息使用的专用组件而不是AmqpTemplate, AmqpTemplate可以用于轮询接收消息,在使用AmqpTemplate接收消息时,默认情况下如果没有消息的时候不会阻塞,直接返回空, 也可设置接收的阻塞时间,

参考源码如下:
package org.springframework.amqp.core;
public interface AmqpTemplate {
//如果存在来自默认队列的消息,则接收消息。立即返回
Message receive() throws AmqpException;
//如果有来自特定队列的消息,则接收消息 ,立刻返回
Message receive(String queueName) throws AmqpException;
//从默认队列接收消息,如果消息可用,则等待指定的等待时间
Message receive(long timeoutMillis) throws AmqpException;
//从特定队列接收消息,如果消息可用,则等待指定的等待时间
Message receive(String queueName, long timeoutMillis) throws AmqpException;
//代码略
}
AmqpTemplate提供了四个重载方法来接收POJO,而不是上面的Message对象, 它们返回对象是Object,用以替换返回值 Message。

如下源码所示
package org.springframework.amqp.core;
public interface AmqpTemplate {
//接收默认队列的消息,如果消息可用,并转换为JAVA对象。立即返回
Object receiveAndConvert() throws AmqpException;
//接收特定队列的消息,如果消息可用,将其转换为Java对象。立即返回,可能返回空值
Object receiveAndConvert(String queueName) throws AmqpException;
//接收默认队列的消息,如果消息可用,则等待指定的等待时间
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
//接收特定队列的消息,如果消息可用,则等待指定的等待时间
Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
//代码略
}
从接口AmqpTemplate 的实现类RabbitTemplate可以看到Message是通过MessageConverter对象转换为成Java对象
package org.springframework.amqp.rabbit.core;
public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count
implements BeanFactoryAware, RabbitOperations, ChannelAwareMessageListener,
ListenerContainerAware, PublisherCallbackChannel.Listener, BeanNameAware, DisposableBean {
//默认转换对象为SimpleMessageConverter
private MessageConverter messageConverter = new SimpleMessageConverter();
//接收特定队列的消息,如果消息可用,则等待指定的等待时间
Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException{
Message response = timeoutMillis == 0 ? doReceiveNoWait(queueName) : receive(queueName, timeoutMillis);
if (response != null) {
return getRequiredMessageConverter().fromMessage(response);
}
}
private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
MessageConverter converter = getMessageConverter();
if (converter == null) {
throw new AmqpIllegalStateException(
"No 'messageConverter' specified. Check configuration of RabbitTemplate.");
}
return converter;
}
//代码略
}
除此之外AmqpTemplate还有几个receiveAndReply方法,这些方法用来实现同步接收,处理并回复消息

AmqpTemplate实现负责接收和回复阶段。在大多数情况下,提供ReceiveAndReplyCallback的实现来为接收到的消息执行一些业务逻辑,如果需要,可以构建回复对象或消息。需要注意的是ReceiveAndReplyCallback可能返回null。在这种情况下,没有发送回复消息
package org.springframework.amqp.core;
public interface AmqpTemplate {
//接收默认队列的消息,调用提供的ReceiveAndReplyCallback,
//如果回调返回一个消息,则将回复消息发送到MessageProperties中的replyTo地址,
//或者发送到默认交换和默认routingKe
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback) throws AmqpException;
//接收指定队列的消息,调用提供的ReceiveAndReplyCallback,
//如果回调返回一个消息,则将回复消息发送到MessageProperties中的replyTo地址,
//或者发送到默认交换和默认routingKe
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback) throws AmqpException;
//接收默认队列的消息,调用提供的ReceiveAndReplyCallback,
//如果回调返回消息,则向提供的exchange和routingKey发送回复消息
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback, String replyExchange, String replyRoutingKey)
throws AmqpException;
//接收指定队列的消息,调用提供的ReceiveAndReplyCallback,
//如果回调返回消息,则向提供的exchange和routingKey发送回复消息
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback, String replyExchange,
String replyRoutingKey) throws AmqpException;
//接收默认队列的消息,调用提供的ReceiveAndReplyCallback,
//如果回调返回一个消息,则向ReplyToAddressCallback的结果中的replyToAddress发送回复消息
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
//接收指定队列的消息,调用提供的ReceiveAndReplyCallback,
//如果回调返回一个消息,则向ReplyToAddressCallback的结果中的replyToAddress发送回复消息
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
//代码略
}
例如
boolean b=template.receiveAndReply("myqueue",new ReceiveAndReplyCallback<String,String>(){
public String handle(String str) {
System.out.println("getMessage is :" + str);
//doSomething
return "OK";
}
});
我是rails的新手,想在form字段上应用验证。myviewsnew.html.erb.....模拟.rbclassSimulation{:in=>1..25,:message=>'Therowmustbebetween1and25'}end模拟Controller.rbclassSimulationsController我想检查模型类中row字段的整数范围,如果不在范围内则返回错误信息。我可以检查上面代码的范围,但无法返回错误消息提前致谢 最佳答案 关键是您使用的是模型表单,一种显示ActiveRecord模型实例属性的表单。c
我正在处理旧代码的一部分。beforedoallow_any_instance_of(SportRateManager).toreceive(:create).and_return(true)endRubocop错误如下:Avoidstubbingusing'allow_any_instance_of'我读到了RuboCop::RSpec:AnyInstance我试着像下面那样改变它。由此beforedoallow_any_instance_of(SportRateManager).toreceive(:create).and_return(true)end对此:let(:sport_
rails中是否有任何规定允许站点的所有AJAXPOST请求在没有authenticity_token的情况下通过?我有一个调用Controller方法的JqueryPOSTajax调用,但我没有在其中放置任何真实性代码,但调用成功。我的ApplicationController确实有'request_forgery_protection'并且我已经改变了config.action_controller.consider_all_requests_local在我的environments/development.rb中为false我还搜索了我的代码以确保我没有重载ajaxSend来发送
我有一个非常简单的RubyRack服务器,例如:app=Proc.newdo|env|req=Rack::Request.new(env).paramspreq.inspect[200,{'Content-Type'=>'text/plain'},['Somebody']]endRack::Handler::Thin.run(app,:Port=>4001,:threaded=>true)每当我使用JSON对象向服务器发送POSTHTTP请求时:{"session":{"accountId":String,"callId":String,"from":Object,"headers":
我的工作要求我为某些测试自动生成电子邮件。我一直在四处寻找,但未能找到可以快速实现的合理解决方案。它需要在outlook而不是其他邮件服务器中,因为我们有一些奇怪的身份验证规则,我们需要保存草稿而不是仅仅发送邮件的选项。显然win32ole可以做到这一点,但我找不到任何相当简单的例子。 最佳答案 假设存储了Outlook凭据并且您设置为自动登录到Outlook,WIN32OLE可以很好地完成此操作:require'win32ole'outlook=WIN32OLE.new('Outlook.Application')message=
我正在使用Ruby,我正在与一个网络端点通信,该端点在发送消息本身之前需要格式化“header”。header中的第一个字段必须是消息长度,它被定义为网络字节顺序中的2二进制字节消息长度。比如我的消息长度是1024。如何将1024表示为二进制双字节? 最佳答案 Ruby(以及Perl和Python等)中字节整理的标准工具是pack和unpack。ruby的packisinArray.您的长度应该是两个字节长,并且按网络字节顺序排列,这听起来像是n格式说明符的工作:n|Integer|16-bitunsigned,network(bi
SPI接收数据左移一位问题目录SPI接收数据左移一位问题一、问题描述二、问题分析三、探究原理四、经验总结最近在工作在学习调试SPI的过程中遇到一个问题——接收数据整体向左移了一位(1bit)。SPI数据收发是数据交换,因此接收数据时从第二个字节开始才是有效数据,也就是数据整体向右移一个字节(1byte)。请教前辈之后也没有得到解决,通过在网上查阅前人经验终于解决问题,所以写一个避坑经验总结。实际背景:MCU与一款芯片使用spi通信,MCU作为主机,芯片作为从机。这款芯片采用的是它规定的六线SPI,多了两根线:RDY和INT,这样从机就可以主动请求主机给主机发送数据了。一、问题描述根据从机芯片手
如果我在模型中设置验证消息validates:name,:presence=>{:message=>'Thenamecantbeblank.'}我如何让该消息显示在闪光警报中,这是我迄今为止尝试过的方法defcreate@message=Message.new(params[:message])if@message.valid?ContactMailer.send_mail(@message).deliverredirect_to(root_path,:notice=>"Thanksforyourmessage,Iwillbeintouchsoon")elseflash[:error]
RSpec似乎按顺序匹配方法接收的消息。我不确定如何使以下代码工作:allow(a).toreceive(:f)expect(a).toreceive(:f).with(2)a.f(1)a.f(2)a.f(3)我问的原因是a.f的一些调用是由我的代码的上层控制的,所以我不能对这些方法调用添加期望。 最佳答案 RSpecspy是测试这种情况的一种方式。要监视一个方法,用allowstub,除了方法名称之外没有任何约束,调用该方法,然后expect确切的方法调用。例如:allow(a).toreceive(:f)a.f(2)a.f(1)
s=Socket.new(Socket::AF_INET,Socket::SOCK_STREAM,0)s.connect(Socket.pack_sockaddr_in('port','hostname'))ssl=OpenSSL::SSL::SSLSocket.new(s,sslcert)ssl.connect从这里开始,如果ssl连接和底层套接字仍然是ESTABLISHED,或者它是否在默认值7200之后进入CLOSE_WAIT,我想检查一个线程几秒钟甚至更糟的是在实际上不需要.write()或.read()的情况下关闭。是用select()、IO.select()还是其他方法完成