Java 8 Parallel Streams
Introduction
As we have seen previously in Java 8 Streams, Java 8 introduced streams as a way to perform functional operations over collections of elements. In this article we will see how to take advantage of parallel streams to process the elements of a collection in parallel.
The well-known Collections Framework provides wrappers that add synchronization to Java collections (by default, Java collections are not thread-safe). The problem with synchronized collections is that they introduce thread contention: threads compete for the same lock, and a thread may have to wait while another thread is using the collection.
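The contention described above comes from the fact that a synchronized wrapper guards every method call with a single lock. The following sketch shows that mechanism with plain threads; it does not measure contention. The class and method names (SynchronizedWrapperSketch, fill, sum) are hypothetical, and the manual synchronized block around the iteration follows the requirement stated in the Collections.synchronizedList() Javadoc.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SynchronizedWrapperSketch {

    // Starts `threads` threads that each add the values 0..perThread-1 to one
    // synchronized wrapper. Every add() acquires the same lock (the wrapper itself),
    // so the threads take turns on it: this is where the contention comes from.
    static List<Integer> fill(int threads, int perThread) throws InterruptedException {
        List<Integer> syncList = Collections.synchronizedList(new ArrayList<>());
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    syncList.add(i);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            worker.join(); // wait until every thread has finished adding
        }
        return syncList;
    }

    // Iteration is not covered by the wrapper's per-call locking, so the lock
    // must be held manually for the whole traversal.
    static long sum(List<Integer> syncList) {
        long total = 0;
        synchronized (syncList) {
            for (int value : syncList) {
                total += value;
            }
        }
        return total;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> syncList = fill(4, 1_000);
        System.out.println(syncList.size()); // 4000
        System.out.println(sum(syncList));   // 4 * (0 + 1 + ... + 999) = 1998000
    }
}
```

Every element ends up in the list, but only because each add() is serialized behind the wrapper's lock.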
Parallel streams make it possible to process collections that are not thread-safe in parallel. It is, however, required that the collection is not modified while it is being processed (the java.util.stream documentation calls this requirement non-interference).
Parallel Streams
To create a parallel stream, one may invoke the method Collection.parallelStream() on a collection:
import java.util.Arrays;
import java.util.List;

List<Integer> integerList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
int sum = integerList
    .parallelStream()
    .filter(i -> i % 2 == 0)
    .mapToInt(i -> i)
    .sum();
// Will print 20
System.out.println(sum);
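Collection.parallelStream() is not the only entry point: an existing sequential stream can be switched to parallel mode with BaseStream.parallel(), and isParallel() reports the current mode. A minimal sketch, with hypothetical class and method names (ParallelSourceSketch, sumEvensViaParallelStream, sumEvensViaParallel). Note that the Collection.parallelStream() Javadoc describes its result as a "possibly parallel" stream, so the true printed below reflects the standard List implementations rather than a guarantee for every collection.

```java
import java.util.Arrays;
import java.util.List;

public class ParallelSourceSketch {

    // Parallel stream obtained directly from the collection.
    static int sumEvensViaParallelStream(List<Integer> list) {
        return list.parallelStream()
                .filter(i -> i % 2 == 0)
                .mapToInt(i -> i)
                .sum();
    }

    // Sequential stream switched to parallel mode before the pipeline runs.
    static int sumEvensViaParallel(List<Integer> list) {
        return list.stream()
                .parallel()
                .filter(i -> i % 2 == 0)
                .mapToInt(i -> i)
                .sum();
    }

    public static void main(String[] args) {
        List<Integer> integerList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        System.out.println(integerList.parallelStream().isParallel()); // true
        System.out.println(integerList.stream().isParallel());         // false
        System.out.println(sumEvensViaParallelStream(integerList));    // 20
        System.out.println(sumEvensViaParallel(integerList));          // 20
    }
}
```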
Keep in mind that it is the JVM that partitions the stream into multiple sub-streams and picks the order in which the elements are processed. One should not expect a parallel stream to process the elements in the order in which they appear in the original collection (although one may force the processing to be ordered, as we will see later in this article).
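The partitioning mentioned above is driven by the source's Spliterator: a parallel stream repeatedly calls trySplit() to divide the work. The sketch below performs a single split by hand. The class and method names (SplitSketch, splitOnce) are hypothetical, and the exact split point is an implementation detail of the JDK; for an ORDERED source the Spliterator Javadoc only guarantees that the returned Spliterator covers a strict prefix of the elements, and trySplit() may also return null.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

public class SplitSketch {

    // Splits the list's Spliterator once and returns [prefix, remainder].
    static List<List<Integer>> splitOnce(List<Integer> list) {
        Spliterator<Integer> remainder = list.spliterator();
        Spliterator<Integer> prefix = remainder.trySplit(); // null if the source refuses to split

        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        if (prefix != null) {
            prefix.forEachRemaining(first::add);
        }
        remainder.forEachRemaining(second::add);
        return Arrays.asList(first, second);
    }

    public static void main(String[] args) {
        List<Integer> integerList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        List<List<Integer>> parts = splitOnce(integerList);
        // Each part could now be handed to a different worker thread.
        System.out.println(parts.get(0) + " | " + parts.get(1));
    }
}
```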
Reduction
Let's perform a reduction that converts a Collection into a Map with two entries: one containing a list of even numbers and the other containing a list of odd numbers:
import java.util.Map;
import java.util.stream.Collectors;

List<Integer> integerList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
Map<Integer, List<Integer>> evenOddMap = integerList
    .stream()
    .collect(Collectors.groupingBy(i -> i % 2 == 0 ? 0 : 1));
// Will print [2, 4, 6, 8]
System.out.println(evenOddMap.get(0));
// Will print [1, 3, 5, 7, 9]
System.out.println(evenOddMap.get(1));
The following is the equivalent parallel reduction:
import java.util.concurrent.ConcurrentMap;

ConcurrentMap<Integer, List<Integer>> concurrentEvenOddMap = integerList
    .parallelStream()
    .collect(Collectors.groupingByConcurrent(i -> i % 2 == 0 ? 0 : 1));
System.out.println(concurrentEvenOddMap.get(0));
System.out.println(concurrentEvenOddMap.get(1));
The results produced by the parallel processing will be unordered. One may be assured that each list will contain the expected even and odd numbers respectively, but the order of the elements will not be the same for every execution. Three sample executions:
[4, 2, 8, 6]
[3, 1, 9, 7, 5]

[6, 8, 4, 2]
[5, 9, 7, 3, 1]

[4, 2, 8, 6]
[3, 1, 9, 7, 5]
As we have said earlier, it is the JVM that will split the stream into multiple sub-streams and pick the element processing order.
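If a stable order inside each list matters more than throughput, the non-concurrent Collectors.groupingBy() can also be used on a parallel stream. A sketch, with the hypothetical names OrderedParallelGroupingSketch and evenOdd: each thread fills its own partial map, and the partial maps are then merged following the encounter order, so the lists come out in their original order. The groupingBy Javadoc notes that this merging step can be expensive, which is the cost groupingByConcurrent() avoids.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class OrderedParallelGroupingSketch {

    // Non-concurrent grouping on a parallel stream: per-thread partial maps are
    // merged in encounter order, so each list keeps the original element order.
    static Map<Integer, List<Integer>> evenOdd(List<Integer> list) {
        return list.parallelStream()
                .collect(Collectors.groupingBy(i -> i % 2 == 0 ? 0 : 1));
    }

    public static void main(String[] args) {
        List<Integer> integerList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        Map<Integer, List<Integer>> evenOddMap = evenOdd(integerList);
        System.out.println(evenOddMap.get(0)); // [2, 4, 6, 8]
        System.out.println(evenOddMap.get(1)); // [1, 3, 5, 7, 9]
    }
}
```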
Note that we have used the concurrent counterparts in the reduction: ConcurrentMap as the result type, since multiple threads put elements into the same map as they are processed, and Collectors.groupingByConcurrent() to perform the grouping concurrently. Because it accumulates into a single shared map instead of merging per-thread maps, it may offer better parallel performance than its non-concurrent counterpart, at the cost of not preserving the encounter order.
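The difference between the two collectors is visible in their characteristics. Per the Stream.collect() Javadoc, a concurrent reduction (one shared result container) is only performed when the stream is parallel, the collector is CONCURRENT, and either the stream or the collector is UNORDERED. A minimal sketch that inspects both collectors, using the hypothetical names CollectorTraitsSketch and PARITY:

```java
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class CollectorTraitsSketch {

    // Classifier shared by both collectors: 0 for even numbers, 1 for odd numbers.
    static final Function<Integer, Integer> PARITY = i -> i % 2 == 0 ? 0 : 1;

    // Non-concurrent collector: partial maps are merged in encounter order.
    static Set<Collector.Characteristics> groupingByTraits() {
        return Collectors.groupingBy(PARITY).characteristics();
    }

    // Concurrent collector: all threads accumulate into one ConcurrentMap.
    static Set<Collector.Characteristics> groupingByConcurrentTraits() {
        return Collectors.groupingByConcurrent(PARITY).characteristics();
    }

    public static void main(String[] args) {
        System.out.println("groupingBy:           " + groupingByTraits());
        System.out.println("groupingByConcurrent: " + groupingByConcurrentTraits());
    }
}
```

Only the second set contains CONCURRENT and UNORDERED, which is what allows the single shared map.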
Ordering
As we have seen earlier, the order of processing in parallel streams is undefined. This is because the JVM splits the stream into multiple sub-streams and picks the elements for processing in an order optimized for the current parallel processing. Three sample executions of the following code illustrate this:
List<Integer> integerList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
integerList.parallelStream().forEach(i -> System.out.print(i + " "));
5 6 8 9 7 1 2 3 4
3 4 5 6 1 2 8 9 7
5 6 3 4 1 2 8 7 9
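Although the order changes from run to run, forEach still applies its action to every element exactly once. The Stream.forEach() Javadoc adds that the action may run in whatever thread the library chooses, so any shared state it touches must be thread-safe. A small sketch, with hypothetical names (ForEachCoverageSketch, visit), records the visited elements in a concurrent set and counts the invocations with an AtomicInteger:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ForEachCoverageSketch {

    // Visits every element with a parallel forEach; both targets of the side
    // effects (the set and the counter) are safe to update from several threads.
    static Set<Integer> visit(List<Integer> list, AtomicInteger calls) {
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        list.parallelStream().forEach(i -> {
            calls.incrementAndGet();
            seen.add(i);
        });
        return seen;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Set<Integer> seen = visit(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9), calls);
        System.out.println(calls.get()); // 9
        System.out.println(seen.size()); // 9
    }
}
```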
One may force the parallel processing to be ordered by invoking forEachOrdered, but the benefits inherent to parallel processing may be lost:
List<Integer> integerList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
// Will always print 1 2 3 4 5 6 7 8 9
integerList.parallelStream().forEachOrdered(i -> System.out.print(i + " "));
Additional considerations
Keep in mind that the processing only takes place when the last (or terminal) operation of the pipeline is invoked. Let's see an example:
int sum = integerList
    .parallelStream()
    .filter(i -> i % 2 == 0)
    .mapToInt(i -> i)
    .sum();
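One way to observe this deferral is to count how often the filter predicate runs before and after the terminal operation. The class LazinessSketch and the method countFilterCalls are hypothetical names; an AtomicInteger is used as the counter because the predicate may run on several threads.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class LazinessSketch {

    // Returns { filter calls before sum(), filter calls after sum(), sum }.
    static int[] countFilterCalls(List<Integer> list) {
        AtomicInteger filterCalls = new AtomicInteger();

        // Building the pipeline only describes the work; no element is processed yet.
        IntStream pipeline = list
                .parallelStream()
                .filter(i -> {
                    filterCalls.incrementAndGet();
                    return i % 2 == 0;
                })
                .mapToInt(i -> i);
        int before = filterCalls.get();

        // The terminal operation triggers the processing of the whole pipeline.
        int sum = pipeline.sum();
        int after = filterCalls.get();

        return new int[] { before, after, sum };
    }

    public static void main(String[] args) {
        int[] result = countFilterCalls(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9));
        System.out.println("filter calls before sum(): " + result[0]); // 0
        System.out.println("filter calls after sum():  " + result[1]); // 9
        System.out.println("sum: " + result[2]);                       // 20
    }
}
```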
The processing only starts when the sum() terminal operation is invoked. The JVM then selects the order in which the elements are passed through the intermediate operations (filter and mapToInt) to compute the final result. This is a very important concept, as we will see in the next example:
import java.util.ArrayList;
import java.util.Collections;

List<Integer> integerList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
List<Integer> resultingIntegerList = Collections.synchronizedList(new ArrayList<>());
integerList.parallelStream()
    .map(i -> {
        // Side effect: records the order in which map() actually receives the elements
        resultingIntegerList.add(i);
        return i;
    })
    .forEachOrdered(i -> System.out.print(i + " "));
The forEachOrdered operation guarantees that its action is applied to the elements in encounter order, but we may not assume that the elements are passed to the map operation in that same order. This is because the map operation is executed concurrently by multiple threads.
The above execution always produces the following result:
1 2 3 4 5 6 7 8 9
But the order in which the elements are stored in resultingIntegerList will not be the same across multiple executions. Even though forEachOrdered consumes the elements in encounter order, the preceding map operation is executed by multiple threads concurrently, and nothing guarantees that the intermediate operations of the pipeline see the elements in that order.
If we inspect the contents of resultingIntegerList after multiple independent invocations, we can confirm that the order of the elements changes across executions:
[5, 6, 8, 9, 7, 1, 3, 4, 2]
[3, 5, 6, 4, 1, 8, 9, 7, 2]
[5, 3, 6, 4, 1, 8, 9, 7, 2]
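The java.util.stream package documentation discourages side effects in behavioral parameters such as the lambda passed to map. When the goal is simply an ordered result, a side-effect-free alternative is to let collect() build the list: the mapping may still run in any order on any thread, but Collectors.toList() on an ordered stream assembles the result in encounter order. A sketch, assuming a hypothetical multiply-by-ten transformation and the hypothetical names NoSideEffectsSketch and collectInOrder:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class NoSideEffectsSketch {

    // No shared list is touched inside map(): collect() gives each worker its own
    // intermediate container and combines them in encounter order at the end.
    static List<Integer> collectInOrder(List<Integer> list) {
        return list.parallelStream()
                .map(i -> i * 10) // hypothetical stateless transformation
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> integerList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        // Prints [10, 20, 30, 40, 50, 60, 70, 80, 90] on every run
        System.out.println(collectInOrder(integerList));
    }
}
```

Unlike resultingIntegerList, the collected list does not depend on which thread ran map() first.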