Create Schema using case class for an RDD



Dear friends, today we’ll learn to create a schema using a case class for an RDD.

This method uses reflection to infer the schema of an RDD that contains specific types of objects.
The Scala interface for Spark SQL supports automatically converting an RDD containing case classes to a DataFrame. The case class defines the schema of the table/data file loaded into the RDD.
The names of the case class arguments are read using reflection and become the names of the columns.

Case classes can also be nested or contain complex types such as Seqs or Arrays. Such an RDD can be implicitly converted to a DataFrame and then registered as a table. The table can be used in subsequent SQL statements. An example of a nested case class is sketched below.
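As a minimal sketch (the names Order and CustomerOrders are illustrative, not part of this tutorial’s data), a nested case class containing a Seq could look like this :

case class Order(item: String, price: Int)
case class CustomerOrders(cust_no: Int, cust_name: String, orders: Seq[Order])

// With sqlContext.implicits._ in scope (imported later in this post),
// an RDD of such objects converts the same way :
// sc.parallelize(Seq(CustomerOrders(1, "Divyanshu", Seq(Order("Laptop", 75000))))).toDF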

For example, I have the below customers data file named test_file.csv in an HDFS location. I’ll use this data to create a schema using a case class for an RDD.

1,Divyanshu,Laptop,75000,Delhi
2,Himanshu,Laptop,95000,Delhi
3,Sushil,Hard Disk,20000,Delhi
4,Sidharth,Mobile,75000,Kolkata
5,Surendra,Desktop,70000,Kolkata
6,Mohit,Pen Drive,25000,Kolkata
7,Mohit,Laptop,65000,Hyderabad
8,Prabhu,Laptop,50000,Hyderabad
9,Rajesh,Head Phone,10000,Hyderabad
10,Mohit,Mobile,40000,Chennai
11,Sonia,Desktop,25000,Chennai
12,Deepthi,Head Phone,10000,Chennai
13,Priyanka,Desktop,56000,Noida
14,Prashant,Hard Disk,35000,Noida
15,Ayush,Mobile,15000,Noida

First I’ll load the data into an RDD named Load_data as below :
val Load_data = sc.textFile("/user/g.aspiresit.001/Anamika/test_file.csv")



Now we will check the above RDD to see its records :
val show_data = Load_data.collect
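On this file, collect returns an Array[String] with one element per line; in the spark-shell the echoed value should look roughly like this :

show_data: Array[String] = Array(1,Divyanshu,Laptop,75000,Delhi, 2,Himanshu,Laptop,95000,Delhi, ..., 15,Ayush,Mobile,15000,Noida)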


Now I want to print all records line by line. Note that foreach returns Unit, so there is nothing useful to assign to a val; also, on a cluster the lines print on the executors, so collect first to see them on the driver :

Load_data.collect.foreach(println)



The next step is to create an SQLContext using the following command. Here sc is the SparkContext object that the spark-shell provides :

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

Now use the following command to import the implicit conversions that turn an RDD into a DataFrame :

import sqlContext.implicits._

 
Next we have to define the schema for the customers data using a case class. The following command declares the case class based on the given fields (cust_no, cust_name, orders, price, city) :

case class customers_schema(cust_no:Int, cust_name:String, orders:String, price:Int, city:String)
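For example, the first record of our file (1,Divyanshu,Laptop,75000,Delhi) maps to this case class object :

customers_schema(1, "Divyanshu", "Laptop", 75000, "Delhi")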


We have already created the RDD Load_data above; now, using the map function, we’ll convert it into a DataFrame.

In the below command, two map calls are chained. The first splits each text record into its fields :

.map(x => x.split(","))

and the second converts the individual fields (cust_no, cust_name, orders, price, city) into one case class object :

.map(x => customers_schema(x(0).trim.toInt, x(1), x(2), x(3).trim.toInt, x(4)))

Finally, the toDF method converts the RDD of case class objects, along with its schema, into a DataFrame :

val map_customers_schema = Load_data.map(x => x.split(",")).map(x => customers_schema(x(0).trim.toInt, x(1), x(2), x(3).trim.toInt, x(4))).toDF
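To verify the inferred schema, call printSchema on the DataFrame. The column names come from the case class arguments; the output should look roughly like this (Int fields become non-nullable integer columns) :

map_customers_schema.printSchema

root
 |-- cust_no: integer (nullable = false)
 |-- cust_name: string (nullable = true)
 |-- orders: string (nullable = true)
 |-- price: integer (nullable = false)
 |-- city: string (nullable = true)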



Next, store the DataFrame data in a table named Customers using the following command; after running it, we can apply all types of SQL statements to the table.
map_customers_schema.registerTempTable("Customers")



Now our Customers table is ready with a schema, and we can run any SQL query on this table using the sqlContext.sql() method.

Here I’ll show you the select query :

The below command selects all the records from the Customers table. We capture the result in the variable all_records; to display the records we’ll call the show() method on it.

val all_records = sqlContext.sql("select * from Customers")




To see the result/data of the all_records DataFrame, use the following command :
all_records.show()
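Since show() prints up to 20 rows by default, all 15 records appear. The output should look roughly like this :

+-------+---------+----------+-----+---------+
|cust_no|cust_name|    orders|price|     city|
+-------+---------+----------+-----+---------+
|      1|Divyanshu|    Laptop|75000|    Delhi|
|      2| Himanshu|    Laptop|95000|    Delhi|
|      3|   Sushil| Hard Disk|20000|    Delhi|
...
|     15|    Ayush|    Mobile|15000|    Noida|
+-------+---------+----------+-----+---------+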

Finally, we can see the data with schema as a table.
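Beyond a plain select, any SQL the engine supports can be run on the registered table. As a quick sketch (the alias total_price is my own), a grouped aggregate would look like this :

val city_totals = sqlContext.sql("select city, sum(price) as total_price from Customers group by city")
city_totals.show()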


Thank You!









