All Basic Commands and Use Cases of Apache Spark
Create an RDD by parallelizing a collection in memory and collect the output:
val x = sc.parallelize(List("spark rdd example", "sample example"))
val x = sc.parallelize(List("spark rdd example", "sample example"), 4)
x.collect()
Read a file from the local filesystem: first create a test file in your local path and try the commands below to understand it better:
val textFileLocalTest = sc.textFile("test.txt")
val textBigFileLocalTest = sc.textFile("/Users/Anamika_Singh3/HadoopExamples/file1.txt")
val textFileLocalTest = sc.textFile("file:///home/Anamika_Singh3/test.txt")
textFileLocalTest.first()
val textFileLocalTest = sc.textFile("file:///home/Anamika_Singh3/HadoopExamples/file1.txt")
Read a file from HDFS:
val textFileFromHDFS = sc.textFile("test.txt")
val textFileFromHDFS = sc.textFile("/hdfs_path/test.txt")
textFileFromHDFS.first()
FlatMap
val x = sc.parallelize(List("spark rdd example", "sample example"))
val y = x.flatMap(x => x.split(" "))
y.take(5)
Map
val z = y.map(x => (x, 1));
Filter
val x = sc.parallelize(1 to 10)
val y = x.filter(num => num%2==0)
Or, with partitions:
val x = sc.parallelize(1 to 10, 2)
val y = x.filter(num => num%2==0)
y.collect()
Reduce
val x = sc.parallelize(1 to 10, 2)
val y = x.reduce((a, b) => a + b)
Pair RDD Operations
GroupBy
val x = sc.parallelize(Array("Joseph", "Jimmy", "Tina", "Thomas", "James", "Cory", "Christine", "Jackeline", "Juan"))
val y = x.groupBy(word => word.charAt(0))
ReduceByKey
val x = sc.parallelize(Array(("a", 1), ("b", 1), ("a", 1), ("a", 1), ("b", 1), ("b", 1), ("b", 1), ("b", 1)))
val y = x.reduceByKey((a, b) => a + b)   // the function receives two values for the same key, not (key, value)
y.collect()
SortByKey
val y = x.sortByKey()
y.collect()
Joins
val salesprofit = sc.parallelize(Array(("Cadbury's", 3.5), ("Nestle", 2.8), ("Mars", 2.5), ("Thorton's", 2.2)))
val salesyear = sc.parallelize(Array(("Cadbury's", 2015), ("Nestle", 2014), ("Mars", 2014), ("Thorton's", 2013)))
val join = salesprofit.join(salesyear)
join.collect()
Persistence:
Spark provides several storage levels; the most commonly used are:
1. MEMORY_ONLY
2. MEMORY_ONLY_SER
3. MEMORY_AND_DISK
4. MEMORY_AND_DISK_SER
5. DISK_ONLY
cache() uses MEMORY_ONLY. If you want anything else, use persist(StorageLevel.<storage level>). By default, persist() stores the data in the JVM heap as unserialized objects.
Ex:
val x = sc.parallelize(List("spark rdd example", "sample example"))
x.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK)
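As a rough sketch in the same shell session, cache() and a serialized level can be used as below; an RDD must be unpersisted before it can be assigned a different storage level:
import org.apache.spark.storage.StorageLevel
x.unpersist()                            // drop the blocks persisted above before changing the level
x.cache()                                // shorthand for persist(StorageLevel.MEMORY_ONLY)
x.unpersist()
x.persist(StorageLevel.MEMORY_ONLY_SER)  // store as serialized bytes to save heap space at some CPU cost
x.count()                                // running an action actually materializes and caches the RDD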
Broadcast Variables
val pws = Map("Apache Spark" -> "http://spark.apache.org/", "Scala" -> "http://www.scala-lang.org/")
val websites = sc.parallelize(Seq("Apache Spark")).map(pws).collect
val pwsB = sc.broadcast(pws)
val websitesWithBroadCast = sc.parallelize(Seq("Apache Spark")).map(pwsB.value).collect
Aggregate Functions
Create test data:
val x = sc.parallelize(List(1, 2, 3, 4, 5, 6))
Create a function that prints the contents of the RDD with partition labels:
def myfunc(index: Int, iter: Iterator[Int]): Iterator[String] = {
  iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
Pass the function to mapPartitionsWithIndex and collect the values:
val y = x.mapPartitionsWithIndex(myfunc)
y.collect()
Now use the aggregate function:
x.aggregate(0)(math.max(_, _), _ + _)   (Result = 14)
x.aggregate(5)(math.max(_, _), _ + _)   (Result = 26)
x.aggregate(7)(math.max(_, _), _ + _)   (Result = 35)
x.aggregate(7)(math.min(_, _), _ + _)   (Result = 19)
Note: aggregate uses the zero value as the initial accumulator inside each partition when evaluating the first function (seqOp); the per-partition results are then combined with the second function (here, addition), where the zero value is applied once more to give the final result.
3rd example elaboration:
7 (max from 1st partition) + 7 (max from 2nd partition) + 7 (max from 3rd partition) + 7 (max from 4th partition) + 7 (zero value) = 35
4th example elaboration, with min and zero value 7:
1 (min from 1st partition) + 2 (min from 2nd partition) + 4 (min from 3rd partition) + 5 (min from 4th partition) + 7 (zero value) = 19
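To see where those per-partition values come from, glom() gives a quick check (a sketch, assuming this shell split the list into four partitions, which is what the results above imply):
x.glom().collect()
// roughly: Array(Array(1), Array(2, 3), Array(4), Array(5, 6))
x.glom().map(part => part.foldLeft(7)(math.min(_, _))).collect()
// per-partition minimums with zero value 7: Array(1, 2, 4, 5); summing them and adding the zero value once more gives 19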
Cartesian Function
val x = sc.parallelize(List("spark rdd example", "sample example"))
val y = sc.parallelize(List("spark rdd example", "sample example"))
x.cartesian(y).collect
Coalesce Function
val x = (1 to 10).toList
val numbersDF = x.toDF()
numbersDF.rdd.partitions.size
val numbersDF2 = numbersDF.coalesce(2)
numbersDF.write.csv("/Users/Anamika/numbers1")
numbersDF2.write.csv("/Users/Anamika/numbers2")
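To confirm what coalesce did (a small sketch in the same session; the original partition count depends on the shell's default parallelism):
numbersDF2.rdd.partitions.size   // 2 after coalesce(2)
// the numbers2 directory will contain two part files, numbers1 one per original partition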
Cogroup Function
val rdd1 = sc.parallelize(Seq(("key1", 1), ("key2", 2), ("key1", 3)))
val rdd2 = sc.parallelize(Seq(("key1", 5), ("key2", 4)))
val cogrouped = rdd1.cogroup(rdd2)
Fold Function
val rdd1 = sc.parallelize(List(("maths", 80), ("Science", 90)))
val additionalMarks = ("extra", 4)
val sum = rdd1.fold(additionalMarks) { (acc, marks) =>
  val total = acc._2 + marks._2
  ("total", total)
}
Repartition Function
val x = sc.parallelize(List("spark rdd example", "sample example"))
x.partitions.length   // 4
val y = x.repartition(6)
y.partitions.length   // 6
RDD Actions - countByKey, collectAsMap and lookup
val rdd = sc.parallelize(Seq(("maths", 55), ("maths", 56), ("english", 57), ("english", 58), ("science", 59), ("science", 54)))
rdd.countByKey
rdd.collectAsMap   (keeps only the last value seen for each key)
rdd.lookup("maths")
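For reference, a rough sketch of what these calls return on the data above (the collectAsMap contents can depend on partition order):
rdd.countByKey        // Map(maths -> 2, english -> 2, science -> 2)
rdd.collectAsMap      // Map(maths -> 56, english -> 58, science -> 54): one value per key
rdd.lookup("maths")   // WrappedArray(55, 56): every value stored under the key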
GLOM Function
val dataList = List(50.0,40.0,40.0,70.0)
val dataRDD = sc.makeRDD(dataList)
val maxValue = dataRDD.reduce(_ max _)
Not too good: all the partition values get pulled through the reduce individually. Use glom() instead, which treats the values of each partition as an Array, so the max can be computed per partition first.
val maxValue = dataRDD.glom().map((value: Array[Double]) => value.max).reduce(_ max _)
Checkpoint
sc.setCheckpointDir("/Users/Anamika/checkpoint")
dataRDD.checkpoint()
dataRDD.collect
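A quick way to verify that the checkpoint took effect (a sketch; the exact file path lands under the directory set above):
dataRDD.isCheckpointed      // true once an action has run after checkpoint()
dataRDD.getCheckpointFile   // Some(<path under the checkpoint directory>)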
Saving Data
textFileLocalTest.saveAsTextFile("file:///home/Anamika/sparkoutput")
textFileLocalTest.saveAsTextFile("/sparkoutput")
Spark SQL
Data Frames
Simple One
val df = Seq("Hey, dataframe !!").toDF("text")
df.show
df.printSchema
With Case Class
case class Person(name: String, age: Long)
val people = Seq(Person("Jacek", 42), Person("Patrick", 19), Person("Mackinson", 5))
val peopleDF = people.toDF()
peopleDF.show
peopleDF.printSchema
Just Another Way
val peopleDFAnother = spark.createDataFrame(people)
peopleDFAnother.show
peopleDFAnother.printSchema
Working with Data
Dataframe from Text File
val peopleDFFromFile = spark.sparkContext.textFile("sparksql/people.txt").map(_.split(",")).map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()
peopleDFFromFile.show
peopleDFFromFile.printSchema
peopleDFFromFile.createOrReplaceTempView("people")
val viewDF = spark.sql("SELECT * FROM people")
viewDF.show
viewDF.printSchema
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
teenagersDF.show
teenagersDF.printSchema
Dataframe from JSON File
val jSONDF = spark.sqlContext.read.json("sparksql/sample.json")
jSONDF.select("name").show();
jSONDF.select(jSONDF("name"),jSONDF("age")+1).show();
jSONDF.filter(jSONDF("age")>21).show()
jSONDF.groupBy("age").count().show();
The equivalent of "select name, max(age) from df group by name" in Spark:
jSONDF.groupBy("name").max("age").show
Interacting with Hive
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
val warehouseLocation = "spark-warehouse"
val spark = SparkSession.builder().appName("Spark Hive Example").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate()
import spark.implicits._
import spark.sql
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH '/Users/Anamika/Desktop/HadoopExamples/Spark/kv1.txt' INTO TABLE src")
sql("SELECT * FROM src").show()
sql("select current_database()").show(false)