How to Convert a Spark DataFrame String Type Column to Array Type and Split All the JSONs of this Column into Rows : Part - 1

Hi Friends,

In this post, I'd like to explore a project scenario involving JSON data.

Suppose we are getting a DataFrame from a source which has a column ArrayOfJsonStrings. It actually holds an array of JSON objects, but the data type of this column is String.
We need to split all the JSONs in this ArrayOfJsonStrings column into separate rows, one JSON per row.

Below are the input and output DataFrames :

Input DataFrame :

id | ArrayOfJsonStrings
---+-------------------
1  | [{"First":{"Info":"ABCD123","Res":"1.0"}},{"Second":{"Info":"EFGH456","Res":"2.0"}},{"Third":{"Info":"IJKL789","Res":"3.0"}}]

Output DataFrame :

id | splittedJson
---+-------------
1  | {"First":{"Info":"ABCD123","Res":"1.0"}}
1  | {"Second":{"Info":"EFGH456","Res":"2.0"}}
1  | {"Third":{"Info":"IJKL789","Res":"3.0"}}

Below is the code, with each step explained :

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SplitArrayOfJsonsToRows {

  def main(args: Array[String]) {

    //Creating SparkSession
    lazy val conf = new SparkConf().setAppName("split-array-of-json-to-row").set("spark.default.parallelism", "1")
      .setIfMissing("spark.master", "local[*]")
    lazy val sparkSession = SparkSession.builder().config(conf).getOrCreate()

    //Creating the raw DataFrame, which has a column (ArrayOfJsonStrings) holding an array of JSONs; its data type is String and it needs to be split into rows

    val rawDF = sparkSession.sql(""" select string ("1") as id """).withColumn("ArrayOfJsonStrings", lit("""[{"First":{"Info":"ABCD123","Res":"1.0"}},{"Second":{"Info":"EFGH456","Res":"2.0"}},{"Third":{"Info":"IJKL789","Res":"3.0"}}]"""))
    rawDF.show(false)

    //Printing the schema to show the column is of string type
    rawDF.printSchema()


    //In the above DataFrame, the JSON string looks like an array because of the [...], but the brackets are actually part of the string
    //When we convert this column to Array type, it will automatically be shown in the form [...], and the JSON data would then appear as [[...]]
    //So, to get a correct array of JSONs, we remove the first [ and the last ] from the JSON string before converting the data type
    //And dropping the original column : ArrayOfJsonStrings

    val rmvFirstLastArrChar = rawDF.withColumn("transformedJsonString", expr("substring(ArrayOfJsonStrings, 2, length(ArrayOfJsonStrings))"))
      .withColumn("transformedJsonString", expr("substring(transformedJsonString, 1, length(transformedJsonString)-1)")).drop("ArrayOfJsonStrings")
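    //For reference, after this step transformedJsonString holds (outer [ and ] stripped):
    //{"First":{"Info":"ABCD123","Res":"1.0"}},{"Second":{"Info":"EFGH456","Res":"2.0"}},{"Third":{"Info":"IJKL789","Res":"3.0"}}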

    rmvFirstLastArrChar.show(false)
    rmvFirstLastArrChar.printSchema()



    //Now we need to convert this string type column to Array type so that we can explode it
    //But for that we need a separator. The JSONs here are separated by , but we can't split on it directly,
    //because a comma can also appear inside a JSON and the resulting output would be wrong
    //So first we replace the JSONs' separator , with ,,, to make it unique before converting to Array type

    //Creating a function that replaces , with ,,, by matching the pattern },{ (the boundary between two JSONs in this data)

    val changeSeparator = (str: String) => {
      if (str == null) null else str.replaceAll("},\\{", "},,,{")
    }
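    //A quick sanity check on a tiny sample string (illustrative values only):
    //changeSeparator("""{"a":"1"},{"b":"2"}""") returns {"a":"1"},,,{"b":"2"}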

    //Creating a UDF from the above function so it can be applied to a DataFrame column
    val separatorUdf = udf(changeSeparator)


    //Applying the above UDF to replace , with ,,, so that the String column can then be converted to Array type
    //And dropping the original column transformedJsonString

    val replaceSeparatorDF = rmvFirstLastArrChar.withColumn("chgSepJsonString", separatorUdf(rmvFirstLastArrChar("transformedJsonString"))).drop("transformedJsonString")
    replaceSeparatorDF.show(false)
    replaceSeparatorDF.printSchema()


    //Creating a UDF (strToArr) to convert the data type from String to Array by splitting on the separator ,,,
    val strToArr = udf((value: String) => value.split(",,,"))
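    //For example (illustrative values only): """{"a":"1"},,,{"b":"2"}""".split(",,,")
    //returns Array({"a":"1"}, {"b":"2"})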

    //Applying the above UDF strToArr on the chgSepJsonString column of the replaceSeparatorDF DataFrame to change the data type to Array type
    //And dropping the original chgSepJsonString column, whose data type is String

    val strJsonToArrOfJson = replaceSeparatorDF.withColumn("ArrayOfJson", strToArr(replaceSeparatorDF("chgSepJsonString"))).drop("chgSepJsonString")

    strJsonToArrOfJson.show(false)

    //Printing the schema to show the column is now of array type
    strJsonToArrOfJson.printSchema()


    //Now that the data type is Array, we can explode the JSON column and split it into the possible number of rows, as below
    //And dropping the original column : ArrayOfJson

    val splittedJsonDF = strJsonToArrOfJson.withColumn("splittedJson", explode(strJsonToArrOfJson.col("ArrayOfJson"))).drop("ArrayOfJson")
    
    splittedJsonDF.show(false)
    
    splittedJsonDF.printSchema()
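    //Note: each splittedJson value is still a plain JSON string at this point;
    //parsing it into typed columns would be a separate step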

  }

}
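As an aside, the same transformation can likely be done without any UDFs, using only Spark's built-in functions (regexp_replace, split and explode). Below is a minimal sketch of that alternative, assuming the same rawDF as above; the intermediate column name body is just illustrative :

import org.apache.spark.sql.functions.{col, explode, expr, regexp_replace, split}

//Alternative sketch (no UDFs) : strip the outer [ and ], make the separator unique,
//then split and explode, all with built-in functions
val noUdfDF = rawDF
  .withColumn("body", expr("substring(ArrayOfJsonStrings, 2, length(ArrayOfJsonStrings) - 2)"))
  .withColumn("splittedJson", explode(split(regexp_replace(col("body"), "\\},\\{", "},,,{"), ",,,")))
  .drop("body")

noUdfDF.show(false)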


Output for all the coding steps up to the final result :

rawDF (string type) :
id = 1, ArrayOfJsonStrings = [{"First":{"Info":"ABCD123","Res":"1.0"}},{"Second":{"Info":"EFGH456","Res":"2.0"}},{"Third":{"Info":"IJKL789","Res":"3.0"}}]

rmvFirstLastArrChar (outer brackets removed, still string type) :
transformedJsonString = {"First":{"Info":"ABCD123","Res":"1.0"}},{"Second":{"Info":"EFGH456","Res":"2.0"}},{"Third":{"Info":"IJKL789","Res":"3.0"}}

replaceSeparatorDF (separator made unique, still string type) :
chgSepJsonString = {"First":{"Info":"ABCD123","Res":"1.0"}},,,{"Second":{"Info":"EFGH456","Res":"2.0"}},,,{"Third":{"Info":"IJKL789","Res":"3.0"}}

strJsonToArrOfJson (array type) :
ArrayOfJson = [{"First":{"Info":"ABCD123","Res":"1.0"}}, {"Second":{"Info":"EFGH456","Res":"2.0"}}, {"Third":{"Info":"IJKL789","Res":"3.0"}}]

splittedJsonDF (final output, one JSON per row) :
splittedJson = {"First":{"Info":"ABCD123","Res":"1.0"}}
splittedJson = {"Second":{"Info":"EFGH456","Res":"2.0"}}
splittedJson = {"Third":{"Info":"IJKL789","Res":"3.0"}}
I hope this post was helpful. Please do like, comment and share.

Thank You!
