How to Create a Map Column with Different Kinds of Keys from a Spark DataFrame in Scala

Hi Friends,

In this post, I'd like to explore a small use case: creating a Map column from the existing columns of a Spark DataFrame.

The precondition is that all the columns going into the map must have the same (or a compatible) type; otherwise Spark will throw an error, because a map column needs a single key type and a single value type.
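If your DataFrame mixes types (say, numeric and string columns), one simple workaround is to cast everything to string before building the map. Below is a minimal sketch; the helper name castAllToString is mine, not something from the original code:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper: cast every column to string so map() sees one key type and one value type
def castAllToString(df: DataFrame): DataFrame =
  df.select(df.columns.map(c => col(c).cast("string").as(c)): _*)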

1.  Create a Map column in an existing DataFrame, where each map key is a column name and each map value is that column's value.

The example output, with the new column created_map, is shown below.

2.  Create a Map column in an existing DataFrame, where most map keys are column names, but for one map value the key is taken from another column's value instead of the column name.

The example output, with the new column created_map_key_from_value, is shown below.



The example output, with the new column created_map_key_from_another_column_value, is shown below.


Below is the code, with its output, for each of the above use cases:

1.  Create a Map column in an existing DataFrame, where each map key is a column name and each map value is that column's value.

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions._

object DFToMap extends App {

  //Creating SparkSession
  lazy val conf = new SparkConf().setAppName("df-to-map").set("spark.default.parallelism", "2")
    .setIfMissing("spark.master", "local[*]")
  lazy val sparkSession = SparkSession.builder().config(conf).getOrCreate()
  import sparkSession.implicits._

  // Creating raw DataFrame
  val rawDF = Seq(("Samsung", "L", "68", "Delhi", "India"), ("Sony", "XL", "100", "Tokyo", "Japan"))
    .toDF("brand", "size", "sales", "city", "country")

  //Prepare the map columns, deciding each key and value with the help of a counter. A bit of manual iteration work is required.
  var preCols: Column = null // tracks the previously visited column
  var counter = 1            // 1-based position of the current column
  val size = rawDF.schema.fields.length
  val mapColumns = rawDF.schema.flatMap { field =>

    // Build the (key, value) pair for this column
    val resultedMap = if (counter == size)
      Seq(preCols, col(field.name))
    else
      Seq(lit(field.name), col(field.name))

    //Track the current column; the counter is intentionally not incremented here (+= 0), so the (counter == size) branch never fires and every key stays the column name
    preCols = col(field.name)
    counter += 0

    resultedMap
  }

  //Adding the new map column to the existing DataFrame
  val dfWithMapColumn = rawDF.withColumn("created_map", map(mapColumns: _*))

  //Print the DataFrame
  dfWithMapColumn.show(false)

}

Output :

With the sample data above, each row's created_map should contain every column name mapped to that column's value:

(brand -> Samsung, size -> L, sales -> 68, city -> Delhi, country -> India)
(brand -> Sony, size -> XL, sales -> 100, city -> Tokyo, country -> Japan)

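As a side note, for this first use case the counter is not strictly needed, since every key is simply the column name. A shorter equivalent sketch (reusing the same rawDF; the names simpleMapColumns and dfWithSimpleMap are just illustrative) could be:

// Alternative for use case 1 only: pair each column name literal with its column value
val simpleMapColumns = rawDF.columns.flatMap(c => Seq(lit(c), col(c)))
val dfWithSimpleMap = rawDF.withColumn("created_map", map(simpleMapColumns: _*))
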
2.  Create a Map column in an existing DataFrame, where most map keys are column names, but for one map value the key is taken from another column's value instead of the column name.

For this use case, we use the same code as above, but now the counter is actually incremented. When the counter reaches the number of columns, that column's map key is taken from the previous column's value instead of the column name.

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions._

object DFToMap extends App {

  //Creating SparkSession
  lazy val conf = new SparkConf().setAppName("df-to-map").set("spark.default.parallelism", "2")
    .setIfMissing("spark.master", "local[*]")
  lazy val sparkSession = SparkSession.builder().config(conf).getOrCreate()
  import sparkSession.implicits._

  // Creating raw DataFrame
  val rawDF = Seq(("Samsung", "L", "68", "Delhi", "India"), ("Sony", "XL", "100", "Tokyo", "Japan"))
    .toDF("brand", "size", "sales", "city", "country")

  //Prepare the map columns, deciding each key and value with the help of a counter. A bit of manual iteration work is required.
  var preCols: Column = null // tracks the previously visited column
  var counter = 1            // 1-based position of the current column
  val size = rawDF.schema.fields.length
  val mapColumns = rawDF.schema.flatMap { field =>

    // Build the (key, value) pair for this column; for the last column (counter == size) the previous column's value becomes the key
    val resultedMap = if (counter == size)
      Seq(preCols, col(field.name))
    else
      Seq(lit(field.name), col(field.name))

    //Track the current column for the next iteration and increment the counter by 1
    preCols = col(field.name)
    counter += 1

    resultedMap
  }

  //Adding the new map column to the existing DataFrame
  val dfWithMapColumn = rawDF.withColumn("created_map_key_from_value", map(mapColumns: _*))

  //Print the DataFrame
  dfWithMapColumn.show(false)


}

Output :

Because the counter now increments, the last pair's key is the city value rather than the column name country, so created_map_key_from_value should look like:

(brand -> Samsung, size -> L, sales -> 68, city -> Delhi, Delhi -> India)
(brand -> Sony, size -> XL, sales -> 100, city -> Tokyo, Tokyo -> Japan)

Again, in the same code, we can change the starting counter value (based on the number of columns) to pick which map entry should get another column's value as its key instead of the column name.
In this example, I am starting the counter from 2 because I want the sales column's value as the key and the city column's value as the value.


import org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions._

object DFToMap extends App {

  //Creating SparkSession
  lazy val conf = new SparkConf().setAppName("df-to-map").set("spark.default.parallelism", "2")
    .setIfMissing("spark.master", "local[*]")
  lazy val sparkSession = SparkSession.builder().config(conf).getOrCreate()
  import sparkSession.implicits._

  // Creating raw DataFrame
  val rawDF = Seq(("Samsung", "L", "68", "Delhi", "India"), ("Sony", "XL", "100", "Tokyo", "Japan"))
    .toDF("brand", "size", "sales", "city", "country")

  //Prepare the map columns, deciding each key and value with the help of a counter. A bit of manual iteration work is required.
  var preCols: Column = null // tracks the previously visited column
  var counter = 2            // Start the counter at 2 so the sales column's value becomes the key and the city column's value the map value
  val size = rawDF.schema.fields.length
  val mapColumns = rawDF.schema.flatMap { field =>

    // Build the (key, value) pair for this column; for the column where counter == size, the previous column's value becomes the key
    val resultedMap = if (counter == size)
      Seq(preCols, col(field.name))
    else
      Seq(lit(field.name), col(field.name))

    //Track the current column for the next iteration and increment the counter by 1
    preCols = col(field.name)
    counter += 1

    resultedMap
  }

  //Adding the new map column to the existing DataFrame
  val dfWithMapColumn = rawDF.withColumn("created_map_key_from_another_column_value", map(mapColumns: _*))

  //Print the DataFrame
  dfWithMapColumn.show(false)

}



Output :

Starting the counter at 2 makes the special branch fire on the city column, so its key becomes the sales value:

(brand -> Samsung, size -> L, sales -> 68, 68 -> Delhi, country -> India)
(brand -> Sony, size -> XL, sales -> 100, 100 -> Tokyo, country -> Japan)

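To work out the starting counter value for an arbitrary column, here is my own generalisation of the pattern (assuming the exact loop structure from these examples): the special branch fires when counter == number of columns, so to give the column at 0-based position i the previous column's value as its key, start the counter at (number of columns - i). A small illustrative sketch, reusing rawDF from the examples above:

// Illustrative only: derive the starting counter value instead of hard-coding it
val targetIndex = rawDF.columns.indexOf("city")               // 3 in the sample DataFrame
val startCounter = rawDF.schema.fields.length - targetIndex   // 5 - 3 = 2, matching the example
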
I hope this post was helpful. Please do like, comment, and share.

Thank You!

