How to Convert Nested JSON Keys and Values into Two Columns: Attribute_Name and Attribute_Value

Hi Friends,

Today I'd like to walk through a JSON use case: given a document with both flat and nested fields, I want each top-level key/value pair in its own column, and the nested JSON keys/values flattened into two columns, attribute_name and attribute_value :

For example, below is the input JSON :

{"file_name":"test_20200202122754.json","object_class":"Monitor","object_class_instance":"Monitor","relation_tree":"Source~>HD_Info~>Monitor","Monitor":{"Index":"0","Vendor_Data":"58F5Y","Monitor_Type":"Lenovo Monitor","HInfoID":"650FEC74"}}

Expected Output :

+------------------------+------------+---------------------+------------------------+--------------+---------------+
|file_name               |object_class|object_class_instance|relation_tree           |attribute_name|attribute_value|
+------------------------+------------+---------------------+------------------------+--------------+---------------+
|test_20200202122754.json|Monitor     |Monitor              |Source~>HD_Info~>Monitor|HInfoID       |650FEC74       |
|test_20200202122754.json|Monitor     |Monitor              |Source~>HD_Info~>Monitor|Index         |0              |
|test_20200202122754.json|Monitor     |Monitor              |Source~>HD_Info~>Monitor|Monitor_Type  |Lenovo Monitor |
|test_20200202122754.json|Monitor     |Monitor              |Source~>HD_Info~>Monitor|Vendor_Data   |58F5Y          |
+------------------------+------------+---------------------+------------------------+--------------+---------------+

Below is the code, with steps, to achieve this output :

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._


object ConvertJsonToKeyValueColumns extends App {

  //Creating SparkSession
  lazy val conf = new SparkConf().setAppName("json-to-key-value").set("spark.default.parallelism", "2")
    .setIfMissing("spark.master", "local[*]")
  lazy val sparkSession = SparkSession.builder().config(conf).getOrCreate()
  lazy val sparkContext: SparkContext = sparkSession.sparkContext
  import sparkSession.implicits._


  //Raw JSON data to test
  val jsonStr = """{"file_name":"test_20200202122754.json","object_class":"Monitor","object_class_instance":"Monitor","relation_tree":"Source~>HD_Info~>Monitor","Monitor":{"Index":"0","Vendor_Data":"58F5Y","Monitor_Type":"Lenovo Monitor","HInfoID":"650FEC74"}}"""

  //Loading the JSON data to create a DataFrame
  val jsonDF = sparkSession.read.json(Seq(jsonStr).toDS)
  jsonDF.show(false)


  //We need to pivot the columns into rows; for this I usually define a helper function, getKeyValue.
  //It wraps each column in a (key, value) struct, collects those structs into an array,
  //and explodes the array into one row per column.
  def getKeyValue(columnsToTranspose: Array[String]) = explode(array(columnsToTranspose.map {
    c => struct(lit(c).alias("key"), col(c).alias("value"))
  }: _*))

  //Collect the field names of the nested Monitor struct; these are the columns
  //to convert into attribute_name and attribute_value
  val pivotColumns = jsonDF.select($"Monitor.*").schema.names

  //Get the final expected output using the getKeyValue function above
  val finalDF = jsonDF
    .select("file_name", "object_class", "object_class_instance", "relation_tree", "Monitor.*")
    .withColumn("getKeyValue", getKeyValue(pivotColumns))
    .select($"file_name", $"object_class", $"object_class_instance", $"relation_tree",
      $"getKeyValue.key" as "attribute_name", $"getKeyValue.value" as "attribute_value")

  //Printing the output
  finalDF.show(false)


}

Output :

Running the program first shows jsonDF and then prints the same six-column table shown above under Expected Output.

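As a side note: on Spark 2.2+ the same reshaping can be done without the helper, by converting the nested struct to a map and exploding it, since explode on a map column yields key and value columns directly. Below is a minimal sketch of that alternative (the altDF name is mine, and it assumes all the nested values are strings, as they are here):

  import org.apache.spark.sql.types.{MapType, StringType}

  //Round-trip the Monitor struct through JSON to get a map<string,string>,
  //then explode it into one (key, value) row per entry
  val altDF = jsonDF
    .select($"file_name", $"object_class", $"object_class_instance", $"relation_tree",
      explode(from_json(to_json($"Monitor"), MapType(StringType, StringType))))
    .withColumnRenamed("key", "attribute_name")
    .withColumnRenamed("value", "attribute_value")

  altDF.show(false)
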
Another similar, smaller example :

Input JSON : {"name" : "Anamika","value":{"1":"value1","2":"value2","3":"value3"}}

Expected Output :

+-------+---+------+
|name   |key|value |
+-------+---+------+
|Anamika|1  |value1|
|Anamika|2  |value2|
|Anamika|3  |value3|
+-------+---+------+

Below is the code for this example :

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object ConvertJsonToKeyValueColumns extends App {

  //Creating SparkSession
  lazy val conf = new SparkConf().setAppName("json-to-key-value").set("spark.default.parallelism", "2")
    .setIfMissing("spark.master", "local[*]")
  lazy val sparkSession = SparkSession.builder().config(conf).getOrCreate()
  lazy val sparkContext: SparkContext = sparkSession.sparkContext
  import sparkSession.implicits._

  //Raw JSON data to test
  val jsonStr = """{"name" : "Anamika","value":{"1":"value1","2":"value2","3":"value3"}}"""

  //Loading the JSON data to create a DataFrame
  val jsonDF = sparkSession.read.json(Seq(jsonStr).toDS)

  jsonDF.select($"name", $"value.*").show(false)

  //We need to pivot the columns into rows, using the same helper function as before:
  def getKeyValue(columnsToTranspose: Array[String]) = explode(array(columnsToTranspose.map {
    c => struct(lit(c).alias("key"), col(c).alias("value"))
  }: _*))


  //Then we need to create an array of the desired columns to pivot:
  val pivotCols = Array("1", "2", "3")

  //And finally, apply the function to the previous DataFrame:
  val finalDF1 = jsonDF
    .select($"name", $"value.*")
    .withColumn("getKeyValue", getKeyValue(pivotCols))
    .select($"name", $"getKeyValue.key" as "key", $"getKeyValue.value" as "value")

  finalDF1.show(false)

  //The same output can be achieved without manually listing the columns to pivot:
  //use an intermediate DataFrame and pass every column after the first to getKeyValue

  val intermediateDF = jsonDF.select($"name", $"value.*")

  val finalDF2 = intermediateDF.withColumn("getKeyValue", getKeyValue(intermediateDF.columns.tail))
    .select($"name", $"getKeyValue.key" as "key", $"getKeyValue.value" as "value")

  finalDF2.show(false)


}

Output :

+-------+------+------+------+
|name   |1     |2     |3     |
+-------+------+------+------+
|Anamika|value1|value2|value3|
+-------+------+------+------+

+-------+---+------+
|name   |key|value |
+-------+---+------+
|Anamika|1  |value1|
|Anamika|2  |value2|
|Anamika|3  |value3|
+-------+---+------+

finalDF2 prints the same key/value table as finalDF1.


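One more aside: Spark SQL also ships a built-in stack generator that performs this unpivot in a single expression, so the helper isn't strictly needed. Here is a minimal sketch (the stackedDF name is mine; the backticks are required because the column names are numeric):

  //stack(n, k1, v1, k2, v2, ...) emits n (key, value) rows per input row
  val stackedDF = jsonDF
    .select($"name", $"value.*")
    .selectExpr("name", "stack(3, '1', `1`, '2', `2`, '3', `3`) as (key, value)")

  stackedDF.show(false)
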
I hope this post was helpful. Please do like, comment, and share.
Thank you!
