I am a newbie. We have the Hive query below, and on top of it we perform a pivot operation using Spark and Python. The PySpark script below performs the pivot and writes the result to a Hive table. The Hive query returns 140 million rows.
Method 1
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import functions as F
sc = SparkContext()
hc = HiveContext(sc)
tbl = hc.sql("""
Select Rating.BranchID
, Rating.Vehicle
, Rating.PersonalAutoCov
, Rating.PersonalVehicleCov
, Rating.EffectiveDate
, Rating.ExpirationDate
, attr.name as RatingAttributeName
, Cast(Rating.OutputValue as Int) OutputValue
, Rating.InputValue
From db.dbo_pcx_paratingdata_piext_master rating
Inner Join db.dbo_pctl_ratingattrname_piext_master attr
on rating.RatingAttribute = attr.id
and attr.CurrentRecordIndicator = 'Y'
Where
rating.CurrentRecordIndicator = 'Y'
""")
# Cache the joined rows; the pivot below triggers more than one job over them
# (one to collect the distinct attribute names, one to aggregate)
tbl.cache()
# Pivot: one output column per distinct RatingAttributeName,
# taking max(InputValue) and sum(OutputValue) for each group
pvttbl1 = tbl.groupby("BranchId","Vehicle","PersonalAutoCov","PersonalVehicleCov","EffectiveDate","ExpirationDate")\
    .pivot("RatingAttributeName")\
    .agg({"InputValue":"max", "OutputValue":"sum"})
pvttbl1.createOrReplaceTempView("paRatingAttributes")
hc.sql("Create table dev_pekindataaccesslayer.createcount as select * from paRatingAttributes")
When I run the Method 1 script with a spark-submit command, it fails with
java.lang.OutOfMemoryError: Java heap space
or sometimes
java.lang.OutOfMemoryError: GC overhead limit exceeded
The spark-submit command I used:
[command not preserved in this copy of the post]
Verbose logs:
[logs not preserved in this copy of the post]
I then made some minor modifications to the above PySpark script, and it ran without any problem.
Method 2
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
sc = SparkContext()
hc = HiveContext(sc)
sqlContext = SQLContext(sc)
tbl = hc.sql("""
Select Rating.BranchID
, Rating.Vehicle
, Rating.PersonalAutoCov
, Rating.PersonalVehicleCov
, Rating.EffectiveDate
, Rating.ExpirationDate
, attr.name as RatingAttributeName
, Cast(Rating.OutputValue as Int) OutputValue
, Rating.InputValue
From db.dbo_pcx_paratingdata_piext_master rating
Inner Join db.dbo_pctl_ratingattrname_piext_master attr
on rating.RatingAttribute = attr.id
and attr.CurrentRecordIndicator = 'Y'
Where
rating.CurrentRecordIndicator = 'Y'
""")
tbl.createOrReplaceTempView("Ptable")
# Count the rows so the exact count can be used as a LIMIT below
r=sqlContext.sql("select count(1) from Ptable")
m=r.collect()[0][0]
hc.sql("drop table if exists db.Ptable")
# Materialize the joined data as a Hive table, then read it back for the pivot
hc.sql("Create table db.Ptable as select * from Ptable")
tb2 = hc.sql("select * from db.Ptable limit "+str(m))
pvttbl1 = tb2.groupby("BranchId","Vehicle","PersonalAutoCov","PersonalVehicleCov","EffectiveDate","ExpirationDate")\
.pivot("RatingAttributeName")\
.agg({"InputValue":"max", "OutputValue":"sum"})
pvttbl1.createOrReplaceTempView("paRatingAttributes")
hc.sql("drop table if exists db.createcount")
hc.sql("Create table db.createcount STORED AS ORC as select * from paRatingAttributes")
But the above script involves creating an intermediate table, which is an extra step. With Method 2, the same spark-submit command works fine as long as I keep the limit keyword.
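If I understand correctly why Method 2 helps, writing db.Ptable simply materializes the join, so the pivot reads its input back from disk instead of recomputing one huge plan. If that is right, DataFrame.checkpoint (available since Spark 2.1) should give the same effect without a named intermediate table; an untested sketch (the checkpoint directory is a made-up path):

# Untested sketch: /tmp/spark_ckpt is a made-up path; a real HDFS dir would be needed
sc.setCheckpointDir("/tmp/spark_ckpt")
tbl_ck = tbl.checkpoint()  # materializes tbl and truncates its lineage
pvttbl1 = tbl_ck.groupby("BranchId","Vehicle","PersonalAutoCov","PersonalVehicleCov","EffectiveDate","ExpirationDate")\
    .pivot("RatingAttributeName")\
    .agg({"InputValue":"max", "OutputValue":"sum"})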
What is wrong with my Method 1, and how can I make it work?
Note: I went through Spark java.lang.OutOfMemoryError: Java heap space and tried all of the conf parameters suggested there, but still had no luck.
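The settings I experimented with were of this general kind (the numbers and the script name below are illustrative placeholders, not my exact values):

# Placeholder values and script name, for illustration only
spark-submit --driver-memory 8g --executor-memory 8g \
  --conf spark.driver.maxResultSize=4g \
  --conf spark.sql.shuffle.partitions=400 \
  my_pivot_script.py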