Reading data from a Postgres DB every hour in PySpark

Posted 2024-09-19 23:40:02


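I want to read data from a Postgres DB in 1-hour intervals, and I want the process to run once every hour. How can I do this? I have attached my code snippet. I cannot use readStream with the JDBC option.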

import datetime
from datetime import timezone

# Batch read of the source table over JDBC
df = spark.read \
    .format("jdbc") \
    .option("url", URL) \
    .option("dbtable", "tagpool_with_tag_raw") \
    .option("user", "tsdbadmin") \
    .option("password", "cgqu5qss2zy3i1") \
    .option("driver", "org.postgresql.Driver") \
    .load()

# Get the current UTC time as epoch seconds, rounded to the nearest minute
dt = datetime.datetime.now(timezone.utc)  # already timezone-aware, no replace() needed
utc_timestamp = dt.timestamp()
epoch = round(utc_timestamp / 60) * 60
# epoch = epoch+3600
print("epoch ", epoch)

df.createOrReplaceTempView("tagpool_with_tag_raw")
x = spark.sql("""select *  from tagpool_with_tag_raw""")
x.show()
# Keep only the rows whose input_time matches the computed epoch
query = spark.sql(f"select * from tagpool_with_tag_raw WHERE input_time = {epoch}")

# query = spark.sql("select CAST(input_time AS bigint), CAST(orig_time AS bigint) from tagpool_with_tag_raw WHERE input_time = " + str(epoch))
query.show()
# df.selectExpr(("SELECT * FROM public.tagpool_raw WHERE input_time<= %s".format(epoch)))
df.printSchema()

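# Append so that repeated hourly runs do not fail because the target table already exists
query.write \
    .format("jdbc") \
    .option("url", URL) \
    .option("dbtable", "tagpool_tag_raw") \
    .option("user", USER) \
    .option("password", PW) \
    .option("driver", DRIVER) \
    .mode("append") \
    .save()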

1 answer
User
#1 · Posted 2024-09-19 23:40:02

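readStream does not work with JDBC, because the JDBC source is a batch operation. You have to build a batch job as you have already done, and run it every hour with a scheduler such as AutoSys or Oozie, or whatever your company uses.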
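
To make the hourly batch idea concrete, here is a minimal sketch of what each scheduled run could do: compute the last fully elapsed hour and push that filter down to Postgres through the JDBC `dbtable` option, instead of loading the whole table. It assumes `input_time` holds epoch seconds, as the comparison in the question suggests. The helper names (`previous_hour_window`, `build_hourly_subquery`, `run_once`) and the `hourly_slice` alias are made up for illustration, and `spark`, `url`, `user` and `password` are supplied by the caller.

```python
from datetime import datetime, timezone

HOUR_SECONDS = 3600


def previous_hour_window(now):
    """Return (start, end) epoch seconds of the last fully elapsed hour.

    `now` should be a timezone-aware datetime; a naive one would be
    interpreted as local time by timestamp().
    """
    end = int(now.timestamp()) // HOUR_SECONDS * HOUR_SECONDS
    return end - HOUR_SECONDS, end


def build_hourly_subquery(table, start, end):
    """Build an aliased subquery for the JDBC `dbtable` option.

    Assumption: input_time is stored as epoch seconds.
    """
    return (
        f"(SELECT * FROM {table} "
        f"WHERE input_time >= {start} AND input_time < {end}) AS hourly_slice"
    )


def run_once(spark, url, user, password, now=None):
    """One batch run: read the previous hour's rows and append them to the target table."""
    now = now or datetime.now(timezone.utc)
    start, end = previous_hour_window(now)

    # Postgres evaluates the subquery, so only one hour of rows is transferred.
    df = (
        spark.read.format("jdbc")
        .option("url", url)
        .option("dbtable", build_hourly_subquery("tagpool_with_tag_raw", start, end))
        .option("user", user)
        .option("password", password)
        .option("driver", "org.postgresql.Driver")
        .load()
    )

    # Append, so each hourly run adds its slice to the existing table.
    (
        df.write.format("jdbc")
        .option("url", url)
        .option("dbtable", "tagpool_tag_raw")
        .option("user", user)
        .option("password", password)
        .option("driver", "org.postgresql.Driver")
        .mode("append")
        .save()
    )
    return start, end
```

The scheduler would then submit a script that calls `run_once` once per hour. Using a half-open window (`>= start`, `< end`) aligned to the hour means consecutive runs neither overlap nor skip rows, even if a run starts a few minutes late.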
