使用 AWS Sagemaker 部署的终端节点进行推荐预测的常用场景

阅读数:55 2019 年 10 月 8 日 10:21

使用AWS Sagemaker部署的终端节点进行推荐预测的常用场景

上次我们初步介绍了使用 SageMaker 快速训练和部署 Factorization Machines 模型,接下来我们利用 Endpoint 进行预测模拟的实际用例。

利用 Endpoint 进行预测模拟的实际用例

考虑几个常见的实际用例在生产环境中的推荐系统

  • 从参与训练的电影库(26004 部)中推荐 10 部给已存在的用户;
  • 从参与训练的电影库(26004 部)中推荐 10 部给新用户;
  • 应用内容筛选并推荐 10 部电影给已存在的用户;

1. 从参与训练的电影库(26004 部)中推荐 10 部给已存在的用户

首先我们随机选择一位用户,本例中该用户 ID 是 100020528。观察该用户在训练集中点评过的电影,共计 4 部。

In [56]:

复制代码
#select 100020528 as a random user
for k in df[df['UserId']=='100020528'].index:
print(df_movie_all_db[df_movie_all_db['MovieID']==str(df.at[k,'MovieID'])].Title.values[0], " ", df.at[k,'Rate'])

苏醒 6

推动情人的床 6

录影片段 4

雷德怒潮 6

为这一用户生成预测数据的稀疏矩阵。

In [7]:

复制代码
#revise the loadDataset() to accept the userID as an input.
def createInferenceForUser(UserId, lines, columns):
user_number = 53902
X = scipy.sparse.lil_matrix((lines, columns)).astype('float32')
line = 0
for line in range(0, lines):
X[line, u_Dict[UserId]] = 1
X[line, user_number+line] = 1
line=line+1
print(X.shape)
return X

In [8]:

复制代码
user_inference = createInferenceForUser('100020528', movie_number, user_number+movie_number)
print(user_inference.shape)

(26004, 79906)

(26004, 79906)

生成的是 26004✖️79906 的矩阵。

随后调用 Endpoint 对预测数据进行预测,并返回预测结果。由于预测数据较大,我们分段进行,每一个请求传递 10 行预测数据。

In [10]:

复制代码
#invoke endpoint
client = boto3.client('sagemaker-runtime')
In [20]:
#Make inference
import json
inf_start = 0
step = 10
response = []
while inf_start <= user_inference.shape[0]+1:
raw_response = client.invoke_endpoint(
EndpointName = 'factorization-machines-2018-12-18-05-47-26-108',
Body = fm_serializer(user_inference[inf_start:inf_start+step].toarray()),
ContentType='application/json')
result = json.loads(raw_response['Body'].read())
for counter, p in enumerate(result['predictions']):
p['MovieID'] = df_m_Dict[df_m_Dict.movie_index == (inf_start + int(counter))].MovieID.values[0]
response.append(p)
inf_start = inf_start+step
print(inf_start)
print(len(response))
26004

对预测结果进行整理后,显示如下:

In [127]:

复制代码
df_response_like = df_response[df_response.predicted_label == 1.0]
final = df_response_like.sort_values('score', ascending=False).head(10)
for final_index in final.index:
print(df_movie_all_db[df_movie_all_db['MovieID']==str(final.at[final_index,'MovieID'])].Title.values[0],'\t',df_movie_all_db[df_movie_all_db['MovieID']==str(final.at[final_index,'MovieID'])].Rate.values[0],'\t',final.at[final_index,'score'])

逃离循环 6.8 0.8900507092475891

燕尾蝶 8.6 0.8846622705459595

盲证 7.4 0.8812546133995056

头文字 D 新 OVA 启程之绿 7.5 0.8807412981987

狐妖小红娘剧场版:月红篇 9.2 0.8801539540290833

追随你脚步 7.5 0.8792228698730469

黑色闪电 7.4 0.879056453704834

蓝色情人节 7.8 0.8788111805915833

骗中骗 2 7.5 0.878635048866272

放牛班的春天 9.2 0.8783532977104187

2. 从参与训练的电影库(26004 部)中推荐 10 部给新用户

如果是一个全新的用户,并不存在于我们训练数据集中的用户列表里。我们可以将稀疏矩阵中所有表示用户的列置为 0 来生成预测数据。

In [130]:

复制代码
def createNewUserInference(lines, columns):
user_number = 53902
X = scipy.sparse.lil_matrix((lines, columns)).astype('float32')
line = 0
for line in range(0, lines):
X[line, user_number+line] = 1
line=line+1
print(X.shape)
return X

In [123]:

复制代码
def callEndpointInference(query):
import json
inf_start = 0
step = 10
response = []
while inf_start <= query.shape[0]:
raw_response = client.invoke_endpoint(
EndpointName = 'factorization-machines-2018-12-18-05-47-26-108',
Body = fm_serializer(query[inf_start:min(inf_start+step, query.shape[0]+1)].toarray()),
ContentType='application/json')
result = json.loads(raw_response['Body'].read())
for counter, p in enumerate(result['predictions']):
p['MovieID'] = df_m_Dict[df_m_Dict.movie_index == (inf_start + int(counter))].MovieID.values[0]
response.append(p)
inf_start = inf_start+step
print(inf_start)
return response

In [125]:

复制代码
def showResult(response):
df_response = pd.DataFrame(response)
df_response_like = df_response[df_response.predicted_label == 1.0]
final = df_response_like.sort_values('score', ascending=False).head(10)
for final_index in final.index:
print(df_movie_all_db[df_movie_all_db['MovieID']==str(final.at[final_index,'MovieID'])].Title.values[0],'\t',df_movie_all_db[df_movie_all_db['MovieID']==str(final.at[final_index,'MovieID'])].Rate.values[0],'\t',final.at[final_index,'score'])

针对新用户生成的预测数据依然是一个 26004✖️79906 的矩阵。

In [131]:

复制代码
new_inference = createNewUserInference(movie_number, user_number+movie_number)
(26004, 79906)

In [132]:

复制代码
%%time
new_response = callEndpointInference(new_inference)
CPU times: user 5min 18s, sys: 25.9 s, total: 5min 44s
Wall time: 16min

预测结果如下,可以看到,与之前已存在用户的预测结果有很大不同。

In [133]:

复制代码
showResult(new_response)

青涩恋爱 7.9 0.7878496646881104

死亡实验 8.0 0.7846202850341797

时空线索 7.7 0.7836878299713135

侠僧探案传奇之王陵之谜 7.0 0.7817953824996948

宝拉 X 7.1 0.7813129425048828

没有秘密 6.5 0.7806361317634583

雨中曲 9.0 0.7800688743591309

卡里加里博士的小屋 8.7 0.7798536419868469

玩命警车 6.3 0.7796841263771057

玩具总动员 3 8.8 0.7790806293487549

3. 应用内容筛选并推荐 10 部电影给已存在的用户

回到已存在用户的推荐预测部分,之前我们的预测耗时较长,约需要 17 分钟左右才能完成一次查询,这在实际应用中是不现实的。主要原因之一是我们采用对电影库进行全量预测,其实这降低了性能表现和相关性。如果采用内容筛选的办法,提前限定预测影片的范围,缩小预测数据规模,可以提高性能表现和相关性。

让我们看一下 100020528 用户在全量数据中的历史表现。这位用户点评了 135 部电影,对应的类型如下:

In [88]:

复制代码
df_movie_preference = df_movie_all_db[df_movie_all_db['MovieID'].isin(movie_rated_by_user)].copy()
df_movie_preference.groupby('Tags').count()

Out[88]:

Link Rate Title Country RateCollected MovieID
Tags
剧情 / 动作 / 恐怖 1 1 1 1 0 1
剧情 / 动作 / 犯罪 2 2 2 2 0 2
剧情 / 历史 / 奇幻 1 1 1 1 0 1
剧情 / 喜剧 / 奇幻 1 1 1 1 0 1
剧情 / 喜剧 / 悬疑 / 惊悚 / 恐怖 1 1 1 1 0 1
剧情 / 喜剧 / 爱情 1 1 1 1 0 1
剧情 / 喜剧 / 爱情 / 惊悚 / 恐怖 1 1 1 1 0 1
剧情 / 奇幻 1 1 1 1 0 1
剧情 / 恐怖 3 3 3 3 0 3
剧情 / 恐怖 / 历史 1 1 1 1 0 1
剧情 / 恐怖 / 短片 1 1 1 1 0 1
剧情 / 悬疑 3 3 3 3 0 3
剧情 / 悬疑 / 恐怖 1 1 1 1 0 1
剧情 / 悬疑 / 惊悚 2 2 2 2 0 2
剧情 / 悬疑 / 惊悚 / 恐怖 1 1 1 1 0 1
剧情 / 悬疑 / 惊悚 / 犯罪 1 1 1 1 0 1
剧情 / 情色 1 1 1 1 0 1
剧情 / 惊悚 9 9 9 9 0 9
剧情 / 惊悚 / 历史 1 1 1 1 0 1
剧情 / 惊悚 / 恐怖 6 6 6 6 0 6
剧情 / 惊悚 / 犯罪 4 4 4 4 0 4
剧情 / 歌舞 / 奇幻 1 1 1 1 0 1
剧情 / 爱情 3 3 3 3 0 3
剧情 / 爱情 / 冒险 1 1 1 1 0 1
剧情 / 爱情 / 同性 1 1 1 1 0 1
剧情 / 爱情 / 悬疑 / 恐怖 / 奇幻 1 1 1 1 0 1
剧情 / 犯罪 4 4 4 4 0 4
剧情 / 科幻 / 恐怖 2 2 2 2 0 2
剧情 / 科幻 / 惊悚 2 2 2 2 0 2
剧情 / 科幻 / 惊悚 / 犯罪 1 1 1 1 0 1
喜剧 / 动作 / 科幻 1 1 1 1 0 1
喜剧 / 动作 / 科幻 / 恐怖 1 1 1 1 0 1
喜剧 / 恐怖 2 2 2 2 0 2
喜剧 / 恐怖 / 奇幻 2 2 2 2 0 2
喜剧 / 悬疑 / 恐怖 1 1 1 1 0 1
喜剧 / 惊悚 2 2 2 2 0 2
喜剧 / 惊悚 / 恐怖 1 1 1 1 0 1
喜剧 / 惊悚 / 恐怖 / 犯罪 1 1 1 1 0 1
喜剧 / 惊悚 / 犯罪 1 1 1 1 0 1
喜剧 / 爱情 / 恐怖 1 1 1 1 0 1
喜剧 / 科幻 / 恐怖 1 1 1 1 0 1
恐怖 15 15 15 15 0 15
恐怖 / 奇幻 1 1 1 1 0 1
恐怖 / 短片 / 犯罪 1 1 1 1 0 1
悬疑 / 恐怖 1 1 1 1 0 1
悬疑 / 惊悚 2 2 2 2 0 2
悬疑 / 惊悚 / 恐怖 6 6 6 6 0 6
悬疑 / 惊悚 / 恐怖 / 犯罪 1 1 1 1 0 1
悬疑 / 惊悚 / 犯罪 2 2 2 2 0 2
悬疑 / 短片 / 奇幻 1 1 1 1 0 1
惊悚 7 7 7 7 0 7
惊悚 / 恐怖 10 10 10 10 0 10
惊悚 / 恐怖 / 犯罪 3 3 3 3 0 3
惊悚 / 犯罪 1 1 1 1 0 1
爱情 1 1 1 1 0 1
爱情 / 恐怖 / 奇幻 1 1 1 1 0 1
爱情 / 短片 / 情色 1 1 1 1 0 1
科幻 1 1 1 1 0 1
科幻 / 奇幻 1 1 1 1 0 1
科幻 / 恐怖 1 1 1 1 0 1

可以看到这位用户偏爱含有“恐怖”和“惊悚”标签的电影,加起来几乎是其点评过电影的 50%,所以我们可以对电影库进行一下筛选,看有多少电影是包含这两个标签的。

In [93]:

复制代码
df_movie_all_db[df_movie_all_db.Tags.isin(['惊悚','恐怖'])].count()

Out[93]:

Link 1625

Rate 1625

Title 1625

Tags 1625

Country 1625

RateCollected 0

MovieID 1625

dtype: int64

In [111]:

复制代码
#Check whether all 1625 movies is in m_Dict, and clean the movies that are not in m_Dict
for m_h in horror_and_terrified_movie:
if m_h in m_Dict.keys():
continue
else:
print(m_h)
horror_and_terrified_movie.remove(m_h)
print(len(horror_and_terrified_movie))
1124

观测结果,在我们参与训练的电影库(26004 部)中有 1124 部是符合这两个类型的。我们进一步把 100020528 这位用户点评过的电影从这 1124 部中移除出去。

In [137]:

复制代码
m_rated = []
for el_index in df_movie_preference.index:
m_rated.append(df_movie_preference.at[el_index, 'MovieID'])

In [141]:

复制代码
for m_h in horror_and_terrified_movie:
if m_h in m_rated:
print(m_h)
horror_and_terrified_movie.remove(m_h)
print(len(horror_and_terrified_movie))
1104

最后我们获得了共计 1104 部电影的清单。针对这位用户,以及这 1104 部电影的清单重新生成预测数据矩阵。预测数据矩阵为 1104✖️79906 阶。

In [143]:

复制代码
filtered_user_inference = createInferenceForUserPreference('100020528', horror_and_terrified_movie, user_number+movie_number)
print(filtered_user_inference.shape)
(1104, 79906)

使用这一矩阵进行预测。

In [149]:

复制代码
%%time
filtered_response = callEndpointInference_noid(filtered_user_inference)
CPU times: user 13.2 s, sys: 879 ms, total: 14.1 s
Wall time: 40.9 s

预测用时 40.9 秒,预测结果如下:

In [154]:

复制代码
showResultfromDataFrame(df_filtered_counter)

杀人鳄鱼潭 7.4 0.8682911992073059

没有秘密 6.5 0.8636484742164612

热血青年 6.5 0.8625574111938477

驱魔警察 7.4 0.8622710108757019

鬼玩人 7.0 0.8613290786743164

一千灵异夜之灵魂实验 7.0 0.8611863851547241

连体 5.4 0.8586516976356506

僵尸城市 6.3 0.8585190176963806

35 楼的生存游戏 6.7 0.8584233522415161

危险工作 6.5 0.8558875918388367

从预测结果中可以直观感受到,更贴近用户历史行为的喜好。

应用稀疏数据格式优化响应时间

当然 40.9 秒的预测时间在实际中依然过长,这是由于在本例的演示中,预测数据全部采用 json dense 格式传递,造成数据量大,只能分批预测和返回。实际应用中可以采用 FM 接受的 json sparse 数据格式。

我们使用最后的用户示例演示一下 json sparse 格式传递预测数据获得结果的过程以及对这一格式整个响应时间做个测量。

重新定义 Invoke Endpoint 的方法以及内部构成数据的格式如下:

In [298]:

复制代码
def slicedInferenceUserPreference(userid, step, movielist):
import json
#total_line = 26004 #movie_number
start = 0
response = []
line = 0
response = []
while start <= len(movielist)+1:
js={'instances': []}
for line in range(0, min(int(step), len(movielist)-start)):
js['instances'].append({'data':{'features': {
'keys':[int(u_Dict[userid]), 53902+m_Dict[movielist[line+int(start)]]],
"shape":[79906],
"values":[1,1]}}})
#print(json.dumps(js))
raw_response = client.invoke_endpoint(
EndpointName = 'factorization-machines-2018-12-18-05-47-26-108',
Body = json.dumps(js),
ContentType='application/json')
result = json.loads(raw_response['Body'].read())
for counter, p in enumerate(result['predictions']):
p['MovieID'] = movielist[counter]
response.append(p)
print(start+line)
start = start+step
return(response)
In [299]:
%%time
test_user_preference_response = slicedInferenceUserPreference('100020528', 1000, horror_and_terrified_movie)
999
1103
CPU times: user 21.8 ms, sys: 519 µs, total: 22.3 ms
Wall time: 5.93 s

运行结果共计耗时 5.93 秒,且其中大部分是由于网络延时,已经可以考虑在实际中应用了。预测结果也与之前保持一致。

In [301]:

复制代码
showResultfromDataFrame(preference_response)

杀人鳄鱼潭 7.4 0.8682911992073059

没有秘密 6.5 0.8636484742164612

热血青年 6.5 0.8625574111938477

驱魔警察 7.4 0.8622710108757019

鬼玩人 7.0 0.8613290786743164

一千灵异夜之灵魂实验 7.0 0.8611863851547241

连体 5.4 0.8586516976356506

僵尸城市 6.3 0.8585190176963806

35 楼的生存游戏 6.7 0.8584233522415161

危险工作 6.5 0.8558875918388367

从本次的实验示例中可以感受到,SageMaker 作为 AWS 平台上机器学习及人工智能的核心服务之一,极大地简化了我们准备数据、分析数据、构建模型、验证模型、参数调优和部署模型的整个工作流程。使用 SageMaker 提供的算法,更可以使拥有大量数据资产的用户以极快的速度发挥数据的价值,在应用中引入机器学习或人工智能。

使用 AWS Sagemaker 系列文章:
第一篇:使用 AWS Sagemaker 训练因子分解机模型并应用于推荐系统

第二篇:使用 AWS Sagemaker 部署的终端节点进行推荐预测的常用场景(本博文)

作者介绍:

崔辰
AWS 大中华区创新中心技术业务拓展经理。加入 AWS 之前,崔辰在中国惠普、IBM、微软以及海航科技等公司担任过售前技术顾问、市场经理和战略合作经理等职务。在 10 多年的科技领域工作经历中,崔辰服务过众多企业级客户。

本文转载自 AWS 技术博客。

原文链接:
https://amazonaws-china.com/cn/blogs/china/aws-sagemaker-recommendation-scene/

欲了解 AWS 的更多信息,请访问【AWS 技术专区】

评论

发布