feat(sync): добавил cooperative-отмену sync и pipeline-задач

ensureActive в StorageSyncEngine, flush журнала перед чтением, Cancelled
в StorageSyncRunOutcome и TaskContext.ensureNotCancelled.
This commit is contained in:
2026-05-22 13:22:15 +03:00
parent bc2b354820
commit 2618df41e3
8 changed files with 80 additions and 18 deletions

View File

@@ -29,6 +29,10 @@ class StorageSyncWorker @AssistedInject constructor(
Timber.d("Periodic storage sync finished") Timber.d("Periodic storage sync finished")
Result.success() Result.success()
} }
StorageSyncRunOutcome.Cancelled -> {
Timber.d("Periodic storage sync cancelled")
Result.success()
}
is StorageSyncRunOutcome.Failed -> { is StorageSyncRunOutcome.Failed -> {
Timber.w(outcome.error, "Periodic storage sync failed") Timber.w(outcome.error, "Periodic storage sync failed")
Result.retry() Result.retry()

View File

@@ -14,4 +14,7 @@ interface TaskContext {
fun log(level: TaskLogLevel, key: TaskLogKey) fun log(level: TaskLogLevel, key: TaskLogKey)
fun fail(error: WallencException): Nothing fun fail(error: WallencException): Nothing
/** Проверяет, что задача не отменена; бросает [kotlinx.coroutines.CancellationException] при отмене. */
suspend fun ensureNotCancelled()
} }

View File

@@ -22,6 +22,8 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.asStateFlow
@@ -238,6 +240,10 @@ class TaskOrchestrator(
override fun fail(error: WallencException): Nothing { override fun fail(error: WallencException): Nothing {
throw TaskFailedException(error) throw TaskFailedException(error)
} }
override suspend fun ensureNotCancelled() {
coroutineContext.ensureActive()
}
} }
private class TaskFailedException(val error: WallencException) : RuntimeException() private class TaskFailedException(val error: WallencException) : RuntimeException()

View File

@@ -32,6 +32,24 @@ class TaskOrchestratorTest {
assertTrue(task.state is TaskRunState.Completed) assertTrue(task.state is TaskRunState.Completed)
} }
@Test
fun cancelAllMarksRunningTaskCancelled() = runTest(dispatcher) {
val orchestrator = TaskOrchestrator(dispatcher)
val id = orchestrator.enqueue(
title = "Long",
dispatcher = dispatcher,
work = { ctx ->
ctx.reportProgress(null, null)
kotlinx.coroutines.delay(60_000)
},
)
advanceTimeBy(1)
orchestrator.cancelAll()
advanceUntilIdle()
val task = orchestrator.pipelineState.value.tasks.first { it.id == id }
assertTrue(task.state is TaskRunState.Cancelled)
}
@Test @Test
fun cancelMarksTaskCancelled() = runTest(dispatcher) { fun cancelMarksTaskCancelled() = runTest(dispatcher) {
val orchestrator = TaskOrchestrator(dispatcher) val orchestrator = TaskOrchestrator(dispatcher)

View File

@@ -57,6 +57,7 @@ class RunStorageSyncUseCase @Inject constructor(
executeSync( executeSync(
reason = reason, reason = reason,
reportProgress = { fraction, label -> reportProgress = { fraction, label ->
ctx.ensureNotCancelled()
ctx.reportProgress(fraction, label) ctx.reportProgress(fraction, label)
}, },
log = { level, key -> ctx.log(level, key) }, log = { level, key -> ctx.log(level, key) },
@@ -90,6 +91,7 @@ class RunStorageSyncUseCase @Inject constructor(
val state = orchestrator.pipelineState.value.tasks.find { it.id == taskId }?.state val state = orchestrator.pipelineState.value.tasks.find { it.id == taskId }?.state
return when (state) { return when (state) {
is TaskRunState.Failed -> StorageSyncRunOutcome.Failed(state.error) is TaskRunState.Failed -> StorageSyncRunOutcome.Failed(state.error)
TaskRunState.Cancelled -> StorageSyncRunOutcome.Cancelled
else -> StorageSyncRunOutcome.Completed else -> StorageSyncRunOutcome.Completed
} }
} }

View File

@@ -12,6 +12,11 @@ import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncGroupStore
import com.github.nullptroma.wallenc.domain.interfaces.IVaultsManager import com.github.nullptroma.wallenc.domain.interfaces.IVaultsManager
import com.github.nullptroma.wallenc.domain.tasks.TaskProgressLabel import com.github.nullptroma.wallenc.domain.tasks.TaskProgressLabel
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
@@ -48,6 +53,7 @@ class StorageSyncEngine @Inject constructor(
} }
reporter(null, TaskProgressLabel.SyncPreparing(groups.size)) reporter(null, TaskProgressLabel.SyncPreparing(groups.size))
for (group in groups) { for (group in groups) {
coroutineContext.ensureActive()
syncGroupInternal( syncGroupInternal(
groupId = group.id, groupId = group.id,
reportProgress = reporter, reportProgress = reporter,
@@ -104,6 +110,7 @@ class StorageSyncEngine @Inject constructor(
try { try {
reportProgress(null, TaskProgressLabel.SyncGroupAcquiringLocks(groupId)) reportProgress(null, TaskProgressLabel.SyncGroupAcquiringLocks(groupId))
for ((lockIndex, storage) in storages.withIndex()) { for ((lockIndex, storage) in storages.withIndex()) {
coroutineContext.ensureActive()
reportProgress( reportProgress(
null, null,
TaskProgressLabel.SyncGroupLockProgress(groupId, lockIndex + 1, storages.size), TaskProgressLabel.SyncGroupLockProgress(groupId, lockIndex + 1, storages.size),
@@ -120,7 +127,6 @@ class StorageSyncEngine @Inject constructor(
val entriesByStorage = mutableMapOf<UUID, StorageSyncJournal>() val entriesByStorage = mutableMapOf<UUID, StorageSyncJournal>()
reportProgress(null, TaskProgressLabel.SyncGroupReadingJournals(groupId)) reportProgress(null, TaskProgressLabel.SyncGroupReadingJournals(groupId))
for ((journalIndex, storage) in storages.withIndex()) {
leaseUntil = renewLocksIfNeeded( leaseUntil = renewLocksIfNeeded(
groupId = groupId, groupId = groupId,
lockedAccessors = lockedAccessors, lockedAccessors = lockedAccessors,
@@ -131,11 +137,24 @@ class StorageSyncEngine @Inject constructor(
reportProgress(null, TaskProgressLabel.SyncGroupCancelled(groupId)) reportProgress(null, TaskProgressLabel.SyncGroupCancelled(groupId))
return return
} }
val journalReads = coroutineScope {
storages.mapIndexed { journalIndex, storage ->
async {
coroutineContext.ensureActive()
reportProgress( reportProgress(
null, null,
TaskProgressLabel.SyncGroupJournalProgress(groupId, journalIndex + 1, storages.size), TaskProgressLabel.SyncGroupJournalProgress(
groupId,
journalIndex + 1,
storages.size,
),
) )
val journal = filterSyncableJournal(storage.accessor.readSyncJournal()) storage.accessor.flushPendingSyncJournal()
storage to filterSyncableJournal(storage.accessor.readSyncJournal())
}
}.awaitAll()
}
for ((storage, journal) in journalReads) {
entriesByStorage[storage.uuid] = journal entriesByStorage[storage.uuid] = journal
mergedByPath.putAll( mergedByPath.putAll(
StorageSyncJournalMerge.merge(mergedByPath, journal), StorageSyncJournalMerge.merge(mergedByPath, journal),
@@ -154,6 +173,7 @@ class StorageSyncEngine @Inject constructor(
) )
var applyFailures = 0 var applyFailures = 0
for ((pathIndex, merged) in mergedEntries.withIndex()) { for ((pathIndex, merged) in mergedEntries.withIndex()) {
coroutineContext.ensureActive()
leaseUntil = renewLocksIfNeeded( leaseUntil = renewLocksIfNeeded(
groupId = groupId, groupId = groupId,
lockedAccessors = lockedAccessors, lockedAccessors = lockedAccessors,
@@ -178,6 +198,7 @@ class StorageSyncEngine @Inject constructor(
} }
for (target in storages) { for (target in storages) {
coroutineContext.ensureActive()
if (target.uuid == sourceStorage?.uuid) { if (target.uuid == sourceStorage?.uuid) {
continue continue
} }
@@ -231,6 +252,7 @@ class StorageSyncEngine @Inject constructor(
val nextLeaseUntil = now.plusSeconds(SYNC_LOCK_LEASE_SECONDS) val nextLeaseUntil = now.plusSeconds(SYNC_LOCK_LEASE_SECONDS)
reportProgress(null, TaskProgressLabel.SyncGroupRenewingLocks(groupId)) reportProgress(null, TaskProgressLabel.SyncGroupRenewingLocks(groupId))
for (accessor in lockedAccessors) { for (accessor in lockedAccessors) {
coroutineContext.ensureActive()
val renewed = runCatching { val renewed = runCatching {
accessor.tryAcquireSyncLock(holderId, nextLeaseUntil) accessor.tryAcquireSyncLock(holderId, nextLeaseUntil)
}.getOrElse { false } }.getOrElse { false }
@@ -273,13 +295,13 @@ class StorageSyncEngine @Inject constructor(
val result = when (entry.operation) { val result = when (entry.operation) {
StorageSyncOperation.DELETE -> { StorageSyncOperation.DELETE -> {
runCatching { runCatching {
target.accessor.delete(entry.path) target.accessor.delete(entry.path, recordSyncJournal = false)
} }
} }
StorageSyncOperation.TRASH -> { StorageSyncOperation.TRASH -> {
runCatching { runCatching {
target.accessor.moveToTrash(entry.path) target.accessor.moveToTrash(entry.path, recordSyncJournal = false)
} }
} }

View File

@@ -3,6 +3,8 @@ package com.github.nullptroma.wallenc.usecases
import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncGroupStore import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncGroupStore
import com.github.nullptroma.wallenc.domain.interfaces.IVaultsManager import com.github.nullptroma.wallenc.domain.interfaces.IVaultsManager
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.ensureActive
import javax.inject.Inject import javax.inject.Inject
import javax.inject.Singleton import javax.inject.Singleton
@@ -27,6 +29,7 @@ class StorageSyncReadiness @Inject constructor(
val deadline = System.currentTimeMillis() + timeoutMs val deadline = System.currentTimeMillis() + timeoutMs
while (System.currentTimeMillis() < deadline) { while (System.currentTimeMillis() < deadline) {
coroutineContext.ensureActive()
if (!isAnyVaultScanning()) { if (!isAnyVaultScanning()) {
break break
} }
@@ -34,6 +37,7 @@ class StorageSyncReadiness @Inject constructor(
} }
while (System.currentTimeMillis() < deadline) { while (System.currentTimeMillis() < deadline) {
coroutineContext.ensureActive()
if (requiredUuids.all { uuid -> findStorageUseCase.find(uuid) != null }) { if (requiredUuids.all { uuid -> findStorageUseCase.find(uuid) != null }) {
return return
} }

View File

@@ -9,5 +9,8 @@ sealed class StorageSyncRunOutcome {
data object Completed : StorageSyncRunOutcome() data object Completed : StorageSyncRunOutcome()
/** Задача синхронизации отменена пользователем или пайплайном. */
data object Cancelled : StorageSyncRunOutcome()
data class Failed(val error: WallencException) : StorageSyncRunOutcome() data class Failed(val error: WallencException) : StorageSyncRunOutcome()
} }