jjzjj

spring-boot - Spring-Integration:异常时未发送 Tcp 服务器响应

coder 2023-09-19 原文

我将遗留的 tcp 服务器代码迁移到 spring-boot 并添加了 spring-intergration(基于注释)依赖项来处理 tcp 套接字连接。

我的入站 channel 是 tcpIn() ,出站 channel 是 serviceChannel() 并且我创建了一个自定义 channel [exceptionEventChannel() ] 来保存异常事件消息。

我有一个自定义的序列化器/反序列化器方法(ByteArrayLengthPrefixSerializer() extends AbstractPooledBufferByteArraySerializer),以及一个 MessageHandler @ServiceActivator 方法来将响应发送回 tcp 客户端。


//SpringBoot 2.0.3.RELEASE, Spring Integration 5.0.6.RELEASE

package com.test.config;


import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.event.inbound.ApplicationEventListeningMessageProducer;
import org.springframework.integration.ip.IpHeaders;
import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter;
import org.springframework.integration.ip.tcp.TcpSendingMessageHandler;
import org.springframework.integration.ip.tcp.connection.*;
import org.springframework.integration.ip.tcp.serializer.TcpDeserializationExceptionEvent;
import org.springframework.integration.router.ErrorMessageExceptionTypeRouter;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessagingException;

import java.io.IOException;

@Configuration
@IntegrationComponentScan
public class TcpConfiguration {
    @SuppressWarnings("unused")
    @Value("${tcp.connection.port}")
    private int tcpPort;

    @Bean
    TcpConnectionEventListener customerTcpListener() {
        return new TcpConnectionEventListener();
    }

    @Bean
    public MessageChannel tcpIn() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel serviceChannel() {
        return new DirectChannel();
    }

    @ConditionalOnMissingBean(name = "errorChannel")
    @Bean
    public MessageChannel errorChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel exceptionEventChannel() {
        return new DirectChannel();
    }

    @Bean
    public ByteArrayLengthPrefixSerializer byteArrayLengthPrefixSerializer() {
        ByteArrayLengthPrefixSerializer byteArrayLengthPrefixSerializer = new ByteArrayLengthPrefixSerializer();
        byteArrayLengthPrefixSerializer.setMaxMessageSize(98304); //max allowed size set to 96kb
        return byteArrayLengthPrefixSerializer;
    }

    @Bean
    public AbstractServerConnectionFactory tcpNetServerConnectionFactory() {
        TcpNetServerConnectionFactory tcpServerCf = new TcpNetServerConnectionFactory(tcpPort);
        tcpServerCf.setSerializer(byteArrayLengthPrefixSerializer());
        tcpServerCf.setDeserializer(byteArrayLengthPrefixSerializer());
        return tcpServerCf;

    }

    @Bean
    public TcpReceivingChannelAdapter tcpReceivingChannelAdapter() {
        TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
        adapter.setConnectionFactory(tcpNetServerConnectionFactory());
        adapter.setOutputChannel(tcpIn());
        adapter.setErrorChannel(exceptionEventChannel());
        return adapter;
    }

    @ServiceActivator(inputChannel = "exceptionEventChannel", outputChannel = "serviceChannel")
    public String handle(Message<MessagingException> msg) {
        //String unfilteredMessage = new String(byteMessage, StandardCharsets.US_ASCII);
        System.out.println("-----------------EXCEPTION ==> " + msg);
        return msg.toString();
    }

    @Transformer(inputChannel = "errorChannel", outputChannel = "serviceChannel")
    public String transformer(String msg) {
        //String unfilteredMessage = new String(byteMessage, StandardCharsets.US_ASCII);
        System.out.println("-----------------ERROR ==> " + msg);
        return msg.toString();
    }


    @ServiceActivator(inputChannel = "serviceChannel")
    @Bean
    public TcpSendingMessageHandler out(AbstractServerConnectionFactory cf) {
        TcpSendingMessageHandler tcpSendingMessageHandler = new TcpSendingMessageHandler();
        tcpSendingMessageHandler.setConnectionFactory(cf);
        return tcpSendingMessageHandler;
    }


    @Bean
    public ApplicationListener<TcpDeserializationExceptionEvent> listener() {
        return new ApplicationListener<TcpDeserializationExceptionEvent>() {

            @Override
            public void onApplicationEvent(TcpDeserializationExceptionEvent tcpDeserializationExceptionEvent) {
                exceptionEventChannel().send(MessageBuilder.withPayload(tcpDeserializationExceptionEvent.getCause())
                        .build());
            }

        };
    }
}

tcpIn() 中的消息被发送到单独的 @Component 类中的 @ServiceActivator 方法,其结构如下:

@Component
public class TcpServiceActivator {

    @Autowired
    public TcpServiceActivator() {
    }


    @ServiceActivator(inputChannel = "tcpIn", outputChannel = "serviceChannel")
    public String service(byte[] byteMessage) {
     // Business Logic returns String Ack Response
    }

我没有遇到运行成功场景的问题。我的 Tcp TestClient 按预期收到 Ack 响应。

但是,当我尝试模拟异常时,比如 Deserializer Exception,异常消息不会作为对 Tcp 客户端的响应发回。 我可以看到我的应用程序监听器获取 TcpDeserializationExceptionEvent 并将消息发送到 exceptionEventChannel。 @ServiceActivator 方法 handle(Message msg) 也打印我的异常消息。但它永远不会到达 MessageHandler 方法 out(AbstractServerConnectionFactory cf) 内的断点(在 Debug模式下)。

我正在努力了解出了什么问题。提前感谢您的帮助。


更新:我注意到套接字在发送响应之前因异常而关闭。我正在想办法解决这个问题

解决方案更新(2019 年 3 月 12 日):

由 Gary 提供,我编辑了反序列化器以返回一条消息,该消息可以通过 @Router 方法进行跟踪并重定向到 errorChannel。监听 errorchannel 的 ServiceActivator 然后将所需的错误消息发送到 outputChannel 。该解决方案似乎有效。

我在 ByteArrayLengthPrefixSerializer 中的反序列化器方法返回 Gary 推荐的“特殊值”,而不是原始的 inputStream 消息。

    public byte[] doDeserialize(InputStream inputStream, byte[] buffer) throws IOException {
        boolean isValidMessage = false;
        try {
            int messageLength = this.readPrefix(inputStream);
            if (messageLength > 0 && fillUntilMaxDeterminedSize(inputStream, buffer, messageLength)) {
                return this.copyToSizedArray(buffer, messageLength);
            }
            return EventType.MSG_INVALID.getName().getBytes(); 
        } catch (SoftEndOfStreamException eose) {
            return EventType.MSG_INVALID.getName().getBytes();
        }
    }

我还创建了一些新 channel 来容纳我的路由器,流程如下:

成功流程 tcpIn (@Router) -> serviceChannel(保存业务逻辑的@serviceActivator) -> outputChannel (@serviceActivator 向客户端发送响应)

异常流 tcpIn (@Router) -> errorChannel(@serviceActivator 准备错误响应消息) -> outputChannel (@serviceActivator 向客户端发送响应)

我的@Router 和'errorHandling' @serviceActivator -

    @Router(inputChannel = "tcpIn", defaultOutputChannel = "errorChannel")
    public String messageRouter(byte[] byteMessage) {
        String unfilteredMessage = new String(byteMessage, StandardCharsets.US_ASCII);
        System.out.println("------------------> "+unfilteredMessage);
        if (Arrays.equals(EventType.MSG_INVALID.getName().getBytes(), byteMessage)) {
            return "errorChannel";
        }
        return "serviceChannel";
    }


    @ServiceActivator(inputChannel = "errorChannel", outputChannel = "outputChannel")
    public String errorHandler(byte[] byteMessage) {
        return Message.ACK_RETRY;
    }

最佳答案

错误 channel 用于处理处理消息时发生的异常。反序列化错误发生在创建消息之前(反序列化器解码消息的有效负载)。

反序列化异常是致命的,正如您所观察到的,套接字已关闭。

一个选择是在反序列化器中捕获异常并返回一个指示反序列化异常发生的“特殊”值,然后在您的主流程中检查该值。

关于spring-boot - Spring-Integration:异常时未发送 Tcp 服务器响应,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55093129/

有关spring-boot - Spring-Integration:异常时未发送 Tcp 服务器响应的更多相关文章

  1. ruby - 具有身份验证的私有(private) Ruby Gem 服务器 - 2

    我想安装一个带有一些身份验证的私有(private)Rubygem服务器。我希望能够使用公共(public)Ubuntu服务器托管内部gem。我读到了http://docs.rubygems.org/read/chapter/18.但是那个没有身份验证-如我所见。然后我读到了https://github.com/cwninja/geminabox.但是当我使用基本身份验证(他们在他们的Wiki中有)时,它会提示从我的服务器获取源。所以。如何制作带有身份验证的私有(private)Rubygem服务器?这是不可能的吗?谢谢。编辑:Geminabox问题。我尝试“捆绑”以安装新的gem..

  2. ruby-on-rails - 带 Spring 锁的 Rails 4 控制台 - 2

    我正在使用Ruby2.1.1和Rails4.1.0.rc1。当执行railsc时,它被锁定了。使用Ctrl-C停止,我得到以下错误日志:~/.rvm/gems/ruby-2.1.1/gems/spring-1.1.2/lib/spring/client/run.rb:47:in`gets':Interruptfrom~/.rvm/gems/ruby-2.1.1/gems/spring-1.1.2/lib/spring/client/run.rb:47:in`verify_server_version'from~/.rvm/gems/ruby-2.1.1/gems/spring-1.1.

  3. ruby-on-rails - 启动 Rails 服务器时 ImageMagick 的警告 - 2

    最近,当我启动我的Rails服务器时,我收到了一长串警告。虽然它不影响我的应用程序,但我想知道如何解决这些警告。我的估计是imagemagick以某种方式被调用了两次?当我在警告前后检查我的git日志时。我想知道如何解决这个问题。-bcrypt-ruby(3.1.2)-better_errors(1.0.1)+bcrypt(3.1.7)+bcrypt-ruby(3.1.5)-bcrypt(>=3.1.3)+better_errors(1.1.0)bcrypt和imagemagick有关系吗?/Users/rbchris/.rbenv/versions/2.0.0-p247/lib/ru

  4. ruby-on-rails - s3_direct_upload 在生产服务器中不工作 - 2

    在Rails4.0.2中,我使用s3_direct_upload和aws-sdkgems直接为s3存储桶上传文件。在开发环境中它工作正常,但在生产环境中它会抛出如下错误,ActionView::Template::Error(noimplicitconversionofnilintoString)在View中,create_cv_url,:id=>"s3_uploader",:key=>"cv_uploads/{unique_id}/${filename}",:key_starts_with=>"cv_uploads/",:callback_param=>"cv[direct_uplo

  5. ruby-on-rails - 每次我尝试部署时,我都会得到 - (gcloud.preview.app.deploy) 错误响应 : [4] DEADLINE_EXCEEDED - 2

    我是Google云的新手,我正在尝试对其进行首次部署。我的第一个部署是RubyonRails项目。我基本上是在关注thisguideinthegoogleclouddocumentation.唯一的区别是我使用的是我自己的项目,而不是他们提供的“helloworld”项目。这是我的app.yaml文件runtime:customvm:trueentrypoint:bundleexecrackup-p8080-Eproductionconfig.ruresources:cpu:0.5memory_gb:1.3disk_size_gb:10当我转到我的项目目录并运行gcloudprevie

  6. ruby-on-rails - 在 Rails 中调试生产服务器 - 2

    您如何在Rails中的实时服务器上进行有效调试,无论是在测试版/生产服务器上?我试过直接在服务器上修改文件,然后重启应用,但是修改好像没有生效,或者需要很长时间(缓存?)我也试过在本地做“脚本/服务器生产”,但是那很慢另一种选择是编码和部署,但效率很低。有人对他们如何有效地做到这一点有任何见解吗? 最佳答案 我会回答你的问题,即使我不同意这种热修补服务器代码的方式:)首先,你真的确定你已经重启了服务器吗?您可以通过跟踪日志文件来检查它。您更改的代码显示的View可能会被缓存。缓存页面位于tmp/cache文件夹下。您可以尝试手动删除

  7. jquery - 我的 jquery AJAX POST 请求无需发送 Authenticity Token (Rails) - 2

    rails中是否有任何规定允许站点的所有AJAXPOST请求在没有authenticity_token的情况下通过?我有一个调用Controller方法的JqueryPOSTajax调用,但我没有在其中放置任何真实性代码,但调用成功。我的ApplicationController确实有'request_forgery_protection'并且我已经改变了config.action_controller.consider_all_requests_local在我的environments/development.rb中为false我还搜索了我的代码以确保我没有重载ajaxSend来发送

  8. ruby - 使用 Ruby 通过 Outlook 发送消息的最简单方法是什么? - 2

    我的工作要求我为某些测试自动生成电子邮件。我一直在四处寻找,但未能找到可以快速实现的合理解决方案。它需要在outlook而不是其他邮件服务器中,因为我们有一些奇怪的身份验证规则,我们需要保存草稿而不是仅仅发送邮件的选项。显然win32ole可以做到这一点,但我找不到任何相当简单的例子。 最佳答案 假设存储了Outlook凭据并且您设置为自动登录到Outlook,WIN32OLE可以很好地完成此操作:require'win32ole'outlook=WIN32OLE.new('Outlook.Application')message=

  9. spring.profiles.active和spring.profiles.include的使用及区别说明 - 2

    转自:spring.profiles.active和spring.profiles.include的使用及区别说明下文笔者讲述spring.profiles.active和spring.profiles.include的区别简介说明,如下所示我们都知道,在日常开发中,开发|测试|生产环境都拥有不同的配置信息如:jdbc地址、ip、端口等此时为了避免每次都修改全部信息,我们则可以采用以上的属性处理此类异常spring.profiles.active属性例:配置文件,可使用以下方式定义application-${profile}.properties开发环境配置文件:application-dev

  10. ruby-on-rails - 在 Ruby on Rails 中发送响应之前如何等待多个异步操作完成? - 2

    在我做的一些网络开发中,我有多个操作开始,比如对外部API的GET请求,我希望它们同时开始,因为一个不依赖另一个的结果。我希望事情能够在后台运行。我找到了concurrent-rubylibrary这似乎运作良好。通过将其混合到您创建的类中,该类的方法具有在后台线程上运行的异步版本。这导致我编写如下代码,其中FirstAsyncWorker和SecondAsyncWorker是我编写的类,我在其中混合了Concurrent::Async模块,并编写了一个名为“work”的方法来发送HTTP请求:defindexop1_result=FirstAsyncWorker.new.async.

随机推荐