AWS Lambda function not executing all Python code, but no errors received either

Posted 2024-10-02 00:26:40

First of all, thank you all for the help so far; I feel like I'm close to the limit of my abilities here, but I have one more question about AWS Lambda.

I'm writing a Lambda function that processes a CSV from a receiving bucket and then returns four different processed files, one per transformation, to separate destination buckets.

I'm not getting any error messages, and the files are delivered to the correctly labeled destination buckets; however, most of the code never seems to execute, so almost none of the transformations happen.

EDIT: The exception is the newstage function, which does the new-column/feature-engineering work and is applied within the Lambda function itself; everything shows up in the first output. Along with creating the new column, it also changes zero values to NaN, and I've confirmed that this happens too.

So it seems to keep dumping the df1 output over and over; the problem may be that df1.to_csv(csv_buffer) is never refreshed with the new object/data subset (df2.to_csv..., df3.to_csv...) before each upload to the subsequent buckets (a short standalone sketch after the code below reproduces this).

What am I missing, or what do I need to change, to invoke the rest of the code and have it perform the corresponding transformations?

#trying a different approach to importing data into Lambda
import boto3
import io 
import numpy as np
import pandas as pd
from datetime import datetime
from io import StringIO

s3_client = boto3.client("s3")
s3_resource = boto3.resource('s3')
csv_buffer = StringIO()

#destination bucket names for the exports
bucket1 = 'random-processed-salestotal'
bucket2 = 'random-processed-winlossgrouped'
bucket3 = 'random-employees-stagetotal'
bucket4 = 'random-employees-salespivot'

csv_file_1 = 'total_sales_' + datetime.now().strftime('%Y_%m_%d_%H_%M_%S') + '.csv'
csv_file_2 = 'stages_grouped_' + datetime.now().strftime('%Y_%m_%d_%H_%M_%S') + '.csv'
csv_file_3 = 'employee_stage_totals_' + datetime.now().strftime('%Y_%m_%d_%H_%M_%S') + '.csv'
csv_file_4 = 'employee_pivot_' + datetime.now().strftime('%Y_%m_%d_%H_%M_%S') + '.csv'

def newstage(x):
    """Condense the Stage column into three categories."""
    if x == 'Closed Won':  # equality test; `x in 'Closed Won'` was a substring check
        return "won"
    elif x in ['01 Pending', '01 Qualified Opportunity', '02 Consideration', '02 Proposal',
               '03 Negotiation', '03 Proposal', '04 Contracts', '04 Validation',
               '05 Negotiation', '06 Contracts', 'Hold - Re-engage', 'Hold - Stalled']:
        return "in pipeline"
    else:
        return "lost"


def featureengineering(event, context):
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    s3_file_name =  event['Records'][0]['s3']['object']['key']
    read_file = s3_client.get_object(Bucket=bucket_name,Key=s3_file_name)
   
    #turning the CSV into a dataframe in AWS Lambda
    s3_data = io.BytesIO(read_file.get('Body').read())
    df = pd.read_csv(s3_data, encoding="ISO-8859-1")

    #replacing erroneous zero values to nan (missing) which is more accurate and a general table,
    #and creating a new column with just three stages instead for simplification
    df[['Company Annual Revenue','Number of Employees']] = df[['Company Annual Revenue','Number of Employees']].replace(0,np.nan)
    df['WonLossProgress'] = df.Stage.apply(newstage)
    df1 = df
    df1.to_csv(csv_buffer)
    s3_resource.Object(bucket1, csv_file_1).put(Body=csv_buffer.getvalue())


    #creating a groupby of the 3 major pipeline components and exporting that
    df2 = pd.DataFrame(df.groupby('WonLossProgress')['Opportunity Amount'].agg([np.sum, np.mean, np.std, np.ma.count])).astype(int)
    df2.to_csv(csv_buffer)
    s3_resource.Object(bucket2, csv_file_2).put(Body=csv_buffer.getvalue())

    #creating export for employee performance
    EmployeesOutcome = pd.crosstab(df.WonLossProgress, df['Sales Person'], dropna=False)
    EmployeesOutcome = EmployeesOutcome.T
    EmployeesOutcome.reset_index(inplace = True)
    EmployeesOutcome['Win/Loss Ratio'] = EmployeesOutcome.won / EmployeesOutcome.lost
    EmployeesOutcome = EmployeesOutcome.sort_values('in pipeline', ascending=False)  # sort_values returns a new frame, so assign it back
    df3 = EmployeesOutcome
    df3.to_csv(csv_buffer)
    s3_resource.Object(bucket3, csv_file_3).put(Body=csv_buffer.getvalue())

    #creating a pivot table of opportunity amount per stage per employee
    table = pd.pivot_table(df, values='Opportunity Amount', index=['Sales Person'], columns=['WonLossProgress'], aggfunc=np.sum).sort_values('in pipeline', ascending=False)
    table[['in pipeline','lost','won']] = table[['in pipeline','lost','won']].replace(np.nan,0)
    df4 = table  # the pivot table built above (was mistakenly set to EmployeesOutcome)
    df4.to_csv(csv_buffer)
    s3_resource.Object(bucket4, csv_file_4).put(Body=csv_buffer.getvalue())
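
If that single shared csv_buffer is the culprit, the symptom can be reproduced without any AWS pieces at all. Below is a minimal standalone sketch (plain pandas and StringIO; the names df_a and df_b are made up for illustration) of what each successive put() would actually upload:

from io import StringIO
import pandas as pd

buf = StringIO()
df_a = pd.DataFrame({'x': [1, 2]})
df_b = pd.DataFrame({'y': [3, 4]})

df_a.to_csv(buf)
first_upload = buf.getvalue()   # df_a's CSV -- this part is fine

df_b.to_csv(buf)                # writes at the end of the buffer, after df_a's CSV
second_upload = buf.getvalue()  # df_a's CSV followed by df_b's CSV

print(second_upload.startswith(first_upload))  # True: the old contents are still there

Every bucket therefore receives a file that begins with df1's rows, which matches the symptom that only the first transformation appears to have run.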

EDIT: This fixes it. It isn't pretty, but it works.

s3_client = boto3.client("s3")
s3_resource = boto3.resource('s3')

csv_buffer1 = StringIO()
csv_buffer2 = StringIO()
csv_buffer3 = StringIO()
csv_buffer4 = StringIO()

#destination bucket names for the exports
bucket1 = 'proficio-processed-salestotal'
bucket2 = 'proficio-processed-winlossgrouped'
bucket3 = 'proficio-employees-stagetotal'
bucket4 = 'proficio-employees-salespivot'

# csv_file_1 through csv_file_4 and newstage() are unchanged from the code above

def featureengineering(event, context):
    # ...same body as before, except each df.to_csv(...) now writes to its own
    # buffer (csv_buffer1 through csv_buffer4) before the corresponding put()

As mentioned above, instantiating the buffers outside the function works; I made them four separate buffers just to be sure. With that, each data subset lands in its respective bucket.
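
For what it's worth, an alternative to four module-level buffers is to give each upload its own fresh StringIO inside the handler. A minimal sketch of that approach (upload_csv is a made-up helper name, not part of the original code):

from io import StringIO

def upload_csv(frame, bucket, key):
    """Serialize one DataFrame into its own fresh buffer and upload it."""
    buffer = StringIO()  # new buffer per call, so nothing accumulates between files
    frame.to_csv(buffer)
    s3_resource.Object(bucket, key).put(Body=buffer.getvalue())

# inside featureengineering(), each export then becomes a single call:
#   upload_csv(df1, bucket1, csv_file_1)
#   upload_csv(df2, bucket2, csv_file_2)
#   upload_csv(df3, bucket3, csv_file_3)
#   upload_csv(df4, bucket4, csv_file_4)

Creating the buffer per call also sidesteps a second Lambda gotcha: module-level objects survive across warm invocations of the same container, so even four separate module-level StringIO buffers would keep accumulating if the function ran twice without a cold start.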

