Transformations and Actions in Spark



Hello Friends, today I'll try to explain Transformations and Actions in Spark.

As we know, Spark RDDs basically support two types of operations:
        ·  Transformations
        ·  Actions



Transformations :
Transformations are operations that transform our RDD data from one form to another. When we apply a transformation on any RDD, we get a new RDD with the transformed data, because RDDs in Spark are immutable. Operations like map, flatMap, filter, join and union are transformations.
Now there is a point to be noted here: when we apply a transformation on any RDD, Spark does not perform the operation immediately. It creates a DAG (Directed Acyclic Graph) from the applied operation, the source RDD and the function used for the transformation, and it keeps building this graph using references until we apply an action on the last lined-up RDD. That is why transformations in Spark are lazy.
After a transformation, the resultant RDD is always different from its parent RDD. It can be smaller (e.g. filter, distinct, sample), bigger (e.g. flatMap, union, cartesian) or of the same size (e.g. map).
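
For example, here is a minimal spark-shell sketch (assuming sc is the SparkContext provided by the shell; the values are illustrative) showing that nothing is executed until an action is called:

val numbers = sc.parallelize(1 to 10)       // source RDD
val doubled = numbers.map(_ * 2)            // transformation: only recorded in the DAG, nothing runs yet
val bigOnes = doubled.filter(_ > 10)        // another transformation, still nothing runs
println(bigOnes.count())                    // action: the whole lineage is executed now, prints 5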

There are two types of transformations:
 ·   Narrow transformation – In a narrow transformation, all the elements that are required to compute the records in a single partition live in a single partition of the parent RDD. Only a limited subset of the partitions is used to calculate the result.
Examples of narrow transformations: map, flatMap, mapPartitions, filter, sample, union etc.

·  Wide transformation – In a wide transformation, all the elements that are required to compute the records in a single partition may live in many partitions of the parent RDD, so the data has to be shuffled across partitions.

Examples of wide transformations: reduceByKey, groupByKey, intersection, distinct, join, cartesian, repartition etc.
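
As a rough spark-shell sketch (sc assumed, data illustrative), map() is narrow because each output partition depends on exactly one input partition, while reduceByKey() is wide because matching keys have to be shuffled together:

val words  = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"), 3)
val pairs  = words.map(w => (w, 1))         // narrow: computed within each partition
val counts = pairs.reduceByKey(_ + _)       // wide: records with the same key are shuffled to the same partition
counts.collect().foreach(println)           // (a,3), (b,2), (c,1) in some order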



In the diagram below we can see the difference in the way the two types of transformations process the data:




 Actions :
Transformations create RDDs from each other, but when we want to work with the actual dataset, an action is performed. When an action is triggered, no new RDD is formed, unlike with a transformation. Thus, actions are RDD operations that return non-RDD values. The result of an action is either returned to the driver program or written to an external storage system, and this is what brings the laziness of RDDs into motion.
An action triggers execution of the lineage graph, loading the data into the original RDD and computing the result.
Ex: count(), first(), reduce(), collect(), aggregate(), fold() etc.

An action is one of the ways of sending data from the Executors to the driver. Executors are agents that are responsible for executing tasks, while the driver is a JVM process that coordinates the workers and the execution of the tasks.

  
Here I am listing the Transformations and Actions, categorised as below:



1.  Essential Core & Intermediate Spark Operations



Transformation :

General :
        ·        map
        ·        flatMap
        ·        filter
        ·        mapPartitions
        ·        mapPartitionsWithIndex
        ·        groupBy
        ·        sortBy

Math / Statistical :

·        sample
·        randomSplit

Set Theory / Relational :

·        union
·        intersection
·        subtract
·        distinct
·        cartesian
·        zip

Data Structure / I / O :
·        keyBy
·        zipWithIndex
·        zipWithUniqueId
·        zipPartitions
·        coalesce
·        repartition
·        repartitionAndSortWithinPartitions
·        pipe




  Actions :

General :

·         reduce
·         collect
·         aggregate
·         fold
·         first
·         take
·         foreach
·         top
·         treeAggregate
·         treeReduce
·         foreachPartition
·         collectAsMap


Math / Statistical :

·         count
·         takeSample
·         max
·         min
·         sum
·         histogram
·         mean
·         variance
·         stdev
·         sampleVariance
·         countApprox
·         countApproxDistinct

Set Theory / Relational :
·         takeOrdered


Data Structure / I / O  :
·         saveAsTextFile
·         saveAsSequenceFile
·         saveAsObjectFile
·         saveAsHadoopDataset
·         saveAsHadoopFile
·         saveAsNewAPIHadoopDataset
·         saveAsNewAPIHadoopFile





2.  Essential Core & Intermediate PairRDDOperations


Transformations :


General :
·        flatMapValues
·        groupByKey
·        reduceByKey
·        reduceByKeyLocally
·        foldByKey
·        aggregateByKey
·        sortByKey
·        combineByKey

Math / Statistical :
·        sampleByKey

Set Theory / Relational :
·        cogroup(=groupWith)
·        join
·        subtractByKey
·        fullOuterJoin
·        leftOuterJoin
·        rightOuterJoin

Data Structure :
·        partitionBy



Actions :

General :
·        keys
·        values

Math / Statistical :
·        countByKey
·        countByValue
·        countByKeyApprox
·        countByValueApprox
·        countApproxDistinctByKey
·        sampleByKeyExact

  

Now, below I'll briefly describe some of the Transformations and Actions:

Transformations :
Ø  map(func)
The map() transformation iterates over every element of the RDD, applies a function to it and returns a new RDD. Using the map() transformation we pass in any function, and that function is applied to every element of the RDD.
With map, we have the flexibility that the input and the return type of the RDD may differ from each other. For example, the input RDD type can be String and, after applying the map() function, the returned RDD can be of type Boolean.
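
A small spark-shell style sketch (sc assumed, values illustrative) where the input element type is String and the output element type is Int:

val lines   = sc.parallelize(Seq("spark", "transformations", "actions"))
val lengths = lines.map(line => line.length)   // String in, Int out: the element type changes
lengths.collect()                              // Array(5, 15, 7)
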
Ø  flatMap(func)
With the flatMap() function, each input element can produce many elements in the output RDD. The simplest use of flatMap() is to split each input string into words.
map() and flatMap() are similar in that they take an element from the input RDD and apply a function to it. The key difference is that map() returns exactly one element per input element, while flatMap() can return a list of elements (zero or more).
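
A small sketch (spark-shell, sc assumed) splitting lines into words, which also shows the difference from map():

val lines = sc.parallelize(Seq("hello spark", "hello world"))
lines.map(line => line.split(" ")).collect()      // Array(Array(hello, spark), Array(hello, world)) -- one element per input
lines.flatMap(line => line.split(" ")).collect()  // Array(hello, spark, hello, world) -- results are flattened
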
Ø  filter(func)
Spark RDD's filter() function returns a new RDD containing only the elements that satisfy a predicate. It is a narrow operation because it does not shuffle data from one partition to many partitions.
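
A minimal sketch (spark-shell, sc assumed):

val numbers = sc.parallelize(1 to 10)
val evens   = numbers.filter(_ % 2 == 0)   // keeps only the elements that satisfy the predicate
evens.collect()                            // Array(2, 4, 6, 8, 10)
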
Ø  mapPartitions(func)
mapPartitions() converts each partition of the source RDD into many elements of the result (possibly none). In mapPartitions(), the function is applied to each partition as a whole rather than to each element. It is like map, but it runs separately on each partition (block) of the RDD.

Ø  mapPartitionsWithIndex(func)
It is like mapPartitions, but in addition it provides func with an integer value representing the index of the partition, so the function is applied partition by partition along with each partition's index.
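
A small sketch (spark-shell, sc assumed) of both functions on an RDD with two partitions; the exact split of elements across partitions is illustrative:

val data = sc.parallelize(1 to 6, 2)                        // 2 partitions
data.mapPartitions(iter => Iterator(iter.sum)).collect()    // one sum per partition, e.g. Array(6, 15)
data.mapPartitionsWithIndex((idx, iter) => iter.map(x => (idx, x))).collect()
// e.g. Array((0,1), (0,2), (0,3), (1,4), (1,5), (1,6)) -- each element tagged with its partition index
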
Ø  union(otherDataset)
With the union() function, we get the elements of both RDDs in a new RDD. The key rule of this function is that the two RDDs should be of the same type.

Ø  intersection(otherDataset)
With the intersection() function, we get only the elements common to both RDDs in a new RDD. The key rule of this function is that the two RDDs should be of the same type.

Ø  distinct()
It returns a new dataset that contains the distinct elements of the source dataset. It is helpful for removing duplicate data.
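
A small sketch (spark-shell, sc assumed, values illustrative) of these three set-style transformations:

val a = sc.parallelize(Seq(1, 2, 3, 3))
val b = sc.parallelize(Seq(3, 4, 5))
a.union(b).collect()               // Array(1, 2, 3, 3, 3, 4, 5) -- duplicates are kept
a.intersection(b).collect()        // Array(3) -- only common elements
a.union(b).distinct().collect()    // Array(1, 2, 3, 4, 5) in some order -- duplicates removed
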
Ø  groupByKey([numPartitions])
When we use groupByKey() on a dataset of (K, V) pairs, the data is shuffled according to the key value K into another RDD. In this transformation, a lot of unnecessary data can get transferred over the network.

Ø  reduceByKey(func, [numPartitions])
When we use reduceByKey() on a dataset of (K, V) pairs, the pairs on the same machine with the same key are combined before the data is shuffled, which makes it more network-efficient than groupByKey().

Ø  sortByKey([ascending], [numPartitions])
When we apply the sortByKey() function on a dataset of (K, V) pairs, the data is sorted according to the key K into another RDD.
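
A small sketch (spark-shell, sc assumed, values illustrative) of these key-based transformations on the same pair RDD:

val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
pairs.groupByKey().mapValues(_.sum).collect()   // shuffles every (key, value) pair before summing
pairs.reduceByKey(_ + _).collect()              // combines values per key on each machine before the shuffle
pairs.sortByKey().collect()                     // Array((a,1), (a,1), (b,1)) -- sorted by key
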
Ø  join(otherDataset, [numPartitions])
Join is a database term: it combines fields from two tables using common values. The join() operation in Spark is defined on pair RDDs, i.e. RDDs in which each element is a tuple whose first element is the key and whose second element is the value.
The benefit of using keyed data is that we can combine records together: the join() operation combines two datasets on the basis of the key.
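
A small sketch (spark-shell, sc assumed) with two hypothetical pair RDDs keyed by an id:

val employees   = sc.parallelize(Seq((1, "Alice"), (2, "Bob")))
val departments = sc.parallelize(Seq((1, "Sales"), (2, "HR"), (3, "IT")))
employees.join(departments).collect()
// Array((1,(Alice,Sales)), (2,(Bob,HR))) -- only keys present in both RDDs appear
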
Ø  cogroup(otherDataset, [numPartitions])
When we call cogroup() on datasets of type (K, V) and (K, W), it returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.
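
A small sketch (spark-shell, sc assumed, values illustrative); note that keys present in only one RDD still appear, with an empty iterable on the other side:

val scores = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
val labels = sc.parallelize(Seq(("a", "x"), ("c", "y")))
scores.cogroup(labels).collect()
// (a,(CompactBuffer(1, 2),CompactBuffer(x))), (b,(CompactBuffer(3),CompactBuffer())), (c,(CompactBuffer(),CompactBuffer(y)))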

Ø  coalesce(numPartitions)
To avoid a full shuffle of data we use the coalesce() function. coalesce() reuses existing partitions so that less data is shuffled, and with it we can reduce the number of partitions. Suppose an RDD has four partitions and we want only two: the data of the removed partitions is moved onto the partitions that we keep.

Ø  repartition(numPartitions)
It reshuffles the data in the RDD randomly to create either more or fewer partitions and balances the data across them. This always shuffles all data over the network.
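
A small sketch (spark-shell, sc assumed) contrasting the two:

val data  = sc.parallelize(1 to 100, 8)
val fewer = data.coalesce(2)        // merges existing partitions, avoids a full shuffle
val more  = data.repartition(16)    // always a full shuffle, data is redistributed evenly
println(fewer.getNumPartitions)     // 2
println(more.getNumPartitions)      // 16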

Ø  repartitionAndSortWithinPartitions(partitioner)
When we apply this function on a dataset, it repartitions the RDD according to the given partitioner and, within each resulting partition, sorts records by their keys. This is more efficient than calling repartition and then sorting within each partition, because it can push the sorting down into the shuffle machinery.
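
A small sketch (spark-shell, sc assumed, values illustrative) using a HashPartitioner with two partitions:

import org.apache.spark.HashPartitioner

val pairs  = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b"), (1, "z")))
val sorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2))
sorted.glom().collect()   // each partition's records come back already sorted by key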


Actions :
Ø  reduce(func)
It aggregates the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
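
A minimal sketch (spark-shell, sc assumed):

val numbers = sc.parallelize(1 to 5)
val total   = numbers.reduce((a, b) => a + b)   // commutative and associative, so safe to run in parallel
println(total)                                  // 15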

Ø  collect()
It returns all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
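
A minimal sketch (spark-shell, sc assumed); collect() should only be used when the result is small enough to fit in the driver's memory:

val multiples = sc.parallelize(1 to 1000).filter(_ % 100 == 0)
val result    = multiples.collect()   // only 10 elements are brought back to the driver
result.foreach(println)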

Ø  count()
It returns the number of elements in the dataset.

Ø  first()
It returns the first element of the dataset (it is similar to take(1)).

Ø  take(n)
It returns an array with the first n elements of the dataset.

Ø  takeSample(withReplacement, num, [seed])
It returns an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.

Ø  takeOrdered(n, [ordering])
It returns the first n elements of the RDD using either their natural order or a custom comparator.
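
A small sketch (spark-shell, sc assumed, values illustrative) of these retrieval actions on one dataset:

val data = sc.parallelize(Seq(5, 3, 8, 1, 9))
data.first()                     // 5
data.take(3)                     // Array(5, 3, 8) -- the first 3 elements as stored
data.takeSample(false, 2, 42L)   // 2 random elements, without replacement, with a fixed seed
data.takeOrdered(3)              // Array(1, 3, 5) -- the smallest 3 by natural ordering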


Ø  saveAsTextFile(path)
Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
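
A minimal sketch (spark-shell, sc assumed); the output path here is hypothetical and must not already exist:

val data = sc.parallelize(Seq("one", "two", "three"))
data.saveAsTextFile("/tmp/spark-output")        // hypothetical path; one part file is written per partition
val readBack = sc.textFile("/tmp/spark-output") // the directory can later be read back as an RDD of lines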

Ø  saveAsSequenceFile(path) => (Java and Scala)
Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).

Ø  saveAsObjectFile(path)  => (Java and Scala)
Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().

Ø  countByKey()
This is only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
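
A minimal sketch (spark-shell, sc assumed):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
pairs.countByKey()   // Map(a -> 2, b -> 1) -- returned to the driver as a local Map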

Ø  foreach(func)
It runs a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.
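
A minimal sketch (spark-shell, sc assumed); note that the function runs on the executors, so in a cluster the output goes to the executor logs rather than the driver console:

val data = sc.parallelize(Seq(1, 2, 3))
data.foreach(x => println(x))   // side effect executed on the executors for each element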


Finally, we have tried to explore more about Transformations and Actions. In the next session I'll try to explain all the Transformations and Actions with practical examples.
Thanks a lot for reading this post, and if you have any questions or suggestions, kindly leave a comment.

Thank You!












