How to Convert Nested JSON Keys and Values into Two Columns: Attribute_Name and Attribute_Value

Hi Friends,

Today I'd like to walk through a JSON use case where I want each top-level JSON key/value pair in its own column, and every nested JSON key/value pair transposed into two columns: attribute_name and attribute_value.

For example, below is the input JSON:

{"file_name":"test_20200202122754.json","object_class":"Monitor","object_class_instance":"Monitor","relation_tree":"Source~>HD_Info~>Monitor","Monitor":{"Index":"0","Vendor_Data":"58F5Y","Monitor_Type":"Lenovo Monitor","HInfoID":"650FEC74"}}

Expected Output:

+------------------------+------------+---------------------+------------------------+--------------+---------------+
|file_name               |object_class|object_class_instance|relation_tree           |attribute_name|attribute_value|
+------------------------+------------+---------------------+------------------------+--------------+---------------+
|test_20200202122754.json|Monitor     |Monitor              |Source~>HD_Info~>Monitor|HInfoID       |650FEC74       |
|test_20200202122754.json|Monitor     |Monitor              |Source~>HD_Info~>Monitor|Index         |0              |
|test_20200202122754.json|Monitor     |Monitor              |Source~>HD_Info~>Monitor|Monitor_Type  |Lenovo Monitor |
|test_20200202122754.json|Monitor     |Monitor              |Source~>HD_Info~>Monitor|Vendor_Data   |58F5Y          |
+------------------------+------------+---------------------+------------------------+--------------+---------------+

Below is the code, with steps, to achieve this output:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._


object ConvertJsonToKeyValueColumns extends App {

  //Creating SparkSession
  lazy val conf = new SparkConf().setAppName("json-to-key-value").set("spark.default.parallelism", "2")
    .setIfMissing("spark.master", "local[*]")
  lazy val sparkSession = SparkSession.builder().config(conf).getOrCreate()
  lazy val sparkContext: SparkContext = sparkSession.sparkContext
  import sparkSession.implicits._


  //Raw Json Data to test
  val jsonStr = """{"file_name":"test_20200202122754.json","object_class":"Monitor","object_class_instance":"Monitor","relation_tree":"Source~>HD_Info~>Monitor","Monitor":{"Index":"0","Vendor_Data":"58F5Y","Monitor_Type":"Lenovo Monitor","HInfoID":"650FEC74"}}"""

  //Loading the Json Data to create a DataFrame
  val jsonDF = sparkSession.read.json(Seq(jsonStr).toDS)
  jsonDF.show(false)


  //We need to pivot the columns into rows; for this I usually define a helper function, getKeyValue:
  def getKeyValue (columnsToTranspose: Array[String]) = explode(array(columnsToTranspose.map {
    c => struct(lit(c).alias("key"), col(c).alias("value"))
  }: _*))
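
  // For illustration (hypothetical columns a and b), getKeyValue(Array("a", "b"))
  // expands to:
  //   explode(array(struct(lit("a").alias("key"), col("a").alias("value")),
  //                 struct(lit("b").alias("key"), col("b").alias("value"))))
  // so each input row fans out into one output row per listed column,
  // carrying that column's name and value as a (key, value) struct.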

  //Collect the field names of the nested column that we want to turn into attribute_name/attribute_value rows
  val pivotColumns = jsonDF.select($"Monitor.*").schema.names
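  // e.g. pivotColumns = Array("HInfoID", "Index", "Monitor_Type", "Vendor_Data");
  // note that Spark's JSON schema inference sorts struct fields alphabetically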

  //Build the final expected output using the getKeyValue helper above
  val finalDF = jsonDF.select("file_name", "object_class", "object_class_instance", "relation_tree", "Monitor.*")
    .withColumn("getKeyValue", getKeyValue(pivotColumns))
    .select($"file_name", $"object_class", $"object_class_instance", $"relation_tree",
      $"getKeyValue.key" as "attribute_name", $"getKeyValue.value" as "attribute_value")

  //Print the output
  finalDF.show(false)


}
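
As a side note, the same transpose can also be done without the struct wrapper, by collecting the pairs into a MapType column and exploding it. Below is a minimal sketch, reusing jsonDF and pivotColumns from the code above and assuming Spark 2.0+, where the map and explode functions take this form:

  //Alternative sketch: build a (column name -> column value) map and explode it
  val mapDF = jsonDF
    .select($"file_name", $"object_class", $"object_class_instance", $"relation_tree", $"Monitor.*")
    .withColumn("m", map(pivotColumns.flatMap(c => Seq(lit(c), col(c))): _*))
    .select($"file_name", $"object_class", $"object_class_instance", $"relation_tree",
      explode($"m").as(Seq("attribute_name", "attribute_value")))

  mapDF.show(false)

Note that exploding a map produces two columns named key and value; aliasing with .as(Seq(...)) renames both in one go.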

Output:

finalDF.show(false) prints the expected table shown above.

Another similar, smaller example:

Input JSON: {"name" : "Anamika","value":{"1":"value1","2":"value2","3":"value3"}}

Expected Output:

+-------+---+------+
|name   |key|value |
+-------+---+------+
|Anamika|1  |value1|
|Anamika|2  |value2|
|Anamika|3  |value3|
+-------+---+------+

Below is the code for the same:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object ConvertJsonToKeyValueColumns extends App {

  //Creating SparkSession
  lazy val conf = new SparkConf().setAppName("json-to-key-value").set("spark.default.parallelism", "2")
    .setIfMissing("spark.master", "local[*]")
  lazy val sparkSession = SparkSession.builder().config(conf).getOrCreate()
  lazy val sparkContext: SparkContext = sparkSession.sparkContext
  import sparkSession.implicits._

  //Raw Json Data to test
  val jsonStr = """{"name" : "Anamika","value":{"1":"value1","2":"value2","3":"value3"}}"""

  //Loading the Json Data to create a DataFrame
  val jsonDF = sparkSession.read.json(Seq(jsonStr).toDS)

  jsonDF.select($"name", $"value.*").show(false)

  //We need to pivot the columns into rows; as before, I define the helper function getKeyValue:
  def getKeyValue (columnsToTranspose: Array[String]) = explode(array(columnsToTranspose.map {
    c => struct(lit(c).alias("key"), col(c).alias("value"))
  }: _*))


  //Then we need to create an array of the desired columns:
  val pivotCols = Array("1", "2", "3")

  //And finally we apply the function to the previous DataFrame:
  val finalDF1 = jsonDF.select($"name", $"value.*")
    .withColumn("getKeyValue", getKeyValue(pivotCols))
    .select($"name", $"getKeyValue.key" as "key", $"getKeyValue.value" as "value")

  finalDF1.show(false)

  //The same output can be achieved without manually listing the columns to pivot, by using an intermediate DataFrame:

  val intermediateDF = jsonDF.select($"name", $"value.*")

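  // columns.tail skips the first column ("name"), so only the value.* fields are pivoted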
  val finalDF2 = intermediateDF.withColumn("getKeyValue", getKeyValue(intermediateDF.columns.tail))
    .select($"name", $"getKeyValue.key" as "key", $"getKeyValue.value" as "value")

  finalDF2.show(false)
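
  // For comparison, a sketch using Spark SQL's built-in stack generator, which
  // expresses the same transpose without the helper function (backticks are
  // needed because these column names are numeric):
  val finalDF3 = jsonDF.select($"name", $"value.*")
    .selectExpr("name", "stack(3, '1', `1`, '2', `2`, '3', `3`) as (key, value)")

  finalDF3.show(false)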


}

Output:

Each of the final DataFrames prints the expected table shown above.


I hope this post was helpful. Please do like, comment, and share.
Thank you!
