From 51e6f40587152a011222198974475a2a29da1a72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9F=D1=8B=D1=82=D0=BA=D0=BE=D0=B2=20=D0=A0=D0=BE=D0=BC?= =?UTF-8?q?=D0=B0=D0=BD?= Date: Thu, 21 May 2026 18:46:03 +0300 Subject: [PATCH] =?UTF-8?q?fix(sync):=20=D1=81=D1=82=D0=B0=D0=B1=D0=B8?= =?UTF-8?q?=D0=BB=D0=B8=D0=B7=D0=B8=D1=80=D0=BE=D0=B2=D0=B0=D0=BB=20=D1=81?= =?UTF-8?q?=D0=B8=D0=BD=D1=85=D1=80=D0=BE=D0=BD=D0=B8=D0=B7=D0=B0=D1=86?= =?UTF-8?q?=D0=B8=D1=8E,=20Yandex=20I/O=20=D0=B8=20=D0=B2=D1=91=D1=80?= =?UTF-8?q?=D1=81=D1=82=D0=BA=D1=83=20=D0=BA=D0=B0=D1=80=D1=82=D0=BE=D1=87?= =?UTF-8?q?=D0=BA=D0=B8=20storage?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Добавил TRASH вместо DELETE для moveToTrash, компакцию журналов и отчёт об ошибках apply. Исправил проброс ошибок upload Yandex при close, CAS lock и загрузку OAuth-токена. Упростил совместимость sync-групп (только encInfo), поправил растягивание StorageTree при недоступных meta. --- .../wallenc/app/sync/StorageSyncBootstrap.kt | 1 + .../yandexdisk/YandexDiskApiFactory.kt | 24 ++--- .../encrypt/EncryptedStorageAccessor.kt | 4 +- .../storages/local/LocalStorageAccessor.kt | 2 +- .../storages/yandex/YandexStorageAccessor.kt | 100 ++++++++++++------ .../domain/vault/utils/CloseHandledStream.kt | 12 ++- .../domain/datatypes/StorageSyncModels.kt | 3 + .../wallenc/domain/tasks/TaskProgressLabel.kt | 1 + .../wallenc/ui/elements/StorageTree.kt | 47 ++++---- .../ui/resources/TaskProgressLabels.kt | 2 + .../ui/screens/sync/StorageSyncViewModel.kt | 1 - ui/src/main/res/values-ru/plurals.xml | 6 ++ ui/src/main/res/values/plurals.xml | 4 + .../usecases/StorageSyncEncryptionCompat.kt | 5 +- .../wallenc/usecases/StorageSyncEngine.kt | 61 +++++++++-- .../StorageSyncEncryptionCompatTest.kt | 41 +++++++ .../wallenc/usecases/StorageSyncEngineTest.kt | 36 +++++++ .../usecases/fakes/FakeStorageAccessor.kt | 7 +- 18 files changed, 268 insertions(+), 89 deletions(-) create mode 100644 usecases/src/test/java/com/github/nullptroma/wallenc/usecases/StorageSyncEncryptionCompatTest.kt diff --git a/app/src/main/java/com/github/nullptroma/wallenc/app/sync/StorageSyncBootstrap.kt b/app/src/main/java/com/github/nullptroma/wallenc/app/sync/StorageSyncBootstrap.kt index 5b14930..36958fd 100644 --- a/app/src/main/java/com/github/nullptroma/wallenc/app/sync/StorageSyncBootstrap.kt +++ b/app/src/main/java/com/github/nullptroma/wallenc/app/sync/StorageSyncBootstrap.kt @@ -42,6 +42,7 @@ class StorageSyncBootstrap @Inject constructor( merge(*triggers.toTypedArray()) .debounce(DEBOUNCE_AFTER_CHANGE_MS) .collect { + // RunStorageSyncUseCase.enqueue отбрасывает повтор, пока sync уже в очереди/в работе. syncRunner.enqueue( displayTitle = uiStrings(R.string.task_title_storage_sync_background), logReason = "debounce", diff --git a/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/network/yandexdisk/YandexDiskApiFactory.kt b/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/network/yandexdisk/YandexDiskApiFactory.kt index 65e4192..300fbc9 100644 --- a/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/network/yandexdisk/YandexDiskApiFactory.kt +++ b/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/network/yandexdisk/YandexDiskApiFactory.kt @@ -54,28 +54,26 @@ class YandexDiskApiFactory( } /** - * Провайдер токена читает [YandexAccountRepository] на IO-диспетчере (как и остальной data-слой). + * OAuth-токен загружается один раз при создании API (не в OkHttp interceptor). + * При 401 см. [YandexDiskAuthException] и повторную привязку vault. */ fun createApiForVault(vaultUuid: UUID): YandexDiskApi { val id = vaultUuid.toString() + val token = runBlocking(ioDispatcher) { + accountRepository.getByVaultUuid(id)?.oauthToken + } ?: throw java.io.IOException("Yandex OAuth token is missing") + oauthTokenCache[id] = System.currentTimeMillis() to token return createAuthenticatedApi { - val now = System.currentTimeMillis() - val hit = oauthTokenCache[id] - if (hit != null && now - hit.first < OAUTH_TOKEN_CACHE_TTL_MS) { - return@createAuthenticatedApi hit.second - } - val token = runBlocking(ioDispatcher) { - accountRepository.getByVaultUuid(id)?.oauthToken - } ?: throw java.io.IOException("Yandex OAuth token is missing") - oauthTokenCache[id] = now to token - token + oauthTokenCache[id]?.second ?: token } } + fun invalidateTokenCache(vaultUuid: UUID) { + oauthTokenCache.remove(vaultUuid.toString()) + } + companion object { const val BASE_URL = "https://cloud-api.yandex.net/" - private const val OAUTH_TOKEN_CACHE_TTL_MS = 120_000L - fun createRepositoryWithToken( oauthToken: String, ioDispatcher: CoroutineDispatcher, diff --git a/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/encrypt/EncryptedStorageAccessor.kt b/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/encrypt/EncryptedStorageAccessor.kt index a887cc6..f6c0b9c 100644 --- a/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/encrypt/EncryptedStorageAccessor.kt +++ b/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/encrypt/EncryptedStorageAccessor.kt @@ -46,6 +46,7 @@ class EncryptedStorageAccessor( private val scope: CoroutineScope ) : IStorageAccessor, DisposableHandle { private val syncActorId = UUID.randomUUID().toString() + private val syncLockMutex = Mutex() private val _size = MutableStateFlow(null) override val size: StateFlow = _size @@ -63,7 +64,6 @@ class EncryptedStorageAccessor( private val dataEncryptor = Encryptor(key.toAesKey()) private val pathEncryptor: EncryptorWithStaticIv? = if(pathIv != null) EncryptorWithStaticIv(key.toAesKey(), pathIv) else null - private val syncLockMutex = Mutex() private var systemHiddenFilesIsActual = false init { @@ -266,7 +266,7 @@ class EncryptedStorageAccessor( override suspend fun moveToTrash(path: String) { source.moveToTrash(encryptPath(path)) - appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE) + appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH) } override fun dispose() { diff --git a/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/local/LocalStorageAccessor.kt b/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/local/LocalStorageAccessor.kt index b10e1c0..20205fc 100644 --- a/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/local/LocalStorageAccessor.kt +++ b/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/local/LocalStorageAccessor.kt @@ -547,7 +547,7 @@ class LocalStorageAccessor( ?: throw WallencException.Storage.FileNotFound() val newMeta = pair.meta.copy(isDeleted = true) writeMeta(pair.metaFile, newMeta) - appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE) + appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH) } override suspend fun openReadSystemFile(name: String): InputStream = withContext(ioDispatcher) { diff --git a/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/yandex/YandexStorageAccessor.kt b/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/yandex/YandexStorageAccessor.kt index 5e6d710..27d9757 100644 --- a/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/yandex/YandexStorageAccessor.kt +++ b/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/storages/yandex/YandexStorageAccessor.kt @@ -532,34 +532,8 @@ class YandexStorageAccessor( val tmp = File.createTempFile("wallenc-yandex-", ".upload") val fos = FileOutputStream(tmp) fos.onClosed { - runBlocking(ioDispatcher) { - try { - val diskPath = toDiskPath(path) - val prior = guard { repo.getOrNull(diskPath) } - if (prior?.type == "dir") { - throw WallencException.Storage.CannotWriteOverDirectory() - } - val hadFile = prior?.type == "file" - val priorSize = if (prior?.type == "file") prior.size ?: 0L else 0L - guard { repo.uploadFile(diskPath, tmp, overwrite = true) } - val after = guard { getMetadataAfterWrite(diskPath) } - if (after.type != "file") { - throw WallencException.Storage.UnexpectedState() - } - val newSize = after.size ?: 0L - _size.value = ((_size.value ?: 0L) + newSize - priorSize).coerceAtLeast(0L) - if (!hadFile) { - _numberOfFiles.value = (_numberOfFiles.value ?: 0) + 1 - } - persistStatsImmediate() - val info = runCatching { after.toCommonFile(path) }.getOrNull() - info?.let { - _filesUpdates.emit(DataPage(listOf(it), pageLength = 1, pageIndex = 0)) - } - appendSyncEntry(path = path, operation = StorageSyncOperation.UPSERT) - } finally { - tmp.delete() - } + runCommitAfterStreamClosed { + commitUploadedFile(path, tmp) } } } @@ -570,7 +544,7 @@ class YandexStorageAccessor( override suspend fun moveToTrash(path: String) = withContext(ioDispatcher) { patchCustomProps(path, mapOf(PROP_DELETED to "true")) - appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE) + appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH) } override suspend fun openReadSystemFile(name: String): InputStream = withContext(ioDispatcher) { @@ -589,10 +563,10 @@ class YandexStorageAccessor( val rel = "/$SYSTEM_HIDDEN_DIRNAME/$name" val uploadBuffer = ByteArrayOutputStream() uploadBuffer.onClosed { - runBlocking(ioDispatcher) { - guard { - repo.uploadBytes(toDiskPath(rel), uploadBuffer.toByteArray(), overwrite = true) - } + val bytes = uploadBuffer.toByteArray() + val diskPath = toDiskPath(rel) + runCommitAfterStreamClosed { + guard { repo.uploadBytes(diskPath, bytes, overwrite = true) } } } } @@ -641,8 +615,25 @@ class YandexStorageAccessor( }.getOrNull() } + /** + * Best-effort lock на Диске (read-modify-write без CAS на стороне API). + * Межустройственная координация опирается на [StorageSyncEngine] mutex в процессе. + */ override suspend fun tryAcquireSyncLock(holderId: String, leaseUntil: Instant): Boolean = withContext(ioDispatcher) { - return@withContext syncLockMutex.withLock { + repeat(SYNC_LOCK_CAS_RETRIES) { attempt -> + val acquired = tryAcquireSyncLockOnce(holderId, leaseUntil) + if (acquired) { + return@withContext true + } + if (attempt < SYNC_LOCK_CAS_RETRIES - 1) { + delay(SYNC_LOCK_CAS_DELAY_MS) + } + } + return@withContext false + } + + private suspend fun tryAcquireSyncLockOnce(holderId: String, leaseUntil: Instant): Boolean { + return syncLockMutex.withLock { val current = readSyncLock() val now = Instant.now() val foreignLockActive = current != null && @@ -684,6 +675,45 @@ class YandexStorageAccessor( } } + /** + * Выполняется из [OutputStream.close]; ошибки upload пробрасываются вызывающему коду. + */ + private fun runCommitAfterStreamClosed(block: suspend () -> Unit) { + runBlocking(ioDispatcher) { + block() + } + } + + private suspend fun commitUploadedFile(path: String, tmp: File) { + try { + val diskPath = toDiskPath(path) + val prior = guard { repo.getOrNull(diskPath) } + if (prior?.type == "dir") { + throw WallencException.Storage.CannotWriteOverDirectory() + } + val hadFile = prior?.type == "file" + val priorSize = if (prior?.type == "file") prior.size ?: 0L else 0L + guard { repo.uploadFile(diskPath, tmp, overwrite = true) } + val after = guard { getMetadataAfterWrite(diskPath) } + if (after.type != "file") { + throw WallencException.Storage.UnexpectedState() + } + val newSize = after.size ?: 0L + _size.value = ((_size.value ?: 0L) + newSize - priorSize).coerceAtLeast(0L) + if (!hadFile) { + _numberOfFiles.value = (_numberOfFiles.value ?: 0) + 1 + } + persistStatsImmediate() + val info = runCatching { after.toCommonFile(path) }.getOrNull() + info?.let { + _filesUpdates.emit(DataPage(listOf(it), pageLength = 1, pageIndex = 0)) + } + appendSyncEntry(path = path, operation = StorageSyncOperation.UPSERT) + } finally { + tmp.delete() + } + } + private suspend fun appendSyncEntry(path: String, operation: StorageSyncOperation) { val cleanedPath = if (path.startsWith("/")) path else "/$path" val entries = readSyncJournal() @@ -726,6 +756,8 @@ class YandexStorageAccessor( private const val SYNC_JOURNAL_FILENAME = "sync-journal.json" private const val SYNC_LOCK_FILENAME = "sync-lock.json" private const val SYNC_LOCK_STALE_TIMEOUT_SECONDS: Long = 60 * 60 + private const val SYNC_LOCK_CAS_RETRIES = 3 + private const val SYNC_LOCK_CAS_DELAY_MS = 80L private const val STATS_DEBOUNCE_MS = 450L private const val DATA_PAGE_LENGTH = 10 private const val API_LIST_LIMIT = 1000 diff --git a/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/utils/CloseHandledStream.kt b/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/utils/CloseHandledStream.kt index a78c5fe..2fb8112 100644 --- a/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/utils/CloseHandledStream.kt +++ b/domain-vault/src/main/java/com/github/nullptroma/wallenc/domain/vault/utils/CloseHandledStream.kt @@ -27,11 +27,19 @@ private class CloseHandledOutputStream( override fun close() { onClosing() + var streamFailure: Throwable? = null try { stream.close() - } finally { - onClose() + } catch (t: Throwable) { + streamFailure = t } + try { + onClose() + } catch (afterCloseFailure: Throwable) { + streamFailure?.let { afterCloseFailure.addSuppressed(it) } + throw afterCloseFailure + } + streamFailure?.let { throw it } } } diff --git a/domain/src/main/java/com/github/nullptroma/wallenc/domain/datatypes/StorageSyncModels.kt b/domain/src/main/java/com/github/nullptroma/wallenc/domain/datatypes/StorageSyncModels.kt index 6075cf9..7a213d2 100644 --- a/domain/src/main/java/com/github/nullptroma/wallenc/domain/datatypes/StorageSyncModels.kt +++ b/domain/src/main/java/com/github/nullptroma/wallenc/domain/datatypes/StorageSyncModels.kt @@ -5,6 +5,9 @@ import java.util.UUID enum class StorageSyncOperation { UPSERT, + /** Soft-delete (корзина): на peer вызывается [IStorageAccessor.moveToTrash]. */ + TRASH, + /** Жёсткое удаление файла с носителя. */ DELETE, } diff --git a/domain/src/main/java/com/github/nullptroma/wallenc/domain/tasks/TaskProgressLabel.kt b/domain/src/main/java/com/github/nullptroma/wallenc/domain/tasks/TaskProgressLabel.kt index c190c9e..e82b760 100644 --- a/domain/src/main/java/com/github/nullptroma/wallenc/domain/tasks/TaskProgressLabel.kt +++ b/domain/src/main/java/com/github/nullptroma/wallenc/domain/tasks/TaskProgressLabel.kt @@ -20,6 +20,7 @@ sealed class TaskProgressLabel { data class SyncGroupProcessingEntries(val groupId: String, val count: Int) : TaskProgressLabel() data class SyncGroupEntryProgress(val groupId: String, val current: Int, val total: Int) : TaskProgressLabel() data class SyncGroupCompleted(val groupId: String) : TaskProgressLabel() + data class SyncGroupEntriesFailed(val groupId: String, val failedCount: Int) : TaskProgressLabel() data class SyncGroupRenewingLocks(val groupId: String) : TaskProgressLabel() data class SyncGroupLockRenewalFailed(val groupId: String) : TaskProgressLabel() diff --git a/ui/src/main/java/com/github/nullptroma/wallenc/ui/elements/StorageTree.kt b/ui/src/main/java/com/github/nullptroma/wallenc/ui/elements/StorageTree.kt index 95c42c5..11eb6f6 100644 --- a/ui/src/main/java/com/github/nullptroma/wallenc/ui/elements/StorageTree.kt +++ b/ui/src/main/java/com/github/nullptroma/wallenc/ui/elements/StorageTree.kt @@ -5,14 +5,12 @@ import androidx.compose.foundation.clickable import androidx.compose.foundation.interaction.MutableInteractionSource import androidx.compose.foundation.layout.Box import androidx.compose.foundation.layout.Column -import androidx.compose.foundation.layout.IntrinsicSize import androidx.compose.foundation.layout.Row -import androidx.compose.foundation.layout.Spacer -import androidx.compose.foundation.layout.fillMaxSize import androidx.compose.foundation.layout.fillMaxWidth -import androidx.compose.foundation.layout.height import androidx.compose.foundation.layout.offset import androidx.compose.foundation.layout.padding +import androidx.compose.foundation.layout.widthIn +import androidx.compose.foundation.layout.wrapContentHeight import androidx.compose.material.icons.Icons import androidx.compose.material.icons.filled.Lock import androidx.compose.material.icons.filled.LockOpen @@ -90,17 +88,16 @@ fun StorageTree( Column(modifier) { Box( modifier = Modifier - .height(IntrinsicSize.Min) + .fillMaxWidth() + .wrapContentHeight() .zIndex(100f), ) { val interactionSource = remember { MutableInteractionSource() } Box( modifier = Modifier - .clip( - CardDefaults.shape, - ) - .padding(0.dp, 0.dp, 16.dp, 0.dp) - .fillMaxSize() + .matchParentSize() + .padding(end = 16.dp) + .clip(CardDefaults.shape) .background(borderColor) .clickable( interactionSource = interactionSource, @@ -112,8 +109,9 @@ fun StorageTree( Card( interactionSource = interactionSource, modifier = Modifier - .padding(8.dp, 0.dp, 0.dp, 0.dp) - .fillMaxWidth(), + .padding(start = 8.dp) + .fillMaxWidth() + .wrapContentHeight(), elevation = CardDefaults.cardElevation( defaultElevation = 4.dp, ), @@ -124,8 +122,16 @@ fun StorageTree( } }, ) { - Row(modifier = Modifier.height(IntrinsicSize.Min)) { - Column(modifier = Modifier.padding(8.dp)) { + Row( + modifier = Modifier + .fillMaxWidth() + .wrapContentHeight(), + ) { + Column( + modifier = Modifier + .weight(1f) + .padding(8.dp), + ) { Text(metaInfo.name ?: stringResource(R.string.no_name)) Text( text = stringResource( @@ -170,7 +176,9 @@ fun StorageTree( } } Column( - modifier = Modifier, + modifier = Modifier + .widthIn(min = 112.dp) + .padding(end = 4.dp), horizontalAlignment = Alignment.End, ) { var expanded by remember { mutableStateOf(false) } @@ -368,7 +376,6 @@ fun StorageTree( ) } } - Spacer(modifier = Modifier.weight(1f)) if (isEncrypted) { IconButton( onClick = { showLockDialog = true }, @@ -382,18 +389,14 @@ fun StorageTree( } Text( modifier = Modifier - .fillMaxWidth() - .padding(0.dp, 0.dp, 12.dp, 0.dp) - .align(Alignment.End), + .padding(top = 4.dp, end = 8.dp), text = stringResource(getStatusTextRes(tree)), textAlign = TextAlign.End, fontSize = 11.sp, ) Text( modifier = Modifier - .fillMaxWidth() - .padding(0.dp, 0.dp, 12.dp, 8.dp) - .align(Alignment.End), + .padding(end = 8.dp, bottom = 8.dp), text = cur.uuid.toString(), textAlign = TextAlign.End, fontSize = 8.sp, diff --git a/ui/src/main/java/com/github/nullptroma/wallenc/ui/resources/TaskProgressLabels.kt b/ui/src/main/java/com/github/nullptroma/wallenc/ui/resources/TaskProgressLabels.kt index 8abdc97..aaa591b 100644 --- a/ui/src/main/java/com/github/nullptroma/wallenc/ui/resources/TaskProgressLabels.kt +++ b/ui/src/main/java/com/github/nullptroma/wallenc/ui/resources/TaskProgressLabels.kt @@ -37,6 +37,8 @@ fun TaskProgressLabel.resolve(resolver: UiStringResolver): String = when (this) resolver(R.string.sync_progress_group_entry, groupId, current, total) is TaskProgressLabel.SyncGroupCompleted -> resolver(R.string.sync_progress_group_completed, groupId) + is TaskProgressLabel.SyncGroupEntriesFailed -> + resolver.plurals(R.plurals.sync_progress_group_entries_failed, failedCount, groupId, failedCount) is TaskProgressLabel.SyncGroupRenewingLocks -> resolver(R.string.sync_progress_group_renewing_locks, groupId) is TaskProgressLabel.SyncGroupLockRenewalFailed -> diff --git a/ui/src/main/java/com/github/nullptroma/wallenc/ui/screens/sync/StorageSyncViewModel.kt b/ui/src/main/java/com/github/nullptroma/wallenc/ui/screens/sync/StorageSyncViewModel.kt index 901c55b..a6776d3 100644 --- a/ui/src/main/java/com/github/nullptroma/wallenc/ui/screens/sync/StorageSyncViewModel.kt +++ b/ui/src/main/java/com/github/nullptroma/wallenc/ui/screens/sync/StorageSyncViewModel.kt @@ -383,7 +383,6 @@ class StorageSyncViewModel @Inject constructor( !isStorageCompatibleWithGroup( storage = storage, group = group, - resolveStorageKey = vaultsManager.unlockManager::getOpenedStorageKey, ) } StorageSyncGroupUi( diff --git a/ui/src/main/res/values-ru/plurals.xml b/ui/src/main/res/values-ru/plurals.xml index 6d666e6..5e670c6 100644 --- a/ui/src/main/res/values-ru/plurals.xml +++ b/ui/src/main/res/values-ru/plurals.xml @@ -12,4 +12,10 @@ Синхронизация: группа «%1$s» — обработка %2$d записей Синхронизация: группа «%1$s» — обработка %2$d записей + + Синхронизация: группа «%1$s» — не применена %2$d запись + Синхронизация: группа «%1$s» — не применены %2$d записи + Синхронизация: группа «%1$s» — не применено %2$d записей + Синхронизация: группа «%1$s» — не применено %2$d записей + diff --git a/ui/src/main/res/values/plurals.xml b/ui/src/main/res/values/plurals.xml index 12b9756..f3b1217 100644 --- a/ui/src/main/res/values/plurals.xml +++ b/ui/src/main/res/values/plurals.xml @@ -8,4 +8,8 @@ Storage sync: group "%1$s" processing %2$d entry Storage sync: group "%1$s" processing %2$d entries + + Storage sync: group "%1$s" — %2$d entry failed + Storage sync: group "%1$s" — %2$d entries failed + diff --git a/usecases/src/main/java/com/github/nullptroma/wallenc/usecases/StorageSyncEncryptionCompat.kt b/usecases/src/main/java/com/github/nullptroma/wallenc/usecases/StorageSyncEncryptionCompat.kt index e2409ce..fe056e4 100644 --- a/usecases/src/main/java/com/github/nullptroma/wallenc/usecases/StorageSyncEncryptionCompat.kt +++ b/usecases/src/main/java/com/github/nullptroma/wallenc/usecases/StorageSyncEncryptionCompat.kt @@ -1,17 +1,14 @@ package com.github.nullptroma.wallenc.usecases -import com.github.nullptroma.wallenc.domain.datatypes.EncryptKey import com.github.nullptroma.wallenc.domain.interfaces.IStorage import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroup import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroupEncryptionKind -import java.util.UUID +/** Совместим, если у storage нет активного шифрования ([encInfo] == null). */ fun isStorageCompatibleWithGroup( storage: IStorage, group: StorageSyncGroup, - resolveStorageKey: (UUID) -> EncryptKey?, ): Boolean { - // Режим упрощён: в sync-группах допускаются только незашифрованные storage. if (storage.metaInfo.value.encInfo != null) { return false } diff --git a/usecases/src/main/java/com/github/nullptroma/wallenc/usecases/StorageSyncEngine.kt b/usecases/src/main/java/com/github/nullptroma/wallenc/usecases/StorageSyncEngine.kt index 73a037e..e88f743 100644 --- a/usecases/src/main/java/com/github/nullptroma/wallenc/usecases/StorageSyncEngine.kt +++ b/usecases/src/main/java/com/github/nullptroma/wallenc/usecases/StorageSyncEngine.kt @@ -19,6 +19,11 @@ import java.util.concurrent.atomic.AtomicLong import javax.inject.Inject import javax.inject.Singleton +/** + * Синхронизация по журналам storage в группе. + * Блокировка на Yandex Disk — best-effort (см. [IStorageAccessor.tryAcquireSyncLock]); + * сериализация внутри процесса — [groupMutexes]. + */ @Singleton class StorageSyncEngine @Inject constructor( private val vaultsManager: IVaultsManager, @@ -81,7 +86,6 @@ class StorageSyncEngine @Inject constructor( isStorageCompatibleWithGroup( storage = storage, group = group, - resolveStorageKey = vaultsManager.unlockManager::getOpenedStorageKey, ) } if (incompatible.isNotEmpty()) { @@ -148,6 +152,7 @@ class StorageSyncEngine @Inject constructor( null, TaskProgressLabel.SyncGroupProcessingEntries(groupId, mergedEntries.size), ) + var applyFailures = 0 for ((pathIndex, merged) in mergedEntries.withIndex()) { leaseUntil = renewLocksIfNeeded( groupId = groupId, @@ -166,7 +171,9 @@ class StorageSyncEngine @Inject constructor( return } val sourceStorage = findSourceStorage(storages, entriesByStorage, path, winnerEntry) - if (sourceStorage == null && winnerEntry.operation == StorageSyncOperation.UPSERT) { + if (sourceStorage == null && + winnerEntry.operation == StorageSyncOperation.UPSERT + ) { continue } @@ -185,9 +192,18 @@ class StorageSyncEngine @Inject constructor( ) if (applied) { target.accessor.appendSyncJournal(listOf(winnerEntry)) + } else { + applyFailures++ } } } + compactSyncJournals(storages) + if (applyFailures > 0) { + reportProgress( + null, + TaskProgressLabel.SyncGroupEntriesFailed(groupId, applyFailures), + ) + } reportProgress(null, TaskProgressLabel.SyncGroupCompleted(groupId)) } finally { for (accessor in lockedAccessors) { @@ -244,8 +260,12 @@ class StorageSyncEngine @Inject constructor( path: String, winnerEntry: StorageSyncJournalEntry, ): IStorage? { - if (winnerEntry.operation == StorageSyncOperation.DELETE) { - return storages.firstOrNull() + if (winnerEntry.operation == StorageSyncOperation.DELETE || + winnerEntry.operation == StorageSyncOperation.TRASH + ) { + return storages.firstOrNull { storage -> + entriesByStorage[storage.uuid]?.get(path) != null + } ?: storages.firstOrNull() } return storages.firstOrNull { storage -> val entry = entriesByStorage[storage.uuid]?.get(path) ?: return@firstOrNull false @@ -253,29 +273,52 @@ class StorageSyncEngine @Inject constructor( } } + private suspend fun compactSyncJournals(storages: List) { + for (storage in storages) { + val entries = storage.accessor.readSyncJournal() + val compacted = latestByPath(entries).values.toList() + if (compacted.size < entries.size) { + storage.accessor.rewriteSyncJournal(compacted) + } + } + } + private suspend fun applyEntry( source: IStorage?, target: IStorage, entry: StorageSyncJournalEntry, ): Boolean { - when (entry.operation) { + val result = when (entry.operation) { StorageSyncOperation.DELETE -> { - return runCatching { + runCatching { target.accessor.delete(entry.path) - }.isSuccess + } + } + + StorageSyncOperation.TRASH -> { + runCatching { + target.accessor.moveToTrash(entry.path) + } } StorageSyncOperation.UPSERT -> { val sourceAccessor = source?.accessor ?: return false - return runCatching { + runCatching { sourceAccessor.openRead(entry.path).use { input -> target.accessor.openWrite(entry.path).use { output -> input.copyTo(output) } } - }.isSuccess + } } } + result.exceptionOrNull()?.let { error -> + System.err.println( + "StorageSyncEngine: apply ${entry.operation} ${entry.path} " + + "target=${target.uuid}: ${error.message}", + ) + } + return result.isSuccess } private fun compareEntries(a: StorageSyncJournalEntry, b: StorageSyncJournalEntry): Int { diff --git a/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/StorageSyncEncryptionCompatTest.kt b/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/StorageSyncEncryptionCompatTest.kt new file mode 100644 index 0000000..c2fbe27 --- /dev/null +++ b/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/StorageSyncEncryptionCompatTest.kt @@ -0,0 +1,41 @@ +package com.github.nullptroma.wallenc.usecases + +import com.github.nullptroma.wallenc.domain.datatypes.StorageEncryptionInfo +import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroup +import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroupEncryptionKind +import com.github.nullptroma.wallenc.usecases.fakes.FakeMetaInfo +import com.github.nullptroma.wallenc.usecases.fakes.FakeStorage +import org.junit.Assert.assertFalse +import org.junit.Assert.assertTrue +import org.junit.Test +import java.util.UUID + +class StorageSyncEncryptionCompatTest { + + @Test + fun storageWithoutEncInfoIsCompatible() { + val storage = FakeStorage(uuid = UUID.randomUUID(), meta = FakeMetaInfo(encInfo = null)) + val group = StorageSyncGroup( + id = "g1", + storageUuids = setOf(storage.uuid), + encryptionKind = StorageSyncGroupEncryptionKind.NONE, + ) + assertTrue(isStorageCompatibleWithGroup(storage = storage, group = group)) + } + + @Test + fun storageWithEncInfoIsIncompatible() { + val storage = FakeStorage( + uuid = UUID.randomUUID(), + meta = FakeMetaInfo( + encInfo = StorageEncryptionInfo(encryptedTestData = "x", pathIv = ByteArray(16)), + ), + ) + val group = StorageSyncGroup( + id = "g1", + storageUuids = setOf(storage.uuid), + encryptionKind = StorageSyncGroupEncryptionKind.NONE, + ) + assertFalse(isStorageCompatibleWithGroup(storage = storage, group = group)) + } +} diff --git a/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/StorageSyncEngineTest.kt b/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/StorageSyncEngineTest.kt index 046d285..de9a9e1 100644 --- a/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/StorageSyncEngineTest.kt +++ b/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/StorageSyncEngineTest.kt @@ -7,6 +7,7 @@ import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroup import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroupEncryptionKind import com.github.nullptroma.wallenc.domain.tasks.TaskProgressLabel import com.github.nullptroma.wallenc.usecases.fakes.FakeStorage +import com.github.nullptroma.wallenc.usecases.fakes.FakeStorageAccessor import com.github.nullptroma.wallenc.usecases.fakes.FakeStorageSyncGroupStore import com.github.nullptroma.wallenc.usecases.fakes.FakeVaultsManager import kotlinx.coroutines.runBlocking @@ -112,6 +113,41 @@ class StorageSyncEngineTest { assertNull(target.fileBytes(path)) } + @Test + fun syncGroupTrashSoftDeletesOnTarget() = runBlocking { + val source = FakeStorage() + val target = FakeStorage() + val path = "trashed/doc.txt" + val payload = "keep-in-trash".encodeToByteArray() + source.putFile(path, payload) + target.putFile(path, payload) + + val entry = StorageSyncJournalEntry( + path = path, + operation = StorageSyncOperation.TRASH, + revision = StorageSyncRevision( + sequence = 3L, + actorId = "actor-trash", + createdAt = Instant.parse("2024-07-01T00:00:00Z"), + ), + ) + source.addSyncJournalEntry(entry) + + val group = StorageSyncGroup( + id = "trash-group", + storageUuids = setOf(source.uuid, target.uuid), + encryptionKind = StorageSyncGroupEncryptionKind.NONE, + ) + val engine = createEngine( + storages = listOf(source, target), + groups = listOf(group), + ) + engine.syncGroup(group.id) { _, _ -> } + + assertArrayEquals(payload, target.fileBytes(path)) + assertTrue(path in (target.accessor as FakeStorageAccessor).trashedPaths) + } + @Test fun syncGroupStopsWhenLockCannotBeAcquired() = runBlocking { val first = FakeStorage() diff --git a/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/fakes/FakeStorageAccessor.kt b/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/fakes/FakeStorageAccessor.kt index 9a03410..0e9801c 100644 --- a/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/fakes/FakeStorageAccessor.kt +++ b/usecases/src/test/java/com/github/nullptroma/wallenc/usecases/fakes/FakeStorageAccessor.kt @@ -21,6 +21,7 @@ import java.time.Instant class FakeStorageAccessor : IStorageAccessor { val dataFiles: MutableMap = mutableMapOf() + val trashedPaths: MutableSet = mutableSetOf() private val systemFiles: MutableMap = mutableMapOf() private val _filesUpdates = MutableSharedFlow>(extraBufferCapacity = 16) @@ -84,7 +85,11 @@ class FakeStorageAccessor : IStorageAccessor { return ByteArrayInputStream(bytes) } - override suspend fun moveToTrash(path: String) = Unit + override suspend fun moveToTrash(path: String) { + if (path in dataFiles) { + trashedPaths.add(path) + } + } override suspend fun openReadSystemFile(name: String): InputStream { val bytes = systemFiles[name] ?: ByteArray(0)