spark convert RDD[Map] to DataFrame

将RDD[Map[String,String]] 转化为展平 DataFrame,类似于pyspark 中 dict 结构toDF的效果。

input

val mapRDD: RDD[Map[String, String]] = sc.parallelize(Seq(
   Map("name" -> "zhangsan", "age" -> "18", "addr" -> "bj"),
   Map("name" -> "lisi", "age" -> "20", "addr" -> "hz"),
))

output

name     age addr
zhangsan 18  bj
lisi     20  hz

1. Map中元素固定

每个 Map 只有三个元素的情况下

val columns=mapRDD.take(1).flatMap(_.keys)

val resultantDF=mapRDD.filter(_.nonEmpty).map{m=>
      val seq=m.values.toSeq
      (seq(0),seq(1),seq(2))
      }.toDF(columns:_*)

resultantDF.show()

2. Map中元素不固定
RDD[Map[String,String]] -> RDD[Row] -> DataFrame

  def map2DF(spark: SparkSession, rdd: RDD[Map[String, String]]): DataFrame = {
    val cols = rdd.take(1).flatMap(_.keys)
    val resRDD = rdd.filter(_.nonEmpty).map { m =>
      val seq = m.values.toSeq
      Row.fromSeq(seq)
    }

    val fields = cols.map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)

    spark.createDataFrame(resRDD, schema)
  }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容