Uploading local data to Azure ADLS Gen2 using Python or Java

Posted 2024-09-27 00:17:16

I have an Azure storage account with Data Lake Gen2. I want to upload local data to the Lake Gen2 file system using Python (or Java).

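I have found examples of how to interact with a File Share in the storage account, but I have not been able to find out how to upload to the Lake (as opposed to a File Share). I have also found how to do this for Gen1 Lakes, but for Gen2 there is nothing except closed requests.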

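My question is whether this is currently possible with Python at all; alternatively, how can I upload files to a Gen2 Lake using Java? A code snippet demonstrating the API calls for the upload would be much appreciated.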


1 answer
User
#1 · Posted 2024-09-27 00:17:16

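According to the official tutorial, quoted below, you cannot use the Azure Storage SDK for Python directly to perform any operations on Azure Data Lake Storage Gen2 unless you have enrolled in the public preview of multi-protocol access on Data Lake Storage.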

Note

The features described in this article are available to accounts that have a hierarchical namespace only if you enroll in the public preview of multi-protocol access on Data Lake Storage. To review limitations, see the known issues article.

Therefore, the only way to upload data to ADLS Gen2 is to call the ADLS Gen2 REST API directly; see its reference documentation for details.
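
As a rough outline, uploading one file through that REST API comes down to three requests against the account's `dfs` endpoint: create the file, append the bytes, then flush. The sketch below only builds the method and URL for each step and sends nothing. The helper name `upload_request_plan` and the placeholder account, filesystem, and path values are assumptions made for illustration; the URL shapes follow the ones used in the sample code in this answer.

```python
from urllib.parse import quote


def upload_request_plan(account_name, fs_name, path, content_length):
    """Return the (method, url) pairs for a single-shot upload of one file.

    Hypothetical helper for illustration: it builds URLs and sends nothing.
    """
    # quote() keeps "/" intact, so directory separators in the path survive.
    base = f"https://{account_name}.dfs.core.windows.net/{fs_name}/{quote(path)}"
    return [
        # 1. Create an empty file at the target path.
        ("PUT", f"{base}?resource=file"),
        # 2. Stage the file's bytes starting at offset 0.
        ("PATCH", f"{base}?action=append&position=0"),
        # 3. Commit the staged bytes; position is the final file length.
        ("PATCH", f"{base}?action=flush&position={content_length}"),
    ]


if __name__ == "__main__":
    # Placeholder values, not a real account.
    for method, url in upload_request_plan("myaccount", "myfs", "mydir/data.csv", 1024):
        print(method, url)
```

Every one of these requests also needs an `Authorization: Bearer <token>` header, which this outline leaves out.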

Here is my sample code for uploading data to ADLS Gen2 with Python. It works fine.

import requests
import json

def auth(tenant_id, client_id, client_secret):
    print('auth')
    auth_headers = {
        "Content-Type": "application/x-www-form-urlencoded"
    }
    auth_body = {
        "client_id": client_id,
        "client_secret": client_secret,
        "scope" : "https://storage.azure.com/.default",
        "grant_type" : "client_credentials"
    }
    resp = requests.post(f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token", headers=auth_headers, data=auth_body)
    return (resp.status_code, json.loads(resp.text))

def mkfs(account_name, fs_name, access_token):
    print('mkfs')
    fs_headers = {
        "Authorization": f"Bearer {access_token}"
    }
    resp = requests.put(f"https://{account_name}.dfs.core.windows.net/{fs_name}?resource=filesystem", headers=fs_headers)
    return (resp.status_code, resp.text)

def mkdir(account_name, fs_name, dir_name, access_token):
    print('mkdir')
    dir_headers = {
        "Authorization": f"Bearer {access_token}"
    }
    resp = requests.put(f"https://{account_name}.dfs.core.windows.net/{fs_name}/{dir_name}?resource=directory", headers=dir_headers)
    return (resp.status_code, resp.text)

def touch_file(account_name, fs_name, dir_name, file_name, access_token):
    print('touch_file')
    touch_file_headers = {
        "Authorization": f"Bearer {access_token}"
    }
    resp = requests.put(f"https://{account_name}.dfs.core.windows.net/{fs_name}/{dir_name}/{file_name}?resource=file", headers=touch_file_headers)
    return (resp.status_code, resp.text)

def append_file(account_name, fs_name, path, content, position, access_token):
    # Stage `content` at byte offset `position`; nothing is committed until flush_file is called.
    print('append_file')
    append_file_headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "text/plain",
        "Content-Length": f"{len(content)}"
    }
    resp = requests.patch(f"https://{account_name}.dfs.core.windows.net/{fs_name}/{path}?action=append&position={position}", headers=append_file_headers, data=content)
    return (resp.status_code, resp.text)

def flush_file(account_name, fs_name, path, position, access_token):
    # Commit previously appended data; `position` must equal the total file length after the appends.
    print('flush_file')
    flush_file_headers = {
        "Authorization": f"Bearer {access_token}"
    }
    resp = requests.patch(f"https://{account_name}.dfs.core.windows.net/{fs_name}/{path}?action=flush&position={position}", headers=flush_file_headers)
    return (resp.status_code, resp.text)

def mkfile(account_name, fs_name, dir_name, file_name, local_file_name, access_token):
    print('mkfile')
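    status_code, result = touch_file(account_name, fs_name, dir_name, file_name, access_token)
    if status_code == 201:
        path = f"{dir_name}/{file_name}"
        with open(local_file_name, 'rb') as local_file:
            content = local_file.read()
        # Append the whole file at offset 0, then flush at the final length to commit it.
        append_status, append_result = append_file(account_name, fs_name, path, content, 0, access_token)
        if not 200 <= append_status < 300:
            print(append_status, append_result)
            return
        flush_status, flush_result = flush_file(account_name, fs_name, path, len(content), access_token)
        if not 200 <= flush_status < 300:
            print(flush_status, flush_result)
    else:
        print(result)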


if __name__ == '__main__':
    tenant_id = '<your tenant id>'
    client_id = '<your client id>'
    client_secret = '<your client secret>'

    account_name = '<your adls account name>'
    fs_name = '<your filesystem name>'
    dir_name = '<your directory name>'
    file_name = '<your file name>'
    local_file_name = '<your local file name>'

    # Acquire an Access token
    auth_status_code, auth_result = auth(tenant_id, client_id, client_secret)
    access_token = auth_result['access_token'] if auth_status_code == 200 else ''
    print(access_token)

    # Create a filesystem
    mkfs_status_code, mkfs_result = mkfs(account_name, fs_name, access_token)
    print(mkfs_status_code, mkfs_result)

    # Create a directory
    mkdir_status_code, mkdir_result = mkdir(account_name, fs_name, dir_name, access_token)
    print(mkdir_status_code, mkdir_result)

    # Create a file from local file
    mkfile(account_name, fs_name, dir_name, file_name, local_file_name, access_token)
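
The `mkfile` function above reads the whole local file into memory and sends it in a single append. For larger files, one possible variation is to append in fixed-size chunks at increasing offsets and flush once at the end. The following is a minimal sketch of that idea, not a tested production uploader: the names `iter_chunks` and `upload_in_chunks` are hypothetical, the 4 MiB default chunk size is an arbitrary assumption, and the network calls are passed in as callables so the offset logic can be run without an Azure account.

```python
import io


def iter_chunks(stream, chunk_size=4 * 1024 * 1024):
    """Yield (position, chunk) pairs, where position is the chunk's byte offset."""
    position = 0
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        yield position, chunk
        position += len(chunk)


def upload_in_chunks(stream, append, flush, chunk_size=4 * 1024 * 1024):
    """Append each chunk at its offset, then flush once at the total length.

    `append(chunk, position)` and `flush(position)` are caller-supplied
    callables, e.g. thin wrappers around append_file and flush_file.
    """
    total = 0
    for position, chunk in iter_chunks(stream, chunk_size):
        append(chunk, position)
        total = position + len(chunk)
    flush(total)
    return total


if __name__ == "__main__":
    # Dry run with in-memory data and recording callables instead of HTTP calls.
    calls = []
    upload_in_chunks(
        io.BytesIO(b"abcdefghij"),
        append=lambda chunk, position: calls.append(("append", position, chunk)),
        flush=lambda position: calls.append(("flush", position)),
        chunk_size=4,
    )
    print(calls)
```

To use it with the functions above, open the local file in `'rb'` mode and pass something like `append=lambda chunk, pos: append_file(account_name, fs_name, path, chunk, pos, access_token)` and `flush=lambda pos: flush_file(account_name, fs_name, path, pos, access_token)`, after creating the file with `touch_file`.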

Hope this helps.
