jjzjj

java - 在 Spring Integration 中定制变压器

coder 2023-09-18 原文

我的基于 spring 集成的 TCP 服务器运行良好。

如果服务需要很长时间,它还会处理服务器的回复超时。当服务花费的时间比设置的回复超时时间长时,它会将消息发送到错误 channel ,错误 channel 又将错误消息发送回客户端。

这是我的代码:

<int:transformer id="errorHandler"
    input-channel="errorChannel"
    ref="myTransformer" method="transform" />
<bean id="myTransformer" class="com.sample.MyTransformer" />

public class MyTransformer {
    private static Logger logger = Logger.getLogger(MyTransformer.class);

    public String transform(org.springframework.integration.handler.ReplyRequiredException e) {
        logger.error("timeout exception is thrown");
        return "Error in processing request.";
    }
}

上面的代码有效,客户端得到“处理请求时出错”,服务器日志中有条目“超时异常被抛出”。但是我在日志中也看到了以下异常:

2016-06-30 16:25:27,827 ERROR [org.springframework.integration.handler.LoggingHandler] org.springframework.integration.handler.ReplyRequiredException: No reply produced by handler 'org.springframework.integration.config.ServiceActivatorFactoryBean#0', and its 'requiresReply' property is set to true.
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:150)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:42)
    at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97)
    at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:422)
    at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:390)
    at org.springframework.integration.ip.tcp.TcpInboundGateway.doOnMessage(TcpInboundGateway.java:119)
    at org.springframework.integration.ip.tcp.TcpInboundGateway.onMessage(TcpInboundGateway.java:97)
    at org.springframework.integration.ip.tcp.connection.TcpNetConnection.run(TcpNetConnection.java:182)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)

MyTransformer 的实现似乎不正确。

请问如何定制变压器? 如何在转换方法中获取有效负载,以便我可以回复客户端“处理请求时出错。有效载荷 = ' + 有效载荷?

谢谢

更新:为了避免记录 ReplyRequiredException,我更改了 tcp-inbound-gateway 上的错误 channel ,如下所示:

<int-ip:tcp-inbound-gateway id="gatewayCrLf"
    connection-factory="crLfServer"
    request-channel="requestChannel"
    error-channel="tcpErrorChannel"
    reply-timeout="10000"
    />
<int:channel id="tcpErrorChannel" /> 

<int:service-activator input-channel="requestChannel" ref="gateway" requires-reply="true"/>
<int:gateway id="gateway" default-request-channel="timeoutChannel" default-reply-timeout="10000" />
<int:object-to-string-transformer id="serverBytes2String"
    input-channel="timeoutChannel"
    output-channel="serviceChannel"/>

<int:channel id="timeoutChannel">
    <int:dispatcher task-executor="timeoutExecutor"/>
</int:channel>

<bean id="timeoutExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="5" />
    <property name="maxPoolSize" value="100" />
    <property name="queueCapacity" value="50" />
</bean>

但我收到 MessageDeliveryException。 tcpErrorChannel 设置似乎有误。你能建议一下吗?这是堆栈跟踪:

2016-07-05 12:17:34,266 ERROR [org.springframework.integration.ip.tcp.connection.TcpNetConnection] Exception sending message: GenericMessage [payload=byte[67], headers={timestamp=1467735444239, id=30eb099e-955d-1bd3-1789-49aa9fc84b6f, ip_tcp_remotePort=64055, ip_address=127.0.0.1, ip_localInetAddress=/127.0.0.1, ip_hostname=127.0.0.1, ip_connectionId=127.0.0.1:64055:5678:908d39a1-d027-4753-b144-59b9c0390fd7}]
org.springframework.messaging.MessagingException: failure occurred in error-handling flow; nested exception is org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'org.springframework.context.support.FileSystemXmlApplicationContext@4876db09.tcpErrorChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:452)
    at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:390)
    at org.springframework.integration.ip.tcp.TcpInboundGateway.doOnMessage(TcpInboundGateway.java:119)
    at org.springframework.integration.ip.tcp.TcpInboundGateway.onMessage(TcpInboundGateway.java:97)
    at org.springframework.integration.ip.tcp.connection.TcpNetConnection.run(TcpNetConnection.java:182)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'org.springframework.context.support.FileSystemXmlApplicationContext@4876db09.tcpErrorChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:150)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:42)
    at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97)
    at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:449)
    ... 7 more
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:153)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    ... 14 more

上述异常将通过将 errorChennel 更改为 tcpErrorChannel 而消失:

非常感谢 Artem 的完美解决方案。

最佳答案

看起来是你的另一个问题的继续reply-timeout meaning in tcp-inbound-gateway in spring integration .

嗯,ReplyRequiredException<int:service-activator input-channel="requestChannel" ref="gateway" requires-reply="true"/> 的结果.

requires-reply="true" - 请阅读它的描述。你可以玩这个,把它做成false (默认一个),但是你的“超时”逻辑将不起作用。所以,你别无选择,除非继续忍受:

ERROR [org.springframework.integration.handler.LoggingHandler] org.springframework.integration.handler.ReplyRequiredException

不过,这只是对默认 errorChannel 的订阅者之一的影响: http://docs.spring.io/spring-integration/reference/html/configuration.html#namespace-errorhandler .

现在关于你的最后一个问题。所有异常都包装到 ErrorMessage 中并发送到那个 errorChannel .通常异常被包装到 MessagingException 中,例如在我们的例子中,它是这样的:

else if (this.requiresReply && !isAsync()) {
    throw new ReplyRequiredException(message, "No reply produced by handler '" +
            getComponentName() + "', and its 'requiresReply' property is set to true.");
}

注意那个message ctor 参数它恰好是一个 requestMessage在我们的场景中没有回复。您可以提取所需的 payload从中。

所以,现在你可以得到 failedMessage来自 ReplyRequiredException在你的transform()并提取其 payload为了你的目标。

更新

Since this ReplyRequiredException is a known exception, there is no need to see it as Error.

好吧,即使它是一个已知的异常,但它在其他组件和场景中也是一个重要的异常,当它可能不合适时不会收到回复。 ReplyRequiredException在 Framework 组件中非常常见,因此将它移到不同的类别对于您的应用程序来说可能是一个很大的架构错误。

为了避免这些日志,你不应该使用默认的 errorChannel ,但来自您的 TCP 入站网关的其他一些。类似于 error-channel="tcpErrorChannel"就这样。默认 errorChannel仍然会有LoggingHandler作为订阅者,但您不会向该 channel 发送错误消息。

关于java - 在 Spring Integration 中定制变压器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38133012/

有关java - 在 Spring Integration 中定制变压器的更多相关文章

  1. java - 等价于 Java 中的 Ruby Hash - 2

    我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/

  2. ruby - 如何进行排列以有效地定制输出 - 2

    这是一道面试题,我没有答对,但还是很好奇怎么解。你有N个人的大家庭,分别是1,2,3,...,N岁。你想给你的大家庭拍张照片。所有的家庭成员都排成一排。“我是家里的friend,建议家庭成员安排如下:”1岁的家庭成员坐在这一排的最左边。每两个坐在一起的家庭成员的年龄相差不得超过2岁。输入:整数N,1≤N≤55。输出:摄影师可以拍摄的照片数量。示例->输入:4,输出:4符合条件的数组:[1,2,3,4][1,2,4,3][1,3,2,4][1,3,4,2]另一个例子:输入:5输出:6符合条件的数组:[1,2,3,4,5][1,2,3,5,4][1,2,4,3,5][1,2,4,5,3][

  3. java - 从 JRuby 调用 Java 类的问题 - 2

    我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www

  4. java - 我的模型类或其他类中应该有逻辑吗 - 2

    我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我

  5. java - 什么相当于 ruby​​ 的 rack 或 python 的 Java wsgi? - 2

    什么是ruby​​的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht

  6. Observability:从零开始创建 Java 微服务并监控它 (二) - 2

    这篇文章是继上一篇文章“Observability:从零开始创建Java微服务并监控它(一)”的续篇。在上一篇文章中,我们讲述了如何创建一个Javaweb应用,并使用Filebeat来收集应用所生成的日志。在今天的文章中,我来详述如何收集应用的指标,使用APM来监控应用并监督web服务的在线情况。源码可以在地址 https://github.com/liu-xiao-guo/java_observability 进行下载。摄入指标指标被视为可以随时更改的时间点值。当前请求的数量可以改变任何毫秒。你可能有1000个请求的峰值,然后一切都回到一个请求。这也意味着这些指标可能不准确,你还想提取最小/

  7. 【Java 面试合集】HashMap中为什么引入红黑树,而不是AVL树呢 - 2

    HashMap中为什么引入红黑树,而不是AVL树呢1.概述开始学习这个知识点之前我们需要知道,在JDK1.8以及之前,针对HashMap有什么不同。JDK1.7的时候,HashMap的底层实现是数组+链表JDK1.8的时候,HashMap的底层实现是数组+链表+红黑树我们要思考一个问题,为什么要从链表转为红黑树呢。首先先让我们了解下链表有什么不好???2.链表上述的截图其实就是链表的结构,我们来看下链表的增删改查的时间复杂度增:因为链表不是线性结构,所以每次添加的时候,只需要移动一个节点,所以可以理解为复杂度是N(1)删:算法时间复杂度跟增保持一致查:既然是非线性结构,所以查询某一个节点的时候

  8. 【Java入门】使用Java实现文件夹的遍历 - 2

    遍历文件夹我们通常是使用递归进行操作,这种方式比较简单,也比较容易理解。本文为大家介绍另一种不使用递归的方式,由于没有使用递归,只用到了循环和集合,所以效率更高一些!一、使用递归遍历文件夹整体思路1、使用File封装初始目录,2、打印这个目录3、获取这个目录下所有的子文件和子目录的数组。4、遍历这个数组,取出每个File对象4-1、如果File是否是一个文件,打印4-2、否则就是一个目录,递归调用代码实现publicclassSearchFile{publicstaticvoidmain(String[]args){//初始目录Filedir=newFile("d:/Dev");Datebeg

  9. java - 为什么 ruby​​ modulo 与 java/other lang 不同? - 2

    我基本上来自Java背景并且努力理解Ruby中的模运算。(5%3)(-5%3)(5%-3)(-5%-3)Java中的上述操作产生,2个-22个-2但在Ruby中,相同的表达式会产生21个-1-2.Ruby在逻辑上有多擅长这个?模块操作在Ruby中是如何实现的?如果将同一个操作定义为一个web服务,两个服务如何匹配逻辑。 最佳答案 在Java中,模运算的结果与被除数的符号相同。在Ruby中,它与除数的符号相同。remainder()在Ruby中与被除数的符号相同。您可能还想引用modulooperation.

  10. java - Ruby 相当于 Java 的 Collections.unmodifiableList 和 Collections.unmodifiableMap - 2

    Java的Collections.unmodifiableList和Collections.unmodifiableMap在Ruby标准API中是否有等价物? 最佳答案 使用freeze应用程序接口(interface):Preventsfurthermodificationstoobj.ARuntimeErrorwillberaisedifmodificationisattempted.Thereisnowaytounfreezeafrozenobject.SeealsoObject#frozen?.Thismethodretur

随机推荐