How to Convert Nested JSON Keys and Values into Two Columns: Attribute_Name and Attribute_Value
Hi Friends,
Today I'd like to walk through a JSON use case: I want every top-level JSON key/value as its own column, and the nested JSON keys/values as two separate columns, attribute_name and attribute_value.
For example, below is the input JSON:
{"file_name":"test_20200202122754.json","object_class":"Monitor","object_class_instance":"Monitor","relation_tree":"Source~>HD_Info~>Monitor","Monitor":{"Index":"0","Vendor_Data":"58F5Y","Monitor_Type":"Lenovo Monitor","HInfoID":"650FEC74"}}
Expected Output:
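Based on the code below, the four nested Monitor keys become four attribute_name/attribute_value rows while the top-level columns repeat; the final table should look roughly like this (Spark lists the nested fields alphabetically):
+------------------------+------------+---------------------+------------------------+--------------+---------------+
|file_name               |object_class|object_class_instance|relation_tree           |attribute_name|attribute_value|
+------------------------+------------+---------------------+------------------------+--------------+---------------+
|test_20200202122754.json|Monitor     |Monitor              |Source~>HD_Info~>Monitor|HInfoID       |650FEC74       |
|test_20200202122754.json|Monitor     |Monitor              |Source~>HD_Info~>Monitor|Index         |0              |
|test_20200202122754.json|Monitor     |Monitor              |Source~>HD_Info~>Monitor|Monitor_Type  |Lenovo Monitor |
|test_20200202122754.json|Monitor     |Monitor              |Source~>HD_Info~>Monitor|Vendor_Data   |58F5Y          |
+------------------------+------------+---------------------+------------------------+--------------+---------------+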
Below is the code, with steps, to achieve that output:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object ConvertJsonToKeyValueColumns extends App {

  // Create the SparkSession
  lazy val conf = new SparkConf()
    .setAppName("json-to-key-value")
    .set("spark.default.parallelism", "2")
    .setIfMissing("spark.master", "local[*]")
  lazy val sparkSession = SparkSession.builder().config(conf).getOrCreate()
  lazy val sparkContext: SparkContext = sparkSession.sparkContext

  import sparkSession.implicits._

  // Raw JSON data to test with
  val jsonStr = """{"file_name":"test_20200202122754.json","object_class":"Monitor","object_class_instance":"Monitor","relation_tree":"Source~>HD_Info~>Monitor","Monitor":{"Index":"0","Vendor_Data":"58F5Y","Monitor_Type":"Lenovo Monitor","HInfoID":"650FEC74"}}"""

  // Load the JSON data into a DataFrame
  val jsonDF = sparkSession.read.json(Seq(jsonStr).toDS)
  jsonDF.show(false)

  // We need to pivot columns into rows. For this I usually define a helper
  // function, getKeyValue: for each column it builds a (key, value) struct,
  // collects the structs into an array, and explodes that array into rows
  def getKeyValue(columnsToTranspose: Array[String]) =
    explode(array(columnsToTranspose.map { c =>
      struct(lit(c).alias("key"), col(c).alias("value"))
    }: _*))

  // Grab the field names of the nested Monitor struct; these are the
  // columns to convert into attribute_name/attribute_value rows
  val pivotColumns = jsonDF.select($"Monitor.*").schema.names

  // Build the final expected output using the getKeyValue helper above
  val finalDF = jsonDF
    .select("file_name", "object_class", "object_class_instance", "relation_tree", "Monitor.*")
    .withColumn("getKeyValue", getKeyValue(pivotColumns))
    .select($"file_name", $"object_class", $"object_class_instance", $"relation_tree",
      $"getKeyValue.key" as "attribute_name", $"getKeyValue.value" as "attribute_value")

  // Print the output
  finalDF.show(false)
}
Output:
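Running the program, jsonDF.show(false) first prints the raw DataFrame with Monitor still packed into a single struct column (the exact struct rendering varies a little across Spark versions):
+------------------------------------+------------------------+------------+---------------------+------------------------+
|Monitor                             |file_name               |object_class|object_class_instance|relation_tree           |
+------------------------------------+------------------------+------------+---------------------+------------------------+
|[650FEC74, 0, Lenovo Monitor, 58F5Y]|test_20200202122754.json|Monitor     |Monitor              |Source~>HD_Info~>Monitor|
+------------------------------------+------------------------+------------+---------------------+------------------------+
finalDF.show(false) then prints the attribute_name/attribute_value table shown under Expected Output above.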
Another similar, smaller example:
Input JSON: {"name" : "Anamika","value":{"1":"value1","2":"value2","3":"value3"}}
Expected Output:
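Based on the code below, the three nested keys under value should unpivot into three key/value rows, roughly:
+-------+---+------+
|name   |key|value |
+-------+---+------+
|Anamika|1  |value1|
|Anamika|2  |value2|
|Anamika|3  |value3|
+-------+---+------+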
Below is the code for the same:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object ConvertJsonToKeyValueColumns extends App {

  // Create the SparkSession
  lazy val conf = new SparkConf()
    .setAppName("json-to-key-value")
    .set("spark.default.parallelism", "2")
    .setIfMissing("spark.master", "local[*]")
  lazy val sparkSession = SparkSession.builder().config(conf).getOrCreate()
  lazy val sparkContext: SparkContext = sparkSession.sparkContext

  import sparkSession.implicits._

  // Raw JSON data to test with
  val jsonStr = """{"name" : "Anamika","value":{"1":"value1","2":"value2","3":"value3"}}"""

  // Load the JSON data into a DataFrame
  val jsonDF = sparkSession.read.json(Seq(jsonStr).toDS)
  jsonDF.select($"name", $"value.*").show(false)

  // As before, the getKeyValue helper pivots columns into rows: it builds a
  // (key, value) struct per column and explodes the resulting array
  def getKeyValue(columnsToTranspose: Array[String]) =
    explode(array(columnsToTranspose.map { c =>
      struct(lit(c).alias("key"), col(c).alias("value"))
    }: _*))

  // Then we need an array of the desired columns:
  val pivotCols = Array("1", "2", "3")

  // And finally apply the function to the previous DataFrame:
  val finalDF1 = jsonDF.select($"name", $"value.*")
    .withColumn("getKeyValue", getKeyValue(pivotCols))
    .select($"name", $"getKeyValue.key" as "key", $"getKeyValue.value" as "value")
  finalDF1.show(false)

  // The same output can be achieved without listing the pivot columns by hand:
  // build an intermediate DataFrame and pass every column except the first (name)
  val intermediateDF = jsonDF.select($"name", $"value.*")
  val finalDF2 = intermediateDF
    .withColumn("getKeyValue", getKeyValue(intermediateDF.columns.tail))
    .select($"name", $"getKeyValue.key" as "key", $"getKeyValue.value" as "value")
  finalDF2.show(false)
}
Output: finalDF1.show(false) and finalDF2.show(false) both print the three-row key/value table shown under Expected Output above.
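As an aside, if you'd rather not define a helper at all, Spark's built-in SQL generator function stack can unpivot columns directly. A minimal sketch for this second example (assuming Spark 2.x or later; the backticks are needed because the columns are named with digits):
// stack(n, key1, val1, key2, val2, ...) emits n (key, value) rows per input row
val stackedDF = jsonDF.select($"name", $"value.*")
  .selectExpr("name", "stack(3, '1', `1`, '2', `2`, '3', `3`) as (key, value)")
stackedDF.show(false)
This should print the same three-row table as finalDF1 and finalDF2.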
I hope this post was helpful. Please do like, comment, and share.
Thank You!