A Stream is a sequence of values or objects that can be pipelined as input from one function to another to produce the desired result.
Java 8 introduced the java.util.stream and java.util.function packages, which contain the classes, enums, and interfaces that make up the Stream API and the functional interfaces it relies on. Streams use existing data structures such as Collections, arrays, or I/O channels as their source and provide a functional approach to processing them.
Streams put a wrapper around the data source, which makes operations and bulk processing convenient. Streams also iterate over the elements internally, so the caller does not write the loop.
How to create Streams
There are multiple ways to get a Stream from a Collection, an array, or other sources; some of them are listed below.
- Stream.of
- Stream.of(array)
- List.stream()
- Stream.generate()
- Stream.iterate()
- Stream of String chars or tokens
- Stream.Builder
package org.wesome.java8;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.stream.IntStream;
import java.util.stream.Stream;
class Apple {
public static void main(String[] args) {
System.out.println("*------------Stream of Array------------*");
String[] appleArray = new String[]{"Macintosh", "Fuji", "Gala", "Jonagold"};
/* Stream.of(T...) and Arrays.stream(T[]) both stream the array elements */
Stream.of(appleArray).forEach(System.out::println);
Stream<String> streamOfArray = Arrays.stream(appleArray);
streamOfArray.forEach(System.out::println);
System.out.println("*------------Stream of Collection------------*");
List<String> appleList = Arrays.asList("Macintosh", "Fuji", "Gala", "Jonagold");
/* Stream.of(list) creates a stream with ONE element: the list itself */
Stream<List<String>> streamOfCollection = Stream.of(appleList);
streamOfCollection.forEach(System.out::println);
System.out.println("*------------Collection Stream------------*");
/* Collection.stream() streams the elements of the collection */
Stream<String> collectionSequentialStream = appleList.stream();
collectionSequentialStream.forEach(System.out::println);
/* a parallel stream may print the elements in any order */
Stream<String> collectionParallelStream = appleList.parallelStream();
collectionParallelStream.forEach(System.out::println);
System.out.println("*------------Stream Generate------------*");
/* generate() is infinite, so limit() is required */
Stream.generate(Math::random).limit(5).forEach(System.out::println);
Random random = new Random();
Stream.generate(random::nextInt).limit(5).forEach(System.out::println);
System.out.println("*------------Stream Iterate------------*");
/* iterate(seed, f) produces seed, f(seed), f(f(seed)), ... */
Stream.iterate(1, i -> i + 1).limit(5).forEach(System.out::println);
System.out.println("*------------Random Int Stream------------*");
IntStream randomInt = new Random().ints().limit(10);
randomInt.forEach(System.out::println);
System.out.println("*------------Stream Builder------------*");
Stream.Builder<String> appleStreamBuilder = Stream.builder();
appleStreamBuilder.accept("Macintosh");
appleStreamBuilder.accept("Fuji");
appleStreamBuilder.accept("Gala");
appleStreamBuilder.accept("Jonagold");
Stream<String> appleStream = appleStreamBuilder.build();
appleStream.forEach(System.out::println);
}
}
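The list above also mentions streams of String chars or tokens, which the example does not show. The sketch below is one way to do it, assuming a hypothetical helper class StringStreams: String.chars() returns an IntStream of char values, and Pattern.splitAsStream() returns a Stream<String> of the tokens between matches of the pattern.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class StringStreams {
    // chars() yields an IntStream of char values; mapToObj boxes each one back into a Character
    static List<Character> charsOf(String text) {
        return text.chars()
                .mapToObj(c -> (char) c)
                .collect(Collectors.toList());
    }

    // splitAsStream streams the tokens between commas; trim() removes the space after each comma
    static List<String> tokensOf(String csv) {
        return Pattern.compile(",")
                .splitAsStream(csv)
                .map(String::trim)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(charsOf("Fuji"));                             // [F, u, j, i]
        System.out.println(tokensOf("Macintosh, Fuji, Gala, Jonagold")); // [Macintosh, Fuji, Gala, Jonagold]
    }
}
```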
Stream is not a data structure
A Stream is not a data structure. It reads data from other data structures such as a Collection, an array, or an I/O channel, but it does not store or hold that data. Unlike with a Collection, the location or position of an element cannot be pinpointed in a Stream; you can only supply functions that operate on the Stream.
package org.wesome.java8;
import java.util.Arrays;
import java.util.List;
class Apple {
public static void main(String args[]) {
List<String> appleList = Arrays.asList("Macintosh", "Fuji", "Gala", "Jonagold");
/* the stream has no index, so the position has to be looked up in the backing list */
appleList.stream().forEach(apple -> System.out.println(apple + " is at index " + appleList.indexOf(apple)));
}
}
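Because a Stream has no positions, the example above has to call indexOf on the backing list, which searches the list for every element and reports the first match, so duplicate names would all get the same index. A common workaround, sketched here with a hypothetical class IndexedApples, is to stream the indices with IntStream.range and look each element up by index.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class IndexedApples {
    // Streams the positions 0..size-1 and reads each element by index, so duplicates get their own index
    static List<String> withIndex(List<String> apples) {
        return IntStream.range(0, apples.size())
                .mapToObj(i -> apples.get(i) + " is at index " + i)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // "Fuji" appears twice on purpose: it is reported at index 1 and at index 3
        List<String> appleList = Arrays.asList("Macintosh", "Fuji", "Gala", "Fuji");
        withIndex(appleList).forEach(System.out::println);
    }
}
```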
Streams are Functional in Nature
A Stream never changes its source data; it only pipelines the data as input into the next function. For example, a filter operation on a Stream does not remove elements from the source; it creates a new Stream of the filtered results, and the original data remains intact.
package org.wesome.java8;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
class Apple {
public static void main(String args[]) {
List<String> appleList = Arrays.asList("Macintosh", "Fuji", "Gala", "Jonagold");
List<String> filterList = appleList.stream().filter(a -> a.startsWith("M")).collect(Collectors.toList());
System.out.println("appleList = " + appleList);
System.out.println("filterList = " + filterList);
}
}
Streams are Lazy by default
Intermediate operations on a Stream are lazy by default: they are evaluated only when a terminal operation executes.
Functions that return a Stream are Lazy, otherwise Eager
Intermediate functions such as filter, map, limit, and skip take a Stream as input and return a Stream, and hence are lazy, whereas terminal (non-stream) functions such as sum, min, max, findFirst, reduce, and collect are eager.
package org.wesome.java8;
import java.util.stream.IntStream;
class Apple {
public static void main(String args[]) {
System.out.println("*--------------Intermediate function without terminal function--------------*");
IntStream.rangeClosed(1, 5).filter((i) -> {
System.out.println("This method will never execute");
System.out.println("integer should be divisible by 2 = " + i);
return i % 2 == 0;
});
System.out.println("*--------------Intermediate function with terminal function--------------*");
IntStream.rangeClosed(1, 5).filter((i) -> {
System.out.println("integer should be divisible by 2 = " + i);
return i % 2 == 0;
}).forEach(System.out::println);
}
}
Streams are Efficient
Streams do only as much work as necessary. In the scenario below, the stream applies all the filters to the first element; since that element satisfies every filter, findFirst() stops right there without touching the other elements.
package org.wesome.java8;
import java.util.Arrays;
import java.util.OptionalInt;
import java.util.function.IntPredicate;
public class Apple {
public static void main(String[] args) {
IntPredicate greaterThan5 = integer -> integer > 5;
IntPredicate lessThan7 = integer -> integer < 7;
IntPredicate even = integer -> integer % 2 == 0;
int[] arr = new int[]{6, 7, 8};
/* 6 passes every filter, so findFirst() stops before 7 and 8 are read */
OptionalInt first = Arrays.stream(arr)
.peek(value -> System.out.println("value after stream = " + value))
.filter(greaterThan5)
.peek(value -> System.out.println("value after greaterThan5 = " + value))
.filter(lessThan7)
.peek(value -> System.out.println("value after lessThan7 = " + value))
.filter(even)
.peek(value -> System.out.println("value after even = " + value))
.findFirst();
System.out.println("first = " + first.getAsInt());
}
}
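Laziness plus short-circuiting also means a stream can work on an infinite source, as long as a short-circuiting terminal operation such as findFirst ends it. A small sketch, with a hypothetical class ShortCircuit, that counts how many elements were pulled from an infinite Stream.iterate source before the first match was found:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class ShortCircuit {
    // Returns {first match, number of elements pulled from the source}
    static int[] firstMultipleOf7Above50() {
        AtomicInteger visited = new AtomicInteger();
        int first = Stream.iterate(1, i -> i + 1)        // infinite source: 1, 2, 3, ...
                .peek(i -> visited.incrementAndGet())   // counts every element that leaves the source
                .filter(i -> i > 50)
                .filter(i -> i % 7 == 0)
                .findFirst()                              // short-circuits on the first match
                .get();
        return new int[]{first, visited.get()};
    }

    public static void main(String[] args) {
        int[] result = firstMultipleOf7Above50();
        System.out.println("first = " + result[0] + ", elements visited = " + result[1]); // first = 56, elements visited = 56
    }
}
```

Only the elements 1 to 56 are generated; the stream never tries to produce 57.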
Streams are Consumable only Once
Like an Iterator, which cannot be iterated again once all its elements have been visited, a Stream can be consumed only once. After a terminal operation has run on a Stream, a new Stream must be created in order to iterate again.
package org.wesome.java8;
import java.util.stream.IntStream;
class Apple {
public static void main(String args[]) {
/*---------------Generate Stream---------------*/
IntStream intStream = IntStream.rangeClosed(1, 5);
/*---------------Iterate over the Stream---------------*/
intStream.forEach(System.out::println);
/*---------------The stream has already been consumed, so iterating again throws IllegalStateException---------------*/
try {
intStream.forEach(System.out::println);
} catch (IllegalStateException e) {
System.out.println("e = " + e);
}
}
}
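When the same data needs to be traversed more than once, one common approach is to keep a Supplier that builds a fresh stream on each call instead of holding on to a stream instance. A minimal sketch, with a hypothetical class ReusableStream:

```java
import java.util.function.Supplier;
import java.util.stream.IntStream;

class ReusableStream {
    // Each get() call builds a brand-new stream, so both terminal operations below are legal
    static long[] sumAndCount(Supplier<IntStream> source) {
        long sum = source.get().sum();
        long count = source.get().count();
        return new long[]{sum, count};
    }

    public static void main(String[] args) {
        long[] result = sumAndCount(() -> IntStream.rangeClosed(1, 5));
        System.out.println("sum = " + result[0] + ", count = " + result[1]); // sum = 15, count = 5
    }
}
```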
Streams provide Runtime Polymorphism
The for and enhanced for loops are external iterators: the loop pulls the elements out of the collection, i.e. an object is passed to a function. With a stream (or Iterable.forEach), the loop is invoked on the collection itself, i.e. a function is invoked on an object, which is polymorphism: each source type supplies its own iteration logic.
A stream pipeline needs no code change whether it iterates a list or an array; based on the input type, the appropriate implementation performs the iteration internally.
package org.wesome.java8;
import java.util.Arrays;
import java.util.List;
public class Apple {
public static void main(String[] args) {
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
/* External iterator, for loop */
for (int i = 0; i < list.size(); i++) {
System.out.println(list.get(i));
}
/* External iterator, enhanced for loop */
for (Integer integer : list) {
System.out.println(integer);
}
/* Internal iterator, Iterable.forEach */
list.forEach(System.out::println);
/* Internal iterator, stream */
list.stream().forEach(System.out::println);
}
}
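To illustrate that the pipeline itself does not change with the source, here is a sketch with a hypothetical method sumOfEvens that accepts any Stream<Integer>; the same call works for a List, a Set, and an array.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;

class SameCodeAnySource {
    // The pipeline only sees a Stream; it does not care which data structure produced it
    static int sumOfEvens(Stream<Integer> numbers) {
        return numbers.filter(n -> n % 2 == 0).mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
        Set<Integer> set = new LinkedHashSet<>(list);
        Integer[] array = {1, 2, 3, 4, 5};
        System.out.println(sumOfEvens(list.stream()));        // 6
        System.out.println(sumOfEvens(set.stream()));         // 6
        System.out.println(sumOfEvens(Arrays.stream(array))); // 6
    }
}
```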
Streams Support Dependency Inversion
Dependency Inversion means that high-level code depends on an abstraction rather than on a concrete implementation, so its behavior can be altered by supplying a different implementation; in this sense it is closely related to the Strategy pattern. Streams make this natural: the same pipeline can change its flow of execution based on the condition, such as a Predicate, that the caller passes in.
package org.wesome.java8;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
class Apple {
public static long count(List<Integer> list, Predicate<Integer> selector) {
/* filter keeps the numbers matching the injected selector; count() counts them */
return list.stream().filter(selector).count();
}
public static void main(String args[]) {
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
Predicate<Integer> even = (number) -> number % 2 == 0;
Predicate<Integer> odd = (number) -> number % 2 != 0;
Predicate<Integer> divisibleBy3 = (number) -> number % 3 == 0;
Predicate<Integer> divisibleBy4 = (number) -> number % 4 == 0;
System.out.println("count all numbers " + count(list, e -> true));
System.out.println("count even numbers " + count(list, even));
System.out.println("count odd numbers " + count(list, odd));
System.out.println("count divisible by 3 numbers " + count(list, divisibleBy3));
System.out.println("count divisible by 4 numbers " + count(list, divisibleBy4));
}
}
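Since the selector is just a Predicate, callers can also combine existing strategies with the default methods and, or, and negate instead of writing new ones. A sketch with a hypothetical class ComposedSelectors, using a count method that counts the matching numbers:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class ComposedSelectors {
    // The pipeline stays the same; only the injected selector changes
    static long count(List<Integer> list, Predicate<Integer> selector) {
        return list.stream().filter(selector).count();
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6);
        Predicate<Integer> even = n -> n % 2 == 0;
        Predicate<Integer> divisibleBy3 = n -> n % 3 == 0;
        System.out.println(count(list, even.and(divisibleBy3))); // 1 (only 6)
        System.out.println(count(list, even.or(divisibleBy3)));  // 4 (2, 3, 4, 6)
        System.out.println(count(list, even.negate()));          // 3 (1, 3, 5)
    }
}
```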
Streams Make Code More Expressive
Imperative programming focuses on what to do as well as how to do it, whereas declarative code written with Streams is more expressive: it focuses on what to do and lets the library take care of how to do it.
Streams also take care of some basic edge cases. In imperative style, a method that finds nothing may return null, which can result in a NullPointerException, or a sentinel value such as 0 that the caller has to know about. In contrast, terminal operations such as findFirst return an Optional, which makes the absence of a value explicit. Let's understand this with an example.
- Imperative Style
package org.wesome.java8;
import java.util.Arrays;
import java.util.List;
class Apple {
public static void main(String args[]) {
/* find double of first even number which is greater than 3 */
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
System.out.println(findImperativeStyle(list));
}
/*================ imperative style================*/
/* returns 0 as a sentinel when nothing matches; the caller must know this convention */
private static int findImperativeStyle(List<Integer> list) {
int result = 0;
for (Integer integer : list) {
if (integer > 3 && integer % 2 == 0) {
result = integer * 2;
break;
}
}
return result;
}
}
- Declarative Style Using Lambda Expression
package org.wesome.java8;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
class Apple {
public static void main(String args[]) {
/* find double of first even number which is greater than 3 */
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
/* using lambda expression */
findLambdaStyle(list).ifPresent(System.out::println);
}
/*================ lambda expression================*/
private static Optional<Integer> findLambdaStyle(List<Integer> list) {
return list.stream()
.filter(integer -> integer > 3)
.filter(integer -> integer % 2 == 0)
.map(integer -> integer * 2)
.findFirst();
}
}
- Declarative Style Using Method Reference
package org.wesome.java8;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
class Apple {
public static void main(String args[]) {
/* find double of first even number which is greater than 3 */
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
/* using Method Reference */
findMethodReferenceStyle(list).ifPresent(System.out::println);
}
/*================ Method Reference style================*/
private static Optional<Integer> findMethodReferenceStyle(List<Integer> list) {
return list.stream()
.filter(Apple::isGreaterThanThree)
.filter(Apple::isEven)
.map(Apple::getDouble)
.findFirst();
}
private static int getDouble(Integer number) {
return number * 2;
}
private static boolean isEven(Integer number) {
return number % 2 == 0;
}
private static boolean isGreaterThanThree(Integer number) {
return number > 3;
}
}
- Declarative Style Using Functional Interface
package org.wesome.java8;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;
class Apple {
public static void main(String args[]) {
/* find double of first even number which is greater than 3 */
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
/* using Functional Interface */
findFunctionalInterfaceStyle(list).ifPresent(System.out::println);
}
private static Optional<Integer> findFunctionalInterfaceStyle(List<Integer> list) {
Function<Integer, Integer> getDouble = number -> number * 2;
Predicate<Integer> isEven = number -> number % 2 == 0;
Predicate<Integer> isGreaterThanThree = number -> number > 3;
return list.stream()
.filter(isGreaterThanThree)
.filter(isEven)
.map(getDouble)
.findFirst();
}
}
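To see how the Optional covers the no-match case, the sketch below (hypothetical class NoMatchExample) runs a pipeline that doubles the first even number greater than 3 on two lists, one of which has no such number; findFirst then returns an empty Optional instead of null, and orElse supplies a fallback.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class NoMatchExample {
    // Double of the first even number greater than 3, or an empty Optional if there is none
    static Optional<Integer> findDoubleOfFirstEvenAboveThree(List<Integer> list) {
        return list.stream()
                .filter(n -> n > 3)
                .filter(n -> n % 2 == 0)
                .map(n -> n * 2)
                .findFirst();
    }

    public static void main(String[] args) {
        Optional<Integer> found = findDoubleOfFirstEvenAboveThree(Arrays.asList(1, 2, 3, 4, 5));
        Optional<Integer> missing = findDoubleOfFirstEvenAboveThree(Arrays.asList(1, 3, 5));
        System.out.println(found.map(n -> "double = " + n).orElse("no match"));   // double = 8
        System.out.println(missing.map(n -> "double = " + n).orElse("no match")); // no match
    }
}
```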
Parallel Streams
By default, a stream executes sequentially in the calling thread, here the main thread. A sequential stream processes the elements one after another on a single thread, just like a for loop.
package org.wesome.java8;
import java.util.stream.IntStream;
class Apple {
public static void main(String args[]) {
IntStream.rangeClosed(1, 50).forEach(Apple::print);
}
public static void print(int integer) {
System.out.println(Thread.currentThread().getName() + " : " + integer);
}
}
Stream also provides a parallel execution option, which splits the stream into multiple sub-streams that are processed in parallel by multiple threads, utilizing multiple CPU cores; once the computation is done, the partial results are combined to create the final result. By default, the number of threads a parallel stream uses is based on the number of cores available in the CPU.
package org.wesome.java8;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
class Apple {
/* simulates a slow computation: sleeps one second, then doubles the number */
public static int getDouble(int number) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.print("processed by " + Thread.currentThread().getName() + " ");
return number * 2;
}
public static void main(String[] args) {
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
Instant start = Instant.now();
list.stream().map(Apple::getDouble).forEach(System.out::println);
System.out.println("sequential stream took = " + Duration.between(start, Instant.now()).toMillis() + " ms");
start = Instant.now();
list.parallelStream().map(Apple::getDouble).forEach(System.out::println);
System.out.println("parallel stream took = " + Duration.between(start, Instant.now()).toMillis() + " ms");
}
}
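The step where partial results are combined is easier to see with collect and reduce than with forEach. A sketch, assuming a hypothetical class ParallelCombine: on an ordered source, collect(Collectors.toList()) keeps the encounter order even though the chunks are processed by different threads, and reduce needs an identity value (0) and an associative function (Integer::sum) so that partial sums can be merged in any grouping.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class ParallelCombine {
    // Each worker maps its own chunk; collect() then merges the partial lists in encounter order
    static List<Integer> doubledInParallel(List<Integer> numbers) {
        return numbers.parallelStream()
                .map(n -> n * 2)
                .collect(Collectors.toList());
    }

    // reduce() combines partial sums; 0 is the identity and Integer::sum is associative
    static int parallelSum(List<Integer> numbers) {
        return numbers.parallelStream().reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(doubledInParallel(list)); // [2, 4, 6, 8, 10]
        System.out.println(parallelSum(list));       // 15
    }
}
```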
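The Stream API allows running a sequential stream in parallel without rewriting the code: calling parallel() on a stream, or parallelStream() on a Collection, is enough. Behind the scenes, parallel streams use the fork/join framework (the common ForkJoinPool) for thread management and for processing the parallel tasks.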
package org.wesome.java8;
import java.util.stream.IntStream;
class Apple {
public static void main(String args[]) {
System.out.println("available processors are " + Runtime.getRuntime().availableProcessors());
IntStream.rangeClosed(1, 50).parallel().forEach(Apple::print);
}
public static void print(int integer) {
System.out.println(Thread.currentThread().getName() + " : " + integer);
}
}
By default, parallelization depends on the number of CPU cores of the system; if more threads are required, it can be altered with a JVM argument.
package org.wesome.java8;
import java.util.concurrent.ForkJoinPool;
class Apple {
public static void main(String args[]) {
System.out.println("getParallelism=" + ForkJoinPool.commonPool().getParallelism());
}
}
Then run the same program again with the following JVM argument:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=20
Once the parallel threads occupy all the available processors, the remaining tasks run in successive batches roughly the size of the processor count (the calling main thread also takes part in the work).
package org.wesome.java8;
import java.time.LocalTime;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;
class Apple {
/* simulates slow work: sleeps two seconds, then logs the number, thread and time */
public static void simulateWork(int number) {
try {
Thread.sleep(2000);
System.out.println("processed " + number + " by " + Thread.currentThread().getName() + " " + LocalTime.now());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public static void main(String[] args) {
System.out.println("processors " + Runtime.getRuntime().availableProcessors());
System.out.println("Parallelism=" + ForkJoinPool.commonPool().getParallelism());
IntStream.rangeClosed(1, 50).parallel().forEach(Apple::simulateWork);
}
}
Run it with, for example:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=10
and play with this JVM argument to better understand the relationship between processors and parallelism.
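The JVM argument changes the parallelism of the common pool for the whole application. A widely used alternative for a single pipeline is to start it from inside a dedicated ForkJoinPool. Note that this relies on how the JDK's parallel streams fork their subtasks (into the pool running the current task) rather than on a documented guarantee, so treat it as an assumption to verify on your JDK. A sketch with a hypothetical class CustomPoolExample:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

class CustomPoolExample {
    // Runs a parallel stream from inside a dedicated pool with the given parallelism
    static long sumInPool(int parallelism, long upTo) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // Assumption (implementation behavior, not a documented guarantee): the parallel
            // stream forks its subtasks into this pool instead of the common pool
            return pool.submit(() -> LongStream.rangeClosed(1, upTo).parallel().sum()).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("sum = " + sumInPool(4, 100)); // sum = 5050
    }
}
```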
Operation Types
Operations are attached to a Stream; as the elements are iterated, each attached operation is called once for each element that reaches it.
Streams support two kinds of operations, intermediate and terminal, which combine to create a Stream pipeline.
A Stream pipeline has a source, for example a Collection, an array, or an I/O channel, which provides input to one or more intermediate operations such as filter or map, followed at the end by a terminal operation such as forEach or reduce.
Let's understand them one by one.
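A sketch of one complete pipeline, assuming a hypothetical class PipelineAnatomy, with each stage labeled: the list is the source, filter and map are intermediate operations, and reduce is the terminal operation that folds everything into a single value.

```java
import java.util.Arrays;
import java.util.List;

class PipelineAnatomy {
    static int totalLetters(List<String> apples) {
        return apples.stream()                  // source: the list
                .filter(a -> a.length() > 4)    // intermediate: keep names longer than 4 characters
                .map(String::length)            // intermediate: name -> number of characters
                .reduce(0, Integer::sum);       // terminal: fold the lengths into one total
    }

    public static void main(String[] args) {
        List<String> appleList = Arrays.asList("Macintosh", "Fuji", "Gala", "Jonagold");
        System.out.println(totalLetters(appleList)); // 17 (Macintosh 9 + Jonagold 8)
    }
}
```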
Intermediate Operations
Intermediate operations take a Stream as input, transform or filter the data, and produce a new Stream that is passed to the next intermediate or terminal operation.
Multiple intermediate methods can be pipelined, or chained one after another, to get the desired result; the terminal method marks the end of the Stream and returns the result.
package org.wesome.java8;
import java.util.stream.IntStream;
class Apple {
public static void main(String args[]) {
IntStream.rangeClosed(1, 1000).filter(i -> i % 2 == 0).filter(i -> i % 3 == 0).filter(i -> i % 5 == 0).forEach(System.out::println);
}
}
All intermediate operations return a new Stream; they never change or alter the existing source.
package org.wesome.java8;
import java.util.Arrays;
import java.util.List;
class Apple {
public static void main(String args[]) {
List<String> appleList = Arrays.asList("Macintosh", "Fuji", "Gala", "Jonagold");
System.out.println("*---------------------------keep only Apples whose name is longer than 4 characters---------------------------*");
appleList.stream().filter(str -> str.length() > 4).forEach(System.out::println);
System.out.println("*---------------------------Printing original Apple List---------------------------*");
appleList.stream().forEach(System.out::println);
}
}
Traversal of the pipeline source does not begin until the terminal operation of the pipeline executes; if an intermediate function such as filter is called on its own, without any terminal function, it won't execute.
package org.wesome.java8;
import java.util.Arrays;
import java.util.List;
class Apple {
public static void main(String args[]) {
List<String> appleList = Arrays.asList("Macintosh", "Fuji", "Gala", "Jonagold");
System.out.println("*---------------------------Intermediate operation without terminal operation---------------------------*");
appleList.stream().filter(str -> {
System.out.println("checking for = " + str);
return str.length() > 4;
});
System.out.println("*---------------------------Intermediate operation with terminal operation---------------------------*");
appleList.stream().filter(str -> {
System.out.println("checking for = " + str);
return str.length() > 4;
}).forEach(str -> {
System.out.println(str + " has more than 4 characters");
});
}
}
Stateless and Stateful Operations
Intermediate operations are further divided into stateful and stateless operations.
Stateless operations such as filter or map don't need the state of previous elements of the Stream while processing the current element; each element can be processed independently. A pipeline containing only stateless operations can therefore be processed in a single pass over the data, whether sequential or parallel.
package org.wesome.java8;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
class Apple {
public static void main(String args[]) {
List<String> appleList = Arrays.asList("Macintosh", "Fuji", "Gala", "Jonagold");
System.out.println("*------------Stateless Operations------------*");
appleList.stream().filter(str -> str.length() > 4).forEach(System.out::println);
appleList.stream().map(String::toUpperCase).forEach(System.out::println);
System.out.println("*------------Stateful Operations------------*");
appleList.stream().distinct().forEach(System.out::println);
appleList.stream().sorted(Comparator.naturalOrder()).forEach(System.out::println);
}
}
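For the stateless case, the single pass is visible if each stage records what it receives. A sketch with a hypothetical class SinglePassDemo: in a sequential stream each element travels through filter and map (and reaches forEach) before the next element is read, and no stage needs to remember earlier elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SinglePassDemo {
    // Records the order in which each stage sees the elements (a plain ArrayList is fine for a sequential stream only)
    static List<String> trace(List<String> apples) {
        List<String> log = new ArrayList<>();
        apples.stream()
                .filter(a -> {
                    log.add("filter " + a);
                    return a.length() > 4;
                })
                .map(a -> {
                    log.add("map " + a);
                    return a.toUpperCase();
                })
                .forEach(a -> log.add("forEach " + a));
        return log;
    }

    public static void main(String[] args) {
        // filter Macintosh, map Macintosh, forEach MACINTOSH, filter Fuji, filter Jonagold, ...
        trace(Arrays.asList("Macintosh", "Fuji", "Jonagold")).forEach(System.out::println);
    }
}
```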
Stateful operations such as distinct or sorted need state from previously seen elements while processing the current one, and they may need to process the entire Stream input before producing results: for example, sorted cannot emit anything until it has compared every element, and distinct has to remember every element it has already seen. With a parallel Stream, stateful operations may need to buffer extra data or make extra passes over it, so they can perform slower in a multithreaded environment.
package org.wesome.java8;
import java.util.stream.IntStream;
class Apple {
public static void main(String[] args) {
/* a single run is only a rough indication: JIT warm-up and core count affect the numbers */
System.out.println("*---------------------------Sequential Stream---------------------------*");
long start = System.nanoTime();
/* distinct() is a stateful intermediate operation */
long seqDistinct = IntStream.rangeClosed(1, 1_000_000).map(i -> i % 1000).distinct().count();
long seqMillis = (System.nanoTime() - start) / 1_000_000;
System.out.println("distinct values = " + seqDistinct + ", sequential stream took = " + seqMillis + " ms");
System.out.println("*---------------------------Parallel Stream---------------------------*");
start = System.nanoTime();
long parDistinct = IntStream.rangeClosed(1, 1_000_000).parallel().map(i -> i % 1000).distinct().count();
long parMillis = (System.nanoTime() - start) / 1_000_000;
System.out.println("distinct values = " + parDistinct + ", parallel stream took = " + parMillis + " ms");
}
}
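For contrast with the stateless single pass, a stateful sorted acts as a barrier. In the sketch below (hypothetical class SortedBarrierDemo), every element is read from the source before sorted emits the first one, and distinct drops the repeated name because it remembers the names it has already seen.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SortedBarrierDemo {
    static List<String> trace(List<String> apples) {
        List<String> log = new ArrayList<>();    // fine for a sequential stream only
        apples.stream()
                .peek(a -> log.add("read " + a))  // stateless: runs as each element leaves the source
                .distinct()                       // stateful: remembers every name seen so far
                .sorted()                         // stateful: buffers everything, sorts at the end
                .forEach(a -> log.add("emit " + a));
        return log;
    }

    public static void main(String[] args) {
        // read Gala, read Fuji, read Gala, emit Fuji, emit Gala (one per line)
        trace(Arrays.asList("Gala", "Fuji", "Gala")).forEach(System.out::println);
    }
}
```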
Some intermediate operations are listed below.
- filter()
- map()
- flatMap()
- distinct()
- sorted()
- peek()
- limit()
- skip()
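flatMap, skip, and limit from the list above are not used in the earlier examples. A small sketch, with a hypothetical class MoreIntermediateOps, that chains them:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class MoreIntermediateOps {
    static List<String> sample() {
        List<List<String>> baskets = Arrays.asList(
                Arrays.asList("Macintosh", "Fuji"),
                Arrays.asList("Gala", "Jonagold", "Fuji"));
        return baskets.stream()
                .flatMap(List::stream)   // flatten the baskets into one stream of names
                .distinct()              // drop the second "Fuji"
                .skip(1)                 // skip "Macintosh"
                .limit(2)                // keep only the next two names
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(sample()); // [Fuji, Gala]
    }
}
```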
Terminal Operations
Terminal operations mark the end of a Stream pipeline and return the result of the operations, usually a single value such as a count or a reduced result, although some, like forEach, only perform a side effect. Since intermediate functions are lazy by default, they execute only once a terminal operation runs.
Some terminal operations are listed below.
- forEach()
- forEachOrdered()
- toArray()
- reduce()
- collect()
- min()
- max()
- count()
- anyMatch()
- allMatch()
- noneMatch()
- findFirst()
- findAny()
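A sketch exercising several of the terminal operations listed above, assuming a hypothetical class TerminalOpsDemo. Each call starts a fresh stream because a stream can be consumed only once; forEachOrdered is used on the parallel stream so the names print in encounter order.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

class TerminalOpsDemo {
    static String summarize(List<String> apples) {
        long count = apples.stream().count();                                // number of elements
        boolean anyLength4 = apples.stream().anyMatch(a -> a.length() == 4); // short-circuits on "Fuji"
        boolean noneEmpty = apples.stream().noneMatch(String::isEmpty);      // true if no name is empty
        Optional<String> longest = apples.stream()
                .max(Comparator.comparingInt(String::length));               // longest name, if any
        String joined = apples.stream().collect(Collectors.joining(", "));   // one comma-separated String
        return "count=" + count + ", anyLength4=" + anyLength4 + ", noneEmpty=" + noneEmpty
                + ", longest=" + longest.orElse("none") + ", joined=" + joined;
    }

    public static void main(String[] args) {
        List<String> apples = Arrays.asList("Macintosh", "Fuji", "Gala", "Jonagold");
        System.out.println(summarize(apples));
        // forEachOrdered keeps encounter order even on a parallel stream
        apples.parallelStream().forEachOrdered(System.out::println);
        Object[] asArray = apples.stream().toArray();
        System.out.println("toArray length = " + asArray.length);
    }
}
```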