Create Schema Using a Case Class for an RDD
Dear friends, today we’ll learn to create a schema using a case class for an RDD.
This method uses reflection to generate the schema of an RDD that contains specific types of objects.
The Scala interface for Spark SQL supports automatically converting an RDD containing case classes to a DataFrame. The case class defines the schema of the table, i.e. of the data file loaded into the RDD.
The names of the case class arguments are read using reflection and become the names of the columns.
Case classes can also be nested or contain complex types such as Seqs or Arrays. Such an RDD can be implicitly converted to a DataFrame and then registered as a table, and tables can be used in subsequent SQL statements.
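As a quick illustration of the nested case, here is a minimal sketch (the names Address and CustomerProfile are hypothetical, not part of this post's data): Spark's reflection would infer a struct column for address and an array column for orders.

// hypothetical nested case classes, just to illustrate complex types
case class Address(city: String, pin: Int)
case class CustomerProfile(name: String, address: Address, orders: Seq[String])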
For example, I have the below customer data file named test_file.csv in an HDFS location. I’ll use this data to create a schema using a case class for an RDD.
1,Divyanshu,Laptop,75000,Delhi
2,Himanshu,Laptop,95000,Delhi
3,Sushil,Hard Disk,20000,Delhi
4,Sidharth,Mobile,75000,Kolkata
5,Surendra,Desktop,70000,Kolkata
6,Mohit,Pen Drive,25000,Kolkata
7,Mohit,Laptop,65000,Hyderabad
8,Prabhu,Laptop,50000,Hyderabad
9,Rajesh,Head Phone,10000,Hyderabad
10,Mohit,Mobile,40000,Chennai
11,Sonia,Desktop,25000,Chennai
12,Deepthi,Head Phone,10000,Chennai
13,Priyanka,Desktop,56000,Noida
14,Prashant,Hard Disk,35000,Noida
15,Ayush,Mobile,15000,Noida
First, I’ll load the data into an RDD named Load_data as below:
val Load_data = sc.textFile("/user/g.aspiresit.001/Anamika/test_file.csv")
Now we will check the above RDD to see its records:
val show_data = Load_data.collect
Now I want to print all records line by line:
Load_data.foreach(println)
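One caveat worth knowing: on a multi-node cluster, foreach(println) runs on the executors, so the lines may not appear in the driver console. A safer sketch is to pull a few records to the driver first:

// bring the first few records to the driver before printing
Load_data.take(5).foreach(println)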
The next step is to create a SQLContext using the following command; here sc is the SparkContext object.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
Now use the following command to import all the SQL functions needed to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
Next we have to define the schema for the customer records using a case class. The following command declares the case class based on the given data (cust_no, cust_name, orders, price, city):
case class customers_schema(cust_no:Int, cust_name:String, orders:String, price:Int, city:String)
I have already created the RDD Load_data above; now, using the map function, I’ll convert it into a DataFrame.
In the command below, two map functions are chained: the first splits each text record into fields, .map(x => x.split(",")), and the second converts the individual fields (cust_no, cust_name, orders, price, city) into one case class object, .map(x => customers_schema(x(0).trim.toInt, x(1), x(2), x(3).trim.toInt, x(4))).
Finally, the toDF method converts the case class objects, together with their schema, into a DataFrame.
val map_customers_schema = Load_data.map(x => x.split(",")).map(x => customers_schema(x(0).trim.toInt, x(1), x(2), x(3).trim.toInt, x(4))).toDF
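At this point it’s worth verifying the schema that was inferred from the case class; printSchema() is a standard DataFrame method, and for our case class the output should look roughly like this:

map_customers_schema.printSchema()
// root
//  |-- cust_no: integer (nullable = false)
//  |-- cust_name: string (nullable = true)
//  |-- orders: string (nullable = true)
//  |-- price: integer (nullable = false)
//  |-- city: string (nullable = true)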
Next, register the DataFrame as a temporary table named Customers using the following command; after running it, we can apply all kinds of SQL statements to the table.
map_customers_schema.registerTempTable("Customers")
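A quick side note: registerTempTable belongs to the older SQLContext API used here; in Spark 2.x it was deprecated in favour of createOrReplaceTempView, so on a newer cluster the equivalent call would be:

// Spark 2.x equivalent of registerTempTable
map_customers_schema.createOrReplaceTempView("Customers")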
Now our Customers table is ready with its schema, and we can run any SQL query on it using the sqlContext.sql() method.
Here I’ll show you a select query:
The command below selects all the records from the Customers table. We use the variable all_records to capture the result; to display the records, we’ll call the show() method on it.
val all_records = sqlContext.sql("select * from Customers")
To see the result of the all_records DataFrame, use the following command:
all_records.show()
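Since the table is registered, any other SQL statement works the same way; for example, a quick aggregation over our sample data that totals the order price per city:

// one more example query: total order price per city
val city_totals = sqlContext.sql("select city, sum(price) as total_price from Customers group by city")
city_totals.show()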
Finally, we can see the data, with its schema, displayed as a table.
Thank You!