Phase 2: Deep Dives | Category: Cost & Performance

The Senior-Level Framing: Depth Over Breadth

At senior level, the test isn’t “can you fix it?” — it’s “do you understand why it breaks?” When a Spark job slows down, the senior answer isn’t “add more memory” — it’s identifying root cause (skew, shuffle spill, IO regression) and applying a targeted fix.

The Systematic Diagnosis Framework

Before touching configs or code, diagnose.

Step 1: MEASURE baseline
  - Current runtime vs expected runtime
  - Peak resource utilization (CPU, memory, disk, network)
  - Throughput (records/sec, MB/sec)

Step 2: IDENTIFY bottleneck category
  A. CPU-bound: high CPU; tasks barely finish
  B. Memory-bound: GC pauses, OOM, shuffle spill
  C. I/O-bound: low CPU; time spent reading/writing
  D. Network-bound: high shuffle read/write, cross-AZ transfer
  E. Skew: 99% tasks fast; 1% take 100× longer
  F. Serialization: high serialization time in Spark UI

Step 3: FIX the specific bottleneck
  - Each category has specific fixes
  - Don’t throw resources at it until you know root cause

Step 4: MEASURE after fix
  - Never declare victory without measurement

Spark Performance Tuning: Advanced Scenarios

Day 9 covered fundamentals. This goes deeper on senior-level scenarios.

Memory Management: The OOM Anatomy

Spark executor memory under the Unified Memory Model:

Total executor memory = spark.executor.memory (e.g., 8 GB)
  ├── Reserved memory (~300 MB) — Spark system use
  ├── User memory = (8 GB - 300 MB) × (1 - spark.memory.fraction)
  │   Default fraction: 0.6 → user memory ~40%
  │   Purpose: UDFs, data structures, Python workers
  └── Spark memory = (8 GB - 300 MB) × spark.memory.fraction
      Default: ~60%
      ├── Execution memory (shuffle, sort, joins, aggs)
      └── Storage memory (cache, broadcast)
      Execution/storage share the pool dynamically

OOM scenario 1: Executor OOM during shuffle (most common)

Symptoms:

java.lang.OutOfMemoryError: GC overhead limit exceeded
Unable to acquire X bytes of memory

Root causes:

  • Skewed key creates one giant partition (e.g., 50 GB) while others are tiny
  • Too few shuffle partitions for dataset size (default 200)

Fixes:

# Fix 1: increase shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", "2000")
# Rule of thumb: target ~100–300 MB per shuffle partition

# Fix 2: enable AQE (Spark 3.x)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# Fix 3: salt skewed keys if AQE isn’t sufficient

OOM scenario 2: Driver OOM

Symptoms: OOM in driver logs (not executor), often after collect() / toPandas() / large broadcasts.

Root cause: pulling too much data to driver RAM.

Fixes:

# BAD: collect entire dataset to driver
result = df.filter(...).collect()
for row in result:
    process(row)

# GOOD: process on workers; write output
df.filter(...).write.parquet("s3://output/")

# If you must collect, bound it
df.filter(...).limit(1000).collect()

# Driver guardrails (when needed)
# spark.conf.set("spark.driver.memory", "4g")
# spark.conf.set("spark.driver.maxResultSize", "2g")

Shuffle Optimization: Reducing the Most Expensive Operation

AQE (Adaptive Query Execution) — enable it broadly in Spark 3.x:

spark.conf.set("spark.sql.adaptive.enabled", "true")

# 1) Coalesce small post-shuffle partitions
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "64m")

# 2) Automatically handle skewed joins
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")

# 3) Optimize shuffle reads
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")

Reduce shuffle through better data organization

If two datasets are frequently joined on the same key, co-organize them.

orders_df.write \
    .partitionBy("order_date") \
    .bucketBy(100, "customer_id") \
    .sortBy("customer_id") \
    .saveAsTable("silver.orders_bucketed")

customers_df.write \
    .bucketBy(100, "customer_id") \
    .sortBy("customer_id") \
    .saveAsTable("silver.customers_bucketed")

result = orders_df.join(customers_df, "customer_id")

Serialization: Kryo vs Java

Kryo is faster and smaller than default Java serialization (most relevant for RDD APIs, custom objects, and large broadcasts).

spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
spark.conf.set("spark.kryo.unsafe", "false")
spark.conf.set("spark.kryo.registrationRequired", "false")

spark.conf.set(
    "spark.kryo.classesToRegister",
    "com.company.model.UserEvent,com.company.model.Order",
)

Note: DataFrame/SQL uses Tungsten encoding; serialization tuning matters less for pure SQL pipelines.

Speculative Execution: Handling Stragglers

Useful when a small number of tasks are stragglers (but don’t use it as a substitute for fixing skew).

spark.conf.set("spark.speculation", "true")
spark.conf.set("spark.speculation.multiplier", "1.5")
spark.conf.set("spark.speculation.quantile", "0.75")

SQL Query Optimization: The Systematic Approach

Reading execution plans (BigQuery)

EXPLAIN
SELECT user_id, COUNT(*) AS events
FROM gold.fact_events
WHERE event_date = '2026-04-13'
  AND event_type = 'purchase'
GROUP BY user_id;

Look for:

  • Full scan → partition pruning not working
  • Broadcast join → small table broadcasted (good)
  • Hash join vs merge join → depends on data order/indexing
  • Sort nodes → expensive; verify need for ORDER BY
  • Spill → memory pressure during sort/join

PostgreSQL: EXPLAIN ANALYZE

EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON)
SELECT o.order_id, o.amount, c.customer_name
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
WHERE o.order_date >= '2026-01-01'
  AND o.status = 'completed';

Key signals:

  • rows estimated vs actual diverge → stale stats; run ANALYZE
  • buffers hit/read → cache hit ratio and IO pressure
  • Seq Scan → index likely needed (if table is large and predicate selective)
  • Join type choice: nested loop vs hash vs merge

Index Selection: When to Add, When Not To

Add indexes when:

-- High-selectivity filters
CREATE INDEX idx_orders_date ON orders(order_date);

-- Join keys (FK columns)
CREATE INDEX idx_orders_customer ON orders(customer_id);

-- Composite filters (column order matters)
CREATE INDEX idx_orders_status_customer ON orders(status, customer_id);

-- Covering index to avoid table lookups
CREATE INDEX idx_orders_covering ON orders(customer_id)
INCLUDE (order_date, amount, status);

Avoid indexes when:

  • Low-cardinality columns (boolean/status with few values)
  • Write-heavy tables (index maintenance dominates)
  • Very small tables (seq scan faster)
  • Frequently updated columns (fragmentation/overhead)

The SARGABLE Rule: Make Predicates Index-Friendly

SARGABLE predicates can use indexes effectively.

-- NOT SARGABLE
SELECT * FROM orders WHERE YEAR(order_date) = 2026;

-- SARGABLE
SELECT *
FROM orders
WHERE order_date >= '2026-01-01' AND order_date < '2027-01-01';

-- NOT SARGABLE
SELECT * FROM customers WHERE UPPER(email) = 'ALICE@GMAIL.COM';

-- SARGABLE (normalize at write time), or use functional index
CREATE INDEX idx_customers_email_lower ON customers(LOWER(email));
SELECT * FROM customers WHERE LOWER(email) = 'alice@gmail.com';

-- NOT SARGABLE
SELECT * FROM orders WHERE order_id + 1 = 1001;

-- SARGABLE
SELECT * FROM orders WHERE order_id = 1000;

Pipeline-Level Bottleneck Identification

Beyond single jobs/queries, pipelines fail for architectural reasons.

1. Source bottleneck (ingestion is slow)

Symptoms: pipeline waits for data; CPU idle.

Fixes:

  • Parallel extraction
  • CDC instead of full extract
  • Push-based ingestion instead of polling

2. Transformation bottleneck (compute is slow)

Causes: skew, too few partitions, expensive UDFs (especially Python UDFs).

Fixes:

# Fix skew with AQE/salting/broadcast
# Increase partitions before heavy operations

# BAD: Python UDF (slow)
def compute_metric(x):
    return x * 2 + 1

df.withColumn("metric", udf(compute_metric)(col("value")))

# GOOD: native expression (Catalyst-optimized)
df.withColumn("metric", col("value") * 2 + 1)

3. Sink bottleneck (writing is slow)

Causes: small files, large-table MERGE, slow sinks.

Fixes:

  • Increase micro-batch interval (fewer/larger files)
  • Partition-scoped staging for merges
  • Use bulk load paths (Snowflake COPY INTO, BigQuery load jobs)

4. Orchestration bottleneck (DAG waits)

Fixes:

  • Parallelize independent tasks
  • Tune sensor polling intervals
  • Consolidate tiny tasks into fewer medium tasks

5. Resource contention bottleneck (pipelines fight)

Fixes:

  • Resource pools / quotas by priority
  • Stagger schedules (avoid midnight thundering herd)
  • Separate clusters for SLA-critical vs best-effort

Interview Questions

Q1: Spark job (2 TB) was 45 minutes; now 6 hours after source format change. Diagnose.

Model answer:

  • Start in Spark UI: stage timeline + stragglers (skew vs global slowdown).
  • Validate input format: Parquet → JSON/CSV can cause 6–8× slowdown (no column pruning/predicate pushdown; expensive parsing).
  • Check bytes read: if it jumped dramatically, format/schema changed.
  • Check if pipeline is reading more columns (avoid SELECT *).
  • Check initial partitioning: a few huge files can reduce parallelism; repartition after read.
  • Most likely fix: convert to Parquet early (ingestion gateway) or add conversion as first step.

Q2: BigQuery dashboard query is 45s; business wants < 5s; scan is 2 TB. Fix.

Model answer:

  • First check partition pruning (missing WHERE on partition key often causes full scans).
  • If dashboard has a time range, ensure it filters partition column.
  • Use materialized views for common expensive aggregations.
  • Add clustering on frequent filter dimensions for block pruning.
  • Consider BI Engine for in-memory acceleration for dashboards.
  • If queuing/concurrency is the issue, slots/reservations can reduce latency.

Self-Assessment: 5 Questions

  • What are three common Spark OOM causes and the fix for each?
  • How do you enable AQE skew handling and what’s the threshold?
  • What makes a predicate SARGABLE vs non-SARGABLE, and why?
  • If pipeline is fast alone but slow alongside 10 others, what’s the diagnosis and fix?
  • If BigQuery shows a 2 TB scan on a partitioned table, what do you check first?

Quick Reference

  • Executor OOM: increase shuffle partitions; enable AQE; fix skew.
  • Driver OOM: avoid collect()/toPandas() on large data; bound results; adjust driver limits.
  • AQE: adaptive.enabled=true + skew + coalesce.
  • Serialization: Kryo helps for RDD/custom objects; less critical for pure DataFrames.
  • Execution plans: look for full scans, spills, and poor join strategy.
  • SARGABLE: don’t apply functions to indexed columns in WHERE.
  • Pipeline bottlenecks: source, transform, sink, orchestration, contention.

Tomorrow’s Preview

Day 57: Design: Uber/Lyft Surge Pricing Data Pipeline — real-time event processing, geospatial handling, time-series aggregation, dynamic pricing serving, and sub-minute freshness.