refactor(sync): перевёл журнал на map по пути и убрал цикл debounce-sync
Журнал хранится как словарь path→entry, служебные пути исключены из sync. Apply пишет файлы без записи в журнал; bootstrap не триггерит sync во время работы.
This commit is contained in:
@@ -0,0 +1,29 @@
|
||||
package com.github.nullptroma.wallenc.domain.vault.storages.common
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
|
||||
import java.util.LinkedHashMap
|
||||
|
||||
internal object StorageSyncJournalCodec {
|
||||
private val journalJavaType = ObjectMapper().typeFactory.constructMapType(
|
||||
LinkedHashMap::class.java,
|
||||
String::class.java,
|
||||
StorageSyncJournalEntry::class.java,
|
||||
)
|
||||
|
||||
fun read(mapper: ObjectMapper, bytes: ByteArray): StorageSyncJournal {
|
||||
if (bytes.isEmpty()) {
|
||||
return emptyMap()
|
||||
}
|
||||
return runCatching {
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
mapper.readValue(bytes, journalJavaType) as StorageSyncJournal
|
||||
}.getOrElse {
|
||||
emptyMap<String, StorageSyncJournalEntry>()
|
||||
}
|
||||
}
|
||||
|
||||
fun write(mapper: ObjectMapper, journal: StorageSyncJournal): ByteArray =
|
||||
mapper.writeValueAsBytes(journal)
|
||||
}
|
||||
@@ -1,6 +1,8 @@
|
||||
package com.github.nullptroma.wallenc.domain.vault.storages.encrypt
|
||||
|
||||
import com.fasterxml.jackson.module.kotlin.readValue
|
||||
import com.github.nullptroma.wallenc.domain.errors.WallencException
|
||||
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec
|
||||
import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty
|
||||
import com.github.nullptroma.wallenc.domain.vault.utils.CloseHandledStreamExtension.Companion.onClosed
|
||||
import com.github.nullptroma.wallenc.domain.vault.utils.CloseHandledStreamExtension.Companion.onClosing
|
||||
@@ -15,7 +17,10 @@ import com.github.nullptroma.wallenc.domain.interfaces.IDirectory
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IFile
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IMetaInfo
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
|
||||
@@ -257,7 +262,8 @@ class EncryptedStorageAccessor(
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
|
||||
}
|
||||
|
||||
override suspend fun openWrite(path: String): OutputStream = openWriteInternal(path, recordJournal = true)
|
||||
override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream =
|
||||
openWriteInternal(path, recordJournal = recordSyncJournal)
|
||||
|
||||
override suspend fun openRead(path: String): InputStream {
|
||||
val stream = source.openRead(encryptPath(path))
|
||||
@@ -290,35 +296,22 @@ class EncryptedStorageAccessor(
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun readSyncJournal(): List<StorageSyncJournalEntry> {
|
||||
override suspend fun readSyncJournal(): StorageSyncJournal {
|
||||
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
|
||||
if (bytes.isEmpty()) {
|
||||
return emptyList()
|
||||
}
|
||||
return runCatching {
|
||||
val journalType = jackson.typeFactory.constructCollectionType(
|
||||
List::class.java,
|
||||
StorageSyncJournalEntry::class.java,
|
||||
)
|
||||
jackson.readValue<List<StorageSyncJournalEntry>>(bytes, journalType)
|
||||
}.getOrElse {
|
||||
emptyList()
|
||||
}
|
||||
return StorageSyncJournalCodec.read(jackson, bytes)
|
||||
}
|
||||
|
||||
override suspend fun appendSyncJournal(entries: List<StorageSyncJournalEntry>) {
|
||||
override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) {
|
||||
if (entries.isEmpty()) {
|
||||
return
|
||||
}
|
||||
val next = readSyncJournal().toMutableList().apply { addAll(entries) }
|
||||
openWriteSystemFile(SYNC_JOURNAL_FILENAME).use { out ->
|
||||
jackson.writeValue(out, next)
|
||||
}
|
||||
val merged = StorageSyncJournalMerge.merge(readSyncJournal(), entries)
|
||||
writeSyncJournal(merged)
|
||||
}
|
||||
|
||||
override suspend fun rewriteSyncJournal(entries: List<StorageSyncJournalEntry>) {
|
||||
private suspend fun writeSyncJournal(journal: StorageSyncJournal) {
|
||||
openWriteSystemFile(SYNC_JOURNAL_FILENAME).use { out ->
|
||||
jackson.writeValue(out, entries)
|
||||
jackson.writeValue(out, journal)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -376,15 +369,15 @@ class EncryptedStorageAccessor(
|
||||
}
|
||||
|
||||
private suspend fun appendSyncEntry(path: String, operation: StorageSyncOperation) {
|
||||
val cleanedPath = if (path.startsWith("/")) path else "/$path"
|
||||
if (cleanedPath.startsWith("/$systemHiddenDirName/")) {
|
||||
val cleanedPath = StorageSyncPaths.normalize(path)
|
||||
if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) {
|
||||
return
|
||||
}
|
||||
val entries = readSyncJournal()
|
||||
val nextSequence = (entries.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L
|
||||
appendSyncJournal(
|
||||
listOf(
|
||||
StorageSyncJournalEntry(
|
||||
val journal = readSyncJournal()
|
||||
val nextSequence = (journal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L
|
||||
putSyncJournalEntries(
|
||||
mapOf(
|
||||
cleanedPath to StorageSyncJournalEntry(
|
||||
path = cleanedPath,
|
||||
operation = operation,
|
||||
revision = StorageSyncRevision(
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.github.nullptroma.wallenc.domain.vault.storages.local
|
||||
|
||||
import com.github.nullptroma.wallenc.domain.errors.WallencException
|
||||
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec
|
||||
import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty
|
||||
|
||||
import com.fasterxml.jackson.core.JacksonException
|
||||
@@ -15,9 +16,12 @@ import com.github.nullptroma.wallenc.domain.interfaces.IDirectory
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IFile
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IMetaInfo
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
@@ -523,7 +527,7 @@ class LocalStorageAccessor(
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun openWrite(path: String): OutputStream = withContext(ioDispatcher) {
|
||||
override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream = withContext(ioDispatcher) {
|
||||
touchFileInternal(path, recordJournal = false)
|
||||
val pair = LocalStorageFilePair.from(_filesystemBasePath, path)
|
||||
?: throw WallencException.Storage.FileNotFound()
|
||||
@@ -531,7 +535,9 @@ class LocalStorageAccessor(
|
||||
CoroutineScope(ioDispatcher).launch {
|
||||
touchFileInternal(path, recordJournal = false)
|
||||
scanSizeAndNumOfFiles()
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.UPSERT)
|
||||
if (recordSyncJournal) {
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.UPSERT)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -572,33 +578,22 @@ class LocalStorageAccessor(
|
||||
return@withContext file.outputStream()
|
||||
}
|
||||
|
||||
override suspend fun readSyncJournal(): List<StorageSyncJournalEntry> = withContext(ioDispatcher) {
|
||||
override suspend fun readSyncJournal(): StorageSyncJournal = withContext(ioDispatcher) {
|
||||
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
|
||||
if (bytes.isEmpty()) {
|
||||
return@withContext emptyList()
|
||||
}
|
||||
return@withContext runCatching {
|
||||
jackson.readValue<List<StorageSyncJournalEntry>>(bytes)
|
||||
}.getOrElse {
|
||||
emptyList()
|
||||
}
|
||||
StorageSyncJournalCodec.read(jackson, bytes)
|
||||
}
|
||||
|
||||
override suspend fun appendSyncJournal(entries: List<StorageSyncJournalEntry>) = withContext(ioDispatcher) {
|
||||
override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) = withContext(ioDispatcher) {
|
||||
if (entries.isEmpty()) {
|
||||
return@withContext
|
||||
}
|
||||
val next = readSyncJournal().toMutableList().apply {
|
||||
addAll(entries)
|
||||
}
|
||||
openWriteSystemFile(SYNC_JOURNAL_FILENAME).use { out ->
|
||||
jackson.writeValue(out, next)
|
||||
}
|
||||
val merged = StorageSyncJournalMerge.merge(readSyncJournal(), entries)
|
||||
writeSyncJournal(merged)
|
||||
}
|
||||
|
||||
override suspend fun rewriteSyncJournal(entries: List<StorageSyncJournalEntry>) = withContext(ioDispatcher) {
|
||||
private suspend fun writeSyncJournal(journal: StorageSyncJournal) {
|
||||
openWriteSystemFile(SYNC_JOURNAL_FILENAME).use { out ->
|
||||
jackson.writeValue(out, entries)
|
||||
jackson.writeValue(out, journal)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -722,19 +717,25 @@ class LocalStorageAccessor(
|
||||
}
|
||||
|
||||
private suspend fun appendSyncEntry(path: String, operation: StorageSyncOperation) {
|
||||
val cleanedPath = if (path.startsWith("/")) path else "/$path"
|
||||
val entries = readSyncJournal()
|
||||
val nextSequence = (entries.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L
|
||||
val entry = StorageSyncJournalEntry(
|
||||
path = cleanedPath,
|
||||
operation = operation,
|
||||
revision = StorageSyncRevision(
|
||||
sequence = nextSequence,
|
||||
actorId = syncActorId,
|
||||
createdAt = Instant.now(),
|
||||
val cleanedPath = StorageSyncPaths.normalize(path)
|
||||
if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) {
|
||||
return
|
||||
}
|
||||
val journal = readSyncJournal()
|
||||
val nextSequence = (journal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L
|
||||
putSyncJournalEntries(
|
||||
mapOf(
|
||||
cleanedPath to StorageSyncJournalEntry(
|
||||
path = cleanedPath,
|
||||
operation = operation,
|
||||
revision = StorageSyncRevision(
|
||||
sequence = nextSequence,
|
||||
actorId = syncActorId,
|
||||
createdAt = Instant.now(),
|
||||
),
|
||||
),
|
||||
),
|
||||
)
|
||||
appendSyncJournal(listOf(entry))
|
||||
}
|
||||
|
||||
companion object {
|
||||
|
||||
@@ -2,6 +2,7 @@ package com.github.nullptroma.wallenc.domain.vault.storages.yandex
|
||||
|
||||
import com.github.nullptroma.wallenc.domain.errors.WallencException
|
||||
import com.github.nullptroma.wallenc.domain.vault.errors.toVaultWallencException
|
||||
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec
|
||||
import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty
|
||||
|
||||
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
|
||||
@@ -17,7 +18,10 @@ import com.github.nullptroma.wallenc.domain.datatypes.DataPage
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IDirectory
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IFile
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
|
||||
@@ -527,13 +531,13 @@ class YandexStorageAccessor(
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
|
||||
}
|
||||
|
||||
override suspend fun openWrite(path: String): OutputStream = withContext(ioDispatcher) {
|
||||
override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream = withContext(ioDispatcher) {
|
||||
touchParentDirs(path)
|
||||
val tmp = File.createTempFile("wallenc-yandex-", ".upload")
|
||||
val fos = FileOutputStream(tmp)
|
||||
fos.onClosed {
|
||||
runCommitAfterStreamClosed {
|
||||
commitUploadedFile(path, tmp)
|
||||
commitUploadedFile(path, tmp, recordSyncJournal)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -571,37 +575,22 @@ class YandexStorageAccessor(
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun readSyncJournal(): List<StorageSyncJournalEntry> = withContext(ioDispatcher) {
|
||||
override suspend fun readSyncJournal(): StorageSyncJournal = withContext(ioDispatcher) {
|
||||
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
|
||||
if (bytes.isEmpty()) {
|
||||
return@withContext emptyList()
|
||||
}
|
||||
return@withContext runCatching {
|
||||
val journalType = statsMapper.typeFactory.constructCollectionType(
|
||||
List::class.java,
|
||||
StorageSyncJournalEntry::class.java,
|
||||
)
|
||||
statsMapper.readValue<List<StorageSyncJournalEntry>>(bytes, journalType)
|
||||
}.getOrElse {
|
||||
emptyList()
|
||||
}
|
||||
StorageSyncJournalCodec.read(statsMapper, bytes)
|
||||
}
|
||||
|
||||
override suspend fun appendSyncJournal(entries: List<StorageSyncJournalEntry>) = withContext(ioDispatcher) {
|
||||
override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) = withContext(ioDispatcher) {
|
||||
if (entries.isEmpty()) {
|
||||
return@withContext
|
||||
}
|
||||
val next = readSyncJournal().toMutableList().apply {
|
||||
addAll(entries)
|
||||
}
|
||||
openWriteSystemFile(SYNC_JOURNAL_FILENAME).use { out ->
|
||||
statsMapper.writeValue(out, next)
|
||||
}
|
||||
val merged = StorageSyncJournalMerge.merge(readSyncJournal(), entries)
|
||||
writeSyncJournal(merged)
|
||||
}
|
||||
|
||||
override suspend fun rewriteSyncJournal(entries: List<StorageSyncJournalEntry>) = withContext(ioDispatcher) {
|
||||
private suspend fun writeSyncJournal(journal: StorageSyncJournal) {
|
||||
openWriteSystemFile(SYNC_JOURNAL_FILENAME).use { out ->
|
||||
statsMapper.writeValue(out, entries)
|
||||
statsMapper.writeValue(out, journal)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -684,7 +673,7 @@ class YandexStorageAccessor(
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun commitUploadedFile(path: String, tmp: File) {
|
||||
private suspend fun commitUploadedFile(path: String, tmp: File, recordSyncJournal: Boolean) {
|
||||
try {
|
||||
val diskPath = toDiskPath(path)
|
||||
val prior = guard { repo.getOrNull(diskPath) }
|
||||
@@ -708,27 +697,35 @@ class YandexStorageAccessor(
|
||||
info?.let {
|
||||
_filesUpdates.emit(DataPage(listOf(it), pageLength = 1, pageIndex = 0))
|
||||
}
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.UPSERT)
|
||||
if (recordSyncJournal) {
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.UPSERT)
|
||||
}
|
||||
} finally {
|
||||
tmp.delete()
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun appendSyncEntry(path: String, operation: StorageSyncOperation) {
|
||||
val cleanedPath = if (path.startsWith("/")) path else "/$path"
|
||||
val entries = readSyncJournal()
|
||||
val nextSequence = (entries.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L
|
||||
val entry = StorageSyncJournalEntry(
|
||||
path = cleanedPath,
|
||||
operation = operation,
|
||||
revision = StorageSyncRevision(
|
||||
sequence = nextSequence,
|
||||
actorId = syncActorId,
|
||||
createdAt = Instant.now(),
|
||||
val cleanedPath = StorageSyncPaths.normalize(path)
|
||||
if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) {
|
||||
return
|
||||
}
|
||||
val journal = readSyncJournal()
|
||||
val nextSequence = (journal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L
|
||||
putSyncJournalEntries(
|
||||
mapOf(
|
||||
cleanedPath to StorageSyncJournalEntry(
|
||||
path = cleanedPath,
|
||||
operation = operation,
|
||||
revision = StorageSyncRevision(
|
||||
sequence = nextSequence,
|
||||
actorId = syncActorId,
|
||||
createdAt = Instant.now(),
|
||||
),
|
||||
originStorageUuid = storageUuid,
|
||||
),
|
||||
),
|
||||
originStorageUuid = storageUuid,
|
||||
)
|
||||
appendSyncJournal(listOf(entry))
|
||||
}
|
||||
|
||||
private suspend fun touchParentDirs(path: String) {
|
||||
|
||||
Reference in New Issue
Block a user