CLIP再研究以及和视频异常检测的结合

clip 异常检测







预训练阶段,如上图所示,描述图像的文本和图像本身分别通过 Text Encoder 与 Image Encoder 转换成对应的特征向量,其中 $T_i$ 表示一个 batch 中第 i 条文本的特征,$I_i$ 表示第 i 张图像的特征。








  1. $ conda install --yes -c pytorch pytorch=1.7.1 torchvision cudatoolkit=11.0
  2. $ pip install ftfy regex tqdm
  3. $ pip install git+https://github.com/openai/CLIP.git



  # Zero-shot classification of a single CIFAR-100 test image with CLIP:
  # the image embedding is compared against one text embedding per class name.
  import os
  import clip
  import torch
  from torchvision.datasets import CIFAR100

  # Load the model
  device = "cuda" if torch.cuda.is_available() else "cpu"
  model, preprocess = clip.load('ViT-B/32', device)

  # Download the dataset
  cifar100 = CIFAR100(root=os.path.expanduser("~/.cache"), download=True, train=False)

  # Prepare the inputs: one preprocessed image and one prompt per class
  image, class_id = cifar100[3637]
  image_input = preprocess(image).unsqueeze(0).to(device)
  text_inputs = torch.cat([clip.tokenize(f"a photo of a {c}") for c in cifar100.classes]).to(device)

  # Calculate features (inference only, so no gradients are tracked)
  with torch.no_grad():
      image_features = model.encode_image(image_input)
      text_features = model.encode_text(text_inputs)

  # Pick the top 5 most similar labels for the image.
  # Both embeddings are L2-normalised so the matrix product is a cosine
  # similarity; it is scaled by 100 before the softmax over classes.
  image_features /= image_features.norm(dim=-1, keepdim=True)
  text_features /= text_features.norm(dim=-1, keepdim=True)
  similarity = (100.0 * image_features @ text_features.T).softmax(dim=-1)
  values, indices = similarity[0].topk(5)

  # Print the result
  print("\nTop predictions:\n")
  for value, index in zip(values, indices):
      print(f"{cifar100.classes[index]:>16s}: {100 * value.item():.2f}%")




  # Zero-shot classification of one video frame with CLIP, using the
  # sub-directory names of the training folder as candidate class labels.
  import os
  import clip
  import torch
  from torchvision.datasets import CIFAR100
  from torchvision import transforms
  from PIL import Image

  folder_path = "archive/Train"
  # One sub-directory per class. os.listdir returns entries in arbitrary order,
  # so sort for a reproducible label order, and skip stray files that would
  # otherwise be treated as classes.
  classes = sorted(
      name for name in os.listdir(folder_path)
      if os.path.isdir(os.path.join(folder_path, name))
  )

  device = "cuda" if torch.cuda.is_available() else "cpu"
  model, preprocess = clip.load('ViT-B/32', device)
  input_image_path = "archive/Train/Abuse/Abuse001_x264_610.png"

  # Prepare the inputs: one preprocessed image and one prompt per class
  image = Image.open(input_image_path)
  image_input = preprocess(image).unsqueeze(0).to(device)
  text_inputs = torch.cat([clip.tokenize(f"a photo of a {c}") for c in classes]).to(device)

  # Calculate features (inference only, so no gradients are tracked)
  with torch.no_grad():
      image_features = model.encode_image(image_input)
      text_features = model.encode_text(text_inputs)

  # Pick the most similar labels for the image (at most 5; topk raises if
  # asked for more entries than there are classes).
  image_features /= image_features.norm(dim=-1, keepdim=True)
  text_features /= text_features.norm(dim=-1, keepdim=True)
  similarity = (100.0 * image_features @ text_features.T).softmax(dim=-1)
  top_k = min(5, len(classes))
  values, indices = similarity[0].topk(top_k)

  # Print the result
  print("\nTop predictions:\n")
  for value, index in zip(values, indices):
      print(f"{classes[index]:>16s}: {100 * value.item():.2f}%")


  Top predictions:

      NormalVideos: 33.01%
          Shooting: 16.24%
          Burglary: 14.46%
           Assault: 10.37%
            Arrest: 6.54%




  1. import os
  2. from PIL import Image
  3. import numpy as np
  4. import torch
  5. import clip
  6. from loguru import logger
  7. from torch.utils.data import Dataset, DataLoader, ConcatDataset
  8. import torch.optim as optim
  9. from torch.optim import lr_scheduler
  10. import torch.nn as nn
  11. from tqdm import tqdm
  class YDataset(Dataset):
      """Image / caption-token pairs read from a tab-separated metadata file.

      Each non-empty line of ``path_tag_train.txt`` or ``path_tag_test.txt``
      (looked up under ``meta_root``) holds an image path and a label separated
      by a TAB. The label is turned into the prompt ``"a photo of <label>"``
      and all prompts are tokenized once with ``clip.tokenize``.

      Args:
          img_root: Stored on the instance only; image paths from the metadata
              file are opened exactly as written and are not joined with it.
          meta_root: Directory that contains the two metadata files.
          is_train: Read the train file when true, otherwise the test file.
          preprocess: Callable applied to every RGB ``PIL.Image`` in
              ``__getitem__``.

      Raises:
          ValueError: If a non-empty metadata line has no TAB-separated label.
      """

      def __init__(self,img_root,meta_root,is_train,preprocess):
          self.img_root = img_root
          self.meta_root = meta_root
          self.train_set_file = os.path.join(meta_root, 'path_tag_train.txt')
          self.test_set_file = os.path.join(meta_root, 'path_tag_test.txt')
          self.is_train = is_train
          self.img_process = preprocess
          self.read_file = self.train_set_file if is_train else self.test_set_file
          self.samples, self.sam_labels = self._read_path_tags(self.read_file)
          # Tokenize every prompt up front so __getitem__ only has to index.
          self.tokens = clip.tokenize(self.sam_labels)

      @staticmethod
      def _read_path_tags(meta_file):
          """Parse ``meta_file`` into ``(image_paths, prompts)`` lists."""
          samples = []
          labels = []
          with open(meta_file, 'r', encoding='utf-8') as f:
              for line_no, raw_line in enumerate(f, start=1):
                  line = raw_line.rstrip("\r\n")
                  if not line.strip():
                      # A blank or trailing empty line is not a sample.
                      continue
                  fields = line.split("\t")
                  if len(fields) < 2:
                      raise ValueError(
                          f"{meta_file}:{line_no}: expected '<path>\\t<label>', got {line!r}"
                      )
                  samples.append(fields[0])
                  # Strip so the prompt does not carry stray whitespace.
                  labels.append("a photo of " + fields[1].strip())
          return samples, labels

      def __len__(self):
          return len(self.samples)

      def __getitem__(self, idx):
          img_path = self.samples[idx]
          token = self.tokens[idx]
          # Load the image and force three channels
          image = Image.open(img_path).convert('RGB')
          # Apply the preprocessing callable passed to the constructor
          image = self.img_process(image)
          return image,token
  def convert_models_to_fp32(model):
      """Cast every parameter of ``model`` (and its gradient, if any) to float32 in place."""
      for param in model.parameters():
          param.data = param.data.float()
          if param.grad is not None:
              param.grad.data = param.grad.data.float()


  # Fine-tune CLIP (RN50) on image / prompt pairs with the symmetric
  # contrastive loss, saving the weights every epoch and a resumable
  # checkpoint every `ckt_gap` epochs.
  if __name__ == '__main__':
      device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
      # `device` is a torch.device, so inspect `.type` rather than comparing
      # the object with a string.
      on_cuda = device.type == "cuda"
      net, preprocess = clip.load("RN50",device=device,jit=False)
      optimizer = optim.Adam(net.parameters(), lr=1e-6,betas=(0.9,0.98),eps=1e-6,weight_decay=0.001)
      scheduler = lr_scheduler.StepLR(
          optimizer, step_size=10, gamma=0.1)

      # Loss functions: one cross-entropy per direction (image->text, text->image)
      loss_img = nn.CrossEntropyLoss()
      loss_txt = nn.CrossEntropyLoss()

      ydataset = YDataset(img_root= '../archive',meta_root= '../archive',is_train=True,preprocess=preprocess)
      ydataloader = DataLoader(ydataset,batch_size=4,shuffle=True,num_workers=4,pin_memory=False)

      phase = "train"
      model_name = "CLIP_Crime"
      ckt_gap = 4
      epoches = 30

      for epoch in range(epoches):
          total_loss = 0.0
          batch_num = 0
          for images,label_tokens in tqdm(ydataloader) :
              # Move the images and label tokens to the target device
              images = images.to(device)
              label_tokens = label_tokens.to(device)
              batch_num += 1

              # Reset the gradients
              optimizer.zero_grad()
              with torch.set_grad_enabled(phase == "train"):
                  # Mixed precision for the forward pass and loss only, to
                  # reduce GPU memory; backward and the optimizer step run
                  # outside autocast. Disabled when no GPU is used.
                  with torch.cuda.amp.autocast(enabled=on_cuda):
                      logits_per_image, logits_per_text = net(images, label_tokens)
                      # The i-th image is paired with the i-th prompt in the batch.
                      # NOTE(review): samples that share a label in one batch have
                      # identical prompts but are still treated as negatives here.
                      ground_truth = torch.arange(len(images),dtype=torch.long,device=device)
                      cur_loss = (loss_img(logits_per_image,ground_truth) + loss_txt(logits_per_text,ground_truth))/2
                  # Accumulate a plain float so the autograd graph of every
                  # batch is not kept alive for the whole epoch.
                  loss_value = cur_loss.item()
                  total_loss += loss_value

                  if phase == "train":
                      cur_loss.backward()
                      if not on_cuda:
                          optimizer.step()
                      else:
                          # Update in float32, then cast the weights back to
                          # half precision for the next forward pass.
                          convert_models_to_fp32(net)
                          optimizer.step()
                          clip.model.convert_weights(net)

              if batch_num % 4 == 0:
                  logger.info('{} epoch:{} loss:{}'.format(phase,epoch,loss_value))

          # Each accumulated value is already a per-batch mean, so average
          # over the number of batches.
          epoch_loss = total_loss / batch_num if batch_num else 0.0
          torch.save(net.state_dict(),f"{model_name}_epoch_{epoch}.pth")
          logger.info(f"weights_{epoch} saved")

          # Advance the learning-rate schedule only after this epoch's
          # optimizer steps.
          scheduler.step()

          if epoch % ckt_gap == 0:
              checkpoint_path = f"{model_name}_ckt.pth"
              checkpoint = {
                  'it': epoch,
                  'network': net.state_dict(),
                  'optimizer': optimizer.state_dict(),
                  'scheduler': scheduler.state_dict()}
              torch.save(checkpoint, checkpoint_path)
              logger.info(f"checkpoint_{epoch} saved")
          logger.info('{} Loss: {:.4f}'.format(
              phase, epoch_loss))



文本编码器是 BERT

image编码器使用 ViT


通过在文本编码器每个 Transformer 块的自注意力(SA)层和前馈网络(FFN)之间插入一个额外的交叉注意力(CA)层来注入视觉信息。




  # Caption every example of `dataset` with a fine-tuned BLIP checkpoint and
  # show each image with its generated caption in a grid of subplots.
  # NOTE: `device` and `dataset` are not defined in this snippet; they must
  # already exist in the session when it runs.
  from transformers import BlipForConditionalGeneration, AutoProcessor
  from matplotlib import pyplot as plt

  model = BlipForConditionalGeneration.from_pretrained("ybelkada/blip-image-captioning-base-football-finetuned").to(device)
  processor = AutoProcessor.from_pretrained("ybelkada/blip-image-captioning-base-football-finetuned")

  # Three images per row; the row count follows the dataset size so that
  # add_subplot never receives an index outside the grid (6 items -> 2 x 3).
  n_cols = 3
  n_rows = max(1, (len(dataset) + n_cols - 1) // n_cols)
  fig = plt.figure(figsize=(18, 14))

  # prepare image for the model
  for i, example in enumerate(dataset):
      image = example["image"]
      inputs = processor(images=image, return_tensors="pt").to(device)
      pixel_values = inputs.pixel_values

      generated_ids = model.generate(pixel_values=pixel_values, max_length=50)
      generated_caption = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]

      fig.add_subplot(n_rows, n_cols, i+1)
      plt.imshow(image)
      plt.axis("off")
      plt.title(f"Generated caption: {generated_caption}")


  # Report example usage and download the image-captioning training split.
  from transformers.utils import send_example_telemetry
  from datasets import load_dataset
  from torch.utils.data import Dataset, DataLoader

  send_example_telemetry("image_captioning_blip_notebook", framework="pytorch")

  dataset = load_dataset("ybelkada/football-dataset", split="train")
  class ImageCaptioningDataset(Dataset):
      """Wrap an indexable collection of ``{"image", "text"}`` items for training.

      Args:
          dataset: Indexable, sized collection whose items expose ``"image"``
              and ``"text"`` keys.
          processor: Callable invoked as ``processor(images=..., text=...,
              padding="max_length", return_tensors="pt")`` that returns a
              mapping of tensors with a leading batch dimension.
      """

      def __init__(self, dataset, processor):
          self.dataset = dataset
          self.processor = processor

      def __len__(self):
          return len(self.dataset)

      def __getitem__(self, idx):
          item = self.dataset[idx]
          encoding = self.processor(images=item["image"], text=item["text"], padding="max_length", return_tensors="pt")
          # Remove only the leading batch dimension. A bare squeeze() would
          # also drop any other dimension that happens to have size 1.
          encoding = {k: v.squeeze(0) for k, v in encoding.items()}
          return encoding
  # Fine-tune the BLIP captioning model on `dataset`, then caption the first
  # example as a smoke test.
  # NOTE: `dataset` and `ImageCaptioningDataset` come from the earlier part of
  # this listing.
  from transformers import AutoProcessor, BlipForConditionalGeneration
  import torch

  processor = AutoProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
  model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")

  train_dataset = ImageCaptioningDataset(dataset, processor)
  train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=2)

  optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)

  device = "cuda" if torch.cuda.is_available() else "cpu"
  model.to(device)

  model.train()
  for epoch in range(50):
      print("Epoch:", epoch)
      for batch in train_dataloader:
          input_ids = batch.pop("input_ids").to(device)
          pixel_values = batch.pop("pixel_values").to(device)
          # Captions are padded to max_length, so pass the mask (when the
          # processor provides one) to keep padding out of the attention.
          attention_mask = batch.pop("attention_mask", None)
          if attention_mask is not None:
              attention_mask = attention_mask.to(device)

          outputs = model(input_ids=input_ids,
                          pixel_values=pixel_values,
                          attention_mask=attention_mask,
                          labels=input_ids)

          loss = outputs.loss
          print("Loss:", loss.item())

          loss.backward()
          optimizer.step()
          optimizer.zero_grad()

  # Switch off dropout before generating; the model is still in train() mode
  # after the loop above.
  model.eval()

  # load image
  example = dataset[0]
  image = example["image"]

  # prepare image for the model
  inputs = processor(images=image, return_tensors="pt").to(device)
  pixel_values = inputs.pixel_values

  generated_ids = model.generate(pixel_values=pixel_values, max_length=50)
  generated_caption = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
  print(generated_caption)

