Новый конвейер задач и уведомлений
This commit is contained in:
@@ -5,6 +5,7 @@ import com.github.nullptroma.wallenc.domain.tasks.PipelineState
|
||||
import com.github.nullptroma.wallenc.domain.tasks.PipelineTask
|
||||
import com.github.nullptroma.wallenc.domain.tasks.PipelineWork
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskContext
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskForegroundItem
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskForegroundUiState
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskId
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskLogLevel
|
||||
@@ -13,20 +14,16 @@ import com.github.nullptroma.wallenc.domain.tasks.TaskProgress
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskRunState
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Deferred
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.async
|
||||
import kotlinx.coroutines.CancellationException
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.CancellationException
|
||||
import java.util.Collections
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
class TaskOrchestrator(
|
||||
private val ioDispatcher: CoroutineDispatcher,
|
||||
@@ -35,16 +32,15 @@ class TaskOrchestrator(
|
||||
private val pipelineSupervisor = SupervisorJob()
|
||||
private val scope = CoroutineScope(pipelineSupervisor + ioDispatcher)
|
||||
|
||||
private val channel = Channel<TaskEnvelope>(Channel.UNLIMITED)
|
||||
|
||||
private val tasksById =
|
||||
Collections.synchronizedMap(linkedMapOf<TaskId, PipelineTask>())
|
||||
|
||||
private val cancelRequested = ConcurrentHashMap<TaskId, Boolean>()
|
||||
private val currentRunJob = AtomicReference<Job?>(null)
|
||||
private val runningTaskId = AtomicReference<TaskId?>(null)
|
||||
private val runningJobs = ConcurrentHashMap<TaskId, Job>()
|
||||
private val delayedForegroundJobs = ConcurrentHashMap<TaskId, Job>()
|
||||
private val visibleForegroundTaskIds = Collections.synchronizedSet(linkedSetOf<TaskId>())
|
||||
|
||||
private val _pipelineState = MutableStateFlow(PipelineState(emptyList(), null))
|
||||
private val _pipelineState = MutableStateFlow(PipelineState(emptyList(), emptySet()))
|
||||
override val pipelineState: StateFlow<PipelineState> = _pipelineState.asStateFlow()
|
||||
|
||||
private val logLock = Any()
|
||||
@@ -55,87 +51,25 @@ class TaskOrchestrator(
|
||||
private val _foregroundUi = MutableStateFlow<TaskForegroundUiState>(TaskForegroundUiState.Hidden)
|
||||
override val foregroundUi: StateFlow<TaskForegroundUiState> = _foregroundUi.asStateFlow()
|
||||
|
||||
init {
|
||||
scope.launch {
|
||||
processLoop()
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun processLoop() {
|
||||
while (true) {
|
||||
_foregroundUi.value = TaskForegroundUiState.Hidden
|
||||
val envelope = channel.receive()
|
||||
val id = envelope.id
|
||||
if (cancelRequested[id] == true) {
|
||||
replaceTask(id) { it.copy(state = TaskRunState.Cancelled) }
|
||||
cancelRequested.remove(id)
|
||||
emitState(null)
|
||||
continue
|
||||
}
|
||||
runningTaskId.set(id)
|
||||
replaceTask(id) { it.copy(state = TaskRunState.Running(null)) }
|
||||
if (envelope.requiresForeground) {
|
||||
_foregroundUi.value = TaskForegroundUiState.Visible(envelope.title, null)
|
||||
}
|
||||
emitState(id)
|
||||
|
||||
val ctx = TaskContextImpl(
|
||||
taskId = id,
|
||||
onRunningProgress = { p -> onRunningProgress(id, envelope.title, envelope.requiresForeground, p) },
|
||||
appendLog = { level, msg -> appendLogLine(level, msg) },
|
||||
)
|
||||
try {
|
||||
coroutineScope {
|
||||
val runJob: Deferred<Unit> = async(ioDispatcher) {
|
||||
envelope.work.run(ctx)
|
||||
}
|
||||
currentRunJob.set(runJob)
|
||||
runJob.await()
|
||||
}
|
||||
replaceTask(id) { it.copy(state = TaskRunState.Completed) }
|
||||
cancelRequested.remove(id)
|
||||
} catch (_: CancellationException) {
|
||||
cancelRequested.remove(id)
|
||||
replaceTask(id) { it.copy(state = TaskRunState.Cancelled) }
|
||||
} catch (e: Exception) {
|
||||
cancelRequested.remove(id)
|
||||
replaceTask(id) {
|
||||
it.copy(state = TaskRunState.Failed(e.message ?: e.toString()))
|
||||
}
|
||||
} finally {
|
||||
currentRunJob.set(null)
|
||||
runningTaskId.set(null)
|
||||
emitState(null)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun onRunningProgress(
|
||||
taskId: TaskId,
|
||||
title: String,
|
||||
requiresForeground: Boolean,
|
||||
progress: TaskProgress,
|
||||
) {
|
||||
private fun onRunningProgress(taskId: TaskId, progress: TaskProgress) {
|
||||
replaceTask(taskId) { it.copy(state = TaskRunState.Running(progress)) }
|
||||
if (requiresForeground) {
|
||||
_foregroundUi.value = TaskForegroundUiState.Visible(title, progress)
|
||||
}
|
||||
emitState(taskId)
|
||||
emitState()
|
||||
emitForegroundUiState()
|
||||
}
|
||||
|
||||
override fun enqueue(title: String, requiresForeground: Boolean, work: PipelineWork): TaskId {
|
||||
override fun enqueue(title: String, dispatcher: CoroutineDispatcher, work: PipelineWork): TaskId {
|
||||
val id = TaskId()
|
||||
val task = PipelineTask(
|
||||
id = id,
|
||||
title = title,
|
||||
requiresForeground = requiresForeground,
|
||||
dispatcher = dispatcher,
|
||||
state = TaskRunState.Queued,
|
||||
)
|
||||
synchronized(tasksById) {
|
||||
tasksById[id] = task
|
||||
}
|
||||
emitState(runningTaskId.get())
|
||||
channel.trySend(TaskEnvelope(id, title, requiresForeground, work))
|
||||
emitState()
|
||||
launchTask(id, work)
|
||||
return id
|
||||
}
|
||||
|
||||
@@ -143,23 +77,18 @@ class TaskOrchestrator(
|
||||
val exists = synchronized(tasksById) { tasksById.containsKey(taskId) }
|
||||
if (!exists) return false
|
||||
cancelRequested[taskId] = true
|
||||
if (runningTaskId.get() == taskId) {
|
||||
currentRunJob.get()?.cancel()
|
||||
}
|
||||
runningJobs[taskId]?.cancel()
|
||||
delayedForegroundJobs.remove(taskId)?.cancel()
|
||||
return true
|
||||
}
|
||||
|
||||
override fun cancelCurrent(): Boolean {
|
||||
val id = runningTaskId.get() ?: return false
|
||||
return cancel(id)
|
||||
}
|
||||
|
||||
override fun cancelAll() {
|
||||
val ids = synchronized(tasksById) { tasksById.keys.toList() }
|
||||
for (id in ids) {
|
||||
cancelRequested[id] = true
|
||||
runningJobs[id]?.cancel()
|
||||
delayedForegroundJobs.remove(id)?.cancel()
|
||||
}
|
||||
currentRunJob.get()?.cancel()
|
||||
}
|
||||
|
||||
private fun replaceTask(id: TaskId, fn: (PipelineTask) -> PipelineTask) {
|
||||
@@ -169,16 +98,40 @@ class TaskOrchestrator(
|
||||
}
|
||||
}
|
||||
|
||||
private fun emitState(currentId: TaskId?) {
|
||||
private fun emitState() {
|
||||
val snapshot = synchronized(tasksById) {
|
||||
tasksById.values.toList()
|
||||
}
|
||||
val running = snapshot
|
||||
.asSequence()
|
||||
.filter { it.state is TaskRunState.Running }
|
||||
.map { it.id }
|
||||
.toSet()
|
||||
_pipelineState.value = PipelineState(
|
||||
tasks = snapshot,
|
||||
currentTaskId = currentId ?: runningTaskId.get(),
|
||||
runningTaskIds = running,
|
||||
)
|
||||
}
|
||||
|
||||
private fun emitForegroundUiState() {
|
||||
val visibleItems = synchronized(tasksById) {
|
||||
tasksById.values
|
||||
.filter { visibleForegroundTaskIds.contains(it.id) && it.state is TaskRunState.Running }
|
||||
.map {
|
||||
TaskForegroundItem(
|
||||
taskId = it.id,
|
||||
title = it.title,
|
||||
progress = (it.state as TaskRunState.Running).progress,
|
||||
)
|
||||
}
|
||||
}
|
||||
_foregroundUi.value = if (visibleItems.isEmpty()) {
|
||||
TaskForegroundUiState.Hidden
|
||||
} else {
|
||||
TaskForegroundUiState.Visible(visibleItems)
|
||||
}
|
||||
}
|
||||
|
||||
private fun appendLogLine(level: TaskLogLevel, message: String) {
|
||||
val line = TaskLogLine(
|
||||
timestampMs = System.currentTimeMillis(),
|
||||
@@ -194,12 +147,53 @@ class TaskOrchestrator(
|
||||
}
|
||||
}
|
||||
|
||||
private class TaskEnvelope(
|
||||
val id: TaskId,
|
||||
val title: String,
|
||||
val requiresForeground: Boolean,
|
||||
val work: PipelineWork,
|
||||
)
|
||||
private fun launchTask(taskId: TaskId, work: PipelineWork) {
|
||||
replaceTask(taskId) { it.copy(state = TaskRunState.Running(null)) }
|
||||
emitState()
|
||||
|
||||
val showForegroundJob = scope.launch {
|
||||
delay(FOREGROUND_DELAY_MS)
|
||||
val shouldShow = synchronized(tasksById) {
|
||||
val task = tasksById[taskId] ?: return@synchronized false
|
||||
task.state is TaskRunState.Running
|
||||
}
|
||||
if (shouldShow) {
|
||||
visibleForegroundTaskIds.add(taskId)
|
||||
emitForegroundUiState()
|
||||
}
|
||||
}
|
||||
delayedForegroundJobs[taskId] = showForegroundJob
|
||||
|
||||
val dispatcher = synchronized(tasksById) { tasksById[taskId]?.dispatcher } ?: ioDispatcher
|
||||
val runJob = scope.launch(dispatcher) {
|
||||
val ctx = TaskContextImpl(
|
||||
taskId = taskId,
|
||||
onRunningProgress = { p -> onRunningProgress(taskId, p) },
|
||||
appendLog = { level, msg -> appendLogLine(level, msg) },
|
||||
)
|
||||
try {
|
||||
if (cancelRequested[taskId] == true) {
|
||||
throw CancellationException("Task cancelled before start")
|
||||
}
|
||||
work.run(ctx)
|
||||
replaceTask(taskId) { it.copy(state = TaskRunState.Completed) }
|
||||
} catch (_: CancellationException) {
|
||||
replaceTask(taskId) { it.copy(state = TaskRunState.Cancelled) }
|
||||
} catch (e: Exception) {
|
||||
replaceTask(taskId) {
|
||||
it.copy(state = TaskRunState.Failed(e.message ?: e.toString()))
|
||||
}
|
||||
} finally {
|
||||
cancelRequested.remove(taskId)
|
||||
runningJobs.remove(taskId)
|
||||
delayedForegroundJobs.remove(taskId)?.cancel()
|
||||
visibleForegroundTaskIds.remove(taskId)
|
||||
emitState()
|
||||
emitForegroundUiState()
|
||||
}
|
||||
}
|
||||
runningJobs[taskId] = runJob
|
||||
}
|
||||
|
||||
private class TaskContextImpl(
|
||||
override val taskId: TaskId,
|
||||
@@ -217,5 +211,6 @@ class TaskOrchestrator(
|
||||
|
||||
companion object {
|
||||
private const val MAX_LOG_LINES = 500
|
||||
private const val FOREGROUND_DELAY_MS = 1_000L
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user