我正在尝试编写一个Pulumi提供程序,它可以自动在Fivetran中创建资源。我从user对象开始,尝试调用invite user API。在执行pulumi up
时,我收到以下错误:
Diagnostics:
pulumi:pulumi:Stack (data-services-account-dev):
C:\Python39\lib\site-packages\grpc\_server.py:448: RuntimeWarning: coroutine 'invoke.<locals>.do_rpc' was never awaited
return None, False
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
error: update failed
pulumi-python:dynamic:Resource (dev-slack-bot):
error: Exception calling application: There is no current event loop in thread 'ThreadPoolExecutor-0_0'.
Resources:
1 unchanged
我正在使用python请求库对Fivetran进行API调用,似乎是request.post
调用触发了异常
以下是fivetran_pulumi.py中的代码:
from pulumi import ComponentResource, export, Input, Output
from pulumi.dynamic import Resource, ResourceProvider, CreateResult
from typing import Optional
import requests
from requests.auth import HTTPBasicAuth
class FivetranUserArgs(object):
given_name: Input[str]
family_name: Input[str]
email: Input[str]
phone: Optional[Input[str]]
picture: Optional[Input[str]]
role: Input[str]
def __init__(self, given_name, family_name, email, phone=None, picture=None, role="ReadOnly"):
self.given_name = given_name
self.family_name = family_name
self.email = email
self.phone = phone
self.picture = picture
self.role = role
class FivetranUserProvider(ResourceProvider):
def _auth(self):
return HTTPBasicAuth("my-key", "my-secret")
def create(self, props):
payload = {
"given_name": props["given_name"],
"family_name": props["family_name"],
"email": props["email"],
"phone": props["phone"] if "phone" in props else None,
"picture": props["picture"] if "picture" in props else None,
"role": props["role"]
}
url = "https://api.fivetran.com/v1/users"
response = requests.post(url=url,auth=self._auth(),json=payload).json()
return CreateResult(response["data"]["id"], {**props, **response.raw_data})
class FivetranUser(Resource):
given_name: Output[str]
family_name: Output[str]
email: Output[str]
phone: Output[str]
picture: Output[str]
role: Output[str]
def __init__(self, name, args: FivetranUserArgs, opts = None):
full_args = {'name': None, 'given_name':None, 'family_name':None, 'email':None, 'phone':None, 'picture':None, 'role':None, **vars(args)}
super().__init__(FivetranUserProvider(), name, full_args, opts)
这由main.py中的代码调用:
from fivetran_pulumi import FivetranUser as User, FivetranUserArgs as UserArgs
import json
import pulumi
import pulumi_aws as aws
stack = pulumi.get_stack()
slack_secret = aws.secretsmanager.get_secret_version("eda/slack").secret_string
slack_email = json.loads(slack_secret)["monitoring-channel-email"]
slack_bot_user = User(
stack + "-slack-bot",
UserArgs(
given_name = 'slack',
family_name = 'bot',
email = slack_email,
role = "ReadOnly"))
我使用pulumi up
执行此操作
我在跑步:
目前没有回答
相关问题 更多 >
编程相关推荐