1. Introduction
Window functions are one of the most powerful features in PySpark for analytical workloads. They allow you to compute values across a set of rows that are related to the current row — without collapsing the result set the way groupBy does. Every row in the result keeps its original identity while still gaining a computed value based on its surrounding rows.
PySpark window functions mirror the SQL:2003 standard OVER clause and are available through pyspark.sql.window.Window and pyspark.sql.functions. They are essential for time-series analysis, ranking, running totals, period-over-period comparisons, and sessionisation.
This blog covers every category of PySpark window function — syntax, the WindowSpec API, ranking, aggregate, offset (lag/lead), and distribution functions — with full code examples and output tables after every example.
2. Sample Data Used Throughout This Blog
All examples use the following sales DataFrame. Familiarise yourself with the data before diving into the examples.
sales DataFrame
| emp_id | name | department | salary | hire_year | region | sales_amount | month |
| E01 | Alice | Engineering | 95000 | 2020 | North | 42000 | 2024-01 |
| E02 | Bob | Engineering | 82000 | 2021 | North | 38000 | 2024-01 |
| E03 | Carol | Marketing | 74000 | 2019 | South | 61000 | 2024-01 |
| E04 | David | Marketing | 68000 | 2022 | South | 54000 | 2024-01 |
| E05 | Eve | Engineering | 91000 | 2020 | East | 47000 | 2024-01 |
| E06 | Frank | HR | 61000 | 2018 | West | 28000 | 2024-01 |
| E07 | Grace | HR | 58000 | 2023 | West | 25000 | 2024-01 |
| E08 | Hank | Marketing | 71000 | 2021 | East | 58000 | 2024-01 |
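If you want to follow along, the table above can be expressed as plain Python rows ready for spark.createDataFrame — the column names are taken from the table, and the final (commented) line assumes an active SparkSession bound to the name spark:

```python
# Sample data as plain Python tuples (column names taken from the table above).
columns = ['emp_id', 'name', 'department', 'salary',
           'hire_year', 'region', 'sales_amount', 'month']

rows = [
    ('E01', 'Alice', 'Engineering', 95000, 2020, 'North', 42000, '2024-01'),
    ('E02', 'Bob',   'Engineering', 82000, 2021, 'North', 38000, '2024-01'),
    ('E03', 'Carol', 'Marketing',   74000, 2019, 'South', 61000, '2024-01'),
    ('E04', 'David', 'Marketing',   68000, 2022, 'South', 54000, '2024-01'),
    ('E05', 'Eve',   'Engineering', 91000, 2020, 'East',  47000, '2024-01'),
    ('E06', 'Frank', 'HR',          61000, 2018, 'West',  28000, '2024-01'),
    ('E07', 'Grace', 'HR',          58000, 2023, 'West',  25000, '2024-01'),
    ('E08', 'Hank',  'Marketing',   71000, 2021, 'East',  58000, '2024-01'),
]

# Assumes an active SparkSession named `spark`:
# df = spark.createDataFrame(rows, columns)
```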
3. WindowSpec Anatomy — The OVER Clause in PySpark
Every window function in PySpark requires a WindowSpec object that defines the window over which the function operates. A WindowSpec has three optional components: partitionBy, orderBy, and a frame specification.
3.1 Building a WindowSpec
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Full WindowSpec with all three components
window_spec = (
    Window
    .partitionBy('department')            # divide rows into independent groups
    .orderBy(F.col('salary').desc())      # sort within each partition
    .rowsBetween(                         # frame: from partition start to current row
        Window.unboundedPreceding, Window.currentRow
    )
)

# Minimal WindowSpec (no partition, no frame)
global_window = Window.orderBy('sales_amount')

# Partition only (no order — for aggregate functions)
dept_window = Window.partitionBy('department')
3.2 Frame Specification Options
| Frame Expression | Meaning |
| Window.unboundedPreceding | Start of the partition |
| Window.currentRow | The current row |
| Window.unboundedFollowing | End of the partition |
| rowsBetween(start, end) | Physical row offsets (exact row count) |
| rangeBetween(start, end) | Logical value offsets (value-based range) |
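The rowsBetween / rangeBetween distinction is the part readers trip over most often, so here is a pure-Python sketch of the two frame semantics (hypothetical helper names, not the PySpark API itself): rows frames count physical positions, range frames select by value.

```python
# Pure-Python model of a frame of (-1, 0) over an ORDER BY column `vals`.
vals = [10, 10, 20, 30]

def rows_frame_sum(vals, i, start, end):
    """rowsBetween: physical offsets, exactly the rows at positions i+start..i+end."""
    lo, hi = max(0, i + start), min(len(vals) - 1, i + end)
    return sum(vals[lo:hi + 1])

def range_frame_sum(vals, i, start, end):
    """rangeBetween: logical offsets, every row whose VALUE lies in [v+start, v+end]."""
    v = vals[i]
    return sum(x for x in vals if v + start <= x <= v + end)

# At index 1 (the second 10), both frames cover the same rows:
print(rows_frame_sum(vals, 1, -1, 0))    # 20 (previous row + current row)
print(range_frame_sum(vals, 1, -1, 0))   # 20 (both 10s fall inside [9, 10])
# At index 2 (the 20), they diverge:
print(rows_frame_sum(vals, 2, -1, 0))    # 30 (10 + 20, by position)
print(range_frame_sum(vals, 2, -1, 0))   # 20 (only 20 lies inside [19, 20])
```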
4. Ranking Functions
Ranking functions assign a sequential number to each row within a partition based on an ORDER BY expression. PySpark offers four ranking functions, each handling ties differently.
| Function | Ties | Gaps After Ties | Example (ties at 2nd) |
| row_number() | Arbitrary unique | N/A | 1, 2, 3, 4 |
| rank() | Same rank for ties | Yes | 1, 2, 2, 4 |
| dense_rank() | Same rank for ties | No | 1, 2, 2, 3 |
| ntile(n) | Divides rows into n buckets | N/A | 1, 1, 2, 2 (for n = 2) |
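The tie behaviour in the table can be sanity-checked in plain Python. This is a hand-rolled model of the four functions (not the PySpark implementation), applied to four descending salaries with a hypothetical tie at 2nd place:

```python
# Pure-Python model of the ranking functions on a pre-sorted (descending) list.
def rankings(values):
    """Return (row_number, rank, dense_rank) lists for pre-sorted values."""
    row_number, rank, dense_rank = [], [], []
    for i, v in enumerate(values):
        row_number.append(i + 1)                 # always unique
        if i > 0 and v == values[i - 1]:
            rank.append(rank[-1])                # ties share a rank...
            dense_rank.append(dense_rank[-1])
        else:
            rank.append(i + 1)                   # ...and rank() then skips numbers
            dense_rank.append((dense_rank[-1] + 1) if dense_rank else 1)
    return row_number, rank, dense_rank

def ntile(n_buckets, n_rows):
    """Bucket id per row, mirroring SQL NTILE: earlier buckets take the extras."""
    base, extra = divmod(n_rows, n_buckets)
    out = []
    for b in range(1, n_buckets + 1):
        out += [b] * (base + (1 if b <= extra else 0))
    return out

rn, rk, dr = rankings([95000, 91000, 91000, 82000])
print(rn)              # [1, 2, 3, 4]
print(rk)              # [1, 2, 2, 4]
print(dr)              # [1, 2, 2, 3]
print(ntile(2, 4))     # [1, 1, 2, 2]
print(ntile(4, 3))     # [1, 2, 3]  (more buckets than rows: one row per bucket)
```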
4.1 row_number()
from pyspark.sql.window import Window
from pyspark.sql import functions as F

window = Window.partitionBy('department').orderBy(F.col('salary').desc())

df_ranked = df.withColumn('row_num', F.row_number().over(window))
df_ranked.select('name', 'department', 'salary', 'row_num').show()
OUTPUT: row_number() — unique rank within each department by salary desc
| name | department | salary | row_num |
| Alice | Engineering | 95000 | 1 |
| Eve | Engineering | 91000 | 2 |
| Bob | Engineering | 82000 | 3 |
| Carol | Marketing | 74000 | 1 |
| Hank | Marketing | 71000 | 2 |
| David | Marketing | 68000 | 3 |
| Frank | HR | 61000 | 1 |
| Grace | HR | 58000 | 2 |
4.2 rank() and dense_rank()
window = Window.partitionBy('department').orderBy(F.col('salary').desc())

df_ranks = df.withColumn('rnk', F.rank().over(window)) \
             .withColumn('dense_rnk', F.dense_rank().over(window))
df_ranks.select('name', 'department', 'salary', 'rnk', 'dense_rnk').show()
OUTPUT: rank() vs dense_rank() — the sample salaries contain no ties, so the two columns match here; with a tie at 2nd place you would see 1, 2, 2, 4 (rank) vs 1, 2, 2, 3 (dense_rank)
| name | department | salary | rank() | dense_rank() |
| Alice | Engineering | 95000 | 1 | 1 |
| Eve | Engineering | 91000 | 2 | 2 |
| Bob | Engineering | 82000 | 3 | 3 |
| Carol | Marketing | 74000 | 1 | 1 |
| Hank | Marketing | 71000 | 2 | 2 |
| David | Marketing | 68000 | 3 | 3 |
| Frank | HR | 61000 | 1 | 1 |
| Grace | HR | 58000 | 2 | 2 |
4.3 ntile(n) — Divide Rows into Buckets
# Divide employees into salary quartiles (4 buckets) per department
window = Window.partitionBy('department').orderBy(F.col('salary').desc())

df_ntile = df.withColumn('quartile', F.ntile(4).over(window))
df_ntile.select('name', 'department', 'salary', 'quartile').show()
OUTPUT: ntile(4) — salary quartile within department
| name | department | salary | quartile |
| Alice | Engineering | 95000 | 1 |
| Eve | Engineering | 91000 | 2 |
| Bob | Engineering | 82000 | 3 |
| Carol | Marketing | 74000 | 1 |
| Hank | Marketing | 71000 | 2 |
| David | Marketing | 68000 | 3 |
| Frank | HR | 61000 | 1 |
| Grace | HR | 58000 | 2 |
4.4 Practical Pattern — Top N Per Group
# Find the top 2 highest-paid employees per department
window = Window.partitionBy('department').orderBy(F.col('salary').desc())

df_top2 = (
    df
    .withColumn('rnk', F.dense_rank().over(window))
    .filter(F.col('rnk') <= 2)
    .select('name', 'department', 'salary', 'rnk')
    .orderBy('department', 'rnk')
)
df_top2.show()
OUTPUT: Top 2 employees per department by salary
| name | department | salary | rnk |
| Alice | Engineering | 95000 | 1 |
| Eve | Engineering | 91000 | 2 |
| Frank | HR | 61000 | 1 |
| Grace | HR | 58000 | 2 |
| Carol | Marketing | 74000 | 1 |
| Hank | Marketing | 71000 | 2 |
5. Aggregate Window Functions
All standard PySpark aggregate functions — sum, avg, count, min, max — can be used as window functions by appending .over(window_spec). Unlike groupBy, they return a value for every row while still computing across the defined window.
5.1 Running Total (Cumulative Sum)
# Cumulative salary sum per department ordered by salary asc
window = (
    Window
    .partitionBy('department')
    .orderBy('salary')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

df_running = df.withColumn('running_salary_total', F.sum('salary').over(window))
df_running.select('name', 'department', 'salary', 'running_salary_total') \
    .orderBy('department', 'salary').show()
OUTPUT: Running total of salary within each department
| name | department | salary | running_salary_total |
| Bob | Engineering | 82000 | 82000 |
| Eve | Engineering | 91000 | 173000 |
| Alice | Engineering | 95000 | 268000 |
| Grace | HR | 58000 | 58000 |
| Frank | HR | 61000 | 119000 |
| David | Marketing | 68000 | 68000 |
| Hank | Marketing | 71000 | 139000 |
| Carol | Marketing | 74000 | 213000 |
5.2 Department Average Alongside Each Employee
# No ORDER BY — aggregates across entire partition
dept_window = Window.partitionBy('department')

df_avg = df.withColumn('dept_avg_salary',
                       F.round(F.avg('salary').over(dept_window), 0)) \
           .withColumn('dept_max_salary', F.max('salary').over(dept_window)) \
           .withColumn('diff_from_avg',
                       F.col('salary') - F.round(F.avg('salary').over(dept_window), 0))
df_avg.select('name', 'department', 'salary', 'dept_avg_salary', 'diff_from_avg') \
    .orderBy('department', 'salary').show()
OUTPUT: Each employee vs department average salary
| name | department | salary | dept_avg_salary | diff_from_avg |
| Bob | Engineering | 82000 | 89333 | -7333 |
| Eve | Engineering | 91000 | 89333 | +1667 |
| Alice | Engineering | 95000 | 89333 | +5667 |
| Grace | HR | 58000 | 59500 | -1500 |
| Frank | HR | 61000 | 59500 | +1500 |
| David | Marketing | 68000 | 71000 | -3000 |
| Hank | Marketing | 71000 | 71000 | 0 |
| Carol | Marketing | 74000 | 71000 | +3000 |
5.3 Moving Average (Rolling 3-Row Window)
# 3-row rolling average of sales_amount ordered by month
rolling_window = (
    Window
    .partitionBy('region')
    .orderBy('month')
    .rowsBetween(-2, Window.currentRow)   # current + 2 preceding rows
)

df_rolling = df.withColumn(
    'rolling_avg_sales',
    F.round(F.avg('sales_amount').over(rolling_window), 0)
)
df_rolling.select('name', 'region', 'sales_amount', 'rolling_avg_sales').show()
OUTPUT: 3-row rolling average of sales per region
| name | region | sales_amount | rolling_avg_sales |
| Alice | North | 42000 | 42000 |
| Bob | North | 38000 | 40000 |
| Carol | South | 61000 | 61000 |
| David | South | 54000 | 57500 |
| Eve | East | 47000 | 47000 |
| Hank | East | 58000 | 52500 |
| Frank | West | 28000 | 28000 |
| Grace | West | 25000 | 26500 |
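The frame arithmetic behind those numbers is easy to re-check in plain Python. This is a sketch of a 3-row trailing mean (a hypothetical helper, not PySpark itself), applied to the West region values from the table:

```python
# 3-row trailing rolling average: each position averages itself plus up to
# two preceding values, matching rowsBetween(-2, Window.currentRow).
def rolling_avg(vals, window=3):
    out = []
    for i in range(len(vals)):
        frame = vals[max(0, i - window + 1):i + 1]
        out.append(round(sum(frame) / len(frame)))
    return out

west = [28000, 25000]                 # West region, ordered as in the table
print(rolling_avg(west))              # [28000, 26500]
print(rolling_avg([42000, 38000]))    # [42000, 40000]  (North region)
```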
5.4 Percentage of Department Total
dept_window = Window.partitionBy('department')

df_pct = df.withColumn(
    'pct_of_dept_sales',
    F.round(
        F.col('sales_amount') * 100 / F.sum('sales_amount').over(dept_window), 1
    )
)
df_pct.select('name', 'department', 'sales_amount', 'pct_of_dept_sales') \
    .orderBy('department', F.col('pct_of_dept_sales').desc()).show()
OUTPUT: Each employee’s sales as % of their department total
| name | department | sales_amount | pct_of_dept_sales |
| Eve | Engineering | 47000 | 37.0 |
| Alice | Engineering | 42000 | 33.1 |
| Bob | Engineering | 38000 | 29.9 |
| Carol | Marketing | 61000 | 35.3 |
| Hank | Marketing | 58000 | 33.5 |
| David | Marketing | 54000 | 31.2 |
| Frank | HR | 28000 | 52.8 |
| Grace | HR | 25000 | 47.2 |
6. Offset Functions — lag() and lead()
lag() accesses the value of a column from a previous row. lead() accesses a value from a following row. Both are indispensable for period-over-period comparisons and detecting changes between consecutive rows.
| Function | Returns | Syntax |
| lag(col, n, default) | Value from n rows BEFORE current row | F.lag('col', 1, 0).over(window) |
| lead(col, n, default) | Value from n rows AFTER current row | F.lead('col', 1, 0).over(window) |
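Within a single ordered partition, lag and lead simply shift the column. A pure-Python sketch of that semantics (hypothetical helpers assuming n >= 1, not the PySpark API):

```python
# Model of lag/lead over one ordered partition, held as a plain list.
def lag(col, n=1, default=None):
    """Value from n rows before; the first n rows get the default (n >= 1)."""
    return [default] * n + col[:-n]

def lead(col, n=1, default=None):
    """Value from n rows after; the last n rows get the default (n >= 1)."""
    return col[n:] + [default] * n

sales = [42000, 38000, 45000]       # hypothetical consecutive months
print(lag(sales, 1))                # [None, 42000, 38000]
print(lead(sales, 1))               # [38000, 45000, None]
print(lag(sales, 1, 0))             # [0, 42000, 38000]  (default fills the gap)
```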
6.1 Month-over-Month Sales Change
from pyspark.sql import functions as F
from pyspark.sql.window import Window

window = Window.partitionBy('region').orderBy('month')

df_mom = df \
    .withColumn('prev_month_sales', F.lag('sales_amount', 1).over(window)) \
    .withColumn('mom_change',
                F.col('sales_amount') - F.lag('sales_amount', 1).over(window)) \
    .withColumn('mom_pct_change', F.round(
        (F.col('sales_amount') - F.lag('sales_amount', 1).over(window)) * 100.0
        / F.lag('sales_amount', 1).over(window), 1))
df_mom.select('name', 'region', 'sales_amount', 'prev_month_sales',
              'mom_change', 'mom_pct_change').show()
OUTPUT: Month-over-month change using lag()
| name | region | sales_amount | prev_month_sales | mom_change | mom_pct_change |
| Alice | North | 42000 | null | null | null |
| Bob | North | 38000 | 42000 | -4000 | -9.5 |
| Carol | South | 61000 | null | null | null |
| David | South | 54000 | 61000 | -7000 | -11.5 |
| Eve | East | 47000 | null | null | null |
| Hank | East | 58000 | 47000 | +11000 | +23.4 |
| Frank | West | 28000 | null | null | null |
| Grace | West | 25000 | 28000 | -3000 | -10.7 |
6.2 lead() — Look Ahead to the Next Salary in the Department
# Order by salary and look ahead to the next employee's salary
window = Window.partitionBy('department').orderBy('salary')

df_lead = df \
    .withColumn('next_salary', F.lead('salary', 1).over(window)) \
    .withColumn('next_name', F.lead('name', 1).over(window)) \
    .withColumn('salary_gap', F.lead('salary', 1).over(window) - F.col('salary'))
df_lead.select('name', 'department', 'salary', 'next_name',
               'next_salary', 'salary_gap').show()
OUTPUT: lead() — look ahead to the next employee’s salary in department
| name | department | salary | next_name | next_salary | salary_gap |
| Bob | Engineering | 82000 | Eve | 91000 | 9000 |
| Eve | Engineering | 91000 | Alice | 95000 | 4000 |
| Alice | Engineering | 95000 | null | null | null |
| Grace | HR | 58000 | Frank | 61000 | 3000 |
| Frank | HR | 61000 | null | null | null |
| David | Marketing | 68000 | Hank | 71000 | 3000 |
| Hank | Marketing | 71000 | Carol | 74000 | 3000 |
| Carol | Marketing | 74000 | null | null | null |
7. Value Functions — first_value() and last_value()
first_value() returns the value from the first row in the window frame; last_value() returns the value from the last row. PySpark exposes these as F.first() and F.last(), which compile to SQL FIRST_VALUE and LAST_VALUE when used over a window. They are useful for carrying a baseline value forward or comparing any row against an anchor.
7.1 first_value() and last_value()
window = (
    Window
    .partitionBy('department')
    .orderBy(F.col('salary').desc())
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

df_fv = df \
    .withColumn('highest_salary', F.first('salary').over(window)) \
    .withColumn('lowest_salary', F.last('salary').over(window)) \
    .withColumn('top_earner', F.first('name').over(window))
df_fv.select('name', 'department', 'salary', 'highest_salary',
             'lowest_salary', 'top_earner').show()
OUTPUT: first_value() and last_value() with unbounded frame
| name | department | salary | highest_salary | lowest_salary | top_earner |
| Alice | Engineering | 95000 | 95000 | 82000 | Alice |
| Eve | Engineering | 91000 | 95000 | 82000 | Alice |
| Bob | Engineering | 82000 | 95000 | 82000 | Alice |
| Carol | Marketing | 74000 | 74000 | 68000 | Carol |
| Hank | Marketing | 71000 | 74000 | 68000 | Carol |
| David | Marketing | 68000 | 74000 | 68000 | Carol |
| Frank | HR | 61000 | 61000 | 58000 | Frank |
| Grace | HR | 58000 | 61000 | 58000 | Frank |
Always specify rowsBetween(unboundedPreceding, unboundedFollowing) with last_value(). Without it, the default frame ends at currentRow, making last_value() always return the current row’s value.
8. Distribution Functions — percent_rank() and cume_dist()
Distribution functions compute the relative position of a row within its partition as a value between 0 and 1. They are used for percentile analysis, histograms, and segmentation.
| Function | Formula | Range | Interpretation |
| percent_rank() | (rank - 1) / (rows in partition - 1) | 0.0 to 1.0 | fraction of rows ranked BELOW this row |
| cume_dist() | (rows with value <= current) / (rows in partition) | 1/n to 1.0 | fraction of rows with value <= this row |
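The two formulas can be verified by hand for the three Engineering salaries. This pure-Python sketch (hypothetical helpers, not the PySpark implementation) assumes the input list is sorted ascending, matching the ORDER BY in the example that follows:

```python
# Check percent_rank and cume_dist formulas on an ascending-sorted list.
def percent_rank(sorted_vals):
    n = len(sorted_vals)
    # rank = 1 + position of first occurrence, so ties share the minimum rank
    ranks = [sorted_vals.index(v) + 1 for v in sorted_vals]
    return [(r - 1) / (n - 1) for r in ranks]

def cume_dist(sorted_vals):
    n = len(sorted_vals)
    # fraction of rows whose value is <= the current row's value
    return [sum(1 for x in sorted_vals if x <= v) / n for v in sorted_vals]

eng = [82000, 91000, 95000]                      # Engineering salaries, ascending
print(percent_rank(eng))                         # [0.0, 0.5, 1.0]
print([round(c, 2) for c in cume_dist(eng)])     # [0.33, 0.67, 1.0]
```

Both results match the output table below.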
window = Window.partitionBy('department').orderBy('salary')

df_dist = df \
    .withColumn('pct_rank', F.round(F.percent_rank().over(window), 2)) \
    .withColumn('cume_dist', F.round(F.cume_dist().over(window), 2))
df_dist.select('name', 'department', 'salary', 'pct_rank', 'cume_dist') \
    .orderBy('department', 'salary').show()
OUTPUT: percent_rank() and cume_dist() within department
| name | department | salary | pct_rank | cume_dist |
| Bob | Engineering | 82000 | 0.0 | 0.33 |
| Eve | Engineering | 91000 | 0.5 | 0.67 |
| Alice | Engineering | 95000 | 1.0 | 1.0 |
| Grace | HR | 58000 | 0.0 | 0.5 |
| Frank | HR | 61000 | 1.0 | 1.0 |
| David | Marketing | 68000 | 0.0 | 0.33 |
| Hank | Marketing | 71000 | 0.5 | 0.67 |
| Carol | Marketing | 74000 | 1.0 | 1.0 |
9. Advanced Real-World Patterns
9.1 Year-over-Year Sales Growth
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Assume df_annual has columns: year, region, annual_sales
window = Window.partitionBy('region').orderBy('year')

df_yoy = df_annual \
    .withColumn('prev_year_sales', F.lag('annual_sales', 1).over(window)) \
    .withColumn('yoy_growth_pct', F.round(
        (F.col('annual_sales') - F.lag('annual_sales', 1).over(window)) * 100.0
        / F.lag('annual_sales', 1).over(window), 1
    ))
9.2 Sessionisation — Group Events into Sessions
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Flag a new session when gap > 30 minutes
w_ordered = Window.partitionBy('user_id').orderBy('event_time')

df_flagged = clickstream.withColumn(
    'new_session_flag',
    F.when(
        (F.col('event_time').cast('long')
         - F.lag('event_time', 1).over(w_ordered).cast('long')) > 1800, 1
    ).otherwise(0)
)

# Cumulative sum of flags = session number
w_cum = w_ordered.rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_sessions = df_flagged.withColumn(
    'session_id', F.sum('new_session_flag').over(w_cum)
)
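Before running this on a cluster, the flag-then-cumulative-sum trick is easy to sanity-check in plain Python. The event times below are hypothetical epoch seconds for a single user, with the same 30-minute (1800 s) threshold:

```python
# One user's event times in epoch seconds; gaps > 1800 s start a new session.
event_times = [0, 600, 1200, 5000, 5300, 9000]

flags, session_ids = [], []
session = 0
for i, t in enumerate(event_times):
    # 1 when the gap to the previous event exceeds the threshold, else 0
    new_session = 1 if i > 0 and t - event_times[i - 1] > 1800 else 0
    flags.append(new_session)
    session += new_session            # running sum of flags = session number
    session_ids.append(session)

print(flags)        # [0, 0, 0, 1, 0, 1]
print(session_ids)  # [0, 0, 0, 1, 1, 2]
```

Each flag marks a session boundary, and the running total converts boundaries into stable session ids, exactly what the cumulative-frame window computes per user.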
9.3 Dense Rank with Multiple Tie-Breaking Columns
# Rank by sales_amount desc, break ties by salary desc
window = (
    Window
    .partitionBy('department')
    .orderBy(F.col('sales_amount').desc(), F.col('salary').desc())
)

df_multi = df.withColumn('sales_rank', F.dense_rank().over(window))
df_multi.select('name', 'department', 'sales_amount', 'salary', 'sales_rank') \
    .orderBy('department', 'sales_rank').show()
OUTPUT: Dense rank with tie-breaking on salary
| name | department | sales_amount | salary | sales_rank |
| Eve | Engineering | 47000 | 91000 | 1 |
| Alice | Engineering | 42000 | 95000 | 2 |
| Bob | Engineering | 38000 | 82000 | 3 |
| Carol | Marketing | 61000 | 74000 | 1 |
| Hank | Marketing | 58000 | 71000 | 2 |
| David | Marketing | 54000 | 68000 | 3 |
| Frank | HR | 28000 | 61000 | 1 |
| Grace | HR | 25000 | 58000 | 2 |
9.4 Running Count with Conditional Filter
# Count employees hired in or after 2021 — running count per department
window = (
    Window
    .partitionBy('department')
    .orderBy('hire_year')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

df_count = df.withColumn(
    'running_recent_hires',
    F.sum(F.when(F.col('hire_year') >= 2021, 1).otherwise(0)).over(window)
)
df_count.select('name', 'department', 'hire_year', 'running_recent_hires') \
    .orderBy('department', 'hire_year').show()
10. Window Functions vs groupBy — When to Use Which
Understanding the difference between window functions and groupBy is essential to writing efficient, correct PySpark code.
| Aspect | groupBy + agg() | Window Function |
| Output rows | One row per group | All original rows preserved |
| Other columns | Only grouped/aggregated columns | All columns freely selectable |
| Per-row context | No access to individual rows | Each row sees its peers |
| Ranking | Not possible | row_number, rank, dense_rank |
| Running totals | Not possible | sum().over() with cumulative frame |
| Period comparison | Not possible | lag() / lead() |
| Use in filter | Use .filter() after agg() | Add the window column with withColumn, then .filter() (in SQL, wrap in a subquery) |
| Performance | Single shuffle | Can require repartition/sort |
# groupBy — one row per department (loses individual rows)
df_grp = df.groupBy('department').agg(F.avg('salary').alias('avg_salary'))

# Window — keeps all rows AND adds the department average
dept_window = Window.partitionBy('department')
df_win = df.withColumn('dept_avg', F.round(F.avg('salary').over(dept_window), 0))

# Join approach (messy — use window instead)
# df_joined = df.join(df_grp, on='department', how='left')
11. Performance Tips for Window Functions
- Window functions shuffle rows by the PARTITION BY columns. Pre-repartitioning on the same key can let several window operations over that key share a single exchange, but it does not eliminate the shuffle itself, since repartition is a shuffle.
- Minimise the number of distinct window definitions in the same transformation — reuse WindowSpec objects.
- Use orderBy only when required (ranking, offset, cumulative). Aggregate-only windows (sum, avg) do not need ORDER BY.
- Watch the cardinality of PARTITION BY: very high cardinality creates millions of tiny groups with per-group overhead, while the opposite extreme — a window with no partitionBy at all — pulls every row into a single partition on one executor.
- For large datasets, enable Adaptive Query Execution: spark.sql.adaptive.enabled = true.
- Use persist() or cache() before applying multiple window functions on the same DataFrame.
# Pre-repartition by partition key before windowing
df_repartitioned = df.repartition(200, 'department')

# Reuse WindowSpec — define once, use many times
dept_sal_window = Window.partitionBy('department').orderBy(F.col('salary').desc())

df_result = df_repartitioned \
    .withColumn('rnk', F.rank().over(dept_sal_window)) \
    .withColumn('dense_rnk', F.dense_rank().over(dept_sal_window)) \
    .withColumn('row_num', F.row_number().over(dept_sal_window))

# Cache if applying multiple unrelated windows
df_result.cache()
df_result.count()   # trigger caching
12. Quick Reference — PySpark Window Functions Cheat Sheet
| Goal | PySpark Pattern |
| Unique row number per partition | F.row_number().over(Window.partitionBy('grp').orderBy('col')) |
| Rank with gaps | F.rank().over(Window.partitionBy('grp').orderBy(F.col('col').desc())) |
| Rank without gaps | F.dense_rank().over(Window.partitionBy('grp').orderBy(F.col('col').desc())) |
| Top N per group | .withColumn('rnk', F.dense_rank().over(w)).filter(F.col('rnk') <= N) |
| Quartile / decile buckets | F.ntile(4).over(Window.partitionBy('grp').orderBy('col')) |
| Running total | F.sum('col').over(Window…rowsBetween(unboundedPreceding, currentRow)) |
| Rolling N-row average | F.avg('col').over(Window…rowsBetween(-N+1, currentRow)) |
| % of partition total | F.col('col') * 100 / F.sum('col').over(Window.partitionBy('grp')) |
| Compare to partition average | F.col('col') - F.avg('col').over(Window.partitionBy('grp')) |
| Previous row value | F.lag('col', 1).over(Window.partitionBy('grp').orderBy('date')) |
| Next row value | F.lead('col', 1).over(Window.partitionBy('grp').orderBy('date')) |
| Period-over-period change | F.col('col') - F.lag('col', 1).over(w) |
| First value in partition | F.first('col').over(Window…rowsBetween(unboundedPreceding, unboundedFollowing)) |
| Last value in partition | F.last('col').over(Window…rowsBetween(unboundedPreceding, unboundedFollowing)) |
| Percentile rank (0 to 1) | F.percent_rank().over(Window.partitionBy('grp').orderBy('col')) |
| Cumulative distribution | F.cume_dist().over(Window.partitionBy('grp').orderBy('col')) |
13. Conclusion
PySpark window functions are a must-have tool for any data engineer or analyst working with Apache Spark. They elegantly solve problems that would otherwise require self-joins, complex subqueries, or multiple groupBy operations — while keeping every original row in the result.
The foundation is the WindowSpec: partitionBy defines your groups, orderBy defines the sequence within each group, and the optional frame clause pins the exact rows each calculation sees. Layer on ranking, aggregation, lag/lead, and distribution functions to express any analytical pattern from running totals to year-over-year comparisons to sessionisation.
Master these patterns and you will write faster, cleaner, and more expressive PySpark transformations across any scale of data.
Happy Sparking!