DTeam 团队日志

Doer、Delivery、Dream

Grails Async踩坑记——Hibernate Session

冯宇 Posted at — Sep 2, 2018 阅读

Grails本身提供了一个异步支持的插件grails-async,提供异步特性。最近正好手头某个项目需要异步支持,自然而然想到这个插件,但是遇到了不少坑,特地写此文记录下来,以便以后查阅。

插件使用

插件使用上没有太大问题,比较简单,参考官方文档即可。在grails的build.gradle配置文件中加入grails-async的依赖即可。

dependencies {
    runtime "org.grails.plugins:async"
    // 需要用到eventbus才需要引入这个依赖
    runtime "org.grails.plugins:events"
}

可以不加版本号,因为在grails的buildscript的classpath已经引入了grails的maven bom,会自动根据当前使用的Grails版本选择合适的版本号,grails自带的插件基本都满足这个套路,这一点可以在脚手架生成的build.gradle看出来。

Async插件主要提供了两个异步支持的功能——PromiseEvent。基本上用过异步框架的开发者对这两种模型都不陌生,具体使用方法就不说了,官方文档代码一看就懂,说几个主要需要注意的知识点吧。

Promise和Event的实现

Grails的Promise默认的实现方式是 CachedThreadPoolPromiseFactory,其实就是Java自带的Thread Pool(java.util.concurrent.Executors.newCachedThreadPool()),同时还支持的实现方式有GParsRxJava,只要在runtime依赖级别中添加对应的依赖项就会自动使用对应的实现方式。Event也一样,默认使用Promise的默认实现方式,即 CachedThreadPoolPromiseFactory,同样也支持GParsRxJava,切换实现的方法也一样,不再赘述。

关于测试

这个官方文档只用了一小段话简单带过了,后面会说到还有坑的。

为了便于测试,Grails Async插件提供了一个同步实现的方式org.grails.async.factory.SynchronousPromiseFactory,该类会将异步类库进行阻塞,从而转换为同步方式,这样就可以在测试中比较容易写出测试代码了,异步的测试一直都是一个难点。

import org.grails.async.factory.*
import grails.async.*

Promises.promiseFactory = new SynchronousPromiseFactory()

为了在自动化测试中满足每个场景都是一个干净的测试环境的原则,也可以保存原来的Promise实现,以便测试完毕后还原环境:

// spock测试框架中的部分片段
void 'such a method within promise'() {
    setup:
    PromiseFactory old = Promises.promiseFactory
    Promises.promiseFactory = new SynchronousPromiseFactory()
    ... SNIP ...
    when:
    ... SNIP ...
    then:
    ... SNIP ...
    cleanup:
    Promises.promiseFactory = old
}

基本上使用时需要注意的也就这么多了,这个插件总体来说使用上还是没什么难度的。接下来说说项目中遇到的坑了。

Async插件的坑——Hibernate Session

这个严格来说也不算是坑,只不过官方文档对这个问题提之甚少,以致于可能让上手的人忽略了这个问题,造成浪费大量的时间去填坑。接下来我们举一个简单的例子说明这个问题。先来看看一个最基本的Promise使用:

Promise task = Promises.task {
    // 期望异步执行的代码块
}

task.onComplete { List result ->
    // 成功的回调内容,可以根据result.size()判断成功了多少
}

task.onError { Throwable t->
    // 错误的回调,在这里可以处理task中的异常,不需要异常处理可以不要这个block
}

乍一看没什么,几乎所有的异步编程框架都满足这个套路。可是Grails的异步处理中如果包含了Domain的写入或修改时,就很容易踩到坑了,这个坑官方文档中却提之甚少,甚至连解决方案都言语不详。

我们接下来看一个实际的例子:

class MyService {
    @Transactional
    void myMethod(List<Domain> domainList, Logrecord record) {
        PromiseList<Domain> tasks = new PromiseList<>()
        domainList.each { Domain domain ->
            tasks << { modifyDomain(domain) }
        }

        tasks.onComplete { List<Domain> result ->
            record.message = "Domain list size {result.size()} finished."
            record.save()
        }
    }

    @Transactional
    Domain modifyDomain(Domain domain) {
        // 一些修改动作
        domain.save()
    }
}

Promises.task中的坑

我们期望对List<Domain>进行异步并行处理,将每个domain修改过的结果写回数据库,所有任务完成之后我们希望在Logrecord中记录一笔。发现这其中的问题了么?

Grails对Hibernate封装的太好了,以致于大部分情况下让人忽视了Hibernate Session的存在。Domain要想修改,必须工作在Hibernate的Persistence Session之下,而Hibernate Session是thread local绑定的,不在线程中共享。

所以这里的由于Promise的task部分和onComplete会在新的线程执行,所以根本没有工作在当前Hibernate Session环境下,以致于执行到domain.save()这一步的时候方法会卡住,onComplete方法永远也得不到执行。在官方文档一处不起眼的地方发现了解决方案

tasks << {
    Domain.withNewSession {
        modifyDomain(domain.attach())
    }
}

其实这里已经是踩了半天坑之后才搞定的了,直接把domain传进去是不行的(modifyDomain(domain)这种写法是不行的),即使加了withNewSession你会发现数据也没有持久化到数据库。加上hibernate的sql打印会发现虽然方法都执行了,onComplete方法能调用,但是save之后并没有期望中的update语句产生,这又是为什么呢?

还是那个容易被忽略的Hibernate Session问题,domain这个对象工作在先前的Hibernate Session中,自然直接save不了(用明朝的剑斩清朝的官??)。那么此时肯定有不少人都想到应对策略了,只要我想办法刷新一下Session不就可以了吗?于是我评估了以下几种方式

session.get()
session.load()
domain.refresh()
....

经过我的评估,其实session.merge比较合适,这样会减少数据库的读取,而Grails提供了attach()方法,效果是一样的(注意Grails的domain.merge()方法和hibernate的session.merge()方法方向是反的,只有domain.attach()才是hibernate的session.merge()的行为),换成session.merge(domain)也没问题。这样重新拿到了当前的Persistence Session,于是modifyDomain(domain)这个方法得以顺利执行。

Promises.onComplete中的坑

但是对于onComplete来说,更坑的来了,由于onComplete这里执行的是结束回调,到这里之前的session已经end了,根本无法通过session.merge()或者domain.attach()接管到当前的Session中,导致onComplete卡住。如果你不想重新读取数据库获取新的Session(session.get()或者Domain.get(),其实是一样的),基本只能通过HQL做了:

        tasks.onComplete { List<Domain> result ->
            Logrecord.withNewTransaction {
                Logrecord.executeUpdate('UPDATE Logrecord SET message = :message WHERE id = :recordId', [message: "Domain list size {result.size()} finished." recordId: record.id])
            }
        }

由于之前的session早已end,我们只能通过withNewSessionwithNewTransaction方法重新开启一个新的Hibernate Session完成执行HQL的操作,不过这里就没有乐观锁了,想要实现乐观锁就得自己加上version的where条件,不如save()方法那么方便了。如果期望直接利用GORM做,我目前没有找到太好的不用重新读库的方法(Domian.get()会重新读库)。

Lazy loading卡住

Grails默认的ORM模式是lazy loading,即domain中有其他domian的关联,那么不是一开始就fetch过来,而是读取对应的属性的时候才会触发数据库select操作。还是上面的问题,如果在modifyDomain(Domain domain)方法中有lazy loading的行为:

Domain modifyDomain(Domain domain) {
    if (domain.otherDomain.prop == 'some prop') {
        // 一些逻辑处理
    }

    domain.save()
}

执行到lazy loading就会卡住。解决方案其实有两种,一种是将fetch方式由Lazy fetch改为Eager fetch:

    static mapping = {
        otherDomain lazy: false
    }

另外一种解决方案是在task闭包之外手工fetch一次,这样这个otherDomain就会进入session:

String otherDomainProp = domain.otherDomain.prop
tasks << {
    Domain.withNewSession {
        modifyDomain(domain.attach())
    }
}

SynchronousPromiseFactory的坑

当我们自认为解决了Promise的坑,兴奋的去跑自动化测试的时候,发现之前的异步测试全挂了。打开日志,发现了Hibernate抛出了这样的异常: row was deleted or updated by another transaction.,简直要晕掉了。

稍微一想就很容易明白原因,由于SynchronousPromiseFactory会阻塞异步实现,转变为同步实现以便于测试。所以等于我们的taskonPromise全是同步代码了,这样一来等于原来的session还没有end呢,我们又新开了一个session去修改了domain的内容,于是hibernate就报错了。

这里我还没想到其他解决方案,所以只好写了一段判断:

    @Transactional
    void myMethod(List<Domain> domainList, Logrecord record) {
        PromiseList<Domain> tasks = new PromiseList<>()
        domainList.each { Domain domain ->
            tasks << {
                if (Promises.promiseFactory instanceof SynchronousPromiseFactory) {
                    modifyDomain(domain)
                } else {
                    Domain.withNewSession {
                        modifyDomain(domain.attach())
                    }
                }
            }
        }

        tasks.onComplete { List<Domain> result ->
            if (Promises.promiseFactory instanceof SynchronousPromiseFactory) {
                record.message = "Domain list size {result.size()} finished."
                record.save()
            } else {
                Logrecord.withNewTransaction {
                    Logrecord.executeUpdate('UPDATE Logrecord SET message = :message WHERE id = :recordId', [message: "Domain list size {result.size()} finished." recordId: record.id])
                }
            }
        }
    }

判断当前的Promise实现是不是同步实现,如果是同步实现就不开启新的session,避免Hibernate的报错。于是测试终于通过了。

结语

Grails的Async插件使用确实非常方便,让人感觉上手很容易。实际体验之后发现这里的异步Session问题是一个非常大的坑,官方文档却提得非常少。这些解决方案差不多花了我一整天的时间研究,觉得值得记录下来以便以后翻阅,隧成此文。