ARIMA can't even predict a similar pattern when I forecast some server workload data

Published 2024-06-26 14:21:53


I want to forecast some data with ARIMA. Everything runs when I predict some test data, but the ARIMA model is not even close: it just fits a straight line or some quadratic function. I have tried many different p, d, q values; nothing helped. Now I just want to know why the code doesn't work. Does anyone know where it might fail? The csv file is here: https://www.dropbox.com/s/3s0touu0bo3hw2o/ivwb2051.csv?dl=0 and here is a picture of the forecast: https://www.dropbox.com/s/0v6wdtselrspqd8/Capture.PNG?dl=0


import pandas as pd
from datetime import datetime
import matplotlib.pyplot as plt
from datetime import timedelta as delta
import numpy as np
import statsmodels.api as sm
import warnings
from statsmodels.tsa.arima_model import ARIMA
import itertools
from statsmodels.graphics.tsaplots import plot_acf
from sklearn.metrics import mean_squared_error

# https://www.kaggle.com/poiupoiu/how-to-use-sarimax
# using scipy version 1.2.0, because of incompatibilities with statsmodels 0.9.0
pd.plotting.register_matplotlib_converters()


def parser(x):
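    # The format string below expects timestamps such as '20180101 00:10:00'.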
    return datetime.strptime(x, '%Y%m%d %H:%M:%S')


'''------------------------------------------------------------------------------------------------------------ '''
'''-------------------------------------------ALL COLUMNS------------------------------------------------------ '''
'''------------------------------------------------------------------------------------------------------------ '''

my_dataframe = ['Unnamed: 0', 'server', 'MemoryUsedPercent', 'TotalVisibleMemorySize', 'FreePhysicalMemory',
                '\\logicaldisk(c:)\\% free space', '\\logicaldisk(c:)\\free megabytes',
                '\\logicaldisk(d:)\\% free space', '\\logicaldisk(d:)\\free megabytes',
                '\\logicaldisk(e:)\\% free space', '\\logicaldisk(e:)\\free megabytes',
                '\\logicaldisk(f:)\\% free space', '\\logicaldisk(f:)\\free megabytes',
                '\\processor(_total)\\% processor time']

'''------------------------------------------------------------------------------------------------------------ '''
'''---------------------------------GET DATA FROM CSV AND PARSE------------------------------------------------ '''
'''------------------------------------------------------------------------------------------------------------ '''

data = pd.read_csv('ivwb2051.csv', index_col=1, parse_dates=[1], date_parser=parser)
data.index = data.index.map(lambda x: x.replace(second=0))

arimaColumn = 2

data = data.drop(my_dataframe[0:arimaColumn], axis=1)
data = data.drop(my_dataframe[(arimaColumn + 1):], axis=1)

column = data[my_dataframe[arimaColumn]]
column_str = my_dataframe[arimaColumn]

'''------------------------------------------------------------------------------------------------------------ '''
'''---------------------------------FULFILL NON FREQUENT ELEMENTS---------------------------------------------- '''
'''------------------------------------------------------------------------------------------------------------ '''

update_series = delta(minutes=10)

# Replace missing or zero samples with the previous observation (a simple forward fill);
# the loop starts at 1 so the first row is never "filled" from the last row via index -1.
for n in range(1, len(data)):
    if np.isnan(data[my_dataframe[arimaColumn]][n]) or data[my_dataframe[arimaColumn]][n] == 0:
        data[my_dataframe[arimaColumn]][n] = data[my_dataframe[arimaColumn]][n - 1]


def insert_row(row_number, df, date, row_value):
    df1 = df[0:row_number]
    df2 = df[row_number:]
    new = pd.DataFrame(
        {'date': [date], column_str: [row_value]})
    new.set_index('date', inplace=True)
    out = pd.concat([df1, new, df2])
    return out


def delete_row(row_number, df):
    df1 = df[0:row_number]
    df2 = df[row_number + 1:]
    out = pd.concat([df1, df2])
    return out


def change_date(row_number, df, newdate):
    row_value = df[my_dataframe[arimaColumn]][row_number]
    out = delete_row(row_number, df)
    out = insert_row(row_number, out, newdate, row_value)
    return out


# Fill in timestamps that were never sent, so that the series gets a regular frequency
for n in range(1, len(data)):
    # If data is missing at a spacing of x * 10 min, insert new row(s) carrying the previous value
    if (((data.index[n] - data.index[n - 1]) != update_series)
            and ((data.index[n] - data.index[n - 1]) % update_series == delta(minutes=0))):
        while (data.index[n] - data.index[n - 1]) >= 2 * update_series:
            data = insert_row(n, data, data.index[n] - update_series, data[my_dataframe[arimaColumn]][n - 1])
    # If data is missing at some other spacing, round the timestamp down; drop it if it equals the
    # timestamp at n-1, otherwise fill everything up to n-1 at the correct interval.
    elif (data.index[n] - data.index[n - 1]) % update_series != delta(minutes=0):
        if data.index[n] - ((data.index[n] - data.index[n - 1]) % update_series) != data.index[n - 1]:
            data = change_date(n, data, data.index[n] - ((data.index[n] - data.index[n - 1]) % update_series))
        while (data.index[n] - data.index[n - 1]) >= 2 * update_series:
            data = insert_row(n, data, data.index[n] - update_series, data[my_dataframe[arimaColumn]][n - 1])


print('Your series now has a regular frequency!\n')
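
# Side note (not part of the original approach): if simply forward-filling onto a fixed
# 10-minute grid is acceptable, pandas can regularise the series in one step, assuming
# data.index is a DatetimeIndex. The name data_regular is illustrative only and is not used below.
data_regular = data.resample('10min').last().ffill()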

'''-------------------------------------------------------------------------------------------------------------'''
'''------------------------------------SEASONAL DECOMPOSE ------------------------------------------------------'''
'''-------------------------------------------------------------------------------------------------------------'''

frequency = 4320  # monthly (30 days of 10-minute samples)
frequency = 6 * 24  # daily: 144 ten-minute samples per day (overrides the monthly value above)

res = sm.tsa.seasonal_decompose(data.dropna(), freq=frequency)
fig = res.plot()
fig.set_figheight(8)
fig.set_figwidth(15)
plt.show()
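
# Quick visual check (illustrative, not in the original script): plot roughly two days of the
# extracted seasonal component to confirm the assumed daily pattern of 144 ten-minute samples.
# Note that newer statsmodels versions name the 'freq' argument of seasonal_decompose 'period'.
res.seasonal.iloc[:2 * frequency].plot(figsize=(15, 4), title='Seasonal component, first two days')
plt.show()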

'''-------------------------------------------------------------------------------------------------------------'''
'''-----------------------------------------------TRAIN/ TEST---------------------------------------------------'''
'''-------------------------------------------------------------------------------------------------------------'''

# From 3645 / 31.03 onwards
Y_length = data.shape[0] - 3502
trainingPercentage = 0.8  # share of the selected window used for training; the rest is the test set

boundForTest = int(Y_length * trainingPercentage) + 3502

tr_start, tr_end = data.index[3502], data.index[boundForTest]
te_start, te_end = data.index[boundForTest+1], data.index[data.shape[0]-1]

train = data[column_str][tr_start:tr_end].dropna()
test = data[column_str][te_start:te_end].dropna()
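
# Illustrative sanity check (not in the original code): the test window should start right after
# the training window, giving roughly an 80/20 split of the selected range.
print('train:', train.index[0], '->', train.index[-1], '(%d rows)' % len(train))
print('test: ', test.index[0], '->', test.index[-1], '(%d rows)' % len(test))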

'''------------------------------------------------------------------------------------------------------------ '''
'''-----------------------------------------TESTING STATIONARITY----------------------------------------------- '''
'''------------------------------------------------------------------------------------------------------------ '''

from statsmodels.tsa.stattools import adfuller


def test_stationarity(timeseries):
    # Determing rolling statistics
    rolmean = timeseries.rolling(frequency + 1).mean()
    rolstd = timeseries.rolling(frequency + 1).std()

    # Plot rolling statistics:
    fig = plt.figure(figsize=(12, 8))
    orig = plt.plot(timeseries, color='blue', label='Original')
    mean = plt.plot(rolmean, color='red', label='Rolling Mean')
    std = plt.plot(rolstd, color='black', label='Rolling Std')
    plt.legend(loc='best')
    plt.title('Rolling Mean & Standard Deviation')
    plt.show()

    # Perform Dickey-Fuller test:
    print('Results of Dickey-Fuller Test:')
    dftest = adfuller(timeseries, autolag='AIC')
    dfoutput = pd.Series(dftest[0:4], index=['Test Statistic', 'p-value', '#Lags Used', 'Number of Observations Used'])
    for key, value in dftest[4].items():
        dfoutput['Critical Value (%s)' % key] = value
    print(dfoutput)


'''-------------------------------------------------------------------------------------------------------------'''
'''---------------------------------------------ADF-TESTING-----------------------------------------------------'''
'''-------------------------------------------------------------------------------------------------------------'''

test_stationarity(data[my_dataframe[arimaColumn]])

'''
data['first_difference'] = data[my_dataframe[arimaColumn]] - data[my_dataframe[arimaColumn]].shift(1)
test_stationarity(data.first_difference.dropna(inplace=False))

data['seasonal_difference'] = data[my_dataframe[arimaColumn]] - data[my_dataframe[arimaColumn]].shift(frequency)
test_stationarity(data.seasonal_difference.dropna(inplace=False))

data['seasonal_first_difference'] = data.first_difference - data.first_difference.shift(frequency)
test_stationarity(data.seasonal_first_difference.dropna(inplace=False))
'''

fig, ax = plt.subplots(2, 1, figsize=(20, 10))
fig = sm.graphics.tsa.plot_acf(train.diff().dropna(), lags=50, ax=ax[0])
fig = sm.graphics.tsa.plot_pacf(train.diff().dropna(), lags=50, ax=ax[1])
plt.show()

'''-------------------------------------------------------------------------------------------------------------'''
'''---------------------------------------------ARIMA-----------------------------------------------------------'''
'''-------------------------------------------------------------------------------------------------------------'''


warnings.filterwarnings('ignore')

resDiff = sm.tsa.arma_order_select_ic(train, max_ar=2, max_ma=2, ic='aic', trend='c')
print('ARMA(p,q) =', resDiff['aic_min_order'], 'is the best.')
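# Side note: arma_order_select_ic searches plain ARMA(p, q) models only (no differencing and no
# seasonal terms), and its suggestion is not fed into the SARIMAX call below, which hard-codes
# order=(4, 2, 3). If desired, the same search can also be run on the differenced series, e.g.
# sm.tsa.arma_order_select_ic(train.diff().dropna(), max_ar=2, max_ma=2, ic='aic', trend='c').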

arima = sm.tsa.statespace.SARIMAX(train, order=(4, 2, 3), freq=pd.DateOffset(minutes=10), seasonal_order=(0, 0, 0, 0),
                                  enforce_stationarity=False, enforce_invertibility=False, ).fit()

print(arima.summary())


pred = arima.predict(tr_end, te_end)[1:]
print('ARIMA model MSE:{}'.format(mean_squared_error(test, pred)))
print(pred)

pd.DataFrame({'test': test, 'pred': pred}).plot()
plt.show()



The output plot / prediction is not even remotely right
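
For comparison only, and not a tuned recommendation: the decomposition above is computed with a daily period (frequency = 6 * 24 = 144 ten-minute samples), while the SARIMAX call sets seasonal_order=(0, 0, 0, 0), i.e. the fitted model carries no seasonal term at all. A sketch of how a daily seasonal term could be wired into the same setup, with placeholder (1, 1, 1) orders, might look like this (fitting with s = 144 can be slow):


sarima = sm.tsa.statespace.SARIMAX(train, order=(1, 1, 1),
                                   seasonal_order=(1, 1, 1, 144),
                                   freq=pd.DateOffset(minutes=10),
                                   enforce_stationarity=False,
                                   enforce_invertibility=False).fit()
print(sarima.summary())
pred_seasonal = sarima.predict(tr_end, te_end)[1:]
pd.DataFrame({'test': test, 'pred_seasonal': pred_seasonal}).plot()
plt.show()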

