意料外的问题
/*** 将指定的 [amount] 的金额,从 [accountA] 转移到 [accountB]*/suspend fun transferMoney(accountA: String, accountB: String, amount: Int) {// 使用了 IO dispatcher,所以该 DB 的操作在 IO 线程上进行withContext(Dispatchers.IO) {database.beginTransaction() //在 IO-Thread-1 线程上开始执行事务try {// 协程可以在与调度器(这里就是 Dispatchers.IO)相关联的任何线程上绑定并继续执行。同时,由于事务也是在 IO-Thread-1 中开始的,因此我们可能恰好可以成功执行查询。moneyDao.decrease(accountA, amount) //挂起函数// 如果协程又继续在 IO-Thread-2 上执行,那么下列操作数据库的代码可能会引起死锁,因为它需要等到 IO-Thread-1 的线程执行结束后才可以继续。moneyDao.increase(accountB, amount) //挂起函数database.setTransactionSuccessful() //永远不会执行这一行} finally {database.endTransaction() //永远不会执行这一行}}}
Android 的 SQLite 事务受制于单个线程
在协程中使用数据库事务操作可能会引起死锁
简单实现
suspend fun <T> RoomDatabase.runInTransaction(block: suspend () -> T): T = withContext(newSingleThreadContext("DB")) {beginTransaction()try {val result = block.invoke(this)setTransactionSuccessful()return@runBlocking result} finally {endTransaction()}}
// 一个很简单的退税函数suspend fun sendTaxRefund(federalAccount: String, taypayerList: List<Taxpayer>) {database.runInTransaction {val refundJobs = taypayerList.map { taxpayer ->coroutineScope {// 并行去计算退税金额async(Dispatchers.IO) {val amount = irsTool.calculateRefund(taxpayer)moneyDao.decrease(federalAccount, amount)moneyDao.increase(taxpayer.account, amount)}}}// 等待所有计算任务结束refundJobs.joinAll()}}
介绍 withTransaction
fun transferMoney(accountA: String,accountB: String,amount: Int) = GlobalScope.launch(Dispatchers.Main) {roomDatabase.withTransaction {moneyDao.decrease(accountA, amount)moneyDao.increase(accountB, amount)}Toast.makeText(context, "Transfer Completed.", Toast.LENGTH_SHORT).show()}
withTransaction API
https://developer.android.google.cn/reference/kotlin/androidx/room/package-summary.html#(androidx.room.RoomDatabase).withTransaction(kotlin.coroutines.SuspendFunction0)
withTransaction API 在上下文中创建了三个关键元素:
单线程调度器,用于执行数据库操作;
上下文元素,帮助 DAO 函数判断其是否处在事务中;
ThreadContextElement,用来标记事务协程中所使用的调度线程。
事务调度器
/***构建并返回一个 [ContinuationInterceptor] 用来将协程分发到获取到的线程中,并执行事务。[controlJob] 用来通过取消任务来控制线程的释放。*/private suspend fun Executor.acquireTransactionThread(controlJob: Job): ContinuationInterceptor = suspendCancellableCoroutine { continuation ->continuation.invokeOnCancellation {// 当我们在等待获取到可用线程时,如果失败了或者任务取消,我们是不能够停止等待这一动作的,但我们可以取消 controlJob,这样一旦获取到控制权,很快就会被释放。controlJob.cancel()}try {execute {// runBlocking 创建一个 event loop 来执行协程中的任务代码runBlocking {// 获取到线程后,通过返回有 runBlocking 创建的拦截器来恢复 suspendCancellableCoroutine,拦截器将会被用来拦截和分发代码块到获取的线程中continuation.resume(coroutineContext[ContinuationInterceptor]!!)// 挂起 runBlocking 协程,直到 controlJob 完成。由于协程是空的,所以这将会阻止 runBlocking 立即结束。controlJob.join()}}} catch (ex: RejectedExecutionException) {// 无法获取线程,取消协程continuation.cancel(IllegalStateException("Unable to acquire a thread to perform the transaction.", ex))}}
suspendCancellableCoroutine
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/suspend-cancellable-coroutine.html
runBlocking
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
事务上下文元素
internal class TransactionElement(private val transactionThreadControlJob: Job,internal val transactionDispatcher: ContinuationInterceptor) : CoroutineContext.Element {// Singleton key 用于检索此上下文中的 elementcompanion object Key : CoroutineContext.Key<TransactionElement>override val key: CoroutineContext.Key<TransactionElement>get() = TransactionElement/***这个 element 用来统计事务数量(包含嵌套事务)。调用 [acquire] 来增加计数,调用 [release] 来减少计数。如果在调用 [release] 时计数达到 0,则事务被取消,事务线程会被释放*/private val referenceCount = AtomicInteger(0)fun acquire() {referenceCount.incrementAndGet()}fun release() {val count = referenceCount.decrementAndGet()if (count < 0) {throw IllegalStateException("Transaction was never started or was already released.")} else if (count == 0) {// 取消控制事务线程的 job 会导致它被 releasetransactionThreadControlJob.cancel()}}}
事务线程标记
private final ThreadLocal<Integer> mSuspendingTransactionId = new ThreadLocal<>();public void assertNotSuspendingTransaction() {if (!inTransaction() && mSuspendingTransactionId.get() != null) {throw new IllegalStateException("Cannot access database on a different"+ " coroutine context inherited from a suspending transaction.");}}
private suspend fun RoomDatabase.createTransactionContext(): CoroutineContext {val controlJob = Job()val dispatcher = queryExecutor.acquireTransactionThread(controlJob)val transactionElement = TransactionElement(controlJob, dispatcher)val threadLocalElement =suspendingTransactionId.asContextElement(controlJob.hashCode())return dispatcher + transactionElement + threadLocalElement}
事务 API 的实现
创建了事务上下文之后,我们终于可以提供一个安全的 API 用于在协程中执行数据库事务。接下来要做的就是将这个上下文和通常的 begin/end 事务模式结合起来:
suspend fun <R> RoomDatabase.withTransaction(block: suspend () -> R): R {// 如果可以的话就使用继承的事务上下文,这样允许嵌套挂起的事务val transactionContext =coroutineContext[TransactionElement]?.transactionDispatcher?: createTransactionContext()return withContext(transactionContext) {val transactionElement = coroutineContext[TransactionElement]!!transactionElement.acquire()try {beginTransaction()try {// 在一个新的 scope 中封装 suspend 代码块,来等待子协程val result = coroutineScope {block.invoke(this)}setTransactionSuccessful()return@withContext result} finally {endTransaction()}} finally {transactionElement.release()}}}
推荐阅读