Stream API

A Stream is a sequence of elements (values or objects) that can be pipelined through a series of functions to produce the desired result.

Java 8 introduced the java.util.stream package, whose classes, enums, and interfaces make up the Stream API, together with the java.util.function package of functional interfaces that stream operations accept. Streams draw on existing data sources such as Collections, arrays, or I/O channels and provide a functional approach to processing them.

A Stream wraps a data source and makes bulk operations on it convenient. Streams use internal iteration: the library, not the caller, drives the loop over the elements.
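To make the idea of a pipeline concrete, here is a minimal sketch assuming a small hard-coded list of apple names (the class and method names are illustrative only). The list is the source, filter and map are the steps in between, and collect produces a new list at the end; the loop itself is never written by the caller.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class PipelineSketch {
    // Source -> intermediate operations -> terminal operation
    static List<String> longNamesUpperCase(List<String> apples) {
        return apples.stream()                     // source: the list
                .filter(name -> name.length() > 4) // keep names longer than 4 characters
                .map(String::toUpperCase)          // transform each remaining name
                .collect(Collectors.toList());     // gather the results into a new list
    }

    public static void main(String[] args) {
        List<String> apples = Arrays.asList("Macintosh", "Fuji", "Gala", "Jonagold");
        System.out.println(longNamesUpperCase(apples)); // [MACINTOSH, JONAGOLD]
    }
}
```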

How to create Streams

There are several ways to create a Stream, from a Collection, from an array, or from scratch; some are listed below.

  • Stream.of(values)
  • Stream.of(array) or Arrays.stream(array)
  • Collection.stream() and Collection.parallelStream()
  • Stream.generate()
  • Stream.iterate()
  • Stream of String chars or tokens
  • Stream.Builder

package org.wesome.java8;

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class Apple {
    public static void main(String args[]) {

        System.out.println("*------------Stream of Array------------*");
        String[] appleArray = new String[]{"Macintosh", "Fuji", "Gala", "Jonagold"};
        Stream<String> streamOfArrayValues = Stream.of(appleArray); // varargs: one element per array entry
        Stream<String> streamOfArray = Arrays.stream(appleArray);

        System.out.println("*------------Stream of Collection------------*");
        List<String> appleList = Arrays.asList("Macintosh", "Fuji", "Gala", "Jonagold");
        // Note: Stream.of(list) yields a stream with a single element, the list itself
        Stream<List<String>> streamOfCollection = Stream.of(appleList);

        System.out.println("*------------Collection Stream------------*");
        Stream<String> collectionSequentialStream = appleList.stream();
        Stream<String> collectionParallelStream = appleList.parallelStream();

        System.out.println("*------------Stream Generate------------*");
        Stream.generate(Math::random).limit(5).forEach(System.out::println);
        Stream.generate(Random::new).map(Random::nextInt).limit(5).forEach(System.out::println);

        System.out.println("*------------Stream Iterate------------*");
        Stream.iterate(1, i -> i + 1).limit(5).forEach(System.out::println);

        System.out.println("*------------Random Int Stream------------*");
        IntStream randomInt = new Random().ints().limit(10);

        System.out.println("*------------Stream Builder------------*");
        Stream.Builder<String> empStreamBuilder = Stream.builder();
        empStreamBuilder.accept("Macintosh");
        empStreamBuilder.accept("Fuji");
        empStreamBuilder.accept("Gala");
        empStreamBuilder.accept("Jonagold");
        Stream<String> appleStream = empStreamBuilder.build();
    }
}
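The list above also mentions streams of String chars or tokens, which the program does not show. A small sketch, assuming made-up inputs and illustrative method names: String.chars() returns an IntStream of char values, and Pattern.splitAsStream turns the pieces between separators into a Stream<String>.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class StringStreams {
    // Stream of chars: String.chars() returns an IntStream of char values
    static String distinctLetters(String word) {
        return word.chars()
                .distinct()                               // drop repeated characters
                .mapToObj(c -> String.valueOf((char) c))  // int code -> one-character String
                .collect(Collectors.joining());
    }

    // Stream of tokens: split the input around a separator pattern
    static List<String> tokens(String csv) {
        return Pattern.compile(",").splitAsStream(csv)
                .map(String::trim)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(distinctLetters("Jonagold"));                // Jonagld
        System.out.println(tokens("Macintosh, Fuji, Gala, Jonagold")); // [Macintosh, Fuji, Gala, Jonagold]
    }
}
```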

Stream is not a data Structure

A Stream is not a data structure. It reads elements from a source such as a Collection, an array, or an I/O channel, but it does not store them. Unlike a Collection, a Stream has no index, so the position of an element cannot be looked up in the Stream itself; you can only supply functions that operate on its elements. In the example below, the index comes from the source list through indexOf, not from the stream.

package org.wesome.java8;

import java.util.Arrays;
import java.util.List;

class Apple {
    public static void main(String args[]) {
        List<String> appleList = Arrays.asList("Macintosh", "Fuji", "Gala", "Jonagold");
        appleList.stream().forEach(apple -> System.out.println(apple + " is at index " + appleList.indexOf(apple)));
    }
}
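Because indexOf searches the source list, it reports only the first occurrence of a duplicated name. One way around this, sketched here assuming the source is a random-access List (the class name is illustrative), is to stream over the indices and look each element up:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class IndexedApples {
    // Stream the indices 0..size-1, then fetch each element from the list
    static List<String> withIndex(List<String> apples) {
        return IntStream.range(0, apples.size())
                .mapToObj(i -> apples.get(i) + " is at index " + i)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> apples = Arrays.asList("Fuji", "Gala", "Fuji");
        // Prints index 0, 1 and 2; indexOf would report 0 for both "Fuji" entries
        withIndex(apples).forEach(System.out::println);
    }
}
```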

Streams are Functional in Nature

A Stream never modifies its source; it only pipelines the elements into the next function. For example, filtering a Stream does not remove elements from the source collection. It produces a new Stream containing only the matching elements, and the source stays intact.

package org.wesome.java8;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class Apple {
    public static void main(String args[]) {
        List<String> appleList = Arrays.asList("Macintosh", "Fuji", "Gala", "Jonagold");
        List<String> filterList = appleList.stream().filter(a -> a.startsWith("M")).collect(Collectors.toList());
        System.out.println("appleList = " + appleList);
        System.out.println("filterList = " + filterList);
    }
}

Streams are Lazy by default

Intermediate operations on a Stream are lazy: they are evaluated only when a terminal operation executes.

Functions that Return a Stream are Lazy, Others are Eager

Intermediate functions such as filter, map, limit, and skip are invoked on a stream and return a stream, so they are lazy. Terminal functions, which return something other than a stream, such as sum, min, max, findFirst, reduce, and collect, are eager.

package org.wesome.java8;

import java.util.stream.IntStream;

class Apple {
    public static void main(String args[]) {

        System.out.println("*--------------Intermediate function without terminal function--------------*");
        IntStream.rangeClosed(1, 5).filter((i) -> {
            System.out.println("This lambda is never executed");
            System.out.println("integer should be divisible by 2 = " + i);
            return i % 2 == 0;
        });

        System.out.println("*--------------Intermediate function with terminal function--------------*");
        IntStream.rangeClosed(1, 5).filter((i) -> {
            System.out.println("integer should be divisible by 2 = " + i);
            return i % 2 == 0;
        }).forEach(System.out::println);
    }
}
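Laziness can also be observed without relying on console output. In this sketch (class and method names are illustrative), the filter records every element it examines; nothing has been recorded after the pipeline is built, and all five elements are recorded once the terminal toArray() runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

class LazyDemo {
    // Returns {elements seen before the terminal op, elements seen after it, number of evens}
    static int[] visitsBeforeAndAfterTerminal() {
        List<Integer> visited = new ArrayList<>();
        IntStream pipeline = IntStream.rangeClosed(1, 5)
                .filter(i -> {
                    visited.add(i);       // record each element the filter examines
                    return i % 2 == 0;
                });
        int before = visited.size();      // pipeline is built, but nothing has run yet
        int[] evens = pipeline.toArray(); // terminal operation triggers evaluation
        int after = visited.size();
        return new int[]{before, after, evens.length};
    }

    public static void main(String[] args) {
        int[] r = visitsBeforeAndAfterTerminal();
        // before terminal = 0, after terminal = 5, evens = 2
        System.out.println("before terminal = " + r[0] + ", after terminal = " + r[1] + ", evens = " + r[2]);
    }
}
```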

Streams are Efficient

Streams do only as much work as the terminal operation requires. In the example below, each element passes through all the filters before the next element is read. Because 6 satisfies every filter and findFirst is a short-circuiting operation, the stream stops without ever touching 7 or 8.

package org.wesome.java8;

import java.util.Arrays;
import java.util.OptionalInt;
import java.util.function.IntPredicate;

public class Apple1 {
    public static void main(String[] args) {
        IntPredicate greaterThan5 = integer -> integer > 5;
        IntPredicate lessThan7 = integer -> integer < 7;
        IntPredicate even = integer -> integer % 2 == 0;

        int[] arr = new int[]{6, 7, 8};
        OptionalInt first = Arrays.stream(arr)
                .peek(value -> System.out.println("value after stream = " + value))
                .filter(greaterThan5)
                .peek(value -> System.out.println("value after greaterThan5 = " + value))
                .filter(lessThan7)
                .peek(value -> System.out.println("value after lessThan7 = " + value))
                .filter(even)
                .peek(value -> System.out.println("value after even = " + value))
                .findFirst();
        System.out.println("first = " + first.getAsInt());
    }
}
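Short-circuiting is what allows a terminal operation such as findFirst to finish even on an infinite source. This sketch (illustrative names) counts how many elements are pulled from the unbounded sequence 1, 2, 3, ... before the first even number greater than 5 is found.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class ShortCircuitDemo {
    // Returns {first match, number of elements pulled from the source}
    static int[] firstEvenAbove5() {
        AtomicInteger pulled = new AtomicInteger();
        int first = Stream.iterate(1, i -> i + 1)      // infinite source: 1, 2, 3, ...
                .peek(i -> pulled.incrementAndGet())   // count elements taken from the source
                .filter(i -> i > 5)
                .filter(i -> i % 2 == 0)
                .findFirst()                           // stops at the first match
                .get();
        return new int[]{first, pulled.get()};
    }

    public static void main(String[] args) {
        int[] r = firstEvenAbove5();
        System.out.println("first = " + r[0] + ", elements pulled = " + r[1]); // first = 6, elements pulled = 6
    }
}
```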

Streams are Consumable only Once

Like an Iterator, a Stream can be traversed only once. After a terminal operation has consumed it, any further operation on it throws an IllegalStateException, so a new Stream must be created in order to iterate again.

package org.wesome.java8;

import java.util.stream.IntStream;

class Apple {
    public static void main(String args[]) {

        /*---------------Generate Stream---------------*/
        IntStream intStream = IntStream.rangeClosed(1, 5);

        /*---------------Iterate over the Stream---------------*/
        intStream.forEach(System.out::println);

        /*---------------The Stream has already been consumed, so this throws IllegalStateException---------------*/
        try {
            intStream.forEach(System.out::println);
        } catch (IllegalStateException e) {
            System.out.println("e = " + e);
        }
    }
}
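When the same data has to be streamed more than once, one common approach is to keep a Supplier that builds a fresh stream on each call, as in this sketch (the class and field names are illustrative):

```java
import java.util.function.Supplier;
import java.util.stream.IntStream;

class ReusableSource {
    // Each get() call creates a brand-new stream over the same range
    static final Supplier<IntStream> NUMBERS = () -> IntStream.rangeClosed(1, 5);

    static int sum() {
        return NUMBERS.get().sum();
    }

    static int max() {
        return NUMBERS.get().max().getAsInt();
    }

    public static void main(String[] args) {
        System.out.println("sum = " + sum());       // sum = 15
        System.out.println("max = " + max());       // max = 5
        System.out.println("sum again = " + sum()); // sum again = 15, no IllegalStateException
    }
}
```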

Streams provide Runtime Polymorphism

With a for loop or an enhanced for loop, iteration is external: the collection is handed to the loop, that is, an object is passed to a function. With a stream, iteration is invoked on the collection itself, that is, a function is invoked on an object, and the collection's own implementation decides at runtime how to traverse its elements. This is polymorphism.

As a result, stream code needs no changes to iterate over a list, a set, or an array; the source performs the traversal internally based on its type.

package org.wesome.java8;

import java.util.Arrays;
import java.util.List;

public class Apple {
    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
        /*  External iterator, for loop */
        for (int i = 0; i < list.size(); i++) {
            System.out.println(list.get(i));
        }
        /*  External iterator, enhanced for loop */
        for (Integer integer : list) {
            System.out.println(integer);
        }
        /*  Internal iterator, stream  */
        list.stream().forEach(System.out::println);
    }
}
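To illustrate the point about source types, here is a sketch with a hypothetical sumOfEvens helper: the Collection version runs unchanged for a List and a Set, and an array is adapted to the same kind of pipeline with Arrays.stream.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class SourceAgnostic {
    // Works for any Collection; the concrete collection decides how it is traversed
    static int sumOfEvens(Collection<Integer> numbers) {
        return numbers.stream().filter(n -> n % 2 == 0).mapToInt(Integer::intValue).sum();
    }

    // Arrays are not Collections, so Arrays.stream adapts them to a stream
    static int sumOfEvens(int[] numbers) {
        return Arrays.stream(numbers).filter(n -> n % 2 == 0).sum();
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
        Set<Integer> set = new HashSet<>(list);
        int[] array = {1, 2, 3, 4, 5};
        System.out.println(sumOfEvens(list));  // 6
        System.out.println(sumOfEvens(set));   // 6
        System.out.println(sumOfEvens(array)); // 6
    }
}
```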

Streams Support Dependency Inversion

Dependency inversion means that high-level code depends on abstractions rather than on concrete details; it is closely related to the Strategy pattern. Stream operations accept behavior through functional interfaces such as Predicate, so the same code can produce different results depending on the behavior the caller passes in.

package org.wesome.java8;

import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class Apple {
    public static long count(List<Integer> list, Predicate<Integer> selector) {
        /* count the elements that satisfy the selector supplied by the caller */
        return list.stream().filter(selector).count();
    }

    public static void main(String args[]) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
        Predicate<Integer> even = (number) -> number % 2 == 0;
        Predicate<Integer> odd = (number) -> number % 2 != 0;
        Predicate<Integer> divisibleBy3 = (number) -> number % 3 == 0;
        Predicate<Integer> divisibleBy4 = (number) -> number % 4 == 0;

        System.out.println("count all numbers " + count(list, e -> true));
        System.out.println("count even numbers " + count(list, even));
        System.out.println("count odd numbers " + count(list, odd));
        System.out.println("count divisible by 3 numbers " + count(list, divisibleBy3));
        System.out.println("count divisible by 4 numbers " + count(list, divisibleBy4));
    }
}
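Because the counting method depends only on the Predicate abstraction, new selection rules can also be built by combining existing ones with Predicate's default methods and, or, and negate, without touching the method. A sketch with illustrative names:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class ComposedSelectors {
    static final Predicate<Integer> EVEN = n -> n % 2 == 0;
    static final Predicate<Integer> DIVISIBLE_BY_3 = n -> n % 3 == 0;

    // The counting logic never changes; only the supplied Predicate does
    static long count(List<Integer> list, Predicate<Integer> selector) {
        return list.stream().filter(selector).count();
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(count(list, EVEN.and(DIVISIBLE_BY_3))); // 1 (6)
        System.out.println(count(list, EVEN.or(DIVISIBLE_BY_3)));  // 4 (2, 3, 4, 6)
        System.out.println(count(list, EVEN.negate()));           // 3 (1, 3, 5)
    }
}
```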

Streams Make Code More Expressive

Imperative programming spells out both what to do and how to do it, whereas declarative code written with Streams focuses on what to do and leaves the how to the library, which makes it more expressive.

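Streams also take care of some basic edge cases. In the imperative style, the method must return some value even when nothing matches, either a sentinel such as 0 (as below) or null, which can lead to a NullPointerException. In contrast, terminal operations such as findFirst return an Optional, which makes the "no result" case explicit. Let's understand this by example.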

  • Imperative Style
package org.wesome.java8;

import java.util.Arrays;
import java.util.List;

class Apple {
    public static void main(String args[]) {
        /*  find double of the first even number greater than 3  */
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(findImperativeStyle(list));
    }

    /*================ imperative style================*/
    private static int findImperativeStyle(List<Integer> list) {
        int result = 0;
        for (Integer integer : list) {
            if (integer > 3 && integer % 2 == 0) {
                result = integer * 2;
                break;
            }
        }
        return result;
    }
}
  • Declarative Style Using Lambda Expression
package org.wesome.java8;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class Apple {

    public static void main(String args[]) {

        /*  find double of the first even number greater than 3  */
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);

        /*  using lambda expression    */
        findLambdaStyle(list).ifPresent(System.out::println);
    }

    /*================ lambda expression================*/
    private static Optional<Integer> findLambdaStyle(List<Integer> list) {
        return list.stream()
                .filter(integer -> integer > 3)
                .filter(integer -> integer % 2 == 0)
                .map(integer -> integer * 2)
                .findFirst();
    }
}
  • Declarative Style Using Method Reference
package org.wesome.java8;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class Apple {

    public static void main(String args[]) {

        /*  find double of the first even number greater than 3  */
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);

        /*  using Method Reference    */
        findMethodReferenceStyle(list).ifPresent(System.out::println);
    }

    /*================ Method Reference style================*/
    private static Optional<Integer> findMethodReferenceStyle(List<Integer> list) {
        return list.stream()
                .filter(Apple::isGreaterThanThree)
                .filter(Apple::isEven)
                .map(Apple::getDouble)
                .findFirst();
    }

    private static int getDouble(Integer number) {
        return number * 2;
    }

    private static boolean isEven(Integer number) {
        return number % 2 == 0;
    }

    private static boolean isGreaterThanThree(Integer number) {
        return number > 3;
    }
}
  • Declarative Style Using Functional Interface
package org.wesome.java8;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

class Apple {

    public static void main(String args[]) {

        /*  find double of the first even number greater than 3  */
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);

        /*  using Functional Interface    */
        findFunctionalInterfaceStyle(list).ifPresent(System.out::println);
    }

    private static Optional<Integer> findFunctionalInterfaceStyle(List<Integer> list) {
        Function<Integer, Integer> getDouble = number -> number * 2;
        Predicate<Integer> isEven = number -> number % 2 == 0;
        Predicate<Integer> isGreaterThanThree = number -> number > 3;

        return list.stream()
                .filter(isGreaterThanThree)
                .filter(isEven)
                .map(getDouble)
                .findFirst();
    }
}
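The Optional returned by findFirst is what handles the "nothing found" case. In this sketch (illustrative names), a pipeline that doubles the first even number greater than 3, as the comments above describe, is run on a list with no match; the result is Optional.empty() rather than null, and orElse supplies a fallback.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class NoMatchDemo {
    // Doubles the first even number greater than 3, if there is one
    static Optional<Integer> find(List<Integer> list) {
        return list.stream()
                .filter(n -> n > 3)
                .filter(n -> n % 2 == 0)
                .map(n -> n * 2)
                .findFirst();
    }

    public static void main(String[] args) {
        Optional<Integer> none = find(Arrays.asList(1, 2, 3, 5));
        System.out.println(none.isPresent());                              // false
        System.out.println(none.orElse(-1));                               // -1
        System.out.println(find(Arrays.asList(1, 2, 3, 4, 5)).orElse(-1)); // 8
    }
}
```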

Parallel Streams

By default, a stream executes sequentially on the calling thread (here, the main thread): the elements are processed one after another by a single thread, just like a for loop.

package org.wesome.java8;

import java.util.stream.IntStream;

class Apple {
    public static void main(String args[]) {
        IntStream.rangeClosed(1, 50).forEach(Apple::print);
    }

    public static void print(int integer) {
        System.out.println(Thread.currentThread().getName() + " : " + integer);
    }
}

Streams also offer parallel execution, which splits the source into multiple parts, processes them on multiple threads to use multiple CPU cores, and then combines the partial results into the final result. By default, parallel streams run on the common ForkJoinPool, whose parallelism is based on the number of available processors (by default one less than that number, since the calling thread also takes part).

package org.wesome.java8;

import java.util.Arrays;
import java.util.List;

class Apple {
    /* simulates a slow computation: waits one second, then doubles the number */
    public static int getDouble(int number) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
        }
        System.out.println(number + " processed by " + Thread.currentThread().getName());
        return number * 2;
    }

    public static void main(String args[]) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);

        long start = System.currentTimeMillis();
        list.stream().map(Apple::getDouble).forEach(System.out::println);
        System.out.println("sequential stream took = " + (System.currentTimeMillis() - start) + " ms");

        start = System.currentTimeMillis();
        list.parallelStream().map(Apple::getDouble).forEach(System.out::println);
        System.out.println("parallel stream took = " + (System.currentTimeMillis() - start) + " ms");
    }
}
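Splitting the work across threads does not have to scramble the result. In this sketch (illustrative names), collect in a parallel stream still respects the source's encounter order, while forEach may print in any order and forEachOrdered restores the order at some cost in parallelism.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelOrder {
    static List<Integer> doubledParallel(int n) {
        return IntStream.rangeClosed(1, n).parallel()
                .map(i -> i * 2)
                .boxed()
                .collect(Collectors.toList()); // encounter order is preserved
    }

    static List<Integer> doubledSequential(int n) {
        return IntStream.rangeClosed(1, n)
                .map(i -> i * 2)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(doubledParallel(10).equals(doubledSequential(10))); // true
        // forEach in parallel: order depends on thread scheduling
        IntStream.rangeClosed(1, 10).parallel().forEach(i -> System.out.print(i + " "));
        System.out.println();
        // forEachOrdered: prints 1 2 3 ... 10 in order
        IntStream.rangeClosed(1, 10).parallel().forEachOrdered(i -> System.out.print(i + " "));
        System.out.println();
    }
}
```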

The Stream API lets a sequential pipeline run in parallel without rewriting it: call parallel() on an existing stream or parallelStream() on a collection. Behind the scenes, parallel streams use the fork/join framework (the common ForkJoinPool) to split the work and manage the threads.

package org.wesome.java8;

import java.util.stream.IntStream;

class Apple {
    public static void main(String args[]) {
        System.out.println("available processors are " + Runtime.getRuntime().availableProcessors());
        IntStream.rangeClosed(1, 50).parallel().forEach(Apple::print);
    }

    public static void print(int integer) {
        System.out.println(Thread.currentThread().getName() + " : " + integer);
    }
}

By default, the degree of parallelism depends on the number of CPU cores of the system. If more threads are needed, it can be changed with a JVM system property.

package org.wesome.java8;

import java.util.concurrent.ForkJoinPool;

class Apple {
    public static void main(String args[]) {
        System.out.println("getParallelism=" + ForkJoinPool.commonPool().getParallelism());
    }
}

Then run the same program again with the following JVM argument:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=20
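The system property changes the common pool for the whole JVM. A commonly used alternative, sketched here as an assumption rather than documented behavior, is to start the parallel stream from inside a task submitted to a dedicated ForkJoinPool. In current OpenJDK releases the stream's subtasks then usually run in that pool, but the Stream API documentation does not guarantee this, so the printed thread names may differ on other JVMs. CustomPoolDemo and sumInPool are illustrative names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class CustomPoolDemo {
    // Sums 1..n with a parallel stream launched from a task of a dedicated pool,
    // recording the names of the threads that processed elements.
    // Which pool runs the subtasks is an implementation detail (assumption).
    static long sumInPool(int parallelism, int n, Set<String> threadNames)
            throws InterruptedException, ExecutionException {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.submit(() -> IntStream.rangeClosed(1, n)
                    .parallel()
                    .peek(i -> threadNames.add(Thread.currentThread().getName()))
                    .asLongStream()
                    .sum()).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Set<String> names = ConcurrentHashMap.newKeySet();
        System.out.println("sum = " + sumInPool(20, 1_000, names)); // sum = 500500
        System.out.println("threads used = " + names);
    }
}
```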

Once all worker threads (plus the calling thread) are busy, the remaining elements wait for a free thread, so the tasks complete in successive batches roughly the size of the available parallelism.

package org.wesome.java8;

import java.time.LocalTime;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class Apple {
    /* simulates a slow task: waits two seconds and logs which thread ran it and when */
    public static void slowTask(int number) {
        try {
            Thread.sleep(2000);
            System.out.println(number + " processed by " + Thread.currentThread().getName() + " " + LocalTime.now());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
    }

    public static void main(String args[]) {
        System.out.println("processors " + Runtime.getRuntime().availableProcessors());
        System.out.println("Parallelism=" + ForkJoinPool.commonPool().getParallelism());
        IntStream.rangeClosed(1, 50).parallel().forEach(Apple::slowTask);
    }
}

Run it again with, for example:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=10

Experiment with different values of this JVM argument to get a better feel for how the number of processors and the configured parallelism relate.

Operation Types

Operations are attached to a Stream. When the Stream is traversed, each attached operation is invoked once for every element that reaches it.

Streams mainly support two kinds of operations, intermediate and terminal, which combine to form a stream pipeline.
A stream pipeline consists of a source (for example a Collection, an array, or an I/O channel), zero or more intermediate operations such as filter or map, and a single terminal operation such as forEach or reduce at the end.

Let's look at each of them in turn.

Intermediate Operations

An intermediate operation is invoked on a stream, transforms or filters its elements, and returns a new stream that is passed on to the next intermediate or terminal operation.

Multiple intermediate operations can be chained one after another to get the desired result; the terminal operation marks the end of the stream and returns the result.

package org.wesome.java8;

import java.util.stream.IntStream;

class Apple {
    public static void main(String args[]) {
        IntStream.rangeClosed(1, 1000).filter(i -> i % 2 == 0).filter(i -> i % 3 == 0).filter(i -> i % 5 == 0).forEach(System.out::println);
    }
}

Every intermediate operation returns a new stream; it never changes the existing source.

package org.wesome.java8;

import java.util.Arrays;
import java.util.List;

class Apple {
    public static void main(String args[]) {
        List<String> appleList = Arrays.asList("Macintosh", "Fuji", "Gala", "Jonagold");

        System.out.println("*---------------------------keep only Apples whose name has more than 4 characters---------------------------*");
        appleList.stream().filter(str -> str.length() > 4).forEach(System.out::println);

        System.out.println("*---------------------------Printing original Apple List---------------------------*");
        appleList.stream().forEach(System.out::println);
    }
}

Traversal of the pipeline source does not begin until the terminal operation of the pipeline is executed. If an intermediate operation such as filter is called without any terminal operation, it never runs.

package org.wesome.java8;

import java.util.Arrays;
import java.util.List;

class Apple {
    public static void main(String args[]) {
        List<String> appleList = Arrays.asList("Macintosh", "Fuji", "Gala", "Jonagold");

        System.out.println("*---------------------------Intermediate operation without terminal operation---------------------------*");
        appleList.stream().filter(str -> {
            System.out.println("checking for  = " + str);
            return str.length() > 4;
        });

        System.out.println("*---------------------------Intermediate operation with terminal operation---------------------------*");
        appleList.stream().filter(str -> {
            System.out.println("checking for  = " + str);
            return str.length() > 4;
        }).forEach(str -> {
            System.out.println(str + " has more than 4 characters");
        });
    }
}

Stateless and Stateful Operations

Intermediate operations are further divided into stateful and stateless operations.

Stateless operations such as filter or map don't need any information about previous elements while processing the current one; each element can be processed independently. In a sequential stream, an element can therefore flow through a chain of stateless operations in a single pass.

package org.wesome.java8;

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class Apple {
    public static void main(String args[]) {
        List<String> appleList = Arrays.asList("Macintosh", "Fuji", "Gala", "Jonagold");

        System.out.println("*------------Stateless Operations------------*");
        appleList.stream().filter(str -> str.length() > 4).forEach(System.out::println);
        appleList.stream().map(String::toUpperCase).forEach(System.out::println);

        System.out.println("*------------Stateful Operations------------*");
        appleList.stream().distinct().forEach(System.out::println);
        appleList.stream().sorted(Comparator.naturalOrder()).forEach(System.out::println);
    }
}

Stateful operations such as distinct or sorted need information about previously seen elements while processing the current one. Some of them must consume the entire input before producing any result; for example, sorted cannot emit its first element until it has compared all of them. In a parallel stream, stateful operations may need to buffer data or coordinate between threads, so they can perform worse in parallel than stateless operations. Timings vary by machine and between runs.

package org.wesome.java8;

import java.util.stream.IntStream;

class Apple {
    public static void main(String args[]) {
        /* distinct() is a stateful intermediate operation: it must remember every value it has seen */

        System.out.println("*---------------------------Sequential Stream---------------------------*");
        long start = System.currentTimeMillis();
        long seqCount = IntStream.rangeClosed(1, 1_000_000).map(i -> i % 1000).distinct().count();
        long seqDuration = System.currentTimeMillis() - start;
        System.out.println("distinct values = " + seqCount + ", sequential stream took = " + seqDuration + " ms");

        System.out.println("*---------------------------Parallel Stream---------------------------*");
        start = System.currentTimeMillis();
        long parCount = IntStream.rangeClosed(1, 1_000_000).parallel().map(i -> i % 1000).distinct().count();
        long parDuration = System.currentTimeMillis() - start;
        System.out.println("distinct values = " + parCount + ", parallel stream took = " + parDuration + " ms");
    }
}

Some common intermediate operations are:

  • filter()
  • map()
  • flatMap()
  • distinct()
  • sorted()
  • peek()
  • limit()
  • skip()
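Several of these operations, including flatMap, which the examples so far do not use, can be combined in one pipeline. In this sketch (illustrative names and data), flatMap flattens a list of baskets into one stream of names before the remaining steps run; peek is used only to log what passes through.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class IntermediateOpsTour {
    static List<String> tour(List<List<String>> baskets) {
        return baskets.stream()
                .flatMap(List::stream)                             // List<List<String>> -> Stream<String>
                .map(String::toUpperCase)                          // transform each name
                .distinct()                                        // drop duplicates (stateful)
                .sorted()                                          // natural order (stateful)
                .peek(name -> System.out.println("sorted: " + name)) // debug logging only
                .skip(1)                                           // skip the first element
                .limit(2)                                          // keep at most two elements
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> baskets = Arrays.asList(
                Arrays.asList("Fuji", "Gala"),
                Arrays.asList("gala", "Jonagold", "Macintosh"));
        System.out.println(tour(baskets)); // [GALA, JONAGOLD]
    }
}
```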

Terminal Operations

A terminal operation marks the end of a stream pipeline and produces its result: usually a single value (such as a count or a reduced value), a collection, or a side effect, as with forEach. Because intermediate operations are lazy, they execute only once a terminal operation runs.

Some common terminal operations are:

  • forEach()
  • forEachOrdered()
  • toArray()
  • reduce()
  • collect()
  • min()
  • max()
  • count()
  • anyMatch()
  • allMatch()
  • noneMatch()
  • findFirst()
  • findAny()
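Most of the terminal operations listed above can be seen side by side in this sketch (illustrative names and data). Each call starts a fresh stream from the list because a stream can be consumed only once; min, max, and findFirst return Optionals, which are unwrapped here assuming a non-empty list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class TerminalOpsTour {
    static String summary(List<Integer> numbers) {
        int sum = numbers.stream().reduce(0, Integer::sum);
        long count = numbers.stream().count();
        int min = numbers.stream().min(Integer::compare).orElseThrow(IllegalArgumentException::new);
        int max = numbers.stream().max(Integer::compare).orElseThrow(IllegalArgumentException::new);
        boolean anyEven = numbers.stream().anyMatch(n -> n % 2 == 0);
        boolean allPositive = numbers.stream().allMatch(n -> n > 0);
        boolean noneAbove10 = numbers.stream().noneMatch(n -> n > 10);
        int first = numbers.stream().findFirst().orElseThrow(IllegalArgumentException::new);
        Integer[] sorted = numbers.stream().sorted().toArray(Integer[]::new);
        List<Integer> evens = numbers.stream().filter(n -> n % 2 == 0).collect(Collectors.toList());
        return "sum=" + sum + ", count=" + count + ", min=" + min + ", max=" + max
                + ", anyEven=" + anyEven + ", allPositive=" + allPositive
                + ", noneAbove10=" + noneAbove10 + ", first=" + first
                + ", sorted=" + Arrays.toString(sorted) + ", evens=" + evens;
    }

    public static void main(String[] args) {
        System.out.println(summary(Arrays.asList(3, 1, 4, 1, 5, 9, 2, 6)));
    }
}
```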
