4

This is a question I constantly ask myself when designing a data-intensive application: when is it appropriate to use stream() over parallelStream()? Would it make sense to use both? How do I quantify the metrics and conditions to intelligently decide which one to use at runtime? From what I understand, parallelStream() is a great facility for processing entries in parallel, but it all comes down to execution time and overhead. Does the end justify the means?

In my particular use case, due to the nature of the application, the velocity and volume of the data I am processing will be all over the place. There will be times when the volume is so large that my application would massively benefit from parallelizing the workload. Then there are times when a single thread will accomplish the task much more efficiently. I have profiled my application a dozen times and have had mixed results.

So this brings me to my question. Is there a way in Java 8 (or later) to switch between stream() and parallelStream() intelligently? At one point I considered defining boundaries on the data that would allow for alternating between the two, but in the end, not every piece of equipment is designed the same. Some systems may deal with a single-threaded workload much better than others, and vice versa.

It might be relevant to mention that I am using Apache Kafka, using Kafka Streams with Spring Cloud Streams. For the most part, I feel like I have squeezed everything out of Kafka in terms of performance and want to focus internally on optimizing my own service.

user0000001
  • 263
  • 3
  • 7

3 Answers

5

You can define a custom thread pool, by implementing the Executor interface, that increases or decreases the number of threads in the pool as needed. You can submit your parallelStream chain to it, as shown here using a ForkJoinPool:

I've created a working example which prints the threads that are doing the work:

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class TestParallel
{
  public static void main(String... args) throws InterruptedException, ExecutionException
  {
    testParallel();
  }

  static Long sum(long a, long b)
  {
    System.out.println(Thread.currentThread() + " - sum: " + a + " " + b);
    return a + b;
  }

  public static void testParallel() throws InterruptedException, ExecutionException
  {
    long firstNum = 1;
    long lastNum = 10;

    List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
        .collect(Collectors.toList());

    System.out.println("custom: ");
    System.out.println();

    ForkJoinPool customThreadPool = new ForkJoinPool(4);
    long totalCustom = customThreadPool.submit(
        () -> aList.parallelStream().reduce(0L, TestParallel::sum)).get();

    System.out.println();
    System.out.println("standard: ");
    System.out.println();

    long totalStandard = aList.parallelStream().reduce(0L, TestParallel::sum);

    System.out.println();
    System.out.println(totalCustom + " " + totalStandard);
  }
}

Personally, if you want to get to that level of control, I'm not sure the streaming API is worth bothering with. It's not doing anything you can't do with Executors and concurrent libs. It's just a simplified facade to those features with limited capabilities.

Streams are kind of nice when you need to lay out a simple multi-step process in a little bit of code. But if all you are doing is using them to manage parallelism of tasks, the Executors and ExecutorService are more straightforward IMO. One thing I would avoid is pushing the number of threads above your machine's native thread count unless you have IO-bound processing. And if that's the case NIO is the more efficient solution.
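To illustrate the Executors route, here is a minimal sketch that sums the same kind of list by handing chunks to a fixed-size pool. The class name `ExecutorSum`, the chunking scheme, and sizing the pool from `availableProcessors()` are assumptions made for this example, not part of the code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper class, named for this sketch only.
class ExecutorSum {

    // Splits the list into roughly equal chunks and sums each chunk on a pool thread.
    static long sum(List<Long> values, int threads) throws InterruptedException, ExecutionException {
        if (values.isEmpty()) {
            return 0L;
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            int chunkSize = (values.size() + threads - 1) / threads; // ceiling division
            List<Callable<Long>> tasks = new ArrayList<>();
            for (int start = 0; start < values.size(); start += chunkSize) {
                List<Long> chunk = values.subList(start, Math.min(start + chunkSize, values.size()));
                tasks.add(() -> {
                    long partial = 0;
                    for (long v : chunk) {
                        partial += v;
                    }
                    return partial;
                });
            }
            long total = 0;
            // invokeAll blocks until every chunk has been processed.
            for (Future<Long> result : pool.invokeAll(tasks)) {
                total += result.get();
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Long> values = new ArrayList<>();
        for (long i = 1; i <= 10; i++) {
            values.add(i);
        }
        // Assumption: one thread per available core, in line with the advice above for CPU-bound work.
        int threads = Runtime.getRuntime().availableProcessors();
        System.out.println(sum(values, threads));
    }
}
```

In a real service the pool would normally be created once and reused rather than built per call; it is created inside the method here only to keep the sketch self-contained.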

What I'm not sure about is what the logic is that decides when to use multiple threads and when to use one. You'd have to better explain what factors come into play.

JimmyJames
  • 24,682
  • 2
  • 50
  • 92
  • This is a good answer. IIRC, streams incur quite a bit of overhead because they try to clean up once the work is completed. Thank you for this suggestion. – user0000001 Feb 04 '20 at 22:52
  • I looked at the ForkJoinPool link. I don't understand why this approach for the parallel stream should run in parallel inside the pool instead of inside the global pool? – Thorbjørn Ravn Andersen Feb 12 '21 at 18:58
  • @ThorbjørnRavnAndersen Can you clarify what you mean by 'global thread pool'? This doesn't seem to be a standard term with regard to Java. – JimmyJames Feb 12 '21 at 19:09
  • @JimmyJames To my understanding the Stream parallelization uses a single, global data structure for handling the individual tasks, which cannot easily be overridden as you say it can. – Thorbjørn Ravn Andersen Feb 14 '21 at 09:20
  • @ThorbjørnRavnAndersen Can you please provide some technical reference to what you are referring to? Streams are a relatively new feature relative to threads and the concurrency packages. – JimmyJames Feb 15 '21 at 14:55
  • @ThorbjørnRavnAndersen Are you referring to [this](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html#commonPool--)? What's the concern here? There's nothing in my answer about 'overriding' this and I don't see any reason you would need to. – JimmyJames Feb 15 '21 at 16:34
  • I have now had a better look at your answer and may have been a bit hasty in understanding what you were suggesting to do while keeping the semantics of `parallelStream()` (which would require hooking into the pool used under the covers by the implementation of Stream). Instead you are suggesting using the old and tried method of feeding Runnables to an ExecutorService, which is fine (I've used it a lot) but loses many of the advantages of the Stream API (which you mention in other words). – Thorbjørn Ravn Andersen Feb 16 '21 at 10:49
  • @ThorbjørnRavnAndersen I'm not sure I follow. You can pass the parallelStream to a custom ForkJoinPool. As I understand it, this is what is done by default. I'm not sure what advantages you are losing. And actually what I am saying is that the streams API is limited compared to what you can do with the concurrency apis such as executors. They are simpler to use but far less powerful. – JimmyJames Feb 16 '21 at 17:14
  • `long actualTotal = customThreadPool.submit( () -> aList.parallelStream().reduce(0L, Long::sum)).get(); ` runs as a single Callable in the customThreadPool. The parallelized stream executes in its own internal pool, not the customThreadPool. – Thorbjørn Ravn Andersen Feb 16 '21 at 19:11
  • @ThorbjørnRavnAndersen I created an example to test this out and see what threads are doing the work. It seems your assertion is incorrect. It is interesting though, and I don't fully understand it, but the custom pool is definitely doing the summing. – JimmyJames Feb 16 '21 at 20:21
  • @ThorbjørnRavnAndersen This piqued my curiosity and I dug into the JDK source a little bit. It's hairy but I think I found the magic. In the ForkJoinTask class, there's a method `doInvoke` which contains this snippet: `((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, this, 0L)` which (if the thread is a `ForkJoinWorkerThread`) gets the parent pool and adds the work to it. – JimmyJames Feb 16 '21 at 21:14
  • @JimmyJames It appears my knowledge is either incomplete or outdated. So what you are saying is that the lambda is not of type Callable (which I assumed) but of type ForkJoinTask (as submit() belongs to ForkJoinPool). Interesting. I need to do some experiments too then :) – Thorbjørn Ravn Andersen Feb 17 '21 at 02:57
  • @ThorbjørnRavnAndersen Now that I see what's happening, it really is quite inelegant and ugly IMO. A branch based on checking the type of the current thread is a textbook example of bad practice in Java IMO. Moreover, it's completely hardcoded around `ForkJoinWorkerThread`. It's the kind of thing that drives me crazy in Java. It's like the XML libraries using behind-the-scenes nastiness to pick an implementation for you (which is often not what is wanted). – JimmyJames Feb 17 '21 at 15:37
2

I don't know if this is useful, but there is a design pattern called Bridge that decouples an abstraction from its implementation so that you can switch between implementations at runtime.

A simple example would be a stack. For stacks where the total amount of data stored at one time is relatively small, it is more efficient to use an array. When the amount of data hits a certain point, it becomes better to use a linked-list. The stack implementation determines when it switches from one to the other.

For your case, it sounds like the processing would be behind some interface and based on the volume (do you know it before you start the processing?) your Processor class could use streams or parallel streams as appropriate.
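A rough sketch of that idea, assuming the volume is known before processing starts: the `Processor` interface, the three implementations, the sum-of-squares workload, and the threshold value are all hypothetical names and placeholders chosen for illustration. A sensible threshold would have to come from profiling on the target hardware.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical wrapper class so the sketch fits in one file.
class AdaptiveProcessing {

    // The abstraction callers depend on; it hides how the work is executed.
    interface Processor {
        long process(List<Integer> data);
    }

    static class SequentialProcessor implements Processor {
        public long process(List<Integer> data) {
            return data.stream().mapToLong(i -> i.longValue() * i.longValue()).sum();
        }
    }

    static class ParallelProcessor implements Processor {
        public long process(List<Integer> data) {
            return data.parallelStream().mapToLong(i -> i.longValue() * i.longValue()).sum();
        }
    }

    // Picks an implementation per call, based on the size of the input.
    static class AdaptiveProcessor implements Processor {
        private final int threshold;
        private final Processor small = new SequentialProcessor();
        private final Processor large = new ParallelProcessor();

        AdaptiveProcessor(int threshold) {
            this.threshold = threshold;
        }

        // Exposed so the decision itself can be inspected or tested.
        Processor choose(int size) {
            return size >= threshold ? large : small;
        }

        public long process(List<Integer> data) {
            return choose(data.size()).process(data);
        }
    }

    public static void main(String[] args) {
        List<Integer> data = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
        Processor processor = new AdaptiveProcessor(10_000); // placeholder threshold, not a recommendation
        System.out.println(processor.process(data));
    }
}
```

Callers only see `Processor`, so the switching rule can later be replaced (for example with one that also looks at the number of cores) without touching them.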

Matthew
  • 1,085
  • 6
  • 8
  • A stack is great example. I would like to read more on that specification. The biggest question is when does it decide to alternate between those two implementations? – user0000001 Feb 04 '20 at 22:34
  • @user0000001 You make the decision because you're coding it. I hope I didn't give the impression that this is how all stacks are implemented; it's just an illustration of the pattern. So you need to decide when to choose the different implementation; maybe you already know how big your dataset is? – Matthew Feb 05 '20 at 13:34
0

StreamSupport.stream creates a new sequential or parallel Stream from a Spliterator (which in turn can be obtained from any Collection). That lets you switch parallel processing on or off conditionally.

boolean parallel = true;
StreamSupport.stream(IntStream.range(0, 10).spliterator(), parallel).forEach(...);

Of course, you can also use a plain if-else (or a conditional expression) if you have such a flag:

boolean parallel = findOutIfParallelizingIsWorthIt();
Stream<T> myStream = parallel ? myCollection.parallelStream() : myCollection.stream();
myStream.forEach(...);
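
As a self-contained variant of the snippets above, the following sketch wraps the StreamSupport call in a helper and pairs it with one possible shape for the flag. `StreamSwitch`, `streamOf`, `worthParallelizing`, and the threshold of 10,000 are hypothetical; the size-and-cores rule is only an assumption and should be replaced by whatever your own measurements support.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical helper class, named for this sketch only.
class StreamSwitch {

    // Assumed heuristic: go parallel only for large inputs on multi-core machines.
    static boolean worthParallelizing(int size, int threshold) {
        return size >= threshold && Runtime.getRuntime().availableProcessors() > 1;
    }

    // Builds a sequential or parallel stream over the same collection.
    static <T> Stream<T> streamOf(Collection<T> source, boolean parallel) {
        return StreamSupport.stream(source.spliterator(), parallel);
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
        boolean parallel = worthParallelizing(data.size(), 10_000); // placeholder threshold
        Stream<Integer> stream = streamOf(data, parallel);
        System.out.println("parallel? " + stream.isParallel());
        System.out.println(stream.mapToInt(Integer::intValue).sum());
    }
}
```

An existing stream can also be switched with `parallel()` or `sequential()` before the terminal operation, which avoids building it twice.
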
Johannes Hahn
  • 328
  • 1
  • 10