diff --git a/app/src/main/java/com/github/nullptroma/wallenc/app/sync/StorageSyncWorker.kt b/app/src/main/java/com/github/nullptroma/wallenc/app/sync/StorageSyncWorker.kt index 84530e0..f3cefe0 100644 --- a/app/src/main/java/com/github/nullptroma/wallenc/app/sync/StorageSyncWorker.kt +++ b/app/src/main/java/com/github/nullptroma/wallenc/app/sync/StorageSyncWorker.kt @@ -29,6 +29,10 @@ class StorageSyncWorker @AssistedInject constructor( Timber.d("Periodic storage sync finished") Result.success() } + StorageSyncRunOutcome.Cancelled -> { + Timber.d("Periodic storage sync cancelled") + Result.success() + } is StorageSyncRunOutcome.Failed -> { Timber.w(outcome.error, "Periodic storage sync failed") Result.retry() diff --git a/domain/src/main/java/com/github/nullptroma/wallenc/domain/tasks/TaskContext.kt b/domain/src/main/java/com/github/nullptroma/wallenc/domain/tasks/TaskContext.kt index dc9b941..7a19422 100644 --- a/domain/src/main/java/com/github/nullptroma/wallenc/domain/tasks/TaskContext.kt +++ b/domain/src/main/java/com/github/nullptroma/wallenc/domain/tasks/TaskContext.kt @@ -14,4 +14,7 @@ interface TaskContext { fun log(level: TaskLogLevel, key: TaskLogKey) fun fail(error: WallencException): Nothing + + /** Проверяет, что задача не отменена; бросает [kotlinx.coroutines.CancellationException] при отмене. */ + suspend fun ensureNotCancelled() } diff --git a/task-runtime/src/main/java/com/github/nullptroma/wallenc/task/runtime/TaskOrchestrator.kt b/task-runtime/src/main/java/com/github/nullptroma/wallenc/task/runtime/TaskOrchestrator.kt index f6016f5..d6bcdc7 100644 --- a/task-runtime/src/main/java/com/github/nullptroma/wallenc/task/runtime/TaskOrchestrator.kt +++ b/task-runtime/src/main/java/com/github/nullptroma/wallenc/task/runtime/TaskOrchestrator.kt @@ -22,6 +22,8 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.delay +import kotlin.coroutines.coroutineContext +import kotlinx.coroutines.ensureActive import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow @@ -238,6 +240,10 @@ class TaskOrchestrator( override fun fail(error: WallencException): Nothing { throw TaskFailedException(error) } + + override suspend fun ensureNotCancelled() { + coroutineContext.ensureActive() + } } private class TaskFailedException(val error: WallencException) : RuntimeException() diff --git a/task-runtime/src/test/java/com/github/nullptroma/wallenc/task/runtime/TaskOrchestratorTest.kt b/task-runtime/src/test/java/com/github/nullptroma/wallenc/task/runtime/TaskOrchestratorTest.kt index 12d59b9..08b447a 100644 --- a/task-runtime/src/test/java/com/github/nullptroma/wallenc/task/runtime/TaskOrchestratorTest.kt +++ b/task-runtime/src/test/java/com/github/nullptroma/wallenc/task/runtime/TaskOrchestratorTest.kt @@ -32,6 +32,24 @@ class TaskOrchestratorTest { assertTrue(task.state is TaskRunState.Completed) } + @Test + fun cancelAllMarksRunningTaskCancelled() = runTest(dispatcher) { + val orchestrator = TaskOrchestrator(dispatcher) + val id = orchestrator.enqueue( + title = "Long", + dispatcher = dispatcher, + work = { ctx -> + ctx.reportProgress(null, null) + kotlinx.coroutines.delay(60_000) + }, + ) + advanceTimeBy(1) + orchestrator.cancelAll() + advanceUntilIdle() + val task = orchestrator.pipelineState.value.tasks.first { it.id == id } + assertTrue(task.state is TaskRunState.Cancelled) + } + @Test fun cancelMarksTaskCancelled() = runTest(dispatcher) { val orchestrator = TaskOrchestrator(dispatcher) diff --git a/usecases/src/main/java/com/github/nullptroma/wallenc/usecases/RunStorageSyncUseCase.kt b/usecases/src/main/java/com/github/nullptroma/wallenc/usecases/RunStorageSyncUseCase.kt index 09e98b5..768cfae 100644 --- a/usecases/src/main/java/com/github/nullptroma/wallenc/usecases/RunStorageSyncUseCase.kt +++ b/usecases/src/main/java/com/github/nullptroma/wallenc/usecases/RunStorageSyncUseCase.kt @@ -57,6 +57,7 @@ class RunStorageSyncUseCase @Inject constructor( executeSync( reason = reason, reportProgress = { fraction, label -> + ctx.ensureNotCancelled() ctx.reportProgress(fraction, label) }, log = { level, key -> ctx.log(level, key) }, @@ -90,6 +91,7 @@ class RunStorageSyncUseCase @Inject constructor( val state = orchestrator.pipelineState.value.tasks.find { it.id == taskId }?.state return when (state) { is TaskRunState.Failed -> StorageSyncRunOutcome.Failed(state.error) + TaskRunState.Cancelled -> StorageSyncRunOutcome.Cancelled else -> StorageSyncRunOutcome.Completed } } diff --git a/usecases/src/main/java/com/github/nullptroma/wallenc/usecases/StorageSyncEngine.kt b/usecases/src/main/java/com/github/nullptroma/wallenc/usecases/StorageSyncEngine.kt index 118c701..797fb4f 100644 --- a/usecases/src/main/java/com/github/nullptroma/wallenc/usecases/StorageSyncEngine.kt +++ b/usecases/src/main/java/com/github/nullptroma/wallenc/usecases/StorageSyncEngine.kt @@ -12,6 +12,11 @@ import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncGroupStore import com.github.nullptroma.wallenc.domain.interfaces.IVaultsManager import com.github.nullptroma.wallenc.domain.tasks.TaskProgressLabel import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.coroutineScope +import kotlin.coroutines.coroutineContext +import kotlinx.coroutines.ensureActive import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withContext @@ -48,6 +53,7 @@ class StorageSyncEngine @Inject constructor( } reporter(null, TaskProgressLabel.SyncPreparing(groups.size)) for (group in groups) { + coroutineContext.ensureActive() syncGroupInternal( groupId = group.id, reportProgress = reporter, @@ -104,6 +110,7 @@ class StorageSyncEngine @Inject constructor( try { reportProgress(null, TaskProgressLabel.SyncGroupAcquiringLocks(groupId)) for ((lockIndex, storage) in storages.withIndex()) { + coroutineContext.ensureActive() reportProgress( null, TaskProgressLabel.SyncGroupLockProgress(groupId, lockIndex + 1, storages.size), @@ -120,22 +127,34 @@ class StorageSyncEngine @Inject constructor( val entriesByStorage = mutableMapOf() reportProgress(null, TaskProgressLabel.SyncGroupReadingJournals(groupId)) - for ((journalIndex, storage) in storages.withIndex()) { - leaseUntil = renewLocksIfNeeded( - groupId = groupId, - lockedAccessors = lockedAccessors, - currentLeaseUntil = leaseUntil, - reportProgress = reportProgress, - ) ?: return - if (syncGeneration.get() != generationSnapshot) { - reportProgress(null, TaskProgressLabel.SyncGroupCancelled(groupId)) - return - } - reportProgress( - null, - TaskProgressLabel.SyncGroupJournalProgress(groupId, journalIndex + 1, storages.size), - ) - val journal = filterSyncableJournal(storage.accessor.readSyncJournal()) + leaseUntil = renewLocksIfNeeded( + groupId = groupId, + lockedAccessors = lockedAccessors, + currentLeaseUntil = leaseUntil, + reportProgress = reportProgress, + ) ?: return + if (syncGeneration.get() != generationSnapshot) { + reportProgress(null, TaskProgressLabel.SyncGroupCancelled(groupId)) + return + } + val journalReads = coroutineScope { + storages.mapIndexed { journalIndex, storage -> + async { + coroutineContext.ensureActive() + reportProgress( + null, + TaskProgressLabel.SyncGroupJournalProgress( + groupId, + journalIndex + 1, + storages.size, + ), + ) + storage.accessor.flushPendingSyncJournal() + storage to filterSyncableJournal(storage.accessor.readSyncJournal()) + } + }.awaitAll() + } + for ((storage, journal) in journalReads) { entriesByStorage[storage.uuid] = journal mergedByPath.putAll( StorageSyncJournalMerge.merge(mergedByPath, journal), @@ -154,6 +173,7 @@ class StorageSyncEngine @Inject constructor( ) var applyFailures = 0 for ((pathIndex, merged) in mergedEntries.withIndex()) { + coroutineContext.ensureActive() leaseUntil = renewLocksIfNeeded( groupId = groupId, lockedAccessors = lockedAccessors, @@ -178,6 +198,7 @@ class StorageSyncEngine @Inject constructor( } for (target in storages) { + coroutineContext.ensureActive() if (target.uuid == sourceStorage?.uuid) { continue } @@ -231,6 +252,7 @@ class StorageSyncEngine @Inject constructor( val nextLeaseUntil = now.plusSeconds(SYNC_LOCK_LEASE_SECONDS) reportProgress(null, TaskProgressLabel.SyncGroupRenewingLocks(groupId)) for (accessor in lockedAccessors) { + coroutineContext.ensureActive() val renewed = runCatching { accessor.tryAcquireSyncLock(holderId, nextLeaseUntil) }.getOrElse { false } @@ -273,13 +295,13 @@ class StorageSyncEngine @Inject constructor( val result = when (entry.operation) { StorageSyncOperation.DELETE -> { runCatching { - target.accessor.delete(entry.path) + target.accessor.delete(entry.path, recordSyncJournal = false) } } StorageSyncOperation.TRASH -> { runCatching { - target.accessor.moveToTrash(entry.path) + target.accessor.moveToTrash(entry.path, recordSyncJournal = false) } } diff --git a/usecases/src/main/java/com/github/nullptroma/wallenc/usecases/StorageSyncReadiness.kt b/usecases/src/main/java/com/github/nullptroma/wallenc/usecases/StorageSyncReadiness.kt index 8f58bb1..1709c94 100644 --- a/usecases/src/main/java/com/github/nullptroma/wallenc/usecases/StorageSyncReadiness.kt +++ b/usecases/src/main/java/com/github/nullptroma/wallenc/usecases/StorageSyncReadiness.kt @@ -3,6 +3,8 @@ package com.github.nullptroma.wallenc.usecases import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncGroupStore import com.github.nullptroma.wallenc.domain.interfaces.IVaultsManager import kotlinx.coroutines.delay +import kotlin.coroutines.coroutineContext +import kotlinx.coroutines.ensureActive import javax.inject.Inject import javax.inject.Singleton @@ -27,6 +29,7 @@ class StorageSyncReadiness @Inject constructor( val deadline = System.currentTimeMillis() + timeoutMs while (System.currentTimeMillis() < deadline) { + coroutineContext.ensureActive() if (!isAnyVaultScanning()) { break } @@ -34,6 +37,7 @@ class StorageSyncReadiness @Inject constructor( } while (System.currentTimeMillis() < deadline) { + coroutineContext.ensureActive() if (requiredUuids.all { uuid -> findStorageUseCase.find(uuid) != null }) { return } diff --git a/usecases/src/main/java/com/github/nullptroma/wallenc/usecases/StorageSyncRunOutcome.kt b/usecases/src/main/java/com/github/nullptroma/wallenc/usecases/StorageSyncRunOutcome.kt index 8786c79..d932f3d 100644 --- a/usecases/src/main/java/com/github/nullptroma/wallenc/usecases/StorageSyncRunOutcome.kt +++ b/usecases/src/main/java/com/github/nullptroma/wallenc/usecases/StorageSyncRunOutcome.kt @@ -9,5 +9,8 @@ sealed class StorageSyncRunOutcome { data object Completed : StorageSyncRunOutcome() + /** Задача синхронизации отменена пользователем или пайплайном. */ + data object Cancelled : StorageSyncRunOutcome() + data class Failed(val error: WallencException) : StorageSyncRunOutcome() }