Добавлен foreground сервис
This commit is contained in:
@@ -147,7 +147,7 @@ class UnlockManager(
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun closeBySourceUuid(opened: MutableMap<UUID, EncryptedStorage>, sourceUuid: UUID) {
|
||||
private fun closeBySourceUuid(opened: MutableMap<UUID, EncryptedStorage>, sourceUuid: UUID) {
|
||||
val enc = opened[sourceUuid] ?: return
|
||||
val nestedSourceUuid = enc.uuid
|
||||
if (nestedSourceUuid != sourceUuid && opened.containsKey(nestedSourceUuid)) {
|
||||
|
||||
@@ -8,6 +8,7 @@ import com.github.nullptroma.wallenc.domain.datatypes.StorageEncryptionInfo
|
||||
import com.github.nullptroma.wallenc.domain.encrypt.Encryptor
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorage
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageMetaInfo
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskProgress
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.DisposableHandle
|
||||
@@ -16,9 +17,11 @@ import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.ensureActive
|
||||
import kotlinx.coroutines.withContext
|
||||
import java.io.InputStream
|
||||
import java.util.UUID
|
||||
import kotlin.coroutines.coroutineContext
|
||||
|
||||
class EncryptedStorage private constructor(
|
||||
private val source: IStorage,
|
||||
@@ -115,7 +118,7 @@ class EncryptedStorage private constructor(
|
||||
)
|
||||
}
|
||||
|
||||
override suspend fun clearAllContent() = scope.run {
|
||||
override suspend fun clearAllContent(onProgress: suspend (TaskProgress) -> Unit) = scope.run {
|
||||
val files = accessor.getAllFiles()
|
||||
val dirs = accessor.getAllDirs()
|
||||
val paths = buildList {
|
||||
@@ -124,8 +127,22 @@ class EncryptedStorage private constructor(
|
||||
}
|
||||
.filter { it != "/" && it.isNotBlank() }
|
||||
.sortedByDescending { it.length }
|
||||
for (path in paths) {
|
||||
val total = paths.size
|
||||
if (total == 0) {
|
||||
onProgress(TaskProgress(1f, null))
|
||||
return@run
|
||||
}
|
||||
paths.forEachIndexed { index, path ->
|
||||
accessor.delete(path)
|
||||
if (index % PROGRESS_REPORT_INTERVAL == 0 || index == paths.lastIndex) {
|
||||
onProgress(
|
||||
TaskProgress(
|
||||
fraction = (index + 1).toFloat() / total,
|
||||
label = null,
|
||||
),
|
||||
)
|
||||
coroutineContext.ensureActive()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -135,6 +152,7 @@ class EncryptedStorage private constructor(
|
||||
}
|
||||
|
||||
companion object {
|
||||
private const val PROGRESS_REPORT_INTERVAL = 16
|
||||
suspend fun create(
|
||||
source: IStorage,
|
||||
key: EncryptKey,
|
||||
|
||||
@@ -6,13 +6,16 @@ import com.github.nullptroma.wallenc.domain.datatypes.StorageEncryptionInfo
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorage
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageMetaInfo
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskProgress
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.ensureActive
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.withContext
|
||||
import java.io.InputStream
|
||||
import kotlin.coroutines.coroutineContext
|
||||
import java.util.UUID
|
||||
|
||||
|
||||
@@ -94,7 +97,7 @@ class LocalStorage(
|
||||
))
|
||||
}
|
||||
|
||||
override suspend fun clearAllContent() = withContext(ioDispatcher) {
|
||||
override suspend fun clearAllContent(onProgress: suspend (TaskProgress) -> Unit) = withContext(ioDispatcher) {
|
||||
val files = accessor.getAllFiles()
|
||||
val dirs = accessor.getAllDirs()
|
||||
val paths = buildList {
|
||||
@@ -103,12 +106,27 @@ class LocalStorage(
|
||||
}
|
||||
.filter { it != "/" && it.isNotBlank() }
|
||||
.sortedByDescending { it.length }
|
||||
for (path in paths) {
|
||||
val total = paths.size
|
||||
if (total == 0) {
|
||||
onProgress(TaskProgress(1f, null))
|
||||
return@withContext
|
||||
}
|
||||
paths.forEachIndexed { index, path ->
|
||||
accessor.delete(path)
|
||||
if (index % PROGRESS_REPORT_INTERVAL == 0 || index == paths.lastIndex) {
|
||||
onProgress(
|
||||
TaskProgress(
|
||||
fraction = (index + 1).toFloat() / total,
|
||||
label = null,
|
||||
),
|
||||
)
|
||||
coroutineContext.ensureActive()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
private const val PROGRESS_REPORT_INTERVAL = 16
|
||||
const val STORAGE_INFO_FILE_POSTFIX = ".storage-info"
|
||||
private val jackson = jacksonObjectMapper().apply { findAndRegisterModules() }
|
||||
}
|
||||
|
||||
@@ -0,0 +1,217 @@
|
||||
package com.github.nullptroma.wallenc.data.tasks
|
||||
|
||||
import com.github.nullptroma.wallenc.domain.tasks.ITaskOrchestrator
|
||||
import com.github.nullptroma.wallenc.domain.tasks.PipelineState
|
||||
import com.github.nullptroma.wallenc.domain.tasks.PipelineTask
|
||||
import com.github.nullptroma.wallenc.domain.tasks.PipelineWork
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskContext
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskForegroundUiState
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskId
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskLogLevel
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskLogLine
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskProgress
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskRunState
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.CancellationException
|
||||
import kotlinx.coroutines.withContext
|
||||
import java.util.Collections
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
class TaskOrchestrator(
|
||||
private val ioDispatcher: CoroutineDispatcher,
|
||||
) : ITaskOrchestrator {
|
||||
|
||||
private val pipelineSupervisor = SupervisorJob()
|
||||
private val scope = CoroutineScope(pipelineSupervisor + ioDispatcher)
|
||||
|
||||
private val channel = Channel<TaskEnvelope>(Channel.UNLIMITED)
|
||||
|
||||
private val tasksById =
|
||||
Collections.synchronizedMap(linkedMapOf<TaskId, PipelineTask>())
|
||||
|
||||
private val cancelRequested = ConcurrentHashMap<TaskId, Boolean>()
|
||||
private val currentRunJob = AtomicReference<Job?>(null)
|
||||
private val runningTaskId = AtomicReference<TaskId?>(null)
|
||||
|
||||
private val _pipelineState = MutableStateFlow(PipelineState(emptyList(), null))
|
||||
override val pipelineState: StateFlow<PipelineState> = _pipelineState.asStateFlow()
|
||||
|
||||
private val logLock = Any()
|
||||
private val logBuffer = ArrayDeque<TaskLogLine>(MAX_LOG_LINES + 1)
|
||||
private val _logLines = MutableStateFlow<List<TaskLogLine>>(emptyList())
|
||||
override val logLines: StateFlow<List<TaskLogLine>> = _logLines.asStateFlow()
|
||||
|
||||
private val _foregroundUi = MutableStateFlow<TaskForegroundUiState>(TaskForegroundUiState.Hidden)
|
||||
override val foregroundUi: StateFlow<TaskForegroundUiState> = _foregroundUi.asStateFlow()
|
||||
|
||||
init {
|
||||
scope.launch {
|
||||
processLoop()
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun processLoop() {
|
||||
while (true) {
|
||||
_foregroundUi.value = TaskForegroundUiState.Hidden
|
||||
val envelope = channel.receive()
|
||||
val id = envelope.id
|
||||
if (cancelRequested[id] == true) {
|
||||
replaceTask(id) { it.copy(state = TaskRunState.Cancelled) }
|
||||
cancelRequested.remove(id)
|
||||
emitState(null)
|
||||
continue
|
||||
}
|
||||
runningTaskId.set(id)
|
||||
replaceTask(id) { it.copy(state = TaskRunState.Running(null)) }
|
||||
if (envelope.requiresForeground) {
|
||||
_foregroundUi.value = TaskForegroundUiState.Visible(envelope.title, null)
|
||||
}
|
||||
emitState(id)
|
||||
|
||||
val job = Job(pipelineSupervisor)
|
||||
currentRunJob.set(job)
|
||||
val ctx = TaskContextImpl(
|
||||
taskId = id,
|
||||
onRunningProgress = { p -> onRunningProgress(id, envelope.title, envelope.requiresForeground, p) },
|
||||
appendLog = { level, msg -> appendLogLine(level, msg) },
|
||||
)
|
||||
try {
|
||||
withContext(ioDispatcher + job) {
|
||||
envelope.work.run(ctx)
|
||||
}
|
||||
replaceTask(id) { it.copy(state = TaskRunState.Completed) }
|
||||
cancelRequested.remove(id)
|
||||
} catch (_: CancellationException) {
|
||||
cancelRequested.remove(id)
|
||||
replaceTask(id) { it.copy(state = TaskRunState.Cancelled) }
|
||||
} catch (e: Exception) {
|
||||
cancelRequested.remove(id)
|
||||
replaceTask(id) {
|
||||
it.copy(state = TaskRunState.Failed(e.message ?: e.toString()))
|
||||
}
|
||||
} finally {
|
||||
currentRunJob.set(null)
|
||||
runningTaskId.set(null)
|
||||
emitState(null)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun onRunningProgress(
|
||||
taskId: TaskId,
|
||||
title: String,
|
||||
requiresForeground: Boolean,
|
||||
progress: TaskProgress,
|
||||
) {
|
||||
replaceTask(taskId) { it.copy(state = TaskRunState.Running(progress)) }
|
||||
if (requiresForeground) {
|
||||
_foregroundUi.value = TaskForegroundUiState.Visible(title, progress)
|
||||
}
|
||||
emitState(taskId)
|
||||
}
|
||||
|
||||
override fun enqueue(title: String, requiresForeground: Boolean, work: PipelineWork): TaskId {
|
||||
val id = TaskId()
|
||||
val task = PipelineTask(
|
||||
id = id,
|
||||
title = title,
|
||||
requiresForeground = requiresForeground,
|
||||
state = TaskRunState.Queued,
|
||||
)
|
||||
synchronized(tasksById) {
|
||||
tasksById[id] = task
|
||||
}
|
||||
emitState(runningTaskId.get())
|
||||
channel.trySend(TaskEnvelope(id, title, requiresForeground, work))
|
||||
return id
|
||||
}
|
||||
|
||||
override fun cancel(taskId: TaskId): Boolean {
|
||||
val exists = synchronized(tasksById) { tasksById.containsKey(taskId) }
|
||||
if (!exists) return false
|
||||
cancelRequested[taskId] = true
|
||||
if (runningTaskId.get() == taskId) {
|
||||
currentRunJob.get()?.cancel()
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
override fun cancelCurrent(): Boolean {
|
||||
val id = runningTaskId.get() ?: return false
|
||||
return cancel(id)
|
||||
}
|
||||
|
||||
override fun cancelAll() {
|
||||
val ids = synchronized(tasksById) { tasksById.keys.toList() }
|
||||
for (id in ids) {
|
||||
cancelRequested[id] = true
|
||||
}
|
||||
currentRunJob.get()?.cancel()
|
||||
}
|
||||
|
||||
private fun replaceTask(id: TaskId, fn: (PipelineTask) -> PipelineTask) {
|
||||
synchronized(tasksById) {
|
||||
val cur = tasksById[id] ?: return
|
||||
tasksById[id] = fn(cur)
|
||||
}
|
||||
}
|
||||
|
||||
private fun emitState(currentId: TaskId?) {
|
||||
val snapshot = synchronized(tasksById) {
|
||||
tasksById.values.toList()
|
||||
}
|
||||
_pipelineState.value = PipelineState(
|
||||
tasks = snapshot,
|
||||
currentTaskId = currentId ?: runningTaskId.get(),
|
||||
)
|
||||
}
|
||||
|
||||
private fun appendLogLine(level: TaskLogLevel, message: String) {
|
||||
val line = TaskLogLine(
|
||||
timestampMs = System.currentTimeMillis(),
|
||||
level = level,
|
||||
message = message,
|
||||
)
|
||||
synchronized(logLock) {
|
||||
if (logBuffer.size >= MAX_LOG_LINES) {
|
||||
logBuffer.removeFirst()
|
||||
}
|
||||
logBuffer.addLast(line)
|
||||
_logLines.value = logBuffer.toList()
|
||||
}
|
||||
}
|
||||
|
||||
private class TaskEnvelope(
|
||||
val id: TaskId,
|
||||
val title: String,
|
||||
val requiresForeground: Boolean,
|
||||
val work: PipelineWork,
|
||||
)
|
||||
|
||||
private class TaskContextImpl(
|
||||
override val taskId: TaskId,
|
||||
private val onRunningProgress: (TaskProgress) -> Unit,
|
||||
private val appendLog: (TaskLogLevel, String) -> Unit,
|
||||
) : TaskContext {
|
||||
override suspend fun reportProgress(fraction: Float?, label: String?) {
|
||||
onRunningProgress(TaskProgress(fraction, label))
|
||||
}
|
||||
|
||||
override fun log(level: TaskLogLevel, message: String) {
|
||||
appendLog(level, message)
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
private const val MAX_LOG_LINES = 500
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user