10 月 23 - 25 日,QCon 上海站即将召开,现在购票,享9折优惠 了解详情
写点什么

Serverless 实战:利用云函数 + API 网关实现 Websocket 聊天工具

  • 2020-06-16
  • 本文字数:9259 字

    阅读完需:约 30 分钟

Serverless实战:利用云函数 + API网关实现Websocket聊天工具

如果是传统技术栈想要实现 Websocket 会比较容易,但是函数计算由于不支持长连接操作,由事件驱动,所以实现起来会有难度。本文将结合函数计算与 API 网关,尝试由 Websocket 实现一个聊天工具。

API 网关触发器实现 Websocket

WebSocket 协议是基于 TCP 的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信,即允许服务器主动发送信息给客户端。WebSocket 在服务端有数据推送需求时,可以主动发送数据至客户端。而原有 HTTP 协议的服务端对于需推送的数据,仅能通过轮询或 long poll 的方式来让客户端获得。


由于云函数是无状态且以触发式运行,即在有事件到来时才会被触发,因此,为了实现 WebSocket,需要云函数与 API 网关相结合,通过 API 网关承接、保持与客户端的连接,可以认为 API 网关与 SCF 一起实现了服务端。当客户端有消息发出时,会先传递给 API 网关,再由 API 网关触发云函数执行。当服务端云函数要向客户端发送消息时,会先由云函数将消息 POST 到 API 网关的反向推送链接,再由 API 网关向客户端完成消息的推送。具体的实现架构如下:



对于 WebSocket 的整个生命周期,主要由以下几个事件组成:


  • 连接建立:客户端向服务端请求建立连接并完成连接建立。

  • 数据上行:客户端通过已经建立的连接向服务端发送数据。

  • 数据下行:服务端通过已经建立的连接向客户端发送数据。

  • 客户端断开:客户端要求断开已经建立的连接。

  • 服务端断开:服务端要求断开已经建立的连接。


对于 WebSocket 整个生命周期的事件,云函数和 API 网关的处理过程如下:


  • 连接建立:客户端与 API 网关建立 WebSocket 连接,API 网关将连接建立事件发送给 SCF。

  • 数据上行:客户端通过 WebSocket 发送数据,API 网关将数据转发送给 SCF。

  • 数据下行:SCF 通过向 API 网关指定的推送地址发送请求,API 网关收到后会将数据通过 WebSocket 发送给客户端。

  • 客户端断开:客户端请求断开连接,API 网关将连接断开事件发送给 SCF。

  • 服务端断开:SCF 通过向 API 网关指定的推送地址发送断开请求,API 网关收到后断开 WebSocket 连接。


API 网关与 SCF 之间的交互需要由 3 类云函数来承载:


  • 注册函数:在客户端发起和 API 网关之间建立 WebSocket 连接时触发该函数,通知 SCF WebSocket 连接的 secConnectionID。通常会在该函数记录 secConnectionID 到持久存储中,用于后续数据的反向推送。

  • 清理函数:在客户端主动发起 WebSocket 连接中断请求时触发该函数,通知 SCF 准备断开连接的 secConnectionID。通常会在该函数清理持久存储中记录的该 secConnectionID。

  • 传输函数:在客户端通过 WebSocket 连接发送数据时触发该函数,告知 SCF 连接的 secConnectionID 以及发送的数据。通常会在该函数处理业务数据。例如,是否将数据推送给持久存储中的其他 secConnectionID。

Websocket 功能实现

下图是腾讯云官网提供的整体架构图:



我们可以使用 COS(对象存储)作为持久化的方案,当用户建立链接存储 ConnectionId 到 COS 中,用户断开连接时删除该链接 Id。


注册函数:



# -*- coding: utf8 -*-
import os
from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client
bucket = os.environ.get('bucket')
region = os.environ.get('region')
secret_id = os.environ.get('secret_id')
secret_key = os.environ.get('secret_key')
cosClient = CosS3Client(CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key))

def main_handler(event, context):
print("event is %s" % event)
connectionID = event['websocket']['secConnectionID']
retmsg = {}
retmsg['errNo'] = 0
retmsg['errMsg'] = "ok"
retmsg['websocket'] = {
"action": "connecting",
"secConnectionID": connectionID
}
cosClient.put_object(
Bucket=bucket,
Body='websocket'.encode("utf-8"),
Key=str(connectionID),
EnableMD5=False
)
return retmsg

复制代码


传输函数:



# -*- coding: utf8 -*-
import os
import json
import requests
from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client
bucket = os.environ.get('bucket')
region = os.environ.get('region')
secret_id = os.environ.get('secret_id')
secret_key = os.environ.get('secret_key')
cosClient = CosS3Client(CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key))
sendbackHost = os.environ.get("url")

def Get_ConnectionID_List():
response = cosClient.list_objects(
Bucket=bucket,
)
return [eve['Key'] for eve in response['Contents']]

def send(connectionID, data):
retmsg = {}
retmsg['websocket'] = {}
retmsg['websocket']['action'] = "data send"
retmsg['websocket']['secConnectionID'] = connectionID
retmsg['websocket']['dataType'] = 'text'
retmsg['websocket']['data'] = data
requests.post(sendbackHost, json=retmsg)

def main_handler(event, context):
print("event is %s" % event)
connectionID_List = Get_ConnectionID_List()
connectionID = event['websocket']['secConnectionID']
count = len(connectionID_List)
data = event['websocket']['data'] + "(===Online people:" + str(count) + "===)"
for ID in connectionID_List:
if ID != connectionID:
send(ID, data)
return "send success"

复制代码


清理函数:



# -*- coding: utf8 -*-
import os
import requests
from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client
bucket = os.environ.get('bucket')
region = os.environ.get('region')
secret_id = os.environ.get('secret_id')
secret_key = os.environ.get('secret_key')
cosClient = CosS3Client(CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key))
sendbackHost = os.environ.get("url")

def main_handler(event, context):
print("event is %s" % event)
connectionID = event['websocket']['secConnectionID']
retmsg = {}
retmsg['websocket'] = {}
retmsg['websocket']['action'] = "closing"
retmsg['websocket']['secConnectionID'] = connectionID
requests.post(sendbackHost, json=retmsg)
cosClient.delete_object(
Bucket=bucket,
Key=str(connectionID),
)
return event

复制代码


Yaml 格式如下所示:



Conf:
component: "serverless-global"
inputs:
region: ap-guangzhou
bucket: chat-cos-1256773370
secret_id:
secret_key:
myBucket:
component: '@serverless/tencent-cos'
inputs:
bucket: ${Conf.bucket}
region: ${Conf.region}
restApi:
component: '@serverless/tencent-apigateway'
inputs:
region: ${Conf.region}
protocols:
- http
- https
serviceName: ChatDemo
environment: release
endpoints:
- path: /
method: GET
protocol: WEBSOCKET
serviceTimeout: 800
function:
transportFunctionName: ChatTrans
registerFunctionName: ChatReg
cleanupFunctionName: ChatClean

ChatReg:
component: "@serverless/tencent-scf"
inputs:
name: ChatReg
codeUri: ./code
handler: reg.main_handler
runtime: Python3.6
region: ${Conf.region}
environment:
variables:
region: ${Conf.region}
bucket: ${Conf.bucket}
secret_id: ${Conf.secret_id}
secret_key: ${Conf.secret_key}
url: [](http://set-gwm9thyc.cb-guangzhou.apigateway.tencentyun.com/api-etj7lhtw)
ChatTrans:
component: "@serverless/tencent-scf"
inputs:
name: ChatTrans
codeUri: ./code
handler: trans.main_handler
runtime: Python3.6
region: ${Conf.region}
environment:
variables:
region: ${Conf.region}
bucket: ${Conf.bucket}
secret_id: ${Conf.secret_id}
secret_key: ${Conf.secret_key}
url: [](http://set-gwm9thyc.cb-guangzhou.apigateway.tencentyun.com/api-etj7lhtw)
ChatClean:
component: "@serverless/tencent-scf"
inputs:
name: ChatClean
codeUri: ./code
handler: clean.main_handler
runtime: Python3.6
region: ${Conf.region}
environment:
variables:
region: ${Conf.region}
bucket: ${Conf.bucket}
secret_id: ${Conf.secret_id}
secret_key: ${Conf.secret_key}
url: [](http://set-gwm9thyc.cb-guangzhou.apigateway.tencentyun.com/api-etj7lhtw)

复制代码


需要注意的是,我们要先部署 API 网关,完成之后获得回推地址,将回推地址以 url 的形式写入到对应函数的环境变量中:



从理论上来讲,这里的设计不是很合理,按道理我们是可以通过${restApi.url[0].internalDomain}自动获得 url,但是我并没有成功获得到 url,所以只能先部署 API 网关,获得地址之后,再重新部署。


部署完成之后,我们可以编写 HTML 代码实现可视化的 Websocket Client,其核心的 JavaScript 代码为:



window.onload = function () {
var conn;
var msg = document.getElementById("msg");
var log = document.getElementById("log");
function appendLog(item) {
var doScroll = log.scrollTop === log.scrollHeight - log.clientHeight;
log.appendChild(item);
if (doScroll) {
log.scrollTop = log.scrollHeight - log.clientHeight;
}
}
document.getElementById("form").onsubmit = function () {
if (!conn) {
return false;
}
if (!msg.value) {
return false;
}
conn.send(msg.value);
//msg.value = "";

var item = document.createElement("div");
item.innerText = "发送↑:";
appendLog(item);

var item = document.createElement("div");
item.innerText = msg.value;
appendLog(item);

return false;
};
if (window["WebSocket"]) {
//替换为websocket连接地址
conn = new WebSocket("ws://service-01era6ni-1256773370.gz.apigw.tencentcs.com/release/");
conn.onclose = function (evt) {
var item = document.createElement("div");
item.innerHTML = "<b>Connection closed.</b>";
appendLog(item);
};
conn.onmessage = function (evt) {
var item = document.createElement("div");
item.innerText = "接收↓:";
appendLog(item);

var messages = evt.data.split('\n');
for (var i = 0; i < messages.length; i++) {
var item = document.createElement("div");
item.innerText = messages[i];
appendLog(item);
}
};
} else {
var item = document.createElement("div");
item.innerHTML = "<b>Your browser does not support WebSockets.</b>";
appendLog(item);
}
};

复制代码


完成之后,我们打开两个页面进行测试:


总结

通过云函数 + API 网关进行 Websocket 的实践,绝对不仅仅只是一个聊天工具,它可以实现很多功能,例如通过 Websocket 进行实时日志系统的制作等。


单独的函数计算仅仅是一个计算平台,只有和周边的 BaaS 结合才能展示出 Serverless 架构的价值和真正的能力、意义。这也是为什么很多人说 Serverless=FaaS+BaaS 的一个原因。


2020-06-16 15:455269

评论

发布
暂无评论
发现更多内容

week12学习总结

burner

滴滴云平台事业群——就是稳!

滴滴技术

招聘 滴滴技术 滴滴云平台事业群分享月

物联网的银河,华为的桨,少年的歌

脑极体

1.Flink检查点算法-15

小知识点

scala 大数据 flink

滴滴Ceph分布式存储系统优化之锁优化

滴滴技术

云计算 分布式存储 Ceph 滴滴技术

Redis做消息队列全攻略

架构师修行之路

redis MQ 消息队列

c语言函数指针之回调函数

C语言与CPP编程

C语言 回调函数 函数 函数指针

在Rust里面嵌入python代码

lipi

Python rust

浅析LR.Net工作流引擎

Learun

.net 敏捷开发 工作流

隐私计算会成为“金融”向“数科”转型的一剂猛药?

hellompc

滴滴推理引擎IFX:千万规模设备下AI部署实践

滴滴技术

人工智能 学习 AI 滴滴技术 IFX

可编程网卡芯片在滴滴云网络的应用实践

滴滴技术

云计算 芯片 滴滴技术

拥抱K8S系列-03-服务器部署应用和docker部署应用区别(MySQL篇)

张无忌

MySQL Docker 运维

实时数仓在滴滴的实践和落地

滴滴技术

大数据 滴滴技术 数据通道服务

滴滴数据仓库指标体系建设实践

滴滴技术

大数据 数据仓库 滴滴技术

合约跟单系统开发,数字货币合约跟单软件搭建

13530558032

区块链技术成为金融业务应用热点

CECBC

区块链 人工智能 金融

GPU虚拟机创建时间深度优化

滴滴技术

云计算 虚拟化 滴滴技术

滴滴数据通道服务演进之路

滴滴技术

大数据 滴滴技术 数据服务通道

分布式QoS算法解析

焱融科技

分布式 算法 焱融科技 分布式文件存储 QoS

【Spring注解驱动开发】AOP核心类源码解析,这是最全的一篇了!!

冰河

spring aop ioc

Zeppelin SDK :Flink 平台建设的基石

Apache Flink

flink

迭代技术方案设计文档规范

程序员架构进阶

技术方案

第 0 期架构师训练营第 8 周作业 1

fujin

第 0 期架构师训练营第 8 周作业2-总结

fujin

突破传统 区块链如何实现病历永存

CECBC

区块链 电子病历 信息共享

基于Prometheus的微服务应用监控

易观大数据

自定义线程池来实现文档转码

架构师修行之路

滴滴ElasticSearch千万级TPS写入性能翻倍技术剖析

滴滴技术

大数据 elasticsearch 滴滴技术

滴滴七层接入平台实践和探索

滴滴技术

微服务 运维 滴滴技术 七层接入

数据分析之伯克森谬误:颜值和性格真成反比吗

KAMI

人生 数据分析 数据

Serverless实战:利用云函数 + API网关实现Websocket聊天工具_服务革新_刘宇_InfoQ精选文章