Transformations and Actions in Spark
Hello Friends, today I’ll try to explain Transformations and Actions in Spark.
We know that, basically, a Spark RDD supports two types of operations :
· Transformations
· Actions
Transformations :
Transformations are operations that transform our RDD data from one form to another. When we apply such an operation on any RDD we get a new RDD with the transformed data, because RDDs in Spark are immutable. Operations like map, flatMap, filter, join and union are transformations.
A point to be noted here is that when we apply a transformation on an RDD, Spark does not perform the operation immediately. Instead, it creates a DAG (Directed Acyclic Graph) from the applied operations, the source RDD and the functions used for the transformation, and it keeps building this graph of references until we apply an action on the last lined-up RDD. That is why transformations in Spark are lazy.
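A minimal sketch of this laziness, assuming a SparkContext named sc is already available (as it is in spark-shell):

val nums = sc.parallelize(1 to 10)       // source RDD
val doubled = nums.map(_ * 2)            // transformation: nothing executes yet, the DAG just grows
val evens = doubled.filter(_ % 4 == 0)   // another transformation, still lazy
val result = evens.collect()             // action: only now is the whole lineage executed

Until collect() is called, Spark has only recorded the lineage; no data has been read or transformed.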
After a transformation, the resultant RDD is always different from its parent RDD. It can be smaller (e.g. filter, distinct, sample), bigger (e.g. flatMap, union, cartesian) or the same size (e.g. map).
There are two types of transformations:
· Narrow transformation – In a narrow transformation, all the elements required to compute the records in a single partition live in a single partition of the parent RDD. Only a limited subset of the partitions is needed to calculate the result.
Examples of narrow transformations : map, flatMap, mapPartitions, filter, sample, union etc.
· Wide transformation – In a wide transformation, the elements required to compute the records in a single partition may live in many partitions of the parent RDD, so the data has to be shuffled between partitions.
Examples of wide transformations : reduceByKey, groupByKey, intersection, distinct, join, cartesian, repartition, coalesce etc.
In the diagram below we can see the difference in how the data is processed by the two types of transformations :
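As a small code illustration of the two kinds (same sc assumption as above), a map is narrow while a reduceByKey is wide:

val words = sc.parallelize(Seq("a", "b", "a", "c"))
val pairs = words.map(w => (w, 1))       // narrow: each output partition depends on exactly one parent partition
val counts = pairs.reduceByKey(_ + _)    // wide: values for the same key may sit in different partitions, so a shuffle is needed
counts.collect()                         // e.g. Array((a,2), (b,1), (c,1))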
Actions :
Transformations create RDDs from each other, but when we want to work with the actual dataset, an action is performed. When an action is triggered, no new RDD is formed as it is with a transformation; actions are RDD operations that return non-RDD values. The result of an action is stored on the driver or in an external storage system. Actions bring the laziness of RDDs into motion.
Actions trigger execution using the lineage graph to load the data into the original RDD.
Ex: count(), first(), reduce(), collect(), aggregate(), fold() etc.
An action is one of the ways of sending data from the executors to the driver. Executors are agents that are responsible for executing a task, while the driver is a JVM process that coordinates the workers and the execution of the tasks.
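A short sketch of an action handing a plain value back to the driver (again assuming sc from spark-shell):

val rdd = sc.parallelize(1 to 100, 4)   // distributed across 4 partitions on the executors
val total = rdd.reduce(_ + _)           // action: executors compute partial sums, the driver receives the final Int
println(total)                          // 5050, a plain value on the driver, not an RDD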
Here I am listing the Transformations and Actions, categorised :
1. Essential Core & Intermediate Spark Operations
Transformations :
General :
· map
· flatMap
· filter
· mapPartitions
· mapPartitionsWithIndex
· groupBy
· sortBy
Math / Statistical :
· sample
· randomSplit
Set Theory / Relational :
· union
· intersection
· subtract
· distinct
· cartesian
· zip
Data Structure / I/O :
· keyBy
· zipWithIndex
· zipWithUniqueId
· zipPartitions
· coalesce
· repartition
· repartitionAndSortWithinPartitions
· pipe
Actions :
General :
· reduce
· collect
· aggregate
· fold
· first
· take
· foreach
· top
· treeAggregate
· treeReduce
· foreachPartition
· collectAsMap
Math / Statistical :
· takeSample
· max
· min
· sum
· histogram
· mean
· variance
· stdev
· sampleVariance
· countApprox
· countApproxDistinct
Set Theory / Relational :
· takeOrdered
Data Structure / I/O :
· saveAsTextFile
· saveAsSequenceFile
· saveAsObjectFile
· saveAsHadoopDataset
· saveAsHadoopFile
· saveAsNewAPIHadoopDataset
· saveAsNewAPIHadoopFile
2. Essential Core & Intermediate PairRDD Operations
Transformations :
General :
· flatMapValues
· groupByKey
· reduceByKey
· reduceByKeyLocally
· foldByKey
· aggregateByKey
· sortByKey
· combineByKey
Math / Statistical :
· sampleByKey
Set Theory / Relational :
· cogroup (= groupWith)
· join
· subtractByKey
· fullOuterJoin
· leftOuterJoin
· rightOuterJoin
Data Structure :
· partitionBy
Actions :
General :
· keys
· values
Math / Statistical :
· countByKey
· countByValue
· countByKeyApprox
· countByValueApprox
· countApproxDistinctByKey
· sampleByKeyExact
Now, below I’ll try to briefly describe some of the Transformations and Actions :
Transformations :
Ø map()
The map() transformation iterates over every element of the RDD and produces a new RDD. With map() we pass in any function, and that function is applied to every element of the RDD.
With map we have the flexibility that the input and the return type of the RDD may differ from each other. For example, the input RDD can be of type String and, after applying the map() function, the returned RDD can be Boolean.
Ø flatMap()
With the flatMap() function, each input element can produce many elements in the output RDD. The simplest use of flatMap() is to split each input string into words.
map and flatMap are similar in that they take an element from the input RDD and apply a function to it. The key difference is that map() returns exactly one element per input, while flatMap() can return a list of elements.
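A minimal sketch of the difference (assuming sc as before):

val lines = sc.parallelize(Seq("hello spark", "hello world"))
val lengths = lines.map(_.length)        // map: exactly one output per input, String => Int
val words = lines.flatMap(_.split(" "))  // flatMap: zero or more outputs per input
lengths.collect()                        // Array(11, 11)
words.collect()                          // Array(hello, spark, hello, world)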
Ø filter()
The Spark RDD filter() function returns a new RDD containing only the elements that satisfy a predicate. It is a narrow operation because it does not shuffle data from one partition to many partitions.
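For example (a small sketch, same sc assumption):

val nums = sc.parallelize(1 to 10)
val evens = nums.filter(_ % 2 == 0)   // keeps only the elements matching the predicate; no shuffle
evens.collect()                       // Array(2, 4, 6, 8, 10)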
Ø mapPartitions()
mapPartitions() converts each partition of the source RDD into many elements of the result (possibly none). The function is applied to each partition as a whole. mapPartitions() is like map, but the difference is that it runs separately on each partition (block) of the RDD.
Ø mapPartitionsWithIndex()
It is like mapPartitions, but in addition it provides the function with an integer value representing the index of the partition, and the function is applied to the partitions index-wise, one after the other.
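A sketch of both, using a hypothetical 4-partition RDD (same sc assumption):

val rdd = sc.parallelize(1 to 8, numSlices = 4)
// mapPartitions: the function receives a whole partition as an iterator
val partSums = rdd.mapPartitions(iter => Iterator(iter.sum))
// mapPartitionsWithIndex: the function also receives the partition index
val tagged = rdd.mapPartitionsWithIndex((idx, iter) => iter.map(x => (idx, x)))
partSums.collect()   // one sum per partition, e.g. Array(3, 7, 11, 15)
tagged.collect()     // each element paired with its partition index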
Ø union()
With the union() function we get the elements of both RDDs in the new RDD. The key rule of this function is that the two RDDs should be of the same type.
Ø intersection()
With the intersection() function we get only the elements common to both RDDs in the new RDD. The key rule of this function is that the two RDDs should be of the same type.
Ø distinct()
It returns a new dataset that contains the distinct elements of the source dataset. It is helpful for removing duplicate data.
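A quick sketch of these set-style transformations (same sc assumption; result order may vary):

val a = sc.parallelize(Seq(1, 2, 3, 3))
val b = sc.parallelize(Seq(3, 4, 5))
a.union(b).collect()         // Array(1, 2, 3, 3, 3, 4, 5) — duplicates are kept, no shuffle
a.intersection(b).collect()  // Array(3) — only common elements, requires a shuffle
a.distinct().collect()       // Array(1, 2, 3) — duplicates removed, requires a shuffle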
Ø groupByKey([numPartitions])
When we use groupByKey() on a dataset of (K, V) pairs, the data is shuffled according to the key K into another RDD. In this transformation a lot of unnecessary data gets transferred over the network.
Ø reduceByKey()
When we use reduceByKey() on a dataset of (K, V) pairs, the pairs on the same machine with the same key are combined before the data is shuffled.
Ø sortByKey()
When we apply the sortByKey() function on a dataset of (K, V) pairs, the data is sorted according to the key K into another RDD.
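A side-by-side sketch (same sc assumption):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 2)))
pairs.groupByKey().mapValues(_.sum).collect()  // shuffles every (key, value) pair before grouping
pairs.reduceByKey(_ + _).collect()             // combines values per key locally first, then shuffles — usually cheaper
pairs.sortByKey().collect()                    // Array((a,1), (a,2), (b,1)) — sorted by key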
Ø join()
Join is a database term. It combines the fields from two tables using common values. The join() operation in Spark is defined on pair-wise RDDs. Pair-wise RDDs are RDDs in which each element is a tuple, where the first element is the key and the second element is the value.
The boon of using keyed data is that we can combine the data together. The join() operation combines two datasets on the basis of the key.
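A sketch with two hypothetical keyed datasets (same sc assumption):

val ages = sc.parallelize(Seq(("alice", 30), ("bob", 25)))
val cities = sc.parallelize(Seq(("alice", "Pune"), ("bob", "Delhi")))
val joined = ages.join(cities)   // joins on the key: RDD[(String, (Int, String))]
joined.collect()                 // Array((alice,(30,Pune)), (bob,(25,Delhi))) — order may vary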
Ø cogroup(otherDataset, [numPartitions])
When we call cogroup() on datasets of type (K, V) and (K, W), it returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.
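For instance (same sc assumption), keys that appear on only one side still show up, with an empty Iterable on the other side:

val scores = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
val labels = sc.parallelize(Seq(("a", "x"), ("c", "y")))
val grouped = scores.cogroup(labels)  // RDD[(String, (Iterable[Int], Iterable[String]))]
grouped.collect()                     // one entry per key from either side, each with two Iterables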
Ø coalesce(numPartitions)
To avoid a full shuffling of data we use the coalesce() function. In coalesce() we reuse existing partitions so that less data is shuffled. Using this we can cut down the number of partitions. Suppose we have four partitions and we want only two; the data of the extra partitions will be moved onto the partitions we keep.
Ø repartition(numPartitions)
It reshuffles the data in the RDD randomly to create either more or fewer partitions and balances it across them. This always shuffles all data over the network.
Ø repartitionAndSortWithinPartitions(partitioner)
When we apply this function on a dataset, it repartitions the RDD according to the given partitioner and, within each resulting partition, sorts records by their keys. This is more efficient than calling repartition and then sorting within each partition, because it can push the sorting down into the shuffle machinery.
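A small sketch of the three (same sc assumption; the partition counts are arbitrary examples):

import org.apache.spark.HashPartitioner

val rdd = sc.parallelize(1 to 100, 8)
rdd.coalesce(2).getNumPartitions      // 2 — merges existing partitions, avoids a full shuffle
rdd.repartition(16).getNumPartitions  // 16 — always a full shuffle to rebalance the data

val kv = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b")))
kv.repartitionAndSortWithinPartitions(new HashPartitioner(2)).collect()  // repartitioned, each partition sorted by key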
Actions :
Ø reduce(func)
It aggregates the elements of the dataset using a function func
(which takes two arguments and returns one). The function should be commutative
and associative so that it can be computed correctly in parallel.
Ø collect()
It returns all the elements
of the dataset as an array at the driver program. This is usually useful after
a filter or other operation that returns a sufficiently small subset of the
data.
Ø count()
It returns the number of elements in the dataset.
Ø first()
It returns the first element of the dataset (it is similar to take(1)).
Ø take(n)
It returns an array with the first n elements of the
dataset.
Ø takeSample(withReplacement, num, [seed])
It returns an array with a
random sample of num elements of the dataset, with or without
replacement, optionally pre-specifying a random number generator seed.
Ø takeOrdered(n, [ordering])
It returns the first n
elements of the RDD using either their natural order or a custom comparator.
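A combined sketch of these basic actions (same sc assumption):

val rdd = sc.parallelize(Seq(5, 3, 1, 4, 2))
rdd.reduce(_ + _)      // 15 — aggregates with a commutative, associative function
rdd.collect()          // Array(5, 3, 1, 4, 2) — brings everything back to the driver
rdd.count()            // 5
rdd.first()            // 5
rdd.take(3)            // Array(5, 3, 1)
rdd.takeSample(withReplacement = false, num = 2, seed = 42L)  // 2 random elements
rdd.takeOrdered(3)     // Array(1, 2, 3) — natural ordering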
Ø saveAsTextFile(path)
Write the elements of the
dataset as a text file (or set of text files) in a given directory in the local
filesystem, HDFS or any other Hadoop-supported file system. Spark will call
toString on each element to convert it to a line of text in the file.
Ø saveAsSequenceFile(path) => (Java and Scala)
Write the elements of the
dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS
or any other Hadoop-supported file system. This is available on RDDs of
key-value pairs that implement Hadoop's Writable interface. In Scala, it is
also available on types that are implicitly convertible to Writable (Spark
includes conversions for basic types like Int, Double, String, etc).
Ø saveAsObjectFile(path) => (Java and Scala)
Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
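A sketch of the save actions, using hypothetical /tmp output paths (same sc assumption; each call writes one file per partition):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
pairs.saveAsTextFile("/tmp/demo-text")      // plain text, elements converted via toString
pairs.saveAsSequenceFile("/tmp/demo-seq")   // Hadoop SequenceFile of key-value pairs
pairs.saveAsObjectFile("/tmp/demo-obj")     // Java-serialized objects
val restored = sc.objectFile[(String, Int)]("/tmp/demo-obj")  // read the object file back as an RDD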
Ø countByKey()
This is only available on
RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each
key.
Ø foreach(func)
It runs a function func
on each element of the dataset. This is usually done for side effects such as
updating an Accumulator or interacting
with external storage systems.
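And a final sketch of these two (same sc assumption):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
pairs.countByKey()              // Map(a -> 2, b -> 1) — returned to the driver as a local Map
pairs.foreach(p => println(p))  // runs on the executors, so the output appears in the executors' stdout, not necessarily on the driver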
Finally, we tried to explore Transformations and Actions in more detail. In the next session I’ll try to explain all the Transformations and Actions with practical examples.
Thanks a lot for reading this post, and if you have any questions or suggestions, kindly leave a comment.
Thank You!