Data Ingestion: CSV Files in Azure Databricks




We have a CSV file and need to create a DataFrame from it. We can use the Python code below to create a DataFrame from a CSV file.

For this demo we will be using Google Colab to run our PySpark code. You can use the link below to access Google Colab for free:

https://colab.research.google.com/ 


Install PySpark


pip install pyspark
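If this is being run inside a Colab notebook cell rather than a terminal, the command is usually prefixed with an exclamation mark so that it executes as a shell command:


!pip install pyspark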





Once PySpark is installed, we need to import the SparkContext and SparkSession classes and then invoke a Spark session, which can be done by running the commands below.


Import & Invoke Spark Session


from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
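As a side note, newer PySpark code often creates the session directly through the SparkSession builder instead of creating a SparkContext first. A minimal equivalent sketch (the application name below is just a placeholder):


from pyspark.sql import SparkSession

# getOrCreate() reuses an existing session if one is already running
spark = SparkSession.builder.appName("csv-ingestion-demo").getOrCreate()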








The syntax below can be used to create a DataFrame from a CSV file. Here we use one of the sample files provided by Google Colab.


Create DataFrame From CSV


spark.read.csv("/content/sample_data/california_housing_test.csv")

You can append the show() command at the end to view the contents of the DataFrame.


Show Command


spark.read.csv("/content/sample_data/california_housing_test.csv").show()







We will create a variable "DF" to hold this DataFrame.


Assigning a Variable


DF = spark.read.csv("/content/sample_data/california_housing_test.csv")


Again, we can use the show() command to view the contents of the DataFrame.


Show Command


DF.show()
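By default show() prints the first 20 rows and truncates long values. A row count (and a truncate flag) can be passed if needed, for example:


# Display only the first 5 rows without truncating column values
DF.show(5, truncate=False)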







One thing we notice is that our file has headers, but they were not identified in our DataFrame, so we need to fix this. We can use .option("header", True) to tell the reader to use the header row provided by the CSV file.


Including Headers


DF = spark.read.option("header",True).csv("/content/sample_data/california_housing_test.csv")


Now if we run DF.show() again, we see that our DataFrame has headers.


Show Command


DF.show()





 


We can also check the data types of the columns in the DataFrame by running DF.printSchema(). One thing we notice here is that every field was given a data type of string; however, looking at the CSV data, the values are of type double. So what do we need to do to fix this?


Print Schema


DF.printSchema()





We can use .option("inferSchema", True) so that Spark can infer the correct data types from the given CSV file.

InferSchema True


DF = spark.read.option("header",True).option("inferschema",True).csv("/content/sample_data/california_housing_test.csv")


Now when we run the printSchema command again, we can see that the DataFrame has picked up the correct data type, which is double.

Print Schema


DF.printSchema()
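For reference, the same read can also be written with keyword arguments instead of chained .option() calls. A minimal equivalent sketch using the same sample file:


# header and inferSchema passed directly to csv() instead of via .option()
DF = spark.read.csv("/content/sample_data/california_housing_test.csv",
                    header=True, inferSchema=True)
DF.printSchema()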






Now, what if we only want to select specific columns from a DataFrame? There are multiple ways of doing this. One of them is to use the .select method to show only selected columns from our DataFrame.

Selecting Columns: Method 1


DF.select("longitude","latitude","housing_median_age","total_rooms","total_bedrooms","population","households"
,"median_income","median_house_value").show()






The second method to select columns from a DataFrame is to use the syntax below, where we write the DataFrame name and the column name with a dot (.) between them. This method allows you to apply any column-based functions.

Selecting Columns: Method 2


DF.select(DF.longitude,DF.latitude,DF.housing_median_age,DF.total_rooms,DF.total_bedrooms,
          DF.population,DF.households,DF.median_income,DF.median_house_value).show()






The third way is to use the DataFrame name followed by square brackets with the column name inside double quotes. This method also allows you to apply any column-based functions.

Selecting Columns: Method 3


DF.select(DF["longitude"],DF["latitude"],DF["housing_median_age"],DF["total_rooms"],DF["total_bedrooms"],
          DF["population"],DF["households"],DF["median_income"],DF["median_house_value"]).show()





The fourth method is to use the col function. This method also allows you to apply any column-based functions, but before we can use the col function, we need to import it from the PySpark functions library.

Selecting Columns: Method 4


from pyspark.sql.functions import col


The col Function


DF.select(col("longitude"),col("latitude"),col("housing_median_age"),col("total_rooms"),col("total_bedrooms"),
          col("population"),col("households"),col("median_income"),col("median_house_value")).show()


  



The next task is renaming a column in a DataFrame. We can use withColumnRenamed to accomplish this; below is an example. Notice that we are again using the show() command to view the data in the DataFrame.

Renaming a Column


DF.withColumnRenamed("longitude","Longitude") \
.withColumnRenamed("latitude","Latitude") \
.withColumnRenamed("housing_median_age","Housing_Median_Age") \
.withColumnRenamed("total_rooms","Total_Rooms") \
.withColumnRenamed("total_bedrooms","Total_Bedrooms") \
.withColumnRenamed("population","Population") \
.withColumnRenamed("households","Households") \
.withColumnRenamed("median_income","Median_Income") \
.withColumnRenamed("median_house_value","Median_House_Value") \
.show()
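When every column needs the same kind of rename, an alternative (shown here only as an illustrative sketch) is to rebuild the DataFrame with toDF() and a list comprehension over DF.columns:


# Capitalise each word in every column name, e.g. "total_rooms" -> "Total_Rooms"
renamed_df = DF.toDF(*[c.title() for c in DF.columns])
renamed_df.show()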





In the previous section we saw how to rename a column. Now let's see how we can add a new column to a DataFrame. We can use the .withColumn function to accomplish this. Below we will create a new column, "Data_Ingestion_Date", and use the current_timestamp function to record the date and time of record insertion. But before using the current_timestamp function, we need to import it.

Importing the current_timestamp Function


from pyspark.sql.functions import current_timestamp


Adding a New Column


DF.withColumn("Data_Ingestion_Date",current_timestamp()) \
.show()


Suppose we need to add a new column containing NULL values, or any other predefined string or integer value. If we try the .withColumn syntax below, we will get an error, because withColumn expects a Column object rather than a plain value. We can solve this by using the lit function.

Adding a Column with NULL Values


DF.withColumn("Data_Ingestion_Date","NULL") \
.show()




Before using the lit function we need to import it by running the command below, or else we will get an error. We wrap the column value inside the lit function so that it can be used as shown below.

Importing "LIT" Function


from pyspark.sql.functions import lit


The "LIT" Function


DF.withColumn("Data_Ingestion_Date",lit("NULL")) \
.show()
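Note that lit("NULL") stores the literal text "NULL" in the column. If an actual SQL NULL value is wanted instead, lit(None) can be used (a minimal sketch):


# lit(None) produces a true NULL; the cast gives the column an explicit string type
DF.withColumn("Data_Ingestion_Date", lit(None).cast("string")).show()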





Now that we have learned how to read data from a CSV file and how to select, rename and add columns, we can write this data to our storage. We can do this by running the command below, which writes the data in Parquet format to "/content/sample_data/california_housing_test.parquet".

Write Data to Parquet File


DF.write.parquet("/content/sample_data/california_housing_test.parquet")


Once the Parquet file is created, we can view its contents by running the command below to verify the data. So we started with a CSV file, performed various operations on it, and finally stored it in Parquet format in our storage.

Read Data from Parquet File


spark.read.parquet("/content/sample_data/california_housing_test.parquet").show()


 




One thing to note here is that if we try to run the command below to create the Parquet file again, it will fail because the file already exists.

Write Data to Parquet File


DF.write.parquet("/content/sample_data/california_housing_test.parquet")





We can add .mode("overwrite") so that if the file already exists, it is overwritten.

Write Mode Overwrite to Parquet File


DF.write.mode("overwrite").parquet("/content/sample_data/california_housing_test.parquet")

