Understanding PySpark JOIN Types for Data Engineering

Apache Spark is the go-to engine for large-scale distributed data processing, and PySpark brings Spark’s power to Python. At the heart of almost every data pipeline is the need to combine datasets — matching records across tables, enriching data with reference information, or identifying gaps and mismatches between sources.

PySpark DataFrames support all the JOIN types you know from SQL, plus two powerful extras, Semi Join and Anti Join, which have no dedicated JOIN keyword in standard SQL (they correspond to WHERE EXISTS and WHERE NOT EXISTS patterns) but are invaluable for filtering workflows. Understanding which join to use, and how PySpark executes it across a distributed cluster, is what separates a good data engineer from a great one.

This guide covers all seven PySpark join types with syntax, output tables, real-world use cases, and performance considerations including broadcast joins and shuffle optimisation.

All examples use two DataFrames: employees and departments. The deliberate asymmetry — Diana has no department, and HR has no employees — makes each join type’s behaviour immediately visible.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, broadcast

spark = SparkSession.builder.appName('JoinDemo').getOrCreate()

# Employees DataFrame
emp_data = [
    (1, 'Alice', 10, 95000),
    (2, 'Bob', 20, 82000),
    (3, 'Charlie', 30, 74000),  # dept 30 does not exist
    (4, 'Diana', None, 61000),  # no department
]
emp_cols = ['emp_id', 'name', 'dept_id', 'salary']
employees = spark.createDataFrame(emp_data, emp_cols)

# Departments DataFrame
dept_data = [
    (10, 'Engineering', 500000),
    (20, 'Marketing', 300000),
    (40, 'HR', 200000),  # no employees
]
dept_cols = ['dept_id', 'dept_name', 'budget']
departments = spark.createDataFrame(dept_data, dept_cols)

OUTPUT: employees DataFrame

emp_id | name    | dept_id | salary
1      | Alice   | 10      | 95000
2      | Bob     | 20      | 82000
3      | Charlie | 30      | 74000
4      | Diana   | null    | 61000

OUTPUT: departments DataFrame

dept_id | dept_name   | budget
10      | Engineering | 500000
20      | Marketing   | 300000
40      | HR          | 200000

PySpark provides the .join() method on DataFrames. The three key parameters are: the right DataFrame, the join condition, and the join type string.

# General syntax
result = left_df.join(
    right_df,
    on='column_name',  # or a Column condition, or a list of columns
    how='inner'        # join type string
)

# Join condition options:
#   String:      on='dept_id'
#   Column expr: on=employees['dept_id'] == departments['dept_id']
#   List:        on=['dept_id', 'region']  (multi-column join)

how= String      | Join Type        | SQL Equivalent
'inner'          | INNER JOIN       | INNER JOIN
'left'           | LEFT OUTER JOIN  | LEFT JOIN
'right'          | RIGHT OUTER JOIN | RIGHT JOIN
'outer' / 'full' | FULL OUTER JOIN  | FULL OUTER JOIN
'cross'          | CROSS JOIN       | CROSS JOIN
'left_semi'      | Left Semi Join   | WHERE EXISTS (subquery)
'left_anti'      | Left Anti Join   | WHERE NOT EXISTS (subquery)
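
If you want to double-check these equivalents yourself, the same joins can be run through Spark SQL after registering temp views. A quick sketch using the example DataFrames (the view names are arbitrary):

# Register the DataFrames as SQL temp views
employees.createOrReplaceTempView('employees')
departments.createOrReplaceTempView('departments')

# INNER JOIN -- same rows as how='inner'
spark.sql("""
    SELECT e.*, d.dept_name, d.budget
    FROM employees e
    INNER JOIN departments d ON e.dept_id = d.dept_id
""").show()

# WHERE EXISTS -- same rows as how='left_semi'
spark.sql("""
    SELECT * FROM employees e
    WHERE EXISTS (SELECT 1 FROM departments d WHERE d.dept_id = e.dept_id)
""").show()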

Inner join is the default and most common join type. It returns only rows where the join condition matches in both DataFrames. Rows that have no match on either side are excluded from the result.

result = employees.join(
    departments,
    on='dept_id',
    how='inner'
)
result.show()

# Equivalent column-expression syntax:
result = employees.join(
    departments,
    employees['dept_id'] == departments['dept_id'],
    'inner'
)

OUTPUT: Inner Join — only rows with a match in both DataFrames

emp_id | name  | dept_id | salary | dept_name   | budget
1      | Alice | 10      | 95000  | Engineering | 500000
2      | Bob   | 20      | 82000  | Marketing   | 300000
Charlie (dept 30 not found) and Diana (null dept_id) are excluded. HR (no employees) is also excluded. Only the two matched rows appear.

Left join keeps every row from the left DataFrame. Where no matching row exists in the right DataFrame, the right-side columns are filled with null. This is the most widely used join after inner join — essential for enriching records while preserving all source rows.

result = employees.join(
    departments,
    on='dept_id',
    how='left'  # also accepts 'left_outer'
)
result.show()

OUTPUT: Left Join — all employees, nulls for unmatched right rows

emp_id | name    | dept_id | salary | dept_name   | budget
1      | Alice   | 10      | 95000  | Engineering | 500000
2      | Bob     | 20      | 82000  | Marketing   | 300000
3      | Charlie | 30      | 74000  | null        | null
4      | Diana   | null    | 61000  | null        | null

Find Employees Without a Valid Department

# Classic null-filter pattern after a left join
unassigned = employees.join(
    departments,
    on='dept_id',
    how='left'
).filter(col('dept_name').isNull())

unassigned.select('emp_id', 'name', 'dept_id').show()
# Returns: Charlie (dept 30 missing) and Diana (null dept_id)
The .filter(col('dept_name').isNull()) pattern is one of the most powerful uses of left join in PySpark: it reliably surfaces orphaned records that have no matching entry in the reference table.

Right join is the exact mirror of left join. It keeps every row from the right DataFrame, filling left-side columns with null where no match exists. In practice, most PySpark developers prefer to swap DataFrame order and use left join for consistency.

result = employees.join(
    departments,
    on='dept_id',
    how='right'  # also accepts 'right_outer'
)
result.show()

OUTPUT: Right Join — all departments, nulls for unmatched left rows

emp_id | name  | dept_id | salary | dept_name   | budget
1      | Alice | 10      | 95000  | Engineering | 500000
2      | Bob   | 20      | 82000  | Marketing   | 300000
null   | null  | 40      | null   | HR          | 200000
HR appears with null on the employee side because no employee has dept_id=40. This is equivalent to: departments.join(employees, on='dept_id', how='left').
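
A quick sanity check of that equivalence, assuming the example DataFrames (the two results hold the same rows; only the column order differs):

# Right join vs. swapped left join
right_way = employees.join(departments, on='dept_id', how='right')
left_way = departments.join(employees, on='dept_id', how='left')

# Reorder the swapped result's columns to match right_way's layout
left_way.select(right_way.columns).show()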

Full outer join is the union of left and right joins. Every row from both DataFrames appears — with nulls filling in wherever there is no match on the opposite side. This gives the most complete view of both datasets and is ideal for data reconciliation and gap analysis.

result = employees.join(
    departments,
    on='dept_id',
    how='outer'  # also accepts 'full' or 'full_outer'
)
result.show()

OUTPUT: Full Outer Join — all rows from both DataFrames

emp_id | name    | dept_id | salary | dept_name   | budget
1      | Alice   | 10      | 95000  | Engineering | 500000
2      | Bob     | 20      | 82000  | Marketing   | 300000
3      | Charlie | 30      | 74000  | null        | null
4      | Diana   | null    | 61000  | null        | null
null   | null    | 40      | null   | HR          | 200000
Full outer join is the most expensive join type — Spark must shuffle both DataFrames across the cluster and produce the union. Use it only when you genuinely need to surface gaps on both sides.
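
As a concrete reconciliation sketch (using the example DataFrames and the col function imported earlier), the outer-join result can be split into the gaps on each side:

# Full outer join, then isolate the mismatches on each side
recon = employees.join(departments, on='dept_id', how='outer')

# Employees with no department (right side is null)
recon.filter(col('dept_name').isNull()) \
     .select('emp_id', 'name', 'dept_id').show()

# Departments with no employees (left side is null)
recon.filter(col('emp_id').isNull()) \
     .select('dept_id', 'dept_name').show()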

Cross join produces the Cartesian product — every row from the left DataFrame is paired with every row from the right DataFrame. With 4 employees and 3 departments, the result has 4 x 3 = 12 rows. No join condition is needed.

# PySpark offers the crossJoin() method OR how='cross'
# Option A: dedicated method (recommended)
result = employees.crossJoin(departments)

# Option B: join with how='cross'
result = employees.join(departments, how='cross')

result.count()  # Returns 12
result.show()

OUTPUT: Cross Join — every employee paired with every department (12 rows)

emp_id | name  | dept_id (emp) | dept_id (dept) | dept_name
1      | Alice | 10            | 10             | Engineering
1      | Alice | 10            | 20             | Marketing
1      | Alice | 10            | 40             | HR
2      | Bob   | 20            | 10             | Engineering
… (12 rows total)
Older Spark versions require spark.conf.set('spark.sql.crossJoin.enabled', 'true') before an implicit cross join is allowed; in Spark 3.x this is enabled by default. Never trigger a cross join accidentally: on large tables it can produce billions of rows and bring down the cluster.

Left semi join is PySpark’s equivalent of SQL’s WHERE EXISTS pattern. It returns only the rows from the left DataFrame that have a matching row in the right DataFrame — but no columns from the right DataFrame are included in the result. It is a pure filter using the right DataFrame as a lookup.

# Return only employees who belong to a valid department
result = employees.join(
    departments,
    on='dept_id',
    how='left_semi'
)
result.show()
# Note: the result contains ONLY employees columns, not departments columns

OUTPUT: Left Semi Join — employees who have a matching department (left columns only)

emp_id | name  | dept_id | salary
1      | Alice | 10      | 95000
2      | Bob   | 20      | 82000
The key distinction from inner join: semi join returns only left DataFrame columns. No dept_name or budget appears. This makes semi join more efficient than inner join + select when you only need to filter, because Spark doesn’t need to carry right-side data through the shuffle.
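
A related subtlety: semi join is not merely inner join plus a column selection. If the right DataFrame had multiple rows per key, the inner join would duplicate matching left rows, while semi join returns each left row at most once. A sketch using the example DataFrames:

# Semi join: each matching employee appears exactly once,
# no matter how many right-side rows share the key
semi = employees.join(departments, on='dept_id', how='left_semi')

# Inner join + select looks identical here only because dept_id
# is unique in departments; duplicate keys would duplicate rows
inner_sel = employees.join(departments, on='dept_id', how='inner') \
                     .select(employees.columns)

semi.show()
inner_sel.show()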

Left anti join is the opposite of left semi join. It returns only the rows from the left DataFrame that have no matching row in the right DataFrame. This is PySpark’s equivalent of SQL’s WHERE NOT EXISTS pattern and is perfect for finding records that are missing from a reference table.

# Return employees who do NOT have a valid department
result = employees.join(
    departments,
    on='dept_id',
    how='left_anti'
)
result.show()
# Returns Charlie (dept 30 missing) and Diana (null dept_id)

OUTPUT: Left Anti Join — employees with no matching department

emp_id | name    | dept_id | salary
3      | Charlie | 30      | 74000
4      | Diana   | null    | 61000
Anti join is typically far more efficient than left join + filter(isNull()) on large DataFrames: Spark only needs the right side's join keys to decide which left rows to keep, whereas left join + filter carries the full right-side columns through the shuffle and discards them afterwards.

how= String | Left Rows      | Right Rows    | Right Cols? | Best Used For
'inner'     | Matched only   | Matched only  | Yes         | Fetch only fully matched records
'left'      | All            | Matched/null  | Yes         | Enrich left data; preserve all left rows
'right'     | Matched/null   | All           | Yes         | Preserve all right rows; expose left gaps
'outer'     | All            | All           | Yes         | Full reconciliation; surface all gaps
'cross'     | All x All      | All x All     | Yes         | Cartesian products; combinatorial data
'left_semi' | Matched only   | Not included  | No          | Filter left using right as a lookup
'left_anti' | Unmatched only | Not included  | No          | Find records missing from right table

When one DataFrame is significantly smaller than the other, a broadcast join is the single biggest performance optimisation available. Instead of shuffling both DataFrames across the cluster, Spark sends a copy of the small DataFrame to every executor — eliminating the shuffle entirely.

from pyspark.sql.functions import broadcast

# Explicitly broadcast the smaller DataFrame
result = employees.join(
    broadcast(departments),  # departments is small -- broadcast it
    on='dept_id',
    how='inner'
)

# Auto-broadcast: Spark broadcasts automatically if a DataFrame
# is below the threshold (default: 10 MB)
spark.conf.set('spark.sql.autoBroadcastJoinThreshold', 10 * 1024 * 1024)

# Disable auto-broadcast (force sort-merge join):
spark.conf.set('spark.sql.autoBroadcastJoinThreshold', -1)
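
To confirm which strategy Spark actually chose, inspect the physical plan: a broadcast join appears as BroadcastHashJoin, while a shuffle-based join appears as SortMergeJoin.

# Inspect the physical plan to verify the join strategy
result = employees.join(broadcast(departments), on='dept_id', how='inner')
result.explain()
# Look for 'BroadcastHashJoin' in the output; a large-table join
# without a broadcast typically shows 'SortMergeJoin' instead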

When to Use Broadcast Join

Scenario                                                 | Recommended Join Strategy
One DataFrame < 10 MB (e.g. lookup/reference table)      | broadcast() -- eliminates the shuffle completely
Both DataFrames are large (GB-scale)                     | Sort-merge join (Spark default for large tables)
Joining on a skewed key (many rows share the same value) | Salting + broadcast, or AQE skew-join handling
Repeated joins against the same small DataFrame          | Cache/persist the small DF + broadcast
Adaptive Query Execution (AQE), available from Spark 3.0+, can automatically switch to a broadcast join at runtime if it detects one side is small enough after collecting shuffle statistics. Enable it with: spark.conf.set('spark.sql.adaptive.enabled', 'true').
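
A minimal configuration sketch using standard Spark 3.x config keys:

# Enable AQE plus its skew-join handling (Spark 3.x)
spark.conf.set('spark.sql.adaptive.enabled', 'true')
spark.conf.set('spark.sql.adaptive.skewJoin.enabled', 'true')
# At runtime, AQE can replace a planned sort-merge join with a
# broadcast join once shuffle statistics show one side is small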

When joining on a column with the same name in both DataFrames (e.g. dept_id), PySpark produces two columns with the same name in the result if you use the column-expression syntax. This causes ambiguity errors downstream. The string syntax automatically resolves this.

# PROBLEM: column-expression syntax creates duplicate dept_id columns
result = employees.join(
    departments,
    employees['dept_id'] == departments['dept_id'],  # two dept_id columns!
    'inner'
)
result.select('dept_id')  # ERROR: Reference 'dept_id' is ambiguous

# SOLUTION 1: Use string syntax (preferred) -- keeps only one dept_id
result = employees.join(departments, on='dept_id', how='inner')

# SOLUTION 2: Drop the duplicate after the join
result = employees.join(
    departments,
    employees['dept_id'] == departments['dept_id'],
    'inner'
).drop(departments['dept_id'])

# SOLUTION 3: Rename before joining
depts = departments.withColumnRenamed('dept_id', 'd_dept_id')
result = employees.join(depts, employees['dept_id'] == depts['d_dept_id'], 'inner')
Always prefer the string or list syntax for join conditions when the column names are the same on both sides. It is cleaner, avoids ambiguity, and produces a single merged column in the result — not two identical ones.

When a join requires matching on multiple columns simultaneously, pass a list of column names to the on parameter. All columns in the list must match for a row to be included.

# Multi-column join using list syntax
result = orders.join(
    order_items,
    on=['order_id', 'product_id'],  # both columns must match
    how='inner'
)

# Multi-column join using column expressions (when column names differ)
result = transactions.join(
    accounts,
    (transactions['account_id'] == accounts['id']) &
    (transactions['region'] == accounts['region']),
    'inner'
)

Mistake                                                | What Goes Wrong                                           | Correct Approach
Using column-expression syntax with same-named columns | Two ambiguous columns in result; downstream errors        | Use string or list syntax: on='dept_id'
Forgetting to broadcast a small DataFrame              | Unnecessary shuffle; slow join on a large cluster         | Wrap the small DF with broadcast(): broadcast(dept_df)
Joining on a nullable column without handling nulls    | Null keys never match; rows silently dropped              | Filter nulls before the join or coalesce the key (see the sketch below)
Using left join when semi join is sufficient           | Carries right-side columns through shuffle unnecessarily  | Use left_semi when you only need to filter, not enrich
Cross joining large DataFrames accidentally            | Billions of rows; cluster OOM or timeout                  | Always verify join conditions; use crossJoin() explicitly
Performing multiple joins without caching              | Spark recomputes the same base DataFrame repeatedly       | Cache/persist shared base DataFrames before multi-joins
Not enabling AQE on Spark 3+                           | Misses runtime optimisations like skew-join handling      | Set spark.sql.adaptive.enabled = true
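
The nullable-key pitfall above deserves a concrete illustration: equality conditions never match null keys, so rows like Diana's silently drop out of an inner join. A minimal sketch of the two common fixes (the -1 sentinel department is a made-up convention for this example):

from pyspark.sql.functions import coalesce, lit

# Fix 1: split out the null-key rows explicitly before joining
null_key_rows = employees.filter(col('dept_id').isNull())
joined = employees.filter(col('dept_id').isNotNull()) \
                  .join(departments, on='dept_id', how='inner')

# Fix 2: map null keys to a sentinel value (-1) that matches a
# catch-all row you have added to the reference table
emps_defaulted = employees.withColumn(
    'dept_id', coalesce(col('dept_id'), lit(-1))
)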

Join Type       | how= Value  | Returns                                  | Right Cols Included?
Inner Join      | 'inner'     | Rows matching in both DataFrames         | Yes
Left Join       | 'left'      | All left rows + matched right (nulls)    | Yes
Right Join      | 'right'     | All right rows + matched left (nulls)    | Yes
Full Outer Join | 'outer'     | All rows from both; nulls on both sides  | Yes
Cross Join      | 'cross'     | Every left row x every right row (N x M) | Yes
Left Semi Join  | 'left_semi' | Left rows with a match (no right cols)   | No
Left Anti Join  | 'left_anti' | Left rows with NO match (no right cols)  | No

Performance Strategy                     | When to Apply
broadcast(small_df)                      | One side < 10 MB -- eliminates the shuffle entirely
spark.sql.adaptive.enabled = true        | Spark 3+ -- enables runtime join optimisation and skew handling
Cache/persist shared DataFrames          | Same DataFrame used in multiple joins in one pipeline
Salting skewed keys (sketch below)       | One key value has disproportionately many rows, causing slow tasks
Use left_semi instead of inner + select  | When filtering only; avoids carrying right-side data through the shuffle
Partition on the join key before writing | Pre-partition data to avoid shuffles on repeated joins in pipelines
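
Salting, listed in the strategy table above, deserves a sketch of its own. The idea: append a random salt to the large, skewed side's key, replicate each row of the smaller side once per salt value, and join on the composite key so a hot key's rows spread across many tasks. A minimal, illustrative version using the example DataFrames (the salt factor of 8 is arbitrary):

from pyspark.sql.functions import array, explode, floor, lit, rand

SALT = 8  # number of buckets to spread each hot key across

# Skewed (large) side: assign each row a random salt in [0, SALT)
emp_salted = employees.withColumn(
    'salt', floor(rand() * SALT).cast('int')
)

# Small side: replicate every row once per salt value
dept_salted = departments.withColumn(
    'salt', explode(array(*[lit(i) for i in range(SALT)]))
)

# Join on the composite (key, salt): a hot key now lands in
# SALT different partitions instead of one
result = emp_salted.join(
    dept_salted, on=['dept_id', 'salt'], how='inner'
).drop('salt')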

PySpark’s join API is concise — a single .join() call with a how parameter — but the behaviour behind each join type is powerful and distinct. Choosing the right join type is not just about correctness; it is a performance decision that directly affects shuffle size, memory pressure, and pipeline speed on a distributed cluster.

Use inner join for matched lookups, left join to preserve source records, full outer join for reconciliation, semi join for efficient filtering, and anti join to surface what is missing. Always broadcast small DataFrames, enable AQE on Spark 3+, and cache DataFrames that are reused across multiple joins.

Master these seven join types and their performance implications, and you will have the foundation to build fast, correct, and scalable PySpark pipelines regardless of data size.

Happy Sparking!

