#1. Create a sample DataFrame
# Create a sample DataFrame
data = [
("Sam","Sales", 50000),
("Ram","Sales", 60000),
("Dan","Sales", 70000),
("Gam","Marketing", 40000),
("Ham","Marketing", 55000),
("RAM","IT", 45000),
("Mam","IT", 65000),
("MAM","IT", 75000)
]
df = spark.createDataFrame(data, ["Name","Department", "Salary"])
What are other ways to create a DataFrame?
In PySpark, there are multiple ways to create a DataFrame besides using spark.createDataFrame()
with a list of tuples. Below are some alternative methods to create the same DataFrame:
1. Using a List of Dictionaries
You can create a DataFrame from a list of dictionaries, where each dictionary represents a row.
data = [
{"Name": "Sam", "Department": "Sales", "Salary": 50000},
{"Name": "Ram", "Department": "Sales", "Salary": 60000},
{"Name": "Dan", "Department": "Sales", "Salary": 70000},
{"Name": "Gam", "Department": "Marketing", "Salary": 40000},
{"Name": "Ham", "Department": "Marketing", "Salary": 55000},
{"Name": "RAM", "Department": "IT", "Salary": 45000},
{"Name": "Mam", "Department": "IT", "Salary": 65000},
{"Name": "MAM", "Department": "IT", "Salary": 75000}
]
df = spark.createDataFrame(data)
df.show()
2. Using a Pandas DataFrame
You can create a PySpark DataFrame from a Pandas DataFrame.
import pandas as pd
# Create a Pandas DataFrame
pandas_df = pd.DataFrame({
"Name": ["Sam", "Ram", "Dan", "Gam", "Ham", "RAM", "Mam", "MAM"],
"Department": ["Sales", "Sales", "Sales", "Marketing", "Marketing", "IT", "IT", "IT"],
"Salary": [50000, 60000, 70000, 40000, 55000, 45000, 65000, 75000]
})
# Convert Pandas DataFrame to PySpark DataFrame
df = spark.createDataFrame(pandas_df)
df.show()
3. Using an RDD
You can create a DataFrame from an RDD (Resilient Distributed Dataset).
from pyspark.sql import Row
# Create an RDD
rdd = spark.sparkContext.parallelize([
Row(Name="Sam", Department="Sales", Salary=50000),
Row(Name="Ram", Department="Sales", Salary=60000),
Row(Name="Dan", Department="Sales", Salary=70000),
Row(Name="Gam", Department="Marketing", Salary=40000),
Row(Name="Ham", Department="Marketing", Salary=55000),
Row(Name="RAM", Department="IT", Salary=45000),
Row(Name="Mam", Department="IT", Salary=65000),
Row(Name="MAM", Department="IT", Salary=75000)
])
# Convert RDD to DataFrame
df = spark.createDataFrame(rdd)
df.show()
4. Using SQL Query
If you already have a table in a database or another DataFrame, you can create a new DataFrame using a SQL query.
# Register the DataFrame as a temporary view
df.createOrReplaceTempView("employees")
# Create a new DataFrame using a SQL query
new_df = spark.sql("SELECT * FROM employees")
new_df.show()
5. Using CSV or Other File Formats
You can create a DataFrame by reading data from a file (e.g., CSV, JSON, Parquet).
# Example: Reading from a CSV file
df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
df.show()
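The same reader interface handles other file formats; a minimal sketch (the file paths below are placeholders):
# Hypothetical paths -- JSON and Parquet use the same spark.read entry point as CSV
df_json = spark.read.json("path/to/file.json")
df_parquet = spark.read.parquet("path/to/file.parquet")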
6. Using StructType and StructField
You can define the schema explicitly using StructType and StructField and then create the DataFrame.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Define the schema
schema = StructType([
StructField("Name", StringType(), True),
StructField("Department", StringType(), True),
StructField("Salary", IntegerType(), True)
])
# Create the DataFrame
data = [
("Sam", "Sales", 50000),
("Ram", "Sales", 60000),
("Dan", "Sales", 70000),
("Gam", "Marketing", 40000),
("Ham", "Marketing", 55000),
("RAM", "IT", 45000),
("Mam", "IT", 65000),
("MAM", "IT", 75000)
]
df = spark.createDataFrame(data, schema)
df.show()
Output for All Methods
All the above methods will produce the same DataFrame:
+----+----------+------+
|Name|Department|Salary|
+----+----------+------+
| Sam|     Sales| 50000|
| Ram|     Sales| 60000|
| Dan|     Sales| 70000|
| Gam| Marketing| 40000|
| Ham| Marketing| 55000|
| RAM|        IT| 45000|
| Mam|        IT| 65000|
| MAM|        IT| 75000|
+----+----------+------+
#2. DataFrame and Table Column Manipulation
How to show the column names of a DataFrame in PySpark:
# Show column names
print(df.columns)  # list of column names
df.printSchema()  # schema, including column names and types
df.select(*df.columns).show(truncate=False)  # select all columns explicitly
print(df.dtypes)  # list of (column name, data type) tuples
column_names = [x[0] for x in df.dtypes]  # list comprehension over dtypes
column_names = list(map(lambda x: x[0], df.dtypes))  # same thing with map/lambda
column_names, _ = zip(*df.dtypes)  # unzip names and types
column_names = [field.name for field in df.schema.fields]  # from the schema fields
How to print column names separated by a comma or space, with no single or double quotes:
print(', '.join([c.strip("'") for c in df.columns]))
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Initialize Spark session
spark = SparkSession.builder.appName("RenameColumns").getOrCreate()
# Sample data
data = [(1, "foo"), (2, "bar")]
columns = ["column_abc", "column_xyz"]
# Create the original DataFrame
df = spark.createDataFrame(data, columns)
# Show the original DataFrame
df.show()
# Rename columns by removing the 'column_' prefix
new_columns = [col(col_name).alias(col_name.replace("column_", "")) for col_name in df.columns]
# Create a new DataFrame with renamed columns
new_df = df.select(new_columns)
# Show the new DataFrame
new_df.show()
# 2nd method: rename columns by removing the 'column_' prefix
new_column_names = [col_name.replace("column_", "") for col_name in df.columns]
df_transformed = df.toDF(*new_column_names)
df_transformed.show()
# Save the new DataFrame as a table (optional)
new_df.write.saveAsTable("new_table_name")
# Function to remove the 'column_' prefix and convert to CamelCase
def to_camel_case(column_name):
    new_name = column_name.replace("column_", "")  # remove 'column_'
    return new_name[:1].upper() + new_name[1:]  # capitalize the first letter

new_column_names = [to_camel_case(col_name) for col_name in df.columns]
df_transformed = df.toDF(*new_column_names)
df_transformed.show()
# Alternative: split on '_' and capitalize both parts (e.g. 'column_abc' -> 'ColumnAbc')
def to_camel_case(column_name):
    new_name1, new_name2 = column_name.split("_")
    print(new_name1, new_name2)
    return new_name1.capitalize() + new_name2.capitalize()

new_column_names = [to_camel_case(col_name) for col_name in df.columns]
df_transformed = df.toDF(*new_column_names)
df_transformed.show()
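A more general variant, sketched here, strips an optional 'column_' prefix and CamelCases whatever remains, handling any number of underscores (e.g. 'column_txn_date' -> 'TxnDate'):
# Sketch: strip an optional 'column_' prefix, then CamelCase the remaining parts
def to_camel_case_general(column_name):
    base = column_name[len("column_"):] if column_name.startswith("column_") else column_name
    return "".join(part.capitalize() for part in base.split("_"))

df_transformed = df.toDF(*[to_camel_case_general(c) for c in df.columns])
df_transformed.show()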
What is toDF() and what are its uses? Can any other function achieve similar functionality?
What is toDF() in PySpark?
toDF() is a method in PySpark used to rename the columns of a DataFrame, or to create a DataFrame from an RDD by specifying column names.
Uses of toDF()
- Rename all columns: toDF(*new_column_names) renames every column at once.
- Create a DataFrame from an RDD: when converting an RDD, toDF() lets you supply the column names.
- Convert an RDD to a DataFrame with an inferred schema: if an RDD has data but no schema, toDF() helps create a structured DataFrame (see the sketch below).
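A minimal sketch of both uses, reusing the spark session from above (the sample rows are assumptions for the example):
# Build a DataFrame from an RDD of tuples by supplying column names to toDF()
rdd = spark.sparkContext.parallelize([("Sam", "Sales", 50000), ("Ram", "Sales", 60000)])
df_from_rdd = rdd.toDF(["Name", "Department", "Salary"])
df_from_rdd.show()
# Rename every column of an existing DataFrame in one call
df_from_rdd.toDF("emp_name", "dept", "salary").show()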
Alternative Ways to Rename Columns
Yes, you can rename columns using other methods in PySpark:
1. Using withColumnRenamed() (renaming columns one by one)
from pyspark.sql.functions import col
df_transformed = df_original
for col_name in df_original.columns:
    df_transformed = df_transformed.withColumnRenamed(col_name, col_name.replace("column_", ""))
Best for renaming selected columns, but less efficient when renaming all of them.
2. Using selectExpr() (SQL expressions)
df_transformed = df_original.selectExpr(
"`column_abc` as abc", "`column_xyz` as xyz", "`column_pqr` as pqr"
)
Useful when renaming and applying transformations simultaneously.
3. Using alias() (inside select())
df_transformed = df_original.select(
col("column_abc").alias("abc"),
col("column_xyz").alias("xyz"),
col("column_pqr").alias("pqr")
)
Allows renaming specific columns, similar to selectExpr().
#3. Add 12 columns dynamically based on the current date: select abc, xyz, txn_date, and for each of the last 12 months (current_month - 1 back to current_month - 12) add a column such as CASE WHEN txn_date falls in that month THEN abc END AS mdab_{year}{month}.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, date_format, trunc
from datetime import datetime, timedelta
# Initialize Spark Session
spark = SparkSession.builder.appName("DynamicColumns").getOrCreate()
# Sample DataFrame with txn_date
data = [
(1, "Alice", "2024-12-15"),
(2, "Bob", "2024-11-10"),
(3, "Charlie", "2024-10-05"),
]
df = spark.createDataFrame(data, ["abc", "xyz", "txn_date"])
# Convert txn_date to date format
df = df.withColumn("txn_date", col("txn_date").cast("date"))
# Get current date
current_date = datetime.today()
# Generate the last 12 months dynamically
for i in range(1, 13):  # last 12 months
    # Step back exactly i calendar months from the current date
    year = current_date.year
    month = current_date.month - i
    while month <= 0:
        month += 12
        year -= 1
    year_month = f"{year}{month:02d}"  # format as YYYYMM
    # Add one column per month
    column_name = f"mdab_{year_month}"
    df = df.withColumn(
        column_name,
        expr(f"CASE WHEN date_format(txn_date, 'yyyyMM') = '{year_month}' THEN abc ELSE NULL END")
    )
# Show final DataFrame
df.show()
A second approach builds the month comparison with PySpark date functions instead of Python datetime arithmetic:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, current_date, date_format, add_months
# Initialize Spark Session
spark = SparkSession.builder.appName("DynamicColumns").getOrCreate()
# Sample DataFrame with txn_date
data = [
(1, "Alice", "2024-12-15"),
(2, "Bob", "2024-11-10"),
(3, "Charlie", "2024-10-05"),
]
df = spark.createDataFrame(data, ["abc", "xyz", "txn_date"])
# Convert txn_date to date type
df = df.withColumn("txn_date", col("txn_date").cast("date"))
# Generate the last 12 months dynamically using PySpark functions
new_columns = []
for i in range(1, 13):  # last 12 months
    column_name = f"mdab_{i}"  # e.g. mdab_1, mdab_2, ..., mdab_12
    # Build the CASE WHEN column; add_months/current_date are evaluated by Spark
    # inside the SQL expression, so no Python Column object is interpolated into the string
    new_columns.append(
        expr(
            "CASE WHEN date_format(txn_date, 'yyyyMM') = "
            f"date_format(add_months(current_date(), -{i}), 'yyyyMM') "
            "THEN abc ELSE NULL END"
        ).alias(column_name)
    )
# Select all columns + dynamically generated columns
df_final = df.select(["abc", "xyz", "txn_date"] + new_columns)
# Show the result
df_final.show()
A third approach generates the last 12 months as an array, explodes it into rows, and pivots back into columns:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, current_date, date_format, add_months, sequence, explode, lit
# Initialize Spark Session
spark = SparkSession.builder.appName("DynamicColumns").getOrCreate()
# Sample DataFrame with txn_date
data = [
(1, "Alice", "2024-12-15"),
(2, "Bob", "2024-11-10"),
(3, "Charlie", "2024-10-05"),
]
df = spark.createDataFrame(data, ["abc", "xyz", "txn_date"])
# Convert txn_date to date type
df = df.withColumn("txn_date", col("txn_date").cast("date"))
# Step 1: Generate an array with one date per month for the last 12 months
# (a date sequence requires an interval step, so the expression is written in SQL)
df_months = df.withColumn("month_list", expr("sequence(add_months(current_date(), -12), add_months(current_date(), -1), interval 1 month)"))
# Step 2: Explode the array into separate rows
df_exploded = df_months.withColumn("month_offset", explode(col("month_list")))
# Step 3: Format exploded dates into 'YYYYMM' format
df_exploded = df_exploded.withColumn("year_month", date_format(col("month_offset"), "yyyyMM"))
# Step 4: Apply CASE WHEN condition
df_mapped = df_exploded.withColumn(
"mdab_value", expr("CASE WHEN date_format(txn_date, 'yyyyMM') = year_month THEN abc ELSE NULL END")
)
# Step 5: Pivot table to get months as columns
df_final = df_mapped.groupBy("abc", "xyz", "txn_date").pivot("year_month").agg(expr("first(mdab_value)"))
# Show final output
df_final.show()
Given dbname.table_name, how do I save dbname and table_name in separate variables and then pass them as parameters to a PySpark/Python script?
# String containing dbname and table_name
full_table_name = "my_database.my_table"
# Split into dbname and table_name
dbname, table_name = full_table_name.split('.')
# Print the variables
print(f"Database Name: {dbname}")
print(f"Table Name: {table_name}")
# Use these variables in a PySpark job
query = f"SELECT * FROM {dbname}.{table_name} WHERE some_column = 'some_value'"
# Example usage in PySpark
df = spark.sql(query)
df.show()
import argparse
from pyspark.sql import SparkSession
# Initialize Spark
spark = SparkSession.builder.appName("Database Table Processing").getOrCreate()
# Argument parser
parser = argparse.ArgumentParser(description="Process a database table")
parser.add_argument("--dbname", required=True, help="Database name")
parser.add_argument("--table_name", required=True, help="Table name")
# Parse the arguments
args = parser.parse_args()
dbname = args.dbname
table_name = args.table_name
# Use dbname and table_name in your query
query = f"SELECT * FROM {dbname}.{table_name} WHERE some_column = 'some_value'"
# Execute the query in PySpark
df = spark.sql(query)
df.show()
Alternatively, build the query string with str.format() instead of an f-string:
query = "SELECT * FROM {}.{} WHERE some_column = 'some_value'".format(dbname, table_name)
Run the script and pass the values on the command line:
spark-submit myscript.py --dbname my_database --table_name my_table
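If you prefer not to use argparse, a minimal sketch with positional sys.argv arguments also works (the argument order here is an assumption):
import sys
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Database Table Processing").getOrCreate()

# Expecting: spark-submit myscript.py my_database my_table
dbname, table_name = sys.argv[1], sys.argv[2]
df = spark.sql(f"SELECT * FROM {dbname}.{table_name}")
df.show()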
How to create a list of columns from a Pandas or PySpark DataFrame, formatted with different delimiters or enclosed in quotes.
For a Pandas DataFrame
Example DataFrame
import pandas as pd
df = pd.DataFrame({
"col1": [1, 2],
"col2": [3, 4],
"col3": [5, 6]
})
Creating a List of Columns
# Get column names
columns = df.columns.tolist()
# Separate by comma
comma_separated = ", ".join(columns)
print("Comma-separated:", comma_separated)
# Separate by space
space_separated = " ".join(columns)
print("Space-separated:", space_separated)
# Enclose in quotes and separate by comma
quoted_comma_separated = ", ".join(f"'{col}'" for col in columns)
print("Quoted, comma-separated:", quoted_comma_separated)
For a PySpark DataFrame
Example DataFrame
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ColumnListExample").getOrCreate()
data = [(1, 3, 5), (2, 4, 6)]
columns = ["col1", "col2", "col3"]
df = spark.createDataFrame(data, columns)
Creating a List of Columns
# Get column names
columns = df.columns
# Separate by comma
comma_separated = ", ".join(columns)
print("Comma-separated:", comma_separated)
# Separate by space
space_separated = " ".join(columns)
print("Space-separated:", space_separated)
# Enclose in quotes and separate by comma
quoted_comma_separated = ", ".join(f"'{col}'" for col in columns)
print("Quoted, comma-separated:", quoted_comma_separated)
Outputs
For the DataFrame columns ["col1", "col2", "col3"], you would get:
- Comma-separated: col1, col2, col3
- Space-separated: col1 col2 col3
- Quoted, comma-separated: 'col1', 'col2', 'col3'