我使用的是pyspark 2.1。下面是我的输入数据帧。我被困在从不同的数据帧获取动态偏移值请帮助
df1=
价值类别
13个
2 2个
4 5个
df2型
类别年-月-周数-滞后-属性运行
0 0 2 0
2019年1月11日
2019年1月2日20
2019年1月3日30
2019年1月4日1
2019年1 5 5 2
2019年1月6日3
2019年1月7 7 4
2019年1月18日5
2019年1月9 9 6
2 0 0 0 9 0
2018年2月1 1 1 2 0
2 2018年1 2 3 2
2 2018年1 3 4 3
2 2018年1 3 5 4
如上例所示,df1是我的查找表,它有偏移值,对于1,偏移值是3,对于类别2,偏移值是2。在
在df2中,runs是我的输出列,因此对于df1中的每个类别值,如果lag值是3,那么从dataframe2[df2]应该考虑lag_attbute和lag down 3个值,因此可以看到每3个lag_属性值重复运行
我试过下面的代码没用。请帮忙
df1=df1.registerTempTable("df1")
df2=df2.registerTempTable("df2")
sqlCtx.sql("select st.category,st.Year,st.Month,st.weekyear,st.lag_attribute,LAG(st.lag_attribute,df1.value, 0) OVER (PARTITION BY st.cagtegory ORDER BY st.Year,st.Month,st.weekyear) as return_test from df1 st,df2 lkp where df1.category=df2.category")
请帮我跨过这个障碍
lag
接受一个column对象和一个整数(python integer),如函数签名所示: 在
^{pr2}$count
的值不能是pysparkIntegerType
(列对象)。不过,还是有一些变通办法,让我们从示例数据开始:如果
df1
不是太大(意味着少量的categories
,并且每个category
中可能有很多值),则可以将df1
转换为一个列表,并创建一个if elif elif。。。基于其值的条件:注意:如果
c
和l
是整数,如果它们是字符串,则:现在我们可以应用条件:
如果
df1
是大的,那么您可以在一个内置的lag
列上自连接df2
:首先,我们将使用一个连接将}:
values
从df1
带到{如果
df1
不是太大,则应该broadcast
它:现在我们将枚举每个
partition
中的行,并构建一个lag
列:注意:您的}。还有一些行在数据帧中具有相同的顺序(例如,示例数据帧中的最后两行
runs
滞后值有问题,例如catagory=2
它只是滞后1
,而不是{df2
具有相同的category, year, month and weeknumber
),因为涉及到洗牌,所以每次运行代码时可能会得到不同的结果。在相关问题 更多 >
编程相关推荐