Sentiment RNN for Udacity course
#!/usr/bin/env python
import matplotlib.pyplot as plt
import numpy as np
import os
import torch
from collections import Counter
from os.path import exists as file_exists
from sklearn.model_selection import train_test_split
from string import punctuation
from torch import nn
from torch.utils.data import TensorDataset, DataLoader

def load_reviews_and_labels():
    print('\nload_reviews_and_labels()')
    with open('data/reviews.txt', 'r') as f:
        reviews = f.read()
    with open('data/labels.txt', 'r') as f:
        labels = f.read()
    print(f'reviews loaded: {len(reviews):8d} Bytes')
    print(f'labels loaded:  {len(labels):8d} Bytes')
    return reviews, labels

def data_preprocessing(reviews):
    print(f'\ndata_preprocessing({reviews[:20]})')
    reviews = reviews.lower()
    all_text = ''.join([c for c in reviews if c not in punctuation])
    print(f'all_text: {all_text[:200]}')
    reviews_split = all_text.split('\n')
    all_text = ' '.join(reviews_split)
    words = all_text.split()
    print(f'words:  {words[:20]}')
    print(f'#words: {len(words):7d}')
    print(f'#uniq:  {len(set(words)):7d}')
    return reviews_split, words

def encoding_words(reviews_split, words):
    print(f'\nencoding_words({reviews_split[:1]}, {words[:20]})')
    # Sort the vocabulary by frequency so the mapping is deterministic across
    # runs (enumerating a plain set() changes with Python's hash seed, which
    # would invalidate a previously cached encoding).
    word_counts = Counter(words)
    vocab = sorted(word_counts, key=word_counts.get, reverse=True)
    vocab_to_int = {word: ii for ii, word in enumerate(vocab, start=1)}
    reviews_ints = [[vocab_to_int[word]
                     for word in review.split()]
                    for review in reviews_split]
    print(f'Encoded dict size: {len(vocab_to_int)}')
    print(f'Tokenized review: {reviews_ints[:1]}')
    return reviews_ints, vocab_to_int

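# Illustrative only (exact integers depend on the corpus): with the
# frequency-sorted vocabulary above, the most common word maps to 1, the next
# to 2, and so on, so a review like "the movie" might encode to [1, 24].
# Index 0 is deliberately left free; it is used later as the padding value.
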
def encoding_labels(labels):
    print(f'\nencoding_labels({labels[:20]})')
    # 1=positive, 0=negative label conversion
    labels = labels.split('\n')
    encoded_labels = [1 if label == 'positive' else 0
                      for label in labels]
    print(f'labels[:10]: {labels[:10]}')
    print(f'encoded_labels[:10]: {encoded_labels[:10]}')
    return encoded_labels

def visualize_data(reviews_ints):
    print(f'\nvisualize_data({reviews_ints[:1]})')
    review_lens = [len(x) for x in reviews_ints]
    length_counts = Counter(review_lens)
    print("Zero-length reviews: {}".format(length_counts[0]))
    print("Maximum review length: {}".format(max(review_lens)))
    fig, ax = plt.subplots()
    # histogram over the raw lengths (passing the Counter would only expose its keys)
    ax.hist(review_lens, bins=1000, linewidth=0.5, edgecolor="white")
    plt.show()

def remove_outliers(reviews_ints, encoded_labels):
    print(f'\nremove_outliers({reviews_ints[:1]}, {encoded_labels[:10]})')
    print('Number of reviews before removing outliers: ', len(reviews_ints))
    # iterate in reverse so pop() does not shift the indices still to be visited
    for idx in reversed(range(len(reviews_ints))):
        if len(reviews_ints[idx]) == 0:
            reviews_ints.pop(idx)
            encoded_labels.pop(idx)
    print('Number of reviews after removing outliers: ', len(reviews_ints))
    return reviews_ints, encoded_labels

def pad_features(reviews_ints, seq_length):
    ''' Return features of reviews_ints, where each review is left-padded
        with 0's or truncated to the input seq_length.
    '''
    print(f'\npad_features({reviews_ints[:1]}, {seq_length})')
    # int64 so the tensor can index nn.Embedding (int8 would overflow, since
    # vocabulary indices go far beyond 127); preallocating also avoids the
    # quadratic cost of np.append in a loop
    features = np.zeros((len(reviews_ints), seq_length), dtype=np.int64)
    for i, review_int in enumerate(reviews_ints):
        truncated = review_int[:seq_length]
        features[i, seq_length - len(truncated):] = truncated
    print(f'first 10 values of the first 5 reviews\n'
          f'{features[:5, :10]}')
    return features

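# Illustrative only: pad_features([[11, 22, 33]], seq_length=5) yields
# [[ 0  0 11 22 33]] -- zeros are prepended, and a review longer than
# seq_length keeps only its first seq_length tokens.
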
def train_valid_test_dataset(features, encoded_labels, train_data_frac=0.8):
    print(f'\ntrain_valid_test_dataset({features[:5, :10]}, {encoded_labels[:5]}, {train_data_frac})')
    X = features
    y = np.array(encoded_labels)
    ## split data into training, validation, and test data (features and labels, x and y)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=1 - train_data_frac)
    X_test, X_valid, y_test, y_valid = train_test_split(X_test, y_test, test_size=0.5)
    ## print out the shapes of the resulting feature data
    print(f'                Feature Shapes:')
    print(f'Train set:      {X_train.shape} {len(y_train)}')
    print(f'Validation set: {X_valid.shape} {len(y_valid)}')
    print(f'Test set:       {X_test.shape} {len(y_test)}')
    # create Tensor datasets
    train_data = TensorDataset(torch.from_numpy(X_train), torch.from_numpy(y_train))
    valid_data = TensorDataset(torch.from_numpy(X_valid), torch.from_numpy(y_valid))
    test_data = TensorDataset(torch.from_numpy(X_test), torch.from_numpy(y_test))
    return train_data, valid_data, test_data

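# With train_data_frac=0.8 the held-out 20% is split in half by the second
# train_test_split call, giving an 80% / 10% / 10% train / validation / test split.
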
def load_preprocess_data(data_directory, dataset_filename, show_plots):
    print(f'\nload_preprocess_data({data_directory}, {dataset_filename}, {show_plots})')
    if file_exists(f'{data_directory}/{dataset_filename}'):
        reviews_data = torch.load(f'{data_directory}/{dataset_filename}')
        features_tensor, labels_tensor = reviews_data.tensors
        features = features_tensor.numpy()
        encoded_labels = labels_tensor.numpy()
    else:
        reviews, labels = load_reviews_and_labels()
        reviews_split, words = data_preprocessing(reviews)
        reviews_ints, vocab_to_int = encoding_words(reviews_split, words)
        encoded_labels = encoding_labels(labels)
        if show_plots:
            visualize_data(reviews_ints)
        reviews_ints, encoded_labels = remove_outliers(reviews_ints, encoded_labels)
        seq_length = 200
        features = pad_features(reviews_ints, seq_length=seq_length)
        assert len(features) == len(reviews_ints), "Your features should have as many rows as reviews."
        assert len(features[0]) == seq_length, "Each feature row should contain seq_length values."
        encoded_labels = np.array(encoded_labels)
        reviews_data = TensorDataset(torch.from_numpy(features), torch.from_numpy(encoded_labels))
        torch.save(reviews_data, f'{data_directory}/{dataset_filename}')
    return features, encoded_labels

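# Note: once the cached TensorDataset file exists, the raw text is never
# reprocessed; delete the .pt file to rebuild the features from scratch.
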
def get_data_loaders(features, encoded_labels, train_data_frac, batch_size):
    print(f'\nget_data_loaders({features[:2, :10]}, {encoded_labels[:10]}, {train_data_frac}, {batch_size})')
    train_data, valid_data, test_data = \
        train_valid_test_dataset(features, encoded_labels, train_data_frac=train_data_frac)
    # drop_last so every batch matches the fixed batch size used by init_hidden
    train_loader = DataLoader(train_data, shuffle=True, batch_size=batch_size, drop_last=True)
    valid_loader = DataLoader(valid_data, shuffle=True, batch_size=batch_size, drop_last=True)
    test_loader = DataLoader(test_data, shuffle=True, batch_size=batch_size, drop_last=True)
    # obtain one batch of training data
    dataiter = iter(train_loader)
    sample_x, sample_y = next(dataiter)  # dataiter.next() was removed in newer PyTorch
    print('\nSample input size: ', sample_x.size())  # batch_size, seq_length
    print('Sample input: \n', sample_x)
    print()
    print('Sample label size: ', sample_y.size())  # batch_size
    print('Sample label: \n', sample_y)
    return train_loader, valid_loader, test_loader

class SentimentRNN(nn.Module):
    """
    The RNN model that will be used to perform sentiment analysis.
    """

    def __init__(self, vocab_size, output_size, embedding_dim, hidden_dim, n_layers, drop_prob=0.5):
        """
        Initialize the model by setting up the layers.
        """
        print(f'\nSentimentRNN.__init__('
              f'{vocab_size}, {output_size}, {embedding_dim}, {hidden_dim}, {n_layers}, {drop_prob})')
        super().__init__()
        self.output_size = output_size
        self.n_layers = n_layers
        self.hidden_dim = hidden_dim
        # embedding lookup: word index -> dense vector
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # LSTM over the embedded sequence
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, n_layers,
                            dropout=drop_prob, batch_first=True)
        # dropout layer
        self.dropout = nn.Dropout(drop_prob)
        # final, fully-connected output layer with sigmoid activation
        self.fc = nn.Linear(hidden_dim, output_size)
        self.sig = nn.Sigmoid()
        print(self)

    def forward(self, x, hidden):
        """
        Perform a forward pass of our model on some input and hidden state.
        """
        batch_size = x.size(0)
        emb_x = self.embedding(x)
        lstm_out, hidden = self.lstm(emb_x, hidden)
        # flatten so every time step goes through the same fully-connected layer
        lstm_out = lstm_out.contiguous().view(-1, self.hidden_dim)
        out = self.dropout(lstm_out)
        out = self.fc(out)
        sig_out = self.sig(out)
        # reshape to (batch_size, seq_length) and keep only the last time step
        sig_out = sig_out.view(batch_size, -1)
        sig_out = sig_out[:, -1]
        return sig_out, hidden

    def init_hidden(self, batch_size):
        ''' Initializes hidden state '''
        # Create two new tensors with sizes n_layers x batch_size x hidden_dim,
        # initialized to zero, for the hidden state and cell state of the LSTM
        weight = next(self.parameters()).data
        train_on_gpu = torch.cuda.is_available()
        if train_on_gpu:
            hidden = (weight.new(self.n_layers, batch_size, self.hidden_dim).zero_().cuda(),
                      weight.new(self.n_layers, batch_size, self.hidden_dim).zero_().cuda())
        else:
            hidden = (weight.new(self.n_layers, batch_size, self.hidden_dim).zero_(),
                      weight.new(self.n_layers, batch_size, self.hidden_dim).zero_())
        return hidden

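# The script ends by printing a prediction for a raw review string, but an
# nn.Module cannot consume text directly. This helper is a minimal inference
# sketch, not part of the original gist: it repeats the preprocessing used
# above (lowercase, strip punctuation, encode via vocab_to_int, left-pad to
# seq_length) and returns the sigmoid probability that the review is positive.
# Dropping words missing from vocab_to_int is an assumption made here for
# simplicity.
def predict(net, review, vocab_to_int, seq_length=200, train_on_gpu=False):
    net.eval()
    # same cleaning as data_preprocessing()
    text = ''.join([c for c in review.lower() if c not in punctuation])
    # same encoding as encoding_words(), skipping out-of-vocabulary words
    review_ints = [vocab_to_int[word] for word in text.split() if word in vocab_to_int]
    features = pad_features([review_ints], seq_length=seq_length)
    inputs = torch.from_numpy(features)
    if train_on_gpu:
        inputs = inputs.cuda()
    h = net.init_hidden(1)  # a single review, so batch size 1
    output, h = net(inputs, h)
    return output.item()
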
def main():
    data_directory = os.getenv('DATA_DIRECTORY', 'data')
    dataset_filename = os.getenv('DATASET_FILENAME', 'reviews.pt')
    show_plots = os.getenv('VISUALIZE_DATA', 'FALSE').upper() == 'TRUE'
    features, encoded_labels = load_preprocess_data(data_directory, dataset_filename, show_plots)
    train_data_frac = 0.8
    batch_size = 50
    train_loader, valid_loader, test_loader = get_data_loaders(features, encoded_labels, train_data_frac, batch_size)
    # First check if a GPU is available
    train_on_gpu = torch.cuda.is_available()
    print()
    if train_on_gpu:
        print('Training on GPU.')
    else:
        print('No GPU available, training on CPU.')
    # Rebuild the vocabulary to size the embedding layer; encoding_words is
    # deterministic, so this matches the encoding used for any cached features.
    reviews, _ = load_reviews_and_labels()
    reviews_split, words = data_preprocessing(reviews)
    _, vocab_to_int = encoding_words(reviews_split, words)
    vocab_size = len(vocab_to_int) + 1  # +1 for the 0 padding index
    output_size = 1
    embedding_dim = 400
    hidden_dim = 256
    n_layers = 2
    net = SentimentRNN(vocab_size, output_size, embedding_dim, hidden_dim, n_layers, drop_prob=0.5)
    # Training
    lr = 0.001
    criterion = nn.BCELoss()
    optimizer = torch.optim.Adam(net.parameters(), lr=lr)
    epochs = 4  # 3-4 is approximately where the validation loss stops decreasing
    counter = 0
    print_every = 100
    clip = 5  # gradient clipping
    # move model to GPU, if available
    if train_on_gpu:
        net.cuda()
    net.train()
    # train for some number of epochs
    for e in range(epochs):
        # initialize hidden state
        h = net.init_hidden(batch_size)
        # batch loop
        for inputs, labels in train_loader:
            counter += 1
            if train_on_gpu:
                inputs, labels = inputs.cuda(), labels.cuda()
            # Create new variables for the hidden state, otherwise
            # we'd backprop through the entire training history
            h = tuple([each.data for each in h])
            # zero accumulated gradients
            net.zero_grad()
            # get the output from the model
            output, h = net(inputs, h)
            # calculate the loss and perform backprop
            loss = criterion(output.squeeze(), labels.float())
            loss.backward()
            # `clip_grad_norm_` helps prevent the exploding gradient problem in RNNs / LSTMs
            nn.utils.clip_grad_norm_(net.parameters(), clip)
            optimizer.step()
            # loss stats
            if counter % print_every == 0:
                # Get validation loss
                val_h = net.init_hidden(batch_size)
                val_losses = []
                net.eval()
                for inputs, labels in valid_loader:
                    # Detach the hidden state from its history
                    val_h = tuple([each.data for each in val_h])
                    if train_on_gpu:
                        inputs, labels = inputs.cuda(), labels.cuda()
                    output, val_h = net(inputs, val_h)
                    val_loss = criterion(output.squeeze(), labels.float())
                    val_losses.append(val_loss.item())
                net.train()
                print("Epoch: {}/{}...".format(e + 1, epochs),
                      "Step: {}...".format(counter),
                      "Loss: {:.6f}...".format(loss.item()),
                      "Val Loss: {:.6f}".format(np.mean(val_losses)))
    # Get test data loss and accuracy
    test_losses = []  # track loss
    num_correct = 0
    num_seen = 0
    # init hidden state
    h = net.init_hidden(batch_size)
    net.eval()
    # iterate over test data
    for inputs, labels in test_loader:
        # Detach the hidden state from its history
        h = tuple([each.data for each in h])
        if train_on_gpu:
            inputs, labels = inputs.cuda(), labels.cuda()
        # get predicted outputs
        output, h = net(inputs, h)
        # calculate loss
        test_loss = criterion(output.squeeze(), labels.float())
        test_losses.append(test_loss.item())
        # convert output probabilities to predicted class (0 or 1)
        pred = torch.round(output.squeeze())  # rounds to the nearest integer
        # compare predictions to true labels (cpu() is a no-op on CPU tensors)
        correct_tensor = pred.eq(labels.float().view_as(pred))
        correct = np.squeeze(correct_tensor.cpu().numpy())
        num_correct += np.sum(correct)
        num_seen += labels.size(0)
    # -- stats! --
    # avg test loss
    print("Test loss: {:.3f}".format(np.mean(test_losses)))
    # accuracy over the samples actually evaluated (drop_last may skip a
    # trailing partial batch)
    test_acc = num_correct / num_seen
    print("Test accuracy: {:.3f}".format(test_acc))
    # negative test review
    test_review_neg = 'The worst movie I have seen; acting was terrible and I want my money back. This movie had bad acting and the dialogue was slow.'
    # run inference through the helper above (calling net() directly on a raw
    # string would fail); values near 0 mean negative, near 1 mean positive
    print(predict(net, test_review_neg, vocab_to_int, seq_length=200, train_on_gpu=train_on_gpu))


if __name__ == "__main__":
    main()
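
# Usage sketch: with data/reviews.txt and data/labels.txt from the Udacity
# course in place, run e.g.
#   DATA_DIRECTORY=data DATASET_FILENAME=reviews.pt VISUALIZE_DATA=FALSE python sentiment_rnn.py
# (the script name sentiment_rnn.py is illustrative; use whatever this file is saved as)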