Переключение языка
This commit is contained in:
@@ -4,7 +4,9 @@ import com.github.nullptroma.wallenc.domain.errors.toWallencException
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncEngine
|
||||
import com.github.nullptroma.wallenc.domain.tasks.ITaskOrchestrator
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskId
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskLogKey
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskLogLevel
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskProgressLabel
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
@@ -28,6 +30,7 @@ class RunStorageSyncUseCase(
|
||||
* @param logReason техническая метка для логов (не для UI)
|
||||
* @return false, если синхронизация уже в очереди или выполняется — новая задача не создана
|
||||
*/
|
||||
@Suppress("UNUSED_PARAMETER")
|
||||
fun enqueue(displayTitle: String, logReason: String): Boolean {
|
||||
if (!running.compareAndSet(false, true)) {
|
||||
return false
|
||||
@@ -39,17 +42,16 @@ class RunStorageSyncUseCase(
|
||||
dispatcher = Dispatchers.IO,
|
||||
work = { ctx ->
|
||||
try {
|
||||
ctx.log(TaskLogLevel.Info, "Storage sync started, reason=$logReason")
|
||||
ctx.reportProgress(null, "Storage sync: started")
|
||||
ctx.log(TaskLogLevel.Info, TaskLogKey.SyncStarted)
|
||||
ctx.reportProgress(null, TaskProgressLabel.SyncStarted)
|
||||
syncEngine.syncAllGroups { fraction, label ->
|
||||
ctx.reportProgress(fraction, label)
|
||||
}
|
||||
ctx.log(TaskLogLevel.Info, "Storage sync finished")
|
||||
ctx.reportProgress(null, "Storage sync: completed")
|
||||
ctx.log(TaskLogLevel.Info, TaskLogKey.SyncFinished)
|
||||
ctx.reportProgress(null, TaskProgressLabel.SyncCompleted)
|
||||
} catch (e: Exception) {
|
||||
val err = e.toWallencException()
|
||||
ctx.log(TaskLogLevel.Error, "Storage sync failed: $err")
|
||||
ctx.reportProgress(null, "Storage sync: failed - $err")
|
||||
ctx.log(TaskLogLevel.Error, TaskLogKey.SyncFailed(err))
|
||||
ctx.fail(err)
|
||||
} finally {
|
||||
running.set(false)
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
package com.github.nullptroma.wallenc.usecases
|
||||
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorage
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncEngine
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncGroupStore
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IVaultsManager
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskProgressLabel
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
@@ -26,29 +27,29 @@ class StorageSyncEngine(
|
||||
private val syncGeneration = AtomicLong(0)
|
||||
|
||||
override suspend fun syncAllGroups(
|
||||
reportProgress: (suspend (fraction: Float?, label: String?) -> Unit)?,
|
||||
reportProgress: (suspend (fraction: Float?, label: TaskProgressLabel?) -> Unit)?,
|
||||
): Unit = withContext(Dispatchers.IO) {
|
||||
val reporter = reportProgress ?: { _: Float?, _: String? -> }
|
||||
val reporter = reportProgress ?: { _: Float?, _: TaskProgressLabel? -> }
|
||||
val groups = groupStore.getGroups()
|
||||
if (groups.isEmpty()) {
|
||||
reporter(null, "Storage sync: no groups configured")
|
||||
reporter(null, TaskProgressLabel.SyncNoGroups)
|
||||
return@withContext
|
||||
}
|
||||
reporter(null, "Storage sync: preparing ${groups.size} groups")
|
||||
reporter(null, TaskProgressLabel.SyncPreparing(groups.size))
|
||||
for (group in groups) {
|
||||
syncGroupInternal(
|
||||
groupId = group.id,
|
||||
reportProgress = reporter,
|
||||
)
|
||||
}
|
||||
reporter(null, "Storage sync: completed")
|
||||
reporter(null, TaskProgressLabel.SyncCompleted)
|
||||
}
|
||||
|
||||
override suspend fun syncGroup(
|
||||
groupId: String,
|
||||
reportProgress: (suspend (fraction: Float?, label: String?) -> Unit)?,
|
||||
reportProgress: (suspend (fraction: Float?, label: TaskProgressLabel?) -> Unit)?,
|
||||
): Unit = withContext(Dispatchers.IO) {
|
||||
val reporter = reportProgress ?: { _: Float?, _: String? -> }
|
||||
val reporter = reportProgress ?: { _: Float?, _: TaskProgressLabel? -> }
|
||||
syncGroupInternal(
|
||||
groupId = groupId,
|
||||
reportProgress = reporter,
|
||||
@@ -57,20 +58,20 @@ class StorageSyncEngine(
|
||||
|
||||
private suspend fun syncGroupInternal(
|
||||
groupId: String,
|
||||
reportProgress: suspend (fraction: Float?, label: String?) -> Unit,
|
||||
reportProgress: suspend (fraction: Float?, label: TaskProgressLabel?) -> Unit,
|
||||
) {
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" preparing")
|
||||
reportProgress(null, TaskProgressLabel.SyncGroupPreparing(groupId))
|
||||
val mutex = groupMutexes.getOrPut(groupId) { Mutex() }
|
||||
mutex.withLock {
|
||||
val generationSnapshot = syncGeneration.incrementAndGet()
|
||||
val group = groupStore.getGroups().firstOrNull { it.id == groupId }
|
||||
if (group == null) {
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" not found")
|
||||
reportProgress(null, TaskProgressLabel.SyncGroupNotFound(groupId))
|
||||
return
|
||||
}
|
||||
val storages = resolveStorages(group.storageUuids)
|
||||
if (storages.size < 2) {
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" skipped (need at least 2 storages)")
|
||||
reportProgress(null, TaskProgressLabel.SyncGroupSkippedTooFewStorages(groupId))
|
||||
return
|
||||
}
|
||||
val incompatible = storages.filterNot { storage ->
|
||||
@@ -83,7 +84,7 @@ class StorageSyncEngine(
|
||||
if (incompatible.isNotEmpty()) {
|
||||
reportProgress(
|
||||
null,
|
||||
"Storage sync: group \"$groupId\" skipped (incompatible encryption: ${incompatible.size})",
|
||||
TaskProgressLabel.SyncGroupSkippedIncompatibleEncryption(groupId, incompatible.size),
|
||||
)
|
||||
return
|
||||
}
|
||||
@@ -91,12 +92,15 @@ class StorageSyncEngine(
|
||||
var leaseUntil = Instant.now().plusSeconds(SYNC_LOCK_LEASE_SECONDS)
|
||||
val lockedAccessors = mutableListOf<IStorageAccessor>()
|
||||
try {
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" acquiring locks")
|
||||
reportProgress(null, TaskProgressLabel.SyncGroupAcquiringLocks(groupId))
|
||||
for ((lockIndex, storage) in storages.withIndex()) {
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" lock ${lockIndex + 1}/${storages.size}")
|
||||
reportProgress(
|
||||
null,
|
||||
TaskProgressLabel.SyncGroupLockProgress(groupId, lockIndex + 1, storages.size),
|
||||
)
|
||||
val locked = storage.accessor.tryAcquireSyncLock(holderId, leaseUntil)
|
||||
if (!locked) {
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" lock failed, group skipped")
|
||||
reportProgress(null, TaskProgressLabel.SyncGroupLockFailed(groupId))
|
||||
return
|
||||
}
|
||||
lockedAccessors.add(storage.accessor)
|
||||
@@ -105,7 +109,7 @@ class StorageSyncEngine(
|
||||
val latestByPath = mutableMapOf<String, StorageSyncJournalEntry>()
|
||||
val entriesByStorage = mutableMapOf<UUID, Map<String, StorageSyncJournalEntry>>()
|
||||
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" reading journals")
|
||||
reportProgress(null, TaskProgressLabel.SyncGroupReadingJournals(groupId))
|
||||
for ((journalIndex, storage) in storages.withIndex()) {
|
||||
leaseUntil = renewLocksIfNeeded(
|
||||
groupId = groupId,
|
||||
@@ -114,10 +118,13 @@ class StorageSyncEngine(
|
||||
reportProgress = reportProgress,
|
||||
) ?: return
|
||||
if (syncGeneration.get() != generationSnapshot) {
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" cancelled by newer run")
|
||||
reportProgress(null, TaskProgressLabel.SyncGroupCancelled(groupId))
|
||||
return
|
||||
}
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" journal ${journalIndex + 1}/${storages.size}")
|
||||
reportProgress(
|
||||
null,
|
||||
TaskProgressLabel.SyncGroupJournalProgress(groupId, journalIndex + 1, storages.size),
|
||||
)
|
||||
val latestEntries = latestByPath(storage.accessor.readSyncJournal())
|
||||
entriesByStorage[storage.uuid] = latestEntries
|
||||
for ((path, entry) in latestEntries) {
|
||||
@@ -130,11 +137,14 @@ class StorageSyncEngine(
|
||||
|
||||
val mergedEntries = latestByPath.entries.toList()
|
||||
if (mergedEntries.isEmpty()) {
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" no journal entries")
|
||||
reportProgress(null, TaskProgressLabel.SyncGroupNoJournalEntries(groupId))
|
||||
return
|
||||
}
|
||||
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" processing ${mergedEntries.size} entries")
|
||||
reportProgress(
|
||||
null,
|
||||
TaskProgressLabel.SyncGroupProcessingEntries(groupId, mergedEntries.size),
|
||||
)
|
||||
for ((pathIndex, merged) in mergedEntries.withIndex()) {
|
||||
leaseUntil = renewLocksIfNeeded(
|
||||
groupId = groupId,
|
||||
@@ -144,9 +154,12 @@ class StorageSyncEngine(
|
||||
) ?: return
|
||||
val path = merged.key
|
||||
val winnerEntry = merged.value
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" entry ${pathIndex + 1}/${mergedEntries.size}")
|
||||
reportProgress(
|
||||
null,
|
||||
TaskProgressLabel.SyncGroupEntryProgress(groupId, pathIndex + 1, mergedEntries.size),
|
||||
)
|
||||
if (syncGeneration.get() != generationSnapshot) {
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" cancelled by newer run")
|
||||
reportProgress(null, TaskProgressLabel.SyncGroupCancelled(groupId))
|
||||
return
|
||||
}
|
||||
val sourceStorage = findSourceStorage(storages, entriesByStorage, path, winnerEntry)
|
||||
@@ -172,7 +185,7 @@ class StorageSyncEngine(
|
||||
}
|
||||
}
|
||||
}
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" completed")
|
||||
reportProgress(null, TaskProgressLabel.SyncGroupCompleted(groupId))
|
||||
} finally {
|
||||
for (accessor in lockedAccessors) {
|
||||
runCatching {
|
||||
@@ -187,20 +200,20 @@ class StorageSyncEngine(
|
||||
groupId: String,
|
||||
lockedAccessors: List<IStorageAccessor>,
|
||||
currentLeaseUntil: Instant,
|
||||
reportProgress: suspend (fraction: Float?, label: String?) -> Unit,
|
||||
reportProgress: suspend (fraction: Float?, label: TaskProgressLabel?) -> Unit,
|
||||
): Instant? {
|
||||
val now = Instant.now()
|
||||
if (currentLeaseUntil.isAfter(now.plusSeconds(SYNC_LOCK_RENEW_MARGIN_SECONDS))) {
|
||||
return currentLeaseUntil
|
||||
}
|
||||
val nextLeaseUntil = now.plusSeconds(SYNC_LOCK_LEASE_SECONDS)
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" renewing locks")
|
||||
reportProgress(null, TaskProgressLabel.SyncGroupRenewingLocks(groupId))
|
||||
for (accessor in lockedAccessors) {
|
||||
val renewed = runCatching {
|
||||
accessor.tryAcquireSyncLock(holderId, nextLeaseUntil)
|
||||
}.getOrElse { false }
|
||||
if (!renewed) {
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" lock renewal failed")
|
||||
reportProgress(null, TaskProgressLabel.SyncGroupLockRenewalFailed(groupId))
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user