Running out of memory when using a Jupyter notebook with Spark

Posted 2024-10-05 10:18:47


I am currently using IBM Data Scientist Workbench with Jupyter notebooks and Spark.

I am trying to read several CSV files into DataFrames and then apply some transformations to them, so that the final DataFrame is built from the merged data of the different CSV files, but for some reason I am getting the following error:

Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.Arrays.copyOf(Arrays.java:2367)
    at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
    at java.lang.StringBuilder.append(StringBuilder.java:132)

The code I am using is the following:

import os
from pyspark.sql.types import StructType

# sc and sqlContext are assumed to be provided by the notebook environment
i = 0
count = 0
var_name = []

schema = StructType([])
df1 = sqlContext.createDataFrame(sc.emptyRDD(), schema)
df1_ocurrences = sqlContext.createDataFrame(sc.emptyRDD(), schema)
df1_count = sqlContext.createDataFrame(sc.emptyRDD(), schema)
df1_merged = sqlContext.createDataFrame(sc.emptyRDD(), schema)
df1_complete = sqlContext.createDataFrame(sc.emptyRDD(), schema)
FINAL = sqlContext.createDataFrame(sc.emptyRDD(), schema)

for file in os.listdir('/resources/data/test_variables/'):


    df1 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/resources/data/test_variables/"+file)

    #SKIP SERIES WITH ONLY 0s
    count = df1.groupBy().sum("Bit_value")

    if count.select("sum(Bit_value)").collect()[0][0] == 0:
        continue
    #

    i+=1

    # AGGREGATION
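    # (The divide-by-1 / multiply-by-1 pair below truncates the epoch-second
    # timestamp into 1-second buckets; replacing 1 with N would give N-second intervals.)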
    df1 = df1.withColumn("Interval", ((df1.Timestamp.cast("long") / 1).cast("long") * 1).cast("timestamp")) 
    # COUNT 1s
    df1_ocurrences = df1.groupBy("Interval").sum("Bit_value").sort("Interval")
    df1_ocurrences = df1_ocurrences.withColumnRenamed("sum(Bit_value)", "Sum_df1")
    # COUNT TOTAL
    df1_count = df1.groupBy("Interval").count().sort("Interval")
    df1_count = df1_count.withColumnRenamed("count", "Total_df1")
    # MERGING
    df1_merged = df1_ocurrences.join(df1_count, ["Interval"]).sort("Interval")
    var_name = file.split(".")
    df1_complete = df1_merged.withColumn(var_name[0], df1_merged.Sum_df1 / df1_merged.Total_df1)
    df1_complete = df1_complete.drop('Sum_df1')
    df1_complete = df1_complete.drop('Total_df1')

    #FINAL DATAFRAME

    if i == 1:
        FINAL = df1_complete
    else:
        FINAL = FINAL.join(df1_complete, ["Interval"]).sort("Interval")

Any suggestions? Maybe the code I wrote is not the most efficient, but I am new to this.
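For reference, each pass of the loop above computes, per Interval, the fraction of 1s in Bit_value for one file. A minimal sketch of the same per-file aggregation done in a single groupBy().agg(...) call, assuming the same df1 and var_name variables as in the loop (not the asker's original code), would be:

from pyspark.sql import functions as F

# Hypothetical compact version of the per-file step: one groupBy produces both
# the sum of Bit_value and the row count, so the separate count DataFrame and
# the per-file join are not needed.
df1_complete = (df1.groupBy("Interval")
                   .agg(F.sum("Bit_value").alias("Sum_df1"),
                        F.count("*").alias("Total_df1"))
                   .withColumn(var_name[0], F.col("Sum_df1") / F.col("Total_df1"))
                   .drop("Sum_df1")
                   .drop("Total_df1"))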


Tags: lang, schema, count, java, merged, final, complete, df1
1 Answer
Answer #1 · Posted 2024-10-05 10:18:47

Too much time is being spent on GC and too little memory is being freed: https://developer.ibm.com/hadoop/2016/02/16/beginners-guide-apache-spark-troubleshooting/ Besides the suggestions in the article above, what worked for me in Jupyter was:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("GBT Model") \
    .config("spark.executor.memory", "2000mb") \
    .master("local[*]") \
    .config("spark.executor.cores", "4") \
    .config("spark.yarn.executor.memoryOverhead",200) \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.default.parallelism", "4") \
    .getOrCreate()

Note that spark.yarn.executor.memoryOverhead is set to 10% of the executor memory (200 MB of overhead for 2000 MB of executor memory).
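To confirm that these settings actually took effect in the notebook, they can be read back from the running session; a small check, assuming the spark session built above:

# Print the executor- and YARN-related settings of the running session
for key, value in spark.sparkContext.getConf().getAll():
    if key.startswith("spark.executor") or key.startswith("spark.yarn"):
        print(key, value)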
