GTLC全球技术领导力峰会·上海站,首批讲师正式上线! 了解详情
写点什么

Serverless 实战:利用函数计算与对象存储实现 WordCount

2020 年 6 月 16 日

Serverless实战:利用函数计算与对象存储实现WordCount

MapReduce 在百度百科中的解释是:


MapReduce 是一种编程模型,用于大规模数据集(大于 1TB)的并行运算。"Map(映射)"和"Reduce(归约)"是它们的主要思想,都是从函数式编程语言和矢量编程语言借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个 Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的 Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。


通过这段描述,我们可以明确知道 MapReduce 是面向大数据并行处理的计算模型、框架和平台,在传统学习中,通常会在 Hadoop 等分布式框架下进行 MapReduce 相关工作,随着云计算的逐渐发展,各个云厂商也都先后推出了在线的 MapReduce 业务。


本文我们将通过 MapReduce 模型实现一个简单的 WordCount 算法,区别于传统使用 Hadoop 等大数据框架,我们使用的是对象存储与云函数的结合。


理论基础

在开始之前,我们根据 MapReduce 要求先绘制一个简单的流程图:



在这个结构中,我们需要 2 个云函数分别作 Mapper 和 Reducer,3 个对象存储的存储桶分别作为输入的存储桶、中间临时缓存的存储桶以及结果存储桶。在开始实践前,我们先在广州区准备 3 个对象存储:


对象存储1  ap-guangzhou  srcmr对象存储2  ap-guangzhou  middlestagebucket对象存储3  ap-guangzhou  destcmr
复制代码


为了让整个 Mapper 和 Reducer 逻辑更加清晰,我们先对传统的 WordCount 结构进行改造,使其更加适合云函数,同时合理分配 Mapper 和 Reducer 的工作:



功能实现

编写 Mapper 相关逻辑:


# -*- coding: utf8 -*-import datetimefrom qcloud_cos_v5 import CosConfigfrom qcloud_cos_v5 import CosS3Clientfrom qcloud_cos_v5 import CosServiceErrorimport reimport osimport sysimport logginglogging.basicConfig(level=logging.INFO, stream=sys.stdout)logger = logging.getLogger()logger.setLevel(level=logging.INFO)region = u'ap-guangzhou'  # 根据实际情况,修改地域middle_stage_bucket = 'middlestagebucket'  # 根据实际情况,修改bucket名def delete_file_folder(src):    if os.path.isfile(src):        try:            os.remove(src)        except:            pass    elif os.path.isdir(src):        for item in os.listdir(src):            itemsrc = os.path.join(src, item)            delete_file_folder(itemsrc)        try:            os.rmdir(src)        except:            passdef download_file(cos_client, bucket, key, download_path):    logger.info("Get from [%s] to download file [%s]" % (bucket, key))    try:        response = cos_client.get_object(Bucket=bucket, Key=key, )        response['Body'].get_stream_to_file(download_path)    except CosServiceError as e:        print(e.get_error_code())        print(e.get_error_msg())        return -1    return 0def upload_file(cos_client, bucket, key, local_file_path):    logger.info("Start to upload file to cos")    try:        response = cos_client.put_object_from_local_file(            Bucket=bucket,            LocalFilePath=local_file_path,            Key='{}'.format(key))    except CosServiceError as e:        print(e.get_error_code())        print(e.get_error_msg())        return -1    logger.info("Upload data map file [%s] Success" % key)    return 0def do_mapping(cos_client, bucket, key, middle_stage_bucket, middle_file_key):    src_file_path = u'/tmp/' + key.split('/')[-1]    middle_file_path = u'/tmp/' + u'mapped_' + key.split('/')[-1]    download_ret = download_file(cos_client, bucket, key, src_file_path)  # download src file    if download_ret == 0:        inputfile = open(src_file_path, 'r')  # open local /tmp file        mapfile = open(middle_file_path, 'w')  # open a new file write stream        for line in inputfile:            line = re.sub('[^a-zA-Z0-9]', ' ', line)  # replace non-alphabetic/number characters            words = line.split()            for word in words:                mapfile.write('%s\t%s' % (word, 1))  # count for 1                mapfile.write('\n')        inputfile.close()        mapfile.close()        upload_ret = upload_file(cos_client, middle_stage_bucket, middle_file_key,                                 middle_file_path)  # upload the file's each word        delete_file_folder(src_file_path)        delete_file_folder(middle_file_path)        return upload_ret    else:        return -1def map_caller(event, context, cos_client):    appid = event['Records'][0]['cos']['cosBucket']['appid']    bucket = event['Records'][0]['cos']['cosBucket']['name'] + '-' + appid    key = event['Records'][0]['cos']['cosObject']['key']    key = key.replace('/' + str(appid) + '/' + event['Records'][0]['cos']['cosBucket']['name'] + '/', '', 1)    logger.info("Key is " + key)    middle_bucket = middle_stage_bucket + '-' + appid    middle_file_key = '/' + 'middle_' + key.split('/')[-1]    return do_mapping(cos_client, bucket, key, middle_bucket, middle_file_key)def main_handler(event, context):    logger.info("start main handler")    if "Records" not in event.keys():        return {"errorMsg": "event is not come from cos"}    secret_id = ""     secret_key = ""      config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, )    cos_client = CosS3Client(config)    start_time = datetime.datetime.now()    res = map_caller(event, context, cos_client)    end_time = datetime.datetime.now()    print("data mapping duration: " + str((end_time - start_time).microseconds / 1000) + "ms")    if res == 0:        return "Data mapping SUCCESS"    else:        return "Data mapping FAILED"

复制代码


同样的方法,建立 reducer.py 文件,编写 Reducer 逻辑:


# -*- coding: utf8 -*-from qcloud_cos_v5 import CosConfigfrom qcloud_cos_v5 import CosS3Clientfrom qcloud_cos_v5 import CosServiceErrorfrom operator import itemgetterimport osimport sysimport datetimeimport loggingregion = u'ap-guangzhou'  # 根据实际情况,修改地域result_bucket = u'destmr'  # 根据实际情况,修改bucket名logging.basicConfig(level=logging.INFO, stream=sys.stdout)logger = logging.getLogger()logger.setLevel(level=logging.INFO)def delete_file_folder(src):    if os.path.isfile(src):        try:            os.remove(src)        except:            pass    elif os.path.isdir(src):        for item in os.listdir(src):            itemsrc = os.path.join(src, item)            delete_file_folder(itemsrc)        try:            os.rmdir(src)        except:            passdef download_file(cos_client, bucket, key, download_path):    logger.info("Get from [%s] to download file [%s]" % (bucket, key))    try:        response = cos_client.get_object(Bucket=bucket, Key=key, )        response['Body'].get_stream_to_file(download_path)    except CosServiceError as e:        print(e.get_error_code())        print(e.get_error_msg())        return -1    return 0def upload_file(cos_client, bucket, key, local_file_path):    logger.info("Start to upload file to cos")    try:        response = cos_client.put_object_from_local_file(            Bucket=bucket,            LocalFilePath=local_file_path,            Key='{}'.format(key))    except CosServiceError as e:        print(e.get_error_code())        print(e.get_error_msg())        return -1    logger.info("Upload data map file [%s] Success" % key)    return 0def qcloud_reducer(cos_client, bucket, key, result_bucket, result_key):    word2count = {}    src_file_path = u'/tmp/' + key.split('/')[-1]    result_file_path = u'/tmp/' + u'result_' + key.split('/')[-1]    download_ret = download_file(cos_client, bucket, key, src_file_path)    if download_ret == 0:        map_file = open(src_file_path, 'r')        result_file = open(result_file_path, 'w')        for line in map_file:            line = line.strip()            word, count = line.split('\t', 1)            try:                count = int(count)                word2count[word] = word2count.get(word, 0) + count            except ValueError:                logger.error("error value: %s, current line: %s" % (ValueError, line))                continue        map_file.close()        delete_file_folder(src_file_path)    sorted_word2count = sorted(word2count.items(), key=itemgetter(1))[::-1]    for wordcount in sorted_word2count:        res = '%s\t%s' % (wordcount[0], wordcount[1])        result_file.write(res)        result_file.write('\n')    result_file.close()    upload_ret = upload_file(cos_client, result_bucket, result_key, result_file_path)    delete_file_folder(result_file_path)    return upload_retdef reduce_caller(event, context, cos_client):    appid = event['Records'][0]['cos']['cosBucket']['appid']    bucket = event['Records'][0]['cos']['cosBucket']['name'] + '-' + appid    key = event['Records'][0]['cos']['cosObject']['key']    key = key.replace('/' + str(appid) + '/' + event['Records'][0]['cos']['cosBucket']['name'] + '/', '', 1)    logger.info("Key is " + key)    res_bucket = result_bucket + '-' + appid    result_key = '/' + 'result_' + key.split('/')[-1]    return qcloud_reducer(cos_client, bucket, key, res_bucket, result_key)def main_handler(event, context):    logger.info("start main handler")    if "Records" not in event.keys():        return {"errorMsg": "event is not come from cos"}    secret_id = "SecretId"     secret_key = "SecretKey"      config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, )    cos_client = CosS3Client(config)    start_time = datetime.datetime.now()    res = reduce_caller(event, context, cos_client)    end_time = datetime.datetime.now()    print("data reducing duration: " + str((end_time - start_time).microseconds / 1000) + "ms")    if res == 0:        return "Data reducing SUCCESS"    else:        return "Data reducing FAILED"

复制代码


部署与测试

通过Serverless Frameworkyaml规范,编写serveerless.yaml:


WordCountMapper:  component: "@serverless/tencent-scf"  inputs:    name: mapper    codeUri: ./code    handler: index.main_handler    runtime: Python3.6    region: ap-guangzhou    description: 网站监控    memorySize: 64    timeout: 20    events:      - cos:          name: srcmr-1256773370.cos.ap-guangzhou.myqcloud.com          parameters:            bucket: srcmr-1256773370.cos.ap-guangzhou.myqcloud.com            filter:              prefix: ''              suffix: ''            events: cos:ObjectCreated:*            enable: true
WordCountReducer: component: "@serverless/tencent-scf" inputs: name: reducer codeUri: ./code handler: index.main_handler runtime: Python3.6 region: ap-guangzhou description: 网站监控 memorySize: 64 timeout: 20 events: - cos: name: middlestagebucket-1256773370.cos.ap-guangzhou.myqcloud.com parameters: bucket: middlestagebucket-1256773370.cos.ap-guangzhou.myqcloud.com filter: prefix: '' suffix: '' events: cos:ObjectCreated:* enable: true
复制代码


完成之后,通过sls --debug指令进行部署,成功之后进行基本的测试:


  1. 首先准备一个英文文档:



  1. 登录腾讯云后台,打开最初建立的存储桶:srcmr,并上传该文件;

  2. 上传成功之后,稍等片刻就可以看到 Reducer 程序已经在 Mapper 执行之后,产出日志:


此时,打开结果存储桶,查看结果:



这样,我们就完成了简单的词频统计功能。


总结

相对来说,Serverless 架构比较适合做大数据处理,在腾讯云官网对 Serverless 应用场景的描述就包含有数据 ETL 处理:


一些数据处理系统中,常常需要周期性/计划性地处理庞大的数据量。例如:证券公司每 12 小时统计一次该时段的交易情况并整理出该时段交易量 top 5,每天处理一遍秒杀网站的交易流日志获取因售罄而导致的错误从而分析商品热度和趋势等。云函数近乎无限扩容的能力可以使您轻松地进行大容量数据的计算。我们利用云函数可以对源数据并发执行多个 mapper 和 reducer 函数,在短时间内完成工作;相比传统的工作方式,使用云函数更能避免资源的闲置浪费从而节省资金。


通过本实例,希望读者可以对 Serverless 架构的应用场景有更多的启发,了解到 Serverless 不仅仅在监控告警方面有着很好的表现,在大数据领域也不甘落后。在实际生产中,每个项目都不会是单个函数单打独斗的,而是多个函数组合应用,形成一个 Service 体系,所以一键部署多个函数就显得尤为重要。


2020 年 6 月 16 日 18:001254

评论

发布
暂无评论
发现更多内容

初识Golang之函数及方法的多返回值

Kylin

3月日更

云端数智新引擎,腾讯云原生数据湖计算重磅发布

小小的一朵云

大数据 数据湖

Python OpenCV 图像平移,取经之旅第 10 天

梦想橡皮擦

3月日更

金三银四,冰河为你整理了这份20万字134页的面试圣经!!

冰河

面试 面经 offer 金三银四 我要进大厂

Python基础之:Python中的IO

程序那些事

Python 人工智能 数据分析 程序那些事

另类数据:投资中的怪咖

博文视点Broadview

关于企业容器安全问题的思考

阿里巴巴中间件

风暴眼中的“以太坊”堪比堵车的北京东三环,NA公链(Nirvana)NAC公链对垒胜算几何?

区块链第一资讯

区块链

区块链电子合同签署平台,助力企业数字化转型

13828808769

区块链+ #区块链#

大“食”代来临,后厨重地可以更“聪明”点儿

IoT云工坊

人工智能 物联网 PaaS 智慧厨房 智慧餐饮

未来几年,低代码开发平台会如何发展?

优秀

低代码

EGG Network阿凡提的模式是怎么样的?早点了解别错失这个机会!

币圈那点事

区块链

金三银四了!必知必会,HTTP面试题!漫画图解超硬核!

9號

面试 网络编程 网络 HTTP 网络层

关于热力图数据上报清洗,我们做了一个有意思的尝试

阿里巴巴中间件

智能安防监控系统的发展与应用

anyRTC开发者

android 监控 音视频 WebRTC RTC

要求输出事故报告,线上日志文件却不见了!!

陈皮的JavaLib

Java 运维 日志框架

数字化进入深水区

鲸品堂

方法论 数字化 企业数字化转型

MindSpore实践:对篮球运动员目标的检测

华为云开发者社区

深度学习 mindspore 图像检测 yolo 篮球运动

6大创新技术及2亿美元投入计划,这个活动有点料

华为云开发者社区

人工智能 数据库 华为 云原生 HDC.Cloud

是谁拖(慢)了 Redis 的后腿?

escray

redis 极客时间 学习笔记 3月日更 Redis 核心技术与实战

降维打击:数据可视化降本增效,传统制造业价值即将扭转!

一只数据鲸鱼

物联网 数据可视化 工业物联网 数字化运维 3D

鸿蒙源码分析系列(总目录) | 百万汉字注解 百篇博客分析 | 百篇博客分析HarmonyOS源码 | v8.21

鸿蒙内核源码分析

鸿蒙 HarmonyOS 鸿蒙内核源码分析 百篇博客分析鸿蒙 百万汉字注解鸿蒙

区块链产品宗谱链,一款记录族谱的APP

13828808769

区块链+ #区块链#

基于深度学习的端到端通信系统模型

华为云开发者社区

深度学习 端到端 编码器 通信系统 信道模型

临时表的使用

在即

28天写作 28天挑战 3月日更

Knativa 基于流量的灰度发布和自动弹性实践

Serverless Devs

Serverless Kubernetes 运维 云原生 Knative

35岁了,还不知道,TCP为什么会粘包?【硬核图解】

9號

TCP 网络 协议栈 TCP/IP 网络层

聊聊LiteOS中生成的Bin、HEX、ELF三种文件格式

华为云开发者社区

编译器 LiteOS Bin HEX ELF

智慧公安一键扫描二维码报警定位系统

13828808769

智慧交通

基于NB-IoT的智慧路灯监控系统(NB-IoT专栏—实战篇5:手机应用开发)

不脱发的程序猿

物联网 28天写作 3月日更 NB-IoT智慧路灯 手机应用开发

策略枚举:消除在项目里大批量使用if-else的正确姿势

朱季谦

枚举 策略模式

DNSPod与开源应用专场

DNSPod与开源应用专场

Serverless实战:利用函数计算与对象存储实现WordCount-InfoQ