How to Create a Map Column with Different Types of Keys from a Spark DataFrame in Scala
Hi Friends,
In this post, I'd like to explore a small use case: building a Map column from the columns of a Spark DataFrame.
One precondition: all the columns that go into the map must have the same type (they are all strings in the examples below), otherwise Spark will raise an error.
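If your source columns do not already share a type, one option is to cast them up front. Here is a minimal sketch, assuming a hypothetical DataFrame df whose columns you want as strings:
// Hypothetical helper: cast every column of df to string so they can all live in one map<string, string>
import org.apache.spark.sql.functions.col
val stringDF = df.select(df.columns.map(c => col(c).cast("string").as(c)): _*)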
1. Create a Map column in an existing DataFrame, where each map key is a column name and each map value is that column's value.
Example output with the new column created_map:
2. Create a Map column in an existing DataFrame, where the keys are column names for all map values except one, whose key is taken from another column's value instead of the column name.
Example output with the new column created_map_key_from_value:
Example output with the new column created_map_key_from_another_column_value:
Below is the code, with output, for each of the above use cases:
1. Create a Map column in an existing DataFrame, where each map key is a column name and each map value is that column's value.
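Both use cases rely on Spark's map() function from org.apache.spark.sql.functions, which takes alternating key and value expressions. For this first use case, the call we build programmatically below is conceptually equivalent to writing it out by hand (a sketch only, not the code we actually run):
// Sketch: what the generated map() call boils down to for use case 1
rawDF.withColumn("created_map",
  map(
    lit("brand"), col("brand"),
    lit("size"), col("size"),
    lit("sales"), col("sales"),
    lit("city"), col("city"),
    lit("country"), col("country")))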
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions._

object DFToMap extends App {

  // Creating the SparkSession
  lazy val conf = new SparkConf().setAppName("df-to-map").set("spark.default.parallelism", "2")
    .setIfMissing("spark.master", "local[*]")
  lazy val sparkSession = SparkSession.builder().config(conf).getOrCreate()

  import sparkSession.implicits._

  // Creating the raw DataFrame; every column is a string, so they can all share one map type
  val rawDF = Seq(("Samsung", "L", "68", "Delhi", "India"), ("Sony", "XL", "100", "Tokyo", "Japan"))
    .toDF("brand", "size", "sales", "city", "country")

  // Prepare the alternating key/value columns for the map.
  // A counter tracks the position in the schema and decides which key to use; a bit of mutable iteration is required.
  var preCols: Column = null
  var counter = 1
  val size = rawDF.schema.fields.length
  val mapColumns = rawDF.schema.flatMap { field =>
    // When the counter reaches the number of columns, use the previous column's value as the key;
    // otherwise use the column name as the key
    val resultedMap = if (counter == size)
      Seq(preCols, col(field.name))
    else
      Seq(lit(field.name), col(field.name))
    // Remember the current column and increment the counter by 0:
    // for this use case the counter never changes, so every key is simply the column name
    preCols = col(field.name)
    counter += 0
    resultedMap
  }

  // Adding the new map column to the existing DataFrame
  val dfWithMapColumn = rawDF.withColumn("created_map", map(mapColumns: _*))

  // Print the DataFrame
  dfWithMapColumn.show(false)
}
Output:
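The created_map column holds one entry per source column; for the first row it comes out roughly as brand -> Samsung, size -> L, sales -> 68, city -> Delhi, country -> India (the exact rendering of show() varies slightly across Spark versions).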
2. Create a Map column in an existing DataFrame, where the keys are column names for all map values except one, whose key is taken from another column's value instead of the column name.
For this use case we reuse the code above and only change how the counter is incremented: once the counter reaches the number of columns, the key of that map entry is taken from the previous column's value instead of the column name.
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions._

object DFToMap extends App {

  // Creating the SparkSession
  lazy val conf = new SparkConf().setAppName("df-to-map").set("spark.default.parallelism", "2")
    .setIfMissing("spark.master", "local[*]")
  lazy val sparkSession = SparkSession.builder().config(conf).getOrCreate()

  import sparkSession.implicits._

  // Creating the raw DataFrame
  val rawDF = Seq(("Samsung", "L", "68", "Delhi", "India"), ("Sony", "XL", "100", "Tokyo", "Japan"))
    .toDF("brand", "size", "sales", "city", "country")

  // Prepare the alternating key/value columns for the map.
  // A counter tracks the position in the schema and decides which key to use; a bit of mutable iteration is required.
  var preCols: Column = null
  var counter = 1
  val size = rawDF.schema.fields.length
  val mapColumns = rawDF.schema.flatMap { field =>
    // When the counter reaches the number of columns, use the previous column's value as the key;
    // otherwise use the column name as the key
    val resultedMap = if (counter == size)
      Seq(preCols, col(field.name))
    else
      Seq(lit(field.name), col(field.name))
    // Remember the current column and increment the counter by 1:
    // the special branch fires at the last column (country), whose key becomes the previous column's value (city)
    preCols = col(field.name)
    counter += 1
    resultedMap
  }

  // Adding the new map column to the existing DataFrame
  val dfWithMapColumn = rawDF.withColumn("created_map_key_from_value", map(mapColumns: _*))

  // Print the DataFrame
  dfWithMapColumn.show(false)
}
Output:
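Here the last entry's key is no longer the column name country but the previous column's value; for the first row the map comes out roughly as brand -> Samsung, size -> L, sales -> 68, city -> Delhi, Delhi -> India.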
Again, in the same code, we can change the counter's starting value to pick which map entry gets another column's value as its key instead of the column name.
In this example, I start the counter at 2 because I want the sales column's value as the key and the city column's value as the value.
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions._

object DFToMap extends App {

  // Creating the SparkSession
  lazy val conf = new SparkConf().setAppName("df-to-map").set("spark.default.parallelism", "2")
    .setIfMissing("spark.master", "local[*]")
  lazy val sparkSession = SparkSession.builder().config(conf).getOrCreate()

  import sparkSession.implicits._

  // Creating the raw DataFrame
  val rawDF = Seq(("Samsung", "L", "68", "Delhi", "India"), ("Sony", "XL", "100", "Tokyo", "Japan"))
    .toDF("brand", "size", "sales", "city", "country")

  // Prepare the alternating key/value columns for the map.
  // A counter tracks the position in the schema and decides which key to use; a bit of mutable iteration is required.
  var preCols: Column = null
  var counter = 2 // Start at 2 so the special branch fires at the city column, using the sales value as its key
  val size = rawDF.schema.fields.length
  val mapColumns = rawDF.schema.flatMap { field =>
    // When the counter reaches the number of columns, use the previous column's value as the key;
    // otherwise use the column name as the key
    val resultedMap = if (counter == size)
      Seq(preCols, col(field.name))
    else
      Seq(lit(field.name), col(field.name))
    // Remember the current column and increment the counter by 1
    preCols = col(field.name)
    counter += 1
    resultedMap
  }

  // Adding the new map column to the existing DataFrame
  val dfWithMapColumn = rawDF.withColumn("created_map_key_from_another_column_value", map(mapColumns: _*))

  // Print the DataFrame
  dfWithMapColumn.show(false)
}
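Output:
With the counter starting at 2, the special branch fires at the city column, so for the first row the map comes out roughly as brand -> Samsung, size -> L, sales -> 68, 68 -> Delhi, country -> India.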