We have a JSON file and we need to create a data frame from it. We can use the Python code below to create a data frame from a JSON file.
For this demo we will be using Google Colab to run our PySpark code. You can use the link below to access Google Colab for free:
https://colab.research.google.com/
Install PySpark
pip install pyspark
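Note that in a Colab (or Jupyter) notebook cell, shell commands are usually prefixed with "!", so the install line typically looks like this:
Install PySpark in a Colab Cell
!pip install pyspark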
Once PySpark is installed, we need to import "SparkContext" and "SparkSession" and then invoke a Spark session, which can be done by running the commands below.
Import & Invoke Spark Session
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
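As a side note, newer PySpark code often builds the session directly with the builder API instead of creating a SparkContext first. A minimal equivalent sketch (the app name is just illustrative):
Alternative: SparkSession Builder
from pyspark.sql import SparkSession

# Build (or reuse) a Spark session directly; no explicit SparkContext needed
spark = SparkSession.builder.appName("ColabDemo").getOrCreate()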
Sample JSON File
{"Empid":1,"emp_name":{"f_name":"Mukesh","l_name":"dhiman"},"Email":"muk@gmail.com","DepartmentID":"HR","Region":"CA","Phonenumber":"111-222-333"},
{"Empid":1,"emp_name":{"f_name":"Angela","l_name":"Mathew"},"Email":"John.Mathew@xyz.com","DepartmentID":"HR","Region":"CA","Phonenumber":"111-222-333"},
{"Empid":1,"emp_name":{"f_name":"Bonnie","l_name":"Parker"},"Email":"Jim.Parker@xyz.com","DepartmentID":"HR","Region":"CA","Phonenumber":"111-222-333"},
{"Empid":1,"emp_name":{"f_name":"Frank","l_name":"Ran"},"Email":"Sophia.Ran@xyz.com","DepartmentID":"HR","Region":"CA","Phonenumber":"111-222-333"},
{"Empid":1,"emp_name":{"f_name":"Joe","l_name":"Blake"},"Email":"Wendi.Blake@xyz.com","DepartmentID":"HR","Region":"CA","Phonenumber":"111-222-333"},
{"Empid":1,"emp_name":{"f_name":"Kimberly","l_name":"Lai"},"Email":"Stephan.Lai@testing.com","DepartmentID":"HR","Region":"CA","Phonenumber":"111-222-333"},
{"Empid":1,"emp_name":{"f_name":"Lisa","l_name":"Van Damme"},"Email":"Fay.Van Damme@testing.com","DepartmentID":"HR","Region":"CA","Phonenumber":"111-222-333"},
{"Empid":1,"emp_name":{"f_name":"Micheal","l_name":"Dice"},"Email":"Brevin.Dice@testing.com","DepartmentID":"HR","Region":"CA","Phonenumber":"111-222-333"},
{"Empid":1,"emp_name":{"f_name":"Patrick","l_name":"Oleveria"},"Email":"Regina.Oleveria@testing.com","DepartmentID":"HR","Region":"CA","Phonenumber":"111-222-333"},
{"Empid":1,"emp_name":{"f_name":"Rose","l_name":"Regina"},"Email":"rose@gmail.com","DepartmentID":"HR","Region":"CA","Phonenumber":"111-222-333"},
{"Empid":1,"emp_name":{"f_name":"Todd","l_name":"Brevin"},"Email":"todd@gmail.com","DepartmentID":"HR","Region":"CA","Phonenumber":"111-222-333"}
The syntax below can be used to create a data frame from a JSON file. You can also chain the show() command at the end to view the contents of the data frame.
Create Data Frame and Show Command
spark.read \
.json("/content/sample_data/test.json").show()
Next, let's switch to the sample CSV file that ships with Colab. We will create a variable "DF" to hold this data frame.
Assigning Variable
DF = spark.read.csv("/content/sample_data/california_housing_test.csv")
Again, we can use the show() command to view the contents of the data frame.
Show Command
DF.show()
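As a side note, show() also accepts optional arguments: the number of rows to display and whether to truncate long values. For example:
Show Command with Arguments
# Display only the first 5 rows and do not truncate column values
DF.show(5, truncate=False)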
One thing we notice is that our file has headers, but they were not identified in our data frame, so we need to fix this. We can use the .option("header", True) option to tell the reader to use the header row provided by the CSV file.
Including Headers
DF = spark.read.option("header",True).csv("/content/sample_data/california_housing_test.csv")
Now if we run DF.show() again, we see that we have headers in our data frame.
Show Command
DF.show()
We can also check the data types used by the columns in the data frame by running DF.printSchema(). One thing we notice here is that all the fields were given a data type of string, even though the CSV data is actually numeric (double). So what do we need to do to fix this?
Print Schema
DF.printSchema()
We can add .option("inferSchema", True) so that Spark can infer the correct data types from the CSV file.
InferSchema True
DF = spark.read.option("header",True).option("inferschema",True).csv("/content/sample_data/california_housing_test.csv")
Now when we run the printSchema() command again, we can see that the data frame has picked up the correct data types, which are of type double.
Print Schema
DF.printSchema()
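As an alternative to inferSchema, you can declare the schema yourself with StructType, which avoids the extra pass over the file that schema inference needs. A sketch for this particular CSV:
Explicit Schema (Alternative to InferSchema)
from pyspark.sql.types import StructType, StructField, DoubleType

# Declare every column type up front instead of letting Spark infer them
housing_schema = StructType([
    StructField("longitude", DoubleType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("housing_median_age", DoubleType(), True),
    StructField("total_rooms", DoubleType(), True),
    StructField("total_bedrooms", DoubleType(), True),
    StructField("population", DoubleType(), True),
    StructField("households", DoubleType(), True),
    StructField("median_income", DoubleType(), True),
    StructField("median_house_value", DoubleType(), True),
])

DF = spark.read.option("header", True).schema(housing_schema) \
    .csv("/content/sample_data/california_housing_test.csv")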
Now, what if we only want to select specific columns from a data frame? There are multiple ways of doing this. One of them is to pass the column names as strings to the .select() method, which shows only the selected columns from our data frame.
Selecting Column Method 1
DF.select("longitude","latitude","housing_median_age","total_rooms","total_bedrooms","population","households"
,"median_income","median_house_value").show()
The second method to select columns from a data frame is the syntax below, where we use the data frame name and the column name with a dot (.) between them. This method allows you to apply column-based functions.
Selecting Column Method 2
DF.select(DF.longitude,DF.latitude,DF.housing_median_age,DF.total_rooms,DF.total_bedrooms,
DF.population,DF.households,DF.median_income,DF.median_house_value).show()
The third way is to use the data frame name with square brackets and the column name inside double quotes. This method also allows you to apply column-based functions.
Selecting Column Method 3
DF.select(DF["longitude"],DF["latitude"],DF["housing_median_age"],DF["total_rooms"],DF["total_bedrooms"],
DF["population"],DF["households"],DF["median_income"],DF["median_house_value"]).show()
The fourth method is to use the col() function. This method also allows you to apply column-based functions, but before we can use col() we need to import it from pyspark.sql.functions.
Selecting Column Method 4
from pyspark.sql.functions import col
The COL Function
DF.select(col("longitude"),col("latitude"),col("housing_median_age"),col("total_rooms"),col("total_bedrooms"),
col("population"),col("households"),col("median_income"),col("median_house_value")).show()
The next task is renaming a column in a data frame. We can use withColumnRenamed() to accomplish this; below is an example. Notice that we are again using the show() command to view the data in the data frame.
Renaming a Column
DF.withColumnRenamed("longitude","Longitude") \
.withColumnRenamed("latitude","Latitude") \
.withColumnRenamed("housing_median_age","Housing_Median_Age") \
.withColumnRenamed("total_rooms","Total_Rooms") \
.withColumnRenamed("total_bedrooms","Total_Bedrooms") \
.withColumnRenamed("population","Population") \
.withColumnRenamed("households","Households") \
.withColumnRenamed("median_income","Median_Income") \
.withColumnRenamed("median_house_value","Median_House_Value") \
.show()
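When every column needs renaming, chaining withColumnRenamed() gets verbose. One common shortcut is toDF(), which takes the full list of new names in positional order; a sketch assuming DF still has the original nine columns:
Renaming All Columns with toDF
# Rename every column at once by passing the new names in positional order
Renamed_DF = DF.toDF("Longitude", "Latitude", "Housing_Median_Age", "Total_Rooms",
                     "Total_Bedrooms", "Population", "Households",
                     "Median_Income", "Median_House_Value")
Renamed_DF.show()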
In the previous section we saw how to rename a column. Now let's see how we can add a new column to a data frame. We can use the .withColumn() function to accomplish this. Below we will create a new column "Data_Ingestion_Date" and use the current_timestamp() function to record the date and time of insertion, but before using current_timestamp() we need to import it.
Importing Current Time stamp Function
from pyspark.sql.functions import current_timestamp
Adding a New Column
DF.withColumn("Data_Ingestion_Date",current_timestamp()) \
.show()
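withColumn() accepts any column expression, so it can also derive a new column from existing ones. A hedged example that adds a hypothetical "Rooms_Per_Household" column:
Adding a Derived Column
from pyspark.sql.functions import col

# "Rooms_Per_Household" is an illustrative name; the expression divides two existing columns
DF.withColumn("Rooms_Per_Household", col("total_rooms") / col("households")) \
  .show()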
Suppose we need to add a new column with NULL values, or with any other predefined string or integer value. If we try the .withColumn() syntax below, we will get an error because withColumn() expects a Column object rather than a plain Python value. We can solve this by using the lit() function.
Adding Column with NULL Values
DF.withColumn("Data_Ingestion_Date","NULL") \
.show()
Before using the lit() function we need to import it by running the command below, or else we will get an error. We then wrap the column value inside lit() as shown below.
Importing "LIT" Function
from pyspark.sql.functions import lit
The "LIT" Function
DF.withColumn("Data_Ingestion_Date",lit("NULL")) \
.show()
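Note that lit("NULL") stores the literal string "NULL", not a real missing value. If you want a genuinely NULL column, one common approach is lit(None) cast to the desired type:
Adding a Truly NULL Column
from pyspark.sql.functions import lit

# lit(None) produces real NULLs; the cast gives the column an explicit type
DF.withColumn("Data_Ingestion_Date", lit(None).cast("timestamp")) \
  .show()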
Now that we have learned to read data from CSV and to select, rename, and add columns, we can write this data back to storage. The command below writes the data to a Parquet file at "/content/sample_data/california_housing_test.parquet".
Write Data to Parquet File
DF.write.parquet("/content/sample_data/california_housing_test.parquet")
Once the Parquet file is created, we can view its contents by running the command below to verify the data. So we started with a CSV file, performed various operations on it, and finally stored it in Parquet format in our storage.
Read Data from Parquet File
spark.read.parquet("/content/sample_data/california_housing_test.parquet").show()
One thing to note here is that if we try to run the command below again to create the Parquet file, it will fail because the file already exists.
Write Data to Parquet File
DF.write.parquet("/content/sample_data/california_housing_test.parquet")
We can add .mode("overwrite") so that if the file already exists, it is overwritten.
Write Mode Overwrite to Parquet File
DF.write.mode("overwrite").parquet("/content/sample_data/california_housing_test.parquet")