Это руководство предназначено для того, чтобы провести вас от полного непонимания Kafka до уровня, позволяющего проектировать архитектуру и писать код для высоконагруженных систем.
- Часть 1: Фундамент и Теория
- Часть 2: Настройка окружения (Docker)
- Часть 3: Практика разработки (Code Examples)
- Часть 4: Уровень Эксперта (Deep Dive)
- Часть 5: Экосистема (Beyond Code)
- Следующий шаг (Action Plan)
Kafka — это распределенная платформа потоковой передачи событий (Distributed Event Streaming Platform).
В её основе лежит Commit Log (журнал фиксации). В отличие от классических очередей (RabbitMQ), Kafka:
- Не удаляет сообщения сразу после прочтения.
- Хранит историю событий на диске заданное время (retention period).
- Оптимизирована для пропускной способности (Millions msg/sec).
Чтобы понять Kafka, нужно понять иерархию хранения данных:
- Topic (Топик): логическая категория сообщений (аналог таблицы в БД).
- Partition (Партиция): топик разбивается на шарды. Партиция — это единица параллелизма.
- Offset (Смещение): уникальный ID сообщения внутри партиции (0, 1, 2 ...). Гарантирует порядок строго внутри партиции.
- Producer: пишет данные в топик.
- Consumer: читает данные из топика.
- Broker: сервер, где хранятся данные.
- Cluster: группа брокеров.
- Leader & Follower: у каждой партиции есть один Лидер (принимает запись/чтение) и Фолловеры (просто копируют данные для надежности).
Мы используем современный режим KRaft (без Zookeeper), веб-интерфейс Kafka UI и Schema Registry.
Создайте файл docker-compose.yml и вставьте в него этот код:
version: "3"
services:
kafka:
image: bitnami/kafka:latest
container_name: kafka
ports:
- '9092:9092'
environment:
# Настройки KRaft
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
# Слушатели
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,INTERNAL://:29092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092,INTERNAL://kafka:29092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
volumes:
- kafka_data:/bitnami/kafka
schema-registry:
image: confluentinc/cp-schema-registry:latest
container_name: schema-registry
depends_on:
- kafka
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:29092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
ports:
- "8080:8080"
depends_on:
- kafka
- schema-registry
environment:
KAFKA_CLUSTERS_0_NAME: local-kraft
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
volumes:
kafka_data:- Выполните:
docker-compose up -d - Kafka UI: http://localhost:8080
- Адрес брокера для кода:
localhost:9092
Самый производительный вариант.
Зависимости (Maven): kafka-clients, slf4j-simple.
Producer:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("java_topic", "key1", "Hello Java"), (m, e) -> {
if (e == null) System.out.println("✅ Sent offset: " + m.offset());
});
producer.close();Consumer:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "java_group"); // Важно!
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("java_topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (var r : records) System.out.println("📩 Recv: " + r.value());
}Идеально для скриптов и ML. Библиотека: kafka-python.
Producer:
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
producer.send('py_topic', {'event': 'login', 'user': 'admin'})
producer.flush()Consumer:
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'py_topic',
bootstrap_servers=['localhost:9092'],
group_id='py_group',
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
for msg in consumer:
print(f"📩 {msg.value}")Асинхронная обработка. Библиотека: kafkajs.
Producer:
const { Kafka } = require('kafkajs');
const kafka = new Kafka({ clientId: 'node-app', brokers: ['localhost:9092'] });
const producer = kafka.producer();
const run = async () => {
await producer.connect();
await producer.send({
topic: 'node_topic',
messages: [{ value: 'Hello Node' }],
});
await producer.disconnect();
};
run();Это механизм масштабирования Kafka.
- Все консьюмеры с одинаковым
group.idобъединяются в группу. - Kafka распределяет партиции между участниками группы.
- Правило: 1 партиция может читаться только 1 консьюмером внутри группы.
- Есть 4 партиции, 2 консьюмера: каждому по 2.
- Есть 4 партиции, 5 консьюмеров: один будет бездельничать (Idle).
- At most once (Максимум один раз):
acks=0. Сообщение может потеряться, но не дублируется. - At least once (Минимум один раз):
acks=all. Сообщение не потеряется, но может прийти дважды (нужна идемпотентность на клиенте). Это стандарт. - Exactly Once (Ровно один раз): требует транзакций и специальной настройки продюсера (
enable.idempotence=true).
Для топиков, хранящих состояние (например, "текущий адрес пользователя"), Kafka может удалять старые сообщения, оставляя только последнее значение для каждого ключа. Это экономит место.
Инструмент для интеграции без написания кода.
- Source: База данных → Kafka (например, через Debezium CDC).
- Sink: Kafka → Elasticsearch / S3 / Snowflake.
- Работает через конфигурационные JSON-файлы.
Решает проблему "грязных данных" и изменений форматов.
- Использует формат Avro или Protobuf.
- Хранит структуру данных (схему) отдельно от самих данных.
- Продюсер валидирует данные перед отправкой. Если поле
ageдолжно бытьint, а вы шлетеstring, отправка не пройдёт.
Библиотека Java для аналитики "на лету".
- Позволяет делать
map,filter,join,aggregation(подсчет сумм, средних значений) в реальном времени. - Примеры: Word Count, Fraud Detection.
Пример (Word Count на Kafka Streams):
KStream<String, String> text = builder.stream("input");
KTable<String, Long> counts = text
.flatMapValues(v -> Arrays.asList(v.split("\\W+")))
.groupBy((key, word) -> word)
.count();
counts.toStream().to("output");- Запустите Docker Compose.
- Создайте топик
test-topicс 3 партициями через Kafka UI. - Напишите Python Producer, который шлет туда случайные числа.
- Запустите 3 экземпляра Java Consumer (один и тот же код, запущенный в 3 разных терминалах) и посмотрите, как они поделят нагрузку.