Java 8 & 9 – Parallel streams with custom ForkJoinPool bug

The Java Stream API is a wonderful tool, but not without its shortcomings. There is a bug in Java 8 & 9 which affects the number of threads used by a parallel stream in a seemingly unpredictable way. Beware!

The bug I’m talking about is JDK-8190974 and it seems to have been fixed only in Java 10, which means those of us still using Java 8 or 9 are affected. The test class below illustrates the problem:

// Make sure to not run these tests in parallel! Ideally run each test in its own JVM:
// the common pool reads its parallelism property only once, when it is first initialized.
import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

import org.junit.Before;
import org.junit.Test;

public class ParallelStreamsTest {
    private static final String COMMON_FORK_JOIN_POOL_PARALLELISM = "java.util.concurrent.ForkJoinPool.common.parallelism";
    private List<String> strings;

    @Before
    public void setup() {
        strings = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            strings.add(String.valueOf(i));
        }
    }

    @Test
    public void commonPoolWithCommonPoolValueOne() {
        System.setProperty(COMMON_FORK_JOIN_POOL_PARALLELISM, "1");
        long threadsUsed = countThreadsUsedForParallelStream();

        // One common pool worker plus the calling thread, which also does work
        assertEquals(2, threadsUsed);
    }

    // Counts the distinct threads that processed the elements of the parallel stream
    private long countThreadsUsedForParallelStream() {
        return strings.stream().parallel()
                .map(s -> {
                    try {
                        // Keep each thread busy long enough for the others to pick up work
                        Thread.sleep(10);
                        return s;
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                })
                .map(s -> Thread.currentThread().getName())
                .distinct()
                .count();
    }

    @Test
    public void customPoolWithCommonPoolValueOne() throws ExecutionException, InterruptedException {
        System.setProperty(COMMON_FORK_JOIN_POOL_PARALLELISM, "1");
        ForkJoinPool forkJoinPool = new ForkJoinPool(8);
        try {
            ForkJoinTask<Long> threads = forkJoinPool.submit(this::countThreadsUsedForParallelStream);

            int threadsUsed = threads.get().intValue();
            assertEquals(4, threadsUsed);
        } finally {
            forkJoinPool.shutdown();
        }
    }

    @Test
    public void customPoolWithCommonPoolValueTwo() throws ExecutionException, InterruptedException {
        System.setProperty(COMMON_FORK_JOIN_POOL_PARALLELISM, "2");
        ForkJoinPool forkJoinPool = new ForkJoinPool(8);
        try {
            ForkJoinTask<Long> threads = forkJoinPool.submit(this::countThreadsUsedForParallelStream);

            int threadsUsed = threads.get().intValue();
            assertEquals(8, threadsUsed);
        } finally {
            forkJoinPool.shutdown();
        }
    }
}

As you can see, we’re running a stream of 100 strings in parallel and then counting the distinct threads that they run in. The results are:

Common pool (CPPV=1) -> 2 threads
Custom pool (CPPV=1) -> 4 threads
Custom pool (CPPV=2) -> 8 threads
(CPPV = Common Pool Parallelism Value)

What’s interesting is that the number of threads used by the custom pool scales with the parallelism of the common pool, even though the custom pool was created with 8 threads. On top of that, the thread count differs between the common pool and the custom pool even when the CPPV stays the same.

As stated in this Stack Overflow answer, the root of the bug lies in the java.util.stream.AbstractTask class, namely its LEAF_TARGET field:

// src/share/classes/java/util/stream/AbstractTask.java

    /**
     * Default target factor of leaf tasks for parallel decomposition.
     * To allow load balancing, we over-partition, currently to approximately
     * four tasks per processor, which enables others to help out
     * if leaf tasks are uneven or some processors are otherwise busy.
     */
    static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;

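The target number of leaf tasks for parallel decomposition is calculated as the common pool parallelism * 4 (that is what the << 2 shift does), regardless of whether or not we are actually using the common pool. With a CPPV of 1 the target is only 4 leaf tasks, so the custom pool of 8 threads never receives enough work to occupy more than 4 of them.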
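To make the arithmetic concrete, here is a rough sketch of the decomposition for the 100-element list from the test. It is a simplified model, not the JDK code: the class and method names are made up for illustration, and it assumes that a task keeps splitting while its size exceeds size / LEAF_TARGET and that every split cuts the range in half, the way an ArrayList spliterator does.

```java
// Simplified model of how a parallel stream is cut into leaf tasks on Java 8/9.
// LeafTargetSketch and its methods are hypothetical helpers, not JDK APIs.
class LeafTargetSketch {

    // Mirrors the LEAF_TARGET expression: common pool parallelism times four.
    static int leafTarget(int commonPoolParallelism) {
        return commonPoolParallelism << 2;
    }

    // Assumed rule: tasks larger than size / leafTarget are split further.
    static long sizeThreshold(long size, int leafTarget) {
        long estimate = size / leafTarget;
        return estimate > 0 ? estimate : 1;
    }

    // Counts the leaf tasks when every split halves the remaining range.
    static int countLeaves(long size, long threshold) {
        if (size <= threshold || size < 2) {
            return 1;
        }
        long left = size / 2;
        return countLeaves(left, threshold) + countLeaves(size - left, threshold);
    }

    public static void main(String[] args) {
        for (int cppv = 1; cppv <= 2; cppv++) {
            int target = leafTarget(cppv);
            long threshold = sizeThreshold(100, target);
            System.out.println("CPPV=" + cppv
                    + " -> leafTarget=" + target
                    + ", threshold=" + threshold
                    + ", leaves=" + countLeaves(100, threshold));
        }
    }
}
```

Under these assumptions a CPPV of 1 gives a threshold of 25 and 4 leaf tasks, which matches the 4 threads seen in the custom pool. A CPPV of 2 gives a threshold of 12 and 12 leaf tasks, enough to keep all 8 threads of the custom pool busy. The observed thread count is at most the smaller of the leaf count and the pool size, and it still depends on scheduling.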

While the bug is reported to have been fixed in Java 10, it is important to stay vigilant and aware of problems like these, especially since many of us are still working on earlier versions of the platform.

Written by: Daniel Frąk