在Sp中填充多个数据帧时管理内存使用和数据库调用

2024-07-08 10:01:52 发布

您现在位置:Python中文网/ 问答频道 /正文

我目前正在尝试使用Pyspark脚本,通过jtdsjdbc驱动程序查询sybasease数据库。此操作执行以下操作:

  • 将“昨天”数据从拼花文件导入数据帧。此数据包含所有ASE列的主键和哈希
  • 连接到ASE实例。将表中的数据加载到第二个数据帧中
  • 散列第二个数据帧中的列并将其加载到第三个数据帧中
  • 使用左键反联接将第三个数据帧中的主键字段与第一个数据帧中的主键字段进行比较,以确定新记录。这将创建第四个数据帧
  • 使用内部联接将第三个数据帧中的散列字段与第一个数据帧进行比较,以确定更新的记录。这将创建第五个数据帧
  • 合并第四和第五个数据帧以创建最终的数据帧
  • 将最终数据帧输出到拼花文件中

我已经确保火花-默认值.conf文件有足够的内存火花驱动记忆以及spark.executor.memory内存(两种设置均为14g)

我还确保使用了.cache()关键字

对于内存管理,在一个数据帧被另一个数据帧占用之后,我一直在“取消持久化”它。你知道吗

虽然内存使用得到了很好的管理,但我发现ASE表被调用了两次。如果保留所有数据帧,内存使用率非常高,但数据库只调用一次。你知道吗

示例:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import md5, concat_ws, col, lit, broadcast

sconf = SparkConf()
conf = SparkConf().setMaster("local")\
    .setAppName(appName)

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

sp = SparkSession \
    .builder \
    .appName("My App") \
    .getOrCreate()

# Read in the comparison file
df_data_table_yesterdays_data_table = sqlContext.read.parquet("C:/tmp/yesterdays_data_table.parquet") \
    .cache()

# Create connection and load the data from the Sybase ASE instance
df_data_table = sp.read.format("jdbc") \
    .option("url", "jdbc:jtds:sybase:<url>/<database>") \
    .option("dbtable", "dbo.data_table") \
    .option("user", "a_user") \
    .option("password", "<password>") \
    .load() \
    .cache()

# Add a hash of all columns to the dataframe
df_data_table_hashkey = df_data_table.withColumn("hashkey", md5(concat_ws("", *df_data_table.columns))) \
    .cache()

# Drop the initial data frame
df_data_table.unpersist()

# Return new records into data frame
df_data_table_insert = df_data_table_hashkey.alias('a').join(df_data_table_yesterdays_data_table.alias('b'), (col('a.pkey') == col('b.pkey')), 'left_anti') \
    .select(lit("Insert").alias("_action"), 'a.*') \
    .dropDuplicates() \
    .cache()

# Return updated records into data frame
df_data_table_updates = df_data_table_hashkey.alias('a').join(df_data_table_yesterdays_data_table.alias('b'), (col('a.pkey') == col('b.pkey')) &
                                   (col('a.hashkey') != col('b.hashkey')), 'inner') \
    .select(lit("Update").alias("_action"), 'a.*') \
    .dropDuplicates() \
    .cache()

# Drop the hashkey data frame
df_data_table_hashkey.unpersist()

# Join the two delta data frames
df_data_table_delta = df_data_table_insert.union(df_data_table_updates).cache()

# Output
df_data_table_delta.coalesce(1).write.format("parquet").mode("append").save("C:/tmp/todays_delta_data_table.parquet")

sc.stop()


我很惊讶地看到,基于我如何“取消持久化”数据帧的额外数据库点击。理想情况下,我希望能够在数据帧被行中的下一个数据帧占用后删除它。你知道吗

有人能告诉我是否有更好的方法吗?或者这将是一个在数据库连接和内存管理之间进行实验和权衡的案例?你知道吗

谢谢


Tags: the数据内存from数据库cachedfdata

热门问题