我目前正在尝试使用Pyspark脚本,通过jtdsjdbc驱动程序查询sybasease数据库。此操作执行以下操作:
我已经确保火花-默认值.conf文件有足够的内存火花驱动记忆以及spark.executor.memory内存(两种设置均为14g)
我还确保使用了.cache()关键字
对于内存管理,在一个数据帧被另一个数据帧占用之后,我一直在“取消持久化”它。你知道吗
虽然内存使用得到了很好的管理,但我发现ASE表被调用了两次。如果保留所有数据帧,内存使用率非常高,但数据库只调用一次。你知道吗
示例:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import md5, concat_ws, col, lit, broadcast
sconf = SparkConf()
conf = SparkConf().setMaster("local")\
.setAppName(appName)
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
sp = SparkSession \
.builder \
.appName("My App") \
.getOrCreate()
# Read in the comparison file
df_data_table_yesterdays_data_table = sqlContext.read.parquet("C:/tmp/yesterdays_data_table.parquet") \
.cache()
# Create connection and load the data from the Sybase ASE instance
df_data_table = sp.read.format("jdbc") \
.option("url", "jdbc:jtds:sybase:<url>/<database>") \
.option("dbtable", "dbo.data_table") \
.option("user", "a_user") \
.option("password", "<password>") \
.load() \
.cache()
# Add a hash of all columns to the dataframe
df_data_table_hashkey = df_data_table.withColumn("hashkey", md5(concat_ws("", *df_data_table.columns))) \
.cache()
# Drop the initial data frame
df_data_table.unpersist()
# Return new records into data frame
df_data_table_insert = df_data_table_hashkey.alias('a').join(df_data_table_yesterdays_data_table.alias('b'), (col('a.pkey') == col('b.pkey')), 'left_anti') \
.select(lit("Insert").alias("_action"), 'a.*') \
.dropDuplicates() \
.cache()
# Return updated records into data frame
df_data_table_updates = df_data_table_hashkey.alias('a').join(df_data_table_yesterdays_data_table.alias('b'), (col('a.pkey') == col('b.pkey')) &
(col('a.hashkey') != col('b.hashkey')), 'inner') \
.select(lit("Update").alias("_action"), 'a.*') \
.dropDuplicates() \
.cache()
# Drop the hashkey data frame
df_data_table_hashkey.unpersist()
# Join the two delta data frames
df_data_table_delta = df_data_table_insert.union(df_data_table_updates).cache()
# Output
df_data_table_delta.coalesce(1).write.format("parquet").mode("append").save("C:/tmp/todays_delta_data_table.parquet")
sc.stop()
我很惊讶地看到,基于我如何“取消持久化”数据帧的额外数据库点击。理想情况下,我希望能够在数据帧被行中的下一个数据帧占用后删除它。你知道吗
有人能告诉我是否有更好的方法吗?或者这将是一个在数据库连接和内存管理之间进行实验和权衡的案例?你知道吗
谢谢
目前没有回答
相关问题 更多 >
编程相关推荐