我有一个ParDo类,它检查pubsub消息是否包含某些属性,并返回有效和无效的TaggedOutput(在正常流中工作正常,使用yield返回值),我无法对这个类进行单元测试,我试图提供一个虚拟消息(字典复制pubsub消息信息)我想检查类的输出是否包含其他属性
这就是我到目前为止所做的:
class TestValidateMessage(unittest.TestCase):
def test_not_valid(self):
with TestPipeline() as p:
pcoll = (
p
| beam.Create([{'attributes':{"imageUrl":000}}])
| beam.ParDo(ValidateMessage()).with_outputs(
'invalid', main='valid'))
valid, _ = pcoll
invalid = pcoll['invalid']
print(invalid)
assert_that(invalid, {'failure_step':'Message validation'})
通过此操作,我收到一条错误消息:
TypeError: Map can be used only with callable objects. Received {'failure_step'} instead.
当我尝试打印时(无效),我得到PCollection[ParDo(ValidateMessage)/ParDo(ValidateMessage).invalid]
如何访问PCollection的内容(用于断言目的)
TLDR
这段代码有三个问题:
更正代码:
长篇大论
在apachebeam中,您必须在管道中测试代码
Beam为您从PCollection构建管道执行断言。 看起来很复杂,但看起来很简单
如果您有这样的管道:
必须在with子句中添加assert_that,因为assert_that()将在管道中添加执行断言的代码
这与执行此操作相同:
当您有多个输出时,情况类似:
也许您要检查其他输出是否为空:
在这种情况下,您需要向assert_that()添加一个标签,以确保它能够生成正确的管道
相关问题 更多 >
编程相关推荐