1. Introduction
Imagine a PySpark job running on a 20-node cluster. 199 of the stage's 200 tasks finish in under a minute, but one task is still running at the 45-minute mark. The entire job is blocked on that single task while the rest of the cluster sits idle. This is data skewness, and it is one of the most common and costly performance problems in distributed data engineering.
Data skewness happens when the data is unevenly distributed across partitions. Some partitions end up with far more rows than others. Since a Spark stage can only complete when its last task finishes, a single oversized partition can make an entire job take 10 to 50 times longer than it should.
This blog covers the complete picture: what skewness is, why it happens, how to identify it precisely using the Spark UI and code, how data skewness specifically occurs in joins (the most common trigger), and how to fix each type with practical, working examples.
2. What Is Data Skew in Apache Spark?
In PySpark, data is divided into partitions. Each partition is processed by one task running on one executor core. In an ideal world, all partitions are the same size and all tasks finish at the same time — the cluster works at maximum efficiency.
Skewness breaks this ideal. A skewed partition has significantly more rows than average. The task processing that partition takes proportionally longer. Every other task waits for it. The cluster is underutilised and the job takes much longer than the data volume would suggest.
Balanced vs Skewed Partitions
| Scenario | Partition 0 | Partition 1 | Partition 2 | Partition 3 | Max/Median Ratio |
| Balanced | 250,000 | 251,000 | 249,000 | 250,000 | 1.0x (ideal) |
| Skewed | 4,800,000 | 250,000 | 249,000 | 250,000 | 19.2x (problem!) |
| A Max/Median ratio above 5x is a warning sign. Above 10x, expect severe stragglers; on large datasets, executor OOM errors and timeouts become likely. Anything above 3x on hot join keys deserves investigation. |
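The thresholds above are stated as a Max/Median ratio: the largest partition divided by the median partition. The small stdlib-only helper below computes that ratio from a list of per-partition row counts, such as the output of a groupBy on spark_partition_id(). It is a hypothetical illustration, not part of any Spark API, and it reproduces the figures in the table.

```python
from statistics import median


def max_median_ratio(partition_counts):
    """Return the largest partition size divided by the median partition size.

    `partition_counts` holds one row count per partition, for example
    collected from df.groupBy(spark_partition_id()).count().
    """
    if not partition_counts:
        raise ValueError("need at least one partition count")
    mid = median(partition_counts)
    if mid == 0:
        return float("inf")  # more than half of the partitions are empty
    return max(partition_counts) / mid


balanced = [250_000, 251_000, 249_000, 250_000]
skewed = [4_800_000, 250_000, 249_000, 250_000]

print(f"Balanced: {max_median_ratio(balanced):.1f}x")  # Balanced: 1.0x
print(f"Skewed:   {max_median_ratio(skewed):.1f}x")    # Skewed:   19.2x
```

The median is used instead of the mean because a single huge partition drags the mean upward and hides how extreme the outlier is.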
3. Root Causes of Data Skew in Apache Spark
Understanding why skewness happens is the first step to choosing the right fix. There are five primary root causes.
3.1 — Hot Keys in GroupBy or Join
The most common cause. When you group or join on a column where a small number of values account for a large share of rows, those values become hot keys. In a country-level dataset, ‘IN’ or ‘USA’ might represent 70% of all rows. In an e-commerce dataset, a viral product_id might appear in millions of transactions.
3.2 — NULL Values Hashing to the Same Partition
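NULL is not just a missing value; it is a performance hazard. Every NULL key produces the same hash. So when you group by a column that contains many NULLs, or run an outer join on one, all of those rows land in the same partition. If 40% of your rows have NULL in the key, 40% of your data goes to one partition. (For INNER joins, Spark's optimizer usually filters NULL join keys out before the shuffle, since they can never match.)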
3.3 — Low-Cardinality Partition Columns
Partitioning by a column with very few distinct values, like status (Active/Inactive) or gender, creates only as many useful partitions as there are distinct values. With 200 default shuffle partitions but only 3 distinct values, at least 197 partitions are empty and at most 3 carry all the data.
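Both of these failure modes come straight from hash partitioning. A row's partition depends only on its key, so all rows sharing a key can never be spread across more than one partition.

The toy model below is a sketch built on two assumptions:
- `zlib.crc32` stands in for Spark's Murmur3 hash, so the real partition IDs will differ.
- NULL (`None`) is given one fixed hash, mirroring the fact that Spark sends every NULL key to the same partition.

```python
import zlib
from collections import Counter

NUM_PARTITIONS = 200  # mirrors the default spark.sql.shuffle.partitions


def toy_partition(key, num_partitions=NUM_PARTITIONS):
    """Map a key to a partition ID, loosely mimicking hash partitioning.

    crc32 is a stand-in for Spark's Murmur3 hash. None gets a fixed hash
    so that every NULL key lands in the same partition, as in Spark.
    """
    h = 0 if key is None else zlib.crc32(str(key).encode("utf-8"))
    return h % num_partitions


# 1,000 rows keyed on a low-cardinality status column, 40% of them NULL
rows = ["Active"] * 350 + ["Inactive"] * 200 + ["Pending"] * 50 + [None] * 400

sizes = Counter(toy_partition(k) for k in rows)
print(f"non-empty partitions: {len(sizes)} of {NUM_PARTITIONS}")
print(f"largest partition:    {max(sizes.values())} rows")
```

Four distinct keys (three statuses plus NULL) can fill at most four of the 200 partitions. The NULL partition alone holds 400 rows, whatever hash function is used.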
3.4 — Skewed Source Data
Some data sources are inherently skewed. Kafka partitions can be unevenly populated. S3 prefixes can have wildly different file sizes. Skew that originates at ingestion carries through every downstream transformation until a shuffle redistributes the rows.
3.5 — Heavily Repeated Keys in Star-Schema Joins
In a star-schema join, a single dimension key (for example, one very popular product) can be referenced by hundreds of thousands of fact-table rows. When the join shuffles by that key, all of those rows go to one reducer. This creates a massive skewed task even though the key appears only once in the dimension table.
| Root Cause | Typical Symptom | Primary Fix |
| Hot key in groupBy/join | One task runs 10-50x longer than others | Salting |
| NULL values in join/groupBy key | One partition has all NULLs — OOM or timeout | Replace NULLs before join |
| Low-cardinality partition column | Only 2-5 partitions active, rest empty | Repartition on a higher-cardinality column |
| Skewed source files | First stage immediately shows uneven task sizes | Repartition after read |
| Repeated keys in star-schema join | Shuffle stage stalls on one or two reducer tasks | Broadcast join or salting |
4. How to Identify Data Skewness
Correct identification is critical — applying the wrong fix to the wrong type of skew wastes time. There are two complementary approaches: visual diagnosis via the Spark UI, and programmatic diagnosis using PySpark code.
4.1 — Spark UI Visual Diagnosis
| Step | Where to Look | What to Check | Skew Signal |
| 1 | Jobs tab | Expand the job and look at stage durations | One stage takes much longer than the others |
| 2 | Stages tab | Click the slow stage; check Summary Metrics box | Max Duration >> Median Duration (5x+ is skew) |
| 3 | Stages tab | Look at the task list sorted by Duration | One or two tasks are extreme outliers |
| 4 | Stages tab | Check Shuffle Read Size column per task | One task reads 100x more shuffle data than others |
| 5 | Executors tab | Look at GC Time for each executor | One executor has very high GC Time (memory pressure) |
4.2 — Programmatic Identification Using PySpark
Step 1: Check partition sizes to confirm skew
from pyspark.sql.functions import spark_partition_id, count

df.groupBy(spark_partition_id().alias('partition_id')) \
    .count() \
    .orderBy('count', ascending=False) \
    .show(20)
OUTPUT: Partition size distribution — severe skew in partition 0
| partition_id | count | Observation |
| 0 | 5,200,000 | SEVERELY SKEWED (about 20x a typical partition) |
| 1 | 280,000 | Normal |
| 2 | 263,000 | Normal |
| 3 | 257,000 | Normal |
| … | ~260,000 | All other partitions balanced |
Step 2: Find the hot keys causing the skew
df.groupBy('country_code') \
    .count() \
    .orderBy('count', ascending=False) \
    .show(10)
OUTPUT: Key value distribution — ‘IN’ is the hot key
| country_code | count | % of total |
| IN | 5,200,000 | 86.7% <- HOT KEY |
| USA | 280,000 | 4.7% |
| GBR | 263,000 | 4.4% |
| AUS | 257,000 | 4.3% |
Step 3: Measure skew statistically
from pyspark.sql.functions import stddev, mean

key_counts = df.groupBy('country_code').count()
stats = key_counts.select(
    mean('count').alias('mean_count'),
    stddev('count').alias('stddev_count')
).collect()[0]

cv = stats['stddev_count'] / stats['mean_count']
print(f'Coefficient of Variation: {cv:.2f}')
print('Skew level:', 'SEVERE' if cv > 3 else 'MODERATE' if cv > 1 else 'LOW')
OUTPUT: Statistical skew measurement
| Metric | Value | Interpretation |
| mean_count | 1,500,000 | Average rows per key |
| stddev_count | ~2,467,000 | Very high; far from the mean |
| max | 5,200,000 | Largest key group |
| CV | 1.64 | MODERATE by the thresholds above, but one key holds 87% of rows, so it needs fixing |
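The coefficient of variation is easy to check by hand. In PySpark, `stddev` is an alias for `stddev_samp`, the sample standard deviation. Python's `statistics.stdev` computes the same quantity, so the four key counts from Step 2 can be verified without a cluster:

```python
from statistics import mean, stdev

# Row counts per country_code from Step 2
key_counts = {"IN": 5_200_000, "USA": 280_000, "GBR": 263_000, "AUS": 257_000}
counts = list(key_counts.values())

avg = mean(counts)
sd = stdev(counts)  # sample standard deviation, like Spark's stddev
cv = sd / avg

print(f"mean   = {avg:,.0f}")  # mean   = 1,500,000
print(f"stddev = {sd:,.0f}")
print(f"CV     = {cv:.2f}")    # CV     = 1.64
```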
Step 4: Check for NULL skew specifically
from pyspark.sql.functions import col

total = df.count()
null_count = df.filter(col('dept_id').isNull()).count()
null_pct = (null_count / total) * 100
print(f'NULL rows: {null_count:,} ({null_pct:.1f}%)')
if null_pct > 5:
    print('WARNING: High NULL % - NULL skew likely!')
| Run all four identification steps before applying any fix. Knowing whether you have hot-key skew, NULL skew, or low-cardinality skew determines which solution to use. Salting a NULL-key join, for example, just spreads out rows that can never match; filtering or replacing the NULLs is simpler and cheaper. |
5. Data Skewness in Joins — A Deep Dive
Joins are the single most common trigger of data skewness in PySpark. Understanding exactly why joins cause skew — and how each join type is affected differently — is essential before applying any fix.
5.1 — Why Joins Cause Skew: The Shuffle Mechanism
When Spark executes a Sort-Merge Join or a Shuffle Hash Join, it must co-locate matching rows in the same partition. These are the strategies Spark uses when neither side is small enough to broadcast; Sort-Merge is the default. Spark co-locates the rows by shuffling both DataFrames across the network, using a hash of the join key to decide which partition each row goes to.
The problem: a hash function assigns all rows with the same key value to the same partition. Suppose one key value dominates, say 80% of your orders have country_code='IN'. Then 80% of the left DataFrame lands in one partition, and every 'IN' row from the right DataFrame lands in that same partition. One executor receives a huge amount of work; all others are nearly idle.
| Join Type | Uses Shuffle? | Skew Risk | Notes |
| Sort-Merge Join | Yes (both sides) | HIGH | Default for large-large joins; full shuffle on the join key |
| Shuffle Hash Join | Yes (both sides) | HIGH | Smaller side is built into a hash table per partition; both sides are still shuffled by key |
| Broadcast Join | No | NONE from the join key | Small side is sent to every executor; the large side is not reshuffled |
| Broadcast Hash Join | No | NONE from the join key | Spark's physical operator for broadcast equi-joins; any skew already present in the large side's input partitions remains |
| Cartesian / Cross | No key hash | LOW | Skew possible from source but not from join key hash |
5.2 — Three Types of Join Skew with Examples
Type A — One-Sided Hot Key Skew
The most common pattern: the left (fact) table has a dominant key value. All rows with that key go to one partition on both sides of the join.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, spark_partition_id

spark = SparkSession.builder.appName('JoinSkewDemo').getOrCreate()

# `countries` below is tiny, so Spark would normally broadcast it automatically.
# Disable auto-broadcast so this demo really uses a Sort-Merge Join.
spark.conf.set('spark.sql.autoBroadcastJoinThreshold', '-1')

# orders: 6M rows, ~87% have country_code='IN'
# (building 6M Python tuples on the driver is slow, but fine for a demo)
orders_data = (
    [(i, 'IN', 100 + i) for i in range(5_200_000)] +   # HOT KEY
    [(i, 'USA', 200 + i) for i in range(280_000)] +
    [(i, 'GBR', 300 + i) for i in range(263_000)] +
    [(i, 'AUS', 400 + i) for i in range(257_000)]
)
orders = spark.createDataFrame(orders_data, ['order_id', 'country_code', 'amount'])

# countries: small reference table
countries_data = [('IN', 'India'), ('USA', 'United States'),
                  ('GBR', 'United Kingdom'), ('AUS', 'Australia')]
countries = spark.createDataFrame(countries_data, ['country_code', 'country_name'])

# SKEWED JOIN: Sort-Merge Join, both sides shuffled by country_code
# All 5.2M 'IN' rows go to one partition on the left
# The one 'IN' row from countries goes to the same partition on the right
result = orders.join(countries, on='country_code', how='inner')

# Diagnose: check partition sizes AFTER the join shuffle
result.groupBy(spark_partition_id().alias('pid')) \
    .count().orderBy('count', ascending=False).show(5)
OUTPUT: Partition distribution after the skewed join (durations illustrative)
| partition_id | count | Task Duration (approx) |
| 0 | 5,200,000 | 45 min — SKEWED |
| 1 | 280,000 | 2 min |
| 2 | 263,000 | 2 min |
| 3 | 257,000 | 2 min |
| All others | 0 | Empty: with only 4 distinct keys, at most 4 partitions receive rows |
Type B — NULL Key Skew in a Join
When the join key contains NULLs, all NULL rows hash to the same partition. In SQL, NULL never equals NULL, so these rows are excluded from INNER JOINs — but in LEFT JOINs, all NULLs from the left side pile into one partition during the shuffle phase, even though they produce no matches.
from pyspark.sql.functions import col, spark_partition_id

# employees: 1M rows, 42% have NULL dept_id
emp_data = (
    [(i, 'Emp_' + str(i), None) for i in range(420_000)] +   # NULL dept
    [(i, 'Emp_' + str(i), 10) for i in range(200_000)] +
    [(i, 'Emp_' + str(i), 20) for i in range(200_000)] +
    [(i, 'Emp_' + str(i), 30) for i in range(180_000)]
)
employees = spark.createDataFrame(emp_data, ['emp_id', 'name', 'dept_id'])

dept_data = [(10, 'Engineering'), (20, 'Marketing'), (30, 'HR')]
departments = spark.createDataFrame(dept_data, ['dept_id', 'dept_name'])

# LEFT JOIN: all 420K NULL dept_id rows shuffle to the same partition.
# `departments` is tiny, so Spark may broadcast it automatically and skip the
# shuffle; set spark.sql.autoBroadcastJoinThreshold to -1 to reproduce the skew.
result = employees.join(departments, on='dept_id', how='left')

# Check: all NULLs in one partition
result.filter(col('dept_name').isNull()) \
    .groupBy(spark_partition_id().alias('pid')).count().show()
OUTPUT: NULL rows all land in one partition during the join shuffle
| partition_id | null_dept_rows | Observation |
| 7 (or any one) | 420,000 | SKEWED: every NULL-key row is in this one partition |
| All others | 0 | The 580,000 non-NULL rows sit in at most 3 partitions, one per dept_id |
Type C — Multi-Table Join Skew (Star Schema)
In a star-schema pipeline, a fact table joins multiple dimension tables. Skew compounds across stages: the first join creates a skewed intermediate DataFrame, and that skewed DataFrame then participates in the next join — amplifying the problem at each step.
# Fact table: 10M transactions, many share the same product_id
transactions_data = (
    [(i, 'PROD_001', i * 10) for i in range(7_000_000)] +   # HOT product
    [(i, 'PROD_002', i * 5) for i in range(2_000_000)] +
    [(i, 'PROD_003', i * 8) for i in range(1_000_000)]
)
transactions = spark.createDataFrame(
    transactions_data, ['txn_id', 'product_id', 'revenue'])

# Dimension tables
products_data = [('PROD_001', 'Laptop'), ('PROD_002', 'Phone'), ('PROD_003', 'Tablet')]
products = spark.createDataFrame(products_data, ['product_id', 'product_name'])
category_data = [('Laptop', 'Electronics'), ('Phone', 'Electronics'), ('Tablet', 'Electronics')]
categories = spark.createDataFrame(category_data, ['product_name', 'category'])

# Note: these dimension tables are tiny and may be auto-broadcast. The comments
# below describe the shuffle-based plan you get when auto-broadcast is disabled
# (spark.sql.autoBroadcastJoinThreshold = -1).

# Join 1: transactions x products (SKEWED: PROD_001 is the hot key)
# 7M rows go to one partition
step1 = transactions.join(products, on='product_id', how='inner')

# Join 2: skewed step1 x categories (skew COMPOUNDS)
# Every PROD_001 row has product_name 'Laptop', so the same 7M rows pile up again
step2 = step1.join(categories, on='product_name', how='inner')
# Both stages are slow; the pipeline staggers from join to join
| In star-schema pipelines, fix skew at the FIRST join. A skewed intermediate DataFrame carries its skew into every subsequent join. Fixing it early eliminates the compounding effect and speeds up the entire pipeline, not just one stage. |
5.3 — Identifying Join Skew Specifically
Use these patterns to pinpoint skew that is caused by the join itself, not by the source data layout.
from pyspark.sql.functions import spark_partition_id, count

# Method 1: Check partition sizes on the JOIN RESULT
joined = orders.join(countries, on='country_code', how='inner')
joined.groupBy(spark_partition_id().alias('pid')) \
    .count().orderBy('count', ascending=False).show(10)
# If the top partition is 10x+ larger than the median -> join skew confirmed

# Method 2: Check key distribution on BOTH sides before joining
print('=== LEFT SIDE key distribution ===')
orders.groupBy('country_code').count() \
    .orderBy('count', ascending=False).show(5)
print('=== RIGHT SIDE key distribution ===')
countries.groupBy('country_code').count() \
    .orderBy('count', ascending=False).show(5)

# Method 3: Use explain() to see which join strategy Spark chose
joined.explain()
# SortMergeJoin     -> full shuffle on both sides -> skew likely
# BroadcastHashJoin -> no shuffle -> no join-key skew possible

# Method 4: Check shuffle read bytes per task in the Spark UI
# If one task shows 10x more shuffle read bytes -> join skew
OUTPUT: Simplified excerpts of explain() output revealing the join strategy
| Plan Output | Meaning | Skew Risk |
| == Physical Plan == SortMergeJoin [country_code] | Full shuffle on both sides by country_code | HIGH — hot keys pile into one partition |
| Exchange hashpartitioning(country_code, 200) | 200 shuffle partitions by key hash | All ‘IN’ rows -> same 1 of 200 partitions |
| BroadcastHashJoin, BuildRight | Right side was broadcast | NONE — no shuffle, no join-key skew |
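Method 2 can be taken one step further. For an inner equi-join, the number of output rows for a key is its row count on the left multiplied by its row count on the right. Multiplying the two distributions therefore shows which key will dominate the largest join task. It also shows why skew on both sides is the worst case.

The stdlib-only sketch below uses hypothetical toy data, scaled down 1,000x from the orders/countries example:

```python
from collections import Counter


def join_rows_per_key(left_keys, right_keys):
    """Output rows per key for an inner equi-join: left count x right count."""
    left_counts, right_counts = Counter(left_keys), Counter(right_keys)
    return {key: left_counts[key] * right_counts[key]
            for key in left_counts.keys() & right_counts.keys()}


# Toy version of orders/countries, scaled down 1,000x
left = ["IN"] * 5_200 + ["USA"] * 280 + ["GBR"] * 263 + ["AUS"] * 257
right = ["IN", "USA", "GBR", "AUS"]              # one row per key
right_dup = ["IN"] * 3 + ["USA", "GBR", "AUS"]   # 'IN' repeated on the right too

print(join_rows_per_key(left, right)["IN"])      # 5200
print(join_rows_per_key(left, right_dup)["IN"])  # 15600
```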
5.4 — Fix 1: Broadcast Join (Best for Small Right Side)
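If the right side of a skewed join is small (under roughly 100 MB, and comfortably within driver and executor memory), broadcast join is the fastest and simplest fix. Spark sends the entire small DataFrame to every executor, and each task joins its own rows against that in-memory copy. The join becomes a local lookup: the large side is not shuffled or hash-partitioned by the join key, so there is no join-key skew. Spark broadcasts automatically below spark.sql.autoBroadcastJoinThreshold (10 MB by default); the broadcast() hint forces it for larger tables.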
from pyspark.sql.functions import broadcast, spark_partition_id

# BEFORE: Sort-Merge Join -- 'IN' rows all go to one partition
# skewed = orders.join(countries, on='country_code', how='inner')

# AFTER: Broadcast Join -- no join shuffle at all
fixed = orders.join(broadcast(countries), on='country_code', how='inner')

# Verify broadcast was used
fixed.explain()
# Should show: BroadcastHashJoin ... BuildRight (NOT SortMergeJoin)

# Check partition balance after the broadcast join
fixed.groupBy(spark_partition_id().alias('pid')) \
    .count().orderBy('count', ascending=False).show(5)
OUTPUT: Partition distribution AFTER broadcast join — perfectly balanced
| partition_id | count BEFORE | count AFTER broadcast | Improvement |
| 0 | 5,200,000 | ~300,000 | 17x reduction |
| 1 | 280,000 | ~300,000 | Balanced |
| 2 | 263,000 | ~300,000 | Balanced |
| All others | ~260,000 | ~300,000 | All balanced |
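Conceptually, a broadcast hash join builds a lookup table from the small side once, ships it to every task, and lets each task probe it with the rows it already holds. The plain-Python sketch below models this with no Spark involved: the partitions are just lists, and all names are hypothetical. It shows why the output keeps the large side's partition sizes instead of regrouping rows by key.

```python
# Small dimension table: country_code -> country_name (the "broadcast" side)
countries_lookup = {"IN": "India", "USA": "United States",
                    "GBR": "United Kingdom", "AUS": "Australia"}

# Large fact table already split into 4 evenly sized partitions.
# The hot key 'IN' is spread across all of them; nothing regroups rows by key.
orders_partitions = [
    [("IN", i) for i in range(90)] + [("USA", i) for i in range(10)],
    [("IN", i) for i in range(95)] + [("GBR", i) for i in range(5)],
    [("IN", i) for i in range(92)] + [("AUS", i) for i in range(8)],
    [("IN", i) for i in range(100)],
]


def broadcast_hash_join(partition, lookup):
    """Inner-join one partition against the broadcast lookup table, locally."""
    return [(code, amount, lookup[code])
            for code, amount in partition if code in lookup]


joined_partitions = [broadcast_hash_join(p, countries_lookup) for p in orders_partitions]
print([len(p) for p in joined_partitions])  # [100, 100, 100, 100]
```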
5.5 — Fix 2: Salting (Best for Large Hot Key on Both Sides)
When both sides are large and the right side cannot be broadcast, salting distributes the hot key across multiple partitions by appending a random integer to the join key. The right side is exploded to match all possible salted key variants.
from pyspark.sql.functions import col, concat_ws, lit, rand, floor, explode, array

SALT_FACTOR = 10  # spread the hot key across 10 partitions

# Step 1: Add a random salt to the large left DataFrame
# 'IN' rows now become: IN_0, IN_1, IN_2, ..., IN_9
orders_salted = orders.withColumn(
    'salted_key',
    concat_ws('_', col('country_code'), floor(rand() * SALT_FACTOR).cast('string'))
)

# Step 2: Explode the right DataFrame to create every salt variant
# The 'IN' row becomes 10 rows: IN_0, IN_1, ..., IN_9
salt_array = array([lit(i) for i in range(SALT_FACTOR)])
countries_exploded = countries \
    .withColumn('salt_arr', salt_array) \
    .withColumn('salt', explode(col('salt_arr'))) \
    .withColumn('salted_key',
                concat_ws('_', col('country_code'), col('salt').cast('string'))) \
    .drop('salt_arr', 'salt')

# Step 3: Join on the salted key; the hot key is now spread across 10 partitions.
# country_code is included in the join keys so the output keeps a single
# country_code column (joining on salted_key alone would leave two, and the
# Parquet write below would fail on the duplicate column name).
result = orders_salted.join(
    countries_exploded,
    on=['salted_key', 'country_code'],
    how='inner'
).drop('salted_key')

result.write.mode('overwrite').parquet('/tmp/salted_join_output')
OUTPUT: Partition distribution AFTER salting — hot key split across 10 partitions
| partition_id | count BEFORE | count AFTER salting | Improvement |
| 0 | 5,200,000 | 521,000 | 10x reduction (IN_0 shard) |
| 1 | 280,000 | 519,000 | Balanced |
| 2 | 263,000 | 521,000 | Balanced |
| 3-9 | ~260,000 | ~520,000 | All balanced |
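Salting is safe because every salted left-side row matches exactly one of the exploded right-side rows. The join therefore returns the same rows as before, only spread over more keys.

The pure-Python sketch below checks this on toy sizes. It assumes `random.Random` with a fixed seed as a stand-in for Spark's `rand()`; the data is hypothetical.

```python
import random
from collections import Counter

SALT_FACTOR = 10
rng = random.Random(42)  # fixed seed so the sketch is reproducible

# Left side: hot key 'IN' dominates; right side: one row per key
left = [("IN", i) for i in range(10_000)] + [("USA", i) for i in range(500)]
right = {"IN": "India", "USA": "United States"}

# Step 1: salt the left side with a random suffix 0..SALT_FACTOR-1
left_salted = [(f"{code}_{rng.randrange(SALT_FACTOR)}", code, amount)
               for code, amount in left]

# Step 2: explode the right side into every salt variant
right_exploded = {f"{code}_{s}": name
                  for code, name in right.items()
                  for s in range(SALT_FACTOR)}

# Step 3: join on the salted key
joined = [(code, amount, right_exploded[salted])
          for salted, code, amount in left_salted if salted in right_exploded]

# Same result as an unsalted join...
unsalted = [(code, amount, right[code]) for code, amount in left if code in right]
assert sorted(joined) == sorted(unsalted)

# ...but 'IN' is now spread over SALT_FACTOR distinct keys of ~1,000 rows each
in_shards = Counter(s for s, code, _ in left_salted if code == "IN")
print(len(in_shards), max(in_shards.values()))
```

The cost of this design is on the right side: it grows by a factor of SALT_FACTOR. This is why salting suits a small-to-medium right side, or a right side where only the hot keys are exploded.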
5.6 — Fix 3: AQE Skew Join (Spark 3+ — Zero Code Change)
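For Spark 3.0 and above, Adaptive Query Execution (AQE) can automatically detect and split oversized join partitions at runtime. No changes to your join logic are needed. spark.sql.adaptive.skewJoin.enabled already defaults to true, and from Spark 3.2 onwards spark.sql.adaptive.enabled does too, so on Spark 3.0 and 3.1 you only need to switch AQE on. Both settings are shown explicitly below.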
# Enable AQE and its skew-join handling in your SparkSession
spark.conf.set('spark.sql.adaptive.enabled', 'true')
spark.conf.set('spark.sql.adaptive.skewJoin.enabled', 'true')

# Tune detection thresholds (the values below are the defaults):
# A join partition is 'skewed' if it is BOTH:
#   (a) > skewedPartitionFactor x median partition size, AND
#   (b) > skewedPartitionThresholdInBytes
spark.conf.set('spark.sql.adaptive.skewJoin.skewedPartitionFactor', '5')
spark.conf.set('spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes', '256MB')

# Run the join as normal; AQE handles the rest
result = orders.join(countries, on='country_code', how='inner')
# If the 'IN' partition passes both tests, AQE splits it into smaller
# sub-partitions and replicates the matching 'IN' row from countries for each
# sub-task. A partition under 256MB is left alone even if it is 5x the median.
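The detection rule in the comments above can be written out directly. The sketch below is a simplified model of how AQE decides which partitions to split, using the default factor and threshold. It omits how Spark actually splits the partition and is not Spark's implementation.

```python
from statistics import median

MB = 1024 * 1024


def skewed_partitions(sizes_bytes, factor=5.0, threshold_bytes=256 * MB):
    """Return indices of partitions a simplified AQE skew rule would split.

    A partition counts as skewed only if it is larger than BOTH
    factor * median partition size AND threshold_bytes.
    """
    med = median(sizes_bytes)
    limit = max(factor * med, threshold_bytes)
    return [i for i, size in enumerate(sizes_bytes) if size > limit]


# One 2 GB partition among 199 partitions of 50 MB each
sizes = [2048 * MB] + [50 * MB] * 199
print(skewed_partitions(sizes))  # [0]

# A 5x outlier that is still small (100 MB vs a 20 MB median) is NOT split
small = [100 * MB] + [20 * MB] * 199
print(skewed_partitions(small))  # []
```

The second case is why lowering skewedPartitionThresholdInBytes can help with moderately sized but clearly skewed partitions.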
5.7 — Fix 4: Handling NULL Key Skew in Joins
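NULL keys cause join skew because every NULL hashes to the same partition during the shuffle. In an INNER JOIN, Spark usually filters NULL keys out before shuffling, since they can never match. The problem bites in LEFT, RIGHT and FULL OUTER joins. There, NULL-key rows from the preserved side must be kept, and they are all shuffled into one partition even though they only produce null-padded rows.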
from pyspark.sql.functions import col, coalesce, lit, when

# FIX A: Replace NULLs before the join (sentinel value approach)
# A single constant such as -1 would just become a new hot key, so spread the
# NULL rows over NULL_SPREAD negative sentinels that never match a real dept_id.
NULL_SPREAD = 200
employees_clean = employees.withColumn(
    'dept_id',
    coalesce(col('dept_id'), -((col('emp_id') % NULL_SPREAD) + 1))
)
result = employees_clean.join(departments, on='dept_id', how='left') \
    .withColumn('dept_id',
                when(col('dept_id') < 0, lit(None)).otherwise(col('dept_id')))

# FIX B: Separate NULLs, join non-NULLs, then union
# Use when NULLs need different handling from matched rows
null_rows = employees.filter(col('dept_id').isNull())
non_null_rows = employees.filter(col('dept_id').isNotNull())

# Join only non-null rows: no NULL partition skew
joined = non_null_rows.join(departments, on='dept_id', how='left')

# Add a placeholder for every column the right side contributes
null_result = null_rows.withColumn('dept_name', lit('Unassigned'))

# Union by column name, since the join output puts dept_id first
final = joined.unionByName(null_result)
| Always check the NULL percentage in join key columns before running any large join. If NULL% > 5%, apply Fix A or Fix B above. In an outer join or a groupBy, a key column that is 40% NULL puts 40% of all rows into a single partition, a very common cause of OOM errors and timeouts on large datasets. |
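There is one subtlety with the sentinel approach: if every NULL is replaced by the same value, that value simply becomes a new hot key. The toy model below compares a single sentinel with sentinels spread over a range, under these assumptions:
- `zlib.crc32` stands in for Spark's hash function.
- -1 to -200 are hypothetical sentinel IDs that never occur as real dept_id values.

```python
import zlib
from collections import Counter

NUM_PARTITIONS = 200
NULL_SPREAD = 200  # hypothetical number of sentinel values


def toy_partition(key):
    """Stand-in for hash partitioning: crc32 of the key, modulo partitions."""
    return zlib.crc32(str(key).encode("utf-8")) % NUM_PARTITIONS


def largest_partition(keys):
    """Row count of the fullest partition for a list of join keys."""
    return max(Counter(toy_partition(k) for k in keys).values())


# 420 former-NULL rows, identified by emp_id 0..419
emp_ids = range(420)

# Option 1: one constant sentinel, so every former NULL shares key -1
constant = [-1 for _ in emp_ids]

# Option 2: sentinels spread over -1..-NULL_SPREAD, derived from emp_id
spread = [-((emp_id % NULL_SPREAD) + 1) for emp_id in emp_ids]

print(largest_partition(constant))  # 420
print(largest_partition(spread))
```

Deriving the sentinel from a column such as emp_id, rather than from a random number, keeps the key deterministic if Spark has to retry a task.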
5.8 — Fixing Star-Schema Compounding Join Skew
In multi-table pipelines, fix skew at the first join to prevent it from carrying forward. If the fixed intermediate result feeds more than one downstream action, cache it so it is not recomputed. With a single write, as in this example, caching is optional.
from pyspark.sql.functions import broadcast

# BEFORE (skew compounds across joins):
# step1 = transactions.join(products, 'product_id')    # SKEWED
# step2 = step1.join(categories, 'product_name')       # STILL SKEWED

# AFTER: Fix skew at step 1 using broadcast
# Step 1: Broadcast the small products table
step1_fixed = transactions.join(
    broadcast(products),   # products is small -- broadcast it
    on='product_id',
    how='inner'
)

# Cache the fixed intermediate result
# (pays off when step1_fixed is reused by more than one action)
step1_fixed.cache()
step1_fixed.count()  # Materialise the cache

# Step 2: Now join on product_name
# step1_fixed keeps the transactions' input partitioning -- no compounding skew
step2_fixed = step1_fixed.join(
    broadcast(categories),
    on='product_name',
    how='inner'
)

step2_fixed.write.mode('overwrite').parquet('/tmp/star_join_output')

# Always unpersist when done
step1_fixed.unpersist()
5.9 — Join Skew Fix Decision Guide
| Join Skew Scenario | Right Side Size | Recommended Fix | Expected Speedup |
| One hot key dominates left side | Small (< 100 MB) | broadcast(right_df) | 10-50x |
| One hot key dominates left side | Large (> 100 MB) | Salting (SALT_FACTOR = skew ratio) | 5-20x |
| Moderate skew (5-20x ratio) | Any size | AQE skew join (Spark 3+) | 3-10x |
| NULL keys in left side (> 5% NULL) | Any size | Replace NULLs before join | 5-40x |
| Skew compounds across multiple joins | Small dimensions | Broadcast all dims + cache intermediate | 10-50x |
| Both sides skewed on same hot key | Both large | Salting on both sides | 5-15x |
6. Fix 1 (GroupBy) — Salting for Aggregation Skew
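Salting is not just for joins; it also helps with groupBy aggregation skew. The two-stage approach first aggregates on the salted key (partial aggregation per shard), then aggregates the partial results on the original key (final aggregation). For simple aggregates such as sum and count, Spark already combines rows on the map side before the shuffle. The gain is therefore largest for aggregations whose partial results stay large (for example collect_list) or whose per-group work is expensive.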
from pyspark.sql.functions import sum as spark_sum, concat_ws, rand, floor, col

SALT_FACTOR = 10

# Stage 1: Partial aggregation on the salted key
# 'IN' rows are spread across IN_0, IN_1, ..., IN_9
partial = orders \
    .withColumn('salt', floor(rand() * SALT_FACTOR).cast('string')) \
    .withColumn('salted_key', concat_ws('_', col('country_code'), col('salt'))) \
    .groupBy('salted_key', 'country_code') \
    .agg(spark_sum('amount').alias('partial_sum'))

# Stage 2: Final aggregation on the original key
# At most SALT_FACTOR partial rows per country, so no skew is possible here
final = partial \
    .groupBy('country_code') \
    .agg(spark_sum('partial_sum').alias('total_amount'))

final.show()
OUTPUT: Final aggregation result — correct totals, no skew
| country_code | total_amount |
| IN | 13,520,517,400,000 (sum of all 5.2M IN orders) |
| USA | 39,255,860,000 |
| GBR | 34,663,268,500 |
| AUS | 33,127,171,500 |
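The demo amounts are generated as base + i for i = 0..n-1, so the expected totals can be cross-checked without Spark using the arithmetic-series formula n*base + n*(n-1)/2. This sketch is only a sanity check of the output, not part of the salting technique:

```python
# (country_code, row count n, base amount) as generated for `orders` in section 5.2
demo_orders = [("IN", 5_200_000, 100), ("USA", 280_000, 200),
               ("GBR", 263_000, 300), ("AUS", 257_000, 400)]


def expected_total(n, base):
    """Sum of (base + i) for i in 0..n-1, via the arithmetic-series formula."""
    return n * base + n * (n - 1) // 2


for code, n, base in demo_orders:
    print(f"{code}: {expected_total(n, base):,}")
# IN: 13,520,517,400,000
# USA: 39,255,860,000
# GBR: 34,663,268,500
# AUS: 33,127,171,500
```

If salting is correct, the salted two-stage aggregation and a plain groupBy('country_code').sum('amount') must both produce these totals.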
7. Fix 2 — Resolving NULL Key Skew
NULL values in join or groupBy keys cause silent but severe skew. All NULL rows hash to the same partition. If 30% of your data has a NULL join key, 30% of your data is on a single executor. The fix must be applied before the join or aggregation.
from pyspark.sql.functions import col, coalesce, lit

# FIX A: Replace NULLs with sentinel values that never match a real key.
# Spread them (here over 200 negative values derived from emp_id) rather than
# using one constant, which would simply become a new hot key.
employees_fixed = employees.withColumn(
    'dept_id', coalesce(col('dept_id'), -((col('emp_id') % 200) + 1)))

# FIX B: Filter NULLs before the join (only when NULL-key rows can be dropped)
employees_no_null = employees.filter(col('dept_id').isNotNull())
result = employees_no_null.join(departments, on='dept_id', how='inner')

# FIX C: Separate NULLs, join non-NULLs, union the results by column name
null_rows = employees.filter(col('dept_id').isNull())
non_null_rows = employees.filter(col('dept_id').isNotNull())
joined = non_null_rows.join(departments, on='dept_id', how='left')
null_handled = null_rows.withColumn('dept_name', lit('Unassigned'))
final = joined.unionByName(null_handled)
8. Fix 3 — Repartitioning to Redistribute Data
When skewness is not tied to a specific key value but to the overall partition layout, repartition() is the most direct fix. It performs a full shuffle to redistribute data evenly across a chosen number of partitions.
# Repartition evenly across 200 partitions (round-robin, ignores key values)
df_even = df.repartition(200)

# Repartition BY COLUMN: co-locate rows with the same key in the same partition.
# This can let a later join or aggregation on that column reuse the layout, but
# it hash-partitions on the key, so a hot key ends up in one partition again.
df_by_key = df.repartition(200, 'country_code')

# coalesce() reduces the partition count WITHOUT a full shuffle
df.coalesce(10).write.parquet('/output/result')

# Check partition count
print(f'Partitions: {df.rdd.getNumPartitions()}')
| repartition() triggers a full shuffle — use it once before a chain of heavy operations. Use coalesce() when reducing partition count for output writing, since it avoids a full shuffle. |
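The difference between repartition(n) and repartition(n, 'country_code') matters when the column has a hot key. Without columns, Spark spreads rows round-robin. With a column, it hash-partitions on that column, so every 'IN' row lands in the same partition again.

The plain-Python model below simplifies both behaviours, with `zlib.crc32` standing in for Spark's hash (an assumption):

```python
import zlib
from collections import Counter

NUM_PARTITIONS = 8
keys = ["IN"] * 8_700 + ["USA"] * 500 + ["GBR"] * 400 + ["AUS"] * 400

# repartition(n): round-robin assignment ignores the key entirely
round_robin = Counter(i % NUM_PARTITIONS for i in range(len(keys)))

# repartition(n, col): the hash of the key decides the partition
by_key = Counter(zlib.crc32(k.encode("utf-8")) % NUM_PARTITIONS for k in keys)

print("round-robin largest:", max(round_robin.values()))  # round-robin largest: 1250
print("by-key largest:     ", max(by_key.values()))
```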
9. Measuring the Impact: Before vs After
Always measure the improvement after applying a fix. Use partition distribution checks and Spark UI timing to confirm the fix worked.
from pyspark.sql.functions import broadcast, spark_partition_id, mean, stddev


def measure_skew(df, label):
    # Rows per partition; only partitions holding at least one row appear here
    pcounts = df.groupBy(spark_partition_id().alias('pid')).count()
    stats = pcounts.agg(
        mean('count').alias('avg'),
        stddev('count').alias('std')
    ).collect()[0]
    max_c = pcounts.agg({'count': 'max'}).collect()[0][0]
    # stddev is NULL (None) when only one partition holds data
    cv = (stats['std'] or 0) / stats['avg'] if stats['avg'] else 0
    print(f'\n--- {label} ---')
    print(f'  Max partition : {max_c:,}')
    print(f'  Avg partition : {stats["avg"]:,.0f}')
    print(f'  Max/Avg ratio : {max_c / stats["avg"]:.1f}x')
    print(f'  CV score      : {cv:.2f}')
    print(f'  Skew Level    : {"SEVERE" if cv > 3 else "MODERATE" if cv > 1 else "LOW"}')


# 'BEFORE FIX' assumes auto-broadcast is disabled; otherwise Spark may
# already broadcast countries on its own
measure_skew(orders.join(countries, 'country_code'), 'BEFORE FIX')
measure_skew(orders.join(broadcast(countries), 'country_code'), 'AFTER BROADCAST')
measure_skew(orders_salted.join(countries_exploded, 'salted_key'), 'AFTER SALTING')
OUTPUT: Before vs after comparison (illustrative; exact figures depend on cluster size, shuffle partitions and AQE settings)
| Metric | BEFORE | AFTER Broadcast | AFTER Salting |
| Max partition size | 5,200,000 | ~300,000 | 521,000 |
| Max / Avg ratio | 19.2x | 1.05x | 1.03x |
| CV (skew score) | 1.52 | 0.04 | 0.03 |
| Skew Level | MODERATE | LOW | LOW |
| Stage Duration | ~45 min | ~2 min | ~4 min |
10. Choosing the Right Fix
| Diagnosis Result | Root Cause | Recommended Fix | Section |
| One hot key has 10x+ more rows; large right side | Hot key join skew | Salting | 5.5 |
| Right-side DataFrame is small (< 100 MB) | Join shuffle skew | Broadcast join | 5.4 |
| Running Spark 3.0+ with moderate join skew (5-20x) | Join skew | AQE Skew Join (config only) | 5.6 |
| NULL values > 5% of join/groupBy key column | NULL key skew | Replace NULLs or split + union | 5.7 / 7 |
| Skew compounds across multiple star-schema joins | Cascading join skew | Broadcast all dims + cache | 5.8 |
| GroupBy aggregation skew on hot key | Aggregation skew | Two-stage salted aggregation | 6 |
| Source data has uneven file/partition sizes | Ingestion skew | repartition() after read | 8 |
11. Quick Reference — Identification Checklist
| Check | Code / Action | Skew If… |
| Partition size distribution | df.groupBy(spark_partition_id()).count().show() | Max partition > 5x median partition |
| Hot key distribution | df.groupBy('key_col').count().orderBy('count', ascending=False).show() | Top key has > 20% of all rows |
| Statistical skew score | CV = stddev / mean on key counts | CV > 1.0 (moderate), CV > 3.0 (severe) |
| NULL key percentage | df.filter(col('key').isNull()).count() / df.count() | NULL% > 5% of total rows |
| Join strategy check | joined_df.explain() — look for SortMergeJoin vs BroadcastHashJoin | SortMergeJoin on a column with hot keys |
| Spark UI — Stages tab | Check Max Duration vs Median Duration | Max / Median > 5x in the slow stage |
| Spark UI — Task list | Sort tasks by Duration in the slow stage | One or two tasks are extreme outliers |
12. Conclusion
Data skewness is not random bad luck; it is a predictable consequence of data distribution combined with hash-based partitioning. Understanding its causes makes it largely preventable and fixable.
Joins are the most common trigger of skewness in PySpark. When Spark shuffles data for a Sort-Merge Join, every row with the same key goes to the same partition, turning hot keys into single-task bottlenecks. The fix is usually one of four approaches: broadcast the small side to eliminate the shuffle entirely; salt the hot key to spread it across partitions; enable AQE to let Spark handle it automatically at runtime; or handle NULLs before they collapse into one partition.
The workflow is always the same: Identify first — use the Spark UI and partition distribution checks to confirm skew and find the hot keys. Choose the right fix based on the type of skew. Measure the improvement — verify with CV scores and Spark UI timing. A 45-minute job caused by a single skewed partition can run in 2 minutes after a one-line broadcast hint.
Happy Optimising!
Discover more from DataSangyan