大数跨境

企业级高可控智能体实战:Parlant调用Qwen3私有化部署大模型

企业级高可控智能体实战:Parlant调用Qwen3私有化部署大模型 跨境电商创业日记
2025-09-29
109
导读:Parlant,企业级高可控智能体

学习资料:

https://www.parlant.io

https://github.com/emcie-co/parlant

安装:pip install parlant
如果您喜欢冒险并想尝试新功能,还可以直接从 GitHub 安装最新的开发版本:
pip install git+https://github.com/emcie-co/parlant@develop

本次分享安装Develop版本,通过修改SDK的方式,体验本地私有化部署模型的表现。

如果安装Develop版本出错:

WARNING: Ignoring invalid distribution ~umpy (D:\ProgramData\Anaconda3\Lib\site-packages)Collecting git+https://github.com/emcie-co/parlant@develop  Cloning https://github.com/emcie-co/parlant (to revision develop) to c:\users\liang\appdata\local\temp\pip-req-build-jtzrkmza  Running command git clone --filter=blob:none --quiet https://github.com/emcie-co/parlant 'C:\Users\liang\AppData\Local\Temp\pip-req-build-jtzrkmza'  Host key verification failed.  fatal: Could not read from remote repository.
  Please make sure you have the correct access rights  and the repository exists.  error: subprocess-exited-with-error
  × git clone --filter=blob:none --quiet https://github.com/emcie-co/parlant 'C:\Users\liang\AppData\Local\Temp\pip-req-build-jtzrkmza' did not run successfully.  │ exit code: 128  ╰─> See above for output.
  note: This error originates from a subprocess, and is likely not a problem with pip.error: subprocess-exited-with-error
× git clone --filter=blob:none --quiet https://github.com/emcie-co/parlant 'C:\Users\liang\AppData\Local\Temp\pip-req-build-jtzrkmza' did not run successfully.│ exit code: 128╰─> See above for output.
note: This error originates from a subprocess, and is likely not a problem with pip.

https://github.com/emcie-co/parlant/archive/refs/heads/develop.zip下载:

下载完后相关文件夹和文件直接覆盖原有的:

之后找到qwen_service

修改代码(AI辅助编程的修改):

# Copyright 2025 Emcie Co Ltd.## Licensed under the Apache License, Version 2.0 (the "License");# you may not use this file except in compliance with the License.# You may obtain a copy of the License at##     http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.# Maintainer: Ji Qing <jiqing19861123@163.com>from __future__ import annotationsimport timefrom openai import (    APIConnectionError,    APIResponseValidationError,    APITimeoutError,    AsyncClient,    ConflictError,    InternalServerError,    RateLimitError,)from typing import AnyCallable, Mappingfrom typing_extensions import overrideimport jsonimport jsonfinder  # type: ignoreimport osfrom pydantic import ValidationErrorimport tiktokenfrom parlant.adapters.nlp.common import normalize_json_outputfrom parlant.core.engines.alpha.prompt_builder import PromptBuilderfrom parlant.core.loggers import Loggerfrom parlant.core.nlp.policies import policy, retryfrom parlant.core.nlp.tokenization import EstimatingTokenizerfrom parlant.core.nlp.service import NLPServicefrom parlant.core.nlp.embedding import Embedder, EmbeddingResultfrom parlant.core.nlp.generation import (    T,    SchematicGenerator,    SchematicGenerationResult,)from parlant.core.nlp.generation_info import GenerationInfo, UsageInfofrom parlant.core.nlp.moderation import (    ModerationService,    NoModeration,)RATE_LIMIT_ERROR_MESSAGE = """\Qwen API rate limit exceeded. Possible reasons:1. Your account may have insufficient API credits.2. You may be using a free-tier account with limited request capacity.3. You might have exceeded the requests-per-minute limit for your account.Recommended actions:- Check your Qwen account balance and billing status.- Review your API usage limits in Qwen's dashboard.- For more details on rate limits and usage tiers, visit:    https://docs.bigmodel.cn/cn/faq/api-code"""class QwenEstimatingTokenizer(EstimatingTokenizer):    def __init__(self, model_name: str) -> None:        self.model_name = model_name        self.encoding = tiktoken.encoding_for_model("gpt-4o-2024-08-06")    @override    async def estimate_token_count(self, prompt: str) -> int:        tokens = self.encoding.encode(prompt)        return len(tokens)class QwenEmbedder(Embedder):    supported_arguments = ["dimensions"]    def __init__(self, model_name: str, logger: Logger) -> None:        self.model_name = model_name        self._logger = logger        self._client = AsyncClient(            base_url=os.environ.get(                "BASE_URL""https://dashscope-intl.aliyuncs.com/compatible-mode/v1"            ),            api_key=os.environ.get("DASHSCOPE_API_KEY"""),        )        self._tokenizer = QwenEstimatingTokenizer(model_name=self.model_name)    @property    @override    def id(self) -> str:        return f"qwen/{self.model_name}"    @property    @override    def tokenizer(self) -> QwenEstimatingTokenizer:        return self._tokenizer    @policy(        [            retry(                exceptions=(                    APIConnectionError,                    APITimeoutError,                    ConflictError,                    RateLimitError,                    APIResponseValidationError,                ),            ),            retry(InternalServerError, max_exceptions=2, wait_times=(1.05.0)),        ]    )    @override    async def embed(        self,        texts: list[str],        hints: Mapping[strAny] = {},    ) -> EmbeddingResult:        filtered_hints = {k: v for k, v in hints.items() if k in self.supported_arguments}        try:            response = await self._client.embeddings.create(                model=self.model_name,                input=texts,                **filtered_hints,            )        except RateLimitError:            self._logger.error(RATE_LIMIT_ERROR_MESSAGE)            raise        vectors = [data_point.embedding for data_point in response.data]        return EmbeddingResult(vectors=vectors)class QwenTextEmbedding_V4(QwenEmbedder):    def __init__(self, logger: Logger) -> None:        super().__init__(model_name="qwen3-embedding-8b", logger=logger)                # Override the client to use the embedding-specific endpoint        self._client = AsyncClient(            base_url=os.environ.get(                "EMBEDDING_BASE_URL""http://你的IP:你的端口/v1"            ),            api_key=os.environ.get("DASHSCOPE_API_KEY"""),        )    @property    @override    def max_tokens(self) -> int:        return 8192    @property    def dimensions(self) -> int:        return 4096class QwenSchematicGenerator(SchematicGenerator[T]):    supported_qwen_params = ["temperature""max_tokens"]    def __init__(        self,        model_name: str,        logger: Logger,    ) -> None:        self.model_name = model_name        self._logger = logger        self._client = AsyncClient(            base_url=os.environ.get(                "BASE_URL""https://dashscope-intl.aliyuncs.com/compatible-mode/v1"            ),            api_key=os.environ["DASHSCOPE_API_KEY"],        )        self._tokenizer = QwenEstimatingTokenizer(model_name=self.model_name)    @property    @override    def id(self) -> str:        return f"Qwen/{self.model_name}"    @property    @override    def tokenizer(self) -> QwenEstimatingTokenizer:        return self._tokenizer    @policy(        [            retry(                exceptions=(                    APIConnectionError,                    APITimeoutError,                    ConflictError,                    RateLimitError,                    APIResponseValidationError,                ),            ),            retry(InternalServerError, max_exceptions=2, wait_times=(1.05.0)),        ]    )    @override    async def generate(        self,        prompt: str | PromptBuilder,        hints: Mapping[strAny] = {},    ) -> SchematicGenerationResult[T]:        with self._logger.operation(f"Qwen LLM Request ({self.schema.__name__})"):            return await self._do_generate(prompt, hints)    async def _do_generate(        self,        prompt: str | PromptBuilder,        hints: Mapping[strAny] = {},    ) -> SchematicGenerationResult[T]:        if isinstance(prompt, PromptBuilder):            prompt = prompt.build()        qwen_api_arguments = {k: v for k, v in hints.items() if k in self.supported_qwen_params}        t_start = time.time()        response = await self._client.chat.completions.create(            messages=[{"role""user""content": prompt}],            model=self.model_name,            max_tokens=8 * 1024,            response_format={"type""json_object"},            **qwen_api_arguments,        )        t_end = time.time()        if response.usage:            self._logger.trace(response.usage.model_dump_json(indent=2))        raw_content = response.choices[0].message.content or "{}"        try:            json_content = json.loads(normalize_json_output(raw_content))            # Fix common schema issues: convert conditions from list to string            if isinstance(json_content, dictand 'actions' in json_content:                for action in json_content['actions']:                    if isinstance(action.get('conditions'), listand action['conditions']:                        action['conditions'] = action['conditions'][0]                    elif isinstance(action.get('conditions'), listand not action['conditions']:                        action['conditions'] = ""        except json.JSONDecodeError:            self._logger.warning(f"Invalid JSON returned by {self.model_name}:\n{raw_content})")            json_content = jsonfinder.only_json(raw_content)[2]            self._logger.warning("Found JSON content within model response; continuing...")            # Apply the same fix to extracted JSON            if isinstance(json_content, dictand 'actions' in json_content:                for action in json_content['actions']:                    if isinstance(action.get('conditions'), listand action['conditions']:                        action['conditions'] = action['conditions'][0]                    elif isinstance(action.get('conditions'), listand not action['conditions']:                        action['conditions'] = ""        try:            content = self.schema.model_validate(json_content)            assert response.usage            return SchematicGenerationResult(                content=content,                info=GenerationInfo(                    schema_name=self.schema.__name__,                    model=self.id,                    duration=(t_end - t_start),                    usage=UsageInfo(                        input_tokens=response.usage.prompt_tokens,                        output_tokens=response.usage.completion_tokens,                        extra={                            "cached_input_tokens"getattr(                                response,                                "usage.prompt_cache_hit_tokens",                                0,                            )                        },                    ),                ),            )        except ValidationError:            self._logger.error(                f"JSON content returned by {self.model_name} does not match expected schema:\n{raw_content}"            )            raiseclass Qwen_MAX(QwenSchematicGenerator[T]):    def __init__(self, logger: Logger) -> None:        super().__init__(model_name="qwen-max", logger=logger)    @property    @override    def max_tokens(self) -> int:        return 32 * 1024class Qwen_Plus(QwenSchematicGenerator[T]):    def __init__(self, logger: Logger) -> None:        super().__init__(model_name="qwen-plus", logger=logger)    @property    @override    def max_tokens(self) -> int:        return 128 * 1024class Qwen_2_5_72b(QwenSchematicGenerator[T]):    def __init__(self, logger: Logger) -> None:        super().__init__(model_name="qwen2.5-72b-instruct", logger=logger)    @property    @override    def max_tokens(self) -> int:        return 128 * 1024class Qwen_3_30b_A3B(QwenSchematicGenerator[T]):    def __init__(self, logger: Logger) -> None:        super().__init__(model_name="qwen3-30b-a3b-instruct", logger=logger)    @property    @override    def max_tokens(self) -> int:        return 128 * 1024class QwenService(NLPService):    @staticmethod    def verify_environment() -> str | None:        """Returns an error message if the environment is not set up correctly."""        if not os.environ.get("DASHSCOPE_API_KEY"):            return """\You're using the Qwen NLP service, but DASHSCOPE_API_KEY is not set.Please set DASHSCOPE_API_KEY in your environment before running Parlant."""        return None    def __init__(        self,        logger: Logger,    ) -> None:        self._logger = logger        self._logger.info("Initialized QwenService")        self.model_name = os.environ.get("QWEN_MODEL""qwen-plus")        self._logger.info(f"Qwen model name: {self.model_name}")    def _get_specialized_generator_class(        self,        model_name: str,        t: type[T],    ) -> Callable[..., QwenSchematicGenerator[T]] | None:        """        Returns the specialized generator class for known models        """        model_mapping: dict[strtype[QwenSchematicGenerator[T]]] = {            "qwen-max": Qwen_MAX[t],  # type: ignore            "qwen-plus": Qwen_Plus[t],  # type: ignore            "qwen2.5-72b-instruct": Qwen_2_5_72b[t],  # type: ignore            "qwen3-30b-a3b-instruct": Qwen_3_30b_A3B[t],  # type: ignore        }        if generator_class := model_mapping.get(model_name):            return generator_class        else:            return None    @override    async def get_schematic_generator(self, t: type[T]) -> QwenSchematicGenerator[T]:        qwen_generator = self._get_specialized_generator_class(self.model_name, t)        assert qwen_generator is not Nonef"Unsupported Qwen model: {self.model_name}"        return qwen_generator(self._logger)    @override    async def get_embedder(self) -> Embedder:        return QwenTextEmbedding_V4(logger=self._logger)    @override    async def get_moderation_service(self) -> ModerationService:        return NoModeration()

修改后保存。最后运行以下代码:

# healthcare.pyimport parlant.sdk as pimport asyncioimport osfrom datetime import datetime@p.toolasync def get_insurance_providers(context: p.ToolContext) -> p.ToolResult:    return p.ToolResult(["大型保险公司""顶点保险公司"])@p.toolasync def get_upcoming_slots(context: p.ToolContext) -> p.ToolResult:    # 模拟从数据库或API获取可用时间段    return p.ToolResult(data=["周一上午10点""周二下午2点""周三下午1点"])@p.toolasync def get_later_slots(context: p.ToolContext) -> p.ToolResult:    # 模拟获取后续可用时间段    return p.ToolResult(data=["11月3日上午11:30""11月12日下午3点"])@p.toolasync def schedule_appointment(context: p.ToolContext, datetime: datetime) -> p.ToolResult:    # 模拟预约安排    return p.ToolResult(data=f"预约已安排在 {datetime}")@p.toolasync def get_lab_results(context: p.ToolContext) -> p.ToolResult:    # 模拟从数据库或API获取化验结果,    # 使用上下文中的客户ID。    lab_results = {        "report""所有检测结果都在正常范围内",        "prognosis""患者健康状况良好!",    }    return p.ToolResult(        data={            "report": lab_results["report"],            "prognosis": lab_results["prognosis"],        }    )async def add_domain_glossary(agent: p.Agent) -> None:    await agent.create_term(        name="办公室电话号码",        description="我们办公室的电话号码,+1-234-567-8900",    )    await agent.create_term(        name="办公时间",        description="办公时间为周一至周五,上午9点到下午5点",    )    await agent.create_term(        name="查尔斯·泽维尔",        synonyms=["X教授"],        description="专门从事神经科工作的医生,周一和周二坐诊。",    )    # 如需要,在此处添加其他特定术语和定义...# <<添加此函数>>async def create_scheduling_journey(server: p.Server, agent: p.Agent) -> p.Journey:    # 创建旅程    journey = await agent.create_journey(        title="预约安排",        description="帮助患者找到适合的预约时间。",        conditions=["患者想要预约"],    )    # 首先,确定预约原因    t0 = await journey.initial_state.transition_to(chat_state="确定就诊原因")    # 将即将到来的预约时间段加载到上下文中    t1 = await t0.target.transition_to(tool_state=get_upcoming_slots)    # 询问哪个时间段适合他们    # 我们将根据患者的回复从此处有条件地转换    t2 = await t1.target.transition_to(        chat_state="列出可用时间并询问哪个时间段适合他们"    )    # 我们从患者选择时间的愉快路径开始    t3 = await t2.target.transition_to(        chat_state="在安排前与患者确认详细信息",        condition="患者选择了时间",    )    t4 = await t3.target.transition_to(        tool_state=schedule_appointment,        condition="患者确认了详细信息",    )    t5 = await t4.target.transition_to(chat_state="确认预约已安排")    await t5.target.transition_to(state=p.END_JOURNEY)    # 否则,如果他们说这些时间都不行,询问后续时间段    t6 = await t2.target.transition_to(        tool_state=get_later_slots,        condition="这些时间对患者都不合适",    )    t7 = await t6.target.transition_to(chat_state="列出后续时间并询问是否有合适的时间")    # 如果他们选择时间,转换回我们的愉快路径    await t7.target.transition_to(state=t3.target, condition="患者选择了时间")    # 否则,请他们致电办公室    t8 = await t7.target.transition_to(        chat_state="请患者致电办公室预约",        condition="这些时间对患者也都不合适",    )    await t8.target.transition_to(state=p.END_JOURNEY)    # 使用指导原则有意识地处理边缘情况    await journey.create_guideline(        condition="患者说他们的就诊很紧急",        action="告诉他们立即致电办公室",    )    return journeyasync def create_lab_results_journey(server: p.Server, agent: p.Agent) -> p.Journey:    # 创建旅程    journey = await agent.create_journey(        title="化验结果",        description="获取患者的化验结果并进行解释。",        conditions=["患者想要查看他们的化验结果"],    )    t0 = await journey.initial_state.transition_to(tool_state=get_lab_results)    await t0.target.transition_to(        chat_state="告诉患者结果尚未可用,请稍后再试",        condition="找不到化验结果",    )    await t0.target.transition_to(        chat_state="向患者解释化验结果 - 结果正常",        condition="化验结果良好 - 即无需担心",    )    await t0.target.transition_to(        chat_state="呈现结果并请他们致电办公室 "        "对结果进行澄清,因为你不是医生",        condition="化验结果不好 - 即患者健康有问题",    )    # 使用指导原则处理边缘情况...    await agent.create_guideline(        condition="患者催促你提供更多关于化验结果的结论",        action="明确告诉他们你无法帮助,他们应该致电办公室",    )    return journeyasync def main() -> None:    # 设置千问服务配置    os.environ['BASE_URL'] = 'http://您的IP:端口/v1'  # 私有化部署的千问模型地址    os.environ['EMBEDDING_BASE_URL'] = 'http://您的IP:端口/v1'  # 私有化部署的embedding模型地址    os.environ['DASHSCOPE_API_KEY'] = 'not-needed'  # 私有化部署不需要 API key    os.environ['QWEN_MODEL'] = 'qwen3-30b-a3b-instruct'  # 使用的模型名称        async with p.Server(nlp_service=p.NLPServices.qwen) as server:        agent = await server.create_agent(            name="医疗助手",            description="对患者富有同情心且能安抚情绪。",        )        await agent.create_canned_response("今天我能为您做什么?")        await add_domain_glossary(agent)        scheduling_journey = await create_scheduling_journey(server, agent)        lab_results_journey = await create_lab_results_journey(server, agent)        status_inquiry = await agent.create_observation(            "患者询问跟进就诊情况,但具体方式不明确",        )        # 使用此观察来区分两个旅程        await status_inquiry.disambiguate([scheduling_journey, lab_results_journey])        await agent.create_guideline(            condition="患者询问保险问题",            action="列出我们接受的保险提供商,并告诉他们致电办公室了解更多详情",            tools=[get_insurance_providers],        )        await agent.create_guideline(            condition="患者要求与人工代理通话",            action="请他们致电办公室,提供电话号码",        )        await agent.create_guideline(            condition="患者询问与我们医疗保健无关的事情",            action="礼貌地告诉您无法协助处理无关询问 - 不要回应他们的请求。",        )if __name__ == "__main__":    asyncio.run(main())

实例代码来源参考https://github.com/emcie-co/parlant/blob/develop/examples/healthcare.py

启动报错:

Traceback (most recent call last):  File "D:\ProgramData\anaconda3\Lib\site-packages\parlant\sdk.py", line 64in <module>    from parlant.adapters.vector_db.transient import TransientVectorDatabase  File "D:\ProgramData\anaconda3\Lib\site-packages\parlant\adapters\vector_db\transient.py", line 50in <module>    import nano_vectordb  # type: ignore    ^^^^^^^^^^^^^^^^^^^^  File "D:\ProgramData\anaconda3\Lib\site-packages\nano_vectordb\__init__.py", line 1in <module>    from .dbs import NanoVectorDB, MultiTenantNanoVDB  File "D:\ProgramData\anaconda3\Lib\site-packages\nano_vectordb\dbs.py", line 18in <module>    Data = TypedDict("Data", {"__id__"str"__vector__": np.ndarray})                                                           ^^^^^^^^^^AttributeError: module 'numpy' has no attribute 'ndarray'

使用的是numpy 2.3.3版本,这是一个非常新的版本,而nano-vectordb包(版本0.0.4.3)还在使用旧的numpy.ndarray语法。在numpy 2.0+中,numpy.ndarray已经被弃用,应该使用numpy.typing.NDArray。没办法只能降级numpy到兼容版本。

pip install "numpy<2.0"

启动成功:

启动成功后看效果:http://localhost:8800
进行多轮对话
可以看到流程严谨,规则明确
发现后台有些报错,但不影响正常对话:
[Evaluation(built-in:get_lab_results)] SingleToolBatch attempt 0 failed: ['Traceback (most recent call last):\n', '  File "D:\\ProgramData\\anaconda3\\Lib\\site-packages\\parlant\\core\\engines\\alpha\\tool_calling\\single_tool_batch.py", line 228in _infer_calls_for_single_tool\n    ) = await self._evaluate_tool_calls(inference_output, candidate_descriptor)\n        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n', '  File "D:\\ProgramData\\anaconda3\\Lib\\site-packages\\parlant\\core\\engines\\alpha\\tool_calling\\single_tool_batch.py", line 262in _evaluate_tool_calls\n    descriptor, options = tool.parameters[evaluation.parameter_name]\n                          ~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^\n', "KeyError: 'None'\n"]
出现error就阻断了对话:
2025-09-29T02:57:26.747269Z [error    ] [R2mEaHAj0vh::process][ToolCallerProcessing context for session MkDEb63x0k failed2025-09-29T02:57:26.749230Z [error    ] [R2mEaHAj0vh::process][ToolCallerTraceback (most recent call last):   File "D:\ProgramData\anaconda3\Lib\site-packages\parlant\core\engines\alpha\tool_calling\single_tool_batch.py", line 228in _infer_calls_for_single_tool    ) = await self._evaluate_tool_calls(inference_output, candidate_descriptor)        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^   File "D:\ProgramData\anaconda3\Lib\site-packages\parlant\core\engines\alpha\tool_calling\single_tool_batch.py", line 262in _evaluate_tool_calls    descriptor, options = tool.parameters[evaluation.parameter_name]                          ~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^ KeyError: 'reason_for_visit'
 '"D:\\ProgramData\\anaconda3\\Lib\\site-packages\\parlant\\core\\engines\\alpha\\tool_event_generator.py", ' 'line 131in generate_events\n' '    inference_result = await self._tool_caller.infer_tool_calls(\n' '                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n', '  File ' '"D:\\ProgramData\\anaconda3\\Lib\\site-packages\\parlant\\core\\engines\\alpha\\tool_calling\\tool_caller.py", ' 'line 182in infer_tool_calls\n' '    return await self._do_infer_tool_calls(context)\n' '           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n', '  File ' '"D:\\ProgramData\\anaconda3\\Lib\\site-packages\\parlant\\core\\engines\\alpha\\tool_calling\\tool_caller.py", ' 'line 220in _do_infer_tool_calls\n' '    batch_results = await async_utils.safe_gather(*batch_tasks)\n' '                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n', '  File ' '"D:\\ProgramData\\anaconda3\\Lib\\site-packages\\parlant\\core\\async_utils.py", ' 'line 148in safe_gather\n' '    return await asyncio.gather(\n' '           ^^^^^^^^^^^^^^^^^^^^^\n', '  File ' '"D:\\ProgramData\\anaconda3\\Lib\\site-packages\\parlant\\core\\engines\\alpha\\tool_calling\\single_tool_batch.py", ' 'line 138in process\n' '    ) = await self._infer_calls_for_single_tool(\n' '        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n', '  File ' '"D:\\ProgramData\\anaconda3\\Lib\\site-packages\\parlant\\core\\engines\\alpha\\tool_calling\\single_tool_batch.py", ' 'line 239in _infer_calls_for_single_tool\n' '    raise ToolCallBatchError() from last_generation_exception\n', 'parlant.core.engines.alpha.tool_calling.tool_caller.ToolCallBatchError: Tool ' 'Call Batch failed\n']
阻断后需要再次进行对话:
Develop尝鲜版体验:功能先进,流程严谨,规则明确,偶尔卡顿,偶尔报错。
学习总结:

告别 AI “幻觉”:Parlant 框架如何让企业级 AI 代理真正 “可控”?(Develop版本实践的时候还是有些不稳定)

在企业数字化转型的浪潮中,AI 代理本应是提升效率的 “得力助手”—— 客服 AI 处理咨询、金融 AI 解答业务、医疗 AI 辅助预约。但现实往往是:开发者熬夜写了几十条提示词,AI 却在用户面前 “自由发挥”;明明要解决退款问题,AI 却死磕 “先填姓名电话”;工具调用时参数乱填,订单状态查成 “张三” 的…… 这些 “不听话” 的问题,让很多企业的 AI 项目陷入 “投入大、效果差” 的困境。

而 Parlant 的出现,像给混乱的 AI 代理世界立了一套 “交通规则”—— 通过 “行为建模” 的全新范式,让 AI 从 “叛逆少年” 变成 “靠谱助手”。今天我们就来拆解,这个框架如何解决企业级 AI 的核心痛点,以及它能为不同行业带来哪些改变。

一、传统 AI Agent 的三大 “拦路虎”:为什么 AI 总 “不听话”?

在了解 Parlant 之前,我们得先看清传统 AI 代理开发的 “坑”—— 这些问题不是技术不够先进,而是设计思路错了方向。

1. 提示词 “无底洞”:规则越多,AI 越 “糊涂”

传统开发像给 AI 写 “操作手册”,规则一条条堆进去:“客服 AI 要礼貌”“问退款先查订单”“不确定的信息别乱讲”…… 最后提示词能有几百字,结果呢?AI 要么 “选择性失忆”(漏看某条规则),要么 “断章取义”(误解规则优先级)。更麻烦的是,业务一变(比如退款政策调整),整个提示词都要重写,维护成本高到离谱。

曾有电商团队为了让 AI 处理售后,写了 53 条提示词规则,结果用户问 “退货要多久到账”,AI 却回复 “请提供您的订单号”—— 原因是它优先执行了 “问问题先要订单号” 的规则,完全忽略了用户的核心诉求。

2. 对话流程 “僵化”:用户体验被 “流程” 绑架

很多框架用 “状态机” 控制对话:必须先问姓名→再要电话→最后处理需求。这种 “一刀切” 的流程,在真实场景里特别 “反人类”。比如用户急着说 “我买的手机开不了机,要退货”,AI 却机械地问 “请告诉我您的姓名”,大概率会让用户直接关掉对话。

本质上,传统流程把 “机器方便” 放在了 “用户体验” 前面,而用户需要的是 “我有问题,直接解决”,不是 “按步骤填表”。

3. 工具调用 “瞎猜”:该用不用,不该用乱⽤

AI 要查订单、调库存,得调用工具,但传统框架里,AI 全靠 “猜”—— 什么时候调用?传什么参数?比如用户说 “查一下我的订单”,AI 可能乱填参数(把 “李四” 的订单号写成 “张三”),或者在用户还没说清订单号时,就提前调用工具,结果返回 “无此订单”,反而增加用户麻烦。

这些问题的核心,是传统框架把 AI 当成 “需要背手册的机器”,而不是 “能根据场景做判断的助手”。Parlant 的突破,就在于换了一种思路:不硬灌规则,而是给 AI “行为准则”。

二、Parlant 的核心逻辑:用 “行为建模” 让 AI “懂规矩”

Parlant 的设计哲学很简单:从 “控制 AI” 变成 “引导 AI”。传统方法是 “给 AI 一本手册,让它自己翻”,Parlant 则是 “给 AI 一个导航,告诉它‘遇到 A 情况,走 B 路线’”。这套逻辑靠四大核心组件落地,我们用电商客服场景举例,一看就懂。

1. Guidelines:AI 的 “行为准则”,有条件有动作

这是 Parlant 最核心的创新 —— 每个 “准则” 都明确两件事:什么情况触发(Condition) 和该做什么(Action),还能绑定对应的工具。比如电商客服可以设置:

  • 条件:用户询问 “商品是否有货”
  • 动作:调用 “查库存工具”,用预制模板回复结果
  • 工具:check_stock(商品ID)

用代码逻辑表达更直观(非原网页代码,适配电商场景):

await agent.create_guideline(    condition="用户询问商品库存或 availability",    action="获取用户提及的商品ID,调用查库存工具,用模板告知库存状态",    tools=[check_stock]  # 工具只在这个条件下触发,不瞎调用)

这种设计的好处是:规则模块化,改 “库存查询” 的逻辑,不用动其他准则;AI 也不会漏规则 —— 只要满足条件,就会执行对应的动作。

2. Journeys:灵活的 “对话路线”,不绑架用户

如果说 Guidelines 是 “单次动作准则”,Journeys 就是 “完整对话流程”—— 但它不是僵化的 “填表格”,而是能根据用户需求跳步。比如电商 “商品咨询→下单” 的 Journeys 可以设计:

  1. 初始状态:用户提及商品(如 “连衣裙”)
  2. 可跳转:如果用户问 “有货吗”→直接调用查库存工具(跳过 “问姓名”)
  3. 分支处理:有货→推荐尺码 / 搭配;没货→推荐相似款
  4. 兜底:用户说 “要下单”→调用下单工具,确认收货地址

对比传统状态机,Journeys 的核心是 “跟着用户走”—— 用户急着买东西,就不纠结无关信息;用户有疑问,就先解决疑问。比如用户说 “我要这件黑色连衣裙,有 M 码吗”,AI 不用问 “您的姓名”,直接查库存并回复,体验自然多了。

3. Tools:绑定 “准则” 的 “工具箱”,不瞎用

传统框架里,工具是 “散装” 的,AI 想调用就调用;Parlant 里,工具是 “绑定准则” 的 —— 只有触发对应的 Guideline,工具才会被调用,还能明确参数要求。比如 “查订单工具”check_order(order_id),只在 “用户询问订单状态” 的 Guideline 里触发,且必须传入order_id参数,不会出现 “用用户名查订单” 的错误。

电商场景中,“推荐商品工具”recommend_products(category, budget),只会在用户说 “推荐 200 元以内的口红” 时触发,参数自动从用户对话中提取(品类 = 口红,预算 = 200),不用 AI “瞎猜”。

4. Canned Responses:预制 “安全回复”,无幻觉

AI “胡说八道” 的根源,是自由生成内容时脱离控制。Parlant 的 “罐装响应”(Canned Responses)直接用预制模板,从源头杜绝幻觉。比如:

  • 问候模板:“您好 {{customer.name}}!我是电商助手小电,需要帮您查商品、查订单吗?”
  • 库存模板:“您好,{{product.name}} 当前有 {{stock.sizes}} 码现货,预计 {{delivery.time}} 送达~”
  • 售后模板:“您的订单 {{order.id}} 已申请退款,预计 {{refund.time}} 到账,请注意查收。”

这些模板里的变量(如{{product.name}})会自动填充工具返回的数据,既保证了准确性,又能保持品牌话术统一 —— 不会出现 “这款有货” 和 “这款没货” 的矛盾回复。

三、场景:电商客服 AI 的 “变身记”

光说理论不够,我们用一个真实的电商客服场景,再看 Parlant 如何落地。

需求:解决三大核心问题

  1. 处理商品咨询(库存、尺码、价格)
  2. 处理订单售后(退款、物流)
  3. 引导用户下单(推荐、搭配)

第一步:定义 Guidelines(核心行为准则)

# 1. 商品库存查询await agent.create_guideline(    condition="用户询问商品是否有货、库存数量或尺码 availability",    action="提取商品ID/名称,调用查库存工具,用库存模板回复",    tools=[check_stock])# 2. 订单退款咨询await agent.create_guideline(    condition="用户提及退款、退货或退款到账时间",    action="获取订单号,调用查退款状态工具,用退款模板回复",    tools=[check_refund_status])# 3. 商品推荐await agent.create_guideline(    condition="用户请求推荐商品,或提及品类+预算",    action="提取品类(如口红)和预算,调用推荐工具,用推荐模板回复",    tools=[recommend_products])

第二步:设计 Journeys(下单全流程)

# 创建“商品咨询→下单”旅程journey = await agent.create_journey(    title="商品购买引导",    conditions=["用户提及商品、表达购买意向"],    description="从咨询到下单的灵活引导流程")# 状态1:商品咨询→查库存t1 = await journey.initial_state.transition_to(    tool_state=check_stock,  # 触发查库存工具    condition="用户问库存")# 状态2:库存有货→推荐尺码/搭配t2 = await t1.target.transition_to(    chat_state="询问用户需要的尺码,或推荐搭配商品",    condition="库存有货")# 状态3:用户确认购买→调用下单工具t3 = await t2.target.transition_to(    tool_state=create_order,  # 触发下单工具    condition="用户说‘我要下单’或‘买了’")# 状态4:下单成功→告知物流await t3.target.transition_to(    chat_state="用物流模板告知预计送达时间,询问是否需要其他帮助",    condition="下单成功")# 兜底:库存无货→推荐相似款await t1.target.transition_to(    tool_state=recommend_similar,  # 触发相似商品推荐    condition="库存无货")

第三步:工具与模板落地

# 1. 查库存工具@p.toolasync def check_stock(context: p.ToolContext, product_id: str) -> p.ToolResult:    # 从电商系统获取库存数据    stock_data = await ecommerce_db.get_stock(product_id)    return p.ToolResult(        data={            "name": stock_data["name"],            "sizes": stock_data["available_sizes"],            "delivery""2-3天"        },        canned_responses=[            "您好,{{name}}当前有{{sizes}}码现货,预计{{delivery}}送达~"        ]    )# 2. 推荐商品工具@p.toolasync def recommend_products(context: p.ToolContext, category: str, budget: float) -> p.ToolResult:    products = await ecommerce_db.search(category=category, price_max=budget, in_stock=True)    return p.ToolResult(        data={"products": products},        canned_responses=[            "根据您的需求,推荐{{products.length}}款商品:{{#each products}}{{name}}(¥{{price}}){{#unless @last}}、{{/unless}}{{/each}},需要了解某款的详情吗?"        ]    )

最终对话效果:自然且可控

👤 用户:你好,这款黑色连衣裙有M码吗?🤖 小电:稍等,我查一下库存…您好,黑色连衣裙当前有M和L码现货,预计2-3天送达~需要了解尺码表吗?👤 用户:不用,我直接买M码🤖 小电:好的,正在为您创建订单…订单创建成功!订单号是ORD20250929001,预计9月30日送达,收货地址是您上次的“XX小区1号楼”,对吗?👤 用户:对的,谢谢🤖 小电:不客气!后续有订单问题,随时找我哦~

整个对话没有僵化的 “填信息” 步骤,AI 跟着用户需求走,回复全是预制模板 + 真实数据,没有一句 “胡说八道”。

四、Parlant 的 “企业级” 优势:不止 “听话”,更 “可靠”

对企业来说,AI “听话” 只是基础,“可靠” 才是关键 —— 比如金融场景要合规,医疗场景要准确,电商场景要稳定。Parlant 的企业级特性,正好解决这些需求。

1. 可控性:从 “不可预测” 到 “全流程可管”

传统 AI 的输出是 “黑箱”,Parlant 则让每一步都可追溯:

  • 为什么调用这个工具?→ 因为触发了 “查库存” 的 Guideline
  • 回复内容从哪来?→ 来自 “库存模板”+ 工具返回的真实数据
  • 流程为什么跳步?→ 因为 Journeys 允许 “用户问库存直接查,不用填姓名”

这种可控性,让金融场景能严格限制 AI:比如 “用户问投资建议”,AI 只会回复预制的 “根据监管要求,建议咨询持证顾问”,不会出现违规推荐。

2. 可维护:改一点,不影响全局

传统提示词改一个规则,要重写几百字;Parlant 改一个 Guideline,其他规则不受影响。比如电商退款政策从 “7 天无理由” 改成 “15 天无理由”,只需要更新 “退款咨询” 的 Guideline 动作,不用动库存、推荐等其他模块 —— 维护效率提升 80% 以上。

3. 可扩展:多 Agent 协作,覆盖全业务

企业业务不是 “单一线”,比如电商需要 “客服 Agent”“售后 Agent”“物流 Agent”,Parlant 支持多 Agent 部署,还能跨 Agent 协作:

# 同时创建3个Agentasync with p.Server() as server:    cs_agent = await server.create_agent(name="电商客服Agent")  # 处理咨询    after_sales_agent = await server.create_agent(name="售后Agent")  # 处理退款    logistics_agent = await server.create_agent(name="物流Agent")  # 查物流# 客服Agent处理不了的售后问题,自动转售后Agentawait cs_agent.create_guideline(    condition="用户问题涉及售后退款,且客服无法解决",    action="转接售后Agent,并同步对话历史",    tools=[transfer_to_agent(after_sales_agent)])

4. 可监控:数据驱动优化

Parlant 内置监控功能,能实时看核心指标:

  • 对话成功率(用户问题是否解决)
  • 工具调用准确率(参数是否正确)
  • 平均响应时间(用户等多久)

比如发现 “商品推荐” 的成功率只有 60%,可以优化推荐工具的算法;发现 “转人工率” 太高,就补充更多 Guideline 覆盖常见问题 —— 用数据不断优化 AI 表现。

五、未来:AI 代理的 “可靠化” 才是真趋势

Parlant 的价值,不止是解决了当下的痛点,更指明了 AI 代理的未来方向 ——从 “追求聪明” 到 “追求可靠”

未来的 Parlant,还会向这些方向进化:

  • 多模态支持:用户发一张衣服图片,AI 自动识别商品→查库存→推荐搭配,不用手动输入商品名;
  • 自主学习:根据用户反馈自动优化 Guidelines,比如很多用户问 “连衣裙洗后会缩水吗”,系统自动新增 “询问洗护问题→调用洗护说明工具” 的准则;
  • 跨行业适配:金融场景自动识别 “违规话术”,医疗场景严格限制 “不做诊断”,教育场景只回复 “教学范围内的内容”。

对开发者来说,这意味着一种思维转变:不用再死磕 “怎么写提示词让 AI 更聪明”,而是要思考 “怎么设计行为准则让 AI 更可靠”。和业务团队紧密合作,把 “退款流程”“库存规则” 这些业务逻辑,变成 AI 能理解的 Guidelines,才是核心能力。

AI 的 “有用” 比 “炫技” 更重要

很多 AI 技术停留在 “演示阶段”—— 能写诗歌、能画图片,但到了企业真实场景里,却因为 “不听话”“不可控” 被弃用。而 Parlant 的出现,让 AI 从 “实验室里的黑科技” 变成 “生产线上的靠谱工具”。

对企业来说,真正有价值的 AI,不是能 “聊哲学” 的 AI,而是能稳定处理 “查订单”“退货款”“预约挂号” 这些小事的 AI;不是能 “自由创作” 的 AI,而是能严格遵守 “合规规则”“品牌话术” 的 AI。

Parlant 让我们看到:AI 的下一个时代,不是 “更聪明”,而是 “更可靠”。当 AI 能真正 “听话”“可控”,才能成为企业数字化转型的 “得力助手”,而不是 “麻烦制造者”。

如果你也在为 AI “不听话” 头疼,不妨试试用 Parlant 的 “行为建模” 思路 —— 给 AI 一套清晰的 “行为准则”。

企业选择:

Parlant 的分类:典型的混合式(Hybrid)智能体

Parlant 的核心设计(基于 ABM 行为建模引擎)完美融合了 “反应式” 与 “深思熟虑” 的特性,属于混合式智能体,具体依据如下:

1. 具备 “反应式” 的快速规则触发能力

Parlant 的Guidelines(指导原则)是典型的反应式设计:

  • 核心逻辑:“满足条件(Condition)→ 执行动作(Action)”,例如 “用户问退款→调用查订单工具 + 预制模板回复”;
  • 特性:规则模块化、触发即时,无需复杂推理,类似 “条件反射”,但比传统反应式更灵活(支持工具绑定、动态上下文筛选)。

2. 具备 “深思熟虑” 的多步规划与推理能力

Parlant 通过Journeys(对话旅程)ARQ(注意力推理查询)实现深思熟虑特性:

  • Journeys
    支持 “动态流程规划”,例如 “预约医生” 场景中,可根据用户需求跳步(用户说 “头疼”→直接查神经科时段,不强制填姓名),还能处理分支逻辑(有合适时段→确认预约;无→推荐其他时间),本质是 “基于目标的多步决策”;
  • ARQ
    强制 LLM 按 “情境评估→规则匹配→行动规划→执行验证” 的结构化步骤推理,例如判断 “用户是否需要转人工” 时,会参考对话历史、业务规则、用户情绪,而非单纯触发规则;
  • 特性:依赖历史上下文、支持复杂目标拆解、能动态调整策略,符合 “深思熟虑” 的核心定义。

3. 关键补充:动态上下文管理强化混合能力

Parlant 的动态上下文管理机制(仅加载当前场景相关的规则 / 旅程),既避免了反应式 “规则泛滥” 的问题,又减轻了深思熟虑 “认知超载” 的负担,进一步优化了混合式智能体的效率与可控性。

参考:

用反应式架构搭建故障诊断Agent实战

用深思熟虑式架构搭建房产智能投研助手Agent实战

用混合智能体架构搭建投保AI投保助手实战

  • 反应式:基于预设规则 / 当前环境触发,无复杂规划,不依赖历史上下文深推理,比如自动门、简单客服规则。
  • 深思熟虑:会进行目标规划、环境建模、多步推理,考虑历史上下文和未来路径,比如自动驾驶的路径规划、复杂任务分解。
  • 混合式:结合两者,既有预设规则的快速反应,又有动态规划 / 推理能力,平衡效率和复杂度。

Parlant 与 LangChain、LangGraph、Qwen-Agent、Dify 的核心区别

Parlant 与主流框架的差异,本质是设计理念(行为建模 vs 组件组合 / 流程控制 / 低代码) 和核心目标(企业级可控性 vs 灵活度 / 快速落地) 的不同

  • LangChain:模块化组件链,生态丰富,灵活,侧重组件组合,适合快速原型,复杂任务需手动编排,可预测性一般。
  • LangGraph:基于状态机的工作流,精确控制流程,支持循环 / 条件边,适合复杂逻辑,但学习曲线稍高,侧重流程控制。
  • Qwen-Agent:阿里的,中文环境 / 电商场景优势,多模态支持,低代码,自动生成 Web 界面,适合中文相关和快速部署。
  • Dify:低代码 / 无代码,可视化界面,快速部署,私有化支持,适合非技术人员,灵活度稍低。
  • Parlant:行为建模(ABM),混合式智能体,Guidelines+Journeys+ARQ,可控性强(罐装响应、规则模块化),企业级合规,多智能体协同(比如无人机集群),适合高要求的企业场景(金融、医疗)。

Parlant 的框架定位与选择建议

Parlant 并非 “替代其他框架”,而是填补了 “企业级高可控性智能体”的空白:

  • 若需快速原型、灵活组件组合:选 LangChain;
  • 若需复杂流程精确控制:选 LangGraph;
  • 若需中文 / 电商场景、快速 Web 部署:选 Qwen-Agent;
  • 若需非技术人员低代码落地:选 Dify;
  • 若需高合规、零幻觉、多智能体协同(如金融服务、医疗预约、无人机集群):选 Parlant。

简言之,Parlant 是为 “对 AI 行为有严格要求” 的企业场景设计的框架,其核心价值是让 AI 从 “灵活但不可控” 变成 “可控且高效”。

【声明】内容源于网络
0
0
跨境电商创业日记
跨境分享馆 | 每天分享跨境见解
内容 0
粉丝 4
跨境电商创业日记 跨境分享馆 | 每天分享跨境见解
总阅读0
粉丝4
内容0