使用 Transformer 模型進行時間序列預測實戰

作者: Kaan Aslan 

轉自:Deephub IMBA

時間序列預測是一個經久不衰的主題,受自然語言處理領域的成功啓發,transformer 模型也在時間序列預測有了很大的發展。本文可以作爲學習使用 Transformer 模型的時間序列預測的一個起點。


數據集

這裏我們直接使用 kaggle 中的 Store Sales — Time Series Forecasting 作爲數據。這個比賽需要預測 54 家商店中各種產品系列未來 16 天的銷售情況,總共創建 1782 個時間序列。數據從 2013 年 1 月 1 日至 2017 年 8 月 15 日,目標是預測接下來 16 天的銷售情況。雖然爲了簡潔起見,我們做了簡化處理,作爲模型的輸入包含 20 列中的 3,029,400 條數據,。每行的關鍵列爲'store_nbr'、'family'和'date'。數據分爲三類變量:

1、截止到最後一次訓練數據日期 (2017 年 8 月 15 日) 之前已知的與時間相關的變量。這些變量包括數字變量,如“銷售額”,表示某一產品系列在某家商店的銷售額;“transactions”,一家商店的交易總數;'store_sales',該商店的總銷售額;'family_sales'表示該產品系列的總銷售額。

2、訓練截止日期 (2017 年 8 月 31 日) 之前已知,包括 “onpromotion”(產品系列中促銷產品的數量) 和“dcoilwtico”等變量。這些數字列由'holiday'列補充,它表示假日或事件的存在,並被分類編碼爲整數。此外,'time_idx'、'week_day'、'month_day'、'month'和'year'列提供時間上下文,也編碼爲整數。雖然我們的模型是隻有編碼器的,但已經添加了 16 天移動值 “onpromotion” 和“dcoilwtico”,以便在沒有解碼器的情況下包含未來的信息。

3、靜態協變量隨着時間的推移保持不變,包括諸如 “store_nbr”、“family” 等標識符,以及 “city”、“state”、“type” 和“cluster”等分類變量(詳細說明了商店的特徵),所有這些變量都是整數編碼的。

我們最後生成的 df 名爲'data_all',結構如下:

 categorical_covariates = ['time_idx','week_day','month_day','month','year','holiday']
 
 categorical_covariates_num_embeddings = []
 for col in categorical_covariates:
     data_all[col] = data_all[col].astype('category').cat.codes
     categorical_covariates_num_embeddings.append(data_all[col].nunique())
 
 categorical_static = ['store_nbr','city','state','type','cluster','family_int']
 
 categorical_static_num_embeddings = []
 for col in categorical_static:
     data_all[col] = data_all[col].astype('category').cat.codes
     categorical_static_num_embeddings.append(data_all[col].nunique())
 
 numeric_covariates = ['sales','dcoilwtico','dcoilwtico_future','onpromotion','onpromotion_future','store_sales','transactions','family_sales']
 
 target_idx = np.where(np.array(numeric_covariates)=='sales')[0][0]

在將數據轉換爲適合我的 PyTorch 模型的張量之前,需要將其分爲訓練集和驗證集。窗口大小是一個重要的超參數,表示每個訓練樣本的序列長度。此外,'num_val'表示使用的驗證折數,在此上下文中設置爲 2。將 2013 年 1 月 1 日至 2017 年 6 月 28 日的觀測數據指定爲訓練數據集,以 2017 年 6 月 29 日至 2017 年 7 月 14 日和 2017 年 7 月 15 日至 2017 年 7 月 30 日作爲驗證區間。

同時還進行了數據的縮放,完整代碼如下:

 def dataframe_to_tensor(series,numeric_covariates,categorical_covariates,categorical_static,target_idx):
 
     numeric_cov_arr = np.array(series[numeric_covariates].values.tolist())
     category_cov_arr = np.array(series[categorical_covariates].values.tolist())
     static_cov_arr = np.array(series[categorical_static].values.tolist())
 
     x_numeric = torch.tensor(numeric_cov_arr,dtype=torch.float32).transpose(2,1)
     x_numeric = torch.log(x_numeric+1e-5)
     x_category = torch.tensor(category_cov_arr,dtype=torch.long).transpose(2,1)
     x_static = torch.tensor(static_cov_arr,dtype=torch.long)
     y = torch.tensor(numeric_cov_arr[:,target_idx,:],dtype=torch.float32)
 
     return x_numeric, x_category, x_static, y
 
 
 window_size = 16
 forecast_length = 16
 num_val = 2
 
 val_max_date = '2017-08-15'
 train_max_date = str((pd.to_datetime(val_max_date) - pd.Timedelta(days=window_size*num_val+forecast_length)).date())
 
 train_final = data_all[data_all['date']<=train_max_date]
 val_final = data_all[(data_all['date']>train_max_date)&(data_all['date']<=val_max_date)]
 
 train_series = train_final.groupby(categorical_static+['family']).agg(list).reset_index()
 val_series = val_final.groupby(categorical_static+['family']).agg(list).reset_index()
 
 x_numeric_train_tensor, x_category_train_tensor, x_static_train_tensor, y_train_tensor = dataframe_to_tensor(train_series,numeric_covariates,categorical_covariates,categorical_static,target_idx)
 
 x_numeric_val_tensor, x_category_val_tensor, x_static_val_tensor, y_val_tensor = dataframe_to_tensor(val_series,numeric_covariates,categorical_covariates,categorical_static,target_idx)

數據加載器

在數據加載時,需要將每個時間序列從窗口範圍內的隨機索引開始劃分爲時間塊,以確保模型暴露於不同的序列段。

爲了減少偏差還引入了一個額外的超參數設置,它不是隨機打亂數據,而是根據塊的開始時間對數據集進行排序。然後數據被分成五部分——反映了我們五年的數據集——每一部分都是內部打亂的,這樣最後一批數據將包括去年的觀察結果,但還是隨機的。模型的最終梯度更新受到最近一年的影響,理論上可以改善最近時期的預測。

 def divide_shuffle(df,div_num):
     space = df.shape[0]//div_num
     division = np.arange(0,df.shape[0],space)
     return pd.concat([df.iloc[division[i]:division[i]+space,:].sample(frac=1) for i in range(len(division))])
 
 def create_time_blocks(time_length,window_size):
     start_idx = np.random.randint(0,window_size-1)
     end_idx = time_length-window_size-16-1
     time_indices = np.arange(start_idx,end_idx+1,window_size)[:-1]
     time_indices = np.append(time_indices,end_idx)
     return time_indices
 
 def data_loader(x_numeric_tensor, x_category_tensor, x_static_tensor, y_tensor, batch_size, time_shuffle):
 
     num_series = x_numeric_tensor.shape[0]
     time_length = x_numeric_tensor.shape[1]
     index_pd = pd.DataFrame({'serie_idx':range(num_series)})
     index_pd['time_idx'] = [create_time_blocks(time_length,window_size) for n in range(index_pd.shape[0])]
     if time_shuffle:
         index_pd = index_pd.explode('time_idx')
         index_pd = index_pd.sample(frac=1)
     else:
         index_pd = index_pd.explode('time_idx').sort_values('time_idx')
         index_pd = divide_shuffle(index_pd,5)
     indices = np.array(index_pd).astype(int)
 
     for batch_idx in np.arange(0,indices.shape[0],batch_size):
 
         cur_indices = indices[batch_idx:batch_idx+batch_size,:]
 
         x_numeric = torch.stack([x_numeric_tensor[n[0],n[1]:n[1]+window_size,:] for n in cur_indices])
         x_category = torch.stack([x_category_tensor[n[0],n[1]:n[1]+window_size,:] for n in cur_indices])
         x_static = torch.stack([x_static_tensor[n[0],:] for n in cur_indices])
         y = torch.stack([y_tensor[n[0],n[1]+window_size:n[1]+window_size+forecast_length] for n in cur_indices])
 
         yield x_numeric.to(device), x_category.to(device), x_static.to(device), y.to(device)
 
 def val_loader(x_numeric_tensor, x_category_tensor, x_static_tensor, y_tensor, batch_size, num_val):
 
     num_time_series = x_numeric_tensor.shape[0]
 
     for i in range(num_val):
 
       for batch_idx in np.arange(0,num_time_series,batch_size):
 
           x_numeric = x_numeric_tensor[batch_idx:batch_idx+batch_size,window_size*i:window_size*(i+1),:]
           x_category = x_category_tensor[batch_idx:batch_idx+batch_size,window_size*i:window_size*(i+1),:]
           x_static = x_static_tensor[batch_idx:batch_idx+batch_size]
           y_val = y_tensor[batch_idx:batch_idx+batch_size,window_size*(i+1):window_size*(i+1)+forecast_length]
 
           yield x_numeric.to(device), x_category.to(device), x_static.to(device), y_val.to(device)

模型

我們這裏通過 Pytorch 來簡單的實現《Attention is All You Need》(2017)² 中描述的 Transformer 架構。因爲是時間序列預測,所以注意力機制中不需要因果關係,也就是沒有對注意塊應用進行遮蔽。

從輸入開始: 分類特徵通過嵌入層傳遞,以密集的形式表示它們,然後送到 Transformer 塊。多層感知器 (MLP) 接受最終編碼輸入來產生預測。嵌入維數、每個 Transformer 塊中的注意頭數和 dropout 概率是模型的主要超參數。堆疊多個 Transformer 塊由'num_blocks'超參數控制。

下面是單個 Transformer 塊的實現和整體預測模型:

 class transformer_block(nn.Module):
 
     def __init__(self,embed_size,num_heads):
         super(transformer_block, self).__init__()
 
         self.attention = nn.MultiheadAttention(embed_size, num_heads, batch_first=True)
         self.fc = nn.Sequential(nn.Linear(embed_size, 4 * embed_size),
                                  nn.LeakyReLU(),
                                  nn.Linear(4 * embed_size, embed_size))
         self.dropout = nn.Dropout(drop_prob)
         self.ln1 = nn.LayerNorm(embed_size, eps=1e-6)
         self.ln2 = nn.LayerNorm(embed_size, eps=1e-6)
 
     def forward(self, x):
 
         attn_out, _ = self.attention(x, x, x, need_weights=False)
         x = x + self.dropout(attn_out)
         x = self.ln1(x)
 
         fc_out = self.fc(x)
         x = x + self.dropout(fc_out)
         x = self.ln2(x)
 
         return x
 
 class transformer_forecaster(nn.Module):
 
     def __init__(self,embed_size,num_heads,num_blocks):
         super(transformer_forecaster, self).__init__()
 
         num_len = len(numeric_covariates)
         self.embedding_cov = nn.ModuleList([nn.Embedding(n,embed_size-num_len) for n in categorical_covariates_num_embeddings])
         self.embedding_static = nn.ModuleList([nn.Embedding(n,embed_size-num_len) for n in categorical_static_num_embeddings])
 
         self.blocks = nn.ModuleList([transformer_block(embed_size,num_heads) for n in range(num_blocks)])
 
         self.forecast_head = nn.Sequential(nn.Linear(embed_size, embed_size*2),
                                            nn.LeakyReLU(),
                                            nn.Dropout(drop_prob),
                                            nn.Linear(embed_size*2, embed_size*4),
                                            nn.LeakyReLU(),
                                            nn.Linear(embed_size*4, forecast_length),
                                            nn.ReLU())
 
     def forward(self, x_numeric, x_category, x_static):
 
         tmp_list = []
         for i,embed_layer in enumerate(self.embedding_static):
             tmp_list.append(embed_layer(x_static[:,i]))
         categroical_static_embeddings = torch.stack(tmp_list).mean(dim=0).unsqueeze(1)
 
         tmp_list = []
         for i,embed_layer in enumerate(self.embedding_cov):
             tmp_list.append(embed_layer(x_category[:,:,i]))
         categroical_covariates_embeddings = torch.stack(tmp_list).mean(dim=0)
         T = categroical_covariates_embeddings.shape[1]
 
         embed_out = (categroical_covariates_embeddings + categroical_static_embeddings.repeat(1,T,1))/2
         x = torch.concat((x_numeric,embed_out),dim=-1)
 
         for block in self.blocks:
             x = block(x)
 
         x = x.mean(dim=1)
         x = self.forecast_head(x)
 
         return x

我們修改後的 transformer 架構如下圖所示:

模型接受三個獨立的輸入張量: 數值特徵、分類特徵和靜態特徵。對分類和靜態特徵嵌入進行平均,並與數字特徵組合形成具有形狀 (batch_size, window_size, embedding_size) 的張量,爲 Transformer 塊做好準備。這個複合張量還包含嵌入的時間變量,提供必要的位置信息。

Transformer 塊提取順序信息,然後將結果張量沿着時間維度聚合,將其傳遞到 MLP 中以生成最終預測 (batch_size, forecast_length)。這個比賽採用均方根對數誤差(RMSLE) 作爲評價指標,公式爲:

鑑於預測經過對數轉換,預測低於 - 1 的負銷售額 (這會導致未定義的錯誤) 需要進行處理,所以爲了避免負的銷售預測和由此產生的 NaN 損失值,在 MLP 層以後增加了一層 ReLU 激活確保非負預測。

 class RMSLELoss(nn.Module):
 
    def __init__(self):
        super().__init__()
        self.mse = nn.MSELoss()
 
    def forward(self, pred, actual):
        return torch.sqrt(self.mse(torch.log(pred + 1), torch.log(actual + 1)))

訓練和驗證

訓練模型時需要設置幾個超參數: 窗口大小、是否打亂時間、嵌入大小、頭部數量、塊數量、dropout、批大小和學習率。以下配置是有效的,但不保證是最好的:

 num_epoch = 1000
 min_val_loss = 999
 
 num_blocks = 1
 embed_size = 500
 num_heads = 50
 batch_size = 128
 learning_rate = 3e-4
 time_shuffle = False
 drop_prob = 0.1
 
 model = transformer_forecaster(embed_size,num_heads,num_blocks).to(device)
 criterion = RMSLELoss()
 optimizer = torch.optim.Adam(model.parameters(),lr=learning_rate)
 scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.5)

這裏使用 adam 優化器和學習率調度,以便在訓練期間逐步調整學習率。

 for epoch in range(num_epoch):
 
    batch_loader = data_loader(x_numeric_train_tensor, x_category_train_tensor, x_static_train_tensor, y_train_tensor, batch_size, time_shuffle)
    train_loss = 0
    counter = 0
 
    model.train()
    for x_numeric, x_category, x_static, y in batch_loader:
 
        optimizer.zero_grad()
        preds = model(x_numeric, x_category, x_static)
        loss = criterion(preds, y)
        train_loss += loss.item()
        counter += 1
        loss.backward()
        optimizer.step()
 
    train_loss = train_loss/counter
    print(f'Epoch {epoch} training loss: {train_loss}')
 
    model.eval()
    val_batches = val_loader(x_numeric_val_tensor, x_category_val_tensor, x_static_val_tensor, y_val_tensor, batch_size, num_val)
    val_loss = 0
    counter = 0
    for x_numeric_val, x_category_val, x_static_val, y_val in val_batches:
        with torch.no_grad():
            preds = model(x_numeric_val,x_category_val,x_static_val)
            loss = criterion(preds,y_val).item()
        val_loss += loss
        counter += 1
    val_loss = val_loss/counter
    print(f'Epoch {epoch} validation loss: {val_loss}')
 
    if val_loss<min_val_loss:
      print('saved...')
      torch.save(model,data_folder+'best.model')
      min_val_loss = val_loss
 
    scheduler.step()

結果

訓練後,表現最好的模型的訓練損失爲 0.387,驗證損失爲 0.457。當應用於測試集時,該模型的 RMSLE 爲 0.416,比賽排名爲第 89 位(前 10%)。

更大的嵌入和更多的注意力頭似乎可以提高性能,但最好的結果是用一個單獨的 Transformer 實現的,這表明在有限的數據下,簡單是優點。當放棄整體打亂而選擇局部打亂時,效果有所改善; 引入輕微的時間偏差提高了預測的準確性。

引用:
[1]: Alexis Cook, DanB, inversion, Ryan Holbrook. (2021). Store Sales — Time Series Forecasting. Kaggle. https://kaggle.com/competitions/store-sales-time-series-forecasting
[2]: Vaswani, A., Shazeer, N., Parmar, N., Uszkoreit, J., Jones, L., Gomez,  A. N., … & Polosukhin, I. (2017). Attention is all you need. Advances in neural information processing systems30.
[3]: Lim, B., Arık, S. Ö., Loeff, N., & Pfister, T. (2021). Temporal  fusion transformers for interpretable multi-horizon time series  forecasting. International Journal of Forecasting37(4), 1748–1764.

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/17iS1BOMGIJy5eq4IdD4fQ