This article is reposted from the Alibaba Cloud Security WeChat official account.
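Large models are now widely deployed in customer-facing enterprise services, from intelligent customer service and AI search to virtual assistants, and every user interaction carries security risks that cannot be ignored. If a model outputs prohibited content, leaks sensitive information, or is maliciously manipulated, the damage goes beyond a poor user experience: it can trigger legal and compliance problems, harm the brand, or even cause a system-level security crisis.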
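Against this backdrop, Alibaba Cloud AI Guardrails is now available on the Dify Marketplace, giving developers who build AI applications with Dify a natively integrated, one-stop AI security solution. Through plugin deployment or an API extension, developers can quickly enable two-way protection in Workflows, Agents, and Chatflows, checking both user input and model output.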
Figure: Alibaba Cloud AI Guardrails product architecture
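From input to output, the product builds an end-to-end security loop that addresses the key challenges large models face in real business: content safety, external attacks, privacy leakage, and uncontrolled output.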
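It deeply integrates Qwen3-Guard and a moderation model fine-tuned (SFT) from the Qwen series, combining adversarial detection with semantic understanding to accurately identify highly covert risks such as variant spellings, homophones, metaphorical expressions, and ideological infiltration.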
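It supports end-to-end streaming moderation: content is sent for checking in real time while the model generates it piece by piece, which significantly shortens the delay between token generation and risk detection and keeps interactions both smooth and safe under high concurrency.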
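As a rough illustration of the streaming idea, the sketch below buffers generated chunks and releases them to the user only after the accumulated text passes a check, so no unchecked text leaks out. The `check` callback is a hypothetical stand-in for a guardrail call, and the 300-character interval and 2,000-character window simply echo the figures used in the Dify integration; this is not the product's internal implementation.

```python
from collections.abc import Callable, Iterable, Iterator


def moderate_stream(
    chunks: Iterable[str],
    check: Callable[[str], bool],  # hypothetical: returns True if the text is safe
    interval: int = 300,
    window: int = 2000,
) -> Iterator[str]:
    """Yield streamed chunks only after the text containing them has passed `check`."""
    text = ""
    pending: list[str] = []
    for chunk in chunks:
        text += chunk
        pending.append(chunk)
        # Check once at least `interval` new characters are waiting
        if sum(map(len, pending)) >= interval:
            # Only the most recent `window` characters are sent for checking
            if not check(text[-window:]):
                raise RuntimeError("output blocked by moderation")
            yield from pending
            pending.clear()
    # Final check for whatever arrived after the last periodic check
    if pending:
        if not check(text[-window:]):
            raise RuntimeError("output blocked by moderation")
        yield from pending
```

The trade-off in this design is latency: the user sees text in bursts of roughly `interval` characters, in exchange for never seeing a chunk that has not been checked.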
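It supports risk detection in both single-turn and multi-turn Q&A. By taking dialogue history into account, it identifies cross-turn inducement, semantic drift, and jailbreak attempts, accurately understanding the intent of the whole conversation and avoiding misjudgments caused by fragmented context.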
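It supports mixed detection across text, images, files, and other modalities, effectively identifying hidden cross-modal instructions and composite attacks for full multimodal risk coverage.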
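It supports All-in-One API access: a single call performs detection across all modalities, protection capabilities can be enabled as needed, and integration is simple and fits mainstream AI application architectures, helping customers go live quickly. Alibaba Cloud AI Guardrails currently offers several one-click integration modes, including direct API access, Alibaba Cloud Model Studio (Bailian) access, and Alibaba Cloud WAF access.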
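Algorithm orchestration dynamically balances accuracy, latency, and cost. For high-concurrency, low-latency scenarios, it delivers high performance without compromising detection quality, meeting demanding production requirements.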
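A visual console supports risk policy configuration, blacklists and whitelists, threshold tuning, and effect verification. Users can also create custom detection Agents with their own labels and prompts to identify business risks specific to industries such as finance, healthcare, and education, or to particular scenarios, extending and customizing the security capabilities as needed.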
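Because so many developers build AI applications with Dify, AI Guardrails is now listed on the official Dify plugin marketplace and offers two integration paths: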
Option 1: AI Guardrails plugin
◆ Applicable scenarios: standard Workflows and applications without streaming output.
- Download and install the "AI Guardrails" plugin from the Dify plugin marketplace.
- Insert an "input detection" node before the LLM node to intercept prompt injection, DDoS attacks, and similar threats.
- Insert an "output detection" node after the LLM node to prevent sensitive-information leaks and inappropriate output.
- Example result: [screenshot]
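The node layout above amounts to a simple guard around the model call. The sketch below expresses that control flow in plain Python; `llm`, `check_input`, and `check_output` are hypothetical stand-ins for the LLM node and the plugin's input and output detection nodes, not the plugin's actual interface.

```python
from collections.abc import Callable


def guarded_call(
    query: str,
    llm: Callable[[str], str],           # hypothetical LLM node
    check_input: Callable[[str], bool],  # hypothetical input detection node (True = safe)
    check_output: Callable[[str], bool], # hypothetical output detection node (True = safe)
    refusal: str = "Your content violates our usage policy.",
) -> str:
    """Call the model only if the input passes, and return its answer only if the output passes."""
    if not check_input(query):
        # Blocked input never reaches the model
        return refusal
    answer = llm(query)
    if not check_output(answer):
        return refusal
    return answer
```

Putting the input check first also saves model tokens, since a blocked request never reaches the LLM.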
Option 2: API extension
◆ Applicable scenarios: Agents, Chatflows, and applications that need streaming output.
◆ Steps:
- Deploy the forwarding service
The AI Guardrails API accepts at most 2,000 characters per call, so inputs longer than 2,000 characters need to be adapted as follows:
  - Input review: split the input into segments of at most 2,000 characters each and call the Guardrails API for all segments concurrently.
  - Output review: Dify calls the moderation API roughly every 300 characters; in each call, the service sends only the most recent 2,000 characters.
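To get a feel for how many guardrail calls one exchange triggers under these rules, the sketch below does the arithmetic. It assumes an exact 300-character cadence plus one final check for any remainder (Dify's interval is only "roughly" 300) and ideal 2,000-character input chunks; the sample code below cuts at 1,950 characters and at punctuation, so real input counts can be slightly higher. Both function names are hypothetical.

```python
import math


def estimate_calls(input_len: int, output_len: int, limit: int = 2000, interval: int = 300) -> tuple[int, int]:
    """Approximate (input_calls, output_calls) for one exchange."""
    # Input: one call per chunk of at most `limit` characters (at least one call)
    input_calls = max(1, math.ceil(input_len / limit))
    # Output: one check every `interval` characters, plus one for any remainder
    output_calls = math.ceil(output_len / interval)
    return input_calls, output_calls


def output_chars_sent(output_len: int, limit: int = 2000, interval: int = 300) -> int:
    """Total characters sent over all output checks; each sends at most the last `limit` characters."""
    total = 0
    checked = 0
    while checked < output_len:
        checked = min(checked + interval, output_len)
        total += min(checked, limit)
    return total
```

For example, a 4,500-character question with a 1,000-character answer needs about 3 input calls and 4 output checks, and the output checks send 300 + 600 + 900 + 1,000 = 2,800 characters in total, because each check resends the accumulated tail.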
Sample code for the processing logic and the startup script is given below:
import base64
import concurrent.futures
import hashlib
import hmac
import json
import re
import uuid
from datetime import datetime, timezone
from urllib.parse import quote

import requests
from fastapi import Body, FastAPI, Header, HTTPException
from pydantic import BaseModel

# Call the service in the region you need: Shanghai (cn-shanghai), Beijing (cn-beijing),
# Hangzhou (cn-hangzhou) or Shenzhen (cn-shenzhen)
SERVICE_URL = "https://green-cip.cn-shanghai.aliyuncs.com"
# Text longer than this is split into segments
MAX_LENGTH = 2000
# ServiceCodes for the guardrail's input check and output check
SERVICE_INPUT = "query_security_check"
SERVICE_OUTPUT = "response_security_check"
ENCODING = "UTF-8"
ISO8601_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
# Severity used when merging results from several segments; any other value ranks lowest
SEVERITY = {"block": 2, "mask": 1}


def format_iso8601_date():
    return datetime.now(timezone.utc).strftime(ISO8601_DATE_FORMAT)


def percent_encode(value):
    if value is None:
        return ""
    return quote(value.encode(ENCODING), safe="~").replace("+", "%20").replace("*", "%2A")


def create_signature(string_to_sign, secret):
    # HMAC-SHA1 keyed with "<AccessKeySecret>&", then Base64-encoded
    key = (secret + "&").encode(ENCODING)
    digest = hmac.new(key, string_to_sign.encode(ENCODING), hashlib.sha1).digest()
    return base64.b64encode(digest).decode(ENCODING)


def create_string_to_sign(http_method, parameters):
    # Canonicalized query string: parameters sorted by key, keys and values percent-encoded
    canonicalized = "&".join(
        percent_encode(key) + "=" + percent_encode(parameters[key]) for key in sorted(parameters)
    )
    return http_method + "&" + percent_encode("/") + "&" + percent_encode(canonicalized)


def split_text(text: str, max_length: int = 1950) -> list[str]:
    """Split text into segments of at most max_length, preferring to cut after sentence-ending punctuation."""
    segments = []
    while len(text) > max_length:
        chunk = text[:max_length]
        # Last run of full-width or half-width sentence-ending punctuation in the chunk
        matches = list(re.finditer(r"[。!?;:.?!;:]+", chunk))
        # Keep the punctuation in the current segment; cut hard if none is found
        cut_point = matches[-1].end() if matches else max_length
        segments.append(text[:cut_point])
        text = text[cut_point:]
    if text:
        segments.append(text)
    return segments


def request(content_segment, check_type, aliyun_access_key, aliyun_access_secret):
    print(datetime.now(), f" [{check_type} request content]-> {content_segment}")
    # Build the request parameters
    parameters = {
        "Action": "MultiModalGuard",
        "Version": "2022-03-02",
        "AccessKeyId": aliyun_access_key,
        "Timestamp": format_iso8601_date(),
        "SignatureMethod": "HMAC-SHA1",
        "SignatureVersion": "1.0",
        "SignatureNonce": str(uuid.uuid4()),
        "Format": "JSON",
        "Service": SERVICE_INPUT if check_type == "input" else SERVICE_OUTPUT,
        "ServiceParameters": json.dumps({"content": content_segment}, ensure_ascii=False),
    }
    string_to_sign = create_string_to_sign("POST", parameters)
    parameters["Signature"] = create_signature(string_to_sign, aliyun_access_secret)
    # Send the request (the 10-second timeout is an example value)
    response = requests.post(SERVICE_URL, data=parameters, timeout=10)
    if response.status_code != 200:
        raise Exception(f"response http status_code not 200. status_code: {response.status_code}, body: {response.text}")
    body = response.json()
    print(datetime.now(), " [response body]-> ", body)
    if body.get("Code") != 200:
        raise Exception(f"response code not 200. code: {body.get('Code')}, body: {body}")
    return body


def worse(a, b):
    # The more severe of two suggestions
    return a if SEVERITY.get(a, 0) >= SEVERITY.get(b, 0) else b


def parse_body(body):
    """Overall suggestion, per-type suggestions and masked text from one guardrail response."""
    data = body.get("Data") or {}
    per_type, desensitization = {}, ""
    for detail in data.get("Detail") or []:
        check = detail.get("Type", "")
        per_type[check] = detail.get("Suggestion", "")
        if check == "sensitiveData":
            # Result may be empty, so guard the [0] lookup
            result = (detail.get("Result") or [{}])[0]
            desensitization = result.get("Ext", {}).get("Desensitization", "")
    return data.get("Suggestion", ""), per_type, desensitization


def preset_response(per_type):
    # Return a different reply for each scenario; adjust the wording as needed
    if per_type.get("contentModeration") == "block":
        return "Your content involves content security."
    if per_type.get("sensitiveData") in ("block", "mask"):
        return "Your content involves sensitive data."
    if per_type.get("promptAttack") == "block":
        return "Your content involves prompt attack."
    if per_type.get("maliciousUrl") == "block":
        return "Your content involves malicious url."
    return "Your content violates our usage policy."


def handle_app_moderation_input(params: dict, ak: str, sk: str):
    inputs = params.get("inputs", {})
    query = params.get("query") or ""
    if not query:
        return {"flagged": False, "action": "direct_output"}
    contents = [query] if len(query) <= MAX_LENGTH else split_text(query, MAX_LENGTH - 50)
    # Check the segments concurrently; map() returns results in segment order
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        bodys = list(executor.map(lambda seg: request(seg, "input", ak, sk), contents))

    # Merge the results, keeping the most severe suggestion overall and per check type
    final_suggestion, per_type, masked = "pass", {}, []
    for segment, body in zip(contents, bodys):
        suggestion, types, desensitization = parse_body(body)
        final_suggestion = worse(final_suggestion, suggestion)
        for check, value in types.items():
            per_type[check] = worse(per_type.get(check, ""), value)
        masked.append(desensitization or segment)

    response = {"flagged": False, "action": "direct_output"}
    if final_suggestion == "block":
        response.update(flagged=True, preset_response=preset_response(per_type))
    elif per_type.get("sensitiveData") == "mask":
        # Replace the query with the masked text of every segment
        response.update(flagged=True, action="overridden", inputs=inputs, query="".join(masked))
    print(response)
    return response


def handle_app_moderation_output(params: dict, ak: str, sk: str):
    text = params.get("text") or ""
    print(f"handle_app_moderation_output length:{len(text)}")
    if not text:
        return {"flagged": False, "action": "direct_output"}
    # Check only the most recent 2000 characters; adjust as needed, but keep it larger than Dify's window
    content = text[-MAX_LENGTH:]
    final_suggestion, per_type, desensitization = parse_body(request(content, "output", ak, sk))

    response = {"flagged": False, "action": "direct_output"}
    if final_suggestion == "block":
        response.update(flagged=True, preset_response=preset_response(per_type))
    elif per_type.get("sensitiveData") == "mask":
        # Keep the unchecked prefix and replace only the checked tail with its masked version
        prefix = text[: len(text) - len(content)]
        response.update(flagged=True, action="overridden", text=prefix + (desensitization or content))
    print(response)
    return response


app = FastAPI()


class InputData(BaseModel):
    point: str
    params: dict = {}


@app.post("/api/dify/receive")
def dify_receive(data: InputData = Body(...), authorization: str = Header(None)):
    """Receive API extension calls from Dify.

    A plain def runs in FastAPI's threadpool, so the blocking requests calls do not stall the event loop.
    """
    auth_scheme, _, api_key = (authorization or "").partition(" ")
    if auth_scheme.lower() != "bearer":
        raise HTTPException(status_code=401, detail="Unauthorized")
    # The API key is base64("<AccessKeyId>:<AccessKeySecret>")
    try:
        ak, sk = base64.b64decode(api_key).decode("utf-8").split(":", 1)
    except Exception as e:
        raise HTTPException(status_code=401, detail=f"Base64 Decode AK/SK fail: {e}")

    point = data.point
    if point == "ping":
        return {"result": "pong"}
    if point == "app.moderation.input":
        return handle_app_moderation_input(params=data.params, ak=ak, sk=sk)
    if point == "app.moderation.output":
        return handle_app_moderation_output(params=data.params, ak=ak, sk=sk)
    raise HTTPException(status_code=400, detail="Not implemented")


if __name__ == "__main__":
    import uvicorn

    # Choose any open port; reload=True would require an import string such as "main:app"
    uvicorn.run(app, host="0.0.0.0", port=8000)
Save the Python code above as main.py and start it with the following commands:
# Example startup script
pip install fastapi uvicorn requests
uvicorn main:app --reload --host 0.0.0.0 --port 8000
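In the output-moderation sample above, the default behavior is to reply directly with a refusal message. By changing the returned action field, you can switch to content-replacement mode, in which matched keywords or sensitive data are replaced with * (asterisks).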
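To make the two modes concrete, the sketch below builds both response shapes for Dify's `app.moderation.output` point: `direct_output` with a `preset_response`, and `overridden` with replacement `text`. The keyword list and the `mask_keywords` and `build_output_response` helpers are hypothetical; in the forwarding service, the masked text comes from the guardrail's `Desensitization` field rather than from local keyword matching.

```python
def mask_keywords(text: str, keywords: list[str]) -> str:
    """Hypothetical helper: replace each keyword with asterisks of the same length."""
    for word in keywords:
        text = text.replace(word, "*" * len(word))
    return text


def build_output_response(text: str, keywords: list[str], mode: str) -> dict:
    """Build a Dify output-moderation response in refusal or replacement mode."""
    if not any(word in text for word in keywords):
        return {"flagged": False, "action": "direct_output"}
    if mode == "direct_output":
        # Refusal mode: Dify shows preset_response instead of the model's answer
        return {
            "flagged": True,
            "action": "direct_output",
            "preset_response": "Your content violates our usage policy.",
        }
    # Replacement mode: Dify shows the returned text in place of the original
    return {"flagged": True, "action": "overridden", "text": mask_keywords(text, keywords)}
```

For example, `build_output_response("my secret", ["secret"], "overridden")` returns the text `"my ******"`, whereas the `direct_output` mode replaces the whole answer with the refusal message.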
- Add an API extension: create a new API extension on the Settings > API Extension page.
API Endpoint: the reachable address of the deployed forwarding service, including the /api/dify/receive path defined in the code.
API-Key: the Base64 encoding of your Alibaba Cloud AccessKey ID and AccessKey Secret joined with a colon, i.e. base64({aliyun_accessKey_id}:{aliyun_accessKey_secret}) in pseudocode.
import base64

# Fill in your AccessKey ID and AccessKey Secret
access_key_id = ""
access_key_secret = ""

# Join with ":" and Base64-encode
auth_str = f"{access_key_id}:{access_key_secret}"
encoded_auth = base64.b64encode(auth_str.encode("utf-8")).decode("utf-8")
print(encoded_auth)
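Before registering the endpoint in Dify, it can help to send the forwarding service the same kind of `ping` request Dify uses to validate an API extension. The sketch below assumes the service runs locally on port 8000 and that Dify sends the key as `Authorization: Bearer <API-Key>`, which is what the forwarding code expects; `build_dify_ping` and `ping` are hypothetical helper names.

```python
import base64
import json
import urllib.request


def build_dify_ping(access_key_id: str, access_key_secret: str) -> tuple[dict, dict]:
    """Headers and JSON body mimicking a Dify API-extension ping."""
    api_key = base64.b64encode(f"{access_key_id}:{access_key_secret}".encode("utf-8")).decode("utf-8")
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    body = {"point": "ping"}
    return headers, body


def ping(url: str, headers: dict, body: dict) -> dict:
    """POST the body to the forwarding service and return its JSON reply."""
    req = urllib.request.Request(
        url, data=json.dumps(body).encode("utf-8"), headers=headers, method="POST"
    )
    with urllib.request.urlopen(req, timeout=5) as resp:
        return json.load(resp)
```

With the service running, `ping("http://localhost:8000/api/dify/receive", *build_dify_ping(ak, sk))` should return `{"result": "pong"}`; a 401 error points to a malformed API-Key.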
- Configure the API extension in the Agent to complete the integration:
  - In the lower-right corner of the Agent page, choose Manage to configure content moderation.

  - Select API Extension.
  - Select the AI Guardrails API extension you created.
  - Turn input moderation and output moderation on or off according to your business needs.

During output, Dify runs a moderation check roughly every 300 accumulated characters.

- Example result: [screenshot]
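Building AI security should not become an extra burden on application development. The arrival of Alibaba Cloud AI Guardrails opens a new possibility for the Dify community: bringing professional security detection into the generative AI build process seamlessly, in a standardized and configurable way. This is more than an extension of the toolchain; it is an exploration of a practical path toward "trustworthy AI".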
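We believe developers can truly take control of the balance between innovation and risk only when security capabilities are lightweight, flexible, and easy to integrate. Dify remains committed to building an open, transparent, and sustainable plugin ecosystem, and we look forward to every developer validating, giving feedback on, and improving these capabilities in real-world scenarios. AI should be not only more useful but also more trustworthy.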
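If you like Dify, you are welcome to:
Try Dify Cloud: https://dify.ai/
Star us on GitHub to support our open-source project:
https://github.com/langgenius/dify
Contribute code and help build a more powerful Dify: every line you write makes Dify better.
Share Dify and your experience on social media and at offline events so more people can benefit from this powerful tool.
We're hiring; please send your résumé to joinus@dify.ai.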