在进行代码审查讨论时,我们经常会听到关于线程安全和并发工具的一些片面观点和结论,比如“只需将 HashMap 替换为 ConcurrentHashMap 就可以解决并发问题”或“尝试使用无锁的 CopyOnWriteArrayList,性能更好”。
然而,这些说法并不总是准确的。当然,现代编程语言为开发者提供了各种并发工具类,以便更轻松地进行多线程编程。但如果我们没有充分了解这些工具的使用场景、问题解决方法以及最佳实践,盲目使用可能导致一些问题。这些问题可能从性能上造成小的损失,到无法确保业务逻辑在多线程情况下的正确性。
需要先说明的是,这里所说的并发工具类指的是用来解决多线程环境下并发问题的工具类库。通常,这些并发工具包括同步器和容器两大类。在业务代码中,使用并发容器的情况可能更为常见一些。今天我要分享的例子也将侧重于并发容器。接下来,让我们看看在使用并发工具时,最常遇到的问题有哪些,以及如何解决和避免这些问题。
之前,一位同事在生产环境中遇到了一个奇怪的问题:有时候获取到的用户信息竟然是其他用户的。在仔细查看代码后,我发现他在缓存获取到的用户信息时使用了 ThreadLocal。ThreadLocal 被设计用于在线程之间隔离变量,而在方法或类之间共享的场景。如果获取用户信息比较昂贵(比如从数据库查询用户信息),将数据缓存到 ThreadLocal 中是一种合理的做法。然而,为什么这样的实现会导致用户信息混乱的 Bug 呢?让我们通过一个具体的案例来了解。
我们可以通过使用 Spring Boot 创建一个 Web 应用程序来演示这个问题。在这个例子中,我们将一个 Integer 值存储在 ThreadLocal 中,代表需要在线程中保存的用户信息,初始值为 null。在业务逻辑中,我们先从 ThreadLocal 中获取一次值,然后将外部传入的参数设置到 ThreadLocal 中,模拟从当前上下文获取用户信息的逻辑。接着再次获取一次值,最后输出两次获得的值以及线程名称。这样我们就可以看到问题出现的具体情境。
private static final ThreadLocal<Integer> currentUser = ThreadLocal.withInitial(() -> null);@GetMapping("wrong")public Map wrong(@RequestParam("userId") Integer userId) {//设置用户信息之前先查询一次ThreadLocal中的用户信息String before = Thread.currentThread().getName() + ":" + currentUser.get();//设置用户信息到ThreadLocalcurrentUser.set(userId);//设置用户信息之后再查询一次ThreadLocal中的用户信息String after = Thread.currentThread().getName() + ":" + currentUser.get();//汇总输出两次查询结果Map result = new HashMap();result.put("before", before);result.put("after", after);return result;}
理论上,在设置用户信息之前第一次获取的值应该始终是 null。然而,需要注意的是,程序运行在Tomcat中,执行程序的线程是Tomcat的工作线程,而Tomcat的工作线程是基于线程池的。正如其名称所示,线程池会重用一组固定的线程。一旦线程被重用,很可能首次从ThreadLocal获取的值是之前其他用户请求留下的值。这种情况下,ThreadLocal中的用户信息就变成了其他用户的信息。为了更迅速地重现这个问题,我在配置文件中调整了Tomcat的参数,将工作线程池的最大线程数设置为1,这样就始终是同一个线程在处理请求。
server.tomcat.max-threads=1br
运行程序后先让用户 1 来请求接口,可以看到第一和第二次获取到用户 ID 分别是 null 和 1,符合预期:

随后用户 2 来请求接口,这次就出现了 Bug,第一和第二次获取到用户 ID 分别是 1 和 2,显然第一次获取到了用户 1 的信息,原因就是 Tomcat 的线程池重用了线程。从图中可以看到,两次请求的线程都是同一个线程:http-nio-8080-exec-1。

这个例子提醒我们,在编写业务代码时,首先要了解代码将在哪个线程上运行。我们可能会感觉多线程知识不太相关,因为代码中并没有显式地启用多线程。然而,实际上,比如在像Tomcat这样的Web服务器中运行的业务代码,本身就是在一个多线程环境中执行的(否则,接口也无法支持如此高的并发)。不能因为没有显式地开启多线程就认为不存在线程安全问题。由于线程的创建成本相对较高,Web服务器通常会使用线程池来处理请求,这意味着线程会被重用。
在这种情况下,如果使用诸如ThreadLocal之类的工具来存储数据,就需要特别注意在代码运行完毕后显式地清空设置的数据。同样,如果代码中使用了自定义的线程池,也可能面临类似的问题。
理解了这个知识点后,我们修正这段代码的方案是,在代码的 finally 代码块中,显式清除 ThreadLocal 中的数据。这样一来,新的请求过来即使使用了之前的线程也不会获取到错误的用户信息了。修正后的代码如下:
public Map right( Integer userId) {String before = Thread.currentThread().getName() + ":" + currentUser.get();currentUser.set(userId);try {String after = Thread.currentThread().getName() + ":" + currentUser.get();Map result = new HashMap();result.put("before", before);result.put("after", after);return result;} finally {//在finally代码块中删除ThreadLocal中的数据,确保数据不串currentUser.remove();}}
重新运行程序可以验证,再也不会出现第一次查询用户信息查询到之前用户请求的 Bug:

ThreadLocal 是利用独占资源的方式,来解决线程安全问题,那如果我们确实需要有资源在线程之间共享,应该怎么办呢?这时,我们可能就需要用到线程安全的容器了。
使用了线程安全的并发工具,并不代表解决了所有线程安全问题
JDK 1.5之后引入的ConcurrentHashMap是一个高性能的线程安全哈希表容器。然而,“线程安全”这个术语容易导致误解,因为ConcurrentHashMap只能确保提供的原子性读写操作是线程安全的。
我在很多业务代码中都见过这种误区,例如下面的情景。假设有一个包含900个元素的Map,现在要并发地添加100个元素,由10个线程同时执行。开发人员错误地认为使用了ConcurrentHashMap就能够避免线程安全问题,于是不加思考地编写了以下代码:在每个线程的逻辑中,首先通过size方法获取当前元素数量,计算ConcurrentHashMap目前需要添加多少元素,并在日志中输出了这个值。然后,通过putAll方法将缺少的元素添加进去。
为方便观察问题,我们输出了这个 Map 一开始和最后的元素个数。
//线程个数private static int THREAD_COUNT = 10;//总元素数量private static int ITEM_COUNT = 1000;//帮助方法,用来获得一个指定元素数量模拟数据的ConcurrentHashMapprivate ConcurrentHashMap<String, Long> getData(int count) {return LongStream.rangeClosed(1, count).boxed().collect(Collectors.toConcurrentMap(i -> UUID.randomUUID().toString(), Function.identity(),(o1, o2) -> o1, ConcurrentHashMap::new));}@GetMapping("wrong")public String wrong() throws InterruptedException {ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);//初始900个元素log.info("init size:{}", concurrentHashMap.size());ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);//使用线程池并发处理逻辑forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {//查询还需要补充多少个元素int gap = ITEM_COUNT - concurrentHashMap.size();log.info("gap size:{}", gap);//补充元素concurrentHashMap.putAll(getData(gap));}));//等待所有任务完成forkJoinPool.shutdown();forkJoinPool.awaitTermination(1, TimeUnit.HOURS);//最后元素个数会是1000吗?log.info("finish size:{}", concurrentHashMap.size());return "OK";}
访问接口后程序输出的日志内容如下:

从日志中我们可以观察到以下情况:初始大小为900是符合预期的,但还需要填充100个元素。在worker1线程中,查询到当前需要填充的元素为36,而这不是100的倍数。而在worker13线程中,查询到需要填充的元素数是负的,显然已经过度填充了。最终HashMap的总项目数是1536,显然不符合填充至1000的预期。
为了更形象地解释这个情况,可以做一个比喻。ConcurrentHashMap就像是一个大篮子,现在篮子里有900个桔子,我们希望再往篮子里装满1000个桔子,也就是再装入100个桔子。有10个工人来完成这项任务,每个人上岗后都会计算还需要往篮子里添加多少个桔子,最后再将桔子放入篮子。
ConcurrentHashMap这个篮子本身可以确保多个工人在往里装东西时不会相互影响,但不能确保工人A看到篮子里需要装100个桔子而还未装的时候,工人B就能够看到篮子中的桔子数量。更值得注意的是,往这个篮子里装100个桔子的操作并不是原子性的,从别人的角度看,可能会有一个瞬间篮子里有964个桔子,而还需要装36个桔子。
回到ConcurrentHashMap,我们需要注意其对外提供的方法或能力的限制:使用ConcurrentHashMap并不代表对其多个操作之间的状态是一致的,也不代表没有其他线程在对其进行操作。如果需要确保状态一致性,需要手动加锁。一些聚合方法,如size、isEmpty和containsValue,在并发情况下可能会反映ConcurrentHashMap的中间状态。
因此,在并发情况下,这些方法的返回值只能用作参考,而不能用于流程控制。显然,使用size方法计算差异值就是一种流程控制。类似于putAll这样的聚合方法也不能确保原子性,在putAll的过程中去获取数据可能会获取到部分数据。
代码的修改方案很简单,整段逻辑加锁即可:
@GetMapping("right")public String right() throws InterruptedException {ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);log.info("init size:{}", concurrentHashMap.size());ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {//下面的这段复合逻辑需要锁一下这个ConcurrentHashMapsynchronized (concurrentHashMap) {int gap = ITEM_COUNT - concurrentHashMap.size();log.info("gap size:{}", gap);concurrentHashMap.putAll(getData(gap));}}));forkJoinPool.shutdown();forkJoinPool.awaitTermination(1, TimeUnit.HOURS);log.info("finish size:{}", concurrentHashMap.size());return "OK";}
重新调用接口,程序的日志输出结果符合预期:

可以看到,只有一个线程查询到了需要补 100 个元素,其他 9 个线程查询到不需要补元素,最后 Map 大小为 1000。到了这里,你可能又要问了,使用 ConcurrentHashMap 全程加锁,还不如使用普通的 HashMap 呢。其实不完全是这样。ConcurrentHashMap 提供了一些原子性的简单复合逻辑方法,用好这些方法就可以发挥其威力。这就引申出代码中常见的另一个问题:在使用一些类库提供的高级工具类时,开发人员可能还是按照旧的方式去使用这些新类,因为没有使用其特性,所以无法发挥其威力。
没有充分了解并发工具的特性,从而无法发挥其威力
让我们考虑一个使用Map来统计Key出现次数的场景,这个逻辑在业务代码中非常普遍。我们使用ConcurrentHashMap来进行统计,Key的范围是10。最多使用10个并发线程,循环执行1000万次操作,每次操作都会将一个随机的Key累加。如果Key不存在,首次设置其值为1。
//循环次数private static int LOOP_COUNT = 10000000;//线程数量private static int THREAD_COUNT = 10;//元素数量private static int ITEM_COUNT = 10;private Map<String, Long> normaluse() throws InterruptedException {ConcurrentHashMap<String, Long> freqs = new ConcurrentHashMap<>(ITEM_COUNT);ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {//获得一个随机的KeyString key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);synchronized (freqs) {if (freqs.containsKey(key)) {//Key存在则+1freqs.put(key, freqs.get(key) + 1);} else {//Key不存在则初始化为1freqs.put(key, 1L);}}}));forkJoinPool.shutdown();forkJoinPool.awaitTermination(1, TimeUnit.HOURS);return freqs;}
我们吸取之前的教训,直接通过锁的方式锁住 Map,然后做判断、读取现在的累计值、加 1、保存累加后值的逻辑。这段代码在功能上没有问题,但无法充分发挥 ConcurrentHashMap 的威力,改进后的代码如下:
private Map<String, Long> gooduse() throws InterruptedException {ConcurrentHashMap<String, LongAdder> freqs = new ConcurrentHashMap<>(ITEM_COUNT);ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);//利用computeIfAbsent()方法来实例化LongAdder,然后利用LongAdder来进行线程安全计数freqs.computeIfAbsent(key, k -> new LongAdder()).increment();}));forkJoinPool.shutdown();forkJoinPool.awaitTermination(1, TimeUnit.HOURS);//因为我们的Value是LongAdder而不是Long,所以需要做一次转换才能返回return freqs.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(),e -> e.getValue().longValue()));}
在这个经过改进的代码中,我们灵活运用了以下两个关键点:
利用ConcurrentHashMap的原子性方法computeIfAbsent执行复合逻辑操作。该方法判断Key是否存在对应的Value,如果不存在,则运行Lambda表达式并将结果作为新的Value放入Map。在这里,我们使用computeIfAbsent来检查Key是否存在,如果不存在,则创建一个新的LongAdder对象,并将其作为新的Value返回。由于computeIfAbsent方法保证原子性,因此多线程环境下也能正确执行。
由于computeIfAbsent方法返回的Value是LongAdder,它是一个线程安全的累加器,因此我们可以直接调用其increment方法进行累加操作。
除了像ConcurrentHashMap这样通用的并发工具类,我们的工具包中还包含了一些专为特殊场景设计的实现。通常来说,针对通用场景的通用解决方案在各种场景下性能表现良好,被称为“万金油”;而专为特殊场景设计的实现可能在其特定场景下表现更出色,但必须在这些场景中使用,否则可能引发性能问题甚至Bug。
在之前排查一个生产性能问题时,我们发现一段简单的非数据库操作的业务逻辑耗费了超出预期的时间,而在修改数据时,操作本地缓存的性能比回写数据库还慢。检查代码后发现,开发同事使用了CopyOnWriteArrayList来缓存大量的数据,而且数据变化频繁。
CopyOnWrite是一种时髦的技术,在Linux和Redis等地方都有应用。在Java中,虽然CopyOnWriteArrayList是一个线程安全的ArrayList,但由于其实现方式是每次修改数据都会复制一份数据,因此适用场景相对明显,即在读操作远多于写操作或者需要无锁读取的场景。如果选择使用CopyOnWriteArrayList,必须是因为场景需要,而不是仅仅因为它看起来很酷炫。如果读写比例平衡或者有大量写操作,使用CopyOnWriteArrayList的性能可能会非常糟糕。
开发人员在解决线程安全问题时容易犯的四类错误。
首先是只知道使用并发工具,却对当前线程的情况了解不够,例如使用ThreadLocal来缓存数据,以为ThreadLocal在线程之间做了隔离不会有线程安全问题,但却忽略了线程重用可能导致数据混淆的问题。务必记得在业务逻辑结束前清理ThreadLocal中的数据。
其次是误以为使用了并发工具就能解决所有线程安全问题,期望通过将线程不安全的类替换为线程安全的类来一键解决问题。例如,认为使用ConcurrentHashMap就可以解决线程安全问题,而没有在复合逻辑中加锁,导致业务逻辑错误。如果希望在整个业务逻辑中保持容器操作的整体一致性,需要添加适当的锁。
第三是没有充分了解并发工具的特性,仍然按照老的方式使用新的工具,导致无法充分发挥其性能。例如,使用了ConcurrentHashMap,但没有充分利用其提供的基于CAS安全的方法,仍然使用锁的方式来实现逻辑。在使用ConcurrentHashMap时,建议阅读其文档,查看相关原子性操作的API是否满足业务需求,如果可以的话优先考虑使用。
最后是没有了解清楚工具的适用场景,在不合适的场景下使用了错误的工具导致性能下降。比如,没有理解CopyOnWriteArrayList的适用场景,却在读写平衡或大量写操作的情况下使用,导致性能问题。对于这种场景,可以考虑使用普通的List。

