Compare commits
5 Commits
08caf08fad
...
99cb410919
| Author | SHA1 | Date | |
|---|---|---|---|
| 99cb410919 | |||
| d3eac81660 | |||
| d0f490a3fd | |||
| 51e6f40587 | |||
| ef40aa9e73 |
@@ -80,7 +80,7 @@ class YandexSignInService @Inject constructor(
|
||||
return
|
||||
}
|
||||
synchronized(this) { pending = onResult }
|
||||
l.launch(YandexAuthLoginOptions(LoginType.WEBVIEW))
|
||||
l.launch(YandexAuthLoginOptions(LoginType.NATIVE))
|
||||
}
|
||||
|
||||
private fun mapYandexResult(result: YandexAuthResult): VaultLinkOutcome = when (result) {
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
package com.github.nullptroma.wallenc.app.sync
|
||||
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IVaultsManager
|
||||
import com.github.nullptroma.wallenc.ui.R
|
||||
import com.github.nullptroma.wallenc.ui.resources.UiStringResolver
|
||||
import com.github.nullptroma.wallenc.domain.tasks.StorageSyncTriggerReason
|
||||
import com.github.nullptroma.wallenc.usecases.RunStorageSyncUseCase
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
@@ -10,6 +12,7 @@ import kotlinx.coroutines.FlowPreview
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.flow.collectLatest
|
||||
import kotlinx.coroutines.flow.debounce
|
||||
import kotlinx.coroutines.flow.filter
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.flow.merge
|
||||
import kotlinx.coroutines.launch
|
||||
@@ -35,16 +38,31 @@ class StorageSyncBootstrap @Inject constructor(
|
||||
}
|
||||
val triggers = storages.flatMap { storage ->
|
||||
listOf(
|
||||
storage.accessor.filesUpdates.map {},
|
||||
storage.accessor.dirsUpdates.map {},
|
||||
storage.accessor.filesUpdates
|
||||
.filter { page ->
|
||||
page.data.any { file ->
|
||||
StorageSyncPaths.isSyncableUserPath(file.metaInfo.path)
|
||||
}
|
||||
}
|
||||
.map {},
|
||||
storage.accessor.dirsUpdates
|
||||
.filter { page ->
|
||||
page.data.any { dir ->
|
||||
StorageSyncPaths.isSyncableUserPath(dir.metaInfo.path)
|
||||
}
|
||||
}
|
||||
.map {},
|
||||
)
|
||||
}
|
||||
merge(*triggers.toTypedArray())
|
||||
.debounce(DEBOUNCE_AFTER_CHANGE_MS)
|
||||
.collect {
|
||||
if (syncRunner.syncRunning.value) {
|
||||
return@collect
|
||||
}
|
||||
syncRunner.enqueue(
|
||||
displayTitle = uiStrings(R.string.task_title_storage_sync_background),
|
||||
logReason = "debounce",
|
||||
reason = StorageSyncTriggerReason.Debounce,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import android.content.Context
|
||||
import androidx.hilt.work.HiltWorker
|
||||
import androidx.work.CoroutineWorker
|
||||
import androidx.work.WorkerParameters
|
||||
import com.github.nullptroma.wallenc.domain.tasks.StorageSyncTriggerReason
|
||||
import com.github.nullptroma.wallenc.usecases.RunStorageSyncUseCase
|
||||
import dagger.assisted.Assisted
|
||||
import dagger.assisted.AssistedInject
|
||||
@@ -19,7 +20,7 @@ class StorageSyncWorker @AssistedInject constructor(
|
||||
override suspend fun doWork(): Result {
|
||||
Timber.d("Periodic storage sync started (attempt %d)", runAttemptCount)
|
||||
return runCatching {
|
||||
syncRunner.runBlocking()
|
||||
syncRunner.runBlocking(StorageSyncTriggerReason.Background)
|
||||
Timber.d("Periodic storage sync finished")
|
||||
Result.success()
|
||||
}.getOrElse { error ->
|
||||
|
||||
@@ -54,28 +54,26 @@ class YandexDiskApiFactory(
|
||||
}
|
||||
|
||||
/**
|
||||
* Провайдер токена читает [YandexAccountRepository] на IO-диспетчере (как и остальной data-слой).
|
||||
* OAuth-токен загружается один раз при создании API (не в OkHttp interceptor).
|
||||
* При 401 см. [YandexDiskAuthException] и повторную привязку vault.
|
||||
*/
|
||||
fun createApiForVault(vaultUuid: UUID): YandexDiskApi {
|
||||
val id = vaultUuid.toString()
|
||||
return createAuthenticatedApi {
|
||||
val now = System.currentTimeMillis()
|
||||
val hit = oauthTokenCache[id]
|
||||
if (hit != null && now - hit.first < OAUTH_TOKEN_CACHE_TTL_MS) {
|
||||
return@createAuthenticatedApi hit.second
|
||||
}
|
||||
val token = runBlocking(ioDispatcher) {
|
||||
accountRepository.getByVaultUuid(id)?.oauthToken
|
||||
} ?: throw java.io.IOException("Yandex OAuth token is missing")
|
||||
oauthTokenCache[id] = now to token
|
||||
token
|
||||
oauthTokenCache[id] = System.currentTimeMillis() to token
|
||||
return createAuthenticatedApi {
|
||||
oauthTokenCache[id]?.second ?: token
|
||||
}
|
||||
}
|
||||
|
||||
fun invalidateTokenCache(vaultUuid: UUID) {
|
||||
oauthTokenCache.remove(vaultUuid.toString())
|
||||
}
|
||||
|
||||
companion object {
|
||||
const val BASE_URL = "https://cloud-api.yandex.net/"
|
||||
private const val OAUTH_TOKEN_CACHE_TTL_MS = 120_000L
|
||||
|
||||
fun createRepositoryWithToken(
|
||||
oauthToken: String,
|
||||
ioDispatcher: CoroutineDispatcher,
|
||||
|
||||
@@ -0,0 +1,29 @@
|
||||
package com.github.nullptroma.wallenc.domain.vault.storages.common
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
|
||||
import java.util.LinkedHashMap
|
||||
|
||||
internal object StorageSyncJournalCodec {
|
||||
private val journalJavaType = ObjectMapper().typeFactory.constructMapType(
|
||||
LinkedHashMap::class.java,
|
||||
String::class.java,
|
||||
StorageSyncJournalEntry::class.java,
|
||||
)
|
||||
|
||||
fun read(mapper: ObjectMapper, bytes: ByteArray): StorageSyncJournal {
|
||||
if (bytes.isEmpty()) {
|
||||
return emptyMap()
|
||||
}
|
||||
return runCatching {
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
mapper.readValue(bytes, journalJavaType) as StorageSyncJournal
|
||||
}.getOrElse {
|
||||
emptyMap<String, StorageSyncJournalEntry>()
|
||||
}
|
||||
}
|
||||
|
||||
fun write(mapper: ObjectMapper, journal: StorageSyncJournal): ByteArray =
|
||||
mapper.writeValueAsBytes(journal)
|
||||
}
|
||||
@@ -1,6 +1,8 @@
|
||||
package com.github.nullptroma.wallenc.domain.vault.storages.encrypt
|
||||
|
||||
import com.fasterxml.jackson.module.kotlin.readValue
|
||||
import com.github.nullptroma.wallenc.domain.errors.WallencException
|
||||
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec
|
||||
import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty
|
||||
import com.github.nullptroma.wallenc.domain.vault.utils.CloseHandledStreamExtension.Companion.onClosed
|
||||
import com.github.nullptroma.wallenc.domain.vault.utils.CloseHandledStreamExtension.Companion.onClosing
|
||||
@@ -15,7 +17,10 @@ import com.github.nullptroma.wallenc.domain.interfaces.IDirectory
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IFile
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IMetaInfo
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
|
||||
@@ -46,6 +51,7 @@ class EncryptedStorageAccessor(
|
||||
private val scope: CoroutineScope
|
||||
) : IStorageAccessor, DisposableHandle {
|
||||
private val syncActorId = UUID.randomUUID().toString()
|
||||
private val syncLockMutex = Mutex()
|
||||
private val _size = MutableStateFlow<Long?>(null)
|
||||
override val size: StateFlow<Long?> = _size
|
||||
|
||||
@@ -63,7 +69,6 @@ class EncryptedStorageAccessor(
|
||||
private val dataEncryptor = Encryptor(key.toAesKey())
|
||||
private val pathEncryptor: EncryptorWithStaticIv? = if(pathIv != null) EncryptorWithStaticIv(key.toAesKey(), pathIv) else null
|
||||
|
||||
private val syncLockMutex = Mutex()
|
||||
private var systemHiddenFilesIsActual = false
|
||||
|
||||
init {
|
||||
@@ -257,7 +262,8 @@ class EncryptedStorageAccessor(
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
|
||||
}
|
||||
|
||||
override suspend fun openWrite(path: String): OutputStream = openWriteInternal(path, recordJournal = true)
|
||||
override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream =
|
||||
openWriteInternal(path, recordJournal = recordSyncJournal)
|
||||
|
||||
override suspend fun openRead(path: String): InputStream {
|
||||
val stream = source.openRead(encryptPath(path))
|
||||
@@ -266,7 +272,7 @@ class EncryptedStorageAccessor(
|
||||
|
||||
override suspend fun moveToTrash(path: String) {
|
||||
source.moveToTrash(encryptPath(path))
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH)
|
||||
}
|
||||
|
||||
override fun dispose() {
|
||||
@@ -290,35 +296,22 @@ class EncryptedStorageAccessor(
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun readSyncJournal(): List<StorageSyncJournalEntry> {
|
||||
override suspend fun readSyncJournal(): StorageSyncJournal {
|
||||
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
|
||||
if (bytes.isEmpty()) {
|
||||
return emptyList()
|
||||
}
|
||||
return runCatching {
|
||||
val journalType = jackson.typeFactory.constructCollectionType(
|
||||
List::class.java,
|
||||
StorageSyncJournalEntry::class.java,
|
||||
)
|
||||
jackson.readValue<List<StorageSyncJournalEntry>>(bytes, journalType)
|
||||
}.getOrElse {
|
||||
emptyList()
|
||||
}
|
||||
return StorageSyncJournalCodec.read(jackson, bytes)
|
||||
}
|
||||
|
||||
override suspend fun appendSyncJournal(entries: List<StorageSyncJournalEntry>) {
|
||||
override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) {
|
||||
if (entries.isEmpty()) {
|
||||
return
|
||||
}
|
||||
val next = readSyncJournal().toMutableList().apply { addAll(entries) }
|
||||
openWriteSystemFile(SYNC_JOURNAL_FILENAME).use { out ->
|
||||
jackson.writeValue(out, next)
|
||||
}
|
||||
val merged = StorageSyncJournalMerge.merge(readSyncJournal(), entries)
|
||||
writeSyncJournal(merged)
|
||||
}
|
||||
|
||||
override suspend fun rewriteSyncJournal(entries: List<StorageSyncJournalEntry>) {
|
||||
private suspend fun writeSyncJournal(journal: StorageSyncJournal) {
|
||||
openWriteSystemFile(SYNC_JOURNAL_FILENAME).use { out ->
|
||||
jackson.writeValue(out, entries)
|
||||
jackson.writeValue(out, journal)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -376,15 +369,15 @@ class EncryptedStorageAccessor(
|
||||
}
|
||||
|
||||
private suspend fun appendSyncEntry(path: String, operation: StorageSyncOperation) {
|
||||
val cleanedPath = if (path.startsWith("/")) path else "/$path"
|
||||
if (cleanedPath.startsWith("/$systemHiddenDirName/")) {
|
||||
val cleanedPath = StorageSyncPaths.normalize(path)
|
||||
if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) {
|
||||
return
|
||||
}
|
||||
val entries = readSyncJournal()
|
||||
val nextSequence = (entries.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L
|
||||
appendSyncJournal(
|
||||
listOf(
|
||||
StorageSyncJournalEntry(
|
||||
val journal = readSyncJournal()
|
||||
val nextSequence = (journal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L
|
||||
putSyncJournalEntries(
|
||||
mapOf(
|
||||
cleanedPath to StorageSyncJournalEntry(
|
||||
path = cleanedPath,
|
||||
operation = operation,
|
||||
revision = StorageSyncRevision(
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.github.nullptroma.wallenc.domain.vault.storages.local
|
||||
|
||||
import com.github.nullptroma.wallenc.domain.errors.WallencException
|
||||
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec
|
||||
import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty
|
||||
|
||||
import com.fasterxml.jackson.core.JacksonException
|
||||
@@ -15,9 +16,12 @@ import com.github.nullptroma.wallenc.domain.interfaces.IDirectory
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IFile
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IMetaInfo
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
@@ -523,7 +527,7 @@ class LocalStorageAccessor(
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun openWrite(path: String): OutputStream = withContext(ioDispatcher) {
|
||||
override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream = withContext(ioDispatcher) {
|
||||
touchFileInternal(path, recordJournal = false)
|
||||
val pair = LocalStorageFilePair.from(_filesystemBasePath, path)
|
||||
?: throw WallencException.Storage.FileNotFound()
|
||||
@@ -531,10 +535,12 @@ class LocalStorageAccessor(
|
||||
CoroutineScope(ioDispatcher).launch {
|
||||
touchFileInternal(path, recordJournal = false)
|
||||
scanSizeAndNumOfFiles()
|
||||
if (recordSyncJournal) {
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.UPSERT)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun openRead(path: String): InputStream = withContext(ioDispatcher) {
|
||||
val pair = LocalStorageFilePair.from(_filesystemBasePath, path)
|
||||
@@ -547,7 +553,7 @@ class LocalStorageAccessor(
|
||||
?: throw WallencException.Storage.FileNotFound()
|
||||
val newMeta = pair.meta.copy(isDeleted = true)
|
||||
writeMeta(pair.metaFile, newMeta)
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH)
|
||||
}
|
||||
|
||||
override suspend fun openReadSystemFile(name: String): InputStream = withContext(ioDispatcher) {
|
||||
@@ -572,33 +578,22 @@ class LocalStorageAccessor(
|
||||
return@withContext file.outputStream()
|
||||
}
|
||||
|
||||
override suspend fun readSyncJournal(): List<StorageSyncJournalEntry> = withContext(ioDispatcher) {
|
||||
override suspend fun readSyncJournal(): StorageSyncJournal = withContext(ioDispatcher) {
|
||||
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
|
||||
if (bytes.isEmpty()) {
|
||||
return@withContext emptyList()
|
||||
}
|
||||
return@withContext runCatching {
|
||||
jackson.readValue<List<StorageSyncJournalEntry>>(bytes)
|
||||
}.getOrElse {
|
||||
emptyList()
|
||||
}
|
||||
StorageSyncJournalCodec.read(jackson, bytes)
|
||||
}
|
||||
|
||||
override suspend fun appendSyncJournal(entries: List<StorageSyncJournalEntry>) = withContext(ioDispatcher) {
|
||||
override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) = withContext(ioDispatcher) {
|
||||
if (entries.isEmpty()) {
|
||||
return@withContext
|
||||
}
|
||||
val next = readSyncJournal().toMutableList().apply {
|
||||
addAll(entries)
|
||||
}
|
||||
openWriteSystemFile(SYNC_JOURNAL_FILENAME).use { out ->
|
||||
jackson.writeValue(out, next)
|
||||
}
|
||||
val merged = StorageSyncJournalMerge.merge(readSyncJournal(), entries)
|
||||
writeSyncJournal(merged)
|
||||
}
|
||||
|
||||
override suspend fun rewriteSyncJournal(entries: List<StorageSyncJournalEntry>) = withContext(ioDispatcher) {
|
||||
private suspend fun writeSyncJournal(journal: StorageSyncJournal) {
|
||||
openWriteSystemFile(SYNC_JOURNAL_FILENAME).use { out ->
|
||||
jackson.writeValue(out, entries)
|
||||
jackson.writeValue(out, journal)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -722,10 +717,15 @@ class LocalStorageAccessor(
|
||||
}
|
||||
|
||||
private suspend fun appendSyncEntry(path: String, operation: StorageSyncOperation) {
|
||||
val cleanedPath = if (path.startsWith("/")) path else "/$path"
|
||||
val entries = readSyncJournal()
|
||||
val nextSequence = (entries.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L
|
||||
val entry = StorageSyncJournalEntry(
|
||||
val cleanedPath = StorageSyncPaths.normalize(path)
|
||||
if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) {
|
||||
return
|
||||
}
|
||||
val journal = readSyncJournal()
|
||||
val nextSequence = (journal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L
|
||||
putSyncJournalEntries(
|
||||
mapOf(
|
||||
cleanedPath to StorageSyncJournalEntry(
|
||||
path = cleanedPath,
|
||||
operation = operation,
|
||||
revision = StorageSyncRevision(
|
||||
@@ -733,8 +733,9 @@ class LocalStorageAccessor(
|
||||
actorId = syncActorId,
|
||||
createdAt = Instant.now(),
|
||||
),
|
||||
),
|
||||
),
|
||||
)
|
||||
appendSyncJournal(listOf(entry))
|
||||
}
|
||||
|
||||
companion object {
|
||||
|
||||
@@ -2,6 +2,7 @@ package com.github.nullptroma.wallenc.domain.vault.storages.yandex
|
||||
|
||||
import com.github.nullptroma.wallenc.domain.errors.WallencException
|
||||
import com.github.nullptroma.wallenc.domain.vault.errors.toVaultWallencException
|
||||
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec
|
||||
import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty
|
||||
|
||||
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
|
||||
@@ -17,7 +18,10 @@ import com.github.nullptroma.wallenc.domain.datatypes.DataPage
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IDirectory
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IFile
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
|
||||
@@ -527,39 +531,13 @@ class YandexStorageAccessor(
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
|
||||
}
|
||||
|
||||
override suspend fun openWrite(path: String): OutputStream = withContext(ioDispatcher) {
|
||||
override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream = withContext(ioDispatcher) {
|
||||
touchParentDirs(path)
|
||||
val tmp = File.createTempFile("wallenc-yandex-", ".upload")
|
||||
val fos = FileOutputStream(tmp)
|
||||
fos.onClosed {
|
||||
runBlocking(ioDispatcher) {
|
||||
try {
|
||||
val diskPath = toDiskPath(path)
|
||||
val prior = guard { repo.getOrNull(diskPath) }
|
||||
if (prior?.type == "dir") {
|
||||
throw WallencException.Storage.CannotWriteOverDirectory()
|
||||
}
|
||||
val hadFile = prior?.type == "file"
|
||||
val priorSize = if (prior?.type == "file") prior.size ?: 0L else 0L
|
||||
guard { repo.uploadFile(diskPath, tmp, overwrite = true) }
|
||||
val after = guard { getMetadataAfterWrite(diskPath) }
|
||||
if (after.type != "file") {
|
||||
throw WallencException.Storage.UnexpectedState()
|
||||
}
|
||||
val newSize = after.size ?: 0L
|
||||
_size.value = ((_size.value ?: 0L) + newSize - priorSize).coerceAtLeast(0L)
|
||||
if (!hadFile) {
|
||||
_numberOfFiles.value = (_numberOfFiles.value ?: 0) + 1
|
||||
}
|
||||
persistStatsImmediate()
|
||||
val info = runCatching { after.toCommonFile(path) }.getOrNull()
|
||||
info?.let {
|
||||
_filesUpdates.emit(DataPage(listOf(it), pageLength = 1, pageIndex = 0))
|
||||
}
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.UPSERT)
|
||||
} finally {
|
||||
tmp.delete()
|
||||
}
|
||||
runCommitAfterStreamClosed {
|
||||
commitUploadedFile(path, tmp, recordSyncJournal)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -570,7 +548,7 @@ class YandexStorageAccessor(
|
||||
|
||||
override suspend fun moveToTrash(path: String) = withContext(ioDispatcher) {
|
||||
patchCustomProps(path, mapOf(PROP_DELETED to "true"))
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH)
|
||||
}
|
||||
|
||||
override suspend fun openReadSystemFile(name: String): InputStream = withContext(ioDispatcher) {
|
||||
@@ -589,45 +567,30 @@ class YandexStorageAccessor(
|
||||
val rel = "/$SYSTEM_HIDDEN_DIRNAME/$name"
|
||||
val uploadBuffer = ByteArrayOutputStream()
|
||||
uploadBuffer.onClosed {
|
||||
runBlocking(ioDispatcher) {
|
||||
guard {
|
||||
repo.uploadBytes(toDiskPath(rel), uploadBuffer.toByteArray(), overwrite = true)
|
||||
}
|
||||
val bytes = uploadBuffer.toByteArray()
|
||||
val diskPath = toDiskPath(rel)
|
||||
runCommitAfterStreamClosed {
|
||||
guard { repo.uploadBytes(diskPath, bytes, overwrite = true) }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun readSyncJournal(): List<StorageSyncJournalEntry> = withContext(ioDispatcher) {
|
||||
override suspend fun readSyncJournal(): StorageSyncJournal = withContext(ioDispatcher) {
|
||||
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
|
||||
if (bytes.isEmpty()) {
|
||||
return@withContext emptyList()
|
||||
}
|
||||
return@withContext runCatching {
|
||||
val journalType = statsMapper.typeFactory.constructCollectionType(
|
||||
List::class.java,
|
||||
StorageSyncJournalEntry::class.java,
|
||||
)
|
||||
statsMapper.readValue<List<StorageSyncJournalEntry>>(bytes, journalType)
|
||||
}.getOrElse {
|
||||
emptyList()
|
||||
}
|
||||
StorageSyncJournalCodec.read(statsMapper, bytes)
|
||||
}
|
||||
|
||||
override suspend fun appendSyncJournal(entries: List<StorageSyncJournalEntry>) = withContext(ioDispatcher) {
|
||||
override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) = withContext(ioDispatcher) {
|
||||
if (entries.isEmpty()) {
|
||||
return@withContext
|
||||
}
|
||||
val next = readSyncJournal().toMutableList().apply {
|
||||
addAll(entries)
|
||||
}
|
||||
openWriteSystemFile(SYNC_JOURNAL_FILENAME).use { out ->
|
||||
statsMapper.writeValue(out, next)
|
||||
}
|
||||
val merged = StorageSyncJournalMerge.merge(readSyncJournal(), entries)
|
||||
writeSyncJournal(merged)
|
||||
}
|
||||
|
||||
override suspend fun rewriteSyncJournal(entries: List<StorageSyncJournalEntry>) = withContext(ioDispatcher) {
|
||||
private suspend fun writeSyncJournal(journal: StorageSyncJournal) {
|
||||
openWriteSystemFile(SYNC_JOURNAL_FILENAME).use { out ->
|
||||
statsMapper.writeValue(out, entries)
|
||||
statsMapper.writeValue(out, journal)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -641,8 +604,25 @@ class YandexStorageAccessor(
|
||||
}.getOrNull()
|
||||
}
|
||||
|
||||
/**
|
||||
* Best-effort lock на Диске (read-modify-write без CAS на стороне API).
|
||||
* Межустройственная координация опирается на [StorageSyncEngine] mutex в процессе.
|
||||
*/
|
||||
override suspend fun tryAcquireSyncLock(holderId: String, leaseUntil: Instant): Boolean = withContext(ioDispatcher) {
|
||||
return@withContext syncLockMutex.withLock {
|
||||
repeat(SYNC_LOCK_CAS_RETRIES) { attempt ->
|
||||
val acquired = tryAcquireSyncLockOnce(holderId, leaseUntil)
|
||||
if (acquired) {
|
||||
return@withContext true
|
||||
}
|
||||
if (attempt < SYNC_LOCK_CAS_RETRIES - 1) {
|
||||
delay(SYNC_LOCK_CAS_DELAY_MS)
|
||||
}
|
||||
}
|
||||
return@withContext false
|
||||
}
|
||||
|
||||
private suspend fun tryAcquireSyncLockOnce(holderId: String, leaseUntil: Instant): Boolean {
|
||||
return syncLockMutex.withLock {
|
||||
val current = readSyncLock()
|
||||
val now = Instant.now()
|
||||
val foreignLockActive = current != null &&
|
||||
@@ -684,11 +664,57 @@ class YandexStorageAccessor(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Выполняется из [OutputStream.close]; ошибки upload пробрасываются вызывающему коду.
|
||||
*/
|
||||
private fun runCommitAfterStreamClosed(block: suspend () -> Unit) {
|
||||
runBlocking(ioDispatcher) {
|
||||
block()
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun commitUploadedFile(path: String, tmp: File, recordSyncJournal: Boolean) {
|
||||
try {
|
||||
val diskPath = toDiskPath(path)
|
||||
val prior = guard { repo.getOrNull(diskPath) }
|
||||
if (prior?.type == "dir") {
|
||||
throw WallencException.Storage.CannotWriteOverDirectory()
|
||||
}
|
||||
val hadFile = prior?.type == "file"
|
||||
val priorSize = if (prior?.type == "file") prior.size ?: 0L else 0L
|
||||
guard { repo.uploadFile(diskPath, tmp, overwrite = true) }
|
||||
val after = guard { getMetadataAfterWrite(diskPath) }
|
||||
if (after.type != "file") {
|
||||
throw WallencException.Storage.UnexpectedState()
|
||||
}
|
||||
val newSize = after.size ?: 0L
|
||||
_size.value = ((_size.value ?: 0L) + newSize - priorSize).coerceAtLeast(0L)
|
||||
if (!hadFile) {
|
||||
_numberOfFiles.value = (_numberOfFiles.value ?: 0) + 1
|
||||
}
|
||||
persistStatsImmediate()
|
||||
val info = runCatching { after.toCommonFile(path) }.getOrNull()
|
||||
info?.let {
|
||||
_filesUpdates.emit(DataPage(listOf(it), pageLength = 1, pageIndex = 0))
|
||||
}
|
||||
if (recordSyncJournal) {
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.UPSERT)
|
||||
}
|
||||
} finally {
|
||||
tmp.delete()
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun appendSyncEntry(path: String, operation: StorageSyncOperation) {
|
||||
val cleanedPath = if (path.startsWith("/")) path else "/$path"
|
||||
val entries = readSyncJournal()
|
||||
val nextSequence = (entries.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L
|
||||
val entry = StorageSyncJournalEntry(
|
||||
val cleanedPath = StorageSyncPaths.normalize(path)
|
||||
if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) {
|
||||
return
|
||||
}
|
||||
val journal = readSyncJournal()
|
||||
val nextSequence = (journal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L
|
||||
putSyncJournalEntries(
|
||||
mapOf(
|
||||
cleanedPath to StorageSyncJournalEntry(
|
||||
path = cleanedPath,
|
||||
operation = operation,
|
||||
revision = StorageSyncRevision(
|
||||
@@ -697,8 +723,9 @@ class YandexStorageAccessor(
|
||||
createdAt = Instant.now(),
|
||||
),
|
||||
originStorageUuid = storageUuid,
|
||||
),
|
||||
),
|
||||
)
|
||||
appendSyncJournal(listOf(entry))
|
||||
}
|
||||
|
||||
private suspend fun touchParentDirs(path: String) {
|
||||
@@ -726,6 +753,8 @@ class YandexStorageAccessor(
|
||||
private const val SYNC_JOURNAL_FILENAME = "sync-journal.json"
|
||||
private const val SYNC_LOCK_FILENAME = "sync-lock.json"
|
||||
private const val SYNC_LOCK_STALE_TIMEOUT_SECONDS: Long = 60 * 60
|
||||
private const val SYNC_LOCK_CAS_RETRIES = 3
|
||||
private const val SYNC_LOCK_CAS_DELAY_MS = 80L
|
||||
private const val STATS_DEBOUNCE_MS = 450L
|
||||
private const val DATA_PAGE_LENGTH = 10
|
||||
private const val API_LIST_LIMIT = 1000
|
||||
|
||||
@@ -27,11 +27,19 @@ private class CloseHandledOutputStream(
|
||||
|
||||
override fun close() {
|
||||
onClosing()
|
||||
var streamFailure: Throwable? = null
|
||||
try {
|
||||
stream.close()
|
||||
} finally {
|
||||
onClose()
|
||||
} catch (t: Throwable) {
|
||||
streamFailure = t
|
||||
}
|
||||
try {
|
||||
onClose()
|
||||
} catch (afterCloseFailure: Throwable) {
|
||||
streamFailure?.let { afterCloseFailure.addSuppressed(it) }
|
||||
throw afterCloseFailure
|
||||
}
|
||||
streamFailure?.let { throw it }
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,59 @@
|
||||
package com.github.nullptroma.wallenc.domain.datatypes
|
||||
|
||||
/** Журнал синхронизации: один актуальный [StorageSyncJournalEntry] на нормализованный путь. */
|
||||
typealias StorageSyncJournal = Map<String, StorageSyncJournalEntry>
|
||||
|
||||
object StorageSyncPaths {
|
||||
fun normalize(path: String): String = if (path.startsWith("/")) path else "/$path"
|
||||
|
||||
/**
|
||||
* Пути, которые участвуют в sync и попадают в журнал при пользовательских операциях.
|
||||
* Служебные каталоги, lock/journal/meta и файлы внутри *-enc-dir исключены.
|
||||
*/
|
||||
fun isSyncableUserPath(path: String): Boolean {
|
||||
val p = normalize(path)
|
||||
if (p == "/" || p.isBlank()) return false
|
||||
if (p == "/wallenc-yandex-system" || p.startsWith("/wallenc-yandex-system/")) return false
|
||||
if (p.contains("-enc-dir/") || p.endsWith("-enc-dir")) return false
|
||||
val name = p.substringAfterLast('/')
|
||||
if (name == "sync-journal.json" || name == "sync-lock.json") return false
|
||||
if (name.endsWith(".enc-meta") || name.endsWith(".storage-info")) return false
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
object StorageSyncJournalMerge {
|
||||
fun merge(into: StorageSyncJournal, entries: Map<String, StorageSyncJournalEntry>): StorageSyncJournal {
|
||||
if (entries.isEmpty()) return into
|
||||
val result = into.toMutableMap()
|
||||
for ((rawPath, entry) in entries) {
|
||||
val path = StorageSyncPaths.normalize(rawPath)
|
||||
if (!StorageSyncPaths.isSyncableUserPath(path)) continue
|
||||
val normalizedEntry = entry.copy(path = path)
|
||||
val current = result[path]
|
||||
if (current == null || compareEntries(normalizedEntry, current) > 0) {
|
||||
result[path] = normalizedEntry
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
fun merge(into: StorageSyncJournal, entry: StorageSyncJournalEntry): StorageSyncJournal =
|
||||
merge(into, mapOf(entry.path to entry))
|
||||
|
||||
fun mergeAll(journals: Collection<StorageSyncJournal>): StorageSyncJournal {
|
||||
var acc: StorageSyncJournal = emptyMap()
|
||||
for (journal in journals) {
|
||||
acc = merge(acc, journal)
|
||||
}
|
||||
return acc
|
||||
}
|
||||
|
||||
private fun compareEntries(a: StorageSyncJournalEntry, b: StorageSyncJournalEntry): Int {
|
||||
val seqCmp = a.revision.sequence.compareTo(b.revision.sequence)
|
||||
if (seqCmp != 0) return seqCmp
|
||||
val actorCmp = a.revision.actorId.compareTo(b.revision.actorId)
|
||||
if (actorCmp != 0) return actorCmp
|
||||
return a.revision.createdAt.compareTo(b.revision.createdAt)
|
||||
}
|
||||
}
|
||||
@@ -5,6 +5,9 @@ import java.util.UUID
|
||||
|
||||
enum class StorageSyncOperation {
|
||||
UPSERT,
|
||||
/** Soft-delete (корзина): на peer вызывается [IStorageAccessor.moveToTrash]. */
|
||||
TRASH,
|
||||
/** Жёсткое удаление файла с носителя. */
|
||||
DELETE,
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package com.github.nullptroma.wallenc.domain.interfaces
|
||||
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.DataPage
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.SharedFlow
|
||||
@@ -40,7 +40,7 @@ interface IStorageAccessor {
|
||||
suspend fun touchFile(path: String)
|
||||
suspend fun touchDir(path: String)
|
||||
suspend fun delete(path: String)
|
||||
suspend fun openWrite(path: String): OutputStream
|
||||
suspend fun openWrite(path: String, recordSyncJournal: Boolean = true): OutputStream
|
||||
suspend fun openRead(path: String): InputStream
|
||||
suspend fun moveToTrash(path: String)
|
||||
|
||||
@@ -52,9 +52,8 @@ interface IStorageAccessor {
|
||||
suspend fun openReadSystemFile(name: String): InputStream
|
||||
suspend fun openWriteSystemFile(name: String): OutputStream
|
||||
|
||||
suspend fun readSyncJournal(): List<StorageSyncJournalEntry>
|
||||
suspend fun appendSyncJournal(entries: List<StorageSyncJournalEntry>)
|
||||
suspend fun rewriteSyncJournal(entries: List<StorageSyncJournalEntry>)
|
||||
suspend fun readSyncJournal(): StorageSyncJournal
|
||||
suspend fun putSyncJournalEntries(entries: StorageSyncJournal)
|
||||
|
||||
suspend fun readSyncLock(): StorageSyncLock?
|
||||
suspend fun tryAcquireSyncLock(holderId: String, leaseUntil: Instant): Boolean
|
||||
|
||||
@@ -20,4 +20,7 @@ interface ITaskOrchestrator {
|
||||
fun cancel(taskId: TaskId): Boolean
|
||||
|
||||
fun cancelAll()
|
||||
|
||||
/** Запись в общий лог пайплайна вне контекста [TaskContext] (например, WorkManager sync). */
|
||||
fun appendPipelineLog(level: TaskLogLevel, key: TaskLogKey)
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import java.util.UUID
|
||||
data class PipelineTask(
|
||||
val id: TaskId,
|
||||
val title: String,
|
||||
val enqueuedAtMs: Long,
|
||||
val dispatcher: CoroutineDispatcher,
|
||||
val state: TaskRunState,
|
||||
/** UUID storage, для которого идёт задача (кнопки только этой строки в UI). */
|
||||
|
||||
@@ -0,0 +1,8 @@
|
||||
package com.github.nullptroma.wallenc.domain.tasks
|
||||
|
||||
/** Источник запуска синхронизации хранилищ (для логов пайплайна задач). */
|
||||
enum class StorageSyncTriggerReason {
|
||||
Debounce,
|
||||
SyncTab,
|
||||
Background,
|
||||
}
|
||||
@@ -3,7 +3,10 @@ package com.github.nullptroma.wallenc.domain.tasks
|
||||
import com.github.nullptroma.wallenc.domain.errors.WallencException
|
||||
|
||||
sealed class TaskLogKey {
|
||||
data object SyncStarted : TaskLogKey()
|
||||
data object SyncFinished : TaskLogKey()
|
||||
data class SyncFailed(val error: WallencException) : TaskLogKey()
|
||||
data class SyncStarted(val reason: StorageSyncTriggerReason) : TaskLogKey()
|
||||
data class SyncFinished(val reason: StorageSyncTriggerReason) : TaskLogKey()
|
||||
data class SyncFailed(
|
||||
val error: WallencException,
|
||||
val reason: StorageSyncTriggerReason,
|
||||
) : TaskLogKey()
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ sealed class TaskProgressLabel {
|
||||
data class SyncGroupProcessingEntries(val groupId: String, val count: Int) : TaskProgressLabel()
|
||||
data class SyncGroupEntryProgress(val groupId: String, val current: Int, val total: Int) : TaskProgressLabel()
|
||||
data class SyncGroupCompleted(val groupId: String) : TaskProgressLabel()
|
||||
data class SyncGroupEntriesFailed(val groupId: String, val failedCount: Int) : TaskProgressLabel()
|
||||
data class SyncGroupRenewingLocks(val groupId: String) : TaskProgressLabel()
|
||||
data class SyncGroupLockRenewalFailed(val groupId: String) : TaskProgressLabel()
|
||||
|
||||
|
||||
@@ -73,6 +73,7 @@ class TaskOrchestrator(
|
||||
val task = PipelineTask(
|
||||
id = id,
|
||||
title = title,
|
||||
enqueuedAtMs = System.currentTimeMillis(),
|
||||
dispatcher = dispatcher,
|
||||
state = TaskRunState.Queued,
|
||||
busyStorageUuid = busyStorageUuid,
|
||||
@@ -104,6 +105,10 @@ class TaskOrchestrator(
|
||||
}
|
||||
}
|
||||
|
||||
override fun appendPipelineLog(level: TaskLogLevel, key: TaskLogKey) {
|
||||
appendLogLine(level, message = "", logKey = key)
|
||||
}
|
||||
|
||||
private fun replaceTask(id: TaskId, fn: (PipelineTask) -> PipelineTask) {
|
||||
synchronized(tasksById) {
|
||||
val cur = tasksById[id] ?: return
|
||||
|
||||
@@ -2,7 +2,6 @@ package com.github.nullptroma.wallenc.ui.elements
|
||||
|
||||
import android.view.ViewGroup
|
||||
import androidx.camera.core.CameraSelector
|
||||
import androidx.camera.core.ExperimentalGetImage
|
||||
import androidx.camera.core.ImageAnalysis
|
||||
import androidx.camera.core.Preview
|
||||
import androidx.camera.lifecycle.ProcessCameraProvider
|
||||
@@ -39,8 +38,7 @@ import com.google.mlkit.vision.common.InputImage
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
|
||||
@ExperimentalGetImage
|
||||
@OptIn(ExperimentalMaterial3Api::class, ExperimentalGetImage::class)
|
||||
@OptIn(ExperimentalMaterial3Api::class)
|
||||
@Composable
|
||||
fun QrScannerDialog(
|
||||
onDismiss: () -> Unit,
|
||||
|
||||
@@ -5,14 +5,12 @@ import androidx.compose.foundation.clickable
|
||||
import androidx.compose.foundation.interaction.MutableInteractionSource
|
||||
import androidx.compose.foundation.layout.Box
|
||||
import androidx.compose.foundation.layout.Column
|
||||
import androidx.compose.foundation.layout.IntrinsicSize
|
||||
import androidx.compose.foundation.layout.Row
|
||||
import androidx.compose.foundation.layout.Spacer
|
||||
import androidx.compose.foundation.layout.fillMaxSize
|
||||
import androidx.compose.foundation.layout.fillMaxWidth
|
||||
import androidx.compose.foundation.layout.height
|
||||
import androidx.compose.foundation.layout.offset
|
||||
import androidx.compose.foundation.layout.padding
|
||||
import androidx.compose.foundation.layout.widthIn
|
||||
import androidx.compose.foundation.layout.wrapContentHeight
|
||||
import androidx.compose.material.icons.Icons
|
||||
import androidx.compose.material.icons.filled.Lock
|
||||
import androidx.compose.material.icons.filled.LockOpen
|
||||
@@ -90,17 +88,16 @@ fun StorageTree(
|
||||
Column(modifier) {
|
||||
Box(
|
||||
modifier = Modifier
|
||||
.height(IntrinsicSize.Min)
|
||||
.fillMaxWidth()
|
||||
.wrapContentHeight()
|
||||
.zIndex(100f),
|
||||
) {
|
||||
val interactionSource = remember { MutableInteractionSource() }
|
||||
Box(
|
||||
modifier = Modifier
|
||||
.clip(
|
||||
CardDefaults.shape,
|
||||
)
|
||||
.padding(0.dp, 0.dp, 16.dp, 0.dp)
|
||||
.fillMaxSize()
|
||||
.matchParentSize()
|
||||
.padding(end = 16.dp)
|
||||
.clip(CardDefaults.shape)
|
||||
.background(borderColor)
|
||||
.clickable(
|
||||
interactionSource = interactionSource,
|
||||
@@ -112,8 +109,9 @@ fun StorageTree(
|
||||
Card(
|
||||
interactionSource = interactionSource,
|
||||
modifier = Modifier
|
||||
.padding(8.dp, 0.dp, 0.dp, 0.dp)
|
||||
.fillMaxWidth(),
|
||||
.padding(start = 8.dp)
|
||||
.fillMaxWidth()
|
||||
.wrapContentHeight(),
|
||||
elevation = CardDefaults.cardElevation(
|
||||
defaultElevation = 4.dp,
|
||||
),
|
||||
@@ -124,8 +122,16 @@ fun StorageTree(
|
||||
}
|
||||
},
|
||||
) {
|
||||
Row(modifier = Modifier.height(IntrinsicSize.Min)) {
|
||||
Column(modifier = Modifier.padding(8.dp)) {
|
||||
Row(
|
||||
modifier = Modifier
|
||||
.fillMaxWidth()
|
||||
.wrapContentHeight(),
|
||||
) {
|
||||
Column(
|
||||
modifier = Modifier
|
||||
.weight(1f)
|
||||
.padding(8.dp),
|
||||
) {
|
||||
Text(metaInfo.name ?: stringResource(R.string.no_name))
|
||||
Text(
|
||||
text = stringResource(
|
||||
@@ -170,7 +176,9 @@ fun StorageTree(
|
||||
}
|
||||
}
|
||||
Column(
|
||||
modifier = Modifier,
|
||||
modifier = Modifier
|
||||
.widthIn(min = 112.dp)
|
||||
.padding(end = 4.dp),
|
||||
horizontalAlignment = Alignment.End,
|
||||
) {
|
||||
var expanded by remember { mutableStateOf(false) }
|
||||
@@ -368,7 +376,6 @@ fun StorageTree(
|
||||
)
|
||||
}
|
||||
}
|
||||
Spacer(modifier = Modifier.weight(1f))
|
||||
if (isEncrypted) {
|
||||
IconButton(
|
||||
onClick = { showLockDialog = true },
|
||||
@@ -382,18 +389,14 @@ fun StorageTree(
|
||||
}
|
||||
Text(
|
||||
modifier = Modifier
|
||||
.fillMaxWidth()
|
||||
.padding(0.dp, 0.dp, 12.dp, 0.dp)
|
||||
.align(Alignment.End),
|
||||
.padding(top = 4.dp, end = 8.dp),
|
||||
text = stringResource(getStatusTextRes(tree)),
|
||||
textAlign = TextAlign.End,
|
||||
fontSize = 11.sp,
|
||||
)
|
||||
Text(
|
||||
modifier = Modifier
|
||||
.fillMaxWidth()
|
||||
.padding(0.dp, 0.dp, 12.dp, 8.dp)
|
||||
.align(Alignment.End),
|
||||
.padding(end = 8.dp, bottom = 8.dp),
|
||||
text = cur.uuid.toString(),
|
||||
textAlign = TextAlign.End,
|
||||
fontSize = 8.sp,
|
||||
|
||||
@@ -1,13 +1,31 @@
|
||||
package com.github.nullptroma.wallenc.ui.resources
|
||||
|
||||
import com.github.nullptroma.wallenc.domain.tasks.StorageSyncTriggerReason
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskLogKey
|
||||
import com.github.nullptroma.wallenc.ui.R
|
||||
|
||||
fun TaskLogKey.resolve(resolver: UiStringResolver): String = when (this) {
|
||||
TaskLogKey.SyncStarted -> resolver(R.string.task_log_sync_started)
|
||||
TaskLogKey.SyncFinished -> resolver(R.string.task_log_sync_finished)
|
||||
is TaskLogKey.SyncStarted -> resolver(
|
||||
R.string.task_log_sync_started,
|
||||
resolver.resolveSyncTriggerReason(reason),
|
||||
)
|
||||
is TaskLogKey.SyncFinished -> resolver(
|
||||
R.string.task_log_sync_finished,
|
||||
resolver.resolveSyncTriggerReason(reason),
|
||||
)
|
||||
is TaskLogKey.SyncFailed -> {
|
||||
val notification = error.toUserNotification().resolve(resolver)
|
||||
resolver(R.string.task_log_sync_failed, notification)
|
||||
resolver(
|
||||
R.string.task_log_sync_failed,
|
||||
resolver.resolveSyncTriggerReason(reason),
|
||||
notification,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
private fun UiStringResolver.resolveSyncTriggerReason(reason: StorageSyncTriggerReason): String =
|
||||
when (reason) {
|
||||
StorageSyncTriggerReason.Debounce -> this(R.string.task_sync_trigger_debounce)
|
||||
StorageSyncTriggerReason.SyncTab -> this(R.string.task_sync_trigger_sync_tab)
|
||||
StorageSyncTriggerReason.Background -> this(R.string.task_sync_trigger_background)
|
||||
}
|
||||
|
||||
@@ -0,0 +1,13 @@
|
||||
package com.github.nullptroma.wallenc.ui.resources
|
||||
|
||||
import java.time.Instant
|
||||
import java.time.ZoneId
|
||||
import java.time.format.DateTimeFormatter
|
||||
|
||||
private val pipelineTimeFormatter: DateTimeFormatter =
|
||||
DateTimeFormatter.ofPattern("HH:mm:ss")
|
||||
|
||||
fun formatTaskPipelineTime(timestampMs: Long): String =
|
||||
Instant.ofEpochMilli(timestampMs)
|
||||
.atZone(ZoneId.systemDefault())
|
||||
.format(pipelineTimeFormatter)
|
||||
@@ -37,6 +37,8 @@ fun TaskProgressLabel.resolve(resolver: UiStringResolver): String = when (this)
|
||||
resolver(R.string.sync_progress_group_entry, groupId, current, total)
|
||||
is TaskProgressLabel.SyncGroupCompleted ->
|
||||
resolver(R.string.sync_progress_group_completed, groupId)
|
||||
is TaskProgressLabel.SyncGroupEntriesFailed ->
|
||||
resolver.plurals(R.plurals.sync_progress_group_entries_failed, failedCount, groupId, failedCount)
|
||||
is TaskProgressLabel.SyncGroupRenewingLocks ->
|
||||
resolver(R.string.sync_progress_group_renewing_locks, groupId)
|
||||
is TaskProgressLabel.SyncGroupLockRenewalFailed ->
|
||||
|
||||
@@ -8,6 +8,7 @@ import androidx.compose.foundation.layout.WindowInsets
|
||||
import androidx.compose.foundation.layout.fillMaxSize
|
||||
import androidx.compose.foundation.layout.fillMaxWidth
|
||||
import androidx.compose.foundation.layout.padding
|
||||
import androidx.compose.foundation.layout.widthIn
|
||||
import androidx.compose.foundation.lazy.LazyColumn
|
||||
import androidx.compose.foundation.lazy.items
|
||||
import androidx.compose.material3.AlertDialog
|
||||
@@ -36,7 +37,9 @@ import com.github.nullptroma.wallenc.domain.tasks.PipelineTask
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskLogLevel
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskRunState
|
||||
import com.github.nullptroma.wallenc.ui.R
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskLogLine
|
||||
import com.github.nullptroma.wallenc.ui.resources.displayText
|
||||
import com.github.nullptroma.wallenc.ui.resources.formatTaskPipelineTime
|
||||
import com.github.nullptroma.wallenc.ui.resources.resolveText
|
||||
import com.github.nullptroma.wallenc.ui.resources.toUserNotification
|
||||
|
||||
@@ -91,18 +94,7 @@ fun TaskPipelineScreen(
|
||||
verticalArrangement = Arrangement.spacedBy(4.dp),
|
||||
) {
|
||||
items(logs.size) { i ->
|
||||
val line = logs[i]
|
||||
val prefix = when (line.level) {
|
||||
TaskLogLevel.Debug -> "D"
|
||||
TaskLogLevel.Info -> "I"
|
||||
TaskLogLevel.Warn -> "W"
|
||||
TaskLogLevel.Error -> "E"
|
||||
}
|
||||
Text(
|
||||
"[$prefix] ${line.displayText()}",
|
||||
style = MaterialTheme.typography.bodySmall,
|
||||
color = MaterialTheme.colorScheme.onSurfaceVariant,
|
||||
)
|
||||
PipelineLogRow(line = logs[i])
|
||||
}
|
||||
}
|
||||
Button(
|
||||
@@ -178,14 +170,59 @@ fun TaskPipelineScreen(
|
||||
}
|
||||
}
|
||||
|
||||
@Composable
|
||||
private fun PipelineLogRow(line: TaskLogLine) {
|
||||
val prefix = when (line.level) {
|
||||
TaskLogLevel.Debug -> "D"
|
||||
TaskLogLevel.Info -> "I"
|
||||
TaskLogLevel.Warn -> "W"
|
||||
TaskLogLevel.Error -> "E"
|
||||
}
|
||||
Row(
|
||||
modifier = Modifier.fillMaxWidth(),
|
||||
horizontalArrangement = Arrangement.SpaceBetween,
|
||||
verticalAlignment = Alignment.Top,
|
||||
) {
|
||||
Text(
|
||||
text = "[$prefix] ${line.displayText()}",
|
||||
modifier = Modifier
|
||||
.weight(1f)
|
||||
.padding(end = 8.dp),
|
||||
style = MaterialTheme.typography.bodySmall,
|
||||
color = MaterialTheme.colorScheme.onSurfaceVariant,
|
||||
)
|
||||
Text(
|
||||
text = formatTaskPipelineTime(line.timestampMs),
|
||||
modifier = Modifier.widthIn(min = 56.dp),
|
||||
style = MaterialTheme.typography.bodySmall,
|
||||
color = MaterialTheme.colorScheme.onSurfaceVariant,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@Composable
|
||||
private fun TaskRow(task: PipelineTask, isRunning: Boolean) {
|
||||
Column(Modifier.fillMaxWidth()) {
|
||||
Row(
|
||||
modifier = Modifier.fillMaxWidth(),
|
||||
horizontalArrangement = Arrangement.SpaceBetween,
|
||||
verticalAlignment = Alignment.CenterVertically,
|
||||
) {
|
||||
Text(
|
||||
task.title,
|
||||
text = task.title,
|
||||
modifier = Modifier
|
||||
.weight(1f)
|
||||
.padding(end = 8.dp),
|
||||
style = if (isRunning) MaterialTheme.typography.titleSmall
|
||||
else MaterialTheme.typography.bodyMedium,
|
||||
)
|
||||
Text(
|
||||
text = formatTaskPipelineTime(task.enqueuedAtMs),
|
||||
modifier = Modifier.widthIn(min = 56.dp),
|
||||
style = MaterialTheme.typography.bodySmall,
|
||||
color = MaterialTheme.colorScheme.onSurfaceVariant,
|
||||
)
|
||||
}
|
||||
val runningProgress = (task.state as? TaskRunState.Running)?.progress
|
||||
val progressLabel = runningProgress?.label?.resolveText()
|
||||
val stateLabel = when (val s = task.state) {
|
||||
|
||||
@@ -13,6 +13,7 @@ import com.github.nullptroma.wallenc.ui.resources.resolve
|
||||
import com.github.nullptroma.wallenc.ui.resources.UserNotification
|
||||
import com.github.nullptroma.wallenc.usecases.AddStorageToSyncGroupResult
|
||||
import com.github.nullptroma.wallenc.usecases.ManageStorageSyncGroupsUseCase
|
||||
import com.github.nullptroma.wallenc.domain.tasks.StorageSyncTriggerReason
|
||||
import com.github.nullptroma.wallenc.usecases.RunStorageSyncUseCase
|
||||
import com.github.nullptroma.wallenc.usecases.StorageSyncCompatibilityInput
|
||||
import com.github.nullptroma.wallenc.usecases.isStorageCompatibleWithGroup
|
||||
@@ -209,7 +210,7 @@ class StorageSyncViewModel @Inject constructor(
|
||||
fun runSyncNow() {
|
||||
val started = runStorageSyncUseCase.enqueue(
|
||||
displayTitle = uiStrings(R.string.task_title_storage_sync),
|
||||
logReason = "sync-tab",
|
||||
reason = StorageSyncTriggerReason.SyncTab,
|
||||
)
|
||||
if (!started) {
|
||||
updateState(
|
||||
@@ -383,7 +384,6 @@ class StorageSyncViewModel @Inject constructor(
|
||||
!isStorageCompatibleWithGroup(
|
||||
storage = storage,
|
||||
group = group,
|
||||
resolveStorageKey = vaultsManager.unlockManager::getOpenedStorageKey,
|
||||
)
|
||||
}
|
||||
StorageSyncGroupUi(
|
||||
|
||||
@@ -12,4 +12,10 @@
|
||||
<item quantity="many">Синхронизация: группа «%1$s» — обработка %2$d записей</item>
|
||||
<item quantity="other">Синхронизация: группа «%1$s» — обработка %2$d записей</item>
|
||||
</plurals>
|
||||
<plurals name="sync_progress_group_entries_failed">
|
||||
<item quantity="one">Синхронизация: группа «%1$s» — не применена %2$d запись</item>
|
||||
<item quantity="few">Синхронизация: группа «%1$s» — не применены %2$d записи</item>
|
||||
<item quantity="many">Синхронизация: группа «%1$s» — не применено %2$d записей</item>
|
||||
<item quantity="other">Синхронизация: группа «%1$s» — не применено %2$d записей</item>
|
||||
</plurals>
|
||||
</resources>
|
||||
|
||||
@@ -289,9 +289,12 @@
|
||||
<string name="sync_progress_group_renewing_locks">Синхронизация: группа «%1$s» — продление блокировок</string>
|
||||
<string name="sync_progress_group_lock_renewal_failed">Синхронизация: группа «%1$s» — не удалось продлить блокировку</string>
|
||||
<string name="task_progress_clear_content">%1$d / %2$d</string>
|
||||
<string name="task_log_sync_started">Синхронизация хранилищ запущена</string>
|
||||
<string name="task_log_sync_finished">Синхронизация хранилищ завершена</string>
|
||||
<string name="task_log_sync_failed">Синхронизация не удалась: %1$s</string>
|
||||
<string name="task_log_sync_started">Синхронизация хранилищ запущена (%1$s)</string>
|
||||
<string name="task_log_sync_finished">Синхронизация хранилищ завершена (%1$s)</string>
|
||||
<string name="task_log_sync_failed">Синхронизация не удалась (%1$s): %2$s</string>
|
||||
<string name="task_sync_trigger_debounce">debounce</string>
|
||||
<string name="task_sync_trigger_sync_tab">sync-tab</string>
|
||||
<string name="task_sync_trigger_background">background</string>
|
||||
<string name="task_log_enumerating">Перечисление файлов и папок…</string>
|
||||
<string name="task_log_creating_storage">Создание хранилища…</string>
|
||||
<string name="task_log_storage_created">Хранилище создано</string>
|
||||
|
||||
@@ -8,4 +8,8 @@
|
||||
<item quantity="one">Storage sync: group "%1$s" processing %2$d entry</item>
|
||||
<item quantity="other">Storage sync: group "%1$s" processing %2$d entries</item>
|
||||
</plurals>
|
||||
<plurals name="sync_progress_group_entries_failed">
|
||||
<item quantity="one">Storage sync: group "%1$s" — %2$d entry failed</item>
|
||||
<item quantity="other">Storage sync: group "%1$s" — %2$d entries failed</item>
|
||||
</plurals>
|
||||
</resources>
|
||||
|
||||
@@ -289,9 +289,12 @@
|
||||
<string name="sync_progress_group_renewing_locks">Storage sync: group "%1$s" renewing locks</string>
|
||||
<string name="sync_progress_group_lock_renewal_failed">Storage sync: group "%1$s" lock renewal failed</string>
|
||||
<string name="task_progress_clear_content">%1$d / %2$d</string>
|
||||
<string name="task_log_sync_started">Storage sync started</string>
|
||||
<string name="task_log_sync_finished">Storage sync finished</string>
|
||||
<string name="task_log_sync_failed">Storage sync failed: %1$s</string>
|
||||
<string name="task_log_sync_started">Storage sync started (%1$s)</string>
|
||||
<string name="task_log_sync_finished">Storage sync finished (%1$s)</string>
|
||||
<string name="task_log_sync_failed">Storage sync failed (%1$s): %2$s</string>
|
||||
<string name="task_sync_trigger_debounce">debounce</string>
|
||||
<string name="task_sync_trigger_sync_tab">sync-tab</string>
|
||||
<string name="task_sync_trigger_background">background</string>
|
||||
<string name="task_log_enumerating">Enumerating files and directories…</string>
|
||||
<string name="task_log_creating_storage">Creating storage…</string>
|
||||
<string name="task_log_storage_created">Storage created</string>
|
||||
|
||||
@@ -3,6 +3,7 @@ package com.github.nullptroma.wallenc.usecases
|
||||
import com.github.nullptroma.wallenc.domain.errors.toWallencException
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncEngine
|
||||
import com.github.nullptroma.wallenc.domain.tasks.ITaskOrchestrator
|
||||
import com.github.nullptroma.wallenc.domain.tasks.StorageSyncTriggerReason
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskId
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskLogKey
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskLogLevel
|
||||
@@ -31,10 +32,10 @@ class RunStorageSyncUseCase @Inject constructor(
|
||||
|
||||
/**
|
||||
* @param displayTitle заголовок задачи в UI (локализованный на стороне вызова)
|
||||
* @param logReason техническая метка для логов (не для UI)
|
||||
* @param reason источник запуска — попадает в лог пайплайна
|
||||
* @return false, если синхронизация уже в очереди или выполняется — новая задача не создана
|
||||
*/
|
||||
fun enqueue(displayTitle: String, logReason: String): Boolean {
|
||||
fun enqueue(displayTitle: String, reason: StorageSyncTriggerReason): Boolean {
|
||||
if (!running.compareAndSet(false, true)) {
|
||||
return false
|
||||
}
|
||||
@@ -45,36 +46,68 @@ class RunStorageSyncUseCase @Inject constructor(
|
||||
dispatcher = Dispatchers.IO,
|
||||
work = { ctx ->
|
||||
try {
|
||||
ctx.log(TaskLogLevel.Info, TaskLogKey.SyncStarted)
|
||||
ctx.reportProgress(null, TaskProgressLabel.SyncStarted)
|
||||
syncEngine.syncAllGroups { fraction, label ->
|
||||
executeSync(
|
||||
reason = reason,
|
||||
reportProgress = { fraction, label ->
|
||||
ctx.reportProgress(fraction, label)
|
||||
}
|
||||
ctx.log(TaskLogLevel.Info, TaskLogKey.SyncFinished)
|
||||
ctx.reportProgress(null, TaskProgressLabel.SyncCompleted)
|
||||
},
|
||||
log = { level, key -> ctx.log(level, key) },
|
||||
)
|
||||
} catch (e: Exception) {
|
||||
val err = e.toWallencException()
|
||||
ctx.log(TaskLogLevel.Error, TaskLogKey.SyncFailed(err))
|
||||
ctx.fail(err)
|
||||
ctx.fail(e.toWallencException())
|
||||
} finally {
|
||||
running.set(false)
|
||||
_syncRunning.value = false
|
||||
_activeSyncTaskId.value = null
|
||||
clearRunningState()
|
||||
}
|
||||
},
|
||||
)
|
||||
_activeSyncTaskId.value = taskId
|
||||
return true
|
||||
} catch (t: Throwable) {
|
||||
running.set(false)
|
||||
_syncRunning.value = false
|
||||
_activeSyncTaskId.value = null
|
||||
clearRunningState()
|
||||
throw t
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun runBlocking() {
|
||||
suspend fun runBlocking(reason: StorageSyncTriggerReason) {
|
||||
if (!running.compareAndSet(false, true)) {
|
||||
return
|
||||
}
|
||||
_syncRunning.value = true
|
||||
try {
|
||||
executeSync(
|
||||
reason = reason,
|
||||
reportProgress = { _, _ -> },
|
||||
log = { level, key -> orchestrator.appendPipelineLog(level, key) },
|
||||
)
|
||||
} finally {
|
||||
clearRunningState()
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun executeSync(
|
||||
reason: StorageSyncTriggerReason,
|
||||
reportProgress: suspend (fraction: Float?, label: TaskProgressLabel?) -> Unit,
|
||||
log: (TaskLogLevel, TaskLogKey) -> Unit,
|
||||
) {
|
||||
syncReadiness.awaitReady()
|
||||
syncEngine.syncAllGroups()
|
||||
log(TaskLogLevel.Info, TaskLogKey.SyncStarted(reason))
|
||||
reportProgress(null, TaskProgressLabel.SyncStarted)
|
||||
try {
|
||||
syncEngine.syncAllGroups { fraction, label ->
|
||||
reportProgress(fraction, label)
|
||||
}
|
||||
log(TaskLogLevel.Info, TaskLogKey.SyncFinished(reason))
|
||||
reportProgress(null, TaskProgressLabel.SyncCompleted)
|
||||
} catch (e: Exception) {
|
||||
val err = e.toWallencException()
|
||||
log(TaskLogLevel.Error, TaskLogKey.SyncFailed(err, reason))
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
||||
private fun clearRunningState() {
|
||||
running.set(false)
|
||||
_syncRunning.value = false
|
||||
_activeSyncTaskId.value = null
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,17 +1,14 @@
|
||||
package com.github.nullptroma.wallenc.usecases
|
||||
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.EncryptKey
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorage
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroup
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroupEncryptionKind
|
||||
import java.util.UUID
|
||||
|
||||
/** Совместим, если у storage нет активного шифрования ([encInfo] == null). */
|
||||
fun isStorageCompatibleWithGroup(
|
||||
storage: IStorage,
|
||||
group: StorageSyncGroup,
|
||||
resolveStorageKey: (UUID) -> EncryptKey?,
|
||||
): Boolean {
|
||||
// Режим упрощён: в sync-группах допускаются только незашифрованные storage.
|
||||
if (storage.metaInfo.value.encInfo != null) {
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
package com.github.nullptroma.wallenc.usecases
|
||||
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorage
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncEngine
|
||||
@@ -19,6 +22,11 @@ import java.util.concurrent.atomic.AtomicLong
|
||||
import javax.inject.Inject
|
||||
import javax.inject.Singleton
|
||||
|
||||
/**
|
||||
* Синхронизация по журналам storage в группе.
|
||||
* Блокировка на Yandex Disk — best-effort (см. [IStorageAccessor.tryAcquireSyncLock]);
|
||||
* сериализация внутри процесса — [groupMutexes].
|
||||
*/
|
||||
@Singleton
|
||||
class StorageSyncEngine @Inject constructor(
|
||||
private val vaultsManager: IVaultsManager,
|
||||
@@ -81,7 +89,6 @@ class StorageSyncEngine @Inject constructor(
|
||||
isStorageCompatibleWithGroup(
|
||||
storage = storage,
|
||||
group = group,
|
||||
resolveStorageKey = vaultsManager.unlockManager::getOpenedStorageKey,
|
||||
)
|
||||
}
|
||||
if (incompatible.isNotEmpty()) {
|
||||
@@ -109,8 +116,8 @@ class StorageSyncEngine @Inject constructor(
|
||||
lockedAccessors.add(storage.accessor)
|
||||
}
|
||||
|
||||
val latestByPath = mutableMapOf<String, StorageSyncJournalEntry>()
|
||||
val entriesByStorage = mutableMapOf<UUID, Map<String, StorageSyncJournalEntry>>()
|
||||
val mergedByPath = mutableMapOf<String, StorageSyncJournalEntry>()
|
||||
val entriesByStorage = mutableMapOf<UUID, StorageSyncJournal>()
|
||||
|
||||
reportProgress(null, TaskProgressLabel.SyncGroupReadingJournals(groupId))
|
||||
for ((journalIndex, storage) in storages.withIndex()) {
|
||||
@@ -128,17 +135,14 @@ class StorageSyncEngine @Inject constructor(
|
||||
null,
|
||||
TaskProgressLabel.SyncGroupJournalProgress(groupId, journalIndex + 1, storages.size),
|
||||
)
|
||||
val latestEntries = latestByPath(storage.accessor.readSyncJournal())
|
||||
entriesByStorage[storage.uuid] = latestEntries
|
||||
for ((path, entry) in latestEntries) {
|
||||
val current = latestByPath[path]
|
||||
if (current == null || compareEntries(entry, current) > 0) {
|
||||
latestByPath[path] = entry
|
||||
}
|
||||
}
|
||||
val journal = filterSyncableJournal(storage.accessor.readSyncJournal())
|
||||
entriesByStorage[storage.uuid] = journal
|
||||
mergedByPath.putAll(
|
||||
StorageSyncJournalMerge.merge(mergedByPath, journal),
|
||||
)
|
||||
}
|
||||
|
||||
val mergedEntries = latestByPath.entries.toList()
|
||||
val mergedEntries = mergedByPath.entries.toList()
|
||||
if (mergedEntries.isEmpty()) {
|
||||
reportProgress(null, TaskProgressLabel.SyncGroupNoJournalEntries(groupId))
|
||||
return
|
||||
@@ -148,6 +152,7 @@ class StorageSyncEngine @Inject constructor(
|
||||
null,
|
||||
TaskProgressLabel.SyncGroupProcessingEntries(groupId, mergedEntries.size),
|
||||
)
|
||||
var applyFailures = 0
|
||||
for ((pathIndex, merged) in mergedEntries.withIndex()) {
|
||||
leaseUntil = renewLocksIfNeeded(
|
||||
groupId = groupId,
|
||||
@@ -166,7 +171,9 @@ class StorageSyncEngine @Inject constructor(
|
||||
return
|
||||
}
|
||||
val sourceStorage = findSourceStorage(storages, entriesByStorage, path, winnerEntry)
|
||||
if (sourceStorage == null && winnerEntry.operation == StorageSyncOperation.UPSERT) {
|
||||
if (sourceStorage == null &&
|
||||
winnerEntry.operation == StorageSyncOperation.UPSERT
|
||||
) {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -184,10 +191,18 @@ class StorageSyncEngine @Inject constructor(
|
||||
entry = winnerEntry,
|
||||
)
|
||||
if (applied) {
|
||||
target.accessor.appendSyncJournal(listOf(winnerEntry))
|
||||
target.accessor.putSyncJournalEntries(mapOf(path to winnerEntry))
|
||||
} else {
|
||||
applyFailures++
|
||||
}
|
||||
}
|
||||
}
|
||||
if (applyFailures > 0) {
|
||||
reportProgress(
|
||||
null,
|
||||
TaskProgressLabel.SyncGroupEntriesFailed(groupId, applyFailures),
|
||||
)
|
||||
}
|
||||
reportProgress(null, TaskProgressLabel.SyncGroupCompleted(groupId))
|
||||
} finally {
|
||||
for (accessor in lockedAccessors) {
|
||||
@@ -199,6 +214,10 @@ class StorageSyncEngine @Inject constructor(
|
||||
}
|
||||
}
|
||||
|
||||
private fun filterSyncableJournal(journal: StorageSyncJournal): StorageSyncJournal {
|
||||
return journal.filterKeys { StorageSyncPaths.isSyncableUserPath(it) }
|
||||
}
|
||||
|
||||
private suspend fun renewLocksIfNeeded(
|
||||
groupId: String,
|
||||
lockedAccessors: List<IStorageAccessor>,
|
||||
@@ -227,25 +246,18 @@ class StorageSyncEngine @Inject constructor(
|
||||
return uuids.mapNotNull(findStorageUseCase::find)
|
||||
}
|
||||
|
||||
private fun latestByPath(entries: List<StorageSyncJournalEntry>): Map<String, StorageSyncJournalEntry> {
|
||||
val map = mutableMapOf<String, StorageSyncJournalEntry>()
|
||||
for (entry in entries) {
|
||||
val current = map[entry.path]
|
||||
if (current == null || compareEntries(entry, current) > 0) {
|
||||
map[entry.path] = entry
|
||||
}
|
||||
}
|
||||
return map
|
||||
}
|
||||
|
||||
private fun findSourceStorage(
|
||||
storages: List<IStorage>,
|
||||
entriesByStorage: Map<UUID, Map<String, StorageSyncJournalEntry>>,
|
||||
entriesByStorage: Map<UUID, StorageSyncJournal>,
|
||||
path: String,
|
||||
winnerEntry: StorageSyncJournalEntry,
|
||||
): IStorage? {
|
||||
if (winnerEntry.operation == StorageSyncOperation.DELETE) {
|
||||
return storages.firstOrNull()
|
||||
if (winnerEntry.operation == StorageSyncOperation.DELETE ||
|
||||
winnerEntry.operation == StorageSyncOperation.TRASH
|
||||
) {
|
||||
return storages.firstOrNull { storage ->
|
||||
entriesByStorage[storage.uuid]?.get(path) != null
|
||||
} ?: storages.firstOrNull()
|
||||
}
|
||||
return storages.firstOrNull { storage ->
|
||||
val entry = entriesByStorage[storage.uuid]?.get(path) ?: return@firstOrNull false
|
||||
@@ -258,25 +270,38 @@ class StorageSyncEngine @Inject constructor(
|
||||
target: IStorage,
|
||||
entry: StorageSyncJournalEntry,
|
||||
): Boolean {
|
||||
when (entry.operation) {
|
||||
val result = when (entry.operation) {
|
||||
StorageSyncOperation.DELETE -> {
|
||||
return runCatching {
|
||||
runCatching {
|
||||
target.accessor.delete(entry.path)
|
||||
}.isSuccess
|
||||
}
|
||||
}
|
||||
|
||||
StorageSyncOperation.TRASH -> {
|
||||
runCatching {
|
||||
target.accessor.moveToTrash(entry.path)
|
||||
}
|
||||
}
|
||||
|
||||
StorageSyncOperation.UPSERT -> {
|
||||
val sourceAccessor = source?.accessor ?: return false
|
||||
return runCatching {
|
||||
runCatching {
|
||||
sourceAccessor.openRead(entry.path).use { input ->
|
||||
target.accessor.openWrite(entry.path).use { output ->
|
||||
target.accessor.openWrite(entry.path, recordSyncJournal = false).use { output ->
|
||||
input.copyTo(output)
|
||||
}
|
||||
}
|
||||
}.isSuccess
|
||||
}
|
||||
}
|
||||
}
|
||||
result.exceptionOrNull()?.let { error ->
|
||||
System.err.println(
|
||||
"StorageSyncEngine: apply ${entry.operation} ${entry.path} " +
|
||||
"target=${target.uuid}: ${error.message}",
|
||||
)
|
||||
}
|
||||
return result.isSuccess
|
||||
}
|
||||
|
||||
private fun compareEntries(a: StorageSyncJournalEntry, b: StorageSyncJournalEntry): Int {
|
||||
val seqCmp = a.revision.sequence.compareTo(b.revision.sequence)
|
||||
|
||||
@@ -0,0 +1,41 @@
|
||||
package com.github.nullptroma.wallenc.usecases
|
||||
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageEncryptionInfo
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroup
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroupEncryptionKind
|
||||
import com.github.nullptroma.wallenc.usecases.fakes.FakeMetaInfo
|
||||
import com.github.nullptroma.wallenc.usecases.fakes.FakeStorage
|
||||
import org.junit.Assert.assertFalse
|
||||
import org.junit.Assert.assertTrue
|
||||
import org.junit.Test
|
||||
import java.util.UUID
|
||||
|
||||
class StorageSyncEncryptionCompatTest {
|
||||
|
||||
@Test
|
||||
fun storageWithoutEncInfoIsCompatible() {
|
||||
val storage = FakeStorage(uuid = UUID.randomUUID(), meta = FakeMetaInfo(encInfo = null))
|
||||
val group = StorageSyncGroup(
|
||||
id = "g1",
|
||||
storageUuids = setOf(storage.uuid),
|
||||
encryptionKind = StorageSyncGroupEncryptionKind.NONE,
|
||||
)
|
||||
assertTrue(isStorageCompatibleWithGroup(storage = storage, group = group))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun storageWithEncInfoIsIncompatible() {
|
||||
val storage = FakeStorage(
|
||||
uuid = UUID.randomUUID(),
|
||||
meta = FakeMetaInfo(
|
||||
encInfo = StorageEncryptionInfo(encryptedTestData = "x", pathIv = ByteArray(16)),
|
||||
),
|
||||
)
|
||||
val group = StorageSyncGroup(
|
||||
id = "g1",
|
||||
storageUuids = setOf(storage.uuid),
|
||||
encryptionKind = StorageSyncGroupEncryptionKind.NONE,
|
||||
)
|
||||
assertFalse(isStorageCompatibleWithGroup(storage = storage, group = group))
|
||||
}
|
||||
}
|
||||
@@ -7,6 +7,7 @@ import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroup
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroupEncryptionKind
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskProgressLabel
|
||||
import com.github.nullptroma.wallenc.usecases.fakes.FakeStorage
|
||||
import com.github.nullptroma.wallenc.usecases.fakes.FakeStorageAccessor
|
||||
import com.github.nullptroma.wallenc.usecases.fakes.FakeStorageSyncGroupStore
|
||||
import com.github.nullptroma.wallenc.usecases.fakes.FakeVaultsManager
|
||||
import kotlinx.coroutines.runBlocking
|
||||
@@ -112,6 +113,41 @@ class StorageSyncEngineTest {
|
||||
assertNull(target.fileBytes(path))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun syncGroupTrashSoftDeletesOnTarget() = runBlocking {
|
||||
val source = FakeStorage()
|
||||
val target = FakeStorage()
|
||||
val path = "trashed/doc.txt"
|
||||
val payload = "keep-in-trash".encodeToByteArray()
|
||||
source.putFile(path, payload)
|
||||
target.putFile(path, payload)
|
||||
|
||||
val entry = StorageSyncJournalEntry(
|
||||
path = path,
|
||||
operation = StorageSyncOperation.TRASH,
|
||||
revision = StorageSyncRevision(
|
||||
sequence = 3L,
|
||||
actorId = "actor-trash",
|
||||
createdAt = Instant.parse("2024-07-01T00:00:00Z"),
|
||||
),
|
||||
)
|
||||
source.addSyncJournalEntry(entry)
|
||||
|
||||
val group = StorageSyncGroup(
|
||||
id = "trash-group",
|
||||
storageUuids = setOf(source.uuid, target.uuid),
|
||||
encryptionKind = StorageSyncGroupEncryptionKind.NONE,
|
||||
)
|
||||
val engine = createEngine(
|
||||
storages = listOf(source, target),
|
||||
groups = listOf(group),
|
||||
)
|
||||
engine.syncGroup(group.id) { _, _ -> }
|
||||
|
||||
assertArrayEquals(payload, target.fileBytes(path))
|
||||
assertTrue(path in (target.accessor as FakeStorageAccessor).trashedPaths)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun syncGroupStopsWhenLockCannotBeAcquired() = runBlocking {
|
||||
val first = FakeStorage()
|
||||
|
||||
@@ -0,0 +1,45 @@
|
||||
package com.github.nullptroma.wallenc.usecases
|
||||
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
|
||||
import org.junit.Assert.assertEquals
|
||||
import org.junit.Assert.assertFalse
|
||||
import org.junit.Assert.assertTrue
|
||||
import org.junit.Test
|
||||
import java.time.Instant
|
||||
|
||||
class StorageSyncJournalMergeTest {
|
||||
|
||||
@Test
|
||||
fun mergeKeepsSingleEntryPerPath() {
|
||||
val older = entry(path = "/a.txt", sequence = 1L)
|
||||
val newer = entry(path = "/a.txt", sequence = 2L)
|
||||
val merged = StorageSyncJournalMerge.merge(
|
||||
mapOf("/a.txt" to older),
|
||||
mapOf("/a.txt" to newer),
|
||||
)
|
||||
assertEquals(1, merged.size)
|
||||
assertEquals(2L, merged["/a.txt"]?.revision?.sequence)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun isSyncableUserPathExcludesEncDirAndJournal() {
|
||||
assertFalse(StorageSyncPaths.isSyncableUserPath("/88a048ed-enc-dir/sync-journal.json"))
|
||||
assertFalse(StorageSyncPaths.isSyncableUserPath("/wallenc-yandex-system/sync-journal.json"))
|
||||
assertTrue(StorageSyncPaths.isSyncableUserPath("/wallenc-data/text-secrets.json"))
|
||||
}
|
||||
|
||||
private fun entry(path: String, sequence: Long): StorageSyncJournalEntry =
|
||||
StorageSyncJournalEntry(
|
||||
path = path,
|
||||
operation = StorageSyncOperation.UPSERT,
|
||||
revision = StorageSyncRevision(
|
||||
sequence = sequence,
|
||||
actorId = "actor",
|
||||
createdAt = Instant.EPOCH,
|
||||
),
|
||||
)
|
||||
}
|
||||
@@ -3,6 +3,7 @@ package com.github.nullptroma.wallenc.usecases.fakes
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageEncryptionInfo
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageMetaLoadState
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorage
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageMetaInfo
|
||||
@@ -46,10 +47,13 @@ class FakeStorage(
|
||||
fun fileBytes(path: String): ByteArray? = accessorImpl.dataFiles[path]
|
||||
|
||||
fun addSyncJournalEntry(entry: StorageSyncJournalEntry) {
|
||||
accessorImpl.syncJournal.add(entry)
|
||||
accessorImpl.syncJournal = StorageSyncJournalMerge.merge(
|
||||
accessorImpl.syncJournal,
|
||||
mapOf(entry.path to entry),
|
||||
)
|
||||
}
|
||||
|
||||
fun syncJournalEntries(): List<StorageSyncJournalEntry> = accessorImpl.syncJournal.toList()
|
||||
fun syncJournalEntries(): List<StorageSyncJournalEntry> = accessorImpl.syncJournal.values.toList()
|
||||
|
||||
fun setAcquireLockResult(result: Boolean) {
|
||||
accessorImpl.acquireLockResult = result
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package com.github.nullptroma.wallenc.usecases.fakes
|
||||
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.DataPage
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
|
||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IDirectory
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IFile
|
||||
@@ -21,10 +23,11 @@ import java.time.Instant
|
||||
|
||||
class FakeStorageAccessor : IStorageAccessor {
|
||||
val dataFiles: MutableMap<String, ByteArray> = mutableMapOf()
|
||||
val trashedPaths: MutableSet<String> = mutableSetOf()
|
||||
private val systemFiles: MutableMap<String, ByteArray> = mutableMapOf()
|
||||
private val _filesUpdates = MutableSharedFlow<DataPage<IFile>>(extraBufferCapacity = 16)
|
||||
|
||||
var syncJournal: MutableList<StorageSyncJournalEntry> = mutableListOf()
|
||||
var syncJournal: StorageSyncJournal = emptyMap()
|
||||
var syncLock: StorageSyncLock? = null
|
||||
var acquireLockResult: Boolean = true
|
||||
|
||||
@@ -64,7 +67,7 @@ class FakeStorageAccessor : IStorageAccessor {
|
||||
dataFiles.remove(path)
|
||||
}
|
||||
|
||||
override suspend fun openWrite(path: String): OutputStream {
|
||||
override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream {
|
||||
return object : ByteArrayOutputStream() {
|
||||
override fun close() {
|
||||
dataFiles[path] = toByteArray()
|
||||
@@ -84,7 +87,11 @@ class FakeStorageAccessor : IStorageAccessor {
|
||||
return ByteArrayInputStream(bytes)
|
||||
}
|
||||
|
||||
override suspend fun moveToTrash(path: String) = Unit
|
||||
override suspend fun moveToTrash(path: String) {
|
||||
if (path in dataFiles) {
|
||||
trashedPaths.add(path)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun openReadSystemFile(name: String): InputStream {
|
||||
val bytes = systemFiles[name] ?: ByteArray(0)
|
||||
@@ -99,15 +106,10 @@ class FakeStorageAccessor : IStorageAccessor {
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun readSyncJournal(): List<StorageSyncJournalEntry> = syncJournal.toList()
|
||||
override suspend fun readSyncJournal(): StorageSyncJournal = syncJournal
|
||||
|
||||
override suspend fun appendSyncJournal(entries: List<StorageSyncJournalEntry>) {
|
||||
syncJournal.addAll(entries)
|
||||
}
|
||||
|
||||
override suspend fun rewriteSyncJournal(entries: List<StorageSyncJournalEntry>) {
|
||||
syncJournal.clear()
|
||||
syncJournal.addAll(entries)
|
||||
override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) {
|
||||
syncJournal = StorageSyncJournalMerge.merge(syncJournal, entries)
|
||||
}
|
||||
|
||||
override suspend fun readSyncLock(): StorageSyncLock? = syncLock
|
||||
|
||||
Reference in New Issue
Block a user