NVIDIA 初创加速计划,免费加速您的创业启动 了解详情
写点什么

Serverless 架构下如何实现日志的实时输出?

  • 2020-07-31
  • 本文字数:9705 字

    阅读完需:约 32 分钟

Serverless架构下如何实现日志的实时输出?

Serverless 白皮书中曾描述过 Serverless 的一些缺点,例如难以调试、冷启动严重等等。其中难以调试是表现在多个方面的,有一个方面是日志输出。


当我们把 Serverless 架构应用于实际项目,就会发现调试成为了效率的重要影响因素。以日志输出为例,某个函数被触发之后未得到预期结果,大家第一想法就是查看日志,但这时输出的日志可能并未是我们想要的,而且云厂商输出日志的延时也非常高。

日志输出现状

以腾讯云云函数为例,我们可以看一下其日志输出情况:


  • 通过控制台或者是云 API 的 Invoke 接口触发云函数:



通过这个测试功能,可以很快获取到函数的结果,并查看日志信息。


  • 通过 API 网关、COS 等触发云函数,此处以 API 网关为例:


通过网关触发一个函数:



通过函数日志查看何时会刷出这个日志:



这个过程大概有 11S,通过代码来进行更加详细的测试:


import json,timefrom tencentcloud.common import credentialfrom tencentcloud.common.profile.client_profile import ClientProfilefrom tencentcloud.common.profile.http_profile import HttpProfilefrom tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKExceptionfrom tencentcloud.scf.v20180416 import scf_client, modelstry:    cred = credential.Credential("", "")    httpProfile = HttpProfile()    httpProfile.endpoint = "scf.tencentcloudapi.com"
clientProfile = ClientProfile() clientProfile.httpProfile = httpProfile client = scf_client.ScfClient(cred, "ap-guangzhou", clientProfile)
req = models.InvokeRequest() params = '{"FunctionName":"test"}' req.from_json_string(params)
resp = client.Invoke(req) functionRequestId = json.loads(resp.to_json_string())["Result"][ "FunctionRequestId"]
print(time.time(), functionRequestId)
while True: time.sleep(0.2) req = models.GetFunctionLogsRequest() params = '{"FunctionName":"test"}' req.from_json_string(params)
resp = client.GetFunctionLogs(req) if functionRequestId in str(resp.to_json_string()): break
print(time.time())

except TencentCloudSDKException as err: print(err)

复制代码


输出结果:


1584108001.141546 ee7243dd-6532-11ea-8bce-5254000c8aa41584108005.2496068
复制代码


这次输出结果是 4S,再做一个多次调用的时间对比图:


import jsonimport timeimport numpyimport matplotlib.pyplot as pltfrom tencentcloud.common import credentialfrom tencentcloud.common.profile.client_profile import ClientProfilefrom tencentcloud.common.profile.http_profile import HttpProfilefrom tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKExceptionfrom tencentcloud.scf.v20180416 import scf_client, models
try: cred = credential.Credential("", "") httpProfile = HttpProfile() httpProfile.endpoint = "scf.tencentcloudapi.com"
clientProfile = ClientProfile() clientProfile.httpProfile = httpProfile client = scf_client.ScfClient(cred, "ap-guangzhou", clientProfile)
timeList = [] for i in range(0, 100): req = models.InvokeRequest() params = '{"FunctionName":"test"}' req.from_json_string(params)
resp = client.Invoke(req) functionRequestId = json.loads(resp.to_json_string())["Result"]["FunctionRequestId"]
startTime = int(time.time())
while True: time.sleep(0.2) req = models.GetFunctionLogsRequest() params = '{"FunctionName":"test"}' req.from_json_string(params)
resp = client.GetFunctionLogs(req) if functionRequestId in str(resp.to_json_string()): break
endTime = int(time.time()) timeList.append(endTime - startTime)
print("最大时间", int(max(timeList))) print("最小时间", int(min(timeList))) print("平均时间", int(numpy.mean(timeList)))
plt.figure() plt.subplot(2, 1, 1) x_data = range(0, len(timeList)) plt.plot(x_data, timeList) plt.subplot(2, 1, 2) plt.hist(timeList, bins=20) plt.show()

except TencentCloudSDKException as err: print(err)

复制代码


这是比较差的一段代码,耗时很久,可以考虑加入队列,一方面多进程在队列面加入执行的 RequestId,一方面消费 RequestId,进入到获取 Logs 的对象中,速度可以大大提升。但是无论如何,运行结果如下:


最大时间 31最小时间 0平均时间 17
复制代码



通过这个结果,我们发现日志输出有两个问题:


  • 时间频率不固定,通过数据可以看到,快的话可能几秒就出结果,慢的话可能十几秒,二十几秒,甚至三十几秒;

  • 日志普遍输出速度很慢,会严重影响定位问题;


就目前的腾讯云 Serverless 架构而言,如果要在本地开发一个项目,并在本地进行了初步的调试,就算一切正常,也并不能保证在线上完全可用,尤其在复杂的触发器环境下以及复杂的对象复用、内网资源使用的前提下,本地调试的难度非常大,很难完整模拟出线上的环境。


以 API 网关触发器为例,当本地写完代码,调试完成部署线上,通过 API 网关触发一次,发现函数代码不能正常运行,这个时候的第一想法是什么?查看日志,看一下打印的日志有哪些问题,是不是通过日志可以判断出问题。很遗憾的告诉你,你可能要等几秒钟,十几秒钟,甚至二十几秒,三十秒。

自建日志输出功能

通过刚才的分析,我们可以知道,在线上触发函数的时候,日志入库的速度非常缓慢,而且极其不稳定,一定条件下会严重影响开发进度以及问题定位的进度。为了解决这个问题,我们可以通过 Serverless 架构,封装一套实时日志功能:



在这个操作过程中,主要使用一个 API 网关作为 Websocket 与客户端建立链接,三个函数(注册函数,上报函数,清理函数)与 API 搭配使用,存储桶作为部分资源的临时存储。


整个流程大概可以描述为:


  1. 客户端决定开启实时日志,并将要监控的函数信息(包括地域,命名空间,函数名)作为参数,与 API 网关建立 Websocket 链接;

  2. API 网关建立 Websocket 链接的时候,会触发注册函数,此时注册函数会将 RequestId(ConnectionId)与函数信息以 Key-Value 存储到对象存储中;

  3. 根据函数信息找到对应的函数,将回推地址以及 ConnectionId 写到函数环境变量中;

  4. 此时函数只要被触发,就会先读取环境变量,根据环境变量决定是否将函数日志上报到指定地址(即带着 connectionId 发送到回推地址);

  5. 上报函数收到业务函数传递过来的数据,将数据发送到指定的 ConnectionId 的客户端,实现实时日志的输出;

  6. 当客户端断开连接之后,会触发清理函数;

  7. 清理函数会清理掉业务函数中的回推地址和 ConnectionId 等信息,清理之后,业务函数再被触发,则会因为读取不到该参数,而不会上报数据;

  8. 将根据 RequestId(ConnectionId)从对象存储删除,至此完成一次日志实时输出功能;


由于腾讯云的 API 网关限制,所以该功能每次最长只能执行 900s,900s 之后需要重新执行该程序。


API 网关涉及到的三个函数:


  • 注册函数:主要用来完成数据存储和函数信息修改等操作,是用户建立链接时触发的函数;


# -*- coding: utf8 -*-
import json, osfrom qcloud_cos_v5 import CosConfigfrom qcloud_cos_v5 import CosS3Clientfrom tencentcloud.common import credentialfrom tencentcloud.scf.v20180416 import scf_client, models

def setFunction2Bucket(name, namespace, secretId, secretKey, token, connid): region = os.environ.get("bucket_region") config = CosConfig(Region=region, SecretId=secretId, SecretKey=secretKey, Token=token) client = CosS3Client(config) response = client.put_object( Bucket=os.environ.get("bucket"), Body=json.dumps({ "region": region, "namespace": namespace, "function": name }).encode("utf-8"), Key=connid, EnableMD5=False ) return response

def setFunctionConfigure(name, namespace, region, secreetId, secretKey, token, connid, transurl): try: environmentVariablesList = [ { "Key": "real_time_log_id", "Value": connid }, { "Key": "real_time_log_url", "Value": transurl }, { "Key": "real_time_log", "Value": "open" } ] cred = credential.Credential(secreetId, secretKey, token=token) client = scf_client.ScfClient(cred, region)
req = models.GetFunctionRequest() req.from_json_string(json.dumps({"FunctionName": name, "Namespace": namespace, "ShowCode": "FALSE"})) resp = client.GetFunction(req) environmentVariables = json.loads(resp.to_json_string())["Environment"]["Variables"] for eveVariables in environmentVariables: if eveVariables["Key"] == "real_time_log_id" or eveVariables["Key"] == "real_time_log_url" or eveVariables["Key"] == "real_time_log": continue environmentVariablesList.append(eveVariables)
req = models.UpdateFunctionConfigurationRequest() req.from_json_string(json.dumps({"FunctionName": name, "Environment": { "Variables": environmentVariablesList }, "Namespace": namespace})) client.UpdateFunctionConfiguration(req)
setFunction2Bucket(name, namespace, secreetId, secretKey, token, connid) return True except Exception as e: print(e) return False

def main_handler(event, context): print("event is: ", event)
connectionID = event['websocket']['secConnectionID'] if not setFunctionConfigure( event['queryString']['name'], event['queryString']['namespace'], event['queryString']['region'], os.environ.get("TENCENTCLOUD_SECRETID"), os.environ.get("TENCENTCLOUD_SECRETKEY"), os.environ.get("TENCENTCLOUD_SESSIONTOKEN"), connectionID, os.environ.get("url") ): return False
if 'requestContext' not in event.keys(): return {"errNo": 101, "errMsg": "not found request context"} if 'websocket' not in event.keys(): return {"errNo": 102, "errMsg": "not found web socket"}
retmsg = {} retmsg['errNo'] = 0 retmsg['errMsg'] = "ok" retmsg['websocket'] = { "action": "connecting", "secConnectionID": connectionID }
if "secWebSocketProtocol" in event['websocket'].keys(): retmsg['websocket']['secWebSocketProtocol'] = event['websocket']['secWebSocketProtocol'] if "secWebSocketExtensions" in event['websocket'].keys(): ext = event['websocket']['secWebSocketExtensions'] retext = [] exts = ext.split(";") print(exts) for e in exts: e = e.strip(" ") if e == "permessage-deflate": pass if e == "client_max_window_bits": pass retmsg['websocket']['secWebSocketExtensions'] = ";".join(retext)
print("connecting: connection id:%s" % event['websocket']['secConnectionID']) return retmsg

复制代码


  • 上报函数:用户开启实时日志成功之后,业务函数上报数据。


# -*- coding: utf8 -*-import osimport jsonimport requests

def main_handler(event, context): try: print("event is: ", event)
body = json.loads(event["body"])
url = os.environ.get("url")
retmsg = {} retmsg['websocket'] = {} retmsg['websocket']['action'] = "data send" retmsg['websocket']['secConnectionID'] = body["coid"] retmsg['websocket']['dataType'] = 'text' retmsg['websocket']['data'] = body["data"] print(retmsg) requests.post(url, json=retmsg)
return True except Exception as e: return False

复制代码


  • 清理函数:客户端关闭链接时触发的函数,部分操作是注册函数的逆操作。


# -*- coding: utf8 -*-
import json, osimport requestsfrom qcloud_cos_v5 import CosConfigfrom qcloud_cos_v5 import CosS3Clientfrom tencentcloud.common import credentialfrom tencentcloud.scf.v20180416 import scf_client, models

def setFunctionConfigure(name, namespace, region, secreetId, secretKey, token): try: environmentVariablesList = [{ "Key": "real_time_log", "Value": "close" }] cred = credential.Credential(secreetId, secretKey, token=token) client = scf_client.ScfClient(cred, region)
req = models.GetFunctionRequest() params = json.dumps({"FunctionName": name, "Namespace": namespace, "ShowCode": "FALSE"}) req.from_json_string(params)
resp = client.GetFunction(req) environmentVariables = json.loads(resp.to_json_string())["Environment"]["Variables"]
for eveVariables in environmentVariables: if eveVariables["Key"] == "real_time_log_id" or eveVariables["Key"] == "real_time_log_url" or eveVariables["Key"] == "real_time_log": continue environmentVariablesList.append(eveVariables)
print(environmentVariablesList) req = models.UpdateFunctionConfigurationRequest() params = json.dumps({"FunctionName": name, "Environment": { "Variables": environmentVariablesList }, "Namespace": namespace}) req.from_json_string(params)
resp = client.UpdateFunctionConfiguration(req) print(resp.to_json_string()) return True except Exception as e: print(e) return False

def main_handler(event, context): print("event is: ", event)
connectionID = event['websocket']['secConnectionID']
region = os.environ.get("bucket_region") secreetId = os.environ.get("TENCENTCLOUD_SECRETID") secretKey = os.environ.get("TENCENTCLOUD_SECRETKEY") token = os.environ.get("TENCENTCLOUD_SESSIONTOKEN") config = CosConfig(Region=region, SecretId=secreetId, SecretKey=secretKey, Token=token) client = CosS3Client(config) response = client.get_object( Bucket=os.environ.get("bucket"), Key=connectionID, ) response['Body'].get_stream_to_file('/tmp/connid.json') with open('/tmp/connid.json') as f: data = json.loads(f.read())
if not setFunctionConfigure( data["function"], data["namespace"], data["region"], secreetId, secretKey, token, ): return False
retmsg = {} retmsg['websocket'] = {} retmsg['websocket']['action'] = "closing" retmsg['websocket']['secConnectionID'] = connectionID requests.post(os.environ.get("url"), json=retmsg) return retmsg

复制代码


业务函数上报数据的逻辑,实际上就是修改常见组件的日志方法,以 Python 为例,例如重写print()方法以及logging组件:


重写print()


# -*- coding: utf8 -*-
import osimport sysimport jsonimport urllib.parseimport urllib.request

def print(*args): url = os.environ.get("real_time_log_url") cid = os.environ.get("real_time_log_id") if url and cid and os.environ.get("real_time_log_id", None): try: retmsg = { "coid": cid, "data": " ".join([str(eveObject) for eveObject in args]) } urllib.request.urlopen( urllib.request.Request( url=url, data=json.dumps(retmsg).encode("utf-8") ) ) except Exception as e: sys.stdout.write("Debug Error:" + str(e)) sys.stdout.write("aaa"+ " ".join([str(eveObject) for eveObject in args]) + "\n")
复制代码


logging进行额外的处理,将文件中的log/info…等接口增加上报逻辑,例如:


def warning(msg, *args, **kwargs):    """    Log a message with severity 'WARNING' on the root logger. If the logger has    no handlers, call basicConfig() to add a console handler with a pre-defined    format.    """    realTimeLogs("WARNING %s %s"%(str(msg), " ".join([str(eveObject) for eveObject in args])))    if len(root.handlers) == 0:        basicConfig()    root.warning(msg, *args, **kwargs)
复制代码


上报逻辑:


def realTimeLogs(data):    url = os.environ.get("real_time_log_url")    cid = os.environ.get("real_time_log_id")    if url and cid and os.environ.get("real_time_log_id", None):        try:            retmsg = {                "coid": cid,                "data": data            }            urllib.request.urlopen(                urllib.request.Request(                    url=url,                    data=json.dumps(retmsg).encode("utf-8")                )            )        except Exception as e:            sys.stdout.write("Debug Error:" + str(e))
复制代码

封装成工具

  • 将重写部分封装成客户端工具

  • 将线上函数部分封装成 Component


封装成工具后的整体使用流程:

组件的安装与配置

  • 安装scflog


npm install scflog
复制代码


  • 部署实时日志组件,新建项目,并且建立serverless.yaml,内容:


PythonLogs:  component: '@gosls/tencent-pythonlogs'  inputs:    region: ap-guangzhou
复制代码


通过sls --debug部署:


DEBUG ─ Setting tags for function PythonRealTimeLogs_CleanupDEBUG ─ Creating trigger for function PythonRealTimeLogs_CleanupDEBUG ─ Deployed function PythonRealTimeLogs_Cleanup successful
PythonLogs: websocket: ws://service-laabz6zm-1256773370.gz.apigw.tencentcs.com/test/python_real_time_logs 26s › PythonLogs › done

复制代码


配置组件:


scflog set -w ws://service-laabz6zm-1256773370.gz.apigw.tencentcs.com/test/python_real_time_logs
复制代码


配置成功输出:


DFOUNDERLIU-MB0:~ dfounderliu$ scflog set -w ws://service-laabz6zm-1256773370.gz.apigw.tencentcs.com/test/python_real_time_logs设置成功  websocket: ws://service-laabz6zm-1256773370.gz.apigw.tencentcs.com/test/python_real_time_logs  region: ap-guangzhou  namespace: default
复制代码

函数的初始化与部署

在项目中使用该组件的方法很简单。


  • 创建一个文件夹,并进入


mkdir scflogs && cd scflogs


  • 初始化项目


scflog init -l python


  • 创建index.py文件以及serverless.yaml文件:


vim index.py
复制代码


内容是:


from logs import *import timeimport logging
def main_handler(event, context): print("event is: ", event) time.sleep(1) logging.debug("this is debug_msg") time.sleep(1) logging.info("this is info_msg") time.sleep(1) logging.warning("this is warning_msg") time.sleep(1) logging.error("this is error_msg") time.sleep(1) logging.critical("this is critical_msg") time.sleep(1) print("context is: ", event) return "hello world"

复制代码


vim serverless.yaml
复制代码


内容是:


Hello_World:  component: "@serverless/tencent-scf"  inputs:    name: Hello_World    codeUri: ./    handler: index.main_handler    runtime: Python3.6    region: ap-guangzhou    description: My Serverless Function    memorySize: 64    timeout: 20    exclude:      - .gitignore      - .git/**      - node_modules/**      - .serverless      - .env    events:      - apigw:          name: serverless          parameters:            protocols:              - http            serviceName: serverless            description: the serverless service            environment: release            endpoints:              - path: /test                method: ANY

复制代码


通过sls --debug部署:


DEBUG ─ Deployed function Hello_World successful
Hello_World: Name: Hello_World Runtime: Python3.6 Handler: index.main_handler MemorySize: 64 Timeout: 20 Region: ap-guangzhou Namespace: default Description: My Serverless Function APIGateway: - serverless - http://service-89bjzrye-1256773370.gz.apigw.tencentcs.com/release
30s › Hello_World › done

复制代码

实时日志功能的测试

配置 APIGW 的触发器,地址是上面输出的地址 + endpoints 中的 path:


http://service-89bjzrye-1256773370.gz.apigw.tencentcs.com/release/test
复制代码


打开实时日志:


scflog logs -n Hello_World -r ap-guangzhou
复制代码


提醒实时日志开启成功:


DFOUNDERLIU-MB0:~ dfounderliu$ scflog logs -n Hello_World -r ap-guangzhou实时日志开启 ... 
复制代码


用浏览器通过刚才函数部署完成返回的地址触发函数:


实时日志开启 ... [2020-03-04 16:36:08] :  ......}[2020-03-04 16:36:09] :  DEBUG debug_msg [2020-03-04 16:36:10] :  INFO info_msg [2020-03-04 16:36:11] :  WARNING warning_msg [2020-03-04 16:36:14] :  ERROR error_msg [2020-03-04 16:36:14] :  CRITICAL critical_msg [2020-03-04 16:36:16] :  context is: .......}.......
复制代码


至此,实现实时日志功能。

总结

Serverless 架构虽然拥有很多优势,但是同时也有劣势,没有什么事情是完美的,Serverless 架构也是如此。在 Serverless 架构下,日志的实时性确实是一个问题,这个问题不仅仅是我们可能要等十几秒才能看到日志,而且会影响开发效率、维护效率以及问题定位效率,但是我们可以通过自身来实现这样的功能,通过 API 网关的 Websocket 能力,通过云函数的与 API 网关的结合,构建一个实时日志的系统。


2020-07-31 16:332394

评论

发布
暂无评论
发现更多内容

亚马逊店铺引流:海外云手机的利用方法

Ogcloud

云手机 海外云手机 云手机海外版 国外云手机 美国云手机

我们是如何测试人工智能的(七)智能客服系统拆解与测试方法

测试人

人工智能 软件测试

我们是如何测试人工智能的(七)包含大模型的企业级智能客服系统拆解与测试方法 – 知识引擎

测试人

人工智能 软件测试 自动化测试 测试开发

Mistral Large模型现已在Amazon Bedrock上正式可用

财见

我们是如何测试人工智能的(六)推荐系统拆解

测吧(北京)科技有限公司

测试

2024 年“和鲸杯”辽宁省普通高等学校本科大学生计算机设计竞赛启动会圆满结束!

ModelWhale

人工智能 大数据 大学生竞赛

Sam Altman 联手苹果前首席设计官打造 AI 设备;特斯拉将推出无人驾驶出租车丨 RTE 开发者日报 Vol.178

声网

企业级依赖管理: 深入解读 Maven BOM

LightGao

maven 设计模式 架构设计 软件系统 java 架构

一文读懂模块化赛道新的头部公链Meta Earth

加密眼界

我们是如何测试人工智能的(三)数据构造与性能测试篇

测吧(北京)科技有限公司

测试

今日分享丨单点登录原理及OAuth20授权码协议

inBuilder低代码平台

低代码 单点登录

Digital Realty 将人工智能驱动的能效平台扩展至亚太地区

财见

我们是如何测试人工智能的(八)包含大模型的企业级智能客服系统拆解与测试方法 – 大模型 RAG

测吧(北京)科技有限公司

测试

为什么中小企业普遍选择IT运维外包了?

Ogcloud

IT运维 IT外包 IT外包公司 IT外包服务 IT运维外包

一文读懂模块化赛道新的头部公链Meta Earth

大瞿科技

TCL实业盘古实验室发布全域光晕控制等多项创新显示技术

Geek_2d6073

数仓调优实战:GUC参数调优

华为云开发者联盟

数据库 华为云 华为云开发者联盟 华为云GaussDB(DWS) 企业号2024年4月PK榜

我们是如何测试人工智能的(四)补充:模型全生命周期流程与测试图

测吧(北京)科技有限公司

测试

零信任安全模型:构建未来数字世界的安全基石

GousterCloud

零信任

我们是如何测试人工智能的(五)案例介绍:ASR 效果测试介绍

测吧(北京)科技有限公司

测试

天翼云超大规模高性能云基础底座、“息壤”获国资委权威认可!

编程猫

浪潮信息发布全球首个单存储16节点SAP HANA集群方案

财见

建设智慧公厕有什么好处?都有哪些功能?

光明源智慧厕所

广东智慧公厕管理系统哪家好

光明源智慧厕所

IT外包服务助推企业产业融通

Ogcloud

IT IT外包 IT外包公司 IT外包服务 IT外包企业

【IoTDB 线上小课 01】我们聊聊“金三银四”下的开源

Apache IoTDB

我们是如何测试人工智能的(七)包含大模型的企业级智能客服系统拆解与测试方法 – 知识引擎

测吧(北京)科技有限公司

测试

KaiwuDB 成功入选《2023 ToB 行业影响力价值榜 · 创新力产品榜》

KaiwuDB

数据库

和鲸科技入选 2023 年度中国高科技高成长企业系列榜单丨第一新声 & 天眼查

ModelWhale

大数据 #人工智能 人工智能公司

2024年智慧厕所解决方案,光明源智能科技是怎么实现的。

光明源智慧厕所

我们是如何测试人工智能的(二)数据挖掘篇

测吧(北京)科技有限公司

测试

Serverless架构下如何实现日志的实时输出?_服务革新_刘宇_InfoQ精选文章