首先,感谢大家对我的支持,我觉得我在这方面已经接近极限了。但我还有一个关于AWS Lambda的问题



编辑 例外情况是函数“newstage”,它创建了一个新的列/功能工程,并将其应用于Lambda函数本身,第一次输出中的所有内容。在创建一个新列的同时,它还将零值更改为NaN,并注意到这也已完成

因此,它似乎在不断地从df1数据中删除输出,问题可能在于df1.to_csv(csv_buffer)而不是在下载到后续存储桶时根据新的对象/数据子集更新到df2.to_csv... df3.to_csv...


#trying a different approach to importing data into Lambad
import boto3
import io 
import numpy as np
import pandas as pd
import datetime 
from datetime import datetime
from io import StringIO

s3_client = boto3.client("s3")
s3_resource = boto3.resource('s3')
csv_buffer = StringIO()

#creating the export buckets
bucket1 = 'random-processed-salestotal'
bucket2 = 'random-processed-winlossgrouped'
bucket3 = 'random-employees-stagetotal'
bucket4 = 'random-employees-salespivot'

csv_file_1 = 'total_sales_' + str('%Y_%m_%d_%H_%M_%S')) + '.csv'
csv_file_2 = 'stages_grouped_' + str('%Y_%m_%d_%H_%M_%S')) + '.csv'
csv_file_3 = 'employee_stage_totals_' + str('%Y_%m_%d_%H_%M_%S')) + '.csv'
csv_file_4 = 'employee_pivot_' + str('%Y_%m_%d_%H_%M_%S')) + '.csv'

def newstage(x):
    """Creating a function that simplifies and condenses the Stage column into 3 types"""
    if x in 'Closed Won':
         return "won"
    elif x in list(['01 Pending','01 Qualified Opportunity','02 Consideration','02 Proposal','03 Negotiation','03 Proposal','04 Contracts','04 Validation',
    '05 Negotiation','06 Contracts','Hold - Re-engage','Hold - Stalled']):
        return "in pipeline"
        return "lost"

def featureengineering(event, context):
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    s3_file_name =  event['Records'][0]['s3']['object']['key']
    read_file = s3_client.get_object(Bucket=bucket_name,Key=s3_file_name)
    #turning the CSV into a dataframe in AWS Lambda
    s3_data = io.BytesIO(read_file.get('Body').read())
    df = pd.read_csv(s3_data, encoding="ISO-8859-1")

    #replacing erroneous zero values to nan (missing) which is more accurate and a general table,
    #and creating a new column with just three stages instead for simplification
    df[['Company Annual Revenue','Number of Employees']] = df[['Company Annual Revenue','Number of Employees']].replace(0,np.nan)
    df['WonLossProgress'] = df.Stage.apply(newstage)
    df1 = df
    s3_resource.Object(bucket1, csv_file_1).put(Body=csv_buffer.getvalue())

    #creating a groupby of the 3 major pipeline components and exporting that
    df2 = pd.DataFrame(df.groupby('WonLossProgress')['Opportunity Amount'].agg([np.sum, np.mean, np.std,])).astype(int)
    s3_resource.Object(bucket2, csv_file_2).put(Body=csv_buffer.getvalue())

    #creating export for employee performance
    EmployeesOutcome = pd.crosstab(df.WonLossProgress, df['Sales Person'], dropna=False)
    EmployeesOutcome = EmployeesOutcome.T
    EmployeesOutcome.reset_index(inplace = True)
    EmployeesOutcome['Win/Loss Ratio'] = EmployeesOutcome.won / EmployeesOutcome.lost
    EmployeesOutcome.sort_values('in pipeline', ascending=False)
    df3 = EmployeesOutcome
    s3_resource.Object(bucket3, csv_file_3).put(Body=csv_buffer.getvalue())

    #creating a pivot table of opportunity amount per stage per employee
    table = pd.pivot_table(df, values='Opportunity Amount', index=['Sales Person'], columns=['WonLossProgress'], aggfunc=np.sum).sort_values('in pipeline', ascending=False)
    table[['in pipeline','lost','won']] = table[['in pipeline','lost','won']].replace(np.nan,0)
    df4 = EmployeesOutcome
    s3_resource.Object(bucket4, csv_file_4).put(Body=csv_buffer.getvalue())


