DTeam 团队日志

Doer、Delivery、Dream

Grails Async踩坑记——Hibernate Session

冯宇 Posted at — Sep 2, 2018 阅读

Grails 本身提供了一个异步支持的插件grails-async,提供异步特性。最近正好手头某个项目需要异步支持,自然而然想到这个插件,但是遇到了不少坑,特地写此文记录下来,以便以后查阅。

插件使用

插件使用上没有太大问题,比较简单,参考官方文档即可。在 grails 的build.gradle配置文件中加入 grails-async 的依赖即可。

dependencies {
    runtime "org.grails.plugins:async"
    // 需要用到eventbus才需要引入这个依赖
    runtime "org.grails.plugins:events"
}

可以不加版本号,因为在 grails 的buildscript的 classpath 已经引入了 grails 的 maven bom,会自动根据当前使用的 Grails 版本选择合适的版本号,grails 自带的插件基本都满足这个套路,这一点可以在脚手架生成的build.gradle看出来。

Async插件主要提供了两个异步支持的功能——PromiseEvent。基本上用过异步框架的开发者对这两种模型都不陌生,具体使用方法就不说了,官方文档代码一看就懂,说几个主要需要注意的知识点吧。

Promise 和 Event 的实现

Grails 的Promise默认的实现方式是 CachedThreadPoolPromiseFactory,其实就是 Java 自带的 Thread Pool(java.util.concurrent.Executors.newCachedThreadPool()),同时还支持的实现方式有GParsRxJava,只要在runtime依赖级别中添加对应的依赖项就会自动使用对应的实现方式。Event也一样,默认使用Promise的默认实现方式,即 CachedThreadPoolPromiseFactory,同样也支持GParsRxJava,切换实现的方法也一样,不再赘述。

关于测试

这个官方文档只用了一小段话简单带过了,后面会说到还有坑的。

为了便于测试,Grails Async 插件提供了一个同步实现的方式org.grails.async.factory.SynchronousPromiseFactory,该类会将异步类库进行阻塞,从而转换为同步方式,这样就可以在测试中比较容易写出测试代码了,异步的测试一直都是一个难点。

import org.grails.async.factory.*
import grails.async.*

Promises.promiseFactory = new SynchronousPromiseFactory()

为了在自动化测试中满足每个场景都是一个干净的测试环境的原则,也可以保存原来的 Promise 实现,以便测试完毕后还原环境:

// spock测试框架中的部分片段
void 'such a method within promise'() {
    setup:
    PromiseFactory old = Promises.promiseFactory
    Promises.promiseFactory = new SynchronousPromiseFactory()
    ... SNIP ...
    when:
    ... SNIP ...
    then:
    ... SNIP ...
    cleanup:
    Promises.promiseFactory = old
}

基本上使用时需要注意的也就这么多了,这个插件总体来说使用上还是没什么难度的。接下来说说项目中遇到的坑了。

Async 插件的坑——Hibernate Session

这个严格来说也不算是坑,只不过官方文档对这个问题提之甚少,以致于可能让上手的人忽略了这个问题,造成浪费大量的时间去填坑。接下来我们举一个简单的例子说明这个问题。先来看看一个最基本的 Promise 使用:

Promise task = Promises.task {
    // 期望异步执行的代码块
}

task.onComplete { List result ->
    // 成功的回调内容,可以根据result.size()判断成功了多少
}

task.onError { Throwable t->
    // 错误的回调,在这里可以处理task中的异常,不需要异常处理可以不要这个block
}

乍一看没什么,几乎所有的异步编程框架都满足这个套路。可是 Grails 的异步处理中如果包含了 Domain 的写入或修改时,就很容易踩到坑了,这个坑官方文档中却提之甚少,甚至连解决方案都言语不详。

我们接下来看一个实际的例子:

class MyService {
    @Transactional
    void myMethod(List<Domain> domainList, Logrecord record) {
        PromiseList<Domain> tasks = new PromiseList<>()
        domainList.each { Domain domain ->
            tasks << { modifyDomain(domain) }
        }

        tasks.onComplete { List<Domain> result ->
            record.message = "Domain list size {result.size()} finished."
            record.save()
        }
    }

    @Transactional
    Domain modifyDomain(Domain domain) {
        // 一些修改动作
        domain.save()
    }
}

Promises.task 中的坑

我们期望对List<Domain>进行异步并行处理,将每个 domain 修改过的结果写回数据库,所有任务完成之后我们希望在Logrecord中记录一笔。发现这其中的问题了么?

Grails 对 Hibernate 封装的太好了,以致于大部分情况下让人忽视了 Hibernate Session 的存在。Domain 要想修改,必须工作在 Hibernate 的 Persistence Session 之下,而 Hibernate Session 是 thread local 绑定的,不在线程中共享。

所以这里的由于 Promise 的task部分和onComplete会在新的线程执行,所以根本没有工作在当前 Hibernate Session 环境下,以致于执行到domain.save()这一步的时候方法会卡住,onComplete方法永远也得不到执行。在官方文档一处不起眼的地方发现了解决方案

tasks << {
    Domain.withNewSession {
        modifyDomain(domain.attach())
    }
}

其实这里已经是踩了半天坑之后才搞定的了,直接把domain传进去是不行的(modifyDomain(domain)这种写法是不行的),即使加了withNewSession你会发现数据也没有持久化到数据库。加上 hibernate 的 sql 打印会发现虽然方法都执行了,onComplete 方法能调用,但是 save 之后并没有期望中的update语句产生,这又是为什么呢?

还是那个容易被忽略的 Hibernate Session 问题,domain这个对象工作在先前的 Hibernate Session 中,自然直接 save 不了(用明朝的剑斩清朝的官??)。那么此时肯定有不少人都想到应对策略了,只要我想办法刷新一下 Session 不就可以了吗?于是我评估了以下几种方式

session.get()
session.load()
domain.refresh()
....

经过我的评估,其实session.merge比较合适,这样会减少数据库的读取,而 Grails 提供了attach()方法,效果是一样的(注意 Grails 的 domain.merge()方法和 hibernate 的 session.merge()方法方向是反的,只有 domain.attach()才是 hibernate 的 session.merge()的行为),换成session.merge(domain)也没问题。这样重新拿到了当前的 Persistence Session,于是modifyDomain(domain)这个方法得以顺利执行。

Promises.onComplete 中的坑

但是对于onComplete来说,更坑的来了,由于onComplete这里执行的是结束回调,到这里之前的 session 已经 end 了,根本无法通过session.merge()或者domain.attach()接管到当前的 Session 中,导致 onComplete 卡住。如果你不想重新读取数据库获取新的 Session(session.get()或者 Domain.get(),其实是一样的),基本只能通过 HQL 做了:

        tasks.onComplete { List<Domain> result ->
            Logrecord.withNewTransaction {
                Logrecord.executeUpdate('UPDATE Logrecord SET message = :message WHERE id = :recordId', [message: "Domain list size {result.size()} finished." recordId: record.id])
            }
        }

由于之前的 session 早已 end,我们只能通过withNewSessionwithNewTransaction方法重新开启一个新的 Hibernate Session 完成执行 HQL 的操作,不过这里就没有乐观锁了,想要实现乐观锁就得自己加上version的 where 条件,不如save()方法那么方便了。如果期望直接利用 GORM 做,我目前没有找到太好的不用重新读库的方法(Domian.get()会重新读库)。

Lazy loading 卡住

Grails 默认的 ORM 模式是 lazy loading,即 domain 中有其他 domian 的关联,那么不是一开始就 fetch 过来,而是读取对应的属性的时候才会触发数据库 select 操作。还是上面的问题,如果在modifyDomain(Domain domain)方法中有 lazy loading 的行为:

Domain modifyDomain(Domain domain) {
    if (domain.otherDomain.prop == 'some prop') {
        // 一些逻辑处理
    }

    domain.save()
}

执行到 lazy loading 就会卡住。解决方案其实有两种,一种是将 fetch 方式由 Lazy fetch 改为 Eager fetch:

    static mapping = {
        otherDomain lazy: false
    }

另外一种解决方案是在 task 闭包之外手工 fetch 一次,这样这个 otherDomain 就会进入 session:

String otherDomainProp = domain.otherDomain.prop
tasks << {
    Domain.withNewSession {
        modifyDomain(domain.attach())
    }
}

SynchronousPromiseFactory 的坑

当我们自认为解决了 Promise 的坑,兴奋的去跑自动化测试的时候,发现之前的异步测试全挂了。打开日志,发现了 Hibernate 抛出了这样的异常: row was deleted or updated by another transaction.,简直要晕掉了。

稍微一想就很容易明白原因,由于SynchronousPromiseFactory会阻塞异步实现,转变为同步实现以便于测试。所以等于我们的taskonPromise全是同步代码了,这样一来等于原来的 session 还没有 end 呢,我们又新开了一个 session 去修改了 domain 的内容,于是 hibernate 就报错了。

这里我还没想到其他解决方案,所以只好写了一段判断:

    @Transactional
    void myMethod(List<Domain> domainList, Logrecord record) {
        PromiseList<Domain> tasks = new PromiseList<>()
        domainList.each { Domain domain ->
            tasks << {
                if (Promises.promiseFactory instanceof SynchronousPromiseFactory) {
                    modifyDomain(domain)
                } else {
                    Domain.withNewSession {
                        modifyDomain(domain.attach())
                    }
                }
            }
        }

        tasks.onComplete { List<Domain> result ->
            if (Promises.promiseFactory instanceof SynchronousPromiseFactory) {
                record.message = "Domain list size {result.size()} finished."
                record.save()
            } else {
                Logrecord.withNewTransaction {
                    Logrecord.executeUpdate('UPDATE Logrecord SET message = :message WHERE id = :recordId', [message: "Domain list size {result.size()} finished." recordId: record.id])
                }
            }
        }
    }

判断当前的 Promise 实现是不是同步实现,如果是同步实现就不开启新的 session,避免 Hibernate 的报错。于是测试终于通过了。

结语

Grails 的 Async 插件使用确实非常方便,让人感觉上手很容易。实际体验之后发现这里的异步 Session 问题是一个非常大的坑,官方文档却提得非常少。这些解决方案差不多花了我一整天的时间研究,觉得值得记录下来以便以后翻阅,隧成此文。