foreground task для фоновой синхронизации
This commit is contained in:
@@ -9,10 +9,13 @@ import com.github.nullptroma.wallenc.domain.tasks.TaskId
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskLogKey
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskLogLevel
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskProgressLabel
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskRunState
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.flow.filter
|
||||
import kotlinx.coroutines.flow.first
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import javax.inject.Inject
|
||||
import javax.inject.Singleton
|
||||
@@ -73,19 +76,21 @@ class RunStorageSyncUseCase @Inject constructor(
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun runBlocking(reason: StorageSyncTriggerReason) {
|
||||
if (!running.compareAndSet(false, true)) {
|
||||
return
|
||||
/**
|
||||
* Ставит sync в пайплайн задач (как debounce / sync-tab) и ждёт завершения.
|
||||
* Для WorkManager и других фоновых запусков без отдельного «orphan»-лога.
|
||||
*/
|
||||
suspend fun enqueueAndAwait(reason: StorageSyncTriggerReason): StorageSyncRunOutcome {
|
||||
if (!enqueue(reason)) {
|
||||
return StorageSyncRunOutcome.SkippedAlreadyRunning
|
||||
}
|
||||
_syncRunning.value = true
|
||||
try {
|
||||
executeSync(
|
||||
reason = reason,
|
||||
reportProgress = { _, _ -> },
|
||||
log = { level, key -> orchestrator.appendPipelineLog(level, key) },
|
||||
)
|
||||
} finally {
|
||||
clearRunningState()
|
||||
val taskId = _activeSyncTaskId.value
|
||||
?: return StorageSyncRunOutcome.Completed
|
||||
syncRunning.filter { !it }.first()
|
||||
val state = orchestrator.pipelineState.value.tasks.find { it.id == taskId }?.state
|
||||
return when (state) {
|
||||
is TaskRunState.Failed -> StorageSyncRunOutcome.Failed(state.error)
|
||||
else -> StorageSyncRunOutcome.Completed
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,13 @@
|
||||
package com.github.nullptroma.wallenc.usecases
|
||||
|
||||
import com.github.nullptroma.wallenc.domain.errors.WallencException
|
||||
|
||||
/** Результат [RunStorageSyncUseCase.enqueueAndAwait]. */
|
||||
sealed class StorageSyncRunOutcome {
|
||||
/** Синхронизация уже выполнялась — новая задача не создана. */
|
||||
data object SkippedAlreadyRunning : StorageSyncRunOutcome()
|
||||
|
||||
data object Completed : StorageSyncRunOutcome()
|
||||
|
||||
data class Failed(val error: WallencException) : StorageSyncRunOutcome()
|
||||
}
|
||||
@@ -171,6 +171,74 @@ class StorageSyncEngineTest {
|
||||
)
|
||||
engine.syncGroup(group.id) { _, label -> labels.add(label) }
|
||||
assertTrue(labels.any { it is TaskProgressLabel.SyncGroupLockFailed })
|
||||
assertNull((first.accessor as FakeStorageAccessor).syncLock)
|
||||
assertNull((second.accessor as FakeStorageAccessor).syncLock)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun syncGroupReleasesLocksAfterSuccessfulSync() = runBlocking {
|
||||
val source = FakeStorage()
|
||||
val target = FakeStorage()
|
||||
source.addSyncJournalEntry(
|
||||
StorageSyncJournalEntry(
|
||||
path = "a.txt",
|
||||
operation = StorageSyncOperation.UPSERT,
|
||||
revision = StorageSyncRevision(1L, "x", Instant.EPOCH),
|
||||
),
|
||||
)
|
||||
source.putFile("a.txt", "x".encodeToByteArray())
|
||||
val group = StorageSyncGroup(
|
||||
id = "ok",
|
||||
storageUuids = setOf(source.uuid, target.uuid),
|
||||
encryptionKind = StorageSyncGroupEncryptionKind.NONE,
|
||||
)
|
||||
val engine = createEngine(
|
||||
storages = listOf(source, target),
|
||||
groups = listOf(group),
|
||||
)
|
||||
engine.syncGroup(group.id) { _, _ -> }
|
||||
assertNull((source.accessor as FakeStorageAccessor).syncLock)
|
||||
assertNull((target.accessor as FakeStorageAccessor).syncLock)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun syncGroupReleasesLocksWhenJournalReadFails() = runBlocking {
|
||||
val first = FakeStorage()
|
||||
val second = FakeStorage()
|
||||
(first.accessor as FakeStorageAccessor).readSyncJournalThrows =
|
||||
IllegalStateException("journal read failed")
|
||||
val group = StorageSyncGroup(
|
||||
id = "journal-fail",
|
||||
storageUuids = setOf(first.uuid, second.uuid),
|
||||
encryptionKind = StorageSyncGroupEncryptionKind.NONE,
|
||||
)
|
||||
val engine = createEngine(
|
||||
storages = listOf(first, second),
|
||||
groups = listOf(group),
|
||||
)
|
||||
runCatching { engine.syncGroup(group.id) { _, _ -> } }
|
||||
assertNull((first.accessor as FakeStorageAccessor).syncLock)
|
||||
assertNull((second.accessor as FakeStorageAccessor).syncLock)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun syncGroupReleasesLocksWhenJournalEmpty() = runBlocking {
|
||||
val first = FakeStorage()
|
||||
val second = FakeStorage()
|
||||
val group = StorageSyncGroup(
|
||||
id = "empty-journal",
|
||||
storageUuids = setOf(first.uuid, second.uuid),
|
||||
encryptionKind = StorageSyncGroupEncryptionKind.NONE,
|
||||
)
|
||||
val labels = mutableListOf<TaskProgressLabel?>()
|
||||
val engine = createEngine(
|
||||
storages = listOf(first, second),
|
||||
groups = listOf(group),
|
||||
)
|
||||
engine.syncGroup(group.id) { _, label -> labels.add(label) }
|
||||
assertTrue(labels.any { it is TaskProgressLabel.SyncGroupNoJournalEntries })
|
||||
assertNull((first.accessor as FakeStorageAccessor).syncLock)
|
||||
assertNull((second.accessor as FakeStorageAccessor).syncLock)
|
||||
}
|
||||
|
||||
private fun createEngine(
|
||||
|
||||
@@ -30,6 +30,7 @@ class FakeStorageAccessor : IStorageAccessor {
|
||||
var syncJournal: StorageSyncJournal = emptyMap()
|
||||
var syncLock: StorageSyncLock? = null
|
||||
var acquireLockResult: Boolean = true
|
||||
var readSyncJournalThrows: Throwable? = null
|
||||
|
||||
override val size: StateFlow<Long?> = MutableStateFlow(0L)
|
||||
override val numberOfFiles: StateFlow<Int?> = MutableStateFlow(0)
|
||||
@@ -106,7 +107,10 @@ class FakeStorageAccessor : IStorageAccessor {
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun readSyncJournal(): StorageSyncJournal = syncJournal
|
||||
override suspend fun readSyncJournal(): StorageSyncJournal {
|
||||
readSyncJournalThrows?.let { throw it }
|
||||
return syncJournal
|
||||
}
|
||||
|
||||
override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) {
|
||||
syncJournal = StorageSyncJournalMerge.merge(syncJournal, entries)
|
||||
|
||||
Reference in New Issue
Block a user