Pyspark: How To Flatten Nested Arrays By Merging Values In Spark
I have 10,000 JSONs with different ids, and each has 10,000 names. How can I flatten nested arrays by merging values by int or str in PySpark? EDIT: I have added the column name_10000_xvz to the example.
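For context, a toy version of the input can be built directly (a minimal sketch; the shape and values are assumptions reconstructed from the sample outputs below, and real data would come from spark.read.json):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Each name_* column is an array of (date, val) structs.
schema = ('id INT, '
          'name_1_a ARRAY<STRUCT<date: INT, val: INT>>, '
          'name_1_b ARRAY<STRUCT<date: INT, val: INT>>, '
          'name_2_a ARRAY<STRUCT<date: INT, val: INT>>')

df = spark.createDataFrame(
    [
        # (id, name_1_a, name_1_b, name_2_a); each inner tuple is (date, val)
        (1,
         [(2001, 1), (2002, 2), (2003, 3)],
         [(2001, 4), (2002, 5), (2003, 6)],
         [(2001, 21), (2002, 22), (2003, 23)]),
    ],
    schema)
df.printSchema()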
Solution 1:
UPDATE
As @werner has mentioned, it's necessary to transform all the structs to append the column name into them.
import pyspark.sql.functions as f

names = [column for column in df.columns if column.startswith('name_')]

# Tag each struct with the column it came from, so the name survives the flatten.
expressions = []
for name in names:
    expressions.append(f.expr('TRANSFORM({name}, el -> STRUCT("{name}" AS name, el.date, el.val))'.format(name=name)))

# Flatten all arrays into one array of structs and explode it with inline.
flatten_df = (df
              .withColumn('flatten', f.flatten(f.array(*expressions)))
              .selectExpr('id', 'inline(flatten)'))

# Pivot the name column back into one column per original name_* array.
output_df = (flatten_df
             .groupBy('id', 'date')
             .pivot('name', names)
             .agg(f.first('val')))

output_df.sort('id', 'date').show(truncate=False)
+---+----+--------------+--------+--------+--------+
|id |date|name_10000_xvz|name_1_a|name_1_b|name_2_a|
+---+----+--------------+--------+--------+--------+
|1  |2000|30            |null    |null    |null    |
|1  |2001|31            |1       |4       |21      |
|1  |2002|32            |2       |5       |22      |
|1  |2003|33            |3       |6       |23      |
|2  |1990|39            |null    |null    |null    |
|2  |2000|30            |null    |null    |null    |
|2  |2001|31            |1       |4       |21      |
|2  |2002|32            |2       |5       |22      |
|2  |2003|33            |3       |6       |23      |
|2  |2004|34            |null    |null    |null    |
+---+----+--------------+--------+--------+--------+
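As a side note, on Spark 3.1+ the same "tag each struct with its column name" step can also be written with the Python transform function instead of a SQL string. This is only an equivalent sketch of the step above; the groupBy/pivot part stays the same:

import pyspark.sql.functions as f

names = [column for column in df.columns if column.startswith('name_')]

def tag_with_name(name):
    # Turn one name_* column (array<struct<date,val>>) into
    # array<struct<name,date,val>> with the column name baked in.
    return f.transform(
        f.col(name),
        lambda el: f.struct(
            f.lit(name).alias('name'),
            el['date'].alias('date'),
            el['val'].alias('val')))

flatten_df = (df
              .withColumn('flatten', f.flatten(f.array(*[tag_with_name(n) for n in names])))
              .selectExpr('id', 'inline(flatten)'))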
OLD
Assuming:
- the date values are always the same across the columns name_1_a, name_1_b, name_2_a
- their sizes are equal
import pyspark.sql.functions as f

output_df = (df
             .withColumn('flatten', f.expr('TRANSFORM(SEQUENCE(0, size(name_1_a) - 1), i -> '
                                           'STRUCT(name_1_a[i].date AS date, '
                                           '       name_1_a[i].val AS name_1_a, '
                                           '       name_1_b[i].val AS name_1_b, '
                                           '       name_2_a[i].val AS name_2_a))'))
             .selectExpr('id', 'inline(flatten)'))
output_df.sort('id', 'date').show(truncate=False)
+---+----+--------+--------+--------+
|id |date|name_1_a|name_1_b|name_2_a|
+---+----+--------+--------+--------+
|1  |2001|1       |4       |21      |
|1  |2002|2       |5       |22      |
|1  |2003|3       |6       |23      |
|2  |2001|1       |4       |21      |
|2  |2002|2       |5       |22      |
|2  |2003|3       |6       |23      |
+---+----+--------+--------+--------+
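Under the same equal-length assumption, arrays_zip (Spark 2.4+) expresses the positional merge a bit more directly. This is just a sketch; it relies on the zipped struct fields keeping the source column names, which recent Spark versions do:

import pyspark.sql.functions as f

output_df = (df
             .select('id', f.explode(f.arrays_zip('name_1_a', 'name_1_b', 'name_2_a')).alias('z'))
             .select('id',
                     f.col('z.name_1_a.date').alias('date'),
                     f.col('z.name_1_a.val').alias('name_1_a'),
                     f.col('z.name_1_b.val').alias('name_1_b'),
                     f.col('z.name_2_a.val').alias('name_2_a')))

output_df.sort('id', 'date').show(truncate=False)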
Solution 2:
How are the naming conventions used?
Can you try something like the below using spark-sql?
df.createOrReplaceTempView("df")
spark.sql("""
select id,
name_1_a.date[0] as date, name_1_a.val[0] as name_1_a, name_1_b.val[0] as name_1_b, name_2_a.val[0] as name_2_a
from df
""").show(false)
+---+----+--------+--------+--------+
|id |date|name_1_a|name_1_b|name_2_a|
+---+----+--------+--------+--------+
|1  |2001|1       |4       |21      |
|2  |2001|1       |4       |21      |
+---+----+--------+--------+--------+
Here are my assumptions.
- The first field is id and the rest are all names, 1 to n, like name_1_a, name_1_b, name_2_a, etc.
- The date is the same across all "n" names, so I can use the first field to derive it.
Building up the dataframe.
JSON strings
val jsonstr1 = """{ "id":1,"name_1_a": [ { "date":2001, "val":1 }, { "date":2002, "val":2 }, { "date":2003, "val":3 } ],"name_1_b": [ { "date":2001, "val":4 }, { "date":2002, "val":5 }, { "date":2003, "val":6 } ],"name_2_a": [ { "date":2001, "val":21 }, { "date":2002, "val":22 }, { "date":2003, "val":23 } ]}"""
val jsonstr2 = """{ "id":2,"name_1_a": [ { "date":2001, "val":1 }, { "date":2002, "val":2 }, { "date":2003, "val":3 } ],"name_1_b": [ { "date":2001, "val":4 }, { "date":2002, "val":5 }, { "date":2003, "val":6 } ],"name_2_a": [ { "date":2001, "val":21 }, { "date":2002, "val":22 }, { "date":2003, "val":23 } ]}"""
Dataframes
val df1 = spark.read.json(Seq(jsonstr1).toDS)
val df2 = spark.read.json(Seq(jsonstr2).toDS)
val df = df1.union(df2)
Now create a view on top of df. I'm just naming it "df".
df.createOrReplaceTempView("df")
Show the data:
df.show(false)
df.printSchema
Use the metadata and construct the sql string.
df.columns
Array[String] = Array(id, name_1_a, name_1_b, name_2_a)
val names = df.columns.drop(1) // drop id

val sql1 = for {
  i <- 0 to 2
  t1 = names.map(x => x + s".val[${i}] as ${x}").mkString(",")
  t2 = names(0) + ".date[0] as date ," + t1
  _ = println(t2)
} yield s""" select id, ${t2} from df """

val sql2 = sql1.mkString(" union All ")
Now sql2 contains the below string, which is valid SQL:
" select id, name_1_a.date[0] as date ,name_1_a.val[0] as name_1_a,name_1_b.val[0] as name_1_b,name_2_a.val[0] as name_2_a from df union All select id, name_1_a.date[0] as date ,name_1_a.val[1] as name_1_a,name_1_b.val[1] as name_1_b,name_2_a.val[1] as name_2_a from df union All select id, name_1_a.date[0] as date ,name_1_a.val[2] as name_1_a,name_1_b.val[2] as name_1_b,name_2_a.val[2] as name_2_a from df "
Pass it to spark.sql(sql2) and get the required result
spark.sql(sql2).orderBy("id").show(false)
+---+----+--------+--------+--------+
|id |date|name_1_a|name_1_b|name_2_a|
+---+----+--------+--------+--------+
|1  |2001|2       |5       |22      |
|1  |2001|1       |4       |21      |
|1  |2001|3       |6       |23      |
|2  |2001|1       |4       |21      |
|2  |2001|3       |6       |23      |
|2  |2001|2       |5       |22      |
+---+----+--------+--------+--------+
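Since the question is about pyspark, the same string-building idea translates roughly as below (a sketch only; it keeps date[0] exactly as the Scala above does, assumes three elements per array as in the sample data, and reuses the "df" temp view):

names = [c for c in df.columns if c != 'id']

selects = []
for i in range(3):
    # One SELECT per array index, e.g. name_1_a.val[0] as name_1_a, ...
    vals = ', '.join('{x}.val[{i}] as {x}'.format(x=x, i=i) for x in names)
    selects.append('select id, {d}.date[0] as date, {v} from df'.format(d=names[0], v=vals))

sql2 = ' union all '.join(selects)
spark.sql(sql2).orderBy('id').show(truncate=False)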