如何将dictionary作为PCollection返回?

2024-09-29 21:30:34 发布

您现在位置:Python中文网/ 问答频道 /正文

我得到了Google云存储桶的URL。我必须:

  1. 使用URL获取该bucket中blob的列表

  2. 对于每个blob,我调用一些gcsapi来获取关于blob的信息(水滴大小, blob.名称等等)

  3. 对于每个blob,我还必须读取它,在其中找到一些内容并将其添加到从GCS API调用获得的值中

  4. 对于每个blob,我必须将步骤2和3中关于blob的值写入BigQuery

我有数千个blob,所以这需要用apachebeam来完成(我被推荐)

我对管道的想法是这样的:

获取UrlOfBucket并进行PCollection

使用该PCollection获取blob列表作为新PCollection

使用这些blob的元数据创建PCollection

执行一个转换,该转换将接受PCollection,该PCollection是元数据值的字典,进入blob,扫描一个值并返回一个新的PCollection,该PCollection是元数据值和这个新值的字典

把这个写到BigQuery。在

我很难再去查字典了

[+]我读到的:

https://beam.apache.org/documentation/programming-guide/#composite-transforms

https://medium.com/@rajeshhegde/data-pipeline-using-apache-beam-python-sdk-on-dataflow-6bb8550bf366

任何建议,特别是关于如何接受bucket名称并返回blob的PCollection的建议,都将受到极大的欢迎。在


Tags: 数据https名称url列表字典bucketapache
1条回答
网友
1楼 · 发布于 2024-09-29 21:30:34

我通过阅读更多关于apachebeam的文章来解决这个问题,并发现我必须使用ParDo函数在我的资源之间分配作业,在ParDo中,我调用DoFn函数,该函数接受一个元素,并完成它所需的所有处理并生成dic。 参考这篇文章Apache Beam: How To Simultaneously Create Many PCollections That Undergo Same PTransform?

    class ExtractMetadata(beam.DoFn):                                                                                                                                                                                                                                                  
def process(self, element):                                                                                                                                                                                                                                                    
    """                                                                                                                                                                                                                                                                        
    Takes in a blobName, fetches the blob and its values and returns a dictionary of values                                                                                                                                                                                    
    """                                                                                                                                                                                                                                                                        
    metadata = element.metadata                                                                                                                                                                                                                                                
    if metadata is not None:                                                                                                                                                                                                                                                   
        event_count = int(metadata['count'])                                                                                                                                                                                                                                   
    else:                                                                                                                                                                                                                                                                      
        event_count = None                                                                                                                                                                                                                                                     

    event_type = self.determine_event_type(element.id)                                                                                                                                                                                                                         
    cluster    = self.determine_cluster(element.id)                                                                                                                                                                                                                            
    customer   = self.determine_customer(element)                                                                                                                                                                                                                              
   # date = datetime.strptime(element.time_created, '%a, %d %b %Y %H:%M:%S')                                                                                                                                                                                                   
    #date = date.isoformat()                                                                                                                                                                                                                                                   
    dic = {                                                                                                                                                                                                                                                                    
        'blob_name'       : element.name,                                                                                                                                                                                                                                      
        'event_path'      : element.path,                                                                                                                                                                                                                                      
        'size'            : int(element.size),                                                                                                                                                                                                                                 
        'time_of_creation': element.time_created.isoformat(),                                                                                                                                                                                                                  
        'event_count'     : event_count,                                                                                                                                                                                                                                       
        'event_type'      : event_type,                                                                                                                                                                                                                                        
        'cluster'         : cluster,                                                                                                                                                                                                                                           
        'customer'        : customer                                                                                                                                                                                                                                           
    }                                                                                                                                                                                                                                                                          
    yield dic

相关问题 更多 >

    热门问题