Mastering PySpark Window Functions: A Complete Guide

Window functions are one of the most powerful features in PySpark for analytical workloads. They allow you to compute values across a set of rows that are related to the current row — without collapsing the result set the way groupBy does. Every row in the result keeps its original identity while still gaining a computed value based on its surrounding rows.

PySpark window functions mirror the SQL:2003 standard OVER clause and are available through pyspark.sql.window.Window and pyspark.sql.functions. They are essential for time-series analysis, ranking, running totals, period-over-period comparisons, and sessionisation.

This blog covers every category of PySpark window function — syntax, the WindowSpec API, ranking, aggregate, offset (lag/lead), and distribution functions — with full code examples and output tables after every example.

All examples use the following sales DataFrame. Familiarise yourself with the data before diving into the examples.

sales DataFrame

| emp_id | name  | department  | salary | hire_year | region | sales_amount | month   |
|--------|-------|-------------|--------|-----------|--------|--------------|---------|
| E01    | Alice | Engineering | 95000  | 2020      | North  | 42000        | 2024-01 |
| E02    | Bob   | Engineering | 82000  | 2021      | North  | 38000        | 2024-01 |
| E03    | Carol | Marketing   | 74000  | 2019      | South  | 61000        | 2024-01 |
| E04    | David | Marketing   | 68000  | 2022      | South  | 54000        | 2024-01 |
| E05    | Eve   | Engineering | 91000  | 2020      | East   | 47000        | 2024-01 |
| E06    | Frank | HR          | 61000  | 2018      | West   | 28000        | 2024-01 |
| E07    | Grace | HR          | 58000  | 2023      | West   | 25000        | 2024-01 |
| E08    | Hank  | Marketing   | 71000  | 2021      | East   | 58000        | 2024-01 |
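If you want to follow along, here is a minimal sketch that builds this DataFrame from the table above. The SparkSession setup and the app name are ordinary boilerplate assumed for the rest of the post.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('window-functions-guide').getOrCreate()  # app name is arbitrary

data = [
    ('E01', 'Alice', 'Engineering', 95000, 2020, 'North', 42000, '2024-01'),
    ('E02', 'Bob', 'Engineering', 82000, 2021, 'North', 38000, '2024-01'),
    ('E03', 'Carol', 'Marketing', 74000, 2019, 'South', 61000, '2024-01'),
    ('E04', 'David', 'Marketing', 68000, 2022, 'South', 54000, '2024-01'),
    ('E05', 'Eve', 'Engineering', 91000, 2020, 'East', 47000, '2024-01'),
    ('E06', 'Frank', 'HR', 61000, 2018, 'West', 28000, '2024-01'),
    ('E07', 'Grace', 'HR', 58000, 2023, 'West', 25000, '2024-01'),
    ('E08', 'Hank', 'Marketing', 71000, 2021, 'East', 58000, '2024-01'),
]
columns = ['emp_id', 'name', 'department', 'salary', 'hire_year',
           'region', 'sales_amount', 'month']
df = spark.createDataFrame(data, columns)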

Every window function in PySpark requires a WindowSpec object that defines the window over which the function operates. A WindowSpec has three optional components: partitionBy, orderBy, and a frame specification.

from pyspark.sql.window import Window
from pyspark.sql import functions as F
# Full WindowSpec with all three components
window_spec = (
    Window
    .partitionBy('department')           # divide rows into independent groups
    .orderBy(F.col('salary').desc())     # sort within each partition
    .rowsBetween(                        # frame: from partition start to current row
        Window.unboundedPreceding,
        Window.currentRow
    )
)
# Minimal WindowSpec (no partition, no frame)
global_window = Window.orderBy('sales_amount')
# Partition only (no order — for aggregate functions)
dept_window = Window.partitionBy('department')

| Frame expression          | Meaning                                  |
|---------------------------|------------------------------------------|
| Window.unboundedPreceding | Start of the partition                   |
| Window.currentRow         | The current row                          |
| Window.unboundedFollowing | End of the partition                     |
| rowsBetween(start, end)   | Physical row offsets (exact row count)   |
| rangeBetween(start, end)  | Logical value offsets (value-based range)|
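The last two rows of that table are the ones that trip people up. As a small illustrative sketch (not one of the original examples), the frames below contrast the two on the salary column; the 5000 threshold is an arbitrary value chosen just for illustration.

# rowsBetween counts physical rows around the current row;
# rangeBetween compares the ORDER BY value itself.
rows_frame = (
    Window
    .partitionBy('department')
    .orderBy('salary')
    .rowsBetween(-1, Window.currentRow)   # current row plus the one physically before it
)
range_frame = (
    Window
    .partitionBy('department')
    .orderBy('salary')
    .rangeBetween(-5000, 0)               # every row whose salary is within 5000 below the current salary
)
df_frames = (
    df
    .withColumn('rows_sum', F.sum('salary').over(rows_frame))
    .withColumn('range_sum', F.sum('salary').over(range_frame))
)
df_frames.select('name', 'department', 'salary', 'rows_sum', 'range_sum').show()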

Ranking functions assign a sequential number to each row within a partition based on an ORDER BY expression. PySpark offers four ranking functions, each handling ties differently.

| Function     | Ties                    | Gaps after ties | Example (ties at 2nd) |
|--------------|-------------------------|-----------------|-----------------------|
| row_number() | Arbitrary but unique    | N/A             | 1, 2, 3, 4            |
| rank()       | Same rank for ties      | Yes             | 1, 2, 2, 4            |
| dense_rank() | Same rank for ties      | No              | 1, 2, 2, 3            |
| ntile(n)     | Divides rows into n buckets | N/A         | 1, 1, 2, 2            |

from pyspark.sql.window import Window
from pyspark.sql import functions as F
window = Window.partitionBy('department').orderBy(F.col('salary').desc())
df_ranked = df.withColumn('row_num', F.row_number().over(window))
df_ranked.select('name', 'department', 'salary', 'row_num').show()


OUTPUT: row_number() — unique rank within each department by salary desc

| name  | department  | salary | row_num |
|-------|-------------|--------|---------|
| Alice | Engineering | 95000  | 1       |
| Eve   | Engineering | 91000  | 2       |
| Bob   | Engineering | 82000  | 3       |
| Carol | Marketing   | 74000  | 1       |
| Hank  | Marketing   | 71000  | 2       |
| David | Marketing   | 68000  | 3       |
| Frank | HR          | 61000  | 1       |
| Grace | HR          | 58000  | 2       |

window = Window.partitionBy('department').orderBy(F.col('salary').desc())
df_ranks = df.withColumn('rnk', F.rank().over(window)) \
    .withColumn('dense_rnk', F.dense_rank().over(window))
df_ranks.select('name', 'department', 'salary', 'rnk', 'dense_rnk').show()

OUTPUT: rank() vs dense_rank() — identical here because the sample data has no tied salaries (see the tie sketch below the table)

| name  | department  | salary | rnk | dense_rnk |
|-------|-------------|--------|-----|-----------|
| Alice | Engineering | 95000  | 1   | 1         |
| Eve   | Engineering | 91000  | 2   | 2         |
| Bob   | Engineering | 82000  | 3   | 3         |
| Carol | Marketing   | 74000  | 1   | 1         |
| Hank  | Marketing   | 71000  | 2   | 2         |
| David | Marketing   | 68000  | 3   | 3         |
| Frank | HR          | 61000  | 1   | 1         |
| Grace | HR          | 58000  | 2   | 2         |
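Because the sample data has no tied salaries, rnk and dense_rnk match on every row above. A minimal sketch with hypothetical tied values makes the gap behaviour from the summary table visible:

# Hypothetical data with a salary tie at 90000
tie_df = spark.createDataFrame(
    [('A', 95000), ('B', 90000), ('C', 90000), ('D', 82000)],
    ['name', 'salary']
)
w = Window.orderBy(F.col('salary').desc())
tie_df.withColumn('rnk', F.rank().over(w)) \
      .withColumn('dense_rnk', F.dense_rank().over(w)) \
      .show()
# rnk:       1, 2, 2, 4  -> a gap follows the tie
# dense_rnk: 1, 2, 2, 3  -> no gap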

# Divide employees into salary quartiles (4 buckets) per department
window = Window.partitionBy('department').orderBy(F.col('salary').desc())
df_ntile = df.withColumn('quartile', F.ntile(4).over(window))
df_ntile.select('name', 'department', 'salary', 'quartile').show()


OUTPUT: ntile(4) — salary quartile within department

| name  | department  | salary | quartile |
|-------|-------------|--------|----------|
| Alice | Engineering | 95000  | 1        |
| Eve   | Engineering | 91000  | 2        |
| Bob   | Engineering | 82000  | 3        |
| Carol | Marketing   | 74000  | 1        |
| Hank  | Marketing   | 71000  | 2        |
| David | Marketing   | 68000  | 3        |
| Frank | HR          | 61000  | 1        |
| Grace | HR          | 58000  | 2        |

# Find the top 2 highest-paid employees per department
window = Window.partitionBy('department').orderBy(F.col('salary').desc())
df_top2 = (
    df
    .withColumn('rnk', F.dense_rank().over(window))
    .filter(F.col('rnk') <= 2)
    .select('name', 'department', 'salary', 'rnk')
    .orderBy('department', 'rnk')
)
df_top2.show()


OUTPUT: Top 2 employees per department by salary

| name  | department  | salary | rnk |
|-------|-------------|--------|-----|
| Alice | Engineering | 95000  | 1   |
| Eve   | Engineering | 91000  | 2   |
| Frank | HR          | 61000  | 1   |
| Grace | HR          | 58000  | 2   |
| Carol | Marketing   | 74000  | 1   |
| Hank  | Marketing   | 71000  | 2   |

All standard PySpark aggregate functions — sum, avg, count, min, max — can be used as window functions by appending .over(window_spec). Unlike groupBy, they return a value for every row while still computing across the defined window.

# Cumulative salary sum per department ordered by salary asc
window = (
    Window
    .partitionBy('department')
    .orderBy('salary')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
df_running = df.withColumn('running_salary_total', F.sum('salary').over(window))
df_running.select('name', 'department', 'salary', 'running_salary_total') \
    .orderBy('department', 'salary').show()


OUTPUT: Running total of salary within each department

| name  | department  | salary | running_salary_total |
|-------|-------------|--------|----------------------|
| Bob   | Engineering | 82000  | 82000                |
| Eve   | Engineering | 91000  | 173000               |
| Alice | Engineering | 95000  | 268000               |
| Grace | HR          | 58000  | 58000                |
| Frank | HR          | 61000  | 119000               |
| David | Marketing   | 68000  | 68000                |
| Hank  | Marketing   | 71000  | 139000               |
| Carol | Marketing   | 74000  | 213000               |

# No ORDER BY — aggregates across entire partition
dept_window = Window.partitionBy('department')
df_avg = df.withColumn('dept_avg_salary', F.round(F.avg('salary').over(dept_window), 0)) \
    .withColumn('dept_max_salary', F.max('salary').over(dept_window)) \
    .withColumn('diff_from_avg', F.col('salary') - F.round(F.avg('salary').over(dept_window), 0))
df_avg.select('name', 'department', 'salary', 'dept_avg_salary', 'diff_from_avg') \
    .orderBy('department', 'salary').show()


OUTPUT: Each employee vs department average salary

| name  | department  | salary | dept_avg_salary | diff_from_avg |
|-------|-------------|--------|-----------------|---------------|
| Bob   | Engineering | 82000  | 89333           | -7333         |
| Eve   | Engineering | 91000  | 89333           | +1667         |
| Alice | Engineering | 95000  | 89333           | +5667         |
| Grace | HR          | 58000  | 59500           | -1500         |
| Frank | HR          | 61000  | 59500           | +1500         |
| David | Marketing   | 68000  | 71000           | -3000         |
| Hank  | Marketing   | 71000  | 71000           | 0             |
| Carol | Marketing   | 74000  | 71000           | +3000         |

# 3-row rolling average of sales_amount ordered by month
rolling_window = (
    Window
    .partitionBy('region')
    .orderBy('month')
    .rowsBetween(-2, Window.currentRow)   # current row + 2 preceding rows
)
df_rolling = df.withColumn(
    'rolling_avg_sales',
    F.round(F.avg('sales_amount').over(rolling_window), 0)
)
df_rolling.select('name', 'region', 'sales_amount', 'rolling_avg_sales').show()


OUTPUT: 3-row rolling average of sales per region

| name  | region | sales_amount | rolling_avg_sales |
|-------|--------|--------------|-------------------|
| Alice | North  | 42000        | 42000             |
| Bob   | North  | 38000        | 40000             |
| Carol | South  | 61000        | 61000             |
| David | South  | 54000        | 57500             |
| Eve   | East   | 47000        | 47000             |
| Hank  | East   | 58000        | 52500             |
| Frank | West   | 28000        | 28000             |
| Grace | West   | 25000        | 26500             |

dept_window = Window.partitionBy('department')
df_pct = df.withColumn(
    'pct_of_dept_sales',
    F.round(
        F.col('sales_amount') * 100 / F.sum('sales_amount').over(dept_window),
        1
    )
)
df_pct.select('name', 'department', 'sales_amount', 'pct_of_dept_sales') \
    .orderBy('department', F.col('pct_of_dept_sales').desc()).show()


OUTPUT: Each employee’s sales as % of their department total

| name  | department  | sales_amount | pct_of_dept_sales |
|-------|-------------|--------------|-------------------|
| Eve   | Engineering | 47000        | 37.0              |
| Alice | Engineering | 42000        | 33.1              |
| Bob   | Engineering | 38000        | 29.9              |
| Frank | HR          | 28000        | 52.8              |
| Grace | HR          | 25000        | 47.2              |
| Carol | Marketing   | 61000        | 35.3              |
| Hank  | Marketing   | 58000        | 33.5              |
| David | Marketing   | 54000        | 31.2              |

lag() accesses the value of a column from a previous row. lead() accesses a value from a following row. Both are indispensable for period-over-period comparisons and detecting changes between consecutive rows.

| Function             | Returns                                | Syntax                          |
|----------------------|----------------------------------------|---------------------------------|
| lag(col, n, default) | Value from n rows BEFORE the current row | F.lag('col', 1, 0).over(window)  |
| lead(col, n, default)| Value from n rows AFTER the current row  | F.lead('col', 1, 0).over(window) |

from pyspark.sql import functions as F
from pyspark.sql.window import Window
window = Window.partitionBy('region').orderBy('month')
df_mom = df \
    .withColumn('prev_month_sales', F.lag('sales_amount', 1).over(window)) \
    .withColumn('mom_change',
                F.col('sales_amount') - F.lag('sales_amount', 1).over(window)) \
    .withColumn('mom_pct_change', F.round(
        (F.col('sales_amount') - F.lag('sales_amount', 1).over(window))
        * 100.0 / F.lag('sales_amount', 1).over(window), 1))
df_mom.select('name', 'region', 'sales_amount', 'prev_month_sales',
              'mom_change', 'mom_pct_change').show()


OUTPUT: Month-over-month change using lag()

| name  | region | sales_amount | prev_month_sales | mom_change | mom_pct_change |
|-------|--------|--------------|------------------|------------|----------------|
| Alice | North  | 42000        | null             | null       | null           |
| Bob   | North  | 38000        | 42000            | -4000      | -9.5           |
| Carol | South  | 61000        | null             | null       | null           |
| David | South  | 54000        | 61000            | -7000      | -11.5          |
| Eve   | East   | 47000        | null             | null       | null           |
| Hank  | East   | 58000        | 47000            | +11000     | +23.4          |
| Frank | West   | 28000        | null             | null       | null           |
| Grace | West   | 25000        | 28000            | -3000      | -10.7          |

# Order by salary and look ahead to the next employee's salary
window = Window.partitionBy('department').orderBy('salary')
df_lead = df \
    .withColumn('next_salary', F.lead('salary', 1).over(window)) \
    .withColumn('next_name', F.lead('name', 1).over(window)) \
    .withColumn('salary_gap', F.lead('salary', 1).over(window) - F.col('salary'))
df_lead.select('name', 'department', 'salary', 'next_name',
               'next_salary', 'salary_gap').show()


OUTPUT: lead() — look ahead to the next employee’s salary in department

| name  | department  | salary | next_name | next_salary | salary_gap |
|-------|-------------|--------|-----------|-------------|------------|
| Bob   | Engineering | 82000  | Eve       | 91000       | 9000       |
| Eve   | Engineering | 91000  | Alice     | 95000       | 4000       |
| Alice | Engineering | 95000  | null      | null        | null       |
| Grace | HR          | 58000  | Frank     | 61000       | 3000       |
| Frank | HR          | 61000  | null      | null        | null       |
| David | Marketing   | 68000  | Hank      | 71000       | 3000       |
| Hank  | Marketing   | 71000  | Carol     | 74000       | 3000       |
| Carol | Marketing   | 74000  | null      | null        | null       |

first_value() (exposed in PySpark as F.first()) returns the value from the first row in the window frame, and last_value() (F.last()) returns the value from the last row. They are useful for carrying a baseline value forward or comparing any row against an anchor.

window = (
    Window
    .partitionBy('department')
    .orderBy(F.col('salary').desc())
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)
df_fv = df \
    .withColumn('highest_salary', F.first('salary').over(window)) \
    .withColumn('lowest_salary', F.last('salary').over(window)) \
    .withColumn('top_earner', F.first('name').over(window))
df_fv.select('name', 'department', 'salary', 'highest_salary',
             'lowest_salary', 'top_earner').show()


OUTPUT: first_value() and last_value() with unbounded frame

| name  | department  | salary | highest_salary | lowest_salary | top_earner |
|-------|-------------|--------|----------------|---------------|------------|
| Alice | Engineering | 95000  | 95000          | 82000         | Alice      |
| Eve   | Engineering | 91000  | 95000          | 82000         | Alice      |
| Bob   | Engineering | 82000  | 95000          | 82000         | Alice      |
| Carol | Marketing   | 74000  | 74000          | 68000         | Carol      |
| Hank  | Marketing   | 71000  | 74000          | 68000         | Carol      |
| David | Marketing   | 68000  | 74000          | 68000         | Carol      |
| Frank | HR          | 61000  | 61000          | 58000         | Frank      |
| Grace | HR          | 58000  | 61000          | 58000         | Frank      |

Always specify rowsBetween(unboundedPreceding, unboundedFollowing) with last_value(). Without it, the default frame ends at currentRow, making last_value() always return the current row’s value.
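A quick sketch of that pitfall, reusing the department data from above:

# Default frame: with an orderBy, the frame ends at the current row,
# so F.last() simply echoes the current row's salary.
w_default = Window.partitionBy('department').orderBy(F.col('salary').desc())
# Explicit full-partition frame: F.last() now returns the partition's lowest salary.
w_full = w_default.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df.withColumn('last_default', F.last('salary').over(w_default)) \
  .withColumn('last_full', F.last('salary').over(w_full)) \
  .select('name', 'department', 'salary', 'last_default', 'last_full') \
  .show()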

Distribution functions compute the relative position of a row within its partition as a value between 0 and 1. They are used for percentile analysis, histograms, and segmentation.

| Function       | Formula                        | Range      | Interpretation                     |
|----------------|--------------------------------|------------|------------------------------------|
| percent_rank() | (rank - 1) / (total_rows - 1)  | 0.0 to 1.0 | % of rows ranked BELOW this row    |
| cume_dist()    | rank / total_rows              | 1/n to 1.0 | % of rows with value <= this row   |

window = Window.partitionBy('department').orderBy('salary')
df_dist = df \
    .withColumn('pct_rank', F.round(F.percent_rank().over(window), 2)) \
    .withColumn('cume_dist', F.round(F.cume_dist().over(window), 2))
df_dist.select('name', 'department', 'salary', 'pct_rank', 'cume_dist') \
    .orderBy('department', 'salary').show()


OUTPUT: percent_rank() and cume_dist() within department

| name  | department  | salary | pct_rank | cume_dist |
|-------|-------------|--------|----------|-----------|
| Bob   | Engineering | 82000  | 0.0      | 0.33      |
| Eve   | Engineering | 91000  | 0.5      | 0.67      |
| Alice | Engineering | 95000  | 1.0      | 1.0       |
| Grace | HR          | 58000  | 0.0      | 0.5       |
| Frank | HR          | 61000  | 1.0      | 1.0       |
| David | Marketing   | 68000  | 0.0      | 0.33      |
| Hank  | Marketing   | 71000  | 0.5      | 0.67      |
| Carol | Marketing   | 74000  | 1.0      | 1.0       |

from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Year-over-year sales growth per region.
# Assume df_annual has columns: year, region, annual_sales
window = Window.partitionBy('region').orderBy('year')
df_yoy = df_annual \
    .withColumn('prev_year_sales', F.lag('annual_sales', 1).over(window)) \
    .withColumn('yoy_growth_pct', F.round(
        (F.col('annual_sales') - F.lag('annual_sales', 1).over(window))
        * 100.0 / F.lag('annual_sales', 1).over(window),
        1
    ))

from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Sessionisation: flag a new session when the gap between events exceeds 30 minutes.
# Assume clickstream has columns: user_id, event_time (timestamp)
w_ordered = Window.partitionBy('user_id').orderBy('event_time')
df_flagged = clickstream.withColumn(
    'new_session_flag',
    F.when(
        (F.col('event_time').cast('long') -
         F.lag('event_time', 1).over(w_ordered).cast('long')) > 1800,
        1
    ).otherwise(0)
)

# Cumulative sum of flags = session number within each user
w_cum = w_ordered.rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_sessions = df_flagged.withColumn(
    'session_id',
    F.sum('new_session_flag').over(w_cum)
)

# Rank by sales_amount desc, break ties by salary desc
window = (
    Window
    .partitionBy('department')
    .orderBy(F.col('sales_amount').desc(), F.col('salary').desc())
)
df_multi = df.withColumn('sales_rank', F.dense_rank().over(window))
df_multi.select('name', 'department', 'sales_amount', 'salary', 'sales_rank') \
    .orderBy('department', 'sales_rank').show()


OUTPUT: Dense rank with tie-breaking on salary

| name  | department  | sales_amount | salary | sales_rank |
|-------|-------------|--------------|--------|------------|
| Eve   | Engineering | 47000        | 91000  | 1          |
| Alice | Engineering | 42000        | 95000  | 2          |
| Bob   | Engineering | 38000        | 82000  | 3          |
| Frank | HR          | 28000        | 61000  | 1          |
| Grace | HR          | 25000        | 58000  | 2          |
| Carol | Marketing   | 61000        | 74000  | 1          |
| Hank  | Marketing   | 58000        | 71000  | 2          |
| David | Marketing   | 54000        | 68000  | 3          |

# Count employees hired in or after 2021 — running count per department
window = (
    Window
    .partitionBy('department')
    .orderBy('hire_year')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
df_count = df.withColumn(
    'running_recent_hires',
    F.sum(F.when(F.col('hire_year') >= 2021, 1).otherwise(0)).over(window)
)
df_count.select('name', 'department', 'hire_year', 'running_recent_hires') \
    .orderBy('department', 'hire_year').show()

Understanding the difference between window functions and groupBy is essential to writing efficient, correct PySpark code.

| Aspect                  | groupBy + agg()                   | Window function                                   |
|-------------------------|-----------------------------------|---------------------------------------------------|
| Output rows             | One row per group                 | All original rows preserved                       |
| Other columns           | Only grouped/aggregated columns   | All columns freely selectable                     |
| Per-row context         | No access to individual rows      | Each row sees its peers                           |
| Ranking                 | Not possible                      | row_number, rank, dense_rank                      |
| Running totals          | Not possible                      | sum().over() with a cumulative frame              |
| Period comparison       | Not possible                      | lag() / lead()                                    |
| Filtering on the result | .filter() after agg()             | Add the column, then .filter() (SQL needs a subquery or temp view) |
| Performance             | Single shuffle                    | Shuffle plus sort per distinct window definition  |
# groupBy — one row per department (loses individual rows)
df_grp = df.groupBy('department').agg(F.avg('salary').alias('avg_salary'))
# Window — keeps all rows AND adds the department average
dept_window = Window.partitionBy('department')
df_win = df.withColumn('dept_avg', F.round(F.avg('salary').over(dept_window), 0))
# Join approach (messy — use window instead)
# df_joined = df.join(df_grp, on='department', how='left')

1. Repartition by the PARTITION BY column before applying several window functions that share the same key. The repartition is itself a shuffle, but the windows that follow can reuse that partitioning instead of each triggering its own exchange.
2. Minimise the number of distinct window definitions in the same transformation — reuse WindowSpec objects.
3. Use orderBy only when required (ranking, offset, cumulative). Aggregate-only windows (sum, avg) do not need ORDER BY.
4. Watch the cardinality and skew of PARTITION BY columns: a missing or heavily skewed key forces most of the data through a single task, while an extremely high-cardinality key adds per-group overhead.
5. For large datasets, enable Adaptive Query Execution: spark.sql.adaptive.enabled = true (on by default since Spark 3.2).
6. Use persist() or cache() before applying multiple window functions on the same DataFrame.
# Pre-repartition by partition key before windowing
df_repartitioned = df.repartition(200, 'department')
# Reuse WindowSpec — define once, use many times
dept_sal_window = Window.partitionBy('department').orderBy(F.col('salary').desc())
df_result = df_repartitioned \
    .withColumn('rnk', F.rank().over(dept_sal_window)) \
    .withColumn('dense_rnk', F.dense_rank().over(dept_sal_window)) \
    .withColumn('row_num', F.row_number().over(dept_sal_window))
# Cache if applying multiple unrelated windows
df_result.cache()
df_result.count() # trigger caching

| Goal                          | PySpark pattern                                                              |
|-------------------------------|------------------------------------------------------------------------------|
| Unique row number per partition | F.row_number().over(Window.partitionBy('grp').orderBy('col'))               |
| Rank with gaps                | F.rank().over(Window.partitionBy('grp').orderBy(col.desc()))                 |
| Rank without gaps             | F.dense_rank().over(Window.partitionBy('grp').orderBy(col.desc()))           |
| Top N per group               | .withColumn('rnk', dense_rank().over(w)).filter(col('rnk') <= N)             |
| Quartile / decile buckets     | F.ntile(4).over(Window.partitionBy('grp').orderBy('col'))                    |
| Running total                 | F.sum('col').over(Window…rowsBetween(unboundedPreceding, currentRow))        |
| Rolling N-row average         | F.avg('col').over(Window…rowsBetween(-N+1, currentRow))                      |
| % of partition total          | col * 100 / F.sum('col').over(Window.partitionBy('grp'))                     |
| Compare to partition average  | col - F.avg('col').over(Window.partitionBy('grp'))                           |
| Previous row value            | F.lag('col', 1).over(Window.partitionBy('grp').orderBy('date'))              |
| Next row value                | F.lead('col', 1).over(Window.partitionBy('grp').orderBy('date'))             |
| Period-over-period change     | col - F.lag('col', 1).over(w)                                                |
| First value in partition      | F.first('col').over(Window…rowsBetween(unboundedPreceding, unboundedFollowing)) |
| Last value in partition       | F.last('col').over(Window…rowsBetween(unboundedPreceding, unboundedFollowing))  |
| Percentile rank (0 to 1)      | F.percent_rank().over(Window.partitionBy('grp').orderBy('col'))              |
| Cumulative distribution       | F.cume_dist().over(Window.partitionBy('grp').orderBy('col'))                 |

PySpark window functions are a must-have tool for any data engineer or analyst working with Apache Spark. They elegantly solve problems that would otherwise require self-joins, complex subqueries, or multiple groupBy operations — while keeping every original row in the result.

The foundation is the WindowSpec: partitionBy defines your groups, orderBy defines the sequence within each group, and the optional frame clause pins the exact rows each calculation sees. Layer on ranking, aggregation, lag/lead, and distribution functions to express any analytical pattern from running totals to year-over-year comparisons to sessionisation.

Master these patterns and you will write faster, cleaner, and more expressive PySpark transformations across any scale of data.

