我想从postgres db读取1小时时间间隔的数据,我希望进程每1小时运行一次。我该怎么做?我已经附上了我的代码片段。我无法使用readstream for jdbc选项
df = spark.read \
.format("jdbc") \
.option("url", URL) \
.option("dbtable", "tagpool_with_tag_raw") \
.option("user", "tsdbadmin") \
.option("password", "cgqu5qss2zy3i1") \
.option("driver", "org.postgresql.Driver") \
.load()
# Getting the current date and time
dt = datetime.datetime.now(timezone.utc)
utc_time = dt.replace(tzinfo=timezone.utc)
utc_timestamp = utc_time.timestamp()
epoch = round(utc_timestamp / 60) * 60
# epoch = epoch+3600
print("epoch ", epoch)
df.createOrReplaceTempView("tagpool_with_tag_raw")
x = spark.sql("""select * from tagpool_with_tag_raw""")
x.show()
query = spark.sql("select * from tagpool_with_tag_raw WHERE input_time = " + str(epoch)) # .format()
# query = spark.sql("select CAST(input_time AS bigint), CAST(orig_time AS bigint) , from tagpool_with_tag_raw WHERE input_time = "+ epoch) #.format()
query.show()
# df.selectExpr(("SELECT * FROM public.tagpool_raw WHERE input_time<= %s".format(epoch)))
df.printSchema()
query.write \
.format("jdbc") \
.option("url", URL) \
.option("dbtable", "tagpool_tag_raw") \
.option("user", USER) \
.option("password", PW) \
.option("driver", DRIVER).save()
Readstream不适用于jdbc,因为jdbc是一个批处理操作,所以您必须像以前一样创建一个流程,并使用AutoSys或oozie之类的调度程序,或者您的企业中的任何东西每小时运行一次
相关问题 更多 >
编程相关推荐