Compare commits

..

5 Commits

Author SHA1 Message Date
99cb410919 Native Яндекс 2026-05-21 22:57:53 +03:00
d3eac81660 Причина синхронизации и временная метка в логах 2026-05-21 22:30:40 +03:00
d0f490a3fd refactor(sync): перевёл журнал на map по пути и убрал цикл debounce-sync
Журнал хранится как словарь path→entry, служебные пути исключены из sync.
Apply пишет файлы без записи в журнал; bootstrap не триггерит sync во время работы.
2026-05-21 22:05:57 +03:00
51e6f40587 fix(sync): стабилизировал синхронизацию, Yandex I/O и вёрстку карточки storage
Добавил TRASH вместо DELETE для moveToTrash, компакцию журналов и отчёт об ошибках apply.
Исправил проброс ошибок upload Yandex при close, CAS lock и загрузку OAuth-токена.
Упростил совместимость sync-групп (только encInfo), поправил растягивание StorageTree при недоступных meta.
2026-05-21 18:46:03 +03:00
ef40aa9e73 Исправлен ворнинг 2026-05-21 13:14:00 +03:00
37 changed files with 712 additions and 283 deletions

View File

@@ -80,7 +80,7 @@ class YandexSignInService @Inject constructor(
return return
} }
synchronized(this) { pending = onResult } synchronized(this) { pending = onResult }
l.launch(YandexAuthLoginOptions(LoginType.WEBVIEW)) l.launch(YandexAuthLoginOptions(LoginType.NATIVE))
} }
private fun mapYandexResult(result: YandexAuthResult): VaultLinkOutcome = when (result) { private fun mapYandexResult(result: YandexAuthResult): VaultLinkOutcome = when (result) {

View File

@@ -1,8 +1,10 @@
package com.github.nullptroma.wallenc.app.sync package com.github.nullptroma.wallenc.app.sync
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
import com.github.nullptroma.wallenc.domain.interfaces.IVaultsManager import com.github.nullptroma.wallenc.domain.interfaces.IVaultsManager
import com.github.nullptroma.wallenc.ui.R import com.github.nullptroma.wallenc.ui.R
import com.github.nullptroma.wallenc.ui.resources.UiStringResolver import com.github.nullptroma.wallenc.ui.resources.UiStringResolver
import com.github.nullptroma.wallenc.domain.tasks.StorageSyncTriggerReason
import com.github.nullptroma.wallenc.usecases.RunStorageSyncUseCase import com.github.nullptroma.wallenc.usecases.RunStorageSyncUseCase
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
@@ -10,6 +12,7 @@ import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.debounce import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
@@ -35,16 +38,31 @@ class StorageSyncBootstrap @Inject constructor(
} }
val triggers = storages.flatMap { storage -> val triggers = storages.flatMap { storage ->
listOf( listOf(
storage.accessor.filesUpdates.map {}, storage.accessor.filesUpdates
storage.accessor.dirsUpdates.map {}, .filter { page ->
page.data.any { file ->
StorageSyncPaths.isSyncableUserPath(file.metaInfo.path)
}
}
.map {},
storage.accessor.dirsUpdates
.filter { page ->
page.data.any { dir ->
StorageSyncPaths.isSyncableUserPath(dir.metaInfo.path)
}
}
.map {},
) )
} }
merge(*triggers.toTypedArray()) merge(*triggers.toTypedArray())
.debounce(DEBOUNCE_AFTER_CHANGE_MS) .debounce(DEBOUNCE_AFTER_CHANGE_MS)
.collect { .collect {
if (syncRunner.syncRunning.value) {
return@collect
}
syncRunner.enqueue( syncRunner.enqueue(
displayTitle = uiStrings(R.string.task_title_storage_sync_background), displayTitle = uiStrings(R.string.task_title_storage_sync_background),
logReason = "debounce", reason = StorageSyncTriggerReason.Debounce,
) )
} }
} }

View File

@@ -4,6 +4,7 @@ import android.content.Context
import androidx.hilt.work.HiltWorker import androidx.hilt.work.HiltWorker
import androidx.work.CoroutineWorker import androidx.work.CoroutineWorker
import androidx.work.WorkerParameters import androidx.work.WorkerParameters
import com.github.nullptroma.wallenc.domain.tasks.StorageSyncTriggerReason
import com.github.nullptroma.wallenc.usecases.RunStorageSyncUseCase import com.github.nullptroma.wallenc.usecases.RunStorageSyncUseCase
import dagger.assisted.Assisted import dagger.assisted.Assisted
import dagger.assisted.AssistedInject import dagger.assisted.AssistedInject
@@ -19,7 +20,7 @@ class StorageSyncWorker @AssistedInject constructor(
override suspend fun doWork(): Result { override suspend fun doWork(): Result {
Timber.d("Periodic storage sync started (attempt %d)", runAttemptCount) Timber.d("Periodic storage sync started (attempt %d)", runAttemptCount)
return runCatching { return runCatching {
syncRunner.runBlocking() syncRunner.runBlocking(StorageSyncTriggerReason.Background)
Timber.d("Periodic storage sync finished") Timber.d("Periodic storage sync finished")
Result.success() Result.success()
}.getOrElse { error -> }.getOrElse { error ->

View File

@@ -54,28 +54,26 @@ class YandexDiskApiFactory(
} }
/** /**
* Провайдер токена читает [YandexAccountRepository] на IO-диспетчере (как и остальной data-слой). * OAuth-токен загружается один раз при создании API (не в OkHttp interceptor).
* При 401 см. [YandexDiskAuthException] и повторную привязку vault.
*/ */
fun createApiForVault(vaultUuid: UUID): YandexDiskApi { fun createApiForVault(vaultUuid: UUID): YandexDiskApi {
val id = vaultUuid.toString() val id = vaultUuid.toString()
val token = runBlocking(ioDispatcher) {
accountRepository.getByVaultUuid(id)?.oauthToken
} ?: throw java.io.IOException("Yandex OAuth token is missing")
oauthTokenCache[id] = System.currentTimeMillis() to token
return createAuthenticatedApi { return createAuthenticatedApi {
val now = System.currentTimeMillis() oauthTokenCache[id]?.second ?: token
val hit = oauthTokenCache[id]
if (hit != null && now - hit.first < OAUTH_TOKEN_CACHE_TTL_MS) {
return@createAuthenticatedApi hit.second
}
val token = runBlocking(ioDispatcher) {
accountRepository.getByVaultUuid(id)?.oauthToken
} ?: throw java.io.IOException("Yandex OAuth token is missing")
oauthTokenCache[id] = now to token
token
} }
} }
fun invalidateTokenCache(vaultUuid: UUID) {
oauthTokenCache.remove(vaultUuid.toString())
}
companion object { companion object {
const val BASE_URL = "https://cloud-api.yandex.net/" const val BASE_URL = "https://cloud-api.yandex.net/"
private const val OAUTH_TOKEN_CACHE_TTL_MS = 120_000L
fun createRepositoryWithToken( fun createRepositoryWithToken(
oauthToken: String, oauthToken: String,
ioDispatcher: CoroutineDispatcher, ioDispatcher: CoroutineDispatcher,

View File

@@ -0,0 +1,29 @@
package com.github.nullptroma.wallenc.domain.vault.storages.common
import com.fasterxml.jackson.databind.ObjectMapper
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
import java.util.LinkedHashMap
internal object StorageSyncJournalCodec {
private val journalJavaType = ObjectMapper().typeFactory.constructMapType(
LinkedHashMap::class.java,
String::class.java,
StorageSyncJournalEntry::class.java,
)
fun read(mapper: ObjectMapper, bytes: ByteArray): StorageSyncJournal {
if (bytes.isEmpty()) {
return emptyMap()
}
return runCatching {
@Suppress("UNCHECKED_CAST")
mapper.readValue(bytes, journalJavaType) as StorageSyncJournal
}.getOrElse {
emptyMap<String, StorageSyncJournalEntry>()
}
}
fun write(mapper: ObjectMapper, journal: StorageSyncJournal): ByteArray =
mapper.writeValueAsBytes(journal)
}

View File

@@ -1,6 +1,8 @@
package com.github.nullptroma.wallenc.domain.vault.storages.encrypt package com.github.nullptroma.wallenc.domain.vault.storages.encrypt
import com.fasterxml.jackson.module.kotlin.readValue
import com.github.nullptroma.wallenc.domain.errors.WallencException import com.github.nullptroma.wallenc.domain.errors.WallencException
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec
import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty
import com.github.nullptroma.wallenc.domain.vault.utils.CloseHandledStreamExtension.Companion.onClosed import com.github.nullptroma.wallenc.domain.vault.utils.CloseHandledStreamExtension.Companion.onClosed
import com.github.nullptroma.wallenc.domain.vault.utils.CloseHandledStreamExtension.Companion.onClosing import com.github.nullptroma.wallenc.domain.vault.utils.CloseHandledStreamExtension.Companion.onClosing
@@ -15,7 +17,10 @@ import com.github.nullptroma.wallenc.domain.interfaces.IDirectory
import com.github.nullptroma.wallenc.domain.interfaces.IFile import com.github.nullptroma.wallenc.domain.interfaces.IFile
import com.github.nullptroma.wallenc.domain.interfaces.IMetaInfo import com.github.nullptroma.wallenc.domain.interfaces.IMetaInfo
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
@@ -46,6 +51,7 @@ class EncryptedStorageAccessor(
private val scope: CoroutineScope private val scope: CoroutineScope
) : IStorageAccessor, DisposableHandle { ) : IStorageAccessor, DisposableHandle {
private val syncActorId = UUID.randomUUID().toString() private val syncActorId = UUID.randomUUID().toString()
private val syncLockMutex = Mutex()
private val _size = MutableStateFlow<Long?>(null) private val _size = MutableStateFlow<Long?>(null)
override val size: StateFlow<Long?> = _size override val size: StateFlow<Long?> = _size
@@ -63,7 +69,6 @@ class EncryptedStorageAccessor(
private val dataEncryptor = Encryptor(key.toAesKey()) private val dataEncryptor = Encryptor(key.toAesKey())
private val pathEncryptor: EncryptorWithStaticIv? = if(pathIv != null) EncryptorWithStaticIv(key.toAesKey(), pathIv) else null private val pathEncryptor: EncryptorWithStaticIv? = if(pathIv != null) EncryptorWithStaticIv(key.toAesKey(), pathIv) else null
private val syncLockMutex = Mutex()
private var systemHiddenFilesIsActual = false private var systemHiddenFilesIsActual = false
init { init {
@@ -257,7 +262,8 @@ class EncryptedStorageAccessor(
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE) appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
} }
override suspend fun openWrite(path: String): OutputStream = openWriteInternal(path, recordJournal = true) override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream =
openWriteInternal(path, recordJournal = recordSyncJournal)
override suspend fun openRead(path: String): InputStream { override suspend fun openRead(path: String): InputStream {
val stream = source.openRead(encryptPath(path)) val stream = source.openRead(encryptPath(path))
@@ -266,7 +272,7 @@ class EncryptedStorageAccessor(
override suspend fun moveToTrash(path: String) { override suspend fun moveToTrash(path: String) {
source.moveToTrash(encryptPath(path)) source.moveToTrash(encryptPath(path))
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE) appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH)
} }
override fun dispose() { override fun dispose() {
@@ -290,35 +296,22 @@ class EncryptedStorageAccessor(
} }
} }
override suspend fun readSyncJournal(): List<StorageSyncJournalEntry> { override suspend fun readSyncJournal(): StorageSyncJournal {
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) } val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
if (bytes.isEmpty()) { return StorageSyncJournalCodec.read(jackson, bytes)
return emptyList()
}
return runCatching {
val journalType = jackson.typeFactory.constructCollectionType(
List::class.java,
StorageSyncJournalEntry::class.java,
)
jackson.readValue<List<StorageSyncJournalEntry>>(bytes, journalType)
}.getOrElse {
emptyList()
}
} }
override suspend fun appendSyncJournal(entries: List<StorageSyncJournalEntry>) { override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) {
if (entries.isEmpty()) { if (entries.isEmpty()) {
return return
} }
val next = readSyncJournal().toMutableList().apply { addAll(entries) } val merged = StorageSyncJournalMerge.merge(readSyncJournal(), entries)
openWriteSystemFile(SYNC_JOURNAL_FILENAME).use { out -> writeSyncJournal(merged)
jackson.writeValue(out, next)
}
} }
override suspend fun rewriteSyncJournal(entries: List<StorageSyncJournalEntry>) { private suspend fun writeSyncJournal(journal: StorageSyncJournal) {
openWriteSystemFile(SYNC_JOURNAL_FILENAME).use { out -> openWriteSystemFile(SYNC_JOURNAL_FILENAME).use { out ->
jackson.writeValue(out, entries) jackson.writeValue(out, journal)
} }
} }
@@ -376,15 +369,15 @@ class EncryptedStorageAccessor(
} }
private suspend fun appendSyncEntry(path: String, operation: StorageSyncOperation) { private suspend fun appendSyncEntry(path: String, operation: StorageSyncOperation) {
val cleanedPath = if (path.startsWith("/")) path else "/$path" val cleanedPath = StorageSyncPaths.normalize(path)
if (cleanedPath.startsWith("/$systemHiddenDirName/")) { if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) {
return return
} }
val entries = readSyncJournal() val journal = readSyncJournal()
val nextSequence = (entries.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L val nextSequence = (journal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L
appendSyncJournal( putSyncJournalEntries(
listOf( mapOf(
StorageSyncJournalEntry( cleanedPath to StorageSyncJournalEntry(
path = cleanedPath, path = cleanedPath,
operation = operation, operation = operation,
revision = StorageSyncRevision( revision = StorageSyncRevision(

View File

@@ -1,6 +1,7 @@
package com.github.nullptroma.wallenc.domain.vault.storages.local package com.github.nullptroma.wallenc.domain.vault.storages.local
import com.github.nullptroma.wallenc.domain.errors.WallencException import com.github.nullptroma.wallenc.domain.errors.WallencException
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec
import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty
import com.fasterxml.jackson.core.JacksonException import com.fasterxml.jackson.core.JacksonException
@@ -15,9 +16,12 @@ import com.github.nullptroma.wallenc.domain.interfaces.IDirectory
import com.github.nullptroma.wallenc.domain.interfaces.IFile import com.github.nullptroma.wallenc.domain.interfaces.IFile
import com.github.nullptroma.wallenc.domain.interfaces.IMetaInfo import com.github.nullptroma.wallenc.domain.interfaces.IMetaInfo
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
@@ -523,7 +527,7 @@ class LocalStorageAccessor(
} }
} }
override suspend fun openWrite(path: String): OutputStream = withContext(ioDispatcher) { override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream = withContext(ioDispatcher) {
touchFileInternal(path, recordJournal = false) touchFileInternal(path, recordJournal = false)
val pair = LocalStorageFilePair.from(_filesystemBasePath, path) val pair = LocalStorageFilePair.from(_filesystemBasePath, path)
?: throw WallencException.Storage.FileNotFound() ?: throw WallencException.Storage.FileNotFound()
@@ -531,7 +535,9 @@ class LocalStorageAccessor(
CoroutineScope(ioDispatcher).launch { CoroutineScope(ioDispatcher).launch {
touchFileInternal(path, recordJournal = false) touchFileInternal(path, recordJournal = false)
scanSizeAndNumOfFiles() scanSizeAndNumOfFiles()
appendSyncEntry(path = path, operation = StorageSyncOperation.UPSERT) if (recordSyncJournal) {
appendSyncEntry(path = path, operation = StorageSyncOperation.UPSERT)
}
} }
} }
} }
@@ -547,7 +553,7 @@ class LocalStorageAccessor(
?: throw WallencException.Storage.FileNotFound() ?: throw WallencException.Storage.FileNotFound()
val newMeta = pair.meta.copy(isDeleted = true) val newMeta = pair.meta.copy(isDeleted = true)
writeMeta(pair.metaFile, newMeta) writeMeta(pair.metaFile, newMeta)
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE) appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH)
} }
override suspend fun openReadSystemFile(name: String): InputStream = withContext(ioDispatcher) { override suspend fun openReadSystemFile(name: String): InputStream = withContext(ioDispatcher) {
@@ -572,33 +578,22 @@ class LocalStorageAccessor(
return@withContext file.outputStream() return@withContext file.outputStream()
} }
override suspend fun readSyncJournal(): List<StorageSyncJournalEntry> = withContext(ioDispatcher) { override suspend fun readSyncJournal(): StorageSyncJournal = withContext(ioDispatcher) {
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) } val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
if (bytes.isEmpty()) { StorageSyncJournalCodec.read(jackson, bytes)
return@withContext emptyList()
}
return@withContext runCatching {
jackson.readValue<List<StorageSyncJournalEntry>>(bytes)
}.getOrElse {
emptyList()
}
} }
override suspend fun appendSyncJournal(entries: List<StorageSyncJournalEntry>) = withContext(ioDispatcher) { override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) = withContext(ioDispatcher) {
if (entries.isEmpty()) { if (entries.isEmpty()) {
return@withContext return@withContext
} }
val next = readSyncJournal().toMutableList().apply { val merged = StorageSyncJournalMerge.merge(readSyncJournal(), entries)
addAll(entries) writeSyncJournal(merged)
}
openWriteSystemFile(SYNC_JOURNAL_FILENAME).use { out ->
jackson.writeValue(out, next)
}
} }
override suspend fun rewriteSyncJournal(entries: List<StorageSyncJournalEntry>) = withContext(ioDispatcher) { private suspend fun writeSyncJournal(journal: StorageSyncJournal) {
openWriteSystemFile(SYNC_JOURNAL_FILENAME).use { out -> openWriteSystemFile(SYNC_JOURNAL_FILENAME).use { out ->
jackson.writeValue(out, entries) jackson.writeValue(out, journal)
} }
} }
@@ -722,19 +717,25 @@ class LocalStorageAccessor(
} }
private suspend fun appendSyncEntry(path: String, operation: StorageSyncOperation) { private suspend fun appendSyncEntry(path: String, operation: StorageSyncOperation) {
val cleanedPath = if (path.startsWith("/")) path else "/$path" val cleanedPath = StorageSyncPaths.normalize(path)
val entries = readSyncJournal() if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) {
val nextSequence = (entries.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L return
val entry = StorageSyncJournalEntry( }
path = cleanedPath, val journal = readSyncJournal()
operation = operation, val nextSequence = (journal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L
revision = StorageSyncRevision( putSyncJournalEntries(
sequence = nextSequence, mapOf(
actorId = syncActorId, cleanedPath to StorageSyncJournalEntry(
createdAt = Instant.now(), path = cleanedPath,
operation = operation,
revision = StorageSyncRevision(
sequence = nextSequence,
actorId = syncActorId,
createdAt = Instant.now(),
),
),
), ),
) )
appendSyncJournal(listOf(entry))
} }
companion object { companion object {

View File

@@ -2,6 +2,7 @@ package com.github.nullptroma.wallenc.domain.vault.storages.yandex
import com.github.nullptroma.wallenc.domain.errors.WallencException import com.github.nullptroma.wallenc.domain.errors.WallencException
import com.github.nullptroma.wallenc.domain.vault.errors.toVaultWallencException import com.github.nullptroma.wallenc.domain.vault.errors.toVaultWallencException
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec
import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
@@ -17,7 +18,10 @@ import com.github.nullptroma.wallenc.domain.datatypes.DataPage
import com.github.nullptroma.wallenc.domain.interfaces.IDirectory import com.github.nullptroma.wallenc.domain.interfaces.IDirectory
import com.github.nullptroma.wallenc.domain.interfaces.IFile import com.github.nullptroma.wallenc.domain.interfaces.IFile
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
@@ -527,39 +531,13 @@ class YandexStorageAccessor(
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE) appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
} }
override suspend fun openWrite(path: String): OutputStream = withContext(ioDispatcher) { override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream = withContext(ioDispatcher) {
touchParentDirs(path) touchParentDirs(path)
val tmp = File.createTempFile("wallenc-yandex-", ".upload") val tmp = File.createTempFile("wallenc-yandex-", ".upload")
val fos = FileOutputStream(tmp) val fos = FileOutputStream(tmp)
fos.onClosed { fos.onClosed {
runBlocking(ioDispatcher) { runCommitAfterStreamClosed {
try { commitUploadedFile(path, tmp, recordSyncJournal)
val diskPath = toDiskPath(path)
val prior = guard { repo.getOrNull(diskPath) }
if (prior?.type == "dir") {
throw WallencException.Storage.CannotWriteOverDirectory()
}
val hadFile = prior?.type == "file"
val priorSize = if (prior?.type == "file") prior.size ?: 0L else 0L
guard { repo.uploadFile(diskPath, tmp, overwrite = true) }
val after = guard { getMetadataAfterWrite(diskPath) }
if (after.type != "file") {
throw WallencException.Storage.UnexpectedState()
}
val newSize = after.size ?: 0L
_size.value = ((_size.value ?: 0L) + newSize - priorSize).coerceAtLeast(0L)
if (!hadFile) {
_numberOfFiles.value = (_numberOfFiles.value ?: 0) + 1
}
persistStatsImmediate()
val info = runCatching { after.toCommonFile(path) }.getOrNull()
info?.let {
_filesUpdates.emit(DataPage(listOf(it), pageLength = 1, pageIndex = 0))
}
appendSyncEntry(path = path, operation = StorageSyncOperation.UPSERT)
} finally {
tmp.delete()
}
} }
} }
} }
@@ -570,7 +548,7 @@ class YandexStorageAccessor(
override suspend fun moveToTrash(path: String) = withContext(ioDispatcher) { override suspend fun moveToTrash(path: String) = withContext(ioDispatcher) {
patchCustomProps(path, mapOf(PROP_DELETED to "true")) patchCustomProps(path, mapOf(PROP_DELETED to "true"))
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE) appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH)
} }
override suspend fun openReadSystemFile(name: String): InputStream = withContext(ioDispatcher) { override suspend fun openReadSystemFile(name: String): InputStream = withContext(ioDispatcher) {
@@ -589,45 +567,30 @@ class YandexStorageAccessor(
val rel = "/$SYSTEM_HIDDEN_DIRNAME/$name" val rel = "/$SYSTEM_HIDDEN_DIRNAME/$name"
val uploadBuffer = ByteArrayOutputStream() val uploadBuffer = ByteArrayOutputStream()
uploadBuffer.onClosed { uploadBuffer.onClosed {
runBlocking(ioDispatcher) { val bytes = uploadBuffer.toByteArray()
guard { val diskPath = toDiskPath(rel)
repo.uploadBytes(toDiskPath(rel), uploadBuffer.toByteArray(), overwrite = true) runCommitAfterStreamClosed {
} guard { repo.uploadBytes(diskPath, bytes, overwrite = true) }
} }
} }
} }
override suspend fun readSyncJournal(): List<StorageSyncJournalEntry> = withContext(ioDispatcher) { override suspend fun readSyncJournal(): StorageSyncJournal = withContext(ioDispatcher) {
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) } val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
if (bytes.isEmpty()) { StorageSyncJournalCodec.read(statsMapper, bytes)
return@withContext emptyList()
}
return@withContext runCatching {
val journalType = statsMapper.typeFactory.constructCollectionType(
List::class.java,
StorageSyncJournalEntry::class.java,
)
statsMapper.readValue<List<StorageSyncJournalEntry>>(bytes, journalType)
}.getOrElse {
emptyList()
}
} }
override suspend fun appendSyncJournal(entries: List<StorageSyncJournalEntry>) = withContext(ioDispatcher) { override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) = withContext(ioDispatcher) {
if (entries.isEmpty()) { if (entries.isEmpty()) {
return@withContext return@withContext
} }
val next = readSyncJournal().toMutableList().apply { val merged = StorageSyncJournalMerge.merge(readSyncJournal(), entries)
addAll(entries) writeSyncJournal(merged)
}
openWriteSystemFile(SYNC_JOURNAL_FILENAME).use { out ->
statsMapper.writeValue(out, next)
}
} }
override suspend fun rewriteSyncJournal(entries: List<StorageSyncJournalEntry>) = withContext(ioDispatcher) { private suspend fun writeSyncJournal(journal: StorageSyncJournal) {
openWriteSystemFile(SYNC_JOURNAL_FILENAME).use { out -> openWriteSystemFile(SYNC_JOURNAL_FILENAME).use { out ->
statsMapper.writeValue(out, entries) statsMapper.writeValue(out, journal)
} }
} }
@@ -641,8 +604,25 @@ class YandexStorageAccessor(
}.getOrNull() }.getOrNull()
} }
/**
* Best-effort lock на Диске (read-modify-write без CAS на стороне API).
* Межустройственная координация опирается на [StorageSyncEngine] mutex в процессе.
*/
override suspend fun tryAcquireSyncLock(holderId: String, leaseUntil: Instant): Boolean = withContext(ioDispatcher) { override suspend fun tryAcquireSyncLock(holderId: String, leaseUntil: Instant): Boolean = withContext(ioDispatcher) {
return@withContext syncLockMutex.withLock { repeat(SYNC_LOCK_CAS_RETRIES) { attempt ->
val acquired = tryAcquireSyncLockOnce(holderId, leaseUntil)
if (acquired) {
return@withContext true
}
if (attempt < SYNC_LOCK_CAS_RETRIES - 1) {
delay(SYNC_LOCK_CAS_DELAY_MS)
}
}
return@withContext false
}
private suspend fun tryAcquireSyncLockOnce(holderId: String, leaseUntil: Instant): Boolean {
return syncLockMutex.withLock {
val current = readSyncLock() val current = readSyncLock()
val now = Instant.now() val now = Instant.now()
val foreignLockActive = current != null && val foreignLockActive = current != null &&
@@ -684,21 +664,68 @@ class YandexStorageAccessor(
} }
} }
/**
* Выполняется из [OutputStream.close]; ошибки upload пробрасываются вызывающему коду.
*/
private fun runCommitAfterStreamClosed(block: suspend () -> Unit) {
runBlocking(ioDispatcher) {
block()
}
}
private suspend fun commitUploadedFile(path: String, tmp: File, recordSyncJournal: Boolean) {
try {
val diskPath = toDiskPath(path)
val prior = guard { repo.getOrNull(diskPath) }
if (prior?.type == "dir") {
throw WallencException.Storage.CannotWriteOverDirectory()
}
val hadFile = prior?.type == "file"
val priorSize = if (prior?.type == "file") prior.size ?: 0L else 0L
guard { repo.uploadFile(diskPath, tmp, overwrite = true) }
val after = guard { getMetadataAfterWrite(diskPath) }
if (after.type != "file") {
throw WallencException.Storage.UnexpectedState()
}
val newSize = after.size ?: 0L
_size.value = ((_size.value ?: 0L) + newSize - priorSize).coerceAtLeast(0L)
if (!hadFile) {
_numberOfFiles.value = (_numberOfFiles.value ?: 0) + 1
}
persistStatsImmediate()
val info = runCatching { after.toCommonFile(path) }.getOrNull()
info?.let {
_filesUpdates.emit(DataPage(listOf(it), pageLength = 1, pageIndex = 0))
}
if (recordSyncJournal) {
appendSyncEntry(path = path, operation = StorageSyncOperation.UPSERT)
}
} finally {
tmp.delete()
}
}
private suspend fun appendSyncEntry(path: String, operation: StorageSyncOperation) { private suspend fun appendSyncEntry(path: String, operation: StorageSyncOperation) {
val cleanedPath = if (path.startsWith("/")) path else "/$path" val cleanedPath = StorageSyncPaths.normalize(path)
val entries = readSyncJournal() if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) {
val nextSequence = (entries.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L return
val entry = StorageSyncJournalEntry( }
path = cleanedPath, val journal = readSyncJournal()
operation = operation, val nextSequence = (journal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L
revision = StorageSyncRevision( putSyncJournalEntries(
sequence = nextSequence, mapOf(
actorId = syncActorId, cleanedPath to StorageSyncJournalEntry(
createdAt = Instant.now(), path = cleanedPath,
operation = operation,
revision = StorageSyncRevision(
sequence = nextSequence,
actorId = syncActorId,
createdAt = Instant.now(),
),
originStorageUuid = storageUuid,
),
), ),
originStorageUuid = storageUuid,
) )
appendSyncJournal(listOf(entry))
} }
private suspend fun touchParentDirs(path: String) { private suspend fun touchParentDirs(path: String) {
@@ -726,6 +753,8 @@ class YandexStorageAccessor(
private const val SYNC_JOURNAL_FILENAME = "sync-journal.json" private const val SYNC_JOURNAL_FILENAME = "sync-journal.json"
private const val SYNC_LOCK_FILENAME = "sync-lock.json" private const val SYNC_LOCK_FILENAME = "sync-lock.json"
private const val SYNC_LOCK_STALE_TIMEOUT_SECONDS: Long = 60 * 60 private const val SYNC_LOCK_STALE_TIMEOUT_SECONDS: Long = 60 * 60
private const val SYNC_LOCK_CAS_RETRIES = 3
private const val SYNC_LOCK_CAS_DELAY_MS = 80L
private const val STATS_DEBOUNCE_MS = 450L private const val STATS_DEBOUNCE_MS = 450L
private const val DATA_PAGE_LENGTH = 10 private const val DATA_PAGE_LENGTH = 10
private const val API_LIST_LIMIT = 1000 private const val API_LIST_LIMIT = 1000

View File

@@ -27,11 +27,19 @@ private class CloseHandledOutputStream(
override fun close() { override fun close() {
onClosing() onClosing()
var streamFailure: Throwable? = null
try { try {
stream.close() stream.close()
} finally { } catch (t: Throwable) {
onClose() streamFailure = t
} }
try {
onClose()
} catch (afterCloseFailure: Throwable) {
streamFailure?.let { afterCloseFailure.addSuppressed(it) }
throw afterCloseFailure
}
streamFailure?.let { throw it }
} }
} }

View File

@@ -0,0 +1,59 @@
package com.github.nullptroma.wallenc.domain.datatypes
/** Журнал синхронизации: один актуальный [StorageSyncJournalEntry] на нормализованный путь. */
typealias StorageSyncJournal = Map<String, StorageSyncJournalEntry>
object StorageSyncPaths {
fun normalize(path: String): String = if (path.startsWith("/")) path else "/$path"
/**
* Пути, которые участвуют в sync и попадают в журнал при пользовательских операциях.
* Служебные каталоги, lock/journal/meta и файлы внутри *-enc-dir исключены.
*/
fun isSyncableUserPath(path: String): Boolean {
val p = normalize(path)
if (p == "/" || p.isBlank()) return false
if (p == "/wallenc-yandex-system" || p.startsWith("/wallenc-yandex-system/")) return false
if (p.contains("-enc-dir/") || p.endsWith("-enc-dir")) return false
val name = p.substringAfterLast('/')
if (name == "sync-journal.json" || name == "sync-lock.json") return false
if (name.endsWith(".enc-meta") || name.endsWith(".storage-info")) return false
return true
}
}
object StorageSyncJournalMerge {
fun merge(into: StorageSyncJournal, entries: Map<String, StorageSyncJournalEntry>): StorageSyncJournal {
if (entries.isEmpty()) return into
val result = into.toMutableMap()
for ((rawPath, entry) in entries) {
val path = StorageSyncPaths.normalize(rawPath)
if (!StorageSyncPaths.isSyncableUserPath(path)) continue
val normalizedEntry = entry.copy(path = path)
val current = result[path]
if (current == null || compareEntries(normalizedEntry, current) > 0) {
result[path] = normalizedEntry
}
}
return result
}
fun merge(into: StorageSyncJournal, entry: StorageSyncJournalEntry): StorageSyncJournal =
merge(into, mapOf(entry.path to entry))
fun mergeAll(journals: Collection<StorageSyncJournal>): StorageSyncJournal {
var acc: StorageSyncJournal = emptyMap()
for (journal in journals) {
acc = merge(acc, journal)
}
return acc
}
private fun compareEntries(a: StorageSyncJournalEntry, b: StorageSyncJournalEntry): Int {
val seqCmp = a.revision.sequence.compareTo(b.revision.sequence)
if (seqCmp != 0) return seqCmp
val actorCmp = a.revision.actorId.compareTo(b.revision.actorId)
if (actorCmp != 0) return actorCmp
return a.revision.createdAt.compareTo(b.revision.createdAt)
}
}

View File

@@ -5,6 +5,9 @@ import java.util.UUID
enum class StorageSyncOperation { enum class StorageSyncOperation {
UPSERT, UPSERT,
/** Soft-delete (корзина): на peer вызывается [IStorageAccessor.moveToTrash]. */
TRASH,
/** Жёсткое удаление файла с носителя. */
DELETE, DELETE,
} }

View File

@@ -1,7 +1,7 @@
package com.github.nullptroma.wallenc.domain.interfaces package com.github.nullptroma.wallenc.domain.interfaces
import com.github.nullptroma.wallenc.domain.datatypes.DataPage import com.github.nullptroma.wallenc.domain.datatypes.DataPage
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.SharedFlow
@@ -40,7 +40,7 @@ interface IStorageAccessor {
suspend fun touchFile(path: String) suspend fun touchFile(path: String)
suspend fun touchDir(path: String) suspend fun touchDir(path: String)
suspend fun delete(path: String) suspend fun delete(path: String)
suspend fun openWrite(path: String): OutputStream suspend fun openWrite(path: String, recordSyncJournal: Boolean = true): OutputStream
suspend fun openRead(path: String): InputStream suspend fun openRead(path: String): InputStream
suspend fun moveToTrash(path: String) suspend fun moveToTrash(path: String)
@@ -52,9 +52,8 @@ interface IStorageAccessor {
suspend fun openReadSystemFile(name: String): InputStream suspend fun openReadSystemFile(name: String): InputStream
suspend fun openWriteSystemFile(name: String): OutputStream suspend fun openWriteSystemFile(name: String): OutputStream
suspend fun readSyncJournal(): List<StorageSyncJournalEntry> suspend fun readSyncJournal(): StorageSyncJournal
suspend fun appendSyncJournal(entries: List<StorageSyncJournalEntry>) suspend fun putSyncJournalEntries(entries: StorageSyncJournal)
suspend fun rewriteSyncJournal(entries: List<StorageSyncJournalEntry>)
suspend fun readSyncLock(): StorageSyncLock? suspend fun readSyncLock(): StorageSyncLock?
suspend fun tryAcquireSyncLock(holderId: String, leaseUntil: Instant): Boolean suspend fun tryAcquireSyncLock(holderId: String, leaseUntil: Instant): Boolean

View File

@@ -20,4 +20,7 @@ interface ITaskOrchestrator {
fun cancel(taskId: TaskId): Boolean fun cancel(taskId: TaskId): Boolean
fun cancelAll() fun cancelAll()
/** Запись в общий лог пайплайна вне контекста [TaskContext] (например, WorkManager sync). */
fun appendPipelineLog(level: TaskLogLevel, key: TaskLogKey)
} }

View File

@@ -6,6 +6,7 @@ import java.util.UUID
data class PipelineTask( data class PipelineTask(
val id: TaskId, val id: TaskId,
val title: String, val title: String,
val enqueuedAtMs: Long,
val dispatcher: CoroutineDispatcher, val dispatcher: CoroutineDispatcher,
val state: TaskRunState, val state: TaskRunState,
/** UUID storage, для которого идёт задача (кнопки только этой строки в UI). */ /** UUID storage, для которого идёт задача (кнопки только этой строки в UI). */

View File

@@ -0,0 +1,8 @@
package com.github.nullptroma.wallenc.domain.tasks
/** Источник запуска синхронизации хранилищ (для логов пайплайна задач). */
enum class StorageSyncTriggerReason {
Debounce,
SyncTab,
Background,
}

View File

@@ -3,7 +3,10 @@ package com.github.nullptroma.wallenc.domain.tasks
import com.github.nullptroma.wallenc.domain.errors.WallencException import com.github.nullptroma.wallenc.domain.errors.WallencException
sealed class TaskLogKey { sealed class TaskLogKey {
data object SyncStarted : TaskLogKey() data class SyncStarted(val reason: StorageSyncTriggerReason) : TaskLogKey()
data object SyncFinished : TaskLogKey() data class SyncFinished(val reason: StorageSyncTriggerReason) : TaskLogKey()
data class SyncFailed(val error: WallencException) : TaskLogKey() data class SyncFailed(
val error: WallencException,
val reason: StorageSyncTriggerReason,
) : TaskLogKey()
} }

View File

@@ -20,6 +20,7 @@ sealed class TaskProgressLabel {
data class SyncGroupProcessingEntries(val groupId: String, val count: Int) : TaskProgressLabel() data class SyncGroupProcessingEntries(val groupId: String, val count: Int) : TaskProgressLabel()
data class SyncGroupEntryProgress(val groupId: String, val current: Int, val total: Int) : TaskProgressLabel() data class SyncGroupEntryProgress(val groupId: String, val current: Int, val total: Int) : TaskProgressLabel()
data class SyncGroupCompleted(val groupId: String) : TaskProgressLabel() data class SyncGroupCompleted(val groupId: String) : TaskProgressLabel()
data class SyncGroupEntriesFailed(val groupId: String, val failedCount: Int) : TaskProgressLabel()
data class SyncGroupRenewingLocks(val groupId: String) : TaskProgressLabel() data class SyncGroupRenewingLocks(val groupId: String) : TaskProgressLabel()
data class SyncGroupLockRenewalFailed(val groupId: String) : TaskProgressLabel() data class SyncGroupLockRenewalFailed(val groupId: String) : TaskProgressLabel()

View File

@@ -73,6 +73,7 @@ class TaskOrchestrator(
val task = PipelineTask( val task = PipelineTask(
id = id, id = id,
title = title, title = title,
enqueuedAtMs = System.currentTimeMillis(),
dispatcher = dispatcher, dispatcher = dispatcher,
state = TaskRunState.Queued, state = TaskRunState.Queued,
busyStorageUuid = busyStorageUuid, busyStorageUuid = busyStorageUuid,
@@ -104,6 +105,10 @@ class TaskOrchestrator(
} }
} }
override fun appendPipelineLog(level: TaskLogLevel, key: TaskLogKey) {
appendLogLine(level, message = "", logKey = key)
}
private fun replaceTask(id: TaskId, fn: (PipelineTask) -> PipelineTask) { private fun replaceTask(id: TaskId, fn: (PipelineTask) -> PipelineTask) {
synchronized(tasksById) { synchronized(tasksById) {
val cur = tasksById[id] ?: return val cur = tasksById[id] ?: return

View File

@@ -2,7 +2,6 @@ package com.github.nullptroma.wallenc.ui.elements
import android.view.ViewGroup import android.view.ViewGroup
import androidx.camera.core.CameraSelector import androidx.camera.core.CameraSelector
import androidx.camera.core.ExperimentalGetImage
import androidx.camera.core.ImageAnalysis import androidx.camera.core.ImageAnalysis
import androidx.camera.core.Preview import androidx.camera.core.Preview
import androidx.camera.lifecycle.ProcessCameraProvider import androidx.camera.lifecycle.ProcessCameraProvider
@@ -39,8 +38,7 @@ import com.google.mlkit.vision.common.InputImage
import java.util.concurrent.Executors import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
@ExperimentalGetImage @OptIn(ExperimentalMaterial3Api::class)
@OptIn(ExperimentalMaterial3Api::class, ExperimentalGetImage::class)
@Composable @Composable
fun QrScannerDialog( fun QrScannerDialog(
onDismiss: () -> Unit, onDismiss: () -> Unit,

View File

@@ -5,14 +5,12 @@ import androidx.compose.foundation.clickable
import androidx.compose.foundation.interaction.MutableInteractionSource import androidx.compose.foundation.interaction.MutableInteractionSource
import androidx.compose.foundation.layout.Box import androidx.compose.foundation.layout.Box
import androidx.compose.foundation.layout.Column import androidx.compose.foundation.layout.Column
import androidx.compose.foundation.layout.IntrinsicSize
import androidx.compose.foundation.layout.Row import androidx.compose.foundation.layout.Row
import androidx.compose.foundation.layout.Spacer
import androidx.compose.foundation.layout.fillMaxSize
import androidx.compose.foundation.layout.fillMaxWidth import androidx.compose.foundation.layout.fillMaxWidth
import androidx.compose.foundation.layout.height
import androidx.compose.foundation.layout.offset import androidx.compose.foundation.layout.offset
import androidx.compose.foundation.layout.padding import androidx.compose.foundation.layout.padding
import androidx.compose.foundation.layout.widthIn
import androidx.compose.foundation.layout.wrapContentHeight
import androidx.compose.material.icons.Icons import androidx.compose.material.icons.Icons
import androidx.compose.material.icons.filled.Lock import androidx.compose.material.icons.filled.Lock
import androidx.compose.material.icons.filled.LockOpen import androidx.compose.material.icons.filled.LockOpen
@@ -90,17 +88,16 @@ fun StorageTree(
Column(modifier) { Column(modifier) {
Box( Box(
modifier = Modifier modifier = Modifier
.height(IntrinsicSize.Min) .fillMaxWidth()
.wrapContentHeight()
.zIndex(100f), .zIndex(100f),
) { ) {
val interactionSource = remember { MutableInteractionSource() } val interactionSource = remember { MutableInteractionSource() }
Box( Box(
modifier = Modifier modifier = Modifier
.clip( .matchParentSize()
CardDefaults.shape, .padding(end = 16.dp)
) .clip(CardDefaults.shape)
.padding(0.dp, 0.dp, 16.dp, 0.dp)
.fillMaxSize()
.background(borderColor) .background(borderColor)
.clickable( .clickable(
interactionSource = interactionSource, interactionSource = interactionSource,
@@ -112,8 +109,9 @@ fun StorageTree(
Card( Card(
interactionSource = interactionSource, interactionSource = interactionSource,
modifier = Modifier modifier = Modifier
.padding(8.dp, 0.dp, 0.dp, 0.dp) .padding(start = 8.dp)
.fillMaxWidth(), .fillMaxWidth()
.wrapContentHeight(),
elevation = CardDefaults.cardElevation( elevation = CardDefaults.cardElevation(
defaultElevation = 4.dp, defaultElevation = 4.dp,
), ),
@@ -124,8 +122,16 @@ fun StorageTree(
} }
}, },
) { ) {
Row(modifier = Modifier.height(IntrinsicSize.Min)) { Row(
Column(modifier = Modifier.padding(8.dp)) { modifier = Modifier
.fillMaxWidth()
.wrapContentHeight(),
) {
Column(
modifier = Modifier
.weight(1f)
.padding(8.dp),
) {
Text(metaInfo.name ?: stringResource(R.string.no_name)) Text(metaInfo.name ?: stringResource(R.string.no_name))
Text( Text(
text = stringResource( text = stringResource(
@@ -170,7 +176,9 @@ fun StorageTree(
} }
} }
Column( Column(
modifier = Modifier, modifier = Modifier
.widthIn(min = 112.dp)
.padding(end = 4.dp),
horizontalAlignment = Alignment.End, horizontalAlignment = Alignment.End,
) { ) {
var expanded by remember { mutableStateOf(false) } var expanded by remember { mutableStateOf(false) }
@@ -368,7 +376,6 @@ fun StorageTree(
) )
} }
} }
Spacer(modifier = Modifier.weight(1f))
if (isEncrypted) { if (isEncrypted) {
IconButton( IconButton(
onClick = { showLockDialog = true }, onClick = { showLockDialog = true },
@@ -382,18 +389,14 @@ fun StorageTree(
} }
Text( Text(
modifier = Modifier modifier = Modifier
.fillMaxWidth() .padding(top = 4.dp, end = 8.dp),
.padding(0.dp, 0.dp, 12.dp, 0.dp)
.align(Alignment.End),
text = stringResource(getStatusTextRes(tree)), text = stringResource(getStatusTextRes(tree)),
textAlign = TextAlign.End, textAlign = TextAlign.End,
fontSize = 11.sp, fontSize = 11.sp,
) )
Text( Text(
modifier = Modifier modifier = Modifier
.fillMaxWidth() .padding(end = 8.dp, bottom = 8.dp),
.padding(0.dp, 0.dp, 12.dp, 8.dp)
.align(Alignment.End),
text = cur.uuid.toString(), text = cur.uuid.toString(),
textAlign = TextAlign.End, textAlign = TextAlign.End,
fontSize = 8.sp, fontSize = 8.sp,

View File

@@ -1,13 +1,31 @@
package com.github.nullptroma.wallenc.ui.resources package com.github.nullptroma.wallenc.ui.resources
import com.github.nullptroma.wallenc.domain.tasks.StorageSyncTriggerReason
import com.github.nullptroma.wallenc.domain.tasks.TaskLogKey import com.github.nullptroma.wallenc.domain.tasks.TaskLogKey
import com.github.nullptroma.wallenc.ui.R import com.github.nullptroma.wallenc.ui.R
fun TaskLogKey.resolve(resolver: UiStringResolver): String = when (this) { fun TaskLogKey.resolve(resolver: UiStringResolver): String = when (this) {
TaskLogKey.SyncStarted -> resolver(R.string.task_log_sync_started) is TaskLogKey.SyncStarted -> resolver(
TaskLogKey.SyncFinished -> resolver(R.string.task_log_sync_finished) R.string.task_log_sync_started,
resolver.resolveSyncTriggerReason(reason),
)
is TaskLogKey.SyncFinished -> resolver(
R.string.task_log_sync_finished,
resolver.resolveSyncTriggerReason(reason),
)
is TaskLogKey.SyncFailed -> { is TaskLogKey.SyncFailed -> {
val notification = error.toUserNotification().resolve(resolver) val notification = error.toUserNotification().resolve(resolver)
resolver(R.string.task_log_sync_failed, notification) resolver(
R.string.task_log_sync_failed,
resolver.resolveSyncTriggerReason(reason),
notification,
)
} }
} }
private fun UiStringResolver.resolveSyncTriggerReason(reason: StorageSyncTriggerReason): String =
when (reason) {
StorageSyncTriggerReason.Debounce -> this(R.string.task_sync_trigger_debounce)
StorageSyncTriggerReason.SyncTab -> this(R.string.task_sync_trigger_sync_tab)
StorageSyncTriggerReason.Background -> this(R.string.task_sync_trigger_background)
}

View File

@@ -0,0 +1,13 @@
package com.github.nullptroma.wallenc.ui.resources
import java.time.Instant
import java.time.ZoneId
import java.time.format.DateTimeFormatter
private val pipelineTimeFormatter: DateTimeFormatter =
DateTimeFormatter.ofPattern("HH:mm:ss")
fun formatTaskPipelineTime(timestampMs: Long): String =
Instant.ofEpochMilli(timestampMs)
.atZone(ZoneId.systemDefault())
.format(pipelineTimeFormatter)

View File

@@ -37,6 +37,8 @@ fun TaskProgressLabel.resolve(resolver: UiStringResolver): String = when (this)
resolver(R.string.sync_progress_group_entry, groupId, current, total) resolver(R.string.sync_progress_group_entry, groupId, current, total)
is TaskProgressLabel.SyncGroupCompleted -> is TaskProgressLabel.SyncGroupCompleted ->
resolver(R.string.sync_progress_group_completed, groupId) resolver(R.string.sync_progress_group_completed, groupId)
is TaskProgressLabel.SyncGroupEntriesFailed ->
resolver.plurals(R.plurals.sync_progress_group_entries_failed, failedCount, groupId, failedCount)
is TaskProgressLabel.SyncGroupRenewingLocks -> is TaskProgressLabel.SyncGroupRenewingLocks ->
resolver(R.string.sync_progress_group_renewing_locks, groupId) resolver(R.string.sync_progress_group_renewing_locks, groupId)
is TaskProgressLabel.SyncGroupLockRenewalFailed -> is TaskProgressLabel.SyncGroupLockRenewalFailed ->

View File

@@ -8,6 +8,7 @@ import androidx.compose.foundation.layout.WindowInsets
import androidx.compose.foundation.layout.fillMaxSize import androidx.compose.foundation.layout.fillMaxSize
import androidx.compose.foundation.layout.fillMaxWidth import androidx.compose.foundation.layout.fillMaxWidth
import androidx.compose.foundation.layout.padding import androidx.compose.foundation.layout.padding
import androidx.compose.foundation.layout.widthIn
import androidx.compose.foundation.lazy.LazyColumn import androidx.compose.foundation.lazy.LazyColumn
import androidx.compose.foundation.lazy.items import androidx.compose.foundation.lazy.items
import androidx.compose.material3.AlertDialog import androidx.compose.material3.AlertDialog
@@ -36,7 +37,9 @@ import com.github.nullptroma.wallenc.domain.tasks.PipelineTask
import com.github.nullptroma.wallenc.domain.tasks.TaskLogLevel import com.github.nullptroma.wallenc.domain.tasks.TaskLogLevel
import com.github.nullptroma.wallenc.domain.tasks.TaskRunState import com.github.nullptroma.wallenc.domain.tasks.TaskRunState
import com.github.nullptroma.wallenc.ui.R import com.github.nullptroma.wallenc.ui.R
import com.github.nullptroma.wallenc.domain.tasks.TaskLogLine
import com.github.nullptroma.wallenc.ui.resources.displayText import com.github.nullptroma.wallenc.ui.resources.displayText
import com.github.nullptroma.wallenc.ui.resources.formatTaskPipelineTime
import com.github.nullptroma.wallenc.ui.resources.resolveText import com.github.nullptroma.wallenc.ui.resources.resolveText
import com.github.nullptroma.wallenc.ui.resources.toUserNotification import com.github.nullptroma.wallenc.ui.resources.toUserNotification
@@ -91,18 +94,7 @@ fun TaskPipelineScreen(
verticalArrangement = Arrangement.spacedBy(4.dp), verticalArrangement = Arrangement.spacedBy(4.dp),
) { ) {
items(logs.size) { i -> items(logs.size) { i ->
val line = logs[i] PipelineLogRow(line = logs[i])
val prefix = when (line.level) {
TaskLogLevel.Debug -> "D"
TaskLogLevel.Info -> "I"
TaskLogLevel.Warn -> "W"
TaskLogLevel.Error -> "E"
}
Text(
"[$prefix] ${line.displayText()}",
style = MaterialTheme.typography.bodySmall,
color = MaterialTheme.colorScheme.onSurfaceVariant,
)
} }
} }
Button( Button(
@@ -178,14 +170,59 @@ fun TaskPipelineScreen(
} }
} }
@Composable
private fun PipelineLogRow(line: TaskLogLine) {
val prefix = when (line.level) {
TaskLogLevel.Debug -> "D"
TaskLogLevel.Info -> "I"
TaskLogLevel.Warn -> "W"
TaskLogLevel.Error -> "E"
}
Row(
modifier = Modifier.fillMaxWidth(),
horizontalArrangement = Arrangement.SpaceBetween,
verticalAlignment = Alignment.Top,
) {
Text(
text = "[$prefix] ${line.displayText()}",
modifier = Modifier
.weight(1f)
.padding(end = 8.dp),
style = MaterialTheme.typography.bodySmall,
color = MaterialTheme.colorScheme.onSurfaceVariant,
)
Text(
text = formatTaskPipelineTime(line.timestampMs),
modifier = Modifier.widthIn(min = 56.dp),
style = MaterialTheme.typography.bodySmall,
color = MaterialTheme.colorScheme.onSurfaceVariant,
)
}
}
@Composable @Composable
private fun TaskRow(task: PipelineTask, isRunning: Boolean) { private fun TaskRow(task: PipelineTask, isRunning: Boolean) {
Column(Modifier.fillMaxWidth()) { Column(Modifier.fillMaxWidth()) {
Text( Row(
task.title, modifier = Modifier.fillMaxWidth(),
style = if (isRunning) MaterialTheme.typography.titleSmall horizontalArrangement = Arrangement.SpaceBetween,
else MaterialTheme.typography.bodyMedium, verticalAlignment = Alignment.CenterVertically,
) ) {
Text(
text = task.title,
modifier = Modifier
.weight(1f)
.padding(end = 8.dp),
style = if (isRunning) MaterialTheme.typography.titleSmall
else MaterialTheme.typography.bodyMedium,
)
Text(
text = formatTaskPipelineTime(task.enqueuedAtMs),
modifier = Modifier.widthIn(min = 56.dp),
style = MaterialTheme.typography.bodySmall,
color = MaterialTheme.colorScheme.onSurfaceVariant,
)
}
val runningProgress = (task.state as? TaskRunState.Running)?.progress val runningProgress = (task.state as? TaskRunState.Running)?.progress
val progressLabel = runningProgress?.label?.resolveText() val progressLabel = runningProgress?.label?.resolveText()
val stateLabel = when (val s = task.state) { val stateLabel = when (val s = task.state) {

View File

@@ -13,6 +13,7 @@ import com.github.nullptroma.wallenc.ui.resources.resolve
import com.github.nullptroma.wallenc.ui.resources.UserNotification import com.github.nullptroma.wallenc.ui.resources.UserNotification
import com.github.nullptroma.wallenc.usecases.AddStorageToSyncGroupResult import com.github.nullptroma.wallenc.usecases.AddStorageToSyncGroupResult
import com.github.nullptroma.wallenc.usecases.ManageStorageSyncGroupsUseCase import com.github.nullptroma.wallenc.usecases.ManageStorageSyncGroupsUseCase
import com.github.nullptroma.wallenc.domain.tasks.StorageSyncTriggerReason
import com.github.nullptroma.wallenc.usecases.RunStorageSyncUseCase import com.github.nullptroma.wallenc.usecases.RunStorageSyncUseCase
import com.github.nullptroma.wallenc.usecases.StorageSyncCompatibilityInput import com.github.nullptroma.wallenc.usecases.StorageSyncCompatibilityInput
import com.github.nullptroma.wallenc.usecases.isStorageCompatibleWithGroup import com.github.nullptroma.wallenc.usecases.isStorageCompatibleWithGroup
@@ -209,7 +210,7 @@ class StorageSyncViewModel @Inject constructor(
fun runSyncNow() { fun runSyncNow() {
val started = runStorageSyncUseCase.enqueue( val started = runStorageSyncUseCase.enqueue(
displayTitle = uiStrings(R.string.task_title_storage_sync), displayTitle = uiStrings(R.string.task_title_storage_sync),
logReason = "sync-tab", reason = StorageSyncTriggerReason.SyncTab,
) )
if (!started) { if (!started) {
updateState( updateState(
@@ -383,7 +384,6 @@ class StorageSyncViewModel @Inject constructor(
!isStorageCompatibleWithGroup( !isStorageCompatibleWithGroup(
storage = storage, storage = storage,
group = group, group = group,
resolveStorageKey = vaultsManager.unlockManager::getOpenedStorageKey,
) )
} }
StorageSyncGroupUi( StorageSyncGroupUi(

View File

@@ -12,4 +12,10 @@
<item quantity="many">Синхронизация: группа «%1$s» — обработка %2$d записей</item> <item quantity="many">Синхронизация: группа «%1$s» — обработка %2$d записей</item>
<item quantity="other">Синхронизация: группа «%1$s» — обработка %2$d записей</item> <item quantity="other">Синхронизация: группа «%1$s» — обработка %2$d записей</item>
</plurals> </plurals>
<plurals name="sync_progress_group_entries_failed">
<item quantity="one">Синхронизация: группа «%1$s» — не применена %2$d запись</item>
<item quantity="few">Синхронизация: группа «%1$s» — не применены %2$d записи</item>
<item quantity="many">Синхронизация: группа «%1$s» — не применено %2$d записей</item>
<item quantity="other">Синхронизация: группа «%1$s» — не применено %2$d записей</item>
</plurals>
</resources> </resources>

View File

@@ -289,9 +289,12 @@
<string name="sync_progress_group_renewing_locks">Синхронизация: группа «%1$s» — продление блокировок</string> <string name="sync_progress_group_renewing_locks">Синхронизация: группа «%1$s» — продление блокировок</string>
<string name="sync_progress_group_lock_renewal_failed">Синхронизация: группа «%1$s» — не удалось продлить блокировку</string> <string name="sync_progress_group_lock_renewal_failed">Синхронизация: группа «%1$s» — не удалось продлить блокировку</string>
<string name="task_progress_clear_content">%1$d / %2$d</string> <string name="task_progress_clear_content">%1$d / %2$d</string>
<string name="task_log_sync_started">Синхронизация хранилищ запущена</string> <string name="task_log_sync_started">Синхронизация хранилищ запущена (%1$s)</string>
<string name="task_log_sync_finished">Синхронизация хранилищ завершена</string> <string name="task_log_sync_finished">Синхронизация хранилищ завершена (%1$s)</string>
<string name="task_log_sync_failed">Синхронизация не удалась: %1$s</string> <string name="task_log_sync_failed">Синхронизация не удалась (%1$s): %2$s</string>
<string name="task_sync_trigger_debounce">debounce</string>
<string name="task_sync_trigger_sync_tab">sync-tab</string>
<string name="task_sync_trigger_background">background</string>
<string name="task_log_enumerating">Перечисление файлов и папок…</string> <string name="task_log_enumerating">Перечисление файлов и папок…</string>
<string name="task_log_creating_storage">Создание хранилища…</string> <string name="task_log_creating_storage">Создание хранилища…</string>
<string name="task_log_storage_created">Хранилище создано</string> <string name="task_log_storage_created">Хранилище создано</string>

View File

@@ -8,4 +8,8 @@
<item quantity="one">Storage sync: group "%1$s" processing %2$d entry</item> <item quantity="one">Storage sync: group "%1$s" processing %2$d entry</item>
<item quantity="other">Storage sync: group "%1$s" processing %2$d entries</item> <item quantity="other">Storage sync: group "%1$s" processing %2$d entries</item>
</plurals> </plurals>
<plurals name="sync_progress_group_entries_failed">
<item quantity="one">Storage sync: group "%1$s" — %2$d entry failed</item>
<item quantity="other">Storage sync: group "%1$s" — %2$d entries failed</item>
</plurals>
</resources> </resources>

View File

@@ -289,9 +289,12 @@
<string name="sync_progress_group_renewing_locks">Storage sync: group "%1$s" renewing locks</string> <string name="sync_progress_group_renewing_locks">Storage sync: group "%1$s" renewing locks</string>
<string name="sync_progress_group_lock_renewal_failed">Storage sync: group "%1$s" lock renewal failed</string> <string name="sync_progress_group_lock_renewal_failed">Storage sync: group "%1$s" lock renewal failed</string>
<string name="task_progress_clear_content">%1$d / %2$d</string> <string name="task_progress_clear_content">%1$d / %2$d</string>
<string name="task_log_sync_started">Storage sync started</string> <string name="task_log_sync_started">Storage sync started (%1$s)</string>
<string name="task_log_sync_finished">Storage sync finished</string> <string name="task_log_sync_finished">Storage sync finished (%1$s)</string>
<string name="task_log_sync_failed">Storage sync failed: %1$s</string> <string name="task_log_sync_failed">Storage sync failed (%1$s): %2$s</string>
<string name="task_sync_trigger_debounce">debounce</string>
<string name="task_sync_trigger_sync_tab">sync-tab</string>
<string name="task_sync_trigger_background">background</string>
<string name="task_log_enumerating">Enumerating files and directories…</string> <string name="task_log_enumerating">Enumerating files and directories…</string>
<string name="task_log_creating_storage">Creating storage…</string> <string name="task_log_creating_storage">Creating storage…</string>
<string name="task_log_storage_created">Storage created</string> <string name="task_log_storage_created">Storage created</string>

View File

@@ -3,6 +3,7 @@ package com.github.nullptroma.wallenc.usecases
import com.github.nullptroma.wallenc.domain.errors.toWallencException import com.github.nullptroma.wallenc.domain.errors.toWallencException
import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncEngine import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncEngine
import com.github.nullptroma.wallenc.domain.tasks.ITaskOrchestrator import com.github.nullptroma.wallenc.domain.tasks.ITaskOrchestrator
import com.github.nullptroma.wallenc.domain.tasks.StorageSyncTriggerReason
import com.github.nullptroma.wallenc.domain.tasks.TaskId import com.github.nullptroma.wallenc.domain.tasks.TaskId
import com.github.nullptroma.wallenc.domain.tasks.TaskLogKey import com.github.nullptroma.wallenc.domain.tasks.TaskLogKey
import com.github.nullptroma.wallenc.domain.tasks.TaskLogLevel import com.github.nullptroma.wallenc.domain.tasks.TaskLogLevel
@@ -31,10 +32,10 @@ class RunStorageSyncUseCase @Inject constructor(
/** /**
* @param displayTitle заголовок задачи в UI (локализованный на стороне вызова) * @param displayTitle заголовок задачи в UI (локализованный на стороне вызова)
* @param logReason техническая метка для логов (не для UI) * @param reason источник запуска — попадает в лог пайплайна
* @return false, если синхронизация уже в очереди или выполняется — новая задача не создана * @return false, если синхронизация уже в очереди или выполняется — новая задача не создана
*/ */
fun enqueue(displayTitle: String, logReason: String): Boolean { fun enqueue(displayTitle: String, reason: StorageSyncTriggerReason): Boolean {
if (!running.compareAndSet(false, true)) { if (!running.compareAndSet(false, true)) {
return false return false
} }
@@ -45,36 +46,68 @@ class RunStorageSyncUseCase @Inject constructor(
dispatcher = Dispatchers.IO, dispatcher = Dispatchers.IO,
work = { ctx -> work = { ctx ->
try { try {
ctx.log(TaskLogLevel.Info, TaskLogKey.SyncStarted) executeSync(
ctx.reportProgress(null, TaskProgressLabel.SyncStarted) reason = reason,
syncEngine.syncAllGroups { fraction, label -> reportProgress = { fraction, label ->
ctx.reportProgress(fraction, label) ctx.reportProgress(fraction, label)
} },
ctx.log(TaskLogLevel.Info, TaskLogKey.SyncFinished) log = { level, key -> ctx.log(level, key) },
ctx.reportProgress(null, TaskProgressLabel.SyncCompleted) )
} catch (e: Exception) { } catch (e: Exception) {
val err = e.toWallencException() ctx.fail(e.toWallencException())
ctx.log(TaskLogLevel.Error, TaskLogKey.SyncFailed(err))
ctx.fail(err)
} finally { } finally {
running.set(false) clearRunningState()
_syncRunning.value = false
_activeSyncTaskId.value = null
} }
}, },
) )
_activeSyncTaskId.value = taskId _activeSyncTaskId.value = taskId
return true return true
} catch (t: Throwable) { } catch (t: Throwable) {
running.set(false) clearRunningState()
_syncRunning.value = false
_activeSyncTaskId.value = null
throw t throw t
} }
} }
suspend fun runBlocking() { suspend fun runBlocking(reason: StorageSyncTriggerReason) {
if (!running.compareAndSet(false, true)) {
return
}
_syncRunning.value = true
try {
executeSync(
reason = reason,
reportProgress = { _, _ -> },
log = { level, key -> orchestrator.appendPipelineLog(level, key) },
)
} finally {
clearRunningState()
}
}
private suspend fun executeSync(
reason: StorageSyncTriggerReason,
reportProgress: suspend (fraction: Float?, label: TaskProgressLabel?) -> Unit,
log: (TaskLogLevel, TaskLogKey) -> Unit,
) {
syncReadiness.awaitReady() syncReadiness.awaitReady()
syncEngine.syncAllGroups() log(TaskLogLevel.Info, TaskLogKey.SyncStarted(reason))
reportProgress(null, TaskProgressLabel.SyncStarted)
try {
syncEngine.syncAllGroups { fraction, label ->
reportProgress(fraction, label)
}
log(TaskLogLevel.Info, TaskLogKey.SyncFinished(reason))
reportProgress(null, TaskProgressLabel.SyncCompleted)
} catch (e: Exception) {
val err = e.toWallencException()
log(TaskLogLevel.Error, TaskLogKey.SyncFailed(err, reason))
throw e
}
}
private fun clearRunningState() {
running.set(false)
_syncRunning.value = false
_activeSyncTaskId.value = null
} }
} }

View File

@@ -1,17 +1,14 @@
package com.github.nullptroma.wallenc.usecases package com.github.nullptroma.wallenc.usecases
import com.github.nullptroma.wallenc.domain.datatypes.EncryptKey
import com.github.nullptroma.wallenc.domain.interfaces.IStorage import com.github.nullptroma.wallenc.domain.interfaces.IStorage
import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroup import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroup
import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroupEncryptionKind import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroupEncryptionKind
import java.util.UUID
/** Совместим, если у storage нет активного шифрования ([encInfo] == null). */
fun isStorageCompatibleWithGroup( fun isStorageCompatibleWithGroup(
storage: IStorage, storage: IStorage,
group: StorageSyncGroup, group: StorageSyncGroup,
resolveStorageKey: (UUID) -> EncryptKey?,
): Boolean { ): Boolean {
// Режим упрощён: в sync-группах допускаются только незашифрованные storage.
if (storage.metaInfo.value.encInfo != null) { if (storage.metaInfo.value.encInfo != null) {
return false return false
} }

View File

@@ -1,7 +1,10 @@
package com.github.nullptroma.wallenc.usecases package com.github.nullptroma.wallenc.usecases
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
import com.github.nullptroma.wallenc.domain.interfaces.IStorage import com.github.nullptroma.wallenc.domain.interfaces.IStorage
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncEngine import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncEngine
@@ -19,6 +22,11 @@ import java.util.concurrent.atomic.AtomicLong
import javax.inject.Inject import javax.inject.Inject
import javax.inject.Singleton import javax.inject.Singleton
/**
* Синхронизация по журналам storage в группе.
* Блокировка на Yandex Disk — best-effort (см. [IStorageAccessor.tryAcquireSyncLock]);
* сериализация внутри процесса — [groupMutexes].
*/
@Singleton @Singleton
class StorageSyncEngine @Inject constructor( class StorageSyncEngine @Inject constructor(
private val vaultsManager: IVaultsManager, private val vaultsManager: IVaultsManager,
@@ -81,7 +89,6 @@ class StorageSyncEngine @Inject constructor(
isStorageCompatibleWithGroup( isStorageCompatibleWithGroup(
storage = storage, storage = storage,
group = group, group = group,
resolveStorageKey = vaultsManager.unlockManager::getOpenedStorageKey,
) )
} }
if (incompatible.isNotEmpty()) { if (incompatible.isNotEmpty()) {
@@ -109,8 +116,8 @@ class StorageSyncEngine @Inject constructor(
lockedAccessors.add(storage.accessor) lockedAccessors.add(storage.accessor)
} }
val latestByPath = mutableMapOf<String, StorageSyncJournalEntry>() val mergedByPath = mutableMapOf<String, StorageSyncJournalEntry>()
val entriesByStorage = mutableMapOf<UUID, Map<String, StorageSyncJournalEntry>>() val entriesByStorage = mutableMapOf<UUID, StorageSyncJournal>()
reportProgress(null, TaskProgressLabel.SyncGroupReadingJournals(groupId)) reportProgress(null, TaskProgressLabel.SyncGroupReadingJournals(groupId))
for ((journalIndex, storage) in storages.withIndex()) { for ((journalIndex, storage) in storages.withIndex()) {
@@ -128,17 +135,14 @@ class StorageSyncEngine @Inject constructor(
null, null,
TaskProgressLabel.SyncGroupJournalProgress(groupId, journalIndex + 1, storages.size), TaskProgressLabel.SyncGroupJournalProgress(groupId, journalIndex + 1, storages.size),
) )
val latestEntries = latestByPath(storage.accessor.readSyncJournal()) val journal = filterSyncableJournal(storage.accessor.readSyncJournal())
entriesByStorage[storage.uuid] = latestEntries entriesByStorage[storage.uuid] = journal
for ((path, entry) in latestEntries) { mergedByPath.putAll(
val current = latestByPath[path] StorageSyncJournalMerge.merge(mergedByPath, journal),
if (current == null || compareEntries(entry, current) > 0) { )
latestByPath[path] = entry
}
}
} }
val mergedEntries = latestByPath.entries.toList() val mergedEntries = mergedByPath.entries.toList()
if (mergedEntries.isEmpty()) { if (mergedEntries.isEmpty()) {
reportProgress(null, TaskProgressLabel.SyncGroupNoJournalEntries(groupId)) reportProgress(null, TaskProgressLabel.SyncGroupNoJournalEntries(groupId))
return return
@@ -148,6 +152,7 @@ class StorageSyncEngine @Inject constructor(
null, null,
TaskProgressLabel.SyncGroupProcessingEntries(groupId, mergedEntries.size), TaskProgressLabel.SyncGroupProcessingEntries(groupId, mergedEntries.size),
) )
var applyFailures = 0
for ((pathIndex, merged) in mergedEntries.withIndex()) { for ((pathIndex, merged) in mergedEntries.withIndex()) {
leaseUntil = renewLocksIfNeeded( leaseUntil = renewLocksIfNeeded(
groupId = groupId, groupId = groupId,
@@ -166,7 +171,9 @@ class StorageSyncEngine @Inject constructor(
return return
} }
val sourceStorage = findSourceStorage(storages, entriesByStorage, path, winnerEntry) val sourceStorage = findSourceStorage(storages, entriesByStorage, path, winnerEntry)
if (sourceStorage == null && winnerEntry.operation == StorageSyncOperation.UPSERT) { if (sourceStorage == null &&
winnerEntry.operation == StorageSyncOperation.UPSERT
) {
continue continue
} }
@@ -184,10 +191,18 @@ class StorageSyncEngine @Inject constructor(
entry = winnerEntry, entry = winnerEntry,
) )
if (applied) { if (applied) {
target.accessor.appendSyncJournal(listOf(winnerEntry)) target.accessor.putSyncJournalEntries(mapOf(path to winnerEntry))
} else {
applyFailures++
} }
} }
} }
if (applyFailures > 0) {
reportProgress(
null,
TaskProgressLabel.SyncGroupEntriesFailed(groupId, applyFailures),
)
}
reportProgress(null, TaskProgressLabel.SyncGroupCompleted(groupId)) reportProgress(null, TaskProgressLabel.SyncGroupCompleted(groupId))
} finally { } finally {
for (accessor in lockedAccessors) { for (accessor in lockedAccessors) {
@@ -199,6 +214,10 @@ class StorageSyncEngine @Inject constructor(
} }
} }
private fun filterSyncableJournal(journal: StorageSyncJournal): StorageSyncJournal {
return journal.filterKeys { StorageSyncPaths.isSyncableUserPath(it) }
}
private suspend fun renewLocksIfNeeded( private suspend fun renewLocksIfNeeded(
groupId: String, groupId: String,
lockedAccessors: List<IStorageAccessor>, lockedAccessors: List<IStorageAccessor>,
@@ -227,25 +246,18 @@ class StorageSyncEngine @Inject constructor(
return uuids.mapNotNull(findStorageUseCase::find) return uuids.mapNotNull(findStorageUseCase::find)
} }
private fun latestByPath(entries: List<StorageSyncJournalEntry>): Map<String, StorageSyncJournalEntry> {
val map = mutableMapOf<String, StorageSyncJournalEntry>()
for (entry in entries) {
val current = map[entry.path]
if (current == null || compareEntries(entry, current) > 0) {
map[entry.path] = entry
}
}
return map
}
private fun findSourceStorage( private fun findSourceStorage(
storages: List<IStorage>, storages: List<IStorage>,
entriesByStorage: Map<UUID, Map<String, StorageSyncJournalEntry>>, entriesByStorage: Map<UUID, StorageSyncJournal>,
path: String, path: String,
winnerEntry: StorageSyncJournalEntry, winnerEntry: StorageSyncJournalEntry,
): IStorage? { ): IStorage? {
if (winnerEntry.operation == StorageSyncOperation.DELETE) { if (winnerEntry.operation == StorageSyncOperation.DELETE ||
return storages.firstOrNull() winnerEntry.operation == StorageSyncOperation.TRASH
) {
return storages.firstOrNull { storage ->
entriesByStorage[storage.uuid]?.get(path) != null
} ?: storages.firstOrNull()
} }
return storages.firstOrNull { storage -> return storages.firstOrNull { storage ->
val entry = entriesByStorage[storage.uuid]?.get(path) ?: return@firstOrNull false val entry = entriesByStorage[storage.uuid]?.get(path) ?: return@firstOrNull false
@@ -258,24 +270,37 @@ class StorageSyncEngine @Inject constructor(
target: IStorage, target: IStorage,
entry: StorageSyncJournalEntry, entry: StorageSyncJournalEntry,
): Boolean { ): Boolean {
when (entry.operation) { val result = when (entry.operation) {
StorageSyncOperation.DELETE -> { StorageSyncOperation.DELETE -> {
return runCatching { runCatching {
target.accessor.delete(entry.path) target.accessor.delete(entry.path)
}.isSuccess }
}
StorageSyncOperation.TRASH -> {
runCatching {
target.accessor.moveToTrash(entry.path)
}
} }
StorageSyncOperation.UPSERT -> { StorageSyncOperation.UPSERT -> {
val sourceAccessor = source?.accessor ?: return false val sourceAccessor = source?.accessor ?: return false
return runCatching { runCatching {
sourceAccessor.openRead(entry.path).use { input -> sourceAccessor.openRead(entry.path).use { input ->
target.accessor.openWrite(entry.path).use { output -> target.accessor.openWrite(entry.path, recordSyncJournal = false).use { output ->
input.copyTo(output) input.copyTo(output)
} }
} }
}.isSuccess }
} }
} }
result.exceptionOrNull()?.let { error ->
System.err.println(
"StorageSyncEngine: apply ${entry.operation} ${entry.path} " +
"target=${target.uuid}: ${error.message}",
)
}
return result.isSuccess
} }
private fun compareEntries(a: StorageSyncJournalEntry, b: StorageSyncJournalEntry): Int { private fun compareEntries(a: StorageSyncJournalEntry, b: StorageSyncJournalEntry): Int {

View File

@@ -0,0 +1,41 @@
package com.github.nullptroma.wallenc.usecases
import com.github.nullptroma.wallenc.domain.datatypes.StorageEncryptionInfo
import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroup
import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroupEncryptionKind
import com.github.nullptroma.wallenc.usecases.fakes.FakeMetaInfo
import com.github.nullptroma.wallenc.usecases.fakes.FakeStorage
import org.junit.Assert.assertFalse
import org.junit.Assert.assertTrue
import org.junit.Test
import java.util.UUID
class StorageSyncEncryptionCompatTest {
@Test
fun storageWithoutEncInfoIsCompatible() {
val storage = FakeStorage(uuid = UUID.randomUUID(), meta = FakeMetaInfo(encInfo = null))
val group = StorageSyncGroup(
id = "g1",
storageUuids = setOf(storage.uuid),
encryptionKind = StorageSyncGroupEncryptionKind.NONE,
)
assertTrue(isStorageCompatibleWithGroup(storage = storage, group = group))
}
@Test
fun storageWithEncInfoIsIncompatible() {
val storage = FakeStorage(
uuid = UUID.randomUUID(),
meta = FakeMetaInfo(
encInfo = StorageEncryptionInfo(encryptedTestData = "x", pathIv = ByteArray(16)),
),
)
val group = StorageSyncGroup(
id = "g1",
storageUuids = setOf(storage.uuid),
encryptionKind = StorageSyncGroupEncryptionKind.NONE,
)
assertFalse(isStorageCompatibleWithGroup(storage = storage, group = group))
}
}

View File

@@ -7,6 +7,7 @@ import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroup
import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroupEncryptionKind import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroupEncryptionKind
import com.github.nullptroma.wallenc.domain.tasks.TaskProgressLabel import com.github.nullptroma.wallenc.domain.tasks.TaskProgressLabel
import com.github.nullptroma.wallenc.usecases.fakes.FakeStorage import com.github.nullptroma.wallenc.usecases.fakes.FakeStorage
import com.github.nullptroma.wallenc.usecases.fakes.FakeStorageAccessor
import com.github.nullptroma.wallenc.usecases.fakes.FakeStorageSyncGroupStore import com.github.nullptroma.wallenc.usecases.fakes.FakeStorageSyncGroupStore
import com.github.nullptroma.wallenc.usecases.fakes.FakeVaultsManager import com.github.nullptroma.wallenc.usecases.fakes.FakeVaultsManager
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
@@ -112,6 +113,41 @@ class StorageSyncEngineTest {
assertNull(target.fileBytes(path)) assertNull(target.fileBytes(path))
} }
@Test
fun syncGroupTrashSoftDeletesOnTarget() = runBlocking {
val source = FakeStorage()
val target = FakeStorage()
val path = "trashed/doc.txt"
val payload = "keep-in-trash".encodeToByteArray()
source.putFile(path, payload)
target.putFile(path, payload)
val entry = StorageSyncJournalEntry(
path = path,
operation = StorageSyncOperation.TRASH,
revision = StorageSyncRevision(
sequence = 3L,
actorId = "actor-trash",
createdAt = Instant.parse("2024-07-01T00:00:00Z"),
),
)
source.addSyncJournalEntry(entry)
val group = StorageSyncGroup(
id = "trash-group",
storageUuids = setOf(source.uuid, target.uuid),
encryptionKind = StorageSyncGroupEncryptionKind.NONE,
)
val engine = createEngine(
storages = listOf(source, target),
groups = listOf(group),
)
engine.syncGroup(group.id) { _, _ -> }
assertArrayEquals(payload, target.fileBytes(path))
assertTrue(path in (target.accessor as FakeStorageAccessor).trashedPaths)
}
@Test @Test
fun syncGroupStopsWhenLockCannotBeAcquired() = runBlocking { fun syncGroupStopsWhenLockCannotBeAcquired() = runBlocking {
val first = FakeStorage() val first = FakeStorage()

View File

@@ -0,0 +1,45 @@
package com.github.nullptroma.wallenc.usecases
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
import org.junit.Assert.assertEquals
import org.junit.Assert.assertFalse
import org.junit.Assert.assertTrue
import org.junit.Test
import java.time.Instant
class StorageSyncJournalMergeTest {
@Test
fun mergeKeepsSingleEntryPerPath() {
val older = entry(path = "/a.txt", sequence = 1L)
val newer = entry(path = "/a.txt", sequence = 2L)
val merged = StorageSyncJournalMerge.merge(
mapOf("/a.txt" to older),
mapOf("/a.txt" to newer),
)
assertEquals(1, merged.size)
assertEquals(2L, merged["/a.txt"]?.revision?.sequence)
}
@Test
fun isSyncableUserPathExcludesEncDirAndJournal() {
assertFalse(StorageSyncPaths.isSyncableUserPath("/88a048ed-enc-dir/sync-journal.json"))
assertFalse(StorageSyncPaths.isSyncableUserPath("/wallenc-yandex-system/sync-journal.json"))
assertTrue(StorageSyncPaths.isSyncableUserPath("/wallenc-data/text-secrets.json"))
}
private fun entry(path: String, sequence: Long): StorageSyncJournalEntry =
StorageSyncJournalEntry(
path = path,
operation = StorageSyncOperation.UPSERT,
revision = StorageSyncRevision(
sequence = sequence,
actorId = "actor",
createdAt = Instant.EPOCH,
),
)
}

View File

@@ -3,6 +3,7 @@ package com.github.nullptroma.wallenc.usecases.fakes
import com.github.nullptroma.wallenc.domain.datatypes.StorageEncryptionInfo import com.github.nullptroma.wallenc.domain.datatypes.StorageEncryptionInfo
import com.github.nullptroma.wallenc.domain.datatypes.StorageMetaLoadState import com.github.nullptroma.wallenc.domain.datatypes.StorageMetaLoadState
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
import com.github.nullptroma.wallenc.domain.interfaces.IStorage import com.github.nullptroma.wallenc.domain.interfaces.IStorage
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
import com.github.nullptroma.wallenc.domain.interfaces.IStorageMetaInfo import com.github.nullptroma.wallenc.domain.interfaces.IStorageMetaInfo
@@ -46,10 +47,13 @@ class FakeStorage(
fun fileBytes(path: String): ByteArray? = accessorImpl.dataFiles[path] fun fileBytes(path: String): ByteArray? = accessorImpl.dataFiles[path]
fun addSyncJournalEntry(entry: StorageSyncJournalEntry) { fun addSyncJournalEntry(entry: StorageSyncJournalEntry) {
accessorImpl.syncJournal.add(entry) accessorImpl.syncJournal = StorageSyncJournalMerge.merge(
accessorImpl.syncJournal,
mapOf(entry.path to entry),
)
} }
fun syncJournalEntries(): List<StorageSyncJournalEntry> = accessorImpl.syncJournal.toList() fun syncJournalEntries(): List<StorageSyncJournalEntry> = accessorImpl.syncJournal.values.toList()
fun setAcquireLockResult(result: Boolean) { fun setAcquireLockResult(result: Boolean) {
accessorImpl.acquireLockResult = result accessorImpl.acquireLockResult = result

View File

@@ -1,7 +1,9 @@
package com.github.nullptroma.wallenc.usecases.fakes package com.github.nullptroma.wallenc.usecases.fakes
import com.github.nullptroma.wallenc.domain.datatypes.DataPage import com.github.nullptroma.wallenc.domain.datatypes.DataPage
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock
import com.github.nullptroma.wallenc.domain.interfaces.IDirectory import com.github.nullptroma.wallenc.domain.interfaces.IDirectory
import com.github.nullptroma.wallenc.domain.interfaces.IFile import com.github.nullptroma.wallenc.domain.interfaces.IFile
@@ -21,10 +23,11 @@ import java.time.Instant
class FakeStorageAccessor : IStorageAccessor { class FakeStorageAccessor : IStorageAccessor {
val dataFiles: MutableMap<String, ByteArray> = mutableMapOf() val dataFiles: MutableMap<String, ByteArray> = mutableMapOf()
val trashedPaths: MutableSet<String> = mutableSetOf()
private val systemFiles: MutableMap<String, ByteArray> = mutableMapOf() private val systemFiles: MutableMap<String, ByteArray> = mutableMapOf()
private val _filesUpdates = MutableSharedFlow<DataPage<IFile>>(extraBufferCapacity = 16) private val _filesUpdates = MutableSharedFlow<DataPage<IFile>>(extraBufferCapacity = 16)
var syncJournal: MutableList<StorageSyncJournalEntry> = mutableListOf() var syncJournal: StorageSyncJournal = emptyMap()
var syncLock: StorageSyncLock? = null var syncLock: StorageSyncLock? = null
var acquireLockResult: Boolean = true var acquireLockResult: Boolean = true
@@ -64,7 +67,7 @@ class FakeStorageAccessor : IStorageAccessor {
dataFiles.remove(path) dataFiles.remove(path)
} }
override suspend fun openWrite(path: String): OutputStream { override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream {
return object : ByteArrayOutputStream() { return object : ByteArrayOutputStream() {
override fun close() { override fun close() {
dataFiles[path] = toByteArray() dataFiles[path] = toByteArray()
@@ -84,7 +87,11 @@ class FakeStorageAccessor : IStorageAccessor {
return ByteArrayInputStream(bytes) return ByteArrayInputStream(bytes)
} }
override suspend fun moveToTrash(path: String) = Unit override suspend fun moveToTrash(path: String) {
if (path in dataFiles) {
trashedPaths.add(path)
}
}
override suspend fun openReadSystemFile(name: String): InputStream { override suspend fun openReadSystemFile(name: String): InputStream {
val bytes = systemFiles[name] ?: ByteArray(0) val bytes = systemFiles[name] ?: ByteArray(0)
@@ -99,15 +106,10 @@ class FakeStorageAccessor : IStorageAccessor {
} }
} }
override suspend fun readSyncJournal(): List<StorageSyncJournalEntry> = syncJournal.toList() override suspend fun readSyncJournal(): StorageSyncJournal = syncJournal
override suspend fun appendSyncJournal(entries: List<StorageSyncJournalEntry>) { override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) {
syncJournal.addAll(entries) syncJournal = StorageSyncJournalMerge.merge(syncJournal, entries)
}
override suspend fun rewriteSyncJournal(entries: List<StorageSyncJournalEntry>) {
syncJournal.clear()
syncJournal.addAll(entries)
} }
override suspend fun readSyncLock(): StorageSyncLock? = syncLock override suspend fun readSyncLock(): StorageSyncLock? = syncLock