<pre><code>from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName('Test').getOrCreate()

# Every person carries the same three (score, sentiment-score) pairs;
# only the text labels differ, so build the structs from one template.
_SCORE_PAIRS = [
    ("0.999926", "-0.640237"),
    ("0.609836", "-0.607594"),
    ("0.58564", "-0.6833"),
]


def _sentiments(*texts):
    """Return the list of sentiment dicts for one person, pairing each
    text label with the shared score template in order."""
    return [
        {"score": score, "sentiment": {"score": sub_score}, "text": text}
        for (score, sub_score), text in zip(_SCORE_PAIRS, texts)
    ]


data = [
    ("harry", "london", _sentiments("happy", "sad", "mad")),
    ("sally", "london", _sentiments("sad", "mad", "agitated")),
    ("gary", "london", _sentiments("excited", "down", "agitated")),
    ("mary", "manchester", _sentiments("sad", "low", "content")),
    ("gerry", "manchester", _sentiments("ecstatic", "good", "bad")),
]

df = spark.createDataFrame(data=data, schema=["name", "city", "sentiment"])
df.show()

# Keep only London rows, turn each element of the `sentiment` array into
# its own row, then project the nested struct's `text` field.
london_exploded = (
    df.where(F.col("city") == "london")
      .select("name", "city", F.explode("sentiment").alias("sentiment"))
)
london_exploded.select(
    "name", "city", F.col("sentiment.text").alias("sentiment")
).show()
Output:
+-----+------+---------+
| name| city|sentiment|
+-----+------+---------+
|harry|london| happy|
|harry|london| sad|
|harry|london| mad|
|sally|london| sad|
|sally|london| mad|
|sally|london| agitated|
| gary|london| excited|
| gary|london| down|
| gary|london| agitated|
+-----+------+---------+
</code></pre>