import pickle, gzip, numpy as np
import torch, torch.nn as nn, torch.nn.functional as F
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import tqdm # a package for making progress bar
def loadData(src):
    '''
    Load the pickled MNIST archive and fold its validation split into the train split.

    paras:  src - String - path to a gzip-compressed pickle holding
                  (train_set, valid_set, test_set), each an (x, y) pair
    return: train_x - 2D Numpy array (n, d) where each row is an image
            train_y - 1D Numpy array (n, ) where each row is a label
            test_x - 2D Numpy array (n, d) where each row is an image
            test_y - 1D Numpy array (n, ) where each row is a label
    '''
    # NOTE: pickle.load can execute arbitrary code - only open trusted archives.
    # The context manager closes the file even when unpickling fails.
    with gzip.open(src, 'rb') as f:
        train_set, valid_set, test_set = pickle.load(f, encoding='latin1')
    train_x, train_y = train_set
    valid_x, valid_y = valid_set
    test_x, test_y = test_set
    print('Originally, train len (n): '+str(len(train_y))+' '+'valid len (n): '+str(len(valid_y))+' '+'test len (n): '+str(len(test_y)))
    # The validation rows are appended after the training rows
    train_x = np.vstack((train_x, valid_x))
    train_y = np.append(train_y, valid_y)
    print('Now, train len (n): '+str(len(train_y))+' '+'test len (n): '+str(len(test_y)))
    print('Each vec len (p): '+str(train_x.shape[1]))
    return (train_x, train_y, test_x, test_y)
def _loadGzipPickle(path):
    '''Unpickle one gzip-compressed file; the handle is closed even on failure.'''
    # NOTE: pickle.load can execute arbitrary code - only open trusted archives.
    with gzip.open(path, 'rb') as f:
        return pickle.load(f, encoding='latin1')


def _loadGzipText(path):
    '''Parse one gzip-compressed numeric text file with np.loadtxt.'''
    with gzip.open(path, 'rb') as f:
        return np.loadtxt(f)


def loadOverlapData(path_to_data_dir, use_mini_dataset, img_rows=42, img_cols=28):
    '''
    Load the overlapping-digit images and labels.

    paras:  path_to_data_dir - String - prefix prepended verbatim to each file name
                               (include a trailing separator; '' means the current directory)
            use_mini_dataset - Boolean - read the '*_mini' files when True
            img_rows, img_cols - int - define the size of an image
    return: X_train, y_train, X_test, y_test
            X_* are reshaped to (n, 1, img_rows, img_cols); y_* are whatever
            np.loadtxt parses from the label files
    '''
    exten = '_mini' if use_mini_dataset else ''

    X_train = _loadGzipPickle(path_to_data_dir + 'train_multi_digit' + exten + '.pkl.gz')
    # Add an explicit single-channel axis
    X_train = np.reshape(X_train, (len(X_train), 1, img_rows, img_cols))

    X_test = _loadGzipPickle(path_to_data_dir + 'test_multi_digit' + exten + '.pkl.gz')
    X_test = np.reshape(X_test, (len(X_test), 1, img_rows, img_cols))

    y_train = _loadGzipText(path_to_data_dir + 'train_labels' + exten + '.txt.gz')
    y_test = _loadGzipText(path_to_data_dir + 'test_labels' + exten + '.txt.gz')

    print('X_Train shape: '+str(X_train.shape)+' '+'y_train shape: '+str(y_train.shape))
    print('X_Test shape: '+str(X_test.shape)+' '+'y_test shape: '+str(y_test.shape))
    return X_train, y_train, X_test, y_test
def plotImg(X,n=0,p=0):
    '''
    Show every row of X as a 28x28 greyscale image.

    paras:  X - 2D Numpy array (num_images, 784), one flattened image per row
            n, p - int - grid rows & cols; leave both at 0 for a near-square
                   grid chosen automatically
    return: None - image(s) are drawn with n rows & p cols; if n*p cannot hold
            every image, "Wrong n, p values" is printed and nothing is drawn
    '''
    num_images = X.shape[0]
    # `and`, not `&`: `n == 0 & p == 0` parses as `n == (0 & p) == 0`, which ignores p
    if n == 0 and p == 0:
        # plt.subplot needs integer grid sizes, so cast the numpy floats
        num_rows = int(np.floor(np.sqrt(num_images))) # floor: 2.5 -> 2
        num_cols = int(np.ceil(num_images / num_rows)) if num_rows else 0 # ceil: 2.5 -> 3
    else:
        if n*p < num_images:
            return print("Wrong n, p values")
        num_rows, num_cols = n, p
    plt.figure(figsize=(20,10))
    for i in range(num_images):
        reshaped_image = X[i,:].reshape(28,28)
        plt.subplot(num_rows, num_cols, i+1)
        plt.imshow(reshaped_image, cmap = cm.Greys_r)
        plt.axis('off')
    plt.show()
# Split into train and dev
def splitData(X, y, is_overlap=False):
    '''
    Split into train (first 9/10) and dev (last 1/10), then shuffle the train part.

    paras:  X - sequence of inputs
            y - labels; when is_overlap is True, y[0] and y[1] each hold one
                label per input, otherwise y holds one label per input
            is_overlap - Boolean - whether two digits overlap in an img
                         (defaults to False so two-argument calls work)
    return: X_train - list of inputs in shuffled order
            y_train - list of labels (or [list, list] when is_overlap) in the same order
            X_dev, y_dev - the unshuffled tail slices of X and y
    '''
    split_index = (9 * len(X)) // 10
    X_train, X_dev = X[:split_index], X[split_index:] # 9/10 train set
    if is_overlap:
        y_train = [y[0][:split_index], y[1][:split_index]]
        y_dev = [y[0][split_index:], y[1][split_index:]]
    else:
        y_train, y_dev = y[:split_index], y[split_index:] # 1/10 dev set
    # One shared permutation keeps inputs and labels aligned after shuffling
    permutation = np.arange(len(X_train))
    np.random.shuffle(permutation)
    X_train = [X_train[i] for i in permutation]
    if is_overlap:
        y_train = [[y_train[0][i] for i in permutation], [y_train[1][i] for i in permutation]]
    else:
        y_train = [y_train[i] for i in permutation]
    return X_train, y_train, X_dev, y_dev
def batchifyData(X, y, batch_size, is_overlap=False):
    """
    Takes a set of data points and labels and groups them into batches.
    paras:  X - a list of inputs;
            y - a list of outputs (two parallel lists, y[0] and y[1], when is_overlap);
            batch_size - int - how many data points in each batch
            is_overlap - Boolean - whether two digits overlap in an img
                         (defaults to False so three-argument calls work)
    return: a list of dict
            [{'x': tensor(mat1,...,matn), 'y': tensor(num1,...,numn)}, ...]
            'x' is float32 and 'y' is int64; with is_overlap, 'y' has shape (2, batch_size)
    raises: ValueError if batch_size is not positive
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer, got " + repr(batch_size))
    # Only take batch_size chunks. The remainder will be dropped.
    N = (len(X) // batch_size) * batch_size # number of data points that fill whole batches
    batches = []
    for i in range(0, N, batch_size):
        j = i + batch_size
        # Stack through numpy first: torch.tensor on a list of ndarrays is very slow
        x_batch = torch.tensor(np.asarray(X[i:j]), dtype=torch.float32)
        if is_overlap:
            y_batch = torch.tensor(np.asarray([y[0][i:j], y[1][i:j]]), dtype=torch.int64)
        else:
            y_batch = torch.tensor(np.asarray(y[i:j]), dtype=torch.long)
        batches.append({'x': x_batch, 'y': y_batch})
    return batches
# What a batch looks like: notebook-style peek at the first batch dict.
# NOTE(review): in the visible part of this file, train_batches is only assigned
# at module level further down, so this line would raise NameError when the file
# is run top to bottom - TODO confirm the intended cell order.
train_batches[0]
def _buildOptimizer(model, opti):
    """Return the optimizer named by `opti`; unrecognised names fall back to Adam."""
    name = opti.lower() if isinstance(opti, str) else opti
    if name == 'sgd':
        return torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9, nesterov=True)
    if name == 'adadelta':
        return torch.optim.Adadelta(model.parameters(), lr=1.0, rho=0.9, eps=1e-06, weight_decay=0)
    # Adam was the optimizer used unconditionally before `opti` was honoured
    return torch.optim.Adam(model.parameters(), lr=0.001, betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False)


def trainModel(train_data, dev_data, model, opti, is_overlap, n_epochs=30):
    """
    Train a model for N epochs given data and hyper-params.
    paras:  train_data, dev_data - lists of batch dicts ({'x': tensor, 'y': tensor})
            model - torch.nn.Module to train in place
            opti - String - optimizer name: 'sgd', 'adadelta' or 'adam'
                   (case-insensitive; any other value selects Adam)
            is_overlap - Boolean - whether the model predicts two digits per image
            n_epochs - int - number of passes over train_data
    return: a list of dictionary with [{'Train loss','Validation loss'},{'Train accuracy','Validation accuracy'}];
            with is_overlap there are four dicts (loss1, loss2, accuracy1, accuracy2),
            each value being one number per epoch
    side effects: prints per-epoch metrics and saves the model to
            'mnist_model_fully_connected.pt' after every epoch
    """
    optimizer = _buildOptimizer(model, opti)
    train_losses, train_accuracies, val_losses, val_accuracies = [],[],[],[]
    for epoch in tqdm.tqdm(range(1, n_epochs+1)):
        print("-------------\nEpoch {}:\n".format(epoch))
        # Run **training***
        train_loss, train_acc, _ = runEpoch(train_data, model.train(), optimizer, is_overlap)
        # Run **validation** (eval mode, so runEpoch performs no parameter update)
        val_loss, val_acc, _ = runEpoch(dev_data, model.eval(), optimizer, is_overlap)
        if is_overlap:
            print('Train | loss1: {:.6f} accuracy1: {:.6f} | loss2: {:.6f} accuracy2: {:.6f}'.format(train_loss[0], train_acc[0], train_loss[1], train_acc[1]))
            print('Valid | loss1: {:.6f} accuracy1: {:.6f} | loss2: {:.6f} accuracy2: {:.6f}'.format(val_loss[0], val_acc[0], val_loss[1], val_acc[1]))
        else:
            print('Train loss: {:.6f} | Train accuracy: {:.6f}'.format(train_loss, train_acc))
            print('Val loss: {:.6f} | Val accuracy: {:.6f}'.format(val_loss, val_acc))
        train_losses.append(train_loss)
        train_accuracies.append(train_acc)
        val_losses.append(val_loss)
        val_accuracies.append(val_acc)
        # Save model (overwritten each epoch, so the file always holds the latest weights)
        torch.save(model, 'mnist_model_fully_connected.pt')
    if is_overlap:
        # Each per-epoch entry is a (first digit, second digit) pair; split it per digit.
        # Indexing instead of zip(*...) keeps this working when n_epochs is 0.
        tl = [[pair[k] for pair in train_losses] for k in (0, 1)]
        vl = [[pair[k] for pair in val_losses] for k in (0, 1)]
        ta = [[pair[k] for pair in train_accuracies] for k in (0, 1)]
        va = [[pair[k] for pair in val_accuracies] for k in (0, 1)]
        return [{'Train loss1':tl[0],'Validation loss1':vl[0]}, {'Train loss2':tl[1],'Validation loss2':vl[1]},
                {'Train accuracy1':ta[0],'Validation accuracy1':va[0]}, {'Train accuracy2':ta[1],'Validation accuracy2':va[1]}]
    return [{'Train loss':train_losses,'Validation loss':val_losses},
            {'Train accuracy':train_accuracies,'Validation accuracy':val_accuracies}]
def runEpoch(data, model, optimizer, is_overlap):
    """
    Run the model over every batch once and return loss, accuracy and the misclassified samples.

    The model's own mode decides what happens: in train mode each batch is
    followed by an optimizer step; in eval mode the pass only measures.

    paras:  data - list of batch dicts ({'x': tensor, 'y': tensor})
            model - torch.nn.Module; returns one logits tensor, or two when is_overlap
            optimizer - torch optimizer; only used in train mode (may be None in eval mode)
            is_overlap - Boolean - whether each image carries two labels (batch['y'][0], batch['y'][1])
    return: avg_loss, avg_accuracy, errors
            single label: two floats and (x_err, y_err, pred_err)
            overlap:      two (first, second) pairs and (x_err, y1_err, y2_err, pred1_err, pred2_err)
            The error lists hold every sample with at least one wrong prediction.
    """
    # If model is in train mode, use optimizer.
    is_training = model.training
    epoch_fn = _runOverlapEpoch if is_overlap else _runSingleEpoch
    # Evaluation passes never call backward(), so skip autograd bookkeeping there
    with torch.set_grad_enabled(is_training):
        return epoch_fn(data, model, optimizer, is_training)


def _runSingleEpoch(data, model, optimizer, is_training):
    """One pass over single-label batches; see runEpoch for the return value."""
    losses, batch_accuracies = [], []
    x_err, y_err, pred_err = [], [], []
    for batch in data:
        x, y = batch['x'], batch['y']
        out = model(x)
        predictions = torch.argmax(out, dim=1)
        correct = predictions == y
        # `1 - mask` is rejected for bool tensors; `~` is the supported inversion
        wrong = ~correct
        x_err += x[wrong].tolist()
        y_err += y[wrong].tolist()
        pred_err += predictions[wrong].tolist()
        batch_accuracies.append(correct.double().mean().item())
        loss = F.cross_entropy(out, y)
        losses.append(loss.item())
        if is_training:
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    # Epoch-level scores are plain means over the per-batch values
    return np.mean(losses), np.mean(batch_accuracies), (x_err, y_err, pred_err)


def _runOverlapEpoch(data, model, optimizer, is_training):
    """One pass over two-label batches; see runEpoch for the return value."""
    losses_first_label, losses_second_label = [], []
    batch_accuracies_first, batch_accuracies_second = [], []
    x_err, y1_err, y2_err, pred1_err, pred2_err = [], [], [], [], []
    for batch in data:
        x, y = batch['x'], batch['y']
        out1, out2 = model(x)
        predictions_first_label = torch.argmax(out1, dim=1)
        predictions_second_label = torch.argmax(out2, dim=1)
        correct1 = predictions_first_label == y[0]
        correct2 = predictions_second_label == y[1]
        # A sample counts as an error when either digit is wrong
        wrong = ~(correct1 & correct2)
        x_err += x[wrong].tolist()
        y1_err += y[0][wrong].tolist()
        y2_err += y[1][wrong].tolist()
        pred1_err += predictions_first_label[wrong].tolist()
        pred2_err += predictions_second_label[wrong].tolist()
        batch_accuracies_first.append(correct1.double().mean().item())
        batch_accuracies_second.append(correct2.double().mean().item())
        loss1 = F.cross_entropy(out1, y[0])
        loss2 = F.cross_entropy(out2, y[1])
        losses_first_label.append(loss1.item())
        losses_second_label.append(loss2.item())
        if is_training:
            optimizer.zero_grad()
            # Both heads are trained jointly on the average of their losses
            joint_loss = 0.5 * (loss1 + loss2)
            joint_loss.backward()
            optimizer.step()
    errors = (x_err, y1_err, y2_err, pred1_err, pred2_err)
    avg_loss = np.mean(losses_first_label), np.mean(losses_second_label)
    avg_accuracy = np.mean(batch_accuracies_first), np.mean(batch_accuracies_second)
    return avg_loss, avg_accuracy, errors
class Flatten(nn.Module):
    """Layer that keeps the first (batch) axis and collapses all remaining axes into one."""

    def forward(self, input):
        batch_dim = input.shape[0]
        # view() reinterprets the existing storage; -1 lets torch infer the flattened width
        return input.view(batch_dim, -1)
def processData(algo):
    '''
    Split, batch, train and test the single-digit model selected at module level.

    Reads the module-level names X_train_dev, y_train_dev, X_test, y_test,
    batch_size, model and n_epochs.

    paras:  algo - String - 'FFNN' (each image stays a flattened vector) or
                   'CNN' (each image is reshaped to 1x28x28)
    return: a list of dictionary with [{'Train Loss','Validation Loss'},{'Train Accuracy','Validation Accuracy'}]
            a float - test loss
            a float - test accuracy
            a tuple - 3 lists - test x errors, test y errors, test pred errors
    raises: ValueError if algo is neither 'FFNN' nor 'CNN'
    '''
    if algo == 'FFNN': # each image is a flattened vector
        X_train_dev_ = X_train_dev
        X_test_ = X_test
    elif algo == 'CNN': # need to reshape the data back into a 1x28x28 image
        X_train_dev_ = np.reshape(X_train_dev, (X_train_dev.shape[0], 1, 28, 28))
        X_test_ = np.reshape(X_test, (X_test.shape[0], 1, 28, 28))
    else:
        raise ValueError("Unknown algo " + repr(algo) + "; expected 'FFNN' or 'CNN'")
    # Split into train and dev (single-label data, so is_overlap is False throughout)
    X_train, y_train, X_dev, y_dev = splitData(X_train_dev_, y_train_dev, False)
    # Split dataset into batches
    train_batches = batchifyData(X_train, y_train, batch_size, False)
    dev_batches = batchifyData(X_dev, y_dev, batch_size, False)
    test_batches = batchifyData(X_test_, y_test, batch_size, False)
    # train model - trainModel takes an optimizer name rather than lr/momentum/nesterov
    train_dev_loss_acc = trainModel(train_batches, dev_batches, model, 'sgd', False, n_epochs)
    # Evaluate the model on test data (eval mode, so no optimizer is needed)
    test_loss, test_acc, test_errors = runEpoch(test_batches, model.eval(), None, False)
    print()
    print("Loss on test set:" + str(test_loss) + " Accuracy on test set: " + str(test_acc))
    return train_dev_loss_acc, test_loss, test_acc, test_errors
# def plotRes(res_dict,title):
# for key, values in res_dict.items():
# plt.plot(values, marker='o', markersize=5, label=key)
# plt.legend()
# plt.title(title)
def plotRes(train_dev_loss_acc, title_size=14):
    '''
    Draw one panel per dictionary, side by side, with one curve per dictionary entry.

    paras:  train_dev_loss_acc - list of dict {curve label: list of per-epoch values}
            title_size - int - font size of each panel title (default 14)
    return: None; nothing is drawn when the list is empty
    '''
    list_len = len(train_dev_loss_acc)
    if list_len == 0:
        return
    # squeeze=False keeps axs two-dimensional, so a single panel can be indexed too
    fig, axs = plt.subplots(1, list_len, figsize=(20,5), squeeze=False)
    for ax, curves in zip(axs[0], train_dev_loss_acc):
        for key, values in curves.items():
            ax.plot(values, marker='o', markersize=5, label=key)
        # The panel title is the curve labels joined together
        ax.set_title(' vs '.join(curves), fontsize=title_size)
        ax.set_xlabel('n epochs',fontsize=12)
        ax.legend(fontsize=15)
# Seed numpy (used for shuffling in splitData) and torch before any data or model work
np.random.seed(12321) # for reproducibility
torch.manual_seed(12321) # for reproducibility
# loadData returns the train split with the validation split already appended
X_train_dev, y_train_dev, X_test, y_test = loadData('mnist.pkl.gz')
# Preview the first 10 images on a single row
plotImg(X_train_dev[:10],1,10)
# Model specification
batch_size = 128
## Hidden Layer Representation Size
unit_size = 128
## FFNN Model
ffnn = nn.Sequential(
    nn.Linear(784, unit_size),
    nn.LeakyReLU(),
    nn.Linear(unit_size, 10)
)
## CNN Model (trailing comments give the activation shape after each layer)
cnn = nn.Sequential(                # 1 28 28
    nn.Conv2d(1, 32, (3, 3)),       # 32 26 26
    nn.ReLU(),
    nn.MaxPool2d((2, 2)),           # 32 13 13
    nn.Conv2d(32, 64, (3,3)),       # 64 11 11
    nn.ReLU(),
    nn.MaxPool2d((2,2)),            # 64 5 5
    Flatten(),                      # 64*5*5 1
    nn.Linear(1600,unit_size),      # 1600 128
    nn.Dropout(),
    nn.Linear(unit_size,10)         # 128 10
)
# processData reads `model`, `batch_size` and `n_epochs` from module level
model = cnn
lr = 0.1
momentum = 0.9
nesterov = False
n_epochs = 20
# test accuracy = 0.9204727564102564 when batch size = 128 lr = 0.1
train_dev_loss_acc, test_loss, test_acc, test_errors = processData('CNN')
# plotRes takes the title font size as its second argument
plotRes(train_dev_loss_acc, 14)
# First 10 misclassified test images with their true and predicted labels
plotImg(np.array(test_errors[0])[:10],1,10)
print('True labels: '+str(np.array(test_errors[1])[:10]))
print('Wrongly Predicted labels: '+str(np.array(test_errors[2])[:10]))
print('Wrong labels in total: '+str(len(test_errors[2])))
# Every misclassified test image (grid size chosen by plotImg)
plotImg(np.array(test_errors[0]))
print('True labels: '+str(np.array(test_errors[1])))
print('Wrongly Predicted labels: '+str(np.array(test_errors[2])))
print('Wrong labels in total: '+str(len(test_errors[2])))
# Model specification
batch_size = 32
unit_size = 10
## FFNN Model
ffnn = nn.Sequential(
    nn.Linear(784, unit_size),
    nn.LeakyReLU(),
    nn.Linear(unit_size, 10)
)
# processData reads `model`, `batch_size` and `n_epochs` from module level
model = ffnn
lr = 0.1
n_epochs = 10
# Baseline
train_dev_loss_acc, test_loss, test_acc, test_errors = processData('FFNN')
# plotRes takes the title font size as its second argument
plotRes(train_dev_loss_acc, 14)
# First 10 misclassified test images with their true and predicted labels
plotImg(np.array(test_errors[0])[:10],1,10)
print('True labels: '+str(np.array(test_errors[1])[:10]))
print('Wrongly Predicted labels: '+str(np.array(test_errors[2])[:10]))
print('Wrong labels in total: '+str(len(test_errors[2])))
# Number of digit classes; the MLP and CNN classes below read it as a module-level name
num_classes = 10
# Height and width of one overlapping-digit image (same values as loadOverlapData's defaults)
img_rows, img_cols = 42, 28
class MLP(nn.Module):
    """Fully connected network with two 64-unit hidden layers and one output head per digit."""

    def __init__(self, input_dimension):
        super().__init__()
        hidden_units = 64
        self.flatten = Flatten()
        # Shared trunk (layers are created in the same order as before so seeded
        # initialisation is unchanged)
        self.linear1 = nn.Linear(input_dimension, hidden_units)
        self.linear2 = nn.Linear(hidden_units, hidden_units)
        # One classification head per digit
        self.linear_first_digit = nn.Linear(hidden_units, num_classes)
        self.linear_second_digit = nn.Linear(hidden_units, num_classes)

    def forward(self, x):
        """Return (first digit logits, second digit logits) for a batch of images."""
        hidden = F.relu(self.linear1(self.flatten(x)))
        hidden = F.relu(self.linear2(hidden))
        return self.linear_first_digit(hidden), self.linear_second_digit(hidden)
class CNN(nn.Module):
    """
    Convolutional encoder shared by two classification heads, one per digit.

    paras:  input_dimension - int - accepted for signature parity with MLP but not
            used: the encoder's layer sizes are fixed for 1x42x28 inputs.

    NOTE: four nn.Linear layers that were registered here but never used in
    forward() have been removed, so this module has fewer parameters and a
    different state_dict key set than before.
    """

    def __init__(self, input_dimension):
        super(CNN, self).__init__()
        self.encoder = nn.Sequential(   # 1 42 28
            nn.Conv2d(1,8,(3,3)),       # 8 40 26
            nn.ReLU(),
            nn.MaxPool2d((2,2)),        # 8 20 13
            nn.Conv2d(8,16,(3,3)),      # 16 18 11
            nn.ReLU(),
            nn.MaxPool2d((2,2)),        # 16 9 5
            Flatten(),                  # 16*9*5 = 720
            nn.Linear(720,128),
            nn.Dropout(0.5)
        )
        # One classification head per digit on top of the shared 128-d encoding
        self.first_digit_class = nn.Linear(128, num_classes)
        self.second_digit_class = nn.Linear(128, num_classes)

    def forward(self, x):
        """Return (first digit logits, second digit logits) for a batch of images."""
        out = self.encoder(x)
        out_first_digit = self.first_digit_class(out)
        out_second_digit = self.second_digit_class(out)
        return out_first_digit, out_second_digit
# Re-seed so the overlapping-digit experiment starts from the same RNG state
np.random.seed(12321) # for reproducibility
torch.manual_seed(12321) # for reproducibility
# '' = no path prefix (files are looked up relative to the working directory); True = use the '_mini' files
X_train_dev, y_train_dev, X_test, y_test = loadOverlapData('', True)
def plotImg(X,n=0,p=0):
    '''
    Show channel 0 of every image in X as a greyscale picture.
    (Redefines the earlier plotImg for the image-shaped overlapping-digit data.)

    paras:  X - Numpy array indexed as X[i, 0] -> 2D image
            n, p - int - grid rows & cols; leave both at 0 for a near-square
                   grid chosen automatically
    return: None - image(s) are drawn with n rows & p cols; if n*p cannot hold
            every image, "Wrong n, p values" is printed and nothing is drawn
    '''
    num_images = X.shape[0]
    if n == 0 and p == 0 and num_images > 0:
        # Same automatic layout as the flat-image plotImg; ints because
        # plt.subplot needs integer grid sizes
        n = int(np.floor(np.sqrt(num_images)))
        p = int(np.ceil(num_images / n))
    if n*p < num_images:
        return print("Wrong n, p values")
    plt.figure(figsize=(20,10))
    for i in range(num_images):
        plt.subplot(n, p, i+1)
        plt.imshow(X[i,0], cmap = cm.Greys_r)
        plt.axis('off')
    plt.show()
def plotErrors():
    '''
    Show the first 10 misclassified test images and print their true and predicted labels.

    Reads the module-level test_errors and y_test.
    '''
    # test_errors = x_err, y1_err, y2_err, pred1_err, pred2_err
    def first_ten(idx):
        return str(np.array(test_errors[idx])[:10])

    plotImg(np.array(test_errors[0])[:10], 1, 10)
    print('True upper labels: ' + first_ten(1))
    print('True lower labels: ' + first_ten(2))
    print()
    print('Wrongly Predicted upper labels: ' + first_ten(3))
    print('Wrongly Predicted lower labels: ' + first_ten(4))
    print()
    # Share of test images with at least one wrongly predicted digit
    n_wrong = len(test_errors[0])
    share = '{:.1%}'.format(n_wrong / len(y_test[0]))
    print('Wrong labels in total: ' + str(n_wrong) + ' Percentage: ' + share)
# Preview the first 10 overlapping-digit images on one row
plotImg(X_train_dev[:10],1,10)
# Hold out the last tenth as dev data and shuffle the rest (True: two label rows per image set)
X_train, y_train, X_dev, y_dev = splitData(X_train_dev, y_train_dev, True)
# Paras:
batch_size = 64
n_epoch = 30 # read as a module-level name by runModel
# model:
# NOTE: `cnn` rebinds the name that earlier held the nn.Sequential single-digit model
mlp = MLP(img_rows * img_cols)
cnn = CNN(img_rows * img_cols)
# Split dataset into batches
train_batches = batchifyData(X_train, y_train, batch_size, True)
dev_batches = batchifyData(X_dev, y_dev, batch_size, True)
test_batches = batchifyData(X_test, y_test, batch_size, True)
def runModel(model, opti, is_overlap):
    '''
    Train `model` on the module-level train/dev batches, then score it on the test batches.

    paras:  model - torch.nn.Module to train
            opti - optimizer name handed on to trainModel
            is_overlap - Boolean - whether each image carries two labels
    return: training history from trainModel, test loss, test accuracy, test errors
    '''
    # train model
    history = trainModel(train_batches, dev_batches, model, opti, is_overlap, n_epoch)
    # Evaluate the model on test data; eval mode means no optimizer is needed
    test_loss, test_acc, test_errors = runEpoch(test_batches, model.eval(), None, is_overlap)
    print()
    if not is_overlap:
        print("Loss on test set:" + str(test_loss) + " Accuracy on test set: " + str(test_acc))
    else:
        print('Test loss1: {:.6f} accuracy1: {:.6f} loss2: {:.6f} accuracy2: {:.6f}'.format(
            test_loss[0], test_acc[0], test_loss[1], test_acc[1]))
    return history, test_loss, test_acc, test_errors
# Baseline: the MLP, with optimizer name 'sgd' handed to runModel
train_dev_loss_acc, test_loss, test_acc, test_errors = runModel(mlp, 'sgd', True)
# plotErrors reads the module-level test_errors assigned on the line above
plotErrors()
# Loss/accuracy curves with title font size 14
plotRes(train_dev_loss_acc,14)
# Adam: the CNN, with optimizer name 'adam' handed to runModel
train_dev_loss_acc, test_loss, test_acc, test_errors = runModel(cnn, 'adam', True)
plotErrors()
plotRes(train_dev_loss_acc,14)