Python中文
首页
教程
问答
标签
搜索
登录
注册
如何使用给定的reduce函数合并基于pyspark中一个字段的多个JSON数据行
回答此问题可获得
20
贡献值,回答如果被采纳可获得
50
分。
<p>如何使用下面的merge函数与pyspark合并如下所示的JSON数据行?在</p> <p>注意:假设这只是一个细节示例,我有1000行数据要合并。最有效的解决方案是什么?不管是好是坏,我必须使用pyspark。在</p> <p>输入:</p> <pre><code>data = [ {'timestamp': '20080411204445', 'address': '100 Sunder Ct', 'name': 'Joe Schmoe'}, {'timestamp': '20040218165319', 'address': '100 Lee Ave', 'name': 'Joe Schmoe'}, {'timestamp': '20120309173318', 'address': '1818 Westminster', 'name': 'John Doe'}, ... More ... ] </code></pre> <p>期望输出:</p> ^{pr2}$ <p>合并功能:</p> <pre><code>def reduce_on_name(a, b): '''Combines two JSON data rows based on name''' merged = {} if a['name'] == b['name']: addresses = (a['timestamp'], a['address']), (b['timestamp'], b['address']) merged['name'] = a['name'] merged['addresses'] = addresses return merged </code></pre>
0 条评论
分类:
Python问答
请先
登录
后评论
默认排序
时间排序
1 个回答
匿名
1天前
擅长:python、mysql、java
<p>我想应该是这样的:</p> <pre><code>sc.parallelize(data).groupBy(lambda x: x['name']).map(lambda t: {'name':t[0],'addresses':[(x['timestamp'], x['address']) for x in t[1]]}).collect() </code></pre>
请先
登录
后评论
针对此问题:
更多的回答
关注
89
关注
收藏
1
收藏,
216
浏览
网友 提问于 2天前
相关Python问题
无法使用Django/mongoengine连接到MongoDB(身份验证失败)
6 回答
无法使用Django\u mssql\u后端迁移到外部hos
8 回答
无法使用Django&Python3.4连接到MySql
9 回答
无法使用Django+nginx上载媒体文件
6 回答
无法使用Django1.6导入名称模式
4 回答
无法使用Django1.7和mongodb登录管理站点
9 回答
无法使用Djangoadmin创建项目,进程使用了错误的路径,因为我事先安装了错误的Python
6 回答
无法使用Djangockedi验证CBV中的字段
6 回答
无法使用Djangocketditor上载图像(错误400)
6 回答
无法使用Djangocron进行函数调用
8 回答
无法使用Djangofiler djang上载文件
7 回答
无法使用Djangokronos
4 回答
无法使用Djangomssql provid
10 回答
无法使用Djangomssql连接到带有Django 1.11的MS SQL Server 2016
10 回答
无法使用Djangomssq迁移Django数据库
10 回答
无法使用Djangonox创建用户
7 回答
无法使用Djangopyodb从Django查询SQL Server
6 回答
无法使用Djangopython3ldap连接到ldap
8 回答
无法使用Djangoredis连接到redis
4 回答
无法使用Django中的FK创建新表
6 回答