我目前正在使用 SpringBoot 2、spring-boot-starter-webflux 在 netty 和 jOOQ 上开发应用程序。
下面是我经过数小时的研究和 stackoverflow 搜索得出的代码。我内置了很多
记录以查看哪个线程上发生了什么。
用户 Controller :
@RequestMapping(value = "/user", method = RequestMethod.POST)
public Mono<ResponseEntity<Integer>> createUser(@RequestBody ImUser user) {
return Mono.just(user)
.map(it -> {
logger.debug("Receiving request on thread: " + Thread.currentThread().getName());
return it;
})
.map(userService::create)
.map(it -> {
logger.debug("Sending response on thread: " + Thread.currentThread().getName());
return ResponseEntity.status(HttpStatus.CREATED).body(it);
})
.mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));
}
用户服务:
public int create(ImUser user) {
return Mono.just(user)
.subscribeOn(Schedulers.elastic())
.map(u -> {
logger.debug("UserService thread: " + Thread.currentThread().getName());
return imUserDao.insertUser(u);
})
.block();
}
用户道:
@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)
public int insertUser(ImUser user) {
logger.debug("Insert DB on thread: " + Thread.currentThread().getName());
return dsl.insertInto(IM_USER,IM_USER.VERSION, IM_USER.FIRST_NAME, IM_USER.LAST_NAME, IM_USER.BIRTHDATE, IM_USER.GENDER)
.values(1, user.getFirstName(), user.getLastName(), user.getBirthdate(), user.getGender())
.returning(IM_USER.ID)
.fetchOne()
.getId();
}
代码按预期工作,“接收请求”和“发送响应”都在同一个线程上运行 (reactor-http-server-epoll-x) 而阻塞代码(对 imUserDao.insertUser(u) 的调用)在弹性调度程序线程 (elastic-x) 上运行。 事务绑定(bind)到调用注释方法的线程(即 elastic-x),因此按预期工作(我已经测试过 为简单起见,这里未发布其他方法)。
这是一个日志示例:
20:57:21,384 DEBUG admin.UserController| Receiving request on thread: reactor-http-server-epoll-7
20:57:21,387 DEBUG admin.UserService| UserService thread: elastic-2
20:57:21,391 DEBUG admin.ExtendedUserDao| Insert DB on thread: elastic-2
20:57:21,393 DEBUG tools.LoggerListener| Executing query
...
20:57:21,401 DEBUG tools.StopWatch| Finishing : Total: 9.355ms, +3.355ms
20:57:21,409 DEBUG admin.UserController| Sending response on thread: reactor-http-server-epoll-7
我已经研究响应式编程很长时间了,但从来没有完全接触过任何响应式编程。既然我是,我想知道我是否做对了。 所以这是我的问题:
1. 上面的代码是处理传入 HTTP 请求、查询数据库然后响应的好方法吗? 请忽略我为了理智而内置的 logger.debug(...) 调用 :) 我有点希望有一个 Flux< imuser=""> 作为 Controller 方法的参数,从某种意义上说,我有一连串的多个潜在请求 这将在某个时候出现,并且将以相同的方式处理。相反,我发现的示例会在每次收到请求时创建一个 Mono.from(...);。
2. 在 UserService ( Mono.just(user) ) 中创建的第二个 Mono 感觉有些尴尬。我知道我需要开始一个新的流才能 在弹性调度程序上运行代码,但没有执行此操作的运算符(operator)吗?
3. 从代码的编写方式来看,我了解到 UserService 中的 Mono 将被阻塞,直到数据库操作完成, 但是服务于请求的原始流没有被阻塞。这个对吗?
4. 我计划将 Schedulers.elastic() 替换为并行调度程序,我可以在其中配置工作线程的数量。这个想法是最大工作线程数应该与最大数据库连接数相同。 当 Scheduler 中的所有工作线程都忙时会发生什么?那是背压进入的时候吗?
5. 我最初希望在我的 Controller 中包含以下代码:
return userService.create(user)
.map(it -> ResponseEntity.status(HttpStatus.CREATED).body(it))
.mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));
但我没能做到这一点并让事情在正确的线程中运行。有什么方法可以在我的代码中实现这一点吗?
如有任何帮助,我们将不胜感激。谢谢!
最佳答案
服务和 Controller
您的服务阻塞的事实是有问题的,因为在 Controller 中您从 map 内部调用一个阻塞方法,该方法不会在单独的线程上移动。这有可能阻止所有 Controller 。
您可以做的是从 UserService#create 返回一个 Mono(删除末尾的 block())。由于该服务确保 Dao 方法调用是隔离的,因此问题较少。从那里开始,无需在 Controller 中执行 Mono.just(user):只需直接在生成的 Mono 上调用创建并开始链接运算符:
@RequestMapping(value = "/user", method = RequestMethod.POST)
public Mono<ResponseEntity<Integer>> createUser(@RequestBody ImUser user) {
//this log as you saw was executed in the same thread as the controller method
logger.debug("Receiving request on thread: " + Thread.currentThread().getName());
return userService.create(user)
.map(it -> {
logger.debug("Sending response on thread: " + Thread.currentThread().getName());
return ResponseEntity.status(HttpStatus.CREATED).body(it);
})
.mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));
}
记录
请注意,如果您想记录某些内容,有几个比执行 map 并返回 it 更好的选择:
doOnNext 方法专为此量身定制:对 react 信号之一使用react(在本例中,onNext:发出一个值)并执行一些非-变异 Action ,使输出序列与源序列完全相同。 doOn 的“副作用”可以写入控制台或递增统计计数器,例如……还有 doOnComplete、doOnError、doOnSubscribe、doOnCancel 等……
log 只是记录其上方序列中的所有事件。它将检测您是否使用 SLF4J,如果是,则在 DEBUG 级别使用配置的记录器。否则它将使用 JDK 日志记录功能(因此您还需要将其配置为显示 DEBUG 级别日志)。
关于交易的一句话或者更确切地说任何依赖于ThreadLocal
的事情
ThreadLocal 和线程粘性在响应式(Reactive)编程中可能会出现问题,因为底层执行模型在整个序列中保持不变的保证较少。 Flux 可以在多个步骤中执行,每个步骤在不同的 Scheduler(以及线程或线程池)中。即使在特定步骤,一个值也可以由底层线程池的线程 A 处理,而下一个值稍后到达,将在线程 B 上处理。
在这种情况下,依赖 Thread Local 就不那么简单了,我们目前正在积极致力于提供更适合 react 世界的替代方案。
您创建连接池大小的池的想法很好,但不一定足够,事务流量可能会使用多个线程,因此可能会污染某些线程。
当线程池用完线程时会发生什么
如果您使用特定的 Scheduler 来隔离阻塞行为,就像这里一样,一旦它用完线程,它就会抛出一个 RejectedExecutionException。
关于java - 了解 Spring 的 Web 响应式框架,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43130036/
我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/
我正在使用Ruby2.1.1和Rails4.1.0.rc1。当执行railsc时,它被锁定了。使用Ctrl-C停止,我得到以下错误日志:~/.rvm/gems/ruby-2.1.1/gems/spring-1.1.2/lib/spring/client/run.rb:47:in`gets':Interruptfrom~/.rvm/gems/ruby-2.1.1/gems/spring-1.1.2/lib/spring/client/run.rb:47:in`verify_server_version'from~/.rvm/gems/ruby-2.1.1/gems/spring-1.1.
我是Google云的新手,我正在尝试对其进行首次部署。我的第一个部署是RubyonRails项目。我基本上是在关注thisguideinthegoogleclouddocumentation.唯一的区别是我使用的是我自己的项目,而不是他们提供的“helloworld”项目。这是我的app.yaml文件runtime:customvm:trueentrypoint:bundleexecrackup-p8080-Eproductionconfig.ruresources:cpu:0.5memory_gb:1.3disk_size_gb:10当我转到我的项目目录并运行gcloudprevie
我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www
我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我
什么是ruby的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht
这篇文章是继上一篇文章“Observability:从零开始创建Java微服务并监控它(一)”的续篇。在上一篇文章中,我们讲述了如何创建一个Javaweb应用,并使用Filebeat来收集应用所生成的日志。在今天的文章中,我来详述如何收集应用的指标,使用APM来监控应用并监督web服务的在线情况。源码可以在地址 https://github.com/liu-xiao-guo/java_observability 进行下载。摄入指标指标被视为可以随时更改的时间点值。当前请求的数量可以改变任何毫秒。你可能有1000个请求的峰值,然后一切都回到一个请求。这也意味着这些指标可能不准确,你还想提取最小/
HashMap中为什么引入红黑树,而不是AVL树呢1.概述开始学习这个知识点之前我们需要知道,在JDK1.8以及之前,针对HashMap有什么不同。JDK1.7的时候,HashMap的底层实现是数组+链表JDK1.8的时候,HashMap的底层实现是数组+链表+红黑树我们要思考一个问题,为什么要从链表转为红黑树呢。首先先让我们了解下链表有什么不好???2.链表上述的截图其实就是链表的结构,我们来看下链表的增删改查的时间复杂度增:因为链表不是线性结构,所以每次添加的时候,只需要移动一个节点,所以可以理解为复杂度是N(1)删:算法时间复杂度跟增保持一致查:既然是非线性结构,所以查询某一个节点的时候
Transformers开始在视频识别领域的“猪突猛进”,各种改进和魔改层出不穷。由此作者将开启VideoTransformer系列的讲解,本篇主要介绍了FBAI团队的TimeSformer,这也是第一篇使用纯Transformer结构在视频识别上的文章。如果觉得有用,就请点赞、收藏、关注!paper:https://arxiv.org/abs/2102.05095code(offical):https://github.com/facebookresearch/TimeSformeraccept:ICML2021author:FacebookAI一、前言Transformers(VIT)在图
遍历文件夹我们通常是使用递归进行操作,这种方式比较简单,也比较容易理解。本文为大家介绍另一种不使用递归的方式,由于没有使用递归,只用到了循环和集合,所以效率更高一些!一、使用递归遍历文件夹整体思路1、使用File封装初始目录,2、打印这个目录3、获取这个目录下所有的子文件和子目录的数组。4、遍历这个数组,取出每个File对象4-1、如果File是否是一个文件,打印4-2、否则就是一个目录,递归调用代码实现publicclassSearchFile{publicstaticvoidmain(String[]args){//初始目录Filedir=newFile("d:/Dev");Datebeg