Phase 2: Deep Dives | Category: Cost & Performance
The Senior-Level Framing: Depth Over Breadth
At senior level, the test isn’t “can you fix it?” — it’s “do you understand why it breaks?” When a Spark job slows down, the senior answer isn’t “add more memory” — it’s identifying root cause (skew, shuffle spill, IO regression) and applying a targeted fix.
The Systematic Diagnosis Framework
Before touching configs or code, diagnose.
Step 1: MEASURE baseline
- Current runtime vs expected runtime
- Peak resource utilization (CPU, memory, disk, network)
- Throughput (records/sec, MB/sec)
Step 2: IDENTIFY bottleneck category
A. CPU-bound: high CPU utilization; task time dominated by computation (parsing, UDFs, compression)
B. Memory-bound: GC pauses, OOM, shuffle spill
C. I/O-bound: low CPU; time spent reading/writing
D. Network-bound: high shuffle read/write, cross-AZ transfer
E. Skew: 99% tasks fast; 1% take 100× longer
F. Serialization: high serialization time in Spark UI
Step 3: FIX the specific bottleneck
- Each category has specific fixes
- Don’t throw resources at it until you know root cause
Step 4: MEASURE after fix
- Never declare victory without measurement
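Step 2 can be partly mechanized once the numbers from Step 1 are in hand. Below is a minimal sketch for a single stage. It assumes you have copied per-task durations, GC time, and spill totals from the Spark UI stage page, and average CPU from cluster monitoring. `StageMetrics`, `triage`, and every threshold (10× median for skew, 10% GC time, 85%/30% CPU) are illustrative assumptions, not Spark defaults.

```python
from dataclasses import dataclass
from statistics import median


@dataclass
class StageMetrics:
    task_seconds: list      # per-task durations from the Spark UI stage page
    gc_seconds: float       # total GC time across the stage's tasks
    spill_bytes: int        # memory + disk spill reported for the stage
    cpu_utilization: float  # average executor CPU, 0.0-1.0, from cluster monitoring


def triage(m: StageMetrics) -> list:
    """First-pass guess at bottleneck categories (A-E above); thresholds are illustrative."""
    findings = []
    # Skew first: one straggler makes every other signal misleading
    if max(m.task_seconds) >= 10 * median(m.task_seconds):
        findings.append("E. skew")
    if m.spill_bytes > 0 or m.gc_seconds >= 0.1 * sum(m.task_seconds):
        findings.append("B. memory")
    if m.cpu_utilization >= 0.85:
        findings.append("A. CPU-bound")
    elif m.cpu_utilization <= 0.30:
        findings.append("C. I/O-bound (or waiting on stragglers)")
    return findings


# Hypothetical stage: 199 tasks at 20 s, one task at 40 minutes
skewed = StageMetrics(task_seconds=[20] * 199 + [2400], gc_seconds=30.0,
                      spill_bytes=0, cpu_utilization=0.2)
print(triage(skewed))
```

Low CPU during a skewed stage is ambiguous: most executors sit idle while one straggler runs, which is why the sketch checks skew before anything else.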
Spark Performance Tuning: Advanced Scenarios
Day 9 covered fundamentals. This goes deeper on senior-level scenarios.
Memory Management: The OOM Anatomy
Spark executor memory under the Unified Memory Model:
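Total executor heap = spark.executor.memory (e.g., 8 GB)
├── Reserved memory (300 MB): Spark internal use
├── User memory = (8 GB - 300 MB) × (1 - spark.memory.fraction)
│     Default fraction: 0.6 → user memory ~40% of usable heap
│     Purpose: user data structures, UDF objects, Spark internal metadata
└── Spark (unified) memory = (8 GB - 300 MB) × spark.memory.fraction
      Default: ~60% of usable heap
      ├── Execution memory (shuffle, sort, joins, aggregations)
      └── Storage memory (cache, broadcast)
      Execution and storage share this pool dynamically; execution can evict cached
      blocks until storage shrinks to spark.memory.storageFraction (default 0.5) of the pool.
Python workers and other off-heap allocations live outside this heap, in the space set by spark.executor.memoryOverhead (or spark.executor.pyspark.memory, if set).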
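The region sizes in the tree above are simple arithmetic. This sketch recomputes them for a given heap. The function name is hypothetical, and it models only the on-heap regions described above (no off-heap or overhead memory).

```python
RESERVED_MB = 300  # fixed reserved memory in Spark's unified memory model


def executor_memory_regions(heap_mb: float, memory_fraction: float = 0.6,
                            storage_fraction: float = 0.5) -> dict:
    """Approximate on-heap regions (MB) for a given spark.executor.memory."""
    usable = heap_mb - RESERVED_MB
    spark_mem = usable * memory_fraction
    return {
        "reserved": RESERVED_MB,
        "user": usable * (1 - memory_fraction),
        "spark_unified": spark_mem,
        # Cached blocks below this level cannot be evicted by execution memory
        "storage_protected": spark_mem * storage_fraction,
    }


for name, mb in executor_memory_regions(8 * 1024).items():
    print(f"{name:>18}: {mb:8.1f} MB")
```

For an 8 GB executor, that leaves roughly 4.7 GB for execution plus storage combined, which is why a single 50 GB skewed partition cannot fit no matter how the fraction is tuned.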
OOM scenario 1: Executor OOM during shuffle (most common)
Symptoms:
java.lang.OutOfMemoryError: GC overhead limit exceeded
Unable to acquire X bytes of memory
Root causes:
- Skewed key creates one giant partition (e.g., 50 GB) while others are tiny
- Too few shuffle partitions for dataset size (default 200)
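The "too few shuffle partitions" cause comes down to arithmetic: divide the stage's shuffle write (from the Spark UI) by a target partition size. This is a minimal sketch. The 200 MB target sits inside the ~100–300 MB rule of thumb used below, and the function name is hypothetical. More partitions spread evenly distributed data thinner, but they do not split a single hot key, which still needs AQE or salting.

```python
import math


def suggest_shuffle_partitions(shuffle_bytes: int,
                               target_partition_bytes: int = 200 * 1024**2,
                               min_partitions: int = 200) -> int:
    """Partitions needed so each holds about target_partition_bytes of shuffle data."""
    needed = math.ceil(shuffle_bytes / target_partition_bytes)
    # Never go below Spark's default of 200
    return max(min_partitions, needed)


print(suggest_shuffle_partitions(400 * 1024**3))  # 400 GB shuffle write
```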
Fixes:
# Fix 1: increase shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", "2000")
# Rule of thumb: target ~100–300 MB per shuffle partition
# Fix 2: enable AQE (Spark 3.x)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# Fix 3: salt skewed keys if AQE isn’t sufficient
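Salting works by appending a random suffix to the join key on the large, skewed side and replicating the small side once per suffix. One hot key then becomes N smaller join groups that land in different partitions. The pure-Python simulation below shows this without a cluster. The data, `NUM_SALTS = 16`, and the helper names are hypothetical. In PySpark, the same idea is usually expressed with `rand()` on the large side and `explode(sequence(...))` on the small side.

```python
import random
from collections import Counter


def salted_key(key: str, num_salts: int, rng: random.Random) -> str:
    """Append a random salt so one hot key spreads over num_salts sub-keys."""
    return f"{key}#{rng.randrange(num_salts)}"


def explode_small_side(keys, num_salts):
    """Replicate each small-side key once per salt so every salted row still matches."""
    return [f"{k}#{s}" for k in keys for s in range(num_salts)]


rng = random.Random(42)
# Hypothetical skewed fact side: one customer owns 90% of the rows
facts = ["cust_hot"] * 9_000 + [f"cust_{i}" for i in range(1_000)]
dims = sorted(set(facts))

NUM_SALTS = 16
salted_facts = [salted_key(k, NUM_SALTS, rng) for k in facts]
salted_dims = set(explode_small_side(dims, NUM_SALTS))

# Every salted fact key still finds its dimension row, so join results are unchanged
assert all(k in salted_dims for k in salted_facts)

before = Counter(facts).most_common(1)[0][1]
after = Counter(salted_facts).most_common(1)[0][1]
print(f"largest join group before: {before}, after salting: {after}")
```

The trade-off is that the small side grows N-fold, so keep N as low as the skew allows.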
OOM scenario 2: Driver OOM
Symptoms: OOM in driver logs (not executor), often after collect() / toPandas() / large broadcasts.
Root cause: pulling too much data to driver RAM.
Fixes:
# BAD: collect entire dataset to driver
result = df.filter(...).collect()
for row in result:
    process(row)  # process() stands in for your per-row logic
# GOOD: process on workers; write output
df.filter(...).write.parquet("s3://output/")
# If you must collect, bound it
df.filter(...).limit(1000).collect()
# Driver guardrails (when needed): these are core configs, so set them at launch;
# spark.conf.set() cannot change them once the driver JVM is running, e.g.
# spark-submit --driver-memory 4g --conf spark.driver.maxResultSize=2g ...
Shuffle Optimization: Reducing the Most Expensive Operation
AQE (Adaptive Query Execution): enable it broadly in Spark 3.x (it is on by default since Spark 3.2):
spark.conf.set("spark.sql.adaptive.enabled", "true")
# 1) Coalesce small post-shuffle partitions
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "64m")
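# 2) Automatically split skewed partitions in joins
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# A partition counts as skewed only if it exceeds BOTH this threshold AND
# spark.sql.adaptive.skewJoin.skewedPartitionFactor (default 5) × the median partition size
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
# 3) Read shuffle output locally when AQE turns a sort-merge join into a broadcast join
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")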
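The skew-join rule is worth being able to state precisely. A partition is split only when it is larger than the threshold in bytes *and* larger than the factor times the median partition size. This sketch restates that documented rule in plain Python. The partition sizes are hypothetical, and the real implementation additionally splits each skewed partition into roughly advisory-sized pieces and replicates the matching partition from the other side.

```python
from statistics import median

MB = 1024 ** 2


def skewed_partitions(sizes_bytes, factor=5.0, threshold_bytes=256 * MB):
    """Indices of partitions that meet both AQE skew conditions."""
    med = median(sizes_bytes)
    return [i for i, size in enumerate(sizes_bytes)
            if size > factor * med and size > threshold_bytes]


# Hypothetical post-shuffle sizes: 199 partitions of ~50 MB and one 2 GB outlier
sizes = [50 * MB] * 199 + [2048 * MB]
print(skewed_partitions(sizes))
```

Both conditions matter. A 100 MB partition among 1 MB neighbors is relatively skewed but too small to be worth splitting. Uniformly large partitions are a partition-count problem, not skew.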
Reduce shuffle through better data organization
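If two large datasets are frequently joined on the same key, bucket both on that key with the same number of buckets; Spark can then join the bucketed tables without shuffling either side.
orders_df.write \
    .partitionBy("order_date") \
    .bucketBy(100, "customer_id") \
    .sortBy("customer_id") \
    .saveAsTable("silver.orders_bucketed")
customers_df.write \
    .bucketBy(100, "customer_id") \
    .sortBy("customer_id") \
    .saveAsTable("silver.customers_bucketed")
# Join the bucketed tables (not the original DataFrames) so the shuffle can be skipped
result = spark.table("silver.orders_bucketed").join(
    spark.table("silver.customers_bucketed"), "customer_id"
)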
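The reason bucketing removes the shuffle is that the same key always hashes to the same bucket ID on both tables. Bucket *i* of one table therefore only ever needs bucket *i* of the other. The simulation below demonstrates this with `zlib.crc32` as a stand-in hash. Spark actually uses Murmur3, and the tables and helper names here are hypothetical.

```python
import zlib
from collections import defaultdict

NUM_BUCKETS = 8  # must match on both tables


def bucket_of(key: str) -> int:
    # Stand-in hash; Spark uses Murmur3, but any deterministic hash shows the idea
    return zlib.crc32(key.encode()) % NUM_BUCKETS


def bucketize(rows):
    """Group rows (key first) by bucket ID, as bucketBy does at write time."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[bucket_of(row[0])].append(row)
    return buckets


def bucketed_join(left_rows, right_rows):
    """Join bucket i of left only with bucket i of right: no cross-bucket data movement."""
    left, right = bucketize(left_rows), bucketize(right_rows)
    out = []
    for b in range(NUM_BUCKETS):
        lookup = defaultdict(list)
        for r in right.get(b, []):
            lookup[r[0]].append(r)
        for o in left.get(b, []):
            for c in lookup[o[0]]:
                out.append((o[0], o[1], c[1]))
    return out


orders = [("c1", 100), ("c2", 250), ("c1", 75), ("c3", 40)]
customers = [("c1", "Alice"), ("c2", "Bob"), ("c3", "Chen")]
print(sorted(bucketed_join(orders, customers)))
```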
Serialization: Kryo vs Java
Kryo is typically faster and more compact than the default Java serialization (most relevant for RDD APIs, custom objects, and large broadcasts).
from pyspark.sql import SparkSession

# Serializer settings are core configs: set them when the session starts (or via spark-submit --conf)
spark = (
    SparkSession.builder
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.unsafe", "false")
    .config("spark.kryo.registrationRequired", "false")
    .config(
        "spark.kryo.classesToRegister",
        "com.company.model.UserEvent,com.company.model.Order",
    )
    .getOrCreate()
)
Note: DataFrame/SQL uses Tungsten encoding; serialization tuning matters less for pure SQL pipelines.
Speculative Execution: Handling Stragglers
Useful when a small number of tasks are stragglers (but don’t use it as a substitute for fixing skew).
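from pyspark.sql import SparkSession
# Core configs are read at startup; spark.conf.set() cannot change them at runtime
spark = (SparkSession.builder.config("spark.speculation", "true")
         .config("spark.speculation.multiplier", "1.5")
         .config("spark.speculation.quantile", "0.75")
         .getOrCreate())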
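Speculation can be understood through its decision rule. Once the `quantile` fraction of a stage's tasks have finished, any still-running task that has run longer than `multiplier` × the median duration of finished tasks becomes a candidate for a duplicate launch. The sketch below is a simplified model of that rule, not Spark's scheduler code. The task durations are hypothetical, and the 0.1 s floor mirrors `spark.speculation.minTaskRuntime`.

```python
from statistics import median


def speculation_candidates(finished, running_elapsed, total_tasks,
                           multiplier=1.5, quantile=0.75, min_runtime_s=0.1):
    """Running task IDs a speculation rule like Spark's would relaunch (simplified)."""
    if len(finished) < quantile * total_tasks:
        return []  # too few finished tasks to trust the median yet
    threshold = max(multiplier * median(finished), min_runtime_s)
    return [tid for tid, elapsed in running_elapsed.items() if elapsed > threshold]


finished = [9, 10, 10, 11, 10, 10, 9, 11]  # seconds, 8 of 10 tasks done
running = {8: 12.0, 9: 40.0}               # task id -> seconds running so far
print(speculation_candidates(finished, running, total_tasks=10))
```

A speculative copy of a skewed task reads the same oversized partition, so it finishes no sooner. Speculation helps with bad nodes, not bad data.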
SQL Query Optimization: The Systematic Approach
Reading execution plans (BigQuery)
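BigQuery has no EXPLAIN statement: run the query as a dry run to see how many bytes it would scan, then open the job's execution details to read the stage-by-stage query plan.
-- Query under investigation
SELECT user_id, COUNT(*) AS events
FROM gold.fact_events
WHERE event_date = '2026-04-13'
  AND event_type = 'purchase'
GROUP BY user_id;
Look for:
- Full scan → bytes processed close to the full table size; partition pruning isn't working
- Broadcast join → small table broadcast to every worker (good)
- Hash join → both sides shuffled on the join key; expected when both inputs are large
- Sort stages → expensive; verify you actually need ORDER BY
- Spill → memory pressure during sort/join (shuffle output spilled to disk)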
PostgreSQL: EXPLAIN ANALYZE
EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON)
SELECT o.order_id, o.amount, c.customer_name
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
WHERE o.order_date >= '2026-01-01'
AND o.status = 'completed';
Key signals:
- Estimated vs actual rows diverge → stale statistics; run ANALYZE
- Buffers (shared hit vs read) → cache hit ratio and I/O pressure
- Seq Scan → an index is likely needed (if the table is large and the predicate is selective)
- Join type choice: nested loop vs hash vs merge
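With `FORMAT JSON`, the estimated-vs-actual check can be scripted. Each plan node carries `Plan Rows` (estimate), `Actual Rows`, and child `Plans`. The sketch below walks the tree and flags nodes whose estimate is off by 10× or more. The function name and the 10× ratio are assumptions. The sample plan is a hand-trimmed illustration, not real output. Both row counts are per-loop averages, so comparing them directly is fair.

```python
import json


def find_misestimates(plan_json: str, ratio: float = 10.0):
    """Flag plan nodes whose row estimate is off by >= ratio in either direction."""
    root = json.loads(plan_json)[0]["Plan"]
    flagged, stack = [], [root]
    while stack:
        node = stack.pop()
        est = max(node.get("Plan Rows", 0), 1)
        act = max(node.get("Actual Rows", 0), 1)
        if max(est, act) / min(est, act) >= ratio:
            flagged.append((node["Node Type"], node.get("Relation Name"), est, act))
        stack.extend(node.get("Plans", []))
    return flagged


# Illustrative, trimmed EXPLAIN (ANALYZE, FORMAT JSON) output
sample = json.dumps([{"Plan": {
    "Node Type": "Hash Join", "Plan Rows": 120, "Actual Rows": 48000, "Plans": [
        {"Node Type": "Seq Scan", "Relation Name": "orders",
         "Plan Rows": 150, "Actual Rows": 50000},
        {"Node Type": "Hash", "Plan Rows": 1000, "Actual Rows": 1000, "Plans": [
            {"Node Type": "Seq Scan", "Relation Name": "customers",
             "Plan Rows": 1000, "Actual Rows": 1000}]}]}}])

for node in find_misestimates(sample):
    print(node)
```

A misestimate on a base-table scan (here `orders`) usually propagates upward into a poor join choice, so fix the lowest flagged node first, typically with ANALYZE.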
Index Selection: When to Add, When Not To
Add indexes when:
-- High-selectivity filters
CREATE INDEX idx_orders_date ON orders(order_date);
-- Join keys (FK columns)
CREATE INDEX idx_orders_customer ON orders(customer_id);
-- Composite filters (column order matters)
CREATE INDEX idx_orders_status_customer ON orders(status, customer_id);
-- Covering index to avoid table lookups
CREATE INDEX idx_orders_covering ON orders(customer_id)
INCLUDE (order_date, amount, status);
Avoid indexes when:
- Low-cardinality columns (boolean/status with few values)
- Write-heavy tables (index maintenance dominates)
- Very small tables (seq scan faster)
- Frequently updated columns (fragmentation/overhead)
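The covering-index effect can be observed locally with SQLite from the Python standard library, whose `EXPLAIN QUERY PLAN` reports whether a lookup needs the table or is satisfied by the index alone. This is a stand-in for PostgreSQL, not PostgreSQL itself. SQLite has no `INCLUDE` clause, so the sketch carries the extra columns as trailing key columns. Table and index names follow the examples above.

```python
import sqlite3


def plan(conn, sql, params=()):
    """Return the detail column of SQLite's EXPLAIN QUERY PLAN for a query."""
    return [row[-1] for row in conn.execute("EXPLAIN QUERY PLAN " + sql, params)]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (order_id INTEGER PRIMARY KEY, customer_id INTEGER,"
    " order_date TEXT, amount REAL, status TEXT)"
)
query = "SELECT order_date, amount, status FROM orders WHERE customer_id = ?"

# Plain index on the filter column: seek the index, then fetch other columns from the table
conn.execute("CREATE INDEX idx_orders_customer ON orders(customer_id)")
print(plan(conn, query, (42,)))

# No INCLUDE in SQLite, so the extra columns ride along as trailing key columns
conn.execute("DROP INDEX idx_orders_customer")
conn.execute(
    "CREATE INDEX idx_orders_covering ON orders(customer_id, order_date, amount, status)"
)
print(plan(conn, query, (42,)))  # the plan should now mention a COVERING INDEX
```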
The SARGABLE Rule: Make Predicates Index-Friendly
SARGABLE predicates can use indexes effectively.
-- NOT SARGABLE
SELECT * FROM orders WHERE YEAR(order_date) = 2026;
-- SARGABLE
SELECT *
FROM orders
WHERE order_date >= '2026-01-01' AND order_date < '2027-01-01';
-- NOT SARGABLE
SELECT * FROM customers WHERE UPPER(email) = 'ALICE@GMAIL.COM';
-- SARGABLE: normalize at write time, or add an expression index that matches the predicate exactly
CREATE INDEX idx_customers_email_lower ON customers(LOWER(email));
SELECT * FROM customers WHERE LOWER(email) = 'alice@gmail.com';
-- NOT SARGABLE
SELECT * FROM orders WHERE order_id + 1 = 1001;
-- SARGABLE
SELECT * FROM orders WHERE order_id = 1000;
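Each pair above can be checked against a real planner. The sketch uses SQLite (standard library) as a stand-in engine. `strftime('%Y', ...)` plays the role of `YEAR()` because SQLite has no `YEAR` function, and the schemas are minimal hypothetical versions of the tables above. A plan that starts with `SCAN` reads the whole table. `SEARCH ... USING INDEX` or `USING INTEGER PRIMARY KEY` means the predicate was index-friendly.

```python
import sqlite3


def plan(conn, sql):
    """SQLite EXPLAIN QUERY PLAN detail text for a query."""
    return " | ".join(row[-1] for row in conn.execute("EXPLAIN QUERY PLAN " + sql))


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER PRIMARY KEY, order_date TEXT, amount REAL)")
conn.execute("CREATE TABLE customers (customer_id INTEGER PRIMARY KEY, email TEXT)")
conn.execute("CREATE INDEX idx_orders_date ON orders(order_date)")
conn.execute("CREATE INDEX idx_customers_email_lower ON customers(LOWER(email))")

checks = {
    # Function wrapped around the indexed column: not sargable
    "year_fn": "SELECT * FROM orders WHERE strftime('%Y', order_date) = '2026'",
    "date_range": "SELECT * FROM orders WHERE order_date >= '2026-01-01'"
                  " AND order_date < '2027-01-01'",
    # Sargable only because the expression index matches LOWER(email) exactly
    "lower_email": "SELECT * FROM customers WHERE LOWER(email) = 'alice@gmail.com'",
    "id_arith": "SELECT * FROM orders WHERE order_id + 1 = 1001",
    "id_eq": "SELECT * FROM orders WHERE order_id = 1000",
}
for name, sql in checks.items():
    print(f"{name:>12}: {plan(conn, sql)}")
```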
Pipeline-Level Bottleneck Identification
Beyond single jobs and queries, pipelines also slow down for architectural reasons.
1. Source bottleneck (ingestion is slow)
Symptoms: pipeline waits for data; CPU idle.
Fixes:
- Parallel extraction
- CDC instead of full extract
- Push-based ingestion instead of polling
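Parallel extraction usually means splitting a source key range into non-overlapping chunks and pulling them concurrently. Spark's JDBC reader applies a similar idea through its `partitionColumn`, `lowerBound`, `upperBound`, and `numPartitions` options. Below is a minimal sketch. `extract_chunk` is a hypothetical placeholder for a real ranged query, and eight workers is an arbitrary choice that must respect what the source system can tolerate.

```python
from concurrent.futures import ThreadPoolExecutor


def split_range(lower: int, upper: int, num_chunks: int):
    """Split [lower, upper) into contiguous, non-overlapping ID ranges."""
    if upper <= lower:
        return []
    step = -(-(upper - lower) // num_chunks)  # ceiling division
    return [(start, min(start + step, upper)) for start in range(lower, upper, step)]


def extract_chunk(bounds):
    # Hypothetical extractor: a real one would run
    # SELECT ... FROM source WHERE id >= lo AND id < hi
    lo, hi = bounds
    return list(range(lo, hi))


chunks = split_range(0, 1_000_000, 8)
with ThreadPoolExecutor(max_workers=8) as pool:
    # pool.map preserves chunk order, so rows come back in ID order
    rows = [r for part in pool.map(extract_chunk, chunks) for r in part]
print(len(chunks), len(rows))
```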
2. Transformation bottleneck (compute is slow)
Causes: skew, too few partitions, expensive UDFs (especially Python UDFs).
Fixes:
# Fix skew with AQE/salting/broadcast
# Increase partitions before heavy operations
# BAD: Python UDF (slow)
def compute_metric(x):
return x * 2 + 1
df.withColumn("metric", udf(compute_metric)(col("value")))
# GOOD: native expression (Catalyst-optimized)
df.withColumn("metric", col("value") * 2 + 1)
3. Sink bottleneck (writing is slow)
Causes: small files, large-table MERGE, slow sinks.
Fixes:
- Increase micro-batch interval (fewer/larger files)
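- Partition-scoped staging for merges: stage only the changed partitions and include the partition column in the MERGE condition so the target table is pruned
- Use bulk load paths (Snowflake COPY INTO, BigQuery load jobs)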
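The micro-batch fix is easiest to justify with numbers. This sketch assumes each micro-batch writes roughly one file per write task. That is an assumption: partitioned sinks can write more files per task. The 2 MB/s throughput and 20 files per batch are hypothetical figures.

```python
SECONDS_PER_DAY = 86_400


def files_per_day(trigger_interval_s: int, files_per_batch: int) -> int:
    """Files landed per day if every micro-batch writes files_per_batch files."""
    return (SECONDS_PER_DAY // trigger_interval_s) * files_per_batch


def avg_file_mb(throughput_mb_s: float, trigger_interval_s: int, files_per_batch: int) -> float:
    """Average file size: data arriving during one interval, split across the batch's files."""
    return throughput_mb_s * trigger_interval_s / files_per_batch


for interval in (10, 300):
    print(f"{interval:>4}s trigger: {files_per_day(interval, 20):>7} files/day, "
          f"~{avg_file_mb(2, interval, 20):.0f} MB each")
```

Moving from a 10-second to a 5-minute trigger cuts file count 30× at the cost of 5-minute freshness, which is the real trade-off to discuss.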
4. Orchestration bottleneck (DAG waits)
Fixes:
- Parallelize independent tasks
- Tune sensor polling intervals
- Consolidate tiny tasks into fewer medium tasks
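To find what can run in parallel, group a DAG into "waves": each wave contains every task whose upstreams are all done. The number of waves is the critical path, and tasks in the same wave are candidates to run concurrently. Below is a sketch using the standard library's `graphlib.TopologicalSorter` (Python 3.9+). The task names are hypothetical.

```python
from graphlib import TopologicalSorter


def parallel_waves(deps):
    """Group tasks into waves; every task within a wave can run concurrently."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


# Hypothetical DAG: task -> set of upstream tasks it waits on
dag = {
    "extract_orders": set(),
    "extract_customers": set(),
    "extract_products": set(),
    "stg_orders": {"extract_orders"},
    "stg_customers": {"extract_customers"},
    "fact_sales": {"stg_orders", "stg_customers", "extract_products"},
}
for i, wave in enumerate(parallel_waves(dag)):
    print(i, wave)
```

Six tasks fit into three waves. If the DAG currently runs them one after another, parallelizing could cut wall-clock time to roughly the three longest-in-wave durations.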
5. Resource contention bottleneck (pipelines fight)
Fixes:
- Resource pools / quotas by priority
- Stagger schedules (avoid midnight thundering herd)
- Separate clusters for SLA-critical vs best-effort
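Staggering can be as simple as assigning each job an evenly spaced start offset inside a window and giving SLA-critical jobs the earliest slots. This is a minimal sketch. The job names, the priority scale (lower = more critical), and the 60-minute window are hypothetical.

```python
def stagger_schedule(jobs, window_minutes=60):
    """Spread job start offsets (minutes) evenly across a window; critical jobs go first.

    jobs: list of (name, priority) where a lower number means more SLA-critical.
    """
    if not jobs:
        return {}
    ordered = sorted(jobs, key=lambda j: (j[1], j[0]))
    gap = window_minutes / len(ordered)
    return {name: round(i * gap) for i, (name, _) in enumerate(ordered)}


jobs = [("adhoc_export", 3), ("billing", 0), ("marketing_rollup", 2), ("fraud_features", 0)]
print(stagger_schedule(jobs))
```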
Interview Questions
Q1: Spark job (2 TB) was 45 minutes; now 6 hours after source format change. Diagnose.
Model answer:
- Start in Spark UI: stage timeline + stragglers (skew vs global slowdown).
- Validate input format: switching from Parquet to JSON/CSV removes column pruning and predicate pushdown and adds expensive text parsing, which can plausibly explain a slowdown of this size (45 minutes → 6 hours is 8×).
- Check bytes read: if it jumped dramatically, the format or schema changed.
- Check whether the pipeline now reads more columns (avoid SELECT *).
- Check initial partitioning: a few huge files, or non-splittable compressed files (e.g., gzipped JSON/CSV, read by one task per file), reduce parallelism; repartition after read.
- Most likely fix: convert to Parquet early (at the ingestion gateway) or add conversion as the first step.
Q2: BigQuery dashboard query is 45s; business wants < 5s; scan is 2 TB. Fix.
Model answer:
- First check partition pruning (a missing WHERE on the partition key often causes full scans).
- If the dashboard has a time range, ensure it filters on the partition column.
- Use materialized views for common expensive aggregations.
- Add clustering on frequent filter dimensions for block pruning.
- Consider BI Engine for in-memory acceleration for dashboards.
- If queuing/concurrency is the issue, slots/reservations can reduce latency.
Self-Assessment: 5 Questions
- What are three common Spark OOM causes and the fix for each?
- How do you enable AQE skew handling and what’s the threshold?
- What makes a predicate SARGABLE vs non-SARGABLE, and why?
- If pipeline is fast alone but slow alongside 10 others, what’s the diagnosis and fix?
- If BigQuery shows a 2 TB scan on a partitioned table, what do you check first?
Quick Reference
- Executor OOM: increase shuffle partitions; enable AQE; fix skew.
- Driver OOM: avoid collect()/toPandas() on large data; bound results; adjust driver limits.
- AQE: adaptive.enabled=true + skew join + coalesce partitions.
- Serialization: Kryo helps for RDD/custom objects; less critical for pure DataFrames.
- Execution plans: look for full scans, spills, and poor join strategy.
- SARGABLE: don't apply functions to indexed columns in WHERE.
- Pipeline bottlenecks: source, transform, sink, orchestration, contention.
Tomorrow’s Preview
Day 57: Design: Uber/Lyft Surge Pricing Data Pipeline — real-time event processing, geospatial handling, time-series aggregation, dynamic pricing serving, and sub-minute freshness.