How to read thousands of .txt files from an S3 bucket faster in Python

Posted 2024-06-28 21:00:03


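Please help me find an efficient way to read multiple files from an S3 bucket in Python. I am building an ETL job that reads log files from an S3 bucket, transforms them, and loads them into Redshift. Thousands of log files can be generated per hour: at least 1,600 and up to 2,500. Each file is about 5 MB. I use the code below to read the files (one hour of data) from the S3 bucket and append them to a Pandas DataFrame: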

from datetime import datetime

import pandas as pd


def load_hourly_logs(self):  # method name is assumed; the original omits the def line
    # Query window, parsed to datetime so it can be compared with LastModified.
    fmt = '%Y-%m-%d %H:%M:%S'
    self.query_date1_d = datetime.strptime('2020-09-01 10:00:00', fmt)
    self.query_date2_d = datetime.strptime('2020-09-01 11:00:00', fmt)
    self.filter_date1 = '2020-09-01'

    # Create a PageIterator from the Paginator to list objects in the S3 bucket.
    self.page_iterator = self.paginator.paginate(**self.operation_parameters)

    self.jsondf = pd.DataFrame()
    self.s3_files = []
    for self.page in self.page_iterator:
        # 'Contents' is absent when a page has no objects.
        for self.s3_objects in self.page.get('Contents', []):
            if self.filter_date1 in self.s3_objects['Key']:
                self.file_name = self.s3_objects['Key']
                self.last_modified_date = self.s3_objects['LastModified']
                # Round-trip through a string to drop the timezone and microseconds.
                self.object_date = self.last_modified_date.strftime(fmt)
                self.object_date = datetime.strptime(self.object_date, fmt)
                if self.query_date1_d <= self.object_date < self.query_date2_d:
                    self.s3_files.append(self.file_name)
                    print(self.file_name + ' :: ' + str(self.object_date))
                    self.obj = self.client.get_object(Bucket=self.my_bucket, Key=self.file_name)
                    self.initial_df = pd.read_json(self.obj['Body'], lines=True)
                    # DataFrame.append was removed in pandas 2.0; pd.concat is the replacement.
                    self.jsondf = pd.concat([self.jsondf, self.initial_df], ignore_index=True, sort=True)
    return self.jsondf

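However, just reading the files from S3 and appending them to the Pandas DataFrame takes more than 2 hours. Is there a faster way to read the files? The bucket currently contains about 20,000 files. Please help me with this.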
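
One direction that may be worth trying, if the time is dominated by downloading objects one after another and by growing the DataFrame inside the loop, is to fetch the objects concurrently with a thread pool and build the DataFrame once at the end. The following is only a sketch under those assumptions. The names `parse_json_lines`, `read_many` and `make_s3_fetcher` are hypothetical helpers, not part of the code above, and the worker count of 10 is chosen to match what I understand to be botocore's default connection pool size.

```python
import json
from concurrent.futures import ThreadPoolExecutor


def parse_json_lines(raw):
    """Parse newline-delimited JSON bytes into a list of dicts."""
    text = raw.decode("utf-8")
    return [json.loads(line) for line in text.splitlines() if line.strip()]


def read_many(keys, fetch_bytes, max_workers=10):
    """Fetch every key concurrently and return all parsed records.

    fetch_bytes is any callable that maps a key to the object's raw bytes.
    Records are returned in the order of `keys`, because
    ThreadPoolExecutor.map preserves input order.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        chunks = pool.map(lambda key: parse_json_lines(fetch_bytes(key)), keys)
        return [record for chunk in chunks for record in chunk]


def make_s3_fetcher(bucket):
    """Return a fetch_bytes(key) callable backed by boto3 (imported lazily)."""
    import boto3

    # One low-level client is shared by all worker threads.
    client = boto3.client("s3")

    def fetch_bytes(key):
        return client.get_object(Bucket=bucket, Key=key)["Body"].read()

    return fetch_bytes
```

With the names from the question, this might be called as `records = read_many(self.s3_files, make_s3_fetcher(self.my_bucket))` after the listing loop has collected the keys, followed by a single `pd.DataFrame(records)`. Note that this skips the type inference `pd.read_json` performs, so column dtypes may differ from the original result.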

Here is a sample of the JSON:

{
    "env": "production",
    "event_type": "RESPONSE",
    "log_type": "rest-api",
    "method": "GET",
    "user_id": "wcel",
    "mid": "wcel",
    "request_uri": "",
    "app_name": "DF",
    "ecs": {
        "version": "1.0.0"
    },
    "@timestamp": "2020-08-27T06:27:11.293Z",
    "tags": ["Some", "DF", "out-S3", "out-elastic-search"],
    "ip_address": "00.00.00.00",
    "log_time": "2020-08-27 06:27:11",
    "payload": {
        "Excel2Offer": [{
            "contractType": "SIB",
            "prodName": "Olivier",
            "price": 1800.0,
            "qty": 1,
            "marketPrice": null,
            "marketPriceDate": null,
            "contractId": 807225,
            "unitSize": "2",
            "lastTradeDate": null,
            "code": "2154",
            "lastChangeOn": 548798,
            "lastListPrice": null,
            "wUrl": "/n11=10197192014",
            "yourId": null,
            "specialInfo": null,
            "orderGuid": "2f75132e-6ff0-498c-978d-f57bc5953ec1",
            "xCode": null,
            "lastListDate": null,
            "lastTradePrice": null,
            "year": "2014",
            "region": "south"
        }],
        "Excel2Offer1": [{
            "contractType": "SIB",
            "prodName": "Olivier",
            "price": 1800.0,
            "qty": 1,
            "marketPrice": null,
            "marketPriceDate": null,
            "contractId": 807225,
            "unitSize": "2",
            "lastTradeDate": null,
            "code": "2154",
            "lastChangeOn": 548798,
            "lastListPrice": null,
            "wUrl": "/n11=10197192014",
            "yourId": null,
            "specialInfo": null,
            "orderGuid": "2f75132e-6ff0-498c-978d-f57bc5953ec1",
            "xCode": null,
            "lastListDate": null,
            "lastTradePrice": null,
            "year": "2014",
            "region": "south"
        }]
    },
    "relation_id": "5487we-asd4-87we-65qw-54a2154qw"
}

Thanks, everyone.

