
DataFrame with arrays as elements: how do I create a Spark DataFrame from a nested array of struct elements?

Spark DataFrame struct

I have read a JSON file into Spark. This file has the following structure:

scala> tweetBlob.printSchema
root
 |-- related: struct (nullable = true)
 |    |-- next: struct (nullable = true)
 |    |    |-- href: string (nullable = true)
 |-- search: struct (nullable = true)
 |    |-- current: long (nullable = true)
 |    |-- results: long (nullable = true)
 |-- tweets: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cde: struct (nullable = true)
...
...
 |    |    |-- cdeInternal: struct (nullable = true)
...
...
 |    |    |-- message: struct (nullable = true)
...
...

Ideally, I want a DataFrame with the columns "cde", "cdeInternal", "message", ..., as shown below:

root
 |-- cde: struct (nullable = true)
...
...
 |-- cdeInternal: struct (nullable = true)
...
...
 |-- message: struct (nullable = true)
...
...

I have managed to use "explode" to extract the elements of the "tweets" array into a column called "tweets":

scala> val tweets = tweetBlob.select(explode($"tweets").as("tweets"))
tweets: org.apache.spark.sql.DataFrame = [tweets: struct,maritalStatus:struct,parenthood:struct>,content:struct>,polarity:string>>>,cdeInternal:struct,tracks:array>>,message:struct,link:string,links:array>,listedCount:bigint,location:struct,objectType:string,postedTime...

scala> tweets.printSchema
root
 |-- tweets: struct (nullable = true)
 |    |-- cde: struct (nullable = true)
...
...
 |    |-- cdeInternal: struct (nullable = true)
...
...
 |    |-- message: struct (nullable = true)
...
...

How can I select all the fields inside the struct and create a DataFrame from them? If I understand correctly, explode does not work on a struct.

Any help is appreciated.

Solution

One possible way to handle this is to extract the required information from the schema. Let's start with some dummy data:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

case class Bar(x: Int, y: String)
case class Foo(bar: Bar)

// In spark-shell, `sc` and the implicits that provide `toDF` are already in scope
val df = sc.parallelize(Seq(Foo(Bar(1, "first")), Foo(Bar(2, "second")))).toDF

df.printSchema
// root
//  |-- bar: struct (nullable = true)
//  |    |-- x: integer (nullable = false)
//  |    |-- y: string (nullable = true)

and a helper function:

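// Returns one Column per field of the struct column `colname`,
// or an empty array if that column is not a struct.
// Note: `.head` throws if `colname` does not exist in `df`.
def children(colname: String, df: DataFrame) = {
  val parent = df.schema.fields.filter(_.name == colname).head
  val fields = parent.dataType match {
    case x: StructType => x.fields
    case _ => Array.empty[StructField]
  }
  // Address each nested field as "parent.child"
  fields.map(x => col(s"$colname.${x.name}"))
}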
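The helper above only looks one level deep. If structs are nested further (as with related.next.href in the question's schema), the same schema-driven idea can be applied recursively. The following is a sketch under assumptions, not part of the original answer: `flattenSchema` is a hypothetical name, the case classes are toy stand-ins, and Spark 2.x with a local SparkSession is assumed. Arrays of structs such as tweets are treated as leaves and kept as single columns, so they still need to be exploded first.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

// Toy stand-ins for the question's "related" and "search" structs (assumed data).
case class Next(href: String)
case class Related(next: Next)
case class Search(current: Long, results: Long)
case class Blob(related: Related, search: Search)

object FlattenSketch {
  // Hypothetical recursive variant of `children`: walks a StructType and returns
  // one Column per non-struct field, aliased with its path joined by "_"
  // (e.g. related.next.href becomes related_next_href).
  // Assumes field names contain no dots; such names would need backtick quoting.
  def flattenSchema(schema: StructType, prefix: String = ""): Seq[Column] =
    schema.fields.toSeq.flatMap { f =>
      val path = if (prefix.isEmpty) f.name else s"$prefix.${f.name}"
      f.dataType match {
        case st: StructType => flattenSchema(st, path) // recurse into nested structs
        case _              => Seq(col(path).alias(path.replace(".", "_")))
      }
    }

  // Builds a one-row DataFrame and selects its flattened columns.
  def demo(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val df = Seq(Blob(Related(Next("https://example.com/next")), Search(1L, 10L))).toDF()
    df.select(flattenSchema(df.schema): _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("flatten-sketch").getOrCreate()
    demo(spark).printSchema()
    spark.stop()
  }
}
```

For the toy data, this should produce the columns related_next_href, search_current and search_results. The underscore aliases are a design choice to avoid name clashes when different structs contain fields with the same name.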

Finally, the result:

df.select(children("bar", df): _*).printSchema
// root
//  |-- x: integer (nullable = true)
//  |-- y: string (nullable = true)
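For comparison, Spark SQL can also expand every field of a struct column with a star selector, for example `df.select("bar.*")` on the dummy data above. The sketch below applies this to a simplified version of the question's data: explode produces one row per array element, and "tweets.*" then turns the struct's fields into top-level columns. It also shows the SQL `inline` generator, which does both steps at once for an array of structs. The case classes are assumed stand-ins for the real tweet fields, and Spark 2.2 or later with a local SparkSession is assumed.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode}

// Simplified stand-ins for the question's tweet structs (assumed, not the real fields).
case class Message(body: String)
case class Tweet(cde: String, message: Message)
case class TweetBlob(tweets: Seq[Tweet])

object StructExpansionSketch {
  // explode gives one row per array element; "tweets.*" expands the struct's fields
  // into top-level columns (cde, message). Rows whose array is null or empty are
  // dropped; explode_outer (Spark 2.2+) would keep them as a row of nulls instead.
  def viaExplode(blob: DataFrame): DataFrame =
    blob
      .select(explode(col("tweets")).as("tweets"))
      .select("tweets.*")

  // The inline generator explodes an array of structs straight into columns.
  def viaInline(blob: DataFrame): DataFrame =
    blob.selectExpr("inline(tweets)")

  // One input row holding a two-element tweets array.
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(TweetBlob(Seq(Tweet("a", Message("hello")), Tweet("b", Message("world"))))).toDF()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("struct-expansion").getOrCreate()
    val blob = sample(spark)
    viaExplode(blob).printSchema()
    viaInline(blob).show()
    spark.stop()
  }
}
```

The schema-based `children` helper is still handy when you need the individual Column objects, for example to rename or filter fields before selecting them.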

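Disclaimer: this content was contributed by a user and does not represent the position of the wpsshop blog. Copyright belongs to the original author, and this site assumes no legal liability for it. If you find infringing content, please contact us. When reposting, please credit the source: https://www.wpsshop.cn/w/木道寻08/article/detail/816425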