Compare commits
7 Commits
233a716e47
...
96e9de49c3
| Author | SHA1 | Date | |
|---|---|---|---|
| 96e9de49c3 | |||
| 6ab402da51 | |||
| 2618df41e3 | |||
| bc2b354820 | |||
| b00eed901b | |||
| 35ba6dd377 | |||
| 07d54b5996 |
@@ -1,6 +1,8 @@
|
|||||||
package com.github.nullptroma.wallenc.app.di.modules.domain
|
package com.github.nullptroma.wallenc.app.di.modules.domain
|
||||||
|
|
||||||
|
import com.github.nullptroma.wallenc.app.sync.StorageSyncTaskTitleFormatter
|
||||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncEngine
|
import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncEngine
|
||||||
|
import com.github.nullptroma.wallenc.domain.tasks.IStorageSyncTaskTitleFormatter
|
||||||
import com.github.nullptroma.wallenc.usecases.StorageSyncEngine
|
import com.github.nullptroma.wallenc.usecases.StorageSyncEngine
|
||||||
import dagger.Binds
|
import dagger.Binds
|
||||||
import dagger.Module
|
import dagger.Module
|
||||||
@@ -15,4 +17,10 @@ abstract class UseCasesModule {
|
|||||||
@Binds
|
@Binds
|
||||||
@Singleton
|
@Singleton
|
||||||
abstract fun bindStorageSyncEngine(impl: StorageSyncEngine): IStorageSyncEngine
|
abstract fun bindStorageSyncEngine(impl: StorageSyncEngine): IStorageSyncEngine
|
||||||
|
|
||||||
|
@Binds
|
||||||
|
@Singleton
|
||||||
|
abstract fun bindStorageSyncTaskTitleFormatter(
|
||||||
|
impl: StorageSyncTaskTitleFormatter,
|
||||||
|
): IStorageSyncTaskTitleFormatter
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,8 +2,6 @@ package com.github.nullptroma.wallenc.app.sync
|
|||||||
|
|
||||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
|
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
|
||||||
import com.github.nullptroma.wallenc.domain.interfaces.IVaultsManager
|
import com.github.nullptroma.wallenc.domain.interfaces.IVaultsManager
|
||||||
import com.github.nullptroma.wallenc.ui.R
|
|
||||||
import com.github.nullptroma.wallenc.ui.resources.UiStringResolver
|
|
||||||
import com.github.nullptroma.wallenc.domain.tasks.StorageSyncTriggerReason
|
import com.github.nullptroma.wallenc.domain.tasks.StorageSyncTriggerReason
|
||||||
import com.github.nullptroma.wallenc.usecases.RunStorageSyncUseCase
|
import com.github.nullptroma.wallenc.usecases.RunStorageSyncUseCase
|
||||||
import kotlinx.coroutines.CoroutineScope
|
import kotlinx.coroutines.CoroutineScope
|
||||||
@@ -11,6 +9,7 @@ import kotlinx.coroutines.Dispatchers
|
|||||||
import kotlinx.coroutines.FlowPreview
|
import kotlinx.coroutines.FlowPreview
|
||||||
import kotlinx.coroutines.SupervisorJob
|
import kotlinx.coroutines.SupervisorJob
|
||||||
import kotlinx.coroutines.flow.collectLatest
|
import kotlinx.coroutines.flow.collectLatest
|
||||||
|
import kotlinx.coroutines.flow.combine
|
||||||
import kotlinx.coroutines.flow.debounce
|
import kotlinx.coroutines.flow.debounce
|
||||||
import kotlinx.coroutines.flow.filter
|
import kotlinx.coroutines.flow.filter
|
||||||
import kotlinx.coroutines.flow.map
|
import kotlinx.coroutines.flow.map
|
||||||
@@ -25,14 +24,18 @@ class StorageSyncBootstrap @Inject constructor(
|
|||||||
private val scheduler: StorageSyncScheduler,
|
private val scheduler: StorageSyncScheduler,
|
||||||
private val vaultsManager: IVaultsManager,
|
private val vaultsManager: IVaultsManager,
|
||||||
private val syncRunner: RunStorageSyncUseCase,
|
private val syncRunner: RunStorageSyncUseCase,
|
||||||
private val uiStrings: UiStringResolver,
|
|
||||||
) {
|
) {
|
||||||
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
|
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
|
||||||
|
|
||||||
fun start() {
|
fun start() {
|
||||||
scheduler.ensureScheduled()
|
scheduler.ensureScheduled()
|
||||||
scope.launch {
|
scope.launch {
|
||||||
vaultsManager.allStorages.collectLatest { storages ->
|
combine(
|
||||||
|
vaultsManager.allStorages,
|
||||||
|
vaultsManager.unlockManager.openedStorages,
|
||||||
|
) { rootStorages, opened ->
|
||||||
|
(rootStorages + opened.values).distinctBy { it.uuid }
|
||||||
|
}.collectLatest { storages ->
|
||||||
if (storages.isEmpty()) {
|
if (storages.isEmpty()) {
|
||||||
return@collectLatest
|
return@collectLatest
|
||||||
}
|
}
|
||||||
@@ -55,21 +58,23 @@ class StorageSyncBootstrap @Inject constructor(
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
merge(*triggers.toTypedArray())
|
merge(*triggers.toTypedArray())
|
||||||
.debounce(DEBOUNCE_AFTER_CHANGE_MS)
|
.filter { shouldScheduleDebounceSync() }
|
||||||
|
.debounce(RunStorageSyncUseCase.DEBOUNCE_AFTER_CHANGE_MS)
|
||||||
.collect {
|
.collect {
|
||||||
if (syncRunner.syncRunning.value) {
|
syncRunner.enqueue(StorageSyncTriggerReason.Debounce)
|
||||||
return@collect
|
|
||||||
}
|
|
||||||
syncRunner.enqueue(
|
|
||||||
displayTitle = uiStrings(R.string.task_title_storage_sync_background),
|
|
||||||
reason = StorageSyncTriggerReason.Debounce,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private companion object {
|
/**
|
||||||
private const val DEBOUNCE_AFTER_CHANGE_MS = 60_000L
|
* Игнорировать события во время sync и сразу после него: запись файлов при sync
|
||||||
|
* (в т.ч. через [EncryptedStorageAccessor]) иначе снова запускает debounce через 60 с.
|
||||||
|
*/
|
||||||
|
private fun shouldScheduleDebounceSync(): Boolean {
|
||||||
|
if (syncRunner.syncRunning.value) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return System.currentTimeMillis() >= syncRunner.debounceSuppressUntilMs.value
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,16 @@
|
|||||||
|
package com.github.nullptroma.wallenc.app.sync
|
||||||
|
|
||||||
|
import com.github.nullptroma.wallenc.domain.tasks.IStorageSyncTaskTitleFormatter
|
||||||
|
import com.github.nullptroma.wallenc.domain.tasks.StorageSyncTriggerReason
|
||||||
|
import com.github.nullptroma.wallenc.ui.resources.UiStringResolver
|
||||||
|
import com.github.nullptroma.wallenc.ui.resources.storageSyncTaskTitle
|
||||||
|
import javax.inject.Inject
|
||||||
|
import javax.inject.Singleton
|
||||||
|
|
||||||
|
@Singleton
|
||||||
|
class StorageSyncTaskTitleFormatter @Inject constructor(
|
||||||
|
private val uiStrings: UiStringResolver,
|
||||||
|
) : IStorageSyncTaskTitleFormatter {
|
||||||
|
override fun format(reason: StorageSyncTriggerReason): String =
|
||||||
|
uiStrings.storageSyncTaskTitle(reason)
|
||||||
|
}
|
||||||
@@ -6,6 +6,7 @@ import androidx.work.CoroutineWorker
|
|||||||
import androidx.work.WorkerParameters
|
import androidx.work.WorkerParameters
|
||||||
import com.github.nullptroma.wallenc.domain.tasks.StorageSyncTriggerReason
|
import com.github.nullptroma.wallenc.domain.tasks.StorageSyncTriggerReason
|
||||||
import com.github.nullptroma.wallenc.usecases.RunStorageSyncUseCase
|
import com.github.nullptroma.wallenc.usecases.RunStorageSyncUseCase
|
||||||
|
import com.github.nullptroma.wallenc.usecases.StorageSyncRunOutcome
|
||||||
import dagger.assisted.Assisted
|
import dagger.assisted.Assisted
|
||||||
import dagger.assisted.AssistedInject
|
import dagger.assisted.AssistedInject
|
||||||
import timber.log.Timber
|
import timber.log.Timber
|
||||||
@@ -19,15 +20,25 @@ class StorageSyncWorker @AssistedInject constructor(
|
|||||||
|
|
||||||
override suspend fun doWork(): Result {
|
override suspend fun doWork(): Result {
|
||||||
Timber.d("Periodic storage sync started (attempt %d)", runAttemptCount)
|
Timber.d("Periodic storage sync started (attempt %d)", runAttemptCount)
|
||||||
return runCatching {
|
return when (val outcome = syncRunner.enqueueAndAwait(StorageSyncTriggerReason.Background)) {
|
||||||
syncRunner.runBlocking(StorageSyncTriggerReason.Background)
|
StorageSyncRunOutcome.SkippedAlreadyRunning -> {
|
||||||
|
Timber.d("Periodic storage sync skipped — already running")
|
||||||
|
Result.success()
|
||||||
|
}
|
||||||
|
StorageSyncRunOutcome.Completed -> {
|
||||||
Timber.d("Periodic storage sync finished")
|
Timber.d("Periodic storage sync finished")
|
||||||
Result.success()
|
Result.success()
|
||||||
}.getOrElse { error ->
|
}
|
||||||
Timber.w(error, "Periodic storage sync failed")
|
StorageSyncRunOutcome.Cancelled -> {
|
||||||
|
Timber.d("Periodic storage sync cancelled")
|
||||||
|
Result.success()
|
||||||
|
}
|
||||||
|
is StorageSyncRunOutcome.Failed -> {
|
||||||
|
Timber.w(outcome.error, "Periodic storage sync failed")
|
||||||
Result.retry()
|
Result.retry()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
const val UNIQUE_WORK_NAME = "wallenc-storage-sync-periodic"
|
const val UNIQUE_WORK_NAME = "wallenc-storage-sync-periodic"
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ import java.io.FilterInputStream
|
|||||||
import java.io.IOException
|
import java.io.IOException
|
||||||
import java.io.InputStream
|
import java.io.InputStream
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
|
import java.util.concurrent.atomic.AtomicLong
|
||||||
|
|
||||||
class YandexDiskRepository(
|
class YandexDiskRepository(
|
||||||
private val api: YandexDiskApi,
|
private val api: YandexDiskApi,
|
||||||
@@ -38,6 +39,13 @@ class YandexDiskRepository(
|
|||||||
|
|
||||||
private val listCache = ConcurrentHashMap<ListCacheKey, ResourceDto>()
|
private val listCache = ConcurrentHashMap<ListCacheKey, ResourceDto>()
|
||||||
private val getCache = ConcurrentHashMap<String, ResourceDto>()
|
private val getCache = ConcurrentHashMap<String, ResourceDto>()
|
||||||
|
private val cloudApiCallCount = AtomicLong(0)
|
||||||
|
|
||||||
|
fun cloudApiCallCount(): Long = cloudApiCallCount.get()
|
||||||
|
|
||||||
|
fun resetCloudApiCallCount() {
|
||||||
|
cloudApiCallCount.set(0)
|
||||||
|
}
|
||||||
|
|
||||||
suspend fun diskInfo(): DiskInfoDto = withContext(ioDispatcher) {
|
suspend fun diskInfo(): DiskInfoDto = withContext(ioDispatcher) {
|
||||||
val now = System.currentTimeMillis()
|
val now = System.currentTimeMillis()
|
||||||
@@ -93,7 +101,7 @@ class YandexDiskRepository(
|
|||||||
suspend fun createFolder(path: String): Unit = withContext(ioDispatcher) {
|
suspend fun createFolder(path: String): Unit = withContext(ioDispatcher) {
|
||||||
val resp = wrapAuth { api.createFolder(path) }
|
val resp = wrapAuth { api.createFolder(path) }
|
||||||
when (resp.code()) {
|
when (resp.code()) {
|
||||||
201, 409 -> invalidateDiskMetaCaches()
|
201, 409 -> invalidateDiskMetaCaches(path)
|
||||||
else -> throw failure("createFolder", resp)
|
else -> throw failure("createFolder", resp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -101,14 +109,14 @@ class YandexDiskRepository(
|
|||||||
suspend fun delete(path: String, permanently: Boolean = true): Unit = withContext(ioDispatcher) {
|
suspend fun delete(path: String, permanently: Boolean = true): Unit = withContext(ioDispatcher) {
|
||||||
val resp = wrapAuth { api.deleteResource(path, permanently) }
|
val resp = wrapAuth { api.deleteResource(path, permanently) }
|
||||||
when (resp.code()) {
|
when (resp.code()) {
|
||||||
204 -> invalidateDiskMetaCaches()
|
204 -> invalidateDiskMetaCaches(path)
|
||||||
202 -> {
|
202 -> {
|
||||||
val link = resp.body()?.use { body -> parseLink(body) }
|
val link = resp.body()?.use { body -> parseLink(body) }
|
||||||
?: throw IOException("DELETE 202 without body")
|
?: throw IOException("DELETE 202 without body")
|
||||||
awaitOperation(link.href)
|
awaitOperation(link.href)
|
||||||
invalidateDiskMetaCaches()
|
invalidateDiskMetaCaches(path)
|
||||||
}
|
}
|
||||||
404 -> invalidateDiskMetaCaches()
|
404 -> invalidateDiskMetaCaches(path)
|
||||||
else -> throw failure("delete", resp)
|
else -> throw failure("delete", resp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -122,7 +130,7 @@ class YandexDiskRepository(
|
|||||||
throw failure("patch", resp)
|
throw failure("patch", resp)
|
||||||
}
|
}
|
||||||
resp.body()?.close()
|
resp.body()?.close()
|
||||||
invalidateDiskMetaCaches()
|
invalidateDiskMetaCaches(path)
|
||||||
}
|
}
|
||||||
|
|
||||||
suspend fun uploadBytes(path: String, bytes: ByteArray, overwrite: Boolean = true): Unit =
|
suspend fun uploadBytes(path: String, bytes: ByteArray, overwrite: Boolean = true): Unit =
|
||||||
@@ -134,10 +142,11 @@ class YandexDiskRepository(
|
|||||||
val body = bytes.toRequestBody(OCTET_STREAM)
|
val body = bytes.toRequestBody(OCTET_STREAM)
|
||||||
val req = Request.Builder().url(link.href).put(body).build()
|
val req = Request.Builder().url(link.href).put(body).build()
|
||||||
repeat(LOCKED_RETRY_MAX) { attempt ->
|
repeat(LOCKED_RETRY_MAX) { attempt ->
|
||||||
|
recordCloudApiCall()
|
||||||
rawHttp.newCall(req).execute().use { resp ->
|
rawHttp.newCall(req).execute().use { resp ->
|
||||||
when {
|
when {
|
||||||
resp.isSuccessful -> {
|
resp.isSuccessful -> {
|
||||||
invalidateDiskMetaCaches()
|
invalidateDiskMetaCaches(path)
|
||||||
return@withContext
|
return@withContext
|
||||||
}
|
}
|
||||||
resp.code == 423 && attempt < LOCKED_RETRY_MAX - 1 ->
|
resp.code == 423 && attempt < LOCKED_RETRY_MAX - 1 ->
|
||||||
@@ -158,10 +167,11 @@ class YandexDiskRepository(
|
|||||||
val body = file.asRequestBody(OCTET_STREAM)
|
val body = file.asRequestBody(OCTET_STREAM)
|
||||||
val req = Request.Builder().url(link.href).put(body).build()
|
val req = Request.Builder().url(link.href).put(body).build()
|
||||||
repeat(LOCKED_RETRY_MAX) { attempt ->
|
repeat(LOCKED_RETRY_MAX) { attempt ->
|
||||||
|
recordCloudApiCall()
|
||||||
rawHttp.newCall(req).execute().use { resp ->
|
rawHttp.newCall(req).execute().use { resp ->
|
||||||
when {
|
when {
|
||||||
resp.isSuccessful -> {
|
resp.isSuccessful -> {
|
||||||
invalidateDiskMetaCaches()
|
invalidateDiskMetaCaches(path)
|
||||||
return@withContext
|
return@withContext
|
||||||
}
|
}
|
||||||
resp.code == 423 && attempt < LOCKED_RETRY_MAX - 1 ->
|
resp.code == 423 && attempt < LOCKED_RETRY_MAX - 1 ->
|
||||||
@@ -186,6 +196,7 @@ class YandexDiskRepository(
|
|||||||
}
|
}
|
||||||
val req = Request.Builder().url(link.href).get().build()
|
val req = Request.Builder().url(link.href).get().build()
|
||||||
repeat(LOCKED_RETRY_MAX) { attempt ->
|
repeat(LOCKED_RETRY_MAX) { attempt ->
|
||||||
|
recordCloudApiCall()
|
||||||
val resp = rawHttp.newCall(req).execute()
|
val resp = rawHttp.newCall(req).execute()
|
||||||
when {
|
when {
|
||||||
resp.isSuccessful -> {
|
resp.isSuccessful -> {
|
||||||
@@ -281,18 +292,47 @@ class YandexDiskRepository(
|
|||||||
getCache[path] = value
|
getCache[path] = value
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun invalidateDiskMetaCaches() {
|
private fun invalidateDiskMetaCaches(changedDiskPath: String? = null) {
|
||||||
synchronized(diskCacheLock) {
|
synchronized(diskCacheLock) {
|
||||||
diskInfoCached = null
|
diskInfoCached = null
|
||||||
diskInfoCachedUntilMs = 0L
|
diskInfoCachedUntilMs = 0L
|
||||||
}
|
}
|
||||||
|
if (changedDiskPath == null) {
|
||||||
listCache.clear()
|
listCache.clear()
|
||||||
getCache.clear()
|
getCache.clear()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
val prefixes = cachePrefixesForPath(changedDiskPath)
|
||||||
|
listCache.keys.removeAll { key ->
|
||||||
|
prefixes.any { prefix ->
|
||||||
|
key.path.startsWith(prefix) || prefix.startsWith(key.path.trimEnd('/'))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
getCache.keys.removeAll { cachedPath ->
|
||||||
|
prefixes.any { prefix ->
|
||||||
|
cachedPath.startsWith(prefix) || prefix.startsWith(cachedPath.trimEnd('/'))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun cachePrefixesForPath(diskPath: String): List<String> {
|
||||||
|
val normalized = diskPath.trimEnd('/')
|
||||||
|
val out = mutableListOf<String>()
|
||||||
|
var current = normalized
|
||||||
|
while (current.isNotEmpty()) {
|
||||||
|
out.add(current)
|
||||||
|
out.add("$current/")
|
||||||
|
val slash = current.lastIndexOf('/')
|
||||||
|
if (slash <= 0) break
|
||||||
|
current = current.substring(0, slash)
|
||||||
|
}
|
||||||
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
private suspend inline fun <T> wrapAuth(crossinline block: suspend () -> T): T {
|
private suspend inline fun <T> wrapAuth(crossinline block: suspend () -> T): T {
|
||||||
repeat(LOCKED_RETRY_MAX) { attempt ->
|
repeat(LOCKED_RETRY_MAX) { attempt ->
|
||||||
try {
|
try {
|
||||||
|
recordCloudApiCall()
|
||||||
return block()
|
return block()
|
||||||
} catch (e: HttpException) {
|
} catch (e: HttpException) {
|
||||||
when (e.code()) {
|
when (e.code()) {
|
||||||
@@ -313,6 +353,10 @@ class YandexDiskRepository(
|
|||||||
error("unreachable")
|
error("unreachable")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun recordCloudApiCall() {
|
||||||
|
cloudApiCallCount.incrementAndGet()
|
||||||
|
}
|
||||||
|
|
||||||
private fun failure(op: String, resp: Response<ResponseBody>): IOException {
|
private fun failure(op: String, resp: Response<ResponseBody>): IOException {
|
||||||
val msg = resp.errorBody()?.string() ?: resp.message()
|
val msg = resp.errorBody()?.string() ?: resp.message()
|
||||||
return IOException("$op failed: HTTP ${resp.code()} $msg")
|
return IOException("$op failed: HTTP ${resp.code()} $msg")
|
||||||
|
|||||||
@@ -0,0 +1,112 @@
|
|||||||
|
package com.github.nullptroma.wallenc.domain.vault.storages.common
|
||||||
|
|
||||||
|
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
|
||||||
|
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
|
||||||
|
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
|
||||||
|
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
|
||||||
|
import kotlinx.coroutines.sync.Mutex
|
||||||
|
import kotlinx.coroutines.sync.withLock
|
||||||
|
import java.util.UUID
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Буфер журнала sync: накопление записей и безопасный flush (без потери pending при ошибке записи).
|
||||||
|
*/
|
||||||
|
class StorageSyncJournalBuffer(
|
||||||
|
private val syncActorId: String,
|
||||||
|
private val originStorageUuid: UUID?,
|
||||||
|
private val readJournal: suspend () -> StorageSyncJournal,
|
||||||
|
private val writeJournal: suspend (StorageSyncJournal) -> Unit,
|
||||||
|
) {
|
||||||
|
private val pendingMutex = Mutex()
|
||||||
|
private val flushMutex = Mutex()
|
||||||
|
private var pendingJournalEntries: StorageSyncJournal = emptyMap()
|
||||||
|
|
||||||
|
@Volatile
|
||||||
|
private var sequenceHighWatermark: Long? = null
|
||||||
|
|
||||||
|
suspend fun flushPending() {
|
||||||
|
flushMutex.withLock {
|
||||||
|
flushPendingUnderLock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
suspend fun putEntries(entries: StorageSyncJournal) {
|
||||||
|
flushPending()
|
||||||
|
if (entries.isEmpty()) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
val current = readJournal()
|
||||||
|
val merged = StorageSyncJournalMerge.merge(current, entries)
|
||||||
|
if (merged == current) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
writeJournal(merged)
|
||||||
|
refreshSequenceHighWatermark(merged)
|
||||||
|
}
|
||||||
|
|
||||||
|
suspend fun appendEntry(path: String, entry: StorageSyncJournalEntry) {
|
||||||
|
pendingMutex.withLock {
|
||||||
|
pendingJournalEntries = pendingJournalEntries + (path to entry)
|
||||||
|
}
|
||||||
|
flushPending()
|
||||||
|
}
|
||||||
|
|
||||||
|
suspend fun nextSequence(): Long {
|
||||||
|
flushPending()
|
||||||
|
val diskMax = readJournal().values.maxOfOrNull { it.revision.sequence } ?: 0L
|
||||||
|
val pendingMax = pendingMutex.withLock {
|
||||||
|
pendingJournalEntries.values.maxOfOrNull { it.revision.sequence } ?: 0L
|
||||||
|
}
|
||||||
|
val base = maxOf(diskMax, pendingMax, sequenceHighWatermark ?: 0L)
|
||||||
|
val next = base + 1L
|
||||||
|
sequenceHighWatermark = next
|
||||||
|
return next
|
||||||
|
}
|
||||||
|
|
||||||
|
fun buildEntry(
|
||||||
|
path: String,
|
||||||
|
operation: com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation,
|
||||||
|
sequence: Long,
|
||||||
|
): StorageSyncJournalEntry {
|
||||||
|
return StorageSyncJournalEntry(
|
||||||
|
path = path,
|
||||||
|
operation = operation,
|
||||||
|
revision = StorageSyncRevision(
|
||||||
|
sequence = sequence,
|
||||||
|
actorId = syncActorId,
|
||||||
|
createdAt = java.time.Instant.now(),
|
||||||
|
),
|
||||||
|
originStorageUuid = originStorageUuid,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
private suspend fun flushPendingUnderLock() {
|
||||||
|
val pending = pendingMutex.withLock {
|
||||||
|
if (pendingJournalEntries.isEmpty()) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
val snapshot = pendingJournalEntries
|
||||||
|
pendingJournalEntries = emptyMap()
|
||||||
|
snapshot
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
val current = readJournal()
|
||||||
|
val merged = StorageSyncJournalMerge.merge(current, pending)
|
||||||
|
if (merged != current) {
|
||||||
|
writeJournal(merged)
|
||||||
|
}
|
||||||
|
refreshSequenceHighWatermark(merged)
|
||||||
|
} catch (e: Exception) {
|
||||||
|
pendingMutex.withLock {
|
||||||
|
pendingJournalEntries = StorageSyncJournalMerge.merge(pending, pendingJournalEntries)
|
||||||
|
}
|
||||||
|
throw e
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun refreshSequenceHighWatermark(journal: StorageSyncJournal) {
|
||||||
|
val maxSeq = journal.values.maxOfOrNull { it.revision.sequence } ?: return
|
||||||
|
val current = sequenceHighWatermark
|
||||||
|
sequenceHighWatermark = if (current == null) maxSeq else maxOf(current, maxSeq)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -2,6 +2,7 @@ package com.github.nullptroma.wallenc.domain.vault.storages.encrypt
|
|||||||
|
|
||||||
import com.fasterxml.jackson.module.kotlin.readValue
|
import com.fasterxml.jackson.module.kotlin.readValue
|
||||||
import com.github.nullptroma.wallenc.domain.errors.WallencException
|
import com.github.nullptroma.wallenc.domain.errors.WallencException
|
||||||
|
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalBuffer
|
||||||
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec
|
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec
|
||||||
import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty
|
import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty
|
||||||
import com.github.nullptroma.wallenc.domain.vault.utils.CloseHandledStreamExtension.Companion.onClosed
|
import com.github.nullptroma.wallenc.domain.vault.utils.CloseHandledStreamExtension.Companion.onClosed
|
||||||
@@ -18,14 +19,12 @@ import com.github.nullptroma.wallenc.domain.interfaces.IFile
|
|||||||
import com.github.nullptroma.wallenc.domain.interfaces.IMetaInfo
|
import com.github.nullptroma.wallenc.domain.interfaces.IMetaInfo
|
||||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
|
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
|
||||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
|
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
|
||||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
|
|
||||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
|
|
||||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
|
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
|
||||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock
|
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock
|
||||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
|
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
|
||||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
|
|
||||||
import kotlinx.coroutines.CoroutineScope
|
import kotlinx.coroutines.CoroutineScope
|
||||||
import kotlinx.coroutines.DisposableHandle
|
import kotlinx.coroutines.DisposableHandle
|
||||||
|
import kotlinx.coroutines.runBlocking
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||||
import kotlinx.coroutines.flow.MutableStateFlow
|
import kotlinx.coroutines.flow.MutableStateFlow
|
||||||
@@ -52,6 +51,12 @@ class EncryptedStorageAccessor(
|
|||||||
) : IStorageAccessor, DisposableHandle {
|
) : IStorageAccessor, DisposableHandle {
|
||||||
private val syncActorId = UUID.randomUUID().toString()
|
private val syncActorId = UUID.randomUUID().toString()
|
||||||
private val syncLockMutex = Mutex()
|
private val syncLockMutex = Mutex()
|
||||||
|
private val journalBuffer = StorageSyncJournalBuffer(
|
||||||
|
syncActorId = syncActorId,
|
||||||
|
originStorageUuid = null,
|
||||||
|
readJournal = { readSyncJournalUnchecked() },
|
||||||
|
writeJournal = { writeSyncJournal(it) },
|
||||||
|
)
|
||||||
private val _size = MutableStateFlow<Long?>(null)
|
private val _size = MutableStateFlow<Long?>(null)
|
||||||
override val size: StateFlow<Long?> = _size
|
override val size: StateFlow<Long?> = _size
|
||||||
|
|
||||||
@@ -110,22 +115,24 @@ class EncryptedStorageAccessor(
|
|||||||
|
|
||||||
launch {
|
launch {
|
||||||
source.numberOfFiles.collect {
|
source.numberOfFiles.collect {
|
||||||
if(it == null)
|
if (it == null) {
|
||||||
_numberOfFiles.value = null
|
_numberOfFiles.value = null
|
||||||
else
|
} else {
|
||||||
{
|
val hiddenCount = runCatching { getSystemFiles().size }.getOrNull() ?: return@collect
|
||||||
_numberOfFiles.value = it - getSystemFiles().size
|
_numberOfFiles.value = it - hiddenCount
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
launch {
|
launch {
|
||||||
source.size.collect { sourceSize ->
|
source.size.collect { sourceSize ->
|
||||||
if(sourceSize == null)
|
if (sourceSize == null) {
|
||||||
_size.value = null
|
_size.value = null
|
||||||
else
|
} else {
|
||||||
{
|
val hiddenBytes = runCatching {
|
||||||
_size.value = sourceSize - getSystemFiles().sumOf { it.metaInfo.size }
|
getSystemFiles().sumOf { file -> file.metaInfo.size }
|
||||||
|
}.getOrNull() ?: return@collect
|
||||||
|
_size.value = sourceSize - hiddenBytes
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -257,10 +264,12 @@ class EncryptedStorageAccessor(
|
|||||||
source.touchDir(encryptPath(path))
|
source.touchDir(encryptPath(path))
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun delete(path: String) {
|
override suspend fun delete(path: String, recordSyncJournal: Boolean) {
|
||||||
source.delete(encryptPath(path))
|
source.delete(encryptPath(path), recordSyncJournal = false)
|
||||||
|
if (recordSyncJournal) {
|
||||||
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
|
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream =
|
override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream =
|
||||||
openWriteInternal(path, recordJournal = recordSyncJournal)
|
openWriteInternal(path, recordJournal = recordSyncJournal)
|
||||||
@@ -270,12 +279,17 @@ class EncryptedStorageAccessor(
|
|||||||
return dataEncryptor.decryptStream(stream)
|
return dataEncryptor.decryptStream(stream)
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun moveToTrash(path: String) {
|
override suspend fun moveToTrash(path: String, recordSyncJournal: Boolean) {
|
||||||
source.moveToTrash(encryptPath(path))
|
source.moveToTrash(encryptPath(path), recordSyncJournal = false)
|
||||||
|
if (recordSyncJournal) {
|
||||||
appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH)
|
appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
override fun dispose() {
|
override fun dispose() {
|
||||||
|
runBlocking {
|
||||||
|
runCatching { journalBuffer.flushPending() }
|
||||||
|
}
|
||||||
dataEncryptor.dispose()
|
dataEncryptor.dispose()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -297,16 +311,21 @@ class EncryptedStorageAccessor(
|
|||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun readSyncJournal(): StorageSyncJournal {
|
override suspend fun readSyncJournal(): StorageSyncJournal {
|
||||||
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
|
journalBuffer.flushPending()
|
||||||
return StorageSyncJournalCodec.read(jackson, bytes)
|
return readSyncJournalUnchecked()
|
||||||
|
}
|
||||||
|
|
||||||
|
override suspend fun flushPendingSyncJournal() {
|
||||||
|
journalBuffer.flushPending()
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) {
|
override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) {
|
||||||
if (entries.isEmpty()) {
|
journalBuffer.putEntries(entries)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
val merged = StorageSyncJournalMerge.merge(readSyncJournal(), entries)
|
|
||||||
writeSyncJournal(merged)
|
private suspend fun readSyncJournalUnchecked(): StorageSyncJournal {
|
||||||
|
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
|
||||||
|
return StorageSyncJournalCodec.read(jackson, bytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
private suspend fun writeSyncJournal(journal: StorageSyncJournal) {
|
private suspend fun writeSyncJournal(journal: StorageSyncJournal) {
|
||||||
@@ -373,21 +392,9 @@ class EncryptedStorageAccessor(
|
|||||||
if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) {
|
if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
val journal = readSyncJournal()
|
val sequence = journalBuffer.nextSequence()
|
||||||
val nextSequence = (journal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L
|
val entry = journalBuffer.buildEntry(cleanedPath, operation, sequence)
|
||||||
putSyncJournalEntries(
|
journalBuffer.appendEntry(cleanedPath, entry)
|
||||||
mapOf(
|
|
||||||
cleanedPath to StorageSyncJournalEntry(
|
|
||||||
path = cleanedPath,
|
|
||||||
operation = operation,
|
|
||||||
revision = StorageSyncRevision(
|
|
||||||
sequence = nextSequence,
|
|
||||||
actorId = syncActorId,
|
|
||||||
createdAt = Instant.now(),
|
|
||||||
),
|
|
||||||
),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private suspend fun openWriteInternal(path: String, recordJournal: Boolean): OutputStream {
|
private suspend fun openWriteInternal(path: String, recordJournal: Boolean): OutputStream {
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package com.github.nullptroma.wallenc.domain.vault.storages.local
|
package com.github.nullptroma.wallenc.domain.vault.storages.local
|
||||||
|
|
||||||
import com.github.nullptroma.wallenc.domain.errors.WallencException
|
import com.github.nullptroma.wallenc.domain.errors.WallencException
|
||||||
|
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalBuffer
|
||||||
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec
|
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec
|
||||||
import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty
|
import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty
|
||||||
|
|
||||||
@@ -26,6 +27,8 @@ import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
|
|||||||
import kotlinx.coroutines.CoroutineDispatcher
|
import kotlinx.coroutines.CoroutineDispatcher
|
||||||
import kotlinx.coroutines.CoroutineScope
|
import kotlinx.coroutines.CoroutineScope
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
|
import kotlinx.coroutines.sync.Mutex
|
||||||
|
import kotlinx.coroutines.sync.withLock
|
||||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||||
import kotlinx.coroutines.flow.MutableStateFlow
|
import kotlinx.coroutines.flow.MutableStateFlow
|
||||||
import kotlinx.coroutines.flow.SharedFlow
|
import kotlinx.coroutines.flow.SharedFlow
|
||||||
@@ -73,6 +76,13 @@ class LocalStorageAccessor(
|
|||||||
private val _dirsUpdates = MutableSharedFlow<DataPage<IDirectory>>()
|
private val _dirsUpdates = MutableSharedFlow<DataPage<IDirectory>>()
|
||||||
override val dirsUpdates: SharedFlow<DataPage<IDirectory>> = _dirsUpdates
|
override val dirsUpdates: SharedFlow<DataPage<IDirectory>> = _dirsUpdates
|
||||||
|
|
||||||
|
private val journalBuffer = StorageSyncJournalBuffer(
|
||||||
|
syncActorId = syncActorId,
|
||||||
|
originStorageUuid = null,
|
||||||
|
readJournal = { readSyncJournalUnchecked() },
|
||||||
|
writeJournal = { writeSyncJournal(it) },
|
||||||
|
)
|
||||||
|
|
||||||
suspend fun init() = withContext(ioDispatcher) {
|
suspend fun init() = withContext(ioDispatcher) {
|
||||||
// запускам сканирование хранилища
|
// запускам сканирование хранилища
|
||||||
scanSizeAndNumOfFiles()
|
scanSizeAndNumOfFiles()
|
||||||
@@ -513,7 +523,7 @@ class LocalStorageAccessor(
|
|||||||
createDir(path)
|
createDir(path)
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun delete(path: String) = withContext(ioDispatcher) {
|
override suspend fun delete(path: String, recordSyncJournal: Boolean) = withContext(ioDispatcher) {
|
||||||
if (path == "/" || path.isBlank()) {
|
if (path == "/" || path.isBlank()) {
|
||||||
throw WallencException.Storage.DeleteRootForbidden()
|
throw WallencException.Storage.DeleteRootForbidden()
|
||||||
}
|
}
|
||||||
@@ -523,9 +533,11 @@ class LocalStorageAccessor(
|
|||||||
else pair.file.delete()
|
else pair.file.delete()
|
||||||
pair.metaFile.delete()
|
pair.metaFile.delete()
|
||||||
scanSizeAndNumOfFiles()
|
scanSizeAndNumOfFiles()
|
||||||
|
if (recordSyncJournal) {
|
||||||
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
|
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream = withContext(ioDispatcher) {
|
override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream = withContext(ioDispatcher) {
|
||||||
touchFileInternal(path, recordJournal = false)
|
touchFileInternal(path, recordJournal = false)
|
||||||
@@ -548,13 +560,15 @@ class LocalStorageAccessor(
|
|||||||
return@withContext pair.file.inputStream()
|
return@withContext pair.file.inputStream()
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun moveToTrash(path: String) = withContext(ioDispatcher) {
|
override suspend fun moveToTrash(path: String, recordSyncJournal: Boolean) = withContext(ioDispatcher) {
|
||||||
val pair = LocalStorageFilePair.from(_filesystemBasePath, path)
|
val pair = LocalStorageFilePair.from(_filesystemBasePath, path)
|
||||||
?: throw WallencException.Storage.FileNotFound()
|
?: throw WallencException.Storage.FileNotFound()
|
||||||
val newMeta = pair.meta.copy(isDeleted = true)
|
val newMeta = pair.meta.copy(isDeleted = true)
|
||||||
writeMeta(pair.metaFile, newMeta)
|
writeMeta(pair.metaFile, newMeta)
|
||||||
|
if (recordSyncJournal) {
|
||||||
appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH)
|
appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
override suspend fun openReadSystemFile(name: String): InputStream = withContext(ioDispatcher) {
|
override suspend fun openReadSystemFile(name: String): InputStream = withContext(ioDispatcher) {
|
||||||
val dirPath = _filesystemBasePath.resolve(SYSTEM_HIDDEN_DIRNAME)
|
val dirPath = _filesystemBasePath.resolve(SYSTEM_HIDDEN_DIRNAME)
|
||||||
@@ -579,16 +593,21 @@ class LocalStorageAccessor(
|
|||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun readSyncJournal(): StorageSyncJournal = withContext(ioDispatcher) {
|
override suspend fun readSyncJournal(): StorageSyncJournal = withContext(ioDispatcher) {
|
||||||
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
|
journalBuffer.flushPending()
|
||||||
StorageSyncJournalCodec.read(jackson, bytes)
|
readSyncJournalUnchecked()
|
||||||
|
}
|
||||||
|
|
||||||
|
override suspend fun flushPendingSyncJournal() = withContext(ioDispatcher) {
|
||||||
|
journalBuffer.flushPending()
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) = withContext(ioDispatcher) {
|
override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) = withContext(ioDispatcher) {
|
||||||
if (entries.isEmpty()) {
|
journalBuffer.putEntries(entries)
|
||||||
return@withContext
|
|
||||||
}
|
}
|
||||||
val merged = StorageSyncJournalMerge.merge(readSyncJournal(), entries)
|
|
||||||
writeSyncJournal(merged)
|
private suspend fun readSyncJournalUnchecked(): StorageSyncJournal {
|
||||||
|
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
|
||||||
|
return StorageSyncJournalCodec.read(jackson, bytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
private suspend fun writeSyncJournal(journal: StorageSyncJournal) {
|
private suspend fun writeSyncJournal(journal: StorageSyncJournal) {
|
||||||
@@ -721,21 +740,9 @@ class LocalStorageAccessor(
|
|||||||
if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) {
|
if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
val journal = readSyncJournal()
|
val sequence = journalBuffer.nextSequence()
|
||||||
val nextSequence = (journal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L
|
val entry = journalBuffer.buildEntry(cleanedPath, operation, sequence)
|
||||||
putSyncJournalEntries(
|
journalBuffer.appendEntry(cleanedPath, entry)
|
||||||
mapOf(
|
|
||||||
cleanedPath to StorageSyncJournalEntry(
|
|
||||||
path = cleanedPath,
|
|
||||||
operation = operation,
|
|
||||||
revision = StorageSyncRevision(
|
|
||||||
sequence = nextSequence,
|
|
||||||
actorId = syncActorId,
|
|
||||||
createdAt = Instant.now(),
|
|
||||||
),
|
|
||||||
),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package com.github.nullptroma.wallenc.domain.vault.storages.yandex
|
|||||||
|
|
||||||
import com.github.nullptroma.wallenc.domain.errors.WallencException
|
import com.github.nullptroma.wallenc.domain.errors.WallencException
|
||||||
import com.github.nullptroma.wallenc.domain.vault.errors.toVaultWallencException
|
import com.github.nullptroma.wallenc.domain.vault.errors.toVaultWallencException
|
||||||
|
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalBuffer
|
||||||
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec
|
import com.github.nullptroma.wallenc.domain.vault.storages.common.StorageSyncJournalCodec
|
||||||
import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty
|
import com.github.nullptroma.wallenc.domain.vault.storages.common.readSystemFileBytesOrEmpty
|
||||||
|
|
||||||
@@ -19,17 +20,16 @@ import com.github.nullptroma.wallenc.domain.interfaces.IDirectory
|
|||||||
import com.github.nullptroma.wallenc.domain.interfaces.IFile
|
import com.github.nullptroma.wallenc.domain.interfaces.IFile
|
||||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
|
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
|
||||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
|
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
|
||||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
|
|
||||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
|
|
||||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
|
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
|
||||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock
|
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock
|
||||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
|
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
|
||||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
|
|
||||||
import kotlinx.coroutines.CancellationException
|
import kotlinx.coroutines.CancellationException
|
||||||
import kotlinx.coroutines.CoroutineDispatcher
|
import kotlinx.coroutines.CoroutineDispatcher
|
||||||
import kotlinx.coroutines.CoroutineScope
|
import kotlinx.coroutines.CoroutineScope
|
||||||
import kotlinx.coroutines.Job
|
import kotlinx.coroutines.Job
|
||||||
import kotlinx.coroutines.delay
|
import kotlinx.coroutines.delay
|
||||||
|
import kotlin.coroutines.coroutineContext
|
||||||
|
import kotlinx.coroutines.ensureActive
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||||
import kotlinx.coroutines.flow.MutableStateFlow
|
import kotlinx.coroutines.flow.MutableStateFlow
|
||||||
@@ -42,6 +42,7 @@ import kotlinx.coroutines.flow.flowOn
|
|||||||
import kotlinx.coroutines.flow.stateIn
|
import kotlinx.coroutines.flow.stateIn
|
||||||
import kotlinx.coroutines.launch
|
import kotlinx.coroutines.launch
|
||||||
import kotlinx.coroutines.runBlocking
|
import kotlinx.coroutines.runBlocking
|
||||||
|
import kotlinx.coroutines.withContext
|
||||||
import kotlinx.coroutines.sync.Mutex
|
import kotlinx.coroutines.sync.Mutex
|
||||||
import kotlinx.coroutines.sync.withLock
|
import kotlinx.coroutines.sync.withLock
|
||||||
import kotlinx.coroutines.withContext
|
import kotlinx.coroutines.withContext
|
||||||
@@ -95,6 +96,13 @@ class YandexStorageAccessor(
|
|||||||
|
|
||||||
private var statsPersistJob: Job? = null
|
private var statsPersistJob: Job? = null
|
||||||
|
|
||||||
|
private val journalBuffer = StorageSyncJournalBuffer(
|
||||||
|
syncActorId = syncActorId,
|
||||||
|
originStorageUuid = storageUuid,
|
||||||
|
readJournal = { readSyncJournalUnchecked() },
|
||||||
|
writeJournal = { writeSyncJournal(it) },
|
||||||
|
)
|
||||||
|
|
||||||
@Volatile
|
@Volatile
|
||||||
private var systemDirEnsured: Boolean = false
|
private var systemDirEnsured: Boolean = false
|
||||||
|
|
||||||
@@ -240,6 +248,7 @@ class YandexStorageAccessor(
|
|||||||
val queue = ArrayDeque<String>()
|
val queue = ArrayDeque<String>()
|
||||||
queue.add(relDir)
|
queue.add(relDir)
|
||||||
while (queue.isNotEmpty()) {
|
while (queue.isNotEmpty()) {
|
||||||
|
coroutineContext.ensureActive()
|
||||||
val rel = queue.removeFirst()
|
val rel = queue.removeFirst()
|
||||||
if (isSystemRel(rel)) continue
|
if (isSystemRel(rel)) continue
|
||||||
val (files, dirs) = listImmediateChildren(rel)
|
val (files, dirs) = listImmediateChildren(rel)
|
||||||
@@ -285,6 +294,7 @@ class YandexStorageAccessor(
|
|||||||
val dirs = mutableListOf<IDirectory>()
|
val dirs = mutableListOf<IDirectory>()
|
||||||
var offset = 0
|
var offset = 0
|
||||||
while (true) {
|
while (true) {
|
||||||
|
coroutineContext.ensureActive()
|
||||||
val res = guard { repo.list(diskPath, API_LIST_LIMIT, offset) }
|
val res = guard { repo.list(diskPath, API_LIST_LIMIT, offset) }
|
||||||
val items = res.embedded?.items.orEmpty()
|
val items = res.embedded?.items.orEmpty()
|
||||||
for (it in items) {
|
for (it in items) {
|
||||||
@@ -303,7 +313,7 @@ class YandexStorageAccessor(
|
|||||||
}
|
}
|
||||||
|
|
||||||
private suspend fun getMetadataAfterWrite(diskPath: String): ResourceDto {
|
private suspend fun getMetadataAfterWrite(diskPath: String): ResourceDto {
|
||||||
val maxAttempts = 6
|
val maxAttempts = 3
|
||||||
repeat(maxAttempts) { attempt ->
|
repeat(maxAttempts) { attempt ->
|
||||||
try {
|
try {
|
||||||
return guard { repo.get(diskPath) }
|
return guard { repo.get(diskPath) }
|
||||||
@@ -488,9 +498,9 @@ class YandexStorageAccessor(
|
|||||||
if (created) {
|
if (created) {
|
||||||
_numberOfFiles.value = (_numberOfFiles.value ?: 0) + 1
|
_numberOfFiles.value = (_numberOfFiles.value ?: 0) + 1
|
||||||
persistStatsImmediate()
|
persistStatsImmediate()
|
||||||
}
|
|
||||||
appendSyncEntry(path = path, operation = StorageSyncOperation.UPSERT)
|
appendSyncEntry(path = path, operation = StorageSyncOperation.UPSERT)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
override suspend fun touchDir(path: String): Unit = withContext(ioDispatcher) {
|
override suspend fun touchDir(path: String): Unit = withContext(ioDispatcher) {
|
||||||
val segments = pathSegments(path)
|
val segments = pathSegments(path)
|
||||||
@@ -506,7 +516,7 @@ class YandexStorageAccessor(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun delete(path: String) = withContext(ioDispatcher) {
|
override suspend fun delete(path: String, recordSyncJournal: Boolean) = withContext(ioDispatcher) {
|
||||||
if (path == "/" || path.isBlank()) {
|
if (path == "/" || path.isBlank()) {
|
||||||
throw WallencException.Storage.DeleteRootForbidden()
|
throw WallencException.Storage.DeleteRootForbidden()
|
||||||
}
|
}
|
||||||
@@ -528,8 +538,10 @@ class YandexStorageAccessor(
|
|||||||
}
|
}
|
||||||
guard { repo.delete(diskPath, permanently = true) }
|
guard { repo.delete(diskPath, permanently = true) }
|
||||||
scheduleStatsPersist()
|
scheduleStatsPersist()
|
||||||
|
if (recordSyncJournal) {
|
||||||
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
|
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream = withContext(ioDispatcher) {
|
override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream = withContext(ioDispatcher) {
|
||||||
touchParentDirs(path)
|
touchParentDirs(path)
|
||||||
@@ -546,10 +558,12 @@ class YandexStorageAccessor(
|
|||||||
guard { repo.openDownloadStream(toDiskPath(path)) }
|
guard { repo.openDownloadStream(toDiskPath(path)) }
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun moveToTrash(path: String) = withContext(ioDispatcher) {
|
override suspend fun moveToTrash(path: String, recordSyncJournal: Boolean) = withContext(ioDispatcher) {
|
||||||
patchCustomProps(path, mapOf(PROP_DELETED to "true"))
|
patchCustomProps(path, mapOf(PROP_DELETED to "true"))
|
||||||
|
if (recordSyncJournal) {
|
||||||
appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH)
|
appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
override suspend fun openReadSystemFile(name: String): InputStream = withContext(ioDispatcher) {
|
override suspend fun openReadSystemFile(name: String): InputStream = withContext(ioDispatcher) {
|
||||||
ensureSystemDirExists()
|
ensureSystemDirExists()
|
||||||
@@ -576,16 +590,21 @@ class YandexStorageAccessor(
|
|||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun readSyncJournal(): StorageSyncJournal = withContext(ioDispatcher) {
|
override suspend fun readSyncJournal(): StorageSyncJournal = withContext(ioDispatcher) {
|
||||||
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
|
journalBuffer.flushPending()
|
||||||
StorageSyncJournalCodec.read(statsMapper, bytes)
|
readSyncJournalUnchecked()
|
||||||
|
}
|
||||||
|
|
||||||
|
override suspend fun flushPendingSyncJournal() = withContext(ioDispatcher) {
|
||||||
|
journalBuffer.flushPending()
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) = withContext(ioDispatcher) {
|
override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) = withContext(ioDispatcher) {
|
||||||
if (entries.isEmpty()) {
|
journalBuffer.putEntries(entries)
|
||||||
return@withContext
|
|
||||||
}
|
}
|
||||||
val merged = StorageSyncJournalMerge.merge(readSyncJournal(), entries)
|
|
||||||
writeSyncJournal(merged)
|
private suspend fun readSyncJournalUnchecked(): StorageSyncJournal {
|
||||||
|
val bytes = readSystemFileBytesOrEmpty { openReadSystemFile(SYNC_JOURNAL_FILENAME) }
|
||||||
|
return StorageSyncJournalCodec.read(statsMapper, bytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
private suspend fun writeSyncJournal(journal: StorageSyncJournal) {
|
private suspend fun writeSyncJournal(journal: StorageSyncJournal) {
|
||||||
@@ -668,10 +687,12 @@ class YandexStorageAccessor(
|
|||||||
* Выполняется из [OutputStream.close]; ошибки upload пробрасываются вызывающему коду.
|
* Выполняется из [OutputStream.close]; ошибки upload пробрасываются вызывающему коду.
|
||||||
*/
|
*/
|
||||||
private fun runCommitAfterStreamClosed(block: suspend () -> Unit) {
|
private fun runCommitAfterStreamClosed(block: suspend () -> Unit) {
|
||||||
runBlocking(ioDispatcher) {
|
runBlocking {
|
||||||
|
withContext(ioDispatcher) {
|
||||||
block()
|
block()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private suspend fun commitUploadedFile(path: String, tmp: File, recordSyncJournal: Boolean) {
|
private suspend fun commitUploadedFile(path: String, tmp: File, recordSyncJournal: Boolean) {
|
||||||
try {
|
try {
|
||||||
@@ -710,22 +731,9 @@ class YandexStorageAccessor(
|
|||||||
if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) {
|
if (!StorageSyncPaths.isSyncableUserPath(cleanedPath)) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
val journal = readSyncJournal()
|
val sequence = journalBuffer.nextSequence()
|
||||||
val nextSequence = (journal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L
|
val entry = journalBuffer.buildEntry(cleanedPath, operation, sequence)
|
||||||
putSyncJournalEntries(
|
journalBuffer.appendEntry(cleanedPath, entry)
|
||||||
mapOf(
|
|
||||||
cleanedPath to StorageSyncJournalEntry(
|
|
||||||
path = cleanedPath,
|
|
||||||
operation = operation,
|
|
||||||
revision = StorageSyncRevision(
|
|
||||||
sequence = nextSequence,
|
|
||||||
actorId = syncActorId,
|
|
||||||
createdAt = Instant.now(),
|
|
||||||
),
|
|
||||||
originStorageUuid = storageUuid,
|
|
||||||
),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private suspend fun touchParentDirs(path: String) {
|
private suspend fun touchParentDirs(path: String) {
|
||||||
|
|||||||
@@ -33,6 +33,7 @@ class YandexDiskRepositoryTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun diskInfoParsesResponse() = runBlocking {
|
fun diskInfoParsesResponse() = runBlocking {
|
||||||
|
repository.resetCloudApiCallCount()
|
||||||
server.enqueue(
|
server.enqueue(
|
||||||
MockResponse()
|
MockResponse()
|
||||||
.setResponseCode(200)
|
.setResponseCode(200)
|
||||||
@@ -42,6 +43,7 @@ class YandexDiskRepositoryTest {
|
|||||||
val info = repository.diskInfo()
|
val info = repository.diskInfo()
|
||||||
assertEquals(1000L, info.totalSpace)
|
assertEquals(1000L, info.totalSpace)
|
||||||
assertEquals(200L, info.usedSpace)
|
assertEquals(200L, info.usedSpace)
|
||||||
|
assertEquals(1L, repository.cloudApiCallCount())
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
@@ -0,0 +1,41 @@
|
|||||||
|
package com.github.nullptroma.wallenc.domain.vault.storages.common
|
||||||
|
|
||||||
|
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
|
||||||
|
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
|
||||||
|
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
|
||||||
|
import kotlinx.coroutines.runBlocking
|
||||||
|
import org.junit.Assert.assertEquals
|
||||||
|
import org.junit.Assert.assertTrue
|
||||||
|
import org.junit.Test
|
||||||
|
import java.time.Instant
|
||||||
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
|
|
||||||
|
class StorageSyncJournalBufferTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun flushRestoresPendingOnWriteFailure() = runBlocking {
|
||||||
|
val disk = AtomicReference(emptyMap<String, StorageSyncJournalEntry>())
|
||||||
|
var shouldFail = true
|
||||||
|
val buffer = StorageSyncJournalBuffer(
|
||||||
|
syncActorId = "actor",
|
||||||
|
originStorageUuid = null,
|
||||||
|
readJournal = { disk.get() },
|
||||||
|
writeJournal = {
|
||||||
|
if (shouldFail) {
|
||||||
|
error("disk unavailable")
|
||||||
|
}
|
||||||
|
disk.set(it)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
val entry = buffer.buildEntry("/a.txt", StorageSyncOperation.UPSERT, 1L)
|
||||||
|
try {
|
||||||
|
buffer.appendEntry("/a.txt", entry)
|
||||||
|
} catch (_: IllegalStateException) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
shouldFail = false
|
||||||
|
buffer.flushPending()
|
||||||
|
assertTrue(disk.get().containsKey("/a.txt"))
|
||||||
|
assertEquals(1L, disk.get()["/a.txt"]?.revision?.sequence)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -39,10 +39,10 @@ interface IStorageAccessor {
|
|||||||
suspend fun setHidden(path: String, hidden: Boolean)
|
suspend fun setHidden(path: String, hidden: Boolean)
|
||||||
suspend fun touchFile(path: String)
|
suspend fun touchFile(path: String)
|
||||||
suspend fun touchDir(path: String)
|
suspend fun touchDir(path: String)
|
||||||
suspend fun delete(path: String)
|
suspend fun delete(path: String, recordSyncJournal: Boolean = true)
|
||||||
suspend fun openWrite(path: String, recordSyncJournal: Boolean = true): OutputStream
|
suspend fun openWrite(path: String, recordSyncJournal: Boolean = true): OutputStream
|
||||||
suspend fun openRead(path: String): InputStream
|
suspend fun openRead(path: String): InputStream
|
||||||
suspend fun moveToTrash(path: String)
|
suspend fun moveToTrash(path: String, recordSyncJournal: Boolean = true)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Системный sidecar-файл для логических нужд хранилища (мета, ключи и т.п.).
|
* Системный sidecar-файл для логических нужд хранилища (мета, ключи и т.п.).
|
||||||
@@ -53,6 +53,10 @@ interface IStorageAccessor {
|
|||||||
suspend fun openWriteSystemFile(name: String): OutputStream
|
suspend fun openWriteSystemFile(name: String): OutputStream
|
||||||
|
|
||||||
suspend fun readSyncJournal(): StorageSyncJournal
|
suspend fun readSyncJournal(): StorageSyncJournal
|
||||||
|
|
||||||
|
/** Сбрасывает отложенные записи журнала на носитель (перед sync и при закрытии storage). */
|
||||||
|
suspend fun flushPendingSyncJournal() = Unit
|
||||||
|
|
||||||
suspend fun putSyncJournalEntries(entries: StorageSyncJournal)
|
suspend fun putSyncJournalEntries(entries: StorageSyncJournal)
|
||||||
|
|
||||||
suspend fun readSyncLock(): StorageSyncLock?
|
suspend fun readSyncLock(): StorageSyncLock?
|
||||||
|
|||||||
@@ -0,0 +1,6 @@
|
|||||||
|
package com.github.nullptroma.wallenc.domain.tasks
|
||||||
|
|
||||||
|
/** Локализованный заголовок задачи синхронизации хранилищ по источнику запуска. */
|
||||||
|
fun interface IStorageSyncTaskTitleFormatter {
|
||||||
|
fun format(reason: StorageSyncTriggerReason): String
|
||||||
|
}
|
||||||
@@ -14,4 +14,7 @@ interface TaskContext {
|
|||||||
fun log(level: TaskLogLevel, key: TaskLogKey)
|
fun log(level: TaskLogLevel, key: TaskLogKey)
|
||||||
|
|
||||||
fun fail(error: WallencException): Nothing
|
fun fail(error: WallencException): Nothing
|
||||||
|
|
||||||
|
/** Проверяет, что задача не отменена; бросает [kotlinx.coroutines.CancellationException] при отмене. */
|
||||||
|
suspend fun ensureNotCancelled()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -22,6 +22,8 @@ import kotlinx.coroutines.CoroutineScope
|
|||||||
import kotlinx.coroutines.Job
|
import kotlinx.coroutines.Job
|
||||||
import kotlinx.coroutines.SupervisorJob
|
import kotlinx.coroutines.SupervisorJob
|
||||||
import kotlinx.coroutines.delay
|
import kotlinx.coroutines.delay
|
||||||
|
import kotlin.coroutines.coroutineContext
|
||||||
|
import kotlinx.coroutines.ensureActive
|
||||||
import kotlinx.coroutines.flow.MutableStateFlow
|
import kotlinx.coroutines.flow.MutableStateFlow
|
||||||
import kotlinx.coroutines.flow.StateFlow
|
import kotlinx.coroutines.flow.StateFlow
|
||||||
import kotlinx.coroutines.flow.asStateFlow
|
import kotlinx.coroutines.flow.asStateFlow
|
||||||
@@ -238,6 +240,10 @@ class TaskOrchestrator(
|
|||||||
override fun fail(error: WallencException): Nothing {
|
override fun fail(error: WallencException): Nothing {
|
||||||
throw TaskFailedException(error)
|
throw TaskFailedException(error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override suspend fun ensureNotCancelled() {
|
||||||
|
coroutineContext.ensureActive()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class TaskFailedException(val error: WallencException) : RuntimeException()
|
private class TaskFailedException(val error: WallencException) : RuntimeException()
|
||||||
|
|||||||
@@ -32,6 +32,24 @@ class TaskOrchestratorTest {
|
|||||||
assertTrue(task.state is TaskRunState.Completed)
|
assertTrue(task.state is TaskRunState.Completed)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun cancelAllMarksRunningTaskCancelled() = runTest(dispatcher) {
|
||||||
|
val orchestrator = TaskOrchestrator(dispatcher)
|
||||||
|
val id = orchestrator.enqueue(
|
||||||
|
title = "Long",
|
||||||
|
dispatcher = dispatcher,
|
||||||
|
work = { ctx ->
|
||||||
|
ctx.reportProgress(null, null)
|
||||||
|
kotlinx.coroutines.delay(60_000)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
advanceTimeBy(1)
|
||||||
|
orchestrator.cancelAll()
|
||||||
|
advanceUntilIdle()
|
||||||
|
val task = orchestrator.pipelineState.value.tasks.first { it.id == id }
|
||||||
|
assertTrue(task.state is TaskRunState.Cancelled)
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun cancelMarksTaskCancelled() = runTest(dispatcher) {
|
fun cancelMarksTaskCancelled() = runTest(dispatcher) {
|
||||||
val orchestrator = TaskOrchestrator(dispatcher)
|
val orchestrator = TaskOrchestrator(dispatcher)
|
||||||
|
|||||||
@@ -23,6 +23,9 @@ fun TaskLogKey.resolve(resolver: UiStringResolver): String = when (this) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fun UiStringResolver.storageSyncTaskTitle(reason: StorageSyncTriggerReason): String =
|
||||||
|
this(R.string.task_title_storage_sync, resolveSyncTriggerReason(reason))
|
||||||
|
|
||||||
private fun UiStringResolver.resolveSyncTriggerReason(reason: StorageSyncTriggerReason): String =
|
private fun UiStringResolver.resolveSyncTriggerReason(reason: StorageSyncTriggerReason): String =
|
||||||
when (reason) {
|
when (reason) {
|
||||||
StorageSyncTriggerReason.Debounce -> this(R.string.task_sync_trigger_debounce)
|
StorageSyncTriggerReason.Debounce -> this(R.string.task_sync_trigger_debounce)
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import androidx.lifecycle.viewModelScope
|
|||||||
import com.github.nullptroma.wallenc.domain.datatypes.EncryptKey
|
import com.github.nullptroma.wallenc.domain.datatypes.EncryptKey
|
||||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageMetaLoadState
|
import com.github.nullptroma.wallenc.domain.datatypes.StorageMetaLoadState
|
||||||
import com.github.nullptroma.wallenc.domain.datatypes.Tree
|
import com.github.nullptroma.wallenc.domain.datatypes.Tree
|
||||||
|
import com.github.nullptroma.wallenc.domain.errors.WallencException
|
||||||
import com.github.nullptroma.wallenc.domain.errors.toWallencException
|
import com.github.nullptroma.wallenc.domain.errors.toWallencException
|
||||||
import com.github.nullptroma.wallenc.domain.interfaces.ILogger
|
import com.github.nullptroma.wallenc.domain.interfaces.ILogger
|
||||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorage
|
import com.github.nullptroma.wallenc.domain.interfaces.IStorage
|
||||||
@@ -188,9 +189,14 @@ abstract class AbstractVaultBrowserViewModel(
|
|||||||
ctx.log(TaskLogLevel.Info, uiStrings(R.string.task_log_rescanning_vault_storages))
|
ctx.log(TaskLogLevel.Info, uiStrings(R.string.task_log_rescanning_vault_storages))
|
||||||
manageVaultUseCase.rescanStorages(vaultUuid)
|
manageVaultUseCase.rescanStorages(vaultUuid)
|
||||||
ctx.log(TaskLogLevel.Info, uiStrings(R.string.task_log_rescan_vault_storages_done))
|
ctx.log(TaskLogLevel.Info, uiStrings(R.string.task_log_rescan_vault_storages_done))
|
||||||
|
val vault = manageVaultUseCase.find(vaultUuid)
|
||||||
|
if (vault != null && !vault.isAvailable.value) {
|
||||||
|
emitTaskError(WallencException.Network.IoFailed())
|
||||||
|
}
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
logger.debug(TAG, "rescanStorages failed: ${e.stackTraceToString()}")
|
logger.debug(TAG, "rescanStorages failed: ${e.stackTraceToString()}")
|
||||||
ctx.log(TaskLogLevel.Error, uiStrings(R.string.task_log_rescan_vault_storages_failed))
|
ctx.log(TaskLogLevel.Error, uiStrings(R.string.task_log_rescan_vault_storages_failed))
|
||||||
|
emitTaskError(e)
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
@@ -224,7 +230,7 @@ abstract class AbstractVaultBrowserViewModel(
|
|||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
logger.debug(TAG, "createStorage failed: ${e.stackTraceToString()}")
|
logger.debug(TAG, "createStorage failed: ${e.stackTraceToString()}")
|
||||||
ctx.log(TaskLogLevel.Error, uiStrings(R.string.task_log_add_vault_failed))
|
ctx.log(TaskLogLevel.Error, uiStrings(R.string.task_log_add_vault_failed))
|
||||||
throw e
|
emitTaskError(e)
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -208,10 +208,7 @@ class StorageSyncViewModel @Inject constructor(
|
|||||||
}
|
}
|
||||||
|
|
||||||
fun runSyncNow() {
|
fun runSyncNow() {
|
||||||
val started = runStorageSyncUseCase.enqueue(
|
val started = runStorageSyncUseCase.enqueue(StorageSyncTriggerReason.SyncTab)
|
||||||
displayTitle = uiStrings(R.string.task_title_storage_sync),
|
|
||||||
reason = StorageSyncTriggerReason.SyncTab,
|
|
||||||
)
|
|
||||||
if (!started) {
|
if (!started) {
|
||||||
updateState(
|
updateState(
|
||||||
state.value.copy(
|
state.value.copy(
|
||||||
|
|||||||
@@ -149,8 +149,7 @@
|
|||||||
<string name="task_title_remove_remote_vault">Удаление удалённого хранилища</string>
|
<string name="task_title_remove_remote_vault">Удаление удалённого хранилища</string>
|
||||||
<string name="task_title_retry_remote_vault">Повторное подключение удалённого хранилища</string>
|
<string name="task_title_retry_remote_vault">Повторное подключение удалённого хранилища</string>
|
||||||
<string name="task_title_rescan_vault_storages">Обновление списка хранилищ</string>
|
<string name="task_title_rescan_vault_storages">Обновление списка хранилищ</string>
|
||||||
<string name="task_title_storage_sync">Синхронизация хранилищ</string>
|
<string name="task_title_storage_sync">Синхронизация хранилищ (%1$s)</string>
|
||||||
<string name="task_title_storage_sync_background">Фоновая синхронизация хранилищ</string>
|
|
||||||
<string name="task_title_save_2fa_token">Сохранение 2FA токена</string>
|
<string name="task_title_save_2fa_token">Сохранение 2FA токена</string>
|
||||||
<string name="task_title_delete_2fa_token">Удаление 2FA токена</string>
|
<string name="task_title_delete_2fa_token">Удаление 2FA токена</string>
|
||||||
<string name="task_title_save_text_secret">Сохранение текстового секрета</string>
|
<string name="task_title_save_text_secret">Сохранение текстового секрета</string>
|
||||||
|
|||||||
@@ -149,8 +149,7 @@
|
|||||||
<string name="task_title_remove_remote_vault">Remove remote vault</string>
|
<string name="task_title_remove_remote_vault">Remove remote vault</string>
|
||||||
<string name="task_title_retry_remote_vault">Retry remote vault connection</string>
|
<string name="task_title_retry_remote_vault">Retry remote vault connection</string>
|
||||||
<string name="task_title_rescan_vault_storages">Rescan vault storages</string>
|
<string name="task_title_rescan_vault_storages">Rescan vault storages</string>
|
||||||
<string name="task_title_storage_sync">Storage sync</string>
|
<string name="task_title_storage_sync">Storage sync (%1$s)</string>
|
||||||
<string name="task_title_storage_sync_background">Background storage sync</string>
|
|
||||||
<string name="task_title_save_2fa_token">Save 2FA token</string>
|
<string name="task_title_save_2fa_token">Save 2FA token</string>
|
||||||
<string name="task_title_delete_2fa_token">Delete 2FA token</string>
|
<string name="task_title_delete_2fa_token">Delete 2FA token</string>
|
||||||
<string name="task_title_save_text_secret">Save text secret</string>
|
<string name="task_title_save_text_secret">Save text secret</string>
|
||||||
|
|||||||
@@ -3,15 +3,19 @@ package com.github.nullptroma.wallenc.usecases
|
|||||||
import com.github.nullptroma.wallenc.domain.errors.toWallencException
|
import com.github.nullptroma.wallenc.domain.errors.toWallencException
|
||||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncEngine
|
import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncEngine
|
||||||
import com.github.nullptroma.wallenc.domain.tasks.ITaskOrchestrator
|
import com.github.nullptroma.wallenc.domain.tasks.ITaskOrchestrator
|
||||||
|
import com.github.nullptroma.wallenc.domain.tasks.IStorageSyncTaskTitleFormatter
|
||||||
import com.github.nullptroma.wallenc.domain.tasks.StorageSyncTriggerReason
|
import com.github.nullptroma.wallenc.domain.tasks.StorageSyncTriggerReason
|
||||||
import com.github.nullptroma.wallenc.domain.tasks.TaskId
|
import com.github.nullptroma.wallenc.domain.tasks.TaskId
|
||||||
import com.github.nullptroma.wallenc.domain.tasks.TaskLogKey
|
import com.github.nullptroma.wallenc.domain.tasks.TaskLogKey
|
||||||
import com.github.nullptroma.wallenc.domain.tasks.TaskLogLevel
|
import com.github.nullptroma.wallenc.domain.tasks.TaskLogLevel
|
||||||
import com.github.nullptroma.wallenc.domain.tasks.TaskProgressLabel
|
import com.github.nullptroma.wallenc.domain.tasks.TaskProgressLabel
|
||||||
|
import com.github.nullptroma.wallenc.domain.tasks.TaskRunState
|
||||||
import kotlinx.coroutines.Dispatchers
|
import kotlinx.coroutines.Dispatchers
|
||||||
import kotlinx.coroutines.flow.MutableStateFlow
|
import kotlinx.coroutines.flow.MutableStateFlow
|
||||||
import kotlinx.coroutines.flow.StateFlow
|
import kotlinx.coroutines.flow.StateFlow
|
||||||
import kotlinx.coroutines.flow.asStateFlow
|
import kotlinx.coroutines.flow.asStateFlow
|
||||||
|
import kotlinx.coroutines.flow.filter
|
||||||
|
import kotlinx.coroutines.flow.first
|
||||||
import java.util.concurrent.atomic.AtomicBoolean
|
import java.util.concurrent.atomic.AtomicBoolean
|
||||||
import javax.inject.Inject
|
import javax.inject.Inject
|
||||||
import javax.inject.Singleton
|
import javax.inject.Singleton
|
||||||
@@ -21,6 +25,7 @@ class RunStorageSyncUseCase @Inject constructor(
|
|||||||
private val orchestrator: ITaskOrchestrator,
|
private val orchestrator: ITaskOrchestrator,
|
||||||
private val syncEngine: IStorageSyncEngine,
|
private val syncEngine: IStorageSyncEngine,
|
||||||
private val syncReadiness: StorageSyncReadiness,
|
private val syncReadiness: StorageSyncReadiness,
|
||||||
|
private val taskTitleFormatter: IStorageSyncTaskTitleFormatter,
|
||||||
) {
|
) {
|
||||||
private val running = AtomicBoolean(false)
|
private val running = AtomicBoolean(false)
|
||||||
|
|
||||||
@@ -30,25 +35,29 @@ class RunStorageSyncUseCase @Inject constructor(
|
|||||||
private val _activeSyncTaskId = MutableStateFlow<TaskId?>(null)
|
private val _activeSyncTaskId = MutableStateFlow<TaskId?>(null)
|
||||||
val activeSyncTaskId: StateFlow<TaskId?> = _activeSyncTaskId.asStateFlow()
|
val activeSyncTaskId: StateFlow<TaskId?> = _activeSyncTaskId.asStateFlow()
|
||||||
|
|
||||||
|
/** Не реагировать на debounce до этого момента (мс с эпохи) после завершения sync. */
|
||||||
|
private val _debounceSuppressUntilMs = MutableStateFlow(0L)
|
||||||
|
val debounceSuppressUntilMs: StateFlow<Long> = _debounceSuppressUntilMs.asStateFlow()
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param displayTitle заголовок задачи в UI (локализованный на стороне вызова)
|
* @param reason источник запуска — заголовок задачи и лог пайплайна
|
||||||
* @param reason источник запуска — попадает в лог пайплайна
|
|
||||||
* @return false, если синхронизация уже в очереди или выполняется — новая задача не создана
|
* @return false, если синхронизация уже в очереди или выполняется — новая задача не создана
|
||||||
*/
|
*/
|
||||||
fun enqueue(displayTitle: String, reason: StorageSyncTriggerReason): Boolean {
|
fun enqueue(reason: StorageSyncTriggerReason): Boolean {
|
||||||
if (!running.compareAndSet(false, true)) {
|
if (!running.compareAndSet(false, true)) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
_syncRunning.value = true
|
_syncRunning.value = true
|
||||||
try {
|
try {
|
||||||
val taskId = orchestrator.enqueue(
|
val taskId = orchestrator.enqueue(
|
||||||
title = displayTitle,
|
title = taskTitleFormatter.format(reason),
|
||||||
dispatcher = Dispatchers.IO,
|
dispatcher = Dispatchers.IO,
|
||||||
work = { ctx ->
|
work = { ctx ->
|
||||||
try {
|
try {
|
||||||
executeSync(
|
executeSync(
|
||||||
reason = reason,
|
reason = reason,
|
||||||
reportProgress = { fraction, label ->
|
reportProgress = { fraction, label ->
|
||||||
|
ctx.ensureNotCancelled()
|
||||||
ctx.reportProgress(fraction, label)
|
ctx.reportProgress(fraction, label)
|
||||||
},
|
},
|
||||||
log = { level, key -> ctx.log(level, key) },
|
log = { level, key -> ctx.log(level, key) },
|
||||||
@@ -68,19 +77,22 @@ class RunStorageSyncUseCase @Inject constructor(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
suspend fun runBlocking(reason: StorageSyncTriggerReason) {
|
/**
|
||||||
if (!running.compareAndSet(false, true)) {
|
* Ставит sync в пайплайн задач (как debounce / sync-tab) и ждёт завершения.
|
||||||
return
|
* Для WorkManager и других фоновых запусков без отдельного «orphan»-лога.
|
||||||
|
*/
|
||||||
|
suspend fun enqueueAndAwait(reason: StorageSyncTriggerReason): StorageSyncRunOutcome {
|
||||||
|
if (!enqueue(reason)) {
|
||||||
|
return StorageSyncRunOutcome.SkippedAlreadyRunning
|
||||||
}
|
}
|
||||||
_syncRunning.value = true
|
val taskId = _activeSyncTaskId.value
|
||||||
try {
|
?: return StorageSyncRunOutcome.Completed
|
||||||
executeSync(
|
syncRunning.filter { !it }.first()
|
||||||
reason = reason,
|
val state = orchestrator.pipelineState.value.tasks.find { it.id == taskId }?.state
|
||||||
reportProgress = { _, _ -> },
|
return when (state) {
|
||||||
log = { level, key -> orchestrator.appendPipelineLog(level, key) },
|
is TaskRunState.Failed -> StorageSyncRunOutcome.Failed(state.error)
|
||||||
)
|
TaskRunState.Cancelled -> StorageSyncRunOutcome.Cancelled
|
||||||
} finally {
|
else -> StorageSyncRunOutcome.Completed
|
||||||
clearRunningState()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -89,6 +101,7 @@ class RunStorageSyncUseCase @Inject constructor(
|
|||||||
reportProgress: suspend (fraction: Float?, label: TaskProgressLabel?) -> Unit,
|
reportProgress: suspend (fraction: Float?, label: TaskProgressLabel?) -> Unit,
|
||||||
log: (TaskLogLevel, TaskLogKey) -> Unit,
|
log: (TaskLogLevel, TaskLogKey) -> Unit,
|
||||||
) {
|
) {
|
||||||
|
try {
|
||||||
syncReadiness.awaitReady()
|
syncReadiness.awaitReady()
|
||||||
log(TaskLogLevel.Info, TaskLogKey.SyncStarted(reason))
|
log(TaskLogLevel.Info, TaskLogKey.SyncStarted(reason))
|
||||||
reportProgress(null, TaskProgressLabel.SyncStarted)
|
reportProgress(null, TaskProgressLabel.SyncStarted)
|
||||||
@@ -103,6 +116,13 @@ class RunStorageSyncUseCase @Inject constructor(
|
|||||||
log(TaskLogLevel.Error, TaskLogKey.SyncFailed(err, reason))
|
log(TaskLogLevel.Error, TaskLogKey.SyncFailed(err, reason))
|
||||||
throw e
|
throw e
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
extendDebounceSuppress()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun extendDebounceSuppress() {
|
||||||
|
_debounceSuppressUntilMs.value = System.currentTimeMillis() + DEBOUNCE_AFTER_CHANGE_MS
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun clearRunningState() {
|
private fun clearRunningState() {
|
||||||
@@ -110,4 +130,9 @@ class RunStorageSyncUseCase @Inject constructor(
|
|||||||
_syncRunning.value = false
|
_syncRunning.value = false
|
||||||
_activeSyncTaskId.value = null
|
_activeSyncTaskId.value = null
|
||||||
}
|
}
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
/** Пауза после последнего изменения перед debounce-sync; же окно подавления после sync. */
|
||||||
|
const val DEBOUNCE_AFTER_CHANGE_MS = 60_000L
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,6 +12,11 @@ import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncGroupStore
|
|||||||
import com.github.nullptroma.wallenc.domain.interfaces.IVaultsManager
|
import com.github.nullptroma.wallenc.domain.interfaces.IVaultsManager
|
||||||
import com.github.nullptroma.wallenc.domain.tasks.TaskProgressLabel
|
import com.github.nullptroma.wallenc.domain.tasks.TaskProgressLabel
|
||||||
import kotlinx.coroutines.Dispatchers
|
import kotlinx.coroutines.Dispatchers
|
||||||
|
import kotlinx.coroutines.async
|
||||||
|
import kotlinx.coroutines.awaitAll
|
||||||
|
import kotlinx.coroutines.coroutineScope
|
||||||
|
import kotlin.coroutines.coroutineContext
|
||||||
|
import kotlinx.coroutines.ensureActive
|
||||||
import kotlinx.coroutines.sync.Mutex
|
import kotlinx.coroutines.sync.Mutex
|
||||||
import kotlinx.coroutines.sync.withLock
|
import kotlinx.coroutines.sync.withLock
|
||||||
import kotlinx.coroutines.withContext
|
import kotlinx.coroutines.withContext
|
||||||
@@ -48,6 +53,7 @@ class StorageSyncEngine @Inject constructor(
|
|||||||
}
|
}
|
||||||
reporter(null, TaskProgressLabel.SyncPreparing(groups.size))
|
reporter(null, TaskProgressLabel.SyncPreparing(groups.size))
|
||||||
for (group in groups) {
|
for (group in groups) {
|
||||||
|
coroutineContext.ensureActive()
|
||||||
syncGroupInternal(
|
syncGroupInternal(
|
||||||
groupId = group.id,
|
groupId = group.id,
|
||||||
reportProgress = reporter,
|
reportProgress = reporter,
|
||||||
@@ -104,6 +110,7 @@ class StorageSyncEngine @Inject constructor(
|
|||||||
try {
|
try {
|
||||||
reportProgress(null, TaskProgressLabel.SyncGroupAcquiringLocks(groupId))
|
reportProgress(null, TaskProgressLabel.SyncGroupAcquiringLocks(groupId))
|
||||||
for ((lockIndex, storage) in storages.withIndex()) {
|
for ((lockIndex, storage) in storages.withIndex()) {
|
||||||
|
coroutineContext.ensureActive()
|
||||||
reportProgress(
|
reportProgress(
|
||||||
null,
|
null,
|
||||||
TaskProgressLabel.SyncGroupLockProgress(groupId, lockIndex + 1, storages.size),
|
TaskProgressLabel.SyncGroupLockProgress(groupId, lockIndex + 1, storages.size),
|
||||||
@@ -120,7 +127,6 @@ class StorageSyncEngine @Inject constructor(
|
|||||||
val entriesByStorage = mutableMapOf<UUID, StorageSyncJournal>()
|
val entriesByStorage = mutableMapOf<UUID, StorageSyncJournal>()
|
||||||
|
|
||||||
reportProgress(null, TaskProgressLabel.SyncGroupReadingJournals(groupId))
|
reportProgress(null, TaskProgressLabel.SyncGroupReadingJournals(groupId))
|
||||||
for ((journalIndex, storage) in storages.withIndex()) {
|
|
||||||
leaseUntil = renewLocksIfNeeded(
|
leaseUntil = renewLocksIfNeeded(
|
||||||
groupId = groupId,
|
groupId = groupId,
|
||||||
lockedAccessors = lockedAccessors,
|
lockedAccessors = lockedAccessors,
|
||||||
@@ -131,11 +137,24 @@ class StorageSyncEngine @Inject constructor(
|
|||||||
reportProgress(null, TaskProgressLabel.SyncGroupCancelled(groupId))
|
reportProgress(null, TaskProgressLabel.SyncGroupCancelled(groupId))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
val journalReads = coroutineScope {
|
||||||
|
storages.mapIndexed { journalIndex, storage ->
|
||||||
|
async {
|
||||||
|
coroutineContext.ensureActive()
|
||||||
reportProgress(
|
reportProgress(
|
||||||
null,
|
null,
|
||||||
TaskProgressLabel.SyncGroupJournalProgress(groupId, journalIndex + 1, storages.size),
|
TaskProgressLabel.SyncGroupJournalProgress(
|
||||||
|
groupId,
|
||||||
|
journalIndex + 1,
|
||||||
|
storages.size,
|
||||||
|
),
|
||||||
)
|
)
|
||||||
val journal = filterSyncableJournal(storage.accessor.readSyncJournal())
|
storage.accessor.flushPendingSyncJournal()
|
||||||
|
storage to filterSyncableJournal(storage.accessor.readSyncJournal())
|
||||||
|
}
|
||||||
|
}.awaitAll()
|
||||||
|
}
|
||||||
|
for ((storage, journal) in journalReads) {
|
||||||
entriesByStorage[storage.uuid] = journal
|
entriesByStorage[storage.uuid] = journal
|
||||||
mergedByPath.putAll(
|
mergedByPath.putAll(
|
||||||
StorageSyncJournalMerge.merge(mergedByPath, journal),
|
StorageSyncJournalMerge.merge(mergedByPath, journal),
|
||||||
@@ -154,6 +173,7 @@ class StorageSyncEngine @Inject constructor(
|
|||||||
)
|
)
|
||||||
var applyFailures = 0
|
var applyFailures = 0
|
||||||
for ((pathIndex, merged) in mergedEntries.withIndex()) {
|
for ((pathIndex, merged) in mergedEntries.withIndex()) {
|
||||||
|
coroutineContext.ensureActive()
|
||||||
leaseUntil = renewLocksIfNeeded(
|
leaseUntil = renewLocksIfNeeded(
|
||||||
groupId = groupId,
|
groupId = groupId,
|
||||||
lockedAccessors = lockedAccessors,
|
lockedAccessors = lockedAccessors,
|
||||||
@@ -178,6 +198,7 @@ class StorageSyncEngine @Inject constructor(
|
|||||||
}
|
}
|
||||||
|
|
||||||
for (target in storages) {
|
for (target in storages) {
|
||||||
|
coroutineContext.ensureActive()
|
||||||
if (target.uuid == sourceStorage?.uuid) {
|
if (target.uuid == sourceStorage?.uuid) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -231,6 +252,7 @@ class StorageSyncEngine @Inject constructor(
|
|||||||
val nextLeaseUntil = now.plusSeconds(SYNC_LOCK_LEASE_SECONDS)
|
val nextLeaseUntil = now.plusSeconds(SYNC_LOCK_LEASE_SECONDS)
|
||||||
reportProgress(null, TaskProgressLabel.SyncGroupRenewingLocks(groupId))
|
reportProgress(null, TaskProgressLabel.SyncGroupRenewingLocks(groupId))
|
||||||
for (accessor in lockedAccessors) {
|
for (accessor in lockedAccessors) {
|
||||||
|
coroutineContext.ensureActive()
|
||||||
val renewed = runCatching {
|
val renewed = runCatching {
|
||||||
accessor.tryAcquireSyncLock(holderId, nextLeaseUntil)
|
accessor.tryAcquireSyncLock(holderId, nextLeaseUntil)
|
||||||
}.getOrElse { false }
|
}.getOrElse { false }
|
||||||
@@ -273,13 +295,13 @@ class StorageSyncEngine @Inject constructor(
|
|||||||
val result = when (entry.operation) {
|
val result = when (entry.operation) {
|
||||||
StorageSyncOperation.DELETE -> {
|
StorageSyncOperation.DELETE -> {
|
||||||
runCatching {
|
runCatching {
|
||||||
target.accessor.delete(entry.path)
|
target.accessor.delete(entry.path, recordSyncJournal = false)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
StorageSyncOperation.TRASH -> {
|
StorageSyncOperation.TRASH -> {
|
||||||
runCatching {
|
runCatching {
|
||||||
target.accessor.moveToTrash(entry.path)
|
target.accessor.moveToTrash(entry.path, recordSyncJournal = false)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -3,6 +3,8 @@ package com.github.nullptroma.wallenc.usecases
|
|||||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncGroupStore
|
import com.github.nullptroma.wallenc.domain.interfaces.IStorageSyncGroupStore
|
||||||
import com.github.nullptroma.wallenc.domain.interfaces.IVaultsManager
|
import com.github.nullptroma.wallenc.domain.interfaces.IVaultsManager
|
||||||
import kotlinx.coroutines.delay
|
import kotlinx.coroutines.delay
|
||||||
|
import kotlin.coroutines.coroutineContext
|
||||||
|
import kotlinx.coroutines.ensureActive
|
||||||
import javax.inject.Inject
|
import javax.inject.Inject
|
||||||
import javax.inject.Singleton
|
import javax.inject.Singleton
|
||||||
|
|
||||||
@@ -27,6 +29,7 @@ class StorageSyncReadiness @Inject constructor(
|
|||||||
val deadline = System.currentTimeMillis() + timeoutMs
|
val deadline = System.currentTimeMillis() + timeoutMs
|
||||||
|
|
||||||
while (System.currentTimeMillis() < deadline) {
|
while (System.currentTimeMillis() < deadline) {
|
||||||
|
coroutineContext.ensureActive()
|
||||||
if (!isAnyVaultScanning()) {
|
if (!isAnyVaultScanning()) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@@ -34,6 +37,7 @@ class StorageSyncReadiness @Inject constructor(
|
|||||||
}
|
}
|
||||||
|
|
||||||
while (System.currentTimeMillis() < deadline) {
|
while (System.currentTimeMillis() < deadline) {
|
||||||
|
coroutineContext.ensureActive()
|
||||||
if (requiredUuids.all { uuid -> findStorageUseCase.find(uuid) != null }) {
|
if (requiredUuids.all { uuid -> findStorageUseCase.find(uuid) != null }) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,16 @@
|
|||||||
|
package com.github.nullptroma.wallenc.usecases
|
||||||
|
|
||||||
|
import com.github.nullptroma.wallenc.domain.errors.WallencException
|
||||||
|
|
||||||
|
/** Результат [RunStorageSyncUseCase.enqueueAndAwait]. */
|
||||||
|
sealed class StorageSyncRunOutcome {
|
||||||
|
/** Синхронизация уже выполнялась — новая задача не создана. */
|
||||||
|
data object SkippedAlreadyRunning : StorageSyncRunOutcome()
|
||||||
|
|
||||||
|
data object Completed : StorageSyncRunOutcome()
|
||||||
|
|
||||||
|
/** Задача синхронизации отменена пользователем или пайплайном. */
|
||||||
|
data object Cancelled : StorageSyncRunOutcome()
|
||||||
|
|
||||||
|
data class Failed(val error: WallencException) : StorageSyncRunOutcome()
|
||||||
|
}
|
||||||
@@ -2,6 +2,7 @@ package com.github.nullptroma.wallenc.usecases
|
|||||||
|
|
||||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
|
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
|
||||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
|
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
|
||||||
|
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
|
||||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
|
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
|
||||||
import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroup
|
import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroup
|
||||||
import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroupEncryptionKind
|
import com.github.nullptroma.wallenc.domain.interfaces.StorageSyncGroupEncryptionKind
|
||||||
@@ -10,8 +11,11 @@ import com.github.nullptroma.wallenc.usecases.fakes.FakeStorage
|
|||||||
import com.github.nullptroma.wallenc.usecases.fakes.FakeStorageAccessor
|
import com.github.nullptroma.wallenc.usecases.fakes.FakeStorageAccessor
|
||||||
import com.github.nullptroma.wallenc.usecases.fakes.FakeStorageSyncGroupStore
|
import com.github.nullptroma.wallenc.usecases.fakes.FakeStorageSyncGroupStore
|
||||||
import com.github.nullptroma.wallenc.usecases.fakes.FakeVaultsManager
|
import com.github.nullptroma.wallenc.usecases.fakes.FakeVaultsManager
|
||||||
|
import kotlinx.coroutines.CancellationException
|
||||||
|
import kotlinx.coroutines.async
|
||||||
import kotlinx.coroutines.runBlocking
|
import kotlinx.coroutines.runBlocking
|
||||||
import org.junit.Assert.assertArrayEquals
|
import org.junit.Assert.assertArrayEquals
|
||||||
|
import org.junit.Assert.assertEquals
|
||||||
import org.junit.Assert.assertNull
|
import org.junit.Assert.assertNull
|
||||||
import org.junit.Assert.assertTrue
|
import org.junit.Assert.assertTrue
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
@@ -19,6 +23,8 @@ import java.time.Instant
|
|||||||
|
|
||||||
class StorageSyncEngineTest {
|
class StorageSyncEngineTest {
|
||||||
|
|
||||||
|
private fun norm(path: String): String = StorageSyncPaths.normalize(path)
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun syncAllGroupsReportsNoGroupsWhenEmpty() = runBlocking {
|
fun syncAllGroupsReportsNoGroupsWhenEmpty() = runBlocking {
|
||||||
val labels = mutableListOf<TaskProgressLabel?>()
|
val labels = mutableListOf<TaskProgressLabel?>()
|
||||||
@@ -63,7 +69,7 @@ class StorageSyncEngineTest {
|
|||||||
|
|
||||||
assertArrayEquals(payload, target.fileBytes(path))
|
assertArrayEquals(payload, target.fileBytes(path))
|
||||||
assertTrue(labels.any { it is TaskProgressLabel.SyncGroupCompleted })
|
assertTrue(labels.any { it is TaskProgressLabel.SyncGroupCompleted })
|
||||||
assertTrue(target.syncJournalEntries().any { it.path == path })
|
assertTrue(target.syncJournalEntries().any { it.path == norm(path) })
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -111,6 +117,81 @@ class StorageSyncEngineTest {
|
|||||||
engine.syncGroup(group.id) { _, _ -> }
|
engine.syncGroup(group.id) { _, _ -> }
|
||||||
|
|
||||||
assertNull(target.fileBytes(path))
|
assertNull(target.fileBytes(path))
|
||||||
|
val targetEntry = target.syncJournalEntries().single { it.path == norm(path) }
|
||||||
|
assertEquals(2L, targetEntry.revision.sequence)
|
||||||
|
assertEquals("actor-b", targetEntry.revision.actorId)
|
||||||
|
assertEquals(StorageSyncOperation.DELETE, targetEntry.operation)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun syncSkipsWhenTargetRevisionAlreadyWinner() = runBlocking {
|
||||||
|
val source = FakeStorage()
|
||||||
|
val target = FakeStorage()
|
||||||
|
val path = "already-synced.txt"
|
||||||
|
val payload = "same".encodeToByteArray()
|
||||||
|
source.putFile(path, payload)
|
||||||
|
target.putFile(path, payload)
|
||||||
|
|
||||||
|
val winner = StorageSyncJournalEntry(
|
||||||
|
path = path,
|
||||||
|
operation = StorageSyncOperation.UPSERT,
|
||||||
|
revision = StorageSyncRevision(
|
||||||
|
sequence = 5L,
|
||||||
|
actorId = "winner-actor",
|
||||||
|
createdAt = Instant.parse("2024-08-01T00:00:00Z"),
|
||||||
|
),
|
||||||
|
size = payload.size.toLong(),
|
||||||
|
)
|
||||||
|
source.addSyncJournalEntry(winner)
|
||||||
|
target.addSyncJournalEntry(winner)
|
||||||
|
|
||||||
|
val group = StorageSyncGroup(
|
||||||
|
id = "skip-group",
|
||||||
|
storageUuids = setOf(source.uuid, target.uuid),
|
||||||
|
encryptionKind = StorageSyncGroupEncryptionKind.NONE,
|
||||||
|
)
|
||||||
|
val engine = createEngine(
|
||||||
|
storages = listOf(source, target),
|
||||||
|
groups = listOf(group),
|
||||||
|
)
|
||||||
|
engine.syncGroup(group.id) { _, _ -> }
|
||||||
|
|
||||||
|
val targetJournal = target.syncJournalEntries().single { it.path == norm(path) }
|
||||||
|
assertEquals(winner.revision, targetJournal.revision)
|
||||||
|
assertEquals(winner.operation, targetJournal.operation)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun openReadDoesNotChangeJournal() = runBlocking {
|
||||||
|
val storage = FakeStorage()
|
||||||
|
val path = "read-only.txt"
|
||||||
|
storage.putFile(path, "data".encodeToByteArray())
|
||||||
|
val before = storage.syncJournalEntries().size
|
||||||
|
|
||||||
|
storage.accessor.openRead(path).use { it.readBytes() }
|
||||||
|
|
||||||
|
assertEquals(before, storage.syncJournalEntries().size)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun deleteWithRecordSyncJournalFalseDoesNotBumpSequence() = runBlocking {
|
||||||
|
val storage = FakeStorage()
|
||||||
|
val path = "to-delete.txt"
|
||||||
|
storage.putFile(path, "x".encodeToByteArray())
|
||||||
|
storage.addSyncJournalEntry(
|
||||||
|
StorageSyncJournalEntry(
|
||||||
|
path = path,
|
||||||
|
operation = StorageSyncOperation.UPSERT,
|
||||||
|
revision = StorageSyncRevision(10L, "prior", Instant.EPOCH),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
storage.accessor.delete(path, recordSyncJournal = false)
|
||||||
|
|
||||||
|
assertNull(storage.fileBytes(path))
|
||||||
|
val entry = storage.syncJournalEntries().single { it.path == norm(path) }
|
||||||
|
assertEquals(10L, entry.revision.sequence)
|
||||||
|
assertEquals(StorageSyncOperation.UPSERT, entry.operation)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -145,7 +226,11 @@ class StorageSyncEngineTest {
|
|||||||
engine.syncGroup(group.id) { _, _ -> }
|
engine.syncGroup(group.id) { _, _ -> }
|
||||||
|
|
||||||
assertArrayEquals(payload, target.fileBytes(path))
|
assertArrayEquals(payload, target.fileBytes(path))
|
||||||
assertTrue(path in (target.accessor as FakeStorageAccessor).trashedPaths)
|
assertTrue(norm(path) in (target.accessor as FakeStorageAccessor).trashedPaths)
|
||||||
|
val targetEntry = target.syncJournalEntries().single { it.path == norm(path) }
|
||||||
|
assertEquals(3L, targetEntry.revision.sequence)
|
||||||
|
assertEquals("actor-trash", targetEntry.revision.actorId)
|
||||||
|
assertEquals(StorageSyncOperation.TRASH, targetEntry.operation)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -171,6 +256,113 @@ class StorageSyncEngineTest {
|
|||||||
)
|
)
|
||||||
engine.syncGroup(group.id) { _, label -> labels.add(label) }
|
engine.syncGroup(group.id) { _, label -> labels.add(label) }
|
||||||
assertTrue(labels.any { it is TaskProgressLabel.SyncGroupLockFailed })
|
assertTrue(labels.any { it is TaskProgressLabel.SyncGroupLockFailed })
|
||||||
|
assertNull((first.accessor as FakeStorageAccessor).syncLock)
|
||||||
|
assertNull((second.accessor as FakeStorageAccessor).syncLock)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun syncGroupReleasesLocksAfterSuccessfulSync() = runBlocking {
|
||||||
|
val source = FakeStorage()
|
||||||
|
val target = FakeStorage()
|
||||||
|
source.addSyncJournalEntry(
|
||||||
|
StorageSyncJournalEntry(
|
||||||
|
path = "a.txt",
|
||||||
|
operation = StorageSyncOperation.UPSERT,
|
||||||
|
revision = StorageSyncRevision(1L, "x", Instant.EPOCH),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
source.putFile("a.txt", "x".encodeToByteArray())
|
||||||
|
val group = StorageSyncGroup(
|
||||||
|
id = "ok",
|
||||||
|
storageUuids = setOf(source.uuid, target.uuid),
|
||||||
|
encryptionKind = StorageSyncGroupEncryptionKind.NONE,
|
||||||
|
)
|
||||||
|
val engine = createEngine(
|
||||||
|
storages = listOf(source, target),
|
||||||
|
groups = listOf(group),
|
||||||
|
)
|
||||||
|
engine.syncGroup(group.id) { _, _ -> }
|
||||||
|
assertNull((source.accessor as FakeStorageAccessor).syncLock)
|
||||||
|
assertNull((target.accessor as FakeStorageAccessor).syncLock)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun syncGroupReleasesLocksWhenJournalReadFails() = runBlocking {
|
||||||
|
val first = FakeStorage()
|
||||||
|
val second = FakeStorage()
|
||||||
|
(first.accessor as FakeStorageAccessor).readSyncJournalThrows =
|
||||||
|
IllegalStateException("journal read failed")
|
||||||
|
val group = StorageSyncGroup(
|
||||||
|
id = "journal-fail",
|
||||||
|
storageUuids = setOf(first.uuid, second.uuid),
|
||||||
|
encryptionKind = StorageSyncGroupEncryptionKind.NONE,
|
||||||
|
)
|
||||||
|
val engine = createEngine(
|
||||||
|
storages = listOf(first, second),
|
||||||
|
groups = listOf(group),
|
||||||
|
)
|
||||||
|
runCatching { engine.syncGroup(group.id) { _, _ -> } }
|
||||||
|
assertNull((first.accessor as FakeStorageAccessor).syncLock)
|
||||||
|
assertNull((second.accessor as FakeStorageAccessor).syncLock)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun syncGroupCooperativeCancellationReleasesLocks() = runBlocking {
|
||||||
|
val source = FakeStorage()
|
||||||
|
val target = FakeStorage()
|
||||||
|
val path = "slow.txt"
|
||||||
|
val payload = "payload".encodeToByteArray()
|
||||||
|
source.putFile(path, payload)
|
||||||
|
(source.accessor as FakeStorageAccessor).openReadDelayMs = 5_000
|
||||||
|
source.addSyncJournalEntry(
|
||||||
|
StorageSyncJournalEntry(
|
||||||
|
path = path,
|
||||||
|
operation = StorageSyncOperation.UPSERT,
|
||||||
|
revision = StorageSyncRevision(1L, "actor", Instant.EPOCH),
|
||||||
|
size = payload.size.toLong(),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
val group = StorageSyncGroup(
|
||||||
|
id = "cancel-group",
|
||||||
|
storageUuids = setOf(source.uuid, target.uuid),
|
||||||
|
encryptionKind = StorageSyncGroupEncryptionKind.NONE,
|
||||||
|
)
|
||||||
|
val engine = createEngine(
|
||||||
|
storages = listOf(source, target),
|
||||||
|
groups = listOf(group),
|
||||||
|
)
|
||||||
|
val job = async {
|
||||||
|
engine.syncGroup(group.id) { _, _ -> }
|
||||||
|
}
|
||||||
|
kotlinx.coroutines.delay(50)
|
||||||
|
job.cancel()
|
||||||
|
try {
|
||||||
|
job.await()
|
||||||
|
} catch (_: CancellationException) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
assertNull((source.accessor as FakeStorageAccessor).syncLock)
|
||||||
|
assertNull((target.accessor as FakeStorageAccessor).syncLock)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun syncGroupReleasesLocksWhenJournalEmpty() = runBlocking {
|
||||||
|
val first = FakeStorage()
|
||||||
|
val second = FakeStorage()
|
||||||
|
val group = StorageSyncGroup(
|
||||||
|
id = "empty-journal",
|
||||||
|
storageUuids = setOf(first.uuid, second.uuid),
|
||||||
|
encryptionKind = StorageSyncGroupEncryptionKind.NONE,
|
||||||
|
)
|
||||||
|
val labels = mutableListOf<TaskProgressLabel?>()
|
||||||
|
val engine = createEngine(
|
||||||
|
storages = listOf(first, second),
|
||||||
|
groups = listOf(group),
|
||||||
|
)
|
||||||
|
engine.syncGroup(group.id) { _, label -> labels.add(label) }
|
||||||
|
assertTrue(labels.any { it is TaskProgressLabel.SyncGroupNoJournalEntries })
|
||||||
|
assertNull((first.accessor as FakeStorageAccessor).syncLock)
|
||||||
|
assertNull((second.accessor as FakeStorageAccessor).syncLock)
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun createEngine(
|
private fun createEngine(
|
||||||
|
|||||||
@@ -41,10 +41,12 @@ class FakeStorage(
|
|||||||
}
|
}
|
||||||
|
|
||||||
fun putFile(path: String, bytes: ByteArray) {
|
fun putFile(path: String, bytes: ByteArray) {
|
||||||
accessorImpl.dataFiles[path] = bytes
|
accessorImpl.dataFiles[com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths.normalize(path)] =
|
||||||
|
bytes
|
||||||
}
|
}
|
||||||
|
|
||||||
fun fileBytes(path: String): ByteArray? = accessorImpl.dataFiles[path]
|
fun fileBytes(path: String): ByteArray? =
|
||||||
|
accessorImpl.dataFiles[com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths.normalize(path)]
|
||||||
|
|
||||||
fun addSyncJournalEntry(entry: StorageSyncJournalEntry) {
|
fun addSyncJournalEntry(entry: StorageSyncJournalEntry) {
|
||||||
accessorImpl.syncJournal = StorageSyncJournalMerge.merge(
|
accessorImpl.syncJournal = StorageSyncJournalMerge.merge(
|
||||||
|
|||||||
@@ -5,10 +5,14 @@ import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournal
|
|||||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
|
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalEntry
|
||||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
|
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncJournalMerge
|
||||||
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock
|
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncLock
|
||||||
|
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncOperation
|
||||||
|
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncPaths
|
||||||
|
import com.github.nullptroma.wallenc.domain.datatypes.StorageSyncRevision
|
||||||
import com.github.nullptroma.wallenc.domain.interfaces.IDirectory
|
import com.github.nullptroma.wallenc.domain.interfaces.IDirectory
|
||||||
import com.github.nullptroma.wallenc.domain.interfaces.IFile
|
import com.github.nullptroma.wallenc.domain.interfaces.IFile
|
||||||
import com.github.nullptroma.wallenc.domain.interfaces.IMetaInfo
|
import com.github.nullptroma.wallenc.domain.interfaces.IMetaInfo
|
||||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
|
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
|
||||||
|
import kotlinx.coroutines.delay
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||||
import kotlinx.coroutines.flow.MutableStateFlow
|
import kotlinx.coroutines.flow.MutableStateFlow
|
||||||
@@ -23,6 +27,8 @@ import java.time.Instant
|
|||||||
|
|
||||||
class FakeStorageAccessor : IStorageAccessor {
|
class FakeStorageAccessor : IStorageAccessor {
|
||||||
val dataFiles: MutableMap<String, ByteArray> = mutableMapOf()
|
val dataFiles: MutableMap<String, ByteArray> = mutableMapOf()
|
||||||
|
|
||||||
|
private fun norm(path: String): String = StorageSyncPaths.normalize(path)
|
||||||
val trashedPaths: MutableSet<String> = mutableSetOf()
|
val trashedPaths: MutableSet<String> = mutableSetOf()
|
||||||
private val systemFiles: MutableMap<String, ByteArray> = mutableMapOf()
|
private val systemFiles: MutableMap<String, ByteArray> = mutableMapOf()
|
||||||
private val _filesUpdates = MutableSharedFlow<DataPage<IFile>>(extraBufferCapacity = 16)
|
private val _filesUpdates = MutableSharedFlow<DataPage<IFile>>(extraBufferCapacity = 16)
|
||||||
@@ -30,6 +36,8 @@ class FakeStorageAccessor : IStorageAccessor {
|
|||||||
var syncJournal: StorageSyncJournal = emptyMap()
|
var syncJournal: StorageSyncJournal = emptyMap()
|
||||||
var syncLock: StorageSyncLock? = null
|
var syncLock: StorageSyncLock? = null
|
||||||
var acquireLockResult: Boolean = true
|
var acquireLockResult: Boolean = true
|
||||||
|
var readSyncJournalThrows: Throwable? = null
|
||||||
|
var openReadDelayMs: Long = 0
|
||||||
|
|
||||||
override val size: StateFlow<Long?> = MutableStateFlow(0L)
|
override val size: StateFlow<Long?> = MutableStateFlow(0L)
|
||||||
override val numberOfFiles: StateFlow<Int?> = MutableStateFlow(0)
|
override val numberOfFiles: StateFlow<Int?> = MutableStateFlow(0)
|
||||||
@@ -50,7 +58,9 @@ class FakeStorageAccessor : IStorageAccessor {
|
|||||||
override fun getDirsFlow(path: String): Flow<DataPage<IDirectory>> = emptyFlow()
|
override fun getDirsFlow(path: String): Flow<DataPage<IDirectory>> = emptyFlow()
|
||||||
|
|
||||||
override suspend fun getFileInfo(path: String): IFile {
|
override suspend fun getFileInfo(path: String): IFile {
|
||||||
error("Not implemented in tests")
|
val key = norm(path)
|
||||||
|
val bytes = dataFiles[key] ?: throw IllegalStateException("File not found: $path")
|
||||||
|
return FakeFile(key, bytes.size.toLong())
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun getDirInfo(path: String): IDirectory {
|
override suspend fun getDirInfo(path: String): IDirectory {
|
||||||
@@ -63,14 +73,17 @@ class FakeStorageAccessor : IStorageAccessor {
|
|||||||
|
|
||||||
override suspend fun touchDir(path: String) = Unit
|
override suspend fun touchDir(path: String) = Unit
|
||||||
|
|
||||||
override suspend fun delete(path: String) {
|
override suspend fun delete(path: String, recordSyncJournal: Boolean) {
|
||||||
dataFiles.remove(path)
|
dataFiles.remove(norm(path))
|
||||||
|
if (recordSyncJournal) {
|
||||||
|
recordDeleteJournal(path)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream {
|
override suspend fun openWrite(path: String, recordSyncJournal: Boolean): OutputStream {
|
||||||
return object : ByteArrayOutputStream() {
|
return object : ByteArrayOutputStream() {
|
||||||
override fun close() {
|
override fun close() {
|
||||||
dataFiles[path] = toByteArray()
|
dataFiles[norm(path)] = toByteArray()
|
||||||
_filesUpdates.tryEmit(
|
_filesUpdates.tryEmit(
|
||||||
DataPage(
|
DataPage(
|
||||||
listOf(FakeFile(path)),
|
listOf(FakeFile(path)),
|
||||||
@@ -83,13 +96,20 @@ class FakeStorageAccessor : IStorageAccessor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun openRead(path: String): InputStream {
|
override suspend fun openRead(path: String): InputStream {
|
||||||
val bytes = dataFiles[path] ?: throw IllegalStateException("File not found: $path")
|
if (openReadDelayMs > 0) {
|
||||||
|
delay(openReadDelayMs)
|
||||||
|
}
|
||||||
|
val bytes = dataFiles[norm(path)] ?: throw IllegalStateException("File not found: $path")
|
||||||
return ByteArrayInputStream(bytes)
|
return ByteArrayInputStream(bytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun moveToTrash(path: String) {
|
override suspend fun moveToTrash(path: String, recordSyncJournal: Boolean) {
|
||||||
if (path in dataFiles) {
|
val key = norm(path)
|
||||||
trashedPaths.add(path)
|
if (key in dataFiles) {
|
||||||
|
trashedPaths.add(key)
|
||||||
|
if (recordSyncJournal) {
|
||||||
|
recordTrashJournal(path)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -106,12 +126,43 @@ class FakeStorageAccessor : IStorageAccessor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun readSyncJournal(): StorageSyncJournal = syncJournal
|
override suspend fun readSyncJournal(): StorageSyncJournal {
|
||||||
|
readSyncJournalThrows?.let { throw it }
|
||||||
|
return syncJournal
|
||||||
|
}
|
||||||
|
|
||||||
|
override suspend fun flushPendingSyncJournal() = Unit
|
||||||
|
|
||||||
override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) {
|
override suspend fun putSyncJournalEntries(entries: StorageSyncJournal) {
|
||||||
syncJournal = StorageSyncJournalMerge.merge(syncJournal, entries)
|
syncJournal = StorageSyncJournalMerge.merge(syncJournal, entries)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private suspend fun recordDeleteJournal(path: String) {
|
||||||
|
appendJournalEntry(path, StorageSyncOperation.DELETE)
|
||||||
|
}
|
||||||
|
|
||||||
|
private suspend fun recordTrashJournal(path: String) {
|
||||||
|
appendJournalEntry(path, StorageSyncOperation.TRASH)
|
||||||
|
}
|
||||||
|
|
||||||
|
private suspend fun appendJournalEntry(path: String, operation: StorageSyncOperation) {
|
||||||
|
val cleaned = norm(path)
|
||||||
|
val nextSequence = (syncJournal.values.maxOfOrNull { it.revision.sequence } ?: 0L) + 1L
|
||||||
|
putSyncJournalEntries(
|
||||||
|
mapOf(
|
||||||
|
cleaned to StorageSyncJournalEntry(
|
||||||
|
path = cleaned,
|
||||||
|
operation = operation,
|
||||||
|
revision = StorageSyncRevision(
|
||||||
|
sequence = nextSequence,
|
||||||
|
actorId = "fake-actor",
|
||||||
|
createdAt = Instant.now(),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
override suspend fun readSyncLock(): StorageSyncLock? = syncLock
|
override suspend fun readSyncLock(): StorageSyncLock? = syncLock
|
||||||
|
|
||||||
override suspend fun tryAcquireSyncLock(holderId: String, leaseUntil: Instant): Boolean {
|
override suspend fun tryAcquireSyncLock(holderId: String, leaseUntil: Instant): Boolean {
|
||||||
@@ -131,9 +182,12 @@ class FakeStorageAccessor : IStorageAccessor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class FakeFile(path: String) : IFile {
|
class FakeFile(
|
||||||
|
path: String,
|
||||||
|
size: Long = 0L,
|
||||||
|
) : IFile {
|
||||||
override val metaInfo: IMetaInfo = object : IMetaInfo {
|
override val metaInfo: IMetaInfo = object : IMetaInfo {
|
||||||
override val size: Long = 0L
|
override val size: Long = size
|
||||||
override val isDeleted: Boolean = false
|
override val isDeleted: Boolean = false
|
||||||
override val isHidden: Boolean = false
|
override val isHidden: Boolean = false
|
||||||
override val lastModified: Instant = Instant.now()
|
override val lastModified: Instant = Instant.now()
|
||||||
|
|||||||
Reference in New Issue
Block a user