Compare commits

..

7 Commits

Author SHA1 Message Date
96e9de49c3 Более аккуратная обработка сетвой ошибки на экране Vault 2026-05-22 13:33:14 +03:00
6ab402da51 perf(yandex): сузил инвалидацию кэша Disk API и добавил счётчик запросов
Инвалидирую list/get по префиксу пути вместо полной очистки, учитываю вызовы
в cloudApiCallCount для замеров.
2026-05-22 13:22:17 +03:00
2618df41e3 feat(sync): добавил cooperative-отмену sync и pipeline-задач
ensureActive в StorageSyncEngine, flush журнала перед чтением, Cancelled
в StorageSyncRunOutcome и TaskContext.ensureNotCancelled.
2026-05-22 13:22:15 +03:00
bc2b354820 fix(sync): исправил журнал при DELETE/TRASH и безопасный flush
Добавил recordSyncJournal для delete/moveToTrash, StorageSyncJournalBuffer
с восстановлением pending при ошибке записи и немедленным flush без debounce.
2026-05-22 13:22:05 +03:00
b00eed901b foreground task для фоновой синхронизации 2026-05-22 00:51:29 +03:00
35ba6dd377 Костыль для подавления цикла синхронизации 2026-05-22 00:37:00 +03:00
07d54b5996 Заголовок задачи синхронизации 2026-05-22 00:20:49 +03:00
28 changed files with 818 additions and 201 deletions

View File

@@ -1,6 +1,8 @@
package com.github.nullptroma.wallenc.app.di.modules.domain
import com.github.nullptroma.wallenc.app.sync.StorageSyncTaskTitleFormatter
import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncEngine
import com.github.nullptroma.wallenc.domain.tasks.IStorageSyncTaskTitleFormatter
import com.github.nullptroma.wallenc.usecases.StorageSyncEngine
import dagger.Binds
import dagger.Module
@@ -15,4 +17,10 @@ abstract class UseCasesModule {
@Binds
@Singleton
abstract fun bindStorageSyncEngine(impl: StorageSyncEngine): IStorageSyncEngine
@Binds
@Singleton
abstract fun bindStorageSyncTaskTitleFormatter(
impl: StorageSyncTaskTitleFormatter,
): IStorageSyncTaskTitleFormatter
}

View File

@@ -2,8 +2,6 @@ package com.github.nullptroma.wallenc.app.sync
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
import com.github.nullptroma.wallenc.domain.interfaces.IVaultsManager
import com.github.nullptroma.wallenc.ui.R
import com.github.nullptroma.wallenc.ui.resources.UiStringResolver
import com.github.nullptroma.wallenc.domain.tasks.StorageSyncTriggerReason
import com.github.nullptroma.wallenc.usecases.RunStorageSyncUseCase
import kotlinx.coroutines.CoroutineScope
@@ -11,6 +9,7 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
@@ -25,14 +24,18 @@ class StorageSyncBootstrap @Inject constructor(
private val scheduler: StorageSyncScheduler,
private val vaultsManager: IVaultsManager,
private val syncRunner: RunStorageSyncUseCase,
private val uiStrings: UiStringResolver,
) {
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
fun start() {
scheduler.ensureScheduled()
scope.launch {
vaultsManager.allStorages.collectLatest { storages ->
combine(
vaultsManager.allStorages,
vaultsManager.unlockManager.openedStorages,
) { rootStorages, opened ->
(rootStorages + opened.values).distinctBy { it.uuid }
}.collectLatest { storages ->
if (storages.isEmpty()) {
return@collectLatest
}
@@ -55,21 +58,23 @@ class StorageSyncBootstrap @Inject constructor(
)
}
merge(*triggers.toTypedArray())
.debounce(DEBOUNCE_AFTER_CHANGE_MS)
.filter { shouldScheduleDebounceSync() }
.debounce(RunStorageSyncUseCase.DEBOUNCE_AFTER_CHANGE_MS)
.collect {
if (syncRunner.syncRunning.value) {
return@collect
}
syncRunner.enqueue(
displayTitle = uiStrings(R.string.task_title_storage_sync_background),
reason = StorageSyncTriggerReason.Debounce,
)
syncRunner.enqueue(StorageSyncTriggerReason.Debounce)
}
}
}
}
private companion object {
private const val DEBOUNCE_AFTER_CHANGE_MS = 60_000L
/**
* Игнорировать события во время sync и сразу после него: запись файлов при sync
* (в т.ч. через [EncryptedStorageAccessor]) иначе снова запускает debounce через 60 с.
*/
private fun shouldScheduleDebounceSync(): Boolean {
if (syncRunner.syncRunning.value) {
return false
}
return System.currentTimeMillis() >= syncRunner.debounceSuppressUntilMs.value
}
}

View File

@@ -0,0 +1,16 @@
package com.github.nullptroma.wallenc.app.sync
import com.github.nullptroma.wallenc.domain.tasks.IStorageSyncTaskTitleFormatter
import com.github.nullptroma.wallenc.domain.tasks.StorageSyncTriggerReason
import com.github.nullptroma.wallenc.ui.resources.UiStringResolver
import com.github.nullptroma.wallenc.ui.resources.storageSyncTaskTitle
import javax.inject.Inject
import javax.inject.Singleton
@Singleton
class StorageSyncTaskTitleFormatter @Inject constructor(
private val uiStrings: UiStringResolver,
) : IStorageSyncTaskTitleFormatter {
override fun format(reason: StorageSyncTriggerReason): String =
uiStrings.storageSyncTaskTitle(reason)
}

View File

@@ -6,6 +6,7 @@ import androidx.work.CoroutineWorker
import androidx.work.WorkerParameters
import com.github.nullptroma.wallenc.domain.tasks.StorageSyncTriggerReason
import com.github.nullptroma.wallenc.usecases.RunStorageSyncUseCase
import com.github.nullptroma.wallenc.usecases.StorageSyncRunOutcome
import dagger.assisted.Assisted
import dagger.assisted.AssistedInject
import timber.log.Timber
@@ -19,15 +20,25 @@ class StorageSyncWorker @AssistedInject constructor(
override suspend fun doWork(): Result {
Timber.d("Periodic storage sync started (attempt %d)", runAttemptCount)
return runCatching {
syncRunner.runBlocking(StorageSyncTriggerReason.Background)
return when (val outcome = syncRunner.enqueueAndAwait(StorageSyncTriggerReason.Background)) {
StorageSyncRunOutcome.SkippedAlreadyRunning -> {
Timber.d("Periodic storage sync skipped — already running")
Result.success()
}
StorageSyncRunOutcome.Completed -> {
Timber.d("Periodic storage sync finished")
Result.success()
}.getOrElse { error ->
Timber.w(error, "Periodic storage sync failed")
}
StorageSyncRunOutcome.Cancelled -> {
Timber.d("Periodic storage sync cancelled")
Result.success()
}
is StorageSyncRunOutcome.Failed -> {
Timber.w(outcome.error, "Periodic storage sync failed")
Result.retry()
}
}
}
companion object {
const val UNIQUE_WORK_NAME = "wallenc-storage-sync-periodic"

View File

@@ -25,6 +25,7 @@ import java.io.FilterInputStream
import java.io.IOException
import java.io.InputStream
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
class YandexDiskRepository(
private val api: YandexDiskApi,
@@ -38,6 +39,13 @@ class YandexDiskRepository(
private val listCache = ConcurrentHashMap<ListCacheKey, ResourceDto>()
private val getCache = ConcurrentHashMap<String, ResourceDto>()
private val cloudApiCallCount = AtomicLong(0)
fun cloudApiCallCount(): Long = cloudApiCallCount.get()
fun resetCloudApiCallCount() {
cloudApiCallCount.set(0)
}
suspend fun diskInfo(): DiskInfoDto = withContext(ioDispatcher) {
val now = System.currentTimeMillis()
@@ -93,7 +101,7 @@ class YandexDiskRepository(
suspend fun createFolder(path: String): Unit = withContext(ioDispatcher) {
val resp = wrapAuth { api.createFolder(path) }
when (resp.code()) {
201, 409 -> invalidateDiskMetaCaches()
201, 409 -> invalidateDiskMetaCaches(path)
else -> throw failure("createFolder", resp)
}
}
@@ -101,14 +109,14 @@ class YandexDiskRepository(
suspend fun delete(path: String, permanently: Boolean = true): Unit = withContext(ioDispatcher) {
val resp = wrapAuth { api.deleteResource(path, permanently) }
when (resp.code()) {
204 -> invalidateDiskMetaCaches()
204 -> invalidateDiskMetaCaches(path)
202 -> {
val link = resp.body()?.use { body -> parseLink(body) }
?: throw IOException("DELETE 202 without body")
awaitOperation(link.href)
invalidateDiskMetaCaches()
invalidateDiskMetaCaches(path)
}
404 -> invalidateDiskMetaCaches()
404 -> invalidateDiskMetaCaches(path)
else -> throw failure("delete", resp)
}
}
@@ -122,7 +130,7 @@ class YandexDiskRepository(
throw failure("patch", resp)
}
resp.body()?.close()
invalidateDiskMetaCaches()
invalidateDiskMetaCaches(path)
}
suspend fun uploadBytes(path: String, bytes: ByteArray, overwrite: Boolean = true): Unit =
@@ -134,10 +142,11 @@ class YandexDiskRepository(
val body = bytes.toRequestBody(OCTET_STREAM)
val req = Request.Builder().url(link.href).put(body).build()
repeat(LOCKED_RETRY_MAX) { attempt ->
recordCloudApiCall()
rawHttp.newCall(req).execute().use { resp ->
when {
resp.isSuccessful -> {
invalidateDiskMetaCaches()
invalidateDiskMetaCaches(path)
return@withContext
}
resp.code == 423 && attempt < LOCKED_RETRY_MAX - 1 ->
@@ -158,10 +167,11 @@ class YandexDiskRepository(
val body = file.asRequestBody(OCTET_STREAM)
val req = Request.Builder().url(link.href).put(body).build()
repeat(LOCKED_RETRY_MAX) { attempt ->
recordCloudApiCall()
rawHttp.newCall(req).execute().use { resp ->
when {
resp.isSuccessful -> {
invalidateDiskMetaCaches()
invalidateDiskMetaCaches(path)
return@withContext
}
resp.code == 423 && attempt < LOCKED_RETRY_MAX - 1 ->
@@ -186,6 +196,7 @@ class YandexDiskRepository(
}
val req = Request.Builder().url(link.href).get().build()
repeat(LOCKED_RETRY_MAX) { attempt ->
recordCloudApiCall()
val resp = rawHttp.newCall(req).execute()
when {
resp.isSuccessful -> {
@@ -281,18 +292,47 @@ class YandexDiskRepository(
getCache[path] = value
}
private fun invalidateDiskMetaCaches() {
private fun invalidateDiskMetaCaches(changedDiskPath: String? = null) {
synchronized(diskCacheLock) {
diskInfoCached = null
diskInfoCachedUntilMs = 0L
}
if (changedDiskPath == null) {
listCache.clear()
getCache.clear()
return
}
val prefixes = cachePrefixesForPath(changedDiskPath)
listCache.keys.removeAll { key ->
prefixes.any { prefix ->
key.path.startsWith(prefix) || prefix.startsWith(key.path.trimEnd('/'))
}
}
getCache.keys.removeAll { cachedPath ->
prefixes.any { prefix ->
cachedPath.startsWith(prefix) || prefix.startsWith(cachedPath.trimEnd('/'))
}
}
}
private fun cachePrefixesForPath(diskPath: String): List<String> {
val normalized = diskPath.trimEnd('/')
val out = mutableListOf<String>()
var current = normalized
while (current.isNotEmpty()) {
out.add(current)
out.add("$current/")
val slash = current.lastIndexOf('/')
if (slash <= 0) break
current = current.substring(0, slash)
}
return out
}
private suspend inline fun <T> wrapAuth(crossinline block: suspend () -> T): T {
repeat(LOCKED_RETRY_MAX) { attempt ->
try {
recordCloudApiCall()
return block()
} catch (e: HttpException) {
when (e.code()) {
@@ -313,6 +353,10 @@ class YandexDiskRepository(
error("unreachable")
}
private fun recordCloudApiCall() {
cloudApiCallCount.incrementAndGet()
}
private fun failure(op: String, resp: Response<ResponseBody>): IOException {
val msg = resp.errorBody()?.string() ?: resp.message()
return IOException("$op failed: HTTP ${resp.code()} $msg")

View File

@@ -0,0 +1,112 @@
package com.github.nullptroma.wallenc.domain.vault.storages.common
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.UUID
/**
* Буфер журнала sync: накопление записей и безопасный flush (без потери pending при ошибке записи).
*/
class StorageSyncJournalBuffer(
private val syncActorId: String,
private val originStorageUuid: UUID?,
private val readJournal: suspend () -> StorageSyncJournal,
private val writeJournal: suspend (StorageSyncJournal) -> Unit,
) {
private val pendingMutex = Mutex()
private val flushMutex = Mutex()
private var pendingJournalEntries: StorageSyncJournal = emptyMap()
@Volatile
private var sequenceHighWatermark: Long? = null
suspend fun flushPending() {
flushMutex.withLock {
flushPendingUnderLock()
}
}
suspend fun putEntries(entries: StorageSyncJournal) {
flushPending()
if (entries.isEmpty()) {
return
}
val current = readJournal()
val merged = StorageSyncJournalMerge.merge(current, entries)
if (merged == current) {
return
}
writeJournal(merged)
refreshSequenceHighWatermark(merged)
}
suspend fun appendEntry(path: String, entry: StorageSyncJournalEntry) {
pendingMutex.withLock {
pendingJournalEntries = pendingJournalEntries + (path to entry)
}
flushPending()
}
suspend fun nextSequence(): Long {
flushPending()
val diskMax = readJournal().values.maxOfOrNull { it.revision.sequence } ?: 0L
val pendingMax = pendingMutex.withLock {
pendingJournalEntries.values.maxOfOrNull { it.revision.sequence } ?: 0L
}
val base = maxOf(diskMax, pendingMax, sequenceHighWatermark ?: 0L)
val next = base + 1L
sequenceHighWatermark = next
return next
}
fun buildEntry(
path: String,
operation: com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation,
sequence: Long,
): StorageSyncJournalEntry {
return StorageSyncJournalEntry(
path = path,
operation = operation,
revision = StorageSyncRevision(
sequence = sequence,
actorId = syncActorId,
createdAt = java.time.Instant.now(),
),
originStorageUuid = originStorageUuid,
)
}
private suspend fun flushPendingUnderLock() {
val pending = pendingMutex.withLock {
if (pendingJournalEntries.isEmpty()) {
return
}
val snapshot = pendingJournalEntries
pendingJournalEntries = emptyMap()
snapshot
}
try {
val current = readJournal()
val merged = StorageSyncJournalMerge.merge(current, pending)
if (merged != current) {
writeJournal(merged)
}
refreshSequenceHighWatermark(merged)
} catch (e: Exception) {
pendingMutex.withLock {
pendingJournalEntries = StorageSyncJournalMerge.merge(pending, pendingJournalEntries)
}
throw e
}
}
private fun refreshSequenceHighWatermark(journal: StorageSyncJournal) {
val maxSeq = journal.values.maxOfOrNull { it.revision.sequence } ?: return
val current = sequenceHighWatermark
sequenceHighWatermark = if (current == null) maxSeq else maxOf(current, maxSeq)
}
}

View File

@@ -2,6 +2,7 @@ package com.github.nullptroma.wallenc.domain.vault.storages.encrypt
import com.fasterxml.jackson.module.kotlin.readValue
import com.github.nullptroma.wallenc.domain.errors.WallencException
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalBuffer
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec
import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty
import com.github.nullptroma.wallenc.domain.vault.utils.CloseHandledStreamExtension.Companion.onClosed
@@ -18,14 +19,12 @@ import com.github.nullptroma.wallenc.domain.interfaces.IFile
import com.github.nullptroma.wallenc.domain.interfaces.IMetaInfo
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
@@ -52,6 +51,12 @@ class EncryptedStorageAccessor(
) : IStorageAccessor, DisposableHandle {
private val syncActorId = UUID.randomUUID().toString()
private val syncLockMutex = Mutex()
private val journalBuffer = StorageSyncJournalBuffer(
syncActorId = syncActorId,
originStorageUuid = null,
readJournal = { readSyncJournalUnchecked() },
writeJournal = { writeSyncJournal(it) },
)
private val _size = MutableStateFlow<Long?>(null)
override val size: StateFlow<Long?> = _size
@@ -110,22 +115,24 @@ class EncryptedStorageAccessor(
launch {
source.numberOfFiles.collect {
if(it == null)
if (it == null) {
_numberOfFiles.value = null
else
{
_numberOfFiles.value = it - getSystemFiles().size
} else {
val hiddenCount = runCatching { getSystemFiles().size }.getOrNull() ?: return@collect
_numberOfFiles.value = it - hiddenCount
}
}
}
launch {
source.size.collect { sourceSize ->
if(sourceSize == null)
if (sourceSize == null) {
_size.value = null
else
{
_size.value = sourceSize - getSystemFiles().sumOf { it.metaInfo.size }
} else {
val hiddenBytes = runCatching {
getSystemFiles().sumOf { file -> file.metaInfo.size }
}.getOrNull() ?: return@collect
_size.value = sourceSize - hiddenBytes
}
}
}
@@ -257,10 +264,12 @@ class EncryptedStorageAccessor(
source.touchDir(encryptPath(path))
}
override suspend fun delete(path: String) {
source.delete(encryptPath(path))
override suspend fun delete(path: String, recordSyncJournal: Boolean) {
source.delete(encryptPath(path), recordSyncJournal = false)
if (recordSyncJournal) {
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
}
}
override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream =
openWriteInternal(path, recordJournal = recordSyncJournal)
@@ -270,12 +279,17 @@ class EncryptedStorageAccessor(
return dataEncryptor.decryptStream(stream)
}
override suspend fun moveToTrash(path: String) {
source.moveToTrash(encryptPath(path))
override suspend fun moveToTrash(path: String, recordSyncJournal: Boolean) {
source.moveToTrash(encryptPath(path), recordSyncJournal = false)
if (recordSyncJournal) {
appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH)
}
}
override fun dispose() {
runBlocking {
runCatching { journalBuffer.flushPending() }
}
dataEncryptor.dispose()
}
@@ -297,16 +311,21 @@ class EncryptedStorageAccessor(
}
override suspend fun readSyncJournal(): StorageSyncJournal {
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
return StorageSyncJournalCodec.read(jackson, bytes)
journalBuffer.flushPending()
return readSyncJournalUnchecked()
}
override suspend fun flushPendingSyncJournal() {
journalBuffer.flushPending()
}
override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) {
if (entries.isEmpty()) {
return
journalBuffer.putEntries(entries)
}
val merged = StorageSyncJournalMerge.merge(readSyncJournal(), entries)
writeSyncJournal(merged)
private suspend fun readSyncJournalUnchecked(): StorageSyncJournal {
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
return StorageSyncJournalCodec.read(jackson, bytes)
}
private suspend fun writeSyncJournal(journal: StorageSyncJournal) {
@@ -373,21 +392,9 @@ class EncryptedStorageAccessor(
if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) {
return
}
val journal = readSyncJournal()
val nextSequence = (journal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L
putSyncJournalEntries(
mapOf(
cleanedPath to StorageSyncJournalEntry(
path = cleanedPath,
operation = operation,
revision = StorageSyncRevision(
sequence = nextSequence,
actorId = syncActorId,
createdAt = Instant.now(),
),
),
),
)
val sequence = journalBuffer.nextSequence()
val entry = journalBuffer.buildEntry(cleanedPath, operation, sequence)
journalBuffer.appendEntry(cleanedPath, entry)
}
private suspend fun openWriteInternal(path: String, recordJournal: Boolean): OutputStream {

View File

@@ -1,6 +1,7 @@
package com.github.nullptroma.wallenc.domain.vault.storages.local
import com.github.nullptroma.wallenc.domain.errors.WallencException
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalBuffer
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec
import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty
@@ -26,6 +27,8 @@ import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharedFlow
@@ -73,6 +76,13 @@ class LocalStorageAccessor(
private val _dirsUpdates = MutableSharedFlow<DataPage<IDirectory>>()
override val dirsUpdates: SharedFlow<DataPage<IDirectory>> = _dirsUpdates
private val journalBuffer = StorageSyncJournalBuffer(
syncActorId = syncActorId,
originStorageUuid = null,
readJournal = { readSyncJournalUnchecked() },
writeJournal = { writeSyncJournal(it) },
)
suspend fun init() = withContext(ioDispatcher) {
// запускам сканирование хранилища
scanSizeAndNumOfFiles()
@@ -513,7 +523,7 @@ class LocalStorageAccessor(
createDir(path)
}
override suspend fun delete(path: String) = withContext(ioDispatcher) {
override suspend fun delete(path: String, recordSyncJournal: Boolean) = withContext(ioDispatcher) {
if (path == "/" || path.isBlank()) {
throw WallencException.Storage.DeleteRootForbidden()
}
@@ -523,9 +533,11 @@ class LocalStorageAccessor(
else pair.file.delete()
pair.metaFile.delete()
scanSizeAndNumOfFiles()
if (recordSyncJournal) {
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
}
}
}
override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream = withContext(ioDispatcher) {
touchFileInternal(path, recordJournal = false)
@@ -548,13 +560,15 @@ class LocalStorageAccessor(
return@withContext pair.file.inputStream()
}
override suspend fun moveToTrash(path: String) = withContext(ioDispatcher) {
override suspend fun moveToTrash(path: String, recordSyncJournal: Boolean) = withContext(ioDispatcher) {
val pair = LocalStorageFilePair.from(_filesystemBasePath, path)
?: throw WallencException.Storage.FileNotFound()
val newMeta = pair.meta.copy(isDeleted = true)
writeMeta(pair.metaFile, newMeta)
if (recordSyncJournal) {
appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH)
}
}
override suspend fun openReadSystemFile(name: String): InputStream = withContext(ioDispatcher) {
val dirPath = _filesystemBasePath.resolve(SYSTEM_HIDDEN_DIRNAME)
@@ -579,16 +593,21 @@ class LocalStorageAccessor(
}
override suspend fun readSyncJournal(): StorageSyncJournal = withContext(ioDispatcher) {
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
StorageSyncJournalCodec.read(jackson, bytes)
journalBuffer.flushPending()
readSyncJournalUnchecked()
}
override suspend fun flushPendingSyncJournal() = withContext(ioDispatcher) {
journalBuffer.flushPending()
}
override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) = withContext(ioDispatcher) {
if (entries.isEmpty()) {
return@withContext
journalBuffer.putEntries(entries)
}
val merged = StorageSyncJournalMerge.merge(readSyncJournal(), entries)
writeSyncJournal(merged)
private suspend fun readSyncJournalUnchecked(): StorageSyncJournal {
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
return StorageSyncJournalCodec.read(jackson, bytes)
}
private suspend fun writeSyncJournal(journal: StorageSyncJournal) {
@@ -721,21 +740,9 @@ class LocalStorageAccessor(
if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) {
return
}
val journal = readSyncJournal()
val nextSequence = (journal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L
putSyncJournalEntries(
mapOf(
cleanedPath to StorageSyncJournalEntry(
path = cleanedPath,
operation = operation,
revision = StorageSyncRevision(
sequence = nextSequence,
actorId = syncActorId,
createdAt = Instant.now(),
),
),
),
)
val sequence = journalBuffer.nextSequence()
val entry = journalBuffer.buildEntry(cleanedPath, operation, sequence)
journalBuffer.appendEntry(cleanedPath, entry)
}
companion object {

View File

@@ -2,6 +2,7 @@ package com.github.nullptroma.wallenc.domain.vault.storages.yandex
import com.github.nullptroma.wallenc.domain.errors.WallencException
import com.github.nullptroma.wallenc.domain.vault.errors.toVaultWallencException
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalBuffer
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec
import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty
@@ -19,17 +20,16 @@ import com.github.nullptroma.wallenc.domain.interfaces.IDirectory
import com.github.nullptroma.wallenc.domain.interfaces.IFile
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
@@ -42,6 +42,7 @@ import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
@@ -95,6 +96,13 @@ class YandexStorageAccessor(
private var statsPersistJob: Job? = null
private val journalBuffer = StorageSyncJournalBuffer(
syncActorId = syncActorId,
originStorageUuid = storageUuid,
readJournal = { readSyncJournalUnchecked() },
writeJournal = { writeSyncJournal(it) },
)
@Volatile
private var systemDirEnsured: Boolean = false
@@ -240,6 +248,7 @@ class YandexStorageAccessor(
val queue = ArrayDeque<String>()
queue.add(relDir)
while (queue.isNotEmpty()) {
coroutineContext.ensureActive()
val rel = queue.removeFirst()
if (isSystemRel(rel)) continue
val (files, dirs) = listImmediateChildren(rel)
@@ -285,6 +294,7 @@ class YandexStorageAccessor(
val dirs = mutableListOf<IDirectory>()
var offset = 0
while (true) {
coroutineContext.ensureActive()
val res = guard { repo.list(diskPath, API_LIST_LIMIT, offset) }
val items = res.embedded?.items.orEmpty()
for (it in items) {
@@ -303,7 +313,7 @@ class YandexStorageAccessor(
}
private suspend fun getMetadataAfterWrite(diskPath: String): ResourceDto {
val maxAttempts = 6
val maxAttempts = 3
repeat(maxAttempts) { attempt ->
try {
return guard { repo.get(diskPath) }
@@ -488,9 +498,9 @@ class YandexStorageAccessor(
if (created) {
_numberOfFiles.value = (_numberOfFiles.value ?: 0) + 1
persistStatsImmediate()
}
appendSyncEntry(path = path, operation = StorageSyncOperation.UPSERT)
}
}
override suspend fun touchDir(path: String): Unit = withContext(ioDispatcher) {
val segments = pathSegments(path)
@@ -506,7 +516,7 @@ class YandexStorageAccessor(
}
}
override suspend fun delete(path: String) = withContext(ioDispatcher) {
override suspend fun delete(path: String, recordSyncJournal: Boolean) = withContext(ioDispatcher) {
if (path == "/" || path.isBlank()) {
throw WallencException.Storage.DeleteRootForbidden()
}
@@ -528,8 +538,10 @@ class YandexStorageAccessor(
}
guard { repo.delete(diskPath, permanently = true) }
scheduleStatsPersist()
if (recordSyncJournal) {
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
}
}
override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream = withContext(ioDispatcher) {
touchParentDirs(path)
@@ -546,10 +558,12 @@ class YandexStorageAccessor(
guard { repo.openDownloadStream(toDiskPath(path)) }
}
override suspend fun moveToTrash(path: String) = withContext(ioDispatcher) {
override suspend fun moveToTrash(path: String, recordSyncJournal: Boolean) = withContext(ioDispatcher) {
patchCustomProps(path, mapOf(PROP_DELETED to "true"))
if (recordSyncJournal) {
appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH)
}
}
override suspend fun openReadSystemFile(name: String): InputStream = withContext(ioDispatcher) {
ensureSystemDirExists()
@@ -576,16 +590,21 @@ class YandexStorageAccessor(
}
override suspend fun readSyncJournal(): StorageSyncJournal = withContext(ioDispatcher) {
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
StorageSyncJournalCodec.read(statsMapper, bytes)
journalBuffer.flushPending()
readSyncJournalUnchecked()
}
override suspend fun flushPendingSyncJournal() = withContext(ioDispatcher) {
journalBuffer.flushPending()
}
override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) = withContext(ioDispatcher) {
if (entries.isEmpty()) {
return@withContext
journalBuffer.putEntries(entries)
}
val merged = StorageSyncJournalMerge.merge(readSyncJournal(), entries)
writeSyncJournal(merged)
private suspend fun readSyncJournalUnchecked(): StorageSyncJournal {
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
return StorageSyncJournalCodec.read(statsMapper, bytes)
}
private suspend fun writeSyncJournal(journal: StorageSyncJournal) {
@@ -668,10 +687,12 @@ class YandexStorageAccessor(
* Выполняется из [OutputStream.close]; ошибки upload пробрасываются вызывающему коду.
*/
private fun runCommitAfterStreamClosed(block: suspend () -> Unit) {
runBlocking(ioDispatcher) {
runBlocking {
withContext(ioDispatcher) {
block()
}
}
}
private suspend fun commitUploadedFile(path: String, tmp: File, recordSyncJournal: Boolean) {
try {
@@ -710,22 +731,9 @@ class YandexStorageAccessor(
if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) {
return
}
val journal = readSyncJournal()
val nextSequence = (journal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L
putSyncJournalEntries(
mapOf(
cleanedPath to StorageSyncJournalEntry(
path = cleanedPath,
operation = operation,
revision = StorageSyncRevision(
sequence = nextSequence,
actorId = syncActorId,
createdAt = Instant.now(),
),
originStorageUuid = storageUuid,
),
),
)
val sequence = journalBuffer.nextSequence()
val entry = journalBuffer.buildEntry(cleanedPath, operation, sequence)
journalBuffer.appendEntry(cleanedPath, entry)
}
private suspend fun touchParentDirs(path: String) {

View File

@@ -33,6 +33,7 @@ class YandexDiskRepositoryTest {
@Test
fun diskInfoParsesResponse() = runBlocking {
repository.resetCloudApiCallCount()
server.enqueue(
MockResponse()
.setResponseCode(200)
@@ -42,6 +43,7 @@ class YandexDiskRepositoryTest {
val info = repository.diskInfo()
assertEquals(1000L, info.totalSpace)
assertEquals(200L, info.usedSpace)
assertEquals(1L, repository.cloudApiCallCount())
}
@Test

View File

@@ -0,0 +1,41 @@
package com.github.nullptroma.wallenc.domain.vault.storages.common
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
import kotlinx.coroutines.runBlocking
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Test
import java.time.Instant
import java.util.concurrent.atomic.AtomicReference
class StorageSyncJournalBufferTest {
@Test
fun flushRestoresPendingOnWriteFailure() = runBlocking {
val disk = AtomicReference(emptyMap<String, StorageSyncJournalEntry>())
var shouldFail = true
val buffer = StorageSyncJournalBuffer(
syncActorId = "actor",
originStorageUuid = null,
readJournal = { disk.get() },
writeJournal = {
if (shouldFail) {
error("disk unavailable")
}
disk.set(it)
},
)
val entry = buffer.buildEntry("/a.txt", StorageSyncOperation.UPSERT, 1L)
try {
buffer.appendEntry("/a.txt", entry)
} catch (_: IllegalStateException) {
// expected
}
shouldFail = false
buffer.flushPending()
assertTrue(disk.get().containsKey("/a.txt"))
assertEquals(1L, disk.get()["/a.txt"]?.revision?.sequence)
}
}

View File

@@ -39,10 +39,10 @@ interface IStorageAccessor {
suspend fun setHidden(path: String, hidden: Boolean)
suspend fun touchFile(path: String)
suspend fun touchDir(path: String)
suspend fun delete(path: String)
suspend fun delete(path: String, recordSyncJournal: Boolean = true)
suspend fun openWrite(path: String, recordSyncJournal: Boolean = true): OutputStream
suspend fun openRead(path: String): InputStream
suspend fun moveToTrash(path: String)
suspend fun moveToTrash(path: String, recordSyncJournal: Boolean = true)
/**
* Системный sidecar-файл для логических нужд хранилища (мета, ключи и т.п.).
@@ -53,6 +53,10 @@ interface IStorageAccessor {
suspend fun openWriteSystemFile(name: String): OutputStream
suspend fun readSyncJournal(): StorageSyncJournal
/** Сбрасывает отложенные записи журнала на носитель (перед sync и при закрытии storage). */
suspend fun flushPendingSyncJournal() = Unit
suspend fun putSyncJournalEntries(entries: StorageSyncJournal)
suspend fun readSyncLock(): StorageSyncLock?

View File

@@ -0,0 +1,6 @@
package com.github.nullptroma.wallenc.domain.tasks
/** Локализованный заголовок задачи синхронизации хранилищ по источнику запуска. */
fun interface IStorageSyncTaskTitleFormatter {
fun format(reason: StorageSyncTriggerReason): String
}

View File

@@ -14,4 +14,7 @@ interface TaskContext {
fun log(level: TaskLogLevel, key: TaskLogKey)
fun fail(error: WallencException): Nothing
/** Проверяет, что задача не отменена; бросает [kotlinx.coroutines.CancellationException] при отмене. */
suspend fun ensureNotCancelled()
}

View File

@@ -22,6 +22,8 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
@@ -238,6 +240,10 @@ class TaskOrchestrator(
override fun fail(error: WallencException): Nothing {
throw TaskFailedException(error)
}
override suspend fun ensureNotCancelled() {
coroutineContext.ensureActive()
}
}
private class TaskFailedException(val error: WallencException) : RuntimeException()

View File

@@ -32,6 +32,24 @@ class TaskOrchestratorTest {
assertTrue(task.state is TaskRunState.Completed)
}
@Test
fun cancelAllMarksRunningTaskCancelled() = runTest(dispatcher) {
val orchestrator = TaskOrchestrator(dispatcher)
val id = orchestrator.enqueue(
title = "Long",
dispatcher = dispatcher,
work = { ctx ->
ctx.reportProgress(null, null)
kotlinx.coroutines.delay(60_000)
},
)
advanceTimeBy(1)
orchestrator.cancelAll()
advanceUntilIdle()
val task = orchestrator.pipelineState.value.tasks.first { it.id == id }
assertTrue(task.state is TaskRunState.Cancelled)
}
@Test
fun cancelMarksTaskCancelled() = runTest(dispatcher) {
val orchestrator = TaskOrchestrator(dispatcher)

View File

@@ -23,6 +23,9 @@ fun TaskLogKey.resolve(resolver: UiStringResolver): String = when (this) {
}
}
fun UiStringResolver.storageSyncTaskTitle(reason: StorageSyncTriggerReason): String =
this(R.string.task_title_storage_sync, resolveSyncTriggerReason(reason))
private fun UiStringResolver.resolveSyncTriggerReason(reason: StorageSyncTriggerReason): String =
when (reason) {
StorageSyncTriggerReason.Debounce -> this(R.string.task_sync_trigger_debounce)

View File

@@ -5,6 +5,7 @@ import androidx.lifecycle.viewModelScope
import com.github.nullptroma.wallenc.domain.datatypes.EncryptKey
import com.github.nullptroma.wallenc.domain.datatypes.StorageMetaLoadState
import com.github.nullptroma.wallenc.domain.datatypes.Tree
import com.github.nullptroma.wallenc.domain.errors.WallencException
import com.github.nullptroma.wallenc.domain.errors.toWallencException
import com.github.nullptroma.wallenc.domain.interfaces.ILogger
import com.github.nullptroma.wallenc.domain.interfaces.IStorage
@@ -188,9 +189,14 @@ abstract class AbstractVaultBrowserViewModel(
ctx.log(TaskLogLevel.Info, uiStrings(R.string.task_log_rescanning_vault_storages))
manageVaultUseCase.rescanStorages(vaultUuid)
ctx.log(TaskLogLevel.Info, uiStrings(R.string.task_log_rescan_vault_storages_done))
val vault = manageVaultUseCase.find(vaultUuid)
if (vault != null && !vault.isAvailable.value) {
emitTaskError(WallencException.Network.IoFailed())
}
} catch (e: Exception) {
logger.debug(TAG, "rescanStorages failed: ${e.stackTraceToString()}")
ctx.log(TaskLogLevel.Error, uiStrings(R.string.task_log_rescan_vault_storages_failed))
emitTaskError(e)
}
},
)
@@ -224,7 +230,7 @@ abstract class AbstractVaultBrowserViewModel(
} catch (e: Exception) {
logger.debug(TAG, "createStorage failed: ${e.stackTraceToString()}")
ctx.log(TaskLogLevel.Error, uiStrings(R.string.task_log_add_vault_failed))
throw e
emitTaskError(e)
}
},
)

View File

@@ -208,10 +208,7 @@ class StorageSyncViewModel @Inject constructor(
}
fun runSyncNow() {
val started = runStorageSyncUseCase.enqueue(
displayTitle = uiStrings(R.string.task_title_storage_sync),
reason = StorageSyncTriggerReason.SyncTab,
)
val started = runStorageSyncUseCase.enqueue(StorageSyncTriggerReason.SyncTab)
if (!started) {
updateState(
state.value.copy(

View File

@@ -149,8 +149,7 @@
<string name="task_title_remove_remote_vault">Удаление удалённого хранилища</string>
<string name="task_title_retry_remote_vault">Повторное подключение удалённого хранилища</string>
<string name="task_title_rescan_vault_storages">Обновление списка хранилищ</string>
<string name="task_title_storage_sync">Синхронизация хранилищ</string>
<string name="task_title_storage_sync_background">Фоновая синхронизация хранилищ</string>
<string name="task_title_storage_sync">Синхронизация хранилищ (%1$s)</string>
<string name="task_title_save_2fa_token">Сохранение 2FA токена</string>
<string name="task_title_delete_2fa_token">Удаление 2FA токена</string>
<string name="task_title_save_text_secret">Сохранение текстового секрета</string>

View File

@@ -149,8 +149,7 @@
<string name="task_title_remove_remote_vault">Remove remote vault</string>
<string name="task_title_retry_remote_vault">Retry remote vault connection</string>
<string name="task_title_rescan_vault_storages">Rescan vault storages</string>
<string name="task_title_storage_sync">Storage sync</string>
<string name="task_title_storage_sync_background">Background storage sync</string>
<string name="task_title_storage_sync">Storage sync (%1$s)</string>
<string name="task_title_save_2fa_token">Save 2FA token</string>
<string name="task_title_delete_2fa_token">Delete 2FA token</string>
<string name="task_title_save_text_secret">Save text secret</string>

View File

@@ -3,15 +3,19 @@ package com.github.nullptroma.wallenc.usecases
import com.github.nullptroma.wallenc.domain.errors.toWallencException
import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncEngine
import com.github.nullptroma.wallenc.domain.tasks.ITaskOrchestrator
import com.github.nullptroma.wallenc.domain.tasks.IStorageSyncTaskTitleFormatter
import com.github.nullptroma.wallenc.domain.tasks.StorageSyncTriggerReason
import com.github.nullptroma.wallenc.domain.tasks.TaskId
import com.github.nullptroma.wallenc.domain.tasks.TaskLogKey
import com.github.nullptroma.wallenc.domain.tasks.TaskLogLevel
import com.github.nullptroma.wallenc.domain.tasks.TaskProgressLabel
import com.github.nullptroma.wallenc.domain.tasks.TaskRunState
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import java.util.concurrent.atomic.AtomicBoolean
import javax.inject.Inject
import javax.inject.Singleton
@@ -21,6 +25,7 @@ class RunStorageSyncUseCase @Inject constructor(
private val orchestrator: ITaskOrchestrator,
private val syncEngine: IStorageSyncEngine,
private val syncReadiness: StorageSyncReadiness,
private val taskTitleFormatter: IStorageSyncTaskTitleFormatter,
) {
private val running = AtomicBoolean(false)
@@ -30,25 +35,29 @@ class RunStorageSyncUseCase @Inject constructor(
private val _activeSyncTaskId = MutableStateFlow<TaskId?>(null)
val activeSyncTaskId: StateFlow<TaskId?> = _activeSyncTaskId.asStateFlow()
/** Не реагировать на debounce до этого момента (мс с эпохи) после завершения sync. */
private val _debounceSuppressUntilMs = MutableStateFlow(0L)
val debounceSuppressUntilMs: StateFlow<Long> = _debounceSuppressUntilMs.asStateFlow()
/**
* @param displayTitle заголовок задачи в UI (локализованный на стороне вызова)
* @param reason источник запуска — попадает в лог пайплайна
* @param reason источник запуска — заголовок задачи и лог пайплайна
* @return false, если синхронизация уже в очереди или выполняется — новая задача не создана
*/
fun enqueue(displayTitle: String, reason: StorageSyncTriggerReason): Boolean {
fun enqueue(reason: StorageSyncTriggerReason): Boolean {
if (!running.compareAndSet(false, true)) {
return false
}
_syncRunning.value = true
try {
val taskId = orchestrator.enqueue(
title = displayTitle,
title = taskTitleFormatter.format(reason),
dispatcher = Dispatchers.IO,
work = { ctx ->
try {
executeSync(
reason = reason,
reportProgress = { fraction, label ->
ctx.ensureNotCancelled()
ctx.reportProgress(fraction, label)
},
log = { level, key -> ctx.log(level, key) },
@@ -68,19 +77,22 @@ class RunStorageSyncUseCase @Inject constructor(
}
}
suspend fun runBlocking(reason: StorageSyncTriggerReason) {
if (!running.compareAndSet(false, true)) {
return
/**
* Ставит sync в пайплайн задач (как debounce / sync-tab) и ждёт завершения.
* Для WorkManager и других фоновых запусков без отдельного «orphan»-лога.
*/
suspend fun enqueueAndAwait(reason: StorageSyncTriggerReason): StorageSyncRunOutcome {
if (!enqueue(reason)) {
return StorageSyncRunOutcome.SkippedAlreadyRunning
}
_syncRunning.value = true
try {
executeSync(
reason = reason,
reportProgress = { _, _ -> },
log = { level, key -> orchestrator.appendPipelineLog(level, key) },
)
} finally {
clearRunningState()
val taskId = _activeSyncTaskId.value
?: return StorageSyncRunOutcome.Completed
syncRunning.filter { !it }.first()
val state = orchestrator.pipelineState.value.tasks.find { it.id == taskId }?.state
return when (state) {
is TaskRunState.Failed -> StorageSyncRunOutcome.Failed(state.error)
TaskRunState.Cancelled -> StorageSyncRunOutcome.Cancelled
else -> StorageSyncRunOutcome.Completed
}
}
@@ -89,6 +101,7 @@ class RunStorageSyncUseCase @Inject constructor(
reportProgress: suspend (fraction: Float?, label: TaskProgressLabel?) -> Unit,
log: (TaskLogLevel, TaskLogKey) -> Unit,
) {
try {
syncReadiness.awaitReady()
log(TaskLogLevel.Info, TaskLogKey.SyncStarted(reason))
reportProgress(null, TaskProgressLabel.SyncStarted)
@@ -103,6 +116,13 @@ class RunStorageSyncUseCase @Inject constructor(
log(TaskLogLevel.Error, TaskLogKey.SyncFailed(err, reason))
throw e
}
} finally {
extendDebounceSuppress()
}
}
private fun extendDebounceSuppress() {
_debounceSuppressUntilMs.value = System.currentTimeMillis() + DEBOUNCE_AFTER_CHANGE_MS
}
private fun clearRunningState() {
@@ -110,4 +130,9 @@ class RunStorageSyncUseCase @Inject constructor(
_syncRunning.value = false
_activeSyncTaskId.value = null
}
companion object {
/** Пауза после последнего изменения перед debounce-sync; же окно подавления после sync. */
const val DEBOUNCE_AFTER_CHANGE_MS = 60_000L
}
}

View File

@@ -12,6 +12,11 @@ import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncGroupStore
import com.github.nullptroma.wallenc.domain.interfaces.IVaultsManager
import com.github.nullptroma.wallenc.domain.tasks.TaskProgressLabel
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
@@ -48,6 +53,7 @@ class StorageSyncEngine @Inject constructor(
}
reporter(null, TaskProgressLabel.SyncPreparing(groups.size))
for (group in groups) {
coroutineContext.ensureActive()
syncGroupInternal(
groupId = group.id,
reportProgress = reporter,
@@ -104,6 +110,7 @@ class StorageSyncEngine @Inject constructor(
try {
reportProgress(null, TaskProgressLabel.SyncGroupAcquiringLocks(groupId))
for ((lockIndex, storage) in storages.withIndex()) {
coroutineContext.ensureActive()
reportProgress(
null,
TaskProgressLabel.SyncGroupLockProgress(groupId, lockIndex + 1, storages.size),
@@ -120,7 +127,6 @@ class StorageSyncEngine @Inject constructor(
val entriesByStorage = mutableMapOf<UUID, StorageSyncJournal>()
reportProgress(null, TaskProgressLabel.SyncGroupReadingJournals(groupId))
for ((journalIndex, storage) in storages.withIndex()) {
leaseUntil = renewLocksIfNeeded(
groupId = groupId,
lockedAccessors = lockedAccessors,
@@ -131,11 +137,24 @@ class StorageSyncEngine @Inject constructor(
reportProgress(null, TaskProgressLabel.SyncGroupCancelled(groupId))
return
}
val journalReads = coroutineScope {
storages.mapIndexed { journalIndex, storage ->
async {
coroutineContext.ensureActive()
reportProgress(
null,
TaskProgressLabel.SyncGroupJournalProgress(groupId, journalIndex + 1, storages.size),
TaskProgressLabel.SyncGroupJournalProgress(
groupId,
journalIndex + 1,
storages.size,
),
)
val journal = filterSyncableJournal(storage.accessor.readSyncJournal())
storage.accessor.flushPendingSyncJournal()
storage to filterSyncableJournal(storage.accessor.readSyncJournal())
}
}.awaitAll()
}
for ((storage, journal) in journalReads) {
entriesByStorage[storage.uuid] = journal
mergedByPath.putAll(
StorageSyncJournalMerge.merge(mergedByPath, journal),
@@ -154,6 +173,7 @@ class StorageSyncEngine @Inject constructor(
)
var applyFailures = 0
for ((pathIndex, merged) in mergedEntries.withIndex()) {
coroutineContext.ensureActive()
leaseUntil = renewLocksIfNeeded(
groupId = groupId,
lockedAccessors = lockedAccessors,
@@ -178,6 +198,7 @@ class StorageSyncEngine @Inject constructor(
}
for (target in storages) {
coroutineContext.ensureActive()
if (target.uuid == sourceStorage?.uuid) {
continue
}
@@ -231,6 +252,7 @@ class StorageSyncEngine @Inject constructor(
val nextLeaseUntil = now.plusSeconds(SYNC_LOCK_LEASE_SECONDS)
reportProgress(null, TaskProgressLabel.SyncGroupRenewingLocks(groupId))
for (accessor in lockedAccessors) {
coroutineContext.ensureActive()
val renewed = runCatching {
accessor.tryAcquireSyncLock(holderId, nextLeaseUntil)
}.getOrElse { false }
@@ -273,13 +295,13 @@ class StorageSyncEngine @Inject constructor(
val result = when (entry.operation) {
StorageSyncOperation.DELETE -> {
runCatching {
target.accessor.delete(entry.path)
target.accessor.delete(entry.path, recordSyncJournal = false)
}
}
StorageSyncOperation.TRASH -> {
runCatching {
target.accessor.moveToTrash(entry.path)
target.accessor.moveToTrash(entry.path, recordSyncJournal = false)
}
}

View File

@@ -3,6 +3,8 @@ package com.github.nullptroma.wallenc.usecases
import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncGroupStore
import com.github.nullptroma.wallenc.domain.interfaces.IVaultsManager
import kotlinx.coroutines.delay
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.ensureActive
import javax.inject.Inject
import javax.inject.Singleton
@@ -27,6 +29,7 @@ class StorageSyncReadiness @Inject constructor(
val deadline = System.currentTimeMillis() + timeoutMs
while (System.currentTimeMillis() < deadline) {
coroutineContext.ensureActive()
if (!isAnyVaultScanning()) {
break
}
@@ -34,6 +37,7 @@ class StorageSyncReadiness @Inject constructor(
}
while (System.currentTimeMillis() < deadline) {
coroutineContext.ensureActive()
if (requiredUuids.all { uuid -> findStorageUseCase.find(uuid) != null }) {
return
}

View File

@@ -0,0 +1,16 @@
package com.github.nullptroma.wallenc.usecases
import com.github.nullptroma.wallenc.domain.errors.WallencException
/** Результат [RunStorageSyncUseCase.enqueueAndAwait]. */
sealed class StorageSyncRunOutcome {
/** Синхронизация уже выполнялась — новая задача не создана. */
data object SkippedAlreadyRunning : StorageSyncRunOutcome()
data object Completed : StorageSyncRunOutcome()
/** Задача синхронизации отменена пользователем или пайплайном. */
data object Cancelled : StorageSyncRunOutcome()
data class Failed(val error: WallencException) : StorageSyncRunOutcome()
}

View File

@@ -2,6 +2,7 @@ package com.github.nullptroma.wallenc.usecases
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroup
import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroupEncryptionKind
@@ -10,8 +11,11 @@ import com.github.nullptroma.wallenc.usecases.fakes.FakeStorage
import com.github.nullptroma.wallenc.usecases.fakes.FakeStorageAccessor
import com.github.nullptroma.wallenc.usecases.fakes.FakeStorageSyncGroupStore
import com.github.nullptroma.wallenc.usecases.fakes.FakeVaultsManager
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import org.junit.Assert.assertArrayEquals
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNull
import org.junit.Assert.assertTrue
import org.junit.Test
@@ -19,6 +23,8 @@ import java.time.Instant
class StorageSyncEngineTest {
private fun norm(path: String): String = StorageSyncPaths.normalize(path)
@Test
fun syncAllGroupsReportsNoGroupsWhenEmpty() = runBlocking {
val labels = mutableListOf<TaskProgressLabel?>()
@@ -63,7 +69,7 @@ class StorageSyncEngineTest {
assertArrayEquals(payload, target.fileBytes(path))
assertTrue(labels.any { it is TaskProgressLabel.SyncGroupCompleted })
assertTrue(target.syncJournalEntries().any { it.path == path })
assertTrue(target.syncJournalEntries().any { it.path == norm(path) })
}
@Test
@@ -111,6 +117,81 @@ class StorageSyncEngineTest {
engine.syncGroup(group.id) { _, _ -> }
assertNull(target.fileBytes(path))
val targetEntry = target.syncJournalEntries().single { it.path == norm(path) }
assertEquals(2L, targetEntry.revision.sequence)
assertEquals("actor-b", targetEntry.revision.actorId)
assertEquals(StorageSyncOperation.DELETE, targetEntry.operation)
}
@Test
fun syncSkipsWhenTargetRevisionAlreadyWinner() = runBlocking {
val source = FakeStorage()
val target = FakeStorage()
val path = "already-synced.txt"
val payload = "same".encodeToByteArray()
source.putFile(path, payload)
target.putFile(path, payload)
val winner = StorageSyncJournalEntry(
path = path,
operation = StorageSyncOperation.UPSERT,
revision = StorageSyncRevision(
sequence = 5L,
actorId = "winner-actor",
createdAt = Instant.parse("2024-08-01T00:00:00Z"),
),
size = payload.size.toLong(),
)
source.addSyncJournalEntry(winner)
target.addSyncJournalEntry(winner)
val group = StorageSyncGroup(
id = "skip-group",
storageUuids = setOf(source.uuid, target.uuid),
encryptionKind = StorageSyncGroupEncryptionKind.NONE,
)
val engine = createEngine(
storages = listOf(source, target),
groups = listOf(group),
)
engine.syncGroup(group.id) { _, _ -> }
val targetJournal = target.syncJournalEntries().single { it.path == norm(path) }
assertEquals(winner.revision, targetJournal.revision)
assertEquals(winner.operation, targetJournal.operation)
}
@Test
fun openReadDoesNotChangeJournal() = runBlocking {
val storage = FakeStorage()
val path = "read-only.txt"
storage.putFile(path, "data".encodeToByteArray())
val before = storage.syncJournalEntries().size
storage.accessor.openRead(path).use { it.readBytes() }
assertEquals(before, storage.syncJournalEntries().size)
}
@Test
fun deleteWithRecordSyncJournalFalseDoesNotBumpSequence() = runBlocking {
val storage = FakeStorage()
val path = "to-delete.txt"
storage.putFile(path, "x".encodeToByteArray())
storage.addSyncJournalEntry(
StorageSyncJournalEntry(
path = path,
operation = StorageSyncOperation.UPSERT,
revision = StorageSyncRevision(10L, "prior", Instant.EPOCH),
),
)
storage.accessor.delete(path, recordSyncJournal = false)
assertNull(storage.fileBytes(path))
val entry = storage.syncJournalEntries().single { it.path == norm(path) }
assertEquals(10L, entry.revision.sequence)
assertEquals(StorageSyncOperation.UPSERT, entry.operation)
}
@Test
@@ -145,7 +226,11 @@ class StorageSyncEngineTest {
engine.syncGroup(group.id) { _, _ -> }
assertArrayEquals(payload, target.fileBytes(path))
assertTrue(path in (target.accessor as FakeStorageAccessor).trashedPaths)
assertTrue(norm(path) in (target.accessor as FakeStorageAccessor).trashedPaths)
val targetEntry = target.syncJournalEntries().single { it.path == norm(path) }
assertEquals(3L, targetEntry.revision.sequence)
assertEquals("actor-trash", targetEntry.revision.actorId)
assertEquals(StorageSyncOperation.TRASH, targetEntry.operation)
}
@Test
@@ -171,6 +256,113 @@ class StorageSyncEngineTest {
)
engine.syncGroup(group.id) { _, label -> labels.add(label) }
assertTrue(labels.any { it is TaskProgressLabel.SyncGroupLockFailed })
assertNull((first.accessor as FakeStorageAccessor).syncLock)
assertNull((second.accessor as FakeStorageAccessor).syncLock)
}
@Test
fun syncGroupReleasesLocksAfterSuccessfulSync() = runBlocking {
val source = FakeStorage()
val target = FakeStorage()
source.addSyncJournalEntry(
StorageSyncJournalEntry(
path = "a.txt",
operation = StorageSyncOperation.UPSERT,
revision = StorageSyncRevision(1L, "x", Instant.EPOCH),
),
)
source.putFile("a.txt", "x".encodeToByteArray())
val group = StorageSyncGroup(
id = "ok",
storageUuids = setOf(source.uuid, target.uuid),
encryptionKind = StorageSyncGroupEncryptionKind.NONE,
)
val engine = createEngine(
storages = listOf(source, target),
groups = listOf(group),
)
engine.syncGroup(group.id) { _, _ -> }
assertNull((source.accessor as FakeStorageAccessor).syncLock)
assertNull((target.accessor as FakeStorageAccessor).syncLock)
}
@Test
fun syncGroupReleasesLocksWhenJournalReadFails() = runBlocking {
val first = FakeStorage()
val second = FakeStorage()
(first.accessor as FakeStorageAccessor).readSyncJournalThrows =
IllegalStateException("journal read failed")
val group = StorageSyncGroup(
id = "journal-fail",
storageUuids = setOf(first.uuid, second.uuid),
encryptionKind = StorageSyncGroupEncryptionKind.NONE,
)
val engine = createEngine(
storages = listOf(first, second),
groups = listOf(group),
)
runCatching { engine.syncGroup(group.id) { _, _ -> } }
assertNull((first.accessor as FakeStorageAccessor).syncLock)
assertNull((second.accessor as FakeStorageAccessor).syncLock)
}
@Test
fun syncGroupCooperativeCancellationReleasesLocks() = runBlocking {
val source = FakeStorage()
val target = FakeStorage()
val path = "slow.txt"
val payload = "payload".encodeToByteArray()
source.putFile(path, payload)
(source.accessor as FakeStorageAccessor).openReadDelayMs = 5_000
source.addSyncJournalEntry(
StorageSyncJournalEntry(
path = path,
operation = StorageSyncOperation.UPSERT,
revision = StorageSyncRevision(1L, "actor", Instant.EPOCH),
size = payload.size.toLong(),
),
)
val group = StorageSyncGroup(
id = "cancel-group",
storageUuids = setOf(source.uuid, target.uuid),
encryptionKind = StorageSyncGroupEncryptionKind.NONE,
)
val engine = createEngine(
storages = listOf(source, target),
groups = listOf(group),
)
val job = async {
engine.syncGroup(group.id) { _, _ -> }
}
kotlinx.coroutines.delay(50)
job.cancel()
try {
job.await()
} catch (_: CancellationException) {
// expected
}
assertNull((source.accessor as FakeStorageAccessor).syncLock)
assertNull((target.accessor as FakeStorageAccessor).syncLock)
}
@Test
fun syncGroupReleasesLocksWhenJournalEmpty() = runBlocking {
val first = FakeStorage()
val second = FakeStorage()
val group = StorageSyncGroup(
id = "empty-journal",
storageUuids = setOf(first.uuid, second.uuid),
encryptionKind = StorageSyncGroupEncryptionKind.NONE,
)
val labels = mutableListOf<TaskProgressLabel?>()
val engine = createEngine(
storages = listOf(first, second),
groups = listOf(group),
)
engine.syncGroup(group.id) { _, label -> labels.add(label) }
assertTrue(labels.any { it is TaskProgressLabel.SyncGroupNoJournalEntries })
assertNull((first.accessor as FakeStorageAccessor).syncLock)
assertNull((second.accessor as FakeStorageAccessor).syncLock)
}
private fun createEngine(

View File

@@ -41,10 +41,12 @@ class FakeStorage(
}
fun putFile(path: String, bytes: ByteArray) {
accessorImpl.dataFiles[path] = bytes
accessorImpl.dataFiles[com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths.normalize(path)] =
bytes
}
fun fileBytes(path: String): ByteArray? = accessorImpl.dataFiles[path]
fun fileBytes(path: String): ByteArray? =
accessorImpl.dataFiles[com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths.normalize(path)]
fun addSyncJournalEntry(entry: StorageSyncJournalEntry) {
accessorImpl.syncJournal = StorageSyncJournalMerge.merge(

View File

@@ -5,10 +5,14 @@ import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
import com.github.nullptroma.wallenc.domain.interfaces.IDirectory
import com.github.nullptroma.wallenc.domain.interfaces.IFile
import com.github.nullptroma.wallenc.domain.interfaces.IMetaInfo
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
@@ -23,6 +27,8 @@ import java.time.Instant
class FakeStorageAccessor : IStorageAccessor {
val dataFiles: MutableMap<String, ByteArray> = mutableMapOf()
private fun norm(path: String): String = StorageSyncPaths.normalize(path)
val trashedPaths: MutableSet<String> = mutableSetOf()
private val systemFiles: MutableMap<String, ByteArray> = mutableMapOf()
private val _filesUpdates = MutableSharedFlow<DataPage<IFile>>(extraBufferCapacity = 16)
@@ -30,6 +36,8 @@ class FakeStorageAccessor : IStorageAccessor {
var syncJournal: StorageSyncJournal = emptyMap()
var syncLock: StorageSyncLock? = null
var acquireLockResult: Boolean = true
var readSyncJournalThrows: Throwable? = null
var openReadDelayMs: Long = 0
override val size: StateFlow<Long?> = MutableStateFlow(0L)
override val numberOfFiles: StateFlow<Int?> = MutableStateFlow(0)
@@ -50,7 +58,9 @@ class FakeStorageAccessor : IStorageAccessor {
override fun getDirsFlow(path: String): Flow<DataPage<IDirectory>> = emptyFlow()
override suspend fun getFileInfo(path: String): IFile {
error("Not implemented in tests")
val key = norm(path)
val bytes = dataFiles[key] ?: throw IllegalStateException("File not found: $path")
return FakeFile(key, bytes.size.toLong())
}
override suspend fun getDirInfo(path: String): IDirectory {
@@ -63,14 +73,17 @@ class FakeStorageAccessor : IStorageAccessor {
override suspend fun touchDir(path: String) = Unit
override suspend fun delete(path: String) {
dataFiles.remove(path)
override suspend fun delete(path: String, recordSyncJournal: Boolean) {
dataFiles.remove(norm(path))
if (recordSyncJournal) {
recordDeleteJournal(path)
}
}
override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream {
return object : ByteArrayOutputStream() {
override fun close() {
dataFiles[path] = toByteArray()
dataFiles[norm(path)] = toByteArray()
_filesUpdates.tryEmit(
DataPage(
listOf(FakeFile(path)),
@@ -83,13 +96,20 @@ class FakeStorageAccessor : IStorageAccessor {
}
override suspend fun openRead(path: String): InputStream {
val bytes = dataFiles[path] ?: throw IllegalStateException("File not found: $path")
if (openReadDelayMs > 0) {
delay(openReadDelayMs)
}
val bytes = dataFiles[norm(path)] ?: throw IllegalStateException("File not found: $path")
return ByteArrayInputStream(bytes)
}
override suspend fun moveToTrash(path: String) {
if (path in dataFiles) {
trashedPaths.add(path)
override suspend fun moveToTrash(path: String, recordSyncJournal: Boolean) {
val key = norm(path)
if (key in dataFiles) {
trashedPaths.add(key)
if (recordSyncJournal) {
recordTrashJournal(path)
}
}
}
@@ -106,12 +126,43 @@ class FakeStorageAccessor : IStorageAccessor {
}
}
override suspend fun readSyncJournal(): StorageSyncJournal = syncJournal
override suspend fun readSyncJournal(): StorageSyncJournal {
readSyncJournalThrows?.let { throw it }
return syncJournal
}
override suspend fun flushPendingSyncJournal() = Unit
override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) {
syncJournal = StorageSyncJournalMerge.merge(syncJournal, entries)
}
private suspend fun recordDeleteJournal(path: String) {
appendJournalEntry(path, StorageSyncOperation.DELETE)
}
private suspend fun recordTrashJournal(path: String) {
appendJournalEntry(path, StorageSyncOperation.TRASH)
}
private suspend fun appendJournalEntry(path: String, operation: StorageSyncOperation) {
val cleaned = norm(path)
val nextSequence = (syncJournal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L
putSyncJournalEntries(
mapOf(
cleaned to StorageSyncJournalEntry(
path = cleaned,
operation = operation,
revision = StorageSyncRevision(
sequence = nextSequence,
actorId = "fake-actor",
createdAt = Instant.now(),
),
),
),
)
}
override suspend fun readSyncLock(): StorageSyncLock? = syncLock
override suspend fun tryAcquireSyncLock(holderId: String, leaseUntil: Instant): Boolean {
@@ -131,9 +182,12 @@ class FakeStorageAccessor : IStorageAccessor {
}
}
class FakeFile(path: String) : IFile {
class FakeFile(
path: String,
size: Long = 0L,
) : IFile {
override val metaInfo: IMetaInfo = object : IMetaInfo {
override val size: Long = 0L
override val size: Long = size
override val isDeleted: Boolean = false
override val isHidden: Boolean = false
override val lastModified: Instant = Instant.now()