I'm currently using IBM Data Scientist Workbench with Jupyter notebooks and Spark.
I'm trying to read several CSV files into DataFrames, then apply some transformations to them so that the final DataFrame merges data from the different CSV files. For some reason I'm getting the following error:
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.Arrays.copyOf(Arrays.java:2367)
    at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
    at java.lang.StringBuilder.append(StringBuilder.java:132)
The code I'm using is the following:
import os
from pyspark.sql.types import StructType

i = 0
count = 0
var_name = []
schema = StructType([])
df1 = sqlContext.createDataFrame(sc.emptyRDD(), schema)
df1_ocurrences = sqlContext.createDataFrame(sc.emptyRDD(), schema)
df1_count = sqlContext.createDataFrame(sc.emptyRDD(), schema)
df1_merged = sqlContext.createDataFrame(sc.emptyRDD(), schema)
df1_complete = sqlContext.createDataFrame(sc.emptyRDD(), schema)
FINAL = sqlContext.createDataFrame(sc.emptyRDD(), schema)

for file in os.listdir('/resources/data/test_variables/'):
    df1 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/resources/data/test_variables/" + file)
    # SKIP SERIES WITH ONLY 0s
    count = df1.groupBy().sum("Bit_value")
    if count.select("sum(Bit_value)").collect()[0][0] == 0:
        continue
    i += 1
    # AGGREGATION
    df1 = df1.withColumn("Interval", ((df1.Timestamp.cast("long") / 1).cast("long") * 1).cast("timestamp"))
    # COUNT 1s
    df1_ocurrences = df1.groupBy("Interval").sum("Bit_value").sort("Interval")
    df1_ocurrences = df1_ocurrences.withColumnRenamed("sum(Bit_value)", "Sum_df1")
    # COUNT TOTAL
    df1_count = df1.groupBy("Interval").count().sort("Interval")
    df1_count = df1_count.withColumnRenamed("count", "Total_df1")
    # MERGING
    df1_merged = df1_ocurrences.join(df1_count, ["Interval"]).sort("Interval")
    var_name = file.split(".")
    df1_complete = df1_merged.withColumn(var_name[0], df1_merged.Sum_df1 / df1_merged.Total_df1)
    df1_complete = df1_complete.drop('Sum_df1')
    df1_complete = df1_complete.drop('Total_df1')
    # FINAL DATAFRAME
    if i == 1:
        FINAL = df1_complete
    else:
        FINAL = FINAL.join(df1_complete, ["Interval"]).sort("Interval")
Any suggestions? Maybe this isn't the most efficient code, but I'm new to Spark.
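One simplification worth noting: since Sum_df1 / Total_df1 is just the mean of Bit_value within each Interval, the two groupBy calls and the join inside the loop could in principle be replaced by a single df1.groupBy("Interval").avg("Bit_value"). A toy check of that identity in plain Python (no Spark needed; the interval labels and bit values are made up):

```python
# Toy check: per-group sum(Bit_value) / count equals the per-group mean.
from collections import defaultdict
from statistics import mean

rows = [("10:00", 1), ("10:00", 0), ("10:00", 1), ("10:05", 0)]

# What the Sum_df1 / Total_df1 join computes: sum and count per interval.
sums, counts = defaultdict(int), defaultdict(int)
for interval, bit in rows:
    sums[interval] += bit
    counts[interval] += 1
ratio = {k: sums[k] / counts[k] for k in sums}

# What a single avg("Bit_value") aggregation would compute.
groups = defaultdict(list)
for interval, bit in rows:
    groups[interval].append(bit)
means = {k: mean(v) for k, v in groups.items()}

assert ratio == means  # same numbers, one aggregation instead of two plus a join
```

Fewer shuffles per file won't by itself fix a GC overhead error, but it shrinks the lineage that the iterative FINAL join keeps accumulating.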
Too much time is being spent in GC and too little memory is being freed: https://developer.ibm.com/hadoop/2016/02/16/beginners-guide-apache-spark-troubleshooting/ Besides the advice in that article, what worked for me in Jupyter was:
Make sure spark.yarn.executor.memoryOverhead is set to 10% of the executor memory.
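For concreteness, a configuration sketch of that suggestion in PySpark (the 4g executor size is illustrative, not a recommendation; the property must be set before the SparkContext is created):

```python
# Configuration sketch: spark.yarn.executor.memoryOverhead is off-heap
# headroom in MB. Its default is commonly 10% of executor memory with a
# 384 MB floor, so for a hypothetical 4g executor ~410 MB is in that range.
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("csv-merge")
        .set("spark.executor.memory", "4g")
        .set("spark.yarn.executor.memoryOverhead", "410"))

sc = SparkContext(conf=conf)
```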