Intelligent Emotion Detection
DS 5230 Unsupervised Machine Learning and Data Mining

Emotion detection is an emerging area of study in machine learning. With the rise of facial recognition and image processing applications, it is becoming increasingly important. In human-machine interaction, emotion detection makes it possible to personalize services based on a user's mood and underlying emotional state. Emotion recognition also has applications in medical science, health monitoring, marketing and customer satisfaction, and security and surveillance. Emotions are an essential part of human life and a strong indicator of health and communication. Another potential application is social media, where people frequently upload facial images expressing a range of emotions.

Intelligent Emotion Detection

The objective of this notebook is to compare the performance of different CNNs on the task of emotion detection from images. Given an image of a human face, we would like to classify it as one of 7 emotions.

Import libraries
In [ ]:
# Mount Google Drive and switch to the project folder
from google.colab import drive
import os

drive.mount('/content/gdrive')

os.chdir('gdrive/My Drive/Facial_Recognition')
In [2]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import glob
from PIL import Image
from pathlib import Path
from tqdm.notebook import tqdm
from sklearn.model_selection import GridSearchCV
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import confusion_matrix, accuracy_score
from sklearn.model_selection import train_test_split
 
 
# Importing Keras libraries (unified under tensorflow.keras so the standalone
# keras package and the TensorFlow-bundled one are not mixed)
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential, Model, save_model, load_model
from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.applications import imagenet_utils
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, TensorBoard, EarlyStopping
from tensorflow.keras.preprocessing.image import load_img, img_to_array, ImageDataGenerator
from tensorflow.keras.layers import (Dense, Conv2D, MaxPooling2D, BatchNormalization,
                                     Dropout, Flatten, GlobalAveragePooling2D)
from tensorflow.keras.regularizers import l2
from tensorflow.keras.optimizers import Adam
import seaborn as sns
 
import warnings
warnings.filterwarnings('ignore')
Define functions and load dataset
In [3]:
#define functions and load dataset

# Map the first three letters of each emotion folder name to a class index
# (same order as the labels list defined below)
label_to_index = {"ang": 0, "dis": 1, "fea": 2, "hap": 3, "neu": 4, "sad": 5, "sur": 6}

def getPic(img_path):
    # Load an image as grayscale and resize it to 48x48
    return np.array(Image.open(img_path).convert('L').resize((48,48), Image.LANCZOS))

def get_label(img_path):
    # The label is the first three letters of the emotion folder name
    return Path(img_path).parts[2][0:3]

def get_ds(data_path):
    img_paths = list()
    # Collect all the image files two levels below data_path
    for img_path in glob.glob(data_path+"/**/*"):
        img_paths.append(img_path)
    images = np.zeros((len(img_paths),48,48))
    labels = np.zeros(len(img_paths))
    # Read and resize the images and get the encoded labels
    for i, img_path in tqdm(enumerate(img_paths)):
        images[i] = getPic(img_path)
        labels[i] = label_to_index[get_label(img_path)]
        
    return images,labels
In [4]:
labels = ["angry", "disgust", "fear", "happy", "neutral", "sad", "suprise"]

# This function calcules the prediction for a given image give the image path and a model
def return_prediction(path, model):
    img = Image.open(path)
    label = Path(path).parts[-2]
    data_X = getPic(path)   
    data_X = np.expand_dims(np.expand_dims(data_X, -1),-4)
    data_X = data_X / 255.

    plt.style.use('classic')

    fig = plt.figure(figsize=(10,5), constrained_layout=True)
    gs = fig.add_gridspec(3, 3)
    a0 = fig.add_subplot(gs[0:,:2])
    a1 = fig.add_subplot(gs[1:,2])
    a3 = fig.add_subplot(gs[0,2])
    a0.imshow(img)

    pred_prob =  model.predict(data_X)
    pred_label = np.argmax(pred_prob)

    a1.set_title('Predicted Probability')
    a1.barh(labels, pred_prob[0], align='center', alpha=0.5)
    a3.text(0.5, 0.5, 'Actual: ' +label, size=16, ha='center', va='center')
    a3.axis('off')
    a0.axis('off')
    fig.set_facecolor((1,1,1))
    plt.show()

#this function plots the confusion, precision, and recall matrices given the actual and predicted values
def return_confusion_matrix(actual_label, predicted_label):
    confusionMatrix = confusion_matrix(actual_label, predicted_label, labels=range(0,7))
    precision = confusionMatrix/confusionMatrix.sum(axis = 0)
    recall = (confusionMatrix.T/confusionMatrix.sum(axis = 1)).T
    
    sns.set(font_scale=1)
    
    plt.figure(figsize=(8,3))
    sns.heatmap(confusionMatrix, cmap = "Blues", annot = True, fmt = ".0f", xticklabels=labels, yticklabels=labels)
    plt.title("Confusion Matrix", fontsize = 20)
    plt.xlabel('Predicted Label', fontsize = 12)
    plt.ylabel('Actual Label', fontsize = 12)
    plt.tick_params(labelsize = 13)
    plt.xticks(rotation = 60)
    plt.show()

    plt.figure(figsize=(8,3))
    sns.heatmap(precision, cmap = "Blues", annot = True, fmt = ".1%", xticklabels=labels, yticklabels=labels)
    plt.title("Precision Matrix", fontsize = 20)
    plt.xlabel('Predicted Label', fontsize = 12)
    plt.ylabel('Actual Label', fontsize = 12)
    plt.tick_params(labelsize = 13)
    plt.xticks(rotation = 60)
    plt.show()

    plt.figure(figsize=(8,3))
    sns.heatmap(recall, cmap = "Blues", annot = True, fmt = ".1%", xticklabels=labels, yticklabels=labels)
    plt.title("Recall Matrix", fontsize = 20)
    plt.xlabel('Predicted Label', fontsize = 12)
    plt.ylabel('Actual Label', fontsize = 12)
    plt.tick_params(labelsize = 13)
    plt.xticks(rotation = 60)
    plt.show()
In [5]:
def create_features(x, pre_model):
    # Run the images through a frozen pretrained backbone and flatten the
    # resulting feature maps (VGG16 on 48x48 inputs yields 1x1x512 features)
    features = pre_model.predict(x, batch_size=32)
    print(features.shape)
    features_flatten = features.reshape((features.shape[0], 1 * 1 * 512))
    # features_flatten = features.reshape((features.shape[0], 2 * 2 * 2048))
    return features, features_flatten
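
create_features is defined here but not called again below. As a hypothetical sketch (not run in this notebook; the train_Xr name and the classifier settings are assumptions), the flattened features could feed the LogisticRegression imported earlier:

In [ ]:
# Hypothetical sketch: extract frozen VGG16 features and fit a linear
# classifier on top. Assumes train_Xr/test_Xr are 3-channel (N, 48, 48, 3)
# arrays like the ones built from the grayscale data further down.
vgg = VGG16(weights='imagenet', include_top=False, input_shape=(48, 48, 3))
_, train_feats = create_features(train_Xr, vgg)
_, test_feats = create_features(test_Xr, vgg)

clf = LogisticRegression(max_iter=1000)
clf.fit(train_feats, train_y)
print("Feature-based accuracy:", accuracy_score(test_y, clf.predict(test_feats)))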

Dataset

This dataset is FER2013, available on Kaggle.

The dataset consists of 48x48-pixel grayscale images of faces. The faces have been automatically registered so that each face is roughly centered and occupies about the same amount of space in every image.
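
The np_data/*.npy arrays loaded below were prepared ahead of time. Here is a one-time preprocessing sketch using get_ds above, assuming the Kaggle download is unpacked under data/train/<emotion>/ and data/test/<emotion>/ (the folder names are an assumption, chosen to match the path layout get_label expects):

In [ ]:
# One-time preprocessing sketch: build the .npy arrays from the raw images
train_X, train_y = get_ds('data/train')
test_X, test_y = get_ds('data/test')

os.makedirs('np_data', exist_ok=True)
np.save('np_data/train_X.npy', train_X)
np.save('np_data/train_y.npy', train_y)
np.save('np_data/test_X.npy', test_X)
np.save('np_data/test_y.npy', test_y)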

In [8]:
train_X = np.load('np_data/train_X.npy')
train_X = np.expand_dims(train_X, -1)
test_X = np.load('np_data/test_X.npy')
test_X = np.expand_dims(test_X, -1)

train_X = train_X / 255.
test_X = test_X /255.

num_classes = 7
train_y = np.load('np_data/train_y.npy')
test_y = np.load('np_data/test_y.npy')

#Set aside validation set from test
test_X, validation_X, test_y, validation_y = train_test_split(test_X, test_y, test_size=0.6, random_state=23)

y_train = to_categorical(train_y, num_classes)
y_test = to_categorical(test_y, num_classes)
y_validation = to_categorical(validation_y, num_classes)
In [9]:
datagen = ImageDataGenerator(rotation_range=25, horizontal_flip=True, channel_shift_range=.3)
# datagen.fit(train_X)
train_gen = datagen.flow(train_X, y_train, batch_size=32)
# Plot a 3x3 grid of augmented training samples with their labels
batch_size = 9
i=0
plt.figure(figsize=(8, 6), dpi=80)

for img_batch, batch_labels in datagen.flow(train_X, y_train, batch_size=9):
    for img in img_batch:
        plt.subplot(330 + 1 + i)
        img = np.repeat(img, 3, -1)
        ax = plt.gca()
        ax.axes.xaxis.set_visible(False)
        ax.axes.yaxis.set_visible(False)
        label = labels[np.argmax(batch_labels[i])]
        ax.set_title(label)
        plt.imshow(img)
        i=i+1    
    if i >= batch_size:
        break

The training set consists of 28,709 examples and the test set consists of 3,589 examples.
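A quick sanity check on the array shapes (the split above moved 60% of the loaded test set into the validation set):

In [ ]:
# Sanity check on the array shapes after the test/validation split
print("train:", train_X.shape)
print("test:", test_X.shape, " validation:", validation_X.shape)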

In [10]:
print("Training data available in 7 classes")
 
emotion_classes = ('angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise')
 
y_pos = np.arange(len(emotion_classes))
counts = [list(train_y).count(i) for i in range(num_classes)]
 
plt.barh(y_pos, counts, align='center', alpha=0.5)
plt.yticks(y_pos, emotion_classes)
plt.xlabel('Counts')
plt.title('Train Data Class Distribution')
plt.show()
Training data available in 7 classes

As seen in the distribution chart above, there is significant class imbalance, so we need to balance the distribution to make sure the model learns to classify the minority classes properly.

I applied SMOTE and data augmentation.

Data augmentation artificially expands the size of a training dataset by creating modified versions of the images already in it, which can improve accuracy since deep neural networks tend to perform better when trained on more data. The variations also expose the model to a wider range of conditions, which can make it more robust. Random horizontal flips and random rotations performed best out of all the transformations I tried.

SMOTE (Synthetic Minority Over-sampling Technique) is an oversampling method that generates synthetic samples for the minority classes. It was used to obtain a synthetically class-balanced training set, which was then used to train the model. Finally, in the weighted-class approach, each class was assigned a weight based on its frequency of occurrence, and those weights were applied during training; a sketch of this alternative follows.
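
The weighted-class variant isn't shown in the cells below; here is a minimal sketch of how the weights could be computed and passed to Keras (the 'balanced' heuristic from scikit-learn is an assumption about the exact weighting used):

In [ ]:
# Sketch of the weighted-class alternative: weight each class inversely to its
# frequency and hand the mapping to fit() instead of resampling the data.
from sklearn.utils.class_weight import compute_class_weight

weights = compute_class_weight(class_weight='balanced',
                               classes=np.unique(train_y),
                               y=train_y)
class_weight_map = {i: w for i, w in enumerate(weights)}
# model.fit(train_gen, ..., class_weight=class_weight_map)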

In [11]:
#Create a new SMOTE dataset.
from imblearn.over_sampling import SMOTE

# Flatten each 48x48 image into a 2304-dimensional row vector for SMOTE
train_smote_X = np.load('np_data/train_X.npy')
train_smote_X = train_smote_X.reshape(train_smote_X.shape[0], -1)
sm = SMOTE(random_state=23)
X_smote, y_smote = sm.fit_resample(train_smote_X, train_y)
np.save('np_data/X_smote.npy', X_smote)
np.save('np_data/y_smote.npy', y_smote)
In [12]:
#load SMOTE dataset 
X_smote = np.load('np_data/X_smote.npy')
y_smote = np.load('np_data/y_smote.npy')
smote_X = X_smote.reshape(-1, 48,48)
smote_X = np.expand_dims(smote_X, -1)

smote_X = smote_X / 255.
smote_datagen = ImageDataGenerator(rotation_range=35,
                                   horizontal_flip=True,
                                   channel_shift_range=.4,
                                   vertical_flip=True
                                   )
smote_y = to_categorical(y_smote, num_classes)
train_smote_gen = smote_datagen.flow(smote_X, smote_y,batch_size=32)

#generate a version with 3 channels for transfer learning with VGG16
smote_Xr = np.repeat(smote_X,3,-1)
validation_Xr = np.repeat(validation_X,3,-1)

trainr_smote_gen = smote_datagen.flow(smote_Xr, smote_y,batch_size=32)

test_Xr = np.repeat(test_X,3,-1)

Model Training

I trained two separate models. The first uses transfer learning to fine-tune a CNN initialized with VGG16 weights, and the second trains a CNN from random initialization.

Transfer Learning with VGG

Create a base pre-trained model using VGG16 weights and add a global average pooling layer and a trainable fully-connected layer.

Transfer Learning is a method of constructing deep learning models by taking weights from a model trained for one task and using them as weight initialization for finetuning the model to perform a different task.

I took weights from a VGG16 model pretrained on the ImageNet dataset and excluded the model's top layers, which were used for the final ImageNet classification task. I added a global average pooling layer to reduce the dimensions and a softmax layer to perform the final classification for this task.

Finetune VGG16 transfer model
In [ ]:
# create the base pre-trained model
base_model = VGG16(weights='imagenet', include_top=False) 

# add a global spatial average pooling layer
x = base_model.output
x = GlobalAveragePooling2D()(x)

#add softmax layer
predictions = Dense(7, activation='softmax')(x)

transfer_model = Model(inputs=base_model.input, outputs=predictions)

# train only the top layers and freeze the rest
for layer in transfer_model.layers[:11]:
   layer.trainable = False
for layer in transfer_model.layers[11:]:
   layer.trainable = True

transfer_model.compile(loss='categorical_crossentropy',
              optimizer= Adam(learning_rate=0.0001),
              metrics=['accuracy'])

#reduce learning rate when loss on validation set has stopped improving
lr_reducer = ReduceLROnPlateau(monitor='val_loss', factor=0.3, patience=5, verbose=1)

#stop training when loss on validation set has stopped improving
early_stopper = EarlyStopping(monitor='val_loss', min_delta=0, patience=10, verbose=1, mode='auto')

checkpointer = ModelCheckpoint(filepath='model.vgg16transfer',
                               verbose=1,save_best_only=True, monitor='val_loss')

# transfer_model.load_weights('model.vgg16transfer')

# batch size is set on the generator, so it is not passed to fit()
history = transfer_model.fit(trainr_smote_gen,
          epochs=40,
          verbose=1,
          validation_data=(validation_Xr, y_validation),
          shuffle=True,
          callbacks=[lr_reducer, early_stopper, checkpointer])

import json
# Save the per-epoch metrics and losses as a JSON file
history_dict = history.history
json.dump({k: [float(v) for v in vals] for k, vals in history_dict.items()},
          open('model.vgg16transfer.history', 'w'))
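Since the history is stored as plain JSON, it can be reloaded later, for example to re-plot the curves without retraining:

In [ ]:
# Sketch: reload the saved training history from disk
with open('model.vgg16transfer.history') as f:
    restored = json.load(f)
print(list(restored.keys()))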
In [14]:
acc_train = history.history['accuracy']
acc_val = history.history['val_accuracy']
epochs = range(1,len(acc_train)+1)
plt.plot(epochs, acc_train, 'g', label='Training accuracy')
plt.plot(epochs, acc_val, 'b', label='Validation accuracy')
plt.title('Training and Validation accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend(loc='best')
plt.show()
In [21]:
preds = np.argmax(transfer_model.predict(test_Xr), axis=1)
print("\nAccuracy on Test Data: ", accuracy_score(test_y, preds))
return_confusion_matrix(test_y,preds)
Accuracy on Test Data:  0.6241727621037966

Train a CNN from Scratch

Define model architecture
In [23]:
num_of_filters = 32
model = Sequential()

model.add(Conv2D(filters=num_of_filters, kernel_size=(3, 3), activation='relu', input_shape=(48, 48, 1)))
model.add(BatchNormalization())
model.add(Conv2D(num_of_filters, kernel_size=(3, 3), activation='relu', padding='same'))
# model.add(Dropout(.4))
model.add(BatchNormalization())
model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
model.add(Dropout(.3))

model.add(Conv2D(2*num_of_filters, kernel_size=(3, 3), activation='relu', padding='same'))
model.add(BatchNormalization())
# model.add(Dropout(.4))
model.add(Conv2D(2*num_of_filters, kernel_size=(3, 3), activation='relu', padding='same'))
# model.add(Dropout(.4))
model.add(BatchNormalization())
model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
model.add(Dropout(.3))

model.add(Conv2D(2*2*num_of_filters, kernel_size=(3, 3), activation='relu', padding='same'))
model.add(BatchNormalization())
model.add(Conv2D(2*2*num_of_filters, kernel_size=(3, 3), activation='relu', padding='same'))
# model.add(Dropout(.4))
model.add(BatchNormalization())
model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
model.add(Dropout(.3))


model.add(Conv2D(2*2*2*num_of_filters, kernel_size=(3, 3), activation='relu', padding='same'))
model.add(BatchNormalization())
model.add(Conv2D(2*2*2*num_of_filters, kernel_size=(3, 3), activation='relu', padding='same'))
# model.add(Dropout(.4))
model.add(BatchNormalization())
model.add(GlobalAveragePooling2D())
# model.add(Dropout(.5))


# model.add(Flatten())

# model.add(Dense(2*2*2*num_of_filters, activation='relu'))
# model.add(Dense(2*2*num_of_filters, activation='relu'))
# model.add(Dense(2*num_of_filters, activation='relu'))

model.add(Dense(7, activation='softmax'))
model.summary()

initial_weights = model.get_weights()
# model.load_weights('model.lab4.hdf5')
Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 conv2d (Conv2D)             (None, 46, 46, 32)        320       
                                                                 
 batch_normalization (BatchN  (None, 46, 46, 32)       128       
 ormalization)                                                   
                                                                 
 conv2d_1 (Conv2D)           (None, 46, 46, 32)        9248      
                                                                 
 batch_normalization_1 (Batc  (None, 46, 46, 32)       128       
 hNormalization)                                                 
                                                                 
 max_pooling2d (MaxPooling2D  (None, 23, 23, 32)       0         
 )                                                               
                                                                 
 dropout (Dropout)           (None, 23, 23, 32)        0         
                                                                 
 conv2d_2 (Conv2D)           (None, 23, 23, 64)        18496     
                                                                 
 batch_normalization_2 (Batc  (None, 23, 23, 64)       256       
 hNormalization)                                                 
                                                                 
 conv2d_3 (Conv2D)           (None, 23, 23, 64)        36928     
                                                                 
 batch_normalization_3 (Batc  (None, 23, 23, 64)       256       
 hNormalization)                                                 
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 11, 11, 64)       0         
 2D)                                                             
                                                                 
 dropout_1 (Dropout)         (None, 11, 11, 64)        0         
                                                                 
 conv2d_4 (Conv2D)           (None, 11, 11, 128)       73856     
                                                                 
 batch_normalization_4 (Batc  (None, 11, 11, 128)      512       
 hNormalization)                                                 
                                                                 
 conv2d_5 (Conv2D)           (None, 11, 11, 128)       147584    
                                                                 
 batch_normalization_5 (Batc  (None, 11, 11, 128)      512       
 hNormalization)                                                 
                                                                 
 max_pooling2d_2 (MaxPooling  (None, 5, 5, 128)        0         
 2D)                                                             
                                                                 
 dropout_2 (Dropout)         (None, 5, 5, 128)         0         
                                                                 
 conv2d_6 (Conv2D)           (None, 5, 5, 256)         295168    
                                                                 
 batch_normalization_6 (Batc  (None, 5, 5, 256)        1024      
 hNormalization)                                                 
                                                                 
 conv2d_7 (Conv2D)           (None, 5, 5, 256)         590080    
                                                                 
 batch_normalization_7 (Batc  (None, 5, 5, 256)        1024      
 hNormalization)                                                 
                                                                 
 global_average_pooling2d_3   (None, 256)              0         
 (GlobalAveragePooling2D)                                        
                                                                 
 dense_3 (Dense)             (None, 7)                 1799      
                                                                 
=================================================================
Total params: 1,177,319
Trainable params: 1,175,399
Non-trainable params: 1,920
_________________________________________________________________
Train the model
In [ ]:
model.set_weights(initial_weights)

model.compile(loss='categorical_crossentropy',
              optimizer=Adam(learning_rate=0.001),
              metrics=['accuracy'])

# reduce learning rate when loss on validation set has stopped improving
lr_reducer = ReduceLROnPlateau(monitor='val_loss', factor=0.3, patience=5, verbose=1)

#stop training when loss on validation set has stopped improving
early_stopper = EarlyStopping(monitor='val_loss', min_delta=0, patience=15, verbose=1, mode='auto')

checkpointer = ModelCheckpoint(filepath='model.lab31.hdf5',
                               verbose=1,save_best_only=True, monitor='val_loss')

history = model.fit(train_smote_gen,
          epochs=40,
          verbose=1,
          validation_data=(validation_X, y_validation),
          shuffle=True,
          callbacks=[lr_reducer, early_stopper, checkpointer])

# Save the per-epoch metrics and losses as a JSON file
history_dict = history.history
json.dump({k: [float(v) for v in vals] for k, vals in history_dict.items()},
          open('model.lab31.hdf5.history', 'w'))
In [25]:
preds = np.argmax(model.predict(test_X), axis=1)
print("\nAccuracy on Test Data: ", accuracy_score(test_y, preds))
return_confusion_matrix(test_y,preds)
Accuracy on Test Data:  0.6287008011145943
In [26]:
acc_train = history.history['accuracy']
acc_val = history.history['val_accuracy']
epochs = range(1,len(acc_train)+1)
plt.plot(epochs, acc_train, 'g', label='Training accuracy')
plt.plot(epochs, acc_val, 'b', label='Validation accuracy')
plt.title('Training and Validation accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend(loc='best')
plt.show()

Tests on celebrity images

We can test the model on random images obtained from the web.

In [27]:
#test on image from folder
return_prediction('Test Images/neutral/kanye.jpg', model)
In [28]:
return_prediction('Test Images/surprise/katy.jpg', model)
In [29]:
#test on image from folder
return_prediction('Test Images/happy/gates.jpeg', model)
In [30]:
return_prediction('Test Images/surprise/kambucha.jpg', model)


