diff --git a/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/common/StorageSyncJournalBuffer.kt b/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/common/StorageSyncJournalBuffer.kt new file mode 100644 index 0000000..74e5d6d --- /dev/null +++ b/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/common/StorageSyncJournalBuffer.kt @@ -0,0 +1,112 @@ +package com.github.nullptroma.wallenc.domain.vault.storages.common + +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import java.util.UUID + +/** + * Буфер журнала sync: накопление записей и безопасный flush (без потери pending при ошибке записи). + */ +class StorageSyncJournalBuffer( + private val syncActorId: String, + private val originStorageUuid: UUID?, + private val readJournal: suspend () -> StorageSyncJournal, + private val writeJournal: suspend (StorageSyncJournal) -> Unit, +) { + private val pendingMutex = Mutex() + private val flushMutex = Mutex() + private var pendingJournalEntries: StorageSyncJournal = emptyMap() + + @Volatile + private var sequenceHighWatermark: Long? = null + + suspend fun flushPending() { + flushMutex.withLock { + flushPendingUnderLock() + } + } + + suspend fun putEntries(entries: StorageSyncJournal) { + flushPending() + if (entries.isEmpty()) { + return + } + val current = readJournal() + val merged = StorageSyncJournalMerge.merge(current, entries) + if (merged == current) { + return + } + writeJournal(merged) + refreshSequenceHighWatermark(merged) + } + + suspend fun appendEntry(path: String, entry: StorageSyncJournalEntry) { + pendingMutex.withLock { + pendingJournalEntries = pendingJournalEntries + (path to entry) + } + flushPending() + } + + suspend fun nextSequence(): Long { + flushPending() + val diskMax = readJournal().values.maxOfOrNull { it.revision.sequence } ?: 0L + val pendingMax = pendingMutex.withLock { + pendingJournalEntries.values.maxOfOrNull { it.revision.sequence } ?: 0L + } + val base = maxOf(diskMax, pendingMax, sequenceHighWatermark ?: 0L) + val next = base + 1L + sequenceHighWatermark = next + return next + } + + fun buildEntry( + path: String, + operation: com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation, + sequence: Long, + ): StorageSyncJournalEntry { + return StorageSyncJournalEntry( + path = path, + operation = operation, + revision = StorageSyncRevision( + sequence = sequence, + actorId = syncActorId, + createdAt = java.time.Instant.now(), + ), + originStorageUuid = originStorageUuid, + ) + } + + private suspend fun flushPendingUnderLock() { + val pending = pendingMutex.withLock { + if (pendingJournalEntries.isEmpty()) { + return + } + val snapshot = pendingJournalEntries + pendingJournalEntries = emptyMap() + snapshot + } + try { + val current = readJournal() + val merged = StorageSyncJournalMerge.merge(current, pending) + if (merged != current) { + writeJournal(merged) + } + refreshSequenceHighWatermark(merged) + } catch (e: Exception) { + pendingMutex.withLock { + pendingJournalEntries = StorageSyncJournalMerge.merge(pending, pendingJournalEntries) + } + throw e + } + } + + private fun refreshSequenceHighWatermark(journal: StorageSyncJournal) { + val maxSeq = journal.values.maxOfOrNull { it.revision.sequence } ?: return + val current = sequenceHighWatermark + sequenceHighWatermark = if (current == null) maxSeq else maxOf(current, maxSeq) + } +} diff --git a/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/encrypt/EncryptedStorageAccessor.kt b/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/encrypt/EncryptedStorageAccessor.kt index be1d87a..2bb63ac 100644 --- a/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/encrypt/EncryptedStorageAccessor.kt +++ b/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/encrypt/EncryptedStorageAccessor.kt @@ -2,6 +2,7 @@ package com.github.nullptroma.wallenc.domain.vault.storages.encrypt import com.fasterxml.jackson.module.kotlin.readValue import com.github.nullptroma.wallenc.domain.errors.WallencException +import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalBuffer import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty import com.github.nullptroma.wallenc.domain.vault.utils.CloseHandledStreamExtension.Companion.onClosed @@ -18,14 +19,12 @@ import com.github.nullptroma.wallenc.domain.interfaces.IFile import com.github.nullptroma.wallenc.domain.interfaces.IMetaInfo import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal -import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry -import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation -import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.DisposableHandle +import kotlinx.coroutines.runBlocking import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow @@ -52,6 +51,12 @@ class EncryptedStorageAccessor( ) : IStorageAccessor, DisposableHandle { private val syncActorId = UUID.randomUUID().toString() private val syncLockMutex = Mutex() + private val journalBuffer = StorageSyncJournalBuffer( + syncActorId = syncActorId, + originStorageUuid = null, + readJournal = { readSyncJournalUnchecked() }, + writeJournal = { writeSyncJournal(it) }, + ) private val _size = MutableStateFlow(null) override val size: StateFlow = _size @@ -257,9 +262,11 @@ class EncryptedStorageAccessor( source.touchDir(encryptPath(path)) } - override suspend fun delete(path: String) { - source.delete(encryptPath(path)) - appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE) + override suspend fun delete(path: String, recordSyncJournal: Boolean) { + source.delete(encryptPath(path), recordSyncJournal = false) + if (recordSyncJournal) { + appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE) + } } override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream = @@ -270,12 +277,17 @@ class EncryptedStorageAccessor( return dataEncryptor.decryptStream(stream) } - override suspend fun moveToTrash(path: String) { - source.moveToTrash(encryptPath(path)) - appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH) + override suspend fun moveToTrash(path: String, recordSyncJournal: Boolean) { + source.moveToTrash(encryptPath(path), recordSyncJournal = false) + if (recordSyncJournal) { + appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH) + } } override fun dispose() { + runBlocking { + runCatching { journalBuffer.flushPending() } + } dataEncryptor.dispose() } @@ -297,16 +309,21 @@ class EncryptedStorageAccessor( } override suspend fun readSyncJournal(): StorageSyncJournal { - val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) } - return StorageSyncJournalCodec.read(jackson, bytes) + journalBuffer.flushPending() + return readSyncJournalUnchecked() + } + + override suspend fun flushPendingSyncJournal() { + journalBuffer.flushPending() } override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) { - if (entries.isEmpty()) { - return - } - val merged = StorageSyncJournalMerge.merge(readSyncJournal(), entries) - writeSyncJournal(merged) + journalBuffer.putEntries(entries) + } + + private suspend fun readSyncJournalUnchecked(): StorageSyncJournal { + val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) } + return StorageSyncJournalCodec.read(jackson, bytes) } private suspend fun writeSyncJournal(journal: StorageSyncJournal) { @@ -373,21 +390,9 @@ class EncryptedStorageAccessor( if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) { return } - val journal = readSyncJournal() - val nextSequence = (journal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L - putSyncJournalEntries( - mapOf( - cleanedPath to StorageSyncJournalEntry( - path = cleanedPath, - operation = operation, - revision = StorageSyncRevision( - sequence = nextSequence, - actorId = syncActorId, - createdAt = Instant.now(), - ), - ), - ), - ) + val sequence = journalBuffer.nextSequence() + val entry = journalBuffer.buildEntry(cleanedPath, operation, sequence) + journalBuffer.appendEntry(cleanedPath, entry) } private suspend fun openWriteInternal(path: String, recordJournal: Boolean): OutputStream { diff --git a/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/local/LocalStorageAccessor.kt b/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/local/LocalStorageAccessor.kt index 79b9fc8..f5f4000 100644 --- a/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/local/LocalStorageAccessor.kt +++ b/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/local/LocalStorageAccessor.kt @@ -1,6 +1,7 @@ package com.github.nullptroma.wallenc.domain.vault.storages.local import com.github.nullptroma.wallenc.domain.errors.WallencException +import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalBuffer import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty @@ -26,6 +27,8 @@ import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.SharedFlow @@ -73,6 +76,13 @@ class LocalStorageAccessor( private val _dirsUpdates = MutableSharedFlow>() override val dirsUpdates: SharedFlow> = _dirsUpdates + private val journalBuffer = StorageSyncJournalBuffer( + syncActorId = syncActorId, + originStorageUuid = null, + readJournal = { readSyncJournalUnchecked() }, + writeJournal = { writeSyncJournal(it) }, + ) + suspend fun init() = withContext(ioDispatcher) { // запускам сканирование хранилища scanSizeAndNumOfFiles() @@ -513,7 +523,7 @@ class LocalStorageAccessor( createDir(path) } - override suspend fun delete(path: String) = withContext(ioDispatcher) { + override suspend fun delete(path: String, recordSyncJournal: Boolean) = withContext(ioDispatcher) { if (path == "/" || path.isBlank()) { throw WallencException.Storage.DeleteRootForbidden() } @@ -523,7 +533,9 @@ class LocalStorageAccessor( else pair.file.delete() pair.metaFile.delete() scanSizeAndNumOfFiles() - appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE) + if (recordSyncJournal) { + appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE) + } } } @@ -548,12 +560,14 @@ class LocalStorageAccessor( return@withContext pair.file.inputStream() } - override suspend fun moveToTrash(path: String) = withContext(ioDispatcher) { + override suspend fun moveToTrash(path: String, recordSyncJournal: Boolean) = withContext(ioDispatcher) { val pair = LocalStorageFilePair.from(_filesystemBasePath, path) ?: throw WallencException.Storage.FileNotFound() val newMeta = pair.meta.copy(isDeleted = true) writeMeta(pair.metaFile, newMeta) - appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH) + if (recordSyncJournal) { + appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH) + } } override suspend fun openReadSystemFile(name: String): InputStream = withContext(ioDispatcher) { @@ -579,16 +593,21 @@ class LocalStorageAccessor( } override suspend fun readSyncJournal(): StorageSyncJournal = withContext(ioDispatcher) { - val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) } - StorageSyncJournalCodec.read(jackson, bytes) + journalBuffer.flushPending() + readSyncJournalUnchecked() + } + + override suspend fun flushPendingSyncJournal() = withContext(ioDispatcher) { + journalBuffer.flushPending() } override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) = withContext(ioDispatcher) { - if (entries.isEmpty()) { - return@withContext - } - val merged = StorageSyncJournalMerge.merge(readSyncJournal(), entries) - writeSyncJournal(merged) + journalBuffer.putEntries(entries) + } + + private suspend fun readSyncJournalUnchecked(): StorageSyncJournal { + val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) } + return StorageSyncJournalCodec.read(jackson, bytes) } private suspend fun writeSyncJournal(journal: StorageSyncJournal) { @@ -721,21 +740,9 @@ class LocalStorageAccessor( if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) { return } - val journal = readSyncJournal() - val nextSequence = (journal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L - putSyncJournalEntries( - mapOf( - cleanedPath to StorageSyncJournalEntry( - path = cleanedPath, - operation = operation, - revision = StorageSyncRevision( - sequence = nextSequence, - actorId = syncActorId, - createdAt = Instant.now(), - ), - ), - ), - ) + val sequence = journalBuffer.nextSequence() + val entry = journalBuffer.buildEntry(cleanedPath, operation, sequence) + journalBuffer.appendEntry(cleanedPath, entry) } companion object { diff --git a/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/yandex/YandexStorageAccessor.kt b/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/yandex/YandexStorageAccessor.kt index 84ac5ba..bd97f02 100644 --- a/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/yandex/YandexStorageAccessor.kt +++ b/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/yandex/YandexStorageAccessor.kt @@ -2,6 +2,7 @@ package com.github.nullptroma.wallenc.domain.vault.storages.yandex import com.github.nullptroma.wallenc.domain.errors.WallencException import com.github.nullptroma.wallenc.domain.vault.errors.toVaultWallencException +import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalBuffer import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty @@ -19,17 +20,16 @@ import com.github.nullptroma.wallenc.domain.interfaces.IDirectory import com.github.nullptroma.wallenc.domain.interfaces.IFile import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal -import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry -import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation -import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job import kotlinx.coroutines.delay +import kotlin.coroutines.coroutineContext +import kotlinx.coroutines.ensureActive import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow @@ -42,6 +42,7 @@ import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.stateIn import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withContext @@ -95,6 +96,13 @@ class YandexStorageAccessor( private var statsPersistJob: Job? = null + private val journalBuffer = StorageSyncJournalBuffer( + syncActorId = syncActorId, + originStorageUuid = storageUuid, + readJournal = { readSyncJournalUnchecked() }, + writeJournal = { writeSyncJournal(it) }, + ) + @Volatile private var systemDirEnsured: Boolean = false @@ -240,6 +248,7 @@ class YandexStorageAccessor( val queue = ArrayDeque() queue.add(relDir) while (queue.isNotEmpty()) { + coroutineContext.ensureActive() val rel = queue.removeFirst() if (isSystemRel(rel)) continue val (files, dirs) = listImmediateChildren(rel) @@ -285,6 +294,7 @@ class YandexStorageAccessor( val dirs = mutableListOf() var offset = 0 while (true) { + coroutineContext.ensureActive() val res = guard { repo.list(diskPath, API_LIST_LIMIT, offset) } val items = res.embedded?.items.orEmpty() for (it in items) { @@ -303,7 +313,7 @@ class YandexStorageAccessor( } private suspend fun getMetadataAfterWrite(diskPath: String): ResourceDto { - val maxAttempts = 6 + val maxAttempts = 3 repeat(maxAttempts) { attempt -> try { return guard { repo.get(diskPath) } @@ -488,8 +498,8 @@ class YandexStorageAccessor( if (created) { _numberOfFiles.value = (_numberOfFiles.value ?: 0) + 1 persistStatsImmediate() + appendSyncEntry(path = path, operation = StorageSyncOperation.UPSERT) } - appendSyncEntry(path = path, operation = StorageSyncOperation.UPSERT) } override suspend fun touchDir(path: String): Unit = withContext(ioDispatcher) { @@ -506,7 +516,7 @@ class YandexStorageAccessor( } } - override suspend fun delete(path: String) = withContext(ioDispatcher) { + override suspend fun delete(path: String, recordSyncJournal: Boolean) = withContext(ioDispatcher) { if (path == "/" || path.isBlank()) { throw WallencException.Storage.DeleteRootForbidden() } @@ -528,7 +538,9 @@ class YandexStorageAccessor( } guard { repo.delete(diskPath, permanently = true) } scheduleStatsPersist() - appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE) + if (recordSyncJournal) { + appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE) + } } override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream = withContext(ioDispatcher) { @@ -546,9 +558,11 @@ class YandexStorageAccessor( guard { repo.openDownloadStream(toDiskPath(path)) } } - override suspend fun moveToTrash(path: String) = withContext(ioDispatcher) { + override suspend fun moveToTrash(path: String, recordSyncJournal: Boolean) = withContext(ioDispatcher) { patchCustomProps(path, mapOf(PROP_DELETED to "true")) - appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH) + if (recordSyncJournal) { + appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH) + } } override suspend fun openReadSystemFile(name: String): InputStream = withContext(ioDispatcher) { @@ -576,16 +590,21 @@ class YandexStorageAccessor( } override suspend fun readSyncJournal(): StorageSyncJournal = withContext(ioDispatcher) { - val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) } - StorageSyncJournalCodec.read(statsMapper, bytes) + journalBuffer.flushPending() + readSyncJournalUnchecked() + } + + override suspend fun flushPendingSyncJournal() = withContext(ioDispatcher) { + journalBuffer.flushPending() } override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) = withContext(ioDispatcher) { - if (entries.isEmpty()) { - return@withContext - } - val merged = StorageSyncJournalMerge.merge(readSyncJournal(), entries) - writeSyncJournal(merged) + journalBuffer.putEntries(entries) + } + + private suspend fun readSyncJournalUnchecked(): StorageSyncJournal { + val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) } + return StorageSyncJournalCodec.read(statsMapper, bytes) } private suspend fun writeSyncJournal(journal: StorageSyncJournal) { @@ -668,8 +687,10 @@ class YandexStorageAccessor( * Выполняется из [OutputStream.close]; ошибки upload пробрасываются вызывающему коду. */ private fun runCommitAfterStreamClosed(block: suspend () -> Unit) { - runBlocking(ioDispatcher) { - block() + runBlocking { + withContext(ioDispatcher) { + block() + } } } @@ -710,22 +731,9 @@ class YandexStorageAccessor( if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) { return } - val journal = readSyncJournal() - val nextSequence = (journal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L - putSyncJournalEntries( - mapOf( - cleanedPath to StorageSyncJournalEntry( - path = cleanedPath, - operation = operation, - revision = StorageSyncRevision( - sequence = nextSequence, - actorId = syncActorId, - createdAt = Instant.now(), - ), - originStorageUuid = storageUuid, - ), - ), - ) + val sequence = journalBuffer.nextSequence() + val entry = journalBuffer.buildEntry(cleanedPath, operation, sequence) + journalBuffer.appendEntry(cleanedPath, entry) } private suspend fun touchParentDirs(path: String) { diff --git a/domain-vault/src/test/java/com/github/nullptroma/wallenc/domain/vault/storages/common/StorageSyncJournalBufferTest.kt b/domain-vault/src/test/java/com/github/nullptroma/wallenc/domain/vault/storages/common/StorageSyncJournalBufferTest.kt new file mode 100644 index 0000000..9bb783e --- /dev/null +++ b/domain-vault/src/test/java/com/github/nullptroma/wallenc/domain/vault/storages/common/StorageSyncJournalBufferTest.kt @@ -0,0 +1,41 @@ +package com.github.nullptroma.wallenc.domain.vault.storages.common + +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision +import kotlinx.coroutines.runBlocking +import org.junit.Assert.assertEquals +import org.junit.Assert.assertTrue +import org.junit.Test +import java.time.Instant +import java.util.concurrent.atomic.AtomicReference + +class StorageSyncJournalBufferTest { + + @Test + fun flushRestoresPendingOnWriteFailure() = runBlocking { + val disk = AtomicReference(emptyMap()) + var shouldFail = true + val buffer = StorageSyncJournalBuffer( + syncActorId = "actor", + originStorageUuid = null, + readJournal = { disk.get() }, + writeJournal = { + if (shouldFail) { + error("disk unavailable") + } + disk.set(it) + }, + ) + val entry = buffer.buildEntry("/a.txt", StorageSyncOperation.UPSERT, 1L) + try { + buffer.appendEntry("/a.txt", entry) + } catch (_: IllegalStateException) { + // expected + } + shouldFail = false + buffer.flushPending() + assertTrue(disk.get().containsKey("/a.txt")) + assertEquals(1L, disk.get()["/a.txt"]?.revision?.sequence) + } +} diff --git a/domain/src/main/java/com/github/nullptroma/wallenc/domain/interfaces/IStorageAccessor.kt b/domain/src/main/java/com/github/nullptroma/wallenc/domain/interfaces/IStorageAccessor.kt index 989e3f7..f636084 100644 --- a/domain/src/main/java/com/github/nullptroma/wallenc/domain/interfaces/IStorageAccessor.kt +++ b/domain/src/main/java/com/github/nullptroma/wallenc/domain/interfaces/IStorageAccessor.kt @@ -39,10 +39,10 @@ interface IStorageAccessor { suspend fun setHidden(path: String, hidden: Boolean) suspend fun touchFile(path: String) suspend fun touchDir(path: String) - suspend fun delete(path: String) + suspend fun delete(path: String, recordSyncJournal: Boolean = true) suspend fun openWrite(path: String, recordSyncJournal: Boolean = true): OutputStream suspend fun openRead(path: String): InputStream - suspend fun moveToTrash(path: String) + suspend fun moveToTrash(path: String, recordSyncJournal: Boolean = true) /** * Системный sidecar-файл для логических нужд хранилища (мета, ключи и т.п.). @@ -53,6 +53,10 @@ interface IStorageAccessor { suspend fun openWriteSystemFile(name: String): OutputStream suspend fun readSyncJournal(): StorageSyncJournal + + /** Сбрасывает отложенные записи журнала на носитель (перед sync и при закрытии storage). */ + suspend fun flushPendingSyncJournal() = Unit + suspend fun putSyncJournalEntries(entries: StorageSyncJournal) suspend fun readSyncLock(): StorageSyncLock? diff --git a/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/StorageSyncEngineTest.kt b/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/StorageSyncEngineTest.kt index 9a8237e..feddc50 100644 --- a/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/StorageSyncEngineTest.kt +++ b/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/StorageSyncEngineTest.kt @@ -2,6 +2,7 @@ package com.github.nullptroma.wallenc.usecases import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroup import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroupEncryptionKind @@ -10,8 +11,11 @@ import com.github.nullptroma.wallenc.usecases.fakes.FakeStorage import com.github.nullptroma.wallenc.usecases.fakes.FakeStorageAccessor import com.github.nullptroma.wallenc.usecases.fakes.FakeStorageSyncGroupStore import com.github.nullptroma.wallenc.usecases.fakes.FakeVaultsManager +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.async import kotlinx.coroutines.runBlocking import org.junit.Assert.assertArrayEquals +import org.junit.Assert.assertEquals import org.junit.Assert.assertNull import org.junit.Assert.assertTrue import org.junit.Test @@ -19,6 +23,8 @@ import java.time.Instant class StorageSyncEngineTest { + private fun norm(path: String): String = StorageSyncPaths.normalize(path) + @Test fun syncAllGroupsReportsNoGroupsWhenEmpty() = runBlocking { val labels = mutableListOf() @@ -63,7 +69,7 @@ class StorageSyncEngineTest { assertArrayEquals(payload, target.fileBytes(path)) assertTrue(labels.any { it is TaskProgressLabel.SyncGroupCompleted }) - assertTrue(target.syncJournalEntries().any { it.path == path }) + assertTrue(target.syncJournalEntries().any { it.path == norm(path) }) } @Test @@ -111,6 +117,81 @@ class StorageSyncEngineTest { engine.syncGroup(group.id) { _, _ -> } assertNull(target.fileBytes(path)) + val targetEntry = target.syncJournalEntries().single { it.path == norm(path) } + assertEquals(2L, targetEntry.revision.sequence) + assertEquals("actor-b", targetEntry.revision.actorId) + assertEquals(StorageSyncOperation.DELETE, targetEntry.operation) + } + + @Test + fun syncSkipsWhenTargetRevisionAlreadyWinner() = runBlocking { + val source = FakeStorage() + val target = FakeStorage() + val path = "already-synced.txt" + val payload = "same".encodeToByteArray() + source.putFile(path, payload) + target.putFile(path, payload) + + val winner = StorageSyncJournalEntry( + path = path, + operation = StorageSyncOperation.UPSERT, + revision = StorageSyncRevision( + sequence = 5L, + actorId = "winner-actor", + createdAt = Instant.parse("2024-08-01T00:00:00Z"), + ), + size = payload.size.toLong(), + ) + source.addSyncJournalEntry(winner) + target.addSyncJournalEntry(winner) + + val group = StorageSyncGroup( + id = "skip-group", + storageUuids = setOf(source.uuid, target.uuid), + encryptionKind = StorageSyncGroupEncryptionKind.NONE, + ) + val engine = createEngine( + storages = listOf(source, target), + groups = listOf(group), + ) + engine.syncGroup(group.id) { _, _ -> } + + val targetJournal = target.syncJournalEntries().single { it.path == norm(path) } + assertEquals(winner.revision, targetJournal.revision) + assertEquals(winner.operation, targetJournal.operation) + } + + @Test + fun openReadDoesNotChangeJournal() = runBlocking { + val storage = FakeStorage() + val path = "read-only.txt" + storage.putFile(path, "data".encodeToByteArray()) + val before = storage.syncJournalEntries().size + + storage.accessor.openRead(path).use { it.readBytes() } + + assertEquals(before, storage.syncJournalEntries().size) + } + + @Test + fun deleteWithRecordSyncJournalFalseDoesNotBumpSequence() = runBlocking { + val storage = FakeStorage() + val path = "to-delete.txt" + storage.putFile(path, "x".encodeToByteArray()) + storage.addSyncJournalEntry( + StorageSyncJournalEntry( + path = path, + operation = StorageSyncOperation.UPSERT, + revision = StorageSyncRevision(10L, "prior", Instant.EPOCH), + ), + ) + + storage.accessor.delete(path, recordSyncJournal = false) + + assertNull(storage.fileBytes(path)) + val entry = storage.syncJournalEntries().single { it.path == norm(path) } + assertEquals(10L, entry.revision.sequence) + assertEquals(StorageSyncOperation.UPSERT, entry.operation) } @Test @@ -145,7 +226,11 @@ class StorageSyncEngineTest { engine.syncGroup(group.id) { _, _ -> } assertArrayEquals(payload, target.fileBytes(path)) - assertTrue(path in (target.accessor as FakeStorageAccessor).trashedPaths) + assertTrue(norm(path) in (target.accessor as FakeStorageAccessor).trashedPaths) + val targetEntry = target.syncJournalEntries().single { it.path == norm(path) } + assertEquals(3L, targetEntry.revision.sequence) + assertEquals("actor-trash", targetEntry.revision.actorId) + assertEquals(StorageSyncOperation.TRASH, targetEntry.operation) } @Test @@ -221,6 +306,45 @@ class StorageSyncEngineTest { assertNull((second.accessor as FakeStorageAccessor).syncLock) } + @Test + fun syncGroupCooperativeCancellationReleasesLocks() = runBlocking { + val source = FakeStorage() + val target = FakeStorage() + val path = "slow.txt" + val payload = "payload".encodeToByteArray() + source.putFile(path, payload) + (source.accessor as FakeStorageAccessor).openReadDelayMs = 5_000 + source.addSyncJournalEntry( + StorageSyncJournalEntry( + path = path, + operation = StorageSyncOperation.UPSERT, + revision = StorageSyncRevision(1L, "actor", Instant.EPOCH), + size = payload.size.toLong(), + ), + ) + val group = StorageSyncGroup( + id = "cancel-group", + storageUuids = setOf(source.uuid, target.uuid), + encryptionKind = StorageSyncGroupEncryptionKind.NONE, + ) + val engine = createEngine( + storages = listOf(source, target), + groups = listOf(group), + ) + val job = async { + engine.syncGroup(group.id) { _, _ -> } + } + kotlinx.coroutines.delay(50) + job.cancel() + try { + job.await() + } catch (_: CancellationException) { + // expected + } + assertNull((source.accessor as FakeStorageAccessor).syncLock) + assertNull((target.accessor as FakeStorageAccessor).syncLock) + } + @Test fun syncGroupReleasesLocksWhenJournalEmpty() = runBlocking { val first = FakeStorage() diff --git a/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/fakes/FakeStorage.kt b/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/fakes/FakeStorage.kt index 3949a04..99902be 100644 --- a/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/fakes/FakeStorage.kt +++ b/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/fakes/FakeStorage.kt @@ -41,10 +41,12 @@ class FakeStorage( } fun putFile(path: String, bytes: ByteArray) { - accessorImpl.dataFiles[path] = bytes + accessorImpl.dataFiles[com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths.normalize(path)] = + bytes } - fun fileBytes(path: String): ByteArray? = accessorImpl.dataFiles[path] + fun fileBytes(path: String): ByteArray? = + accessorImpl.dataFiles[com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths.normalize(path)] fun addSyncJournalEntry(entry: StorageSyncJournalEntry) { accessorImpl.syncJournal = StorageSyncJournalMerge.merge( diff --git a/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/fakes/FakeStorageAccessor.kt b/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/fakes/FakeStorageAccessor.kt index 02e7f7d..6d4044a 100644 --- a/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/fakes/FakeStorageAccessor.kt +++ b/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/fakes/FakeStorageAccessor.kt @@ -5,10 +5,14 @@ import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths +import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision import com.github.nullptroma.wallenc.domain.interfaces.IDirectory import com.github.nullptroma.wallenc.domain.interfaces.IFile import com.github.nullptroma.wallenc.domain.interfaces.IMetaInfo import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor +import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow @@ -23,6 +27,8 @@ import java.time.Instant class FakeStorageAccessor : IStorageAccessor { val dataFiles: MutableMap = mutableMapOf() + + private fun norm(path: String): String = StorageSyncPaths.normalize(path) val trashedPaths: MutableSet = mutableSetOf() private val systemFiles: MutableMap = mutableMapOf() private val _filesUpdates = MutableSharedFlow>(extraBufferCapacity = 16) @@ -31,6 +37,7 @@ class FakeStorageAccessor : IStorageAccessor { var syncLock: StorageSyncLock? = null var acquireLockResult: Boolean = true var readSyncJournalThrows: Throwable? = null + var openReadDelayMs: Long = 0 override val size: StateFlow = MutableStateFlow(0L) override val numberOfFiles: StateFlow = MutableStateFlow(0) @@ -51,7 +58,9 @@ class FakeStorageAccessor : IStorageAccessor { override fun getDirsFlow(path: String): Flow> = emptyFlow() override suspend fun getFileInfo(path: String): IFile { - error("Not implemented in tests") + val key = norm(path) + val bytes = dataFiles[key] ?: throw IllegalStateException("File not found: $path") + return FakeFile(key, bytes.size.toLong()) } override suspend fun getDirInfo(path: String): IDirectory { @@ -64,14 +73,17 @@ class FakeStorageAccessor : IStorageAccessor { override suspend fun touchDir(path: String) = Unit - override suspend fun delete(path: String) { - dataFiles.remove(path) + override suspend fun delete(path: String, recordSyncJournal: Boolean) { + dataFiles.remove(norm(path)) + if (recordSyncJournal) { + recordDeleteJournal(path) + } } override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream { return object : ByteArrayOutputStream() { override fun close() { - dataFiles[path] = toByteArray() + dataFiles[norm(path)] = toByteArray() _filesUpdates.tryEmit( DataPage( listOf(FakeFile(path)), @@ -84,13 +96,20 @@ class FakeStorageAccessor : IStorageAccessor { } override suspend fun openRead(path: String): InputStream { - val bytes = dataFiles[path] ?: throw IllegalStateException("File not found: $path") + if (openReadDelayMs > 0) { + delay(openReadDelayMs) + } + val bytes = dataFiles[norm(path)] ?: throw IllegalStateException("File not found: $path") return ByteArrayInputStream(bytes) } - override suspend fun moveToTrash(path: String) { - if (path in dataFiles) { - trashedPaths.add(path) + override suspend fun moveToTrash(path: String, recordSyncJournal: Boolean) { + val key = norm(path) + if (key in dataFiles) { + trashedPaths.add(key) + if (recordSyncJournal) { + recordTrashJournal(path) + } } } @@ -112,10 +131,38 @@ class FakeStorageAccessor : IStorageAccessor { return syncJournal } + override suspend fun flushPendingSyncJournal() = Unit + override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) { syncJournal = StorageSyncJournalMerge.merge(syncJournal, entries) } + private suspend fun recordDeleteJournal(path: String) { + appendJournalEntry(path, StorageSyncOperation.DELETE) + } + + private suspend fun recordTrashJournal(path: String) { + appendJournalEntry(path, StorageSyncOperation.TRASH) + } + + private suspend fun appendJournalEntry(path: String, operation: StorageSyncOperation) { + val cleaned = norm(path) + val nextSequence = (syncJournal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L + putSyncJournalEntries( + mapOf( + cleaned to StorageSyncJournalEntry( + path = cleaned, + operation = operation, + revision = StorageSyncRevision( + sequence = nextSequence, + actorId = "fake-actor", + createdAt = Instant.now(), + ), + ), + ), + ) + } + override suspend fun readSyncLock(): StorageSyncLock? = syncLock override suspend fun tryAcquireSyncLock(holderId: String, leaseUntil: Instant): Boolean { @@ -135,9 +182,12 @@ class FakeStorageAccessor : IStorageAccessor { } } -class FakeFile(path: String) : IFile { +class FakeFile( + path: String, + size: Long = 0L, +) : IFile { override val metaInfo: IMetaInfo = object : IMetaInfo { - override val size: Long = 0L + override val size: Long = size override val isDeleted: Boolean = false override val isHidden: Boolean = false override val lastModified: Instant = Instant.now()