D2L 08 循环神经网络


D2L 08 循环神经网络

序列模型

处理序列数据需要统计工具和新的深度神经网络架构

统计工具

教材以股票的例子作为例子,假设一个交易员想在第 $t$ 日的股市中表现良好,于是通过以下途径预测当天的价格 $x_t$

自回归模型

本章后面的大部分内容将围绕着如何有效估计 $P(x_t∣x_{t−1},…,x_1)$ 展开,简单地说,它归结为以下两种策略:

  1. 假设在现实情况下相当长的序列 $x_{t−1},…,x_1$ 可能是不必要的, 因此我们只需要满足某个长度为 $\tau$ 的时间跨度, 即使用观测序列 $x_{t−1},…,x_{t−τ}$ 这种,模型被称为自回归模型(autoregressive models), 因为它们是对自己执行回归

  2. 保留一些对过去观测的总结 $h_t$,并且同时更新预测 $\hat {x}_t$ 和总结 $h_t$ ,由于 $ht$ 从未被观测到,这类模型也被称为隐变量自回归模型(latent autoregressive models)

马尔可夫模型

在自回归模型的近似法中, 我们使用 $x_{t−1},…,x_{t−τ}$ 而不是 $x_{t−1},…,x_1$ 来估计 $x_t$。 只要这种是近似精确的,我们就说序列满足马尔可夫条件(Markov condition),特别是,如果 $\tau =1$,得到一个一阶马尔可夫模型(first-order Markov model), $P(x)$ 由下式给出

动态规划这些计算工具已经在控制算法和强化学习算法广泛使用

训练

接下来教材开始了一个小实践!首先,生成一些数据:使用正弦函数和一些可加性噪声来生成序列数据, 时间步为1,2,…,1000,并且将这个序列转换为模型的“特征-标签”(feature-label)对。 基于嵌入维度 $\tau$,我们将数据映射为数据对 $y_t=x_t$ 和 $x_t=[x_{t−τ},…,x_{t−1}]$

换句话说,我们想要干这么一个事情:在时刻 $t$,根据之前的 $\tau$ 个数据 $[x_{t−τ},…,x_{t−1}]$,去预测 $x_t$ 的数据是什么。教材使用了一个 MLP 去做这个预测,效果还不错

预测

通常,对于直到 $x_t$ 的观测序列,其在时间步 $t+k$ 处的预测输出 $x_{t+k}$ 称为 $k$ 步预测(k-step-ahead-prediction),上面展示的就是单步预测结果,当使训练好的模型进行多步预测时,会发现预测结果很快就变得离谱起来,一个原因是误差的累计导致预测结果与真实标签相差甚远

文本预处理

序列数据存在许多种形式,文本是最常见例子之一,本节中,我们将解析文本的常见预处理步骤。 这些步骤通常包括:

  1. 将文本作为字符串加载到内存中
  2. 将字符串拆分为词元(如单词和字符)
  3. 建立一个词表,将拆分的词元映射到数字索引
  4. 将文本转换为数字索引序列,方便模型操作

读取数据集

教材使用了一个 time machine 文本作为数据集

data_path = Path('./d2l_data/timemachine.txt')
def read_time_machine():  #@save
    """将时间机器数据集加载到文本行的列表中"""
    with open(data_path, 'r') as f:
        lines = f.readlines()
    return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]

lines = read_time_machine()
print(f'# 文本总行数: {len(lines)}')
print(lines[0])
print(lines[10])

词元化

下面的 tokenize 函数将文本行列表(lines)作为输入,每个文本序列又被拆分成一个词元列表,词元(token)是文本的基本单位(例如,可以选择单词或字母作为基本单位)

def tokenize(lines, token='word'):  #@save
    """将文本行拆分为单词或字符词元"""
    if token == 'word':
        return [line.split() for line in lines]
    elif token == 'char':
        # 把字符串转换为单个字母可以用 list 方法
        return [list(line) for line in lines]
    else:
        print('错误:未知词元类型:' + token)

tokens = tokenize(lines)
for i in range(11):
    print(tokens[i])

词表

词元的类型是字符串,而模型需要的输入是数字,因此这种类型不方便模型使用。 现在,让我们构建一个字典,通常也叫做词表(vocabulary), 用来将字符串类型的词元映射到从0开始的数字索引中

class Vocab:  #@save
    """文本词表"""
    def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
        if tokens is None:
            tokens = []
        if reserved_tokens is None:
            reserved_tokens = []
        # 按出现频率排序
        counter = count_corpus(tokens)
        self._token_freqs = sorted(counter.items(), key=lambda x: x[1],
                                   reverse=True)
        # 初始化未知词元 & reserved_tokens
        self.idx_to_token = ['<unk>'] + reserved_tokens
        self.token_to_idx = {token: idx
                             for idx, token in enumerate(self.idx_to_token)}
        # 下面这句话是多余的,是教材中的一个小失误
        # self.idx_to_token, self.token_to_idx = [], dict()
        for token, freq in self._token_freqs:
            # 去除出现频率低的词元
            if freq < min_freq:
                break
            if token not in self.token_to_idx:
                self.idx_to_token.append(token)
                # 核心是生成了 {token: idx} 字典
                self.token_to_idx[token] = len(self.idx_to_token) - 1

    def __len__(self):
        return len(self.idx_to_token)

    def __getitem__(self, tokens):
        if not isinstance(tokens, (list, tuple)):
            return self.token_to_idx.get(tokens, self.unk)
        return [self.__getitem__(token) for token in tokens]

    def to_tokens(self, indices):
        if not isinstance(indices, (list, tuple)):
            return self.idx_to_token[indices]
        return [self.idx_to_token[index] for index in indices]

    @property
    def unk(self):  # 未知词元的索引为0
        return 0

    @property
    def token_freqs(self):
        return self._token_freqs

def count_corpus(tokens):  #@save
    """统计词元的频率"""
    # 这里的 `tokens` 是 1D 列表或 2D 列表
    if len(tokens) == 0 or isinstance(tokens[0], list):
        # 将词元列表展平成一个列表
        tokens = [token for line in tokens for token in line]
    return collections.Counter(tokens)

这里的代码比较复杂,为了让 vocab 的概念更清晰,再总结一下它的功能:

  1. index to token,给定一个 index 返回其 token

    vocab.to_tokens(index)
  2. token to index,给定一个 token 返回其 index

    # 由 __getitem__ 实现
    vocab[token]
  3. token_freqs 返回一个 list of tuple,用于存储 token 出现的次数 (token, freqs),该功能是由 collections.Counter + sorted 实现

语料库

将如下几个功能整合起来:将数据导入,生成词表,生成语料库(所谓语料库,就是一个 list of index,存储了文本中每一个 token 对应的 index)

def load_corpus_time_machine(max_tokens=-1):  #@save
    """返回时光机器数据集的词元索引列表和词表"""
    lines = read_time_machine()
    tokens = tokenize(lines, 'char')
    vocab = Vocab(tokens)
    # 因为时光机器数据集中的每个文本行不一定是一个句子或一个段落,
    # 所以将所有文本行展平到一个列表中,并利用 vocab 查询其 index
    # 这里的语法非常的简洁,欣赏一下
    corpus = [vocab[token] for line in tokens for token in line]
    if max_tokens > 0:
        corpus = corpus[:max_tokens]
    return corpus, vocab

语言模型和数据集

一个理想的语言模型就能够基于模型本身生成自然文本

马尔可夫模型与 n 元语法

马尔可夫性质推导出了许多可以应用于序列建模的近似公式:

通常,涉及一个、两个和三个变量的概率公式分别被称为 “一元语法”(unigram)、“二元语法”(bigram)和“三元语法”(trigram)模型。 下面,我们将学习如何去设计更好的模型

自然语言统计

自然语言中词频的分布是不均匀的,少部分的词会大量出现,而大部分的词出现的频率会比较少

这告诉我们想要通过计数统计和平滑来建模单词是不可行的, 因为这样建模的结果会大大高估尾部单词的频率,也就是所谓的不常用单词。那么其他的词元组合,比如二元语法、三元语法等等,又会如何呢?

# 二元语法示例
bigram_tokens = [pair for pair in zip(corpus[:-1], corpus[1:])]
bigram_vocab = d2l.Vocab(bigram_tokens)
bigram_vocab.token_freqs[:10]

这张图非常令人振奋!原因有很多: 首先,除了一元语法词,单词序列似乎也遵循齐普夫定律

其次,词表中 n 元组的数量并没有那么大,这说明语言中存在相当多的结构, 这些结构给了我们应用模型的希望

读取长序列数据

假设我们将使用神经网络来训练语言模型, 模型中的网络一次处理具有预定义长度 (例如 n 个时间步)的一个小批量序列。 现在的问题是如何随机生成一个小批量数据的特征和标签以供读取

随机采样

在随机采样中,每个样本都是在原始的长序列上任意捕获的子序列

def seq_data_iter_random(corpus, batch_size, num_steps):  #@save
    """使用随机抽样生成一个小批量子序列"""
    # 从随机偏移量开始对序列进行分区,随机范围包括`num_steps - 1`
    corpus = corpus[random.randint(0, num_steps - 1):]
    # 减去1,是因为我们需要考虑标签
    num_subseqs = (len(corpus) - 1) // num_steps
    # 长度为`num_steps`的子序列的起始索引
    initial_indices = list(range(0, num_subseqs * num_steps, num_steps))
    # 在随机抽样的迭代过程中,
    # 来自两个相邻的、随机的、小批量中的子序列不一定在原始序列上相邻
    random.shuffle(initial_indices)

    def data(pos):
        # 返回从`pos`位置开始的长度为`num_steps`的序列
        return corpus[pos: pos + num_steps]

    num_batches = num_subseqs // batch_size
    for i in range(0, batch_size * num_batches, batch_size):
        # 在这里,`initial_indices`包含子序列的随机起始索引
        initial_indices_per_batch = initial_indices[i: i + batch_size]
        X = [data(j) for j in initial_indices_per_batch]
        Y = [data(j + 1) for j in initial_indices_per_batch]
        yield torch.tensor(X), torch.tensor(Y)

顺序分区

在迭代过程中,除了对原始序列可以随机抽样外, 我们还可以保证两个相邻的小批量中的子序列在原始序列上也是相邻的

def seq_data_iter_sequential(corpus, batch_size, num_steps):  #@save
    """使用顺序分区生成一个小批量子序列"""
    # 从随机偏移量开始划分序列
    offset = random.randint(0, num_steps)
    num_tokens = ((len(corpus) - offset - 1) // batch_size) * batch_size
    Xs = torch.tensor(corpus[offset: offset + num_tokens])
    Ys = torch.tensor(corpus[offset + 1: offset + 1 + num_tokens])
    Xs, Ys = Xs.reshape(batch_size, -1), Ys.reshape(batch_size, -1)
    num_batches = Xs.shape[1] // num_steps
    for i in range(0, num_steps * num_batches, num_steps):
        X = Xs[:, i: i + num_steps]
        Y = Ys[:, i: i + num_steps]
        yield X, Y

从结果上直观感受一下顺序分区和随机采样的区别

# my_seq = list(range(35))

# 随机采样 batch_size=2, num_steps=5
X:  tensor([[27, 28, 29, 30, 31],
        [22, 23, 24, 25, 26]])
Y: tensor([[28, 29, 30, 31, 32],
        [23, 24, 25, 26, 27]])
X:  tensor([[17, 18, 19, 20, 21],
        [12, 13, 14, 15, 16]])
Y: tensor([[18, 19, 20, 21, 22],
        [13, 14, 15, 16, 17]])

# 顺序分区
X:  tensor([[ 3,  4,  5,  6,  7],
        [18, 19, 20, 21, 22]])
Y: tensor([[ 4,  5,  6,  7,  8],
        [19, 20, 21, 22, 23]])
X:  tensor([[ 8,  9, 10, 11, 12],
        [23, 24, 25, 26, 27]])
Y: tensor([[ 9, 10, 11, 12, 13],
        [24, 25, 26, 27, 28]])

循环神经网络

我们介绍了 n 元语法模型, 其中单词 $x_t$ 在时间步 t 的条件概率仅取决于前面 n−1 个单词。如果要增加 n,模型参数的数量也会随之呈指数增长,面临的次元组合数为 $|V|^n$

因此与其将 $P(x_t∣x_{t−1},…,x_{t−n+1})$ 模型化, 不如使用隐变量模型

其中 $h_{t−1}$ 是隐状态(hidden state), 也称为隐藏变量(hidden variable), 它存储了到时间步 t−1 的序列信息。 通常,我们可以基于当前输入和先前隐状态来计算时间步 t 处的任何时间的隐状态

有隐状态的循环神经网络

假设我们在时间步 t 有小批量输入 $X_t∈R^{n×d}$,与多层感知机不同的是, 我们在这里保存了前一个时间步的隐藏变量 $H_{t−1}$, 并引入了一个新的权重参数 $W_{hh}∈R^{h×h}$, 来描述如何在当前时间步中使用前一个时间步的隐藏变量。 具体地说,当前时间步隐藏变量由当前时间步的输入与前一个时间步的隐藏变量一起计算得出:

其中 $\phi$ 为激活函数。输出层的输出类似于多层感知机中的计算

值得一提的是,即使在不同的时间步,循环神经网络也总是使用这些模型参数。 因此,循环神经网络的参数开销不会随着时间步的增加而增加,下面来看一下图示对整个计算过程有更清晰的认识

困惑度(Perplexity)

在训练过程中,我们对每个时间步的输出层的输出进行softmax操作, 然后利用交叉熵损失计算模型输出和标签之间的误差

由于历史原因,自然语言处理的科学家更喜欢使用一个叫做困惑度(perplexity)的量,它是上式的指数

循环神经网络的从零实现

这里仅记录一些值得注意的知识点,详细的从零实现不做笔记

One-hot 编码

每个词元都表示为一个数字索引, 将这些索引直接输入神经网络可能会使学习变得困难。 我们通常将每个词元表示为更具表现力的特征向量。 最简单的表示称为独热编码(one-hot encoding)

F.one_hot(torch.tensor([0, 2]), len(vocab))

我们每次采样的小批量数据形状是二维张量: (批量大小,时间步数)。 one_hot函数将这样一个小批量数据转换成三维张量, 张量的最后一个维度等于词表大小 len(vocab)

X = torch.arange(10).reshape((2, 5))
F.one_hot(X.T, 28).shape
# torch.Size([5, 2, 28])

预测

让我们首先定义预测函数来生成 prefix 之后的新字符, 其中的 prefix 是一个用户提供的包含多个字符的字符串。 在循环遍历 prefix 中的开始字符时, 我们不断地将隐状态传递到下一个时间步,但是不生成任何输出。 这被称为预热(warm-up)期, 因为在此期间模型会自我更新(例如,更新隐状态), 但不会进行预测。 预热期结束后,隐状态的值通常比刚开始的初始值更适合预测, 从而预测字符并输出它们

def predict_ch8(prefix, num_preds, net, vocab, device):  #@save
    """在prefix后面生成新字符"""
    state = net.begin_state(batch_size=1, ctx=device)
    outputs = [vocab[prefix[0]]]
    get_input = lambda: np.array([outputs[-1]], ctx=device).reshape((1, 1))
    for y in prefix[1:]:  # 预热期
        _, state = net(get_input(), state)	# 不关心输出,只关心隐状态
        outputs.append(vocab[y])
    for _ in range(num_preds):  # 预测num_preds步
        y, state = net(get_input(), state)
        outputs.append(int(y.argmax(axis=1).reshape(1)))
    return ''.join([vocab.idx_to_token[i] for i in outputs])

梯度剪裁

对于长度为 T 的序列,我们在迭代中计算这 T 个时间步上的梯度, 将会在反向传播过程中产生长度为 O(T) 的矩阵乘法链,当 T 较大时,它可能导致数值不稳定, 例如可能导致梯度爆炸或梯度消失。对于梯度爆炸, 一个流行的替代方案是通过将梯度gg投影回给定半径 (例如 θ)的球来裁剪梯度 g。 如下式

训练

循环神经网络的训练与普通的神经网络训练有几点不同:

  1. 序列数据的不同采样方法(随机采样和顺序分区)将导致隐状态初始化的差异
  2. 我们在更新模型参数之前裁剪梯度。 这样的操作的目的是:即使训练过程中某个点上发生了梯度爆炸,也能保证模型不会发散
  3. 我们用困惑度来评价模型

需要注意的是隐状态的传递会导致梯度链的增长,这带来的好处是历史信息的传递,带来的坏处是过长的计算图使得梯度计算变得复杂,下面是教材中的原话:

具体来说,当使用顺序分区时, 我们只在每个迭代周期(epoch)的开始位置初始化隐状态。 由于下一个小批量数据中的第 i 个子序列样本与当前第 i 个子序列样本相邻, 因此当前小批量数据最后一个样本的隐状态, 将用于初始化下一个小批量数据第一个样本的隐状态。 这样,存储在隐状态中的序列的历史信息可以在一个迭代周期内流经相邻的子序列。 然而,在任何一点隐状态的计算, 都依赖于同一迭代周期中前面所有的小批量数据, 这使得梯度计算变得复杂。 为了降低计算量,在处理任何一个小批量数据之前(iteration), 我们先分离梯度,使得隐状态的梯度计算总是限制在一个小批量数据的时间步内

解决方式就是在每个 iteration 前,把隐状态作从计算图中分离 detach_,也就是将其看作一个常数输入,下面是训练的代码

#@save
def train_epoch_ch8(net, train_iter, loss, updater, device, use_random_iter):
    """训练网络一个迭代周期(定义见第8章)"""
    state, timer = None, d2l.Timer()
    metric = d2l.Accumulator(2)  # 训练损失之和,词元数量
    for X, Y in train_iter:
        if state is None or use_random_iter:
            # 在第一次迭代或使用随机抽样时初始化state
            state = net.begin_state(batch_size=X.shape[0], device=device)
        else:
            if isinstance(net, nn.Module) and not isinstance(state, tuple):
                # state对于nn.GRU是个张量
                state.detach_()
            else:
                # state对于nn.LSTM或对于我们从零开始实现的模型是个元组
                for s in state:
                    s.detach_()
        y = Y.T.reshape(-1)
        X, y = X.to(device), y.to(device)
        y_hat, state = net(X, state)
        l = loss(y_hat, y.long()).mean()
        if isinstance(updater, torch.optim.Optimizer):
            updater.zero_grad()
            l.backward()
            grad_clipping(net, 1)
            updater.step()
        else:
            l.backward()
            grad_clipping(net, 1)
            # 因为已经调用了mean函数
            updater(batch_size=1)
        metric.add(l * y.numel(), y.numel())
    return math.exp(metric[0] / metric[1]), metric[1] / timer.stop()

循环神经网络的简洁实现

定义模型

高级API提供了循环神经网络的实现。 我们构造一个具有256个隐藏单元的单隐藏层的循环神经网络层 rnn_layer。 事实上,我们还没有讨论多层循环神经网络的意义

import torch
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l

batch_size, num_steps = 32, 35
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)

num_hiddens = 256
# 可以看到 rnn_layer 的定义是不需要时间步的,应该是在输入数据上进行定义
# 这里应该能体会到“循环”的感觉了,即数据不断地流入流出同一个网络
rnn_layer = nn.RNN(len(vocab), num_hiddens)

我们使用张量来初始化隐状态,它的形状是(隐藏层数,批量大小,隐藏单元数)

state = torch.zeros((1, batch_size, num_hiddens))	# [1, 32, 256]

通过一个隐状态和一个输入,我们就可以用更新后的隐状态计算输出。 需要强调的是,rnn_layer的“输出”(Y)不涉及输出层的计算: 它是指每个时间步的隐状态,这些隐状态可以用作后续输出层的输入

# 区别于卷积神经网络把 batch_size 的维度放在第一个,rnn 的输入将时间步维度放在第一个
X = torch.rand(size=(num_steps, batch_size, len(vocab)))
Y, state_new = rnn_layer(X, state)
# Y.shape = [35, 32, 256]
# state_new.shape = [1, 32, 256] 是 Y 的组后一个输出,即:Y[34] or Y[num_steps-1]

rnn_layer只包含隐藏的循环层,我们还需要创建一个单独的输出层,这里用 nn.Linear(num_hidden, len(vocab)) 实现,下面完整实现

#@save
class RNNModel(nn.Module):
    """循环神经网络模型"""
    def __init__(self, rnn_layer, vocab_size, **kwargs):
        super(RNNModel, self).__init__(**kwargs)
        self.rnn = rnn_layer
        self.vocab_size = vocab_size
        self.num_hiddens = self.rnn.hidden_size
        # 如果RNN是双向的(之后将介绍),num_directions应该是2,否则应该是1
        if not self.rnn.bidirectional:
            self.num_directions = 1
            self.linear = nn.Linear(self.num_hiddens, self.vocab_size)
        else:
            self.num_directions = 2
            self.linear = nn.Linear(self.num_hiddens * 2, self.vocab_size)

    def forward(self, inputs, state):
        # input.shape = (batch_size, num_steps) 需要转置一下并生成 onehot 特征
        X = F.one_hot(inputs.T.long(), self.vocab_size)
        X = X.to(torch.float32)
        Y, state = self.rnn(X, state)
        # 全连接层首先将Y的形状改为(时间步数*批量大小,隐藏单元数)
        # 它的输出形状是(时间步数*批量大小,词表大小)。
        output = self.linear(Y.reshape((-1, Y.shape[-1])))
        return output, state

    def begin_state(self, device, batch_size=1):
        if not isinstance(self.rnn, nn.LSTM):
            # nn.GRU以张量作为隐状态
            return  torch.zeros((self.num_directions * self.rnn.num_layers,
                                 batch_size, self.num_hiddens),
                                device=device)
        else:
            # nn.LSTM以元组作为隐状态
            return (torch.zeros((
                self.num_directions * self.rnn.num_layers,
                batch_size, self.num_hiddens), device=device),
                    torch.zeros((
                        self.num_directions * self.rnn.num_layers,
                        batch_size, self.num_hiddens), device=device))

通过时间反向传播

这一节详细为了计算循环神经网络的梯度,向我们展示了循环神经网络中存在的梯度数值不稳定

梯度策略

教材提到了三种策略:随机阶段,常规截断,完整计算

  • 第一行采用随机截断,方法是将文本划分为不同长度的片断
  • 第二行采用常规截断,方法是将文本分解为相同长度的子序列。 这也是教材在循环神经网络实验中一直在做的,具体来说就是使用 detach_() 方法将每一个 iteration 输入的初始隐状态从计算图中分离开
  • 第三行采用通过时间的完全反向传播,结果是产生了在计算上不可行的表达式(迭代链很长,计算复杂)

遗憾的是,虽然随机截断在理论上具有吸引力, 但很可能是由于多种因素在实践中并不比常规截断更好

反向传播细节

首先写每个时间步的方程

写出损失函数

画出计算图

在任意时间步tt, 目标函数关于模型输出的微分计算是相当简单的

现在,我们可以计算目标函数关于输出层中参数 $W_{qh}$ 的梯度

根据计算图,要计算关于隐状态的梯度。我们不能够同时得到所有时间步的梯度,而是从最后一个时间步 T 倒推

进行迭代计算,最终得到所有时间步的表达式

为了进行分析,对于任何时间步 1≤t≤T 展开递归计算得

这个简单的线性例子已经展现了长序列模型的一些关键问题: 由于连续矩阵相乘,它可能陷入到潜在的非常大的幂。 这在数值上是不稳定的,表现形式为梯度消失或梯度爆炸,解决方法通常为梯度截断,或者使用(在之后学习的)更复杂的序列模型,如:LSTM/GRU

最后我们得到权重矩阵的梯度


Author: Declan
Reprint policy: All articles in this blog are used except for special statements CC BY 4.0 reprint polocy. If reproduced, please indicate source Declan !
  TOC