How to Fix Data Skew in Apache Spark

Imagine a PySpark job running on a 20-node cluster. Of its 200 tasks, 199 finish in under a minute; the last one is still running at the 45-minute mark. The entire job is blocked on a single executor while the rest of the cluster sits idle. This is data skewness, and it is one of the most common and costly performance problems in distributed data engineering.

Data skewness happens when the data is unevenly distributed across partitions. Some partitions end up with far more rows than others. Since a Spark stage can only complete when its last task finishes, a single oversized partition can make an entire job take 10 to 50 times longer than it should.

This blog covers the complete picture: what skewness is, why it happens, how to identify it precisely using the Spark UI and code, how data skewness specifically occurs in joins (the most common trigger), and how to fix each type with practical, working examples.

In PySpark, data is divided into partitions. Each partition is processed by one task running on one executor core. In an ideal world, all partitions are the same size and all tasks finish at the same time — the cluster works at maximum efficiency.

Skewness breaks this ideal. A skewed partition has significantly more rows than average. The task processing that partition takes proportionally longer. Every other task waits for it. The cluster is underutilised and the job takes much longer than the data volume would suggest.
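To make the "last task decides" point concrete, here is a toy model in plain Python (not Spark). It assumes 200 task slots, for example 20 nodes with 10 cores each (the scenario above does not state core counts), one task per partition, and every task starting at the same moment; `stage_stats` is a hypothetical helper.

```python
def stage_stats(task_minutes, cores):
    """Return (stage_minutes, utilisation) for one wave of tasks that start together."""
    if len(task_minutes) > cores:
        raise ValueError("this toy model only handles a single wave of tasks")
    stage_minutes = max(task_minutes)              # the stage ends with its slowest task
    busy = sum(task_minutes)                       # core-minutes spent on real work
    utilisation = busy / (cores * stage_minutes)   # share of reserved core time actually used
    return stage_minutes, utilisation

# 199 tasks take about 1 minute each; the skewed task takes 45 minutes
tasks = [1.0] * 199 + [45.0]
minutes, util = stage_stats(tasks, cores=200)
print(f"stage time: {minutes:.0f} min, core utilisation: {util:.1%}")
# prints: stage time: 45 min, core utilisation: 2.7%
```

Even though 199 of the 200 slots finish almost immediately, the stage still takes 45 minutes, and the reserved cores do useful work less than 3% of the time.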

Balanced vs Skewed Partitions

Scenario | Partition 0 | Partition 1 | Partition 2 | Partition 3 | Max/Median Ratio
Balanced | 250,000 | 251,000 | 249,000 | 250,000 | 1.0x (ideal)
Skewed | 4,800,000 | 250,000 | 249,000 | 250,000 | 19.2x (problem!)
Judge skew by the ratio of the largest partition to the median partition. Anything above 3x on hot join keys deserves investigation, and above 5x is a clear warning sign. Above 10x, timeouts, executor OOM errors, and jobs that never complete become common.
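The ratio in the table is easy to compute from per-partition row counts. This sketch uses the median rather than the mean because one huge partition drags the mean upward; the function names are hypothetical, and the thresholds are the rules of thumb above, not Spark settings.

```python
from statistics import median

def max_median_ratio(partition_rows):
    """Largest partition divided by the median partition size."""
    return max(partition_rows) / median(partition_rows)

def skew_signal(ratio):
    # Rules of thumb from the text: >3x investigate, >5x warning, >10x severe
    if ratio > 10:
        return "severe"
    if ratio > 5:
        return "warning"
    if ratio > 3:
        return "investigate"
    return "ok"

balanced = [250_000, 251_000, 249_000, 250_000]
skewed = [4_800_000, 250_000, 249_000, 250_000]
for name, rows in [("balanced", balanced), ("skewed", skewed)]:
    r = max_median_ratio(rows)
    print(f"{name}: {r:.1f}x -> {skew_signal(r)}")
# prints: balanced: 1.0x -> ok
#         skewed: 19.2x -> severe
```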

Understanding why skewness happens is the first step to choosing the right fix. There are five primary root causes.

Hot keys. This is the most common cause. When you group or join on a column where a small number of values account for a large share of rows, those values become hot keys. In a country-level dataset, ‘IN’ or ‘USA’ might represent 70% of all rows; in an e-commerce dataset, a viral product_id might appear in millions of transactions.

NULL keys. NULL is not just a missing value; it is a performance hazard. When you group by or join on a column that contains many NULLs, all NULLs hash to the same partition. If 40% of your rows have NULL in the join key, 40% of your data goes to one partition.
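A small pure-Python model shows the NULL pile-up. It is a sketch under stated assumptions: `zlib.crc32` stands in for Spark's Murmur3 hash, and NULL (`None`) is mapped to one fixed hash value, which is the property that matters here (every NULL key hashes identically). `partition_for` is a hypothetical helper.

```python
import zlib
from collections import Counter

NUM_PARTITIONS = 200

def partition_for(key, num_partitions=NUM_PARTITIONS):
    """Pick a shuffle partition for a key (crc32 is a stand-in for Murmur3)."""
    # Every NULL gets the same fixed hash, so all NULL keys share one partition
    h = 0 if key is None else zlib.crc32(str(key).encode())
    return h % num_partitions

# 1,000 rows: 42% have a NULL dept_id, the rest spread over 50 departments
keys = [None] * 420 + [d % 50 for d in range(580)]
sizes = Counter(partition_for(k) for k in keys)

null_partition = partition_for(None)
print("rows in the NULL partition:", sizes[null_partition])
print("largest partition:", max(sizes.values()))
```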

Low-cardinality partition columns. Partitioning by a column with very few distinct values, like status (Active/Inactive) or gender, creates only as many useful partitions as there are distinct values. With 200 default shuffle partitions but only 3 distinct values, at most 3 partitions carry all the data and at least 197 stay empty.
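The low-cardinality limit follows directly from hashing: identical values always land together, so the number of non-empty partitions can never exceed the number of distinct values. A sketch, again with `zlib.crc32` as a stand-in hash; the helper and the three status values are hypothetical.

```python
import zlib

def non_empty_partitions(distinct_values, num_partitions=200):
    """Partitions that receive any rows when hashing by a column (crc32 stand-in)."""
    return {zlib.crc32(v.encode()) % num_partitions for v in distinct_values}

# Hypothetical status column with three distinct values
used = non_empty_partitions(["Active", "Inactive", "Suspended"])
print(f"{len(used)} of 200 partitions receive data; {200 - len(used)} stay empty")
```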

Skewed source data. Some data sources are inherently skewed: Kafka partitions can be unevenly populated, and S3 prefixes can hold wildly different file sizes. Skew that originates at ingestion carries through every narrow transformation (filter, select, withColumn) until a shuffle redistributes the data.

Repeated keys in star-schema joins. In a star schema, the fact table references each dimension key many times, and a popular key can appear hundreds of thousands of times. When the join shuffles by that key, all rows sharing it go to one reducer, creating one massive task even though the dimension table itself is small and looks harmless.

Root Cause | Typical Symptom | Primary Fix
Hot key in groupBy/join | One task runs 10-50x longer than the others | Salting
NULL values in join/groupBy key | One partition holds all the NULLs: OOM or timeout | Replace NULLs before join
Low-cardinality partition column | Only 2-5 partitions active, the rest empty | Repartition with a higher-cardinality column
Skewed source files | First stage immediately shows uneven task sizes | Repartition after read
Repeated keys in star-schema join | Shuffle stage stalls on one or two reducer tasks | Broadcast join or salting

Correct identification is critical — applying the wrong fix to the wrong type of skew wastes time. There are two complementary approaches: visual diagnosis via the Spark UI, and programmatic diagnosis using PySpark code.

Step | Where to Look | What to Check | Skew Signal
1 | Jobs tab | Expand the job and look at stage durations | One stage takes much longer than the others
2 | Stages tab | Click the slow stage; check the Summary Metrics box | Max Duration far above Median Duration (5x or more suggests skew)
3 | Stages tab | Sort the task list by Duration | One or two tasks are extreme outliers
4 | Stages tab | Check the Shuffle Read Size column per task | One task reads many times more shuffle data than the others
5 | Executors tab | Look at GC Time for each executor | One executor has very high GC Time (memory pressure)

Step 1: Check partition sizes to confirm skew

from pyspark.sql.functions import spark_partition_id
# Count rows in each partition of df, largest first
df.groupBy(spark_partition_id().alias('partition_id')) \
    .count() \
    .orderBy('count', ascending=False) \
    .show(20)

OUTPUT: Partition size distribution — severe skew in partition 0

partition_id | count | Observation
0 | 5,200,000 | SEVERELY SKEWED: holds 87% of all data
1 | 280,000 | Normal
2 | 263,000 | Normal
3 | 257,000 | Normal
All others | ~260,000 | Balanced

Step 2: Find the hot keys causing the skew

df.groupBy('country_code') \
.count() \
.orderBy('count', ascending=False) \
.show(10)

OUTPUT: Key value distribution — ‘IN’ is the hot key

country_code | count | % of total
IN | 5,200,000 | 86.7% <- HOT KEY
USA | 280,000 | 4.7%
GBR | 263,000 | 4.4%
AUS | 257,000 | 4.3%

Step 3: Measure skew statistically

from pyspark.sql.functions import stddev, mean, max as spark_max
key_counts = df.groupBy('country_code').count()
stats = key_counts.select(
    mean('count').alias('mean_count'),
    stddev('count').alias('stddev_count'),   # sample standard deviation
    spark_max('count').alias('max_count')
).collect()[0]
cv = stats['stddev_count'] / stats['mean_count']
print(f'Coefficient of Variation: {cv:.2f}')
print('Skew level:', 'SEVERE' if cv > 3 else 'MODERATE' if cv > 1 else 'LOW')

OUTPUT: Statistical skew measurement

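Metric | Value | Interpretation
mean_count | 1,500,000 | Average rows per key
stddev_count | ≈2,466,700 | Very high, far from the mean (sample standard deviation)
max_count | 5,200,000 | Largest key group
CV | ≈1.64 | MODERATE by the thresholds in the code, but one key holds 86.7% of the rows: needs fixing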
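The statistics above can be cross-checked in plain Python. Spark's `stddev()` is the sample standard deviation, which `statistics.stdev` also computes. The same check illustrates a limitation of CV: for n keys it can never exceed sqrt(n) (the value approached when one key holds every row), so with only four keys the "SEVERE" threshold of 3 is unreachable. With few keys, read CV alongside the max/median ratio.

```python
import math
from statistics import mean, stdev  # stdev is the sample standard deviation, like Spark's stddev()

key_counts = {"IN": 5_200_000, "USA": 280_000, "GBR": 263_000, "AUS": 257_000}
counts = list(key_counts.values())

avg = mean(counts)
sd = stdev(counts)
cv = sd / avg
print(f"mean={avg:,.0f}  stddev={sd:,.0f}  CV={cv:.2f}")

# Upper bound on CV for this many keys: approached only if a single key held every row
print(f"max possible CV with {len(counts)} keys: {math.sqrt(len(counts)):.2f}")
```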

Step 4: Check for NULL skew specifically

from pyspark.sql.functions import col
total = df.count()
null_count = df.filter(col('dept_id').isNull()).count()
null_pct = (null_count / total) * 100
print(f'NULL rows: {null_count:,} ({null_pct:.1f}%)')
if null_pct > 5:
    print('WARNING: high NULL percentage, NULL skew likely!')
Run all four identification steps before applying any fix. Knowing whether you have hot-key skew, NULL skew, or low-cardinality skew determines which solution to use. Applying salting to a NULL skew problem will not solve it.

Joins are the single most common trigger of data skewness in PySpark. Understanding exactly why joins cause skew — and how each join type is affected differently — is essential before applying any fix.

When Spark executes a Sort-Merge Join or a Shuffled Hash Join (the strategies it uses when neither side is small enough to broadcast), it must co-locate matching rows in the same partition, so that one task sees every row for a given key. It does this by shuffling both DataFrames across the network, using a hash of the join key to decide which partition each row goes to.

The problem: the hash function sends every row with the same key value to the same partition. If one key value dominates (say 80% of your orders have country_code=’IN’), then 80% of the left DataFrame lands in one partition, together with every right-side row whose key is ‘IN’. One executor receives a huge amount of work while all the others sit nearly idle.
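A pure-Python sketch of the shuffle shows why both sides of the hot key meet in one partition. It is a model, not Spark internals: `zlib.crc32` replaces Murmur3, there are 8 partitions instead of 200, `shuffle` is a hypothetical helper, and the data is a scaled-down version of the orders/countries example used below.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 8

def shuffle(rows, key_index, num_partitions=NUM_PARTITIONS):
    """Hash-partition rows by their join key (crc32 stands in for Murmur3)."""
    parts = defaultdict(list)
    for row in rows:
        parts[zlib.crc32(row[key_index].encode()) % num_partitions].append(row)
    return parts

# Left: 800 'IN' orders plus 201 orders spread over three other countries
orders = ([(i, "IN") for i in range(800)] +
          [(i, code) for i, code in enumerate(["USA", "GBR", "AUS"] * 67)])
# Right: one row per country
countries = [("IN", "India"), ("USA", "United States"),
             ("GBR", "United Kingdom"), ("AUS", "Australia")]

left = shuffle(orders, key_index=1)
right = shuffle(countries, key_index=0)

hot = zlib.crc32(b"IN") % NUM_PARTITIONS
print("left rows in the 'IN' partition:", len(left[hot]))
print("right rows in the 'IN' partition:", right[hot])
print("left partition sizes:", {p: len(rows) for p, rows in sorted(left.items())})
```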

Join Type | Uses Shuffle? | Skew Risk | Notes
Sort-Merge Join | Yes, both sides | HIGH | Default for large-large equi-joins; full shuffle on the join key
Shuffled Hash Join | Yes, both sides | HIGH | Builds an in-memory hash table per partition from the smaller side; a hot key can also make that table too large for memory
Broadcast Hash Join (broadcast join) | No | NONE from the join key | Small side copied to every executor; usually the most efficient option, though existing skew in the large side's partitions remains
Cartesian / Cross | No key hash | LOW | Skew possible from the source layout but not from join-key hashing

Type A — One-Sided Hot Key Skew

The most common pattern: the left (fact) table has a dominant key value. All rows with that key go to one partition on both sides of the join.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, spark_partition_id
spark = SparkSession.builder.appName('JoinSkewDemo').getOrCreate()
# Demo only: stop Spark from auto-broadcasting the tiny countries table and
# from fixing skew at runtime with AQE, so the skewed shuffle is visible
spark.conf.set('spark.sql.autoBroadcastJoinThreshold', -1)
spark.conf.set('spark.sql.adaptive.enabled', 'false')
# orders: 6M rows, ~87% have country_code='IN'
orders_data = (
    [(i, 'IN', 100 + i) for i in range(5_200_000)] +   # HOT KEY
    [(i, 'USA', 200 + i) for i in range(280_000)] +
    [(i, 'GBR', 300 + i) for i in range(263_000)] +
    [(i, 'AUS', 400 + i) for i in range(257_000)]
)
orders = spark.createDataFrame(orders_data, ['order_id', 'country_code', 'amount'])
# countries: small reference table
countries_data = [('IN', 'India'), ('USA', 'United States'),
                  ('GBR', 'United Kingdom'), ('AUS', 'Australia')]
countries = spark.createDataFrame(countries_data, ['country_code', 'country_name'])
# SKEWED JOIN: Sort-Merge Join, both sides shuffled by country_code
# All 5.2M 'IN' rows go to one partition on the left
# The one 'IN' row from countries goes to the same partition on the right
result = orders.join(countries, on='country_code', how='inner')
# Diagnose: check partition sizes AFTER the join shuffle
result.groupBy(spark_partition_id().alias('pid')) \
    .count().orderBy('count', ascending=False).show(5)

OUTPUT: Partition distribution after the skewed join

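partition_id | count | Task Duration (approx)
0 | 5,200,000 | 45 min (SKEWED)
1 | 280,000 | 2 min
2 | 263,000 | 2 min
3 | 257,000 | 2 min
Remaining 196 partitions | 0 | Empty: only 4 distinct keys exist, so these partitions do not appear in the output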

Type B — NULL Key Skew in a Join

When the join key contains NULLs, all NULL rows hash to the same partition. In SQL, NULL never equals NULL, so these rows can never match, and for an INNER JOIN Spark usually filters them out before the shuffle. A LEFT JOIN must keep them, so all NULLs from the left side pile into one partition during the shuffle, even though they produce no matches.

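from pyspark.sql.functions import col, spark_partition_id
# Demo only: keep the tiny departments table from being auto-broadcast,
# so the shuffle (and the NULL pile-up) actually happens
spark.conf.set('spark.sql.autoBroadcastJoinThreshold', -1)
# employees: 1M rows, 42% have NULL dept_id
emp_data = (
    [(i, 'Emp_' + str(i), None) for i in range(420_000)] +   # NULL dept
    [(i, 'Emp_' + str(i), 10) for i in range(200_000)] +
    [(i, 'Emp_' + str(i), 20) for i in range(200_000)] +
    [(i, 'Emp_' + str(i), 30) for i in range(180_000)]
)
# An explicit schema avoids type inference over a million rows that start with NULLs
employees = spark.createDataFrame(emp_data, 'emp_id long, name string, dept_id long')
dept_data = [(10, 'Engineering'), (20, 'Marketing'), (30, 'HR')]
departments = spark.createDataFrame(dept_data, 'dept_id long, dept_name string')
# LEFT JOIN: all 420K NULL dept_id rows shuffle to the same partition
result = employees.join(departments, on='dept_id', how='left')
# Check: count the unmatched (NULL dept) rows per partition
result.filter(col('dept_name').isNull()) \
    .groupBy(spark_partition_id().alias('pid')).count().show()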

OUTPUT: NULL rows all land in one partition during the join shuffle

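partition_id | Rows with NULL dept_id | Rows with a non-NULL dept_id | Total in partition
The NULL partition (any one id) | 420,000 | 0, unless a dept_id happens to hash to the same partition | ~420,000 (SKEWED)
Up to three other partitions | 0 | 200,000 / 200,000 / 180,000 (dept_id 10, 20, 30) | 180,000 to 200,000 each
All remaining partitions | 0 | 0 | 0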

Type C — Multi-Table Join Skew (Star Schema)

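In a star-schema pipeline, a fact table joins several dimension tables in sequence. Skew repeats across stages: the first join produces a skewed intermediate DataFrame, and when that DataFrame is shuffled again for the next join on an equally concentrated key (here product_name follows product_id one-to-one), the same hot value lands in a single partition once more. If a join also multiplies rows, the hot partition grows at each step.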

# Demo only: with default settings Spark would auto-broadcast these tiny
# dimension tables, which hides the skew
spark.conf.set('spark.sql.autoBroadcastJoinThreshold', -1)
# Fact table: 10M transactions, 70% share the same product_id
transactions_data = (
    [(i, 'PROD_001', i * 10) for i in range(7_000_000)] +   # HOT product
    [(i, 'PROD_002', i * 5) for i in range(2_000_000)] +
    [(i, 'PROD_003', i * 8) for i in range(1_000_000)]
)
transactions = spark.createDataFrame(
    transactions_data, ['txn_id', 'product_id', 'revenue']
)
# Dimension tables
products_data = [('PROD_001', 'Laptop'), ('PROD_002', 'Phone'), ('PROD_003', 'Tablet')]
products = spark.createDataFrame(products_data, ['product_id', 'product_name'])
category_data = [('Laptop', 'Electronics'), ('Phone', 'Electronics'), ('Tablet', 'Electronics')]
categories = spark.createDataFrame(category_data, ['product_name', 'category'])
# Join 1: transactions x products (SKEWED: PROD_001 is the hot key)
# All 7M PROD_001 rows go to one partition
step1 = transactions.join(products, on='product_id', how='inner')
# Join 2: step1 x categories, shuffled again by product_name
# All 7M 'Laptop' rows land in one partition again
step2 = step1.join(categories, on='product_name', how='inner')
# Both stages stall on a single task: the skew is paid twice
In star-schema pipelines, fix skew at the FIRST join. A skewed intermediate DataFrame carries its skew into every subsequent join. Fixing it early eliminates the compounding effect and speeds up the entire pipeline, not just one stage.

Use these patterns to pinpoint skew that is caused by the join itself, not by the source data layout.

from pyspark.sql.functions import spark_partition_id, count
# Method 1: Check partition sizes on the JOIN RESULT
joined = orders.join(countries, on='country_code', how='inner')
joined.groupBy(spark_partition_id().alias('pid')) \
.count().orderBy('count', ascending=False).show(10)
# If top partition is 10x+ larger than median -> join skew confirmed
# Method 2: Check key distribution on BOTH sides before joining
print('=== LEFT SIDE key distribution ===')
orders.groupBy('country_code').count() \
.orderBy('count', ascending=False).show(5)
print('=== RIGHT SIDE key distribution ===')
countries.groupBy('country_code').count() \
.orderBy('count', ascending=False).show(5)
# Method 3: Use explain() to see which join strategy Spark chose
joined.explain()
# SortMergeJoin -> full shuffle on both sides -> skew likely
# BroadcastHashJoin -> no shuffle -> no join-key skew possible
# Method 4: Check shuffle read bytes per task in Spark UI
# If one task shows 10x more shuffle read bytes -> join skew

OUTPUT: explain() output revealing join strategy

Plan Output | Meaning | Skew Risk
== Physical Plan == SortMergeJoin [country_code] | Full shuffle on both sides by country_code | HIGH: hot keys pile into one partition
Exchange hashpartitioning(country_code, 200) | 200 shuffle partitions by key hash | All ‘IN’ rows go to the same 1 of 200 partitions
BroadcastHashJoin, BuildRight | Right side was broadcast | NONE: no shuffle, no join-key skew
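Checking both sides (Method 2) matters because, for an inner equi-join, each key produces left-count x right-count output rows, and a shuffle join computes all of them in the single task that owns the key. A hypothetical helper that estimates this from lists of keys:

```python
from collections import Counter

def join_rows_per_key(left_keys, right_keys):
    """Output rows an inner equi-join produces per key: left count x right count."""
    left, right = Counter(left_keys), Counter(right_keys)
    return {key: left[key] * right[key] for key in left.keys() & right.keys()}

# One-sided skew: 'IN' is hot on the left only
print(join_rows_per_key(["IN"] * 1000 + ["USA"] * 10, ["IN", "USA"]))
# Two-sided skew: 'IN' also repeats 50 times on the right
print(join_rows_per_key(["IN"] * 1000 + ["USA"] * 10, ["IN"] * 50 + ["USA"]))
```

When the hot key also repeats on the right side, the work in its task is multiplied rather than merely concentrated.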

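If the right side of a skewed join is small (a common rule of thumb is under about 100 MB; Spark's automatic threshold, spark.sql.autoBroadcastJoinThreshold, defaults to 10 MB), broadcast join is the fastest and simplest fix. Spark sends the entire small DataFrame to every executor, and each task of the large DataFrame joins against that in-memory copy locally. The large side is never shuffled by the join key, so the hot key cannot pile into one partition.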

from pyspark.sql.functions import broadcast, spark_partition_id
# BEFORE: Sort-Merge Join -- 'IN' rows all go to one partition
# skewed = orders.join(countries, on='country_code', how='inner')
# AFTER: Broadcast Join -- the large side is not shuffled at all
fixed = orders.join(broadcast(countries), on='country_code', how='inner')
# Verify that broadcast was used
fixed.explain()
# Should show: BroadcastHashJoin ... BuildRight (NOT SortMergeJoin)
# Check partition balance after the broadcast join
fixed.groupBy(spark_partition_id().alias('pid')) \
    .count().orderBy('count', ascending=False).show(5)

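OUTPUT: Partition distribution AFTER broadcast join: balanced, because the join output keeps the existing partitioning of orders (here about 20 roughly equal input partitions)

partition_id | count BEFORE | count AFTER broadcast | Improvement
0 | 5,200,000 | ~300,000 | 17x reduction
1 | 280,000 | ~300,000 | Balanced
2 | 263,000 | ~300,000 | Balanced
All others | 0 (empty) | ~300,000 | All balanced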
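Conceptually, a broadcast hash join turns the small side into a lookup table that every task holds, so each partition of the large side is joined where it already sits. This plain-Python sketch (hypothetical helper, 100 scaled-down orders in four pre-split partitions) shows that the output partitions simply mirror the input partitions, however dominant ‘IN’ is:

```python
def broadcast_join(left_partitions, small_table):
    """Inner join where the small side is a dict copied to every task."""
    lookup = dict(small_table)  # the "broadcast" copy
    # Each left partition is joined in place; nothing is re-shuffled by key
    return [
        [(order_id, code, lookup[code]) for order_id, code in part if code in lookup]
        for part in left_partitions
    ]

countries = [("IN", "India"), ("USA", "United States"),
             ("GBR", "United Kingdom"), ("AUS", "Australia")]
codes = ["IN"] * 87 + ["USA", "GBR", "AUS"] * 4 + ["IN"]   # 100 orders, 88 of them 'IN'
orders = list(enumerate(codes))

# Four evenly sized input partitions, as they might come from a file read
left_partitions = [orders[i::4] for i in range(4)]
out = broadcast_join(left_partitions, countries)
print([len(p) for p in out])  # [25, 25, 25, 25]
```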

When both sides are large and the right side cannot be broadcast, salting distributes the hot key across multiple partitions by appending a random integer to the join key. The right side is exploded to match all possible salted key variants.

from pyspark.sql.functions import col, concat_ws, lit, rand, floor, explode, array
SALT_FACTOR = 10  # spread each key across 10 salted sub-keys
# Step 1: Add a random salt to the large left DataFrame
# 'IN' rows now become: IN_0, IN_1, IN_2, ..., IN_9
orders_salted = orders.withColumn(
    'salted_key',
    concat_ws('_', col('country_code'),
              floor(rand() * SALT_FACTOR).cast('string'))
)
# Step 2: Explode the right DataFrame to create every salt variant
# The single 'IN' row becomes 10 rows: IN_0, IN_1, ..., IN_9
salt_array = array([lit(i) for i in range(SALT_FACTOR)])
countries_exploded = countries \
    .withColumn('salt', explode(salt_array)) \
    .withColumn('salted_key',
                concat_ws('_', col('country_code'), col('salt').cast('string'))) \
    .drop('salt', 'country_code')  # avoid a duplicate country_code column after the join
# Step 3: Join on the salted key; the hot key is now spread across 10 partitions
result = orders_salted.join(
    countries_exploded,
    on='salted_key',
    how='inner'
).drop('salted_key')
result.write.mode('overwrite').parquet('/tmp/salted_join_output')

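OUTPUT: Rows per key AFTER salting: each key is split across 10 salted shards

country_code | Rows BEFORE (one partition per key) | Rows per shard AFTER salting | Improvement
IN | 5,200,000 | ~520,000 (IN_0 to IN_9) | ~10x smaller largest task
USA | 280,000 | ~28,000 (USA_0 to USA_9) | Split 10 ways
GBR | 263,000 | ~26,300 | Split 10 ways
AUS | 257,000 | ~25,700 | Split 10 ways

Because every key is salted by the same factor, the ‘IN’ shards are still about 20x larger than the others; a larger SALT_FACTOR shrinks them further, at the cost of a proportionally larger exploded right side.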
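The salting logic can be checked in plain Python: salting the left side and exploding the right side must return exactly the rows an ordinary join returns, while splitting the hot key into SALT_FACTOR groups. In this sketch (helper names hypothetical), each key appears once on the right, as in a dimension table, and a seeded `random.Random` replaces Spark's `rand()` so the run is repeatable.

```python
import random
from collections import Counter

SALT_FACTOR = 10

def plain_join(left, right):
    lookup = dict(right)
    return [(key, value, lookup[key]) for key, value in left if key in lookup]

def salted_join(left, right, salt_factor=SALT_FACTOR, seed=0):
    rng = random.Random(seed)
    # Left: attach a random salt to every key, e.g. 'IN' -> ('IN', 7)
    left_salted = [((key, rng.randrange(salt_factor)), value) for key, value in left]
    # Right: one copy of each row per salt value, so every salted key finds its match
    right_exploded = {(key, s): name for key, name in right for s in range(salt_factor)}
    joined = [(key, value, right_exploded[(key, s)])
              for (key, s), value in left_salted if (key, s) in right_exploded]
    shard_sizes = Counter(salted_key for salted_key, _ in left_salted)
    return joined, shard_sizes

orders = [("IN", i) for i in range(1000)] + [("USA", i) for i in range(50)]
countries = [("IN", "India"), ("USA", "United States")]

joined, shards = salted_join(orders, countries)
in_shards = {key: n for key, n in shards.items() if key[0] == "IN"}
print(len(in_shards), "IN shards, largest holds", max(in_shards.values()), "rows")
```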

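For Spark 3.0 and above, Adaptive Query Execution (AQE) can automatically detect and split oversized join partitions at runtime. From Spark 3.2, spark.sql.adaptive.enabled is on by default and skew-join handling is on whenever AQE is; on Spark 3.0 and 3.1, set them explicitly as below. No changes to your join logic are needed.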

# Enable AQE globally in your SparkSession
spark.conf.set('spark.sql.adaptive.enabled', 'true')
spark.conf.set('spark.sql.adaptive.skewJoin.enabled', 'true')
# Tune detection thresholds (the values below are Spark's defaults):
# A join partition is 'skewed' if it is BOTH:
#   (a) > skewedPartitionFactor x median partition size, AND
#   (b) > skewedPartitionThresholdInBytes
spark.conf.set(
    'spark.sql.adaptive.skewJoin.skewedPartitionFactor', '5'
)
spark.conf.set(
    'spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes', '256MB'
)
# Run the join as normal; AQE handles the rest
result = orders.join(countries, on='country_code', how='inner')
# If the 'IN' partition exceeds both thresholds, AQE splits it into smaller
# sub-partitions and replicates the matching 'IN' row from countries for each
# sub-task. Skew-join handling applies to shuffle joins; a table as small as
# countries would normally be broadcast instead.
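The detection rule described in the comments above can be modelled in a few lines. This is a simplified sketch of the documented condition only, and `skewed_partitions` is a hypothetical function: real AQE works on shuffle map-output sizes and also decides how to split each skewed partition, which is not modelled here.

```python
from statistics import median

MB = 1024 * 1024

def skewed_partitions(sizes_bytes, factor=5.0, threshold_bytes=256 * MB):
    """Indexes of partitions that are > factor x median AND > threshold_bytes."""
    med = median(sizes_bytes)
    return [i for i, size in enumerate(sizes_bytes)
            if size > factor * med and size > threshold_bytes]

big = [2000 * MB] + [100 * MB] * 9     # one 2 GB partition among 100 MB ones
small = [200 * MB] + [10 * MB] * 9     # 20x the median, but under 256 MB
print(skewed_partitions(big))    # [0]
print(skewed_partitions(small))  # []
```

The second case shows why both conditions exist: a partition can be far above the median yet still too small to be worth splitting.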

NULL keys cause join skew because all NULLs hash to the same partition during the shuffle. For an INNER JOIN, Spark usually filters NULL keys out before the shuffle, since they can never match; a LEFT JOIN must keep them, so they are shuffled into one partition and come out as null-padded rows.

from pyspark.sql.functions import col, coalesce, lit
# FIX A: Give each NULL a distinct key that matches nothing, then join
# A single constant such as -1 would send every former NULL to one partition
# again, so derive a unique negative key per row instead (assumes emp_id is
# unique and non-negative, and real dept_id values are positive)
employees_keyed = employees.withColumn(
    'join_key', coalesce(col('dept_id'), -(col('emp_id') + 1))
)
result = employees_keyed.join(
    departments.withColumnRenamed('dept_id', 'join_key'),
    on='join_key', how='left'
).drop('join_key')   # dept_id keeps its original NULLs
# FIX B: Separate NULLs, join non-NULLs, then union
# Use when NULLs need different handling from matched rows
null_rows = employees.filter(col('dept_id').isNull())
non_null_rows = employees.filter(col('dept_id').isNotNull())
# Join only the non-NULL rows; the NULL rows never enter the shuffle
joined = non_null_rows.join(departments, on='dept_id', how='left')
# Give the NULL rows the same columns as the join output
null_result = null_rows.withColumn('dept_name', lit('Unassigned'))
# unionByName matches columns by name, not by position
final = joined.unionByName(null_result)
Always check the NULL percentage in join key columns before running any large join. If it exceeds about 5%, apply Fix A or Fix B above. A join on a column where 40% of values are NULL sends 40% of all data to one partition, which on a reasonably large dataset is very likely to end in an OOM error or a timeout.
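Whichever fix you choose, replacing NULL with one constant only renames the hot key: every former NULL still hashes to the same partition. This sketch compares a constant sentinel (-1) with one distinct negative key per row, using `zlib.crc32` as a stand-in for Spark's hash and 420 hypothetical NULL-key employees:

```python
import zlib
from collections import Counter

def partition_for(key, num_partitions=200):
    """Shuffle partition for a key (crc32 is a stand-in for Spark's Murmur3)."""
    return zlib.crc32(str(key).encode()) % num_partitions

null_emp_ids = range(420)   # employees whose dept_id is NULL

# coalesce(dept_id, -1): every former NULL gets the same key
constant = Counter(partition_for(-1) for _ in null_emp_ids)
# coalesce(dept_id, -(emp_id + 1)): each former NULL gets its own key
distinct = Counter(partition_for(-(e + 1)) for e in null_emp_ids)

print("largest partition, constant sentinel:", max(constant.values()))  # 420
print("largest partition, distinct keys:", max(distinct.values()))
```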

In multi-table pipelines, fix skew at the first join to prevent it from carrying forward. If the fixed intermediate result feeds more than one action, cache it so it is not recomputed.

from pyspark.sql.functions import broadcast
# BEFORE (skew repeats across joins):
# step1 = transactions.join(products, 'product_id')   # SKEWED
# step2 = step1.join(categories, 'product_name')      # STILL SKEWED
# AFTER: Fix skew at step 1 using broadcast
# Step 1: Broadcast the small products table
step1_fixed = transactions.join(
    broadcast(products),   # products is small -- broadcast it
    on='product_id',
    how='inner'
)
# Cache the fixed intermediate result when several actions reuse it;
# for a single downstream write, caching only adds work
step1_fixed.cache()
step1_fixed.count()  # Materialise the cache
# Step 2: Now join on product_name
# step1_fixed keeps the even layout of transactions -- no compounding skew
step2_fixed = step1_fixed.join(
    broadcast(categories),
    on='product_name',
    how='inner'
)
step2_fixed.write.mode('overwrite').parquet('/tmp/star_join_output')
# Always unpersist when done
step1_fixed.unpersist()

Join Skew Scenario | Right Side Size | Recommended Fix | Expected Speedup
One hot key dominates left side | Small (< 100 MB) | broadcast(right_df) | 10-50x
One hot key dominates left side | Large (> 100 MB) | Salting (SALT_FACTOR = skew ratio) | 5-20x
Moderate skew (5-20x ratio) | Any size | AQE skew join (Spark 3+) | 3-10x
NULL keys in left side (> 5% NULL) | Any size | Replace NULLs before join | 5-40x
Skew compounds across multiple joins | Small dimensions | Broadcast all dims + cache intermediate | 10-50x
Both sides skewed on same hot key | Both large | Salting on both sides | 5-15x

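Salting is not just for joins; it also applies to groupBy aggregation skew. The two-stage approach first aggregates on the salted key (a partial aggregate per shard), then aggregates the partial results on the original key (final aggregation). For decomposable functions such as sum and count, Spark already computes partial aggregates on each map task before the shuffle, so hot keys hurt less there; salting pays off most for aggregations whose intermediate state does not shrink, such as collect_list.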

from pyspark.sql.functions import sum as spark_sum, concat_ws, rand, floor, col
SALT_FACTOR = 10
# Stage 1: Partial aggregation on salted key
# 'IN' rows spread across IN_0, IN_1, ..., IN_9
partial = orders \
.withColumn('salt', floor(rand() * SALT_FACTOR).cast('string')) \
.withColumn('salted_key', concat_ws('_', col('country_code'), col('salt'))) \
.groupBy('salted_key', 'country_code') \
.agg(spark_sum('amount').alias('partial_sum'))
# Stage 2: Final aggregation on original key
# Only 10 partial rows per country — no skew possible
final = partial \
.groupBy('country_code') \
.agg(spark_sum('partial_sum').alias('total_amount'))
final.show()

OUTPUT: Final aggregation result — correct totals, no skew

country_code | total_amount
IN | 13,520,517,400,000 (sum of all 5.2M IN orders)
USA | 39,255,860,000
GBR | 34,663,268,500
AUS | 33,127,171,500
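Two properties make the two-stage aggregation safe: summing partial sums gives the same total as one big sum, and no stage-1 group holds more than roughly 1/SALT_FACTOR of a hot key's rows. A plain-Python sketch (hypothetical helpers, scaled-down data) checks the first property, and a closed-form arithmetic series reproduces the totals in the table for the full orders data, whose amounts are base + i:

```python
import random
from collections import defaultdict

def direct_sum(rows):
    totals = defaultdict(int)
    for key, amount in rows:
        totals[key] += amount
    return dict(totals)

def two_stage_sum(rows, salt_factor=10, seed=0):
    rng = random.Random(seed)
    partial = defaultdict(int)          # stage 1: (key, salt) -> partial sum
    for key, amount in rows:
        partial[(key, rng.randrange(salt_factor))] += amount
    final = defaultdict(int)            # stage 2: key -> sum of its partial sums
    for (key, _salt), subtotal in partial.items():
        final[key] += subtotal
    return dict(final), len(partial)

def arithmetic_series(n, base):
    """sum(base + i for i in range(n)), the amounts used in the orders example."""
    return base * n + n * (n - 1) // 2

rows = [("IN", 100 + i) for i in range(5_200)] + [("USA", 200 + i) for i in range(280)]
totals, n_groups = two_stage_sum(rows)
print(totals == direct_sum(rows), n_groups)
print(f"{arithmetic_series(5_200_000, 100):,}")  # 13,520,517,400,000
```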

NULL values in join or groupBy keys cause silent but severe skew. All NULL rows hash to the same partition, so if 30% of your data has a NULL join key, 30% of your data is processed by a single task. The fix must be applied before the join or aggregation.

from pyspark.sql.functions import col, coalesce, lit
# FIX A: Replace each NULL with a distinct key that matches nothing
# (one constant sentinel such as -1 would simply become a new hot key;
# assumes emp_id is unique and non-negative, and real dept_ids are positive)
employees_fixed = employees.withColumn(
    'join_key', coalesce(col('dept_id'), -(col('emp_id') + 1))
)
# FIX B: Filter NULLs before the join, when NULL rows can be discarded
# (an inner join drops them anyway, since NULL never matches)
employees_no_null = employees.filter(col('dept_id').isNotNull())
result = employees_no_null.join(departments, on='dept_id', how='inner')
# FIX C: Separate NULLs, join non-NULLs, union the results by column name
null_rows = employees.filter(col('dept_id').isNull())
non_null_rows = employees.filter(col('dept_id').isNotNull())
joined = non_null_rows.join(departments, on='dept_id', how='left')
null_handled = null_rows.withColumn('dept_name', lit('Unassigned'))
final = joined.unionByName(null_handled)

When skewness is not tied to a specific key value but to the overall partition layout, repartition() is the most direct fix. It performs a full shuffle to redistribute data evenly across a chosen number of partitions.

# Repartition evenly across 200 partitions (round-robin, ignores key values)
df_even = df.repartition(200)
# Repartition BY COLUMN: co-locates rows with the same key in one partition,
# which can let a later join on that column reuse the partitioning.
# Caution: a hot key still ends up in a single partition, so this does not fix key skew
df_by_key = df.repartition(200, 'country_code')
# coalesce() reduces the partition count WITHOUT a full shuffle
df.coalesce(10).write.parquet('/output/result')
# Check the partition count
print(f'Partitions: {df.rdd.getNumPartitions()}')
repartition() triggers a full shuffle — use it once before a chain of heavy operations. Use coalesce() when reducing partition count for output writing, since it avoids a full shuffle.
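The difference between the two repartition() forms is easiest to see on skewed data. A plain-Python model (round-robin dealing for repartition(n), crc32 hashing as a stand-in for repartition(n, column), 8 partitions, 1,000 hypothetical rows) shows that only the first one evens out a hot key:

```python
import zlib
from collections import Counter

NUM_PARTITIONS = 8
rows = ["IN"] * 870 + ["USA"] * 50 + ["GBR"] * 40 + ["AUS"] * 40   # 1,000 rows

# repartition(n): rows are dealt out in turn, ignoring their values
round_robin = Counter(i % NUM_PARTITIONS for i in range(len(rows)))
# repartition(n, 'country_code'): rows are hashed by value (crc32 stands in for Murmur3)
by_column = Counter(zlib.crc32(code.encode()) % NUM_PARTITIONS for code in rows)

print("round-robin largest partition:", max(round_robin.values()))  # 125
print("hash-by-column largest partition:", max(by_column.values()))
```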

Always measure the improvement after applying a fix. Use partition distribution checks and Spark UI timing to confirm the fix worked.

from pyspark.sql.functions import broadcast, spark_partition_id, mean, stddev, max as spark_max
def measure_skew(df, label):
    # Rows per non-empty partition of df
    pcounts = df.groupBy(spark_partition_id().alias('pid')).count()
    stats = pcounts.agg(
        mean('count').alias('avg'),
        stddev('count').alias('std'),      # NULL when there is only one partition
        spark_max('count').alias('max')
    ).collect()[0]
    avg, std, max_c = stats['avg'], stats['std'] or 0.0, stats['max']
    cv = std / avg if avg else 0
    print(f'\n--- {label} ---')
    print(f' Max partition : {max_c:,}')
    print(f' Avg partition : {avg:,.0f}')
    print(f' Max/Avg ratio : {max_c / avg:.1f}x')
    print(f' CV score      : {cv:.2f}')
    print(f' Skew Level    : {"SEVERE" if cv > 3 else "MODERATE" if cv > 1 else "LOW"}')
measure_skew(orders.join(countries, 'country_code'), 'BEFORE FIX')
measure_skew(orders.join(broadcast(countries), 'country_code'), 'AFTER BROADCAST')
measure_skew(orders_salted.join(countries_exploded, 'salted_key'), 'AFTER SALTING')

OUTPUT: Before vs after comparison

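Metric | BEFORE | AFTER Broadcast | AFTER Salting
Max partition size | 5,200,000 | ~300,000 | ~520,000
Max / Avg ratio | ≈3.5x (19.2x max/median) | 1.05x | ≈3.5x
CV (skew score) | ≈1.64 | 0.04 | ≈1.44
Skew Level | MODERATE | LOW | MODERATE
Stage Duration | ~45 min | ~2 min | ~4 min

These figures assume each key (and each salted key) lands in its own shuffle partition, so measure_skew sees only the 4 or 40 non-empty partitions; in practice a few salted keys may share a partition. Salting every key by the same factor cuts the largest task about 10-fold, but because all keys shrink equally, the ratio and CV among the non-empty partitions barely move: the gain shows up in maximum partition size and stage time.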

Diagnosis Result | Root Cause | Recommended Fix | Section
One hot key has 10x+ more rows; large right side | Hot key join skew | Salting | 5.5
Right-side DataFrame is small (< 100 MB) | Join shuffle skew | Broadcast join | 5.4
Running Spark 3.0+ with moderate join skew (5-20x) | Join skew | AQE Skew Join (config only) | 5.6
NULL values > 5% of join/groupBy key column | NULL key skew | Replace NULLs or split + union | 5.7 / 7
Skew compounds across multiple star-schema joins | Cascading join skew | Broadcast all dims + cache | 5.8
GroupBy aggregation skew on hot key | Aggregation skew | Two-stage salted aggregation | 6
Source data has uneven file/partition sizes | Ingestion skew | repartition() after read | 8

Check | Code / Action | Skew If…
Partition size distribution | df.groupBy(spark_partition_id()).count().show() | Max partition > 5x median partition
Hot key distribution | df.groupBy('key_col').count().orderBy('count', ascending=False).show() | Top key has > 20% of all rows
Statistical skew score | CV = stddev / mean on key counts | CV > 1.0 (moderate), CV > 3.0 (severe)
NULL key percentage | df.filter(col('key').isNull()).count() / df.count() | NULL% > 5% of total rows
Join strategy check | joined_df.explain(): look for SortMergeJoin vs BroadcastHashJoin | SortMergeJoin on a column with hot keys
Spark UI, Stages tab | Compare Max Duration with Median Duration | Max / Median > 5x in the slow stage
Spark UI, task list | Sort tasks by Duration in the slow stage | One or two tasks are extreme outliers

Data skewness is not random bad luck — it is a predictable consequence of data distribution combined with hash-based partitioning. Understanding its causes makes it entirely preventable and fixable.

Joins are the most common trigger of skewness in PySpark. When Spark shuffles data for a Sort-Merge Join, every row with the same key goes to the same partition, turning hot keys into single-executor bottlenecks. The fix is usually one of four approaches: broadcast the small side to eliminate the shuffle; salt the hot key to spread it across partitions; enable AQE to let Spark split skewed partitions at runtime; or handle NULLs before they collapse into one partition.

The workflow is always the same. Identify first: use the Spark UI and partition distribution checks to confirm skew and find the hot keys. Choose the right fix based on the type of skew. Measure the improvement with partition statistics and Spark UI timing. A 45-minute stage caused by a single skewed partition can drop to about 2 minutes after a one-line broadcast hint.


Discover more from DataSangyan
