#1. Create a Sample DataFrame

# create a sample dataframe
data = [
    ("Sam","Sales", 50000),
    ("Ram","Sales", 60000),
    ("Dan","Sales", 70000),
    ("Gam","Marketing", 40000),
    ("Ham","Marketing", 55000),
    ("RAM","IT", 45000),
    ("Mam","IT", 65000),
    ("MAM","IT", 75000)
]

df = spark.createDataFrame(data, ["Name","Department", "Salary"])

What are the other ways to create this DataFrame?

In PySpark, there are multiple ways to create a DataFrame besides using spark.createDataFrame() with a list of tuples. Below are some alternative methods to create the same DataFrame:


1. Using a List of Dictionaries

You can create a DataFrame from a list of dictionaries, where each dictionary represents a row.

data = [
    {"Name": "Sam", "Department": "Sales", "Salary": 50000},
    {"Name": "Ram", "Department": "Sales", "Salary": 60000},
    {"Name": "Dan", "Department": "Sales", "Salary": 70000},
    {"Name": "Gam", "Department": "Marketing", "Salary": 40000},
    {"Name": "Ham", "Department": "Marketing", "Salary": 55000},
    {"Name": "RAM", "Department": "IT", "Salary": 45000},
    {"Name": "Mam", "Department": "IT", "Salary": 65000},
    {"Name": "MAM", "Department": "IT", "Salary": 75000}
]

df = spark.createDataFrame(data)
df.show()

2. Using a Pandas DataFrame

You can create a PySpark DataFrame from a Pandas DataFrame.

import pandas as pd

# Create a Pandas DataFrame
pandas_df = pd.DataFrame({
    "Name": ["Sam", "Ram", "Dan", "Gam", "Ham", "RAM", "Mam", "MAM"],
    "Department": ["Sales", "Sales", "Sales", "Marketing", "Marketing", "IT", "IT", "IT"],
    "Salary": [50000, 60000, 70000, 40000, 55000, 45000, 65000, 75000]
})

# Convert Pandas DataFrame to PySpark DataFrame
df = spark.createDataFrame(pandas_df)
df.show()
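
The conversion also works in the other direction: toPandas() collects a PySpark DataFrame back into a Pandas DataFrame. Note that all rows are pulled to the driver, so this is only practical for small results. A minimal sketch:

# Convert the PySpark DataFrame back to Pandas (collects all rows to the driver)
pandas_again = df.toPandas()
print(pandas_again.head())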

3. Using an RDD

You can create a DataFrame from an RDD (Resilient Distributed Dataset).

from pyspark.sql import Row

# Create an RDD
rdd = spark.sparkContext.parallelize([
    Row(Name="Sam", Department="Sales", Salary=50000),
    Row(Name="Ram", Department="Sales", Salary=60000),
    Row(Name="Dan", Department="Sales", Salary=70000),
    Row(Name="Gam", Department="Marketing", Salary=40000),
    Row(Name="Ham", Department="Marketing", Salary=55000),
    Row(Name="RAM", Department="IT", Salary=45000),
    Row(Name="Mam", Department="IT", Salary=65000),
    Row(Name="MAM", Department="IT", Salary=75000)
])

# Convert RDD to DataFrame
df = spark.createDataFrame(rdd)
df.show()

4. Using SQL Query

If you already have a table in a database or another DataFrame, you can create a new DataFrame using a SQL query.

# Register the DataFrame as a temporary view
df.createOrReplaceTempView("employees")

# Create a new DataFrame using a SQL query
new_df = spark.sql("SELECT * FROM employees")
new_df.show()
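
Any SQL that Spark supports can be run against the registered view; for example, an aggregate over the same employees view:

# Example: aggregate query over the registered view
dept_avg = spark.sql("SELECT Department, AVG(Salary) AS avg_salary FROM employees GROUP BY Department")
dept_avg.show()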

5. Using CSV or Other File Formats

You can create a DataFrame by reading data from a file (e.g., CSV, JSON, Parquet).

# Example: Reading from a CSV file
df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
df.show()
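
The same reader API covers the other formats mentioned above; the file paths below are placeholders:

# Example: Reading from a JSON file
df_json = spark.read.json("path/to/file.json")

# Example: Reading from a Parquet file (the schema is stored in the file itself)
df_parquet = spark.read.parquet("path/to/file.parquet")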

6. Using StructType and StructField

You can define the schema explicitly using StructType and StructField and then create a DataFrame.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define the schema
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Department", StringType(), True),
    StructField("Salary", IntegerType(), True)
])

# Create the DataFrame
data = [
    ("Sam", "Sales", 50000),
    ("Ram", "Sales", 60000),
    ("Dan", "Sales", 70000),
    ("Gam", "Marketing", 40000),
    ("Ham", "Marketing", 55000),
    ("RAM", "IT", 45000),
    ("Mam", "IT", 65000),
    ("MAM", "IT", 75000)
]

df = spark.createDataFrame(data, schema)
df.show()

Output for All Methods

All of the above methods produce the same DataFrame (column order may differ when the schema is inferred from dictionaries):


+----+----------+------+
|Name|Department|Salary|
+----+----------+------+
| Sam|     Sales| 50000|
| Ram|     Sales| 60000|
| Dan|     Sales| 70000|
| Gam| Marketing| 40000|
| Ham| Marketing| 55000|
| RAM|        IT| 45000|
| Mam|        IT| 65000|
| MAM|        IT| 75000|
+----+----------+------+

#2. DataFrame and Table Column Manipulation

How to show the column names of a DataFrame in PySpark:

# Show column names
print(df.columns)                              # list of column names
df.printSchema()                               # prints the schema, including column names
df.select(*df.columns).show(truncate=False)    # select every column explicitly
print(df.dtypes)                               # list of (column name, data type) tuples

# Different ways to extract just the names
column_names = [x[0] for x in df.dtypes]
column_names = list(map(lambda x: x[0], df.dtypes))
column_names, _ = zip(*df.dtypes)
column_names = [field.name for field in df.schema.fields]

How to print column names separated by a comma or space, with no single or double quotes:

print(", ".join(c.strip("'") for c in df.columns))   # comma-separated, stripping any stray quotes
print(" ".join(df.columns))                          # space-separated

How to rename columns by removing a prefix such as "column_":

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder.appName("RenameColumns").getOrCreate()

# Sample data
data = [(1, "foo"), (2, "bar")]
columns = ["column_abc", "column_xyz"]

# Create the original DataFrame
df = spark.createDataFrame(data, columns)

# Show the original DataFrame
df.show()

# Rename columns by removing the 'column_' prefix
new_columns = [col(col_name).alias(col_name.replace("column_", "")) for col_name in df.columns]

# Create a new DataFrame with renamed columns
new_df = df.select(new_columns)

# Show the new DataFrame
new_df.show()

# 2nd method: rename columns by removing the "column_" prefix
new_column_names = [col_name.replace("column_", "") for col_name in df.columns]
df_transformed = df.toDF(*new_column_names)
df_transformed.show()

# Save the new DataFrame as a table (optional)
new_df.write.saveAsTable("new_table_name")

# Function to remove 'column_' and capitalize the first letter of the remainder
def to_camel_case(column_name):
    new_name = column_name.replace("column_", "")   # Remove 'column_'
    return new_name[:1].upper() + new_name[1:]      # Uppercase the first letter

new_column_names = [to_camel_case(col_name) for col_name in df.columns]
df_transformed = df.toDF(*new_column_names)
df_transformed.show()

# Variant: split on '_' and capitalize both parts, e.g. 'column_abc' -> 'ColumnAbc'
def to_camel_case(column_name):
    new_name1, new_name2 = column_name.split("_")
    return new_name1.capitalize() + new_name2.capitalize()

new_column_names = [to_camel_case(col_name) for col_name in df.columns]
df_transformed = df.toDF(*new_column_names)
df_transformed.show()

What is toDF() and what are its uses? Can we use any other function to achieve similar functionality?

What is toDF() in PySpark?

toDF() is a method in PySpark used to rename columns of a DataFrame or to create a DataFrame from an RDD by specifying column names.

Uses of toDF()

  1. Rename All Columns: You can use toDF(*new_column_names) to rename all columns at once.
  2. Create a DataFrame from an RDD: When creating a DataFrame from an RDD, toDF() allows specifying column names.
  3. Convert RDD to DataFrame with Inferred Schema: If an RDD has data but no schema, toDF() helps create a structured DataFrame.
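
A minimal sketch of both uses, with a small RDD of illustrative employee rows:

# Use 2: create a DataFrame from an RDD of tuples by supplying column names
rdd = spark.sparkContext.parallelize([("Sam", "Sales", 50000), ("Ram", "Sales", 60000)])
df_from_rdd = rdd.toDF(["Name", "Department", "Salary"])
df_from_rdd.show()

# Use 1: rename all columns at once (names are applied positionally)
df_renamed = df_from_rdd.toDF("EmpName", "Dept", "Pay")
df_renamed.show()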

Alternative Ways to Rename Columns

Yes, you can rename columns using other methods in PySpark:

1. Using withColumnRenamed() (Renaming Columns One by One)

df_transformed = df
for col_name in df.columns:
    df_transformed = df_transformed.withColumnRenamed(col_name, col_name.replace("column_", ""))

Best for renaming a few selected columns; less efficient when renaming every column.
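
If your cluster runs Spark 3.4 or newer, DataFrame.withColumnsRenamed() accepts a mapping of old names to new names and renames them in a single call; a sketch under that version assumption:

# Requires Spark 3.4+: rename several columns in one call via a dict
rename_map = {c: c.replace("column_", "") for c in df.columns}
df_transformed = df.withColumnsRenamed(rename_map)
df_transformed.show()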


2. Using selectExpr() (Using SQL Expressions)

df_transformed = df.selectExpr(
    "`column_abc` as abc", "`column_xyz` as xyz"
)

Useful when renaming and applying transformations simultaneously.


3. Using alias() (Inside select())

df_transformed = df.select(
    col("column_abc").alias("abc"),
    col("column_xyz").alias("xyz")
)

Allows renaming specific columns, similar to selectExpr().

#3. Add 12 columns dynamically based on current_date: select abc, xyz, txn_date, and for each of the last 12 months a column like mdab_{year}{month} that holds abc when txn_date falls in that month.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, date_format, trunc
from datetime import datetime, timedelta

# Initialize Spark Session
spark = SparkSession.builder.appName("DynamicColumns").getOrCreate()

# Sample DataFrame with txn_date
data = [
    (1, "Alice", "2024-12-15"),
    (2, "Bob", "2024-11-10"),
    (3, "Charlie", "2024-10-05"),
]
df = spark.createDataFrame(data, ["abc", "xyz", "txn_date"])

# Convert txn_date to date format
df = df.withColumn("txn_date", col("txn_date").cast("date"))

# Get current date
current_date = datetime.today()

# Generate last 12 months dynamically
for i in range(1, 13):  # Last 12 months
    # Step back i months from the current month (month arithmetic, not approximate 30-day shifts)
    year = current_date.year
    month = current_date.month - i
    if month <= 0:
        month += 12
        year -= 1
    year_month = f"{year}{month:02d}"  # Format as YYYYMM

    # Generate column dynamically
    column_name = f"mdab_{year_month}"
    df = df.withColumn(
        column_name, 
        expr(f"CASE WHEN date_format(txn_date, 'yyyyMM') = '{year_month}' THEN abc ELSE NULL END")
    )

# Show final DataFrame
df.show()

A second approach computes the target months with PySpark date functions instead of Python date arithmetic:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, current_date, date_format, add_months, when

# Initialize Spark Session
spark = SparkSession.builder.appName("DynamicColumns").getOrCreate()

# Sample DataFrame with txn_date
data = [
    (1, "Alice", "2024-12-15"),
    (2, "Bob", "2024-11-10"),
    (3, "Charlie", "2024-10-05"),
]
df = spark.createDataFrame(data, ["abc", "xyz", "txn_date"])

# Convert txn_date to date type
df = df.withColumn("txn_date", col("txn_date").cast("date"))

# Generate last 12 months dynamically using PySpark functions
current_dt = current_date()
new_columns = []

for i in range(1, 13):  # Last 12 months
    # YYYYMM of the i-th previous month, computed as a column expression (not a Python string)
    target_month = date_format(add_months(current_dt, -i), "yyyyMM")
    column_name = f"mdab_{i}"  # e.g., mdab_1, mdab_2, ..., mdab_12

    # Build the CASE WHEN as a column expression; interpolating a Column into an expr() string would not work
    new_columns.append(
        when(date_format(col("txn_date"), "yyyyMM") == target_month, col("abc")).alias(column_name)
    )

# Select all columns + dynamically generated columns
df_final = df.select(["abc", "xyz", "txn_date"] + new_columns)

# Show the result
df_final.show()

A third approach generates the last 12 months as an array column, explodes it into rows, and pivots back to one column per month:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, current_date, date_format, add_months, sequence, explode, lit

# Initialize Spark Session
spark = SparkSession.builder.appName("DynamicColumns").getOrCreate()

# Sample DataFrame with txn_date
data = [
    (1, "Alice", "2024-12-15"),
    (2, "Bob", "2024-11-10"),
    (3, "Charlie", "2024-10-05"),
]
df = spark.createDataFrame(data, ["abc", "xyz", "txn_date"])

# Convert txn_date to date type
df = df.withColumn("txn_date", col("txn_date").cast("date"))

# Step 1: Generate an array of the last 12 months (the sequence step must be an interval, not an integer)
df_months = df.withColumn("month_list", sequence(add_months(current_date(), -12), add_months(current_date(), -1), expr("interval 1 month")))

# Step 2: Explode the array into separate rows
df_exploded = df_months.withColumn("month_offset", explode(col("month_list")))

# Step 3: Format exploded dates into 'YYYYMM' format
df_exploded = df_exploded.withColumn("year_month", date_format(col("month_offset"), "yyyyMM"))

# Step 4: Apply CASE WHEN condition
df_mapped = df_exploded.withColumn(
    "mdab_value", expr("CASE WHEN date_format(txn_date, 'yyyyMM') = year_month THEN abc ELSE NULL END")
)

# Step 5: Pivot table to get months as columns
df_final = df_mapped.groupBy("abc", "xyz", "txn_date").pivot("year_month").agg(expr("first(mdab_value)"))

# Show final output
df_final.show()
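
Note that pivot() names the new columns after the raw YYYYMM values; if the mdab_ prefix from the requirement is needed, the pivoted columns can be renamed afterwards, for example:

# Prefix the pivoted YYYYMM columns with 'mdab_' to match the required naming
renamed = [c if c in ("abc", "xyz", "txn_date") else f"mdab_{c}" for c in df_final.columns]
df_final = df_final.toDF(*renamed)
df_final.show()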

Given a string like dbname.table_name, how do you store dbname and table_name in separate variables and pass them as parameters in a PySpark/Python script?

# String containing dbname and table_name
full_table_name = "my_database.my_table"

# Split into dbname and table_name
dbname, table_name = full_table_name.split('.')

# Print the variables
print(f"Database Name: {dbname}")
print(f"Table Name: {table_name}")

# Use these variables in a PySpark job
query = f"SELECT * FROM {dbname}.{table_name} WHERE some_column = 'some_value'"

# Example usage in PySpark
df = spark.sql(query)
df.show()

import argparse
from pyspark.sql import SparkSession

# Initialize Spark
spark = SparkSession.builder.appName("Database Table Processing").getOrCreate()

# Argument parser
parser = argparse.ArgumentParser(description="Process a database table")
parser.add_argument("--dbname", required=True, help="Database name")
parser.add_argument("--table_name", required=True, help="Table name")

# Parse the arguments
args = parser.parse_args()
dbname = args.dbname
table_name = args.table_name

# Use dbname and table_name in your query
query = f"SELECT * FROM {dbname}.{table_name} WHERE some_column = 'some_value'"

# Execute the query in PySpark
df = spark.sql(query)
df.show()

The same query can also be built with str.format() instead of an f-string:

query = "SELECT * FROM {}.{} WHERE some_column = 'some_value'".format(dbname, table_name)


Run the script and pass the parameters on the command line:

spark-submit myscript.py --dbname my_database --table_name my_table
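
If argparse feels heavy for a two-parameter job, the same values could also be read positionally from sys.argv; a minimal sketch (the positional order here is just this example's convention):

import sys
from pyspark.sql import SparkSession

# Expects: spark-submit myscript.py my_database my_table
dbname, table_name = sys.argv[1], sys.argv[2]

spark = SparkSession.builder.appName("Database Table Processing").getOrCreate()
df = spark.sql(f"SELECT * FROM {dbname}.{table_name} WHERE some_column = 'some_value'")
df.show()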

How to create a list of columns from a Pandas DataFrame or a PySpark DataFrame, formatted with different delimiters or enclosed in quotes:

For a Pandas DataFrame

Example DataFrame

import pandas as pd

df = pd.DataFrame({
    "col1": [1, 2],
    "col2": [3, 4],
    "col3": [5, 6]
})

Creating a List of Columns

# Get column names
columns = df.columns.tolist()

# Separate by comma
comma_separated = ", ".join(columns)
print("Comma-separated:", comma_separated)

# Separate by space
space_separated = " ".join(columns)
print("Space-separated:", space_separated)

# Enclose in quotes and separate by comma
quoted_comma_separated = ", ".join(f"'{col}'" for col in columns)
print("Quoted, comma-separated:", quoted_comma_separated)

For a PySpark DataFrame

Example DataFrame

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ColumnListExample").getOrCreate()

data = [(1, 3, 5), (2, 4, 6)]
columns = ["col1", "col2", "col3"]
df = spark.createDataFrame(data, columns)

Creating a List of Columns

# Get column names
columns = df.columns

# Separate by comma
comma_separated = ", ".join(columns)
print("Comma-separated:", comma_separated)

# Separate by space
space_separated = " ".join(columns)
print("Space-separated:", space_separated)

# Enclose in quotes and separate by comma
quoted_comma_separated = ", ".join(f"'{col}'" for col in columns)
print("Quoted, comma-separated:", quoted_comma_separated)

Outputs

For the DataFrame columns ["col1", "col2", "col3"], you would get:

  1. Comma-separated: col1, col2, col3
  2. Space-separated: col1 col2 col3
  3. Quoted, comma-separated: 'col1', 'col2', 'col3'
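
One common use of such a formatted list is assembling SQL text; a small sketch using the PySpark DataFrame above (the view name sample_table is made up for this example):

# Register the sample DataFrame and build a SELECT statement from the column list
df.createOrReplaceTempView("sample_table")
query = f"SELECT {comma_separated} FROM sample_table"
print(query)            # SELECT col1, col2, col3 FROM sample_table
spark.sql(query).show()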
