The single most important thing to understand about Kafka is that it is a distributed, append-only log, not a message queue. This distinction shapes every design decision.
In a traditional queue (RabbitMQ, SQS), a message exists until it is consumed and acknowledged. Once a consumer processes it, it is gone. The queue is a delivery mechanism.
Kafka’s log is different. Messages are written sequentially to a durable, ordered structure. Consumers read from the log by tracking their own position (offset). The message is not removed after consumption. Multiple consumers can read the same message independently, at different speeds, at different points in time.
This changes everything: consumers can replay history, new consumers can bootstrap from the complete log, and a slow consumer never blocks a fast one.
Think of Kafka as a commit log for your distributed system. The log is the source of truth. Everything downstream is a projection of that log.
A topic is a named, logical channel for a stream of records. Producers write to topics. Consumers read from topics. A topic has no inherent type; it holds raw bytes. Schema is enforced by convention (Avro + Schema Registry, Protobuf, JSON).
Topics are the unit of organization. In practice, naming conventions matter: payments.transactions, user.events.signup, inventory.updates are better than topic1.
Every topic is divided into one or more partitions. Each partition is an independent, ordered, immutable sequence of records. This is the atomic unit of parallelism in Kafka.
Topic: payments.transactions
Partition 0: [offset 0] [offset 1] [offset 2] ...
Partition 1: [offset 0] [offset 1] [offset 2] ...
Partition 2: [offset 0] [offset 1] [offset 2] ...

Ordering is guaranteed within a partition. There is no ordering guarantee across partitions. If you need global ordering, use a single partition, at the cost of throughput.
Partition count is set at topic creation and is hard to change later. Under-partition early and you will hit throughput ceilings. Over-partition and you pay in memory, file descriptors, and replication overhead. A common starting point is one partition per expected consumer in your consumer group, with headroom.
A broker is a single Kafka server process. A Kafka cluster is a group of brokers. Each broker stores a subset of partitions. No single broker holds all data for a topic (unless the cluster has one broker, which you should never do in production).
Brokers are identified by an integer node.id. Clients discover the full cluster topology via any broker; you only need one bootstrap address.
Producers publish records to topics. Each record consists of an optional key, a value, optional headers, and a timestamp.
The key drives partitioning. If a key is provided, Kafka hashes it (murmur2 by default) and routes the record to a deterministic partition. The same key always goes to the same partition; this is how you preserve ordering for a logical entity (e.g., all events for user_id=42).
If no key is provided, Kafka distributes records across partitions using round-robin or sticky partitioning (default since 2.4).
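The keyed routing above can be sketched in a few lines. This is an illustrative stand-in, not Kafka's actual partitioner: the real Java client uses murmur2, while this sketch uses CRC32 purely to show the hash-then-modulo logic.

```python
import zlib

def pick_partition(key: bytes, num_partitions: int) -> int:
    """Simplified model of Kafka's default keyed partitioner.

    Real clients hash with murmur2; CRC32 is used here only so the
    example is self-contained. The routing logic is the same:
    hash the key, then take it modulo the partition count.
    """
    return zlib.crc32(key) % num_partitions

# The same key always lands on the same partition
assert pick_partition(b"user_id=42", 3) == pick_partition(b"user_id=42", 3)

# Changing the partition count remaps keys, which is why adding
# partitions later can break key-based ordering
p_before = pick_partition(b"user_id=42", 3)
p_after = pick_partition(b"user_id=42", 6)  # may differ from p_before
```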
Consumers read records from partitions by polling. They maintain an offset: the position of the next record to be read. Offsets are per-partition, per-consumer-group.
Committing an offset marks that all records up to that position have been processed. Kafka stores committed offsets in an internal topic: __consumer_offsets.
Two commit strategies:

- Auto-commit (enable.auto.commit=true): Kafka commits periodically in the background. Risk: records may be marked as processed before your code actually handles them (at-most-once delivery on crash).
- Manual commit (enable.auto.commit=false): your code commits after successful processing. This gives at-least-once delivery; duplicates are possible on retry, so processing should be idempotent.

A consumer group is a set of consumers that jointly consume a topic. Kafka assigns each partition to exactly one consumer within the group. This is the parallelism mechanism.
Topic: payments.transactions (3 partitions)
Consumer Group: payment-processor
Consumer A -> Partition 0
Consumer B -> Partition 1
Consumer C -> Partition 2

If you add a fourth consumer, it sits idle: there are only 3 partitions. If you remove Consumer B, Kafka rebalances and redistributes Partition 1 to A or C.
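The assignment invariant can be simulated in a few lines. This is a toy round-robin assignor, not Kafka's actual implementation (real assignors like range, round-robin, and cooperative-sticky are more sophisticated), but the invariant it demonstrates is the real one: each partition goes to exactly one consumer in the group.

```python
def assign_partitions(consumers: list, num_partitions: int) -> dict:
    """Toy round-robin partition assignment within one consumer group.

    Each partition is owned by exactly one consumer; consumers beyond
    the partition count receive nothing.
    """
    assignment = {c: [] for c in consumers}
    for p in range(num_partitions):
        assignment[consumers[p % len(consumers)]].append(p)
    return assignment

# 3 partitions, 3 consumers: one partition each
print(assign_partitions(["A", "B", "C"], 3))      # {'A': [0], 'B': [1], 'C': [2]}

# 3 partitions, 4 consumers: consumer D sits idle
print(assign_partitions(["A", "B", "C", "D"], 3))  # D gets []

# Remove B: a "rebalance" redistributes its partition
print(assign_partitions(["A", "C"], 3))            # {'A': [0, 2], 'C': [1]}
```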
Multiple independent consumer groups can read the same topic simultaneously, each maintaining its own offsets. A logging pipeline and an analytics pipeline can consume the same event stream without interfering with each other.
An offset is a monotonically increasing integer that uniquely identifies a record within a partition. Offsets are immutable and assigned sequentially, though gaps can appear after compaction or transactional control records.
Key offset concepts: the log-end offset (the position of the next record to be written), the committed offset (the last position a consumer group has acknowledged), and consumer lag (the difference between the two).

To tie the concepts together, trace a single record end to end:

1. A producer sends an event for user_id=42 to topic user.events. The key is hashed, selecting (say) Partition 1.
2. The producer batches records (batch.size, linger.ms) and sends to the partition leader on Broker 2.
3. The leader appends the batch to its log. Kafka uses sendfile(2) and page cache aggressively. This is why Kafka is fast even on spinning disks.
4. Followers replicate the record (governed by acks and ISR). The producer receives an acknowledgment according to its acks setting.
5. A consumer in group analytics polls the leader of Partition 1, receives the record at its current offset.
6. After processing, the consumer commits the new offset to __consumer_offsets.

Partitioning is where many Kafka designs go wrong. Getting it right requires understanding two competing goals: ordering and parallelism.
Kafka only guarantees ordering within a partition. If you have events that must be processed in order relative to each other (e.g., all state changes for a bank account), they must go to the same partition. The mechanism is the message key.
# All events for account_id go to the same partition
producer.produce(
topic='account.events',
key=str(account_id).encode(),
value=event_payload
)

Choosing the wrong key, or using no key at all, will scatter related events across partitions and break ordering.
The maximum parallelism for a consumer group equals the number of partitions. You cannot have more active consumers than partitions. This means partition count is a capacity decision made at topic creation time.
If you start with 4 partitions and later need 8 consumers, you must increase partitions. Adding partitions does not move existing records, but new records for a given key may hash to a different partition, breaking key-based ordering across the boundary.
Sometimes the default hash partitioner is insufficient. If one key is extremely hot (e.g., a large tenant), all its traffic goes to one partition, creating a hot spot. Custom partitioners can distribute within a tenant, or route to dedicated partitions for VIP tenants.
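A minimal sketch of such routing, assuming a hypothetical tenant set and partition layout (the names and thresholds are illustrative, not from any real deployment). Note the trade-off: spreading a hot tenant across several partitions sacrifices per-tenant ordering.

```python
import random

# Hypothetical: one very hot tenant gets a dedicated set of partitions
VIP_TENANTS = {"tenant-big": [0, 1, 2]}

def route(tenant_id: str, num_partitions: int) -> int:
    """Hypothetical custom routing: spread a hot tenant across its
    dedicated partitions, hash everyone else into the remaining ones.

    Python's built-in hash() is randomized per process; a real
    implementation would use a stable hash (e.g. murmur2) instead.
    """
    if tenant_id in VIP_TENANTS:
        return random.choice(VIP_TENANTS[tenant_id])
    reserved = {p for ps in VIP_TENANTS.values() for p in ps}
    regular = [p for p in range(num_partitions) if p not in reserved]
    return regular[hash(tenant_id) % len(regular)]
```

With confluent-kafka you can bypass the default partitioner by passing an explicit partition to produce(), e.g. producer.produce(topic, key=key, value=value, partition=route(tenant_id, 12)).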
Replication is Kafka’s durability mechanism. Understanding it deeply is essential for configuring your cluster correctly.
Every partition has one leader and zero or more followers. All reads and writes go through the leader. Followers exist solely to replicate data and serve as failover candidates.
When a leader fails, Kafka elects a new leader from the pool of followers. The election process depends on which replicas are in the ISR.
The ISR is the set of replicas that are caught up with the leader within a configurable lag threshold (replica.lag.time.max.ms, default 30s). A follower is removed from the ISR if it falls too far behind.
The ISR is critical because:
- acks=all waits for all ISR members to acknowledge
- min.insync.replicas sets the minimum ISR size required to accept writes

If min.insync.replicas=2 and only one replica is in-sync, writes will fail. This is intentional: it prevents data loss at the cost of availability.
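The broker-side check behind those failed writes can be sketched as a single predicate. This is a simplification of the real logic (which lives in the broker's produce path and surfaces as NOT_ENOUGH_REPLICAS errors), but the decision rule is accurate: min.insync.replicas is only enforced for acks=all.

```python
def broker_accepts_write(isr_size: int, min_insync_replicas: int, acks: str) -> bool:
    """Sketch of the broker-side durability check for incoming writes.

    With acks=all, the leader rejects the write if the current ISR is
    smaller than min.insync.replicas. With acks=0 or acks=1, the ISR
    size is not checked at produce time.
    """
    if acks == "all":
        return isr_size >= min_insync_replicas
    return True

assert broker_accepts_write(isr_size=2, min_insync_replicas=2, acks="all")
assert not broker_accepts_write(isr_size=1, min_insync_replicas=2, acks="all")
assert broker_accepts_write(isr_size=1, min_insync_replicas=2, acks="1")
```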
The acks setting controls when the producer considers a write successful:
| Setting | Behavior | Durability | Latency |
|---|---|---|---|
| acks=0 | Fire and forget. No acknowledgment. | Lowest | Lowest |
| acks=1 | Leader acknowledges. Followers not guaranteed. | Medium | Medium |
| acks=all | All ISR members acknowledge. | Highest | Higher |
acks=0 is appropriate only for telemetry or metrics where some loss is acceptable. For financial or critical data, always use acks=all paired with min.insync.replicas=2 and replication.factor=3.
# Topic-level
replication.factor: 3
min.insync.replicas: 2
# Producer-level
acks: all
enable.idempotence: true
retries: 2147483647
max.in.flight.requests.per.connection: 5

enable.idempotence=true ensures exactly-once delivery from the producer side by deduplicating retried messages using a producer ID and sequence number.
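The deduplication mechanism can be modeled in miniature. This toy broker tracks the last accepted sequence number per producer ID; the real broker additionally detects sequence gaps and handles producer epochs, which this sketch omits.

```python
class PartitionLog:
    """Toy model of broker-side idempotent-producer deduplication.

    The broker remembers the last sequence number accepted from each
    producer ID and silently drops retries of already-acked batches.
    """
    def __init__(self):
        self.records = []
        self.last_seq = {}   # producer_id -> last accepted sequence number

    def append(self, producer_id: int, seq: int, value: str) -> bool:
        if seq <= self.last_seq.get(producer_id, -1):
            return False      # duplicate of an already-acked batch: drop it
        self.last_seq[producer_id] = seq
        self.records.append(value)
        return True

log = PartitionLog()
log.append(producer_id=7, seq=0, value="a")
log.append(producer_id=7, seq=1, value="b")
log.append(producer_id=7, seq=1, value="b")   # retry after a lost ack: deduplicated
print(log.records)   # ['a', 'b']
```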
Rebalancing is the process of reassigning partitions among consumers in a group. It is triggered by a consumer joining or leaving the group, a consumer being declared dead (missed heartbeats or an exceeded max.poll.interval.ms), or a change in the topic's partition count.
By default, Kafka uses eager rebalancing: all consumers stop, all partition assignments are revoked, then partitions are reassigned from scratch. During a rebalance, no consumer in the group processes any messages. For large groups or slow rebalances, this can cause noticeable consumer lag spikes.
Kafka 2.4+ introduced cooperative rebalancing via the CooperativeStickyAssignor. Instead of revoking all partitions, only the partitions that need to move are revoked and reassigned. Consumers keep their other partitions and continue processing during the rebalance.
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group',
'partition.assignment.strategy': 'cooperative-sticky',
})

For workloads where restarts are frequent (e.g., Kubernetes rolling deployments), static group membership (group.instance.id) prevents unnecessary rebalances. When a consumer with a known instance ID rejoins within session.timeout.ms, it reclaims its previous partition assignments without triggering a full rebalance.
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'payment-processor',
'group.instance.id': 'payment-processor-pod-3',
'session.timeout.ms': 60000,
})

# Consumer settings
session.timeout.ms: 45000 # How long before broker considers consumer dead
heartbeat.interval.ms: 15000 # How often consumer sends heartbeats (1/3 of session timeout)
max.poll.interval.ms: 300000   # Max time between polls before consumer is ejected

If your processing logic is slow, increase max.poll.interval.ms. If processing time exceeds this threshold, the consumer is removed from the group and a rebalance triggers, even though the consumer is technically alive.
Kafka is not infinite. You must configure how long data lives. Three strategies:
log.retention.hours: 168 # 7 days (default)
log.retention.ms: 604800000    # Same, in milliseconds (takes precedence)

Records older than the retention period are deleted. Deletion happens at the log segment level, not per-record. A segment is only deleted when all records within it are older than the retention threshold.
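That segment-level rule can be sketched directly. The segment layout below is illustrative; the real log cleaner also never deletes the active (currently written) segment, which the sketch models by skipping the last entry.

```python
def expired_segments(segments, retention_ms, now_ms):
    """Sketch of segment-level time retention.

    A segment is deletable only when its newest record is older than the
    retention threshold; the active segment (last in the list) is never
    deleted.
    """
    deletable = []
    for seg in segments[:-1]:
        if now_ms - seg["max_record_ts"] > retention_ms:
            deletable.append(seg["base_offset"])
    return deletable

now = 1_000_000_000
segs = [
    {"base_offset": 0,   "max_record_ts": now - 10 * 86_400_000},  # 10 days old
    {"base_offset": 500, "max_record_ts": now - 3 * 86_400_000},   # 3 days old
    {"base_offset": 900, "max_record_ts": now},                    # active segment
]
print(expired_segments(segs, retention_ms=7 * 86_400_000, now_ms=now))  # [0]
```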
log.retention.bytes: 10737418240   # 10 GB per partition

When total partition size exceeds this threshold, old segments are deleted. Time and size retention can be combined; whichever limit is hit first triggers deletion.
Compaction is fundamentally different from deletion. Instead of discarding old records, Kafka retains the latest record for each key. This turns a topic into a compacted key-value store: the topic holds the current state of each entity.
log.cleanup.policy: compact
log.min.cleanable.dirty.ratio: 0.5
log.segment.bytes: 1073741824   # 1 GB

Compacted topics are used for changelog streams, state restoration, and current-state lookups; Kafka itself uses compaction for __consumer_offsets.
A tombstone record (a record with the key set and value null) signals that a key should be deleted during compaction.
You can combine both: log.cleanup.policy=compact,delete retains the latest value per key but also enforces a time-based deletion floor.
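The net effect of compaction, including tombstones, can be modeled in a few lines. This shows only the end state: the real cleaner works segment by segment and retains tombstones for delete.retention.ms before removing them.

```python
def compact(records):
    """Toy log compaction: keep only the latest record per key, and drop
    keys whose latest record is a tombstone (value=None).
    """
    latest = {}
    for key, value in records:   # later records overwrite earlier ones
        latest[key] = value
    return {k: v for k, v in latest.items() if v is not None}

log = [
    ("user-1", "email=a@x.com"),
    ("user-2", "email=b@x.com"),
    ("user-1", "email=a@y.com"),   # supersedes the first user-1 record
    ("user-2", None),              # tombstone: delete user-2
]
print(compact(log))   # {'user-1': 'email=a@y.com'}
```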
Historically, Kafka relied on Apache ZooKeeper for controller election, broker membership, topic configuration, and ACL storage.
ZooKeeper is a separate distributed system with its own operational burden: separate JVM processes, separate monitoring, separate failure modes. It also imposed a practical limit on partition count (on the order of 200k partitions per cluster, because a new controller had to load all metadata from ZooKeeper on failover).
ZooKeeper mode is deprecated as of Kafka 3.5 and will be removed in a future release.
KRaft (Kafka Raft) embeds metadata management directly into Kafka using the Raft consensus protocol. Introduced in Kafka 2.8, production-ready since 3.3, and the recommended mode for all new deployments; ZooKeeper support is removed entirely in Kafka 4.0.
Benefits: no second distributed system to operate, faster controller failover, and support for far higher partition counts.
In KRaft, one or more brokers take the controller role. Controllers use Raft to elect a leader among themselves and replicate metadata. Brokers fetch metadata from controllers via the internal MetadataFetch protocol.
Combined mode (one process acts as both controller and broker) is suitable for development and small clusters:
KAFKA_CFG_PROCESS_ROLES: controller,broker

Separated mode (dedicated controller nodes) is recommended for production clusters with high partition counts:
# Controller nodes
KAFKA_CFG_PROCESS_ROLES: controller
# Broker nodes
KAFKA_CFG_PROCESS_ROLES: broker

This compose file runs a single-node KRaft cluster, suitable for local development and integration testing.
version: "3.8"
services:
kafka:
image: bitnami/kafka:3.7
container_name: kafka
ports:
- "9092:9092"
- "9094:9094"
environment:
# KRaft mode: this broker is also the controller
KAFKA_CFG_NODE_ID: "1"
KAFKA_CFG_PROCESS_ROLES: "controller,broker"
# Raft quorum: node 1 is the sole controller
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "1@kafka:9093"
# Listeners:
# PLAINTEXT -> clients inside the compose network
# EXTERNAL -> clients on the host machine
# CONTROLLER -> internal Raft communication
KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,EXTERNAL://:9094,CONTROLLER://:9093"
KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094"
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT"
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
# Replication and durability
KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: "1"
KAFKA_CFG_MIN_INSYNC_REPLICAS: "1"
KAFKA_CFG_NUM_PARTITIONS: "3"
# Log retention: 7 days, up to 10 GB per partition
KAFKA_CFG_LOG_RETENTION_HOURS: "168"
KAFKA_CFG_LOG_RETENTION_BYTES: "10737418240"
KAFKA_CFG_LOG_SEGMENT_BYTES: "1073741824"
# Required for bitnami image auto-initialization
ALLOW_PLAINTEXT_LISTENER: "yes"
volumes:
- kafka_data:/bitnami/kafka
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
depends_on:
- kafka
volumes:
kafka_data:

For a production-grade multi-broker KRaft cluster, run 3 dedicated controller nodes and 3+ broker nodes, with replication.factor=3 and min.insync.replicas=2.
confluent-kafka wraps librdkafka (the C client) and is significantly more performant than kafka-python. Use it for anything production-facing.
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
from confluent_kafka.admin import AdminClient, NewTopic
import json
import logging
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
BOOTSTRAP_SERVERS = "localhost:9094"
TOPIC = "user.events"
# --- Admin: create topic if not exists ---
def ensure_topic(topic: str, num_partitions: int = 3, replication_factor: int = 1):
admin = AdminClient({"bootstrap.servers": BOOTSTRAP_SERVERS})
metadata = admin.list_topics(timeout=5)
if topic in metadata.topics:
return
futures = admin.create_topics([
NewTopic(topic, num_partitions=num_partitions, replication_factor=replication_factor)
])
for t, f in futures.items():
try:
f.result()
log.info("Topic %s created", t)
except Exception as e:
log.error("Failed to create topic %s: %s", t, e)
# --- Producer ---
def delivery_report(err, msg):
"""Callback invoked by librdkafka on delivery result."""
if err:
log.error("Delivery failed for key %s: %s", msg.key(), err)
else:
log.debug(
"Delivered to %s [%d] at offset %d",
msg.topic(), msg.partition(), msg.offset()
)
def run_producer():
producer = Producer({
"bootstrap.servers": BOOTSTRAP_SERVERS,
"acks": "all",
"enable.idempotence": True,
"retries": 5,
"retry.backoff.ms": 500,
"compression.type": "snappy",
"linger.ms": 10, # Wait up to 10ms to batch records
"batch.size": 65536, # 64 KB batch size
})
events = [
{"user_id": "u-001", "action": "signup", "ts": 1712000000},
{"user_id": "u-002", "action": "login", "ts": 1712000001},
{"user_id": "u-001", "action": "purchase", "ts": 1712000002},
]
for event in events:
key = event["user_id"].encode()
value = json.dumps(event).encode()
# produce() is non-blocking; delivery_report fires asynchronously
producer.produce(TOPIC, key=key, value=value, on_delivery=delivery_report)
# poll() allows librdkafka to invoke delivery callbacks
producer.poll(0)
# Block until all outstanding messages are delivered
remaining = producer.flush(timeout=30)
if remaining > 0:
log.warning("%d messages were not delivered", remaining)
# --- Consumer ---
def run_consumer(group_id: str = "analytics-pipeline"):
consumer = Consumer({
"bootstrap.servers": BOOTSTRAP_SERVERS,
"group.id": group_id,
"auto.offset.reset": "earliest",
"enable.auto.commit": False, # Manual commit for at-least-once
"max.poll.interval.ms": 300000,
"session.timeout.ms": 45000,
"heartbeat.interval.ms": 15000,
"partition.assignment.strategy": "cooperative-sticky",
"fetch.min.bytes": 1,
"fetch.max.wait.ms": 500,
})
consumer.subscribe([TOPIC])
log.info("Consumer started, group=%s", group_id)
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
                    # Reached end of partition, not an error
log.debug("Reached end of %s[%d]", msg.topic(), msg.partition())
else:
raise KafkaException(msg.error())
continue
event = json.loads(msg.value().decode())
log.info(
"Received: partition=%d offset=%d key=%s event=%s",
msg.partition(), msg.offset(), msg.key().decode(), event
)
# Process the event here...
            # Commit after processing: at-least-once delivery
consumer.commit(message=msg, asynchronous=False)
except KeyboardInterrupt:
log.info("Shutting down consumer")
finally:
        # Always close: this commits offsets and triggers a clean rebalance
consumer.close()
if __name__ == "__main__":
ensure_topic(TOPIC)
run_producer()
    run_consumer()

Key points in this example:
- enable.idempotence=True with acks=all for exactly-once producer semantics
- linger.ms and batch.size enable batching, critical for throughput
- consumer.close() is always called; it commits offsets and signals a clean group leave, preventing an unnecessary rebalance timeout
- cooperative-sticky assignment minimizes rebalance disruption

Symptom: Consumer offset falls further and further behind the log-end offset. The consumer is slower than the producer.
Causes:
- Processing throughput is simply lower than produce throughput
- max.poll.records is set too high: the consumer fetches more than it can process within max.poll.interval.ms

Remedies:
- Lower max.poll.records and increase max.poll.interval.ms
- Add consumers (up to the partition count), and move slow I/O out of the poll loop

Monitoring: Track kafka.consumer.lag (or the records-lag-max JMX metric) per consumer group and partition. Alert when lag exceeds a threshold that represents your SLA for freshness.
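The lag computation itself is simple arithmetic per partition. The offsets and threshold below are made up for illustration; in a real pipeline you would read both values from the broker (e.g. via the confluent-kafka Consumer's get_watermark_offsets and committed methods).

```python
def consumer_lag(log_end_offsets: dict, committed_offsets: dict) -> dict:
    """Per-partition lag: distance between the log-end offset and the
    consumer group's committed offset. A partition with no committed
    offset is treated as lagging from offset 0.
    """
    return {
        p: log_end_offsets[p] - committed_offsets.get(p, 0)
        for p in log_end_offsets
    }

lag = consumer_lag(
    log_end_offsets={0: 1500, 1: 980, 2: 2100},
    committed_offsets={0: 1500, 1: 900, 2: 100},
)
print(lag)                                    # {0: 0, 1: 80, 2: 2000}

# Alert on partitions whose lag exceeds an illustrative SLA threshold
print([p for p, l in lag.items() if l > 1000])  # [2]
```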
Symptom: Consumers continuously rebalance, never settling. Throughput collapses. Logs fill with Rebalancing... entries.
Causes:
- Processing exceeds max.poll.interval.ms: the consumer is ejected and rejoins, triggering another rebalance
- Session timeouts caused by long GC pauses or network flakiness
- Frequent restarts (crash loops, rolling deployments)

Remedies:
- Increase max.poll.interval.ms to accommodate your slowest processing path
- Use group.instance.id (static membership) to survive pod restarts without a rebalance
- Use the cooperative-sticky assignor

Symptom: RecordTooLargeException on the producer; on older consumer clients, fetches can stall on records larger than the fetch limits.
Default limits:
- Broker: message.max.bytes=1048588 (~1 MB)
- Consumer: fetch.max.bytes=52428800 (50 MB), max.partition.fetch.bytes=1048576 (1 MB)

Remedies (in order of preference):
- Enable compression (snappy, lz4, zstd): it often reduces message size dramatically
- Store large payloads externally (e.g., object storage) and send a reference in the message
- As a last resort, raise the size limits consistently on the broker, topic, producer, and consumer

If unclean.leader.election.enable=true (default false since Kafka 0.11), Kafka may elect an out-of-sync replica as leader when no ISR member is available. This recovers availability at the cost of data loss.
Never enable this for financial or critical data. Accept the unavailability until an ISR member recovers.
Symptom: Messages are processed multiple times, or skipped entirely.
Use enable.auto.commit=false and commit only after successful processing. Design consumers to be idempotent: safe to process the same message twice.
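A minimal sketch of an idempotent consumer, assuming a hypothetical message ID scheme. In production the processed-ID set would live in a durable store keyed by a business ID (or topic/partition/offset) and be updated in the same transaction as the side effect; here an in-memory set stands in for it.

```python
def process_idempotently(msg_id: str, payload: str, processed: set, sink: list) -> bool:
    """Skip messages that have already been handled.

    Returns True if the message produced a side effect, False if it was
    a duplicate delivery (e.g., redelivered after a rebalance).
    """
    if msg_id in processed:
        return False               # duplicate delivery: safe no-op
    sink.append(payload)           # the actual side effect
    processed.add(msg_id)
    return True

seen, out = set(), []
process_idempotently("evt-1", "charge $10", seen, out)
process_idempotently("evt-1", "charge $10", seen, out)  # redelivered after rebalance
print(out)   # ['charge $10']
```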
Log compaction seems like a clean solution for maintaining current state, but it has sharp edges:
- Compaction is not immediate: log.cleaner.min.compaction.lag.ms prevents records newer than a threshold from being compacted, and records in the "dirty" head of the log are not yet compacted
- Tombstones are retained only for delete.retention.ms, to give consumers time to observe the deletion

Design around these realities: consumers of compacted topics must handle seeing a key multiple times and always apply the latest value.
Kafka’s operational overhead is non-trivial. Choose it deliberately.
Redis Streams support consumer groups, offset tracking, and pending entry lists. For many microservice communication patterns, they are entirely sufficient.
Kafka is the right choice when you are building a data platform, not just connecting two services. If you are wiring one service to another, a simpler queue almost always serves you better. Kafka's power comes from treating events as a persistent, replayable record of what happened, and from enabling many consumers to independently derive meaning from that record.
More examples and hands-on experiments: kafka learning repo