(Multimodal Primer) Implementing the VSE++ Model

This post implements the multimodal VSE++ model, following the book 《多模态深度学习技术基础》 (Foundations of Multimodal Deep Learning Technology).


Dataset: Flickr8k

Dataset preprocessing: see the CSDN post 《Flickr8k数据集处理(草履虫也能看得懂!)》.


Part 1: Model Architecture

The VSE++ model consists of an image representation extractor and a text representation extractor, which map images and texts into a shared representation space. The image representation extractor is a ResNet-152 or VGG19 model pre-trained on ImageNet; ResNet-152 and VGG19 produce 2048-dimensional and 4096-dimensional image features, respectively. The text representation extractor is a GRU.



1: Image Representation Extractor

Here we use a ResNet-152 or VGG19 model pre-trained on ImageNet as the image representation extractor. In both cases the backbone is fine-tuned, and its last fully connected layer is replaced so that the output matches the dimension of the joint representation space. Note that the image representation is also length-normalized (L2).

class ImageRepExtractor(nn.Module):
    def __init__(self, embed_size, pretrained_model='resnet152', finetuned=True):
        """
        Parameters:
            embed_size: dimension of the joint representation space
            pretrained_model: which pretrained backbone to use, e.g. ResNet-152
            finetuned: whether to fine-tune the backbone parameters (default True)
        """
        super(ImageRepExtractor, self).__init__()

        if pretrained_model == "resnet152":
            net = models.resnet152(weights=ResNet152_Weights.DEFAULT)       # References: https://pytorch.org/vision/main/models/generated/torchvision.models.resnet152.html ; https://zhuanlan.zhihu.com/p/225597229
            for param in net.parameters():
                param.requires_grad = finetuned
            
            net.fc = nn.Linear(net.fc.in_features, embed_size)
            nn.init.xavier_uniform_(net.fc.weight)      # Xavier initialization for the new layer; reference: https://blog.csdn.net/luoxuexiong/article/details/95772045

        elif pretrained_model == "vgg19":
            net = models.vgg19(weights=VGG19_Weights.DEFAULT)
            for param in net.parameters():
                param.requires_grad = finetuned

            net.classifier[6] = nn.Linear(net.classifier[6].in_features, embed_size)
            nn.init.xavier_uniform_(net.classifier[6].weight)
            
        else:
            raise ValueError("Unknown image model: " + pretrained_model)
        self.net = net

    def forward(self, x):
        out = self.net(x)
        out = nn.functional.normalize(out)
        return out
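
As a quick sanity check (a hypothetical smoke test, not part of the original code; it assumes the imports from the complete listing at the end of the post and will download the pretrained weights on first use), you can push a dummy batch through the extractor and confirm that each representation is unit-length:

import torch

extractor = ImageRepExtractor(embed_size=1024, pretrained_model='resnet152')
dummy = torch.randn(2, 3, 224, 224)           # a fake batch of two ImageNet-sized images
with torch.no_grad():
    rep = extractor(dummy)
print(rep.shape)          # torch.Size([2, 1024])
print(rep.norm(dim=1))    # ~1.0 per row, thanks to the normalize() call in forward()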

2: Text Representation Extractor

Here a GRU is used as the text representation extractor. Its input is word embeddings, and the text representation is the hidden state output at the last (actual) word. The text representation has the same dimension as the joint representation space and is also length-normalized.

Because captions have different lengths, shorter sequences are padded with many 0s (the index of the padding token in the vocabulary). If these 0s took part in the computation they would waste a lot of compute, so we use PyTorch's built-in pack_padded_sequence function to handle this.

For more on pack_padded_sequence, see: 压缩填充张量 Pack_padded_sequence 介绍 – 知乎 (zhihu.com)
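
As a toy illustration (tensor values made up for this example), packing interleaves the real tokens by time step and drops the padded positions, so the GRU never processes the padding:

import torch
from torch.nn.utils.rnn import pack_padded_sequence

padded = torch.tensor([[1.0, 2.0, 3.0], [4.0, 5.0, 0.0]]).unsqueeze(-1)   # batch of 2; the second sequence is padded with 0
lengths = torch.tensor([3, 2])                                            # true lengths, in descending order
packed = pack_padded_sequence(padded, lengths, batch_first=True)
print(packed.data.squeeze(-1))   # tensor([1., 4., 2., 5., 3.])  -- the padding 0 never appears
print(packed.batch_sizes)        # tensor([2, 2, 1])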

class TextRepExtractor(nn.Module):
    def __init__(self, vocab_size, word_dim, embed_size, num_layers):
        """
        Parameters:
            vocab_size: vocabulary size
            word_dim: word embedding dimension
            embed_size: dimension of the joint representation space, i.e. the RNN hidden size
            num_layers: number of RNN hidden layers
        """
        super(TextRepExtractor, self).__init__()
        self.embed_dim = embed_size
        self.embed = nn.Embedding(vocab_size, word_dim)
        self.rnn = nn.GRU(word_dim, embed_size, num_layers, batch_first = True)

        self.embed.weight.data.uniform_(-0.1, 0.1)

    def forward(self, x, lengths):
        x = self.embed(x)
        # References: https://www.cnblogs.com/sbj123456789/p/9834018.html ; https://blog.csdn.net/BierOne/article/details/116133857
        packed = pack_padded_sequence(input=x, lengths=lengths, batch_first=True)
        _, hidden = self.rnn(packed)
        out = nn.functional.normalize(hidden[-1])
        return out
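
A hypothetical smoke test (toy vocabulary and token ids assumed) to confirm the output shape and normalization:

import torch

extractor = TextRepExtractor(vocab_size=10, word_dim=300, embed_size=1024, num_layers=1)
caps = torch.tensor([[1, 2, 3, 0], [4, 5, 0, 0]])   # two padded captions (0 = padding index)
lens = torch.tensor([3, 2])                          # true lengths, already sorted in descending order
out = extractor(caps, lens)
print(out.shape)         # torch.Size([2, 1024])
print(out.norm(dim=1))   # ~1.0 per row after length normalization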

3: The VSE++ Model

With these two components in place, we can build the VSE++ model: it simply uses the image representation extractor and the text representation extractor to produce representations for paired image and text inputs.

Note: the data must first be sorted by caption length, and, so that images and captions can be aligned when evaluating the model, the original input order also has to be restored afterwards.

class VSEPP(nn.Module):
    def __init__(self, vocab_size, word_dim, embed_size, num_layers, image_model, finetuned=True):
        """
        Parameters:
            vocab_size: vocabulary size
            word_dim: word embedding dimension
            embed_size: dimension of the joint representation space, i.e. the RNN hidden size
            num_layers: number of RNN hidden layers
            image_model: image representation extractor, ResNet-152 or VGG19
            finetuned: whether to fine-tune the image extractor
        """
        super(VSEPP, self).__init__()
        self.image_extractor = ImageRepExtractor(embed_size=embed_size, pretrained_model=image_model, finetuned=finetuned)
        self.text_extractor = TextRepExtractor(vocab_size=vocab_size, word_dim=word_dim, embed_size=embed_size, num_layers=num_layers)

    def forward(self, images, captions, cap_lens):
        # Sort by caption length (descending) and reorder the images accordingly
        sorted_cap_len, sorted_cap_indices = torch.sort(cap_lens, 0, True)

        images = images[sorted_cap_indices]
        captions = captions[sorted_cap_indices]
        cap_lens = sorted_cap_len

        image_code = self.image_extractor(images)
        text_code = self.text_extractor(captions, cap_lens)

        if not self.training:
            # Restore the original input order
            _, recover_indices = torch.sort(sorted_cap_indices)
            image_code = image_code[recover_indices]
            text_code = text_code[recover_indices]

        return image_code, text_code
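
The sort-then-restore trick relies on the fact that argsorting the sort indices gives the inverse permutation; a minimal sketch with made-up lengths:

import torch

cap_lens = torch.tensor([3, 5, 4])
sorted_len, sorted_idx = torch.sort(cap_lens, 0, True)   # values [5, 4, 3], indices [1, 2, 0]
_, recover_idx = torch.sort(sorted_idx)                  # inverse permutation [2, 0, 1]
print(sorted_len[recover_idx])                           # tensor([3, 5, 4]) -- original order restored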

4: Loss Function

VSE++ uses a triplet loss with the hard negative mining commonly applied to it. Part of the code follows the original VSE++ authors' implementation.

class TripleNetLoss(nn.Module):
    def __init__(self, margin=0.2, hard_negative=False):
        super(TripleNetLoss, self).__init__()
        self.margin = margin
        self.hard_negative = hard_negative

    def forward(self, ie, te):
        """
        Parameters:
            ie: image representations, the image_code returned by VSEPP
            te: text representations, the text_code returned by VSEPP
        """
        scores = ie.mm(te.t())

        diagonal = scores.diag().view(ie.size(0), 1)
        d1 = diagonal.expand_as(scores)
        d2 = diagonal.t().expand_as(scores)

        # Image as the anchor
        cost_i = (self.margin + scores - d1).clamp(min=0)
        # Text as the anchor
        cost_t = (self.margin + scores - d2).clamp(min=0)

        # The diagonal entries (matching pairs) do not contribute to the loss
        mask = torch.eye(scores.size(0), dtype=torch.bool)
        I = torch.autograd.Variable(mask)
        if torch.cuda.is_available():
            I = I.cuda()

        cost_i = cost_i.masked_fill_(I, 0)
        cost_t = cost_t.masked_fill_(I, 0)

        # Hard negative mining: keep only the hardest negative per anchor
        if self.hard_negative:
            cost_i = cost_i.max(1)[0]
            cost_t = cost_t.max(0)[0]

        return cost_t.sum() + cost_i.sum()
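
For reference, writing s(i, c) for the similarity between image i and caption c and α for the margin, the hard-negative ("max of hinges") objective that the hard_negative=True branch above implements for a matching pair (i, c) is

$$\ell_{MH}(i,c) = \max_{c'}\big[\alpha + s(i,c') - s(i,c)\big]_{+} + \max_{i'}\big[\alpha + s(i',c) - s(i,c)\big]_{+}$$

while hard_negative=False sums the hinge terms over all negatives instead of taking the maximum.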

5: Optimizer: Adam

Little needs to be said here: Adam is by far the most commonly used optimizer. Note that only parameters with requires_grad=True are handed to it.

def get_optimizer(model, config):
    params = filter(lambda p: p.requires_grad, model.parameters())
    return torch.optim.Adam(params=params, lr=config.learning_rate)

6: Evaluation Metric

Here we implement Recall@K, the most common evaluation metric in cross-modal retrieval. It is the fraction of queries for which a correct answer appears among the top K returned results.

For example, in image-to-text retrieval, for a single image query we search the caption pool for its K nearest captions; if at least one of the K returned captions matches the query image, that query scores 1, otherwise 0.

It is worth noting that Recall@K here is not the same as Recall@K in recommender systems, where it is the number of relevant items among the K recommended items divided by the total number of relevant items, i.e. a measure of how completely the system retrieves relevant items.

Here we first compute the image and text codes with the VSE++ model, then directly take dot products between all image codes and all text codes to obtain similarity scores for every image-text pair (since each group of captions_per_image consecutive entries shares the same image, we only need to take every captions_per_image-th image code), and finally sort the scores to compute Recall@K.

def evaluate(data_loader, model, batch_size, captions_per_image):
    # Switch the model to evaluation mode
    model.eval()
    image_codes = None
    text_codes = None
    device = next(model.parameters()).device
    N = len(data_loader.dataset)

    for i, (imgs, caps, caplens) in enumerate(data_loader):
        with torch.no_grad():
            image_code, text_code = model(imgs.to(device), caps.to(device), caplens)
            if image_codes is None:
                image_codes = np.zeros((N, image_code.size(1)))
                text_codes = np.zeros((N, text_code.size(1)))
            # Store the paired image/text representations in numpy arrays; recall is computed later on the CPU
            st = i * batch_size
            ed = (i + 1) * batch_size

            image_codes[st:ed] = image_code.data.cpu().numpy()
            text_codes[st:ed] = text_code.data.cpu().numpy()
    # Switch the model back to training mode
    model.train()
    return calc_recall(image_codes, text_codes, captions_per_image)


def calc_recall(image_codes, text_codes, captions_per_image):
    # Taking every captions_per_image-th image works because the original input order of the image-text pairs was restored earlier
    scores = np.dot(image_codes[::captions_per_image], text_codes.T)
    # Image-to-text retrieval: argsort of -scores gives indices in descending order of similarity
    sorted_scores_indices = (-scores).argsort(axis=1)
    (n_image, n_text) = scores.shape
    ranks_i2t = np.zeros(n_image)
    for i in range(0, n_image):
        # Each image has cpi matching captions; find the best (smallest) rank among them
        min_rank = 1e10
        for j in range(i * captions_per_image, (i + 1) * captions_per_image):
            rank = list(sorted_scores_indices[i, :]).index(j)
            if min_rank > rank:
                min_rank = rank
        ranks_i2t[i] = min_rank
    # Text-to-image retrieval: same trick along the image axis
    sorted_scores_indices = (-scores).argsort(axis=0)
    ranks_t2i = np.zeros(n_text)
    for i in range(n_text):
        rank = list(sorted_scores_indices[:, i]).index(i // captions_per_image)
        ranks_t2i[i] = rank
    # A query counts toward recall@k when its best rank is below k; computed here for k = 1, 5, 10 in both directions
    r1_i2t = 100.0 * len(np.where(ranks_i2t < 1)[0]) / n_image
    r1_t2i = 100.0 * len(np.where(ranks_t2i < 1)[0]) / n_text
    r5_i2t = 100.0 * len(np.where(ranks_i2t < 5)[0]) / n_image
    r5_t2i = 100.0 * len(np.where(ranks_t2i < 5)[0]) / n_text
    r10_i2t = 100.0 * len(np.where(ranks_i2t < 10)[0]) / n_image
    r10_t2i = 100.0 * len(np.where(ranks_t2i < 10)[0]) / n_text

    return r1_i2t, r1_t2i, r5_i2t, r5_t2i, r10_i2t, r10_t2i
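
A tiny made-up example (2 images with 2 captions each, 2-dimensional codes) where every caption scores highest on its own image, so all recall values come out at 100; calc_recall here is the function defined above:

import numpy as np

img = np.array([[1.0, 0.0], [1.0, 0.0], [0.0, 1.0], [0.0, 1.0]])   # image code repeated once per caption
txt = np.array([[0.9, 0.1], [0.8, 0.2], [0.1, 0.9], [0.2, 0.8]])   # matching captions score highest
print(calc_recall(img, txt, captions_per_image=2))                  # (100.0, 100.0, 100.0, 100.0, 100.0, 100.0)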

7: Learning Rate Schedule

def adjust_learning_rate(optimizer, epoch, config):
    """
    Every lr_update epochs, the learning rate is halved
    Parameters:
        optimizer: the optimizer
        epoch: current training epoch
        config: model hyperparameters and auxiliary variables
    """
    lr = config.learning_rate * (0.5 ** (epoch // config.lr_update))
    lr = max(lr, config.min_learning_rate)
    for param_group in optimizer.param_groups:  # Reference: https://blog.csdn.net/weixin_45464524/article/details/130456843
        param_group['lr'] = lr
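
With the configuration used below (learning_rate=0.00002, lr_update=15, min_learning_rate=0.000002), the schedule works out as follows:

for epoch in range(45):
    lr = max(2e-5 * (0.5 ** (epoch // 15)), 2e-6)
    # epochs 0-14: 2e-5, epochs 15-29: 1e-5, epochs 30-44: 5e-6 (the 2e-6 floor is never reached)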

Part 2: Training the Model

Training consists of five steps: (1) loading data, (2) the forward pass, (3) computing the loss, (4) updating parameters, and (5) model selection.

The training scheme runs for 45 epochs with an initial learning rate of 0.00002; adjust_learning_rate halves the learning rate every lr_update (= 15) epochs, down to a floor of 0.000002.

Model hyperparameters:

config = Namespace(
    captions_per_image=5,
    batch_size=32,
    word_dim=300,
    embed_size=1024,
    num_layers=1,
    image_model="resnet152",
    finetuned=True,
    learning_rate=0.00002,
    lr_update=15,
    min_learning_rate=0.000002,
    margin=0.2,
    hard_negative=True,
    num_epochs=45,
    grad_clip=2,
    evaluate_step=60,       # evaluate on the validation set every this many steps
    checkpoint=None,        # if not None, resume training from the model at this path
    best_checkpoint="./model/best_flickr8k.ckpt",   # where to save the best-performing model
    last_checkpoint="./model/last_flickr8k.ckpt"    # where to save the model at the end of training
)

Training setup (miscellaneous):

# Use CUDA if available
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Data folders / paths
data_dir = "./data/flickr8k"
vocab_path = './data/flickr8k/vocab.json'

# Load the vocabulary and the datasets
with open(vocab_path, 'r') as f:
    vocab = json.load(f)
train_loader, val_loader, test_loader = mktrainval(data_dir, vocab_path, config.batch_size)

# Initialize randomly or load a previously trained model
start_epoch = 0
checkpoint = config.checkpoint
if checkpoint is None:
    model = VSEPP(vocab_size=len(vocab),
                  word_dim=config.word_dim,
                  embed_size=config.embed_size,
                  num_layers=config.num_layers,
                  image_model=config.image_model,
                  finetuned=config.finetuned)
else:
    checkpoint = torch.load(checkpoint)
    start_epoch = checkpoint['epoch'] + 1
    model = checkpoint['model']

# Optimizer
optimizer = get_optimizer(model=model, config=config)

# Move the model to the GPU and switch to training mode
model.to(device)
model.train()

# Loss function
loss_fn = TripleNetLoss(margin=config.margin, hard_negative=config.hard_negative)

Training loop:

# Training
best_res = 0
print("Start training")
for epoch in range(start_epoch, config.num_epochs):

    adjust_learning_rate(optimizer, epoch, config)

    print(f"Epoch {epoch}")

    for i, (imgs, caps, caplens) in enumerate(train_loader):

        optimizer.zero_grad()

        # Move the batch to the GPU
        imgs = imgs.to(device, non_blocking=True)
        caps = caps.to(device, non_blocking=True)

        # Forward pass
        image_code, text_code = model(imgs, caps, caplens)
        # Compute the loss
        loss = loss_fn(image_code, text_code)
        loss.backward()

        # Gradient clipping
        if config.grad_clip > 0:
            nn.utils.clip_grad_norm_(model.parameters(), config.grad_clip)

        # Update the parameters
        optimizer.step()

        # Current training state
        state = {
            'epoch': epoch,
            'step': i,
            'model': model,
            'optimizer': optimizer
        }

        if (i + 1) % config.evaluate_step == 0:
            r1_i2t, r1_t2i, r5_i2t, r5_t2i, r10_i2t, r10_t2i = evaluate(data_loader=val_loader,
                                                                        model=model,
                                                                        batch_size=config.batch_size,
                                                                        captions_per_image=config.captions_per_image)
            recall_sum = r1_i2t + r1_t2i + r5_i2t + r5_t2i + r10_i2t + r10_t2i

            # Model selection
            if best_res < recall_sum:
                best_res = recall_sum
                torch.save(state, config.best_checkpoint)

            torch.save(state, config.last_checkpoint)

            print(f"epoch: {epoch}, step: {i+1}, loss: {loss.item()}")

# Evaluate the best model on the test set
checkpoint = torch.load(config.best_checkpoint)
model = checkpoint['model']
r1_i2t, r1_t2i, r5_i2t, r5_t2i, r10_i2t, r10_t2i = evaluate(data_loader=test_loader, model=model, batch_size=config.batch_size, captions_per_image=config.captions_per_image)

print(f"Epoch: {checkpoint['epoch']}, \n I2T R@1: {r1_i2t}, T2I R@1: {r1_t2i}, \n I2T R@5: {r5_i2t}, T2I R@5: {r5_t2i}, \n I2T R@10: {r10_i2t}, T2I R@10: {r10_t2i}, \n ")

Complete code (including dataset preprocessing)

import json
import os
import random
from PIL import Image
from argparse import Namespace
import numpy as np
from collections import Counter, defaultdict
from matplotlib import pyplot as plt
from PIL import Image

import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence
from torch.utils import data
from torch.utils.data import Dataset
import torchvision
import torchvision.transforms as transforms
import torchvision.models as models
from torchvision.models import ResNet152_Weights, VGG19_Weights

# ————————————————————————————————— Dataset preprocessing —————————————————————————————————
def create_dataset(dataset = 'flickr8k', captions_per_image = 5, min_word_count = 5, max_len = 30):
    """
    Parameters:
        dataset: dataset name
        captions_per_image: number of captions kept per image
        min_word_count: only words occurring more than this many times in the non-test splits enter the vocabulary
        max_len: maximum number of words per caption; longer captions are dropped
    Output:
        one vocabulary file: vocab.json
        three dataset files: train_data.json, val_data.json, test_data.json
    """
    
    karpathy_json_path = "./dataset_flickr8k.json"  # 读取json文件
    image_folder = "./images/"                      # 图片文件夹
    output_folder = "./data/%s" % dataset           # 保存处理结果的文件夹

    # 读取数据集文本描述的json文件
    with open(file=karpathy_json_path, mode="r") as j:
        data = json.load(fp=j)
    

    image_paths = defaultdict(list)                 # collections.defaultdict() 参考:https://zhuanlan.zhihu.com/p/345741967 ; https://blog.csdn.net/sinat_38682860/article/details/112878842
    image_captions = defaultdict(list)
    vocab = Counter()                               # collections.Counter() 主要功能:可以支持方便、快速的计数,将元素数量统计,然后计数并返回一个字典,键为元素,值为元素个数。 参考:https://blog.csdn.net/chl183/article/details/106956807

    for img in data["images"]:                      # 读取每张图片
        split = img["split"]                        # the partition this image belongs to: "train", "val" or "test"
        captions = []                               
        for c in img["sentences"]:                  # 读取图片的文本描述

            # 更新词频,测试集在训练过程中未见到数据集
            if split != "test":                     # 只读取train/val
                vocab.update(c['tokens'])           # 更新词表 这里的c['tokens']是一个列表,在这里对这列表中每个元素,也就是每个词使其在词表中的出现个数加一 参考:https://blog.csdn.net/ljr_123/article/details/106209564 ; https://blog.csdn.net/ljr_123/article/details/106209564
            
            # 不统计超过最大长度限制的词
            if len(c["tokens"]) <= max_len:
                captions.append(c["tokens"])        # 如果每个句子的单词数都不大与max_len,则len(captions)+=5

        if len(captions) == 0:                      # 万一有超过的也得往下循环
            continue
        
        path =os.path.join(image_folder, img['filename'])    # 读取图片路径:"./images/img['filename']" 这里img['filename']为图片名字 os.path.join()函数用于路径拼接文件路径,可以传入多个路径 参考:https://blog.csdn.net/swan777/article/details/89040802

        image_paths[split].append(path)             # 保存每张图片路径
        image_captions[split].append(captions)      # 保存每张图片对应描述文本
    
    """
    执行完以上步骤后得到了:vocab, image_captions, image_paths

    vocab 为一个字典结构,key为各个出现的词; value为这个词出现的个数
    image_captions 为一个字典结构,key为"train","val"; val为列表,表中元素为一个个文本描述的列表
    image_paths 为一个字典结构,key为"train","val"; val为列表,表中元素为图片路径的字符串
    
    可运行以下代码验证:
    print(vocab)
    print(image_paths["train"][1])
    print(image_captions["train"][1])
    """
    
    # 创造词典,增加占位符<pad>,未登录词标识符<unk>,句子首尾标识符<start>和<end>
    words = [w for w in vocab.keys() if vocab[w]> min_word_count]
    vocab = {k : v + 1 for v, k in enumerate(words)}

    vocab['<pad>']=0
    vocab['<unk>']=len(vocab)
    vocab['<start>']=len(vocab)
    vocab['<end>']=len(vocab)
    
    # 储存词典
    with open(os.path.join(output_folder, 'vocab.json'),"w") as fw:
        json.dump(vocab,fw)
    

    # 整理数据集
    for split in image_paths:                       # 只会循环三次 split = "train" 、 split = "val" 和 split = "test"
        
        imgpaths = image_paths[split]               # type(imgpaths)=list
        imcaps = image_captions[split]              # type(imcaps)=list
        enc_captions = []
    
        for i, path in enumerate(imgpaths):

            # Sanity check: make sure the image can be opened and parsed
            img = Image.open(path)                  # 参考:https://blog.csdn.net/weixin_43723625/article/details/108158375
            
            # 如果图像对应的描述数量不足,则补足
            if len(imcaps[i]) < captions_per_image:
                filled_num = captions_per_image - len(imcaps[i])
                captions = imcaps[i]+ [random.choice(imcaps[i]) for _ in range(0, filled_num)]
            else:
                captions = random.sample(imcaps[i],k=captions_per_image)        # 打乱文本描述 参考:https://blog.csdn.net/_37281522/article/details/85032470
            
            assert len(captions)==captions_per_image

            for j,c in enumerate(captions):
                # 对文本描述进行编码
                enc_c = [vocab['<start>']] + [vocab.get(word, vocab['<unk>']) for word in c] + [vocab["<end>"]]
                enc_captions.append(enc_c)
    
        assert len(imgpaths)* captions_per_image == len(enc_captions)

        data = {"IMAGES" : imgpaths,
                "CAPTIONS" : enc_captions}
        
        # 储存训练集,验证集,测试集
        with open(os.path.join(output_folder,split+"_data.json"),'w') as fw:
            json.dump(data, fw)

create_dataset()

# ————————————————————————————————— Model architecture / training —————————————————————————————————
class ImageTextDataset(Dataset):
    """
    Pytorch 数据类,用于 Pytorch Dataloader 来按批次产生数据
    """
    def __init__(self, dataset_path, vocab_path, split, captions_per_image=5, max_len=30, transform=None):
        """
        参数:
            dataset_path: json 格式数据文件路径
            vocab_path: json 格式字典文件路径
            split: "train", "val", "test"
            captions_per_image: 每张图片对应的文本描述数
            max_len: 文本描述最大单词量
            transform: 图像预处理方法
        """
        self.split = split
        assert self.split in {"train", "val", "test"}       # assert的应用 参考:https://blog.csdn.net/TeFuirnever/article/details/88883859
        self.cpi = captions_per_image
        self.max_len = max_len

        # 载入图像
        with open(dataset_path, "r") as f:
            self.data = json.load(f)
        
        # 载入词典
        with open(vocab_path, "r") as f:
            self.vocab = json.load(f)
        
        # 图像预处理流程
        self.transform = transform

        # 数据量
        self.dataset_size = len(self.data["CAPTIONS"])
    
    def __getitem__(self, i):
        """
        参数:
            i: 第 i 张图片
        """
        # 第 i 个样本描述对应第 (i // captions_per_image) 张图片
        img = Image.open(self.data['IMAGES'][i // self.cpi]).convert("RGB")     # 参考:https://blog.csdn.net/nienelong3319/article/details/105458742
        
        # Apply the image preprocessing pipeline, if one was provided
        if self.transform is not None:
            img = self.transform(img)
        
        caplen = len(self.data["CAPTIONS"][i])
        pad_caps = [self.vocab['<pad>']] * (self.max_len + 2 - caplen)
        caption = torch.LongTensor(self.data["CAPTIONS"][i] + pad_caps)         # 类型转换 参考:https://blog.csdn.net/_45138078/article/details/131557441

        return img, caption, caplen

    def __len__(self):
        return self.dataset_size
    
    
def mktrainval(data_dir, vocab_path, batch_size, workers=4):
    """
    参数:
        data_dir: json 文件夹位置
        vocab_path: 词典位置
        batch_size: batch大小
        worker: 运行进程数 defaul = multiprocessing.cpu_count()
    """
    # 参考:https://zhuanlan.zhihu.com/p/476220305
    train_tx = transforms.Compose([
        transforms.Resize(256),                                                         # 缩放
        transforms.RandomCrop(224),                                                     # 随机裁剪
        transforms.ToTensor(),                                                          # 用于对载入的图片数据进行类型转换,将图片数据转换成Tensor数据类型的变量
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])     # 标准化,这里的均值和方差为在ImageNet数据集上抽样计算出来的
    ])

    val_tx = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),                                                     # 中心裁剪
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

    train_set = ImageTextDataset(dataset_path = os.path.join(data_dir, "train_data.json"), vocab_path=vocab_path, split="train", transform=train_tx)
    val_set = ImageTextDataset(dataset_path = os.path.join(data_dir, "val_data.json"), vocab_path=vocab_path, split="val", transform=val_tx)
    test_set = ImageTextDataset(dataset_path = os.path.join(data_dir, "test_data.json"), vocab_path=vocab_path, split="test", transform=val_tx)

    # 构建 pytorch 的 Dataloader 类
    # 参考:https://blog.csdn.net/rocketeerLi/article/details/90523649 ; https://blog.csdn.net/zfhsfdhdfajhsr/article/details/116836851
    train_loder = data.DataLoader(
        dataset=train_set, batch_size=batch_size, shuffle=True,
        num_workers=workers, pin_memory=True
    )

    # 验证集和测试集不需要打乱数据顺序:shuffer = False
    # 参考:https://blog.csdn.net/_42940160/article/details/123894759
    val_loder = data.DataLoader(              
        dataset=val_set, batch_size=batch_size, shuffle=False,
        num_workers=workers, pin_memory=True, drop_last=False
    )

    test_loder = data.DataLoader(              
        dataset=test_set, batch_size=batch_size, shuffle=False,
        num_workers=workers, pin_memory=True, drop_last=False
    )

    return train_loder, val_loder, test_loder
    

class ImageRepExtractor(nn.Module):
    def __init__(self, embed_size, pretrained_model='resnet152', finetuned=True):
        """
        参数:
            embed_size: 维度
            pretrained_model: 图像表示提取器,如ResNet-152
            finetuned: 是否微调参数(默认True)
        """
        super(ImageRepExtractor, self).__init__()

        if pretrained_model == "resnet152":
            net = models.resnet152(weights=ResNet152_Weights.DEFAULT)       # 参考:https://pytorch.org/vision/main/models/generated/torchvision.models.resnet152.html ; https://zhuanlan.zhihu.com/p/225597229
            for param in net.parameters():
                param.requires_grad = finetuned
            
            net.fc = nn.Linear(net.fc.in_features, embed_size)
            nn.init.xavier_uniform_(net.fc.weight)      # 参考: https://blog.csdn.net/luoxuexiong/article/details/95772045

        elif pretrained_model == "vgg19":
            net = models.vgg19(weights=VGG19_Weights.DEFAULT)
            for param in net.parameters():
                param.requires_grad = finetuned

            net.classifier[6] = nn.Linear(net.classifier[6].in_features, embed_size)
            nn.init.xavier_uniform_(net.classifier[6].weight)
            
        else:
            raise ValueError("Unknown image model: " + pretrained_model)
        self.net = net

    def forward(self, x):
        out = self.net(x)
        out = nn.functional.normalize(out)
        return out


class TextRepExtractor(nn.Module):
    def __init__(self, vocab_size, word_dim, embed_size, num_layers):
        """
        参数:
            vocab_size: 词典大小
            word_dim: 词嵌入维度
            embed_size: 对应表示维度,即 RNN 隐藏层维度
            num_layers: RNN 隐藏层层数
        """
        super(TextRepExtractor, self).__init__()
        self.embed_dim = embed_size
        self.embed = nn.Embedding(vocab_size, word_dim)
        self.rnn = nn.GRU(word_dim, embed_size, num_layers, batch_first = True)

        self.embed.weight.data.uniform_(-0.1, 0.1)

    def forward(self, x, lengths):
        x = self.embed(x)
        # 参考:https://www.cnblogs.com/sbj123456789/p/9834018.html ; https://blog.csdn.net/BierOne/article/details/116133857
        packed = pack_padded_sequence(input=x, lengths=lengths, batch_first=True)
        _, hidden = self.rnn(packed)
        out = nn.functional.normalize(hidden[-1])
        return out

class VSEPP(nn.Module):
    def __init__(self, vocab_size, word_dim, embed_size, num_layers, image_model, finetuned = True):
        """
        参数:
            vocab_size: 词表大小
            word_dim: 词嵌入维度
            embed_size: 对应表示维度,即 RNN 隐藏层维度
            num_layers: RNN隐藏层数
            image_model: 图像表示提取器,ResNet或者VGG19
            finetuned: 是否微调图像表示器
        """
        super(VSEPP, self).__init__()
        self.image_extractor = ImageRepExtractor(embed_size = embed_size, pretrained_model = image_model, finetuned = finetuned)
        self.text_extractor = TextRepExtractor(vocab_size = vocab_size, word_dim = word_dim, embed_size = embed_size, num_layers = num_layers)
    
    def forward(self, images, captions, cap_lens):
        # 按照 caption 的长短排序,并对照调整 image 的顺序
        sorted_cap_len, sorted_cap_indices = torch.sort(cap_lens, 0, True)
        
        images = images[sorted_cap_indices]
        captions = captions[sorted_cap_indices]
        cap_lens = sorted_cap_len

        image_code = self.image_extractor(images)
        text_code = self.text_extractor(captions, cap_lens)

        if not self.training:
            # 恢复数据原始输入
            _, recover_indices = torch.sort(sorted_cap_indices)
            image_code = image_code[recover_indices]
            text_code = text_code[recover_indices]
        
        return image_code, text_code

class TripleNetLoss(nn.Module):
    def __init__(self, margin=0.2, hard_negative=False):
        super(TripleNetLoss, self).__init__()
        self.margin = margin
        self.hard_negative = hard_negative
    
    def forward(self, ie, te):
        """
        参数:
            ie: 图像表示,为 VSEPP 返回的 image_code
            te: 文字表示,为 VSEPP 返回的 text_code
        """
        scores = ie.mm(te.t())

        diagonal = scores.diag().view(ie.size(0), 1)
        d1 = diagonal.expand_as(scores)
        d2 = diagonal.t().expand_as(scores)

        # 图像为锚
        cost_i = (self.margin + scores - d1).clamp(min=0)
        # 文本为锚
        cost_t = (self.margin + scores - d2).clamp(min=0)

        # 损失矩阵对角线上的值不参与运算
        mask = torch.eye(scores.size(0), dtype=torch.bool)
        I = torch.autograd.Variable(mask)
        if torch.cuda.is_available():
            I = I.cuda()
        
        cost_i = cost_i.masked_fill_(I, 0)
        cost_t = cost_t.masked_fill_(I, 0)

        # 寻求困难样本
        if self.hard_negative:
            cost_i = cost_i.max(1)[0]
            cost_t = cost_t.max(0)[0]
        
        return cost_t.sum() + cost_i.sum()


def get_optimizer(model, config):
    params = filter(lambda p : p.requires_grad, model.parameters())
    return torch.optim.Adam(params=params, lr=config.learning_rate)


def adjust_learning_rate(optimizer, epoch, config):
    """
    Every lr_update epochs, the learning rate is halved
    Parameters:
        optimizer: the optimizer
        epoch: current training epoch
        config: model hyperparameters and auxiliary variables
    """
    lr = config.learning_rate * (0.5 ** (epoch // config.lr_update))
    lr = max(lr, config.min_learning_rate)
    for param_group in optimizer.param_groups:  # 参考: https://blog.csdn.net/weixin_45464524/article/details/130456843
        param_group['lr']= lr


def calc_recall(image_codes, text_codes, captions_per_image):
    # 之所以可以每隔固定数量取图片,是因为前面对图文数据对输入顺序进行了还原
    scores = np.dot(image_codes[::captions_per_image], text_codes.T)
    # 以图检文, 按照从小到大排序
    sorted_scores_indices = (-scores).argsort(axis = 1)
    (n_image, n_text) = scores.shape
    ranks_i2t = np.zeros(n_image)
    for i in range(0, n_image):
        # 一张图片对应 cpi 条文本,找到排名最靠前的文本位置
        min_rank = 1e10
        for j in range(i*captions_per_image, (i+1)*captions_per_image):
            rank = list(sorted_scores_indices[i,:]).index(j)
            if min_rank > rank:
                min_rank = rank
        ranks_i2t[i] = min_rank
    # 以文检图, 按照从小到大排序
    sorted_scores_indices = (-scores).argsort(axis = 0)
    ranks_t2i = np.zeros(n_text)
    for i in range(n_text):
        rank = list(sorted_scores_indices[:,i]).index(i // captions_per_image)
        ranks_t2i[i] = rank
    # 最靠前的位置小于 k,即 recall@k, 这里计算了 k 取 1,5,10 时的图文互检的 recall
    r1_i2t = 100.0 * len(np.where(ranks_i2t < 1)[0]) / n_image
    r1_t2i = 100.0 * len(np.where(ranks_t2i < 1)[0]) / n_text
    r5_i2t = 100.0 * len(np.where(ranks_i2t < 5)[0]) / n_image
    r5_t2i = 100.0 * len(np.where(ranks_t2i < 5)[0]) / n_text
    r10_i2t = 100.0 * len(np.where(ranks_i2t < 10)[0]) / n_image
    r10_t2i = 100.0 * len(np.where(ranks_t2i < 10)[0]) / n_text

    return r1_i2t, r1_t2i, r5_i2t, r5_t2i, r10_i2t, r10_t2i


def evaluate(data_loader, model, batch_size, captions_per_image):
    # 切换模型为评估模式
    model.eval()
    image_codes = None
    text_codes = None
    device = next(model.parameters()).device
    N = len(data_loader.dataset)

    for i, (imgs, caps, caplens) in enumerate(data_loader):
        with torch.no_grad():
            image_code, text_code = model(imgs.to(device), caps.to(device), caplens)
            if image_codes is None:
                image_codes = np.zeros((N, image_code.size(1)))
                text_codes = np.zeros((N, text_code.size(1)))
            # 将图文对应表示存到 numpy 数组中,之后在 CPU 上计算 recall
            st = i * batch_size
            ed = (i+1) * batch_size

            image_codes[st:ed] = image_code.data.cpu().numpy()
            text_codes[st:ed] = text_code.data.cpu().numpy()
    # 模型切换回训练模式
    model.train()
    return calc_recall(image_codes, text_codes, captions_per_image)


if __name__ == "__main__":

    # 设置模型超参数和辅助变量
    config = Namespace(
        captions_per_image=5,
        batch_size=32,
        word_dim=300,
        embed_size=1024,
        num_layers=1,
        image_model="resnet152",
        finetuned=True,
        learning_rate=0.00002,
        lr_update=15,
        min_learning_rate=0.000002,
        margin=0.2,
        hard_negative=True,
        num_epochs=45,
        grad_clip=2,
        evaluate_step=60,       # 每隔多少步在验证集上测试一次
        checkpoint=None,        # 如果不为 None,则利用该变量路径中的模型继续训练
        best_checkpoint="./model/best_flickr8k.ckpt",   # 表现最佳的模型的保存路径
        last_checkpoint="./model/last_flickr8k.ckpt"    # 训练完成时的模型的保存路径
    )

    # 是否可以用 cuda
    os.environ['CUDA_VISIBLE_DEVICES'] = '0'
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # 数据文件夹/地址
    data_dir = "./data/flickr8k"
    vocab_path = './data/flickr8k/vocab.json'

    # 读取词典,数据集
    with open(vocab_path, 'r') as f:
        vocab = json.load(f)
    train_loader, val_loader, test_loader = mktrainval(data_dir, vocab_path, config.batch_size)

    # 随机初始化或载入已训练模型
    start_epoch = 0
    checkpoint = config.checkpoint
    if checkpoint is None:
        model = VSEPP(vocab_size = len(vocab),
                      word_dim = config.word_dim,
                      embed_size = config.embed_size,
                      num_layers = config.num_layers,
                      image_model = config.image_model,
                      finetuned = config.finetuned)
    else:
        checkpoint = torch.load(checkpoint)
        start_epoch = checkpoint['epoch'] + 1
        model = checkpoint['model']

    # 优化器
    optimizer = get_optimizer(model=model, config=config)

    # 模型移动到 GPU, 并进入训练模式
    model.to(device)
    model.train()

    # 损失函数
    loss_fn = TripleNetLoss(margin=config.margin, hard_negative=config.hard_negative)

    # 训练
    best_res = 0
    print("开始训练")
    for epoch in range(start_epoch, config.num_epochs):

        adjust_learning_rate(optimizer, epoch, config)

        print(f"这是第{epoch}轮")

        for i, (imgs, caps, caplens) in enumerate(train_loader):

            optimizer.zero_grad()

            # 数据读取至 GPU
            imgs = imgs.to(device, non_blocking = True)
            caps = caps.to(device, non_blocking = True)

            # 向前传播
            image_code, text_code = model(imgs, caps, caplens)
            # 计算损失
            loss = loss_fn(image_code, text_code)
            loss.backward()

            # 梯度截断
            if config.grad_clip > 0:
                nn.utils.clip_grad_norm_(model.parameters(), config.grad_clip)

            # 更新参数
            optimizer.step()

            # 当前状态
            state = {
                'epoch': epoch,
                'step': i,
                'model': model,
                'optimizer': optimizer
            }

            if (i+1) % config.evaluate_step == 0:
                r1_i2t, r1_t2i, r5_i2t, r5_t2i, r10_i2t, r10_t2i = evaluate(data_loader=val_loader,
                                                                            model=model,
                                                                            batch_size=config.batch_size,
                                                                            captions_per_image=config.captions_per_image
                                                                            )
                recall_sum = r1_i2t + r1_t2i + r5_i2t + r5_t2i + r10_i2t + r10_t2i

                # 选择模型
                if best_res < recall_sum:
                    best_res = recall_sum
                    torch.save(state, config.best_checkpoint)

                torch.save(state, config.last_checkpoint)

                print(f"epoch: {epoch}, step: {i+1}, loss: {loss.item()}")

    # 用效果最好的模型在测试集上进行测试
    checkpoint = torch.load(config.best_checkpoint)
    model = checkpoint['model']
    r1_i2t, r1_t2i, r5_i2t, r5_t2i, r10_i2t, r10_t2i = evaluate(data_loader=test_loader, model=model, batch_size=config.batch_size, captions_per_image=config.captions_per_image)

    print(f"Epoch: {checkpoint['epoch']}, \n I2T R@1: {r1_i2t}, T2I R@1: {r1_t2i}, \n I2T R@5: {r5_i2t}, T2I R@5: {r5_t2i}, \n I2T R@10: {r10_i2t}, T2I R@10: {r10_t2i}, \n ")

Reference: 《多模态深度学习技术基础》 (Foundations of Multimodal Deep Learning Technology), by 冯方向 and 王小捷.
