Coding Questions in Spark SQL, Pyspark, and Python

Difference of scores for each subjects from Predecessor only for each subject in same class, difference of scores for each subject from successor only for each subject in same class , difference of total scores for a student from his predecessor and successor in a same class, also difference of scores from previous year for a student in spark sql or dataframe api

Here are the solutions:

Difference of scores for each subject from Predecessor

SELECT 
  class, 
  subject, 
  student, 
  score - LAG(score) OVER (PARTITION BY class, subject ORDER BY score) AS diff_from_predecessor
FROM 
  scores
ORDER BY 
  class, subject, score;
from pyspark.sql import functions as F, Window

scores_df = spark.read.table("scores")
window = Window.partitionBy("class", "subject").orderBy("score")

scores_df = scores_df.withColumn(
    "diff_from_predecessor", 
    F.col("score") - F.lag("score").over(window)
)

scores_df.show()

Difference of scores for each subject from Successor

SELECT 
  class, 
  subject, 
  student, 
  LEAD(score) OVER (PARTITION BY class, subject ORDER BY score) - score AS diff_from_successor
FROM 
  scores
ORDER BY 
  class, subject, score;
scores_df = scores_df.withColumn(
    "diff_from_successor", 
    F.lead("score").over(window) - F.col("score")
)

scores_df.show()

Difference of total scores for a student from his Predecessor and Successor

SELECT 
  class, 
  student, 
  SUM(score) - LAG(SUM(score)) OVER (PARTITION BY class ORDER BY SUM(score)) AS diff_from_predecessor,
  LEAD(SUM(score)) OVER (PARTITION BY class ORDER BY SUM(score)) - SUM(score) AS diff_from_successor
FROM 
  scores
GROUP BY 
  class, student
ORDER BY 
  class, SUM(score);
from pyspark.sql import functions as F

total_scores_df = scores_df.groupBy("class", "student").agg(F.sum("score").alias("total_score"))

window = Window.partitionBy("class").orderBy("total_score")

total_scores_df = total_scores_df.withColumn(
    "diff_from_predecessor", 
    F.col("total_score") - F.lag("total_score").over(window)
)

total_scores_df = total_scores_df.withColumn(
    "diff_from_successor", 
    F.lead("total_score").over(window) - F.col("total_score")
)

total_scores_df.show()

Difference of scores from previous year for a student

Assuming you have a column year in your scores table.

SELECT 
  class, 
  student, 
  subject, 
  score - LAG(score) OVER (PARTITION BY class, student, subject ORDER BY year) AS diff_from_previous_year
FROM 
  scores
ORDER BY 
  class, student, subject, year;
from pyspark.sql import functions as F, Window

scores_df = spark.read.table("scores")
window = Window.partitionBy("class", "student", "subject").orderBy("year")

scores_df = scores_df.withColumn(
    "diff_from_previous_year", 
    F.col("score") - F.lag("score").over(window)
)

scores_df.show()
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window

# Create a SparkSession
spark = SparkSession.builder.appName("Score Differences").getOrCreate()

# Sample data
data = [
    ("ClassA", "Student1", "Math", 90, 2022),
    ("ClassA", "Student1", "Science", 85, 2022),
    ("ClassA", "Student2", "Math", 95, 2022),
    ("ClassA", "Student2", "Science", 90, 2022),
    ("ClassA", "Student3", "Math", 92, 2022),
    ("ClassA", "Student3", "Science", 88, 2022),
    ("ClassA", "Student1", "Math", 92, 2023),
    ("ClassA", "Student1", "Science", 90, 2023),
    ("ClassA", "Student2", "Math", 97, 2023),
    ("ClassA", "Student2", "Science", 92, 2023),
    ("ClassA", "Student3", "Math", 94, 2023),
    ("ClassA", "Student3", "Science", 91, 2023),
]

# Create a DataFrame
df = spark.createDataFrame(data, ["Class", "Student", "Subject", "Score", "Year"])

# Calculate differences from predecessor and successor for each subject
window = Window.partitionBy("Class", "Subject").orderBy("Score")

df_with_ranks = df.withColumn("Rank", F.dense_rank().over(window))

df_with_diffs = df_with_ranks.withColumn("DiffFromPredecessor", F.col("Score") - F.lag("Score").over(window)) \
                                 .withColumn("DiffFromSuccessor", F.lead("Score").over(window) - F.col("Score"))

# Calculate difference of total scores for a student from his predecessor and successor
total_scores_window = Window.partitionBy("Class").orderBy("TotalScore")

total_scores_df = df.groupBy("Class", "Student").agg(F.sum("Score").alias("TotalScore")) \
                       .withColumn("Rank", F.dense_rank().over(total_scores_window))

total_scores_df_with_diffs = total_scores_df.withColumn("DiffFromPredecessor", F.col("TotalScore") - F.lag("TotalScore").over(total_scores_window)) \
                                                 .withColumn("DiffFromSuccessor", F.lead("TotalScore").over(total_scores_window) - F.col("TotalScore"))

# Calculate difference of scores from previous year for a student
yearly_scores_window = Window.partitionBy("Class", "Student", "Subject").orderBy("Year")

yearly_scores_df = df.withColumn("DiffFromPreviousYear", F.col("Score") - F.lag("Score").over(yearly_scores_window))

# Display the results
df_with_diffs.show()
total_scores_df_with_diffs.show()
yearly_scores_df.show()

Pages ( 4 of 11 ): « Previous123 4 56 ... 11Next »