21.6. 用于个性化排名的神经协同过滤¶ 在 SageMaker Studio Lab 中打开 Notebook
本节将超越显式反馈,介绍用于隐式反馈推荐的神经协同过滤(NCF)框架。隐式反馈在推荐系统中普遍存在。点击、购买和观看等行为是常见的隐式反馈,它们易于收集并能表明用户的偏好。我们将要介绍的模型名为 NeuMF (He et al., 2017),是神经矩阵分解(neural matrix factorization)的缩写,旨在通过隐式反馈解决个性化排名任务。该模型利用神经网络的灵活性和非线性来取代矩阵分解的点积,以增强模型的表达能力。具体来说,该模型由两个子网络构成,包括广义矩阵分解(GMF)和多层感知机(MLP),并通过两条路径而非简单的点积来建模交互。这两个网络的输出被拼接起来用于最终的预测分数计算。与 AutoRec 中的评分预测任务不同,该模型基于隐式反馈为每个用户生成一个排名推荐列表。我们将使用上一节介绍的个性化排名损失来训练这个模型。
21.6.1. NeuMF 模型¶
如前所述,NeuMF 融合了两个子网络。GMF 是矩阵分解的一个通用神经网络版本,其输入是用户和物品潜在因子的逐元素乘积。它由两个神经层组成
其中 \(\odot\) 表示向量的哈达玛积(Hadamard product)。\(\mathbf{P} \in \mathbb{R}^{m \times k}\) 和 \(\mathbf{Q} \in \mathbb{R}^{n \times k}\) 分别对应用户和物品的潜在矩阵。\(\mathbf{p}_u \in \mathbb{R}^{ k}\) 是 \(P\) 的第 \(u\) 行,\(\mathbf{q}_i \in \mathbb{R}^{ k}\) 是 \(Q\) 的第 \(i\) 行。\(\alpha\) 和 \(h\) 分别表示输出层的激活函数和权重。\(\hat{y}_{ui}\) 是用户 \(u\) 可能给物品 \(i\) 的预测分数。
该模型的另一个组成部分是 MLP。为了丰富模型的灵活性,MLP 子网络不与 GMF 共享用户和物品嵌入。它使用用户和物品嵌入的拼接作为输入。通过复杂的连接和非线性变换,它能够估计用户和物品之间错综复杂的交互。更确切地说,MLP 子网络定义为
其中 \(\mathbf{W}^*, \mathbf{b}^*\) 和 \(\alpha^*\) 分别表示权重矩阵、偏置向量和激活函数。\(\phi^*\) 表示相应层的函数。\(\mathbf{z}^*\) 表示相应层的输出。
为了融合 GMF 和 MLP 的结果,NeuMF 没有采用简单的相加,而是将两个子网络的倒数第二层拼接起来,创建一个可以传递给更深层的特征向量。然后,将输出用矩阵 \(\mathbf{h}\) 和一个 Sigmoid 激活函数进行投影。预测层公式如下:
下图展示了 NeuMF 的模型架构。
图 21.6.1 NeuMF 模型示意图¶
import random
import mxnet as mx
from mxnet import autograd, gluon, np, npx
from mxnet.gluon import nn
from d2l import mxnet as d2l
npx.set_np()
21.6.2. 模型实现¶
以下代码实现了 NeuMF 模型。它由一个广义矩阵分解模型和一个具有不同用户和物品嵌入向量的 MLP 组成。MLP 的结构由参数 nums_hiddens
控制。ReLU 被用作默认的激活函数。
class NeuMF(nn.Block):
def __init__(self, num_factors, num_users, num_items, nums_hiddens,
**kwargs):
super(NeuMF, self).__init__(**kwargs)
self.P = nn.Embedding(num_users, num_factors)
self.Q = nn.Embedding(num_items, num_factors)
self.U = nn.Embedding(num_users, num_factors)
self.V = nn.Embedding(num_items, num_factors)
self.mlp = nn.Sequential()
for num_hiddens in nums_hiddens:
self.mlp.add(nn.Dense(num_hiddens, activation='relu',
use_bias=True))
self.prediction_layer = nn.Dense(1, activation='sigmoid', use_bias=False)
def forward(self, user_id, item_id):
p_mf = self.P(user_id)
q_mf = self.Q(item_id)
gmf = p_mf * q_mf
p_mlp = self.U(user_id)
q_mlp = self.V(item_id)
mlp = self.mlp(np.concatenate([p_mlp, q_mlp], axis=1))
con_res = np.concatenate([gmf, mlp], axis=1)
return self.prediction_layer(con_res)
21.6.3. 带负采样的自定义数据集¶
对于成对排名损失,一个重要的步骤是负采样。对于每个用户,用户未交互过的物品是候选物品(未观察到的条目)。以下函数将用户身份和候选物品作为输入,并为每个用户从该用户的候选集中随机采样负样本。在训练阶段,模型确保用户喜欢的物品排名高于用户不喜欢或未交互过的物品。
class PRDataset(gluon.data.Dataset):
def __init__(self, users, items, candidates, num_items):
self.users = users
self.items = items
self.cand = candidates
self.all = set([i for i in range(num_items)])
def __len__(self):
return len(self.users)
def __getitem__(self, idx):
neg_items = list(self.all - set(self.cand[int(self.users[idx])]))
indices = random.randint(0, len(neg_items) - 1)
return self.users[idx], self.items[idx], neg_items[indices]
21.6.4. 评估器¶
在本节中,我们采用按时间划分的策略来构建训练集和测试集。使用两种评估指标来评估模型效果,即给定截断点 \(\ell\) 的命中率(\(\textrm{Hit}@\ell\))和 ROC 曲线下面积(AUC)。每个用户在给定位置 \(\ell\) 的命中率表示推荐的物品是否包含在排名前 \(\ell\) 的列表中。其正式定义如下:
其中 \(\textbf{1}\) 是一个指示函数,如果真实物品排在前 \(\ell\) 的列表中,则等于 1,否则等于 0。\(rank_{u, g_u}\) 表示用户 \(u\) 的真实物品 \(g_u\) 在推荐列表中的排名(理想排名为 1)。\(m\) 是用户数量。\(\mathcal{U}\) 是用户集合。
AUC 的定义如下:
其中 \(\mathcal{I}\) 是物品集合。\(S_u\) 是用户 \(u\) 的候选物品。请注意,许多其他评估协议如精确率、召回率和归一化折损累计增益(NDCG)也可以使用。
以下函数计算每个用户的命中数和 AUC。
#@save
def hit_and_auc(rankedlist, test_matrix, k):
hits_k = [(idx, val) for idx, val in enumerate(rankedlist[:k])
if val in set(test_matrix)]
hits_all = [(idx, val) for idx, val in enumerate(rankedlist)
if val in set(test_matrix)]
max = len(rankedlist) - 1
auc = 1.0 * (max - hits_all[0][0]) / max if len(hits_all) > 0 else 0
return len(hits_k), auc
然后,总体的命中率和 AUC 计算如下。
#@save
def evaluate_ranking(net, test_input, seq, candidates, num_users, num_items,
devices):
ranked_list, ranked_items, hit_rate, auc = {}, {}, [], []
all_items = set([i for i in range(num_users)])
for u in range(num_users):
neg_items = list(all_items - set(candidates[int(u)]))
user_ids, item_ids, x, scores = [], [], [], []
[item_ids.append(i) for i in neg_items]
[user_ids.append(u) for _ in neg_items]
x.extend([np.array(user_ids)])
if seq is not None:
x.append(seq[user_ids, :])
x.extend([np.array(item_ids)])
test_data_iter = gluon.data.DataLoader(
gluon.data.ArrayDataset(*x), shuffle=False, last_batch="keep",
batch_size=1024)
for index, values in enumerate(test_data_iter):
x = [gluon.utils.split_and_load(v, devices, even_split=False)
for v in values]
scores.extend([list(net(*t).asnumpy()) for t in zip(*x)])
scores = [item for sublist in scores for item in sublist]
item_scores = list(zip(item_ids, scores))
ranked_list[u] = sorted(item_scores, key=lambda t: t[1], reverse=True)
ranked_items[u] = [r[0] for r in ranked_list[u]]
temp = hit_and_auc(ranked_items[u], test_input[u], 50)
hit_rate.append(temp[0])
auc.append(temp[1])
return np.mean(np.array(hit_rate)), np.mean(np.array(auc))
21.6.5. 训练和评估模型¶
训练函数定义如下。我们以成对的方式训练模型。
#@save
def train_ranking(net, train_iter, test_iter, loss, trainer, test_seq_iter,
num_users, num_items, num_epochs, devices, evaluator,
candidates, eval_step=1):
timer, hit_rate, auc = d2l.Timer(), 0, 0
animator = d2l.Animator(xlabel='epoch', xlim=[1, num_epochs], ylim=[0, 1],
legend=['test hit rate', 'test AUC'])
for epoch in range(num_epochs):
metric, l = d2l.Accumulator(3), 0.
for i, values in enumerate(train_iter):
input_data = []
for v in values:
input_data.append(gluon.utils.split_and_load(v, devices))
with autograd.record():
p_pos = [net(*t) for t in zip(*input_data[:-1])]
p_neg = [net(*t) for t in zip(*input_data[:-2],
input_data[-1])]
ls = [loss(p, n) for p, n in zip(p_pos, p_neg)]
[l.backward(retain_graph=False) for l in ls]
l += sum([l.asnumpy() for l in ls]).mean()/len(devices)
trainer.step(values[0].shape[0])
metric.add(l, values[0].shape[0], values[0].size)
timer.stop()
with autograd.predict_mode():
if (epoch + 1) % eval_step == 0:
hit_rate, auc = evaluator(net, test_iter, test_seq_iter,
candidates, num_users, num_items,
devices)
animator.add(epoch + 1, (hit_rate, auc))
print(f'train loss {metric[0] / metric[1]:.3f}, '
f'test hit rate {float(hit_rate):.3f}, test AUC {float(auc):.3f}')
print(f'{metric[2] * num_epochs / timer.sum():.1f} examples/sec '
f'on {str(devices)}')
现在,我们可以加载 MovieLens 100k 数据集并训练模型。由于 MovieLens 数据集中只有评分,在牺牲一些准确性的情况下,我们将这些评分二值化为 0 和 1。如果用户对某个物品进行了评分,我们认为隐式反馈为 1,否则为 0。对物品进行评分的行为可以被视为一种提供隐式反馈的形式。在这里,我们以 seq-aware
模式分割数据集,将用户最近交互的物品留作测试。
batch_size = 1024
df, num_users, num_items = d2l.read_data_ml100k()
train_data, test_data = d2l.split_data_ml100k(df, num_users, num_items,
'seq-aware')
users_train, items_train, ratings_train, candidates = d2l.load_data_ml100k(
train_data, num_users, num_items, feedback="implicit")
users_test, items_test, ratings_test, test_iter = d2l.load_data_ml100k(
test_data, num_users, num_items, feedback="implicit")
train_iter = gluon.data.DataLoader(
PRDataset(users_train, items_train, candidates, num_items ), batch_size,
True, last_batch="rollover", num_workers=d2l.get_dataloader_workers())
然后我们创建并初始化模型。我们使用一个三层 MLP,其隐藏层大小固定为 10。
devices = d2l.try_all_gpus()
net = NeuMF(10, num_users, num_items, nums_hiddens=[10, 10, 10])
net.initialize(ctx=devices, force_reinit=True, init=mx.init.Normal(0.01))
[22:14:55] ../src/storage/storage.cc:196: Using Pooled (Naive) StorageManager for CPU
[22:14:56] ../src/storage/storage.cc:196: Using Pooled (Naive) StorageManager for GPU
[22:14:56] ../src/storage/storage.cc:196: Using Pooled (Naive) StorageManager for GPU
以下代码训练该模型。
lr, num_epochs, wd, optimizer = 0.01, 10, 1e-5, 'adam'
loss = d2l.BPRLoss()
trainer = gluon.Trainer(net.collect_params(), optimizer,
{"learning_rate": lr, 'wd': wd})
train_ranking(net, train_iter, test_iter, loss, trainer, None, num_users,
num_items, num_epochs, devices, evaluate_ranking, candidates)
train loss 16.982, test hit rate 0.075, test AUC 0.531
2.6 examples/sec on [gpu(0), gpu(1)]
21.6.6. 小结¶
为矩阵分解模型增加非线性有助于提高模型的能力和效果。
NeuMF 是矩阵分解和 MLP 的结合。MLP 将用户和物品嵌入的拼接作为输入。