Phase 2: Deep Dives | Category: Advanced Streaming

Why This Is Where Senior Candidates Stumble

Per DataInterview.com: “Exactly-once semantics questions are where most senior candidates stumble, even those with years of Kafka experience. The critical insight that separates strong candidates is recognizing that exactly-once is an end-to-end property, not just a Kafka feature.” You can enable every Kafka transaction flag and still have duplicates in your pipeline. Understanding why — and what actually closes the gap — is the senior signal.

The Three Delivery Guarantees

At-Most-Once: Fire and forget. Commit offset before processing. If consumer crashes after committing but before finishing processing → message is lost. No retry, no data loss detection.

Consumer:  read message → commit offset → process
           (crash between commit and process → message lost forever)

When to use: Metrics sampling where occasional loss is acceptable, mobile push notifications (app will reconcile on next open), logging where completeness isn’t critical. Low latency, zero overhead. Never use for financial data.

At-Least-Once: The safe default. Commit offset AFTER processing. If consumer crashes after processing but before committing → message reprocessed on restart.

Consumer:  read message → process → commit offset
           (crash between process and commit → message redelivered → duplicate)

When to use: Most production data pipelines. “No data loss” is non-negotiable; “occasional duplicate” is handleable with idempotent sinks. This is Kafka’s default.

Exactly-Once: Each message’s effect appears in the output exactly once, even across failures. No data loss AND no duplicates. The hardest guarantee in distributed systems.

The key subtlety: Exactly-once is not “guaranteed delivery exactly once” — it’s “guaranteed that the EFFECT of processing appears exactly once.” This is an important semantic distinction: the message may be processed multiple times internally, but its effect (a write to a database, an increment of a counter) happens exactly once.
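The difference between the first two guarantees comes down to the order of two steps. A minimal simulation, assuming a single consumer that crashes once between those steps and then restarts (the function name and the in-memory "broker" offset are illustrative, not Kafka APIs):

```python
def run_consumer(messages, commit_first, crash_at):
    """Simulate one consumer that crashes once, then restarts.

    commit_first=True  -> at-most-once  (commit offset, then process)
    commit_first=False -> at-least-once (process, then commit offset)
    crash_at is the offset at which the crash hits, between the two steps.
    """
    committed = 0      # next offset to read, as remembered by the "broker"
    effects = []       # side effects visible downstream
    crashed = False
    while committed < len(messages):
        offset = committed
        msg = messages[offset]
        if commit_first:
            committed = offset + 1
            if offset == crash_at and not crashed:
                crashed = True
                continue          # restart: resume from the committed offset
            effects.append(msg)
        else:
            effects.append(msg)
            if offset == crash_at and not crashed:
                crashed = True
                continue          # restart: the offset was never committed
            committed = offset + 1
    return effects


msgs = ["a", "b", "c"]
at_most_once = run_consumer(msgs, commit_first=True, crash_at=1)
at_least_once = run_consumer(msgs, commit_first=False, crash_at=1)
print(at_most_once)    # ['a', 'c']            -> "b" is lost
print(at_least_once)   # ['a', 'b', 'b', 'c']  -> "b" is duplicated
```

Exactly-once is the case where the output is ['a', 'b', 'c'] regardless of where the crash lands, which neither ordering can deliver on its own.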

Why Exactly-Once Is Hard: The Fundamental Problem

The challenge is atomicity across two systems. Consider this pipeline:

Kafka (source) → consumer reads record → DB write → Kafka commit offset

Four possible crash points:

  1. Crash before DB write → restart, reprocess → OK (nothing happened yet)

  2. Crash after DB write, before offset commit → restart, reprocess → DUPLICATE DB WRITE

  3. Crash after offset commit, before DB write (only possible if the steps are reordered so the offset is committed first) → offset moves forward, record never processed → DATA LOSS

  4. Crash during DB write (partial) → CORRUPTED STATE

Points 2 and 3 are the classic exactly-once problem. You can’t atomically commit to two independent systems (Kafka and a DB) in a single transaction — they don’t share a transaction coordinator.

Solutions:

  1. Make DB writes idempotent (handle point 2 — duplicates become no-ops)

  2. Use Kafka transactions to atomically commit offsets + output messages (for Kafka-to-Kafka pipelines)

  3. Two-phase commit (Flink’s approach for sinks that support transactions)

  4. Transactional outbox pattern (for DB sinks)
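As one illustration of the list above, here is a minimal sketch of the transactional outbox pattern, using an in-memory SQLite database as a stand-in for the real sink. The table names, the `place_order` and `relay` helpers, and the `publish` callback are all hypothetical; in a real system the relay would publish to Kafka.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (order_id TEXT PRIMARY KEY, amount INTEGER);
    CREATE TABLE outbox (id INTEGER PRIMARY KEY AUTOINCREMENT,
                         payload TEXT, published INTEGER DEFAULT 0);
""")


def place_order(order_id, amount, fail_midway=False):
    """Write the business row and the outgoing event in ONE local transaction."""
    try:
        with conn:  # commits on success, rolls back on exception
            conn.execute("INSERT INTO orders VALUES (?, ?)", (order_id, amount))
            if fail_midway:
                raise RuntimeError("simulated crash")
            conn.execute("INSERT INTO outbox (payload) VALUES (?)",
                         (f"order_created:{order_id}",))
    except RuntimeError:
        pass  # both inserts were rolled back together


def relay(publish):
    """Hypothetical relay: publish unpublished outbox rows, then mark them."""
    rows = conn.execute(
        "SELECT id, payload FROM outbox WHERE published = 0 ORDER BY id"
    ).fetchall()
    for row_id, payload in rows:
        publish(payload)  # at-least-once: may repeat if we crash before marking
        with conn:
            conn.execute("UPDATE outbox SET published = 1 WHERE id = ?", (row_id,))


published = []
place_order("o-1", 100)
place_order("o-2", 250, fail_midway=True)  # neither row survives
relay(published.append)
relay(published.append)                    # nothing left to publish
order_count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
```

The point of the design is that the database and the event can never disagree, because both live in one local transaction. The relay itself is still at-least-once, so downstream consumers of the published events need idempotent handling too.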

Building Blocks: How Exactly-Once Works in Kafka

1. Idempotent Producer (prevents duplicate sends)

Without idempotence: if a producer send fails and retries, Kafka may receive the message twice → two records in the topic.

With idempotence:

  • Broker assigns each producer a Producer ID (PID); the producer attaches a per-partition sequence number to every batch it sends
  • For each message: (PID, partition, sequence_number) is tracked by the broker
  • If a retry arrives with the same sequence number → broker silently deduplicates (one write in Kafka, not two)
  • Config: enable.idempotence=true (requires acks=all, retries > 0, and max.in.flight.requests.per.connection ≤ 5; on by default since Kafka 3.0)

Scope: Idempotent producer only prevents duplicates from producer retries within a single partition and a single producer session (a restarted producer without a transactional.id gets a new PID). It doesn’t handle consumer-side duplicates or cross-partition atomicity.
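The broker-side bookkeeping can be sketched with a toy model. This is a simplification under stated assumptions: the `Broker` class is hypothetical, it tracks one sequence number per record rather than per batch, and it does not model the out-of-order checks a real broker performs.

```python
class Broker:
    """Toy broker that deduplicates by (producer_id, partition, sequence)."""

    def __init__(self):
        self.log = {}       # partition -> list of appended values
        self.last_seq = {}  # (producer_id, partition) -> last appended sequence

    def append(self, producer_id, partition, seq, value):
        key = (producer_id, partition)
        if seq <= self.last_seq.get(key, -1):
            return "duplicate"  # retry of something already written
        self.log.setdefault(partition, []).append(value)
        self.last_seq[key] = seq
        return "appended"


broker = Broker()
first = broker.append(7, 0, 0, "order-1")
retry = broker.append(7, 0, 0, "order-1")    # ack was lost, producer retries
new_pid = broker.append(8, 0, 0, "order-1")  # restarted producer, new PID
print(first, retry, new_pid)  # appended duplicate appended
```

The last call shows the scope limit: a retry under the same PID is dropped, but the same payload sent under a fresh PID is written again.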

2. Kafka Transactions (atomic writes across partitions)

Kafka transactions allow a producer to write to multiple partitions atomically — either all messages commit, or none become visible. This also enables atomically writing output messages + committing consumer offsets in one transaction.

The consume-transform-produce pattern:

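    // Kafka-to-Kafka exactly-once (within Kafka only)
    // Assumes: producer has transactional.id set, consumer has enable.auto.commit=false.
    // Error handling (producer.abortTransaction() on failure) is omitted for brevity.
    import java.time.Duration;
    import java.util.*;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.TopicPartition;

    producer.initTransactions();
    while (true) {
        // Read from input topic
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        if (records.isEmpty()) continue;
        producer.beginTransaction();                             // Start transaction
        Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
        for (ConsumerRecord<String, String> record : records) {
            String output = transform(record.value());
            producer.send(new ProducerRecord<>(outputTopic, record.key(), output));  // Write to output topic
            // Track the NEXT offset to read for each partition, hence + 1
            currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                               new OffsetAndMetadata(record.offset() + 1));
        }
        // ATOMIC: both output messages and offset commit happen together
        producer.sendOffsetsToTransaction(currentOffsets, consumer.groupMetadata());
        producer.commitTransaction();                            // All visible, or nothing
    }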

If the process crashes after producer.send() but before commitTransaction() → transaction is aborted → output messages never become visible to downstream consumers (isolation.level=read_committed) → on restart, we reprocess from the last committed offset → output re-sent and committed atomically.

Consumer must set isolation.level=read_committed — otherwise it reads in-flight uncommitted messages that may later be aborted.

Limitation: Kafka transactions only guarantee exactly-once within Kafka. If your sink is a database, S3, or any external system, Kafka transactions alone are not enough.
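To see why read_committed matters, here is a toy model of a partition log with transaction markers. The `TopicLog` class is hypothetical and deliberately simplified: a real broker also holds back records that sit behind a still-open transaction, which this sketch does not model.

```python
class TopicLog:
    """Toy partition log: records carry a transaction id, markers end transactions."""

    def __init__(self):
        # Each entry is ("record", txn_id, value) or ("commit"/"abort", txn_id, None)
        self.entries = []

    def send(self, txn_id, value):
        self.entries.append(("record", txn_id, value))

    def commit(self, txn_id):
        self.entries.append(("commit", txn_id, None))

    def abort(self, txn_id):
        self.entries.append(("abort", txn_id, None))

    def read(self, isolation_level):
        if isolation_level == "read_uncommitted":
            return [v for kind, _, v in self.entries if kind == "record"]
        committed = {t for kind, t, _ in self.entries if kind == "commit"}
        return [v for kind, t, v in self.entries
                if kind == "record" and t in committed]


log = TopicLog()
log.send("t1", "out-1")
log.abort("t1")          # crash before commitTransaction(): transaction aborted
log.send("t2", "out-1")  # same input reprocessed after restart
log.commit("t2")

committed_view = log.read("read_committed")
uncommitted_view = log.read("read_uncommitted")
print(committed_view)    # ['out-1']
print(uncommitted_view)  # ['out-1', 'out-1']
```

The aborted write is still physically in the log. Only a consumer that filters on transaction outcome sees each output once.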

Flink achieves exactly-once processing internally through checkpoint barriers, a variant of the Chandy-Lamport distributed snapshot algorithm.

How checkpoint barriers work:

Kafka Source
     ↓
Flink Operator 1 (filter)
     ↓
Flink Operator 2 (aggregate, stateful)
     ↓
Sink

  1. Flink’s JobManager (the checkpoint coordinator) triggers a checkpoint, and the sources inject a checkpoint barrier into the stream between records

  2. Each operator, once it has received the barrier on all of its inputs, snapshots its current state to durable storage (S3/HDFS) and passes the barrier downstream

  3. Once all operators and the sink have acknowledged the barrier (saved their state), the checkpoint is “complete”

  4. The Kafka source offsets are part of the checkpoint itself. After a complete checkpoint, Flink also commits those offsets back to Kafka, but recovery uses the offsets stored in the checkpoint, not the ones committed to Kafka

On failure:

  • Flink restores from the last complete checkpoint
  • All operator state is rolled back to checkpoint N
  • Kafka offset is also rolled back to checkpoint N
  • Processing resumes from that point — all records between checkpoint N and the crash point are reprocessed

Why this gives exactly-once internally: Each record updates operator state exactly once, because:

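  • If the crash happens after a record is processed but before the next checkpoint completes → state and source offsets roll back together to the last checkpoint → the record is reprocessed against state that does not yet include it
  • If the crash happens after a checkpoint completes → state up to that checkpoint is preserved and the source resumes from the matching offset → records covered by the checkpoint are not reprocessed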
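The rollback argument can be checked with a small simulation: a counting operator whose checkpoint stores the source offset and the operator state together. This is a sketch with a hypothetical `run_job` function and a single simulated crash, not Flink code.

```python
def run_job(events, checkpoint_every, crash_after):
    """Count events with periodic checkpoints and one simulated crash.

    A checkpoint stores (next_offset, count) together, so restoring one
    always restores the other.
    """
    checkpoint = (0, 0)   # (next source offset, operator state)
    offset, count = checkpoint
    crashed = False
    processed = 0         # total records handled, including replays
    while offset < len(events):
        count += 1        # "process" events[offset]
        offset += 1
        processed += 1
        if offset % checkpoint_every == 0:
            checkpoint = (offset, count)
        if not crashed and processed == crash_after:
            crashed = True
            offset, count = checkpoint  # roll back state AND source position
    return count, processed


count, processed = run_job(list(range(10)), checkpoint_every=4, crash_after=6)
print(count, processed)  # 10 12
```

Twelve records were processed in total, two of them twice, yet the final state is 10. Rolling back only the offset, or only the state, would break this.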

State backends:

  • In-memory (HashMapStateBackend): state held on the JVM heap; fast, small state only
  • RocksDB (EmbeddedRocksDBStateBackend): for large state (TB-scale) — state spilled to local disk, checkpointed to S3

Flink’s checkpointing is exactly-once within the Flink pipeline. For the sink to also be exactly-once, the sink must support transactions. Flink uses Two-Phase Commit (2PC) to coordinate.

2PC protocol with Kafka sink:

PHASE 1 - PRE-COMMIT (during checkpoint):
    Flink sink writes output records to Kafka, but in an OPEN TRANSACTION
    (records are not yet visible to consumers reading with read_committed)
    Sink snapshots: "I have transaction ID XYZ in flight"

PHASE 2 - COMMIT (after checkpoint completes):
    JobManager: "Checkpoint N is complete"
    All sinks receive this notification
    Kafka sink: COMMIT transaction XYZ
    → Records now visible to downstream consumers

ON FAILURE (before phase 2):
    Flink restores from checkpoint N-1
    Kafka sink: ABORT transaction XYZ (pre-commit records vanish)
    Reprocessing from offset at checkpoint N-1
    New transaction started → same records written again → committed at next checkpoint
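The protocol above can be sketched as a toy sink with separate pre-commit and commit steps. The class and method names are illustrative only and are not Flink APIs. The sketch covers the failure case described above, where the checkpoint never completed; it does not model recovering and committing a transaction whose checkpoint did complete.

```python
class TransactionalSink:
    """Toy two-phase-commit sink."""

    def __init__(self):
        self.visible = []   # what read_committed consumers can see
        self.open_txn = []  # records written since the last barrier
        self.pending = {}   # checkpoint_id -> pre-committed records

    def write(self, record):
        self.open_txn.append(record)

    def pre_commit(self, checkpoint_id):
        # Phase 1: the barrier arrived, seal the current transaction.
        self.pending[checkpoint_id] = self.open_txn
        self.open_txn = []

    def commit(self, checkpoint_id):
        # Phase 2: the checkpoint is complete everywhere, publish.
        self.visible.extend(self.pending.pop(checkpoint_id))

    def abort_all(self):
        # Recovery: discard everything that was never committed.
        self.pending.clear()
        self.open_txn = []


sink = TransactionalSink()
sink.write("r1")
sink.pre_commit(1)
sink.commit(1)       # checkpoint 1 completes
sink.write("r2")
sink.pre_commit(2)   # crash before checkpoint 2 completes
sink.abort_all()     # restore from checkpoint 1
sink.write("r2")     # r2 is reprocessed
sink.pre_commit(3)
sink.commit(3)
print(sink.visible)  # ['r1', 'r2']
```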

Configuration:

    // Checkpoint every 10 seconds in exactly-once mode
    env.enableCheckpointing(10_000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

    // Kafka sink with exactly-once
    // (bootstrap servers and record serializer omitted; both are required before build())
    KafkaSink<String> sink = KafkaSink.<String>builder()
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("flink-pipeline")
        .build();

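The timeout constraint: the sink’s producer transaction.timeout.ms must be longer than the longest time a transaction can stay open (roughly Flink’s checkpoint interval × max_retries, plus recovery time), and it cannot exceed the broker’s transaction.max.timeout.ms (default: 15 minutes). If a transaction stays open past its timeout, for example because a checkpoint or a restart takes too long, the Kafka broker aborts it and the pre-committed records are lost. This is why very long checkpoint intervals are dangerous with exactly-once Kafka sinks.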
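A rough sanity check for this constraint can be written as plain arithmetic. The formula is an assumption for illustration, not an official rule: it treats the worst case as one checkpoint interval per tolerated failed attempt plus the final successful one, plus an allowance for restart time. The function and parameter names are hypothetical.

```python
def transaction_timeout_is_safe(txn_timeout_ms, checkpoint_interval_ms,
                                tolerated_failed_checkpoints, max_restart_ms):
    """Return True if a sink transaction should outlive its worst-case open time."""
    worst_case_open_ms = (
        checkpoint_interval_ms * (tolerated_failed_checkpoints + 1)
        + max_restart_ms
    )
    return txn_timeout_ms > worst_case_open_ms


FIFTEEN_MIN_MS = 15 * 60 * 1000  # 900_000

# 10 s checkpoints, 2 tolerated failures, 2 min restart: 150_000 ms worst case
short_interval_ok = transaction_timeout_is_safe(FIFTEEN_MIN_MS, 10_000, 2, 120_000)

# 10 min checkpoints, 1 tolerated failure, 2 min restart: 1_320_000 ms worst case
long_interval_ok = transaction_timeout_is_safe(FIFTEEN_MIN_MS, 600_000, 1, 120_000)

print(short_interval_ok, long_interval_ok)  # True False
```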

For sinks that don’t support transactions (S3, most REST APIs, DynamoDB), you can’t use 2PC. The solution: at-least-once + idempotent writes.

Idempotent S3 sink:

    # Output path is deterministic based on the processing key + checkpoint ID
    output_path = f"s3://output/orders/{order_date}/checkpoint_{checkpoint_id}/"

    # On reprocessing after failure:
    # Same checkpoint_id → same output path → overwrites previous partial write
    # Result: exactly-once EFFECT (idempotent overwrites)
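A runnable sketch of the same idea, using a local temporary directory as a stand-in for the S3 bucket. The `write_output` helper, the file name `part-0.csv`, and the sample date are hypothetical; the key property is that a replay writes to the same path and overwrites rather than appends.

```python
import tempfile
from pathlib import Path


def write_output(root, order_date, checkpoint_id, rows):
    """Write one file per (order_date, checkpoint_id); a rewrite replaces it."""
    out_dir = Path(root) / "orders" / order_date / f"checkpoint_{checkpoint_id}"
    out_dir.mkdir(parents=True, exist_ok=True)
    target = out_dir / "part-0.csv"
    target.write_text("\n".join(rows) + "\n")  # overwrite, never append
    return target


root = tempfile.mkdtemp()
write_output(root, "2024-01-01", 7, ["o-1,100"])             # partial write, then crash
write_output(root, "2024-01-01", 7, ["o-1,100", "o-2,250"])  # replay after recovery

files = sorted(p.name for p in Path(root).rglob("*.csv"))
content = (Path(root) / "orders" / "2024-01-01" / "checkpoint_7" / "part-0.csv").read_text()
print(files)  # ['part-0.csv']
```

If the path contained a timestamp or run ID instead, the replay would leave two files behind and downstream readers would see the first order twice.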

Idempotent database sink (MERGE INTO):

    -- Flink writes with at-least-once, but DB handles duplicates
    MERGE INTO target_table t
    USING staging s ON t.event_id = s.event_id
    WHEN MATCHED THEN UPDATE SET t.value = s.value
    WHEN NOT MATCHED THEN INSERT VALUES (s.event_id, s.value, s.timestamp);

Duplicate events from at-least-once delivery are silently handled by the MERGE — the same event_id updates the same row, producing the same final state. The effect is exactly-once even though the message was processed multiple times.
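The same behavior can be tried locally with SQLite, which has no MERGE statement; its INSERT ... ON CONFLICT DO UPDATE form is used here as a stand-in. The table layout and the `upsert` helper are assumptions for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE target_table (event_id TEXT PRIMARY KEY, value INTEGER)")


def upsert(event_id, value):
    # Keyed on event_id: a redelivered event rewrites the same row.
    with conn:
        conn.execute(
            "INSERT INTO target_table (event_id, value) VALUES (?, ?) "
            "ON CONFLICT(event_id) DO UPDATE SET value = excluded.value",
            (event_id, value),
        )


# e-1 is delivered twice, as at-least-once delivery allows
for event_id, value in [("e-1", 10), ("e-2", 20), ("e-1", 10)]:
    upsert(event_id, value)

rows = conn.execute(
    "SELECT event_id, value FROM target_table ORDER BY event_id"
).fetchall()
print(rows)  # [('e-1', 10), ('e-2', 20)]
```

This only works because the write sets a value. An update of the form value = value + 10 would not be idempotent, and a redelivery would double-count.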

The Decision Framework: Which Guarantee to Use

Is data loss tolerable?
├── YES → At-most-once (metrics sampling, best-effort notifications)
└── NO → continue...

Are duplicate effects tolerable?
├── YES (after deduplication) → At-least-once + idempotent sink
│   Examples: clickstream analytics, log aggregation, most batch-friendly pipelines
│   Cost: low overhead, standard Kafka
└── NO (even brief duplicates unacceptable) →
    Does your pipeline stay within Kafka?
    ├── YES → Kafka transactions (consume-transform-produce)
    └── NO (external DB/S3 sink) →
        Does the sink support transactions?
        ├── YES (JDBC, Kafka) → Flink 2PC + transactional sink
        └── NO (S3, REST) → Flink checkpointing + idempotent sink writes
                            (at-least-once delivery + exactly-once effect)
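The same decision tree, encoded as a function so the branches can be checked one by one. The function name and parameters are hypothetical; the return strings mirror the leaves of the tree.

```python
def choose_guarantee(loss_tolerable, duplicates_tolerable,
                     stays_within_kafka=False, sink_is_transactional=False):
    """Walk the decision framework from the top and return the chosen approach."""
    if loss_tolerable:
        return "at-most-once"
    if duplicates_tolerable:
        return "at-least-once + idempotent sink"
    if stays_within_kafka:
        return "Kafka transactions (consume-transform-produce)"
    if sink_is_transactional:
        return "Flink 2PC + transactional sink"
    return "Flink checkpointing + idempotent sink writes"


clickstream = choose_guarantee(loss_tolerable=False, duplicates_tolerable=True)
kafka_to_kafka = choose_guarantee(False, False, stays_within_kafka=True)
s3_sink = choose_guarantee(False, False)
print(clickstream)     # at-least-once + idempotent sink
print(kafka_to_kafka)  # Kafka transactions (consume-transform-produce)
print(s3_sink)         # Flink checkpointing + idempotent sink writes
```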

The senior answer pattern: “True exactly-once across Kafka and an external database requires either making the DB writes idempotent and accepting at-least-once delivery (my preferred approach — simpler, more resilient), or using a transactional outbox pattern where writes to Kafka and DB happen in the same local DB transaction. I’d use Flink 2PC only when the sink is Kafka itself, because the 2PC overhead and transaction timeout constraints are only worth it when you can’t make your sink idempotent.”

The Cost of Exactly-Once

Per Google Cloud Dataflow blog: “The storage and read costs incurred to implement exactly-once metadata deduplication turn out to be quite expensive, especially in pipelines that otherwise perform very little I/O.” Dataflow explicitly offers both modes and recommends at-least-once for many use cases.

Quantifiable costs of exactly-once:

  • Checkpoint storage overhead: state snapshots written to S3 every checkpoint interval
  • Checkpoint barrier alignment: operators wait for barriers from all parallel sources → adds latency
  • Transaction coordination: Kafka transaction coordinator overhead (~5-15% throughput reduction)
  • Pre-commit buffering: records held in open transactions until checkpoint completes → increases end-to-end latency

When the cost is worth it:

  • Financial transactions (duplicate payment = real money problem)
  • Billing/invoicing (duplicate record = overcharge)
  • Legal/compliance audit trails (duplicate entry invalidates audit)
  • Exactly-once aggregations (sum, count — duplicates corrupt the math)

When at-least-once + dedup is better (most cases):

  • Analytics and dashboards (1-5 minutes to converge is fine)
  • ML training data (dedup in preprocessing is standard)
  • Log aggregation (near-zero tolerance for loss, high tolerance for occasional duplicate)

Interview Questions

Q1: “A team claims they have end-to-end exactly-once because they commit Kafka offsets only after writing to S3, and they turn on retries for uploads. Is this correct?”

Model Answer (from DataInterview.com): “No — this gives at-least-once, not exactly-once. Here’s why. If the consumer writes to S3 successfully and then crashes before committing the offset, Kafka sees the offset as uncommitted. On restart, the consumer will reprocess the same records from the last committed offset and write them to S3 again. The S3 write succeeded twice. The claim of exactly-once relies on an implicit assumption that S3 write semantics make the second write a no-op — which is only true if the output path is deterministic. If the write path includes any timestamp or run-based suffix (like s3://bucket/output/run_{timestamp}/), the second write creates a new file → duplicate data. ‘Exactly-once effects’ from this approach requires that the S3 path and file content are identical on retry — idempotent writes via deterministic output paths. If that’s guaranteed, you have exactly-once effects from at-least-once delivery + idempotent writes. If not, you have at-least-once with potential duplicates. I’d redesign: use Flink checkpointing with a deterministic output path keyed by checkpoint_id + partition, so any reprocessed window overwrites the previous partial result atomically.”

Q2: “Your pipeline is Kafka → Flink → payment ledger database. The business requires exactly-once. What do you guarantee and how?”

Model Answer: “Let me be precise about what ‘exactly-once’ means here. Inside the Flink pipeline: Flink’s checkpoint-based exactly-once guarantees each record updates operator state exactly once — this is achieved by the checkpoint barrier protocol and RocksDB state backend writing snapshots to S3. At the Kafka source: Flink stores Kafka offsets in its checkpointed state, so on recovery it reads from the exact offset at the last complete checkpoint. The hard part is the database sink. Two options. Option 1 (preferred): at-least-once delivery to the database + idempotent DB writes. Every payment record has an event_id. The DB write is: MERGE INTO payments ON event_id = [event_id] — a duplicate reprocessing silently becomes an UPDATE SET (no change) instead of an INSERT. The payment ledger shows each payment exactly once. Option 2: Flink 2PC with a JDBC transactional sink. Flink pre-commits DB writes inside an open transaction during the checkpoint barrier pass. On checkpoint completion, the transaction is committed. On failure, the open transaction is aborted and reprocessed. This provides true exactly-once but adds latency (DB writes are only visible after checkpoint completion, typically 10-30 seconds) and introduces coupling between Flink checkpoint timing and DB transaction timeouts. For a payment ledger, I’d implement Option 1 with strict idempotency checks and a unique constraint on event_id — it’s simpler, more resilient to checkpoint configuration errors, and the operational behavior on failure is easier to reason about. Option 2’s transaction timeout interaction with Flink checkpoint intervals is a subtle production failure mode that I’d want to avoid for mission-critical payment infrastructure.”

Think About This

You’re in a Netflix interview. The prompt: “Netflix processes viewing events at 17M events/sec through Flink. A Flink job crashed and recovered. How do you know that the downstream analytics tables don’t have duplicates?”

Walk through:

  1. What did Flink do on crash? (Restored from last checkpoint — all operator state, Kafka offsets, and any in-flight Kafka sink transactions were rolled back to checkpoint N)

  2. What happened to in-flight Kafka sink writes? (2PC: the open Kafka transaction was aborted. Records written but not committed vanished. Downstream consumers with isolation.level=read_committed never saw them.)

  3. What about the analytics tables? (Analytics tables read from the output Kafka topic, which only received committed records. No aborted transactions visible → no duplicates in analytics tables.)

  4. What’s the gap between checkpoint N and the crash? (Events between checkpoint N and the crash need to be reprocessed. Flink starts from offset at checkpoint N and reprocesses. New transactions committed → correct records now in the output topic → analytics tables correct.)

  5. What if checkpoints were every 60 seconds and the crash happened after 58 seconds? (At most 58 seconds of reprocessing. This is the latency penalty of exactly-once: the “recovery gap” is bounded by the checkpoint interval. For Netflix’s real-time dashboards, 60-second checkpoint intervals mean up to 60 seconds of stale data during recovery, which is acceptable for analytics.)
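To put a number on step 5, here is the replay volume as simple arithmetic, assuming the 17M events/sec rate from the prompt stays constant (a simplification; real traffic fluctuates). The function name is hypothetical.

```python
EVENTS_PER_SEC = 17_000_000  # rate quoted in the prompt, assumed constant


def replay_volume(seconds_since_checkpoint):
    """Events that must be reprocessed after a crash."""
    return EVENTS_PER_SEC * seconds_since_checkpoint


crash_at_58s = replay_volume(58)
worst_case_60s = replay_volume(60)
print(f"{crash_at_58s:,}")    # 986,000,000
print(f"{worst_case_60s:,}")  # 1,020,000,000
```

Roughly a billion events are replayed per recovery at this interval, which is the concrete trade-off behind choosing a checkpoint interval.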

Quick Reference

  • Three guarantees: At-most-once (lose data, never duplicate), At-least-once (never lose, may duplicate → requires idempotent sinks), Exactly-once (never lose, never duplicate effects — expensive, complex)
  • Idempotent producer: enable.idempotence=true — prevents duplicate sends from producer retries. PID + sequence number per partition. Not enough alone for end-to-end exactly-once.
  • Kafka transactions: Atomic writes across multiple partitions + consumer offset commits. Enables exactly-once Kafka-to-Kafka. Consumer needs isolation.level=read_committed.
  • Flink checkpointing: Barrier alignment → snapshot all operator state → S3. On failure: restore from checkpoint, roll back Kafka offsets, abort open sink transactions.
  • Flink 2PC: Pre-commit to transactional sink during barrier pass → commit after all operators acknowledge checkpoint. Adds latency = up to one checkpoint interval.
  • The production default: At-least-once + idempotent writes. Simpler, lower latency, no transaction timeout constraints. Reserve 2PC for Kafka-to-Kafka pipelines or sinks with native transaction support.
  • Exactly-once across Kafka + DB: Must use idempotent DB writes (MERGE with event_id key) OR transactional outbox pattern. Kafka transactions alone don’t help if the sink is an external DB.

Tomorrow’s Preview

Day 45: Stream-Table Duality & Materialized Views — How streams and tables are two views of the same data, KSQL, Flink SQL, materialized views over streams, event sourcing and CQRS patterns, and how streaming databases like Materialize change the data engineering architecture.