A sample use case for Spark SQL, with a solution in Scala
Use Case
We are given mobile log files from mobile SDKs, where each line has the following format:
orgGpackageGmobileIdHnameH1
where
org is organization name
package is android application package name
mobileId is mobile unique identifier
name is application name corresponding to package
All of the above fields are always lowercase, which is why the uppercase letters G and H can serve as field delimiters.
The application name for a given package is not unique. For example, we may have the following 2 entries for the same package:
airtelGcom.whatsappG1Hwhatsapp messengerH1
airtelGcom.whatsappG2Hwhatsapp indiaH1
Expected Output:
Output_1:
org G package G total count for the package
For the above example:
airtelGcom.whatsappG3
Output_2:
org G package G list of app names aliases
For the above example:
airtelGcom.whatsappGwhatsapp messenger, whatsapp india
Below is the sample data, which I'll save as a log.txt file to use.
airtelGcom.whatsappG1Hwhatsapp messengerH1
airtelGcom.whatsappG2Hwhatsapp indiaH1
airtelGcom.ubercabG1Huber indiaH1
airtelGcom.ubercabG2Huber indiaH1
bsnlGcom.whatsappG3HwhatsappH1
bsnlGcom.whatsappG4HwhatsappH1
bsnlGcom.ubercabG3Huber cab indiaH1
bsnlGcom.ubercabG4Huber indiaH1
============================ Solution ============================
We are given mobile log files from mobile SDKs, where each line has the following format:
orgGpackageGmobileIdHnameH1
where
org is organization name
package is android application package name
mobileId is mobile unique identifier
name is application name corresponding to package
All of the above fields are always lowercase, which is why the uppercase letters G and H can serve as field delimiters.
The application name for a given package is not unique. For example, we may have the following 2 entries for the same package:
airtelGcom.whatsappG1Hwhatsapp messengerH1
airtelGcom.whatsappG2Hwhatsapp indiaH1
Expected Output:
Output_1:
org G package G total count for the package
For the above example:
airtelGcom.whatsappG3
Output_2:
org G package G list of app names aliases
For the above example:
airtelGcom.whatsappGwhatsapp messenger, whatsapp india
Below is the sample data, which I'll save as a log.txt file to use.
airtelGcom.whatsappG1Hwhatsapp messengerH1
airtelGcom.whatsappG2Hwhatsapp indiaH1
airtelGcom.ubercabG1Huber indiaH1
airtelGcom.ubercabG2Huber indiaH1
bsnlGcom.whatsappG3HwhatsappH1
bsnlGcom.whatsappG4HwhatsappH1
bsnlGcom.ubercabG3Huber cab indiaH1
bsnlGcom.ubercabG4Huber indiaH1
============================ Solution ============================
import org.apache.spark.SparkConf
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
/**
 * Created by anamika_singh on 08/02/2020.
 *
 * Spark SQL solution for the mobile-log use case. Each input line has the form
 * `orgGpackageGmobileIdHnameH1`. Every field is lowercase, so the uppercase
 * letters `G` and `H` act purely as field delimiters.
 *
 * Prints two reports:
 *  - OUTPUT_1: `orgGpackageG<sum of mobileId>` per (org, package)
 *  - OUTPUT_2: `orgGpackageG<comma-separated distinct app names>` per (org, package)
 */
object MobileLog {

  /** Input file used when no path is passed on the command line (the original hard-coded location). */
  private val DefaultLogPath = "C:/Users/anamika_singh/Desktop/log.txt"

  /**
   * Entry point: reads the log, shows the raw lines, then shows OUTPUT_1 and OUTPUT_2.
   *
   * @param args optional; `args(0)` is the path of the log file to read.
   *             When absent, `DefaultLogPath` is used.
   */
  def main(args: Array[String]): Unit = {
    // Creating SparkSession
    val conf = new SparkConf().setAppName("MobileLog").setMaster("local")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    try {
      // Read raw lines into a single `value` column. The CSV reader would split on
      // commas and interpret quotes, neither of which belongs to this log format.
      val fileData = spark.read.text(args.headOption.getOrElse(DefaultLogPath))
      fileData.show(false)

      // Parse all fields of a line in ONE projection, so a row's fields always stay
      // together. The previous approach re-joined separately derived frames on
      // monotonically_increasing_id(), which does not guarantee that rows line up.
      // The parsed frame feeds both reports, so it is cached.
      val records = parseRecords(fileData).cache()

      packageTotals(records).show(false)  // e.g. airtelGcom.whatsappG3
      packageAliases(records).show(false) // e.g. airtelGcom.whatsappGwhatsapp india,whatsapp messenger
    } finally {
      // Release the local Spark context even if reading or aggregation fails.
      spark.stop()
    }
  }

  /**
   * Splits each raw log line into its fields.
   *
   * The line is split on the regex `[GH]`, giving
   * `[org, package, mobileId, name, trailing]`.
   *
   * @param lines DataFrame with a string column `value`, one log line per row
   * @return columns `org`, `package`, `pkg_num` (the mobileId, still a string) and `app_name`
   */
  private def parseRecords(lines: DataFrame): DataFrame = {
    val fields = split(col("value"), "[GH]")
    lines.select(
      fields.getItem(0).as("org"),
      fields.getItem(1).as("package"),
      fields.getItem(2).as("pkg_num"),
      fields.getItem(3).as("app_name")
    )
  }

  /**
   * Builds OUTPUT_1: one `orgGpackageG<total>` string per (org, package).
   *
   * NOTE(review): the total is the SUM of the mobileId field, not a row count.
   * This matches the stated example, where ids 1 and 2 give 3. Confirm that this
   * is the intended meaning of "total count for the package".
   *
   * @param records output of [[parseRecords]]
   * @return single column `OUTPUT_1`
   */
  private def packageTotals(records: DataFrame): DataFrame =
    records
      .groupBy("org", "package")
      .agg(sum(col("pkg_num").cast("int")).as("pkg_num"))
      .select(
        concat_ws("G", col("org"), col("package"), col("pkg_num").cast("string")).as("OUTPUT_1")
      )

  /**
   * Builds OUTPUT_2: one `orgGpackageG<aliases>` string per (org, package), where
   * the aliases are the distinct app names joined with ",".
   *
   * The names are sorted because `collect_set` alone returns them in no guaranteed order.
   *
   * @param records output of [[parseRecords]]
   * @return single column `OUTPUT_2`
   */
  private def packageAliases(records: DataFrame): DataFrame =
    records
      .groupBy("org", "package")
      .agg(concat_ws(",", sort_array(collect_set(col("app_name")))).as("app_name"))
      .select(
        concat(col("org"), lit("G"), col("package"), lit("G"), col("app_name")).as("OUTPUT_2")
      )
}
======================== Expected Output ========================
+---------------------+
|OUTPUT_1 |
+---------------------+
|bsnlGcom.whatsappG7 |
|airtelGcom.whatsappG3|
|bsnlGcom.ubercabG7 |
|airtelGcom.ubercabG3 |
+---------------------+
+-----------------------------------------------------+
|OUTPUT_2 |
+-----------------------------------------------------+
|airtelGcom.whatsappGwhatsapp messenger,whatsapp india|
|airtelGcom.ubercabGuber india |
|bsnlGcom.whatsappGwhatsapp |
|bsnlGcom.ubercabGuber cab india,uber india |
+-----------------------------------------------------+
Comments
Post a Comment