<p>Let's start by creating a DataFrame from the data in your example:</p>
<pre class="lang-py prettyprint-override"><code>from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('explode_example').getOrCreate()

# Each row is (name, city, list of sentiment dicts)
data = [
    ("harry", "london", [
        {"score": "0.999926", "sentiment": {"score": "-0.640237"}, "text": "happy"},
        {"score": "0.609836", "sentiment": {"score": "-0.607594"}, "text": "sad"},
        {"score": "0.58564", "sentiment": {"score": "-0.6833"}, "text": "mad"}
    ]),
    ("sally", "london", [
        {"score": "0.999926", "sentiment": {"score": "-0.640237"}, "text": "sad"},
        {"score": "0.609836", "sentiment": {"score": "-0.607594"}, "text": "mad"},
        {"score": "0.58564", "sentiment": {"score": "-0.6833"}, "text": "agitated"}
    ]),
    ("gary", "london", [
        {"score": "0.999926", "sentiment": {"score": "-0.640237"}, "text": "excited"},
        {"score": "0.609836", "sentiment": {"score": "-0.607594"}, "text": "down"},
        {"score": "0.58564", "sentiment": {"score": "-0.6833"}, "text": "agitated"}
    ]),
    ("mary", "manchester", [
        {"score": "0.999926", "sentiment": {"score": "-0.640237"}, "text": "sad"},
        {"score": "0.609836", "sentiment": {"score": "-0.607594"}, "text": "low"},
        {"score": "0.58564", "sentiment": {"score": "-0.6833"}, "text": "content"}
    ]),
    ("gerry", "manchester", [
        {"score": "0.999926", "sentiment": {"score": "-0.640237"}, "text": "ecstatic"},
        {"score": "0.609836", "sentiment": {"score": "-0.607594"}, "text": "good"},
        {"score": "0.58564", "sentiment": {"score": "-0.6833"}, "text": "bad"}
    ])
]

# Only column names are given, so the column types are inferred from the data
df = spark.createDataFrame(data=data, schema=["name", "city", "sentiment"])
</code></pre>
<p>This gives you the following DataFrame:</p>
<pre><code>df.show(truncate=False)
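+-----+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|name |city      |sentiment                                                                                                                                                                                                    |
+-----+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|harry|london    |[[sentiment -> {score=-0.640237}, score -> 0.999926, text -> happy], [sentiment -> {score=-0.607594}, score -> 0.609836, text -> sad], [sentiment -> {score=-0.6833}, score -> 0.58564, text -> mad]]        |
|sally|london    |[[sentiment -> {score=-0.640237}, score -> 0.999926, text -> sad], [sentiment -> {score=-0.607594}, score -> 0.609836, text -> mad], [sentiment -> {score=-0.6833}, score -> 0.58564, text -> agitated]]     |
|gary |london    |[[sentiment -> {score=-0.640237}, score -> 0.999926, text -> excited], [sentiment -> {score=-0.607594}, score -> 0.609836, text -> down], [sentiment -> {score=-0.6833}, score -> 0.58564, text -> agitated]]|
|mary |manchester|[[sentiment -> {score=-0.640237}, score -> 0.999926, text -> sad], [sentiment -> {score=-0.607594}, score -> 0.609836, text -> low], [sentiment -> {score=-0.6833}, score -> 0.58564, text -> content]]      |
|gerry|manchester|[[sentiment -> {score=-0.640237}, score -> 0.999926, text -> ecstatic], [sentiment -> {score=-0.607594}, score -> 0.609836, text -> good], [sentiment -> {score=-0.6833}, score -> 0.58564, text -> bad]]    |
+-----+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+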
</code></pre>
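<p>Once we have the DataFrame, we need to <strong>explode</strong> the <code>sentiment</code> column, so that each element of the array becomes its own row:</p>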
<pre class="lang-py prettyprint-override"><code>from pyspark.sql.functions import explode
df_exp = df.select(df["name"], df["city"], explode(df["sentiment"]))
</code></pre>
<p>The result is:</p>
<pre><code>df_exp.show(truncate=False)
+-----+----------+---------------------------------------------------------------------+
|name |city      |col                                                                  |
+-----+----------+---------------------------------------------------------------------+
|harry|london    |[sentiment -> {score=-0.640237}, score -> 0.999926, text -> happy]   |
|harry|london    |[sentiment -> {score=-0.607594}, score -> 0.609836, text -> sad]     |
|harry|london    |[sentiment -> {score=-0.6833}, score -> 0.58564, text -> mad]        |
|sally|london    |[sentiment -> {score=-0.640237}, score -> 0.999926, text -> sad]     |
|sally|london    |[sentiment -> {score=-0.607594}, score -> 0.609836, text -> mad]     |
|sally|london    |[sentiment -> {score=-0.6833}, score -> 0.58564, text -> agitated]   |
|gary |london    |[sentiment -> {score=-0.640237}, score -> 0.999926, text -> excited] |
|gary |london    |[sentiment -> {score=-0.607594}, score -> 0.609836, text -> down]    |
|gary |london    |[sentiment -> {score=-0.6833}, score -> 0.58564, text -> agitated]   |
|mary |manchester|[sentiment -> {score=-0.640237}, score -> 0.999926, text -> sad]     |
|mary |manchester|[sentiment -> {score=-0.607594}, score -> 0.609836, text -> low]     |
|mary |manchester|[sentiment -> {score=-0.6833}, score -> 0.58564, text -> content]    |
|gerry|manchester|[sentiment -> {score=-0.640237}, score -> 0.999926, text -> ecstatic]|
|gerry|manchester|[sentiment -> {score=-0.607594}, score -> 0.609836, text -> good]    |
|gerry|manchester|[sentiment -> {score=-0.6833}, score -> 0.58564, text -> bad]        |
+-----+----------+---------------------------------------------------------------------+
</code></pre>
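<p>To make the row multiplication concrete, here is a minimal plain-Python sketch of what <code>explode</code> does conceptually. It is only a model for illustration, not Spark's implementation; the helper <code>explode_rows</code> and the <code>nobody</code> row are hypothetical and not part of the original example.</p>

```python
# Plain-Python model of explode(); this is NOT how Spark implements it.
def explode_rows(rows, array_field):
    """Yield one output row per element of row[array_field]."""
    for row in rows:
        # Copy every field except the array being exploded.
        rest = {k: v for k, v in row.items() if k != array_field}
        # An empty list (or None) yields nothing, so that row disappears.
        for element in row[array_field] or []:
            yield {**rest, "col": element}


rows = [
    {"name": "harry", "city": "london",
     "sentiment": [{"text": "happy"}, {"text": "sad"}, {"text": "mad"}]},
    # Hypothetical row with an empty array, added to show the edge case.
    {"name": "nobody", "city": "london", "sentiment": []},
]

exploded = list(explode_rows(rows, "sentiment"))
for row in exploded:
    print(row)
```

<p>Only harry's three elements come out. The row with the empty list produces nothing, which mirrors how <code>explode</code> drops rows whose array is empty or null. If such rows need to be kept, <code>pyspark.sql.functions.explode_outer</code> emits them with a null element instead.</p>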
<p>Finally, let's create a column that holds only the text, filter by city, and select the three columns you want:</p>
<pre class="lang-py prettyprint-override"><code># Extract the "text" entry from each exploded element
df_exp = df_exp.withColumn("text", df_exp["col"]["text"])
# Keep the three result columns and only the rows for London
result = df_exp.select("name", "city", "text").where("city = 'london'")
</code></pre>
<p>The result will be:</p>
<pre><code>result.show(truncate=False)
+-----+------+--------+
|name |city  |text    |
+-----+------+--------+
|harry|london|happy   |
|harry|london|sad     |
|harry|london|mad     |
|sally|london|sad     |
|sally|london|mad     |
|sally|london|agitated|
|gary |london|excited |
|gary |london|down    |
|gary |london|agitated|
+-----+------+--------+
</code></pre>
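<p>If you want to sanity-check the expected output without a Spark session, the last step can be mimicked in plain Python. This is a rough sketch under the assumption that each exploded element behaves like a dictionary; the helper <code>texts_for_city</code> is hypothetical, and the three rows are a small subset of the data above with the nested <code>sentiment</code> entry left out for brevity.</p>

```python
# Plain-Python model of the final step; these rows stand in for df_exp.
exploded = [
    {"name": "harry", "city": "london",
     "col": {"score": "0.999926", "text": "happy"}},
    {"name": "harry", "city": "london",
     "col": {"score": "0.609836", "text": "sad"}},
    {"name": "mary", "city": "manchester",
     "col": {"score": "0.999926", "text": "sad"}},
]


def texts_for_city(rows, city):
    """Keep the rows for one city and project them to (name, city, text)."""
    return [
        # Looking up "text" plays the role of extracting the map entry.
        (row["name"], row["city"], row["col"]["text"])
        for row in rows
        # Same predicate as where("city = 'london'") when city is "london".
        if row["city"] == city
    ]


result = texts_for_city(exploded, "london")
print(result)
```

<p>In this model the filter and the projection happen in one pass. In the Spark version the order of <code>select</code> and <code>where</code> does not change the result either, because <code>city</code> is still present after the <code>select</code>.</p>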