Большая реструктуризация проекта
This commit is contained in:
20
task-runtime/build.gradle.kts
Normal file
20
task-runtime/build.gradle.kts
Normal file
@@ -0,0 +1,20 @@
|
||||
plugins {
|
||||
id("java-library")
|
||||
alias(libs.plugins.jetbrains.kotlin.jvm)
|
||||
}
|
||||
|
||||
java {
|
||||
toolchain {
|
||||
languageVersion = JavaLanguageVersion.of(17)
|
||||
}
|
||||
}
|
||||
|
||||
kotlin {
|
||||
jvmToolchain(17)
|
||||
}
|
||||
|
||||
dependencies {
|
||||
implementation(project(":domain"))
|
||||
implementation(libs.kotlinx.coroutines.core)
|
||||
testImplementation(libs.junit)
|
||||
}
|
||||
@@ -0,0 +1,216 @@
|
||||
package com.github.nullptroma.wallenc.task.runtime
|
||||
|
||||
import com.github.nullptroma.wallenc.domain.tasks.ITaskOrchestrator
|
||||
import com.github.nullptroma.wallenc.domain.tasks.PipelineState
|
||||
import com.github.nullptroma.wallenc.domain.tasks.PipelineTask
|
||||
import com.github.nullptroma.wallenc.domain.tasks.PipelineWork
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskContext
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskForegroundItem
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskForegroundUiState
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskId
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskLogLevel
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskLogLine
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskProgress
|
||||
import com.github.nullptroma.wallenc.domain.tasks.TaskRunState
|
||||
import kotlinx.coroutines.CancellationException
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.launch
|
||||
import java.util.Collections
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
class TaskOrchestrator(
|
||||
private val ioDispatcher: CoroutineDispatcher,
|
||||
) : ITaskOrchestrator {
|
||||
|
||||
private val pipelineSupervisor = SupervisorJob()
|
||||
private val scope = CoroutineScope(pipelineSupervisor + ioDispatcher)
|
||||
|
||||
private val tasksById =
|
||||
Collections.synchronizedMap(linkedMapOf<TaskId, PipelineTask>())
|
||||
|
||||
private val cancelRequested = ConcurrentHashMap<TaskId, Boolean>()
|
||||
private val runningJobs = ConcurrentHashMap<TaskId, Job>()
|
||||
private val delayedForegroundJobs = ConcurrentHashMap<TaskId, Job>()
|
||||
private val visibleForegroundTaskIds = Collections.synchronizedSet(linkedSetOf<TaskId>())
|
||||
|
||||
private val _pipelineState = MutableStateFlow(PipelineState(emptyList(), emptySet()))
|
||||
override val pipelineState: StateFlow<PipelineState> = _pipelineState.asStateFlow()
|
||||
|
||||
private val logLock = Any()
|
||||
private val logBuffer = ArrayDeque<TaskLogLine>(MAX_LOG_LINES + 1)
|
||||
private val _logLines = MutableStateFlow<List<TaskLogLine>>(emptyList())
|
||||
override val logLines: StateFlow<List<TaskLogLine>> = _logLines.asStateFlow()
|
||||
|
||||
private val _foregroundUi = MutableStateFlow<TaskForegroundUiState>(TaskForegroundUiState.Hidden)
|
||||
override val foregroundUi: StateFlow<TaskForegroundUiState> = _foregroundUi.asStateFlow()
|
||||
|
||||
private fun onRunningProgress(taskId: TaskId, progress: TaskProgress) {
|
||||
replaceTask(taskId) { it.copy(state = TaskRunState.Running(progress)) }
|
||||
emitState()
|
||||
emitForegroundUiState()
|
||||
}
|
||||
|
||||
override fun enqueue(title: String, dispatcher: CoroutineDispatcher, work: PipelineWork): TaskId {
|
||||
val id = TaskId()
|
||||
val task = PipelineTask(
|
||||
id = id,
|
||||
title = title,
|
||||
dispatcher = dispatcher,
|
||||
state = TaskRunState.Queued,
|
||||
)
|
||||
synchronized(tasksById) {
|
||||
tasksById[id] = task
|
||||
}
|
||||
emitState()
|
||||
launchTask(id, work)
|
||||
return id
|
||||
}
|
||||
|
||||
override fun cancel(taskId: TaskId): Boolean {
|
||||
val exists = synchronized(tasksById) { tasksById.containsKey(taskId) }
|
||||
if (!exists) return false
|
||||
cancelRequested[taskId] = true
|
||||
runningJobs[taskId]?.cancel()
|
||||
delayedForegroundJobs.remove(taskId)?.cancel()
|
||||
return true
|
||||
}
|
||||
|
||||
override fun cancelAll() {
|
||||
val ids = synchronized(tasksById) { tasksById.keys.toList() }
|
||||
for (id in ids) {
|
||||
cancelRequested[id] = true
|
||||
runningJobs[id]?.cancel()
|
||||
delayedForegroundJobs.remove(id)?.cancel()
|
||||
}
|
||||
}
|
||||
|
||||
private fun replaceTask(id: TaskId, fn: (PipelineTask) -> PipelineTask) {
|
||||
synchronized(tasksById) {
|
||||
val cur = tasksById[id] ?: return
|
||||
tasksById[id] = fn(cur)
|
||||
}
|
||||
}
|
||||
|
||||
private fun emitState() {
|
||||
val snapshot = synchronized(tasksById) {
|
||||
tasksById.values.toList()
|
||||
}
|
||||
val running = snapshot
|
||||
.asSequence()
|
||||
.filter { it.state is TaskRunState.Running }
|
||||
.map { it.id }
|
||||
.toSet()
|
||||
_pipelineState.value = PipelineState(
|
||||
tasks = snapshot,
|
||||
runningTaskIds = running,
|
||||
)
|
||||
}
|
||||
|
||||
private fun emitForegroundUiState() {
|
||||
val visibleItems = synchronized(tasksById) {
|
||||
tasksById.values
|
||||
.filter { visibleForegroundTaskIds.contains(it.id) && it.state is TaskRunState.Running }
|
||||
.map {
|
||||
TaskForegroundItem(
|
||||
taskId = it.id,
|
||||
title = it.title,
|
||||
progress = (it.state as TaskRunState.Running).progress,
|
||||
)
|
||||
}
|
||||
}
|
||||
_foregroundUi.value = if (visibleItems.isEmpty()) {
|
||||
TaskForegroundUiState.Hidden
|
||||
} else {
|
||||
TaskForegroundUiState.Visible(visibleItems)
|
||||
}
|
||||
}
|
||||
|
||||
private fun appendLogLine(level: TaskLogLevel, message: String) {
|
||||
val line = TaskLogLine(
|
||||
timestampMs = System.currentTimeMillis(),
|
||||
level = level,
|
||||
message = message,
|
||||
)
|
||||
synchronized(logLock) {
|
||||
if (logBuffer.size >= MAX_LOG_LINES) {
|
||||
logBuffer.removeFirst()
|
||||
}
|
||||
logBuffer.addLast(line)
|
||||
_logLines.value = logBuffer.toList()
|
||||
}
|
||||
}
|
||||
|
||||
private fun launchTask(taskId: TaskId, work: PipelineWork) {
|
||||
replaceTask(taskId) { it.copy(state = TaskRunState.Running(null)) }
|
||||
emitState()
|
||||
|
||||
val showForegroundJob = scope.launch {
|
||||
delay(FOREGROUND_DELAY_MS)
|
||||
val shouldShow = synchronized(tasksById) {
|
||||
val task = tasksById[taskId] ?: return@synchronized false
|
||||
task.state is TaskRunState.Running
|
||||
}
|
||||
if (shouldShow) {
|
||||
visibleForegroundTaskIds.add(taskId)
|
||||
emitForegroundUiState()
|
||||
}
|
||||
}
|
||||
delayedForegroundJobs[taskId] = showForegroundJob
|
||||
|
||||
val dispatcher = synchronized(tasksById) { tasksById[taskId]?.dispatcher } ?: ioDispatcher
|
||||
val runJob = scope.launch(dispatcher) {
|
||||
val ctx = TaskContextImpl(
|
||||
taskId = taskId,
|
||||
onRunningProgress = { p -> onRunningProgress(taskId, p) },
|
||||
appendLog = { level, msg -> appendLogLine(level, msg) },
|
||||
)
|
||||
try {
|
||||
if (cancelRequested[taskId] == true) {
|
||||
throw CancellationException("Task cancelled before start")
|
||||
}
|
||||
work.run(ctx)
|
||||
replaceTask(taskId) { it.copy(state = TaskRunState.Completed) }
|
||||
} catch (_: CancellationException) {
|
||||
replaceTask(taskId) { it.copy(state = TaskRunState.Cancelled) }
|
||||
} catch (e: Exception) {
|
||||
replaceTask(taskId) {
|
||||
it.copy(state = TaskRunState.Failed(e.message ?: e.toString()))
|
||||
}
|
||||
} finally {
|
||||
cancelRequested.remove(taskId)
|
||||
runningJobs.remove(taskId)
|
||||
delayedForegroundJobs.remove(taskId)?.cancel()
|
||||
visibleForegroundTaskIds.remove(taskId)
|
||||
emitState()
|
||||
emitForegroundUiState()
|
||||
}
|
||||
}
|
||||
runningJobs[taskId] = runJob
|
||||
}
|
||||
|
||||
private class TaskContextImpl(
|
||||
override val taskId: TaskId,
|
||||
private val onRunningProgress: (TaskProgress) -> Unit,
|
||||
private val appendLog: (TaskLogLevel, String) -> Unit,
|
||||
) : TaskContext {
|
||||
override suspend fun reportProgress(fraction: Float?, label: String?) {
|
||||
onRunningProgress(TaskProgress(fraction, label))
|
||||
}
|
||||
|
||||
override fun log(level: TaskLogLevel, message: String) {
|
||||
appendLog(level, message)
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
private const val MAX_LOG_LINES = 500
|
||||
private const val FOREGROUND_DELAY_MS = 1_000L
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user