我是Spark的新手,对RDD和数据帧有以下高级问题,如果我没有弄错的话,这些RDD和数据帧是建立在RDD之上的:
我知道有两种类型的操作可以在RDD上完成,转换和操作。我还理解,只有在对RDD执行操作时,转换才会执行,RDD是转换的产物。考虑到RDD在内存中,我想知道是否有可能优化这些RDD消耗的内存量,以下面的示例为例:
KafkaDF = KafkaDFRaw.select(
KafkaDFRaw.key,
KafkaDFRaw.value,
KafkaDFRaw.topic,
unix_timestamp('timestamp',
'yyyy-MM-dd HH:mm:ss').alias('kafka_arrival_time')
).withColumn("spark_arrival_time", udf(time.time, DoubleType())())
我有一个KafkaDFRaw数据帧,我生成了一个名为KafkaDF的新RDD。然后,我希望向这个新RDD添加列。我应该将它们添加到现有RDD中吗?像这样:
decoded_value_udf = udf(lambda value: value.decode("utf-8"))
KafkaDF = KafkaDF\
.withColumn(
"cleanKey", decoded_value_udf(KafkaDF.key))\
.withColumn(
"cleanValue", decoded_value_udf(KafkaDF.value))
或者我应该从上一个数据帧创建一个新的数据帧?像这样:
decoded_value_udf = udf(lambda value: value.decode("utf-8"))
KafkaDF_NEW = KafkaDF\
.withColumn(
"cleanKey", decoded_value_udf(KafkaDF.key))\
.withColumn(
"cleanValue", decoded_value_udf(KafkaDF.value))
这对内存优化有影响吗
提前感谢您的帮助
无论何时调用操作,都会执行优化的dag,并按照计划使用内存。 您可以比较执行计划以了解:
在两者之间创建额外的变量来保存转换不会影响内存利用率。内存需求将取决于数据大小、分区大小、洗牌等
相关问题 更多 >
编程相关推荐