Что происходит 'под капотом' при использовании parallelStream()?
Sobes Copilot
Получайте ответы в реальном времени
Ответ от нейросети
sobes.tech AI
При вызове parallelStream() или .parallel() на стриме, операции над элементами стрима выполняются параллельно, используя пул потоков. По умолчанию используется ForkJoinPool.commonPool().
Происходит следующее:
-
Разделение (Splitting): Исходный источник данных (например,
Collection,Array) разбивается на несколько подмножеств (spliterator'ы).Spliterator— это объект, который может эффективно итерироваться по элементам источника и разбивать его на более мелкие части. -
Обработка (Processing): Каждое подмножество обрабатывается отдельно в одном из потоков
ForkJoinPool. Операции стрима (например,map,filter,reduce) применяются к элементам каждого подмножества параллельно. -
Объединение (Combining): Результаты обработки из разных потоков собираются и объединяются для получения финального результата. Этот шаг особенно важен для терминальных операций, которые агрегируют данные (например,
reduce,collect).
Fork/Join Framework:
ForkJoinPool основан на концепции Fork/Join:
- Fork: Задача разбивается на более мелкие подзадачи, которые могут выполняться параллельно.
- Join: Результаты выполнения подзадач объединяются для получения результата исходной задачи.
ForkJoinPool эффективно управляет потоками, минимизируя накладные расходы на создание/уничтожение потоков и переключение контекста. Он использует алгоритм work stealing, где незагруженные потоки забирают задачи у загруженных потоков, обеспечивая хорошую утилизацию ресурсов.
Важные аспекты:
- Не все стримы хорошо распараллеливаются. Эффективность зависит от источника данных (например,
ArrayListхорошо делится,LinkedList— плохо) и типа выполняемых операций (stateless операции распараллеливаются лучше, чем stateful). - Накладные расходы на распараллеливание могут перевесить выгоду для небольших объемов данных или простых операций.
- Результат выполнения параллельного стрима может быть недетерминированным для некоторых операций, если они не являются ассоциативными и некоммутативными (например, некоторые виды
reduce). - Порядок выполнения операций в параллельном стриме не гарантирован, если не используются упорядочивающие операции вроде
sorted()(но это может снизить эффективность распараллеливания).
Пример простого использования:
// Пример использования parallelStream для суммирования
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class ParallelStreamExample {
public static void main(String[] args) {
List<Integer> numbers = IntStream.rangeClosed(1, 1000000)
.boxed()
.collect(Collectors.toList());
long sum = numbers.parallelStream() // Создание параллельного стрима
.mapToLong(i -> i) // Маппинг к Long для суммирования
.sum(); // Терминальная операция суммирования
System.out.println("Parallel sum: " + sum);
}
}