我正在使用Luigi启动AWS批处理作业。
我想创建luigi.contrib.batch.BatchTask
的子类(Luigi for AWS批处理的文档(依赖于boto3)可以在这里找到:https://luigi.readthedocs.io/en/stable/_modules/luigi/contrib/batch.html#BatchTask)
类批处理任务:
class BatchTask(luigi.Task):
"""
Base class for an Amazon Batch job
Amazon Batch requires you to register "job definitions", which are JSON
descriptions for how to issue the ``docker run`` command. This Luigi Task
requires a pre-registered Batch jobDefinition name passed as a Parameter
:param job_definition (str): name of pre-registered jobDefinition
:param job_name: name of specific job, for tracking in the queue and logs.
:param job_queue: name of job queue where job is going to be submitted.
"""
job_definition = luigi.Parameter()
job_name = luigi.OptionalParameter(default=None)
job_queue = luigi.OptionalParameter(default=None)
poll_time = luigi.IntParameter(default=POLL_TIME)
def run(self):
bc = BatchClient(self.poll_time)
job_id = bc.submit_job(
self.job_definition,
self.parameters,
job_name=self.job_name,
queue=self.job_queue)
bc.wait_on_job(job_id)
@property
def parameters(self):
"""Override to return a dict of parameters for the Batch Task"""
return {}
在我的子类中,我希望重写parameters函数,以便可以使用代码传递我自己的参数,而不是依赖AWS webinterface手动填充参数。这个parameters函数在函数上方有@property
标记,这表明它是一个decorator——我不太熟悉
我认为我的问题与上述问题无关,是:
如何重写parameters函数,使其不返回空字典,而是返回我定义的字典
书面子类:
import luigi.contrib.batch as batch
batch_job_revision_number=1
class SubclassLuigiBatchTask(batch.BatchTask):
job_definition='arn:aws:batch:{0}:job-definition/{1}:{2}'.format(
'aws-credentials-that-i-cannot-share',
'aws_pre-registered-job-description-name',
batch_job_revision_number)
job_name='my_example_job'
job_queue='my_example_queue'
poll_time = 10
task = batch.BatchTask(
job_definition='arn:aws:batch:{0}:job-definition/{1}:{2}'.format(
'aws-credentials-that-i-cannot-share',
'aws_pre-registered-job_description',
batch_job_revision_number),
job_name='my_example_job',
job_queue='my_example_queue',
poll_time=10
)
@task.parameters
def parameters(self):
return {
"job_definition": "df -h"
}
# This run function runs the predefined job (job definition)
def run(self):
self.task.run()
# function only runs if output doesn't exist yet
def output(self):
return LocalTarget('/home/some_user/task_completed.txt')
上面的示例返回错误:
TypeError: 'dict' object is not callable
如何访问setter函数以覆盖此默认值return {}
?
我在命令行上尝试查看哪些函数可供task.parameters
使用,以查看哪些方法可用:
>>> dir(task.parameters)
['__class__', '__contains__', '__delattr__', '__delitem__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getitem__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__len__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__setitem__', '__sizeof__', '__str__', '__subclasshook__', 'clear', 'copy', 'fromkeys', 'get', 'items', 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values']
而不是
您需要执行以下操作:
用decorator重写属性还不错。请看以下示例:
这将打印出:
我认为加上那个奇怪的装饰师可能会把事情搞砸。可能真的想在自己身上重现?非常奇怪的东西
相关问题 更多 >
编程相关推荐