All Basic Commands and Use Cases of Apache Spark




Launch the Spark 2.x shell with a custom UI port:

spark2-shell --conf spark.ui.port=4040


Create an RDD by parallelizing an in-memory collection and collect the output:

val x = sc.parallelize(List("spark rdd example", "sample example"))

val x = sc.parallelize(List("spark rdd example", "sample example"), 4)   // same, but with 4 partitions

x.collect()



Read a file from the local file system: first create a test file on your local path, then try the commands below to understand it better:

val textFileLocalTest = sc.textFile("test.txt");   // no scheme: resolved against the default file system

val textBigFileLocalTest = sc.textFile("/Users/Anamika_Singh3/HadoopExamples/file1.txt");

val textFileLocalTest = sc.textFile("file:///home/Anamika_Singh3/test.txt");   // file:// forces the local file system
textFileLocalTest.first();

val textFileLocalTest = sc.textFile("file:///home/Anamika_Singh3/HadoopExamples/file1.txt");


Read file from HDFS

val textFileFromHDFS = sc.textFile("test.txt");
val textFileFromHDFS = sc.textFile("/hdfs_path/test.txt");
textFileFromHDFS.first();

Flat Map
val x = sc.parallelize(List("spark rdd example",  "sample example"))
val y = x.flatMap(x => x.split(" "))
y.take(5);

Map
val z = y.map(x => (x, 1));   // pair each word with a 1 for counting


Filter
val x = sc.parallelize(1 to 10)
val y = x.filter(num => num%2==0)


Or with an explicit number of partitions:

val x = sc.parallelize(1 to 10, 2)
val y = x.filter(num => num%2==0)
y.collect();


Reduce
val x = sc.parallelize(1 to 10, 2)
val y = x.reduce((a, b) => (a+b))




Pair RDD Operations

GroupBy

val x = sc.parallelize(Array("Joseph", "Jimmy", "Tina","Thomas", "James", "Cory","Christine", "Jackeline", "Juan"))
val y = x.groupBy(word => word.charAt(0))


y.collect();


ReduceByKey

val x = sc.parallelize(Array(("a", 1), ("b", 1), ("a", 1),("a", 1), ("b", 1),("b", 1),("b", 1), ("b", 1)))
val y = x.reduceByKey((a, b) => a + b)   // the function combines two values of the same key
y.collect()


SortByKey
val y = x.sortByKey()
y.collect()


Joins

val salesprofit = sc.parallelize(Array(("Cadbury's", 3.5),("Nestle", 2.8),("Mars", 2.5), ("Thorton's", 2.2)));

val salesyear = sc.parallelize(Array(("Cadbury's", 2015),("Nestle", 2014),("Mars", 2014), ("Thorton's", 2013)));

val join = salesprofit.join(salesyear);

join.collect();





Persistence:
Spark provides five main storage levels:
1. MEMORY_ONLY
2. MEMORY_ONLY_SER
3. MEMORY_AND_DISK
4. MEMORY_AND_DISK_SER
5. DISK_ONLY
cache() uses MEMORY_ONLY. To use a different level, call persist(StorageLevel.<storage level>).
By default, persist() stores the data in the JVM heap as deserialized objects.

Ex :
val x = sc.parallelize(List("spark rdd example",  "sample example"))
x.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK)
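
To check which level is actually in effect, and to free the cache later, the RDD API also provides getStorageLevel and unpersist (a quick sketch using the same x as above):

x.getStorageLevel   // reports the configured level (memory and disk, deserialized, in this example)
x.unpersist()       // drop the cached data once it is no longer needed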



Broadcast Variables

val pws = Map("Apache Spark" -> "http://spark.apache.org/", "Scala" -> "http://www.scala-lang.org/")

val websites = sc.parallelize(Seq("Apache Spark")).map(pws).collect

val pwsB = sc.broadcast(pws)

val websitesWithBroadCast = sc.parallelize(Seq("Apache Spark")).map(pwsB.value).collect


Aggregate Functions

Create test data

val x = sc.parallelize(List(1,2,3,4,5,6))

Create a function that prints the content of the RDD with partition labels:

def myfunc(index: Int, iter: Iterator[Int]): Iterator[String] = {
  iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}

Pass the function to mapPartitionsWithIndex and collect the result:

val y = x.mapPartitionsWithIndex(myfunc)

y.collect()

Now use the aggregate function 

x.aggregate(0)(math.max(_, _), _ + _)   // Result = 14
x.aggregate(5)(math.max(_, _), _ + _)   // Result = 26
x.aggregate(7)(math.max(_, _), _ + _)   // Result = 35
x.aggregate(7)(math.min(_, _), _ + _)   // Result = 19


Note: the zero value is applied within each partition when the sequence function runs, the per-partition results are then combined (added, in this case), and the zero value is applied once more in that combine step to give the final result.


Elaboration of the 3rd example (zero value 7, max):

7 (max from 1st partition) + 7 (max from 2nd partition) + 7 (max from 3rd partition) + 7 (max from 4th partition) + 7 (zero value) = 35

4th example (zero value 7, min):

1 (min from 1st partition) + 2 (min from 2nd partition) + 4 (min from 3rd partition) + 5 (min from 4th partition) + 7 (zero value) = 19
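
To see how the six values are spread across partitions, which is where the per-partition maxima and minima above come from, glom() returns each partition as an array (a quick check; assumes the default parallelism of 4 used in this example):

x.glom().collect()   // e.g. Array(Array(1), Array(2, 3), Array(4), Array(5, 6))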



Cartesian Function

val x = sc.parallelize(List("spark rdd example", "sample example"))
val y = sc.parallelize(List("spark rdd example", "sample example"))
x.cartesian(y).collect


Coalesce Function

val x = (1 to 10).toList
val numbersDF = x.toDF()
numbersDF.rdd.partitions.size
val numbersDF2 = numbersDF.coalesce(2)
numbersDF.write.csv("/Users/Anamika/numbers1")

numbersDF2.write.csv("/Users/Anamika/numbers2")


Cogroup Function

val rdd1 = sc.parallelize(Seq( ("key1", 1), ("key2", 2),("key1", 3)))

val rdd1 = sc.parallelize(Seq( ("key1", 5), ("key2", 4)))

val cogrouped = rdd1.cogroup(rdd2)


Fold Function

val rdd1 = sc.parallelize(List(("maths", 80), ("Science", 90)))
val additionalMarks = ("extra", 4)

val sum = rdd1.fold(additionalMarks) { (acc, marks) =>
  val total = acc._2 + marks._2
  ("total", total)
}

Repartition Function


val x = sc.parallelize(List("spark rdd example", "sample example")) x.partitions.length
—4

val y = x.repartition(6)

y.partitions.length   // 6


RDD Actions: countByKey, collectAsMap and lookup

val rdd = sc.parallelize(Seq(("maths", 55), ("maths", 56), ("english", 57), ("english", 58), ("science", 59), ("science", 54)))

rdd.countByKey
rdd.collectAsMap   // keeps only the last value seen for each key
rdd.lookup("maths")
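
For the data above, the results should look roughly as follows (expected output; exact collection types vary slightly between Spark versions):

// countByKey        -> Map(maths -> 2, english -> 2, science -> 2)
// collectAsMap      -> Map(maths -> 56, english -> 58, science -> 54)
// lookup("maths")   -> Seq(55, 56)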


GLOM Function

val dataList = List(50.0,40.0,40.0,70.0)
val dataRDD = sc.makeRDD(dataList)
val maxValue = dataRDD.reduce(_ max _)

An alternative is glom(), which treats the values of each partition as an Array, so the per-partition maximum is computed explicitly before the final reduce.

val maxValue = dataRDD.glom().map((value:Array[Double]) => value.max).reduce(_ max _)
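
The intermediate step of the glom version can be inspected by collecting the per-partition maxima (a quick check; assumes no empty partitions, since max on an empty Array throws):

dataRDD.glom().map(_.max).collect()   // one local maximum per partition
// the trailing reduce(_ max _) then picks the overall maximum, 70.0 for this data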


Checkpoint

sc.setCheckpointDir("/Users/Anamika/checkpoint")
dataRDD.checkpoint()
dataRDD.collect


Saving Data

textFileLocalTest.saveAsTextFile("file:///home/Anamika/sparkout

put")
textFileLocalTest.saveAsTextFile("/sparkoutput")





Spark SQL

Data Frames


Simple One

val df = Seq("Hey, dataframe !!").toDF("text")

df.show

df.printSchema

With Case Class

case class Person(name:String,age:Long)

val people = Seq(Person("Jacek", 42), Person("Patrick", 19), Person("Mackinson", 5))

val peopleDF = people.toDF()

peopleDF.show

peopleDF.printSchema


Just Another Way

val peopleDFAnother = spark.createDataFrame(people)

peopleDFAnother.show

peopleDFAnother.printSchema



Working with Data

Dataframe from Text File

val peopleDFFromFile = spark.sparkContext.textFile("sparksql/people.txt").map(_.split(",")).map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()

peopleDFFromFile.show

peopleDFFromFile.printSchema

peopleDFFromFile.createOrReplaceTempView("people")

val viewDF = spark.sql("SELECT * FROM people")

viewDF.show

viewDF.printSchema



val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

teenagersDF.show

teenagersDF.printSchema


DataFrame from a JSON File

val jSONDF = spark.sqlContext.read.json("sparksql/sample.json")

jSONDF.select("name").show();

jSONDF.select(jSONDF("name"),jSONDF("age")+1).show();

jSONDF.filter(jSONDF("age")>21).show()

jSONDF.groupBy("age").count().show();

Equivalent of "SELECT name, MAX(age) FROM df GROUP BY name" in the DataFrame API:

jSONDF.groupBy("name").max("age").show
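
The same aggregation can also be written in SQL by registering the DataFrame as a temporary view (a minimal sketch; the view name people_json is just an illustrative choice):

jSONDF.createOrReplaceTempView("people_json")
spark.sql("SELECT name, MAX(age) FROM people_json GROUP BY name").show()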



Interacting with Hive
import org.apache.spark.sql.Row

import org.apache.spark.sql.SparkSession

val warehouseLocation = "spark-warehouse"

val spark = SparkSession.builder().appName("Spark Hive Example").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate()

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")

sql("LOAD DATA LOCAL INPATH
'/Users/Anamika/Desktop/HadoopExamples/Spark/kv1.txt' INTO TABLE src")

sql("SELECT * FROM src").show()

sql("select current_database()").show(false)





