Managing Event Hubs in Azure from Python

Published 2024-04-20 06:36:32


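I am trying to write Python code that can create and delete event hubs on Azure Event Hubs from a Python script. I have created an EventHubManagementClient by following the documentation on this page. I believe I now need to use the EventHubsOperations class, as documented here.

I have two challenges:

  1. What is "aaaa" in the line "from aaaa import EventHubsOperations", so that I can reference the class? I can't seem to find the package from which to import it.
  2. When using the class, what values should be passed for config, serializer and deserializer, all of which are required? Perhaps someone can share an example of how to use this class.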

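Ideally, I want to call the create_or_update or delete methods to create a new event hub or delete an existing one from a Python script. I would be grateful if someone could show how to extend this code to do that. The documentation seems rather thin: "config, Required, Configuration of service client."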

My code is as follows:

import setenv
import os
from azure.mgmt.eventhub import EventHubManagementClient
from azure.identity import DefaultAzureCredential

setenv.import_env_vars('')

vault_url = os.environ["KEY_VAULT_URL"]
subscription_id=os.environ["AZURE_SUBSCRIPTION_ID"]

credential = DefaultAzureCredential()

print('Creating EH_client...')
EH_client = EventHubManagementClient(vault_url, credential, subscription_id, base_url=None)
print('Created.')

EventHubsOperations(EH_client)

The resulting output is as follows:

Project root: 
filename: env_values
Creating EH_client...
Created.
Traceback (most recent call last):
  File "/home/db533/gitRepos/GunaBot2/azure-mgmt/azure_test.py", line 25, in <module>
    EventHubsOperations(EH_client)
NameError: name 'EventHubsOperations' is not defined

Process finished with exit code 1

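2 answers

Below is the code I use to create and delete an event hub from Python.

I use a separate script (setenv.py) to load environment variables stored in a text file.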

import os
import setenv
from azure.mgmt.eventhub import EventHubManagementClient
from azure.mgmt.resource import ResourceManagementClient
from azure.common.credentials import ServicePrincipalCredentials
from azure.mgmt.storage import StorageManagementClient
from azure.mgmt.storage.models import (StorageAccountCreateParameters,Sku,SkuName,Kind)

set_env_path="C:\\Users\\db533\\PycharmProjects\\GunaBot2\\shared_files\\"
setenv.import_env_vars(set_env_path,'env_values')

def main():
    SUBSCRIPTION_ID = os.environ.get("AZURE_SUBSCRIPTION_ID", None)
    GROUP_NAME = "annabot-eventhub2"
    STORAGE_ACCOUNT_NAME = "storageaccountxyztest"
    NAMESPACE_NAME = "annabot-eventhub999"
    EVENTHUB_NAME = "worker99901"

    tenant_id = os.environ["AZURE_TENANT_ID"]
    client_id = os.environ["AZURE_CLIENT_ID"]
    client_secret = os.environ["AZURE_CLIENT_SECRET"]
    print('AZURE_CLIENT_SECRET:',client_secret)

    credential_common = ServicePrincipalCredentials(tenant=tenant_id, client_id=client_id, secret=client_secret)

    # Create client
    print(" Create resource client...")
    resource_client = ResourceManagementClient(credential_common, SUBSCRIPTION_ID)

    print(" Create Event hub client...")
    eventhub_client = EventHubManagementClient(credential_common,SUBSCRIPTION_ID)

    print(" Create storage client...")
    storage_client = StorageManagementClient(credential_common,SUBSCRIPTION_ID)

    # Create resource group
    print(" Create resource group...")
    resource_client.resource_groups.create_or_update(
        GROUP_NAME,
        {"location": "germanywestcentral"}
    )

    # Create StorageAccount
    print(" Create storageAccount...")
    storage_async_operation = storage_client.storage_accounts.create(
        GROUP_NAME,
        STORAGE_ACCOUNT_NAME,
        StorageAccountCreateParameters(
            sku=Sku(name=SkuName.standard_lrs),
            kind=Kind.storage_v2,
            location='germanywestcentral'
        )
    )
    storage_account = storage_async_operation.result()

    # Create Namespace
    print(" Create event hub namespace...")
    eventhub_client.namespaces.create_or_update(
        GROUP_NAME,
        NAMESPACE_NAME,
        {
          "sku": {
            "name": "Standard",
            "tier": "Standard"
          },
          "location": "Germany West Central",
          "tags": {
            "tag1": "value1",
            "tag2": "value2"
          },
          "kafka_enabled": "True"
        }
    ).result()

    # Create EventHub
    print(" Create event hub...")
    eventhub = eventhub_client.event_hubs.create_or_update(
        GROUP_NAME,
        NAMESPACE_NAME,
        EVENTHUB_NAME,
        {
          "message_retention_in_days": "4",
          "partition_count": "4",
          "status": "Active",
          "capture_description": {
            "enabled": True,
            "encoding": "Avro",
            "interval_in_seconds": "120",
            "size_limit_in_bytes": "10485763",
            "destination": {
              "name": "EventHubArchive.AzureBlockBlob",
              "storage_account_resource_id": "/subscriptions/" + SUBSCRIPTION_ID + "/resourceGroups/" + GROUP_NAME + "/providers/Microsoft.Storage/storageAccounts/" + STORAGE_ACCOUNT_NAME + "",
              "blob_container": "container",
              "archive_name_format": "{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}"
            }
          }
        }
    )
    print("Created EventHub: {}".format(eventhub))

    # Get EventHub
    eventhub = eventhub_client.event_hubs.get(
        GROUP_NAME,
        NAMESPACE_NAME,
        EVENTHUB_NAME
    )
    print("get() for EventHub: {}\n".format(eventhub))

    #Create authorisation rule
    eventhub_rule = eventhub_client.event_hubs.create_or_update_authorization_rule(
        GROUP_NAME,
        NAMESPACE_NAME,
        event_hub_name=EVENTHUB_NAME,
        authorization_rule_name="manager",
        rights=["LISTEN","SEND"]
    )
    print("create_or_update_authorization_rule() for Manager for EventHub: {}\n".format(eventhub_rule))

    # Get authorisation rule
    eventhub_rule2 = eventhub_client.event_hubs.get_authorization_rule(
        GROUP_NAME,
        NAMESPACE_NAME,
        event_hub_name=EVENTHUB_NAME,
        authorization_rule_name="manager"
    )
    print("get_authorization_rule() for manager for EventHub: {}\n".format(eventhub_rule2))

    # List keys
    namespace_keys = eventhub_client.event_hubs.list_keys(
        GROUP_NAME,
        NAMESPACE_NAME,
        event_hub_name=EVENTHUB_NAME,
        authorization_rule_name="manager"
    )
    print("list_keys() for EventHub: {}\n".format(namespace_keys))
    print("namespace_keys.primary_connection_string:",namespace_keys.primary_connection_string)

    # Delete EventHub
    eventhub_client.event_hubs.delete(
        GROUP_NAME,
        NAMESPACE_NAME,
        EVENTHUB_NAME
    )
    print("Delete EventHub.")

    # Delete Namespace
    eventhub_client.namespaces.delete(
        GROUP_NAME,
        NAMESPACE_NAME
    ).result()

    # Delete StorageAccount
    storage_client.storage_accounts.delete(
        GROUP_NAME,
        STORAGE_ACCOUNT_NAME
    )

    # Delete resource group
    resource_client.resource_groups.delete(
        GROUP_NAME
    ).result()


if __name__ == "__main__":
    main()
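
The string concatenation used for storage_account_resource_id in the script above can be pulled into a small helper. The following is only a sketch: the names storage_account_resource_id and event_hub_parameters are hypothetical (they are not part of the Azure SDK), and writing the numeric settings as integers rather than strings is an assumption. The code makes no Azure calls; it only builds the dictionary that would be passed to event_hubs.create_or_update.

```python
def storage_account_resource_id(subscription_id, group_name, account_name):
    """Build the resource ID string of a storage account (hypothetical helper)."""
    return (
        f"/subscriptions/{subscription_id}"
        f"/resourceGroups/{group_name}"
        f"/providers/Microsoft.Storage/storageAccounts/{account_name}"
    )


def event_hub_parameters(subscription_id, group_name, account_name,
                         blob_container="container"):
    """Return the parameters dict for event_hubs.create_or_update.

    The values mirror the script above; using ints instead of strings
    for the numeric fields is an assumption.
    """
    return {
        "message_retention_in_days": 4,
        "partition_count": 4,
        "status": "Active",
        "capture_description": {
            "enabled": True,
            "encoding": "Avro",
            "interval_in_seconds": 120,
            "size_limit_in_bytes": 10485763,
            "destination": {
                "name": "EventHubArchive.AzureBlockBlob",
                "storage_account_resource_id": storage_account_resource_id(
                    subscription_id, group_name, account_name
                ),
                "blob_container": blob_container,
                # Plain string (not an f-string), so the braces stay literal.
                "archive_name_format": "{Namespace}/{EventHub}/{PartitionId}"
                                       "/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}",
            },
        },
    }
```

In main(), the result of event_hub_parameters(SUBSCRIPTION_ID, GROUP_NAME, STORAGE_ACCOUNT_NAME) could then be passed as the last argument of eventhub_client.event_hubs.create_or_update.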

The setenv.py script that loads the environment variables is shown below. (I took it from another answer, so I can't take credit for it.)

import os

def import_env_vars(env_folder,env_filename):
    """Imports some environment variables from a special .env file in the
    project root directory.
    """
    print("env_folder:",env_folder)
    if len(env_folder) > 0 and env_folder[-1] != '\\':
        env_folder += '\\'
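    env_path = env_folder + env_filename
    try:
        print("filename:", env_path)
        envfile = open(env_path, "r")
    except IOError:
        # Fill in the {0} placeholder, which was previously left unformatted.
        raise Exception("You must have a {0} file in your project root "
                        "in order to run the server in your local machine. "
                        "This specifies some necessary environment variables. "
                        .format(env_path))
    with envfile:  # make sure the file is closed afterwards
        for line in envfile:
            line = line.strip()
            if "=" not in line:
                continue  # skip blank or malformed lines
            # Split on the first "=" only, so values may themselves contain "="
            key, value = line.split("=", 1)
            os.environ[key] = value
            print("key:", key)
            print("value:", value)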

The environment variables are defined in the file as follows:

EVENTHUB_SERVER=gunabot-eventhub.servicebus.windows.net
DEV_STAGE=Dev
AZURE_SUBSCRIPTION_ID=xxxxxxxxx-xxxx-xxxxxxx-xxxxx-xxxx
AZURE_TENANT_ID=yyyyyyyyy-yyyyy-yyyyyy-yyyyyy
AZURE_CLIENT_ID=zzzzzz-zzzzzz-zzzzzz-zzzzzzz-zzz
AZURE_CLIENT_SECRET=qqqqq-qqqq-qqqqqqq-qqqqq-qqqqq
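
A file in this format can also be read in a way that does not depend on Windows path separators. The following is a minimal sketch, not the setenv.py from this answer: parse_env_lines and load_env_file are hypothetical names, and treating lines that start with "#" as comments is an assumption that the file shown above does not rely on.

```python
import os


def parse_env_lines(lines):
    """Parse KEY=VALUE lines into a dict (hypothetical helper)."""
    values = {}
    for raw in lines:
        line = raw.strip()
        # Skip blank lines and (by assumption) comment lines starting with '#'.
        if not line or line.startswith("#"):
            continue
        # partition() splits on the first '=' only, so values may contain '='.
        key, sep, value = line.partition("=")
        if not sep:
            continue  # no '=' on this line; ignore it
        values[key.strip()] = value.strip()
    return values


def load_env_file(path):
    """Read the file at `path` and copy its entries into os.environ."""
    with open(path, "r") as envfile:
        values = parse_env_lines(envfile)
    os.environ.update(values)
    return values
```

The path can be built with os.path.join(folder, filename), which avoids appending a backslash by hand.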

Hope this helps someone.

Use the following statement to import the EventHubsOperations class:

from azure.mgmt.eventhub.v2021_01_01_preview.operations import EventHubsOperations

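config holds the configuration details of the service client.

The serializer and deserializer are objects that help dump objects to a byte stream and load them back, much as the pickle module does.

Once these parameters are ready, create an object of the EventHubsOperations class and pass a value for every parameter: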

object_name = EventHubsOperations(client=(value), config=(value), serializer=(value), deserializer=(value))

With this object, you can call the class's create_or_update and delete methods with the required arguments:

object_name.create_or_update(resource_group_name, namespace_name, event_hub_name, parameters, **kwargs)

object_name.delete(resource_group_name, namespace_name, event_hub_name, **kwargs)

You can also look at the source code for azure.mgmt.eventhub.v2018_01_01_preview.operations.event
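
In practice it may not be necessary to construct EventHubsOperations by hand. The first answer's code calls create_or_update and delete through the event_hubs attribute of EventHubManagementClient, which suggests the client already builds the operations object, with its config, serializer and deserializer, internally. A minimal sketch under that assumption follows. The helper names create_event_hub, delete_event_hub and make_client are hypothetical, and passing DefaultAzureCredential directly to the client assumes a recent azure-mgmt-eventhub release that accepts azure.identity credentials.

```python
def create_event_hub(client, group_name, namespace_name, event_hub_name, parameters):
    """Create or update an event hub via the client's event_hubs operations group."""
    return client.event_hubs.create_or_update(
        group_name, namespace_name, event_hub_name, parameters
    )


def delete_event_hub(client, group_name, namespace_name, event_hub_name):
    """Delete an event hub via the client's event_hubs operations group."""
    return client.event_hubs.delete(group_name, namespace_name, event_hub_name)


def make_client(subscription_id):
    """Build the management client (hypothetical helper).

    The imports are local so the two helpers above can be used with any
    object that has an event_hubs attribute, even without the Azure
    packages installed.
    """
    from azure.identity import DefaultAzureCredential
    from azure.mgmt.eventhub import EventHubManagementClient

    return EventHubManagementClient(DefaultAzureCredential(), subscription_id)
```

With a real subscription, usage would look like client = make_client(os.environ["AZURE_SUBSCRIPTION_ID"]) followed by create_event_hub(client, group, namespace, name, {"partition_count": 4}), where the resource group and namespace must already exist.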
