Serverless 实战:3 分钟实现文本敏感词过滤

发布于:2020 年 5 月 10 日 08:00

Serverless实战:3分钟实现文本敏感词过滤

敏感词过滤是随着互联网社区一起发展起来的一种阻止网络犯罪和网络暴力的技术手段,通过对可能存在犯罪或网络暴力的关键词进行有针对性的筛查和屏蔽,能够防患于未然,将后果严重的犯罪行为扼杀于萌芽之中。

随着各种社交论坛的日益火爆,敏感词过滤逐渐成为了非常重要的功能。那么在 Serverless 架构下,利用 Python 语言,敏感词过滤又有那些新的实现呢?我们能否用最简单的方法实现一个敏感词过滤的 API 呢?

了解敏感过滤的几种方法

Replace 方法

敏感词过滤,其实在一定程度上是文本替换,以 Python 为例,我们可以通过replace来实现,首先准备一个敏感词库,然后通过replace进行敏感词替换:

复制代码
def worldFilter(keywords, text):
for eve in keywords:
text = text.replace(eve, "***")
return text
keywords = (" 关键词 1", " 关键词 2", " 关键词 3")
content = " 这是一个关键词替换的例子,这里涉及到了关键词 1 还有关键词 2,最后还会有关键词 3。"
print(worldFilter(keywords, content))

这种方法虽然操作简单,但是存在一个很大的问题:在文本和敏感词汇非常庞大的情况下,会出现很严重的性能问题。

举个例子,我们先修改代码进行基本的性能测试:

复制代码
import time
def worldFilter(keywords, text):
for eve in keywords:
text = text.replace(eve, "***")
return text
keywords =[ " 关键词 " + str(i) for i in range(0,10000)]
content = " 这是一个关键词替换的例子,这里涉及到了关键词 1 还有关键词 2,最后还会有关键词 3。" * 1000
startTime = time.time()
worldFilter(keywords, content)
print(time.time()-startTime)

此时的输出结果是:0.12426114082336426,可以看到性能非常差。

正则表达方法

相较于replace,使用正则表达re.sub实现可能更加快速。

复制代码
import time
import re
def worldFilter(keywords, text):
return re.sub("|".join(keywords), "***", text)
keywords =[ " 关键词 " + str(i) for i in range(0,10000)]
content = " 这是一个关键词替换的例子,这里涉及到了关键词 1 还有关键词 2,最后还会有关键词 3。" * 1000
startTime = time.time()
worldFilter(keywords, content)
print(time.time()-startTime)

增加性能测试之后,我们按照上面的方法进行改造测试,输出结果是0.24773502349853516

对比这两个例子,我们会发现当前两种方法的性能差距不是很大,但是随着文本数量的增加,正则表达的优势会逐渐凸显,性能提升明显。

DFA 过滤敏感词

相对来说,DFA 过滤敏感词的效率会更高一些,例如我们把坏人、坏孩子、坏蛋作为敏感词,那么它们的树关系可以这样表达:

Serverless实战:3分钟实现文本敏感词过滤

而 DFA 字典是这样表示的:

复制代码
{
'坏': {
'蛋': {
'\x00': 0
},
'人': {
'\x00': 0
},
'孩': {
'子': {
'\x00': 0
}
}
}
}

使用这种树表示问题最大的好处就是可以降低检索次数、提高检索效率。其基本代码实现如下:

复制代码
import time
class DFAFilter(object):
def __init__(self):
self.keyword_chains = {} # 关键词链表
self.delimit = '\x00' # 限定
def parse(self, path):
with open(path, encoding='utf-8') as f:
for keyword in f:
chars = str(keyword).strip().lower() # 关键词英文变为小写
if not chars: # 如果关键词为空直接返回
return
level = self.keyword_chains
for i in range(len(chars)):
if chars[i] in level:
level = level[chars[i]]
else:
if not isinstance(level, dict):
break
for j in range(i, len(chars)):
level[chars[j]] = {}
last_level, last_char = level, chars[j]
level = level[chars[j]]
last_level[last_char] = {self.delimit: 0}
break
if i == len(chars) - 1:
level[self.delimit] = 0
def filter(self, message, repl="*"):
message = message.lower()
ret = []
start = 0
while start < len(message):
level = self.keyword_chains
step_ins = 0
for char in message[start:]:
if char in level:
step_ins += 1
if self.delimit not in level[char]:
level = level[char]
else:
ret.append(repl * step_ins)
start += step_ins - 1
break
else:
ret.append(message[start])
break
else:
ret.append(message[start])
start += 1
return ''.join(ret)
gfw = DFAFilter()
gfw.parse( "./sensitive_words")
content = " 这是一个关键词替换的例子,这里涉及到了关键词 1 还有关键词 2,最后还会有关键词 3。" * 1000
startTime = time.time()
result = gfw.filter(content)
print(time.time()-startTime)

这里的字典库是:

复制代码
with open("./sensitive_words", 'w') as f:
f.write("\n".join( [ " 关键词 " + str(i) for i in range(0,10000)]))

执行结果:

复制代码
0.06450581550598145

从中,我们可以看到性能又进一步得到了提升。

AC 自动机过滤敏感词算法

什么是 AC 自动机?简单来说,AC 自动机就是字典树 +kmp 算法 + 失配指针,一个常见的例子就是给出 n 个单词,再给出一段包含 m 个字符的文章,让你找出有多少个单词在文章里出现过。

代码实现:

复制代码
import time
class Node(object):
def __init__(self):
self.next = {}
self.fail = None
self.isWord = False
self.word = ""
class AcAutomation(object):
def __init__(self):
self.root = Node()
# 查找敏感词函数
def search(self, content):
p = self.root
result = []
currentposition = 0
while currentposition < len(content):
word = content[currentposition]
while word in p.next == False and p != self.root:
p = p.fail
if word in p.next:
p = p.next[word]
else:
p = self.root
if p.isWord:
result.append(p.word)
p = self.root
currentposition += 1
return result
# 加载敏感词库函数
def parse(self, path):
with open(path, encoding='utf-8') as f:
for keyword in f:
temp_root = self.root
for char in str(keyword).strip():
if char not in temp_root.next:
temp_root.next[char] = Node()
temp_root = temp_root.next[char]
temp_root.isWord = True
temp_root.word = str(keyword).strip()
# 敏感词替换函数
def wordsFilter(self, text):
"""
:param ah: AC 自动机
:param text: 文本
:return: 过滤敏感词之后的文本
"""
result = list(set(self.search(text)))
for x in result:
m = text.replace(x, '*' * len(x))
text = m
return text
acAutomation = AcAutomation()
acAutomation.parse('./sensitive_words')
startTime = time.time()
print(acAutomation.wordsFilter(" 这是一个关键词替换的例子,这里涉及到了关键词 1 还有关键词 2,最后还会有关键词 3。"*1000))
print(time.time()-startTime)

词库同样是:

复制代码
with open("./sensitive_words", 'w') as f:
f.write("\n".join( [ " 关键词 " + str(i) for i in range(0,10000)]))

使用上面的方法,测试结果为0.017391204833984375

敏感词过滤方法小结

根据上文的测试对比,我们可以发现在所有算法中,DFA 过滤敏感词性能最高,但是在实际应用中,DFA 过滤和 AC 自动机过滤各自有自己的适用场景,可以根据具体业务来选择。

实现敏感词过滤 API

想要实现敏感词过滤 API,就需要将代码部署到 Serverless 架构上,选择 API 网关与函数计算进行结合。以 AC 自动机过滤敏感词算法为例:我们只需要增加是几行代码就好:

复制代码
# -*- coding:utf-8 -*-
import json, uuid
class Node(object):
def __init__(self):
self.next = {}
self.fail = None
self.isWord = False
self.word = ""
class AcAutomation(object):
def __init__(self):
self.root = Node()
# 查找敏感词函数
def search(self, content):
p = self.root
result = []
currentposition = 0
while currentposition < len(content):
word = content[currentposition]
while word in p.next == False and p != self.root:
p = p.fail
if word in p.next:
p = p.next[word]
else:
p = self.root
if p.isWord:
result.append(p.word)
p = self.root
currentposition += 1
return result
# 加载敏感词库函数
def parse(self, path):
with open(path, encoding='utf-8') as f:
for keyword in f:
temp_root = self.root
for char in str(keyword).strip():
if char not in temp_root.next:
temp_root.next[char] = Node()
temp_root = temp_root.next[char]
temp_root.isWord = True
temp_root.word = str(keyword).strip()
# 敏感词替换函数
def wordsFilter(self, text):
"""
:param ah: AC 自动机
:param text: 文本
:return: 过滤敏感词之后的文本
"""
result = list(set(self.search(text)))
for x in result:
m = text.replace(x, '*' * len(x))
text = m
return text
def response(msg, error=False):
return_data = {
"uuid": str(uuid.uuid1()),
"error": error,
"message": msg
}
print(return_data)
return return_data
acAutomation = AcAutomation()
path = './sensitive_words'
acAutomation.parse(path)
def main_handler(event, context):
try:
sourceContent = json.loads(event["body"])["content"]
return response({
"sourceContent": sourceContent,
"filtedContent": acAutomation.wordsFilter(sourceContent)
})
except Exception as e:
return response(str(e), True)

最后,为了方便本地测试,我们可以再增加以下代码:

复制代码
def test():
event = {
"requestContext": {
"serviceId": "service-f94sy04v",
"path": "/test/{path}",
"httpMethod": "POST",
"requestId": "c6af9ac6-7b61-11e6-9a41-93e8deadbeef",
"identity": {
"secretId": "abdcdxxxxxxxsdfs"
},
"sourceIp": "14.17.22.34",
"stage": "release"
},
"headers": {
"Accept-Language": "en-US,en,cn",
"Accept": "text/html,application/xml,application/json",
"Host": "service-3ei3tii4-251000691.ap-guangzhou.apigateway.myqloud.com",
"User-Agent": "User Agent String"
},
"body": "{\"content\":\" 这是一个测试的文本,我也就呵呵了\"}",
"pathParameters": {
"path": "value"
},
"queryStringParameters": {
"foo": "bar"
},
"headerParameters": {
"Refer": "10.0.2.14"
},
"stageVariables": {
"stage": "release"
},
"path": "/test/value",
"queryString": {
"foo": "bar",
"bob": "alice"
},
"httpMethod": "POST"
}
print(main_handler(event, None))
if __name__ == "__main__":
test()

完成之后,就可以进行测试运行,例如我的字典是:

复制代码
呵呵
测试

执行之后结果:

复制代码
{'uuid': '9961ae2a-5cfc-11ea-a7c2-acde48001122', 'error': False, 'message': {'sourceContent': '这是一个测试的文本,我也就呵呵了', 'filtedContent': '这是一个 ** 的文本,我也就 ** 了'}}

接下来,我们将代码部署到云端,新建serverless.yaml:

复制代码
sensitive_word_filtering:
component: "@serverless/tencent-scf"
inputs:
name: sensitive_word_filtering
codeUri: ./
exclude:
- .gitignore
- .git/**
- .serverless
- .env
handler: index.main_handler
runtime: Python3.6
region: ap-beijing
description: 敏感词过滤
memorySize: 64
timeout: 2
events:
- apigw:
name: serverless
parameters:
environment: release
endpoints:
- path: /sensitive_word_filtering
description: 敏感词过滤
method: POST
enableCORS: true
param:
- name: content
position: BODY
required: 'FALSE'
type: string
desc: 待过滤的句子

然后通过sls --debug进行部署,部署结果:

Serverless实战:3分钟实现文本敏感词过滤

最后,通过 PostMan 进行测试:

Serverless实战:3分钟实现文本敏感词过滤

总结

敏感词过滤是当前企业的普遍需求,通过敏感词过滤,我们可以在一定程度上遏制恶言恶语和违规言论的出现。在具体实现过程中,有两个方面需要额外主要:

  • 敏感词库的获得问题:Github 上有很多敏感词库,其中包含了各种场景中的敏感词,大家可以自行搜索下载使用;

  • API 使用场景的问题:我们可以将这个 API 放置在社区跟帖系统、留言评论系统或者是博客发布系统中,这样可以防止出现敏感词汇,减少不必要的麻烦。

作者介绍:

刘宇,腾讯 Serverless 团队后台研发工程师。毕业于浙江大学,硕士研究生学历,曾在滴滴出行、腾讯科技做产品经理,本科开始有自主创业经历,是 Anycodes 在线编程的负责人(该软件累计下载量超 100 万次)。目前投身于 Serverless 架构研发,著书《Serverless 架构:从原理、设计到项目实战》,参与开发和维护多个 Serverless 组件,是活跃的 Serverless Framework 的贡献者,也曾多次公开演讲和分享 Serverless 相关技术与经验,致力于 Serverless 的落地与项目上云。

阅读数:3930 发布于:2020 年 5 月 10 日 08:00

评论

发布
暂无评论