I am trying to replicate Large-scale Video Classification with Convolutional Neural Networks. I have downloaded the frames, evenly split into bags of shots. The train and test DataFrames that keep all the information have the following structure:
| index | 0 | 1 | 2 | 3 | 4 | ... |
|---|---|---|---|---|---|---|
| 0 | train/-b5vDKaAd9o/bag_of_shots0 | 0 | 0 | 0 | 0 | 1 |
| 1 | train/-b5vDKaAd9o/bag_of_shots1 | 0 | 0 | 0 | 0 | 1 |
| 2 | train/-b5vDKaAd9o/bag_of_shots2 | 0 | 0 | 0 | 0 | 1 |
| 3 | train/5smf0wPlLk4/bag_of_shots0 | 1 | 0 | 1 | 0 | 1 |
| ... | ... | ... | ... | ... | ... | ... |
It is indexed by a counter, with the path to the bag-of-shots location in the first column and the one-hot encoding of all the labels in the remaining columns.
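For illustration only, here is a toy reconstruction of that layout with made-up values and just a handful of label columns (the real DataFrames are of course much larger):

import pandas as pd

# Toy example of the structure described above: column 0 holds the path to a bag of shots,
# the remaining columns hold the one-hot encoded labels (values here are made up).
toy_df = pd.DataFrame({
    0: ['train/-b5vDKaAd9o/bag_of_shots0', 'train/-b5vDKaAd9o/bag_of_shots1'],
    1: [0, 0],
    2: [0, 0],
    3: [0, 0],
    4: [0, 0],
    5: [1, 1],
})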
My custom Dataset class is:
# Imports inferred from how the snippets below use them
import os
from pathlib import Path
from timeit import default_timer as timer

import numpy as np
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms
from PIL import Image
from tqdm import tqdm
from torchmetrics.classification import MultilabelF1Score

class VideoDataset(Dataset):
    def __init__(self, df, transform=None, t='single'):
        self.df = df
        self.transform = transform
        self.t = t

    def __len__(self):
        return len(self.df)

    def __getitem__(self, index):
        if self.transform is None:
            self.transform = transforms.ToTensor()
        images_path = self.df.iloc[index, 0]
        # 'single' uses the middle frame of the bag; the other modes stack several frames
        if self.t == 'single':
            images = self.transform(Image.open(f'{dataset_path}/{images_path}/shot{shots//2}.png'))
        if self.t == 'early':
            images = np.array([self.transform(Image.open(f'{dataset_path}/{images_path}/shot{idx}.png')) for idx in range(shots//3, shots//3*2)])
        if self.t == 'late':
            images = np.array([self.transform(Image.open(f'{dataset_path}/{images_path}/shot0.png')), self.transform(Image.open(f'{dataset_path}/{images_path}/shot{shots-1}.png'))])
        if self.t == 'slow':
            if shots % 10 == 0:
                images = np.array([self.transform(Image.open(f'{dataset_path}/{images_path}/shot{idx}.png')) for idx in range(shots//2 - 5, shots//2 + 5)])
            else:
                images = np.array([self.transform(Image.open(f'{dataset_path}/{images_path}/shot{idx}.png')) for idx in range((shots%10) - (shots%10)//2, shots - (shots%10)//2)])
        y_labels = torch.from_numpy(self.df.iloc[0, 1:].to_numpy().astype(float))
        if self.t != 'single':
            images = torch.from_numpy(images)
        return images, y_labels, images_path
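A quick way to spot-check a single item (the index here is arbitrary; train_df and train_transform are the same objects used in the split below):

# Spot-check one sample: shapes of the image tensor and label vector, plus the bag path
sample_ds = VideoDataset(df=train_df, transform=train_transform, t='single')
images, y_labels, images_path = sample_ds[0]
print(images.shape, y_labels.shape, images_path)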
Then, for the train/test split I have:
def spit_train(train_data, perc_val_size):
    train_size = len(train_data)
    val_size = int((train_size * perc_val_size) // 100)
    train_size -= val_size
    return random_split(train_data, [int(train_size), int(val_size)])  # train_data, val_data

train_data_single, val_data_single = spit_train(VideoDataset(df=train_df, transform=train_transform, t='single'), 20)
test_data_single = VideoDataset(df=test_df, transform=test_transform, t='single')
The dataloaders:
BATCH_SIZE = 8
NUM_WORKERS = os.cpu_count()

def generate_dataloaders(train_data, val_data, test_data, batch_size=BATCH_SIZE, num_workers=NUM_WORKERS):
    train_dl = DataLoader(dataset=train_data,
                          batch_size=batch_size,
                          num_workers=num_workers,
                          shuffle=True)
    val_dl = DataLoader(dataset=val_data,
                        batch_size=batch_size,
                        num_workers=num_workers,
                        shuffle=True)
    test_dl = DataLoader(dataset=test_data,
                         batch_size=batch_size,
                         num_workers=num_workers,
                         shuffle=False)  # no need to shuffle the test data when treating it as a time series
    return train_dl, val_dl, test_dl
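The call that actually builds the dataloaders used further down is not shown above; it is essentially the following (the _single names are assumed to match the ones used later):

# Assumed wiring of the single-frame dataloaders consumed by the training code below
train_dl_single, val_dl_single, test_dl_single = generate_dataloaders(train_data_single,
                                                                      val_data_single,
                                                                      test_data_single)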
The custom AlexNet CNN:
class AlexNet(nn.Module):
    def __init__(self, in_channels, stream_type=None, t_frames=[1, 1, 1]):
        super().__init__()
        self.stream_type = stream_type
        self.fovea = transforms.Compose([transforms.CenterCrop((89, 89))])
        self.context = transforms.Compose([transforms.Resize((89, 89))])
        self.transform = transforms.Compose([transforms.Resize((170, 170))])
        self.t_frames = t_frames
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels * self.t_frames[0], 96, kernel_size=11, stride=3, padding=2),
            nn.ReLU(inplace=False),
            nn.LocalResponseNorm(size=5, alpha=0.0001, beta=0.5, k=2),
            nn.MaxPool2d(kernel_size=3, stride=2)
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(96 * self.t_frames[1], 256, kernel_size=5, stride=1, padding=2),
            nn.ReLU(inplace=False),
            nn.LocalResponseNorm(size=5, alpha=0.0001, beta=0.5, k=2),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )
        self.conv3 = nn.Sequential(
            nn.Conv2d(256 * self.t_frames[2], 384, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=False),
            nn.Conv2d(384, 384, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=False),
            nn.Conv2d(384, 256, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=False)
        )
        self.MaxPool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.init_bias()  # initialize biases

    def init_bias(self):
        for block in [self.conv1, self.conv2, self.conv3]:
            for layer in block:
                if isinstance(layer, nn.Conv2d):
                    nn.init.normal_(layer.weight, mean=0, std=0.01)
                    nn.init.constant_(layer.bias, 0)
        # the original paper initializes the biases of the 2nd, 4th and 5th conv layers to 1
        nn.init.constant_(self.conv2[0].bias, 1)
        nn.init.constant_(self.conv3[2].bias, 1)
        nn.init.constant_(self.conv3[4].bias, 1)

    def forward(self, x):
        if self.stream_type is not None:
            x = self.fovea(x) if self.stream_type == 'fovea' else self.context(x)
            return self.conv3(self.conv2(self.conv1(x)))
        else:
            x = self.transform(x)
            return self.MaxPool(self.conv3(self.conv2(self.conv1(x))))
The initial model is the one that takes the middle frame of each bag of shots as input:
class NoMultiresCNN(nn.Module):
    def __init__(self, CNN, num_classes):
        super(NoMultiresCNN, self).__init__()
        self.CNN = CNN
        self.classifier = nn.Sequential(
            nn.Dropout(p=0.5, inplace=False),
            nn.Linear(in_features=(256 * 7 * 7), out_features=4096),
            nn.ReLU(inplace=False),
            nn.Dropout(p=0.5, inplace=False),
            nn.Linear(in_features=4096, out_features=4096),
            nn.ReLU(inplace=False),
            nn.Linear(in_features=4096, out_features=num_classes),
        )

    def forward(self, x):
        x = self.CNN(x)
        x = x.reshape(x.shape[0], -1)
        return self.classifier(x)
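As a quick sanity check of the 256 * 7 * 7 flattened size fed to the classifier, this snippet is purely illustrative (the batch size and class count are arbitrary):

# Any input resolution works because AlexNet resizes to 170x170 when stream_type is None;
# the pooled feature map then comes out as 256 x 7 x 7 before the classifier.
dummy = torch.randn(2, 3, 200, 200)                                  # 2 RGB frames, arbitrary size
check_model = NoMultiresCNN(AlexNet(in_channels=3), num_classes=5)   # 5 classes just for the check
print(check_model(dummy).shape)                                      # torch.Size([2, 5]) -> raw logits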
The class that wraps the training and evaluation steps:
class CNN_Architecture():
    def __init__(self, model: torch.nn.Module, train_dataloader: torch.utils.data.DataLoader,
                 val_dataloader: torch.utils.data.DataLoader, optimizer: torch.optim.Optimizer,
                 loss_fn: torch.nn.Module, accuracy_fn, scheduler: torch.optim.Optimizer,
                 device: torch.device, save_check=False):
        self.model = model.to(device)
        self.optimizer = optimizer
        self.train_dataloader = train_dataloader
        self.loss_fn = loss_fn
        self.val_dataloader = val_dataloader
        self.accuracy_fn = accuracy_fn
        self.scheduler = scheduler
        self.device = device
        self.save_check = save_check

    def __save_checkpoint(self, train_loss, train_f1, epoch):
        data_path = Path('data/')
        filename = f'{self.model.typ}_checkpoint.pth.tar'
        print('=> Saving Checkpoint')
        checkpoint = {'state_dict': self.model.state_dict(), 'optimizer': self.optimizer.state_dict(),
                      'train_loss': train_loss, 'train_f1': train_f1, 'epoch': epoch}
        torch.save(checkpoint, filename)
        print(' DONE\n')

    def __load_checkpoint(self, checkpoint):
        self.model.load_state_dict(checkpoint['state_dict'])
        self.optimizer.load_state_dict(checkpoint['optimizer'])

    def evaluate(self, val_dataloader: torch.utils.data.DataLoader, epoch=1, epochs=1):
        val_loss, val_f1 = 0, 0
        self.model.eval()
        pbar = tqdm(enumerate(val_dataloader), total=len(val_dataloader), leave=False)  # , desc='EVALUATION'
        with torch.inference_mode():
            for batch_idx, (images, labels, _) in pbar:  # the _ ignores the paths
                images, labels = images.to(self.device), labels.to(self.device)
                outputs = self.model(images)
                # loss = self.loss_fn(outputs, labels).mean()
                loss = self.loss_fn(outputs, labels)
                f1 = self.accuracy_fn(outputs, labels)
                # val_loss += loss.data.mean()
                val_loss += loss
                val_f1 += f1
                pbar.set_description(f'{self.model.__class__.__name__} EVALUATION Epoch [{epoch + 1} / {epochs}]')
                pbar.set_postfix(loss=loss.item(), f1=f1.item())
        val_loss /= len(val_dataloader)  # mean over all batches
        val_f1 /= len(val_dataloader)    # mean over all batches
        model_name = self.model.__class__.__name__
        if self.model.__class__.__name__ == 'NoMultiresCNN':
            model_name = f'{model_name} - Stream Type: {self.model.CNN.stream_type}'
        return {'model_name': model_name,  # only works when the model was created with a class
                'model_loss': val_loss.item(),
                'model_f1': val_f1.item()}

    def fit(self, epochs: int):
        results = {'train_loss': [], 'train_f1': [], 'val_loss': [], 'val_f1': []}
        best_train_loss, best_train_f1 = float('inf'), float('-inf')
        for epoch in range(epochs):
            train_loss, train_f1 = 0, 0
            # Training phase
            self.model.train()
            pbar = tqdm(enumerate(self.train_dataloader), total=len(self.train_dataloader), leave=False)  # , desc='TRAIN'
            for batch_idx, (images, labels, _) in pbar:  # the _ ignores the paths
                # zero_grad -> backward -> step
                self.optimizer.zero_grad()
                images, labels = images.to(self.device), labels.to(self.device)
                outputs = self.model(images)
                # loss = self.loss_fn(outputs, labels).mean()
                loss = self.loss_fn(outputs, labels)
                loss.backward()
                self.optimizer.step()
                train_loss += loss.item()
                f1 = self.accuracy_fn(outputs, labels).item()
                train_f1 += f1
                model_name = self.model.__class__.__name__
                if self.model.__class__.__name__ == 'NoMultiresCNN':
                    model_name = f'{model_name} - Stream Type: {self.model.CNN.stream_type}'
                pbar.set_description(f'{model_name} TRAIN Epoch [{epoch + 1} / {epochs}]')
                pbar.set_postfix(loss=loss.item(), f1=f1)
            train_loss /= len(self.train_dataloader)
            train_f1 /= len(self.train_dataloader)
            self.scheduler.step(train_loss)
            if self.save_check:
                if train_loss < best_train_loss and train_f1 > best_train_f1:
                    self.__save_checkpoint(train_loss, train_f1, epoch + 1)
                    best_train_loss, best_train_f1 = train_loss, train_f1
            # Validation phase
            model_name, val_loss, val_f1 = (self.evaluate(self.val_dataloader, epoch, epochs)).values()
            results['train_loss'].append(train_loss)
            results['train_f1'].append(train_f1)
            results['val_loss'].append(val_loss)
            results['val_f1'].append(val_f1)
            print('Epoch [{}], train_loss: {:.4f}, train_f1: {:.4f}, val_loss: {:.4f}, val_f1: {:.4f} \n'.format(
                epoch + 1, train_loss, train_f1, val_loss, val_f1))
        return {'model_name': model_name, 'results': results}
The accuracy metric I chose:
def accuracy(outputs, labels):
    metric = MultilabelF1Score(num_labels=len(LABELS)).to(device)
    return metric(outputs, labels)
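For reference, this is how the metric behaves on its own as far as I can tell from the torchmetrics docs: float predictions outside [0, 1] are treated as logits, passed through a sigmoid and then thresholded at 0.5 by default (4 labels and the values below are made up):

# Illustrative only: 4 labels, batch of 2; the thresholded predictions match the targets exactly
demo_metric = MultilabelF1Score(num_labels=4)
demo_logits = torch.tensor([[ 3.0, -2.0, -2.0,  1.5],
                            [-1.0,  4.0,  2.5, -0.5]])
demo_targets = torch.tensor([[1, 0, 0, 1],
                             [0, 1, 1, 0]])
print(demo_metric(demo_logits, demo_targets))  # tensor(1.) since every thresholded prediction is correct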
The function that starts the whole computation:
def train_evaluate(model, epochs=NUM_EPOCHS):
    # Train the model
    start_time = timer()
    history = model.fit(epochs)
    end_time = timer()
    print(f'Total training time: {end_time-start_time:.3f} seconds')

    # Compare the results between the train and validation sets
    plot_loss_curves(history)

    # Evaluate the model on the test dataloader
    start_time = timer()
    result = model.evaluate(test_dl_single)
    end_time = timer()
    print(f'Total evaluation time: {end_time-start_time:.3f} seconds\n')
    print(f"TEST Results for {result['model_name']} -> loss: {result['model_loss']} f1-accuracy: {result['model_f1']}")
And finally:
NUM_EPOCHS = 5
n_class = len(LABELS)

singleframe_model = NoMultiresCNN(AlexNet(in_channels=3), num_classes=n_class)

loss_fn = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(params=singleframe_model.parameters(), lr=1e-5)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, factor=0.1, patience=3, verbose=True)

CNN = CNN_Architecture(model=singleframe_model,
                       train_dataloader=train_dl_single,
                       val_dataloader=val_dl_single,
                       optimizer=optimizer,
                       loss_fn=loss_fn,
                       accuracy_fn=accuracy,
                       scheduler=scheduler,
                       device=device)

train_evaluate(CNN)
I get these results:
Epoch [1], train_loss: 0.6415, train_f1: 0.0020, val_loss: 0.5625, val_f1: 0.0021
Epoch [2], train_loss: 0.4761, train_f1: 0.0021, val_loss: 0.3569, val_f1: 0.0021
Epoch [3], train_loss: 0.2507, train_f1: 0.0021, val_loss: 0.1271, val_f1: 0.0021
Epoch [4], train_loss: 0.0718, train_f1: 0.0021, val_loss: 0.0188, val_f1: 0.0021
Epoch [5], train_loss: 0.0107, train_f1: 0.0021, val_loss: 0.0018, val_f1: 0.0021
Total training time: 124.898 seconds
Total evaluation time: 3.226 seconds
TEST Results for NoMultiresCNN - Stream Type: None -> loss: 0.0358263327473281 f1-accuracy: 0.0
As you can see, I am using nn.BCEWithLogitsLoss() as the loss function and MultilabelF1Score as the accuracy score. The F1 score stays stuck even when I change the model, while the loss keeps decreasing.