Performance Tuning in Apache Spark



Performance Tuning in Apache Spark :
The process of adjusting settings to record for memory, cores, and all the instances used by the system is termed tuning. This process guarantees that the Spark has optimal performance and prevents resource bottlenecking. Effective changes are made to each property and settings, to ensure the correct usage of resources based on system-specific setup. Apache Spark has in-memory computation nature. As a result resources in the cluster (CPU, memory etc.) may get bottlenecked.
Sometimes to decrease memory usage RDD are stored in serialized form. Data serialization plays very important role in good network performance and can also help in reducing memory usage, and memory tuning.
If used these properly, tuning can be do:
1. Ensure proper use of all resources in an effective manner.
2.Eliminates those jobs that run long.
3.Improves the performance time of the system.
4.Guarantees that jobs are on correct execution engine.
Now we can see in the above image that there are many factors to play important role in Spark performance tuning such as:
a) Data Serialization in Spark
b) Memory Tuning in Spark
c) Spark Data Structure Tuning
d) Spark Garbage Collection Tuning
e) Memory Management in Spark
f) Determining Memory Consumption in Spark

Apart from all above there are some more consideration for Spark Performance Tuning, such as:

g) Level of Parallelism
h) Memory Usage of Reduce Task in Spark
i) Broadcasting Large Variables 

j) Data Locality in Apache Spark 



a) Data Serialization in Spark :

Data Serialization in Apache Spark is the process of converting the in-memory object to another format that can be used to store in a file or send over the network. It plays a distinctive role in the performance of any distributed application. The computation gets slower due to formats that are slow to serialize or consume a large number of files. Apache Spark gives two serialization libraries:
  • Java serialization
  • Kryo serialization
Java serialization – Objects are serialized in Spark using an ObjectOutputStream framework, and can run with any class that implements java.io.Serializable. The performance of serialization can be controlled by extending java.io.Externalizable. It is flexible but slow and leads to large serialized formats for many classes.
Kryo serialization – To serialize objects, Spark can use the Kryo library (Version 2). Although it is more compact than Java serialization, it does not support all Serializable types. For better performance, we need to register the classes in advance. We can switch to Karyo by initializing our job with SparkConf and calling-
conf.set(“spark.serializer”, “org.apache.spark.serializer.KyroSerializer”)
We use the registerKryoClasses method, to register our own class with Kryo. In case our objects are large we need to increase spark.kryoserializer.buffer config. The value should be large so that it can hold the largest object we want to serialize.
b) Memory Tuning in Spark : We can consider the following three things in tuning memory usage:
  • Amount of memory used by objects (the entire dataset should fit in-memory)
  • The cost of accessing those objects
  • Overhead of garbage collection.
The Java objects can be accessed but consume 2-5x more space than the raw data inside their field. The reasons for such behavior are:
  • Every distinct Java object has an “object header”. The size of this header is 16 bytes. Sometimes the object has little data in it, thus in such cases, it can be bigger than the data.
  • There are about 40 bytes of overhead over the raw string data in Java String. It stores each character as two bytes because of String’s internal usage of UTF-16 encoding. If there are 10 characters String, it can easily consume 60 bytes.
  • Common collection classes like HashMap and LinkedList make use of linked data structure, there we have “wrapper” object for every entry. This object has both header and pointer (8 bytes each) to the next object in the list.
  • Collections of primitive types often store them as “boxed objects”. For example, java.lang.Integer.
c) Spark Data Structure Tuning :
By avoiding the Java features that add overhead we can reduce the memory consumption. There are several ways to achieve this:
  • Avoid the nested structure with lots of small objects and pointers.
  • Instead of using strings for keys, use numeric IDs or enumerated objects.
  • If the RAM size is less than 32 GB, set JVM flag to –xx:+UseCompressedOops to make a pointer to four bytes instead of eight.

d) Spark Garbage Collection Tuning : 
In garbage collection, tuning in Apache Spark, the first step is to gather statistics on how frequently garbage collection occurs. It also gathers the amount of time spent in garbage collection. Thus, can be achieved by adding -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps to Java option. The next time when Spark job run, a message will display in workers log whenever garbage collection occurs. These logs will be in worker node, not on drivers program.
Java heap space divides into two regions Young and Old. The young generation holds short-lived objects while Old generation holds objects with longer life. The garbage collection tuning aims at, long-lived RDDs in the old generation. It also aims at the size of a young generation which is enough to store short-lived objects. With this, we can avoid full garbage collection to gather temporary object created during task execution. Some steps that may help to achieve this are:
  • If full garbage collection is invoked several times before a task is complete this ensures that there is not enough memory to execute the task.
  • In garbage collection statistics, if OldGen is near to full we can reduce the amount of memory used for caching. This can be achieved by lowering spark.memory.fraction. the better choice is to cache fewer objects than to slow down task execution. Or we can decrease the size of young generation i.e., lowering –Xmn.
The effect of Apache Spark garbage collection tuning depends on our application and amount of memory used.
e) Memory Management in Spark : 
We consider Spark memory management under two categories: execution and storage. The memory which is for computing in shuffles, Joins, aggregation is Execution memory. While the one for caching and propagating internal data in the cluster is storage memory. Both execution and storage share a unified region M. When the execution memory is not in use, the storage can use all the memory. The same case lies true for Storage memory. Execution can drive out the storage if necessary. This is done only until storage memory usage falls under certain threshold R.
We can get several properties by this design. First, the application can use entire space for execution if it does not use caching. While the applications that use caching can reserve a small storage (R), where data blocks are immune to evict.
Even though we have two relevant configurations, the users need not adjust them. Because default values are relevant to most workloads:
  • memory.fraction describes the size of M as a fraction of the (JVM heap space-300MB)(default 0.6). The remaining 40% is stored in user data structure, internal metadata in Spark and safeguarding against OOM error in case of Sparse and large records.
  • memory.storageFraction shows the size of R as the fraction of M (default 0.5).
f) Determining Memory Consumption in Spark : If we want to know the size of Spark memory consumption a dataset will require to create an RDD, put that RDD into the cache and look at “Storage” page in Web UI. This page will let us know the amount of memory RDD is occupying.
If we want to know the memory consumption of particular object, use SizeEstimator’S estimate method.
Apart from all above there are some more consideration for Spark Performance Tuning, such as:
g) Level of Parallelism : To use the full cluster the level of parallelism of each program should be high enough. According to the size of the file, Spark sets the number of “Map” task to run on each file. The level of parallelism can be passed as a second argument. We can set the config property spark.default.parallelism to change the default.

h) Memory Usage of Reduce Task in Spark : Although RDDs fit in our memory many times we come across a problem of OutOfMemoryError. This is because the working set of our task say groupByKey is too large. We can fix this by increasing the level of parallelism so that each task’s input set is small. We can increase the number of cores in our cluster because Spark reuses one executor JVM across many tasks and has low task launching cost.
i) Broadcasting Large Variables : The size of each serialized task reduces by using broadcast functionality in SparkContext. If a task uses a large object from driver program inside of them, turn it into the broadcast variable. Generally, it considers the tasks that are about 20 Kb for optimization.
j) Data Locality in Apache Spark : 
Data locality plays an important role in the performance of Spark Jobs. The case in which the data and code that operates on that data are together, the computation is faster. But if the two are separate, then either the code should be moved to data or vice versa. It is faster to move serialized code from place to place then the chunk of data because the size of the code is smaller than the data.
Based on data current location there are various levels of locality. The order from closest to farthest is:
  • The best possible locality is that the PROCESS_LOCAL resides in same JVM as the running code.
  • NODE_LOCAL resides on the same node in this. It is because the data travel between processes is quite slower than PROCESS_LOCAL.
  • There is no locality preference in NO_PREF data is accessible from anywhere.
  • RACK_LOCAL data is on the same rack of the server. Since the data is on the same rack but on the different server, so it sends the data in the network, through a single switch.
  • ANY data resides somewhere else in the network and not in the same rack.

So finally we can say that to do Spark Performance Tuning, increase the performance of the system performance tuning plays the vital role. Serializing the data plays an important role in tuning the system. Spark employs a number of optimization techniques to cut the processing time. Thus, Performance Tuning guarantees the better performance of the system. 
Hope you like this article about Spark Performance Tuning and it was helpful... like and share this post and please leave a comment if you have any queries.


Thank You!
  






Comments

Popular posts from this blog

Transformations and Actions in Spark

How to convert XML String to JSON String in Spark-Scala

How to Convert a Spark DataFrame to Map in Scala