Костыль для подавления цикла синхронизации
This commit is contained in:
@@ -9,6 +9,7 @@ import kotlinx.coroutines.Dispatchers
|
|||||||
import kotlinx.coroutines.FlowPreview
|
import kotlinx.coroutines.FlowPreview
|
||||||
import kotlinx.coroutines.SupervisorJob
|
import kotlinx.coroutines.SupervisorJob
|
||||||
import kotlinx.coroutines.flow.collectLatest
|
import kotlinx.coroutines.flow.collectLatest
|
||||||
|
import kotlinx.coroutines.flow.combine
|
||||||
import kotlinx.coroutines.flow.debounce
|
import kotlinx.coroutines.flow.debounce
|
||||||
import kotlinx.coroutines.flow.filter
|
import kotlinx.coroutines.flow.filter
|
||||||
import kotlinx.coroutines.flow.map
|
import kotlinx.coroutines.flow.map
|
||||||
@@ -29,7 +30,12 @@ class StorageSyncBootstrap @Inject constructor(
|
|||||||
fun start() {
|
fun start() {
|
||||||
scheduler.ensureScheduled()
|
scheduler.ensureScheduled()
|
||||||
scope.launch {
|
scope.launch {
|
||||||
vaultsManager.allStorages.collectLatest { storages ->
|
combine(
|
||||||
|
vaultsManager.allStorages,
|
||||||
|
vaultsManager.unlockManager.openedStorages,
|
||||||
|
) { rootStorages, opened ->
|
||||||
|
(rootStorages + opened.values).distinctBy { it.uuid }
|
||||||
|
}.collectLatest { storages ->
|
||||||
if (storages.isEmpty()) {
|
if (storages.isEmpty()) {
|
||||||
return@collectLatest
|
return@collectLatest
|
||||||
}
|
}
|
||||||
@@ -52,18 +58,23 @@ class StorageSyncBootstrap @Inject constructor(
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
merge(*triggers.toTypedArray())
|
merge(*triggers.toTypedArray())
|
||||||
.debounce(DEBOUNCE_AFTER_CHANGE_MS)
|
.filter { shouldScheduleDebounceSync() }
|
||||||
|
.debounce(RunStorageSyncUseCase.DEBOUNCE_AFTER_CHANGE_MS)
|
||||||
.collect {
|
.collect {
|
||||||
if (syncRunner.syncRunning.value) {
|
|
||||||
return@collect
|
|
||||||
}
|
|
||||||
syncRunner.enqueue(StorageSyncTriggerReason.Debounce)
|
syncRunner.enqueue(StorageSyncTriggerReason.Debounce)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private companion object {
|
/**
|
||||||
private const val DEBOUNCE_AFTER_CHANGE_MS = 60_000L
|
* Игнорировать события во время sync и сразу после него: запись файлов при sync
|
||||||
|
* (в т.ч. через [EncryptedStorageAccessor]) иначе снова запускает debounce через 60 с.
|
||||||
|
*/
|
||||||
|
private fun shouldScheduleDebounceSync(): Boolean {
|
||||||
|
if (syncRunner.syncRunning.value) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return System.currentTimeMillis() >= syncRunner.debounceSuppressUntilMs.value
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -32,6 +32,10 @@ class RunStorageSyncUseCase @Inject constructor(
|
|||||||
private val _activeSyncTaskId = MutableStateFlow<TaskId?>(null)
|
private val _activeSyncTaskId = MutableStateFlow<TaskId?>(null)
|
||||||
val activeSyncTaskId: StateFlow<TaskId?> = _activeSyncTaskId.asStateFlow()
|
val activeSyncTaskId: StateFlow<TaskId?> = _activeSyncTaskId.asStateFlow()
|
||||||
|
|
||||||
|
/** Не реагировать на debounce до этого момента (мс с эпохи) после завершения sync. */
|
||||||
|
private val _debounceSuppressUntilMs = MutableStateFlow(0L)
|
||||||
|
val debounceSuppressUntilMs: StateFlow<Long> = _debounceSuppressUntilMs.asStateFlow()
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param reason источник запуска — заголовок задачи и лог пайплайна
|
* @param reason источник запуска — заголовок задачи и лог пайплайна
|
||||||
* @return false, если синхронизация уже в очереди или выполняется — новая задача не создана
|
* @return false, если синхронизация уже в очереди или выполняется — новая задача не создана
|
||||||
@@ -90,25 +94,38 @@ class RunStorageSyncUseCase @Inject constructor(
|
|||||||
reportProgress: suspend (fraction: Float?, label: TaskProgressLabel?) -> Unit,
|
reportProgress: suspend (fraction: Float?, label: TaskProgressLabel?) -> Unit,
|
||||||
log: (TaskLogLevel, TaskLogKey) -> Unit,
|
log: (TaskLogLevel, TaskLogKey) -> Unit,
|
||||||
) {
|
) {
|
||||||
syncReadiness.awaitReady()
|
|
||||||
log(TaskLogLevel.Info, TaskLogKey.SyncStarted(reason))
|
|
||||||
reportProgress(null, TaskProgressLabel.SyncStarted)
|
|
||||||
try {
|
try {
|
||||||
syncEngine.syncAllGroups { fraction, label ->
|
syncReadiness.awaitReady()
|
||||||
reportProgress(fraction, label)
|
log(TaskLogLevel.Info, TaskLogKey.SyncStarted(reason))
|
||||||
|
reportProgress(null, TaskProgressLabel.SyncStarted)
|
||||||
|
try {
|
||||||
|
syncEngine.syncAllGroups { fraction, label ->
|
||||||
|
reportProgress(fraction, label)
|
||||||
|
}
|
||||||
|
log(TaskLogLevel.Info, TaskLogKey.SyncFinished(reason))
|
||||||
|
reportProgress(null, TaskProgressLabel.SyncCompleted)
|
||||||
|
} catch (e: Exception) {
|
||||||
|
val err = e.toWallencException()
|
||||||
|
log(TaskLogLevel.Error, TaskLogKey.SyncFailed(err, reason))
|
||||||
|
throw e
|
||||||
}
|
}
|
||||||
log(TaskLogLevel.Info, TaskLogKey.SyncFinished(reason))
|
} finally {
|
||||||
reportProgress(null, TaskProgressLabel.SyncCompleted)
|
extendDebounceSuppress()
|
||||||
} catch (e: Exception) {
|
|
||||||
val err = e.toWallencException()
|
|
||||||
log(TaskLogLevel.Error, TaskLogKey.SyncFailed(err, reason))
|
|
||||||
throw e
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun extendDebounceSuppress() {
|
||||||
|
_debounceSuppressUntilMs.value = System.currentTimeMillis() + DEBOUNCE_AFTER_CHANGE_MS
|
||||||
|
}
|
||||||
|
|
||||||
private fun clearRunningState() {
|
private fun clearRunningState() {
|
||||||
running.set(false)
|
running.set(false)
|
||||||
_syncRunning.value = false
|
_syncRunning.value = false
|
||||||
_activeSyncTaskId.value = null
|
_activeSyncTaskId.value = null
|
||||||
}
|
}
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
/** Пауза после последнего изменения перед debounce-sync; же окно подавления после sync. */
|
||||||
|
const val DEBOUNCE_AFTER_CHANGE_MS = 60_000L
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user