LangChain 系统性能对比:Python 与 Go 的实践分析

AI大模型观察站 · 2025-12-28
导读:随着 LangChain 应用走向生产,性能成为关键考量。本文对比基于 LangChain 的 Python 与 Go 系统在并发、延迟与资源占用上的表现,分析两种语言在工程实现与运行效率上的差异,为工程选型提供参考。

基于 LangChain 的 Python 与 Go 系统性能对比:覆盖 Retrieval、Agent 推理、并发、可扩展性等方面

Lovable,这家高速增长的 AI 公司,今年将其后端从 [Python 迁移到 Go](https://lovable.dev/blog/from-python-to-go),将服务器使用和部署时间从约 15 分钟降至约 3 分钟。Go 正日益被视为现代 AI 系统的理想语言,从高吞吐的 RAG 管道到多工具的 Agent。

但若要真正评估 Go 在生产级 AI 系统中的表现,必须在真实负载下测试,因为 ingestion、embedding、retrieval 和 orchestration 各自都有性能上限。

Go vs Python Benchmarking(作者 Fareed Khan)

这些组件都是潜在瓶颈,编程语言的选择可能决定一个系统是停留在迟缓的原型,还是成长为可扩展、可生产的高可用平台。我们将全面测试,重点覆盖五大领域。

  1. 核心 LLM 交互:单次、原始模型调用有多快?这是所有功能的基石,任何框架在这里引入的开销都会在每个请求上重复支付。
  2. 数据摄取与检索:任何 RAG 系统的核心。我们将测量从原始文档切分到 chunk 的 embedding,再到在压力下的检索的完整管道。
  3. Agent 推理与工具使用:Agent “思考”的成本是什么?我们将测试单步、多跳和高频工具调用循环的开销。
  4. 并发与可扩展性:当 50 个用户同时访问服务时会发生什么?这测试了每种语言在不崩溃的情况下处理真实并行负载的能力。
  5. 运营效率与韧性:隐藏成本是什么?我们将衡量长时间运行的 Agent 的内存占用,以及加入必要可观测性后带来的性能损耗。

在这篇博客中,我们将逐一走通每种基准测试方法,理解 Go 与 Python 的策略差异,并看看为何 Go 往往表现更佳。

所有代码均在我的 GitHub 仓库:

GitHub - FareedKhan-dev/langchain-go-vs-python: Benchmarking RAG and agentic systems in Go vs...

代码库组织如下:


   
   
   
    
   
   
   langchain-go-vs-python/
    ├── agent_multi_hop/
    │   ├── agent.go
    │   └── agent.py
    ├── concurrent_rag/
    │   ├── rag.go
    │   └── rag.py
    ├── data_embedding_batching/
    │   ├── embedding.go
    │   └── embedding.py
    
    ...
    
    ├── gpu_saturation/
    │   ├── saturation.go
    │   └── saturation.py
    ├── ingestion_throughput/
    │   ├── ingestion.go
    │   └── ingestion.py

    ...
    
    ├── resiliency_parsing_failure/
    │   ├── parsing.go
    │   └── parsing.py
    └── workflow_transformation/
        ├── transformation.go
        └── transformation.py

目录

  • 目录
  • 环境搭建
  • 第一部分:核心 LLM 性能基准测试
    • 单轮延迟测试
    • Time-to-First-Token(TTFT)流式测试
  • 第二部分:生产级 RAG 管道测试
    • 大文档切分:CPU 受限任务
    • 批量 Embedding:处理吞吐
    • 完整摄取管道
    • 检索延迟:信息能多快被找到?
    • 端到端 RAG:整合验证
  • 第三部分:Agentic 架构评估
    • 简单 Agent:单次工具调用
    • 多跳 Agent:串联多次工具调用
    • 高频工具使用:测试 Agent 开销
    • 工作流逻辑:数据转换与路由
  • 第四部分:生产就绪的压力测试
    • 并发 RAG 与 Agent 系统:应对真实负载
    • GPU 饱和:将模型服务推向极限
    • 处理超时、工具失败与解析错误
  • 第五部分:衡量真实运营成本
    • 内存占用:长会话对话
    • 可观测性开销:Tracing 的成本

环境搭建

开始基准测试前,我们需要先搭建测试环境。为公平比较 Go 与 Python,二者必须使用完全相同的后端服务。

我们的环境由三部分组成:

  1. 本地 LLM Server(Ollama):用于运行我们的 llama3:8b 模型。
  2. 本地向量数据库(ChromaDB):为 RAG 管道存储和管理 embedding。
  3. Go 与 Python 项目代码:我们将编写并运行的自定义 Go 与 Python 脚本。

首先访问 Ollama 官网,下载适合你操作系统(macOS、Linux 或 Windows)的应用。安装过程非常简单,一键即可。

Ollama 启动后,打开终端拉取 llama3:8b 模型。该模型非常适合本地测试,速度与推理能力均衡。


   
   
   
    
   
   
# Download the 8B Llama model (you can choose any other model that Ollama supports)
ollama pull llama3:8b

为确认一切正常,可直接在终端运行模型:


   
   
   
    
   
   
   # Testing ollama running server
ollama run llama3:8b "Why is the sky blue?"

##### OUTPUT ####
The color of sky is ....

你应能看到模型开始生成回复。对于 RAG 管道基准,我们还需要一个向量数据库来存储文档 embedding。本文选用 ChromaDB(也可以换成其他向量库),最简单的方式是用 Docker 运行。

我们需要创建一个简单的 docker-compose.yml 文件,请确保你本地已安装 Docker。


   
   
   
    
   
   
version: '3.10'   # Specifies the docker-compose file format version (compatible with Docker Compose v2.x+)

services:
  chroma:                            # Defines a service named "chroma"
    image: chromadb/chroma           # Uses the official ChromaDB Docker image from Docker Hub
    ports:
      - "8000:8000"                  # Maps port 8000 on the host to port 8000 inside the container
    volumes:
      - chroma_data:/chroma/.chroma/ # Mounts a named volume for persistent Chroma data storage

volumes:
  chroma_data:                       # Declares a named volume to persist data outside the container lifecycle

该配置做了两件事:拉取官方的 chromadb/chroma 镜像并启动容器,让它在 localhost:8000 可用;同时创建名为 chroma_data 的 Docker 卷,以确保即便容器停止或重启,向量数据库也能持久化。

启动服务时,在终端进入克隆后的仓库根目录并运行:


   
   
   
    
   
   
   # Composing docker container
docker-compose up -d
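
容器启动后,可以顺手确认一下 ChromaDB 是否就绪。下面是一个可选的 Go 小脚本示意(假设使用 Chroma 的 /api/v1/heartbeat 健康检查接口;不同版本的接口路径可能不同,也可以直接用 curl 访问同一地址验证):

package main

import (
  "fmt"
  "net/http"
  "time"
)

func main() {
  // 设置较短的超时,避免容器尚未就绪时长时间阻塞
  client := &http.Client{Timeout: 3 * time.Second}
  resp, err := client.Get("http://localhost:8000/api/v1/heartbeat")
  if err != nil {
    fmt.Println("ChromaDB not ready yet:", err)
    return
  }
  defer resp.Body.Close()
  fmt.Println("ChromaDB heartbeat status:", resp.StatusCode)
}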

最后,你还需要在本地安装 Go。可在官方站点下载,过程同样简单。

我们的每个基准目录(例如 agent_single_tool、concurrent_rag)都是独立的 Go module。

你会注意到每个目录中都有一个 go.mod 文件。它是 Go module 的核心,可类比 Python 的 requirements.txt 或 Node.js 的 package.json。

如下是仓库中一个全新 go.mod 的示例:


   
   
   
    
   
   
   module agent_single_tool  // Declares the module name (import path) for your Go project

go 1.22.0                 // Specifies the Go version the module is intended to be built with

它只声明了模块名和 Go 版本。我们用 go mod tidy 来安装依赖。
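
作为参考,在代码中引入 langchaingo 并运行 go mod tidy 之后,go.mod 大致会变成下面的样子(require 的具体版本号以 go mod tidy 的实际解析结果为准,这里仅作示意占位):

module agent_single_tool

go 1.22.0

require github.com/tmc/langchaingo vX.Y.Z // 示意占位版本,由 go mod tidy 自动写入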

在 Python 生态中,LangChain、LangGraph 和 LangSmith 已经演进为一个完整生态。在 Go 中,这仍处早期阶段。好在已有一个 LangChainGo 实现可以作为 module 使用。

这意味着我们不必事事从零开始,只在必要处自行编码,其余可借助该实现来构建 Go 版本方案。


第一部分:核心 LLM 性能基准测试

在构建复杂的 RAG 管道或 agentic 系统之前,我们必须从最基础的部分开始,也就是与大型语言模型(LLM)进行单次交互的原始速度。

我们构建的每一个高级功能都建立在这一核心操作之上。如果这一层很慢,整个系统就会很慢。

LLM Core benchmark(作者 Fareed Khan)

本部分我们将测量两个最基本的性能指标:

  1. 完整回答所需时间(latency)。
  2. 模型开始“说话”的速度(Time-to-First-Token,TTFT)。

这能为我们提供一个起点,展示各框架额外引入的开销。

单轮延迟测试

与 LLM 的最基础交互是简单的单轮补全。这就像问聊天机器人一个问题并等待完整答案。它也常用于分类、数据抽取或简易问答等任务。

通过测量它,我们可以了解各框架的原始开销:有多少时间花在框架逻辑上,而非实际的 LLM 推理?

Single turn latency(作者 Fareed Khan)

我们先编写 Go 版本。构建一个简单基准:只初始化一次 Ollama client,然后在循环中用相同 prompt 调用 LLM 200 次,测量每次耗时。


   
   
   
    
   
   
   package main

import (
"context"
"fmt"
"log"
"time"
"github.com/tmc/langchaingo/llms"
"github.com/tmc/langchaingo/llms/ollama"
)

// --- Configuration ---
const (
 numIterations = 200
 modelName     = "llama3:8b"
 prompt        = "Why is the sky blue?"
)
func main() {
 fmt.Println("--- LangChainGo: Single-Turn Completion Latency Test (200 Iterations) ---")

// 1. Initialize the Ollama LLM client.
// This is created once and reused, which is standard practice.
 llm, err := ollama.New(ollama.WithModel(modelName))
if err != nil {
  log.Fatalf("Failed to create Ollama client: %v", err)
 }
 latencies := make([]time.Duration, 0, numIterations)

 ctx := context.Background()
// 3. Start the main benchmark loop.
for i := 0; i < numIterations; i++ {
  start := time.Now()

// 4. Execute the LLM call. This is a blocking call.
// The framework handles HTTP request creation, sending, and JSON parsing.
  _, err := llms.GenerateFromSinglePrompt(ctx, llm, prompt)
if err != nil {
   log.Printf("Warning: Iteration %d failed: %v\n", i+1, err)
   continue
  }
  latency := time.Since(start)
  latencies = append(latencies, latency)
  fmt.Printf("Iteration %d: %v\n", i+1, latency)
 }

// 6. After all iterations, calculate and display the final statistics.
 fmt.Println("\n--- LangChainGo Benchmark Results ---")
 calculateAndPrintStats(latencies)
}

在 main 中我们首先初始化一次 ollama.New client。将其置于循环外以模拟真实应用。

基准的核心是运行 200 次的 for 循环。每次记录开始时间,调用 llms.GenerateFromSinglePrompt 执行主操作,再计算延迟。

这个函数封装了完整的请求-响应周期:

  1. 框架构造 HTTP 请求;
  2. 发送给 Ollama server;
  3. 等待完整响应并解析。我们正是要测量这段时间。
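
顺带一提,上面代码末尾调用的 calculateAndPrintStats 在文中没有展开,它只负责汇总延迟并打印统计信息。下面是一个最小示意实现(并非仓库原版,输出字段按后文结果块推测;放在同一文件中时需额外 import "math"):

func calculateAndPrintStats(latencies []time.Duration) {
  if len(latencies) == 0 {
    fmt.Println("No successful iterations.")
    return
  }
  var total time.Duration
  minL, maxL := latencies[0], latencies[0]
  for _, l := range latencies {
    total += l
    if l < minL {
      minL = l
    }
    if l > maxL {
      maxL = l
    }
  }
  avg := total / time.Duration(len(latencies))
  // 以纳秒为单位计算总体标准差
  var sumSq float64
  for _, l := range latencies {
    d := float64(l - avg)
    sumSq += d * d
  }
  std := time.Duration(math.Sqrt(sumSq / float64(len(latencies))))
  fmt.Printf("Total Iterations: %d\n", len(latencies))
  fmt.Printf("Total Time:       %v\n", total)
  fmt.Printf("Min Latency:      %v\n", minL)
  fmt.Printf("Max Latency:      %v\n", maxL)
  fmt.Printf("Average Latency:  %v\n", avg)
  fmt.Printf("Std Deviation:    %v\n", std)
}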

接着看 Python 等效实现。结构非常相似,我们用 langchain_community 连接 Ollama,并用 numpy 做统计。


   
   
   
    
   
   
   import time
import numpy as np
from langchain_community.llms import Ollama

# --- Configuration ---
NUM_ITERATIONS = 200
MODEL_NAME = "llama3:8b"
PROMPT = "Why is the sky blue?"


def main():
    """Main function to run the benchmark."""
    print("--- LangChain Python: Single-Turn Completion Latency Test (200 Iterations) ---")

    # 1. Initialize the Ollama LLM client.
    llm = Ollama(model=MODEL_NAME)
    latencies = []

    # 3. Start the main benchmark loop.
    for i in range(NUM_ITERATIONS):
        start_time = time.perf_counter()

        # 4. Execute the LLM call using the modern `.invoke()` method.
        llm.invoke(PROMPT)
        
        end_time = time.perf_counter()
        latency = end_time - start_time
        latencies.append(latency)
        print(f"Iteration {i + 1}{latency:.4f}s")

    # 6. Calculate and display statistics.
    print("\n--- LangChain Python Benchmark Results ---")
    calculate_and_print_stats(latencies)

与 Go 一样,Python 的 main 也只初始化一次 Ollama client。循环使用 .invoke() 进行标准的同步 LLM 调用,并用 time.perf_counter() 高精度计时。

运行两种基准:


   
   
   
    
   
   
# running go version
go run latency.go

# running python version
python latency.py

   
   
   
    
   
   
   --- LangChainGo Benchmark Results ---
Total Iterations: 200
Total Time:       199.85s
Min Latency:      980.1ms
Max Latency:      1.15s
Average Latency:  999.2ms
Std Deviation:    28.5ms

--- LangChain Python Benchmark Results ---
Total Iterations: 200
Total Time:       238.1512s
Min Latency:      1152.34ms
Max Latency:      1.48s
Average Latency:  1190.76ms
Std Deviation:    89.31ms

在 200 次请求下差异明显。LangChainGo 稳定更快,平均延迟为 999.2ms,而 Python 为 1190.76ms。

这意味着在最基础操作上就有约 19% 的性能提升。

标准差也表明 Go 的波动更小(28.5ms vs 89.31ms)。

原因在于 Go 编译为本机二进制运行,框架层几乎不引入额外的解释或调度开销;而 Python 的解释执行与更厚的抽象层会在每次调用上叠加少量延迟,在 200 次请求中被不断放大。

Time-to-First-Token(TTFT)流式测试

在构建用户应用(如聊天机器人)时,总延迟并非唯一重要指标。

真正影响体验的是用户看到第一个响应的速度。

TTFT(作者 Fareed Khan)

TTFT 度量的是首个输出 token 的出现时间。TTFT 越低,应用感觉越“跟手”,即便完整回复仍需时间。

在 Go 实现中,我们利用并发原语精确测量。用一个 channel 在流式回调收到首块数据的瞬间发信号。


   
   
   
    
   
   
   func main() {
 fmt.Println("--- LangChainGo: Streaming Time-to-First-Token (TTFT) Test (200 Iterations) ---")

 llm, err := ollama.New(ollama.WithModel(modelName))
if err != nil {
  log.Fatalf("Failed to create Ollama client: %v", err)
 }

 ttfts := make([]time.Duration, 0, numIterations)
 ctx := context.Background()

 fmt.Printf("Running %d iterations with model '%s'...\n\n", numIterations, modelName)

for i := 0; i < numIterations; i++ {
// `firstTokenCh` is a channel that acts as a signal. It's a highly efficient
// way for a goroutine to notify another that an event has occurred.
// A buffer of 1 prevents the sending goroutine from blocking.
  firstTokenCh := make(chan struct{}, 1)

// `wg` ensures that the main loop doesn't exit before the streaming call is fully completed.
var wg sync.WaitGroup
  wg.Add(1)

  start := time.Now()

// The streaming function callback. It's invoked for every chunk received.
  streamingFunc := func(ctx context.Context, chunk []byte) error {
   // This select statement is the core of the TTFT measurement.
   // It attempts to send a signal on the channel.
   select {
   case firstTokenCh <- struct{}{}:
    // This case will only execute for the VERY FIRST chunk received.
    // Subsequent calls will find the channel is already full (or closed) and go to the default case.
   default:
    // Do nothing for subsequent chunks. This is an efficient no-op.
   }
   return nil
  }

// We run the LLM call in a separate goroutine to allow the main thread
// to immediately start waiting for the first token signal.
 go func() {
   defer wg.Done()
   _, err := llms.GenerateFromSinglePrompt(
    ctx,
    llm,
    longPrompt,
    llms.WithStreamingFunc(streamingFunc),
   )
   if err != nil {
    log.Printf("Warning: Goroutine for iteration %d failed: %v\n", i+1, err)
    // In case of error, we must still unblock the main thread to avoid a deadlock.
    select {
    case firstTokenCh <- struct{}{}:
    default:
    }
   }
  }()

// The main thread blocks here until the signal is received on `firstTokenCh`.
// The time it takes to get here from `start` is our TTFT.
  <-firstTokenCh
  ttft := time.Since(start)
  ttfts = append(ttfts, ttft)
  fmt.Printf("Iteration %d: TTFT = %v\n", i+1, ttft)

// Wait for the entire stream to finish before starting the next iteration.
// This prevents subsequent tests from being affected by lingering background processes.
  wg.Wait()
 }

 fmt.Println("\n--- LangChainGo Benchmark Results ---")
 calculateAndPrintStats(ttfts)
}

上面代码的关键是 streamingFunc。它被传入 LLM 调用,每收到一个流式 chunk 就执行一次。

内部的 select 尝试向 firstTokenCh 发送信号。由于该 channel 的缓冲区大小为 1,这个发送操作仅会在第一次成功。之后的调用命中 default 不执行任何操作。

与此同时,主循环在后台 goroutine 中发起调用,然后立刻在 <-firstTokenCh 处等待。首个 chunk 到达且回调发送信号的瞬间,该行解除阻塞并计算耗时,这就是 TTFT。我们还用 WaitGroup 确保开始下一轮前,上一轮流式输出已完整结束。
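
如果把这种“只对第一个 chunk 发信号”的写法单独抽出来,其核心就是下面这个与 LLM 无关的小示意(仅演示缓冲为 1 的 channel 配合 select/default 的用法):

package main

import (
  "fmt"
  "time"
)

func main() {
  firstEvent := make(chan struct{}, 1)
  start := time.Now()

  go func() {
    for i := 0; i < 5; i++ {
      time.Sleep(50 * time.Millisecond) // 模拟陆续到达的流式 chunk
      select {
      case firstEvent <- struct{}{}: // 只有第一个 chunk 能发送成功
      default: // 之后的 chunk 走到这里,什么也不做
      }
    }
  }()

  <-firstEvent // 阻塞直到首个事件到达
  fmt.Println("TTFT (simulated):", time.Since(start))
  // 真实基准中还会像正文那样用 sync.WaitGroup 等待整个流结束
}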

Python 版本使用迭代器实现相同目标。.stream() 返回迭代器,只有调用 next() 时才会真正发起网络请求。


   
   
   
    
   
   
   def main():
    """Main function to run the streaming benchmark."""
    print("--- LangChain Python: Streaming Time-to-First-Token (TTFT) Test (200 Iterations) ---")

    llm = Ollama(model=MODEL_NAME)
    ttfts = []

    print(f"Running {NUM_ITERATIONS} iterations with model '{MODEL_NAME}'...\n")

    for i in range(NUM_ITERATIONS):
        try:
            start_time = time.perf_counter()

            # 1. `.stream()` returns an iterator. No network call is made at this point.
            # It's a lazy operation.
            stream_iterator = llm.stream(LONG_PROMPT)

            # 2. The actual network request is initiated when we first try to get an item
            # from the iterator. The time spent in this `next()` call is what we
            # measure as TTFT.
            next(stream_iterator)
            
            first_token_time = time.perf_counter()

            # 3. Calculate and store the TTFT.
            ttft = first_token_time - start_time
            ttfts.append(ttft)
            print(f"Iteration {i + 1}: TTFT = {ttft * 1000:.2f}ms")

            # 4. VERY IMPORTANT: We must consume the entire iterator to close the connection.
            # If we don't, the underlying HTTP connection may be left open in the connection pool,
            # which can exhaust resources and cause subsequent tests to fail or hang.
            # This is a key difference in resource management compared to the Go callback model.
            for _ in stream_iterator:
                pass
        
        except Exception as e:
            print(f"Warning: Iteration {i + 1} failed: {e}")

    print("\n--- LangChain Python Benchmark Results ---")
    calculate_and_print_stats(ttfts)

在 Python 中,理解迭代器的惰性至关重要。调用 llm.stream(LONG_PROMPT) 只返回迭代器对象,并未发出网络请求;直到调用 next(stream_iterator) 才真正发送请求。我们先记录起始时间,再调用 stream() 并立刻 next(),代码会阻塞在 next() 处,直到服务端返回第一个 token,此时的时间差即为 TTFT。最后必须把整个迭代器消费完,以关闭底层 HTTP 连接,否则连接池资源可能被耗尽。

运行 TTFT 基准:


   
   
   
    
   
   
# running go version
go run ttft.go

# running python version
python ttft.py

   
   
   
    
   
   
--- LangChainGo Benchmark Results ---
Total Iterations: 200
Total Time:       28.15s
Min Latency:      135.2ms
Max Latency:      159.8ms
Average Latency:  140.75ms
Std Deviation:    4.1ms

--- LangChain Python Benchmark Results ---
Total Iterations: 200
Total Time:       42.6743s
Min Latency:      205.10ms
Max Latency:      251.56ms
Average Latency:  213.37ms
Std Deviation:    15.88ms

可以看到在 TTFT 评估上,Go 同样表现更佳。

Go 的平均 TTFT 为 140.75ms,相比 LangChain Python 的 213.37ms,约快 51%。

Go 能几乎即时处理首个数据块并触发回调,而 Python 额外的抽象层让其慢了一拍。

对于任何实时、对话式 AI,这个初始体验至关重要。更快的 TTFT 直接转化为更好的用户体验。

第二部分:生产级 RAG 管道测试

有了核心 LLM 性能的基线,我们可以进入更复杂也更贴近生产的场景:构建完整的 Retrieval-Augmented Generation(RAG)管道。

在生产环境中,Go 与 Python 的 RAG 实现差异将更加明显,尤其在效率与吞吐至关重要的情况下。

RAG Benchmark(作者 Fareed Khan)

RAG 系统的性能不仅取决于最终的 LLM 调用,而是多个步骤之和:加载数据、切分、embedding、存储与检索,任何一步出现瓶颈都可能拖垮全局。

本部分我们从零搭建摄取与查询的完整管道,并对每个关键步骤基准测试。

大文档切分:CPU 受限任务

在处理文档之前,必须先将其拆分为更小、可管理的 chunk。这是纯 CPU 受限任务,涉及大量字符串处理。

CPU Bound Task(作者 Fareed Khan)

这是摄取管道的第一大步骤,其效率直接影响你处理新信息的速度。

一个低效的切分器在需要同时摄取成千上万文档时会成为严重瓶颈。

我们先看 Go 实现。加载一个 10MB 的大文件,并多次运行 RecursiveCharacterTextSplitter 以得到稳定的性能测度。


   
   
   
    
   
   
   package main

// --- Configuration ---
const (
 largeFilePath = "../data_large/large_document.txt"
 chunkSize     = 1000
 chunkOverlap  = 200
 numIterations = 5 // Repeat to get a stable average
)

func main() {
 fmt.Println("--- LangChainGo: Text Splitter Throughput Test ---")

// 1. Load the large document into memory first.
// We do this outside the loop so we only benchmark the splitting, not file I/O.
 content, err := os.ReadFile(largeFilePath)
if err != nil {
  log.Fatalf("Failed to read large document file: %v", err)
 }
 doc := []schema.Document{{PageContent: string(content)}}
 docSizeMB := float64(len(content)) / (1024 * 1024)
 fmt.Printf("Loaded document of size: %.2f MB\n", docSizeMB)

// 2. Initialize the text splitter.
 splitter := textsplitter.NewRecursiveCharacter(
  textsplitter.WithChunkSize(chunkSize),
  textsplitter.WithChunkOverlap(chunkOverlap),
 )

var latencies []time.Duration
var totalChunks int

// --- Profiling Setup ---
var startMem, endMem runtime.MemStats
 runtime.ReadMemStats(&startMem)

 fmt.Printf("\nRunning %d splitting iterations...\n", numIterations)
 totalStart := time.Now()

// 3. Run the benchmark loop.
for i := 0; i < numIterations; i++ {
  iterStart := time.Now()

// This is the core CPU-bound operation we are measuring.
  chunks, err := textsplitter.SplitDocuments(splitter, doc)
if err != nil {
   log.Fatalf("Iteration %d failed: %v", i+1, err)
  }

  latency := time.Since(iterStart)
  latencies = append(latencies, latency)
  totalChunks = len(chunks) // Should be the same each time
  fmt.Printf("Iteration %d: Split into %d chunks in %v\n", i+1, totalChunks, latency)
 }

 totalDuration := time.Since(totalStart)
 runtime.ReadMemStats(&endMem)

// --- Calculate and Print Metrics ---
var totalLatency time.Duration
for _, l := range latencies {
  totalLatency += l
 }
 avgLatency := totalLatency / time.Duration(numIterations)
 throughputMBps := (docSizeMB * float64(numIterations)) / totalDuration.Seconds()
 throughputChunksps := float64(totalChunks*numIterations) / totalDuration.Seconds()

 heapAlloc := endMem.Alloc - startMem.Alloc
 totalAlloc := endMem.TotalAlloc - startMem.TotalAlloc
 numGC := endMem.NumGC - startMem.NumGC
 pauseTotalMs := float64(endMem.PauseTotalNs-startMem.PauseTotalNs) / 1_000_000

 fmt.Println("\n--- LangChainGo Splitting Results ---")
 fmt.Printf("Average Latency per Run: %v\n", avgLatency)
 fmt.Printf("Throughput (Data Size):  %.2f MB/s\n", throughputMBps)
 fmt.Printf("Throughput (Chunks):     %.2f chunks/s\n", throughputChunksps)
 fmt.Println("\n--- Memory & GC Metrics ---")
 fmt.Printf("Peak Heap Increase:      %.2f KB\n"float64(heapAlloc)/1024)
 fmt.Printf("Total Alloc (Churn):     %.2f MB\n"float64(totalAlloc)/(1024*1024))
 fmt.Printf("Number of GC Runs:       %d\n", numGC)
 fmt.Printf("Total GC Pause Time:     %.2f ms\n", pauseTotalMs)
}

我们将代码设计为隔离切分器的性能:

  1. 仅加载一次磁盘文档,置于循环外,确保只测切分性能;
  2. 基准中重复调用 textsplitter.SplitDocuments(负责递归切分的重活);
  3. 记录每轮耗时,同时跟踪内存与 GC 指标,观察 Go 在 CPU 压力下资源管理效率。

再看 Python 实现对比:


   
   
   
    
   
   
   # --- Configuration ---
LARGE_FILE_PATH = "../data_large/large_document.txt"
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200
NUM_ITERATIONS = 5

def get_cpu_time():
    """Gets the CPU time used by the current process."""
    process = psutil.Process(os.getpid())
    return process.cpu_times().user

def get_memory_usage():
    """Gets the current memory usage (RSS) of the process in bytes."""
    process = psutil.Process(os.getpid())
    return process.memory_info().rss

def main():
    """Main function to run the text splitter benchmark."""
    print("--- LangChain Python: Text Splitter Throughput Test ---")

    # 1. Load the document.
    loader = TextLoader(LARGE_FILE_PATH, encoding="utf-8")
    doc = loader.load()
    doc_size_mb = os.path.getsize(LARGE_FILE_PATH) / (1024 * 1024)
    print(f"Loaded document of size: {doc_size_mb:.2f} MB")

    # 2. Initialize the text splitter.
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP
    )

    latencies = []
    total_chunks = 0

    # --- Profiling Setup ---
    start_mem = get_memory_usage()
    start_cpu = get_cpu_time()

    print(f"\nRunning {NUM_ITERATIONS} splitting iterations...\n")
    total_start_time = time.perf_counter()

    # 3. Run the benchmark loop.
    for i in range(NUM_ITERATIONS):
        iter_start_time = time.perf_counter()

        # This is the core CPU-bound operation.
        chunks = text_splitter.split_documents(doc)
        
        latency = time.perf_counter() - iter_start_time
        latencies.append(latency)
        total_chunks = len(chunks)
        print(f"Iteration {i + 1}: Split into {total_chunks} chunks in {latency:.4f}s")
        
    total_duration = time.perf_counter() - total_start_time
    end_mem = get_memory_usage()
    end_cpu = get_cpu_time()

    # --- Calculate and Print Metrics ---
    avg_latency = np.mean(latencies)
    throughput_mb_ps = (doc_size_mb * NUM_ITERATIONS) / total_duration
    throughput_chunks_ps = (total_chunks * NUM_ITERATIONS) / total_duration
    
    mem_increase = end_mem - start_mem
    cpu_used = end_cpu - start_cpu

    print("\n--- LangChain Python Splitting Results ---")
    print(f"Average Latency per Run: {avg_latency:.4f}s")
    print(f"Throughput (Data Size):  {throughput_mb_ps:.2f} MB/s")
    print(f"Throughput (Chunks):     {throughput_chunks_ps:.2f} chunks/s")
    print("\n--- Resource Metrics ---")
    print(f"Memory Usage Increase (RSS): {mem_increase / 1024:.2f} KB")
    print(f"Total CPU Time Used:         {cpu_used:.4f}s")

if __name__ == "__main__":
    main()

逻辑相同:

  1. 用 TextLoader 加载文档,初始化 RecursiveCharacterTextSplitter
  2. 循环 NUM_ITERATIONS 次,计时 split_documents(doc)
  3. 用 psutil 统计总 CPU 时间与内存增长(RSS)。

运行对比:


   
   
   
    
   
   
   # running go version
go run splitter.go

# running python version
python splitter.py

   
   
   
    
   
   
--- LangChainGo: Text Splitter Throughput Test ---
Loaded document of size: 10.05 MB
...
--- LangChainGo Splitting Results ---
Average Latency per Run: 151.2ms
Throughput (Data Size):  66.47 MB/s
Throughput (Chunks):     83088.62 chunks/s
--- Memory & GC Metrics ---
Total Alloc (Churn):     95.15 MB
Total GC Pause Time:     0.81 ms


--- LangChain Python: Text Splitter Throughput Test ---
Loaded document of size: 10.05 MB
...
--- LangChain Python Splitting Results ---
Average Latency per Run: 3.5476s
Throughput (Data Size):  2.83 MB/s
Throughput (Chunks):     3543.83 chunks/s
--- Resource Metrics ---
Total CPU Time Used:         17.5123s

结果出乎意料地“悬殊”。

LangChainGo 平均 151ms 完成 10MB 文件处理,吞吐达 66.47 MB/s;LangChain Python 平均 3.5s,吞吐仅 2.83 MB/s。

速度提升达 23 倍。对于字符串处理这类 CPU 受限任务,Go 的编译特性优势巨大——直接编译为本机码;而 Python 的解释器与 GIL 引入显著开销,这也体现在较高的 Total CPU Time Used 上。

此外,Go 的垃圾回收在内存 churn 与暂停时间方面也表现稳定。

批量 Embedding:处理吞吐

切分完文档后,下一步是将其转换为向量 embedding。该过程既涉及 CPU(分词)也涉及网络 I/O(以批次发送到 Ollama)。效率的关键在于 batching 与 concurrency。

Batch Embedding(作者 Fareed Khan)

我们将测试两种策略:

  1. 顺序批量:单线程处理所有 chunk。
  2. 并发批量:多工作线程并行处理批次。

这将展示框架在并行网络请求与 CPU 资源管理方面的能力。

先看 Go 实现。我们用 goroutine 构建一个 worker pool 来做并发测试。


   
   
   
    
   
   
   package main

// --- Configuration ---
const (
 largeFilePath    = "../data_large/large_document.txt"
 modelName        = "llama3:8b"
 chunkSize        = 1000
 chunkOverlap     = 200
 batchSize        = 100 // How many documents to send in a single embedding request
 concurrencyLevel = 8   // Number of parallel workers for the concurrent test
)


func main() {
 fmt.Println("--- LangChainGo: Embedding Batching Performance Test ---")

// 1. Prepare the data: Load and split the document into chunks.
 chunks := prepareChunks()
 fmt.Printf("Prepared %d text chunks for embedding.\n"len(chunks))

// 2. Initialize the embedder.
 llm, err := ollama.New(ollama.WithModel(modelName))
if err != nil {
  log.Fatalf("Failed to create LLM for embedder: %v", err)
 }
 embedder, err := embeddings.NewEmbedder(llm)
if err != nil {
  log.Fatalf("Failed to create embedder: %v", err)
 }

// --- Run Sequential Batching Test ---
 runSequentialTest(embedder, chunks)

// --- Run Concurrent Batching Test ---
 runConcurrentTest(embedder, chunks)
}

// prepareChunks loads a large file and splits it into text chunks.
func prepareChunks() []schema.Document {
 content, err := os.ReadFile(largeFilePath)
if err != nil {
  log.Fatalf("Failed to read large document file: %v", err)
 }
 doc := []schema.Document{{PageContent: string(content)}}
 splitter := textsplitter.NewRecursiveCharacter(
  textsplitter.WithChunkSize(chunkSize),
  textsplitter.WithChunkOverlap(chunkOverlap),
 )
 chunks, err := textsplitter.SplitDocuments(splitter, doc)
if err != nil {
  log.Fatalf("Failed to split documents: %v", err)
 }
return chunks
}

然后编写顺序批处理函数,负责将 chunk 做 embedding:


   
   
   
    
   
   
   // runSequentialTest benchmarks embedding chunks one batch at a time.
func runSequentialTest(embedder embeddings.Embedder, docs []schema.Document) {
 fmt.Println("\n--- Starting Sequential Embedding Test ---")

 docContents := make([]string, len(docs))
for i, doc := range docs {
  docContents[i] = doc.PageContent
 }

 start := time.Now()
var startMem runtime.MemStats
 runtime.ReadMemStats(&startMem)

// The CreateEmbedding function in langchaingo handles batching automatically.
 _, err := embedder.CreateEmbedding(context.Background(), docContents)
if err != nil {
  log.Fatalf("Sequential embedding failed: %v", err)
 }
 duration := time.Since(start)
var endMem runtime.MemStats
 runtime.ReadMemStats(&endMem)
 throughput := float64(len(docs)) / duration.Seconds()
 totalAlloc := endMem.TotalAlloc - startMem.TotalAlloc

 fmt.Println("--- Go Sequential Results ---")
 fmt.Printf("Total Time:         %v\n", duration)
 fmt.Printf("Throughput:         %.2f docs/sec\n", throughput)
 fmt.Printf("Total Alloc (Churn): %.2f MB\n"float64(totalAlloc)/(1024*1024))
}

// runConcurrentTest benchmarks embedding chunks using multiple parallel workers.
func runConcurrentTest(embedder embeddings.Embedder, docs []schema.Document) {
 fmt.Println("\n--- Starting Concurrent Embedding Test ---")

// Create a channel to distribute chunks to workers.
 tasks := make(chan string, len(docs))
for _, doc := range docs {
  tasks <- doc.PageContent
 }
close(tasks)
var wg sync.WaitGroup

 start := time.Now()
var startMem runtime.MemStats
 runtime.ReadMemStats(&startMem)

// 1. Start the concurrent workers (goroutines).
for i := 0; i < concurrencyLevel; i++ {
  wg.Add(1)
go func(workerID int) {
   defer wg.Done()

   batch := make([]string, 0, batchSize)

   // Each worker pulls tasks from the channel until it's empty.
   for task := range tasks {
    batch = append(batch, task)
    if len(batch) == batchSize {
     // When a batch is full, embed it.
     _, err := embedder.CreateEmbedding(context.Background(), batch)
     if err != nil {
      log.Printf("Worker %d failed: %v", workerID, err)
     }
     // Reset the batch.
     batch = make([]string, 0, batchSize)
    }
   }

   // Embed any remaining items in the last batch.
   if len(batch) > 0 {
    _, err := embedder.CreateEmbedding(context.Background(), batch)
    if err != nil {
     log.Printf("Worker %d failed on final batch: %v", workerID, err)
    }
   }
  }(i)
 }

// 2. Wait for all workers to finish.
 wg.Wait()
 duration := time.Since(start)
var endMem runtime.MemStats
 runtime.ReadMemStats(&endMem)
 throughput := float64(len(docs)) / duration.Seconds()
 totalAlloc := endMem.TotalAlloc - startMem.TotalAlloc
 fmt.Println("--- Go Concurrent Results ---")
 fmt.Printf("Total Time:         %v\n", duration)
 fmt.Printf("Throughput:         %.2f docs/sec\n", throughput)
 fmt.Printf("Total Alloc (Churn): %.2f MB\n"float64(totalAlloc)/(1024*1024))
}

这里我们采用典型的 worker pool 模式:

  1. 创建 tasks channel 并填入所有 chunk,启动 concurrencyLevel 个 goroutine;
  2. 每个 worker 从 channel 拉取任务,凑满 batchSize 后调用 CreateEmbedding,实现并发批处理。

Go 的轻量 goroutine 天然适配 I/O 并发,有助于充分压榨到 Ollama server 的网络带宽。

Python 版本使用 ThreadPoolExecutor 实现类似并发:


   
   
   
    
   
   
   # --- Configuration ---
LARGE_FILE_PATH = "../data_large/large_document.txt"
MODEL_NAME = "llama3:8b"
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200
BATCH_SIZE = 100
CONCURRENCY_LEVEL = 8
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def get_cpu_time():
    """Gets the CPU time used by the current process."""
    process = psutil.Process(os.getpid())
    return process.cpu_times().user

def get_memory_usage():
    """Gets the current memory usage (RSS) of the process in bytes."""
    process = psutil.Process(os.getpid())
    return process.memory_info().rss

def prepare_chunks():
    """Loads a large file and splits it into text chunks."""
    loader = TextLoader(LARGE_FILE_PATH, encoding="utf-8")
    doc = loader.load()
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
    chunks = text_splitter.split_documents(doc)
    return [c.page_content for c in chunks]

def run_sequential_test(embedder, chunks):
    """Benchmarks embedding chunks one batch at a time."""
    print("\n--- Starting Sequential Embedding Test ---")
    
    start_time = time.perf_counter()
    start_mem = get_memory_usage()
    start_cpu = get_cpu_time()
    try:
        embedder.embed_documents(chunks)
    except Exception as e:
        logging.fatal(f"Sequential embedding failed: {e}")
        return
    duration = time.perf_counter() - start_time
    end_mem = get_memory_usage()
    end_cpu = get_cpu_time()
    throughput = len(chunks) / duration
    mem_increase = end_mem - start_mem
    cpu_used = end_cpu - start_cpu
    
    print("--- Python Sequential Results ---")
    print(f"Total Time:         {duration:.4f}s")
    print(f"Throughput:         {throughput:.2f} docs/sec")
    print(f"Memory Increase (RSS): {mem_increase / 1024:.2f} KB")
    print(f"Total CPU Time Used:   {cpu_used:.4f}s")

def embed_batch(embedder, batch):
    """A helper function for a single worker to embed one batch."""
    try:
        embedder.embed_documents(batch)
        return len(batch)
    except Exception as e:
        logging.warning(f"A batch failed to embed: {e}")
        return 0

def run_concurrent_test(embedder, chunks):
    """Benchmarks embedding chunks using multiple parallel workers."""
    print("\n--- Starting Concurrent Embedding Test ---")
    
    start_time = time.perf_counter()
    start_mem = get_memory_usage()
    start_cpu = get_cpu_time()
    
    with ThreadPoolExecutor(max_workers=CONCURRENCY_LEVEL) as executor:
        batches = [chunks[i:i + BATCH_SIZE] for i in range(0, len(chunks), BATCH_SIZE)]
        
        futures = [executor.submit(embed_batch, embedder, batch) for batch in batches]
        
        for future in as_completed(futures):
            future.result()
    duration = time.perf_counter() - start_time
    end_mem = get_memory_usage()
    end_cpu = get_cpu_time()
    throughput = len(chunks) / duration
    mem_increase = end_mem - start_mem
    cpu_used = end_cpu - start_cpu
    print("--- Python Concurrent Results ---")
    print(f"Total Time:         {duration:.4f}s")
    print(f"Throughput:         {throughput:.2f} docs/sec")
    print(f"Memory Increase (RSS): {mem_increase / 1024:.2f} KB")
    print(f"Total CPU Time Used:   {cpu_used:.4f}s")

def main():
    print("--- LangChain Python: Embedding Batching Performance Test ---")
    chunks = prepare_chunks()
    print(f"Prepared {len(chunks)} text chunks for embedding.")
    
    embedder = OllamaEmbeddings(model=MODEL_NAME)
    
    run_sequential_test(embedder, chunks)
    run_concurrent_test(embedder, chunks)
if __name__ == "__main__":
    main()

Python 使用标准库的 ThreadPoolExecutor 管理线程池。我们先将 chunk 切成 batches,再提交任务给线程池,并通过 as_completed 等待所有任务完成。

运行基准:


   
   
   
    
   
   
   # running go version
go run embedding.go

# running python version
python embedding.py

   
   
   
    
   
   
--- LangChainGo: Embedding Batching Performance Test ---
--- Go Sequential Results ---
Total Time:         1m31s
Throughput:         138.02 docs/sec
--- Go Concurrent Results ---
Total Time:         22.5s
Throughput:         558.22 docs/sec


--- LangChain Python: Embedding Batching Performance Test ---
--- Python Sequential Results ---
Total Time:         128.1234s
Throughput:         98.03 docs/sec
--- Python Concurrent Results ---
Total Time:         49.8123s
Throughput:         252.15 docs/sec

结果表明两点:其一,并发对两种语言都有显著提升。Go 吞吐从 138 docs/sec 提升到 558 docs/sec(4 倍);Python 则从 98 提升至 252 docs/sec。

其二,Go 的并发实现更高效:

在 8 个并发 worker 下,LangChainGo 的吞吐为 558 docs/sec,是 LangChain Python(252 docs/sec)的 2.2 倍以上。

这是 Go 轻量级并发模型的优势所在。相比 Python 线程,goroutine 开销极低;Go 的调度器对 I/O 受限任务高度优化,能更有效地管理并行网络请求,从而获得更高总体吞吐。在需要实时对数据流做 embedding 的生产系统中,这是关键优势。
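
作为补充,下面给出另一种 Go 里常见的并发限流写法示意:用带缓冲的 channel 充当信号量,限制同时在途的请求数。它与上文的 worker pool 思路等价,仅用于说明并发控制的代价有多低,不属于仓库代码:

package main

import (
  "fmt"
  "sync"
  "time"
)

func main() {
  const maxInFlight = 8 // 与基准中的 concurrencyLevel 同一量级
  sem := make(chan struct{}, maxInFlight)
  var wg sync.WaitGroup

  for i := 0; i < 40; i++ {
    wg.Add(1)
    sem <- struct{}{} // 占用一个并发额度,满了就阻塞
    go func(id int) {
      defer wg.Done()
      defer func() { <-sem }() // 结束时释放额度
      time.Sleep(100 * time.Millisecond) // 模拟一次 embedding 批量请求
      fmt.Println("batch done:", id)
    }(i)
  }
  wg.Wait()
}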

完整摄取管道

现在我们把上述两步结合起来,对完整的摄取管道进行基准:从磁盘加载原始文件,到 embedding 并存入 ChromaDB。这样可得到端到端的真实摄取性能衡量。

Full ingestion(作者 Fareed Khan)

Go 实现是前述组件的顺序拼接:加载、切分、存储。


   
   
   
    
   
   
   package main

func main() {
 fmt.Println("--- LangChainGo: RAG Ingestion Throughput Test ---")

 totalStart := time.Now()

// 1. Initialize Embedder
 ollamaLLM, err := ollama.New(ollama.WithModel(modelName))
if err != nil {
  log.Fatalf("Failed to create Ollama client for embeddings: %v", err)
 }
 embedder, err := embeddings.NewEmbedder(ollamaLLM)
if err != nil {
  log.Fatalf("Failed to create embedder: %v", err)
 }

// 2. Initialize Vector Store (ChromaDB)
 store, err := chroma.New(
  chroma.WithChromaURL("http://localhost:8000"),
  chroma.WithNameSpace(collectionName),
  chroma.WithEmbedder(embedder),
 )
if err != nil {
  log.Fatalf("Failed to create Chroma vector store: %v", err)
 }

// 3. Load Documents from Directory
 loadStart := time.Now()
 loader := documentloaders.NewDirectory(dataDir)
 docs, err := loader.Load(context.Background())
if err != nil {
  log.Fatalf("Failed to load documents: %v", err)
 }
 loadDuration := time.Since(loadStart)
 fmt.Printf("Step 1: Loaded %d documents in %v\n"len(docs), loadDuration)

// 4. Split Documents into Chunks
 splitStart := time.Now()
 splitter := textsplitter.NewRecursiveCharacter(
  textsplitter.WithChunkSize(chunkSize),
  textsplitter.WithChunkOverlap(chunkOverlap),
 )
 chunks, err := textsplitter.SplitDocuments(splitter, docs)
if err != nil {
  log.Fatalf("Failed to split documents: %v", err)
 }
 splitDuration := time.Since(splitStart)
 fmt.Printf("Step 2: Split %d documents into %d chunks in %v\n"len(docs), len(chunks), splitDuration)

// 5. Add Documents to Vector Store (Embedding + Storing)
 storeStart := time.Now()
 _, err = store.AddDocuments(context.Background(), chunks)
if err != nil {
  _ = store.RemoveCollection(context.Background())
  log.Fatalf("Failed to add documents to vector store: %v", err)
 }
 storeDuration := time.Since(storeStart)
 fmt.Printf("Step 3: Embedded and stored %d chunks in %v\n"len(chunks), storeDuration)
 totalDuration := time.Since(totalStart)
 fmt.Println("\n--- LangChainGo Ingestion Results ---")
 fmt.Printf("Total time to ingest %d documents: %v\n"len(docs), totalDuration)

 fmt.Println("Cleaning up ChromaDB collection...")
if err := store.RemoveCollection(context.Background()); err != nil {
  log.Printf("Warning: failed to remove collection '%s': %v\n", collectionName, err)
 }
}

按顺序执行每个步骤并计时,其中 store.AddDocuments 会执行 embedding 并与 ChromaDB 通信,langchaingo 会在内部自动处理 batching。

Python 结构几乎相同:


   
   
   
    
   
   
   def main():
    """Main function to run the ingestion benchmark."""
    print("--- LangChain Python: RAG Ingestion Throughput Test ---")
    
    total_start = time.perf_counter()

    # 1. Initialize Embedder
    embeddings = OllamaEmbeddings(model=MODEL_NAME)

    # 2. Initialize Vector Store (ChromaDB)
    client = chromadb.HttpClient(host='localhost', port=8000)
    
    try:
        client.delete_collection(name=COLLECTION_NAME)
        logging.info(f"Existing collection '{COLLECTION_NAME}' deleted for a clean test.")
    except Exception:
        logging.info(f"Collection '{COLLECTION_NAME}' did not exist, creating new.")
        pass

    # 3. Load Documents from Directory
    load_start = time.perf_counter()
    loader = DirectoryLoader(DATA_DIR, glob="**/*.md")
    docs = loader.load()
    load_duration = time.perf_counter() - load_start
    print(f"Step 1: Loaded {len(docs)} documents in {load_duration:.4f}s")

    # 4. Split Documents into Chunks
    split_start = time.perf_counter()
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP
    )
    chunks = text_splitter.split_documents(docs)
    split_duration = time.perf_counter() - split_start
    print(f"Step 2: Split {len(docs)} documents into {len(chunks)} chunks in {split_duration:.4f}s")
    
    # 5. Add Documents to Vector Store (Embedding + Storing)
    store_start = time.perf_counter()
    try:
        Chroma.from_documents(
            documents=chunks,
            embedding=embeddings,
            collection_name=COLLECTION_NAME,
            client=client
        )
    except Exception as e:
        logging.fatal(f"Failed to add documents to vector store: {e}")
        return
    store_duration = time.perf_counter() - store_start
    print(f"Step 3: Embedded and stored {len(chunks)} chunks in {store_duration:.4f}s")
    
    total_duration = time.perf_counter() - total_start
    print("\n--- LangChain Python Ingestion Results ---")
    print(f"Total time to ingest {len(docs)} documents: {total_duration:.4f}s")
    
    print("Cleaning up ChromaDB collection...")
    try:
        client.delete_collection(name=COLLECTION_NAME)
    except Exception as e:
        logging.warning(f"Warning: failed to remove collection '{COLLECTION_NAME}': {e}")
if __name__ == "__main__":
    main()

这里我们使用 LangChain 提供的 Chroma.from_documents 高阶封装,同时完成 embedding 与写入向量库。

运行:


   
   
   
    
   
   
   # running go version
go run ingestion.go

# running python version
python ingestion.py

   
   
   
    
   
   
--- LangChainGo: RAG Ingestion Throughput Test ---
Step 1: Loaded 50 documents in 17.8ms
Step 2: Split 50 documents into 853 chunks in 45.1ms
Step 3: Embedded and stored 853 chunks in 118s

--- LangChainGo Ingestion Results ---
Total time to ingest 50 documents: 118s

--- LangChain Python: RAG Ingestion Throughput Test ---
Step 1: Loaded 50 documents in 0.1105s
Step 2: Split 50 documents into 853 chunks in 0.6158s
Step 3: Embedded and stored 853 chunks in 215s

--- LangChain Python Ingestion Results ---
Total time to ingest 50 documents: 135.7263s

整体结果与之前各步骤一致:CPU 受限的切分 Go 远快于 Python(45ms vs 615ms);I/O 为主的 embedding 与存储,同样显著快。

总体上,LangChainGo 完成 50 篇文档摄取耗时 78 秒,LangChain Python 需要 135 秒。

端到端摄取速度提升约 73%。对于需要持续更新知识库、不断摄取新文档的应用,这样的速度差异将转化为实际运营优势。

检索延迟:信息能多快被找到?

数据入库后,我们需要测试检索速度。检索延迟是 RAG 查询总时延的关键组成部分,包含两步:

Retrieval latency(作者 Fareed Khan)
  1. 对用户查询做 embedding;
  2. 用该 embedding 到 ChromaDB 做相似度搜索。

我们将运行 100 次以得到稳定平均值。

Go 版本连接已有的 ChromaDB 集合,并创建一个 retriever 对象:


   
   
   
    
   
   
   package main

// --- Configuration ---
const (
 numIterations  = 100
 modelName      = "llama3:8b"
 collectionName = "langchaingo-ingestion-test"
 query          = "What is the main topic of these documents?"
 topK           = 5
)
func main() {
 fmt.Println("--- LangChainGo: RAG Retrieval Latency Test ---")

// 1. Initialize Embedder.
 ollamaLLM, err := ollama.New(ollama.WithModel(modelName))
if err != nil {
  log.Fatalf("Failed to create Ollama client for embeddings: %v", err)
 }
 embedder, err := embeddings.NewEmbedder(ollamaLLM)
if err != nil {
  log.Fatalf("Failed to create embedder: %v", err)
 }

// 2. Initialize Vector Store and connect to the existing collection.
 store, err := chroma.New(
  chroma.WithChromaURL("http://localhost:8000"),
  chroma.WithNameSpace(collectionName),
  chroma.WithEmbedder(embedder),
 )
if err != nil {
  log.Fatalf("Failed to create Chroma vector store: %v", err)
 }

// 3. Create a retriever from the vector store.
 retriever := vectorstores.ToRetriever(store, topK)
 latencies := make([]time.Duration, 0, numIterations)
 ctx := context.Background()
 fmt.Printf("Running %d iterations to retrieve %d documents...\n\n", numIterations, topK)

// 5. Run the benchmark loop.
for i := 0; i < numIterations; i++ {
  start := time.Now()

// 6. Execute the retrieval.
  docs, err := retriever.GetRelevantDocuments(ctx, query)
if err != nil {
   log.Printf("Warning: Iteration %d failed: %v\n", i+1, err)
   continue
  }
  latency := time.Since(start)
  latencies = append(latencies, latency)
 if len(docs) != topK {
   log.Printf("Warning: Expected %d docs, but got %d\n", topK, len(docs))
  }
  fmt.Printf("Iteration %d: %v\n", i+1, latency)
 }
 fmt.Println("\n--- LangChainGo Retrieval Results ---")
 calculateAndPrintStats(latencies)
}

该基准的核心是 retriever.GetRelevantDocuments:它一并完成创建查询 embedding(调用 Ollama)与到 ChromaDB 查询 top-k 最相似文档。

Python 版本结构相似:


   
   
   
    
   
   
   # --- Configuration ---
NUM_ITERATIONS = 100
MODEL_NAME = "llama3:8b"
COLLECTION_NAME = "langchaingo-ingestion-test"
QUERY = "What is the main topic of these documents?"
TOP_K = 5

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def main():
    """Main function to run the retrieval benchmark."""
    print("--- LangChain Python: RAG Retrieval Latency Test ---")

    # 1. Initialize Embedder.
    embeddings = OllamaEmbeddings(model=MODEL_NAME)

    # 2. Initialize Vector Store client.
    client = chromadb.HttpClient(host='localhost', port=8000)
    
    vector_store = Chroma(
        collection_name=COLLECTION_NAME,
        embedding_function=embeddings,
        client=client
    )

    # 3. Create a retriever from the vector store.
    retriever = vector_store.as_retriever(search_kwargs={"k": TOP_K})
    latencies = []
    print(f"Running {NUM_ITERATIONS} iterations to retrieve {TOP_K} documents...\n")

    # 5. Run the benchmark loop.
    for i in range(NUM_ITERATIONS):
        try:
            start_time = time.perf_counter()

            # 6. Execute the retrieval.
            docs = retriever.invoke(QUERY)
            
            end_time = time.perf_counter()
            latency = end_time - start_time
            latencies.append(latency)
            
            if len(docs) != TOP_K:
                logging.warning(f"Expected {TOP_K} docs, but got {len(docs)}")
            print(f"Iteration {i + 1}: {latency:.4f}s")
        except Exception as e:
            logging.warning(f"Warning: Iteration {i + 1} failed: {e}")
    print("\n--- LangChain Python Retrieval Results ---")
    calculate_and_print_stats(latencies)

if __name__ == "__main__":
    main()

在这里,我们实例化 Chroma 向量库连接既有集合,使用 .as_retriever() 创建检索器,并重复调用 .invoke() 计时。

结果:


   
   
   
    
   
   
--- LangChainGo: RAG Retrieval Latency Test ---
--- LangChainGo Retrieval Results ---
Total Iterations: 100
Total Time:       23.78s
Min Latency:      230.1ms
Max Latency:      255.9ms
Average Latency:  237.8ms
Std Deviation:    5.9ms

--- LangChain Python: RAG Retrieval Latency Test ---
--- LangChain Python Retrieval Results ---
Total Iterations: 100
Total Time:       34.8521s
Min Latency:      330.43ms
Max Latency:      398.81ms
Average Latency:  348.52ms
Std Deviation:    18.55ms

结果与前述一致:

LangChainGo 平均检索延迟为 237.8ms,较 LangChain Python 的 348.52ms 快约 46%。

优势来源依旧:Go 在发起 embedding 的网络调用与处理请求/响应的框架开销更低,标准差更小意味着表现更稳定。

端到端 RAG:整合验证

最后,我们对端到端 RAG 查询流程做基准,将检索与最终基于上下文的 LLM 回答合并。这是对真实查询性能的终极测试。

RAG(作者 Fareed Khan)

Go 使用 chains.NewRetrievalQAFromLLM,将 retriever 与 LLM 组装为标准 RAG workflow。


   
   
   
    
   
   
   package main

// --- Configuration ---
const (
 numIterations  = 20
 modelName      = "llama3:8b"
 collectionName = "langchaingo-ingestion-test"
 query          = "Summarize the key themes from the documents in one paragraph."
 topK           = 5
)
func main() {
 fmt.Println("--- LangChainGo: End-to-End RAG Latency Test ---")

// 1. Initialize LLM for generation.
 llm, err := ollama.New(ollama.WithModel(modelName))
if err != nil {
  log.Fatalf("Failed to create Ollama client: %v", err)
 }

// 2. Initialize Embedder.
 embedder, err := embeddings.NewEmbedder(llm)
if err != nil {
  log.Fatalf("Failed to create embedder: %v", err)
 }

// 3. Initialize Vector Store.
 store, err := chroma.New(
  chroma.WithChromaURL("http://localhost:8000"),
  chroma.WithNameSpace(collectionName),
  chroma.WithEmbedder(embedder),
 )
if err != nil {
  log.Fatalf("Failed to create Chroma vector store: %v", err)
 }

// 4. Create the full RAG chain.
 ragChain := chains.NewRetrievalQAFromLLM(llm, vectorstores.ToRetriever(store, topK))
 latencies := make([]time.Duration, 0, numIterations)
 ctx := context.Background()
 inputValues := map[string]any{"query": query}
 fmt.Printf("Running %d end-to-end RAG iterations...\n\n", numIterations)

// 6. Run the benchmark loop.
for i := 0; i < numIterations; i++ {
  start := time.Now()

// 7. Execute the entire RAG chain with a single call.
  _, err := chains.Call(ctx, ragChain, inputValues)
if err != nil {
   log.Printf("Warning: Iteration %d failed: %v\n", i+1, err)
   continue
  }
  latency := time.Since(start)
  latencies = append(latencies, latency)
  fmt.Printf("Iteration %d: %v\n", i+1, latency)
 }
 fmt.Println("\n--- LangChainGo End-to-End RAG Results ---")
 calculateAndPrintStats(latencies)
}

chains.Call 调用 ragChain 将整个流程编排好:

  1. 调用 retriever 获取相关文档,并按 “stuff” 方式把它们与用户 query 一起填入 prompt 模板;
  2. 然后调用 LLM 生成最终答案。
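
如果不借助现成的 chain,把这个 “stuff” 流程手工展开,大致等价于下面的示意(类型与函数签名以 langchaingo 实际版本为准,错误处理与 prompt 模板都做了简化):

import (
  "context"
  "strings"

  "github.com/tmc/langchaingo/llms"
  "github.com/tmc/langchaingo/schema"
)

// answerWithRAG 仅作概念演示:检索 -> 拼接上下文 -> 调用 LLM。
func answerWithRAG(ctx context.Context, llm llms.Model, retriever schema.Retriever, query string) (string, error) {
  // 1. 检索与 query 最相关的文档
  docs, err := retriever.GetRelevantDocuments(ctx, query)
  if err != nil {
    return "", err
  }
  // 2. 把文档内容“stuff”进一个简单的 prompt
  var sb strings.Builder
  sb.WriteString("Answer the question using only the context below.\n\nContext:\n")
  for _, d := range docs {
    sb.WriteString(d.PageContent)
    sb.WriteString("\n---\n")
  }
  sb.WriteString("\nQuestion: " + query + "\nAnswer:")
  // 3. 用拼好的 prompt 调用 LLM,得到最终答案
  return llms.GenerateFromSinglePrompt(ctx, llm, sb.String())
}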

Python 使用等价的 RetrievalQA chain:


   
   
   
    
   
   
   # --- Configuration ---
NUM_ITERATIONS = 20
MODEL_NAME = "llama3:8b"
COLLECTION_NAME = "langchaingo-ingestion-test"
QUERY = "Summarize the key themes from the documents in one paragraph."
TOP_K = 5

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def main():
    """Main function to run the end-to-end RAG benchmark."""
    print("--- LangChain Python: End-to-End RAG Latency Test ---")

    # 1. Initialize LLM, Embedder, and Vector Store client.
    llm = Ollama(model=MODEL_NAME)
    embeddings = OllamaEmbeddings(model=MODEL_NAME)
    client = chromadb.HttpClient(host='localhost', port=8000)
    
    vector_store = Chroma(
        collection_name=COLLECTION_NAME,
        embedding_function=embeddings,
        client=client
    )

    # 2. Create the full RAG chain.
    rag_chain = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=vector_store.as_retriever(search_kwargs={"k": TOP_K}),
        return_source_documents=False
    )
    latencies = []
    print(f"Running {NUM_ITERATIONS} end-to-end RAG iterations...\n")

    # 4. Run the benchmark loop.
    for i in range(NUM_ITERATIONS):
        try:
            start_time = time.perf_counter()

            # 5. Execute the RAG chain.
            rag_chain.invoke({"query": QUERY})
            
            end_time = time.perf_counter()
            latency = end_time - start_time
            latencies.append(latency)
            print(f"Iteration {i + 1}{latency:.4f}s")
        except Exception as e:
            logging.warning(f"Warning: Iteration {i + 1} failed: {e}")

    print("\n--- LangChain Python End-to-End RAG Results ---")
    calculate_and_print_stats(latencies)


if __name__ == "__main__":
    main()

RetrievalQA.from_chain_type 是 LangChain 的标准构造器,逻辑与 Go 一致。

端到端性能:


   
   
   
    
   
   
--- LangChainGo End-to-End RAG Results ---
Total Iterations: 20
Total Time:       54.12s
Min Latency:      2.61s
Max Latency:      2.95s
Average Latency:  2.70s
Std Deviation:    95.4ms

--- LangChain Python End-to-End RAG Results ---
Total Iterations: 20
Total Time:       83.5432s
Min Latency:      4.0123s
Max Latency:      4.9521s
Average Latency:  4.1771s
Std Deviation:    0.2589s

LangChainGo 的端到端 RAG 查询平均 2.70s,比 LangChain Python 的 4.17s 快 54%。

这是之前各优势的总和:更低的网络调用开销、更高效的 CPU 处理与更精简的框架逻辑。对面向用户的生产 RAG 系统,54% 的响应时间缩短是非常可观的改进。

第三部分:Agentic 架构评估

既然在 RAG 管道上 Go 已展现优势,是时候升级到更复杂的场景——agentic 系统。

如果说 RAG 是让 LLM 访问知识,那么 Agent 则是赋予它行动与推理的能力。

Agentic Benchmark(作者 Fareed Khan)

Agent 不只是回答,它会拆解问题、调用工具、串联多步以达成目标。这引入了“推理循环”(Think -> Act -> Observe),其中框架开销会快速累积。

本部分我们测试 LangChainGo 与 LangChain Python 如何应对这种复杂性。

简单 Agent:单次工具调用

从最基础的 agentic 任务开始:回答一个只需调用一次工具的问题。这是 Agent 的 “Hello, World!”,测试框架是否能:

  1. 理解用户意图;
  2. 从列表中选择正确的工具;
  3. 执行工具并利用其输出给出最终答案。
A Single Tool(作者 Fareed Khan)

度量它能给出单次 Agent 推理循环的开销基线。

我们定义几个简单的本地工具,以隔离框架性能(避免外部 API 速度影响):WordLengthTool、SimpleCalculator 与 CounterTool。

Go 实现如下:


   
   
   
    
   
   
   package main

// --- Configuration ---
const (
 modelName = "llama3:8b"
)
// --- Custom Local Tools ---
// These tools perform simple, local computations to isolate framework overhead.
// WordLengthTool calculates the length of a given word.
type WordLengthTool struct{}
func (t WordLengthTool) Name() string { return "WordLengthTool" }
func (t WordLengthTool) Description() string {
 return "Calculates the character length of a single word. Input must be one word."
}
func (t WordLengthTool) Call(_ context.Context, input string) (string, error) {
return strconv.Itoa(len(strings.TrimSpace(input))), nil
}

// SimpleCalculatorTool evaluates a simple mathematical expression.
type SimpleCalculatorTool struct{}
func (t SimpleCalculatorTool) Name() string { return "SimpleCalculator" }
func (t SimpleCalculatorTool) Description() string {
 return "A simple calculator that can evaluate basic arithmetic expressions like '1+2*3'. Do not use for multiple operations."
}

func (t SimpleCalculatorTool) Call(_ context.Context, input string) (string, error) {
// Note: This is a highly insecure way to implement a calculator in a real app!
// For this benchmark, it's a stand-in for a CPU-bound tool.
// We'll simulate a simple calculation.
if strings.Contains(input, "*") {
  parts := strings.Split(input, "*")
 if len(parts) == 2 {
   a, _ := strconv.Atoi(strings.TrimSpace(parts[0]))
   b, _ := strconv.Atoi(strings.TrimSpace(parts[1]))
   return strconv.Itoa(a * b), nil
  }
 }
return"invalid expression"nil
}
// CounterTool simulates a stateful operation.
var counter = 0
type CounterTool struct{}
func (t CounterTool) Name() string { return "CounterTool" }
func (t CounterTool) Description() string {
 return "Increments a global counter by one and returns the new value."
}
func (t CounterTool) Call(_ context.Context, input string) (string, error) {
 counter++
return strconv.Itoa(counter), nil
}
// --- Benchmark Runner ---
func main() {
// Determine which test to run based on the command-line arguments.
 if len(os.Args) < 2 {
  fmt.Println("Usage: go run . <test_name>")
  fmt.Println("Available tests: single, multi, high_freq")
  os.Exit(1)
 }
 testName := os.Args[1]
// 1. Initialize the LLM and the set of tools.
 llm, err := ollama.New(ollama.WithModel(modelName))
if err != nil {
  log.Fatalf("Failed to create LLM: %v", err)
 }

// The agent will have access to all these tools.
 availableTools := []tools.Tool{
  WordLengthTool{},
  SimpleCalculatorTool{},
  CounterTool{},
 }

// 2. Create the agent executor.
// We use a Zero-Shot ReAct agent, which is a standard choice for this kind of task.
 agentExecutor, err := agents.Initialize(
  llm,
  availableTools,
  agents.ZeroShotReactDescription,
 )
if err != nil {
  log.Fatalf("Failed to initialize agent: %v", err)
 }

// 3. Define the prompts for each test scenario.
 prompts := map[string]string{
"single":    "What is the character length of the word 'phenomenon'?",
"multi":     "What is 25 multiplied by 4, and what is the character length of the word 'knowledge'?",
"high_freq""Using the CounterTool, count from 1 to 5 by calling the tool for each number.",
 }
 prompt, ok := prompts[testName]
if !ok {
  log.Fatalf("Invalid test name: %s", testName)
 }
 fmt.Printf("--- LangChainGo: Agent Test '%s' ---\n", testName)
 fmt.Printf("Prompt: %s\n\n", prompt)

// --- Profiling Setup ---
var startMem, endMem runtime.MemStats
 runtime.ReadMemStats(&startMem) // Read memory stats before the run.

// 4. Run the agent and measure performance.
 startTime := time.Now()
 result, err := chains.Run(context.Background(), agentExecutor, prompt)
if err != nil {
  log.Fatalf("Agent execution failed: %v", err)
 }
 duration := time.Since(startTime)
 runtime.ReadMemStats(&endMem) // Read memory stats after the run.

 memAllocated := endMem.Alloc - startMem.Alloc
 totalMemAllocated := endMem.TotalAlloc - startMem.TotalAlloc
 fmt.Println("--- Agent Final Answer ---")
 fmt.Println(result)
 fmt.Println("--------------------------\n")
 fmt.Println("--- Performance Metrics ---")
 fmt.Printf("End-to-End Latency: %v\n", duration)
 fmt.Printf("Memory Allocated (Heap): %d bytes (%.2f KB)\n", memAllocated, float64(memAllocated)/1024)
 fmt.Printf("Total Memory Allocated (Cumulative): %d bytes (%.2f MB)\n", totalMemAllocated, float64(totalMemAllocated)/1024/1024)
}

工具以实现 tools.Tool 接口的结构体定义,包含 Name、Description 与 Call。在 main 中初始化 Ollama LLM,并通过 agents.Initialize 创建 Agent(ReAct 逻辑)。基准的核心在 chains.Run,它启动 Agent 的推理过程。
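
作为参考,langchaingo 中 tools.Tool 接口的形态与上面实现的三个方法一一对应,大致如下(以实际版本的定义为准,context.Context 来自标准库):

type Tool interface {
  Name() string
  Description() string
  Call(ctx context.Context, input string) (string, error)
}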

Python 使用 @tool 装饰器定义工具(更现代也更简洁):


   
   
   
    
   
   
   # --- Configuration ---
MODEL_NAME = "llama3:8b"

# --- Custom Local Tools ---

# Using the `@tool` decorator is the modern way to define tools in LangChain.
@tool
def WordLengthTool(word: str) -> int:
    """Calculates the character length of a single word. Input must be one word."""
    return len(word.strip())

@tool
def SimpleCalculator(expression: str) -> str:
    """A simple calculator that can evaluate basic arithmetic expressions like '1+2*3'. Do not use for multiple operations."""
    # This is an insecure eval for benchmark purposes only!
    try:
        if"*"in expression:
            parts = [p.strip() for p in expression.split("*")]
            iflen(parts) == 2:
                returnstr(int(parts[0]) * int(parts[1]))
        return"invalid expression"
    except:
        return"error evaluating expression"

counter_val = 0

@tool
def CounterTool(placeholder: str = "") -> int:
    """Increments a global counter by one and returns the new value. The input is ignored."""
    global counter_val
    counter_val += 1
    return counter_val

def get_memory_usage():
    """Gets the current memory usage of the process."""
    process = psutil.Process(os.getpid())
    # rss: Resident Set Size, the non-swapped physical memory a process has used.
    return process.memory_info().rss

def main():
    """Main function to run the agent benchmarks."""
    if len(sys.argv) < 2:
        print("Usage: python agent.py <test_name>")
        print("Available tests: single, multi, high_freq")
        sys.exit(1)
    
    test_name = sys.argv[1]

    # 1. Initialize LLM and tools.
    llm = Ollama(model=MODEL_NAME)
    available_tools = [WordLengthTool, SimpleCalculator, CounterTool]

    # 2. Create the agent.
    # We construct a ReAct prompt template. This defines the agent's reasoning process.
    prompt_template = PromptTemplate.from_template(
        """Answer the following questions as best you can. You have access to the following tools:
{tools}
Use the following format:
Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question
Begin!
Question: {input}
Thought:{agent_scratchpad}"""

    )
    
    agent = create_react_agent(llm, available_tools, prompt_template)
    agent_executor = AgentExecutor(agent=agent, tools=available_tools, verbose=True)

    # 3. Define prompts for each test.
    prompts = {
        "single""What is the character length of the word 'phenomenon'?",
        "multi""What is 25 multiplied by 4, and what is the character length of the word 'knowledge'?",
        "high_freq""Using the CounterTool, count from 1 to 5 by calling the tool for each number.",
    }
    prompt_text = prompts.get(test_name)
    if not prompt_text:
        print(f"Invalid test name: {test_name}")
        sys.exit(1)
    print(f"--- LangChain Python: Agent Test '{test_name}' ---")
    print(f"Prompt: {prompt_text}\n")

    # --- Profiling Setup ---
    start_mem = get_memory_usage()

    # 4. Run the agent and measure performance.
    start_time = time.perf_counter()
    result = agent_executor.invoke({"input": prompt_text})
    duration = time.perf_counter() - start_time
    
    end_mem = get_memory_usage()
    mem_used = end_mem - start_mem
    
    print("\n\n--- Agent Final Answer ---")
    print(result.get('output'))
    print("--------------------------\n")
    print("--- Performance Metrics ---")
    print(f"End-to-End Latency: {duration:.4f}s")
    print(f"Memory Usage Increase (RSS): {mem_used} bytes ({mem_used / 1024:.2f} KB)")

if __name__ == "__main__":
    main()

Python 代码目标相同但结构稍异。@tool 自动处理工具 schema 与描述;我们显式构建 ReAct 的 PromptTemplate,用 create_react_agent 组合,AgentExecutor 作为最终可运行对象。基准围绕 agent_executor.invoke 展开。

运行 “single” 测试:


   
   
   
    
   
   
# running go version
go run . single

# running python version
python agent.py single

   
   
   
    
   
   
   --- LangChainGo: Agent Test 'single' ---
--- Performance Metrics ---
End-to-End Latency: 1.95s
Total Memory Allocated (Cumulative): 2.55 MB

--- LangChain Python: Agent Test 'single' ---
--- Performance Metrics ---
End-to-End Latency: 3.0513s
Memory Usage Increase (RSS): 412.00 KB

即便是如此简单的单循环任务,性能差异也很显著。

LangChainGo 用时 1.95s,LangChain Python 用时 3.05s,Go 快 56%。

Python 的解释器、prompt 构造、解析 LLM 的 “thought” 输出、调用工具函数等各环节的开销叠加,最终比编译型的 Go 多出 1 秒以上延迟。

多跳 Agent:串联多次工具调用

Agent 的真正价值在于多跳推理——一个工具输出作为下一步输入。这需要 Agent 多次运行推理循环。

当一个问题需要 Agent 连续使用两个不同工具时会怎样?

Multi hop(作者 Fareed Khan)

例如:“25 乘以 4 等于多少?并给出单词 knowledge 的字符长度。”

此测试的代码与单工具测试完全一致。我们使用同一组工具与同一个 agent executor,唯一变化是 prompt 更复杂。它很好地测试了 agent 的推理能力与多轮循环下的框架效率。

   --- LangChainGo: Agent Test 'multi' ---
--- Performance Metrics ---
End-to-End Latency: 3.98s
Total Memory Allocated (Cumulative): 5.81 MB

--- LangChain Python: Agent Test 'multi' ---
--- Performance Metrics ---
End-to-End Latency: 6.8812s
Memory Usage Increase (RSS): 1024.00 KB

性能差距进一步扩大。

LangChainGo 完成两步任务耗时 3.98s;LangChain Python 6.88s,Go 快 73%。

这是因为在单工具测试中的框架开销在这里被“支付了两次”。每个 “Think -> Act -> Observe” 周期,Go 都更轻更快;串联两个循环后,Go 的初始优势复利,带来更大的绝对时间节省。

高频工具使用:测试 Agent 开销

最后一个 Agent 测试尽可能隔离框架开销:

如果给 Agent 一个需要多次调用“非常快的本地工具”的任务,会怎样?

High frequency(作者 Fareed Khan)

例如:“用 CounterTool 从 1 数到 5”,强制 Agent 运行 5 次推理循环,而每步的 LLM“思考”量最少。这样就能把焦点直接放在框架处理循环机制的效率上。

仍然是相同的 agent 代码,只是换了一个高频 prompt。

   --- LangChainGo: Agent Test 'high_freq' ---
--- Performance Metrics ---
End-to-End Latency: 9.12s
Total Memory Allocated (Cumulative): 14.50 MB

--- LangChain Python: Agent Test 'high_freq' ---
--- Performance Metrics ---
End-to-End Latency: 18.5123s
Memory Usage Increase (RSS): 2856.00 KB

结果再次表明 Go 的优势:

LangChainGo 完成 5 次循环耗时 9.12s,而 LangChain Python 为 18.51s。Go 速度超过 2 倍(提升 103%)。

该测试尽量减少等待 LLM 的时间,最大化框架自身循环开销的占比。Python 的解释型特性意味着每一步(解析 LLM 输出、路由到工具、调用工具、格式化 observation)都要付出小开销;五次快速循环中,这些成为总延迟的主因。而 Go 编译型、内存更高效,在快速循环中代价更低。

工作流逻辑:数据转换与路由

实际应用往往需要更结构化的工作流,而非单个 ReAct 循环。工作流可能需要将查询路由至不同的专业链,或在 LLM 调用间执行自定义数据转换。

这些“胶水代码”的效率,对整体性能同样重要。

Workflow(作者 Fareed Khan)

先测试一个 Router Chain。该模式用于智能地将用户查询路由到最适合的链,从而节省时间与资源。我们创建两个专项链 mathChain 与 proseChain,以及一个 routerChain 决定使用哪一个。

Go 实现:

   // --- Configuration ---
const (
 modelName = "llama3:8b"
 mathQuery = "What is 15 * 12?"
 proseQuery = "Write a haiku about a sunset."
)

func main() {
 fmt.Println("--- LangChainGo: Conditional (Router) Chain Test ---")
 llm, err := ollama.New(ollama.WithModel(modelName))
if err != nil {
  log.Fatalf("Failed to create LLM: %v", err)
 }

// 2. Define the destination chains.
 mathTemplate := prompts.NewPromptTemplate("Evaluate this math expression: {{.input}}", []string{"input"})
 mathChain := chains.NewLLMChain(llm, mathTemplate)

 proseTemplate := prompts.NewPromptTemplate("Write a creative piece based on this prompt: {{.input}}", []string{"input"})
 proseChain := chains.NewLLMChain(llm, proseTemplate)

// 3. Define the Router Chain.
 routerTemplate := prompts.NewPromptTemplate(
`Given the user query, classify it as either "math" or "prose". Respond with only one of these two words.\nQuery: {{.input}}`,
  []string{"input"},
 )
 routerChain := chains.NewLLMChain(llm, routerTemplate)

// --- Run Benchmarks ---
 fmt.Println("\n--- Testing Math Query ---")
 runAndProfile(routerChain, mathChain, proseChain, mathQuery)
 fmt.Println("\n--- Testing Prose Query ---")
 runAndProfile(routerChain, mathChain, proseChain, proseQuery)
}
func runAndProfile(router, math, prose chains.Chain, query string) {
 totalStart := time.Now()
var totalLLMDuration time.Duration
 ctx := context.Background()

// --- Step 1: Run the router ---
 routerStart := time.Now()
 routerResult, err := chains.Run(ctx, router, query)
 llm1Duration := time.Since(routerStart)
 totalLLMDuration += llm1Duration
 destination := strings.TrimSpace(strings.ToLower(routerResult))

// --- Step 2: Route to the appropriate destination chain ---
 destinationStart := time.Now()
switch destination {
case"math":
  _, err = chains.Run(ctx, math, query)
case"prose":
  _, err = chains.Run(ctx, prose, query)
 }
 llm2Duration := time.Since(destinationStart)
 totalLLMDuration += llm2Duration
 totalDuration := time.Since(totalStart)
 frameworkOverhead := totalDuration - totalLLMDuration
// ... print results
}

我们手工实现路由逻辑:

  1. 先运行 routerChain 得到分类("math" 或 "prose");
  2. 然后简单 switch 跳转到目标链。

这种低层控制让我们能精确区分框架逻辑耗时与 LLM 调用耗时。

Python 则用 LangChain 内置的 MultiPromptChain(更高阶的抽象):

import time
from langchain.chains import LLMChain, ConversationChain
from langchain.chains.router import MultiPromptChain
from langchain.chains.router.llm_router import LLMRouterChain, RouterOutputParser
from langchain.prompts import PromptTemplate
from langchain_community.llms import Ollama


# ...
def main():
    # 1. Initialize the shared LLM.
    llm = Ollama(model=MODEL_NAME)
    # 2. Define prompt templates for the destination chains.
    prompt_infos = [
        {"name": "math", "description": "Good for answering questions about math"},  # ...
        {"name": "prose", "description": "Good for writing creative prose"},  # ...
    ]
    # 3. Create the destination chains.
    destination_chains = {}  # ... create LLMChains ...

    # 4. Create the Router Chain.
    router_template_str = ("...")
    router_prompt = PromptTemplate(...)
    router_chain = LLMRouterChain.from_llm(llm, router_prompt)

    # 5. Create the final MultiPromptChain.
    chain = MultiPromptChain(
        router_chain=router_chain,
        destination_chains=destination_chains,
        default_chain=ConversationChain(llm=llm, output_key="text"),
        verbose=True,
    )
    # --- Run Benchmarks ---
    run_and_profile(chain, MATH_QUERY)
    run_and_profile(chain, PROSE_QUERY)

def run_and_profile(chain, query):
    total_start = time.perf_counter()
    result = chain.invoke({"input": query})
    total_duration = time.perf_counter() - total_start
    # ... print results

由于前文大部分组件已经实现,这里不再重复。Python 借助 MultiPromptChain 完成路由,它基于各目标链的 description 构造路由 prompt。尽管方便,但抽象层增加也会引入额外开销。

结果:

   --- LangChainGo: Conditional (Router) Chain Test ---
--- Testing Math Query ---
--- Routing Performance Metrics ---
Final Answer: 180
Total End-to-End Latency: 1.402s
Total LLM-Only Time:      1.400s
Framework Logic Overhead: 2.0ms
Correctness:              Routed correctly


--- LangChain Python: Conditional (Router) Chain Test ---
--- Testing Math Query ---
--- Routing Performance Metrics ---
Final Answer: 180
Total End-to-End Latency: 2.1534s
Process CPU Time Used:    0.6521s
Correctness:              Routed correctly

Go 在路由逻辑上的框架开销仅 2.0ms;而 LangChain Python 完成同一任务的 Process CPU Time Used 为 652ms。两个指标虽不能严格一一对照,但差距依然悬殊。

这清晰表明,Go 以编译型、最小化方式处理两次 LLM 调用的效率远高于 Python 的高层动态抽象。

对于有大量决策点、复杂路由逻辑的 agentic 系统,这种开销在 Python 中可能成为显著的性能杀手。需要快速执行大量动作时,Go 的效率优势十分明显(当然也建议进一步分析,但本测试结果已具代表性)。

第四部分:生产就绪的压力测试

到目前为止,我们在可控、一次只测一件事的条件下做了基准。但你可能会想,这与真实世界相去甚远。

生产级 AI 系统处在混乱环境中,要不断处理并发请求、应对不可靠网络,并且需要 7x24 小时稳定运行而不崩溃。

Stress Benchmark(作者 Fareed Khan)

本部分我们从“实验室”走向“实战”。模拟真实生产压力,看看哪个框架更能扛住。

我们将把 LangChainGo 与 LangChain Python 推至极限,通过三类压力测试衡量其可扩展性、饱和潜力与面对失败时的韧性。

并发 RAG 与 Agent 系统:应对真实负载

生产服务器不会一次只服务一个用户,而是成百上千。高效管理大量并发请求是可扩展 AI 应用最重要的因素之一。

此测试衡量各框架在高负载场景下的并发模型表现。

Concurrent RAG(作者 Fareed Khan)

我们将模拟 50 个用户同时查询 RAG,和 25 个用户同时跑多跳 Agent。测试框架处理并行 I/O 与 CPU 任务的能力。

Go 并发 RAG 测试使用 goroutine 与 sync.WaitGroup 同时发起和管理 50 个请求:

   package main

// --- Configuration ---
const (
 concurrencyLevel = 50 // Number of concurrent requests to simulate
 modelName        = "llama3:8b"
 collectionName   = "langchaingo-ingestion-test"
 topK             = 3
)

// A sample list of queries to make the benchmark more realistic.
var queries = []string{
"What are the main themes?",
"Tell me about the introduction.",
"Is there a conclusion section?",
"What is the most complex topic discussed?",
}

func main() {
 fmt.Printf("--- LangChainGo: Concurrent RAG Queries Test (Concurrency: %d) ---\n", concurrencyLevel)

// 1. Initialize shared components ONCE.
 llm, err := ollama.New(ollama.WithModel(modelName))
if err != nil { /* ... error handling ... */ }
 embedder, err := embeddings.NewEmbedder(llm)
if err != nil { /* ... error handling ... */ }
 store, err := chroma.New(
  chroma.WithChromaURL("http://localhost:8000"),
  chroma.WithNameSpace(collectionName),
  chroma.WithEmbedder(embedder),
 )
if err != nil { /* ... error handling ... */ }
 ragChain := chains.NewRetrievalQAFromLLM(llm, vectorstores.ToRetriever(store, topK))

// 2. Prepare for the concurrent benchmark.
var wg sync.WaitGroup
 resultsChan := make(chan time.Duration, concurrencyLevel)
 errorChan := make(chan error, concurrencyLevel)
 fmt.Printf("Dispatching %d concurrent RAG queries...\n", concurrencyLevel)
 totalStart := time.Now()

// 3. Dispatch worker goroutines.
for i := 0; i < concurrencyLevel; i++ {
  wg.Add(1)
 go func(workerID int) {
   defer wg.Done()
   
   ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
   defer cancel()
   query := queries[workerID%len(queries)] 
   inputValues := map[string]any{"query": query}
   
   start := time.Now()
   _, err := chains.Call(ctx, ragChain, inputValues)
   latency := time.Since(start)
   if err != nil {
    errorChan <- fmt.Errorf("worker %d failed: %w", workerID, err)
   } else {
    resultsChan <- latency
    fmt.Printf("Worker %d completed in %v\n", workerID, latency)
   }
  }(i)
 }

// 4. Wait for all goroutines to complete.
 wg.Wait()
close(resultsChan)
close(errorChan)

// ... calculate and print results ...
}

我们只初始化一次 LangChain 组件,随后启动 50 个独立 goroutine(每个代表一个用户)运行完整 RAG 查询。用 WaitGroup 等待全部完成。这是 Go 原生、对大规模 I/O 并发极其高效的方式。
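上面的代码把“计算并打印结果”省略了。下面给出一个最小示意(非原仓库代码):假设已经把各 worker 的延迟从 resultsChan 收集到一个切片,并记录了整体墙钟时间,吞吐、平均延迟与 P99 延迟可以这样计算:

package main

import (
    "fmt"
    "sort"
    "time"
)

// summarize 根据每个请求的延迟样本计算吞吐、平均延迟与 P99 延迟。
// latencies 对应从 resultsChan 读出的各 worker 耗时,total 为整体墙钟时间。
func summarize(latencies []time.Duration, total time.Duration) {
    if len(latencies) == 0 {
        fmt.Println("no successful requests")
        return
    }
    sort.Slice(latencies, func(i, j int) bool { return latencies[i] < latencies[j] })

    var sum time.Duration
    for _, l := range latencies {
        sum += l
    }
    avg := sum / time.Duration(len(latencies))
    p99 := latencies[(len(latencies)*99)/100] // 简化的 P99 取法
    throughput := float64(len(latencies)) / total.Seconds()

    fmt.Printf("Total Time Elapsed: %v\n", total)
    fmt.Printf("Throughput:         %.2f ops/sec\n", throughput)
    fmt.Printf("Avg Latency:        %v\n", avg.Round(time.Millisecond))
    fmt.Printf("P99 Latency:        %v\n", p99.Round(time.Millisecond))
}

func main() {
    // 演示用的假数据;真实场景中应为 50 个 worker 的实际延迟。
    sample := []time.Duration{
        1200 * time.Millisecond, 1500 * time.Millisecond,
        1800 * time.Millisecond, 2500 * time.Millisecond,
    }
    summarize(sample, 3*time.Second)
}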

Python 版本使用 asyncio 实现同样目标:

   # --- Configuration ---
CONCURRENCY_LEVEL = 50
MODEL_NAME = "llama3:8b"
COLLECTION_NAME = "langchaingo-ingestion-test"
TOP_K = 3
QUERIES = [
    "What are the main themes?""Tell me about the introduction.""Is there a conclusion section?",
    "What is the most complex topic discussed?",
]

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

async def run_rag_query(rag_chain, query, worker_id):
    """Asynchronously runs a single RAG query and returns its latency."""
    start_time = time.perf_counter()
    try:
        await rag_chain.ainvoke({"query": query})
        latency = time.perf_counter() - start_time
        print(f"Worker {worker_id} completed in {latency:.4f}s")
        return latency, None
    except Exception as e:
        latency = time.perf_counter() - start_time
        error_message = f"Worker {worker_id} failed after {latency:.4f}s: {e}"
        logging.warning(error_message)
        return None, error_message

async def main():
    """Main async function to orchestrate the concurrent benchmark."""
    print(f"--- LangChain Python: Concurrent RAG Queries Test (Concurrency: {CONCURRENCY_LEVEL}) ---")

    # 1. Initialize shared components ONCE.
    llm = Ollama(model=MODEL_NAME)
    embeddings = OllamaEmbeddings(model=MODEL_NAME)
    client = chromadb.HttpClient(host='localhost', port=8000)
    vector_store = Chroma(...)  # ...
    rag_chain = RetrievalQA.from_chain_type(...)  # ...

    # 2. Create a list of concurrent tasks.
    tasks = [run_rag_query(rag_chain, QUERIES[i % len(QUERIES)], i) for i in range(CONCURRENCY_LEVEL)]
    total_start = time.perf_counter()

    # 3. `asyncio.gather` runs all tasks concurrently.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    total_duration = time.perf_counter() - total_start
    
    # ... process results and print stats ...

if __name__ == "__main__":
    asyncio.run(main())

Python 中,asyncio 是 I/O 并发的标准做法。我们定义 run_rag_query 协程,使用 ainvoke 异步调用。在 main 中创建 50 个任务并用 asyncio.gather 并发运行。

分别对并发 RAG 与并发 Agent 运行测试:

   # running go version
go run concurrent.go

# running python version
python concurrent.py

   --- LangChainGo: Concurrent RAG Queries Test (Concurrency: 50) ---
Total Time Elapsed: 6.5s
Throughput:         7.69 ops/sec

--- LangChain Python: Concurrent RAG Queries Test (Concurrency: 50) ---
Total Time Elapsed: 18.2s
Throughput:         2.75 ops/sec

   --- LangChainGo: Concurrent Agent Executions Test (Concurrency: 25) ---
Total Time Elapsed: 10.5s
Throughput:         2.38 ops/sec

--- LangChain Python: Concurrent Agent Executions Test (Concurrency: 25) ---
Total Time Elapsed: 32.1s
Throughput:         0.78 ops/sec

并发 RAG 中,LangChainGo 吞吐为 7.69 ops/sec,几乎是 Python(2.75 ops/sec)的 3 倍。

更复杂的并发 Agent 任务差距更大:Go 2.38 ops/sec,Python 0.78 ops/sec,Go 超过 3 倍。

这是生产系统中最重要的基准之一。Go runtime 从设计之初就面向大规模网络并发,goroutine 极其轻量,可轻松处理成千上万并发 I/O。Python 的 asyncio 虽强大,但事件循环与线程管理的复杂性与开销在该规模下难以匹敌 Go 的原生效率。
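为了直观感受 goroutine 的轻量,下面这个与 LangChain 无关的小实验(仅作示意)启动 1 万个模拟 I/O 等待的 goroutine;总耗时接近单次等待时间而不是 1 万倍,说明这些任务确实被廉价地并发执行了:

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func main() {
    const n = 10000 // 模拟 1 万个并发的 I/O 等待(例如等待 LLM 或向量库响应)

    var wg sync.WaitGroup
    start := time.Now()
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            time.Sleep(50 * time.Millisecond) // 用 Sleep 模拟一次网络等待
        }()
    }
    fmt.Printf("Goroutines alive right after launch: %d\n", runtime.NumGoroutine())

    wg.Wait()
    // 总耗时接近单个任务的 50ms,而不是 n*50ms,说明 1 万个任务是并发执行的。
    fmt.Printf("%d simulated I/O tasks finished in %v\n", n, time.Since(start))
}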

GPU 饱和:将模型服务推向极限

另一类可扩展问题是:客户端应用能多有效地让服务端昂贵的 GPU 保持“忙碌”?

若客户端发送新请求过慢,GPU 会空闲,带来成本浪费与吞吐下降。

GPU Saturation(作者 Fareed Khan)

该测试在固定时间内尽可能地轰击 Ollama server,看哪个框架能产生更高负载。

Go 使用一个持续向 job channel 投递任务的 worker pool,确保 worker 一旦空闲就立刻有新请求可处理:

   package main

// --- Configuration ---
const (
 concurrencyLevel = 100
 testDuration     = 60 * time.Second 
)
func main() {

// ... same setup ...
 llm, err := ollama.New(ollama.WithModel(modelName))

 jobs := make(chan bool, concurrencyLevel)
var totalRequests atomic.Int64 
var wg sync.WaitGroup

// 3. Start the worker pool.
for i := 0; i < concurrencyLevel; i++ {
  wg.Add(1)
 go func(workerID int) {
   defer wg.Done()
   for range jobs {
    _, err := llms.GenerateFromSinglePrompt(context.Background(), llm, prompt)
    if err == nil {
     totalRequests.Add(1)
    }
   }
  }(i)
 }

// 4. Feed the job channel for the duration of the test.
 ctx, cancel := context.WithTimeout(context.Background(), testDuration)
defer cancel()
for {
select {
 case jobs <- true: // Dispatch a job
case <-ctx.Done(): // Time's up
   goto end_loop
  }
 }
end_loop:
close(jobs) 
 wg.Wait()   
// ... calculate and print results
}

这里我们创建 100 个等待任务的 worker goroutine。主线程在 60 秒内不断尝试向 jobs 发送“任务”(一个布尔),从而维持模型端的最大压力。
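压测结束后的统计在上面被省略了。一个最小示意(非原仓库代码,数值仅为示例)如下,假设已知完成的请求数与实际墙钟时长:

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

func main() {
    // 这两个值在真实压测中分别来自 totalRequests 计数器与墙钟计时,这里用示例值代替。
    var totalRequests atomic.Int64
    totalRequests.Store(395)
    elapsed := 60*time.Second + 20*time.Millisecond

    throughput := float64(totalRequests.Load()) / elapsed.Seconds()
    fmt.Printf("Total Duration:      %v\n", elapsed)
    fmt.Printf("Completed Requests:  %d\n", totalRequests.Load())
    fmt.Printf("Throughput:          %.2f req/sec\n", throughput)
}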

Python 使用 asyncio 与 Semaphore 达到类似效果:

async def worker(llm, semaphore, request_counter):
    """A single worker that continuously makes requests."""
    while True:
        async with semaphore:
            try:
                await llm.ainvoke(PROMPT)
                request_counter[0] += 1
            except Exception as e:
                pass  # ...
            await asyncio.sleep(0)

async def main():
    # ... setup ...
    llm = Ollama(model=MODEL_NAME)
    
    # 2. Setup concurrency controls.
    semaphore = asyncio.Semaphore(CONCURRENCY_LEVEL)
    request_counter = [0]
    
    # 3. Create and start the worker tasks.
    tasks = [asyncio.create_task(worker(llm, semaphore, request_counter)) for _ in range(CONCURRENCY_LEVEL)]
    
    # 4. Let the workers run for the specified duration.
    try:
        await asyncio.sleep(TEST_DURATION_SECONDS)
    finally:
        # 5. Cancel all worker tasks to stop the benchmark.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
    # ... calculate and print results

我们创建 100 个无限循环的 worker,asyncio.Semaphore(CONCURRENCY_LEVEL) 作为关键控制,确保同时“在途”的请求不超过 100。让其运行 60 秒后取消任务收尾。

   --- LangChainGo Saturation Results ---
Total Duration:      1m0.02s
Completed Requests:  395
Throughput:          6.58 req/sec

--- LangChain Python Saturation Results ---
Total Duration:      60.01s
Completed Requests:  255
Throughput:          4.25 req/sec

结果显示客户端生成负载的效率差异:

60 秒内,Go 客户端完成 395 个请求,吞吐 6.58 req/sec;Python 客户端仅完成 255 个请求,吞吐 4.25 req/sec。

这意味着 LangChainGo 客户端在“喂饱”服务端方面高出 55%。客户端侧更低的开销让其能更快地派发新请求,最大化利用昂贵的 GPU 资源。

处理超时、工具失败与解析错误

最后,生产系统必须具备韧性。我们测试三个常见失败场景:

  1. 超时:当 LLM server 响应缓慢会怎样?
  2. 工具失败:若 Agent 的工具崩溃或返回错误会怎样?
  3. 解析错误:如果 LLM 返回了畸形 JSON 会怎样?
Resiliency(作者 Fareed Khan)

对于超时,Go 使用 context.WithTimeout,这是 Go 处理中止的惯用方式:

   package main

// --- Configuration ---
const (
 modelName = "llama3:8b"
 prompt    = "Tell me a long story about the history of the internet."
 timeout   = 2 * time.Second 
)
func main() {
 fmt.Printf("--- LangChainGo: Timeout and Cancellation Test (Timeout: %v) ---\n", timeout)
 llm, err := ollama.New(ollama.WithModel(modelName))
if err != nil {
  log.Fatalf("Failed to create Ollama client: %v", err)
 }

// 2. Create a context with a deadline.
 ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel() 
 fmt.Println("Sending request that is expected to time out...")
 start := time.Now()

// 3. Call the LLM with the cancellable context.
 _, err = llm.Call(ctx, prompt)
 duration := time.Since(start)

// --- Verification ---
 fmt.Println("\n--- LangChainGo Resiliency Results ---")
 fmt.Printf("Request completed in: %v\n", duration)
if err == nil {
  log.Fatalf("TEST FAILED: Expected a timeout error, but got none.")
 }

// 4. Verify the error type.
if errors.Is(err, context.DeadlineExceeded) {
  fmt.Println("SUCCESS: Received the correct 'context.DeadlineExceeded' error.")
 } else if netErr, ok := err.(interface{ Timeout() bool }); ok && netErr.Timeout() {
  fmt.Println("SUCCESS: Received a network timeout error, as expected.")
 } else {
  log.Fatalf("TEST FAILED: Expected a timeout error, but got a different error: %v", err)
 }

// 5. Verify adherence to the timeout.
if duration >= timeout && duration < timeout+500*time.Millisecond {
  fmt.Printf("SUCCESS: The function returned after ~%v, respecting the timeout.\n", timeout)
 } else {
  log.Fatalf("TEST FAILED: The function did not adhere to the timeout. Took %v.", duration)
 }
}

Python 通常在 client 初始化时配置超时:

   # --- Configuration ---
MODEL_NAME = "llama3:8b"
PROMPT = "Tell me a long story about the history of the internet."
TIMEOUT_SECONDS = 2.0

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


def main():
    """Main function to run the timeout benchmark."""
    print(f"--- LangChain Python: Timeout Test (Timeout: {TIMEOUT_SECONDS}s) ---")

    # 1. Initialize the Ollama client with a timeout.
    try:
        llm = Ollama(model=MODEL_NAME, request_timeout=TIMEOUT_SECONDS)
    except Exception as e:
        logging.fatal(f"Failed to create Ollama client: {e}")
        return

    print("Sending request that is expected to time out...")
    start_time = time.perf_counter()
    err = None

    # 2. Call the LLM.
    try:
        llm.invoke(PROMPT)
    except Exception as e:
        err = e

    duration = time.perf_counter() - start_time

    # --- Verification ---
    print("\n--- LangChain Python Resiliency Results ---")
    print(f"Request completed in: {duration:.4f}s")

    if err is None:
        logging.fatal("TEST FAILED: Expected a timeout error, but got none.")

    # 3. Verify the error type.
    error_string = str(err).lower()
    if"timed out"in error_string or"timeout"in error_string:
        print(f"SUCCESS: Received an expected timeout-related error: {type(err).__name__}")
    else:
        logging.fatal(f"TEST FAILED: Expected a timeout error, but got a different error: {err}")

    # 4. Verify adherence to the timeout.
    if TIMEOUT_SECONDS <= duration < TIMEOUT_SECONDS + 0.5:
        print(f"SUCCESS: The function returned after ~{TIMEOUT_SECONDS}s, respecting the timeout.")
    else:
        logging.fatal(f"TEST FAILED: The function did not adhere to the timeout. Took {duration:.4f}s.")


if __name__ == "__main__":
    main()

运行超时测试:

# running go version
go run timeout.go

# running python version
python timeout.py

   --- LangChainGo: Timeout and Cancellation Test (Timeout: 2s) ---
--- LangChainGo Resiliency Results ---
Request completed in: 2.0015s
SUCCESS: Received the correct 'context.DeadlineExceeded' error.
SUCCESS: The function returned after ~2s, respecting the timeout.

--- LangChain Python: Timeout Test (Timeout: 2.0s) ---
--- LangChain Python Resiliency Results ---
Request completed in: 2.0081s
SUCCESS: Received an expected timeout-related error: ReadTimeout
SUCCESS: The function returned after ~2.0s, respecting the timeout.

这里看的是正确性而非速度。两种框架都能正确处理超时,并在截止时间后立即返回。这是生产库的基本素养。

主要区别在于方式:

  1. Go 以每个请求的 context 控制,更灵活,适合在复杂系统中为不同 API 调用设定不同的超时(见下方示意);
  2. Python 在 client 层统一配置更简单,但粒度更粗。
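针对第 1 点,下面是一个最小示意(沿用前文出现的 ollama.New 与 llm.Call 用法,属于笔者补充的假设示例,非原仓库代码):同一个客户端上,路由类小请求给 2 秒超时,长文本生成给 30 秒超时。

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/tmc/langchaingo/llms/ollama"
)

func main() {
    llm, err := ollama.New(ollama.WithModel("llama3:8b"))
    if err != nil {
        log.Fatalf("Failed to create LLM: %v", err)
    }

    // 路由类的小请求:2 秒内必须返回,否则取消
    routerCtx, cancelRouter := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancelRouter()
    if _, err := llm.Call(routerCtx, "Classify this query as math or prose: 2+2"); err != nil {
        fmt.Println("router call failed:", err)
    }

    // 长文本生成:允许 30 秒
    genCtx, cancelGen := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancelGen()
    if _, err := llm.Call(genCtx, "Write a long essay about the internet."); err != nil {
        fmt.Println("generation call failed:", err)
    }
}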

对于工具和解析失败,两者同样提供错误处理机制:

  1. Go 显式传播错误,便于开发者直接处理,控制明确;
  2. LangChain Python 提供更高阶的封装,比如 AgentExecutor 的 handle_parsing_errors=True,可自动捕获工具失败并反馈给 Agent prompt,允许自我纠正。

可以说:Go 提供底层可控、可预测的错误处理;Python 则提供构建复杂自愈 agentic 逻辑的便捷。
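针对上面第 1 点,下面给出 Go 侧显式处理错误并做有限重试的最小示意(沿用前文的 agents.Initialize 与 chains.Run;重试逻辑与 Calculator 工具均为笔者补充的假设示例,非原仓库代码):

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/tmc/langchaingo/agents"
    "github.com/tmc/langchaingo/chains"
    "github.com/tmc/langchaingo/llms/ollama"
    "github.com/tmc/langchaingo/tools"
)

// runWithRetry 显式检查 chains.Run 返回的 error,
// 对失败(解析错误、工具错误、超时等)做有限次数的重试;策略完全由调用方代码控制。
func runWithRetry(executor chains.Chain, prompt string, maxRetries int) (string, error) {
    var lastErr error
    for attempt := 1; attempt <= maxRetries; attempt++ {
        ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
        out, err := chains.Run(ctx, executor, prompt)
        cancel()
        if err == nil {
            return out, nil
        }
        lastErr = err
        fmt.Printf("attempt %d failed: %v\n", attempt, err)
    }
    return "", fmt.Errorf("agent failed after %d attempts: %w", maxRetries, lastErr)
}

func main() {
    llm, err := ollama.New(ollama.WithModel("llama3:8b"))
    if err != nil {
        log.Fatalf("Failed to create LLM: %v", err)
    }

    // 这里用库自带的 Calculator 工具做演示(假设),实际可替换为自定义工具。
    availableTools := []tools.Tool{tools.Calculator{}}
    executor, err := agents.Initialize(llm, availableTools, agents.ZeroShotReactDescription)
    if err != nil {
        log.Fatalf("Failed to initialize agent: %v", err)
    }

    answer, err := runWithRetry(executor, "What is 25 multiplied by 4?", 3)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("Final Answer:", answer)
}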

第五部分:衡量真实运营成本

目前我们关注的多是速度、延迟、吞吐与响应时间。但在真实生产环境中,速度只是故事的一半,另一半是运营成本。这不仅是金钱,也包括应用在生命周期中所消耗的资源。

一个很快、却泄露内存,或者一加监控就变慢的应用,并不具备生产可用性,反而是问题。

Operation Cost Benchmark(作者 Fareed Khan)

最后我们测量两个关键运营成本:

  1. 有状态 Agent 在长时会话中的内存消耗;
  2. 加上必要可观测性后带来的性能损耗。

内存占用:长会话对话

聊天机器人与 Agent 需要记忆。最简单方式是 ConversationBufferMemory,每一轮都把完整历史附加到 prompt。这虽简单,但会导致内存占用快速增长。
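可以用一个与框架无关的小例子直观感受这种增长(假设每轮对话约新增 200 个字符):buffer 型记忆每轮都重发完整历史,单次 prompt 随轮数线性增长,累计发送量则按平方级增长。

package main

import (
    "fmt"
    "strings"
)

func main() {
    const (
        numTurns     = 100
        charsPerTurn = 200 // 假设:每轮用户输入 + 模型回复约 200 个字符
    )

    var history strings.Builder
    cumulativeSent := 0 // 累计发送给 LLM 的字符数(每轮都重发完整历史)

    for turn := 1; turn <= numTurns; turn++ {
        history.WriteString(strings.Repeat("x", charsPerTurn)) // 追加本轮对话内容
        prompt := history.String()                             // 完整历史被拼进本轮 prompt
        cumulativeSent += len(prompt)

        if turn == 1 || turn == 10 || turn == 50 || turn == 100 {
            fmt.Printf("Turn %3d: prompt = %6d chars, cumulative sent = %8d chars\n",
                turn, len(prompt), cumulativeSent)
        }
    }
}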

Memory Footprint(作者 Fareed Khan)

该测试模拟 100 轮的对话,并在关键检查点测量内存使用,观察框架如何处理不断增长的状态。

Go 使用内置 runtime 获取精确的堆内存统计:

   package main

// --- Configuration ---
const (
 numTurns      = 100
 modelName     = "llama3:8b"
 initialPrompt = "My name is John."
)

// A struct to hold the metrics we want to capture at each checkpoint.
type BenchmarkMetrics struct {
 Turn         int
 Latency      time.Duration
 HeapAlloc    uint64 // Bytes currently allocated on the heap
 TotalAlloc   uint64 // Cumulative bytes allocated (shows churn)
 NumGC        uint32 // Number of garbage collections
 PauseTotalNs uint64 // Total time spent in GC pauses
}
func main() {
 fmt.Printf("--- LangChainGo: Long-Term Memory Footprint Test (%d Turns) ---\n", numTurns)

// 1. Initialize the LLM and Memory components.
 llm, err := ollama.New(ollama.WithModel(modelName))
if err != nil {
  log.Fatalf("Failed to create LLM: %v", err)
 }

 mem := memory.NewConversationBuffer()
 conversationChain := chains.NewConversation(llm, mem)
 checkpoints := map[int]bool{1: true, 10: true, 50: true, 100: true}
var metrics []BenchmarkMetrics
var initialMemStats runtime.MemStats
 runtime.ReadMemStats(&initialMemStats)

 fmt.Println("Starting simulated conversation...")

// 2. Start the conversation loop.
 currentPrompt := initialPrompt
 ctx := context.Background()
for i := 1; i <= numTurns; i++ {
  start := time.Now()


// 3. Run the chain.
  result, err := chains.Run(ctx, conversationChain, currentPrompt)
if err != nil {
   log.Fatalf("Chain run failed at turn %d: %v", i, err)
  }
  latency := time.Since(start)
  currentPrompt = fmt.Sprintf("That's interesting. Can you tell me more about the last thing you said? My name is still %s.", "John")

// 4. Record metrics at specified checkpoints.
if checkpoints[i] {
   var currentMemStats runtime.MemStats
    runtime.ReadMemStats(&currentMemStats)
   
   metrics = append(metrics, BenchmarkMetrics{
    Turn:         i,
    Latency:      latency,
    HeapAlloc:    currentMemStats.Alloc, // Current live memory
    TotalAlloc:   currentMemStats.TotalAlloc - initialMemStats.TotalAlloc,
    NumGC:        currentMemStats.NumGC - initialMemStats.NumGC,
    PauseTotalNs: currentMemStats.PauseTotalNs - initialMemStats.PauseTotalNs,
   })
   fmt.Printf("Checkpoint Turn %d: Latency=%v\n", i, latency)
  }
 }

// 5. Print the final results table.
 printResults(metrics)
}

// Eval Result metrics
func printResults(metrics []BenchmarkMetrics) {
 fmt.Println("\n--- LangChainGo Memory & Performance Results ---")
 fmt.Printf("%-5s | %-15s | %-15s | %-20s | %-10s | %-20s\n""Turn""Latency""Heap Alloc""Total Alloc (Churn)""Num GC""Total GC Pause")
for _, m := range metrics {
  heapAllocKB := float64(m.HeapAlloc) / 1024
  totalAllocMB := float64(m.TotalAlloc) / (1024 * 1024)
  pauseMs := float64(m.PauseTotalNs) / 1_000_000

  fmt.Printf("%-5d | %-15v | %-15s | %-20s | %-10d | %-20s\n",
   m.Turn, m.Latency.Round(time.Millisecond),
   fmt.Sprintf("%.2f KB", heapAllocKB), fmt.Sprintf("%.2f MB", totalAllocMB),
   m.NumGC, fmt.Sprintf("%.2f ms", pauseMs),
  )
 }
}

我们建立带 ConversationBuffer 的 ConversationChain

  1. 循环中,每次 chains.Run 都会:从内存加载完整历史、格式化为 prompt、调用 LLM、再将本轮输入输出写回内存;
  2. 用 runtime.ReadMemStats 捕捉 HeapAlloc(当前活跃内存)与 TotalAlloc(累计分配,反映 churn),两者的区别见下方小例子。
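HeapAlloc 与 TotalAlloc 的区别可以用下面这个与 LangChain 无关的小例子(仅作示意)直观验证:反复分配又立刻丢弃的临时内存会让 TotalAlloc 持续上涨,而 HeapAlloc 在 GC 后会回落。

package main

import (
    "fmt"
    "runtime"
)

func main() {
    var m runtime.MemStats
    var sink byte

    // 每轮分配 10 MB 的临时内存:这些内存很快变成垃圾,
    // 因此 TotalAlloc(累计分配)持续上涨,而 HeapAlloc(当前活跃)在 GC 后回落。
    for i := 0; i < 5; i++ {
        buf := make([]byte, 10<<20)
        sink += buf[len(buf)-1] // 引用一下切片,避免分配被编译器优化掉
        runtime.GC()            // 强制一次 GC,便于观察 HeapAlloc 回落

        runtime.ReadMemStats(&m)
        fmt.Printf("round %d: HeapAlloc = %.1f MB, TotalAlloc = %.1f MB, NumGC = %d\n",
            i+1, float64(m.HeapAlloc)/(1<<20), float64(m.TotalAlloc)/(1<<20), m.NumGC)
    }
    _ = sink
}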

Python 用 psutil 读取进程 RSS 作为内存指标:

   # --- Configuration ---
NUM_TURNS = 100
MODEL_NAME = "llama3:8b"
INITIAL_PROMPT = "My name is John."

def get_memory_usage():
    """Gets the current memory usage (RSS) of the process in bytes."""
    process = psutil.Process(os.getpid())
    return process.memory_info().rss

def main():
    """Main function to run the memory footprint benchmark."""

    print(f"--- LangChain Python: Long-Term Memory Footprint Test ({NUM_TURNS} Turns) ---")

    llm = Ollama(model=MODEL_NAME)
    memory = ConversationBufferMemory()
    conversation_chain = ConversationChain(llm=llm, memory=memory)
    checkpoints = {1, 10, 50, 100}
    metrics = []
    initial_mem_usage = get_memory_usage()
    
    print("Starting simulated conversation...")

    current_prompt = INITIAL_PROMPT

    for i in range(1, NUM_TURNS + 1):
        start_time = time.perf_counter()

        # 3. Run one turn of the conversation.
        response = conversation_chain.invoke({"input": current_prompt})
        
        latency = time.perf_counter() - start_time
        current_prompt = f"That's interesting. Can you tell me more about the last thing you said? My name is still John."

        if i in checkpoints:
            current_mem_usage = get_memory_usage()
            mem_increase = current_mem_usage - initial_mem_usage
            
            metrics.append({
                "turn": i,
                "latency_s": latency,
                "rss_increase_bytes": mem_increase,
            })
            print(f"Checkpoint Turn {i}: Latency={latency:.4f}s")
            
    print_results(metrics)

def print_results(metrics):
    """Prints the final benchmark results in a formatted table."""

    print("\n--- LangChain Python Memory & Performance Results ---")

    print(f"{'Turn':<5} | {'Latency (s)':<15} | {'Memory Increase (RSS)':<25}")

    for m in metrics:
        rss_increase_mb = m['rss_increase_bytes'] / (1024 * 1024)
        print(f"{m['turn']:<5} | {m['latency_s']:<15.4f} | {f'{rss_increase_mb:.2f} MB':<25}")

if __name__ == "__main__":
    main()

Python 逻辑等同:创建带 ConversationBufferMemory 的 ConversationChain,循环调用 invoke,在检查点读取 RSS 增量。

运行结果:

   --- LangChainGo Memory & Performance Results ---
Turn  | Latency         | Heap Alloc      | Total Alloc (Churn)  | Num GC     | Total GC Pause
-----------------------------------------------------------------------------------------------------
1     | 1.95s           | 160.25 KB       | 5.10 MB              | 2          | 0.31 ms
10    | 2.85s           | 510.50 KB       | 58.60 MB             | 18         | 2.55 ms
50    | 6.10s           | 2850.20 KB      | 310.40 MB            | 85         | 12.82 ms
100   | 11.25s          | 5910.80 KB      | 680.95 MB            | 165        | 25.15 ms

--- LangChain Python Memory & Performance Results ---
Turn  | Latency (s)     | Memory Increase (RSS)
-------------------------------------------------------------------
1     | 2.5012          | 22.50 MB
10    | 4.1534          | 45.12 MB
50    | 10.2912         | 145.80 MB
100   | 19.8567         | 290.25 MB

内存效率差异巨大:

100 轮后,Go 应用堆上仅 5.9 MB 活跃内存;而 Python 进程 RSS 增加了 290.25 MB。

尽管不完全可一一对应,但足以说明:Go 作为编译型、有值类型与高效 GC 的语言,内存更精简;Python 的动态特性与对象开销使同等对话历史需要显著更多内存。

对于要同时处理成千上万长会话的服务,这直接关系到服务器成本与稳定性。

可观测性开销:Tracing 的成本

在生产环境需要可观测性:日志、指标与 trace(如 LangSmith)以理解系统行为、调试问题与监控性能。

但这些“观察”不是免费的,每条日志与每个 trace 事件都带来少量开销。

Observability(作者 Fareed Khan)

该测试度量这部分开销。我们将并发 Agent 基准运行两次:一次无任何仪表(baseline),一次开启简单的 logging 回调,打印 Agent 生命周期的每个事件。

Go 实现:定义实现 callbacks.Handler 接口的 SimpleLogHandler

   package main

import (
"context"
"fmt"
"log"
"sort"
"strings"
"sync"
"time"
"github.com/tmc/langchaingo/agents"
"github.com/tmc/langchaingo/callbacks"
"github.com/tmc/langchaingo/chains"
"github.com/tmc/langchaingo/llms/ollama"
"github.com/tmc/langchaingo/schema"
"github.com/tmc/langchaingo/tools"
)

// ... Tool definition ...
// --- Custom Callback Handler ---
type SimpleLogHandler struct {
 callbacks.SimpleHandler
}

func (h SimpleLogHandler) HandleChainStart(ctx context.Context, inputs map[string]any) {
 log.Printf("[HANDLER] Chain Start. Inputs: %v", inputs)
}

func (h SimpleLogHandler) HandleChainEnd(ctx context.Context, outputs map[string]any) { /* ... */ }
func (h SimpleLogHandler) HandleLLMStart(ctx context.Context, prompts []string) { /* ... */ }
func (h SimpleLogHandler) HandleLLMEnd(ctx context.Context, output schema.LLMResult) { /* ... */ }
func (h SimpleLogHandler) HandleToolStart(ctx context.Context, input string) { /* ... */ }
func (h SimpleLogHandler) HandleToolEnd(ctx context.Context, output string) { /* ... */ }
func (h SimpleLogHandler) HandleAgentAction(ctx context.Context, action schema.AgentAction) { /* ... */ }

// ...
func runBenchmark(withCallbacks bool) BenchmarkMetrics {
// ... setup llm and tools ...

for i := 0; i < concurrencyLevel; i++ {
  wg.Add(1)
 go func(workerID int) {
   defer wg.Done()
   var opts []chains.ChainCallOption
   if withCallbacks {
    // Each goroutine gets its own handler instance.
    opts = append(opts, chains.WithCallbacks(SimpleLogHandler{}))
   }
   
   agentExecutor, _ := agents.Initialize(llm, availableTools, agents.ZeroShotReactDescription)
   
   start := time.Now()
   _, err := chains.Run(context.Background(), agentExecutor, multiHopPrompt, opts...)
   // ...
  }(i)
 }
// ... wait and calculate metrics ...
}

关键在 runBenchmark:当 withCallbacks 为 true 时,创建 SimpleLogHandler 实例并通过 chains.WithCallbacks 传入 chains.Run,框架会在 Agent 的每一步调用相应的 handler。
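runBenchmark 之后的对比计算在上面被省略了。下面是一个最小示意(非原仓库代码,数值仅为示例),展示 Overhead/Degradation 这类指标可以如何从 baseline 与带回调两次运行的结果中算出:

package main

import (
    "fmt"
    "time"
)

// RunMetrics 保存一次压测得到的关键指标(对应上文 runBenchmark 返回值的简化版)。
type RunMetrics struct {
    Throughput float64 // ops/sec
    AvgLatency time.Duration
}

func main() {
    // 示例值;真实场景中分别来自 runBenchmark(false) 与 runBenchmark(true)。
    baseline := RunMetrics{Throughput: 10.51, AvgLatency: 3890 * time.Millisecond}
    withCallbacks := RunMetrics{Throughput: 10.15, AvgLatency: 4050 * time.Millisecond}

    throughputDrop := (baseline.Throughput - withCallbacks.Throughput) / baseline.Throughput * 100
    latencyIncrease := (withCallbacks.AvgLatency.Seconds() - baseline.AvgLatency.Seconds()) /
        baseline.AvgLatency.Seconds() * 100

    fmt.Println("--- Observability Overhead Analysis ---")
    fmt.Printf("Throughput: %.2f -> %.2f ops/sec (%.2f%% degradation)\n",
        baseline.Throughput, withCallbacks.Throughput, throughputDrop)
    fmt.Printf("Avg Latency: %v -> %v (+%.2f%%)\n",
        baseline.AvgLatency, withCallbacks.AvgLatency, latencyIncrease)
}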

Python 版本定义 BaseCallbackHandler 子类:

   # --- Custom Callback Handler ---
class SimpleLogHandler(BaseCallbackHandler):
    def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any) -> Any:
        print(f"[HANDLER] Chain Start. Inputs: {list(inputs.keys())}")

    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> Any:
        print(f"[HANDLER] Chain End. Outputs: {list(outputs.keys())}")

    def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any) -> Any:
        print(f"[HANDLER] LLM Start. Prompt: {prompts[0][:50]}...")

    def on_llm_end(self, response, **kwargs: Any) -> Any:
        print(f"[HANDLER] LLM End. Generations: {len(response.generations)}")

    def on_tool_start(self, serialized: Dict[str, Any], input_str: str, **kwargs: Any) -> Any:
        print(f"[HANDLER] Tool Start. Input: {input_str}")

    def on_tool_end(self, output: str, **kwargs: Any) -> Any:
        print(f"[HANDLER] Tool End. Output: {output}")


# --- Agent runner ---
def run_agent_task(llm, tools, prompt_template, with_callbacks: bool, worker_id: int):
    """Runs a single agent task and returns its latency."""
    callbacks = [SimpleLogHandler()] if with_callbacks else []

    agent = create_react_agent(llm, tools, prompt_template)
    agent_executor = AgentExecutor(
        agent=agent,
        tools=tools,
        handle_parsing_errors=True,
        callbacks=callbacks,
    )

    start_time = time.perf_counter()
    agent_executor.invoke({"input": PROMPT})
    return time.perf_counter() - start_time

我们将 handler 实例传给 AgentExecutor 的 callbacks 列表,LangChain 的回调管理器会在 Agent 运行时调用相应 on_... 方法。

运行并分析结果:

--- LangChainGo: Observability Overhead Analysis ---
--------------------------------------------------
Metric                | Baseline        | With Callbacks  | Overhead/Degradation
--------------------------------------------------
Throughput (ops/sec)  | 10.51           | 10.15           | 3.43%
Avg Latency           | 3.89s           | 4.05s           | +4.11%
P99 Latency           | 4.75s           | 4.95s           |
--------------------------------------------------


--- LangChain Python: Observability Overhead Analysis ---
----------------------------------------------------------------------------------
Metric                   | Baseline        | With Callbacks  | Overhead/Degradation
----------------------------------------------------------------------------------
Throughput (ops/sec)     | 6.95            | 4.51            | 35.11%
Avg Latency (s)          | 5.85            | 8.95            | +53.00%
P99 Latency (s)          | 6.91            | 11.21           |
Total CPU Time (s)       | 9.85            | 18.92           | +92.08%
----------------------------------------------------------------------------------

这也许是最醒目的结果:

在 Go 中,开启简单日志只带来约 3–4% 的轻微性能下降;而在 Python 中,同等的可观测性导致吞吐骤降 35%、平均延迟增加 53%、CPU 使用几乎翻倍。

原因在于 Python 中每个回调事件都需要动态函数调用、对象创建,并在解释型框架的多层抽象间传递。在高并发负载下,这些层层叠加的小开销极其可观;而 Go 中的回调是编译后的静态函数调用,单次代价极小。

对任何依赖细粒度、实时 tracing 来调试与监控的严肃生产系统而言,这是关键发现:在 Python 中可观测性的成本可能高昂,而在 Go 中则几乎“免费”。

