Grails 本身提供了一个异步支持的插件grails-async
,提供异步特性。最近正好手头某个项目需要异步支持,自然而然想到这个插件,但是遇到了不少坑,特地写此文记录下来,以便以后查阅。
插件使用上没有太大问题,比较简单,参考官方文档即可。在 grails 的build.gradle
配置文件中加入 grails-async 的依赖即可。
dependencies {
runtime "org.grails.plugins:async"
// 需要用到eventbus才需要引入这个依赖
runtime "org.grails.plugins:events"
}
可以不加版本号,因为在 grails 的buildscript
的 classpath 已经引入了 grails 的 maven bom,会自动根据当前使用的 Grails 版本选择合适的版本号,grails 自带的插件基本都满足这个套路,这一点可以在脚手架生成的build.gradle
看出来。
Async
插件主要提供了两个异步支持的功能——Promise
和Event
。基本上用过异步框架的开发者对这两种模型都不陌生,具体使用方法就不说了,官方文档代码一看就懂,说几个主要需要注意的知识点吧。
Grails 的Promise
默认的实现方式是 CachedThreadPoolPromiseFactory,其实就是 Java 自带的 Thread Pool(java.util.concurrent.Executors.newCachedThreadPool()
),同时还支持的实现方式有GPars
和RxJava
,只要在runtime
依赖级别中添加对应的依赖项就会自动使用对应的实现方式。Event
也一样,默认使用Promise
的默认实现方式,即 CachedThreadPoolPromiseFactory,同样也支持GPars
和RxJava
,切换实现的方法也一样,不再赘述。
这个官方文档只用了一小段话简单带过了,后面会说到还有坑的。
为了便于测试,Grails Async 插件提供了一个同步实现的方式org.grails.async.factory.SynchronousPromiseFactory
,该类会将异步类库进行阻塞,从而转换为同步方式,这样就可以在测试中比较容易写出测试代码了,异步的测试一直都是一个难点。
import org.grails.async.factory.*
import grails.async.*
Promises.promiseFactory = new SynchronousPromiseFactory()
为了在自动化测试中满足每个场景都是一个干净的测试环境的原则,也可以保存原来的 Promise 实现,以便测试完毕后还原环境:
// spock测试框架中的部分片段
void 'such a method within promise'() {
setup:
PromiseFactory old = Promises.promiseFactory
Promises.promiseFactory = new SynchronousPromiseFactory()
... SNIP ...
when:
... SNIP ...
then:
... SNIP ...
cleanup:
Promises.promiseFactory = old
}
基本上使用时需要注意的也就这么多了,这个插件总体来说使用上还是没什么难度的。接下来说说项目中遇到的坑了。
这个严格来说也不算是坑,只不过官方文档对这个问题提之甚少,以致于可能让上手的人忽略了这个问题,造成浪费大量的时间去填坑。接下来我们举一个简单的例子说明这个问题。先来看看一个最基本的 Promise 使用:
Promise task = Promises.task {
// 期望异步执行的代码块
}
task.onComplete { List result ->
// 成功的回调内容,可以根据result.size()判断成功了多少
}
task.onError { Throwable t->
// 错误的回调,在这里可以处理task中的异常,不需要异常处理可以不要这个block
}
乍一看没什么,几乎所有的异步编程框架都满足这个套路。可是 Grails 的异步处理中如果包含了 Domain 的写入或修改时,就很容易踩到坑了,这个坑官方文档中却提之甚少,甚至连解决方案都言语不详。
我们接下来看一个实际的例子:
class MyService {
@Transactional
void myMethod(List<Domain> domainList, Logrecord record) {
PromiseList<Domain> tasks = new PromiseList<>()
domainList.each { Domain domain ->
tasks << { modifyDomain(domain) }
}
tasks.onComplete { List<Domain> result ->
record.message = "Domain list size {result.size()} finished."
record.save()
}
}
@Transactional
Domain modifyDomain(Domain domain) {
// 一些修改动作
domain.save()
}
}
我们期望对List<Domain>
进行异步并行处理,将每个 domain 修改过的结果写回数据库,所有任务完成之后我们希望在Logrecord
中记录一笔。发现这其中的问题了么?
Grails 对 Hibernate 封装的太好了,以致于大部分情况下让人忽视了 Hibernate Session 的存在。Domain 要想修改,必须工作在 Hibernate 的 Persistence Session 之下,而 Hibernate Session 是 thread local 绑定的,不在线程中共享。
所以这里的由于 Promise 的task
部分和onComplete
会在新的线程执行,所以根本没有工作在当前 Hibernate Session 环境下,以致于执行到domain.save()
这一步的时候方法会卡住,onComplete
方法永远也得不到执行。在官方文档一处不起眼的地方发现了解决方案:
tasks << {
Domain.withNewSession {
modifyDomain(domain.attach())
}
}
其实这里已经是踩了半天坑之后才搞定的了,直接把domain
传进去是不行的(modifyDomain(domain)
这种写法是不行的),即使加了withNewSession
你会发现数据也没有持久化到数据库。加上 hibernate 的 sql 打印会发现虽然方法都执行了,onComplete 方法能调用,但是 save 之后并没有期望中的update
语句产生,这又是为什么呢?
还是那个容易被忽略的 Hibernate Session 问题,domain
这个对象工作在先前的 Hibernate Session 中,自然直接 save 不了(用明朝的剑斩清朝的官??)。那么此时肯定有不少人都想到应对策略了,只要我想办法刷新一下 Session 不就可以了吗?于是我评估了以下几种方式
session.get()
session.load()
domain.refresh()
....
经过我的评估,其实session.merge
比较合适,这样会减少数据库的读取,而 Grails 提供了attach()方法,效果是一样的(注意 Grails 的 domain.merge()方法和 hibernate 的 session.merge()方法方向是反的,只有 domain.attach()才是 hibernate 的 session.merge()的行为),换成session.merge(domain)
也没问题。这样重新拿到了当前的 Persistence Session,于是modifyDomain(domain)
这个方法得以顺利执行。
但是对于onComplete
来说,更坑的来了,由于onComplete
这里执行的是结束回调,到这里之前的 session 已经 end 了,根本无法通过session.merge()
或者domain.attach()
接管到当前的 Session 中,导致 onComplete 卡住。如果你不想重新读取数据库获取新的 Session(session.get()或者 Domain.get(),其实是一样的),基本只能通过 HQL 做了:
tasks.onComplete { List<Domain> result ->
Logrecord.withNewTransaction {
Logrecord.executeUpdate('UPDATE Logrecord SET message = :message WHERE id = :recordId', [message: "Domain list size {result.size()} finished.", recordId: record.id])
}
}
由于之前的 session 早已 end,我们只能通过withNewSession
或withNewTransaction
方法重新开启一个新的 Hibernate Session 完成执行 HQL 的操作,不过这里就没有乐观锁了,想要实现乐观锁就得自己加上version
的 where 条件,不如save()
方法那么方便了。如果期望直接利用 GORM 做,我目前没有找到太好的不用重新读库的方法(Domian.get()会重新读库)。
Grails 默认的 ORM 模式是 lazy loading,即 domain 中有其他 domian 的关联,那么不是一开始就 fetch 过来,而是读取对应的属性的时候才会触发数据库 select 操作。还是上面的问题,如果在modifyDomain(Domain domain)
方法中有 lazy loading 的行为:
Domain modifyDomain(Domain domain) {
if (domain.otherDomain.prop == 'some prop') {
// 一些逻辑处理
}
domain.save()
}
执行到 lazy loading 就会卡住。解决方案其实有两种,一种是将 fetch 方式由 Lazy fetch 改为 Eager fetch:
static mapping = {
otherDomain lazy: false
}
另外一种解决方案是在 task 闭包之外手工 fetch 一次,这样这个 otherDomain 就会进入 session:
String otherDomainProp = domain.otherDomain.prop
tasks << {
Domain.withNewSession {
modifyDomain(domain.attach())
}
}
当我们自认为解决了 Promise 的坑,兴奋的去跑自动化测试的时候,发现之前的异步测试全挂了。打开日志,发现了 Hibernate 抛出了这样的异常: row was deleted or updated by another transaction.
,简直要晕掉了。
稍微一想就很容易明白原因,由于SynchronousPromiseFactory
会阻塞异步实现,转变为同步实现以便于测试。所以等于我们的task
和onPromise
全是同步代码了,这样一来等于原来的 session 还没有 end 呢,我们又新开了一个 session 去修改了 domain 的内容,于是 hibernate 就报错了。
这里我还没想到其他解决方案,所以只好写了一段判断:
@Transactional
void myMethod(List<Domain> domainList, Logrecord record) {
PromiseList<Domain> tasks = new PromiseList<>()
domainList.each { Domain domain ->
tasks << {
if (Promises.promiseFactory instanceof SynchronousPromiseFactory) {
modifyDomain(domain)
} else {
Domain.withNewSession {
modifyDomain(domain.attach())
}
}
}
}
tasks.onComplete { List<Domain> result ->
if (Promises.promiseFactory instanceof SynchronousPromiseFactory) {
record.message = "Domain list size {result.size()} finished."
record.save()
} else {
Logrecord.withNewTransaction {
Logrecord.executeUpdate('UPDATE Logrecord SET message = :message WHERE id = :recordId', [message: "Domain list size {result.size()} finished.", recordId: record.id])
}
}
}
}
判断当前的 Promise 实现是不是同步实现,如果是同步实现就不开启新的 session,避免 Hibernate 的报错。于是测试终于通过了。
Grails 的 Async 插件使用确实非常方便,让人感觉上手很容易。实际体验之后发现这里的异步 Session 问题是一个非常大的坑,官方文档却提得非常少。这些解决方案差不多花了我一整天的时间研究,觉得值得记录下来以便以后翻阅,隧成此文。
觉得有帮助的话,不妨考虑购买付费文章来支持我们 🙂 :
付费文章