From d0f490a3fd3404f96370fe10841e476c91a27b15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9F=D1=8B=D1=82=D0=BA=D0=BE=D0=B2=20=D0=A0=D0=BE=D0=BC?= =?UTF-8?q?=D0=B0=D0=BD?= Date: Thu, 21 May 2026 22:05:57 +0300 Subject: [PATCH] =?UTF-8?q?refactor(sync):=20=D0=BF=D0=B5=D1=80=D0=B5?= =?UTF-8?q?=D0=B2=D1=91=D0=BB=20=D0=B6=D1=83=D1=80=D0=BD=D0=B0=D0=BB=20?= =?UTF-8?q?=D0=BD=D0=B0=20map=20=D0=BF=D0=BE=20=D0=BF=D1=83=D1=82=D0=B8=20?= =?UTF-8?q?=D0=B8=20=D1=83=D0=B1=D1=80=D0=B0=D0=BB=20=D1=86=D0=B8=D0=BA?= =?UTF-8?q?=D0=BB=20debounce-sync?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Журнал хранится как словарь path→entry, служебные пути исключены из sync. Apply пишет файлы без записи в журнал; bootstrap не триггерит sync во время работы. --- .../wallenc/app/sync/StorageSyncBootstrap.kt | 22 +++++- .../common/StorageSyncJournalCodec.kt | 29 ++++++++ .../encrypt/EncryptedStorageAccessor.kt | 49 ++++++------- .../storages/local/LocalStorageAccessor.kt | 63 ++++++++-------- .../storages/yandex/YandexStorageAccessor.kt | 73 +++++++++---------- .../domain/datatypes/StorageSyncJournal.kt | 59 +++++++++++++++ .../domain/interfaces/IStorageAccessor.kt | 9 +-- .../wallenc/usecases/StorageSyncEngine.kt | 54 +++++--------- .../usecases/StorageSyncJournalMergeTest.kt | 45 ++++++++++++ .../wallenc/usecases/fakes/FakeStorage.kt | 8 +- .../usecases/fakes/FakeStorageAccessor.kt | 17 ++--- 11 files changed, 275 insertions(+), 153 deletions(-) create mode 100644 domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/common/StorageSyncJournalCodec.kt create mode 100644 domain/src/main/java/com/github/nullptroma/wallenc/domain/datatypes/StorageSyncJournal.kt create mode 100644 usecases/src/test/java/com/github/nullptroma/wallenc/usecases/StorageSyncJournalMergeTest.kt diff --git a/app/src/main/java/com/github/nullptroma/wallenc/app/sync/StorageSyncBootstrap.kt b/app/src/main/java/com/github/nullptroma/wallenc/app/sync/StorageSyncBootstrap.kt index 36958fd..42e8e3b 100644 --- a/app/src/main/java/com/github/nullptroma/wallenc/app/sync/StorageSyncBootstrap.kt +++ b/app/src/main/java/com/github/nullptroma/wallenc/app/sync/StorageSyncBootstrap.kt @@ -1,5 +1,6 @@ package com.github.nullptroma.wallenc.app.sync +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths import com.github.nullptroma.wallenc.domain.interfaces.IVaultsManager import com.github.nullptroma.wallenc.ui.R import com.github.nullptroma.wallenc.ui.resources.UiStringResolver @@ -10,6 +11,7 @@ import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.flow.debounce +import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.merge import kotlinx.coroutines.launch @@ -35,14 +37,28 @@ class StorageSyncBootstrap @Inject constructor( } val triggers = storages.flatMap { storage -> listOf( - storage.accessor.filesUpdates.map {}, - storage.accessor.dirsUpdates.map {}, + storage.accessor.filesUpdates + .filter { page -> + page.data.any { file -> + StorageSyncPaths.isSyncableUserPath(file.metaInfo.path) + } + } + .map {}, + storage.accessor.dirsUpdates + .filter { page -> + page.data.any { dir -> + StorageSyncPaths.isSyncableUserPath(dir.metaInfo.path) + } + } + .map {}, ) } merge(*triggers.toTypedArray()) .debounce(DEBOUNCE_AFTER_CHANGE_MS) .collect { - // RunStorageSyncUseCase.enqueue отбрасывает повтор, пока sync уже в очереди/в работе. + if (syncRunner.syncRunning.value) { + return@collect + } syncRunner.enqueue( displayTitle = uiStrings(R.string.task_title_storage_sync_background), logReason = "debounce", diff --git a/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/common/StorageSyncJournalCodec.kt b/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/common/StorageSyncJournalCodec.kt new file mode 100644 index 0000000..209eed8 --- /dev/null +++ b/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/common/StorageSyncJournalCodec.kt @@ -0,0 +1,29 @@ +package com.github.nullptroma.wallenc.domain.vault.storages.common + +import com.fasterxml.jackson.databind.ObjectMapper +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry +import java.util.LinkedHashMap + +internal object StorageSyncJournalCodec { + private val journalJavaType = ObjectMapper().typeFactory.constructMapType( + LinkedHashMap::class.java, + String::class.java, + StorageSyncJournalEntry::class.java, + ) + + fun read(mapper: ObjectMapper, bytes: ByteArray): StorageSyncJournal { + if (bytes.isEmpty()) { + return emptyMap() + } + return runCatching { + @Suppress("UNCHECKED_CAST") + mapper.readValue(bytes, journalJavaType) as StorageSyncJournal + }.getOrElse { + emptyMap() + } + } + + fun write(mapper: ObjectMapper, journal: StorageSyncJournal): ByteArray = + mapper.writeValueAsBytes(journal) +} diff --git a/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/encrypt/EncryptedStorageAccessor.kt b/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/encrypt/EncryptedStorageAccessor.kt index f6c0b9c..be1d87a 100644 --- a/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/encrypt/EncryptedStorageAccessor.kt +++ b/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/encrypt/EncryptedStorageAccessor.kt @@ -1,6 +1,8 @@ package com.github.nullptroma.wallenc.domain.vault.storages.encrypt +import com.fasterxml.jackson.module.kotlin.readValue import com.github.nullptroma.wallenc.domain.errors.WallencException +import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty import com.github.nullptroma.wallenc.domain.vault.utils.CloseHandledStreamExtension.Companion.onClosed import com.github.nullptroma.wallenc.domain.vault.utils.CloseHandledStreamExtension.Companion.onClosing @@ -15,7 +17,10 @@ import com.github.nullptroma.wallenc.domain.interfaces.IDirectory import com.github.nullptroma.wallenc.domain.interfaces.IFile import com.github.nullptroma.wallenc.domain.interfaces.IMetaInfo import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision @@ -257,7 +262,8 @@ class EncryptedStorageAccessor( appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE) } - override suspend fun openWrite(path: String): OutputStream = openWriteInternal(path, recordJournal = true) + override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream = + openWriteInternal(path, recordJournal = recordSyncJournal) override suspend fun openRead(path: String): InputStream { val stream = source.openRead(encryptPath(path)) @@ -290,35 +296,22 @@ class EncryptedStorageAccessor( } } - override suspend fun readSyncJournal(): List { + override suspend fun readSyncJournal(): StorageSyncJournal { val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) } - if (bytes.isEmpty()) { - return emptyList() - } - return runCatching { - val journalType = jackson.typeFactory.constructCollectionType( - List::class.java, - StorageSyncJournalEntry::class.java, - ) - jackson.readValue>(bytes, journalType) - }.getOrElse { - emptyList() - } + return StorageSyncJournalCodec.read(jackson, bytes) } - override suspend fun appendSyncJournal(entries: List) { + override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) { if (entries.isEmpty()) { return } - val next = readSyncJournal().toMutableList().apply { addAll(entries) } - openWriteSystemFile(SYNC_JOURNAL_FILENAME).use { out -> - jackson.writeValue(out, next) - } + val merged = StorageSyncJournalMerge.merge(readSyncJournal(), entries) + writeSyncJournal(merged) } - override suspend fun rewriteSyncJournal(entries: List) { + private suspend fun writeSyncJournal(journal: StorageSyncJournal) { openWriteSystemFile(SYNC_JOURNAL_FILENAME).use { out -> - jackson.writeValue(out, entries) + jackson.writeValue(out, journal) } } @@ -376,15 +369,15 @@ class EncryptedStorageAccessor( } private suspend fun appendSyncEntry(path: String, operation: StorageSyncOperation) { - val cleanedPath = if (path.startsWith("/")) path else "/$path" - if (cleanedPath.startsWith("/$systemHiddenDirName/")) { + val cleanedPath = StorageSyncPaths.normalize(path) + if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) { return } - val entries = readSyncJournal() - val nextSequence = (entries.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L - appendSyncJournal( - listOf( - StorageSyncJournalEntry( + val journal = readSyncJournal() + val nextSequence = (journal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L + putSyncJournalEntries( + mapOf( + cleanedPath to StorageSyncJournalEntry( path = cleanedPath, operation = operation, revision = StorageSyncRevision( diff --git a/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/local/LocalStorageAccessor.kt b/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/local/LocalStorageAccessor.kt index 20205fc..79b9fc8 100644 --- a/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/local/LocalStorageAccessor.kt +++ b/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/local/LocalStorageAccessor.kt @@ -1,6 +1,7 @@ package com.github.nullptroma.wallenc.domain.vault.storages.local import com.github.nullptroma.wallenc.domain.errors.WallencException +import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty import com.fasterxml.jackson.core.JacksonException @@ -15,9 +16,12 @@ import com.github.nullptroma.wallenc.domain.interfaces.IDirectory import com.github.nullptroma.wallenc.domain.interfaces.IFile import com.github.nullptroma.wallenc.domain.interfaces.IMetaInfo import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope @@ -523,7 +527,7 @@ class LocalStorageAccessor( } } - override suspend fun openWrite(path: String): OutputStream = withContext(ioDispatcher) { + override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream = withContext(ioDispatcher) { touchFileInternal(path, recordJournal = false) val pair = LocalStorageFilePair.from(_filesystemBasePath, path) ?: throw WallencException.Storage.FileNotFound() @@ -531,7 +535,9 @@ class LocalStorageAccessor( CoroutineScope(ioDispatcher).launch { touchFileInternal(path, recordJournal = false) scanSizeAndNumOfFiles() - appendSyncEntry(path = path, operation = StorageSyncOperation.UPSERT) + if (recordSyncJournal) { + appendSyncEntry(path = path, operation = StorageSyncOperation.UPSERT) + } } } } @@ -572,33 +578,22 @@ class LocalStorageAccessor( return@withContext file.outputStream() } - override suspend fun readSyncJournal(): List = withContext(ioDispatcher) { + override suspend fun readSyncJournal(): StorageSyncJournal = withContext(ioDispatcher) { val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) } - if (bytes.isEmpty()) { - return@withContext emptyList() - } - return@withContext runCatching { - jackson.readValue>(bytes) - }.getOrElse { - emptyList() - } + StorageSyncJournalCodec.read(jackson, bytes) } - override suspend fun appendSyncJournal(entries: List) = withContext(ioDispatcher) { + override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) = withContext(ioDispatcher) { if (entries.isEmpty()) { return@withContext } - val next = readSyncJournal().toMutableList().apply { - addAll(entries) - } - openWriteSystemFile(SYNC_JOURNAL_FILENAME).use { out -> - jackson.writeValue(out, next) - } + val merged = StorageSyncJournalMerge.merge(readSyncJournal(), entries) + writeSyncJournal(merged) } - override suspend fun rewriteSyncJournal(entries: List) = withContext(ioDispatcher) { + private suspend fun writeSyncJournal(journal: StorageSyncJournal) { openWriteSystemFile(SYNC_JOURNAL_FILENAME).use { out -> - jackson.writeValue(out, entries) + jackson.writeValue(out, journal) } } @@ -722,19 +717,25 @@ class LocalStorageAccessor( } private suspend fun appendSyncEntry(path: String, operation: StorageSyncOperation) { - val cleanedPath = if (path.startsWith("/")) path else "/$path" - val entries = readSyncJournal() - val nextSequence = (entries.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L - val entry = StorageSyncJournalEntry( - path = cleanedPath, - operation = operation, - revision = StorageSyncRevision( - sequence = nextSequence, - actorId = syncActorId, - createdAt = Instant.now(), + val cleanedPath = StorageSyncPaths.normalize(path) + if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) { + return + } + val journal = readSyncJournal() + val nextSequence = (journal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L + putSyncJournalEntries( + mapOf( + cleanedPath to StorageSyncJournalEntry( + path = cleanedPath, + operation = operation, + revision = StorageSyncRevision( + sequence = nextSequence, + actorId = syncActorId, + createdAt = Instant.now(), + ), + ), ), ) - appendSyncJournal(listOf(entry)) } companion object { diff --git a/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/yandex/YandexStorageAccessor.kt b/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/yandex/YandexStorageAccessor.kt index 27d9757..84ac5ba 100644 --- a/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/yandex/YandexStorageAccessor.kt +++ b/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/yandex/YandexStorageAccessor.kt @@ -2,6 +2,7 @@ package com.github.nullptroma.wallenc.domain.vault.storages.yandex import com.github.nullptroma.wallenc.domain.errors.WallencException import com.github.nullptroma.wallenc.domain.vault.errors.toVaultWallencException +import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper @@ -17,7 +18,10 @@ import com.github.nullptroma.wallenc.domain.datatypes.DataPage import com.github.nullptroma.wallenc.domain.interfaces.IDirectory import com.github.nullptroma.wallenc.domain.interfaces.IFile import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision @@ -527,13 +531,13 @@ class YandexStorageAccessor( appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE) } - override suspend fun openWrite(path: String): OutputStream = withContext(ioDispatcher) { + override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream = withContext(ioDispatcher) { touchParentDirs(path) val tmp = File.createTempFile("wallenc-yandex-", ".upload") val fos = FileOutputStream(tmp) fos.onClosed { runCommitAfterStreamClosed { - commitUploadedFile(path, tmp) + commitUploadedFile(path, tmp, recordSyncJournal) } } } @@ -571,37 +575,22 @@ class YandexStorageAccessor( } } - override suspend fun readSyncJournal(): List = withContext(ioDispatcher) { + override suspend fun readSyncJournal(): StorageSyncJournal = withContext(ioDispatcher) { val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) } - if (bytes.isEmpty()) { - return@withContext emptyList() - } - return@withContext runCatching { - val journalType = statsMapper.typeFactory.constructCollectionType( - List::class.java, - StorageSyncJournalEntry::class.java, - ) - statsMapper.readValue>(bytes, journalType) - }.getOrElse { - emptyList() - } + StorageSyncJournalCodec.read(statsMapper, bytes) } - override suspend fun appendSyncJournal(entries: List) = withContext(ioDispatcher) { + override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) = withContext(ioDispatcher) { if (entries.isEmpty()) { return@withContext } - val next = readSyncJournal().toMutableList().apply { - addAll(entries) - } - openWriteSystemFile(SYNC_JOURNAL_FILENAME).use { out -> - statsMapper.writeValue(out, next) - } + val merged = StorageSyncJournalMerge.merge(readSyncJournal(), entries) + writeSyncJournal(merged) } - override suspend fun rewriteSyncJournal(entries: List) = withContext(ioDispatcher) { + private suspend fun writeSyncJournal(journal: StorageSyncJournal) { openWriteSystemFile(SYNC_JOURNAL_FILENAME).use { out -> - statsMapper.writeValue(out, entries) + statsMapper.writeValue(out, journal) } } @@ -684,7 +673,7 @@ class YandexStorageAccessor( } } - private suspend fun commitUploadedFile(path: String, tmp: File) { + private suspend fun commitUploadedFile(path: String, tmp: File, recordSyncJournal: Boolean) { try { val diskPath = toDiskPath(path) val prior = guard { repo.getOrNull(diskPath) } @@ -708,27 +697,35 @@ class YandexStorageAccessor( info?.let { _filesUpdates.emit(DataPage(listOf(it), pageLength = 1, pageIndex = 0)) } - appendSyncEntry(path = path, operation = StorageSyncOperation.UPSERT) + if (recordSyncJournal) { + appendSyncEntry(path = path, operation = StorageSyncOperation.UPSERT) + } } finally { tmp.delete() } } private suspend fun appendSyncEntry(path: String, operation: StorageSyncOperation) { - val cleanedPath = if (path.startsWith("/")) path else "/$path" - val entries = readSyncJournal() - val nextSequence = (entries.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L - val entry = StorageSyncJournalEntry( - path = cleanedPath, - operation = operation, - revision = StorageSyncRevision( - sequence = nextSequence, - actorId = syncActorId, - createdAt = Instant.now(), + val cleanedPath = StorageSyncPaths.normalize(path) + if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) { + return + } + val journal = readSyncJournal() + val nextSequence = (journal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L + putSyncJournalEntries( + mapOf( + cleanedPath to StorageSyncJournalEntry( + path = cleanedPath, + operation = operation, + revision = StorageSyncRevision( + sequence = nextSequence, + actorId = syncActorId, + createdAt = Instant.now(), + ), + originStorageUuid = storageUuid, + ), ), - originStorageUuid = storageUuid, ) - appendSyncJournal(listOf(entry)) } private suspend fun touchParentDirs(path: String) { diff --git a/domain/src/main/java/com/github/nullptroma/wallenc/domain/datatypes/StorageSyncJournal.kt b/domain/src/main/java/com/github/nullptroma/wallenc/domain/datatypes/StorageSyncJournal.kt new file mode 100644 index 0000000..89ab055 --- /dev/null +++ b/domain/src/main/java/com/github/nullptroma/wallenc/domain/datatypes/StorageSyncJournal.kt @@ -0,0 +1,59 @@ +package com.github.nullptroma.wallenc.domain.datatypes + +/** Журнал синхронизации: один актуальный [StorageSyncJournalEntry] на нормализованный путь. */ +typealias StorageSyncJournal = Map + +object StorageSyncPaths { + fun normalize(path: String): String = if (path.startsWith("/")) path else "/$path" + + /** + * Пути, которые участвуют в sync и попадают в журнал при пользовательских операциях. + * Служебные каталоги, lock/journal/meta и файлы внутри *-enc-dir исключены. + */ + fun isSyncableUserPath(path: String): Boolean { + val p = normalize(path) + if (p == "/" || p.isBlank()) return false + if (p == "/wallenc-yandex-system" || p.startsWith("/wallenc-yandex-system/")) return false + if (p.contains("-enc-dir/") || p.endsWith("-enc-dir")) return false + val name = p.substringAfterLast('/') + if (name == "sync-journal.json" || name == "sync-lock.json") return false + if (name.endsWith(".enc-meta") || name.endsWith(".storage-info")) return false + return true + } +} + +object StorageSyncJournalMerge { + fun merge(into: StorageSyncJournal, entries: Map): StorageSyncJournal { + if (entries.isEmpty()) return into + val result = into.toMutableMap() + for ((rawPath, entry) in entries) { + val path = StorageSyncPaths.normalize(rawPath) + if (!StorageSyncPaths.isSyncableUserPath(path)) continue + val normalizedEntry = entry.copy(path = path) + val current = result[path] + if (current == null || compareEntries(normalizedEntry, current) > 0) { + result[path] = normalizedEntry + } + } + return result + } + + fun merge(into: StorageSyncJournal, entry: StorageSyncJournalEntry): StorageSyncJournal = + merge(into, mapOf(entry.path to entry)) + + fun mergeAll(journals: Collection): StorageSyncJournal { + var acc: StorageSyncJournal = emptyMap() + for (journal in journals) { + acc = merge(acc, journal) + } + return acc + } + + private fun compareEntries(a: StorageSyncJournalEntry, b: StorageSyncJournalEntry): Int { + val seqCmp = a.revision.sequence.compareTo(b.revision.sequence) + if (seqCmp != 0) return seqCmp + val actorCmp = a.revision.actorId.compareTo(b.revision.actorId) + if (actorCmp != 0) return actorCmp + return a.revision.createdAt.compareTo(b.revision.createdAt) + } +} diff --git a/domain/src/main/java/com/github/nullptroma/wallenc/domain/interfaces/IStorageAccessor.kt b/domain/src/main/java/com/github/nullptroma/wallenc/domain/interfaces/IStorageAccessor.kt index ae9ad9c..989e3f7 100644 --- a/domain/src/main/java/com/github/nullptroma/wallenc/domain/interfaces/IStorageAccessor.kt +++ b/domain/src/main/java/com/github/nullptroma/wallenc/domain/interfaces/IStorageAccessor.kt @@ -1,7 +1,7 @@ package com.github.nullptroma.wallenc.domain.interfaces import com.github.nullptroma.wallenc.domain.datatypes.DataPage -import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.SharedFlow @@ -40,7 +40,7 @@ interface IStorageAccessor { suspend fun touchFile(path: String) suspend fun touchDir(path: String) suspend fun delete(path: String) - suspend fun openWrite(path: String): OutputStream + suspend fun openWrite(path: String, recordSyncJournal: Boolean = true): OutputStream suspend fun openRead(path: String): InputStream suspend fun moveToTrash(path: String) @@ -52,9 +52,8 @@ interface IStorageAccessor { suspend fun openReadSystemFile(name: String): InputStream suspend fun openWriteSystemFile(name: String): OutputStream - suspend fun readSyncJournal(): List - suspend fun appendSyncJournal(entries: List) - suspend fun rewriteSyncJournal(entries: List) + suspend fun readSyncJournal(): StorageSyncJournal + suspend fun putSyncJournalEntries(entries: StorageSyncJournal) suspend fun readSyncLock(): StorageSyncLock? suspend fun tryAcquireSyncLock(holderId: String, leaseUntil: Instant): Boolean diff --git a/usecases/src/main/java/com/github/nullptroma/wallenc/usecases/StorageSyncEngine.kt b/usecases/src/main/java/com/github/nullptroma/wallenc/usecases/StorageSyncEngine.kt index e88f743..118c701 100644 --- a/usecases/src/main/java/com/github/nullptroma/wallenc/usecases/StorageSyncEngine.kt +++ b/usecases/src/main/java/com/github/nullptroma/wallenc/usecases/StorageSyncEngine.kt @@ -1,7 +1,10 @@ package com.github.nullptroma.wallenc.usecases +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths import com.github.nullptroma.wallenc.domain.interfaces.IStorage import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncEngine @@ -113,8 +116,8 @@ class StorageSyncEngine @Inject constructor( lockedAccessors.add(storage.accessor) } - val latestByPath = mutableMapOf() - val entriesByStorage = mutableMapOf>() + val mergedByPath = mutableMapOf() + val entriesByStorage = mutableMapOf() reportProgress(null, TaskProgressLabel.SyncGroupReadingJournals(groupId)) for ((journalIndex, storage) in storages.withIndex()) { @@ -132,17 +135,14 @@ class StorageSyncEngine @Inject constructor( null, TaskProgressLabel.SyncGroupJournalProgress(groupId, journalIndex + 1, storages.size), ) - val latestEntries = latestByPath(storage.accessor.readSyncJournal()) - entriesByStorage[storage.uuid] = latestEntries - for ((path, entry) in latestEntries) { - val current = latestByPath[path] - if (current == null || compareEntries(entry, current) > 0) { - latestByPath[path] = entry - } - } + val journal = filterSyncableJournal(storage.accessor.readSyncJournal()) + entriesByStorage[storage.uuid] = journal + mergedByPath.putAll( + StorageSyncJournalMerge.merge(mergedByPath, journal), + ) } - val mergedEntries = latestByPath.entries.toList() + val mergedEntries = mergedByPath.entries.toList() if (mergedEntries.isEmpty()) { reportProgress(null, TaskProgressLabel.SyncGroupNoJournalEntries(groupId)) return @@ -191,13 +191,12 @@ class StorageSyncEngine @Inject constructor( entry = winnerEntry, ) if (applied) { - target.accessor.appendSyncJournal(listOf(winnerEntry)) + target.accessor.putSyncJournalEntries(mapOf(path to winnerEntry)) } else { applyFailures++ } } } - compactSyncJournals(storages) if (applyFailures > 0) { reportProgress( null, @@ -215,6 +214,10 @@ class StorageSyncEngine @Inject constructor( } } + private fun filterSyncableJournal(journal: StorageSyncJournal): StorageSyncJournal { + return journal.filterKeys { StorageSyncPaths.isSyncableUserPath(it) } + } + private suspend fun renewLocksIfNeeded( groupId: String, lockedAccessors: List, @@ -243,20 +246,9 @@ class StorageSyncEngine @Inject constructor( return uuids.mapNotNull(findStorageUseCase::find) } - private fun latestByPath(entries: List): Map { - val map = mutableMapOf() - for (entry in entries) { - val current = map[entry.path] - if (current == null || compareEntries(entry, current) > 0) { - map[entry.path] = entry - } - } - return map - } - private fun findSourceStorage( storages: List, - entriesByStorage: Map>, + entriesByStorage: Map, path: String, winnerEntry: StorageSyncJournalEntry, ): IStorage? { @@ -273,16 +265,6 @@ class StorageSyncEngine @Inject constructor( } } - private suspend fun compactSyncJournals(storages: List) { - for (storage in storages) { - val entries = storage.accessor.readSyncJournal() - val compacted = latestByPath(entries).values.toList() - if (compacted.size < entries.size) { - storage.accessor.rewriteSyncJournal(compacted) - } - } - } - private suspend fun applyEntry( source: IStorage?, target: IStorage, @@ -305,7 +287,7 @@ class StorageSyncEngine @Inject constructor( val sourceAccessor = source?.accessor ?: return false runCatching { sourceAccessor.openRead(entry.path).use { input -> - target.accessor.openWrite(entry.path).use { output -> + target.accessor.openWrite(entry.path, recordSyncJournal = false).use { output -> input.copyTo(output) } } diff --git a/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/StorageSyncJournalMergeTest.kt b/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/StorageSyncJournalMergeTest.kt new file mode 100644 index 0000000..1defdbc --- /dev/null +++ b/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/StorageSyncJournalMergeTest.kt @@ -0,0 +1,45 @@ +package com.github.nullptroma.wallenc.usecases + +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision +import org.junit.Assert.assertEquals +import org.junit.Assert.assertFalse +import org.junit.Assert.assertTrue +import org.junit.Test +import java.time.Instant + +class StorageSyncJournalMergeTest { + + @Test + fun mergeKeepsSingleEntryPerPath() { + val older = entry(path = "/a.txt", sequence = 1L) + val newer = entry(path = "/a.txt", sequence = 2L) + val merged = StorageSyncJournalMerge.merge( + mapOf("/a.txt" to older), + mapOf("/a.txt" to newer), + ) + assertEquals(1, merged.size) + assertEquals(2L, merged["/a.txt"]?.revision?.sequence) + } + + @Test + fun isSyncableUserPathExcludesEncDirAndJournal() { + assertFalse(StorageSyncPaths.isSyncableUserPath("/88a048ed-enc-dir/sync-journal.json")) + assertFalse(StorageSyncPaths.isSyncableUserPath("/wallenc-yandex-system/sync-journal.json")) + assertTrue(StorageSyncPaths.isSyncableUserPath("/wallenc-data/text-secrets.json")) + } + + private fun entry(path: String, sequence: Long): StorageSyncJournalEntry = + StorageSyncJournalEntry( + path = path, + operation = StorageSyncOperation.UPSERT, + revision = StorageSyncRevision( + sequence = sequence, + actorId = "actor", + createdAt = Instant.EPOCH, + ), + ) +} diff --git a/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/fakes/FakeStorage.kt b/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/fakes/FakeStorage.kt index 9a3442b..3949a04 100644 --- a/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/fakes/FakeStorage.kt +++ b/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/fakes/FakeStorage.kt @@ -3,6 +3,7 @@ package com.github.nullptroma.wallenc.usecases.fakes import com.github.nullptroma.wallenc.domain.datatypes.StorageEncryptionInfo import com.github.nullptroma.wallenc.domain.datatypes.StorageMetaLoadState import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge import com.github.nullptroma.wallenc.domain.interfaces.IStorage import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor import com.github.nullptroma.wallenc.domain.interfaces.IStorageMetaInfo @@ -46,10 +47,13 @@ class FakeStorage( fun fileBytes(path: String): ByteArray? = accessorImpl.dataFiles[path] fun addSyncJournalEntry(entry: StorageSyncJournalEntry) { - accessorImpl.syncJournal.add(entry) + accessorImpl.syncJournal = StorageSyncJournalMerge.merge( + accessorImpl.syncJournal, + mapOf(entry.path to entry), + ) } - fun syncJournalEntries(): List = accessorImpl.syncJournal.toList() + fun syncJournalEntries(): List = accessorImpl.syncJournal.values.toList() fun setAcquireLockResult(result: Boolean) { accessorImpl.acquireLockResult = result diff --git a/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/fakes/FakeStorageAccessor.kt b/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/fakes/FakeStorageAccessor.kt index 0e9801c..f4ee453 100644 --- a/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/fakes/FakeStorageAccessor.kt +++ b/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/fakes/FakeStorageAccessor.kt @@ -1,7 +1,9 @@ package com.github.nullptroma.wallenc.usecases.fakes import com.github.nullptroma.wallenc.domain.datatypes.DataPage +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock import com.github.nullptroma.wallenc.domain.interfaces.IDirectory import com.github.nullptroma.wallenc.domain.interfaces.IFile @@ -25,7 +27,7 @@ class FakeStorageAccessor : IStorageAccessor { private val systemFiles: MutableMap = mutableMapOf() private val _filesUpdates = MutableSharedFlow>(extraBufferCapacity = 16) - var syncJournal: MutableList = mutableListOf() + var syncJournal: StorageSyncJournal = emptyMap() var syncLock: StorageSyncLock? = null var acquireLockResult: Boolean = true @@ -65,7 +67,7 @@ class FakeStorageAccessor : IStorageAccessor { dataFiles.remove(path) } - override suspend fun openWrite(path: String): OutputStream { + override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream { return object : ByteArrayOutputStream() { override fun close() { dataFiles[path] = toByteArray() @@ -104,15 +106,10 @@ class FakeStorageAccessor : IStorageAccessor { } } - override suspend fun readSyncJournal(): List = syncJournal.toList() + override suspend fun readSyncJournal(): StorageSyncJournal = syncJournal - override suspend fun appendSyncJournal(entries: List) { - syncJournal.addAll(entries) - } - - override suspend fun rewriteSyncJournal(entries: List) { - syncJournal.clear() - syncJournal.addAll(entries) + override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) { + syncJournal = StorageSyncJournalMerge.merge(syncJournal, entries) } override suspend fun readSyncLock(): StorageSyncLock? = syncLock