feat(sync): добавил механизм синхронизации хранилищ и управление группами
Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -0,0 +1,42 @@
|
||||
package com.github.nullptroma.wallenc.usecases
|
||||
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncGroupStore
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroup
|
||||
import java.util.UUID
|
||||
|
||||
class ManageStorageSyncGroupsUseCase(
|
||||
private val groupStore: IStorageSyncGroupStore,
|
||||
) {
|
||||
suspend fun getGroups(): List<StorageSyncGroup> = groupStore.getGroups()
|
||||
|
||||
suspend fun createGroup(): StorageSyncGroup {
|
||||
val existingIds = getGroups().map { it.id }.toSet()
|
||||
var index = 1
|
||||
var candidate = "group-$index"
|
||||
while (candidate in existingIds) {
|
||||
index++
|
||||
candidate = "group-$index"
|
||||
}
|
||||
val group = StorageSyncGroup(id = candidate, storageUuids = emptySet())
|
||||
groupStore.putGroup(group)
|
||||
return group
|
||||
}
|
||||
|
||||
suspend fun removeGroup(groupId: String) {
|
||||
groupStore.removeGroup(groupId.trim())
|
||||
}
|
||||
|
||||
suspend fun addStorageToGroup(groupId: String, storageUuid: UUID) {
|
||||
val current = getGroups().firstOrNull { it.id == groupId } ?: return
|
||||
groupStore.putGroup(
|
||||
current.copy(storageUuids = current.storageUuids + storageUuid),
|
||||
)
|
||||
}
|
||||
|
||||
suspend fun removeStorageFromGroup(groupId: String, storageUuid: UUID) {
|
||||
val current = getGroups().firstOrNull { it.id == groupId } ?: return
|
||||
groupStore.putGroup(
|
||||
current.copy(storageUuids = current.storageUuids - storageUuid),
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,41 @@
|
||||
package com.github.nullptroma.wallenc.usecases
|
||||
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncEngine
|
||||
import com.github.nullptroma.wallenc.domain.tasks.ITaskOrchestrator
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskLogLevel
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
|
||||
class RunStorageSyncUseCase(
|
||||
private val orchestrator: ITaskOrchestrator,
|
||||
private val syncEngine: IStorageSyncEngine,
|
||||
) {
|
||||
private val running = AtomicBoolean(false)
|
||||
|
||||
fun enqueue(reason: String) {
|
||||
orchestrator.enqueue(
|
||||
title = "Storage sync ($reason)",
|
||||
dispatcher = Dispatchers.IO,
|
||||
work = { ctx ->
|
||||
if (!running.compareAndSet(false, true)) {
|
||||
ctx.log(TaskLogLevel.Info, "Storage sync skipped: already running")
|
||||
return@enqueue
|
||||
}
|
||||
try {
|
||||
ctx.log(TaskLogLevel.Info, "Storage sync started")
|
||||
ctx.reportProgress(null, "Storage sync: started")
|
||||
syncEngine.syncAllGroups { fraction, label ->
|
||||
ctx.reportProgress(fraction, label)
|
||||
}
|
||||
ctx.reportProgress(null, "Storage sync: completed")
|
||||
} finally {
|
||||
running.set(false)
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
suspend fun runBlocking() {
|
||||
syncEngine.syncAllGroups()
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,226 @@
|
||||
package com.github.nullptroma.wallenc.usecases
|
||||
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorage
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncEngine
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncGroupStore
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IVaultsManager
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import kotlinx.coroutines.withContext
|
||||
import java.time.Instant
|
||||
import java.util.UUID
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
|
||||
class StorageSyncEngine(
|
||||
private val vaultsManager: IVaultsManager,
|
||||
private val groupStore: IStorageSyncGroupStore,
|
||||
) : IStorageSyncEngine {
|
||||
private val holderId: String = UUID.randomUUID().toString()
|
||||
private val groupMutexes = ConcurrentHashMap<String, Mutex>()
|
||||
private val syncGeneration = AtomicLong(0)
|
||||
|
||||
override suspend fun syncAllGroups(
|
||||
reportProgress: (suspend (fraction: Float?, label: String?) -> Unit)?,
|
||||
): Unit = withContext(Dispatchers.IO) {
|
||||
val reporter = reportProgress ?: { _: Float?, _: String? -> }
|
||||
val groups = groupStore.getGroups()
|
||||
if (groups.isEmpty()) {
|
||||
reporter(null, "Storage sync: no groups configured")
|
||||
return@withContext
|
||||
}
|
||||
reporter(null, "Storage sync: preparing ${groups.size} groups")
|
||||
for (group in groups) {
|
||||
syncGroupInternal(
|
||||
groupId = group.id,
|
||||
reportProgress = reporter,
|
||||
)
|
||||
}
|
||||
reporter(null, "Storage sync: completed")
|
||||
}
|
||||
|
||||
override suspend fun syncGroup(
|
||||
groupId: String,
|
||||
reportProgress: (suspend (fraction: Float?, label: String?) -> Unit)?,
|
||||
): Unit = withContext(Dispatchers.IO) {
|
||||
val reporter = reportProgress ?: { _: Float?, _: String? -> }
|
||||
syncGroupInternal(
|
||||
groupId = groupId,
|
||||
reportProgress = reporter,
|
||||
)
|
||||
}
|
||||
|
||||
private suspend fun syncGroupInternal(
|
||||
groupId: String,
|
||||
reportProgress: suspend (fraction: Float?, label: String?) -> Unit,
|
||||
) {
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" preparing")
|
||||
val mutex = groupMutexes.getOrPut(groupId) { Mutex() }
|
||||
mutex.withLock {
|
||||
val generationSnapshot = syncGeneration.incrementAndGet()
|
||||
val group = groupStore.getGroups().firstOrNull { it.id == groupId }
|
||||
if (group == null) {
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" not found")
|
||||
return
|
||||
}
|
||||
val storages = resolveStorages(group.storageUuids)
|
||||
if (storages.size < 2) {
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" skipped (need at least 2 storages)")
|
||||
return
|
||||
}
|
||||
|
||||
val leaseUntil = Instant.MAX
|
||||
val lockedAccessors = mutableListOf<IStorageAccessor>()
|
||||
try {
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" acquiring locks")
|
||||
for ((lockIndex, storage) in storages.withIndex()) {
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" lock ${lockIndex + 1}/${storages.size}")
|
||||
val locked = storage.accessor.tryAcquireSyncLock(holderId, leaseUntil)
|
||||
if (!locked) {
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" lock failed, group skipped")
|
||||
return
|
||||
}
|
||||
lockedAccessors.add(storage.accessor)
|
||||
}
|
||||
|
||||
val latestByPath = mutableMapOf<String, StorageSyncJournalEntry>()
|
||||
val entriesByStorage = mutableMapOf<UUID, Map<String, StorageSyncJournalEntry>>()
|
||||
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" reading journals")
|
||||
for ((journalIndex, storage) in storages.withIndex()) {
|
||||
if (syncGeneration.get() != generationSnapshot) {
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" cancelled by newer run")
|
||||
return
|
||||
}
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" journal ${journalIndex + 1}/${storages.size}")
|
||||
val latestEntries = latestByPath(storage.accessor.readSyncJournal())
|
||||
entriesByStorage[storage.uuid] = latestEntries
|
||||
for ((path, entry) in latestEntries) {
|
||||
val current = latestByPath[path]
|
||||
if (current == null || compareEntries(entry, current) > 0) {
|
||||
latestByPath[path] = entry
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
val mergedEntries = latestByPath.entries.toList()
|
||||
if (mergedEntries.isEmpty()) {
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" no journal entries")
|
||||
return
|
||||
}
|
||||
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" processing ${mergedEntries.size} entries")
|
||||
for ((pathIndex, merged) in mergedEntries.withIndex()) {
|
||||
val path = merged.key
|
||||
val winnerEntry = merged.value
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" entry ${pathIndex + 1}/${mergedEntries.size}")
|
||||
if (syncGeneration.get() != generationSnapshot) {
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" cancelled by newer run")
|
||||
return
|
||||
}
|
||||
val sourceStorage = findSourceStorage(storages, entriesByStorage, path, winnerEntry)
|
||||
if (sourceStorage == null && winnerEntry.operation == StorageSyncOperation.UPSERT) {
|
||||
continue
|
||||
}
|
||||
|
||||
for (target in storages) {
|
||||
if (target.uuid == sourceStorage?.uuid) {
|
||||
continue
|
||||
}
|
||||
val targetEntry = entriesByStorage[target.uuid]?.get(path)
|
||||
if (targetEntry != null && compareEntries(targetEntry, winnerEntry) >= 0) {
|
||||
continue
|
||||
}
|
||||
val applied = applyEntry(
|
||||
source = sourceStorage,
|
||||
target = target,
|
||||
entry = winnerEntry,
|
||||
)
|
||||
if (applied) {
|
||||
target.accessor.appendSyncJournal(listOf(winnerEntry))
|
||||
}
|
||||
}
|
||||
}
|
||||
reportProgress(null, "Storage sync: group \"$groupId\" completed")
|
||||
} finally {
|
||||
for (accessor in lockedAccessors) {
|
||||
runCatching {
|
||||
accessor.releaseSyncLock(holderId)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun resolveStorages(uuids: Set<UUID>): List<IStorage> {
|
||||
val byUuid = vaultsManager.allStorages.value.associateBy { it.uuid }
|
||||
return uuids.mapNotNull { byUuid[it] }
|
||||
}
|
||||
|
||||
private fun latestByPath(entries: List<StorageSyncJournalEntry>): Map<String, StorageSyncJournalEntry> {
|
||||
val map = mutableMapOf<String, StorageSyncJournalEntry>()
|
||||
for (entry in entries) {
|
||||
val current = map[entry.path]
|
||||
if (current == null || compareEntries(entry, current) > 0) {
|
||||
map[entry.path] = entry
|
||||
}
|
||||
}
|
||||
return map
|
||||
}
|
||||
|
||||
private fun findSourceStorage(
|
||||
storages: List<IStorage>,
|
||||
entriesByStorage: Map<UUID, Map<String, StorageSyncJournalEntry>>,
|
||||
path: String,
|
||||
winnerEntry: StorageSyncJournalEntry,
|
||||
): IStorage? {
|
||||
if (winnerEntry.operation == StorageSyncOperation.DELETE) {
|
||||
return storages.firstOrNull()
|
||||
}
|
||||
return storages.firstOrNull { storage ->
|
||||
val entry = entriesByStorage[storage.uuid]?.get(path) ?: return@firstOrNull false
|
||||
compareEntries(entry, winnerEntry) == 0
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun applyEntry(
|
||||
source: IStorage?,
|
||||
target: IStorage,
|
||||
entry: StorageSyncJournalEntry,
|
||||
): Boolean {
|
||||
when (entry.operation) {
|
||||
StorageSyncOperation.DELETE -> {
|
||||
return runCatching {
|
||||
target.accessor.delete(entry.path)
|
||||
}.isSuccess
|
||||
}
|
||||
|
||||
StorageSyncOperation.UPSERT -> {
|
||||
val sourceAccessor = source?.accessor ?: return false
|
||||
return runCatching {
|
||||
sourceAccessor.openRead(entry.path).use { input ->
|
||||
target.accessor.openWrite(entry.path).use { output ->
|
||||
input.copyTo(output)
|
||||
}
|
||||
}
|
||||
}.isSuccess
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun compareEntries(a: StorageSyncJournalEntry, b: StorageSyncJournalEntry): Int {
|
||||
val seqCmp = a.revision.sequence.compareTo(b.revision.sequence)
|
||||
if (seqCmp != 0) {
|
||||
return seqCmp
|
||||
}
|
||||
val actorCmp = a.revision.actorId.compareTo(b.revision.actorId)
|
||||
if (actorCmp != 0) {
|
||||
return actorCmp
|
||||
}
|
||||
return a.revision.createdAt.compareTo(b.revision.createdAt)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user