关注「索引目录」公众号,获取更多干货。
我们很多人都遇到过这样的情况:发布了一些通过所有测试的功能,结果却在生产环境中崩溃。几个月前,我为了提升速度尝试并发,却忽略了一个重要的限制:数据库事务并非线程安全,因此无法安全地在不同的 goroutine 之间共享事务。我们来看一下这段代码。
func (s *service) ProcessOrder(ctx context.Context, orderID int) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
var order *Order
var inventory *Inventory
errG, grpCtx := errgroup.WithContext(ctx)
errG.Go(func() error {
o, err := s.repo.GetOrder(grpCtx, tx, orderID)
if err != nil {
return err
}
order = o
return nil
})
errG.Go(func() error {
i, err := s.repo.GetInventory(grpCtx, tx, orderID)
if err != nil {
return err
}
inventory = i
return nil
})
if err := errG.Wait(); err != nil {
return err
}
if inventory.Stock < order.Quantity {
return ErrInsufficientStock
}
if err := s.repo.DeductStock(ctx, tx, order.ProductID, order.Quantity); err != nil {
return err
}
if err := s.repo.MarkOrderShipped(ctx, tx, orderID); err != nil {
return err
}
return tx.Commit()
}
我创建了两个 goroutine 来同时获取订单和库存信息。这看起来似乎是正确的做法——它们是独立的读取操作,何乐而不为呢?起初一切似乎都很正常:在测试环境中运行了几天,处理了数百个订单,没有发现任何异常。但上线几个小时后,订单处理就停止了。唯一的线索是一个起初让人摸不着头脑的错误:
pq: unexpected Parse response 'C'
与此同时,另一个症状也出现了:订单卡住。用户下单成功,但订单状态却始终没有更新。从外部来看,操作似乎已经完成,但数据库状态却没有任何变化。这是一种静默的事务处理中途故障。
在 Go 语言中,goroutine*sql.Tx绑定到单个数据库连接,而该连接是一个非线程安全的 TCP 套接字。当两个 goroutine 同时写入同一个套接字时,PostgreSQL 的协议消息会交错传输,导致数据流损坏。
如果两个 goroutine 发生竞争,协议消息序列会重叠,goroutine A 可能会收到原本发给 goroutine B 的解析响应,因此驱动程序会丢失对哪个响应对应哪个查询的跟踪,最终触发我们上面看到的错误。
为什么在测试环境中没有出错?这就是海森堡漏洞的特性,当你刻意寻找它们时,它们似乎就消失了。竞态条件是概率性的,而这个漏洞一直存在。测试环境流量很低,所以两个 goroutine 经常在重叠之前就完成了。如果你想知道:不,go test -race它不会捕获到这个问题。竞态条件检测器检测的是 Go 内存,而不是 TCP 套接字 I/O。错误发生在网络层,超出了 Go 的可见范围。
解决方法很简单:顺序查询,不在 goroutine 之间共享事务。
func (s *service) ProcessOrder(ctx context.Context, orderID int) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
order, err := s.repo.GetOrder(ctx, tx, orderID)
if err != nil {
return err
}
inventory, err := s.repo.GetInventory(ctx, tx, orderID)
if err != nil {
return err
}
if inventory.Stock < order.Quantity {
return ErrInsufficientStock
}
if err := s.repo.DeductStock(ctx, tx, order.ProductID, order.Quantity); err != nil {
return err
}
if err := s.repo.MarkOrderShipped(ctx, tx, orderID); err != nil {
return err
}
return tx.Commit()
}
如果需要并发数据库操作,请在启动事务之前执行,或者使用单独的连接。但是,一旦调用了 BeginTx,该连接就只属于一个 goroutine,直到调用 Commit 或 Rollback 为止。
关注「索引目录」公众号,获取更多干货。

