A Sample use case for Spark SQL with solution in Scala

Use Case

We have a given Mobile log files from mobile sdks, where each line is in below format :

orgGpackageGmobileIdHnameH1

where

org is organization name
package is android application package name
mobileId is mobile unique identifier
name is application name corresponding to package

All above fields are always in lower case

Application name for any given package is not unique, Example we may have the following 2 entries for the same package 

airtelGcom.whatsappG1Hwhatsapp messengerH1
airtelGcom.whatsappG2Hwhatsapp indiaH1

Expected Output:

Output_1:

org G package G total count for the package

For above example :

airtelGcom.whatsappG3


Output_2:

org G package G list of app names aliases

For above example :

airtelGcom.whatsappGwhatsapp messenger, whatsapp india

Below is the Sample Data which I'll save as log.txt file to use.

airtelGcom.whatsappG1Hwhatsapp messengerH1
airtelGcom.whatsappG2Hwhatsapp indiaH1
airtelGcom.ubercabG1Huber indiaH1
airtelGcom.ubercabG2Huber indiaH1
bsnlGcom.whatsappG3HwhatsappH1
bsnlGcom.whatsappG4HwhatsappH1
bsnlGcom.ubercabG3Huber cab indiaH1
bsnlGcom.ubercabG4Huber indiaH1

============================ Solution ============================
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/**  
* Created by anamika_singh on 08/02/2020.  
**/

object MobileLog {

  def main(args: Array[String]): Unit = {

    //Creating SparkSession    
    val conf = new SparkConf().setAppName("MobileLog").setMaster("local")
    implicit val spark = SparkSession.builder().config(conf).getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    //Reading Data from text file (log.txt) and converting in DataFrame    
    val textFile = spark.read.csv("C:/Users/anamika_singh/Desktop/log.txt")
    val fileData = textFile//.toDF()    
    fileData.show(false)

    //Managing DataFrame to get OUTPUT_1 ==> airtelGcom.whatsappG3    
    val dataDF1 = fileData.withColumn("split", split(col("_c0"), "G")).select(col("split")(0).as("org"), col("split")(1).as("package"), col("split")(2).as("num_pkg")).withColumn("id", monotonically_increasing_id())
    //dataDF1.show(false)    
    val dataDF2 = dataDF1.withColumn("split", split(col("num_pkg"), "H")).select(col("split")(0).as("pkg_num")).withColumn("id", monotonically_increasing_id())
    //dataDF2.show(false)    
    val dataDF3 = fileData.withColumn("split", split(col("_c0"), "H")).select(col("split")(1).as("app_name")).withColumn("id", monotonically_increasing_id())
    //dataDF3.show(false)    
    
    //Creating temporary table to join above DataFrames to get expected DataFrame to produce given output    
    dataDF1.createOrReplaceTempView("df1")
    dataDF2.createOrReplaceTempView("df2")
    dataDF3.createOrReplaceTempView("df3")

    //Joining DataFrames    
    val dataDF4 = spark.sql("select a.org, a.package, b.pkg_num, c.app_name from df1 a join df2 b on a.id=b.id join df3 c on b.id=c.id")
    //dataDF4.show(false)    
    //Creating temporary table upon joined Dataframe with CAST the columns pkg_num as int    
    dataDF4.createOrReplaceTempView("df4")

    val dataDF5 = spark.sql("select org, package, SUM(cast(pkg_num as int)) as pkg_num from df4 group by org, package")
    //dataDF5.show(false)    
   //Final Dataframe to get expected Output with column name OUTPUT_1    
    val dataDF6 = dataDF5.select(concat_ws("G", col("org"), col("package"), col("pkg_num"))).withColumnRenamed("concat_ws(G, org, package, pkg_num)", "OUTPUT_1")
    dataDF6.show(false)    //airtelGcom.whatsappG3
    
    //Dataframe to get expected Output by a different way with column name output_1    
    //dataDF5.withColumn("output_1",struct(dataDF5.columns.head, dataDF5.columns.tail: _*)).show(false) //[airtel, com.whatsapp, 3]
    
    //Managing DataFrame to get OUTPUT_2 ==> airtelGcom.whatsappGwhatsapp messenger, whatsapp india    
    //Taking Column's value from dataDF4's temporary table df4    
    val DF1 = dataDF4.select(concat(col("org"), lit("G"), col("package"), lit("G")).as("org_package")).withColumn("id", monotonically_increasing_id()).createTempView("temp1")
    val DF2 = spark.sql("select app_name from df4").withColumn("id", monotonically_increasing_id()).createOrReplaceTempView("temp2")
    
    //Joining above two DataFrame to get expected columns org_package and app_name to produce expected output    
    val DF3 = spark.sql("select a.org_package, b.app_name from temp1 a join temp2 b on a.id = b.id")
    //DF3.show()    
    //Creating temporary table to get expected output with column name OUTPUT_2    
    DF3.createOrReplaceTempView("temp3")
    val DF4 = spark.sql("select org_package, concat_ws(',' ,collect_set(app_name)) as app_name from temp3 group by org_package ").createOrReplaceTempView("temp4")

    //Producing final output with column name OUTPUT_2    
    val DF5 = spark.sql("select CONCAT(org_package,app_name) from temp4")
    val DF6 = DF5.withColumnRenamed("concat(org_package, app_name)", "OUTPUT_2")
    DF6.show(false)

  }
}

======================== Expected Output ========================
+---------------------+
|OUTPUT_1             |
+---------------------+
|bsnlGcom.whatsappG7  |
|airtelGcom.whatsappG3|
|bsnlGcom.ubercabG7   |
|airtelGcom.ubercabG3 |
+---------------------+
+-----------------------------------------------------+
|OUTPUT_2                                             |
+-----------------------------------------------------+
|airtelGcom.whatsappGwhatsapp messenger,whatsapp india|
|airtelGcom.ubercabGuber india                        |
|bsnlGcom.whatsappGwhatsapp                           |
|bsnlGcom.ubercabGuber cab india,uber india           |
+-----------------------------------------------------+


Comments

Popular posts from this blog

Transformations and Actions in Spark

How to Convert a Spark DataFrame to Map in Scala

How to Handle and Convert DateTime format in Spark-Scala.