作者丨刘宇
策划丨田晓旭
敏感词过滤是随着互联网社区一起发展起来的一种阻止网络犯罪和网络暴力的技术手段,通过对可能存在犯罪或网络暴力的关键词进行有针对性的筛查和屏蔽,能够防患于未然,将后果严重的犯罪行为扼杀于萌芽之中。
随着各种社交论坛的日益火爆,敏感词过滤逐渐成为了非常重要的功能。那么在 Serverless 架构下,利用 Python 语言,敏感词过滤又有那些新的实现呢?我们能否用最简单的方法实现一个敏感词过滤的 API 呢?
敏感词过滤,其实在一定程度上是文本替换,以 Python 为例,我们可以通过 replace 来实现,首先准备一个敏感词库,然后通过 replace 进行敏感词替换:
def worldFilter(keywords, text):for eve in keywords:text = text.replace(eve, "***")return textkeywords = (" 关键词 1", " 关键词 2", " 关键词 3")content = " 这是一个关键词替换的例子,这里涉及到了关键词 1 还有关键词 2,最后还会有关键词 3。"print(worldFilter(keywords, content))
这种方法虽然操作简单,但是存在一个很大的问题:在文本和敏感词汇非常庞大的情况下,会出现很严重的性能问题。
举个例子,我们先修改代码进行基本的性能测试:
import timedef worldFilter(keywords, text):for eve in keywords:text = text.replace(eve, "***")return textkeywords =[ " 关键词 " + str(i) for i in range(0,10000)]content = " 这是一个关键词替换的例子,这里涉及到了关键词 1 还有关键词 2,最后还会有关键词 3。" * 1000startTime = time.time()worldFilter(keywords, content)print(time.time()-startTime)
此时的输出结果是:0.12426114082336426,可以看到性能非常差。
相较于 replace,使用正则表达 re.sub 实现可能更加快速。
import timeimport redef worldFilter(keywords, text):return re.sub("|".join(keywords), "***", text)keywords =[ " 关键词 " + str(i) for i in range(0,10000)]content = " 这是一个关键词替换的例子,这里涉及到了关键词 1 还有关键词 2,最后还会有关键词 3。" * 1000startTime = time.time()worldFilter(keywords, content)print(time.time()-startTime)
增加性能测试之后,我们按照上面的方法进行改造测试,输出结果是 0.24773502349853516。
对比这两个例子,我们会发现当前两种方法的性能差距不是很大,但是随着文本数量的增加,正则表达的优势会逐渐凸显,性能提升明显。
正则表达方法相较于 replace,使用正则表达 re.sub 实现可能更加快速。
相对来说,DFA 过滤敏感词的效率会更高一些,例如我们把坏人、坏孩子、坏蛋作为敏感词,那么它们的树关系可以这样表达:
而 DFA 字典是这样表示的:
{'坏': {'蛋': {'\x00': 0},'人': {'\x00': 0},'孩': {'子': {'\x00': 0}}}}
使用这种树表示问题最大的好处就是可以降低检索次数、提高检索效率。其基本代码实现如下:
import timeclass DFAFilter(object):def __init__(self):self.keyword_chains = {} # 关键词链表self.delimit = '\x00' # 限定def parse(self, path):with open(path, encoding='utf-8') as f:for keyword in f:chars = str(keyword).strip().lower() # 关键词英文变为小写if not chars: # 如果关键词为空直接返回returnlevel = self.keyword_chainsfor i in range(len(chars)):if chars[i] in level:level = level[chars[i]]else:if not isinstance(level, dict):breakfor j in range(i, len(chars)):level[chars[j]] = {}last_level, last_char = level, chars[j]level = level[chars[j]]last_level[last_char] = {self.delimit: 0}breakif i == len(chars) - 1:level[self.delimit] = 0def filter(self, message, repl="*"):message = message.lower()ret = []start = 0while start < len(message):level = self.keyword_chainsstep_ins = 0for char in message[start:]:if char in level:step_ins += 1if self.delimit not in level[char]:level = level[char]else:ret.append(repl * step_ins)start += step_ins - 1breakelse:ret.append(message[start])breakelse:ret.append(message[start])start += 1return ''.join(ret)gfw = DFAFilter()gfw.parse( "./sensitive_words")content = " 这是一个关键词替换的例子,这里涉及到了关键词 1 还有关键词 2,最后还会有关键词 3。" * 1000startTime = time.time()result = gfw.filter(content)print(time.time()-startTime)
这里的字典库是:
with open("./sensitive_words", 'w') as f:f.write("\n".join( [ " 关键词 " + str(i) for i in range(0,10000)]))
执行结果:
0.06450581550598145
从中,我们可以看到性能又进一步得到了提升。
什么是 AC 自动机?简单来说,AC 自动机就是字典树 +kmp 算法 + 失配指针,一个常见的例子就是给出 n 个单词,再给出一段包含 m 个字符的文章,让你找出有多少个单词在文章里出现过。
代码实现:
import timeclass Node(object):def __init__(self):self.next = {}self.fail = Noneself.isWord = Falseself.word = ""class AcAutomation(object):def __init__(self):self.root = Node()# 查找敏感词函数def search(self, content):p = self.rootresult = []currentposition = 0while currentposition < len(content):word = content[currentposition]while word in p.next == False and p != self.root:p = p.failif word in p.next:p = p.next[word]else:p = self.rootif p.isWord:result.append(p.word)p = self.rootcurrentposition += 1return result# 加载敏感词库函数def parse(self, path):with open(path, encoding='utf-8') as f:for keyword in f:temp_root = self.rootfor char in str(keyword).strip():if char not in temp_root.next:temp_root.next[char] = Node()temp_root = temp_root.next[char]temp_root.isWord = Truetemp_root.word = str(keyword).strip()# 敏感词替换函数def wordsFilter(self, text):""":param ah: AC 自动机:param text: 文本:return: 过滤敏感词之后的文本"""result = list(set(self.search(text)))for x in result:m = text.replace(x, '*' * len(x))text = mreturn textacAutomation = AcAutomation()acAutomation.parse('./sensitive_words')startTime = time.time()print(acAutomation.wordsFilter(" 这是一个关键词替换的例子,这里涉及到了关键词 1 还有关键词 2,最后还会有关键词 3。"*1000))print(time.time()-startTime)
词库同样是:
with open("./sensitive_words", 'w') as f:f.write("\n".join( [ " 关键词 " + str(i) for i in range(0,10000)]))
使用上面的方法,测试结果为 0.017391204833984375。
根据上文的测试对比,我们可以发现在所有算法中,DFA 过滤敏感词性能最高,但是在实际应用中,DFA 过滤和 AC 自动机过滤各自有自己的适用场景,可以根据具体业务来选择。
想要实现敏感词过滤 API,就需要将代码部署到 Serverless 架构上,选择 API 网关与函数计算进行结合。以 AC 自动机过滤敏感词算法为例:我们只需要增加是几行代码就好:
# -*- coding:utf-8 -*-import json, uuidclass Node(object):def __init__(self):self.next = {}self.fail = Noneself.isWord = Falseself.word = ""class AcAutomation(object):def __init__(self):self.root = Node()# 查找敏感词函数def search(self, content):p = self.rootresult = []currentposition = 0while currentposition < len(content):word = content[currentposition]while word in p.next == False and p != self.root:p = p.failif word in p.next:p = p.next[word]else:p = self.rootif p.isWord:result.append(p.word)p = self.rootcurrentposition += 1return result# 加载敏感词库函数def parse(self, path):with open(path, encoding='utf-8') as f:for keyword in f:temp_root = self.rootfor char in str(keyword).strip():if char not in temp_root.next:temp_root.next[char] = Node()temp_root = temp_root.next[char]temp_root.isWord = Truetemp_root.word = str(keyword).strip()# 敏感词替换函数def wordsFilter(self, text):""":param ah: AC 自动机:param text: 文本:return: 过滤敏感词之后的文本"""result = list(set(self.search(text)))for x in result:m = text.replace(x, '*' * len(x))text = mreturn textdef response(msg, error=False):return_data = {"uuid": str(uuid.uuid1()),"error": error,"message": msg}print(return_data)return return_dataacAutomation = AcAutomation()path = './sensitive_words'acAutomation.parse(path)def main_handler(event, context):try:sourceContent = json.loads(event["body"])["content"]return response({"sourceContent": sourceContent,"filtedContent": acAutomation.wordsFilter(sourceContent)})except Exception as e:return response(str(e), True)
最后,为了方便本地测试,我们可以再增加以下代码:
def test():event = {"requestContext": {"serviceId": "service-f94sy04v","path": "/test/{path}","httpMethod": "POST","requestId": "c6af9ac6-7b61-11e6-9a41-93e8deadbeef","identity": {"secretId": "abdcdxxxxxxxsdfs"},"sourceIp": "14.17.22.34","stage": "release"},"headers": {"Accept-Language": "en-US,en,cn","Accept": "text/html,application/xml,application/json","Host": "service-3ei3tii4-251000691.ap-guangzhou.apigateway.myqloud.com","User-Agent": "User Agent String"},"body": "{\"content\":\" 这是一个测试的文本,我也就呵呵了\"}","pathParameters": {"path": "value"},"queryStringParameters": {"foo": "bar"},"headerParameters": {"Refer": "10.0.2.14"},"stageVariables": {"stage": "release"},"path": "/test/value","queryString": {"foo": "bar","bob": "alice"},"httpMethod": "POST"}print(main_handler(event, None))if __name__ == "__main__":test()
完成之后,就可以进行测试运行,例如我的字典是:
呵呵测试
执行之后结果:
{'uuid': '9961ae2a-5cfc-11ea-a7c2-acde48001122', 'error': False, 'message': {'sourceContent': '这是一个测试的文本,我也就呵呵了', 'filtedContent': '这是一个 ** 的文本,我也就 ** 了'}}
接下来,我们将代码部署到云端,新建 serverless.yaml:
sensitive_word_filtering:component: "@serverless/tencent-scf"inputs:name: sensitive_word_filteringcodeUri: ./exclude:.gitignore.git/**.serverless.envhandler: index.main_handlerruntime: Python3.6region: ap-beijingdescription: 敏感词过滤memorySize: 64timeout: 2events:apigw:name: serverlessparameters:environment: releaseendpoints:path: /sensitive_word_filteringdescription: 敏感词过滤method: POSTenableCORS: trueparam:name: contentposition: BODYrequired: 'FALSE'type: string: 待过滤的句子
然后通过 sls --debug 进行部署,部署结果:
最后,通过 PostMan 进行测试:
敏感词过滤是当前企业的普遍需求,通过敏感词过滤,我们可以在一定程度上遏制恶言恶语和违规言论的出现。在具体实现过程中,有两个方面需要额外主要:
敏感词库的获得问题:Github 上有很多敏感词库,其中包含了各种场景中的敏感词,大家可以自行搜索下载使用;
API 使用场景的问题:我们可以将这个 API 放置在社区跟帖系统、留言评论系统或者是博客发布系统中,这样可以防止出现敏感词汇,减少不必要的麻烦。
刘宇,腾讯 Serverless 团队后台研发工程师。毕业于浙江大学,硕士研究生学历,曾在滴滴出行、腾讯科技做产品经理,本科开始有自主创业经历,是 Anycodes 在线编程的负责人(该软件累计下载量超 100 万次)。目前投身于 Serverless 架构研发,著书《Serverless 架构:从原理、设计到项目实战》,参与开发和维护多个 Serverless 组件,是活跃的 Serverless Framework 的贡献者,也曾多次公开演讲和分享 Serverless 相关技术与经验,致力于 Serverless 的落地与项目上云。
InfoQ 读者交流群上线啦!各位小伙伴可以扫描下方二维码,添加 InfoQ 小助手,回复关键字“进群”申请入群。大家可以和 InfoQ 读者一起畅所欲言,和编辑们零距离接触,超值的技术礼包等你领取,还有超值活动等你参加,快来加入我们吧!
点个在看少个 bug 👇