Internal Pipeline Machinery

预计阅读时间:7分钟

Description

管道是一种结构,其中包含一系列依次调用的功能(块/ lambda),并按拓扑顺序分阶段分配,并具有使该序列发生突变并调用管道中其余功能然后返回当前块的功能. .

所有功能都是暂停块/ lambda,因此整个管道是异步的.

由于管道包含代码块,因此它们可以嵌套,从而有效地创建子管道.

管道在Ktor中用作扩展机制,以在适当的位置插入功能. 例如,一个Ktor应用程序定义了五个主要阶段:设置,监视,功能,呼叫和后备. 路由功能在应用程序的调用阶段定义了自己的嵌套管道.

API

简化的管道API(没有某些泛型,只有公共成员,没有任何主体)如下所示:

class PipelinePhase(val name: String)
class Pipeline <TSubject : Any, TContext : Any> {
    constructor(vararg phases: PipelinePhase)

    val attributes: Attributes
    
    fun addPhase(phase: PipelinePhase)
    fun insertPhaseAfter(reference: PipelinePhase, phase: PipelinePhase)
    fun insertPhaseBefore(reference: PipelinePhase, phase: PipelinePhase)
    
    fun intercept(phase: PipelinePhase, block: suspend PipelineContext.(TSubject) -> Unit)
    
    fun merge(from: Pipeline)

    suspend fun execute(context: TContext, subject: TSubject): TSubject
}

Phases

阶段是拦截器的组,可以按拓扑排序,定义它们之间的关系.

您可以这样定义自己的管道阶段:

val myPipelinePhase = PipelinePhase("MyPipelinePhase")

您可以在构造管道时注册阶段: Pipeline(phase1, phase2, phase3...)

您还可以稍后通过调用Pipeline.addPhase方法在管道中注册您的阶段,或者通过定义与另一个阶段的关系来注册它,并使用Pipeline.insertPhaseAfterPipeline.insertPhaseBefore方法调整阶段的顺序,从而定义关系以便对阶段进行拓扑排序.

例如,如果定义两个阶段并希望按顺序执行它们,则可以:

val phase1 = PipelinePhase("MyPhase1")
val phase2 = PipelinePhase("MyPhase2")
pipeline.insertPhaseAfter(ApplicationCallPipeline.Features, phase1)
pipeline.insertPhaseAfter(phase1, phase2)

然后,您可以截取阶段,因此将按注册阶段的顺序在该阶段中调用拦截器:

pipeline.intercept(phase1) { println("Phase1[A]") }
pipeline.intercept(phase2) { println("Phase2[A]") }
pipeline.intercept(phase2) { println("Phase2[B]") }
pipeline.intercept(phase1) { println("Phase1[B]") }

将打印:

Phase1[A]
Phase1[B]
Phase2[A]
Phase2[B]

您可以通过调用execute方法,提供上下文和主题来执行管道:

pipeline.execute(context, subject)

您可以在使用insertPhase*方法时省略对addPhase方法的调用,除非您需要注册一个阶段,否则以后可以通过调用Pipeline.merge来注册该阶段.

例如,如果您在路由功能的节点内部定义了一个阶段,然后在内部节点中尝试使用该阶段作为参考来插入阶段,则会得到类似于io.ktor.pipeline.InvalidPhaseException: Phase Phase('YourPhase') was not registered for this pipeline的异常io.ktor.pipeline.InvalidPhaseException: Phase Phase('YourPhase') was not registered for this pipeline .
在这种情况下,您可以仅调用addPhase ,因此在合并之前先引用该阶段.

Interceptors and the PipelineContext

调用Pipeline.intercept您将提供一个附加侦听的阶段,还必须提供一个接收以下代码的函数/ lambda this: PipelineContext ,以便您可以处理所需的任何内容. 在该上下文中,您可以访问适当类型的上下文(通常是ApplicationCall )和可选的Subject ,因此您可以将信息传递给其他拦截器.

上下文API:

class PipelineContext<TSubject : Any, out TContext : Any>() {
    val context: TContext
    val subject: TSubject
    
    fun finish()
    suspend fun proceedWith(subject: TSubject): TSubject
    suspend fun proceed(): TSubject
}

这样,拦截器可以通过以下方式控制流:

  • 引发异常:异常传播回来,并且管道被取消.
  • 调用proceedproceedWith功能:拦截器被挂起,而在执行流水线的其余部分. 完成后,该功能将恢复,并proceed / proceedWith代码块.
  • 调用finish()函数:管道完成,没有任何异常,也没有执行其余的管道.
  • 在其他情况下:调用下一个函数,或者如果它是最后一个函数,则管道完成.

块的顺序首先由安装的阶段顺序确定,然后由安装顺序确定.

阶段在创建管道时定义,并且可以使用pipeline.phases进行扩充以添加更多阶段.

对于以ApplicationCall作为上下文的PipelineContext ,有一个便捷扩展属性call作为context的别名.

The Subject

在执行期间,管道上下文还包含一个主题 :正在处理的任意类型的通用对象TSubject .

可以从上下文将subject作为具有其自己名称的属性进行访问,并且可以在拦截器之间传播主题. 您可以使用PipelineContext.proceedWith(subject)方法更改实例(例如,不可变的主题).

使用此方法时,管道将继续使用新的主题实例,并将返回管道中传递的最后一个实例的调用者,从而有效地允许它在以后的拦截中处理主题.

例如,对于没有主题的管道,您可以使用Unit ,因为ApplicationCallPipeline不需要主题. 它使用Unit .

Merging

相同类型的管道可以合并. 这是通过接收管道上的merge功能完成的.

来自管道合并的所有拦截器根据它们的阶段添加到接收管道.

当可以在不同位置安装拦截器时,将合并管道. 一个示例是可以在应用程序级别,呼叫级别或每个路由处截取的响应管道. 在执行响应管道之前,我们将它们全部合并.

Ktor pipelines

ApplicationCallPipeline

Ktor定义了一个没有主题的管道,而ApplicationCall作为一个上下文,定义了按以下顺序执行的五个阶段( SetupMonitoringFeaturesCallFallback ):

#direction: right
#.call: fill=#af8
#.fallback: fill=#faa dashed
[<call>Call]
[<fallback>Fallback]

[Setup] then -> [Monitoring]
[Monitoring] then -> [Features]
[Features] then -> [Call]
[Call] then -> [Fallback]

拦截每个阶段的目的:

  • Setup :用于准备呼叫及其处理属性的阶段(例如CallId功能)
  • Monitoring :用于跟踪呼叫的阶段:对日志记录,指标,错误处理等(如CallLogging功能) 很有用
  • Features :大多数功能都应拦截此阶段(例如身份验证功能).
  • Call :用于完成Call功能和拦截器,例如路由功能
  • Fallback :以正常方式处理未处理的呼叫并以某种方式解决它们的功能,例如StatusPages功能

代码如下:

open class ApplicationCallPipeline : Pipeline<Unit, ApplicationCall>(Setup, Monitoring, Features, Call, Fallback) {
    val receivePipeline = ApplicationReceivePipeline()
    val sendPipeline = ApplicationSendPipeline()

    companion object {
        val Setup = PipelinePhase("Setup")
        val Monitoring = PipelinePhase("Monitoring")
        val Features = PipelinePhase("Features")
        val Call = PipelinePhase("Call")
        val Fallback = PipelinePhase("Fallback")
    }
}

This base pipeline is used by the Application and the Routing features.

Application

Ktor ApplicationApplicationCallPipeline . 这是用于Web后端应用程序处理http请求的主要管道.

Routing

路由功能定义了一个嵌套的管道,该管道附加到应用程序管道中的"调用"阶段. 您可以通过调用val routing = application.routing {}获得路由根节点管道. Route树中的每个节点定义了自己的管道,此管道随后将按每个路由合并.

通过合并树管道,您可以在树中的某个点定义阶段和拦截,然后将按照阶段关系定义的顺序执行它们.

由于Route节点具有自己的管道,并且合并稍后发生,因此,如果您计划将关系添加到其他祖先Route节点中定义的某些阶段,则必须在特定Route节点中使用Pipeline.addPhase添加它们,以避免io.ktor.pipeline.InvalidPhaseException: Phase Phase('YourPhase') was not registered for this pipeline异常io.ktor.pipeline.InvalidPhaseException: Phase Phase('YourPhase') was not registered for this pipeline .

Other

Ktor还通过其他一些功能定义了其他管道.

Samples

有关示例,请参见PipelineTest .

by  ICOPY.SITE