2024-10-01 00:31:18 发布
网友
我的任务是将数百万个单独的JSON文件转换并合并成大的CSV文件。在
使用复制活动和映射模式的操作非常简单,我已经测试过了,问题是大量的文件具有错误的JSON格式。在
我知道错误是什么,修复也非常简单,我想我可以使用Python数据块活动来修复字符串,然后将输出传递给copy活动,该活动可以将记录合并到一个大的CSV文件中。在
我有这样的想法,我不确定这是否是解决这项任务的正确方法。我不知道在数据块活动中使用Copy Activy的输出
将JSON文件复制到存储(例如BLOB),您可以从Databricks访问存储。然后,您可以使用Python修复该文件,甚至可以在集群运行时将其转换为所需的格式。在
因此,在“复制数据”活动中,如果还没有文件,请将它们复制到BLOB。在
Azure现在不想使用大量的Azure@JSON文件。但是,既然您使用了azuredatabricks,那么编写一个简单的Python脚本来完成同样的事情对您来说就更容易了。因此,一个有效的解决方案是直接使用azurestoragesdk和pandasPython包,通过对Azure数据库的几个步骤来实现这一点。在
pandas
可能这些JSON文件都在Azure Blob存储的一个容器中,所以你需要通过^{}在容器中列出它们,并用sas token for pandas^{}函数生成它们的url,代码如下。在
from azure.storage.blob.baseblobservice import BaseBlobService from azure.storage.blob import ContainerPermissions from datetime import datetime, timedelta account_name = '<your account name>' account_key = '<your account key>' container_name = '<your container name>' service = BaseBlobService(account_name=account_name, account_key=account_key) token = service.generate_container_shared_access_signature(container_name, permission=ContainerPermissions.READ, expiry=datetime.utcnow() + timedelta(hours=1),) blob_names = service.list_blob_names(container_name) blob_urls_with_token = (f"https://{account_name}.blob.core.windows.net/{container_name}/{blob_name}?{token}" for blob_name in blob_names) #print(list(blob_urls_with_token))
然后,您可以通过read_json函数直接从blob读取这些JSON文件,以创建它们的pandas Dataframe。在
read_json
即使您想将它们合并到一个大的CSV文件中,也可以首先通过^{}中列出的pandas函数将它们合并到一个大数据帧中,比如append。
append
要将数据帧写入csv文件,我认为通过^{}函数很容易。或者,您可以将pandas数据帧转换为Azure Databricks上的PySpark数据帧,如下代码所示。在
from pyspark.sql import SQLContext from pyspark import SparkContext sc = SparkContext() sqlContest = SQLContext(sc) spark_df = sqlContest.createDataFrame(df)
所以接下来,不管你想做什么,都很简单。如果你想在azuredatabricks中将脚本安排为笔记本,你可以参考官方文档^{}来运行Spark作业。在
希望有帮助。在
将JSON文件复制到存储(例如BLOB),您可以从Databricks访问存储。然后,您可以使用Python修复该文件,甚至可以在集群运行时将其转换为所需的格式。在
因此,在“复制数据”活动中,如果还没有文件,请将它们复制到BLOB。在
Azure现在不想使用大量的Azure@JSON文件。但是,既然您使用了azuredatabricks,那么编写一个简单的Python脚本来完成同样的事情对您来说就更容易了。因此,一个有效的解决方案是直接使用azurestoragesdk和
pandas
Python包,通过对Azure数据库的几个步骤来实现这一点。在可能这些JSON文件都在Azure Blob存储的一个容器中,所以你需要通过^{} 在容器中列出它们,并用sas token for pandas^{} 函数生成它们的url,代码如下。在
然后,您可以通过
^{pr2}$read_json
函数直接从blob读取这些JSON文件,以创建它们的pandas Dataframe。在即使您想将它们合并到一个大的CSV文件中,也可以首先通过^{} 中列出的pandas函数将它们合并到一个大数据帧中,比如
append
。要将数据帧写入csv文件,我认为通过^{} 函数很容易。或者,您可以将pandas数据帧转换为Azure Databricks上的PySpark数据帧,如下代码所示。在
所以接下来,不管你想做什么,都很简单。如果你想在azuredatabricks中将脚本安排为笔记本,你可以参考官方文档^{} 来运行Spark作业。在
希望有帮助。在
相关问题 更多 >
编程相关推荐