79db6e5c96
新增锂电池剩余使用寿命预测模型RUL-Mamba,包含以下主要组件: 1. 添加Mamba模块作为核心时序建模组件 2. 实现特征注意力网络(FAN)和门控残差网络(GRN) 3. 新增数据预处理和归一化层 4. 添加模型训练和评估脚本 5. 补充README文档说明使用方法 6. 添加可视化辅助工具Helper_Plot.py 7. 实现多种时间序列处理层(Embedding、AutoCorrelation等) 8. 添加配置文件requirements.txt 9. 补充测试数据集TJU battery data
278 lines
12 KiB
Python
278 lines
12 KiB
Python
#! -*- coding:utf-8 -*-
|
|
"""
|
|
Hyperparameters can be efficiently tuned with `optuna <https://optuna.readthedocs.io/>`_.
|
|
"""
|
|
import copy
|
|
import logging
|
|
import os
|
|
from typing import Any, Dict, Tuple, Union
|
|
|
|
import numpy as np
|
|
import optuna
|
|
from optuna.integration import PyTorchLightningPruningCallback, TensorBoardCallback
|
|
import optuna.logging
|
|
import pytorch_lightning as pl
|
|
from pytorch_lightning import Callback
|
|
from pytorch_lightning.callbacks import LearningRateMonitor
|
|
from pytorch_lightning.loggers import TensorBoardLogger
|
|
import statsmodels.api as sm
|
|
import torch
|
|
from torch.utils.data import DataLoader
|
|
|
|
from pytorch_forecasting import TemporalFusionTransformer
|
|
from pytorch_forecasting.data import TimeSeriesDataSet
|
|
from pytorch_forecasting.metrics import QuantileLoss
|
|
from pytorch_forecasting.metrics import MAE, MAPE, MASE, RMSE, SMAPE, MultiHorizonMetric, MultiLoss, QuantileLoss
|
|
from pytorch_forecasting.models.rnn import RecurrentNetwork
|
|
from pytorch_forecasting import DeepAR
|
|
from pytorch_forecasting.metrics import SMAPE, MultivariateNormalDistributionLoss,MAE
|
|
|
|
optuna_logger = logging.getLogger("optuna")
|
|
|
|
|
|
class MetricsCallback(Callback):
|
|
"""PyTorch Lightning metric callback."""
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.metrics = []
|
|
|
|
def on_validation_end(self, trainer, pl_module):
|
|
self.metrics.append(trainer.callback_metrics)
|
|
|
|
def optimize_hyperparameters(
|
|
Model,
|
|
train_dataloaders: DataLoader,
|
|
val_dataloaders: DataLoader,
|
|
model_path: str,
|
|
max_epochs: int = 100,
|
|
enc_in: int = 11,
|
|
max_encoder_length: int = 24,
|
|
max_prediction_length: int = 1,
|
|
n_trials: int = 100,
|
|
timeout: float = 3600 * 10.0, # 10 hours
|
|
gradient_clip_val_range: Tuple[float, float] = (0.1, 0.5),
|
|
hidden_size_range: Tuple[int, int] = (16, 265),
|
|
d_model_range: Tuple[int, int] = (8, 96),
|
|
kernel_size_range: Tuple[int, int] = (3, 24),
|
|
|
|
hidden_continuous_size_range: Tuple[int, int] = (8, 64),
|
|
attention_head_size_range: Tuple[int, int] = (1, 8),
|
|
n_hidden_layer_range: Tuple[int, int] = (1, 3),
|
|
dropout_range: Tuple[float, float] = (0.05, 0.15),
|
|
learning_rate_range: Tuple[float, float] = (0.0005, 0.0015),
|
|
use_learning_rate_finder: bool = True,
|
|
trainer_kwargs: Dict[str, Any] = {},
|
|
log_dir: str = "lightning_logs",
|
|
study: optuna.Study = None,
|
|
verbose: Union[int, bool] = None,
|
|
cell_type: str = "GRU",
|
|
TAN_hidden_range: Tuple[int, int] = (1, 10),
|
|
weight_range: Tuple[float, float] = (0.1, 0.9),
|
|
pruner: optuna.pruners.BasePruner = optuna.pruners.SuccessiveHalvingPruner(),
|
|
|
|
**kwargs,
|
|
) -> optuna.Study:
|
|
"""
|
|
Optimize Temporal Fusion Transformer hyperparameters.
|
|
|
|
Run hyperparameter optimization. Learning rate for is determined with
|
|
the PyTorch Lightning learning rate finder.
|
|
|
|
Args:
|
|
train_dataloaders (DataLoader): dataloader for training model
|
|
val_dataloaders (DataLoader): dataloader for validating model
|
|
model_path (str): folder to which model checkpoints are saved
|
|
max_epochs (int, optional): Maximum number of epochs to run training. Defaults to 20.
|
|
n_trials (int, optional): Number of hyperparameter trials to run. Defaults to 100.
|
|
timeout (float, optional): Time in seconds after which training is stopped regardless of number of epochs
|
|
or validation metric. Defaults to 3600*8.0.
|
|
hidden_size_range (Tuple[int, int], optional): Minimum and maximum of ``hidden_size`` hyperparameter. Defaults
|
|
to (16, 265).
|
|
hidden_continuous_size_range (Tuple[int, int], optional): Minimum and maximum of ``hidden_continuous_size``
|
|
hyperparameter. Defaults to (8, 64).
|
|
attention_head_size_range (Tuple[int, int], optional): Minimum and maximum of ``attention_head_size``
|
|
hyperparameter. Defaults to (1, 4).
|
|
dropout_range (Tuple[float, float], optional): Minimum and maximum of ``dropout`` hyperparameter. Defaults to
|
|
(0.1, 0.3).
|
|
learning_rate_range (Tuple[float, float], optional): Learning rate range. Defaults to (1e-5, 1.0).
|
|
use_learning_rate_finder (bool): If to use learning rate finder or optimize as part of hyperparameters.
|
|
Defaults to True.
|
|
trainer_kwargs (Dict[str, Any], optional): Additional arguments to the
|
|
`PyTorch Lightning trainer <https://pytorch-lightning.readthedocs.io/en/latest/trainer.html>`_ such
|
|
as ``limit_train_batches``. Defaults to {}.
|
|
log_dir (str, optional): Folder into which to log results for tensorboard. Defaults to "lightning_logs".
|
|
study (optuna.Study, optional): study to resume. Will create new study by default.
|
|
verbose (Union[int, bool]): level of verbosity.
|
|
* None: no change in verbosity level (equivalent to verbose=1 by optuna-set default).
|
|
* 0 or False: log only warnings.
|
|
* 1 or True: log pruning events.
|
|
* 2: optuna logging level at debug level.
|
|
Defaults to None.
|
|
pruner (optuna.pruners.BasePruner, optional): The optuna pruner to use.
|
|
Defaults to optuna.pruners.SuccessiveHalvingPruner().
|
|
|
|
**kwargs: Additional arguments for the :py:class:`~TemporalFusionTransformer`.
|
|
|
|
Returns:
|
|
optuna.Study: optuna study results
|
|
"""
|
|
assert isinstance(train_dataloaders.dataset, TimeSeriesDataSet) and isinstance(
|
|
val_dataloaders.dataset, TimeSeriesDataSet
|
|
), "dataloaders must be built from timeseriesdataset"
|
|
|
|
logging_level = {
|
|
None: optuna.logging.get_verbosity(),
|
|
0: optuna.logging.WARNING,
|
|
1: optuna.logging.INFO,
|
|
2: optuna.logging.DEBUG,
|
|
}
|
|
optuna_verbose = logging_level[verbose]
|
|
optuna.logging.set_verbosity(optuna_verbose)
|
|
|
|
loss = kwargs.get(
|
|
"loss", QuantileLoss()
|
|
) # need a deepcopy of loss as it will otherwise propagate from one trial to the next
|
|
|
|
# create objective function
|
|
def objective(trial: optuna.Trial) -> float:
|
|
# Filenames for each trial must be made unique in order to access each checkpoint.
|
|
checkpoint_callback = pl.callbacks.ModelCheckpoint(
|
|
dirpath=os.path.join(model_path, "trial_{}".format(trial.number)), filename="{epoch}", monitor="val_loss"
|
|
)
|
|
|
|
# The default logger in PyTorch Lightning writes to event files to be consumed by
|
|
# TensorBoard. We don't use any logger here as it requires us to implement several abstract
|
|
# methods. Instead we setup a simple callback, that saves metrics from each validation step.
|
|
metrics_callback = MetricsCallback()
|
|
learning_rate_callback = LearningRateMonitor()
|
|
logger = TensorBoardLogger(log_dir, name="optuna", version=trial.number)
|
|
gradient_clip_val = trial.suggest_loguniform("gradient_clip_val", *gradient_clip_val_range)
|
|
default_trainer_kwargs = dict(
|
|
gpus=[0] if torch.cuda.is_available() else None,
|
|
max_epochs=max_epochs,
|
|
gradient_clip_val=gradient_clip_val,
|
|
callbacks=[
|
|
metrics_callback,
|
|
learning_rate_callback,
|
|
checkpoint_callback,
|
|
PyTorchLightningPruningCallback(trial, monitor="val_loss"),
|
|
],
|
|
logger=logger,
|
|
enable_progress_bar=optuna_verbose < optuna.logging.INFO,
|
|
# weights_summary=[None, "top"][optuna_verbose < optuna.logging.INFO],
|
|
)
|
|
default_trainer_kwargs.update(trainer_kwargs)
|
|
trainer = pl.Trainer(
|
|
**default_trainer_kwargs,
|
|
)
|
|
|
|
# 隐层数量
|
|
hidden_size = trial.suggest_int("hidden_size", *hidden_size_range, log=True)
|
|
n_hidden_layer = trial.suggest_int("n_hidden_layer", *n_hidden_layer_range, log=True)
|
|
dropout = trial.suggest_uniform("dropout", *dropout_range),
|
|
|
|
kwargs["loss"] = copy.deepcopy(loss)
|
|
weight = trial.suggest_float("weight", *weight_range, step=0.05)
|
|
# create model
|
|
|
|
# BPNN
|
|
if Model.__name__=='FullyConnectedModel':
|
|
model = Model.from_dataset(
|
|
train_dataloaders.dataset,
|
|
input_size = 216,
|
|
output_size=1,
|
|
hidden_size=hidden_size, # 调参隐层节点数
|
|
n_hidden_layers=n_hidden_layer, # 调参隐层层数
|
|
# dropout=trial.suggest_uniform("dropout", *dropout_range),
|
|
log_interval=-1,
|
|
loss=MAE()
|
|
# **kwargs,
|
|
)
|
|
elif 'RULMambaNetModel' in Model.__name__:
|
|
model = Model.from_dataset(
|
|
train_dataloaders.dataset,
|
|
lookback=max_encoder_length,
|
|
enc_in=enc_in,
|
|
predict=max_prediction_length,
|
|
dropout=trial.suggest_uniform(name="dropout",low=0.01,high=0.2),
|
|
d_model=trial.suggest_int(name="d_model",low=8,high=256, step=8),
|
|
n_dec_layer=trial.suggest_int(name="n_dec_layer", low=1, high=3, step = 1),
|
|
loss=SMAPE(),
|
|
)
|
|
elif 'RULMambaVSNNetModel' in Model.__name__:
|
|
model = Model.from_dataset(
|
|
train_dataloaders.dataset,
|
|
lookback=max_encoder_length,
|
|
enc_in=enc_in,
|
|
predict=max_prediction_length,
|
|
dropout=trial.suggest_uniform(name="dropout",low=0.01,high=0.2),
|
|
d_model=trial.suggest_int(name="d_model",low=8,high=96, step=8),
|
|
n_dec_layer=trial.suggest_int(name="n_dec_layer", low=1, high=3, step = 1),
|
|
loss=SMAPE(),
|
|
)
|
|
else:
|
|
model = Model.from_dataset(
|
|
train_dataloaders.dataset,
|
|
dropout=trial.suggest_uniform("dropout", *dropout_range),
|
|
hidden_size=hidden_size,
|
|
hidden_continuous_size=trial.suggest_int(
|
|
"hidden_continuous_size",
|
|
hidden_continuous_size_range[0],
|
|
min(hidden_continuous_size_range[1], hidden_size),
|
|
log=True,
|
|
),
|
|
attention_head_size=trial.suggest_int("attention_head_size", *attention_head_size_range),
|
|
log_interval=-1,
|
|
**kwargs,
|
|
)
|
|
# print('model:',model)
|
|
# find good learning rate
|
|
if use_learning_rate_finder:
|
|
lr_trainer = pl.Trainer(
|
|
gradient_clip_val=gradient_clip_val,
|
|
gpus=[0] if torch.cuda.is_available() else None,
|
|
logger=False,
|
|
enable_progress_bar=False,
|
|
enable_model_summary=False,
|
|
)
|
|
res = lr_trainer.tuner.lr_find(
|
|
model,
|
|
train_dataloaders=train_dataloaders,
|
|
val_dataloaders=val_dataloaders,
|
|
early_stop_threshold=10000,
|
|
min_lr=learning_rate_range[0],
|
|
num_training=100,
|
|
max_lr=learning_rate_range[1],
|
|
)
|
|
|
|
loss_finite = np.isfinite(res.results["loss"])
|
|
if loss_finite.sum() > 3: # at least 3 valid values required for learning rate finder
|
|
lr_smoothed, loss_smoothed = sm.nonparametric.lowess(
|
|
np.asarray(res.results["loss"])[loss_finite],
|
|
np.asarray(res.results["lr"])[loss_finite],
|
|
frac=1.0 / 10.0,
|
|
)[min(loss_finite.sum() - 3, 10) : -1].T
|
|
optimal_idx = np.gradient(loss_smoothed).argmin()
|
|
optimal_lr = lr_smoothed[optimal_idx]
|
|
else:
|
|
optimal_idx = np.asarray(res.results["loss"]).argmin()
|
|
optimal_lr = res.results["lr"][optimal_idx]
|
|
optuna_logger.info(f"Using learning rate of {optimal_lr:.3g}")
|
|
# add learning rate artificially
|
|
model.hparams.learning_rate = trial.suggest_uniform("learning_rate", optimal_lr, optimal_lr)
|
|
else:
|
|
model.hparams.learning_rate = trial.suggest_loguniform("learning_rate", *learning_rate_range)
|
|
|
|
# fit
|
|
trainer.fit(model, train_dataloaders=train_dataloaders, val_dataloaders=val_dataloaders)
|
|
|
|
# report result
|
|
return metrics_callback.metrics[-1]["val_loss"].item()
|
|
|
|
# setup optuna and run
|
|
if study is None:
|
|
study = optuna.create_study(direction="minimize", pruner=pruner)
|
|
study.optimize(objective, n_trials=n_trials, timeout=timeout)
|
|
return study
|