1. CRF 算法基本原理
一般地,在 NLP 中是
- 输入单词序列经过
BiLSTM
或BERT
处理,生成每个单词的特征表示 - 然后将特征表示输入到 CRF 层,如下图所示,就是 BiLSTM+CRF 的框架图。
不直接接一个 softmax 层是因为,上一个标签是对后续标签有约束的,CRF 就是学习这个约束的过程,最后使得输出更合理。

因为前的特征表示过程是很简单的,对应 BERT 类你只要类似文本分类一样输入,取 bert 输出的 Output[0],即 (bs, seq_len, hidden_dim) 输出矩阵,“喂”到 CRF 层即可。

如图所示 CRF 接码过程图中所示。
假定输入为 x=[x0,x1,x2,xn] (这里的 x 表示输入文本对应的发射分数,每个词 xi 都对应一个发射向量,维度为标签数目)。
输出为 y=[y0,y1,⋯,yn] 那么其条件概率表示为:
P(y|x)=P(y0,y1,⋯,yn|x0,x1,⋯,xn)
例如,我们输入文本对应的标签序列:B-Person, I-Person, O, O,B-Org I-Org。那么这个序列的转移分数可按照如下方式计算:
Seqt=TI−Person,B−Person+TO,I−Person+TO,O+TO,B−Org+TB−Org,I−Org
而 CRF 解码策略要求输出路径在所有的路径中概率最大。假设标签数量为 k, 输入文本长度为 n, 显然会有 N=kn 条路径,第 i 条路径分数记为 Si. 那么输出序列 i 的概率为:
P(Si)=eSi∑NjeSj
表示输出序列 i 比所有路径的分数。
我们的模板就是使得真实路径 P(SReal) 最大,为了方便计算,我们加上负号最小化这个 loss:
loss=−P(SReal)=−eSReal∑NjeSj
式 3 取对数有:
loss=−logeSreal∑NjeSj=−(log(eSreal)–log(N∑jeSj))=log(N∑jeSj)–log(eSreal)=log(eS1+eS2+…+eSN)–Sreal
这就是 CRF 建模的损失函数,即等于 所有路径分数 exp 和的 log 减去 单条真实路径分数Sreal。这样我们只要计算每条路径的分数了,为了方便计算先约定一些记号。
2.CRF 参数和路径分数计算公式
Tag | Tag id |
---|---|
B-Person | 0 |
I-Person | 1 |
B-Org | 2 |
I-Org | 3 |
O | 4 |
为了方便将 Tag 映射为 Tag id, 一般在代码中会有 label2id = {label: i for i, label in enumerate(labels)}
来维护一个字典,方便后续查找。
- 发射分数矩阵为 E, shape 为 [n,tag_size]。每行表示输入文本的一个 token 的发射分数。
- 转移分数矩阵为 T, shape 为 [tag_size,tag_size], 表示每个 Tag 转移到下一个 Tag 的分数。比如 T03 表示 B -Person 转移到 I -Org 的分数,那么这个 T 矩阵上该位置的数值就是对应分数。
我们知道,每条路径分数就是从发射分数到转移分数的路径组合计算得来的。如上图 5 中黄色路径中,
- x0 的标签是 B -Person, 发射分数为 E00
- 接下来 x1 的标签是 I−Person,对应发射分数为 E11
- 而由 B -person 像 I -Person 转移的分数是 T10。到这里的分数就是 E00+T10+E11
- 接下来是 x2, 标签是 O。这里 x1 的标签 I−Person 像 x2 标签 O 转移的分数为 T41。到这步的分数是 E00+T10+E11+T41+E24.
推导整条路径分数计算公式为:
Sreal=n−1∑i=0Ei,yi+n−2∑i=0Tyi+1,yi
3. 前向算法
要优化 CRF 路径,就是优化 loss。根据公式 4,要计算所有路径分数和真实路径分数。实际上所有路径分数计算需要进行 log sum exp 计算,本身很复杂。另外假设有长度为 50 的文本,标签数量为 31,那么整个路径数目为 3150,计算代价太高了,训练和预测速度将非常慢。
实际上,使用称为 前向算法 的动态规划。它会将所有路径和计算,拆解为每个位置和计算,最终得出所有路径和。3
由式 4 可知所有路径和分数是:
log(eS1+eS2+…+eSN)
下面来介绍其计算原理,为了简化计算量,只使用 2 个 tag,3 个词。如下图所示。
假定其发射分数矩阵 emission 为:
emissioni=[xi0,xi1]
其中, xi 表示位置为 i 的词对应的发射分数,xi0 表示 i 位置 tag0 的标签的分数,xi1 表示 i 位置 tag1 的标签的分数。
转移矩阵 trans 矩阵为:
trans=[t00t01 t10t11]
其中, t01 表示从 tag1 到 tag0 的转移分数,t10 表示从 tag0 到 tag1 的转移分数。
路径分数和:
alphai=[si0,si1]
其中 si0 表示从到 i 位置为止,以标签 tag0 结尾的所有路径分数。具体来说,alpha2=[s20,s21] 表示到 x2 位置,这里理论上会是所有 tag,但这里简化了只设定 2 个 tag,那么就是以 tag0 结尾的所有路径和 + 以 tag1 结尾的所有路径分数。比如下图中 s20 就表示了:
x00,x10,x20x00,x11,x20x01,x10,x20x01,x11,x20
这四条路径的分数。

下面来具体看看路径分数的计算,如果计算到 x2, 需要 3 个步骤,这里只介绍到 x1,后续计算类似推导。
- 截止到 x0 位置。
该位置发射分数为 emission0=[x00,x01], 这里是序列起始位置,该位置路径分数和为 alpha0=[s00,s01]=[x00,x01]。路径分数计算为:
total0=log(ex00+ex01)
- 截止到 x1 位置
这步就有 x0 向 x1 位置转移的过程,会复杂一些。x1 位置的发射分数为:
emission1=[x10,x11]
转移概率矩阵为:
trans=[t00t01t10t11]
前一个位置路径分数 alpha0=[x00,x01]。
展开 emission1, alpha0 两者方便后续用矩阵计算。
emission1=[x10x10x11x11]
alpha0=[x00x01x00x01]
那么截止到 x1 位置路径分数和计算可以表示为, 前一步 alpha0+ 转移状态分数 trans+ 发射分数 emission1. 公式计算为:
scores=alpha0+trans+emission1=[x00x01x00x01]+[t00t01t10t11]+[x10x10x11x11]=[x00+t00+x10x01+t01+x10x00+t10+x11x01+t11+x11]
以第一行为例,第一行表示当前位置 x1 以 tag0 结尾的所有路径分数,那么在第一行所表示的路径分数最大的那条,就是当前位置 x1 以 tag0 结尾的分数最大路径。
那么,我们可以计算当前位置 x1 的各个标签结尾的路径分数 alpha1: alpha1=[log(ex00+t00+x10+ex01+t01+x10),log(ex00+t10+x11+ex01+t11+x11)]
这样计算截止到 x1 位置,所有路径和为:
total1=log(ealpha1[0]+ealpha1[1])=log(elog(ex00+t00+x10+ex01+t01+x10)+elog(ex00+t10+x11+ex01+t11+x11))=log(ex00+t00+x10+ex01+t01+x10+ex00+t10+x11+ex01+t11+x11)
这就是到 x1 位置的所有路径和分数,也就是归一化分数。
还可以继续计算 x2 位置,但都类似,就是前一路径分数 + 转移状态分数 + 发射分数。
4. Viterbi 算法
Viterbi 算法解决的是,如何使用 CRF 从全部的路径中解码出得分最高的那条路径?
具体来说,如图 7 所示,接码得到分数最大的路径,如橙黄色所示路径。
大部分约定记号跟小节 3 一样,区别有以下:
- alphai=[si0,si1] 表示到 xi 位置,以 tag0 和 tag1 记为的路径中,取得的最大分数的路径。具体到图中就是 x00,x11,x20 的路径分数。
- 另外,引入 betai=[pi0,pi1]。表示当前 xi 位置,以当前位置 xi 相应标签结尾的路径中,分数最大的路径在前一个位置即 xi−1 的标签索引。
如 x2 位置,beta2=[p20,p21],p20 表示截止到 x2 位置,以标签 tag0 结尾的最大分数路径的前一个位置的 xi−1 的标签索引,那么就是 p20=1。同理,p21=1。
- 截止到 x0 位置
该位置发射分数为 emission0=[x00,x01], 这里是序列起始位置,该位置路径分数和为 alpha0=[x00,x01]。x0 前面没有路径了,初始化参数 beta0=[−1,−1]
- 截止到 x1 位置
其分数为,还是和式 12 一样。
scores=alpha0+trans+emission1=[x00x01x00x01]+[t00t01 t10t11]+[x10x10x11x11]=[x00+t00+x10x01+t01+x10x00+t10+x11x01+t11+x11]
同样地,第一行表示当前位置 x1 以 tag0 结尾的所有路径的得分,真是路径是取两个分数中国最大的。因此,我们可以算出到 x1 各个标签的最大路径分数 alpha1。具体公式为:alpha1=[max
这里就是,若以 tag0 标签结尾,那么路径有 x_{00}+t_{00}+x_{10} 和 x_{01}+t_{01}+x_{10}, 其中较大这就是我们需要的最大路径。
不妨假设:
x_{00}+t_{00}+x_{10} < x_{01}+t_{01}+x_{10}\\ x_{00}+t_{10}+x_{11} > x_{01}+t_{11}+x_{11}
这样可以看出 x_1 的索引位置为 \text{beta}_{1} = [1, 0]。因为 x{00}+t_{00}+x_{10} < x_{01}+t_{01}+x_{10} 表示 tag0 结尾的路径中较大的是 x_01,即前一步是 tag1。
- 截止到 x_2 位置。
发射分数 \text{emission}_2 = [x_{20}, x_{21}], 转移概率矩阵为:
\text{trans} =\left[\begin{matrix} t_{00} & t_{01}\ t_{10} & t_{11} \end{matrix} \right]
x_2 前一个位置各个标签路径累积和:
\text{alpha}_1=[\max(x_{00}+t_{00}+x_{10}, x_{01}+t_{01}+x_{10}),\ \max(x_{00}+t_{10}+x_{11}, x_{01}+t_{11}+x_{11})]
展开发射矩阵和路径分数和:
\text{alpha}_2 = \left[\begin{matrix} \max(x_{00}+t_{00}+x_{10}, x_{01}+t_{01}+x_{10}) & \max(x_{00}+t_{10}+x_{11}, x_{01}+t_{11}+x_{11}) \\ \max(x_{00}+t_{00}+x_{10}, x_{01}+t_{01}+x_{10}) & \max(x_{00}+t_{10}+x_{11}, x_{01}+t_{11}+x_{11}) \end{matrix} \right]
那么 x_2 位置的每条分数为:
\begin{aligned} \text{scores} &= \text{alpha}1+\text{trans}+\text{emission}_2 \\ &= \left[\begin{matrix} \max(x{00}+t_{00}+x_{10}, x_{01}+t_{01}+x_{10}) & \max(x_{00}+t_{10}+x_{11}, x_{01}+t_{11}+x_{11}) \\ \max(x_{00}+t_{00}+x_{10}, x_{01}+t_{01}+x_{10}) & \max(x_{00}+t_{10}+x_{11}, x_{01}+t_{11}+x_{11}) \end{matrix} \right] + \left[\begin{matrix} t_{00} & t_{01}\ t_{10} & t_{11} \end{matrix} \right] + \left[\begin{matrix} x_{20} & x_{20}\ x_{21} & x_{21} \end{matrix} \right] \\ &= \left[\begin{matrix} \max(x_{00}+t_{00}+x_{10}, x_{01}+t_{01}+x_{10}) +t_{00}+x_{20} & \max(x_{00}+t_{10}+x_{11}, x_{01}+t_{11}+x_{11})+t_{01}+x_{20}\\ \max(x_{00}+t_{00}+x_{10}, x_{01}+t_{01}+x_{10})+t_{10}+x_{21} & \max(x_{00}+t_{10}+x_{11}, x_{01}+t_{11}+x_{11}) +t_{11}+x_{21} \end{matrix} \right] \end{aligned} \tag{16}
这样就可以算出当前 x_2 位置的最大路径分数 \text{alpha}_2: \begin{aligned} \text{alpha}_2 &= [\max(scores[0,0], scores[0,1]), \max(scores[1,0], scores[1,1])] \\ &= [\max( \max(x{00}+t_{00}+x_{10}, x_{01}+t_{01}+x_{10}) +t_{00}+x_{20}, \max(x_{00}+t_{10}+x_{11}, x_{01}+t_{11}+x_{11})+t_{01}+x_{20}), \\ &\qquad \max(\max(x_{00}+t_{00}+x_{10}, x_{01}+t_{01}+x_{10})+t_{10}+x_{21}, \max(x_{00}+t_{10}+x_{11}, x_{01}+t_{11}+x_{11}) +t_{11}+x_{21} )] \end{aligned} \tag{17}
不妨假设:
\begin{aligned} \max(x_{00}+t_{00}+x_{10}, x_{01}+t_{01}+x_{10}) +t_{00}+x_{20} > \max(x_{00}+t_{10}+x_{11}, x_{01}+t_{11}+x_{11})+t_{01}+x_{20}) \\ \max(x_{00}+t_{00}+x_{10}, x_{01}+t_{01}+x_{10}) +t_{10}+x_{21} < \max(x_{00}+t_{10}+x_{11}, x_{01}+t_{11}+x_{11})+t_{11}+x_{21}) \end{aligned}\tag{18}
其实式 18 很容易理解,只是看起来复杂,只是表示前一步 tag 转换到下一步分数比较。
上一步中假设:
\begin{aligned} x_{00}+t_{00}+x_{10} < x_{01}+t_{01}+x_{10}\ x_{00}+t_{10}+x_{11} > x_{01}+t_{11}+x_{11}\end{aligned}\tag{19}
代入式 18 有:
x_{01}+t_{01}+x_{10}+t_{01}+x_{10} < x_{00}+t_{10}+x_{11}+t_{01}+x_{20}\\ x_{01}+t_{01}+x_{10}+t_{10}+x_{21} > x_{00}+t_{10}+x_{11}+t_{11}+x_{21} \tag{20}
因此 x_2 位置的索引 \text{beta}_2 = [1, 0], 因为 x_{00}+t_{10}+x_{11}+t_{01}+x_{20} 表示 tag0 是前一步 tag1 经过 t_{01} 转换为 x_{20}。同理可得 tag1 是 tag0 而来,当然这个比较就是路径分数最大化的过程。
此时,
\text{alpha}2 = [x_{00}+t_{10}+x_{11}+t_{01}+x_{20}, x_{01}+t_{01}+x_{10}+t_{10}+x_{21} ] \tag{21}
图 7 中橙黄色路径分数高,因此有:
x_{00}+t_{10}+x_{11}+t_{01}+x_{20} > x_{01}+t_{01}+x_{10}+t_{10}+x_{21} \tag{22}
这表示 x_2 位置所有标签对应的最大路径中以 tag0 结尾的是最大的。
- 开始解码标签序列
- 因为我们有 \text{beta}_{i} = [p_{i0}, p_{i1}] 这个前一步最大标签的索引,那么如果 x_2 位置 tag0 最大,\text{beta}_2 = [1, 0],其前一步 x_1 应该取 1,表示 tag1.
- 到 x_1 位置,这时取 tag1, 前一步 x_0 中 \text{beta}_{1} = [1, 0],那么应该取 0, 即 tag0
- 最后到 \text{beta}_0=[-1, -1], 就结束了,因此输出的最大路径就是 tag0->tag1->tag0.
5.pytoch-crf 中实现
一般使用 pytorch-crf 包中的 CRF 类,其继承自 nn.Module
。class CRF(nn.Module):
中最重要的就是:
def forward( | |
self, | |
emissions: torch.Tensor, | |
tags: torch.LongTensor, | |
mask: Optional[torch.ByteTensor] = None, | |
reduction: str = 'sum', | |
) -> torch.Tensor: | |
def decode(self, emissions: torch.Tensor, | |
mask: Optional[torch.ByteTensor] = None) -> List[List[int]]: | |
self._validate(emissions, mask=mask) | |
if mask is None: | |
mask = emissions.new_ones(emissions.shape[:2], dtype=torch.uint8) | |
if self.batch_first: | |
emissions = emissions.transpose(0, 1) | |
mask = mask.transpose(0, 1) | |
return self._viterbi_decode(emissions, mask) |
一般实例化一个 crf:
from torchcrf import CRF | |
crf = CRF(num_labels, batch_first=True) |
forward
用来计算 loss, 用于 训练。loss = - crf(seq_out, labels, mask=attention_mask.bool(), reduction='mean')
decode
用来输出 logits, 用于 预测解码。logits = crf.decode(seq_out, mask=attention_mask.bool())
.
forward 部分:
def forward( | |
self, | |
emissions: torch.Tensor, | |
tags: torch.LongTensor, | |
mask: Optional[torch.ByteTensor] = None, | |
reduction: str = 'sum', | |
) -> torch.Tensor: | |
"""Compute the conditional log likelihood of a sequence of tags given emission scores. | |
Args: | |
emissions (`~torch.Tensor`): Emission score tensor of size | |
``(seq_length, batch_size, num_tags)`` if ``batch_first`` is ``False``, | |
``(batch_size, seq_length, num_tags)`` otherwise. | |
tags (`~torch.LongTensor`): Sequence of tags tensor of size | |
``(seq_length, batch_size)`` if ``batch_first`` is ``False``, | |
``(batch_size, seq_length)`` otherwise. | |
mask (`~torch.ByteTensor`): Mask tensor of size ``(seq_length, batch_size)`` | |
if ``batch_first`` is ``False``, ``(batch_size, seq_length)`` otherwise. | |
reduction: Specifies the reduction to apply to the output: | |
``none|sum|mean|token_mean``. ``none``: no reduction will be applied. | |
``sum``: the output will be summed over batches. ``mean``: the output will be | |
averaged over batches. ``token_mean``: the output will be averaged over tokens. | |
Returns: | |
`~torch.Tensor`: The log likelihood. This will have size ``(batch_size,)`` if | |
reduction is ``none``, ``()`` otherwise. | |
""" | |
self._validate(emissions, tags=tags, mask=mask) | |
if reduction not in ('none', 'sum', 'mean', 'token_mean'): | |
raise ValueError(f'invalid reduction: {reduction}') | |
if mask is None: | |
mask = torch.ones_like(tags, dtype=torch.uint8) | |
if self.batch_first: | |
emissions = emissions.transpose(0, 1) | |
tags = tags.transpose(0, 1) | |
mask = mask.transpose(0, 1) | |
# shape: (batch_size,) | |
numerator = self._compute_score(emissions, tags, mask) | |
#下面是_compute_score 计算核心部分,就是前一步的转移分数 + 发射分数 | |
#for i in range(1, seq_length): | |
## Transition score to next tag, only added if next timestep is valid (mask == 1) | |
## shape: (batch_size,) | |
#score += self.transitions[tags[i - 1], tags[i]] * mask[i] | |
## Emission score for next tag, only added if next timestep is valid (mask == 1) | |
## shape: (batch_size,) | |
#score += emissions[i, torch.arange(batch_size), tags[i]] * mask[i] | |
# shape: (batch_size,) | |
denominator = self._compute_normalizer(emissions, mask) | |
# shape: (batch_size,) | |
llh = numerator - denominator | |
if reduction == 'none': | |
return llh | |
if reduction == 'sum': | |
return llh.sum() | |
if reduction == 'mean': | |
return llh.mean() | |
assert reduction == 'token_mean' | |
return llh.sum() / mask.float().sum() |
而 decode 最重要的就是 _viterbi_decode
算法部分,其用 history = []
存储最大分数的索引,然后逆序遍历得到输出标注序列。
def _viterbi_decode(self, emissions: torch.FloatTensor, | |
mask: torch.ByteTensor) -> List[List[int]]: | |
# emissions: (seq_length, batch_size, num_tags) | |
# mask: (seq_length, batch_size) | |
assert emissions.dim() == 3 and mask.dim() == 2 | |
assert emissions.shape[:2] == mask.shape | |
assert emissions.size(2) == self.num_tags | |
assert mask[0].all() | |
seq_length, batch_size = mask.shape | |
# Start transition and first emission | |
# shape: (batch_size, num_tags) | |
score = self.start_transitions + emissions[0] | |
history = [] | |
# score is a tensor of size (batch_size, num_tags) where for every batch, | |
# value at column j stores the score of the best tag sequence so far that ends | |
# with tag j | |
# history saves where the best tags candidate transitioned from; this is used | |
# when we trace back the best tag sequence | |
# Viterbi algorithm recursive case: we compute the score of the best tag sequence | |
# for every possible next tag | |
for i in range(1, seq_length): | |
# Broadcast viterbi score for every possible next tag | |
# shape: (batch_size, num_tags, 1) | |
broadcast_score = score.unsqueeze(2) | |
# Broadcast emission score for every possible current tag | |
# shape: (batch_size, 1, num_tags) | |
broadcast_emission = emissions[i].unsqueeze(1) | |
# Compute the score tensor of size (batch_size, num_tags, num_tags) where | |
# for each sample, entry at row i and column j stores the score of the best | |
# tag sequence so far that ends with transitioning from tag i to tag j and emitting | |
# shape: (batch_size, num_tags, num_tags) | |
next_score = broadcast_score + self.transitions + broadcast_emission | |
# Find the maximum score over all possible current tag | |
# shape: (batch_size, num_tags) | |
next_score, indices = next_score.max(dim=1) | |
# Set score to the next score if this timestep is valid (mask == 1) | |
# and save the index that produces the next score | |
# shape: (batch_size, num_tags) | |
score = torch.where(mask[i].unsqueeze(1), next_score, score) | |
history.append(indices) | |
# End transition score | |
# shape: (batch_size, num_tags) | |
score += self.end_transitions | |
# Now, compute the best path for each sample | |
# shape: (batch_size,) | |
seq_ends = mask.long().sum(dim=0) - 1 | |
best_tags_list = [] | |
for idx in range(batch_size): | |
# Find the tag which maximizes the score at the last timestep; this is our best tag | |
# for the last timestep | |
_, best_last_tag = score[idx].max(dim=0) | |
best_tags = [best_last_tag.item()] | |
# We trace back where the best last tag comes from, append that to our best tag | |
# sequence, and trace it back again, and so on | |
for hist in reversed(history[:seq_ends[idx]]): | |
best_last_tag = hist[idx][best_tags[-1]] | |
best_tags.append(best_last_tag.item()) | |
# Reverse the order because we start from the last timestep | |
best_tags.reverse() | |
best_tags_list.append(best_tags) | |
return best_tags_list |
我们可以看到 Viterbi 算法就是利用动态规划不断计算当前位置的最大分数路径,同时记录前一步索引,最后来回溯整个序列。使得计算复杂度降低,计算效率增加。
Viterbi 算法的时间复杂度和空间复杂度取决于状态和观测序列的长度。假设状态序列长度为 N,观测序列长度为 T。
时间复杂度:Viterbi 算法的时间复杂度为 O(N^2 * T),其中 N 是状态的数量,T 是观测序列的长度。这是因为在每个观测时间步,需要计算每个状态的最大概率,并选择最优路径。对于每个状态,需要计算其与前一个时间步所有状态的转移概率乘积,并选择最大值。因此,需要进行 N 次乘法和比较操作。
空间复杂度:Viterbi 算法的空间复杂度为 O(N * T),其中 N 是状态的数量,T 是观测序列的长度。在算法执行过程中,需要维护一个 N * T 的矩阵,用于存储每个状态在每个时间步的最大概率值。此外,还需要保存一个 N * T 的矩阵,用于回溯最优路径。
需要注意的是,如果状态和观测的数量非常大,Viterbi 算法的复杂度可能会变得很高。在实际应用中,为了减少计算量,常常使用一些技巧和优化方法来降低复杂度,例如利用稀疏矩阵表示、剪枝策略等。
6. pytorch-crf 一般使用
import torch | |
from torchcrf import CRF | |
num_tags = 5 | |
seq_len = 3 | |
batch_size = 2 | |
model = CRF(num_tags, batch_first=True) | |
emission = torch.randn(batch_size, seq_len, num_tags) | |
tags = torch.tensor([[0, 1, 2], | |
[2, 4, 3], | |
], dtype=torch.long) | |
print(emission) | |
print(tags) | |
score = model(emission, tags) | |
print(score) | |
#引入 mask | |
mask = torch.tensor([[1, 1, 1], [1, 1,0]], dtype=torch.bool) | |
score = model(emission, tags, mask=mask) | |
print(score) | |
seq_tags =model.decode(emission, mask) | |
print(seq_tags) |
Inference
[2] CRF Layer on the Top of BiLSTM
[4] pytorch-crf