Data Ingestion: JSON File in Azure Databricks


We have a JSON file and we need to create a DataFrame from it. We can use the Python code below to create a DataFrame from a JSON file.

For this demo we will be using Google Colab to run our PySpark code. You can use the link below to access Google Colab for free:

https://colab.research.google.com/ 

Install PySpark


!pip install pyspark





Once PySpark is installed, we need to import SparkContext and SparkSession, and then we can create the Spark session by running the commands below.


Import & Invoke Spark Session


from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
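

A quick side note: on Spark 2.x and later you can also create the session directly with the builder API instead of going through SparkContext. The snippet below is an equivalent, minimal sketch; the application name "DataIngestionDemo" is just an illustrative choice.

Alternative: SparkSession Builder


from pyspark.sql import SparkSession

# Equivalent way to start (or reuse) a session on Spark 2.x and later
spark = SparkSession.builder \
    .appName("DataIngestionDemo") \
    .getOrCreate()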




For this demo we have created some sample data. Please find the sample employee data below. Create a JSON file manually and add the data below to it (a short snippet for saving this file from a Colab cell is shown after the data). If you look at the data carefully, you will see that f_name and l_name are nested inside emp_name.


Sample JSON Data (test.json)


{"Empid":1,"emp_name":{"f_name":"Mukesh","l_name":"dhiman"},"Email":"muk@gmail.com","DepartmentID":"HR","Region":"CA","Phonenumber":"111-222-333"},
{"Empid":1,"emp_name":{"f_name":"Angela","l_name":"Mathew"},"Email":"John.Mathew@xyz.com","DepartmentID":"HR","Region":"CA","Phonenumber":"111-222-333"},
{"Empid":1,"emp_name":{"f_name":"Bonnie","l_name":"Parker"},"Email":"Jim.Parker@xyz.com","DepartmentID":"HR","Region":"CA","Phonenumber":"111-222-333"},
{"Empid":1,"emp_name":{"f_name":"Frank","l_name":"Ran"},"Email":"Sophia.Ran@xyz.com","DepartmentID":"HR","Region":"CA","Phonenumber":"111-222-333"},
{"Empid":1,"emp_name":{"f_name":"Joe","l_name":"Blake"},"Email":"Wendi.Blake@xyz.com","DepartmentID":"HR","Region":"CA","Phonenumber":"111-222-333"},
{"Empid":1,"emp_name":{"f_name":"Kimberly","l_name":"Lai"},"Email":"Stephan.Lai@testing.com","DepartmentID":"HR","Region":"CA","Phonenumber":"111-222-333"},
{"Empid":1,"emp_name":{"f_name":"Lisa","l_name":"Van Damme"},"Email":"Fay.Van Damme@testing.com","DepartmentID":"HR","Region":"CA","Phonenumber":"111-222-333"},
{"Empid":1,"emp_name":{"f_name":"Micheal","l_name":"Dice"},"Email":"Brevin.Dice@testing.com","DepartmentID":"HR","Region":"CA","Phonenumber":"111-222-333"},
{"Empid":1,"emp_name":{"f_name":"Patrick","l_name":"Oleveria"},"Email":"Regina.Oleveria@testing.com","DepartmentID":"HR","Region":"CA","Phonenumber":"111-222-333"},
{"Empid":1,"emp_name":{"f_name":"Rose","l_name":"Regina"},"Email":"rose@gmail.com","DepartmentID":"HR","Region":"CA","Phonenumber":"111-222-333"},
{"Empid":1,"emp_name":{"f_name":"Todd","l_name":"Brevin"},"Email":"todd@gmail.com","DepartmentID":"HR","Region":"CA","Phonenumber":"111-222-333"}



The syntax below can be used to create a DataFrame from a JSON file. You can also append the show() command at the end to view the contents of the DataFrame.

Create Data Frame and Show Command


spark.read \
.json("/content/sample_data/test.json").show()





Next we will create a variable "DF" to hold a DataFrame. Note that from here on we switch to the sample CSV file that ships with Colab (california_housing_test.csv) to demonstrate the remaining operations.

Assigning Variable


DF = spark.read.csv("/content/sample_data/california_housing_test.csv")


Again, we can use the show() command to view the contents of the DataFrame.

Show Command


DF.show()





One thing we notice is that our file has headers, but they were not picked up in our DataFrame, so we need to fix this. We can use .option("header", True) to tell Spark to use the header row provided by the CSV file.

Including Headers


DF = spark.read.option("header",True).csv("/content/sample_data/california_housing_test.csv")


Now if we run DF.show() again, we can see that our DataFrame has headers.

Show Command


DF.show()





 

We can also check the data type of each column in the DataFrame by running DF.printSchema(). One thing we notice here is that every field was given a data type of string; however, when we look at the CSV data, the values are clearly of type double. So what do we need to do to fix this?

Print Schema


DF.printSchema()





We can add .option("inferSchema", True) so that Spark can infer the correct data types from the CSV file.

InferSchema True


DF = spark.read.option("header",True).option("inferschema",True).csv("/content/sample_data/california_housing_test.csv")


Now when we run the printSchema() command again, we can see that the DataFrame has picked up the correct data types, which are of type double.

Print Schema


DF.printSchema()




Now, what if we only want to select specific columns from a DataFrame? There are multiple ways of doing this. One of them is the .select() method, which shows only the selected columns from our DataFrame.

Selecting Column Method 1


DF.select("longitude","latitude","housing_median_age","total_rooms","total_bedrooms","population","households"
,"median_income","median_house_value").show()






The second method for selecting columns from a DataFrame is the syntax below, where we use the DataFrame name and the column name with a dot (.) between them. This method allows you to apply any column-based functions.

Selecting Column Method 2


DF.select(DF.longitude,DF.latitude,DF.housing_median_age,DF.total_rooms,DF.total_bedrooms,
          DF.population,DF.households,DF.median_income,DF.median_house_value).show()




The third way is to use the DataFrame name with square brackets and the column name inside double quotes. This method also allows you to apply any column-based functions.

Selecting Column Method 3


DF.select(DF["longitude"],DF["latitude"],DF["housing_median_age"],DF["total_rooms"],DF["total_bedrooms"],
          DF["population"],DF["households"],DF["median_income"],DF["median_house_value"]).show()




The fourth method is to use the col() function. This method also allows you to apply any column-based functions, but before we can use col() we need to import it from the PySpark functions library.

Selecting Column Method 4


from pyspark.sql.functions import col


The COL Function


DF.select(col("longitude"),col("latitude"),col("housing_median_age"),col("total_rooms"),col("total_bedrooms"),
          col("population"),col("households"),col("median_income"),col("median_house_value")).show()


  



The next task is renaming a column in a DataFrame. We can use withColumnRenamed() to accomplish this; below is an example. Notice that we chain the show() command at the end to view the data in the DataFrame.

Renaming a Column


DF.withColumnRenamed("longitude","Longitude") \
.withColumnRenamed("latitude","Latitude") \
.withColumnRenamed("housing_median_age","Housing_Median_Age") \
.withColumnRenamed("total_rooms","Total_Rooms") \
.withColumnRenamed("total_bedrooms","Total_Bedrooms") \
.withColumnRenamed("population","Population") \
.withColumnRenamed("households","Households") \
.withColumnRenamed("median_income","Median_Income") \
.withColumnRenamed("median_house_value","Median_House_Value") \
.show()
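

Keep in mind that DataFrames are immutable, so withColumnRenamed() returns a new DataFrame rather than modifying DF in place. If you want to keep the renamed columns for later steps, assign the result to a variable, as in the short sketch below (DF_renamed is just an illustrative name, and only two columns are renamed for brevity).

Keeping the Renamed DataFrame


# withColumnRenamed returns a new DataFrame; capture it to reuse the new names.
DF_renamed = DF.withColumnRenamed("longitude", "Longitude") \
    .withColumnRenamed("latitude", "Latitude")

DF_renamed.show()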




In the previous section we saw how to rename a column. Now let's see how we can add a new column to a DataFrame. We can use the withColumn() function to accomplish this. Below we will create a new column "Data_Ingestion_Date" and use the current_timestamp() function to record the date and time of ingestion, but before using current_timestamp() we need to import it.

Importing Current Time stamp Function


from pyspark.sql.functions import current_timestamp


Adding a New Column


DF.withColumn("Data_Ingestion_Date",current_timestamp()) \
.show()


Suppose we need to add a new column containing NULL values, or any other predefined string or integer value. If we try to do this with the withColumn() syntax below, we will get an error, because withColumn() expects a Column object rather than a plain literal. We can solve this by using the lit() function.

Adding Column with NULL Values


DF.withColumn("Data_Ingestion_Date","NULL") \
.show()



Before using the lit() function we need to import it by running the command below, or else we will get an error. We then wrap the column value inside lit() so that it can be used as shown below.

Importing "LIT" Function


from pyspark.sql.functions import lit


The "LIT" Function


DF.withColumn("Data_Ingestion_Date",lit("NULL")) \
.show()
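

One thing worth noting: lit("NULL") stores the literal four-character string "NULL" in the column, not a real null. If you want a true null value instead, a small sketch using lit(None) with an explicit cast is shown below; the string type is just an illustrative choice.

Adding a True NULL Column


# lit(None) produces a real null; the cast gives the column a concrete data type.
DF.withColumn("Data_Ingestion_Date", lit(None).cast("string")) \
.show()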




Now that we have learned to read data from CSV, select, rename, and add new columns, we can write this data to our storage. We can do this by running the command below, which writes the data to a Parquet file at "/content/sample_data/california_housing_test.parquet".

Write Data to Parquet File


DF.write.parquet("/content/sample_data/california_housing_test.parquet")


Once the Parquet file is created, we can verify its contents by running the command below. So we started with a CSV file, performed various operations on it, and finally stored it in Parquet format in our storage.

Read Data from Parquet File


spark.read.parquet("/content/sample_data/california_housing_test.parquet").show()


 




One thing to note here is that if we try to run the command below again to create the Parquet file, it will fail because the file already exists.

Write Data to Parquet File


DF.write.parquet("/content/sample_data/california_housing_test.parquet")





We can add .mode("overwrite") so that if the file already exists, it is overwritten.

Write Mode Overwrite to Parquet File


DF.write.mode("overwrite").parquet("/content/sample_data/california_housing_test.parquet")

