¿
最近尝试制作了一个能够在线运行代码的工具: , 踩坑不少, 做一个总结
涉及到的技术/模块如下:
-
服务端推送技术
Server Sent Events
, 下文简称SSE
-
Node.js
模块:child_process
模块stream
模块
-
Docker
几条简单的命令docker run
docker pull
docker kill
-
Koa2
: 顺便用用, 不是核心 -
pug
: 渲染模板
参考
1. SSE 用作服务端推送
严格地说,HTTP 协议无法做到服务器主动推送信息。但是,有一种变通方法, 就是服务器向客户端声明,接下来要发送的是流信息(streaming)。这时, 客户端 不会关闭连接, 而是会一直等着服务器发过来的新数据流。
1.1 SSE 和 WebSocket 对比
相对于WebSocket
, 它有如下的特点:
-
SSE
使用HTTP
协议;WebSocket
是一个独立的协议 -
SSE
使用简单, 轻量级;WebSocket
你肯定要引入socket.io
,ws
之类的库 -
SSE
默认支持断线重连;WebSocket
需要自己实现 -
SSE
一般用来传输文本;WebSocket
默认支持传送二进制数据
如果仅仅需要服务端推送这个功能的话, 使用SSE
的开发成本是最低的, 兼容性如下
1.2 SSE 最简单接入
假设接口地址为/sse
, 服务端代码(以Koa2
为例)为:
const { PassThrough } = require('stream')router.get('/sse', ctx => { const stream = new PassThrough() setInterval(() => { stream.write(`: \n\n`) }, 5000) ctx.set({ 'Content-Type':'text/event-stream' }) ctx.body = stream})复制代码
客户端
const eventSource = new EventSource('/sse')复制代码
把空行算上, 仅仅 10 行代码, 就完成了前后端的SSE
连接, 效果如下
1.3 SSE 事件流格式
事件流仅仅是一个简单的文本流数据,文本应该使用
UTF-8
格式的编码,每条消息后面都有一由一个空行作为分隔符 以冒号,以冒号开头的行为注释行,会被忽略。
注释行可以用来防止连接超时,服务器可以定期发送一条消息注释行,以保持连接不断
这样看不够直接,代码表述如下:
// 发送注释行stream.write(`: \n\n`)// 发送自定义事件test, 数据为字符串: this is a test messagestream.write(`event: test\ndata: this is a test message\n\n`)// 发送自定义事件test1, 数据为一个对象: { msg: 'this is a test message' }stream.write(`event: test1\ndata: ${ JSON.stringify({ msg: 'this is a test message' })}\n\n`)复制代码
客户端监听自定义事件:
const eventSource = new EventSource('/sse')eventSource.addEventListener('test', event => { console.log(event.data) // this is a test message})复制代码
没错, 就是这么简单
2. Code-Runner 具体实现
2.1 整体流程
- 客户端发出
GET /sse HTTP/1.1
请求
- 监听自定义事件
sse-connect
: 获得一个身份标识id - 监听自定义事件
sse-message
: 进度消息的推送, 例如镜像拉取、代码执行开始、代码执行结束 - 监听自定义事件
sse-result
: 代码执行结果的推送
- 用户提交代码
POST /runner HTTP/1.1
- 拉取镜像, 例如:
docker pull node:latest
- 将用户提交代码写入文件, 例如:
/code/main-1.js
- 启动容器, 例如:
docker run --rm --name runner-1 -v /code:/code node:latest node /code/main-1.js
- 根据身份标识id, 将结果写入对应的流
- 关闭容器
2.2 SSE的封装
封装目标:
-
可以根据身份标识
id
获得对应的流 -
对发送自定义事件的封装
-
对保持连接不断的封装
-
维护一个实例表, 便于向对应的流推送消息
const { PassThrough } = require('stream')const instanceMap = new Map()let uid = 0/** * Server Sent Events封装 */module.exports = class SSE { /** * 构造函数中初始化转换流、身份标识、执行初始化方法 */ constructor(options = {}) { this.stream = new PassThrough() this.uid = ++uid this.intervalTime = options.intervalTime || 5000 this._init() } /** * 根据uid获取SSE实例 */ static getInstance(uid) { return instanceMap.get(+uid) } /** * 根据uid发送自定义事件 */ static writeStream(uid, event, data) { const instance = this.getInstance(uid) if (instance) instance.writeStream(event, data) } /** * 初始化函数中记录当前实例, 并保持长连接 */ _init() { instanceMap.set(this.uid, this) this._writeKeepAliveStream() const timer = setInterval(() => { this._writeKeepAliveStream() }, this.intervalTime) this.stream.on('close', () => { clearInterval(timer) instanceMap.delete(this.uid) }) } /** * 通过发送注释消息保持长连接 */ _writeKeepAliveStream() { this.stream.write(': \n\n') } /** * 发送自定义事件 */ writeStream(event, data) { const payload = typeof data === 'string' ? data : JSON.stringify(data) this.stream.write(`event: ${event}\ndata: ${payload}\n\n`) }}复制代码
封装后, /sse
接口代码简化为:
router.get('/sse', ctx => { ctx.set({ 'Content-Type':'text/event-stream', 'Cache-Control':'no-cache', 'Connection': 'keep-alive' }) const sse = new SSE() sse.writeStream('sse-connect', sse.uid) ctx.body = sse.stream })复制代码
2.3 限制容器的使用时长
执行用户代码, 需要限制容器的使用时长, 虽然一直有, 但是最佳的停止容器运行的方式还是docker stop / docker kill
docker stop
: 用docker stop命令来停掉容器的时候,docker默认会允许容器中的应用程序有10秒的时间用以终止运行, 如果等待时间达到设定的超时时间,或者默认的10秒,会继续发送SIGKILL的系统信号强行kill掉进程
docker kill
: 默认情况下,docker kill命令不会给容器中的应用程序有任何gracefully shutdown的机会。 它会直接发出SIGKILL的系统信号,以强行终止容器中程序的运行
因此, 此处采用docker kill
更符合需求, 停止容器的代码如下:
/** * 停止Docker容器 * @description * exec方法中的timeout选项在执行“docker run”命令时无效, 因此采用“docker kill”命令来限制容器使用时长 * 通过docker kill, childProcess exitCode值为137 * @param {string} containerName 容器名称 * @param {number} timeout 限制使用时长 * @return {number} timer */function stopDocker(containerName, timeout) { return setTimeout(async () => { try { await exec(`docker kill ${containerName}`) } catch (e) { } }, timeout)}复制代码
注: 此处child_process.exec
的timeout
选项并不能停止docker
容器
2.4 流的方式获得输出
child_process.exec
方法执行命令返回的结果是buffer/string
, 此处我们需要使用流的方式, 就要使用child_process.spawn
方法
容器启动的部分代码如下:
/** * 启动Docker容器并使用流模式获取输出 * @param {object} dockerOptions 启动docker的配置 */function startDockerBySpawn(dockerOptions) { // 获得容器名, 镜像名, 执行命令, 挂载卷 const { containerName, imageName, execCommand, volume } = dockerOptions // 参数差异 const commandStr = `docker run --rm --memory=50m --name ${containerName} -v ${volume} ${imageName} ${execCommand}` const [command, ...args] = commandStr.split(' ') // 启动容器 const childProcess = spawn(command, args) //...}复制代码
容器启动后, 可以获得两个流:
-
childProcess.stdout
-
childProcess.stderr
我们需要把两个流的数据组合起来, 然后将其转换为SSE
数据格式, 最终写入到目标流targetStream
, 代码如下:
const t = new SSETransform()const transferStation = new PassThrough()childProcess.stdout.pipe(transferStation)childProcess.stderr.pipe(transferStation)transferStation.pipe(t).pipe(targetStream, { end: false })复制代码
自定义的转换流如下:
const { Transform } = require('stream')/** * 自定义转换流 * @description * 将child_process.stdout/stderr的可写流转换为EventStream的格式 */module.exports = class SSETransform extends Transform { constructor(eventName) { super() this.eventName = eventName || 'sse-result' } _transform(chunk, encoding, callback) { callback(null, `event: ${ this.eventName}\ndata: ${ JSON.stringify({ result: chunk.toString('utf8') })}\n\n`) }}复制代码
一次代码执行流程获得的数据如下图所示
至此, 就完成了一个代码在线运行工具的开发