Apache Beam (Python): unit testing a ParDo class that uses with_outputs

Posted 2024-09-29 21:30:41


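I have a ParDo class that checks whether a Pub/Sub message contains certain attributes and emits valid and invalid TaggedOutputs (it works fine in the normal flow, using yield to return values). I have not been able to unit test this class. I tried feeding it a dummy message (a dictionary that mimics the Pub/Sub message), and I want to check whether the output of the class contains the other attributes.

This is what I have so far: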

class TestValidateMessage(unittest.TestCase):

    def test_not_valid(self):
        with TestPipeline() as p:
            pcoll = (
                p
                | beam.Create([{'attributes': {"imageUrl": 000}}])
                | beam.ParDo(ValidateMessage()).with_outputs(
                    'invalid', main='valid'))
        valid, _ = pcoll
        invalid = pcoll['invalid']
        print(invalid)
        assert_that(invalid, {'failure_step': 'Message validation'})

With this I get the following error message:

TypeError: Map can be used only with callable objects. Received {'failure_step'} instead.

When I try print(invalid), I get PCollection[ParDo(ValidateMessage)/ParDo(ValidateMessage).invalid]. How can I access the contents of the PCollection (for assertion purposes)?


1 Answer
Answer 1 · posted 2024-09-29 21:30:41

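TL;DR

This code has three problems:

  • assert_that() is called after the pipeline has already run
  • The condition passed to assert_that() must be a matcher such as equal_to(), is_empty(), or is_not_empty()
  • At the end of the pipeline you get a collection of elements, so always test against a list: [expected_result]

Corrected code: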

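import unittest

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

# ValidateMessage is the DoFn from the question.

class TestValidateMessage(unittest.TestCase):

    def test_not_valid(self):
        with TestPipeline() as p:
            pcoll = (
                p
                | beam.Create([{'attributes': {"imageUrl": 000}}])
                | beam.ParDo(ValidateMessage()).with_outputs(
                    'invalid', main='valid'))

            # Stay inside the with block so the check is added before the pipeline runs.
            assert_that(pcoll.invalid,
                        equal_to([{'failure_step': 'Message validation'}]))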

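Long version

In Apache Beam, you have to test your code inside the pipeline.

Beam performs the assertion for you by adding it to the pipeline that is built from the PCollection. It looks complicated, but it is actually simple.

If you have a pipeline like this: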

with TestPipeline() as p:
    pcoll = p | beam.Create(testdata) | beam.ParDo(I_Want_To_Test_This_DoFn())

You must add assert_that inside the with block, because assert_that() adds the step that performs the assertion to the pipeline:

with TestPipeline() as p:
    pcoll = p | beam.Create(testdata) | beam.ParDo(I_Want_To_Test_This_DoFn())
    assert_that(pcoll, equal_to(expected_data))

This is equivalent to:

p = TestPipeline()
pcoll = p | beam.Create(testdata) | beam.ParDo(I_Want_To_Test_This_DoFn())
assert_that(pcoll, equal_to(expected_data))
p.run()  # The assertion is checked here, while the pipeline runs

When you have multiple outputs, it works much the same way:

with TestPipeline() as p:
    pcoll = (
        p
        | beam.Create(testdata)
        | beam.ParDo(I_Want_To_Test_This_DoFn()).with_outputs('valid', 'invalid')
    )

    # You can test inside the pipeline with assert_that

    assert_that(pcoll.invalid, equal_to([{'failure_step': 'Message validation'}]))

Perhaps you also want to check that the other output is empty:

from apache_beam.testing.util import assert_that, equal_to, is_empty, is_not_empty

...

with TestPipeline() as p:
    pcoll = (
        p
        | beam.Create(testdata)
        | beam.ParDo(I_Want_To_Test_This_DoFn()).with_outputs('valid', 'invalid')
    )

    # You can test inside the pipeline with assert_that

    assert_that(pcoll.valid, is_empty(), label='valid')

    assert_that(pcoll.invalid, equal_to([{'failure_step': 'Message validation'}]), label='invalid')

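In this case you need to pass a label to each assert_that() call so that Beam can build a valid pipeline: every transform in a pipeline needs a unique label, and assert_that() otherwise uses the same default label each time.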
