大数跨境
0
0

时间序列异常检测方法及实践:从统计方法到深度学习

时间序列异常检测方法及实践:从统计方法到深度学习 数据分析艺术
2025-09-20
0
导读:1 时间序列异常检测概述时间序列异常检测是指对按时间顺序排列的数据点进行分析,识别其中不符合预期模式的数据点或



1 时间序列异常检测概述














时间序列异常检测是指对按时间顺序排列的数据点进行分析,识别其中不符合预期模式的数据点或序列片段。随着物联网、金融科技和工业4.0的发展,时间序列异常检测已成为“维持复杂系统有效运行”的关键技术,广泛应用于设备监控、网络安全、金融风控等领域。
1.1.异常类型及其特点

时间序列中的异常通常分为三种类型:点异常(Point Anomaly),即单个数据点明显偏离正常范围;上下文异常(Contextual Anomaly),指在特定上下文环境中表现异常;集体异常(Collective Anomaly),即一系列相关数据点共同表现出异常模式。在工业场景中,这些异常可能对应着设备故障、生产缺陷或系统性能下降等重要事件。
1.2.挑战与应对

时间序列异常检测面临诸多挑战,包括数据的“高维性”、“动态依赖关系”、“噪声干扰”以及“异常标签稀缺”等问题。传统方法往往难以捕捉多维时间序列中实体间的动态依赖关系,且容易受异常数据影响难以重构正常模式。为应对这些挑战,研究者开发了从经典统计方法到前沿深度学习技术的多种解决方案,下面我将全面介绍这些方法及其实际应用。



2 统计与传统机器学习方法




2.1 统计方法

2.1.1 IQR(四分位距)方法

IQR(Interquartile Range)是描述数据离散程度的经典统计指标,其通过计算数据的分位数来识别异常值。该方法计算第一四分位数(Q1,25%分位点)和第三四分位数(Q3,75%分位点),然后计算IQR = Q3 - Q1,最后确定异常值边界:上边界为Q3 + 1.5×IQR,下边界为Q1 - 1.5×IQR。任何落在边界之外的数据点都被视为异常值。

优点与局限性:IQR方法简单高效,对数据分布没有假设,适用于大多数连续型数据。然而,它对季节性数据和非平稳时间序列的处理效果较差,可能需要先进行数据分解处理。

Python代码示例:

```python

import numpy as np

import pandas as pd

import matplotlib.pyplot as plt

def iqr_detector(series, low_threshold=1.5, high_threshold=1.5):

    """

    使用IQR方法检测时间序列异常

    """

    q1 = series.quantile(0.25)

    q3 = series.quantile(0.75)

    iqr = q3 - q1

    lower_bound = q1 - low_threshold * iqr

    upper_bound = q3 + high_threshold * iqr

    anomalies = (series < lower_bound) | (series > upper_bound)

    return anomalies, lower_bound, upper_bound

# 生成示例数据

np.random.seed(42)

dates = pd.date_range(start='2023-01-01', end='2023-12-31', freq='D')

normal_data = np.sin(2 * np.pi * np.arange(len(dates)) / 365) + np.random.normal(0, 0.1, len(dates))

series = pd.Series(normal_data, index=dates)

# 添加一些异常值

series.iloc[50] = 3.5  # 点异常

series.iloc[150] = -2.8 # 点异常

series.iloc[250:255] = 2.5 * np.ones(5)  # 集体异常

# 检测异常

anomalies, lower_bound, upper_bound = iqr_detector(series)

# 可视化结果

plt.figure(figsize=(12, 6))

plt.plot(series.index, series, label='Time Series')

plt.fill_between(series.index, lower_bound, upper_bound, alpha=0.3, label='IQR Range')

plt.scatter(series[anomalies].index, series[anomalies], color='red', label='Anomalies')

plt.title('IQR Anomaly Detection')

plt.xlabel('Date')

plt.ylabel('Value')

plt.legend()

plt.show()

```

2.1.2 3-Sigma法则

3-Sigma法则(又称三标准差法则)基于正态分布假设,认为几乎所有数据点(99.7%)都落在均值附近三个标准差的范围内。计算数据的均值和标准差,然后将与均值偏差超过3个标准差的数据点标记为异常。

优缺点分析:这种方法实现简单,计算高效,适合作为基线方法。但它依赖于数据服从正态分布的假设,对非正态分布数据效果较差,且对极端值敏感(均值标准差本身受异常值影响)。
2.2机器学习方法

2.2.1 孤立森林(Isolation Forest)

孤立森林是一种“高效的无监督”异常检测算法,其核心思想是"异常点稀少且不同,因此更容易被孤立"。算法通过随机选择特征和分割点来构建隔离树,异常点通常只需很少几次分割就能被孤立出来。

适用场景:孤立森林适用于高维数据集,计算效率高,特别适合处理大规模时间序列数据。它在金融欺诈检测、工业设备故障检测等场景有广泛应用。

Python代码示例:

```python

from sklearn.ensemble import IsolationForest

from sklearn.preprocessing import StandardScaler

# 准备数据

X = series.values.reshape(-1, 1)

# 标准化数据

scaler = StandardScaler()

X_scaled = scaler.fit_transform(X)

# 训练孤立森林模型

iforest = IsolationForest(contamination=0.05, random_state=42)

iforest.fit(X_scaled)

# 预测异常

anomaly_pred = iforest.predict(X_scaled)

anomalies = series.index[anomaly_pred == -1]

# 可视化结果

plt.figure(figsize=(12, 6))

plt.plot(series.index, series, label='Time Series')

plt.scatter(anomalies, series[anomalies], color='red', label='Anomalies')

plt.title('Isolation Forest Anomaly Detection')

plt.xlabel('Date')

plt.ylabel('Value')

plt.legend()

plt.show()

```

2.2.2 一类支持向量机(One-Class SVM)

一类支持向量机是一种**无监督学习**算法,其目标是寻找一个能够包围绝大部分正常数据点的最小超球体。落在球体之外的点即为异常点。

优缺点分析:One-Class SVM适用于小样本的"正常"数据训练,对高维数据有较好效果。但它对参数选择敏感,计算复杂度较高,不适合超大规模数据集。






3 深度学习方法



3.1 时间卷积网络(TCN)

时间卷积网络(Temporal Convolutional Networks)利用“因果卷积”和“空洞卷积”来捕捉时间序列中的长期依赖关系。TCN通过确保输出只依赖于过去而不依赖于未来,保持了时间序列的时序性,并通过空洞卷积指数级扩大感受野,能够捕捉长期模式。

TCN的核心架构包括一维全卷积网络、因果卷积(确保时间先后关系不被破坏)、空洞卷积(扩大感受野)和残差连接(解决梯度消失问题)。在异常检测中,TCN通常用于预测下一个时间步的值,然后通过预测误差识别异常。

Python代码示例:

```python

import torch

import torch.nn as nn

from torch.utils.data import DataLoader, TensorDataset

import numpy as np

class TCNAnomalyDetector(nn.Module):

    def __init__(self, input_size, output_size, num_channels, kernel_size, dropout):

        super(TCNAnomalyDetector, self).__init__()

        self.tcn = nn.Sequential(

            nn.Conv1d(input_size, num_channels, kernel_size, stride=1, padding=(kernel_size-1)//2),

            nn.ReLU(),

            nn.Dropout(dropout),

            nn.Conv1d(num_channels, num_channels, kernel_size, stride=1, padding=(kernel_size-1)//2),

            nn.ReLU(),

            nn.Dropout(dropout),

            nn.Conv1d(num_channels, output_size, kernel_size, stride=1, padding=(kernel_size-1)//2)

        )

    def forward(self, x):

        return self.tcn(x)

# 准备训练数据

def create_sequences(data, seq_length):

    sequences = []

    for i in range(len(data) - seq_length):

        seq = data[i:i+seq_length]

        sequences.append(seq)

    return np.array(sequences)

seq_length = 10

X_seq = create_sequences(series.values, seq_length)

X_train = torch.FloatTensor(X_seq).transpose(1, 2)

# 初始化模型

model = TCNAnomalyDetector(input_size=1, output_size=1, num_channels=32, kernel_size=3, dropout=0.2)

criterion = nn.MSELoss()

optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

# 训练模型

num_epochs = 100

for epoch in range(num_epochs):

    for batch in DataLoader(TensorDataset(X_train), batch_size=32, shuffle=True):

        x = batch[0]

        output = model(x)

        loss = criterion(output, x)

        optimizer.zero_grad()

        loss.backward()

        optimizer.step()

    if (epoch+1) % 20 == 0:

        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

# 检测异常

model.eval()

with torch.no_grad():

    reconstructions = model(X_train)

    loss_per_seq = torch.mean((reconstructions - X_train)**2, dim=(1,2))

threshold = np.quantile(loss_per_seq.numpy(), 0.95)

anomalies = loss_per_seq > threshold

```

3.2 自编码器(Autoencoder)

自编码器是一种“无监督学习”神经网络,由编码器和解码器两部分组成。编码器将输入数据压缩为潜在表示,解码器则从潜在表示重构原始输入。在时间序列异常检测中,自编码器在正常数据上训练,学习如何有效地重构正常模式。当遇到异常数据时,由于其模式未知,“重构误差会显著增大”,从而识别出异常。

“Seq2Seq深度自编码器”是自编码器的一种变体,使用Bi-LSTM网络作为编码器和解码器,输入和输出均为序列,能更好地挖掘时间序列数据中的异常序列模式。

Python代码示例:

```python

from tensorflow.keras.models import Model

from tensorflow.keras.layers import Input, LSTM, RepeatVector, Dense

from sklearn.preprocessing import StandardScaler

# 准备数据

scaler = StandardScaler()

scaled_data = scaler.fit_transform(series.values.reshape(-1, 1))

# 创建序列数据

sequence_length = 10

X_seqs = []

for i in range(len(scaled_data) - sequence_length):

    X_seqs.append(scaled_data[i:i+sequence_length])

X_seqs = np.array(X_seqs)

# 构建LSTM自编码器

input_layer = Input(shape=(sequence_length, 1))

encoded = LSTM(16, activation='relu', return_sequences=True)(input_layer)

encoded = LSTM(8, activation='relu', return_sequences=False)(encoded)

decoded = RepeatVector(sequence_length)(encoded)

decoded = LSTM(8, activation='relu', return_sequences=True)(decoded)

decoded = LSTM(16, activation='relu', return_sequences=True)(decoded)

decoded = Dense(1)(decoded)

autoencoder = Model(input_layer, decoded)

autoencoder.compile(optimizer='adam', loss='mse')

# 训练自编码器

history = autoencoder.fit(

    X_seqs, X_seqs,

    epochs=50,

    batch_size=32,

    validation_split=0.1,

    shuffle=True

)

# 检测异常

reconstructions = autoencoder.predict(X_seqs)

mse = np.mean(np.square(X_seqs - reconstructions), axis=(1,2))

threshold = np.quantile(mse, 0.95)

anomalies = mse > threshold

```

3.3 记忆网络与混合Prototype方法

记忆网络和混合Prototype方法是“时间序列异常检测的前沿技术”,其核心思路是通过记忆网络增加自编码器对异常序列的还原难度。该方法设置多个prototype(原型),每个prototype对应一个embedding,存储正常序列中不同类型的序列信息。


创新点:ICLR 2025提出的改进方法引入了“多尺度patch粒度”的表征和“周期性表征”,生成混合prototype,增强了prototype的丰富度,使异常序列的还原难度进一步提升。多尺度patch通过不同尺寸的average pooling得到不同尺度的patch embedding,而多周期性prototype则重点刻画序列周期性,对每个变量的周期性prototype进行单独学习。


优势:这种方法有效解决了传统自编码器过泛化的问题,即使对于异常序列也能产生较大的重构误差,从而提高异常检测的准确性。

3.4 结合对比学习的双分支方法

结合对比学习的双分支多维时间序列异常检测方法是一种“针对工业场景”的创新方法。它通过图结构学习和图特征增强得到实体之间的关联图以捕获动态变化的实体相关性,并使用长短期记忆网络对时间依赖关系进行提取。


该方法首先通过图结构学习和图特征增强得到实体之间的关联图,然后使用LSTM提取时间依赖关系,接着插入分块重组并采用图卷积操作提取不同尺度间的时空融合关系,最后将融合后的关系特征进行联合对比训练得到正异常差异表示以评估异常。


实验效果:在SWaT、WADI、SWAP和MSL四个公开工业数据集上,该方法取得了较高的F1分数(91.63%、90.60%、90.06%和93.69%),比MTGFLOW方法平均高出1.52个百分点。






4 开源工具与平台




4.1 TODS(Time-series Outlier Detection System)


TODS是一个开源的自动化时间序列异常检测系统,由datamllab团队开发,基于Python语言,集成了多种先进的异常检测算法,适用于各种时间序列数据分析场景。


核心功能:TODS提供从数据预处理、特征提取、检测算法到结果评估的完整管道,支持多种时间序列异常检测算法,包括统计方法、机器学习方法和深度学习方法。

Python代码示例:

```python

from tods import generate_dataset, train_test_split

from tods.sk_interface.detection_algorithm.AutoRegODetector import AutoRegODetector

# 生成示例数据集

data = generate_dataset()

# 分割训练集和测试集

X_train, X_test = train_test_split(data)

# 初始化异常检测器

detector = AutoRegODetector()

# 训练模型

detector.fit(X_train)

# 预测异常点

predictions = detector.predict(X_test)

print(predictions)

```

4.2 IBM异常检测API

IBM日本研究院开发的异常检测API是面向工业应用的解决方案,具有“多算法支持”、“实时处理能力”、“多维分析”和“可解释性”等特点。该API集成多种统计学习和机器学习算法,可处理高速产生的流式时序列数据,支持单变量和多变量分析,并提供异常评分和置信度指标便于人工验证。


应用场景:该API适用于设备预测性维护(通过振动、温度等传感器数据早期发现设备异常)、产品质量监控(生产过程中的关键参数异常检测)、能源消耗分析(识别非正常能耗模式)以及供应链优化(物流和库存数据的异常波动监测)。

4.3 Darts时间序列库

Darts是Unit8团队开发的Python时间序列预测库,近期社区提出了基于“四分位距(IQR)方法”实现异常检测功能的增强需求。IQR方法是统计学中识别异常值的经典技术,相比简单的阈值检测具有更好的鲁棒性。


实现方案:在Darts库中,IQR检测器的实现将继承自QuantileDetector基类,但会采用不同的边界计算逻辑。QuantileDetector直接使用用户指定的分位数作为边界,而IQRDetector自动计算基于IQR的动态边界。




5 总结与展望



时间序列异常检测是一个“快速发展”的领域,各种方法各有其适用场景和优缺点。统计方法简单高效,适合作为基线方法;机器学习方法平衡了性能与复杂度;深度学习方法能捕捉复杂模式但需要更多数据和计算资源。

5.1 方法选择建议

- 数据特性:对于单变量数据,可以从简单的统计方法(如IQR)开始;对于多变量数据且变量间存在复杂依赖关系,应考虑深度学习方法或专门的多变量方法。

- 异常类型:点异常可使用统计方法或隔离森林;上下文异常需要考虑时序上下文,使用TCN或LSTM;集体异常则需要序列分析方法。

- 资源约束:计算资源有限时选择简单方法;有充足数据且需要高精度时选择深度学习方法。

- 实时性要求:高实时性要求选择高效算法(如统计方法或隔离森林),允许一定延迟时可使用深度学习方法。


5.2 未来发展趋势

时间序列异常检测领域正朝着以下几个方向发展:多模态学习(结合时间序列与其他类型数据)、可解释性(不仅检测异常还能解释原因)、在线学习(适应数据分布的变化)以及“少样本学习”(解决异常样本稀少的问题)。

随着工业4.0和物联网技术的普及,时间序列异常检测将在“智能制造”、“智慧城市”和“健康医疗”等领域发挥更加重要的作用。结合领域知识的异常检测系统将成为未来的研究重点,帮助实现更加智能和高效的异常检测解决方案。

无论选择哪种方法,都需要理解其原理和假设,并根据实际业务需求进行调整和优化。同时,良好的数据预处理和特征工程往往比模型选择更重要,在时间序列分析中尤为重要。


【声明】内容源于网络
0
0
数据分析艺术
分析方法|应用场景|复盘总结
内容 80
粉丝 0
数据分析艺术 分析方法|应用场景|复盘总结
总阅读89
粉丝0
内容80