fix(sync): исправил журнал при DELETE/TRASH и безопасный flush
Добавил recordSyncJournal для delete/moveToTrash, StorageSyncJournalBuffer с восстановлением pending при ошибке записи и немедленным flush без debounce.
This commit is contained in:
@@ -0,0 +1,112 @@
|
||||
package com.github.nullptroma.wallenc.domain.vault.storages.common
|
||||
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import java.util.UUID
|
||||
|
||||
/**
|
||||
* Буфер журнала sync: накопление записей и безопасный flush (без потери pending при ошибке записи).
|
||||
*/
|
||||
class StorageSyncJournalBuffer(
|
||||
private val syncActorId: String,
|
||||
private val originStorageUuid: UUID?,
|
||||
private val readJournal: suspend () -> StorageSyncJournal,
|
||||
private val writeJournal: suspend (StorageSyncJournal) -> Unit,
|
||||
) {
|
||||
private val pendingMutex = Mutex()
|
||||
private val flushMutex = Mutex()
|
||||
private var pendingJournalEntries: StorageSyncJournal = emptyMap()
|
||||
|
||||
@Volatile
|
||||
private var sequenceHighWatermark: Long? = null
|
||||
|
||||
suspend fun flushPending() {
|
||||
flushMutex.withLock {
|
||||
flushPendingUnderLock()
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun putEntries(entries: StorageSyncJournal) {
|
||||
flushPending()
|
||||
if (entries.isEmpty()) {
|
||||
return
|
||||
}
|
||||
val current = readJournal()
|
||||
val merged = StorageSyncJournalMerge.merge(current, entries)
|
||||
if (merged == current) {
|
||||
return
|
||||
}
|
||||
writeJournal(merged)
|
||||
refreshSequenceHighWatermark(merged)
|
||||
}
|
||||
|
||||
suspend fun appendEntry(path: String, entry: StorageSyncJournalEntry) {
|
||||
pendingMutex.withLock {
|
||||
pendingJournalEntries = pendingJournalEntries + (path to entry)
|
||||
}
|
||||
flushPending()
|
||||
}
|
||||
|
||||
suspend fun nextSequence(): Long {
|
||||
flushPending()
|
||||
val diskMax = readJournal().values.maxOfOrNull { it.revision.sequence } ?: 0L
|
||||
val pendingMax = pendingMutex.withLock {
|
||||
pendingJournalEntries.values.maxOfOrNull { it.revision.sequence } ?: 0L
|
||||
}
|
||||
val base = maxOf(diskMax, pendingMax, sequenceHighWatermark ?: 0L)
|
||||
val next = base + 1L
|
||||
sequenceHighWatermark = next
|
||||
return next
|
||||
}
|
||||
|
||||
fun buildEntry(
|
||||
path: String,
|
||||
operation: com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation,
|
||||
sequence: Long,
|
||||
): StorageSyncJournalEntry {
|
||||
return StorageSyncJournalEntry(
|
||||
path = path,
|
||||
operation = operation,
|
||||
revision = StorageSyncRevision(
|
||||
sequence = sequence,
|
||||
actorId = syncActorId,
|
||||
createdAt = java.time.Instant.now(),
|
||||
),
|
||||
originStorageUuid = originStorageUuid,
|
||||
)
|
||||
}
|
||||
|
||||
private suspend fun flushPendingUnderLock() {
|
||||
val pending = pendingMutex.withLock {
|
||||
if (pendingJournalEntries.isEmpty()) {
|
||||
return
|
||||
}
|
||||
val snapshot = pendingJournalEntries
|
||||
pendingJournalEntries = emptyMap()
|
||||
snapshot
|
||||
}
|
||||
try {
|
||||
val current = readJournal()
|
||||
val merged = StorageSyncJournalMerge.merge(current, pending)
|
||||
if (merged != current) {
|
||||
writeJournal(merged)
|
||||
}
|
||||
refreshSequenceHighWatermark(merged)
|
||||
} catch (e: Exception) {
|
||||
pendingMutex.withLock {
|
||||
pendingJournalEntries = StorageSyncJournalMerge.merge(pending, pendingJournalEntries)
|
||||
}
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
||||
private fun refreshSequenceHighWatermark(journal: StorageSyncJournal) {
|
||||
val maxSeq = journal.values.maxOfOrNull { it.revision.sequence } ?: return
|
||||
val current = sequenceHighWatermark
|
||||
sequenceHighWatermark = if (current == null) maxSeq else maxOf(current, maxSeq)
|
||||
}
|
||||
}
|
||||
@@ -2,6 +2,7 @@ package com.github.nullptroma.wallenc.domain.vault.storages.encrypt
|
||||
|
||||
import com.fasterxml.jackson.module.kotlin.readValue
|
||||
import com.github.nullptroma.wallenc.domain.errors.WallencException
|
||||
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalBuffer
|
||||
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec
|
||||
import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty
|
||||
import com.github.nullptroma.wallenc.domain.vault.utils.CloseHandledStreamExtension.Companion.onClosed
|
||||
@@ -18,14 +19,12 @@ import com.github.nullptroma.wallenc.domain.interfaces.IFile
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IMetaInfo
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.DisposableHandle
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
@@ -52,6 +51,12 @@ class EncryptedStorageAccessor(
|
||||
) : IStorageAccessor, DisposableHandle {
|
||||
private val syncActorId = UUID.randomUUID().toString()
|
||||
private val syncLockMutex = Mutex()
|
||||
private val journalBuffer = StorageSyncJournalBuffer(
|
||||
syncActorId = syncActorId,
|
||||
originStorageUuid = null,
|
||||
readJournal = { readSyncJournalUnchecked() },
|
||||
writeJournal = { writeSyncJournal(it) },
|
||||
)
|
||||
private val _size = MutableStateFlow<Long?>(null)
|
||||
override val size: StateFlow<Long?> = _size
|
||||
|
||||
@@ -257,9 +262,11 @@ class EncryptedStorageAccessor(
|
||||
source.touchDir(encryptPath(path))
|
||||
}
|
||||
|
||||
override suspend fun delete(path: String) {
|
||||
source.delete(encryptPath(path))
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
|
||||
override suspend fun delete(path: String, recordSyncJournal: Boolean) {
|
||||
source.delete(encryptPath(path), recordSyncJournal = false)
|
||||
if (recordSyncJournal) {
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream =
|
||||
@@ -270,12 +277,17 @@ class EncryptedStorageAccessor(
|
||||
return dataEncryptor.decryptStream(stream)
|
||||
}
|
||||
|
||||
override suspend fun moveToTrash(path: String) {
|
||||
source.moveToTrash(encryptPath(path))
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH)
|
||||
override suspend fun moveToTrash(path: String, recordSyncJournal: Boolean) {
|
||||
source.moveToTrash(encryptPath(path), recordSyncJournal = false)
|
||||
if (recordSyncJournal) {
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH)
|
||||
}
|
||||
}
|
||||
|
||||
override fun dispose() {
|
||||
runBlocking {
|
||||
runCatching { journalBuffer.flushPending() }
|
||||
}
|
||||
dataEncryptor.dispose()
|
||||
}
|
||||
|
||||
@@ -297,16 +309,21 @@ class EncryptedStorageAccessor(
|
||||
}
|
||||
|
||||
override suspend fun readSyncJournal(): StorageSyncJournal {
|
||||
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
|
||||
return StorageSyncJournalCodec.read(jackson, bytes)
|
||||
journalBuffer.flushPending()
|
||||
return readSyncJournalUnchecked()
|
||||
}
|
||||
|
||||
override suspend fun flushPendingSyncJournal() {
|
||||
journalBuffer.flushPending()
|
||||
}
|
||||
|
||||
override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) {
|
||||
if (entries.isEmpty()) {
|
||||
return
|
||||
}
|
||||
val merged = StorageSyncJournalMerge.merge(readSyncJournal(), entries)
|
||||
writeSyncJournal(merged)
|
||||
journalBuffer.putEntries(entries)
|
||||
}
|
||||
|
||||
private suspend fun readSyncJournalUnchecked(): StorageSyncJournal {
|
||||
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
|
||||
return StorageSyncJournalCodec.read(jackson, bytes)
|
||||
}
|
||||
|
||||
private suspend fun writeSyncJournal(journal: StorageSyncJournal) {
|
||||
@@ -373,21 +390,9 @@ class EncryptedStorageAccessor(
|
||||
if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) {
|
||||
return
|
||||
}
|
||||
val journal = readSyncJournal()
|
||||
val nextSequence = (journal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L
|
||||
putSyncJournalEntries(
|
||||
mapOf(
|
||||
cleanedPath to StorageSyncJournalEntry(
|
||||
path = cleanedPath,
|
||||
operation = operation,
|
||||
revision = StorageSyncRevision(
|
||||
sequence = nextSequence,
|
||||
actorId = syncActorId,
|
||||
createdAt = Instant.now(),
|
||||
),
|
||||
),
|
||||
),
|
||||
)
|
||||
val sequence = journalBuffer.nextSequence()
|
||||
val entry = journalBuffer.buildEntry(cleanedPath, operation, sequence)
|
||||
journalBuffer.appendEntry(cleanedPath, entry)
|
||||
}
|
||||
|
||||
private suspend fun openWriteInternal(path: String, recordJournal: Boolean): OutputStream {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.github.nullptroma.wallenc.domain.vault.storages.local
|
||||
|
||||
import com.github.nullptroma.wallenc.domain.errors.WallencException
|
||||
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalBuffer
|
||||
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec
|
||||
import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty
|
||||
|
||||
@@ -26,6 +27,8 @@ import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.SharedFlow
|
||||
@@ -73,6 +76,13 @@ class LocalStorageAccessor(
|
||||
private val _dirsUpdates = MutableSharedFlow<DataPage<IDirectory>>()
|
||||
override val dirsUpdates: SharedFlow<DataPage<IDirectory>> = _dirsUpdates
|
||||
|
||||
private val journalBuffer = StorageSyncJournalBuffer(
|
||||
syncActorId = syncActorId,
|
||||
originStorageUuid = null,
|
||||
readJournal = { readSyncJournalUnchecked() },
|
||||
writeJournal = { writeSyncJournal(it) },
|
||||
)
|
||||
|
||||
suspend fun init() = withContext(ioDispatcher) {
|
||||
// запускам сканирование хранилища
|
||||
scanSizeAndNumOfFiles()
|
||||
@@ -513,7 +523,7 @@ class LocalStorageAccessor(
|
||||
createDir(path)
|
||||
}
|
||||
|
||||
override suspend fun delete(path: String) = withContext(ioDispatcher) {
|
||||
override suspend fun delete(path: String, recordSyncJournal: Boolean) = withContext(ioDispatcher) {
|
||||
if (path == "/" || path.isBlank()) {
|
||||
throw WallencException.Storage.DeleteRootForbidden()
|
||||
}
|
||||
@@ -523,7 +533,9 @@ class LocalStorageAccessor(
|
||||
else pair.file.delete()
|
||||
pair.metaFile.delete()
|
||||
scanSizeAndNumOfFiles()
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
|
||||
if (recordSyncJournal) {
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -548,12 +560,14 @@ class LocalStorageAccessor(
|
||||
return@withContext pair.file.inputStream()
|
||||
}
|
||||
|
||||
override suspend fun moveToTrash(path: String) = withContext(ioDispatcher) {
|
||||
override suspend fun moveToTrash(path: String, recordSyncJournal: Boolean) = withContext(ioDispatcher) {
|
||||
val pair = LocalStorageFilePair.from(_filesystemBasePath, path)
|
||||
?: throw WallencException.Storage.FileNotFound()
|
||||
val newMeta = pair.meta.copy(isDeleted = true)
|
||||
writeMeta(pair.metaFile, newMeta)
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH)
|
||||
if (recordSyncJournal) {
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun openReadSystemFile(name: String): InputStream = withContext(ioDispatcher) {
|
||||
@@ -579,16 +593,21 @@ class LocalStorageAccessor(
|
||||
}
|
||||
|
||||
override suspend fun readSyncJournal(): StorageSyncJournal = withContext(ioDispatcher) {
|
||||
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
|
||||
StorageSyncJournalCodec.read(jackson, bytes)
|
||||
journalBuffer.flushPending()
|
||||
readSyncJournalUnchecked()
|
||||
}
|
||||
|
||||
override suspend fun flushPendingSyncJournal() = withContext(ioDispatcher) {
|
||||
journalBuffer.flushPending()
|
||||
}
|
||||
|
||||
override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) = withContext(ioDispatcher) {
|
||||
if (entries.isEmpty()) {
|
||||
return@withContext
|
||||
}
|
||||
val merged = StorageSyncJournalMerge.merge(readSyncJournal(), entries)
|
||||
writeSyncJournal(merged)
|
||||
journalBuffer.putEntries(entries)
|
||||
}
|
||||
|
||||
private suspend fun readSyncJournalUnchecked(): StorageSyncJournal {
|
||||
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
|
||||
return StorageSyncJournalCodec.read(jackson, bytes)
|
||||
}
|
||||
|
||||
private suspend fun writeSyncJournal(journal: StorageSyncJournal) {
|
||||
@@ -721,21 +740,9 @@ class LocalStorageAccessor(
|
||||
if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) {
|
||||
return
|
||||
}
|
||||
val journal = readSyncJournal()
|
||||
val nextSequence = (journal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L
|
||||
putSyncJournalEntries(
|
||||
mapOf(
|
||||
cleanedPath to StorageSyncJournalEntry(
|
||||
path = cleanedPath,
|
||||
operation = operation,
|
||||
revision = StorageSyncRevision(
|
||||
sequence = nextSequence,
|
||||
actorId = syncActorId,
|
||||
createdAt = Instant.now(),
|
||||
),
|
||||
),
|
||||
),
|
||||
)
|
||||
val sequence = journalBuffer.nextSequence()
|
||||
val entry = journalBuffer.buildEntry(cleanedPath, operation, sequence)
|
||||
journalBuffer.appendEntry(cleanedPath, entry)
|
||||
}
|
||||
|
||||
companion object {
|
||||
|
||||
@@ -2,6 +2,7 @@ package com.github.nullptroma.wallenc.domain.vault.storages.yandex
|
||||
|
||||
import com.github.nullptroma.wallenc.domain.errors.WallencException
|
||||
import com.github.nullptroma.wallenc.domain.vault.errors.toVaultWallencException
|
||||
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalBuffer
|
||||
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec
|
||||
import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty
|
||||
|
||||
@@ -19,17 +20,16 @@ import com.github.nullptroma.wallenc.domain.interfaces.IDirectory
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IFile
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
|
||||
import kotlinx.coroutines.CancellationException
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlin.coroutines.coroutineContext
|
||||
import kotlinx.coroutines.ensureActive
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
@@ -42,6 +42,7 @@ import kotlinx.coroutines.flow.flowOn
|
||||
import kotlinx.coroutines.flow.stateIn
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import kotlinx.coroutines.withContext
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import kotlinx.coroutines.withContext
|
||||
@@ -95,6 +96,13 @@ class YandexStorageAccessor(
|
||||
|
||||
private var statsPersistJob: Job? = null
|
||||
|
||||
private val journalBuffer = StorageSyncJournalBuffer(
|
||||
syncActorId = syncActorId,
|
||||
originStorageUuid = storageUuid,
|
||||
readJournal = { readSyncJournalUnchecked() },
|
||||
writeJournal = { writeSyncJournal(it) },
|
||||
)
|
||||
|
||||
@Volatile
|
||||
private var systemDirEnsured: Boolean = false
|
||||
|
||||
@@ -240,6 +248,7 @@ class YandexStorageAccessor(
|
||||
val queue = ArrayDeque<String>()
|
||||
queue.add(relDir)
|
||||
while (queue.isNotEmpty()) {
|
||||
coroutineContext.ensureActive()
|
||||
val rel = queue.removeFirst()
|
||||
if (isSystemRel(rel)) continue
|
||||
val (files, dirs) = listImmediateChildren(rel)
|
||||
@@ -285,6 +294,7 @@ class YandexStorageAccessor(
|
||||
val dirs = mutableListOf<IDirectory>()
|
||||
var offset = 0
|
||||
while (true) {
|
||||
coroutineContext.ensureActive()
|
||||
val res = guard { repo.list(diskPath, API_LIST_LIMIT, offset) }
|
||||
val items = res.embedded?.items.orEmpty()
|
||||
for (it in items) {
|
||||
@@ -303,7 +313,7 @@ class YandexStorageAccessor(
|
||||
}
|
||||
|
||||
private suspend fun getMetadataAfterWrite(diskPath: String): ResourceDto {
|
||||
val maxAttempts = 6
|
||||
val maxAttempts = 3
|
||||
repeat(maxAttempts) { attempt ->
|
||||
try {
|
||||
return guard { repo.get(diskPath) }
|
||||
@@ -488,8 +498,8 @@ class YandexStorageAccessor(
|
||||
if (created) {
|
||||
_numberOfFiles.value = (_numberOfFiles.value ?: 0) + 1
|
||||
persistStatsImmediate()
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.UPSERT)
|
||||
}
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.UPSERT)
|
||||
}
|
||||
|
||||
override suspend fun touchDir(path: String): Unit = withContext(ioDispatcher) {
|
||||
@@ -506,7 +516,7 @@ class YandexStorageAccessor(
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun delete(path: String) = withContext(ioDispatcher) {
|
||||
override suspend fun delete(path: String, recordSyncJournal: Boolean) = withContext(ioDispatcher) {
|
||||
if (path == "/" || path.isBlank()) {
|
||||
throw WallencException.Storage.DeleteRootForbidden()
|
||||
}
|
||||
@@ -528,7 +538,9 @@ class YandexStorageAccessor(
|
||||
}
|
||||
guard { repo.delete(diskPath, permanently = true) }
|
||||
scheduleStatsPersist()
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
|
||||
if (recordSyncJournal) {
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream = withContext(ioDispatcher) {
|
||||
@@ -546,9 +558,11 @@ class YandexStorageAccessor(
|
||||
guard { repo.openDownloadStream(toDiskPath(path)) }
|
||||
}
|
||||
|
||||
override suspend fun moveToTrash(path: String) = withContext(ioDispatcher) {
|
||||
override suspend fun moveToTrash(path: String, recordSyncJournal: Boolean) = withContext(ioDispatcher) {
|
||||
patchCustomProps(path, mapOf(PROP_DELETED to "true"))
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH)
|
||||
if (recordSyncJournal) {
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun openReadSystemFile(name: String): InputStream = withContext(ioDispatcher) {
|
||||
@@ -576,16 +590,21 @@ class YandexStorageAccessor(
|
||||
}
|
||||
|
||||
override suspend fun readSyncJournal(): StorageSyncJournal = withContext(ioDispatcher) {
|
||||
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
|
||||
StorageSyncJournalCodec.read(statsMapper, bytes)
|
||||
journalBuffer.flushPending()
|
||||
readSyncJournalUnchecked()
|
||||
}
|
||||
|
||||
override suspend fun flushPendingSyncJournal() = withContext(ioDispatcher) {
|
||||
journalBuffer.flushPending()
|
||||
}
|
||||
|
||||
override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) = withContext(ioDispatcher) {
|
||||
if (entries.isEmpty()) {
|
||||
return@withContext
|
||||
}
|
||||
val merged = StorageSyncJournalMerge.merge(readSyncJournal(), entries)
|
||||
writeSyncJournal(merged)
|
||||
journalBuffer.putEntries(entries)
|
||||
}
|
||||
|
||||
private suspend fun readSyncJournalUnchecked(): StorageSyncJournal {
|
||||
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
|
||||
return StorageSyncJournalCodec.read(statsMapper, bytes)
|
||||
}
|
||||
|
||||
private suspend fun writeSyncJournal(journal: StorageSyncJournal) {
|
||||
@@ -668,8 +687,10 @@ class YandexStorageAccessor(
|
||||
* Выполняется из [OutputStream.close]; ошибки upload пробрасываются вызывающему коду.
|
||||
*/
|
||||
private fun runCommitAfterStreamClosed(block: suspend () -> Unit) {
|
||||
runBlocking(ioDispatcher) {
|
||||
block()
|
||||
runBlocking {
|
||||
withContext(ioDispatcher) {
|
||||
block()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -710,22 +731,9 @@ class YandexStorageAccessor(
|
||||
if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) {
|
||||
return
|
||||
}
|
||||
val journal = readSyncJournal()
|
||||
val nextSequence = (journal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L
|
||||
putSyncJournalEntries(
|
||||
mapOf(
|
||||
cleanedPath to StorageSyncJournalEntry(
|
||||
path = cleanedPath,
|
||||
operation = operation,
|
||||
revision = StorageSyncRevision(
|
||||
sequence = nextSequence,
|
||||
actorId = syncActorId,
|
||||
createdAt = Instant.now(),
|
||||
),
|
||||
originStorageUuid = storageUuid,
|
||||
),
|
||||
),
|
||||
)
|
||||
val sequence = journalBuffer.nextSequence()
|
||||
val entry = journalBuffer.buildEntry(cleanedPath, operation, sequence)
|
||||
journalBuffer.appendEntry(cleanedPath, entry)
|
||||
}
|
||||
|
||||
private suspend fun touchParentDirs(path: String) {
|
||||
|
||||
@@ -0,0 +1,41 @@
|
||||
package com.github.nullptroma.wallenc.domain.vault.storages.common
|
||||
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import org.junit.Assert.assertEquals
|
||||
import org.junit.Assert.assertTrue
|
||||
import org.junit.Test
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
class StorageSyncJournalBufferTest {
|
||||
|
||||
@Test
|
||||
fun flushRestoresPendingOnWriteFailure() = runBlocking {
|
||||
val disk = AtomicReference(emptyMap<String, StorageSyncJournalEntry>())
|
||||
var shouldFail = true
|
||||
val buffer = StorageSyncJournalBuffer(
|
||||
syncActorId = "actor",
|
||||
originStorageUuid = null,
|
||||
readJournal = { disk.get() },
|
||||
writeJournal = {
|
||||
if (shouldFail) {
|
||||
error("disk unavailable")
|
||||
}
|
||||
disk.set(it)
|
||||
},
|
||||
)
|
||||
val entry = buffer.buildEntry("/a.txt", StorageSyncOperation.UPSERT, 1L)
|
||||
try {
|
||||
buffer.appendEntry("/a.txt", entry)
|
||||
} catch (_: IllegalStateException) {
|
||||
// expected
|
||||
}
|
||||
shouldFail = false
|
||||
buffer.flushPending()
|
||||
assertTrue(disk.get().containsKey("/a.txt"))
|
||||
assertEquals(1L, disk.get()["/a.txt"]?.revision?.sequence)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user