WebSockets

预计阅读时间:6分钟

此功能将WebSockets支持添加到Ktor. WebSocket是一种在服务器和客户端之间保持双向实时有序连接的机制. 该通道中的每条消息都称为"框架":框架可以是文本或二进制消息,也可以是结束或乒乓消息. 帧可以标记为不完整或最终的.

此功能定义在类io.ktor.websocket.WebSockets在神器io.ktor:ktor-websockets:$ktor_version .
dependencies { implementation "io.ktor:ktor-websockets:$ktor_version" }
dependencies { implementation("io.ktor:ktor-websockets:$ktor_version") }
<project> ... <dependencies> <dependency> <groupId>io.ktor</groupId> <artifactId>ktor-websockets</artifactId> <version>${ktor.version}</version> <scope>compile</scope> </dependency> </dependencies> </project>

目录:

Installing

为了使用WebSockets功能,您首先必须安装它:

install(WebSockets)

如果需要,您可以在安装时调整一些参数:

install(WebSockets) {
    pingPeriod = Duration.ofSeconds(60) // Disabled (null) by default
    timeout = Duration.ofSeconds(15)
    maxFrameSize = Long.MAX_VALUE // Disabled (max value). The connection will be closed if surpassed this length. 
    masking = false
}

Usage

安装后,您可以为路由功能定义webSocket路由:

代替短暂的正常路由处理程序,webSocket处理程序应该是长期的. 并且所有相关的WebSocket方法都被挂起,以便在接收或发送消息时以非阻塞方式挂起该功能.

webSocket方法接收带有WebSocketSession实例的回调作为接收者. 该接口定义了incoming (ReceiveChannel)属性和outgoing (SendChannel)属性,以及一个close方法. 检查完整的WebSocketSession以获取更多信息.

Usage as an suspend actor

routing {
    webSocket("/") { // websocketSession
        for (frame in incoming) {
            when (frame) {
                is Frame.Text -> {
                    val text = frame.readText()
                    outgoing.send(Frame.Text("YOU SAID: $text"))
                    if (text.equals("bye", ignoreCase = true)) {
                        close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
                    }
                }
            }
        }
    }
}

如果客户端显式关闭连接或关闭TCP套接字,则在接收Frame时将引发异常. 因此,即使有一个while (true)循环,也不应该是泄漏.

Usage as a Channel

由于incoming属性是ReceiveChannel,因此可以将其与类似流的接口一起使用:

routing {
    webSocket("/") { // websocketSession
        for (frame in incoming.mapNotNull { it as? Frame.Text }) {
            val text = frame.readText()
            outgoing.send(Frame.Text("YOU SAID $text"))
            if (text.equals("bye", ignoreCase = true)) {
                close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
            }
        }
    }
}

Interface

The WebSocketSession interface

您将收到一个WebSocketSession作为接收者(此),从而可以直接访问webSocket处理程序中的这些成员.

interface WebSocketSession {
    // Basic interface
    val incoming: ReceiveChannel<Frame> // Incoming frames channel
    val outgoing: SendChannel<Frame> // Outgoing frames channel
    fun close(reason: CloseReason)

    // Convenience method equivalent to `outgoing.send(frame)`
    suspend fun send(frame: Frame) // Enqueue frame, may suspend if the outgoing queue is full. May throw an exception if the outgoing channel is already closed, so it is impossible to transfer any message.

    // The call and the context
    val call: ApplicationCall
    val application: Application

    // Modifiable properties for this request. Their initial value comes from the feature configuration.
    var pingInterval: Duration?
    var timeout: Duration
    var masking: Boolean // Enable or disable masking output messages by a random xor mask.
    var maxFrameSize: Long // Specifies frame size limit. The connection will be closed if violated
    
    // Advanced
    val closeReason: Deferred<CloseReason?>
    suspend fun flush() // Flush all outstanding messages and suspend until all earlier sent messages will be written. Could be called at any time even after close. May return immediately if connection is already terminated.
    fun terminate() // Initiate connection termination immediately. Termination may complete asynchronously.
}

如果您需要有关连接的信息. 例如,客户端IP,您可以访问call属性. 因此,您可以在websocket块中执行诸如call.request.origin.host类的call.request.origin.host .

The Frame interface

一个帧是在WebSocket协议级别发送和接收的每个数据包. 有两种消息类型:TEXT和BINARY. 还有三个控制包:CLOSE,PING和PONG. 每个数据包都有一个有效负载buffer . 对于文本或关闭消息,可以调用readTextreadReason来解释该缓冲区.

enum class FrameType { TEXT, BINARY, CLOSE, PING, PONG }
sealed class Frame {
    val fin: Boolean // Is this frame a final frame?
    val frameType: FrameType // The Type of the frame
    val buffer: ByteBuffer // Payload
    val disposableHandle: DisposableHandle

    class Binary : Frame
    class Text : Frame {
        fun readText(): String
    }
    class Close : Frame {
        fun readReason(): CloseReason?
    }
    class Ping : Frame
    class Pong : Frame
}

Testing

您可以使用withTestApplication块中的handleWebSocketConversation方法来测试WebSocket对话.

test.kt
class MyAppTest {
    @Test
    fun testConversation() {
        withTestApplication {
            application.install(WebSockets)
    
            val received = arrayListOf<String>()
            application.routing {
                webSocket("/echo") {
                    try {
                        while (true) {
                            val text = (incoming.receive() as Frame.Text).readText()
                            received += text
                            outgoing.send(Frame.Text(text))
                        }
                    } catch (e: ClosedReceiveChannelException) {
                        // Do nothing!
                    } catch (e: Throwable) {
                        e.printStackTrace()
                    }
                }
            }
    
            handleWebSocketConversation("/echo") { incoming, outgoing ->
                val textMessages = listOf("HELLO", "WORLD")
                for (msg in textMessages) {
                    outgoing.send(Frame.Text(msg))
                    assertEquals(msg, (incoming.receive() as Frame.Text).readText())
                }
                assertEquals(textMessages, received)
            }
        }
    }
}

FAQ

Standard Events: onConnect, onMessage, onClose and onError

WebSocket API中标准事件如何映射到Ktor?

  • onConnect发生在块的开头.
  • onMessage在成功读取一条消息(例如,使用incoming.receive() )或for(frame in incoming)使用暂停的迭代之后发生.
  • 当关闭incoming通道时,将发生onClose . 这将完成暂停的迭代,或者在尝试接收ClosedReceiveChannelException时抛出ClosedReceiveChannelException.
  • onError等同于其他其他异常.

onCloseonError ,都设置了closeReason属性 .

为了说明这一点:

webSocket("/echo") {
    println("onConnect")
    try {
        for (frame in incoming){
            val text = (frame as Frame.Text).readText()
            println("onMessage")
            received += text
            outgoing.send(Frame.Text(text))
        }
    } catch (e: ClosedReceiveChannelException) {
        println("onClose ${closeReason.await()}")
    } catch (e: Throwable) {
        println("onError ${closeReason.await()}")
        e.printStackTrace()
    }
}

在此示例中,仅在引发异常时退出无限循环: ClosedReceiveChannelException或另一个异常.

by  ICOPY.SITE