Compare commits

..

7 Commits

Author SHA1 Message Date
96e9de49c3 Более аккуратная обработка сетвой ошибки на экране Vault 2026-05-22 13:33:14 +03:00
6ab402da51 perf(yandex): сузил инвалидацию кэша Disk API и добавил счётчик запросов
Инвалидирую list/get по префиксу пути вместо полной очистки, учитываю вызовы
в cloudApiCallCount для замеров.
2026-05-22 13:22:17 +03:00
2618df41e3 feat(sync): добавил cooperative-отмену sync и pipeline-задач
ensureActive в StorageSyncEngine, flush журнала перед чтением, Cancelled
в StorageSyncRunOutcome и TaskContext.ensureNotCancelled.
2026-05-22 13:22:15 +03:00
bc2b354820 fix(sync): исправил журнал при DELETE/TRASH и безопасный flush
Добавил recordSyncJournal для delete/moveToTrash, StorageSyncJournalBuffer
с восстановлением pending при ошибке записи и немедленным flush без debounce.
2026-05-22 13:22:05 +03:00
b00eed901b foreground task для фоновой синхронизации 2026-05-22 00:51:29 +03:00
35ba6dd377 Костыль для подавления цикла синхронизации 2026-05-22 00:37:00 +03:00
07d54b5996 Заголовок задачи синхронизации 2026-05-22 00:20:49 +03:00
28 changed files with 818 additions and 201 deletions

View File

@@ -1,6 +1,8 @@
package com.github.nullptroma.wallenc.app.di.modules.domain package com.github.nullptroma.wallenc.app.di.modules.domain
import com.github.nullptroma.wallenc.app.sync.StorageSyncTaskTitleFormatter
import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncEngine import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncEngine
import com.github.nullptroma.wallenc.domain.tasks.IStorageSyncTaskTitleFormatter
import com.github.nullptroma.wallenc.usecases.StorageSyncEngine import com.github.nullptroma.wallenc.usecases.StorageSyncEngine
import dagger.Binds import dagger.Binds
import dagger.Module import dagger.Module
@@ -15,4 +17,10 @@ abstract class UseCasesModule {
@Binds @Binds
@Singleton @Singleton
abstract fun bindStorageSyncEngine(impl: StorageSyncEngine): IStorageSyncEngine abstract fun bindStorageSyncEngine(impl: StorageSyncEngine): IStorageSyncEngine
@Binds
@Singleton
abstract fun bindStorageSyncTaskTitleFormatter(
impl: StorageSyncTaskTitleFormatter,
): IStorageSyncTaskTitleFormatter
} }

View File

@@ -2,8 +2,6 @@ package com.github.nullptroma.wallenc.app.sync
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
import com.github.nullptroma.wallenc.domain.interfaces.IVaultsManager import com.github.nullptroma.wallenc.domain.interfaces.IVaultsManager
import com.github.nullptroma.wallenc.ui.R
import com.github.nullptroma.wallenc.ui.resources.UiStringResolver
import com.github.nullptroma.wallenc.domain.tasks.StorageSyncTriggerReason import com.github.nullptroma.wallenc.domain.tasks.StorageSyncTriggerReason
import com.github.nullptroma.wallenc.usecases.RunStorageSyncUseCase import com.github.nullptroma.wallenc.usecases.RunStorageSyncUseCase
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
@@ -11,6 +9,7 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.debounce import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
@@ -25,14 +24,18 @@ class StorageSyncBootstrap @Inject constructor(
private val scheduler: StorageSyncScheduler, private val scheduler: StorageSyncScheduler,
private val vaultsManager: IVaultsManager, private val vaultsManager: IVaultsManager,
private val syncRunner: RunStorageSyncUseCase, private val syncRunner: RunStorageSyncUseCase,
private val uiStrings: UiStringResolver,
) { ) {
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO) private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
fun start() { fun start() {
scheduler.ensureScheduled() scheduler.ensureScheduled()
scope.launch { scope.launch {
vaultsManager.allStorages.collectLatest { storages -> combine(
vaultsManager.allStorages,
vaultsManager.unlockManager.openedStorages,
) { rootStorages, opened ->
(rootStorages + opened.values).distinctBy { it.uuid }
}.collectLatest { storages ->
if (storages.isEmpty()) { if (storages.isEmpty()) {
return@collectLatest return@collectLatest
} }
@@ -55,21 +58,23 @@ class StorageSyncBootstrap @Inject constructor(
) )
} }
merge(*triggers.toTypedArray()) merge(*triggers.toTypedArray())
.debounce(DEBOUNCE_AFTER_CHANGE_MS) .filter { shouldScheduleDebounceSync() }
.debounce(RunStorageSyncUseCase.DEBOUNCE_AFTER_CHANGE_MS)
.collect { .collect {
if (syncRunner.syncRunning.value) { syncRunner.enqueue(StorageSyncTriggerReason.Debounce)
return@collect
}
syncRunner.enqueue(
displayTitle = uiStrings(R.string.task_title_storage_sync_background),
reason = StorageSyncTriggerReason.Debounce,
)
} }
} }
} }
} }
private companion object { /**
private const val DEBOUNCE_AFTER_CHANGE_MS = 60_000L * Игнорировать события во время sync и сразу после него: запись файлов при sync
* (в т.ч. через [EncryptedStorageAccessor]) иначе снова запускает debounce через 60 с.
*/
private fun shouldScheduleDebounceSync(): Boolean {
if (syncRunner.syncRunning.value) {
return false
}
return System.currentTimeMillis() >= syncRunner.debounceSuppressUntilMs.value
} }
} }

View File

@@ -0,0 +1,16 @@
package com.github.nullptroma.wallenc.app.sync
import com.github.nullptroma.wallenc.domain.tasks.IStorageSyncTaskTitleFormatter
import com.github.nullptroma.wallenc.domain.tasks.StorageSyncTriggerReason
import com.github.nullptroma.wallenc.ui.resources.UiStringResolver
import com.github.nullptroma.wallenc.ui.resources.storageSyncTaskTitle
import javax.inject.Inject
import javax.inject.Singleton
@Singleton
class StorageSyncTaskTitleFormatter @Inject constructor(
private val uiStrings: UiStringResolver,
) : IStorageSyncTaskTitleFormatter {
override fun format(reason: StorageSyncTriggerReason): String =
uiStrings.storageSyncTaskTitle(reason)
}

View File

@@ -6,6 +6,7 @@ import androidx.work.CoroutineWorker
import androidx.work.WorkerParameters import androidx.work.WorkerParameters
import com.github.nullptroma.wallenc.domain.tasks.StorageSyncTriggerReason import com.github.nullptroma.wallenc.domain.tasks.StorageSyncTriggerReason
import com.github.nullptroma.wallenc.usecases.RunStorageSyncUseCase import com.github.nullptroma.wallenc.usecases.RunStorageSyncUseCase
import com.github.nullptroma.wallenc.usecases.StorageSyncRunOutcome
import dagger.assisted.Assisted import dagger.assisted.Assisted
import dagger.assisted.AssistedInject import dagger.assisted.AssistedInject
import timber.log.Timber import timber.log.Timber
@@ -19,13 +20,23 @@ class StorageSyncWorker @AssistedInject constructor(
override suspend fun doWork(): Result { override suspend fun doWork(): Result {
Timber.d("Periodic storage sync started (attempt %d)", runAttemptCount) Timber.d("Periodic storage sync started (attempt %d)", runAttemptCount)
return runCatching { return when (val outcome = syncRunner.enqueueAndAwait(StorageSyncTriggerReason.Background)) {
syncRunner.runBlocking(StorageSyncTriggerReason.Background) StorageSyncRunOutcome.SkippedAlreadyRunning -> {
Timber.d("Periodic storage sync finished") Timber.d("Periodic storage sync skipped — already running")
Result.success() Result.success()
}.getOrElse { error -> }
Timber.w(error, "Periodic storage sync failed") StorageSyncRunOutcome.Completed -> {
Result.retry() Timber.d("Periodic storage sync finished")
Result.success()
}
StorageSyncRunOutcome.Cancelled -> {
Timber.d("Periodic storage sync cancelled")
Result.success()
}
is StorageSyncRunOutcome.Failed -> {
Timber.w(outcome.error, "Periodic storage sync failed")
Result.retry()
}
} }
} }

View File

@@ -25,6 +25,7 @@ import java.io.FilterInputStream
import java.io.IOException import java.io.IOException
import java.io.InputStream import java.io.InputStream
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
class YandexDiskRepository( class YandexDiskRepository(
private val api: YandexDiskApi, private val api: YandexDiskApi,
@@ -38,6 +39,13 @@ class YandexDiskRepository(
private val listCache = ConcurrentHashMap<ListCacheKey, ResourceDto>() private val listCache = ConcurrentHashMap<ListCacheKey, ResourceDto>()
private val getCache = ConcurrentHashMap<String, ResourceDto>() private val getCache = ConcurrentHashMap<String, ResourceDto>()
private val cloudApiCallCount = AtomicLong(0)
fun cloudApiCallCount(): Long = cloudApiCallCount.get()
fun resetCloudApiCallCount() {
cloudApiCallCount.set(0)
}
suspend fun diskInfo(): DiskInfoDto = withContext(ioDispatcher) { suspend fun diskInfo(): DiskInfoDto = withContext(ioDispatcher) {
val now = System.currentTimeMillis() val now = System.currentTimeMillis()
@@ -93,7 +101,7 @@ class YandexDiskRepository(
suspend fun createFolder(path: String): Unit = withContext(ioDispatcher) { suspend fun createFolder(path: String): Unit = withContext(ioDispatcher) {
val resp = wrapAuth { api.createFolder(path) } val resp = wrapAuth { api.createFolder(path) }
when (resp.code()) { when (resp.code()) {
201, 409 -> invalidateDiskMetaCaches() 201, 409 -> invalidateDiskMetaCaches(path)
else -> throw failure("createFolder", resp) else -> throw failure("createFolder", resp)
} }
} }
@@ -101,14 +109,14 @@ class YandexDiskRepository(
suspend fun delete(path: String, permanently: Boolean = true): Unit = withContext(ioDispatcher) { suspend fun delete(path: String, permanently: Boolean = true): Unit = withContext(ioDispatcher) {
val resp = wrapAuth { api.deleteResource(path, permanently) } val resp = wrapAuth { api.deleteResource(path, permanently) }
when (resp.code()) { when (resp.code()) {
204 -> invalidateDiskMetaCaches() 204 -> invalidateDiskMetaCaches(path)
202 -> { 202 -> {
val link = resp.body()?.use { body -> parseLink(body) } val link = resp.body()?.use { body -> parseLink(body) }
?: throw IOException("DELETE 202 without body") ?: throw IOException("DELETE 202 without body")
awaitOperation(link.href) awaitOperation(link.href)
invalidateDiskMetaCaches() invalidateDiskMetaCaches(path)
} }
404 -> invalidateDiskMetaCaches() 404 -> invalidateDiskMetaCaches(path)
else -> throw failure("delete", resp) else -> throw failure("delete", resp)
} }
} }
@@ -122,7 +130,7 @@ class YandexDiskRepository(
throw failure("patch", resp) throw failure("patch", resp)
} }
resp.body()?.close() resp.body()?.close()
invalidateDiskMetaCaches() invalidateDiskMetaCaches(path)
} }
suspend fun uploadBytes(path: String, bytes: ByteArray, overwrite: Boolean = true): Unit = suspend fun uploadBytes(path: String, bytes: ByteArray, overwrite: Boolean = true): Unit =
@@ -134,10 +142,11 @@ class YandexDiskRepository(
val body = bytes.toRequestBody(OCTET_STREAM) val body = bytes.toRequestBody(OCTET_STREAM)
val req = Request.Builder().url(link.href).put(body).build() val req = Request.Builder().url(link.href).put(body).build()
repeat(LOCKED_RETRY_MAX) { attempt -> repeat(LOCKED_RETRY_MAX) { attempt ->
recordCloudApiCall()
rawHttp.newCall(req).execute().use { resp -> rawHttp.newCall(req).execute().use { resp ->
when { when {
resp.isSuccessful -> { resp.isSuccessful -> {
invalidateDiskMetaCaches() invalidateDiskMetaCaches(path)
return@withContext return@withContext
} }
resp.code == 423 && attempt < LOCKED_RETRY_MAX - 1 -> resp.code == 423 && attempt < LOCKED_RETRY_MAX - 1 ->
@@ -158,10 +167,11 @@ class YandexDiskRepository(
val body = file.asRequestBody(OCTET_STREAM) val body = file.asRequestBody(OCTET_STREAM)
val req = Request.Builder().url(link.href).put(body).build() val req = Request.Builder().url(link.href).put(body).build()
repeat(LOCKED_RETRY_MAX) { attempt -> repeat(LOCKED_RETRY_MAX) { attempt ->
recordCloudApiCall()
rawHttp.newCall(req).execute().use { resp -> rawHttp.newCall(req).execute().use { resp ->
when { when {
resp.isSuccessful -> { resp.isSuccessful -> {
invalidateDiskMetaCaches() invalidateDiskMetaCaches(path)
return@withContext return@withContext
} }
resp.code == 423 && attempt < LOCKED_RETRY_MAX - 1 -> resp.code == 423 && attempt < LOCKED_RETRY_MAX - 1 ->
@@ -186,6 +196,7 @@ class YandexDiskRepository(
} }
val req = Request.Builder().url(link.href).get().build() val req = Request.Builder().url(link.href).get().build()
repeat(LOCKED_RETRY_MAX) { attempt -> repeat(LOCKED_RETRY_MAX) { attempt ->
recordCloudApiCall()
val resp = rawHttp.newCall(req).execute() val resp = rawHttp.newCall(req).execute()
when { when {
resp.isSuccessful -> { resp.isSuccessful -> {
@@ -281,18 +292,47 @@ class YandexDiskRepository(
getCache[path] = value getCache[path] = value
} }
private fun invalidateDiskMetaCaches() { private fun invalidateDiskMetaCaches(changedDiskPath: String? = null) {
synchronized(diskCacheLock) { synchronized(diskCacheLock) {
diskInfoCached = null diskInfoCached = null
diskInfoCachedUntilMs = 0L diskInfoCachedUntilMs = 0L
} }
listCache.clear() if (changedDiskPath == null) {
getCache.clear() listCache.clear()
getCache.clear()
return
}
val prefixes = cachePrefixesForPath(changedDiskPath)
listCache.keys.removeAll { key ->
prefixes.any { prefix ->
key.path.startsWith(prefix) || prefix.startsWith(key.path.trimEnd('/'))
}
}
getCache.keys.removeAll { cachedPath ->
prefixes.any { prefix ->
cachedPath.startsWith(prefix) || prefix.startsWith(cachedPath.trimEnd('/'))
}
}
}
private fun cachePrefixesForPath(diskPath: String): List<String> {
val normalized = diskPath.trimEnd('/')
val out = mutableListOf<String>()
var current = normalized
while (current.isNotEmpty()) {
out.add(current)
out.add("$current/")
val slash = current.lastIndexOf('/')
if (slash <= 0) break
current = current.substring(0, slash)
}
return out
} }
private suspend inline fun <T> wrapAuth(crossinline block: suspend () -> T): T { private suspend inline fun <T> wrapAuth(crossinline block: suspend () -> T): T {
repeat(LOCKED_RETRY_MAX) { attempt -> repeat(LOCKED_RETRY_MAX) { attempt ->
try { try {
recordCloudApiCall()
return block() return block()
} catch (e: HttpException) { } catch (e: HttpException) {
when (e.code()) { when (e.code()) {
@@ -313,6 +353,10 @@ class YandexDiskRepository(
error("unreachable") error("unreachable")
} }
private fun recordCloudApiCall() {
cloudApiCallCount.incrementAndGet()
}
private fun failure(op: String, resp: Response<ResponseBody>): IOException { private fun failure(op: String, resp: Response<ResponseBody>): IOException {
val msg = resp.errorBody()?.string() ?: resp.message() val msg = resp.errorBody()?.string() ?: resp.message()
return IOException("$op failed: HTTP ${resp.code()} $msg") return IOException("$op failed: HTTP ${resp.code()} $msg")

View File

@@ -0,0 +1,112 @@
package com.github.nullptroma.wallenc.domain.vault.storages.common
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.UUID
/**
* Буфер журнала sync: накопление записей и безопасный flush (без потери pending при ошибке записи).
*/
class StorageSyncJournalBuffer(
private val syncActorId: String,
private val originStorageUuid: UUID?,
private val readJournal: suspend () -> StorageSyncJournal,
private val writeJournal: suspend (StorageSyncJournal) -> Unit,
) {
private val pendingMutex = Mutex()
private val flushMutex = Mutex()
private var pendingJournalEntries: StorageSyncJournal = emptyMap()
@Volatile
private var sequenceHighWatermark: Long? = null
suspend fun flushPending() {
flushMutex.withLock {
flushPendingUnderLock()
}
}
suspend fun putEntries(entries: StorageSyncJournal) {
flushPending()
if (entries.isEmpty()) {
return
}
val current = readJournal()
val merged = StorageSyncJournalMerge.merge(current, entries)
if (merged == current) {
return
}
writeJournal(merged)
refreshSequenceHighWatermark(merged)
}
suspend fun appendEntry(path: String, entry: StorageSyncJournalEntry) {
pendingMutex.withLock {
pendingJournalEntries = pendingJournalEntries + (path to entry)
}
flushPending()
}
suspend fun nextSequence(): Long {
flushPending()
val diskMax = readJournal().values.maxOfOrNull { it.revision.sequence } ?: 0L
val pendingMax = pendingMutex.withLock {
pendingJournalEntries.values.maxOfOrNull { it.revision.sequence } ?: 0L
}
val base = maxOf(diskMax, pendingMax, sequenceHighWatermark ?: 0L)
val next = base + 1L
sequenceHighWatermark = next
return next
}
fun buildEntry(
path: String,
operation: com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation,
sequence: Long,
): StorageSyncJournalEntry {
return StorageSyncJournalEntry(
path = path,
operation = operation,
revision = StorageSyncRevision(
sequence = sequence,
actorId = syncActorId,
createdAt = java.time.Instant.now(),
),
originStorageUuid = originStorageUuid,
)
}
private suspend fun flushPendingUnderLock() {
val pending = pendingMutex.withLock {
if (pendingJournalEntries.isEmpty()) {
return
}
val snapshot = pendingJournalEntries
pendingJournalEntries = emptyMap()
snapshot
}
try {
val current = readJournal()
val merged = StorageSyncJournalMerge.merge(current, pending)
if (merged != current) {
writeJournal(merged)
}
refreshSequenceHighWatermark(merged)
} catch (e: Exception) {
pendingMutex.withLock {
pendingJournalEntries = StorageSyncJournalMerge.merge(pending, pendingJournalEntries)
}
throw e
}
}
private fun refreshSequenceHighWatermark(journal: StorageSyncJournal) {
val maxSeq = journal.values.maxOfOrNull { it.revision.sequence } ?: return
val current = sequenceHighWatermark
sequenceHighWatermark = if (current == null) maxSeq else maxOf(current, maxSeq)
}
}

View File

@@ -2,6 +2,7 @@ package com.github.nullptroma.wallenc.domain.vault.storages.encrypt
import com.fasterxml.jackson.module.kotlin.readValue import com.fasterxml.jackson.module.kotlin.readValue
import com.github.nullptroma.wallenc.domain.errors.WallencException import com.github.nullptroma.wallenc.domain.errors.WallencException
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalBuffer
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec
import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty
import com.github.nullptroma.wallenc.domain.vault.utils.CloseHandledStreamExtension.Companion.onClosed import com.github.nullptroma.wallenc.domain.vault.utils.CloseHandledStreamExtension.Companion.onClosed
@@ -18,14 +19,12 @@ import com.github.nullptroma.wallenc.domain.interfaces.IFile
import com.github.nullptroma.wallenc.domain.interfaces.IMetaInfo import com.github.nullptroma.wallenc.domain.interfaces.IMetaInfo
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DisposableHandle import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.MutableStateFlow
@@ -52,6 +51,12 @@ class EncryptedStorageAccessor(
) : IStorageAccessor, DisposableHandle { ) : IStorageAccessor, DisposableHandle {
private val syncActorId = UUID.randomUUID().toString() private val syncActorId = UUID.randomUUID().toString()
private val syncLockMutex = Mutex() private val syncLockMutex = Mutex()
private val journalBuffer = StorageSyncJournalBuffer(
syncActorId = syncActorId,
originStorageUuid = null,
readJournal = { readSyncJournalUnchecked() },
writeJournal = { writeSyncJournal(it) },
)
private val _size = MutableStateFlow<Long?>(null) private val _size = MutableStateFlow<Long?>(null)
override val size: StateFlow<Long?> = _size override val size: StateFlow<Long?> = _size
@@ -110,22 +115,24 @@ class EncryptedStorageAccessor(
launch { launch {
source.numberOfFiles.collect { source.numberOfFiles.collect {
if(it == null) if (it == null) {
_numberOfFiles.value = null _numberOfFiles.value = null
else } else {
{ val hiddenCount = runCatching { getSystemFiles().size }.getOrNull() ?: return@collect
_numberOfFiles.value = it - getSystemFiles().size _numberOfFiles.value = it - hiddenCount
} }
} }
} }
launch { launch {
source.size.collect { sourceSize -> source.size.collect { sourceSize ->
if(sourceSize == null) if (sourceSize == null) {
_size.value = null _size.value = null
else } else {
{ val hiddenBytes = runCatching {
_size.value = sourceSize - getSystemFiles().sumOf { it.metaInfo.size } getSystemFiles().sumOf { file -> file.metaInfo.size }
}.getOrNull() ?: return@collect
_size.value = sourceSize - hiddenBytes
} }
} }
} }
@@ -257,9 +264,11 @@ class EncryptedStorageAccessor(
source.touchDir(encryptPath(path)) source.touchDir(encryptPath(path))
} }
override suspend fun delete(path: String) { override suspend fun delete(path: String, recordSyncJournal: Boolean) {
source.delete(encryptPath(path)) source.delete(encryptPath(path), recordSyncJournal = false)
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE) if (recordSyncJournal) {
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
}
} }
override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream = override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream =
@@ -270,12 +279,17 @@ class EncryptedStorageAccessor(
return dataEncryptor.decryptStream(stream) return dataEncryptor.decryptStream(stream)
} }
override suspend fun moveToTrash(path: String) { override suspend fun moveToTrash(path: String, recordSyncJournal: Boolean) {
source.moveToTrash(encryptPath(path)) source.moveToTrash(encryptPath(path), recordSyncJournal = false)
appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH) if (recordSyncJournal) {
appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH)
}
} }
override fun dispose() { override fun dispose() {
runBlocking {
runCatching { journalBuffer.flushPending() }
}
dataEncryptor.dispose() dataEncryptor.dispose()
} }
@@ -297,16 +311,21 @@ class EncryptedStorageAccessor(
} }
override suspend fun readSyncJournal(): StorageSyncJournal { override suspend fun readSyncJournal(): StorageSyncJournal {
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) } journalBuffer.flushPending()
return StorageSyncJournalCodec.read(jackson, bytes) return readSyncJournalUnchecked()
}
override suspend fun flushPendingSyncJournal() {
journalBuffer.flushPending()
} }
override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) { override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) {
if (entries.isEmpty()) { journalBuffer.putEntries(entries)
return }
}
val merged = StorageSyncJournalMerge.merge(readSyncJournal(), entries) private suspend fun readSyncJournalUnchecked(): StorageSyncJournal {
writeSyncJournal(merged) val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
return StorageSyncJournalCodec.read(jackson, bytes)
} }
private suspend fun writeSyncJournal(journal: StorageSyncJournal) { private suspend fun writeSyncJournal(journal: StorageSyncJournal) {
@@ -373,21 +392,9 @@ class EncryptedStorageAccessor(
if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) { if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) {
return return
} }
val journal = readSyncJournal() val sequence = journalBuffer.nextSequence()
val nextSequence = (journal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L val entry = journalBuffer.buildEntry(cleanedPath, operation, sequence)
putSyncJournalEntries( journalBuffer.appendEntry(cleanedPath, entry)
mapOf(
cleanedPath to StorageSyncJournalEntry(
path = cleanedPath,
operation = operation,
revision = StorageSyncRevision(
sequence = nextSequence,
actorId = syncActorId,
createdAt = Instant.now(),
),
),
),
)
} }
private suspend fun openWriteInternal(path: String, recordJournal: Boolean): OutputStream { private suspend fun openWriteInternal(path: String, recordJournal: Boolean): OutputStream {

View File

@@ -1,6 +1,7 @@
package com.github.nullptroma.wallenc.domain.vault.storages.local package com.github.nullptroma.wallenc.domain.vault.storages.local
import com.github.nullptroma.wallenc.domain.errors.WallencException import com.github.nullptroma.wallenc.domain.errors.WallencException
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalBuffer
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec
import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty
@@ -26,6 +27,8 @@ import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.SharedFlow
@@ -73,6 +76,13 @@ class LocalStorageAccessor(
private val _dirsUpdates = MutableSharedFlow<DataPage<IDirectory>>() private val _dirsUpdates = MutableSharedFlow<DataPage<IDirectory>>()
override val dirsUpdates: SharedFlow<DataPage<IDirectory>> = _dirsUpdates override val dirsUpdates: SharedFlow<DataPage<IDirectory>> = _dirsUpdates
private val journalBuffer = StorageSyncJournalBuffer(
syncActorId = syncActorId,
originStorageUuid = null,
readJournal = { readSyncJournalUnchecked() },
writeJournal = { writeSyncJournal(it) },
)
suspend fun init() = withContext(ioDispatcher) { suspend fun init() = withContext(ioDispatcher) {
// запускам сканирование хранилища // запускам сканирование хранилища
scanSizeAndNumOfFiles() scanSizeAndNumOfFiles()
@@ -513,7 +523,7 @@ class LocalStorageAccessor(
createDir(path) createDir(path)
} }
override suspend fun delete(path: String) = withContext(ioDispatcher) { override suspend fun delete(path: String, recordSyncJournal: Boolean) = withContext(ioDispatcher) {
if (path == "/" || path.isBlank()) { if (path == "/" || path.isBlank()) {
throw WallencException.Storage.DeleteRootForbidden() throw WallencException.Storage.DeleteRootForbidden()
} }
@@ -523,7 +533,9 @@ class LocalStorageAccessor(
else pair.file.delete() else pair.file.delete()
pair.metaFile.delete() pair.metaFile.delete()
scanSizeAndNumOfFiles() scanSizeAndNumOfFiles()
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE) if (recordSyncJournal) {
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
}
} }
} }
@@ -548,12 +560,14 @@ class LocalStorageAccessor(
return@withContext pair.file.inputStream() return@withContext pair.file.inputStream()
} }
override suspend fun moveToTrash(path: String) = withContext(ioDispatcher) { override suspend fun moveToTrash(path: String, recordSyncJournal: Boolean) = withContext(ioDispatcher) {
val pair = LocalStorageFilePair.from(_filesystemBasePath, path) val pair = LocalStorageFilePair.from(_filesystemBasePath, path)
?: throw WallencException.Storage.FileNotFound() ?: throw WallencException.Storage.FileNotFound()
val newMeta = pair.meta.copy(isDeleted = true) val newMeta = pair.meta.copy(isDeleted = true)
writeMeta(pair.metaFile, newMeta) writeMeta(pair.metaFile, newMeta)
appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH) if (recordSyncJournal) {
appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH)
}
} }
override suspend fun openReadSystemFile(name: String): InputStream = withContext(ioDispatcher) { override suspend fun openReadSystemFile(name: String): InputStream = withContext(ioDispatcher) {
@@ -579,16 +593,21 @@ class LocalStorageAccessor(
} }
override suspend fun readSyncJournal(): StorageSyncJournal = withContext(ioDispatcher) { override suspend fun readSyncJournal(): StorageSyncJournal = withContext(ioDispatcher) {
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) } journalBuffer.flushPending()
StorageSyncJournalCodec.read(jackson, bytes) readSyncJournalUnchecked()
}
override suspend fun flushPendingSyncJournal() = withContext(ioDispatcher) {
journalBuffer.flushPending()
} }
override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) = withContext(ioDispatcher) { override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) = withContext(ioDispatcher) {
if (entries.isEmpty()) { journalBuffer.putEntries(entries)
return@withContext }
}
val merged = StorageSyncJournalMerge.merge(readSyncJournal(), entries) private suspend fun readSyncJournalUnchecked(): StorageSyncJournal {
writeSyncJournal(merged) val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
return StorageSyncJournalCodec.read(jackson, bytes)
} }
private suspend fun writeSyncJournal(journal: StorageSyncJournal) { private suspend fun writeSyncJournal(journal: StorageSyncJournal) {
@@ -721,21 +740,9 @@ class LocalStorageAccessor(
if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) { if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) {
return return
} }
val journal = readSyncJournal() val sequence = journalBuffer.nextSequence()
val nextSequence = (journal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L val entry = journalBuffer.buildEntry(cleanedPath, operation, sequence)
putSyncJournalEntries( journalBuffer.appendEntry(cleanedPath, entry)
mapOf(
cleanedPath to StorageSyncJournalEntry(
path = cleanedPath,
operation = operation,
revision = StorageSyncRevision(
sequence = nextSequence,
actorId = syncActorId,
createdAt = Instant.now(),
),
),
),
)
} }
companion object { companion object {

View File

@@ -2,6 +2,7 @@ package com.github.nullptroma.wallenc.domain.vault.storages.yandex
import com.github.nullptroma.wallenc.domain.errors.WallencException import com.github.nullptroma.wallenc.domain.errors.WallencException
import com.github.nullptroma.wallenc.domain.vault.errors.toVaultWallencException import com.github.nullptroma.wallenc.domain.vault.errors.toVaultWallencException
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalBuffer
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec
import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty
@@ -19,17 +20,16 @@ import com.github.nullptroma.wallenc.domain.interfaces.IDirectory
import com.github.nullptroma.wallenc.domain.interfaces.IFile import com.github.nullptroma.wallenc.domain.interfaces.IFile
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.MutableStateFlow
@@ -42,6 +42,7 @@ import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.stateIn import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
@@ -95,6 +96,13 @@ class YandexStorageAccessor(
private var statsPersistJob: Job? = null private var statsPersistJob: Job? = null
private val journalBuffer = StorageSyncJournalBuffer(
syncActorId = syncActorId,
originStorageUuid = storageUuid,
readJournal = { readSyncJournalUnchecked() },
writeJournal = { writeSyncJournal(it) },
)
@Volatile @Volatile
private var systemDirEnsured: Boolean = false private var systemDirEnsured: Boolean = false
@@ -240,6 +248,7 @@ class YandexStorageAccessor(
val queue = ArrayDeque<String>() val queue = ArrayDeque<String>()
queue.add(relDir) queue.add(relDir)
while (queue.isNotEmpty()) { while (queue.isNotEmpty()) {
coroutineContext.ensureActive()
val rel = queue.removeFirst() val rel = queue.removeFirst()
if (isSystemRel(rel)) continue if (isSystemRel(rel)) continue
val (files, dirs) = listImmediateChildren(rel) val (files, dirs) = listImmediateChildren(rel)
@@ -285,6 +294,7 @@ class YandexStorageAccessor(
val dirs = mutableListOf<IDirectory>() val dirs = mutableListOf<IDirectory>()
var offset = 0 var offset = 0
while (true) { while (true) {
coroutineContext.ensureActive()
val res = guard { repo.list(diskPath, API_LIST_LIMIT, offset) } val res = guard { repo.list(diskPath, API_LIST_LIMIT, offset) }
val items = res.embedded?.items.orEmpty() val items = res.embedded?.items.orEmpty()
for (it in items) { for (it in items) {
@@ -303,7 +313,7 @@ class YandexStorageAccessor(
} }
private suspend fun getMetadataAfterWrite(diskPath: String): ResourceDto { private suspend fun getMetadataAfterWrite(diskPath: String): ResourceDto {
val maxAttempts = 6 val maxAttempts = 3
repeat(maxAttempts) { attempt -> repeat(maxAttempts) { attempt ->
try { try {
return guard { repo.get(diskPath) } return guard { repo.get(diskPath) }
@@ -488,8 +498,8 @@ class YandexStorageAccessor(
if (created) { if (created) {
_numberOfFiles.value = (_numberOfFiles.value ?: 0) + 1 _numberOfFiles.value = (_numberOfFiles.value ?: 0) + 1
persistStatsImmediate() persistStatsImmediate()
appendSyncEntry(path = path, operation = StorageSyncOperation.UPSERT)
} }
appendSyncEntry(path = path, operation = StorageSyncOperation.UPSERT)
} }
override suspend fun touchDir(path: String): Unit = withContext(ioDispatcher) { override suspend fun touchDir(path: String): Unit = withContext(ioDispatcher) {
@@ -506,7 +516,7 @@ class YandexStorageAccessor(
} }
} }
override suspend fun delete(path: String) = withContext(ioDispatcher) { override suspend fun delete(path: String, recordSyncJournal: Boolean) = withContext(ioDispatcher) {
if (path == "/" || path.isBlank()) { if (path == "/" || path.isBlank()) {
throw WallencException.Storage.DeleteRootForbidden() throw WallencException.Storage.DeleteRootForbidden()
} }
@@ -528,7 +538,9 @@ class YandexStorageAccessor(
} }
guard { repo.delete(diskPath, permanently = true) } guard { repo.delete(diskPath, permanently = true) }
scheduleStatsPersist() scheduleStatsPersist()
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE) if (recordSyncJournal) {
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
}
} }
override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream = withContext(ioDispatcher) { override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream = withContext(ioDispatcher) {
@@ -546,9 +558,11 @@ class YandexStorageAccessor(
guard { repo.openDownloadStream(toDiskPath(path)) } guard { repo.openDownloadStream(toDiskPath(path)) }
} }
override suspend fun moveToTrash(path: String) = withContext(ioDispatcher) { override suspend fun moveToTrash(path: String, recordSyncJournal: Boolean) = withContext(ioDispatcher) {
patchCustomProps(path, mapOf(PROP_DELETED to "true")) patchCustomProps(path, mapOf(PROP_DELETED to "true"))
appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH) if (recordSyncJournal) {
appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH)
}
} }
override suspend fun openReadSystemFile(name: String): InputStream = withContext(ioDispatcher) { override suspend fun openReadSystemFile(name: String): InputStream = withContext(ioDispatcher) {
@@ -576,16 +590,21 @@ class YandexStorageAccessor(
} }
override suspend fun readSyncJournal(): StorageSyncJournal = withContext(ioDispatcher) { override suspend fun readSyncJournal(): StorageSyncJournal = withContext(ioDispatcher) {
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) } journalBuffer.flushPending()
StorageSyncJournalCodec.read(statsMapper, bytes) readSyncJournalUnchecked()
}
override suspend fun flushPendingSyncJournal() = withContext(ioDispatcher) {
journalBuffer.flushPending()
} }
override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) = withContext(ioDispatcher) { override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) = withContext(ioDispatcher) {
if (entries.isEmpty()) { journalBuffer.putEntries(entries)
return@withContext }
}
val merged = StorageSyncJournalMerge.merge(readSyncJournal(), entries) private suspend fun readSyncJournalUnchecked(): StorageSyncJournal {
writeSyncJournal(merged) val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
return StorageSyncJournalCodec.read(statsMapper, bytes)
} }
private suspend fun writeSyncJournal(journal: StorageSyncJournal) { private suspend fun writeSyncJournal(journal: StorageSyncJournal) {
@@ -668,8 +687,10 @@ class YandexStorageAccessor(
* Выполняется из [OutputStream.close]; ошибки upload пробрасываются вызывающему коду. * Выполняется из [OutputStream.close]; ошибки upload пробрасываются вызывающему коду.
*/ */
private fun runCommitAfterStreamClosed(block: suspend () -> Unit) { private fun runCommitAfterStreamClosed(block: suspend () -> Unit) {
runBlocking(ioDispatcher) { runBlocking {
block() withContext(ioDispatcher) {
block()
}
} }
} }
@@ -710,22 +731,9 @@ class YandexStorageAccessor(
if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) { if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) {
return return
} }
val journal = readSyncJournal() val sequence = journalBuffer.nextSequence()
val nextSequence = (journal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L val entry = journalBuffer.buildEntry(cleanedPath, operation, sequence)
putSyncJournalEntries( journalBuffer.appendEntry(cleanedPath, entry)
mapOf(
cleanedPath to StorageSyncJournalEntry(
path = cleanedPath,
operation = operation,
revision = StorageSyncRevision(
sequence = nextSequence,
actorId = syncActorId,
createdAt = Instant.now(),
),
originStorageUuid = storageUuid,
),
),
)
} }
private suspend fun touchParentDirs(path: String) { private suspend fun touchParentDirs(path: String) {

View File

@@ -33,6 +33,7 @@ class YandexDiskRepositoryTest {
@Test @Test
fun diskInfoParsesResponse() = runBlocking { fun diskInfoParsesResponse() = runBlocking {
repository.resetCloudApiCallCount()
server.enqueue( server.enqueue(
MockResponse() MockResponse()
.setResponseCode(200) .setResponseCode(200)
@@ -42,6 +43,7 @@ class YandexDiskRepositoryTest {
val info = repository.diskInfo() val info = repository.diskInfo()
assertEquals(1000L, info.totalSpace) assertEquals(1000L, info.totalSpace)
assertEquals(200L, info.usedSpace) assertEquals(200L, info.usedSpace)
assertEquals(1L, repository.cloudApiCallCount())
} }
@Test @Test

View File

@@ -0,0 +1,41 @@
package com.github.nullptroma.wallenc.domain.vault.storages.common
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
import kotlinx.coroutines.runBlocking
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Test
import java.time.Instant
import java.util.concurrent.atomic.AtomicReference
class StorageSyncJournalBufferTest {
@Test
fun flushRestoresPendingOnWriteFailure() = runBlocking {
val disk = AtomicReference(emptyMap<String, StorageSyncJournalEntry>())
var shouldFail = true
val buffer = StorageSyncJournalBuffer(
syncActorId = "actor",
originStorageUuid = null,
readJournal = { disk.get() },
writeJournal = {
if (shouldFail) {
error("disk unavailable")
}
disk.set(it)
},
)
val entry = buffer.buildEntry("/a.txt", StorageSyncOperation.UPSERT, 1L)
try {
buffer.appendEntry("/a.txt", entry)
} catch (_: IllegalStateException) {
// expected
}
shouldFail = false
buffer.flushPending()
assertTrue(disk.get().containsKey("/a.txt"))
assertEquals(1L, disk.get()["/a.txt"]?.revision?.sequence)
}
}

View File

@@ -39,10 +39,10 @@ interface IStorageAccessor {
suspend fun setHidden(path: String, hidden: Boolean) suspend fun setHidden(path: String, hidden: Boolean)
suspend fun touchFile(path: String) suspend fun touchFile(path: String)
suspend fun touchDir(path: String) suspend fun touchDir(path: String)
suspend fun delete(path: String) suspend fun delete(path: String, recordSyncJournal: Boolean = true)
suspend fun openWrite(path: String, recordSyncJournal: Boolean = true): OutputStream suspend fun openWrite(path: String, recordSyncJournal: Boolean = true): OutputStream
suspend fun openRead(path: String): InputStream suspend fun openRead(path: String): InputStream
suspend fun moveToTrash(path: String) suspend fun moveToTrash(path: String, recordSyncJournal: Boolean = true)
/** /**
* Системный sidecar-файл для логических нужд хранилища (мета, ключи и т.п.). * Системный sidecar-файл для логических нужд хранилища (мета, ключи и т.п.).
@@ -53,6 +53,10 @@ interface IStorageAccessor {
suspend fun openWriteSystemFile(name: String): OutputStream suspend fun openWriteSystemFile(name: String): OutputStream
suspend fun readSyncJournal(): StorageSyncJournal suspend fun readSyncJournal(): StorageSyncJournal
/** Сбрасывает отложенные записи журнала на носитель (перед sync и при закрытии storage). */
suspend fun flushPendingSyncJournal() = Unit
suspend fun putSyncJournalEntries(entries: StorageSyncJournal) suspend fun putSyncJournalEntries(entries: StorageSyncJournal)
suspend fun readSyncLock(): StorageSyncLock? suspend fun readSyncLock(): StorageSyncLock?

View File

@@ -0,0 +1,6 @@
package com.github.nullptroma.wallenc.domain.tasks
/** Локализованный заголовок задачи синхронизации хранилищ по источнику запуска. */
fun interface IStorageSyncTaskTitleFormatter {
fun format(reason: StorageSyncTriggerReason): String
}

View File

@@ -14,4 +14,7 @@ interface TaskContext {
fun log(level: TaskLogLevel, key: TaskLogKey) fun log(level: TaskLogLevel, key: TaskLogKey)
fun fail(error: WallencException): Nothing fun fail(error: WallencException): Nothing
/** Проверяет, что задача не отменена; бросает [kotlinx.coroutines.CancellationException] при отмене. */
suspend fun ensureNotCancelled()
} }

View File

@@ -22,6 +22,8 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.asStateFlow
@@ -238,6 +240,10 @@ class TaskOrchestrator(
override fun fail(error: WallencException): Nothing { override fun fail(error: WallencException): Nothing {
throw TaskFailedException(error) throw TaskFailedException(error)
} }
override suspend fun ensureNotCancelled() {
coroutineContext.ensureActive()
}
} }
private class TaskFailedException(val error: WallencException) : RuntimeException() private class TaskFailedException(val error: WallencException) : RuntimeException()

View File

@@ -32,6 +32,24 @@ class TaskOrchestratorTest {
assertTrue(task.state is TaskRunState.Completed) assertTrue(task.state is TaskRunState.Completed)
} }
@Test
fun cancelAllMarksRunningTaskCancelled() = runTest(dispatcher) {
val orchestrator = TaskOrchestrator(dispatcher)
val id = orchestrator.enqueue(
title = "Long",
dispatcher = dispatcher,
work = { ctx ->
ctx.reportProgress(null, null)
kotlinx.coroutines.delay(60_000)
},
)
advanceTimeBy(1)
orchestrator.cancelAll()
advanceUntilIdle()
val task = orchestrator.pipelineState.value.tasks.first { it.id == id }
assertTrue(task.state is TaskRunState.Cancelled)
}
@Test @Test
fun cancelMarksTaskCancelled() = runTest(dispatcher) { fun cancelMarksTaskCancelled() = runTest(dispatcher) {
val orchestrator = TaskOrchestrator(dispatcher) val orchestrator = TaskOrchestrator(dispatcher)

View File

@@ -23,6 +23,9 @@ fun TaskLogKey.resolve(resolver: UiStringResolver): String = when (this) {
} }
} }
fun UiStringResolver.storageSyncTaskTitle(reason: StorageSyncTriggerReason): String =
this(R.string.task_title_storage_sync, resolveSyncTriggerReason(reason))
private fun UiStringResolver.resolveSyncTriggerReason(reason: StorageSyncTriggerReason): String = private fun UiStringResolver.resolveSyncTriggerReason(reason: StorageSyncTriggerReason): String =
when (reason) { when (reason) {
StorageSyncTriggerReason.Debounce -> this(R.string.task_sync_trigger_debounce) StorageSyncTriggerReason.Debounce -> this(R.string.task_sync_trigger_debounce)

View File

@@ -5,6 +5,7 @@ import androidx.lifecycle.viewModelScope
import com.github.nullptroma.wallenc.domain.datatypes.EncryptKey import com.github.nullptroma.wallenc.domain.datatypes.EncryptKey
import com.github.nullptroma.wallenc.domain.datatypes.StorageMetaLoadState import com.github.nullptroma.wallenc.domain.datatypes.StorageMetaLoadState
import com.github.nullptroma.wallenc.domain.datatypes.Tree import com.github.nullptroma.wallenc.domain.datatypes.Tree
import com.github.nullptroma.wallenc.domain.errors.WallencException
import com.github.nullptroma.wallenc.domain.errors.toWallencException import com.github.nullptroma.wallenc.domain.errors.toWallencException
import com.github.nullptroma.wallenc.domain.interfaces.ILogger import com.github.nullptroma.wallenc.domain.interfaces.ILogger
import com.github.nullptroma.wallenc.domain.interfaces.IStorage import com.github.nullptroma.wallenc.domain.interfaces.IStorage
@@ -188,9 +189,14 @@ abstract class AbstractVaultBrowserViewModel(
ctx.log(TaskLogLevel.Info, uiStrings(R.string.task_log_rescanning_vault_storages)) ctx.log(TaskLogLevel.Info, uiStrings(R.string.task_log_rescanning_vault_storages))
manageVaultUseCase.rescanStorages(vaultUuid) manageVaultUseCase.rescanStorages(vaultUuid)
ctx.log(TaskLogLevel.Info, uiStrings(R.string.task_log_rescan_vault_storages_done)) ctx.log(TaskLogLevel.Info, uiStrings(R.string.task_log_rescan_vault_storages_done))
val vault = manageVaultUseCase.find(vaultUuid)
if (vault != null && !vault.isAvailable.value) {
emitTaskError(WallencException.Network.IoFailed())
}
} catch (e: Exception) { } catch (e: Exception) {
logger.debug(TAG, "rescanStorages failed: ${e.stackTraceToString()}") logger.debug(TAG, "rescanStorages failed: ${e.stackTraceToString()}")
ctx.log(TaskLogLevel.Error, uiStrings(R.string.task_log_rescan_vault_storages_failed)) ctx.log(TaskLogLevel.Error, uiStrings(R.string.task_log_rescan_vault_storages_failed))
emitTaskError(e)
} }
}, },
) )
@@ -224,7 +230,7 @@ abstract class AbstractVaultBrowserViewModel(
} catch (e: Exception) { } catch (e: Exception) {
logger.debug(TAG, "createStorage failed: ${e.stackTraceToString()}") logger.debug(TAG, "createStorage failed: ${e.stackTraceToString()}")
ctx.log(TaskLogLevel.Error, uiStrings(R.string.task_log_add_vault_failed)) ctx.log(TaskLogLevel.Error, uiStrings(R.string.task_log_add_vault_failed))
throw e emitTaskError(e)
} }
}, },
) )

View File

@@ -208,10 +208,7 @@ class StorageSyncViewModel @Inject constructor(
} }
fun runSyncNow() { fun runSyncNow() {
val started = runStorageSyncUseCase.enqueue( val started = runStorageSyncUseCase.enqueue(StorageSyncTriggerReason.SyncTab)
displayTitle = uiStrings(R.string.task_title_storage_sync),
reason = StorageSyncTriggerReason.SyncTab,
)
if (!started) { if (!started) {
updateState( updateState(
state.value.copy( state.value.copy(

View File

@@ -149,8 +149,7 @@
<string name="task_title_remove_remote_vault">Удаление удалённого хранилища</string> <string name="task_title_remove_remote_vault">Удаление удалённого хранилища</string>
<string name="task_title_retry_remote_vault">Повторное подключение удалённого хранилища</string> <string name="task_title_retry_remote_vault">Повторное подключение удалённого хранилища</string>
<string name="task_title_rescan_vault_storages">Обновление списка хранилищ</string> <string name="task_title_rescan_vault_storages">Обновление списка хранилищ</string>
<string name="task_title_storage_sync">Синхронизация хранилищ</string> <string name="task_title_storage_sync">Синхронизация хранилищ (%1$s)</string>
<string name="task_title_storage_sync_background">Фоновая синхронизация хранилищ</string>
<string name="task_title_save_2fa_token">Сохранение 2FA токена</string> <string name="task_title_save_2fa_token">Сохранение 2FA токена</string>
<string name="task_title_delete_2fa_token">Удаление 2FA токена</string> <string name="task_title_delete_2fa_token">Удаление 2FA токена</string>
<string name="task_title_save_text_secret">Сохранение текстового секрета</string> <string name="task_title_save_text_secret">Сохранение текстового секрета</string>

View File

@@ -149,8 +149,7 @@
<string name="task_title_remove_remote_vault">Remove remote vault</string> <string name="task_title_remove_remote_vault">Remove remote vault</string>
<string name="task_title_retry_remote_vault">Retry remote vault connection</string> <string name="task_title_retry_remote_vault">Retry remote vault connection</string>
<string name="task_title_rescan_vault_storages">Rescan vault storages</string> <string name="task_title_rescan_vault_storages">Rescan vault storages</string>
<string name="task_title_storage_sync">Storage sync</string> <string name="task_title_storage_sync">Storage sync (%1$s)</string>
<string name="task_title_storage_sync_background">Background storage sync</string>
<string name="task_title_save_2fa_token">Save 2FA token</string> <string name="task_title_save_2fa_token">Save 2FA token</string>
<string name="task_title_delete_2fa_token">Delete 2FA token</string> <string name="task_title_delete_2fa_token">Delete 2FA token</string>
<string name="task_title_save_text_secret">Save text secret</string> <string name="task_title_save_text_secret">Save text secret</string>

View File

@@ -3,15 +3,19 @@ package com.github.nullptroma.wallenc.usecases
import com.github.nullptroma.wallenc.domain.errors.toWallencException import com.github.nullptroma.wallenc.domain.errors.toWallencException
import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncEngine import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncEngine
import com.github.nullptroma.wallenc.domain.tasks.ITaskOrchestrator import com.github.nullptroma.wallenc.domain.tasks.ITaskOrchestrator
import com.github.nullptroma.wallenc.domain.tasks.IStorageSyncTaskTitleFormatter
import com.github.nullptroma.wallenc.domain.tasks.StorageSyncTriggerReason import com.github.nullptroma.wallenc.domain.tasks.StorageSyncTriggerReason
import com.github.nullptroma.wallenc.domain.tasks.TaskId import com.github.nullptroma.wallenc.domain.tasks.TaskId
import com.github.nullptroma.wallenc.domain.tasks.TaskLogKey import com.github.nullptroma.wallenc.domain.tasks.TaskLogKey
import com.github.nullptroma.wallenc.domain.tasks.TaskLogLevel import com.github.nullptroma.wallenc.domain.tasks.TaskLogLevel
import com.github.nullptroma.wallenc.domain.tasks.TaskProgressLabel import com.github.nullptroma.wallenc.domain.tasks.TaskProgressLabel
import com.github.nullptroma.wallenc.domain.tasks.TaskRunState
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import javax.inject.Inject import javax.inject.Inject
import javax.inject.Singleton import javax.inject.Singleton
@@ -21,6 +25,7 @@ class RunStorageSyncUseCase @Inject constructor(
private val orchestrator: ITaskOrchestrator, private val orchestrator: ITaskOrchestrator,
private val syncEngine: IStorageSyncEngine, private val syncEngine: IStorageSyncEngine,
private val syncReadiness: StorageSyncReadiness, private val syncReadiness: StorageSyncReadiness,
private val taskTitleFormatter: IStorageSyncTaskTitleFormatter,
) { ) {
private val running = AtomicBoolean(false) private val running = AtomicBoolean(false)
@@ -30,25 +35,29 @@ class RunStorageSyncUseCase @Inject constructor(
private val _activeSyncTaskId = MutableStateFlow<TaskId?>(null) private val _activeSyncTaskId = MutableStateFlow<TaskId?>(null)
val activeSyncTaskId: StateFlow<TaskId?> = _activeSyncTaskId.asStateFlow() val activeSyncTaskId: StateFlow<TaskId?> = _activeSyncTaskId.asStateFlow()
/** Не реагировать на debounce до этого момента (мс с эпохи) после завершения sync. */
private val _debounceSuppressUntilMs = MutableStateFlow(0L)
val debounceSuppressUntilMs: StateFlow<Long> = _debounceSuppressUntilMs.asStateFlow()
/** /**
* @param displayTitle заголовок задачи в UI (локализованный на стороне вызова) * @param reason источник запуска — заголовок задачи и лог пайплайна
* @param reason источник запуска — попадает в лог пайплайна
* @return false, если синхронизация уже в очереди или выполняется — новая задача не создана * @return false, если синхронизация уже в очереди или выполняется — новая задача не создана
*/ */
fun enqueue(displayTitle: String, reason: StorageSyncTriggerReason): Boolean { fun enqueue(reason: StorageSyncTriggerReason): Boolean {
if (!running.compareAndSet(false, true)) { if (!running.compareAndSet(false, true)) {
return false return false
} }
_syncRunning.value = true _syncRunning.value = true
try { try {
val taskId = orchestrator.enqueue( val taskId = orchestrator.enqueue(
title = displayTitle, title = taskTitleFormatter.format(reason),
dispatcher = Dispatchers.IO, dispatcher = Dispatchers.IO,
work = { ctx -> work = { ctx ->
try { try {
executeSync( executeSync(
reason = reason, reason = reason,
reportProgress = { fraction, label -> reportProgress = { fraction, label ->
ctx.ensureNotCancelled()
ctx.reportProgress(fraction, label) ctx.reportProgress(fraction, label)
}, },
log = { level, key -> ctx.log(level, key) }, log = { level, key -> ctx.log(level, key) },
@@ -68,19 +77,22 @@ class RunStorageSyncUseCase @Inject constructor(
} }
} }
suspend fun runBlocking(reason: StorageSyncTriggerReason) { /**
if (!running.compareAndSet(false, true)) { * Ставит sync в пайплайн задач (как debounce / sync-tab) и ждёт завершения.
return * Для WorkManager и других фоновых запусков без отдельного «orphan»-лога.
*/
suspend fun enqueueAndAwait(reason: StorageSyncTriggerReason): StorageSyncRunOutcome {
if (!enqueue(reason)) {
return StorageSyncRunOutcome.SkippedAlreadyRunning
} }
_syncRunning.value = true val taskId = _activeSyncTaskId.value
try { ?: return StorageSyncRunOutcome.Completed
executeSync( syncRunning.filter { !it }.first()
reason = reason, val state = orchestrator.pipelineState.value.tasks.find { it.id == taskId }?.state
reportProgress = { _, _ -> }, return when (state) {
log = { level, key -> orchestrator.appendPipelineLog(level, key) }, is TaskRunState.Failed -> StorageSyncRunOutcome.Failed(state.error)
) TaskRunState.Cancelled -> StorageSyncRunOutcome.Cancelled
} finally { else -> StorageSyncRunOutcome.Completed
clearRunningState()
} }
} }
@@ -89,25 +101,38 @@ class RunStorageSyncUseCase @Inject constructor(
reportProgress: suspend (fraction: Float?, label: TaskProgressLabel?) -> Unit, reportProgress: suspend (fraction: Float?, label: TaskProgressLabel?) -> Unit,
log: (TaskLogLevel, TaskLogKey) -> Unit, log: (TaskLogLevel, TaskLogKey) -> Unit,
) { ) {
syncReadiness.awaitReady()
log(TaskLogLevel.Info, TaskLogKey.SyncStarted(reason))
reportProgress(null, TaskProgressLabel.SyncStarted)
try { try {
syncEngine.syncAllGroups { fraction, label -> syncReadiness.awaitReady()
reportProgress(fraction, label) log(TaskLogLevel.Info, TaskLogKey.SyncStarted(reason))
reportProgress(null, TaskProgressLabel.SyncStarted)
try {
syncEngine.syncAllGroups { fraction, label ->
reportProgress(fraction, label)
}
log(TaskLogLevel.Info, TaskLogKey.SyncFinished(reason))
reportProgress(null, TaskProgressLabel.SyncCompleted)
} catch (e: Exception) {
val err = e.toWallencException()
log(TaskLogLevel.Error, TaskLogKey.SyncFailed(err, reason))
throw e
} }
log(TaskLogLevel.Info, TaskLogKey.SyncFinished(reason)) } finally {
reportProgress(null, TaskProgressLabel.SyncCompleted) extendDebounceSuppress()
} catch (e: Exception) {
val err = e.toWallencException()
log(TaskLogLevel.Error, TaskLogKey.SyncFailed(err, reason))
throw e
} }
} }
private fun extendDebounceSuppress() {
_debounceSuppressUntilMs.value = System.currentTimeMillis() + DEBOUNCE_AFTER_CHANGE_MS
}
private fun clearRunningState() { private fun clearRunningState() {
running.set(false) running.set(false)
_syncRunning.value = false _syncRunning.value = false
_activeSyncTaskId.value = null _activeSyncTaskId.value = null
} }
companion object {
/** Пауза после последнего изменения перед debounce-sync; же окно подавления после sync. */
const val DEBOUNCE_AFTER_CHANGE_MS = 60_000L
}
} }

View File

@@ -12,6 +12,11 @@ import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncGroupStore
import com.github.nullptroma.wallenc.domain.interfaces.IVaultsManager import com.github.nullptroma.wallenc.domain.interfaces.IVaultsManager
import com.github.nullptroma.wallenc.domain.tasks.TaskProgressLabel import com.github.nullptroma.wallenc.domain.tasks.TaskProgressLabel
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
@@ -48,6 +53,7 @@ class StorageSyncEngine @Inject constructor(
} }
reporter(null, TaskProgressLabel.SyncPreparing(groups.size)) reporter(null, TaskProgressLabel.SyncPreparing(groups.size))
for (group in groups) { for (group in groups) {
coroutineContext.ensureActive()
syncGroupInternal( syncGroupInternal(
groupId = group.id, groupId = group.id,
reportProgress = reporter, reportProgress = reporter,
@@ -104,6 +110,7 @@ class StorageSyncEngine @Inject constructor(
try { try {
reportProgress(null, TaskProgressLabel.SyncGroupAcquiringLocks(groupId)) reportProgress(null, TaskProgressLabel.SyncGroupAcquiringLocks(groupId))
for ((lockIndex, storage) in storages.withIndex()) { for ((lockIndex, storage) in storages.withIndex()) {
coroutineContext.ensureActive()
reportProgress( reportProgress(
null, null,
TaskProgressLabel.SyncGroupLockProgress(groupId, lockIndex + 1, storages.size), TaskProgressLabel.SyncGroupLockProgress(groupId, lockIndex + 1, storages.size),
@@ -120,22 +127,34 @@ class StorageSyncEngine @Inject constructor(
val entriesByStorage = mutableMapOf<UUID, StorageSyncJournal>() val entriesByStorage = mutableMapOf<UUID, StorageSyncJournal>()
reportProgress(null, TaskProgressLabel.SyncGroupReadingJournals(groupId)) reportProgress(null, TaskProgressLabel.SyncGroupReadingJournals(groupId))
for ((journalIndex, storage) in storages.withIndex()) { leaseUntil = renewLocksIfNeeded(
leaseUntil = renewLocksIfNeeded( groupId = groupId,
groupId = groupId, lockedAccessors = lockedAccessors,
lockedAccessors = lockedAccessors, currentLeaseUntil = leaseUntil,
currentLeaseUntil = leaseUntil, reportProgress = reportProgress,
reportProgress = reportProgress, ) ?: return
) ?: return if (syncGeneration.get() != generationSnapshot) {
if (syncGeneration.get() != generationSnapshot) { reportProgress(null, TaskProgressLabel.SyncGroupCancelled(groupId))
reportProgress(null, TaskProgressLabel.SyncGroupCancelled(groupId)) return
return }
} val journalReads = coroutineScope {
reportProgress( storages.mapIndexed { journalIndex, storage ->
null, async {
TaskProgressLabel.SyncGroupJournalProgress(groupId, journalIndex + 1, storages.size), coroutineContext.ensureActive()
) reportProgress(
val journal = filterSyncableJournal(storage.accessor.readSyncJournal()) null,
TaskProgressLabel.SyncGroupJournalProgress(
groupId,
journalIndex + 1,
storages.size,
),
)
storage.accessor.flushPendingSyncJournal()
storage to filterSyncableJournal(storage.accessor.readSyncJournal())
}
}.awaitAll()
}
for ((storage, journal) in journalReads) {
entriesByStorage[storage.uuid] = journal entriesByStorage[storage.uuid] = journal
mergedByPath.putAll( mergedByPath.putAll(
StorageSyncJournalMerge.merge(mergedByPath, journal), StorageSyncJournalMerge.merge(mergedByPath, journal),
@@ -154,6 +173,7 @@ class StorageSyncEngine @Inject constructor(
) )
var applyFailures = 0 var applyFailures = 0
for ((pathIndex, merged) in mergedEntries.withIndex()) { for ((pathIndex, merged) in mergedEntries.withIndex()) {
coroutineContext.ensureActive()
leaseUntil = renewLocksIfNeeded( leaseUntil = renewLocksIfNeeded(
groupId = groupId, groupId = groupId,
lockedAccessors = lockedAccessors, lockedAccessors = lockedAccessors,
@@ -178,6 +198,7 @@ class StorageSyncEngine @Inject constructor(
} }
for (target in storages) { for (target in storages) {
coroutineContext.ensureActive()
if (target.uuid == sourceStorage?.uuid) { if (target.uuid == sourceStorage?.uuid) {
continue continue
} }
@@ -231,6 +252,7 @@ class StorageSyncEngine @Inject constructor(
val nextLeaseUntil = now.plusSeconds(SYNC_LOCK_LEASE_SECONDS) val nextLeaseUntil = now.plusSeconds(SYNC_LOCK_LEASE_SECONDS)
reportProgress(null, TaskProgressLabel.SyncGroupRenewingLocks(groupId)) reportProgress(null, TaskProgressLabel.SyncGroupRenewingLocks(groupId))
for (accessor in lockedAccessors) { for (accessor in lockedAccessors) {
coroutineContext.ensureActive()
val renewed = runCatching { val renewed = runCatching {
accessor.tryAcquireSyncLock(holderId, nextLeaseUntil) accessor.tryAcquireSyncLock(holderId, nextLeaseUntil)
}.getOrElse { false } }.getOrElse { false }
@@ -273,13 +295,13 @@ class StorageSyncEngine @Inject constructor(
val result = when (entry.operation) { val result = when (entry.operation) {
StorageSyncOperation.DELETE -> { StorageSyncOperation.DELETE -> {
runCatching { runCatching {
target.accessor.delete(entry.path) target.accessor.delete(entry.path, recordSyncJournal = false)
} }
} }
StorageSyncOperation.TRASH -> { StorageSyncOperation.TRASH -> {
runCatching { runCatching {
target.accessor.moveToTrash(entry.path) target.accessor.moveToTrash(entry.path, recordSyncJournal = false)
} }
} }

View File

@@ -3,6 +3,8 @@ package com.github.nullptroma.wallenc.usecases
import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncGroupStore import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncGroupStore
import com.github.nullptroma.wallenc.domain.interfaces.IVaultsManager import com.github.nullptroma.wallenc.domain.interfaces.IVaultsManager
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.ensureActive
import javax.inject.Inject import javax.inject.Inject
import javax.inject.Singleton import javax.inject.Singleton
@@ -27,6 +29,7 @@ class StorageSyncReadiness @Inject constructor(
val deadline = System.currentTimeMillis() + timeoutMs val deadline = System.currentTimeMillis() + timeoutMs
while (System.currentTimeMillis() < deadline) { while (System.currentTimeMillis() < deadline) {
coroutineContext.ensureActive()
if (!isAnyVaultScanning()) { if (!isAnyVaultScanning()) {
break break
} }
@@ -34,6 +37,7 @@ class StorageSyncReadiness @Inject constructor(
} }
while (System.currentTimeMillis() < deadline) { while (System.currentTimeMillis() < deadline) {
coroutineContext.ensureActive()
if (requiredUuids.all { uuid -> findStorageUseCase.find(uuid) != null }) { if (requiredUuids.all { uuid -> findStorageUseCase.find(uuid) != null }) {
return return
} }

View File

@@ -0,0 +1,16 @@
package com.github.nullptroma.wallenc.usecases
import com.github.nullptroma.wallenc.domain.errors.WallencException
/** Результат [RunStorageSyncUseCase.enqueueAndAwait]. */
sealed class StorageSyncRunOutcome {
/** Синхронизация уже выполнялась — новая задача не создана. */
data object SkippedAlreadyRunning : StorageSyncRunOutcome()
data object Completed : StorageSyncRunOutcome()
/** Задача синхронизации отменена пользователем или пайплайном. */
data object Cancelled : StorageSyncRunOutcome()
data class Failed(val error: WallencException) : StorageSyncRunOutcome()
}

View File

@@ -2,6 +2,7 @@ package com.github.nullptroma.wallenc.usecases
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroup import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroup
import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroupEncryptionKind import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroupEncryptionKind
@@ -10,8 +11,11 @@ import com.github.nullptroma.wallenc.usecases.fakes.FakeStorage
import com.github.nullptroma.wallenc.usecases.fakes.FakeStorageAccessor import com.github.nullptroma.wallenc.usecases.fakes.FakeStorageAccessor
import com.github.nullptroma.wallenc.usecases.fakes.FakeStorageSyncGroupStore import com.github.nullptroma.wallenc.usecases.fakes.FakeStorageSyncGroupStore
import com.github.nullptroma.wallenc.usecases.fakes.FakeVaultsManager import com.github.nullptroma.wallenc.usecases.fakes.FakeVaultsManager
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import org.junit.Assert.assertArrayEquals import org.junit.Assert.assertArrayEquals
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNull import org.junit.Assert.assertNull
import org.junit.Assert.assertTrue import org.junit.Assert.assertTrue
import org.junit.Test import org.junit.Test
@@ -19,6 +23,8 @@ import java.time.Instant
class StorageSyncEngineTest { class StorageSyncEngineTest {
private fun norm(path: String): String = StorageSyncPaths.normalize(path)
@Test @Test
fun syncAllGroupsReportsNoGroupsWhenEmpty() = runBlocking { fun syncAllGroupsReportsNoGroupsWhenEmpty() = runBlocking {
val labels = mutableListOf<TaskProgressLabel?>() val labels = mutableListOf<TaskProgressLabel?>()
@@ -63,7 +69,7 @@ class StorageSyncEngineTest {
assertArrayEquals(payload, target.fileBytes(path)) assertArrayEquals(payload, target.fileBytes(path))
assertTrue(labels.any { it is TaskProgressLabel.SyncGroupCompleted }) assertTrue(labels.any { it is TaskProgressLabel.SyncGroupCompleted })
assertTrue(target.syncJournalEntries().any { it.path == path }) assertTrue(target.syncJournalEntries().any { it.path == norm(path) })
} }
@Test @Test
@@ -111,6 +117,81 @@ class StorageSyncEngineTest {
engine.syncGroup(group.id) { _, _ -> } engine.syncGroup(group.id) { _, _ -> }
assertNull(target.fileBytes(path)) assertNull(target.fileBytes(path))
val targetEntry = target.syncJournalEntries().single { it.path == norm(path) }
assertEquals(2L, targetEntry.revision.sequence)
assertEquals("actor-b", targetEntry.revision.actorId)
assertEquals(StorageSyncOperation.DELETE, targetEntry.operation)
}
@Test
fun syncSkipsWhenTargetRevisionAlreadyWinner() = runBlocking {
val source = FakeStorage()
val target = FakeStorage()
val path = "already-synced.txt"
val payload = "same".encodeToByteArray()
source.putFile(path, payload)
target.putFile(path, payload)
val winner = StorageSyncJournalEntry(
path = path,
operation = StorageSyncOperation.UPSERT,
revision = StorageSyncRevision(
sequence = 5L,
actorId = "winner-actor",
createdAt = Instant.parse("2024-08-01T00:00:00Z"),
),
size = payload.size.toLong(),
)
source.addSyncJournalEntry(winner)
target.addSyncJournalEntry(winner)
val group = StorageSyncGroup(
id = "skip-group",
storageUuids = setOf(source.uuid, target.uuid),
encryptionKind = StorageSyncGroupEncryptionKind.NONE,
)
val engine = createEngine(
storages = listOf(source, target),
groups = listOf(group),
)
engine.syncGroup(group.id) { _, _ -> }
val targetJournal = target.syncJournalEntries().single { it.path == norm(path) }
assertEquals(winner.revision, targetJournal.revision)
assertEquals(winner.operation, targetJournal.operation)
}
@Test
fun openReadDoesNotChangeJournal() = runBlocking {
val storage = FakeStorage()
val path = "read-only.txt"
storage.putFile(path, "data".encodeToByteArray())
val before = storage.syncJournalEntries().size
storage.accessor.openRead(path).use { it.readBytes() }
assertEquals(before, storage.syncJournalEntries().size)
}
@Test
fun deleteWithRecordSyncJournalFalseDoesNotBumpSequence() = runBlocking {
val storage = FakeStorage()
val path = "to-delete.txt"
storage.putFile(path, "x".encodeToByteArray())
storage.addSyncJournalEntry(
StorageSyncJournalEntry(
path = path,
operation = StorageSyncOperation.UPSERT,
revision = StorageSyncRevision(10L, "prior", Instant.EPOCH),
),
)
storage.accessor.delete(path, recordSyncJournal = false)
assertNull(storage.fileBytes(path))
val entry = storage.syncJournalEntries().single { it.path == norm(path) }
assertEquals(10L, entry.revision.sequence)
assertEquals(StorageSyncOperation.UPSERT, entry.operation)
} }
@Test @Test
@@ -145,7 +226,11 @@ class StorageSyncEngineTest {
engine.syncGroup(group.id) { _, _ -> } engine.syncGroup(group.id) { _, _ -> }
assertArrayEquals(payload, target.fileBytes(path)) assertArrayEquals(payload, target.fileBytes(path))
assertTrue(path in (target.accessor as FakeStorageAccessor).trashedPaths) assertTrue(norm(path) in (target.accessor as FakeStorageAccessor).trashedPaths)
val targetEntry = target.syncJournalEntries().single { it.path == norm(path) }
assertEquals(3L, targetEntry.revision.sequence)
assertEquals("actor-trash", targetEntry.revision.actorId)
assertEquals(StorageSyncOperation.TRASH, targetEntry.operation)
} }
@Test @Test
@@ -171,6 +256,113 @@ class StorageSyncEngineTest {
) )
engine.syncGroup(group.id) { _, label -> labels.add(label) } engine.syncGroup(group.id) { _, label -> labels.add(label) }
assertTrue(labels.any { it is TaskProgressLabel.SyncGroupLockFailed }) assertTrue(labels.any { it is TaskProgressLabel.SyncGroupLockFailed })
assertNull((first.accessor as FakeStorageAccessor).syncLock)
assertNull((second.accessor as FakeStorageAccessor).syncLock)
}
@Test
fun syncGroupReleasesLocksAfterSuccessfulSync() = runBlocking {
val source = FakeStorage()
val target = FakeStorage()
source.addSyncJournalEntry(
StorageSyncJournalEntry(
path = "a.txt",
operation = StorageSyncOperation.UPSERT,
revision = StorageSyncRevision(1L, "x", Instant.EPOCH),
),
)
source.putFile("a.txt", "x".encodeToByteArray())
val group = StorageSyncGroup(
id = "ok",
storageUuids = setOf(source.uuid, target.uuid),
encryptionKind = StorageSyncGroupEncryptionKind.NONE,
)
val engine = createEngine(
storages = listOf(source, target),
groups = listOf(group),
)
engine.syncGroup(group.id) { _, _ -> }
assertNull((source.accessor as FakeStorageAccessor).syncLock)
assertNull((target.accessor as FakeStorageAccessor).syncLock)
}
@Test
fun syncGroupReleasesLocksWhenJournalReadFails() = runBlocking {
val first = FakeStorage()
val second = FakeStorage()
(first.accessor as FakeStorageAccessor).readSyncJournalThrows =
IllegalStateException("journal read failed")
val group = StorageSyncGroup(
id = "journal-fail",
storageUuids = setOf(first.uuid, second.uuid),
encryptionKind = StorageSyncGroupEncryptionKind.NONE,
)
val engine = createEngine(
storages = listOf(first, second),
groups = listOf(group),
)
runCatching { engine.syncGroup(group.id) { _, _ -> } }
assertNull((first.accessor as FakeStorageAccessor).syncLock)
assertNull((second.accessor as FakeStorageAccessor).syncLock)
}
@Test
fun syncGroupCooperativeCancellationReleasesLocks() = runBlocking {
val source = FakeStorage()
val target = FakeStorage()
val path = "slow.txt"
val payload = "payload".encodeToByteArray()
source.putFile(path, payload)
(source.accessor as FakeStorageAccessor).openReadDelayMs = 5_000
source.addSyncJournalEntry(
StorageSyncJournalEntry(
path = path,
operation = StorageSyncOperation.UPSERT,
revision = StorageSyncRevision(1L, "actor", Instant.EPOCH),
size = payload.size.toLong(),
),
)
val group = StorageSyncGroup(
id = "cancel-group",
storageUuids = setOf(source.uuid, target.uuid),
encryptionKind = StorageSyncGroupEncryptionKind.NONE,
)
val engine = createEngine(
storages = listOf(source, target),
groups = listOf(group),
)
val job = async {
engine.syncGroup(group.id) { _, _ -> }
}
kotlinx.coroutines.delay(50)
job.cancel()
try {
job.await()
} catch (_: CancellationException) {
// expected
}
assertNull((source.accessor as FakeStorageAccessor).syncLock)
assertNull((target.accessor as FakeStorageAccessor).syncLock)
}
@Test
fun syncGroupReleasesLocksWhenJournalEmpty() = runBlocking {
val first = FakeStorage()
val second = FakeStorage()
val group = StorageSyncGroup(
id = "empty-journal",
storageUuids = setOf(first.uuid, second.uuid),
encryptionKind = StorageSyncGroupEncryptionKind.NONE,
)
val labels = mutableListOf<TaskProgressLabel?>()
val engine = createEngine(
storages = listOf(first, second),
groups = listOf(group),
)
engine.syncGroup(group.id) { _, label -> labels.add(label) }
assertTrue(labels.any { it is TaskProgressLabel.SyncGroupNoJournalEntries })
assertNull((first.accessor as FakeStorageAccessor).syncLock)
assertNull((second.accessor as FakeStorageAccessor).syncLock)
} }
private fun createEngine( private fun createEngine(

View File

@@ -41,10 +41,12 @@ class FakeStorage(
} }
fun putFile(path: String, bytes: ByteArray) { fun putFile(path: String, bytes: ByteArray) {
accessorImpl.dataFiles[path] = bytes accessorImpl.dataFiles[com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths.normalize(path)] =
bytes
} }
fun fileBytes(path: String): ByteArray? = accessorImpl.dataFiles[path] fun fileBytes(path: String): ByteArray? =
accessorImpl.dataFiles[com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths.normalize(path)]
fun addSyncJournalEntry(entry: StorageSyncJournalEntry) { fun addSyncJournalEntry(entry: StorageSyncJournalEntry) {
accessorImpl.syncJournal = StorageSyncJournalMerge.merge( accessorImpl.syncJournal = StorageSyncJournalMerge.merge(

View File

@@ -5,10 +5,14 @@ import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
import com.github.nullptroma.wallenc.domain.interfaces.IDirectory import com.github.nullptroma.wallenc.domain.interfaces.IDirectory
import com.github.nullptroma.wallenc.domain.interfaces.IFile import com.github.nullptroma.wallenc.domain.interfaces.IFile
import com.github.nullptroma.wallenc.domain.interfaces.IMetaInfo import com.github.nullptroma.wallenc.domain.interfaces.IMetaInfo
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.MutableStateFlow
@@ -23,6 +27,8 @@ import java.time.Instant
class FakeStorageAccessor : IStorageAccessor { class FakeStorageAccessor : IStorageAccessor {
val dataFiles: MutableMap<String, ByteArray> = mutableMapOf() val dataFiles: MutableMap<String, ByteArray> = mutableMapOf()
private fun norm(path: String): String = StorageSyncPaths.normalize(path)
val trashedPaths: MutableSet<String> = mutableSetOf() val trashedPaths: MutableSet<String> = mutableSetOf()
private val systemFiles: MutableMap<String, ByteArray> = mutableMapOf() private val systemFiles: MutableMap<String, ByteArray> = mutableMapOf()
private val _filesUpdates = MutableSharedFlow<DataPage<IFile>>(extraBufferCapacity = 16) private val _filesUpdates = MutableSharedFlow<DataPage<IFile>>(extraBufferCapacity = 16)
@@ -30,6 +36,8 @@ class FakeStorageAccessor : IStorageAccessor {
var syncJournal: StorageSyncJournal = emptyMap() var syncJournal: StorageSyncJournal = emptyMap()
var syncLock: StorageSyncLock? = null var syncLock: StorageSyncLock? = null
var acquireLockResult: Boolean = true var acquireLockResult: Boolean = true
var readSyncJournalThrows: Throwable? = null
var openReadDelayMs: Long = 0
override val size: StateFlow<Long?> = MutableStateFlow(0L) override val size: StateFlow<Long?> = MutableStateFlow(0L)
override val numberOfFiles: StateFlow<Int?> = MutableStateFlow(0) override val numberOfFiles: StateFlow<Int?> = MutableStateFlow(0)
@@ -50,7 +58,9 @@ class FakeStorageAccessor : IStorageAccessor {
override fun getDirsFlow(path: String): Flow<DataPage<IDirectory>> = emptyFlow() override fun getDirsFlow(path: String): Flow<DataPage<IDirectory>> = emptyFlow()
override suspend fun getFileInfo(path: String): IFile { override suspend fun getFileInfo(path: String): IFile {
error("Not implemented in tests") val key = norm(path)
val bytes = dataFiles[key] ?: throw IllegalStateException("File not found: $path")
return FakeFile(key, bytes.size.toLong())
} }
override suspend fun getDirInfo(path: String): IDirectory { override suspend fun getDirInfo(path: String): IDirectory {
@@ -63,14 +73,17 @@ class FakeStorageAccessor : IStorageAccessor {
override suspend fun touchDir(path: String) = Unit override suspend fun touchDir(path: String) = Unit
override suspend fun delete(path: String) { override suspend fun delete(path: String, recordSyncJournal: Boolean) {
dataFiles.remove(path) dataFiles.remove(norm(path))
if (recordSyncJournal) {
recordDeleteJournal(path)
}
} }
override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream { override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream {
return object : ByteArrayOutputStream() { return object : ByteArrayOutputStream() {
override fun close() { override fun close() {
dataFiles[path] = toByteArray() dataFiles[norm(path)] = toByteArray()
_filesUpdates.tryEmit( _filesUpdates.tryEmit(
DataPage( DataPage(
listOf(FakeFile(path)), listOf(FakeFile(path)),
@@ -83,13 +96,20 @@ class FakeStorageAccessor : IStorageAccessor {
} }
override suspend fun openRead(path: String): InputStream { override suspend fun openRead(path: String): InputStream {
val bytes = dataFiles[path] ?: throw IllegalStateException("File not found: $path") if (openReadDelayMs > 0) {
delay(openReadDelayMs)
}
val bytes = dataFiles[norm(path)] ?: throw IllegalStateException("File not found: $path")
return ByteArrayInputStream(bytes) return ByteArrayInputStream(bytes)
} }
override suspend fun moveToTrash(path: String) { override suspend fun moveToTrash(path: String, recordSyncJournal: Boolean) {
if (path in dataFiles) { val key = norm(path)
trashedPaths.add(path) if (key in dataFiles) {
trashedPaths.add(key)
if (recordSyncJournal) {
recordTrashJournal(path)
}
} }
} }
@@ -106,12 +126,43 @@ class FakeStorageAccessor : IStorageAccessor {
} }
} }
override suspend fun readSyncJournal(): StorageSyncJournal = syncJournal override suspend fun readSyncJournal(): StorageSyncJournal {
readSyncJournalThrows?.let { throw it }
return syncJournal
}
override suspend fun flushPendingSyncJournal() = Unit
override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) { override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) {
syncJournal = StorageSyncJournalMerge.merge(syncJournal, entries) syncJournal = StorageSyncJournalMerge.merge(syncJournal, entries)
} }
private suspend fun recordDeleteJournal(path: String) {
appendJournalEntry(path, StorageSyncOperation.DELETE)
}
private suspend fun recordTrashJournal(path: String) {
appendJournalEntry(path, StorageSyncOperation.TRASH)
}
private suspend fun appendJournalEntry(path: String, operation: StorageSyncOperation) {
val cleaned = norm(path)
val nextSequence = (syncJournal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L
putSyncJournalEntries(
mapOf(
cleaned to StorageSyncJournalEntry(
path = cleaned,
operation = operation,
revision = StorageSyncRevision(
sequence = nextSequence,
actorId = "fake-actor",
createdAt = Instant.now(),
),
),
),
)
}
override suspend fun readSyncLock(): StorageSyncLock? = syncLock override suspend fun readSyncLock(): StorageSyncLock? = syncLock
override suspend fun tryAcquireSyncLock(holderId: String, leaseUntil: Instant): Boolean { override suspend fun tryAcquireSyncLock(holderId: String, leaseUntil: Instant): Boolean {
@@ -131,9 +182,12 @@ class FakeStorageAccessor : IStorageAccessor {
} }
} }
class FakeFile(path: String) : IFile { class FakeFile(
path: String,
size: Long = 0L,
) : IFile {
override val metaInfo: IMetaInfo = object : IMetaInfo { override val metaInfo: IMetaInfo = object : IMetaInfo {
override val size: Long = 0L override val size: Long = size
override val isDeleted: Boolean = false override val isDeleted: Boolean = false
override val isHidden: Boolean = false override val isHidden: Boolean = false
override val lastModified: Instant = Instant.now() override val lastModified: Instant = Instant.now()