- Глава 1: Основы потоков в OS и JVM (Уровень Junior)
- Глава 2: Проблемы общего состояния (Shared State)
- Глава 3: Классическая синхронизация (Old School)
- Глава 4: Атомики и CAS (Compare-And-Swap)
- Глава 5: Конкурентные коллекции (Middle)
- Глава 6: Эволюция управления задачами (Executors)
- Глава 7: Future и CompletableFuture
- Глава 8: Виртуальные потоки (Virtual Threads) — Революция Java 21
- Глава 9: Структурная конкурентность (Structured Concurrency)
- Глава 10: Scoped Values (JEP 446) — Убийца ThreadLocal
- Глава 11: Java Memory Model (JMM)
- Глава 12: Проблемы производительности
- Глава 13: Паттерны, Тестирование и Заключение
- Бонусная Глава 1: Продвинутые блокировки (Advanced Locking)
- Бонусная Глава 2: Инструменты синхронизации (Synchronizers)
- Бонусная Глава 3: Реактивное программирование vs Project Loom
- Бонусная Глава 4: Модель Акторов (Actor Model)
- Бонусная Глава 5: SIMD и Vector API (Project Panama)
- Бонусная Глава 6: LMAX Disruptor
- Бонусная Глава 7: Полезные утилиты (Hidden Gems)
В этой главе мы разберем физическую природу потоков, научимся их создавать правильным способом и поймем, как они живут и умирают внутри Java Virtual Machine (JVM).
Прежде чем писать код, важно понять иерархию:
- Процесс (Process): Это запущенный экземпляр программы (например, сама JVM).
- Имеет свое изолированное адресное пространство памяти. Один процесс не может просто так залезть в память другого.
- Тяжеловесный: его создание и переключение между процессами требует много ресурсов ОС.
- Поток (Thread): Это «единица исполнения» внутри процесса.
- Потоки одного процесса делят общую память (Heap / Куча). Это дает скорость обмена данными, но порождает проблемы синхронизации (Race Conditions).
- У каждого потока есть свой собственный Стек (Stack), где хранятся локальные переменные и цепочка вызовов методов.
- Легковеснее процессов, но в классической модели Java (до Project Loom) каждый Java-поток жестко привязан к потоку операционной системы (OS Thread).
В Java 21: Даже с появлением виртуальных потоков, классические потоки (Platform Threads) никуда не делись. Виртуальные потоки работают поверх них. Поэтому знание классической модели обязательно.
В Java есть два основных способа определить задачу для потока.
Этот способ считается устаревшим архитектурно, так как смешивает задачу (бизнес-логику) и механизм исполнения (поток). Плюс, в Java нет множественного наследования, и вы теряете возможность наследоваться от других классов.
Вы отделяете работу («что делать») от исполнителя («кто делает»).
Рассмотрим код с подробными комментариями:
package com.example.multithreading.basics;
public class ThreadCreationDemo {
public static void main(String[] args) {
// --- Вариант 1: Классическая реализация интерфейса Runnable ---
// Создаем объект задачи. Это просто инструкции, они еще не выполняются.
Runnable classicTask = new Runnable() {
@Override
public void run() {
// Thread.currentThread().getName() возвращает имя текущего потока
System.out.println("Привет из анонимного класса! Выполняет поток: "
+ Thread.currentThread().getName());
}
};
// --- Вариант 2: Использование Lambda (Java 8+) ---
// Это более современный и лаконичный способ записи Runnable
Runnable lambdaTask = () -> {
System.out.println("Привет из Лямбды! Выполняет поток: "
+ Thread.currentThread().getName());
try {
// Имитация тяжелой работы (сон на 1 секунду)
Thread.sleep(1000);
} catch (InterruptedException e) {
// Обработка прерывания (обсудим в следующих главах)
e.printStackTrace();
}
System.out.println("Лямбда завершила работу.");
};
// --- Создание и запуск потоков ---
// 1. Создаем объект потока и передаем ему задачу (Runnable)
Thread thread1 = new Thread(classicTask);
// Задаем читаемое имя потоку (полезно при отладке!)
thread1.setName("Worker-1");
// 2. Запуск потока
// ВАЖНО: Мы вызываем start(), а НЕ run()!
// start() командует JVM создать новый системный поток и вызвать в нем метод run().
thread1.start();
// Если вызовем run() напрямую, код выполнится в текущем потоке (main),
// никакой многопоточности не будет. Это частая ошибка новичков.
// lambdaTask.run(); // <-- ОШИБКА: выполнится синхронно в main
Thread thread2 = new Thread(lambdaTask, "Worker-2");
thread2.start();
System.out.println("Метод main завершен. Выполняет: "
+ Thread.currentThread().getName());
}
}Поток не просто «работает» или «не работает». У него есть четкие состояния (Enum Thread.State). Понимание этого критично для диагностики зависаний (deadlocks) и тормозов.
- NEW: Поток создан (
new Thread()), ноstart()еще не вызван. - RUNNABLE: Поток запущен. Внимание: это не значит, что он прямо сейчас грузит процессор. Это значит, что он готов работать, но планировщик ОС может прямо сейчас выделять время другому потоку. В Java нет состояния
RUNNING, оно объединено сRUNNABLE. - BLOCKED: Поток хочет войти в
synchronizedблок, но монитор занят другим потоком. Он «висит» и ждет освобождения блокировки. - WAITING: Поток ждет другого потока бесконечно долго (методы
wait(),join()). Ему нужен «пинок» (notify()), чтобы проснуться. - TIMED_WAITING: То же, что WAITING, но с тайм-аутом (например,
Thread.sleep(1000)). Он проснется сам по времени или по сигналу. - TERMINATED: Метод
run()завершился (успешно или с исключением). Поток умер, перезапустить его нельзя.
public class ThreadStateDemo {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
try {
// Поток перейдет в TIMED_WAITING
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// Состояние: NEW (объект есть, запуска нет)
System.out.println("1. Состояние: " + thread.getState());
thread.start();
// Скорее всего RUNNABLE, но может уже успеть перейти в TIMED_WAITING
// (зависит от скорости ОС)
System.out.println("2. Состояние: " + thread.getState());
// Даем потоку время точно уснуть
Thread.sleep(100);
// Состояние: TIMED_WAITING (спит внутри run)
System.out.println("3. Состояние: " + thread.getState());
// Ждем завершения потока
thread.join();
// Состояние: TERMINATED (отработал)
System.out.println("4. Состояние: " + thread.getState());
}
}Все потоки делятся на два типа:
- User Threads (Пользовательские) — по умолчанию.
- Daemon Threads (Служебные).
Главное правило: JVM работает до тех пор, пока жив хотя бы один User Thread. Как только все User Threads завершились, JVM принудительно убивает все оставшиеся Daemon Threads и выключается.
Примеры демонов: Сборщик мусора (Garbage Collector), потоки отправки метрик.
public class DaemonDemo {
public static void main(String[] args) {
Thread daemonThread = new Thread(() -> {
while (true) {
try {
System.out.println("Демон работает...");
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// Помечаем поток как демон ОБЯЗАТЕЛЬНО ДО вызова start()
daemonThread.setDaemon(true);
daemonThread.start();
try {
// Главный поток (User Thread) работает 2 секунды
Thread.sleep(2000);
System.out.println("Main поток завершает работу.");
} catch (InterruptedException e) {
e.printStackTrace();
}
// Как только main выйдет из этого метода:
// 1. User потоков больше нет.
// 2. JVM видит, что остался только daemonThread.
// 3. JVM убивает daemonThread мгновенно (блок finally может не выполниться!) и завершается.
}
}Мы заложили первый кирпич:
- Поток — это исполнитель инструкций, который делит память с другими потоками.
- Создаем потоки через реализацию
Runnable. - Запускаем через
start(), а неrun(). - Поток имеет жизненный цикл, и мы можем его отслеживать.
- Если поток нужен только для фона и не должен держать приложение включенным — делаем его демоном.
В этой главе мы разберем, что происходит, когда два потока пытаются одновременно изменить одну и ту же переменную. Мы увидим, как безобидная операция i++ превращается в источник трудноуловимых багов.
В Java потоки одного процесса делят общую память (Heap). Если у вас есть один объект, и два потока имеют на него ссылку, они могут одновременно читать и писать поля этого объекта.
- Shared (Общее): Доступно нескольким потокам.
- Mutable (Изменяемое): Значение может меняться после создания.
Золотое правило: Если переменная Shared и Mutable, вам ОБЯЗАТЕЛЬНО нужна синхронизация. Если переменная неизменяемая (Immutable), синхронизация не нужна.
Почему простая операция инкремента count++ опасна? Потому что для процессора это не одна инструкция. Это операция Read-Modify-Write.
На уровне байт-кода (и ассемблера) это выглядит так:
- READ: Прочитать текущее значение
countиз памяти в регистр процессора. - MODIFY: Прибавить 1 к значению в регистре.
- WRITE: Записать новое значение из регистра обратно в память.
Это действие не атомарно (неделимо). Поток может быть прерван (переключен планировщиком ОС) на любом из этих шагов.
Race Condition возникает, когда корректность работы программы зависит от того, в каком порядке планировщик потоков (Thread Scheduler) будет переключать потоки.
Рассмотрим классический пример «Потерянного обновления» (Lost Update).
Запустите этот код у себя. Вы почти никогда не получите ожидаемые 20 000.
package com.example.multithreading.racecondition;
public class RaceConditionDemo {
// Общий ресурс. Простое поле int.
private int count = 0;
// Метод, который будет вызываться разными потоками.
// Цель: увеличить счетчик на 1.
public void increment() {
// ЭТА СТРОКА ОПАСНА!
// Это не атомарная операция.
count++;
}
public int getCount() {
return count;
}
public static void main(String[] args) throws InterruptedException {
RaceConditionDemo counter = new RaceConditionDemo();
// Задача: выполнить инкремент 10 000 раз
Runnable task = () -> {
for (int i = 0; i < 10_000; i++) {
counter.increment();
}
};
// Создаем два потока, которые используют ОДИН И ТОТ ЖЕ объект counter
Thread thread1 = new Thread(task);
Thread thread2 = new Thread(task);
thread1.start();
thread2.start();
// Ждем завершения обоих потоков
thread1.join();
thread2.join();
// Ожидаем: 20 000
// Реальность: Случайное число, например 15 432 или 19 998
System.out.println("Финальное значение count: " + counter.getCount());
}
}Представим, что count = 10.
- Поток А читает
count(10) в свой регистр. - Планировщик останавливает Поток А.
- Поток Б читает
count(всё еще 10, так как А не успел записать) в свой регистр. - Поток Б прибавляет 1 (получает 11).
- Поток Б записывает 11 в память
count. - Планировщик возвращает управление Потоку А.
- Поток А «просыпается» со своим значением в регистре (10).
- Поток А прибавляет 1 (получает 11).
- Поток А записывает 11 в память
count.
Итог: Два потока сделали increment(), но значение увеличилось только на 1 (с 10 до 11). Обновление от Потока Б было перезаписано и потеряно.
То место в коде, где происходит доступ к общему изменяемому ресурсу, называется Критической секцией.
В нашем примере критическая секция — это строка count++.
Чтобы исправить Race Condition, мы должны обеспечить Взаимное Исключение (Mutual Exclusion / Mutex). Это значит, что в критической секции одновременно может находиться только один поток.
Может показаться, что новые Виртуальные потоки (Virtual Threads) в Java 21 решат эту проблему "магически". Это не так.
Виртуальные потоки тоже подвержены Race Conditions. Если миллион виртуальных потоков попытаются сделать count++ одной общей переменной без синхронизации, результат будет неверным. Законы многопоточности едины для всех типов потоков.
- Shared Mutable State — корень всех зол в многопоточности.
- Операции вроде
++,+=,check-then-act(если x == null, то создать x) — не атомарны. - Без защиты возникает Race Condition, и данные портятся непредсказуемым образом.
- Участок кода с обращением к общим данным называется Критической секцией.
Мы выяснили, что count++ — это три операции, и если их разорвать, данные портятся. Нам нужно сделать эту группу операций атомарной (неделимой) и обеспечить видимость изменений для всех потоков.
В Java каждый объект (абсолютно каждый, наследник Object) имеет встроенную сущность, называемую Монитором (или Intrinsic Lock).
Представьте, что объект — это комната.
- У комнаты есть один ключ.
- Ключ называется Монитор.
- Ключевое слово
synchronized— это инструкция: «Возьми ключ, зайди в комнату, запри дверь, сделай работу, выйди и положи ключ на место».
Пока один поток держит ключ (находится внутри synchronized блока), другие потоки, желающие попасть в эту же комнату (к этому же монитору), переходят в состояние BLOCKED и ждут у двери.
Существует два основных способа применения:
Самый простой способ. Блокировка берется на объект this (текущий экземпляр класса).
public synchronized void increment() {
count++;
}Более гибкий способ. Позволяет взять блокировку на любой объект и уменьшить размер критической секции (блокировать не весь метод, а только опасную часть).
public void increment() {
// Какой-то безопасный код...
synchronized (this) { // Явно указываем, чей монитор берем
count++;
}
// Какой-то безопасный код...
}Давайте исправим код из Главы 2, используя монитор.
package com.example.multithreading.sync;
public class SynchronizedDemo {
private int count = 0;
// Специальный объект для блокировки (best practice).
// Почему лучше использовать отдельный объект, а не 'this'?
// Чтобы никто снаружи класса не мог случайно заблокироваться на 'this'
// и вызвать Deadlock или зависание вашей логики.
private final Object lock = new Object();
public void increment() {
// Начало критической секции
synchronized (lock) {
// 1. Поток захватывает монитор объекта 'lock'.
// 2. Если монитор занят, поток засыпает (BLOCKED) и ждет.
// 3. Если свободен — входит.
count++;
// 4. При выходе из блока (даже при Exception) монитор автоматически освобождается.
}
// Конец критической секции
}
public int getCount() {
// ВАЖНО: Чтение тоже должно быть синхронизировано!
// Иначе мы можем прочитать старое значение (проблема visibility),
// пока другой поток пишет новое.
synchronized (lock) {
return count;
}
}
public static void main(String[] args) throws InterruptedException {
SynchronizedDemo counter = new SynchronizedDemo();
Runnable task = () -> {
for (int i = 0; i < 10_000; i++) {
counter.increment();
}
};
Thread t1 = new Thread(task);
Thread t2 = new Thread(task);
t1.start();
t2.start();
t1.join();
t2.join();
// Теперь результат гарантированно 20 000
System.out.println("Count: " + counter.getCount());
}
}Помимо атомарности, есть проблема видимости.
Современные процессоры имеют кеши (L1, L2, L3). Чтобы работать быстро, поток копирует переменные из основной памяти (RAM) в кеш процессора. Если Поток А изменил переменную в своем кеше, Поток Б (на другом ядре) может этого не увидеть, так как он читает из своего кеша или из RAM, куда данные еще не дошли.
Обеспечивает и атомарность, и видимость. При выходе из блока synchronized все изменения принудительно сбрасываются (flushed) в основную память.
Это «облегченная» синхронизация. Она гарантирует только видимость, но не атомарность.
Ключевое слово volatile говорит JVM и процессору: «Не кешируй эту переменную! Всегда читай и пиши её сразу в RAM».
Когда использовать volatile?
Только когда у вас один поток пишет, а другие читают, и операция записи атомарна сама по себе (например, установка флага boolean).
Пример (Остановка потока флагом):
public class VolatileFlagDemo {
// Без volatile поток server может бесконечно крутиться в цикле,
// так как закеширует значение running = true и не увидит изменений.
private volatile boolean running = true;
public void runServer() {
while (running) {
// Делаем работу...
}
System.out.println("Сервер остановлен.");
}
public void stop() {
running = false; // Запись сразу видна всем потокам
}
}Важно:
volatileНЕ спасет в ситуацииcount++, так как там проблема в отсутствии атомарности, а не только видимости.
Использование synchronized имеет свою цену. Самая страшная ошибка — Deadlock.
Это ситуация, когда два потока ждут друг друга вечно.
- Поток 1 держит Замок А и хочет Замок Б.
- Поток 2 держит Замок Б и хочет Замок А.
public class DeadlockDemo {
private final Object lock1 = new Object();
private final Object lock2 = new Object();
public void method1() {
synchronized (lock1) {
System.out.println("Thread-1: Hold lock1...");
try { Thread.sleep(100); } catch (Exception e) {}
System.out.println("Thread-1: Waiting for lock2...");
synchronized (lock2) { // Ждет lock2
System.out.println("Thread-1: Hold lock1 & lock2");
}
}
}
public void method2() {
synchronized (lock2) {
System.out.println("Thread-2: Hold lock2...");
try { Thread.sleep(100); } catch (Exception e) {}
System.out.println("Thread-2: Waiting for lock1...");
synchronized (lock1) { // Ждет lock1
System.out.println("Thread-2: Hold lock2 & lock1");
}
}
}
// Если запустить method1 и method2 в разных потоках,
// программа зависнет навсегда.
}Как избежать Deadlock?
Всегда захватывайте ресурсы в одном и том же порядке. Если все потоки будут брать сначала lock1, а потом lock2, взаимной блокировки не случится.
- Монитор есть у каждого объекта.
synchronizedобеспечивает взаимное исключение (только один поток внутри).synchronizedрешает проблемы атомарности и видимости.volatileрешает только проблему видимости (отключает кеширование процессора для переменной).- Неправильный порядок блокировок ведет к Deadlock.
В пакете java.util.concurrent.atomic лежат классы, которые позволяют выполнять операции над общими переменными без использования блокировок и мониторов. Это делает их невероятно быстрыми и исключает возможность Deadlock.
В основе всех атомиков лежит аппаратная инструкция процессора, которая называется CAS.
Метод CAS принимает три аргумента: CAS(V, A, B)
- V (Value): Адрес ячейки памяти, которую хотим изменить.
- A (Expected): Значение, которое мы ожидаем там увидеть.
- B (New): Новое значение, которое хотим записать.
Логика работы:
- «Эй, процессор, посмотри на ячейку V. Если там сейчас лежит значение A, то запиши туда B и верни
true(успех). - Если же там лежит что-то другое (не A), то ничего не трогай и верни
false(провал)».
Самое важное: эта проверка и запись выполняются процессором как одна неделимая (атомарная) операция.
Давайте перепишем наш счетчик, используя AtomicInteger. Нам больше не нужны synchronized методы.
package com.example.multithreading.atomics;
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicDemo {
// Вместо int count = 0;
// Используем потокобезопасную обертку
private final AtomicInteger count = new AtomicInteger(0);
public void increment() {
// ОДНА СТРОКА вместо synchronized блока.
// Эквивалент count++
count.incrementAndGet();
}
public int getCount() {
// Обычный get() у атомиков обладает свойствами volatile
// (гарантирует видимость)
return count.get();
}
public static void main(String[] args) throws InterruptedException {
AtomicDemo counter = new AtomicDemo();
Runnable task = () -> {
for (int i = 0; i < 10_000; i++) {
counter.increment();
}
};
Thread t1 = new Thread(task);
Thread t2 = new Thread(task);
t1.start();
t2.start();
t1.join();
t2.join();
// Гарантированно 20 000. Работает быстрее, чем synchronized.
System.out.println("Result: " + counter.getCount());
}
}В Java (до недавних оптимизаций) метод incrementAndGet() работал по принципу цикла повторений. Это отличный пример того, как использовать CAS.
Вот псевдокод, объясняющий, что происходит внутри AtomicInteger:
public int incrementAndGet() {
int currentValue;
int newValue;
// Цикл "do-while" (Spinning)
do {
// 1. Читаем текущее значение (например, 10)
currentValue = get();
// 2. Вычисляем новое значение локально (10 + 1 = 11)
newValue = currentValue + 1;
// 3. Пытаемся записать через CAS:
// "Если в памяти все еще 10, запиши 11".
// Если вернулось true -> успех, выходим из цикла.
// Если вернулось false -> значит, другой поток уже успел изменить значение
// (там уже не 10, а 12, например). Мы идем на новый круг цикла,
// снова читаем актуальное значение и пробуем снова.
} while (!compareAndSet(currentValue, newValue));
return newValue;
}Этот подход называется Lock-Free. Поток никогда не засыпает (не уходит в состояние BLOCKED или WAITING). Если ему не везет, он крутится в цикле (жжет процессор), пытаясь пропихнуть свое значение.
Представьте ситуацию: высокая конкуренция (High Contention).
У вас 1000 потоков одновременно долбят AtomicInteger.
- Все 1000 потоков читают
0. - Все вычисляют
1. - Один поток делает CAS успешно (меняет 0 на 1).
- 999 потоков получают отказ (CAS fail).
- Эти 999 потоков идут на второй круг цикла, снова читают, снова пытаются...
- Процессор захлебывается, производительность падает из-за постоянных неудач и конфликтов кешей.
Для таких случаев в Java 8 появился LongAdder.
Вместо одной переменной он создает массив переменных.
Каждый поток пишет в свою собственную ячейку массива (или ячейку с малым количеством конкурентов).
Когда нам нужен результат (sum()), LongAdder пробегается по массиву и суммирует все ячейки.
import java.util.concurrent.atomic.LongAdder;
public class LongAdderDemo {
// Используем вместо AtomicLong при очень высокой нагрузке
private final LongAdder adder = new LongAdder();
public void increment() {
adder.increment(); // Очень быстро, почти нет конфликтов
}
public long getSum() {
// Суммирование происходит только в момент чтения
return adder.sum();
}
}Правило выбора:
- Нужен точный счетчик в любой момент времени? ->
AtomicLong.- Огромное кол-во записей, а чтение редкое (статистика, метрики)? ->
LongAdder.
Атомарно можно обновлять не только примитивы, но и ссылки на объекты.
import java.util.concurrent.atomic.AtomicReference;
public class AtomicRefDemo {
static class User {
String name;
int age;
// immutable конструктор...
}
private final AtomicReference<User> userRef = new AtomicReference<>(new User("Alice", 25));
public void birthday() {
User oldUser;
User newUser;
do {
oldUser = userRef.get(); // Читаем текущего
// Создаем НОВЫЙ объект с изменениями (Immutability pattern)
newUser = new User(oldUser.name, oldUser.age + 1);
// Пытаемся подменить ссылку
} while (!userRef.compareAndSet(oldUser, newUser));
}
}Это основа для создания неблокирующих структур данных (например, ConcurrentStack, где мы через CAS меняем верхушку стека Head).
- CAS (Compare-And-Swap) — это «сердце» неблокирующей синхронизации.
- Атомики (
AtomicIntegerи др.) работают быстрееsynchronizedна простых операциях, так как не переключают контекст потока (не усыпляют его). - Они используют Spin-loop (цикл ожидания): если операция не удалась, поток пробует снова немедленно.
- При очень высокой нагрузке атомики могут тормозить из-за постоянных ретраев. В таких случаях для счетчиков используйте
LongAdder. - Мы избавились от
synchronized, но теперь наш код стал сложнее для понимания (особенно если писать сложные CAS-алгоритмы вручную).
Главная ошибка новичка — взять обычный ArrayList или HashMap и использовать их в многопоточной среде.
Результат предсказуем:
- ConcurrentModificationException — если один поток читает, а другой меняет структуру.
- Бесконечный цикл — (в старых версиях Java) при перестройке
HashMapво время записи. - Потеря данных — два потока пишут в одну ячейку.
Раньше (до Java 5) мы использовали Vector, Hashtable или Collections.synchronizedMap().
В чем их проблема? Они используют глобальную блокировку. Чтобы прочитать данные, нужно заблокировать всю коллекцию. Если один поток пишет, остальные не могут даже читать. Это «бутылочное горлышко» производительности.
Пакет java.util.concurrent (JUC) предлагает умные коллекции.
Это, пожалуй, самый используемый класс в многопоточности.
В отличие от Hashtable, который блокирует всё, ConcurrentHashMap блокирует только тот сегмент (bucket), который сейчас меняется.
- Чтение (
get): Вообще без блокировок (Lock-Free)! Используетсяvolatileсемантика для чтения узлов. Это невероятно быстро. - Запись (
put): Блокируется только конкретная корзина (первый узел связного списка или дерева), куда попадает ключ. Если вы пишете в разные корзины, блокировок друг друга не происходит. Используется CAS +synchronizedна уровне узла.
ConcurrentHashMap предоставляет методы, которые выполняют проверку и действие атомарно.
package com.example.multithreading.collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ConcurrentMapDemo {
public static void main(String[] args) throws InterruptedException {
// Обычный HashMap здесь сломался бы или потерял данные
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
ExecutorService es = Executors.newFixedThreadPool(10);
// Задача: Подсчитать количество слов (Word Count) в многопоточном режиме
Runnable task = () -> {
for (int i = 0; i < 1000; i++) {
String key = "Java";
// --- ПЛОХОЙ ПОДХОД (Race Condition) ---
// Integer val = map.get(key);
// if (val == null) map.put(key, 1);
// else map.put(key, val + 1);
// Это не атомарно! Между get и put может вклиниться другой поток.
// --- ХОРОШИЙ ПОДХОД (Java 8 Style) ---
// merge выполняет атомарно:
// "Если ключа нет - вставь 1. Если есть - возьми старое, добавь 1 и обнови".
map.merge(key, 1, (oldValue, newValue) -> oldValue + newValue);
}
};
for (int i = 0; i < 10; i++) {
es.submit(task);
}
es.shutdown();
es.awaitTermination(1, TimeUnit.SECONDS);
System.out.println("Result: " + map.get("Java")); // Ожидаем 10000
}
}Иногда нам нужно читать список очень часто, а менять редко. Например, список подписчиков (Listeners) на событие.
Принцип работы:
При любой операции модификации (add, set, remove) создается новая копия внутреннего массива.
- Поток-писатель копирует массив, вносит изменения в копию.
- Атомарно подменяет ссылку на массив.
- Потоки-читатели продолжают работать со старой версией массива, пока ссылка не обновится.
Плюсы: Чтение мгновенное, без блокировок, никогда не вылетает ConcurrentModificationException (итератор бежит по "снимку" массива).
Минусы: Запись очень дорогая (копирование памяти).
List<String> list = new CopyOnWriteArrayList<>();
list.add("One"); // Создаст массив длиной 1
list.add("Two"); // Создаст массив длиной 2, скопирует "One"
// Идеально для итераций в UI или Event Listeners
for (String s : list) {
System.out.println(s);
}Это фундамент для построения архитектуры Producer-Consumer (Производитель-Потребитель).
Вместо того чтобы использовать wait/notify для ожидания данных, мы используем очередь, которая сама "усыпляет" поток, если очередь пуста (для читателя) или переполнена (для писателя).
Популярные реализации:
ArrayBlockingQueue: Очередь фиксированного размера на массиве. Честная (FIFO). Экономит память, но имеет один лок на чтение и запись.LinkedBlockingQueue: Очередь на связном списке. Может быть безлимитной (опасно для памяти!). Имеет два лока (отдельно на take, отдельно на put), что повышает пропускную способность.
Этот паттерн идеально ложится на Virtual Threads в Java 21, так как блокировка put или take дешевая для виртуальных потоков.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerDemo {
public static void main(String[] args) {
// Очередь вмещает только 5 элементов.
// Если Producer попытается положить 6-й, он заблокируется.
BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
// --- PRODUCER (Пекарь) ---
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 20; i++) {
String item = "Будка #" + i;
// put() - блокирующий метод! Ждет места в очереди.
queue.put(item);
System.out.println("Испекли: " + item);
Thread.sleep(100); // Печет быстро
}
queue.put("POISON_PILL"); // Сигнал окончания
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// --- CONSUMER (Едок) ---
Thread consumer = new Thread(() -> {
try {
while (true) {
// take() - блокирующий метод! Ждет появления элемента.
String item = queue.take();
if (item.equals("POISON_PILL")) break;
System.out.println(" Съели: " + item);
Thread.sleep(500); // Ест медленно
}
System.out.println("Едок наелся.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}Что произойдет в примере?
Так как Едок медленный (500мс), а Пекарь быстрый (100мс), очередь быстро заполнится (5 булок). После этого Пекарь перейдет в состояние WAITING на методе queue.put(), пока Едок не освободит место методом queue.take().
Вам не нужно писать ни строчки кода синхронизации!
ConcurrentLinkedQueue: Неблокирующая очередь (на CAS). Используйте, если не нужно ждать (блокироваться), а просто нужна потокобезопасная вставка/удаление.SynchronousQueue: Очередь без емкости!putблокируется, пока другой поток не сделаетtake. Используется для прямой передачи задач ("из рук в руки") в пулах потоков (CachedThreadPool).
- Забудьте про
HashtableиVector. ConcurrentHashMap— ваш лучший друг для хранения данных. Чтение не блокируется, запись блокирует только часть карты.- Используйте
putIfAbsentилиcomputeдля атомарных обновлений карты. CopyOnWriteArrayList— для списков, которые часто читают и редко меняют.BlockingQueue— лучший способ организовать передачу задач между потоками без ручного управленияwait/notify.
Вместо того чтобы нанимать фрилансера на каждую микро-задачу (создавать новый поток), мы нанимаем постоянную команду работников (Пул потоков / Thread Pool) и просто кидаем им задачи в ящик «Входящие».
Это основной интерфейс для работы с пулами. Он отделяет постановку задачи от её исполнения.
// Мы не знаем, как и где это выполнится. Мы просто отдаем задачу.
executorService.submit(() -> {
System.out.println("Work!");
});Архитектура проста:
- Очередь задач (Task Queue): Обычно это
BlockingQueue, о которой мы говорили в Главе 5. - Рабочие потоки (Worker Threads): Вечный цикл, который берет задачу из очереди, выполняет
run(), и возвращается за следующей.
Класс-фабрика Executors предоставляет готовые конфигурации.
ExecutorService pool = Executors.newFixedThreadPool(10);- Как работает: Создает ровно 10 потоков. Если задач больше, они копятся в безлимитной очереди.
- Где использовать: Основная "рабочая лошадка". Стабильная нагрузка, предсказуемое потребление ресурсов.
ExecutorService pool = Executors.newCachedThreadPool();- Как работает: Потоков изначально 0. Пришла задача — создаем поток. Если поток простаивает 60 секунд — убиваем его. Очередь не используется (точнее
SynchronousQueue), задачи передаются сразу. - Опасность: Если придет 10 000 задач одновременно, он создаст 10 000 потоков и уронит сервер.
- Где использовать: Для множества мелких кратковременных задач при невысокой интенсивности.
ExecutorService pool = Executors.newSingleThreadExecutor();- Как работает: 1 поток + безлимитная очередь.
- Где использовать: Когда нужно гарантировать, что задачи выполняются последовательно (одна за другой), но не в главном потоке (например, запись логов в файл, работа с не потокобезопасным ресурсом).
Самая частая ошибка новичков — забыть остановить пул. Программа на Java не завершится, пока жив хоть один рабочий поток пула (если они не демоны).
package com.example.multithreading.executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ThreadPoolDemo {
public static void main(String[] args) {
// Создаем пул из 4 работников (по количеству ядер CPU - хорошая практика)
int cores = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(cores);
System.out.println("Пул запущен с " + cores + " потоками.");
// Отправляем 10 задач
for (int i = 0; i < 10; i++) {
int taskId = i;
executor.submit(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("Задача " + taskId + " выполняется в " + threadName);
try {
Thread.sleep(200); // Имитация работы
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// --- ЭТАП ЗАВЕРШЕНИЯ (Graceful Shutdown) ---
// 1. Перестаем принимать НОВЫЕ задачи.
// Задачи, которые уже в очереди, будут доделаны.
executor.shutdown();
try {
// 2. Ждем, пока все задачи завершатся (например, 1 секунду)
if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
// 3. Если не успели - принудительно прерываем (interrupt) всех
System.out.println("Время вышло! Принудительная остановка...");
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
}
System.out.println("Пул остановлен.");
}
}Замена старому и кривому java.util.Timer. Позволяет запускать задачи по расписанию.
Два главных метода, которые часто путают на собеседованиях:
scheduleAtFixedRate(Строго по частоте)
- Задача: запускать каждые 5 секунд.
- Старт: 12:00:00. Исполнение заняло 2 сек.
- След. старт: 12:00:05 (независимо от того, сколько длилась задача).
- Риск: Если задачи длятся дольше интервала (7 сек), очередь переполнится, потоки захлебнутся.
scheduleWithFixedDelay(С фиксированной паузой)
- Задача: запускать с паузой 5 секунд после завершения.
- Старт: 12:00:00. Исполнение заняло 2 сек (закончили в 12:00:02).
- След. старт: 12:00:07 (12:00:02 + 5 сек).
- Это безопаснее для системы.
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleWithFixedDelay(
() -> System.out.println("Пинг сервера..."),
0, 5, TimeUnit.SECONDS // Начальная задержка 0, период 5 сек
);В Java 7 появился специальный пул ForkJoinPool для рекурсивных задач (разделяй и властвуй).
Именно он используется внутри parallelStream() и CompletableFuture (по умолчанию).
Ключевая фишка — Work-Stealing (Кража работы): В обычных пулах одна общая очередь. В ForkJoin у каждого потока своя очередь (Deque). Если Поток А разгреб свою очередь, а Поток Б завален работой, Поток А может украсть задачу с конца очереди Потока Б.
Это позволяет максимально загружать процессор.
Важно для Java 21: Виртуальные потоки (Virtual Threads) работают поверх
ForkJoinPool. Они ведут себя как задачи, которые "скачут" по потокам-носителям (Carrier Threads). Поэтому понимание концепции пулов все еще актуально, хотя с Loom мы редко будем создавать их вручную.
- **Никогда не используйте
new Thread()**в продакшене. ИспользуйтеExecutorService. FixedThreadPool— безопасный выбор для большинства задач.CachedThreadPool— опасно при неконтролируемом потоке задач.- Всегда корректно закрывайте пул через
shutdown()иawaitTermination(). - Для периодических задач используйте
ScheduledExecutorService, а неTimer.
Здесь мы рассмотрим эволюцию получения результатов: от блокирующего Future (Java 5) к реактивному и функциональному CompletableFuture (Java 8).
Чтобы поток мог вернуть значение, в Java 5 появился интерфейс Callable<V>.
public interface Callable<V> {
V call() throws Exception; // Возвращает V и может бросить Checked Exception!
}Когда мы отправляем Callable в экзекьютор, он мгновенно возвращает нам объект Future («Будущее»).
Future — это как талончик в гардеробе. Куртки (результата) у вас пока нет, но по талончику вы сможете получить её позже, когда она будет готова.
package com.example.async;
import java.util.concurrent.*;
public class FutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(2);
// 1. Создаем Callable вместо Runnable
Callable<Integer> task = () -> {
System.out.println("Тяжелые вычисления...");
Thread.sleep(2000); // Имитация работы
return 42;
};
// 2. Отправляем задачу. Метод не блокирует выполнение main.
// Мы сразу получаем объект Future.
Future<Integer> future = pool.submit(task);
System.out.println("Задача отправлена. Делаем другие дела в main...");
// ... какая-то другая работа ...
// 3. Получаем результат
// ВНИМАНИЕ: Метод get() БЛОКИРУЮЩИЙ!
// Если задача еще не готова, поток main "заснет" на этой строчке.
Integer result = future.get();
System.out.println("Результат получен: " + result);
pool.shutdown();
}
}Проблемы классического Future:
- Блокировка: Вызов
get()превращает асинхронность обратно в синхронность. Поток ждет и не делает полезной работы. - Нет цепочек: Нельзя сказать: «Когда результат будет готов, отправь его в базу данных, а потом отправь письмо». Приходится писать «лапшу» из
get()и проверок. - Сложная обработка ошибок: Исключения заворачиваются в
ExecutionException.
CompletableFuture решает все эти проблемы. Это Promise (обещание) в мире Java. Он позволяет строить конвейеры (pipelines) обработки данных, не блокируя потоки.
supplyAsync()— запустить задачу асинхронно (в ForkJoinPool по умолчанию).thenApply()— преобразовать результат (аналогmapв Stream API).thenAccept()— потребить результат (аналогforEach).exceptionally()— обработать ошибку, если она случилась на любом этапе.
Представьте задачу:
- Скачать JSON с сайта (долго).
- Распарсить его (быстро).
- Сохранить в БД (долго).
С Future нам пришлось бы три раза вызывать get(). С CompletableFuture мы описываем сценарий:
package com.example.async;
import java.util.concurrent.CompletableFuture;
public class CompletableDemo {
public static void main(String[] args) {
System.out.println("Main: Start");
// Цепочка выполняется в пуле потоков (ForkJoinPool.commonPool)
CompletableFuture.supplyAsync(() -> {
sleep(1000);
System.out.println("1. Скачиваем данные (" + Thread.currentThread().getName() + ")");
return "Raw JSON Data";
})
.thenApply(data -> {
// Этот этап выполнится ТОЛЬКО когда предыдущий завершится успешно
System.out.println("2. Парсим данные (" + Thread.currentThread().getName() + ")");
return data.length(); // Преобразуем String -> Integer
})
.thenAccept(length -> {
// Потребляем результат
System.out.println("3. Сохраняем длину " + length + " в БД");
})
.exceptionally(ex -> {
// Если ГДЕ УГОДНО выше вылетит исключение, мы попадем сюда
System.err.println("ОШИБКА: " + ex.getMessage());
return null;
});
System.out.println("Main: Я не ждал и пошел дальше!");
// CompletableFuture работают демонами по умолчанию, поэтому
// main не должен завершиться раньше времени для теста.
sleep(2000);
}
private static void sleep(int millis) {
try { Thread.sleep(millis); } catch (InterruptedException e) { }
}
}Обратите внимание: в main нет ни одного блокирующего вызова get()!
Частая задача: запустить две независимые задачи параллельно, дождаться обеих и сложить результаты.
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> getUsersCount());
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> getOrdersCount());
// Ждем оба результата и объединяем их
future1.thenCombine(future2, (users, orders) -> {
return "Статистика: " + users + " юзеров, " + orders + " заказов.";
})
.thenAccept(System.out::println);Если нужно запустить 100 запросов параллельно и дождаться всех:
List<CompletableFuture<String>> futures = ...; // Список из 100 задач
// Создаем один "Супер-Future", который завершится, когда завершатся ВСЕ
CompletableFuture<Void> allDone = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
// Блокируемся один раз в самом конце (или вешаем callback)
allDone.thenRun(() -> System.out.println("Все 100 запросов готовы!"));В CompletableFuture есть метод join().
get()— бросает проверяемоеExecutionException(нужен try-catch).join()— бросает непроверяемоеCompletionException(удобнее в лямбдах и стримах).
Оба метода блокирующие. В идеальном мире асинхронности вы используете их только один раз — в самом конце программы или веб-контроллера.
Callable— это какRunnable, но возвращает результат.Future— контейнер для будущего результата. Методget()блокирует поток — это дорого для классических потоков (OS Threads).CompletableFutureпозволяет строить цепочки действий (thenApply,thenCompose), обрабатывать ошибки (exceptionally) и комбинировать результаты (thenCombine) без блокировок.- Это был вершина развития асинхронности в Java... до выхода Java 21.
До Java 21 у нас была дилемма:
- Thread-per-Request: Писать простой, блокирующий код (один запрос — один поток).
- Плюс: Легко читать, легко отлаживать, понятный Stacktrace.
- Минус: Не масштабируется. Потоки ОС тяжелые (занимают ~1-2 МБ памяти). Вы не сможете запустить 100 000 потоков — сервер упадет с
OutOfMemoryError.
- Asynchronous (NIO / Reactor / CompletableFuture): Писать неблокирующий код.
- Плюс: Огромная масштабируемость на малом числе потоков.
- Минус: Сложный код ("Callback Hell"), разорванные Stacktrace, трудная отладка.
Project Loom (Java 21) дает нам «Святой Грааль»: вы пишете простой блокирующий код (как в п.1), а JVM исполняет его так же эффективно, как асинхронный (как в п.2).
Чтобы понять революцию, нужно понять разницу в архитектуре.
Это то, чем был java.lang.Thread раньше.
- 1 Java Поток = 1 Поток ОС.
- Создание дорогое (системный вызов).
- Переключение контекста управляется операционной системой (дорого).
- Если поток вызывает
socket.read()и ждет байты из сети, поток ОС блокируется и просто занимает память, ничего не делая.
Это легковесные сущности, управляемые самой JVM, а не операционной системой.
- M Виртуальных потоков = N Потоков ОС.
- Виртуальный поток — это просто объект в куче (Java Heap). Создать его так же дешево, как создать
String. - Вы можете создать миллион виртуальных потоков на ноутбуке с 8 ГБ RAM.
Как это работает «под капотом»?
- У JVM есть небольшой пул обычных потоков (Platform Threads), которые называются Потоки-носители (Carrier Threads). Обычно их количество равно числу ядер CPU.
- Когда виртуальный поток хочет поработать (выполнять код), JVM монтирует (mounts) его на поток-носитель. Носитель выполняет байт-код.
- Самое главное: Когда ваш код вызывает блокирующую операцию (например,
Thread.sleep(),jdbc.query(),client.send()), JVM замечает это. - Вместо того чтобы блокировать драгоценный поток-носитель, JVM размонтирует (unmounts) виртуальный поток.
- Состояние виртуального потока (его стек) сохраняется в кучу (Heap).
- Поток-носитель освобождается и немедленно берет другой виртуальный поток.
- Когда операция I/O завершается (данные пришли из БД), ОС сигнализирует JVM, и планировщик снова ставит виртуальный поток в очередь на выполнение.
Итог: Вы пишете Thread.sleep(1000), но реальный поток ОС не спит ни наносекунды. Он переключается на полезную работу.
В Java 21 больше не нужно экономить потоки и использовать пулы (Pools) для ограничения их количества. Антипаттерн Java 21: Использовать Thread Pool для виртуальных потоков. Паттерн Java 21: Создавать новый виртуальный поток на каждую задачу.
package com.example.loom;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
public class VirtualThreadsDemo {
public static void main(String[] args) {
// Старый способ (для сравнения):
// ExecutorService pool = Executors.newFixedThreadPool(100);
// --- НОВЫЙ СПОСОБ (Java 21) ---
// Этот экзекьютор не имеет очереди и не использует пул в привычном понимании.
// Он создает НОВЫЙ виртуальный поток для КАЖДОГО submit().
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
Instant start = Instant.now();
// Запускаем 100 000 задач!
// На Platform Threads это убило бы JVM.
for (int i = 0; i < 100_000; i++) {
int index = i;
executor.submit(() -> {
// Блокирующая операция (1 секунда сна)
// Виртуальный поток "размонтируется", поток ОС свободен.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// ignore
}
// Полезно знать: VThread не имеет имени по умолчанию, если не задать.
// System.out.println("Task " + index + " done");
});
}
// Блок try-with-resources автоматически вызовет executor.close(),
// который эквивалентен shutdown() + awaitTermination().
// То есть main будет ждать завершения всех 100 000 задач.
}
Instant end = Instant.now();
// Время выполнения будет чуть больше 1 секунды + накладные расходы на создание объектов.
// Это доказывает, что задачи выполнялись ПАРАЛЛЕЛЬНО, используя всего ~8-16 потоков ОС.
System.out.println("Готово за: " + Duration.between(start, end).toMillis() + " мс");
}
}Если вам не нужен Executor, можно использовать билдер:
// Запуск и ожидание (join)
Thread vThread = Thread.ofVirtual()
.name("My-Virtual-Worker")
.start(() -> {
System.out.println("Привет из Loom!");
});
vThread.join();Виртуальные потоки идеальны, но есть нюанс. Существует ситуация, когда виртуальный поток прилипает (pinned) к носителю и не может быть размонтирован во время блокировки.
Это происходит в двух случаях:
- Внутри блока
synchronized. - При вызове Native метода (JNI).
Проблема: Если вы сделаете тяжелую I/O операцию внутри synchronized блока, вы заблокируете реальный поток ОС. Если все носители заблокируются — приложение встанет.
Решение:
В новом коде под Java 21 старайтесь использовать ReentrantLock вместо synchronized, если внутри есть блокирующие операции (I/O, Sleep).
// --- ПЛОХО для Virtual Threads (если внутри I/O) ---
synchronized(lock) {
socket.read(); // <-- Carrier Thread заблокирован! (Pinned)
}
// --- ХОРОШО для Virtual Threads ---
lock.lock();
try {
socket.read(); // <-- Virtual Thread размонтируется, Carrier свободен.
} finally {
lock.unlock();
}Примечание: Разработчики JDK работают над тем, чтобы
synchronizedтоже перестал пинить потоки в будущих версиях, но в Java 21 это пока актуально.
- ✅ High-Throughput Servers (Веб-серверы, Микросервисы): Spring Boot 3.2+, Tomcat, Jetty уже поддерживают это. Один запрос — один виртуальный поток.
- ✅ I/O Bound задачи: Работа с БД, сетью, файлами.
- ❌ CPU Bound задачи: Сложные математические вычисления, кодирование видео. Здесь виртуальные потоки не помогут (так как потоки реально заняты вычислениями, им некогда размонтироваться). Для этого используйте Parallel Streams.
- Virtual Threads возвращают нас к простому стилю программирования "Thread-per-Request", но с производительностью асинхронного кода.
- Используйте
Executors.newVirtualThreadPerTaskExecutor(). - Блокирующие операции (
sleep, I/O) теперь бесплатны для потоков-носителей. - Остерегайтесь Pinning: избегайте долгого I/O внутри
synchronized.
Давайте посмотрим на классический подход с ExecutorService, который мы использовали в Главе 6 и 7.
// Псевдокод проблемы
Future<String> userFuture = es.submit(() -> findUser());
Future<String> orderFuture = es.submit(() -> findOrder());
try {
String user = userFuture.get(); // Допустим, здесь вылетела ошибка!
String order = orderFuture.get();
return new Response(user, order);
} catch (Exception e) {
// МЫ ЗДЕСЬ. Мы выходим из метода с ошибкой.
// А что происходит с задачей findOrder()?
// ОНА ПРОДОЛЖАЕТ РАБОТАТЬ В ФОНЕ!
throw new RuntimeException("Fail", e);
}Что здесь плохого?
- Осиротевшие потоки:
findOrderпродолжает потреблять ресурсы (БД, CPU), хотя его результат нам уже не нужен (мы упали наfindUser). - Сложная отмена: Чтобы сделать правильно, нам нужно в блоке
catchвручную вызыватьorderFuture.cancel(true). А если задач 10? Это куча бойлерплейт-кода. - Разорванная логика: Структура кода (блоки
{}) не соответствует времени жизни потоков. Потоки живут своей жизнью вне блока, где они были созданы.
Принцип прост: «Если задача распадается на подзадачи, все они должны завершиться (успешно или нет) до того, как завершится родительская задача».
Это делает многопоточный код похожим на обычный однопоточный блок кода. Вход в блок — старт потоков. Выход из блока — гарантированное завершение всех потоков.
Инструмент для этого — StructuredTaskScope.
Самый частый сценарий: «Запусти 3 задачи параллельно. Если хоть одна упадет — отмени остальные и выбрось ошибку. Если все успешны — собери результаты».
Раньше это требовало CompletableFuture.allOf() и сложной магии. Теперь это выглядит так:
package com.example.structured;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
import java.util.function.Supplier;
public class StructuredConcurrencyDemo {
public static void main(String[] args) {
try {
Response response = handleRequest();
System.out.println("Результат: " + response);
} catch (Exception e) {
System.err.println("Обработка не удалась: " + e.getMessage());
}
}
record Response(String user, String order) {}
static Response handleRequest() throws ExecutionException, InterruptedException {
// Создаем Scope. Это как try-with-resources.
// Как только мы выйдем из блока try, scope закроется и гарантирует,
// что все потоки завершены.
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 1. Разветвление (Fork)
// Вместо submit() используем fork(). Он возвращает Supplier (не Future!).
Supplier<String> userTask = scope.fork(() -> {
Thread.sleep(1000);
// Имитация ошибки: раскомментируйте, чтобы проверить отмену второй задачи
// if (true) throw new RuntimeException("БД пользователей недоступна!");
return "User: Alice";
});
Supplier<String> orderTask = scope.fork(() -> {
Thread.sleep(2000); // Эта задача дольше
return "Order: #12345";
});
// 2. Ожидание (Join)
// Ждем завершения ВСЕХ задач ИЛИ первой ошибки.
// Если userTask упадет через 1 сек, scope мгновенно ОТМЕНИТ orderTask (interrupt).
scope.join();
// 3. Проверка ошибок
// Если была ошибка в любом потоке, этот метод выбросит её здесь.
scope.throwIfFailed();
// 4. Сборка результатов
// Раз мы здесь, значит все задачи успешны.
return new Response(userTask.get(), orderTask.get());
}
// Здесь scope.close() вызывается автоматически.
// Он убедится, что все виртуальные потоки закрыты.
}
}Преимущества:
- Автоматическая отмена: Если
userTaskпадает,orderTaskполучает сигнал прерывания мгновенно. Экономия ресурсов! - Чистый код: Логика ошибок в одном месте (
throwIfFailed). - Observability: В дампах потоков (Thread Dump) эти потоки будут показаны как иерархия (дети
handleRequest), а не как куча разрозненных задач.
Сценарий: «У нас есть 3 сервера погоды. Опроси все параллельно. Кто ответит первым — того результат и берем. Остальных отменяем».
static String getWeather() throws ExecutionException, InterruptedException {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
scope.fork(() -> fetchFrom("Server A", 3000));
scope.fork(() -> fetchFrom("Server B", 1000)); // Самый быстрый
scope.fork(() -> fetchFrom("Server C", 2000));
// join() будет ждать ПЕРВОГО успешного результата.
// Как только Server B ответит, scope автоматически отменит A и C.
scope.join();
return scope.result();
}
}
static String fetchFrom(String source, int delay) throws InterruptedException {
Thread.sleep(delay);
System.out.println(source + " ответил!");
return "Weather from " + source;
}Самое мощное — эти scope можно вкладывать друг в друга.
void complexBusinessLogic() {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
scope.fork(() -> {
// Вложенная задача тоже может открыть свой Scope!
// Иерархия сохраняется.
return getDetailedUserData();
});
scope.fork(() -> getOrders());
scope.join();
scope.throwIfFailed();
}
}Если родительская задача будет отменена, отмена каскадно пройдет вниз по дереву во все вложенные Scope.
- Structured Concurrency делает работу с потоками такой же предсказуемой, как работу с обычными блоками кода.
- Используйте
StructuredTaskScopeвместоExecutorService, когда вам нужно разбить задачу на подзадачи и собрать результат. ShutdownOnFailure: "Нужны результаты всех, или падаем".ShutdownOnSuccess: "Нужен результат самого быстрого".- Главный бонус — автоматическая отмена (Propagation of Cancellation) ненужных задач.
Десятилетиями мы использовали ThreadLocal для хранения данных, привязанных к потоку. Это работало сносно, когда потоков было 200 штук (в пуле). Но с приходом Виртуальных потоков (которых могут быть миллионы), архитектурные недостатки ThreadLocal стали фатальными.
- Неограниченная мутабельность:
Любой метод в любой точке кода может вызвать
get()иset(). Это создает неявные связи и «спагетти» из данных. Вы никогда не знаете, кто и когда изменил значение. - Утечки памяти (Unbounded Lifetime):
Данные живут до тех пор, пока жив поток (или пока не вызовут
remove()). В классических Thread Pools потоки живут вечно. Если разработчик забыл сделатьremove(), данные (например, тяжелый UserContext) остаются в памяти навсегда. - Дорогое наследование (The Fatal Flaw for Loom):
Чтобы передать контекст от родительского потока к дочернему, мы использовали
InheritableThreadLocal. Он копирует всю карту значений при создании потока.
- Представьте, что у вас 1 000 000 виртуальных потоков.
- Копирование карты для каждого из них убивает производительность и память.
ScopedValue (Значения с областью видимости) — это современная альтернатива.
Ключевые отличия:
- Иммутабельность: Значение записывается один раз при старте скоупа. Внутри скоупа его нельзя изменить (
setметода нет). Можно только создать вложенный скоуп с новым значением (shadowing). - Ограниченное время жизни: Значение доступно только во время выполнения лямбды или метода. Как только блок кода завершился — значение исчезает. Никаких
remove()и утечек. - Эффективное наследование: Никакого копирования данных. Дочерние виртуальные потоки (внутри StructuredTaskScope) получают доступ к значению через умную ссылку. Это "бесплатно" по памяти.
В Java 21 это preview-фича, поэтому синтаксис может выглядеть непривычно.
package com.example.scoped;
public class ScopedValueDemo {
// 1. Объявляем ключ (аналог static final ThreadLocal)
// Обычно public static, чтобы был доступ из любой точки кода
public static final ScopedValue<String> CURRENT_USER = ScopedValue.newInstance();
public static void main(String[] args) {
// Попытка прочитать здесь вернет null (или ошибку, если использовать get())
System.out.println("Outside: " + CURRENT_USER.orElse("No User"));
// 2. Связываем значение (Binding) и запускаем задачу
// where(КЛЮЧ, ЗНАЧЕНИЕ).run(ЗАДАЧА)
ScopedValue.where(CURRENT_USER, "Alice")
.run(() -> processRequest());
// Здесь значение уже недоступно
System.out.println("Finished.");
}
// Метод где-то глубоко в цепочке вызовов
static void processRequest() {
System.out.println("Processing for: " + CURRENT_USER.get());
// Вызов еще одного метода...
dbCall();
}
static void dbCall() {
// Значение все еще доступно
if ("Alice".equals(CURRENT_USER.get())) {
System.out.println("DB Access Granted");
}
}
}Вот где происходит настоящая магия. ScopedValue идеально работает в тандеме со StructuredTaskScope. Значения автоматически "просачиваются" в дочерние потоки.
import java.util.concurrent.StructuredTaskScope;
public class ScopePropagationDemo {
private static final ScopedValue<String> REQUEST_ID = ScopedValue.newInstance();
public static void main(String[] args) {
// Допустим, это вход в контроллер
ScopedValue.where(REQUEST_ID, "req-123-abc")
.run(ScopePropagationDemo::handle);
}
static void handle() {
// Мы находимся в контексте ScopedValue
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// Запускаем 3 виртуальных потока параллельно.
// НАМ НЕ НУЖНО передавать REQUEST_ID вручную!
// JVM делает это автоматически и эффективно.
scope.fork(() -> worker("DB"));
scope.fork(() -> worker("Cache"));
scope.fork(() -> worker("Log"));
scope.join();
} catch (Exception e) {
e.printStackTrace();
}
}
static String worker(String name) {
// Каждый виртуальный поток видит значение из родительского scope
System.out.printf("[%s] Working on Request: %s%n",
name, REQUEST_ID.get());
return "OK";
}
}Что если нам нужно временно сменить пользователя для куска кода? Мы не можем изменить значение (оно immutable), но мы можем создать вложенный скоуп.
ScopedValue.where(CURRENT_USER, "Admin").run(() -> {
System.out.println(CURRENT_USER.get()); // Admin
// Временно меняем контекст для вложенного блока
ScopedValue.where(CURRENT_USER, "System").run(() -> {
System.out.println(CURRENT_USER.get()); // System
});
// Вернулись обратно
System.out.println(CURRENT_USER.get()); // Admin
});Это безопасно, так как внешний код гарантированно получит своё значение Admin обратно после выхода из внутреннего блока.
- Scoped Values — это безопасная, быстрая и неизменяемая замена
ThreadLocal. - Они решают проблему накладных расходов памяти при использовании миллионов Виртуальных потоков.
- Значение привязывается к лексической области видимости кода (блоку), а не к жизни потока.
- Идеально работают со Structured Concurrency: контекст автоматически и дешево передается в подзадачи.
- Важно: В Java 21 это Preview API, поэтому API может незначительно измениться в Java 22/23.
В школе нас учили: «Программа выполняется строчка за строчкой, сверху вниз». В многопоточной среде это ложь.
Java Memory Model (JMM) — это спецификация (часть JLS), которая описывает, как потоки взаимодействуют через память. Это набор правил, которые гарантируют (или не гарантируют), увидит ли Поток Б изменения, сделанные Потоком А.
Ради производительности ваш код оптимизируют три участника:
- Компилятор (javac/JIT): Может менять местами инструкции, если считает, что это не повлияет на результат в рамках одного потока.
- Процессор: Выполняет инструкции вне очереди (Out-of-Order Execution), чтобы задействовать все блоки ALU.
- Система памяти (Caches): Данные могут записаться в кеш в одном порядке, а сброситься в RAM в другом.
public class ReorderingDemo {
int x = 0;
boolean flag = false;
// Выполняет Поток 1
public void writer() {
x = 42; // (1)
flag = true; // (2)
}
// Выполняет Поток 2
public void reader() {
if (flag) { // (3)
// Вопрос: Что мы увидим?
// Ожидание: 42
// Реальность JMM: Может быть 0!
System.out.println(x);
}
}
}Почему может быть 0?
С точки зрения Потока 1, переменные x и flag никак не зависят друг от друга. Компилятор или процессор могут решить: «Записать flag = true быстрее, чем x = 42. Давайте поменяем их местами».
Поток 2 увидит flag = true, зайдет внутрь if, но x все еще будет 0.
Чтобы запретить этот хаос, JMM вводит понятие Happens-Before. Это не про физическое время. Это про гарантию видимости.
Если Операция А happens-before Операция Б, это значит:
- А произошло раньше Б.
- Все изменения памяти, сделанные в А, гарантированно видны в Б.
Если между двумя операциями нет связи happens-before, JVM имеет право переставлять их как угодно.
- Program Order Rule: В рамках одного потока все команды выполняются так, как написано (семантически).
- Monitor Lock Rule: Освобождение монитора (
unlock/ выход изsynchronized) happens-before любой последующий захват (lock) того же монитора.
- Перевод: Всё, что сделал поток до выхода из synchronized, увидит следующий поток, вошедший в synchronized.
- Volatile Variable Rule: Запись в
volatileпеременную happens-before любое последующее чтение этой же переменной. - Thread Start Rule: Вызов
thread.start()happens-before первой инструкции в методеrun()этого потока. - Thread Join Rule: Возврат из
thread.join()происходит после завершения всех операций в потоке.
Зная правила выше, можно использовать интересный трюк (хотя злоупотреблять им не стоит).
Правило volatile работает как барьер памяти.
// Пример "Piggybacking"
int a = 0;
int b = 0;
int c = 0;
volatile boolean v = false;
public void writer() {
a = 1; // Обычная запись
b = 2; // Обычная запись
c = 3; // Обычная запись
// Запись в volatile создает барьер ("Memory Fence")
// JMM гарантирует: все, что было ДО этой строки,
// будет сброшено в память ДО записи v.
v = true;
}
public void reader() {
if (v) { // Чтение volatile
// Если мы увидели v == true, то по правилу Happens-Before
// мы ОБЯЗАНЫ увидеть и a=1, b=2, c=3.
System.out.println("a=" + a + ", b=" + b + ", c=" + c);
}
}Классический пример собеседования: «Напишите Singleton».
public class Singleton {
private static Singleton instance;
public static Singleton getInstance() {
if (instance == null) { // Первая проверка (без блокировки)
synchronized (Singleton.class) {
if (instance == null) { // Вторая проверка (с блокировкой)
// ОПАСНОСТЬ ЗДЕСЬ
instance = new Singleton();
}
}
}
return instance;
}
}Почему это сломается?
Строка instance = new Singleton() не атомарна. Она состоит из 3 шагов:
- Выделить память под объект.
- Вызвать конструктор (инициализировать поля).
- Присвоить ссылку на память переменной
instance.
Компилятор может переупорядочить шаги 2 и 3.
- Память выделена.
- Ссылка записана в
instance(она уже не null!). - Конструктор еще не отработал.
Другой поток заходит, видит, что instance != null, хватает ссылку и пытается использовать полуинициализированный объект (поля равны 0 или null).
public class Singleton {
// volatile запрещает переупорядочивание записи в instance
// относительно вызова конструктора.
private static volatile Singleton instance;
public static Singleton getInstance() {
if (instance == null) {
synchronized (Singleton.class) {
if (instance == null) {
instance = new Singleton();
}
}
}
return instance;
}
}У ключевого слова final в JMM есть особый статус.
Если поле помечено как final, JMM гарантирует, что после завершения конструктора любой поток увидит корректное значение этого поля.
Это объясняет, почему Immutability (неизменяемость) — лучшая стратегия для многопоточности. Если все поля final, объект публикуется безопасно автоматически, без синхронизации.
- Reordering: JVM и CPU меняют порядок команд ради скорости. Не полагайтесь на порядок строк кода.
- Happens-Before: Единственный способ гарантировать порядок — создать цепочку happens-before (через
volatile,synchronizedили старт потока). - Volatile: Это не только видимость. Это запрет на переупорядочивание инструкций вокруг переменной.
- DCL: Singleton должен быть
volatile, иначе рискуете получить сломанный объект. - Safe Publication: Используйте
finalполя или статические инициализаторы для безопасной публикации объектов.
Вы написали идеальный многопоточный код. Блокировок минимум, алгоритм верный. Но программа работает медленнее, чем однопоточная. Почему?
Скорее всего, вы столкнулись с физикой работы CPU.
Процессоры не читают из памяти по одному байту. Они читают блоками, которые называются Cache Line (Кеш-линия). Обычно это 64 байта.
Представьте, что у вас есть класс с двумя полями volatile long:
class Data {
volatile long a; // 8 байт
volatile long b; // 8 байт
}В памяти эти поля лежат рядом. Скорее всего, они попадут в одну кеш-линию.
Сценарий катастрофы:
- Ядро 1 (Поток А) хочет изменить поле
a. Оно загружает всю кеш-линию (64 байта, включаяb) в свой L1 Cache. - Ядро 2 (Поток Б) хочет изменить поле
b. Оно тоже хочет эту кеш-линию. - Протокол когерентности кешей (MESI) говорит: «Стоп! Эту линию меняет Ядро 1. Ядро 2, жди, пока Ядро 1 сбросит изменения, и линия станет невалидной».
- Ядра начинают играть в «пинг-понг» этой кеш-линией. Данные летают между ядрами, шина забита, производительность падает в 10-100 раз.
Логически переменные разные (a и b), но физически они разделяют одну «коммуналку» (кеш-линию). Это и есть False Sharing.
Раньше разработчики вручную добавляли мусорные переменные, чтобы «раздвинуть» полезные данные:
class PaddedData {
volatile long a;
long p1, p2, p3, p4, p5, p6, p7; // Мусор (56 байт)
volatile long b;
}В Java появилась специальная аннотация jdk.internal.vm.annotation.Contended (или sun.misc.Contended в старых версиях). Она говорит JVM: «Пожалуйста, помести это поле на отдельную кеш-линию». JVM сама добавит нужные отступы (padding) в байт-код.
import jdk.internal.vm.annotation.Contended;
public class FalseSharingFixed {
@Contended
volatile long a;
@Contended
volatile long b;
}Важно: Чтобы аннотация заработала, нужно запустить JVM с флагом
-XX:-RestrictContended.
Это то, за что мы платим, используя потоки.
Когда ОС решает, что Потоку А пора отдохнуть (истек квант времени или он заблокировался на I/O), происходит следующее:
- Сохранить регистры процессора и указатель инструкций (PC) в память.
- Сбросить кеши (частично), так как новые данные могут быть не нужны.
- Загрузить состояние Потока Б.
- Запустить Поток Б.
Это занимает тысячи тактов процессора (много микросекунд). Если у вас 4 ядра и 4000 активных потоков, которые постоянно переключаются, процессор будет тратить 90% времени на переключение и только 10% на полезную работу.
Здесь Virtual Threads сияют.
- Виртуальные потоки переключаются в пространстве пользователя (JVM), а не ядра ОС.
- Это намного дешевле (почти как вызов функции).
- Кеши не сбрасываются так агрессивно, так как поток-носитель (Carrier Thread) остается тот же.
Вывод: Используйте Виртуальные потоки для I/O-задач, чтобы минимизировать цену переключения.
На собеседованиях любят спрашивать: «Если я добавлю еще 10 ядер, программа станет работать в 10 раз быстрее?»
Ответ дает Закон Амдала:
Ускорение программы ограничено её последовательной частью.
Если в вашей программе есть synchronized блок, который занимает 5% времени выполнения, то максимальное теоретическое ускорение (даже с бесконечным числом процессоров) = 20 раз (1 / 0.05).
Если 50% кода последовательно — максимальное ускорение 2 раза.
Практический вывод: Не пытайтесь параллелить всё подряд. Оптимизируйте или убирайте точки синхронизации (узкие места). Именно они определяют предел масштабируемости, а не количество ядер.
- CPU Cache matters: Думайте о том, как данные лежат в памяти.
- False Sharing: Если разные потоки часто пишут в соседние переменные — разнесите их (
@Contended). - Context Switch: Платформенные потоки дороги. Если их слишком много, CPU греет воздух.
- Amdahl's Law: Синхронизация убивает масштабируемость. Лучшая синхронизация — её отсутствие (неизменяемые данные, локальные переменные).
С выходом Java 21 некоторые старые паттерны умерли, а другие возродились.
Лучший способ избежать Race Condition — не иметь изменяемого состояния вообще. Если объект нельзя изменить после создания, он автоматически потокобезопасен. Его можно читать миллионом потоков без блокировок.
Инструмент: Java Records (Java 14+).
// Record - это immutable данные "из коробки".
// Все поля final, нет сеттеров.
public record UserData(String name, List<String> roles) {
// Обязательно делаем защитную копию изменяемых коллекций в конструкторе!
// Иначе кто-то снаружи изменит список roles, и иммутабельность сломается.
public UserData {
roles = List.copyOf(roles);
}
}Долгое время мы использовали Реактивное программирование (WebFlux, RxJava), чтобы экономить потоки. С приходом Virtual Threads мы возвращаемся к простому паттерну: «Один запрос — Один (виртуальный) поток».
Код становится линейным, простым, но таким же эффективным.
Классика для разделения ответственности. Одни потоки генерируют задачи, другие их разгребают.
Используйте каналы (BlockingQueue) для связи. Это позволяет независимо масштабировать количество производителей и потребителей.
Главная ошибка новичка — вставлять Thread.sleep() в тесты.
- Проблема 1: Тесты становятся медленными.
- Проблема 2: Тесты становятся «мигающими» (Flaky). На вашей машине 100мс хватает, а на медленном CI/CD сервере — нет, и тест падает.
Библиотека, которая позволяет писать условия в стиле «Жди, пока условие не выполнится, но не дольше X секунд».
// Зависимость Maven: org.awaitility:awaitility
@Test
void testAsyncOperation() {
AtomicBoolean flag = new AtomicBoolean(false);
// Запускаем асинхронную операцию
new Thread(() -> {
doHeavyWork();
flag.set(true);
}).start();
// ПЛОХО: Thread.sleep(1000);
// Если операция закончится за 10мс, мы зря ждем 990мс.
// Если за 1100мс - тест упадет.
// ХОРОШО: Awaitility
// Проверяет условие каждые 100мс (по умолчанию).
// Как только flag станет true - тест пройдет немедленно.
// Если пройдет 2 секунды - тест упадет с понятной ошибкой.
await().atMost(2, SECONDS).untilTrue(flag);
}Для тестирования Java Memory Model (то, что мы обсуждали в Главе 11: атомарность, reordering) обычные JUnit тесты бесполезны. Вам нужен JCStress (Java Concurrency Stress Tests).
Он запускает один и тот же маленький кусок кода миллионы раз на разных ядрах, пытаясь поймать редкие состояния гонки. Это инструмент для разработчиков библиотек и High-Load систем.
Когда сервер завис в продакшене, дебаггер не подключишь.
Это текстовый отчет о том, что делает каждый поток в данный момент.
Как получить: jstack <pid> или kill -3 <pid>.
Что искать:
- Deadlock: jstack обычно сам пишет "Found one Java-level deadlock" в конце файла.
- Зависшие пулы: Если все потоки пула в состоянии
WAITINGнаqueue.take(), значит нет задач. Если вBLOCKED, значит они подрались за монитор.
Это «черный ящик» для JVM. Он записывает события (GC, блокировки, I/O) с минимальными накладными расходами (< 1%). В Java 21 JFR умеет отлично показывать события виртуальных потоков (когда они монтируются, когда паркуются).
Анализировать файлы .jfr можно через Java Mission Control (JMC).
Обычная блокировка (Mutex) работает по принципу: «Один зашел — остальные ждут». Это безопасно, но неэффективно, если у вас сценарий Read-Heavy: 99% времени потоки только читают данные, и лишь 1% времени кто-то их меняет. Зачем блокировать читателей друг от друга?
Для этого существуют специальные механизмы.
Этот замок состоит из пары блокировок: одна для чтения, одна для записи.
- Read Lock (Shared): Его могут захватить много потоков одновременно, если никто не держит Write Lock.
- Write Lock (Exclusive): Его может захватить только один поток, и только если никто не держит ни Read, ни Write Lock.
package com.example.locks;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class CacheDemo {
private final Map<String, String> cache = new HashMap<>();
// Создаем пару замков
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
// Метод чтения: могут выполнять 1000 потоков одновременно!
public String get(String key) {
readLock.lock();
try {
// Читаем безопасно, зная, что никто не пишет в этот момент
return cache.get(key);
} finally {
readLock.unlock();
}
}
// Метод записи: эксклюзивный доступ
public void put(String key, String value) {
writeLock.lock();
try {
// Пока мы здесь, все читатели (get) ждут у двери
cache.put(key, value);
} finally {
writeLock.unlock();
}
}
}Проблема: У этого лока есть проблема «Голодания писателей» (Writer Starvation). Если поток читателей непрерывен (новые приходят раньше, чем уходят старые), поток-писатель может вечно ждать момента, когда Read Lock полностью освободится.
StampedLock был создан, чтобы решить проблемы ReentrantReadWriteLock. Он работает быстрее и предлагает киллер-фичу: Оптимистичное чтение (Optimistic Read).
Идея: «Я не буду брать блокировку сразу. Я быстро прочитаю данные и получу "штамп" (версию). Потом проверю: не изменился ли штамп, пока я читал? Если нет — супер, я сэкономил кучу времени. Если да — перечитаю нормально с блокировкой».
Важно:
StampedLockНЕ реентерабелен (non-reentrant). Если вы внутри лока попытаетесь взять его еще раз в том же потоке — вы получите Deadlock.
Этот паттерн сложнее, но он критически важен для High-Load систем.
import java.util.concurrent.locks.StampedLock;
public class Point {
private double x, y;
private final StampedLock sl = new StampedLock();
// Метод записи (Эксклюзивный)
public void move(double deltaX, double deltaY) {
long stamp = sl.writeLock(); // Блокируем всё
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp); // Освобождаем по штампу
}
}
// Метод Оптимистичного Чтения
public double distanceFromOrigin() {
// 1. Попытка оптимистичного чтения (БЕЗ БЛОКИРОВКИ)
long stamp = sl.tryOptimisticRead();
// Копируем переменные в локальный стек
double currentX = x;
double currentY = y;
// 2. Проверка валидности (Validate)
// Если за время копирования кто-то вызвал writeLock(), validate вернет false
if (!sl.validate(stamp)) {
// План Б: Оптимизм не оправдался, переключаемся на пессимистичную блокировку
stamp = sl.readLock(); // Поток может заблокироваться здесь
try {
currentX = x;
currentY = y;
} finally {
sl.unlockRead(stamp);
}
}
// Вычисления над локальными копиями (безопасно)
return Math.sqrt(currentX * currentX + currentY * currentY);
}
}Почему это круто? В 99% случаев validate вернет true. Это значит, что чтение произошло со скоростью обычного доступа к памяти (почти как volatile), вообще без накладных расходов на синхронизацию OS.
Если Lock управляет доступом к данным («могу я войти?»), то Semaphore управляет количеством ресурсов («есть ли свободные слоты?»).
Представьте парковку на 5 мест.
- Приехала машина ->
acquire()(мест стало 4). - Мест нет -> машина ждет у шлагбаума.
- Машина уехала ->
release()(мест стало +1, шлагбаум открылся для следующего).
Мы не хотим, чтобы к нашей базе данных шло больше 10 запросов одновременно, иначе она упадет.
import java.util.concurrent.Semaphore;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SemaphoreDemo {
// Только 3 разрешения одновременно
private static final Semaphore PERMITS = new Semaphore(3);
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
int user = i;
executor.submit(() -> {
try {
// Запрос разрешения. Если занято - поток БЛОКИРУЕТСЯ здесь.
PERMITS.acquire();
System.out.println("User " + user + " работает с БД...");
Thread.sleep(1000); // Имитация долгого запроса
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// ОБЯЗАТЕЛЬНО возвращаем разрешение в finally
System.out.println("User " + user + " закончил.");
PERMITS.release();
}
});
}
executor.shutdown();
}
}При запуске вы увидите, что сообщения «работает с БД» появляются пачками по 3 штуки, хотя пул потоков имеет размер 10.
ReentrantReadWriteLock: Используйте для коллекций, где много читателей и мало писателей.StampedLock: Используйте вместоReadWriteLock, если нужна экстремальная производительность на чтение (черезtryOptimisticRead). Но будьте осторожны со сложностью кода.Semaphore: Используйте для ограничения ресурсов (соединений, файлов, запросов к внешнему API).
В пакете java.util.concurrent есть три мушкетера для управления потоком исполнения: CountDownLatch, CyclicBarrier и Phaser.
Разберем их от простого к сложному.
Самый простой и популярный инструмент. Метафора: Запуск ракеты. "3, 2, 1, Пуск!".
- Устанавливаем счетчик на число N.
- Один или несколько потоков ждут (
await()), пока счетчик не станет 0. - Другие потоки уменьшают счетчик (
countDown()). - Как только счетчик равен 0, ожидающие потоки «срываются с цепи» и продолжают работу.
- Важно: Это одноразовый инструмент. Сбросить счетчик и использовать заново нельзя.
import java.util.concurrent.CountDownLatch;
public class LatchDemo {
public static void main(String[] args) throws InterruptedException {
// Нам нужно дождаться инициализации 3-х сервисов
CountDownLatch latch = new CountDownLatch(3);
// Запускаем сервисы в отдельных потоках
new Thread(new Service("Database", 2000, latch)).start();
new Thread(new Service("Cache", 1000, latch)).start();
new Thread(new Service("Logging", 500, latch)).start();
System.out.println("Main: Жду инициализации сервисов...");
// Главный поток БЛОКИРУЕТСЯ здесь, пока latch не станет 0
latch.await();
System.out.println("Main: Все сервисы готовы. Запускаем сервер!");
}
static class Service implements Runnable {
String name;
int delay;
CountDownLatch latch;
public Service(String name, int delay, CountDownLatch latch) {
this.name = name;
this.delay = delay;
this.latch = latch;
}
@Override
public void run() {
try {
Thread.sleep(delay);
System.out.println(name + " is UP");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// Уменьшаем счетчик. Даже если была ошибка,
// мы должны сообщить, что "закончили" (успешно или нет),
// иначе Main будет ждать вечно.
latch.countDown();
}
}
}
}Более сложный инструмент для групповой работы. Метафора: Туристическая группа. "Встречаемся у водопада. Кто пришел первый — ждет остальных. Когда соберутся все 10 человек, гид раздает бутерброды (Barrier Action), и все идут дальше к горе".
- Инициализируется количеством участников N.
- Потоки вызывают
await()и блокируются. - Когда
await()вызвал N-й поток, барьер "падает". - Опционально выполняется
barrierAction(в последнем пришедшем потоке). - Важно: Барьер многоразовый. После прохода его можно использовать снова для следующего этапа вычислений.
Идеально для матричных вычислений, генетических алгоритмов или обработки изображений.
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class BarrierDemo {
public static void main(String[] args) {
int workers = 3;
// Действие, которое выполнится, когда ВСЕ соберутся у барьера
Runnable barrierAction = () -> System.out.println(">>> ЭТАП ЗАВЕРШЕН. Объединяем результаты...\n");
CyclicBarrier barrier = new CyclicBarrier(workers, barrierAction);
for (int i = 0; i < workers; i++) {
new Thread(new Worker(i, barrier)).start();
}
}
static class Worker implements Runnable {
int id;
CyclicBarrier barrier;
public Worker(int id, CyclicBarrier barrier) {
this.id = id;
this.barrier = barrier;
}
@Override
public void run() {
try {
// Этап 1
System.out.println("Worker " + id + " делает часть 1...");
Thread.sleep((long) (Math.random() * 1000));
System.out.println("Worker " + id + " ждет у барьера 1");
barrier.await(); // Ждем остальных
// Как только все прошли барьер 1, начинаем Этап 2
// (Барьер автоматически сбросился)
System.out.println("Worker " + id + " делает часть 2...");
Thread.sleep((long) (Math.random() * 1000));
System.out.println("Worker " + id + " ждет у барьера 2");
barrier.await(); // Снова ждем
System.out.println("Worker " + id + " закончил работу.");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}Это «CyclicBarrier на стероидах», появившийся в Java 7. Самый гибкий и мощный синхронизатор.
В чем отличия от CyclicBarrier?
- Динамическое число участников: В барьере число фиксировано (N). В
Phaserпотоки могут регистрироваться (register()) и уходить (arriveAndDeregister()) прямо во время работы. - Фазы: Он нумерует этапы (Phase 0, Phase 1...).
Метафора: Проектная команда. На этапе "Анализ" работают 2 аналитика. Они закончили, фаза сменилась. На этапе "Разработка" к ним присоединяются 5 разработчиков (register). На этапе "Тестирование" разработчики уходят (deregister), приходят тестировщики.
import java.util.concurrent.Phaser;
public class PhaserDemo {
public static void main(String[] args) {
// Регистрация 1 участника (Main поток)
Phaser phaser = new Phaser(1);
System.out.println("Фаза 0: Запуск потоков");
// Запускаем 3 рабочих потока
for (int i = 0; i < 3; i++) {
// Регистрируем каждого нового участника
phaser.register();
new Thread(new Task(phaser), "Thread-" + i).start();
}
// Main поток сообщает, что он свою часть фазы 0 сделал,
// и ждет остальных (как barrier.await())
phaser.arriveAndAwaitAdvance();
System.out.println("Фаза 0 завершена. Все потоки готовы.");
// ... Работа продолжается в следующей фазе ...
// В конце Main снимается с регистрации
phaser.arriveAndDeregister();
}
static class Task implements Runnable {
Phaser phaser;
Task(Phaser phaser) { this.phaser = phaser; }
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " прибыл к фазе 0");
// Сообщаем о прибытии и ждем остальных
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " работает в фазе 1...");
try { Thread.sleep(500); } catch (Exception e) {}
// Завершаем работу и выходим из фазера
System.out.println(Thread.currentThread().getName() + " уходит");
phaser.arriveAndDeregister();
}
}
}CountDownLatch: Если вам нужно один раз подождать завершения события (старт сервисов, конец загрузки файла).CyclicBarrier: Если у вас фиксированная группа потоков, которая выполняет итеративный алгоритм (шаг 1, синхронизация, шаг 2, синхронизация).Phaser: Если логика сложная, потоки появляются и исчезают динамически, или вам нужен номер текущей фазы.
В сочетании с Virtual Threads (Java 21), эти инструменты становятся очень дешевыми. Блокировка await() на виртуальном потоке не стоит почти ничего, поэтому вы можете координировать тысячи задач с помощью Phaser без страха "положить" процессор.
До Java 21 у нас была проблема C10k (10 000 одновременных соединений). Потоки ОС дорогие. Если у вас 10 000 пользователей и каждый запрос ждет ответа от БД 100мс:
- Thread-per-Request: Вам нужно 10 000 потоков. Сервер падает с
OutOfMemoryError. - Решение: Асинхронный неблокирующий ввод/вывод (NIO). Один поток (Event Loop) обслуживает тысячи запросов. Пока один запрос ждет БД, поток переключается на другой.
Библиотеки RxJava и Project Reactor дали удобный (относительно) API для работы с этим хаосом.
Представьте задачу:
- Найти User ID по имени.
- Найти его последние заказы.
- Вернуть список.
Выглядит как декларативный конвейер.
public Mono<List<Order>> getUserOrders(String username) {
return userRepository.findByName(username) // Возвращает Mono<User>
.flatMap(user -> orderRepository.findAllByUserId(user.getId())) // Возвращает Flux<Order>
.collectList() // Собирает в Mono<List<Order>>
.timeout(Duration.ofSeconds(5)) // Тайм-аут
.onErrorResume(ex -> {
log.error("Error fetching orders", ex);
return Mono.just(Collections.emptyList());
});
}Минусы:
- Когнитивная нагрузка: Нельзя использовать
if/else,for,try-catch. Нужно знать сотни операторов (flatMap,switchMap,zip,concat). - Отладка: Если упадет исключение, Stacktrace будет бесполезен. Он покажет кишки библиотеки Reactor, а не место в вашем коде, где произошла ошибка.
- Context Propagation: Передача
ThreadLocal(MDC логирование, Security Context) превращается в ад.
Мы возвращаемся к старому доброму коду, который выглядит синхронным, но под капотом работает на виртуальных потоках.
// Запускается внутри Virtual Thread
public List<Order> getUserOrders(String username) {
try {
User user = userRepository.findByName(username); // Блокировка (виртуальная)
List<Order> orders = orderRepository.findAllByUserId(user.getId()); // Блокировка (виртуальная)
return orders;
} catch (Exception ex) {
log.error("Error fetching orders", ex);
return Collections.emptyList();
}
}Плюсы:
- Обычные циклы и
try-catch. - Понятный Stacktrace.
- Обычный дебаггер (можно ставить брейкпоинты и идти по шагам).
- Производительность такая же (или лучше), как у WebFlux.
Для типичных REST API, Микросервисов, CRUD-приложений виртуальные потоки убивают необходимость в реактивщине.
Вам больше не нужно писать Mono<User>, вы просто пишете User. Серверы (Tomcat, Jetty) и фреймворки (Spring Boot 3.2+, Helidon Nima, Quarkus) уже умеют запускать каждый запрос в виртуальном потоке.
Вердикт: Если вы начинаете новый веб-проект на Java 21, берите обычный Spring MVC (не WebFlux) и включайте
spring.threads.virtual.enabled=true.
Реактивное программирование — это не только про экономию потоков. Это еще и про обработку потоков данных (Streaming).
Loom не умеет (и не должен уметь) делать две вещи, в которых силен Reactor:
Представьте, что вы читаете данные из Twitter Firehose (10 000 твитов в секунду), а ваша база может писать только 1 000 в секунду.
- Loom: Вы заспамите базу, и она упадет. Или у вас кончится память в очереди.
- Reactor: У потребителя есть механизм сказать источнику: «Пришли мне только 50 элементов, я пока занят».
Если ваша логика выглядит так: «Возьми поток цен акций, объедини с потоком новостей, сгруппируй по окнам в 5 секунд, вычисли среднее и отправь в WebSocket».
На Loom это придется писать вручную с циклами и очередями. На RxJava это делается в 5 строк кода оператором window и zip.
| Характеристика | Reactive (WebFlux/RxJava) | Project Loom (Virtual Threads) |
|---|---|---|
| Стиль кода | Функциональный, декларативный | Императивный, последовательный |
| Кривая обучения | Очень высокая (Operator hell) | Низкая (старая добрая Java) |
| Отладка | Ад | Рай |
| Масштабируемость | Отличная | Отличная |
| Backpressure | Есть "из коробки" | Нет (нужно реализовывать самим) |
| Идеально для | Streaming, WebSocket, Event Processing | REST API, RPC, DB access |
Вывод: Loom не заменяет Реактивное программирование полностью, но он заменяет его в самой популярной нише — высоконагруженных веб-сервисах. Изучать RxJava сейчас стоит, только если вы идете в проект со стримингом данных или поддерживаете существующий реактивный код.
Это архитектурный паттерн, на котором построены самые надежные системы в мире (телекоммуникационные шлюзы Ericsson, WhatsApp, Discord, онлайн-игры). В мире Java/Scala стандартом де-факто является библиотека Akka (или её свободный форк Apache Pekko).
Представьте, что объект — это не просто область в памяти, а живой человек.
- Вы не можете залезть человеку в голову и поменять его мысли (изменить поле переменной).
- Вы можете только отправить ему СМС (Сообщение).
- Человек прочитает СМС, когда освободится, и решит, что делать.
- Изоляция: У каждого Актора свое приватное состояние. Никто извне не может его прочитать или изменить (нет
get/setметодов). - Mailbox (Почтовый ящик): Все сообщения попадают в очередь. Актор обрабатывает их по одному (строго последовательно). Внутри актора многопоточности нет!
- Асинхронность: Отправка сообщения — это "Fire and Forget". Отправитель не блокируется и не ждет ответа.
- Нет блокировок (Lock-free): Раз актор обрабатывает сообщения по очереди, внутри него никогда не нужны
synchronized. Вы просто пишете обычный код. - Let it Crash (Пусть падает): Если Актор сломался (Exception), он не рушит всё приложение. У него есть "Родитель" (Supervisor), который решит: перезапустить актора, остановить или игнорировать ошибку. Это дает феноменальную отказоустойчивость (Self-healing systems).
- Распределенность: Акторам всё равно, где они живут. Вы можете отправить сообщение актору на другой сервер так же легко, как актору в соседнем потоке. Это основа кластеров.
Давайте реализуем банковский счет. В классической Java нам бы пришлось использовать synchronized или AtomicLong. В Акторах мы просто пишем логику обработки команд.
Примечание: Используется синтаксис Akka/Pekko Typed.
package com.example.actors;
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.actor.typed.javadsl.Receive;
// 1. Команды (Сообщения), которые понимает Актор
interface BankCommand {}
record Deposit(long amount) implements BankCommand {}
record Withdraw(long amount, ActorRef<String> replyTo) implements BankCommand {}
record GetBalance(ActorRef<Long> replyTo) implements BankCommand {}
// 2. Сам Актор
public class BankAccount extends AbstractBehavior<BankCommand> {
// ПРИВАТНОЕ состояние. Никаких volatile/atomic.
// Оно безопасно, так как доступ к нему строго последовательный.
private long balance = 0;
public static Behavior<BankCommand> create() {
return Behaviors.setup(BankAccount::new);
}
private BankAccount(ActorContext<BankCommand> context) {
super(context);
}
@Override
public Receive<BankCommand> createReceive() {
return newReceiveBuilder()
.onMessage(Deposit.class, this::onDeposit)
.onMessage(Withdraw.class, this::onWithdraw)
.onMessage(GetBalance.class, this::onGetBalance)
.build();
}
// Обработчик 1
private Behavior<BankCommand> onDeposit(Deposit cmd) {
balance += cmd.amount();
getContext().getLog().info("Депозит {}, баланс стал {}", cmd.amount(), balance);
return this; // Остаемся в том же поведении
}
// Обработчик 2
private Behavior<BankCommand> onWithdraw(Withdraw cmd) {
if (balance >= cmd.amount()) {
balance -= cmd.amount();
// Отправляем ответ назад отправителю
cmd.replyTo().tell("Успешно! Остаток: " + balance);
} else {
cmd.replyTo().tell("Недостаточно средств!");
}
return this;
}
// Обработчик 3
private Behavior<BankCommand> onGetBalance(GetBalance cmd) {
cmd.replyTo().tell(balance);
return this;
}
}Как это работает:
Даже если миллион потоков одновременно отправят сообщение Deposit, они выстроятся в очередь в Mailbox. Актор будет доставать их по одной и делать balance += amount. Никаких Race Conditions.
С приходом Java 21 часто спрашивают: «Нужны ли Акторы, если есть Виртуальные потоки?»
Ответ: Да, нужны, но для других целей.
- Virtual Threads решают проблему масштабируемости ввода-вывода (I/O). Они заменяют
ExecutorService, но не диктуют, как структурировать код. Вы все еще можете получить Deadlock или Race Condition, если используете общие переменные. - Actors решают проблему сложности взаимодействия и распределенности. Они дают модель мышления, где ошибки конкуренции исключены архитектурно.
Современный тренд: Запускать Акторы поверх Виртуальных потоков. Это дает лучшее из двух миров: дешевые потоки и безопасную модель данных.
Кривая обучения (Learning Curve).
- Вы не можете просто вызвать метод и получить
return. Вам нужно отправить сообщение, передать ссылку на себя (replyTo) и ждать ответного сообщения в своем обработчике. - Код становится разорванным (Request в одном месте, Response обрабатывается в другом).
- Отладка распределенной системы акторов — задача для сильных духом.
- Модель Акторов — это полная изоляция данных. Общение только через асинхронные сообщения.
- Идеально подходит для: Чатов, Игровых серверов, Систем умного дома (IoT), Финтеха (где ошибки синхронизации недопустимы).
- В Java используйте Apache Pekko (так как Akka изменила лицензию на платную).
- Это не замена Virtual Threads, а высокоуровневая абстракция над ними.
Примечание: В Java 21 этот API находится в стадии Incubator. Для запуска нужны флаги --add-modules jdk.incubator.vector.
SIMD (Single Instruction, Multiple Data) — это принцип, когда одна инструкция процессора выполняет операцию сразу над целым набором данных (вектором).
Представьте, что вам нужно сложить два массива: C[i] = A[i] + B[i].
- Скалярный подход (SISD):
- Загрузи
A[0], загрузиB[0], сложи, запиши вC[0]. - Загрузи
A[1], загрузиB[1], сложи, запиши вC[1]. - ...повторить 8 раз.
- Итог: 8 циклов процессора (условно).
- Векторный подход (SIMD):
- У процессора есть широкие регистры (например, AVX-512 на 512 бит). В такой регистр влезает сразу 16 чисел
int(по 32 бита). - Команда: "Загрузи 16 чисел из A, загрузи 16 чисел из B, сложи их все разом, запиши 16 чисел в C".
- Итог: 1 цикл процессора.
Вы получаете ускорение в 4, 8 или 16 раз без использования многопоточности.
Разве JIT-компилятор Java не делает это сам?
Делает, но плохо. Это называется Auto-vectorization. JIT пытается угадать, можно ли превратить ваш цикл for в векторные инструкции.
- Цикл слишком сложный? -> Отказ.
- Есть ветвление (
if) внутри? -> Отказ. - Зависимости между итерациями? -> Отказ.
Vector API дает нам гарантированный способ сказать процессору: "Используй AVX/NEON инструкции здесь и сейчас!".
Давайте сравним сложение двух массивов.
public void scalarSum(int[] a, int[] b, int[] c) {
for (int i = 0; i < a.length; i++) {
c[i] = a[i] + b[i];
}
}Код выглядит сложнее, но он работает на "железе" принципиально иначе.
package com.example.simd;
import jdk.incubator.vector.IntVector;
import jdk.incubator.vector.VectorSpecies;
public class VectorDemo {
// 1. Выбираем "Вид" вектора (Species).
// SPECIES_PREFERRED заставит Java выбрать самый широкий регистр,
// доступный на вашем CPU (например, 256 бит на обычном Intel, 512 на серверах).
static final VectorSpecies<Integer> SPECIES = IntVector.SPECIES_PREFERRED;
public void vectorSum(int[] a, int[] b, int[] c) {
int i = 0;
// Шаг цикла равен длине вектора (например, 8 для AVX-256)
// upperBound - это граница, до которой мы можем идти полными шагами
int upperBound = SPECIES.loopBound(a.length);
// 2. Основной цикл (Векторный)
for (; i < upperBound; i += SPECIES.length()) {
// Загружаем пачку данных из массива в векторный регистр
var va = IntVector.fromArray(SPECIES, a, i);
var vb = IntVector.fromArray(SPECIES, b, i);
// Одна операция сложения сразу над всей пачкой
var vc = va.add(vb);
// Выгружаем результат обратно в массив
vc.intoArray(c, i);
}
// 3. "Хвост" (Tail cleanup)
// Если длина массива (например, 10) не делится на длину вектора (8),
// у нас останется 2 элемента. Доделываем их обычным способом
// или с помощью маски (Mask).
for (; i < a.length; i++) {
c[i] = a[i] + b[i];
}
}
}На современных процессорах (поддержка AVX2 или AVX-512):
- Сложение массивов: ~8-16x быстрее.
- Перемножение матриц: ~10-30x быстрее.
- Поиск в массиве: ~4-8x быстрее.
Это критично для:
- Machine Learning на Java (инференс моделей).
- Криптографии (хеширование, шифрование).
- Обработки изображений и видео.
- Финансовой математики (расчет рисков).
Что если внутри цикла есть if? В скалярном коде процессор прыгает (branching). В SIMD прыгать нельзя — инструкция одна на всех.
Используется Masking (Предикаты). Логика такая: "Выполни сложение для всех 8 элементов, но результат запиши только для тех, где маска = true".
// Аналог: if (a[i] > 0) c[i] = a[i] + b[i];
// Создаем маску: true там, где a[i] > 0
var mask = va.compare(VectorOperators.GT, 0);
// Складываем, но применяем результат только там, где маска разрешает.
// В остальных ячейках останется старое значение из c (или 0).
va.add(vb).intoArray(c, i, mask);- Vector API — это способ писать код, который компилируется напрямую в супер-эффективные инструкции CPU (AVX, NEON, SVE).
- Это Data Parallelism (параллелизм данных), а не Task Parallelism (потоки).
- Используйте это для тяжелой математики. Для обычного перекладывания JSON из базы в контроллер это не даст выигрыша (и даже замедлит из-за подготовки данных).
- В Java 21 это все еще Incubator, API может меняться, но учить его стоит, если вы метите в High-Performance сектор.
Библиотека была создана инженерами лондонской биржи LMAX. Они обнаружили, что в их системе узким местом была не бизнес-логика, а сами очереди, передающие данные между потоками.
Классическая очередь (например, ArrayBlockingQueue) имеет две фундаментальные проблемы для High-Load:
- Contention (Конкуренция за блокировки):
Внутри очереди есть
lock(или CAS в неблокирующих очередях). Когда Producer и Consumer обращаются к «голове» или «хвосту» очереди, они дерутся за доступ к памяти. При высокой нагрузке процессоры тратят время на арбитраж, а не на работу. - Cache Misses (Промахи кеша):
В обычной очереди (
LinkedBlockingQueue) узлы разбросаны по куче (Heap). Процессор не может предсказать, где лежит следующий элемент, и постоянно лезет в медленную RAM. - Garbage Collection: Создание и удаление миллионов объектов-оберток для событий нагружает GC.
Дизайн Disruptor основан на принципе «Механической симпатии» — понимании того, как работает железо.
Вместо очереди используется массив фиксированного размера, замкнутый в кольцо.
- Предвыделение памяти (Pre-allocation):
Все объекты событий создаются один раз при старте.
Когда приходит новое событие, мы не делаем
new Event(), мы берем старый объект из ячейки массива и перезаписываем его поля.
- Результат: GC отдыхает. Мусора нет вообще.
- Локальность данных: Массив — это непрерывный блок памяти. Процессор обожает массивы: он подгружает их в кеш целыми линиями.
- Отсутствие блокировок (Lock-Free):
Вместо замков используются Sequences (счетчики типа
AtomicLong).
- Producer резервирует слот №5.
- Consumer ждет, пока счетчик не станет >= 5.
- Никаких
synchronized. Только барьеры памяти.
Disruptor требует немного больше кода для настройки, чем простая очередь, но результат того стоит.
*Зависимость Maven: com.lmax:disruptor*
package com.example.disruptor;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;
public class DisruptorDemo {
// 1. Событие (Данные)
// Это просто контейнер. Мы будем его переиспользовать вечно.
public static class OrderEvent {
private long id;
private double price;
public void set(long id, double price) {
this.id = id;
this.price = price;
}
@Override
public String toString() { return "Order{" + id + "=" + price + "}"; }
}
// 2. Потребитель (Consumer / Event Handler)
// Логика обработки события
public static class OrderHandler implements com.lmax.disruptor.EventHandler<OrderEvent> {
@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) {
// Здесь происходит бизнес-логика
// System.out.println("Processing: " + event);
}
}
public static void main(String[] args) throws InterruptedException {
// Размер буфера (должен быть степенью двойки: 1024, 2048...)
int bufferSize = 1024;
// 3. Создаем Disruptor
Disruptor<OrderEvent> disruptor = new Disruptor<>(
OrderEvent::new, // Фабрика: как создавать пустые события при старте
bufferSize, // Размер кольца
DaemonThreadFactory.INSTANCE // Фабрика потоков
);
// 4. Подключаем обработчик
disruptor.handleEventsWith(new OrderHandler());
// 5. Запускаем
disruptor.start();
// 6. Получаем доступ к кольцу для публикации (Producer)
var ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
System.out.println("Start publishing...");
long startTime = System.currentTimeMillis();
// Публикуем 10 миллионов событий
for (long l = 0; l < 10_000_000; l++) {
// Этап 1: Резервируем следующий слот (Sequence)
long sequence = ringBuffer.next();
try {
// Этап 2: Получаем объект по индексу (без создания нового!)
OrderEvent event = ringBuffer.get(sequence);
// Этап 3: Заполняем данными
event.set(l, 100.0);
} finally {
// Этап 4: Публикуем (делаем доступным для Consumer)
ringBuffer.publish(sequence);
}
}
long endTime = System.currentTimeMillis();
System.out.println("Done. 10M events in " + (endTime - startTime) + "ms");
disruptor.shutdown();
}
}Результат: На современном ноутбуке этот код обработает 10 миллионов событий менее чем за 1 секунду (в одном потоке). Стандартная очередь будет в 5-10 раз медленнее.
Disruptor работает быстрее всего, если у вас один поток-писатель (Single Producer). В этом случае даже CAS-операции не нужны. Писатель просто инкрементирует свой локальный счетчик и пишет в память. Это абсолютный предел скорости передачи данных между потоками.
Если писателей много (Multi Producer), Disruptor использует CAS, что чуть медленнее, но все равно быстрее очередей.
- Log4j 2: Вы когда-нибудь задумывались, почему Log4j2 быстрее, чем Logback или Log4j1? Потому что у него внутри есть режим Async Logger, который использует LMAX Disruptor для передачи логов в поток записи на диск.
- HFT (High Frequency Trading): Биржевые роботы.
- Event Sourcing (CQRS): Системы, где состояние — это последовательность миллионов мелких событий (Axon Framework).
- LMAX Disruptor — альтернатива очередям для экстремальной производительности.
- Ring Buffer + Pre-allocation = Ноль работы для GC.
- Padding защищает от False Sharing.
- Используйте, если вам нужны миллионы TPS или если GC паузы убивают ваше приложение.
Бонусная Глава 7: Полезные утилиты (Hidden Gems)
Это самая частая ошибка производительности в многопоточном коде.
Как работает java.util.Random?
Он потокобезопасен. Чтобы сгенерировать следующее случайное число, он должен обновить внутреннее состояние (seed).
// Внутри java.util.Random
protected int next(int bits) {
long oldSeed, nextSeed;
AtomicLong seed = this.seed;
do {
oldSeed = seed.get();
nextSeed = (oldSeed * multiplier + addend) & mask;
// CAS операция! Все потоки дерутся за одну переменную.
} while (!seed.compareAndSet(oldSeed, nextSeed));
return (int)(nextSeed >>> (48 - bits));
}Если 100 потоков одновременно вызывают random.nextInt(), 99 из них будут крутиться в CAS-цикле, прожигая CPU впустую.
Решение: ThreadLocalRandom (Java 7)
Каждый поток имеет свой собственный seed, который хранится прямо в объекте Thread (внутри JVM). Никакой синхронизации, никаких CAS.
import java.util.concurrent.ThreadLocalRandom;
public class RandomDemo {
public void doWork() {
// ПЛОХО:
// Random rand = new Random(); // Создание объекта дорого
// static Random rand = new Random(); // Конкуренция за seed
// ХОРОШО:
// Метод current() просто возвращает прокси, он не создает новый объект
int r = ThreadLocalRandom.current().nextInt(0, 100);
}
}Правило: В многопоточной среде никогда не используйте
new Random()или общий статическийRandom. ТолькоThreadLocalRandom.
Это секретное оружие создателей фреймворков (Spring, Hibernate, Jackson).
Проблема:
Вам нужно закешировать какие-то метаданные для класса. Например, вы написали ORM и хотите один раз просканировать класс User.class и запомнить, какие поля помечены аннотацией @Column.
- Вариант А (
Map<Class<?>, MetaData>): Нужна синхронизация (ConcurrentHashMap). Плюс, это держит жесткую ссылку на Класс, что мешает выгрузке классов (Class Unloading) и ведет к утечкам памяти в OSGi/Tomcat. - Вариант Б (
WeakHashMap): Медленно и ненадежно.
Решение: ClassValue (Java 7)
Позволяет лениво вычислить и привязать значение к конкретному классу. Значение хранится внутри самого класса (в его внутренней структуре в JVM), а не в карте. Когда класс выгружается GC, значение удаляется автоматически.
public class ORMIntrospector {
// Объявляем "вычислитель" метаданных
private static final ClassValue<String> TABLE_NAMES = new ClassValue<>() {
@Override
protected String computeValue(Class<?> type) {
// Этот код выполнится ОДИН раз для каждого класса
System.out.println("Analyzing " + type.getName());
if (type.isAnnotationPresent(Table.class)) {
return type.getAnnotation(Table.class).name();
}
return type.getSimpleName().toLowerCase();
}
};
public String getTableName(Object entity) {
// get() работает молниеносно (как доступ к полю)
return TABLE_NAMES.get(entity.getClass());
}
}До Java 9, если вы хотели делать атомарные операции над обычным полем (без AtomicInteger), или писать в память без проверок границ массива ради скорости, вам приходилось использовать sun.misc.Unsafe. Это закрытый API, который мог уронить JVM (Segmentation Fault).
VarHandle (Variable Handle) — это стандартизированный, типизированный и безопасный способ делать низкоуровневые вещи.
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
public class VarHandleDemo {
// Обычное поле, не AtomicInteger
private volatile int count = 0;
private static final VarHandle COUNT_HANDLE;
static {
try {
// Получаем "ручку" к полю count
COUNT_HANDLE = MethodHandles.lookup()
.findVarHandle(VarHandleDemo.class, "count", int.class);
} catch (Exception e) { throw new Error(e); }
}
public void increment() {
// Атомарный инкремент над обычным int полем!
// Раньше это было невозможно без Unsafe или AtomicIntegerFieldUpdater
COUNT_HANDLE.getAndAdd(this, 1);
}
public void setOpaque(int value) {
// Режим записи "Opaque" - слабее volatile, но быстрее.
// Гарантирует, что компилятор не выкинет запись,
// но не гарантирует видимость для других ядер мгновенно.
COUNT_HANDLE.setOpaque(this, value);
}
}Это инструмент для Architect уровня, когда вы пишете свои структуры данных.
Если вы пишете Spin-Lock (активное ожидание в цикле), как мы делали в главе про CAS:
while (!flag) {
// Пустой цикл
}Процессор сходит с ума. Он думает: "Это очень важный цикл!", разгоняет частоту, начинает спекулятивное выполнение инструкций, греется и потребляет батарею.
Решение: Подсказка процессору: "Брат, я просто жду. Расслабься".
while (!flag) {
// Компилируется в инструкцию PAUSE (на x86) или YIELD (на ARM)
Thread.onSpinWait();
}Это снижает энергопотребление и позволяет другому потоку на этом же гиперпоточном ядре (Hyper-Threading) получить больше ресурсов.