我有一个Pypark应用程序,它需要从Azure blob存储帐户读取文件,在该帐户中,文件每5分钟按以下格式分区到文件夹:
\Root\yyyy\mm\dd\HH\MM\files.csv
我有一个每小时运行一次的进程,希望处理自上次运行以来的所有文件(如果错过了一次运行,可能会超过一个小时)。我管理一个高水位线,它告诉我上一次处理文件夹的时间。你知道吗
在文件中还有一个datetime字段,它与路径datetime相匹配(第二个字段有更多细节)。你知道吗
请注意,我无法将文件夹结构更改为年=yyyy\month=mm等的首选分区方法
我写了这个函数:
from datetime import datetime
def folderDateTimeRange(startDateTime, endDateTime, levels=5):
if startDateTime.year != endDateTime.year:
return '/{*}' * levels
elif startDateTime.month != endDateTime.month:
return datetime.strftime(startDateTime, '%Y') + '/{*}' * (levels - 1)
elif startDateTime.day != endDateTime.day:
return datetime.strftime(startDateTime, '%Y/%m') + '/{*}' * (levels - 2)
elif startDateTime.hour != endDateTime.hour:
return datetime.strftime(startDateTime, '%Y/%m/%d') + '/{*}' * (levels - 3)
else:
return ""
这限制了大多数情况下读取的文件夹数。我仍然需要过滤数据的读取时间与传递到函数中的开始时间和结束时间相同,因为第二天的23:00到01:00将在日和小时部分返回{*},因此我认为这可能更有效。你知道吗
在最坏的示例中,传入start=2018-12-31 22:00:00和end=2019-01-01 01:00:00-这将导致读取所有年份的所有数据。你知道吗
我对地球仪的知识是有限的,但是有可能通过一个范围而不是{*}吗?你知道吗
是的,可以使用大括号返回项目列表,也可以使用正则表达式。你知道吗
检查这里:Read range of files in pySpark和这里:pyspark select subset of files using regex/glob from s3(我不确定Azure和S3有多大的不同,但我的假设是PySpark可以将其抽象出来;如果我错了,请纠正我。)
您还可以通过生成几个路径并发送它们来最大限度地减少读取文件的“浪费”,而不是只发送一个路径(这样可以确保您在跨一年到下一年时不会遇到读取两年数据的相同陷阱)
为了好玩,我在底部写了一些测试代码,你可以返回这些列表,得到你想要的:
相关问题 更多 >
编程相关推荐