# title: ENA model runner
# author: Taewook Kang, Kyubyung Kang
# date: 2024.3.27
# description: ENA model test and evaluation
# license: MIT
# version
#   0.1. 2024.3.27. create file
#
import json, os, re, logging
import math, random, string  # used by PositionalEncoding and generate_random_text below
import torch, torch.nn as nn, torch.optim as optim, numpy as np, matplotlib.pyplot as plt, seaborn as sns
import torch.nn.functional as F
from torch.optim.lr_scheduler import StepLR, ReduceLROnPlateau
from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertForSequenceClassification, BertConfig, BertModel
from sklearn.metrics import confusion_matrix
from collections import defaultdict
from datetime import datetime
from tqdm import tqdm
from ena_dataset import load_train_chunk_data, update_feature_dims_freq, update_onehot_encoding

# write log file using logger
logging.basicConfig(filename='./ewnet_logs.txt', level=logging.DEBUG,
                    format='%(asctime)s %(levelname)s %(message)s', datefmt='%Y%m%d %H:%M')
logger = logging.getLogger('ewnet')

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'device: {device}')

# hyperparameters of the model currently under test (set in run_MLP_LSTM)
hyperparam = None

# MLP model
class EarthworkNetMLP(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, dropout_ratio=0.2):
        super(EarthworkNetMLP, self).__init__()

        models = []
        models.append(nn.Linear(input_dim, hidden_dim[0]))
        models.append(nn.ReLU())
        models.append(nn.BatchNorm1d(hidden_dim[0]))  # batch normalization after activation
        models.append(nn.Dropout(dropout_ratio))
        for i in range(1, len(hidden_dim)):
            models.append(nn.Linear(hidden_dim[i - 1], hidden_dim[i]))
            models.append(nn.ReLU())
            models.append(nn.BatchNorm1d(hidden_dim[i]))
            models.append(nn.Dropout(dropout_ratio))
        models.append(nn.Linear(hidden_dim[-1], output_dim))

        self.layers = nn.Sequential(*models)

    def forward(self, x):
        # print("Shape of x:", x.shape)
        x = self.layers(x)
        return x

# LSTM model
class EarthworkNetLSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers=2, dropout_ratio=0.2):
        super(EarthworkNetLSTM, self).__init__()
        # sequence series data. ex) token pattern(slope angle). top(0.5), bottom(0.5), top(0.6), bottom(0.6)...
        # time series features = (token_type, curve_angle)
        # label = (label_onehot)
        models = []
        models.append(nn.LSTM(input_dim, hidden_dim[0], num_layers, batch_first=True, dropout=dropout_ratio))
        for i in range(1, len(hidden_dim)):
            models.append(nn.Linear(hidden_dim[i - 1], hidden_dim[i]))
        models.append(nn.Linear(hidden_dim[-1], output_dim))
        self.layers = nn.Sequential(*models)

    def forward(self, x):
        # print("Shape of x:", x.shape)
        for layer in self.layers:
            if isinstance(layer, nn.LSTM):
                x, _ = layer(x)  # LSTM returns (output, (h_n, c_n)); keep the output sequence
            else:
                x = layer(x)
        return x
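# Minimal, hedged sanity-check sketch (not called anywhere in this runner): it shows
# the tensor shapes the two models above expect and produce. input_dim=16,
# hidden_dim=[64, 32], output_dim=4 and the batch/sequence sizes are placeholder
# values, not values taken from the real dataset.
def _sanity_check_mlp_lstm():
    mlp = EarthworkNetMLP(input_dim=16, hidden_dim=[64, 32], output_dim=4).to(device)
    lstm = EarthworkNetLSTM(input_dim=16, hidden_dim=[64, 32], output_dim=4).to(device)

    x = torch.randn(8, 16).to(device)         # (batch, feature) for the MLP
    print(mlp(x).shape)                       # torch.Size([8, 4])

    x_seq = torch.randn(8, 5, 16).to(device)  # (batch, seq_len, feature) for the LSTM
    print(lstm(x_seq).shape)                  # torch.Size([8, 5, 4]) - one logit vector per step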
# create dataset: earthwork_feature -> label
class EarthworkDataset(Dataset):
    def __init__(self, raw_data):
        self.raw_dataset = raw_data

    def __len__(self):
        return len(self.raw_dataset)

    def __getitem__(self, idx):
        # origin_data = self.raw_dataset[idx]
        features = self.raw_dataset[idx]['feature_dims']  # already tokenized from 'feature_text'
        label = self.raw_dataset[idx]['label_onehot']

        features = torch.tensor(features, dtype=torch.float32).to(device)
        label = torch.tensor(label, dtype=torch.float32).to(device)
        return features, label

def decode_data_to_geom(input_dataset, predictions, labels, input_feature_dims, label_kinds):
    global hyperparam

    match_count = 0
    for i in range(len(input_dataset)):  # batch size
        input_geom_features = input_dataset[i].cpu().numpy()
        prediction_index = predictions[i].item()
        label_index = labels[i].item()

        geom_features = []
        for j in range(len(input_feature_dims)):
            if input_geom_features[j] == 0.0:
                continue
            geom_features.append(f'{input_feature_dims[j]}({input_geom_features[j]:.2f})')

        prediction_label = label_kinds[prediction_index]
        label = label_kinds[label_index]
        if prediction_label == label:
            match_count += 1
        logger.debug(f'{hyperparam["model"]} {hyperparam["hidden_dim"]} Equal : {prediction_label == label}, Label: {label}, Predicted: {prediction_label}, Geom: {geom_features}')

    return match_count

def test_mlp_model(model, batch_size, test_raw_dataset, input_feature_dims, label_kinds):
    print(f'test data count: {len(test_raw_dataset)}')

    test_dataset = EarthworkDataset(test_raw_dataset)
    test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

    # test model
    accuracies = []
    correct = 0
    total = 0
    total_match = 0
    with torch.no_grad():
        for i, (data, labels) in enumerate(test_dataloader):
            outputs = model(data)
            _, predicted = torch.max(outputs.data, 1)  # predicted class index
            _, labels = torch.max(labels.data, 1)      # one-hot label -> class index
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            accuracies.append(correct / total)

            match_count = decode_data_to_geom(data, predicted, labels, input_feature_dims, label_kinds)
            total_match += match_count

    average_accuracy = correct / total
    print(f'Match count: {total_match}, Total count: {total}')
    print(f'Accuracy of the network on the test data: {average_accuracy:.4f}')

    return accuracies, average_accuracy
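# Hedged illustration (standalone helper, not called by the runner): test_mlp_model
# above scores a batch by taking the argmax of the logits and the argmax of the
# one-hot labels; this shows that conversion on two toy samples.
def _argmax_accuracy_example():
    outputs = torch.tensor([[0.1, 2.0, -1.0],
                            [1.5, 0.2, 0.3]])       # model logits for a batch of 2
    labels_onehot = torch.tensor([[0., 1., 0.],
                                  [1., 0., 0.]])    # one-hot encoded labels
    _, predicted = torch.max(outputs, 1)            # tensor([1, 0])
    _, labels = torch.max(labels_onehot, 1)         # tensor([1, 0])
    correct = (predicted == labels).sum().item()    # 2
    print(predicted, labels, correct / len(labels)) # accuracy 1.0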
def run_MLP_LSTM(model_file_list, base_model):
    global hyperparam

    # prepare dataset
    data_dir = './dataset'
    geom_list = load_train_chunk_data(data_dir)
    input_feature_dims = update_feature_dims_freq(geom_list)
    # input_feature_dims = update_feature_dims_token(geom_list)
    label_kinds = update_onehot_encoding(geom_list)

    train_raw_dataset = geom_list[:int(len(geom_list) * 0.8)]
    test_raw_dataset = geom_list[int(len(geom_list) * 0.8):]
    print(f'total data count: {len(geom_list)}')
    print(f'train data count: {len(train_raw_dataset)}, test data count: {len(test_raw_dataset)}')

    # evaluate one trained model file per hidden-layer configuration
    param_layers = [[128], [128, 64, 32], [256, 128, 64]]
    if base_model == 'MLP':
        param_layers = [[128, 64, 32], [64, 128, 64], [64, 128, 64, 32], [32, 64, 32]]

    for index, param_layer in enumerate(param_layers):
        logger.debug(f'model : {base_model}')

        params = {
            'model': base_model,
            'input_dim': len(input_feature_dims),
            'hidden_dim': param_layer,  # e.g. [128, 64, 32]
            'output_dim': len(label_kinds),
            'batch_size': 32,
            'epochs': 150,  # 150, # 5000
            'lr': 0.001
        }
        hyperparam = params

        # create model
        model = EarthworkNetMLP(params['input_dim'], params['hidden_dim'], params['output_dim']).to(device)
        if base_model == 'LSTM':
            model = EarthworkNetLSTM(params['input_dim'], params['hidden_dim'], params['output_dim']).to(device)

        # load the trained weights and evaluate
        model_file = './' + model_file_list[index]
        model.load_state_dict(torch.load(model_file))
        model.eval()

        accuracies, acc = test_mlp_model(model, params['batch_size'], test_raw_dataset, input_feature_dims, label_kinds)

# Generate random training data
def generate_random_text(label_index, length=100):
    base_text = f'This is text for label R{label_index + 1}. '
    random_text_length = max(0, length - len(base_text))  # length of the random tail to generate
    random_text = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(random_text_length))
    return base_text + random_text

# dataset for the custom transformer model
class EarthworkTransformDataset(Dataset):
    def __init__(self, input_ids, attention_mask, labels):
        self.input_ids = input_ids
        self.attention_mask = attention_mask
        self.labels = labels

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        input_ids_tensor = torch.tensor(self.input_ids[idx]).to(device)
        attention_mask_tensor = torch.tensor(self.attention_mask[idx]).to(device)
        label_tensor = torch.tensor(self.labels[idx]).to(device)
        return input_ids_tensor, attention_mask_tensor, label_tensor

# custom transformer
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, vocab_size=5000, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(vocab_size, d_model)
        position = torch.arange(0, vocab_size, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
        )
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer("pe", pe)

    def forward(self, x):
        x = x + self.pe[:, : x.size(1), :]
        return self.dropout(x)
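# Hedged usage sketch (not called anywhere): PositionalEncoding above is currently
# unused because its call in EarthworkNetTransformer.forward is commented out.
# This shows how it would apply to a (batch, seq_len, d_model) tensor; d_model=512
# matches the value passed to EarthworkNetTransformer in run_transform, the other
# sizes are placeholders.
def _positional_encoding_example():
    pos_encoder = PositionalEncoding(d_model=512, dropout=0.1)
    x = torch.zeros(2, 10, 512)  # (batch, seq_len, d_model)
    out = pos_encoder(x)         # adds the sin/cos terms for positions 0..9, then dropout
    print(out.shape)             # torch.Size([2, 10, 512])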
class EarthworkNetTransformer(nn.Module):
    def __init__(
        self,
        input_feature_size,
        d_model,
        num_labels,
        nhead=8,
        dim_feedforward=2048,
        dim_fc=[64, 32],
        num_layers=6,
        dropout=0.1,
        activation="relu",
        classifier_dropout=0.1,
    ):
        super().__init__()

        self.d_model = d_model
        # self.pos_encoder = PositionalEncoding(d_model=d_model, dropout=dropout, vocab_size=vocab_size)
        self.input_fc = nn.Linear(input_feature_size, d_model)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
        )
        self.src_mask = None
        self.nhead = nhead
        self.transformer_encoder = nn.TransformerEncoder(
            encoder_layer,
            num_layers=num_layers,
            # TBD. output_attentions=True is not supported by nn.TransformerEncoder
        )

        # classifier head; ModuleList registers the layers so they are saved/loaded
        # with the model state_dict and moved by .to(device)
        self.fc_layers = nn.ModuleList()
        fc_layers_dims = [d_model] + dim_fc + [num_labels]
        for i in range(1, len(fc_layers_dims)):
            fc = nn.Linear(fc_layers_dims[i - 1], fc_layers_dims[i]).to(device)
            self.fc_layers.append(fc)

        self.init_weights()

    def generate_square_subsequent_mask(self, sz):
        mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
        mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
        return mask

    def init_weights(self):
        initrange = 0.1
        for fc in self.fc_layers:
            fc.bias.data.zero_()
            fc.weight.data.uniform_(-initrange, initrange)

    def forward(self, x, attention_mask):
        # x = self.pos_encoder(x)
        if self.src_mask is None or self.src_mask.size(0) != len(x):
            mask = self.generate_square_subsequent_mask(len(x)).to(x.device)
            self.src_mask = mask
        # batch_size = x.shape[0]
        # mask = torch.tril(torch.ones(self.nhead, batch_size, batch_size)).to(x.device)

        x = x.float()
        x = self.input_fc(x)
        x = self.transformer_encoder(x, mask=self.src_mask)  # , src_key_padding_mask=attention_mask1) # , mask=attention_mask)
        # x = x.mean(dim=1)
        for fc in self.fc_layers:
            x = fc(x)
        return x
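# Hedged illustration (not called anywhere): the causal mask that
# EarthworkNetTransformer.generate_square_subsequent_mask builds, shown for a
# length-4 input. 0.0 means "may attend", -inf means "masked out". The small
# constructor arguments here are placeholders chosen only so the model builds.
def _square_mask_example():
    model = EarthworkNetTransformer(input_feature_size=8, d_model=16, num_labels=3,
                                    nhead=2, dim_fc=[8], num_layers=1)
    print(model.generate_square_subsequent_mask(4))
    # tensor([[0., -inf, -inf, -inf],
    #         [0.,   0., -inf, -inf],
    #         [0.,   0.,   0., -inf],
    #         [0.,   0.,   0.,   0.]])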
def run_transform(model_file_list):
    # prepare dataset
    data_dir = './dataset'
    geom_list = load_train_chunk_data(data_dir)
    input_feature_dims = update_feature_dims_freq(geom_list)
    # input_feature_dims = update_feature_dims_token(geom_list)
    label_kinds = update_onehot_encoding(geom_list)
    num_labels = len(label_kinds)

    max_input_string = max(len(d['feature_text']) for d in geom_list)
    max_input_string = 320  # nhead=8. 320=8*40

    train_raw_dataset = geom_list[:int(len(geom_list) * 0.8)]
    test_raw_dataset = geom_list[int(len(geom_list) * 0.8):]
    print(f'total data count: {len(geom_list)}')
    print(f'train data count: {len(train_raw_dataset)}, test data count: {len(test_raw_dataset)}')

    # tokenize and pad sequences
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    max_length = max_input_string

    batch_sizes = [32, 64, 128]
    for index, batch_size in enumerate(batch_sizes):
        encoding = {'input_ids': [], 'attention_mask': []}
        for d in train_raw_dataset:
            token_text = tokenizer(d['feature_text'], padding='max_length', truncation=True, max_length=max_length)
            if len(token_text['input_ids']) < max_length:
                # fill the rest with the padding token
                token_text['input_ids'] += [tokenizer.pad_token_id] * (max_length - len(token_text['input_ids']))
                token_text['attention_mask'] += [0] * (max_length - len(token_text['attention_mask']))
            encoding['input_ids'].append(token_text['input_ids'])
            encoding['attention_mask'].append(token_text['attention_mask'])

        input_ids = encoding['input_ids']
        attention_mask = encoding['attention_mask']

        label2id = {label: i for i, label in enumerate(sorted(set(d['label'] for d in train_raw_dataset)))}
        id2label = {v: k for k, v in label2id.items()}
        labels = [label2id[d['label']] for d in train_raw_dataset]  # convert labels to numerical format

        # hyperparameters
        logger.debug('model : transformer')
        params = {
            'model': 'transformer',
            'input_dim': len(input_feature_dims),
            'hidden_dim': [64],
            'output_dim': len(label2id),
            'batch_size': batch_size,
            'epochs': 300,
            'lr': 1e-5
        }
        # batch_size = params['batch_size']  # 32, 64, 128
        dim_fc = params['hidden_dim']
        epochs = params['epochs']  # 5000 # 500 # 150

        # model
        model = EarthworkNetTransformer(input_feature_size=max_length, d_model=512, num_labels=len(label2id), dim_fc=dim_fc).to(device)

        dataset = EarthworkTransformDataset(input_ids, attention_mask, labels)
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

        # test the model
        model_file = './' + model_file_list[index]
        model.load_state_dict(torch.load(model_file))
        model.eval()

        for i, test_raw in enumerate(test_raw_dataset):
            label = test_raw['label']
            input_text = test_raw['feature_text']
            encoding = tokenizer(input_text, return_tensors='pt', padding='max_length', truncation=True, max_length=max_length)
            input_ids = encoding['input_ids'].to(device)
            attention_mask = encoding['attention_mask'].to(device)

            output = model(input_ids, attention_mask)
            predicted_label = id2label[output.argmax().item()]
            feature_dims = input_text.split(' ')
            logger.debug(f'{params["model"]} {params["batch_size"]} Equal : {predicted_label == label}, Label: {label}, Predicted: {predicted_label}, Geom: {feature_dims}')

        print(f'test data count: {len(test_raw_dataset)}')
        encoding = tokenizer([d['feature_text'] for d in test_raw_dataset], padding='max_length', truncation=True, max_length=max_length)
        input_ids = encoding['input_ids']
        attention_mask = encoding['attention_mask']

        label2id = {label: i for i, label in enumerate(sorted(set(d['label'] for d in test_raw_dataset)))}
        id2label = {v: k for k, v in label2id.items()}
        labels = [label2id[d['label']] for d in test_raw_dataset]  # convert labels to numerical format

        test_dataset = EarthworkTransformDataset(input_ids, attention_mask, labels)
        test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=True)

        correct = 0
        total = 0
        accuracies = []
        with torch.no_grad():
            for i, (input_ids, attention_mask, labels) in enumerate(tqdm(test_dataloader, desc="test")):
                outputs = model(input_ids, attention_mask)
                _, predicted = torch.max(outputs, 1)
                total += len(labels)
                correct += (predicted == labels).sum().item()
                accuracies.append(correct / total)

        average_accuracy = correct / total
        print(f'Accuracy of the network on the test data: {average_accuracy:.4f}')

# BERT dataset
class EarthworkBertDataset(Dataset):
    def __init__(self, input_ids, attention_mask, labels):
        self.input_ids = input_ids
        self.attention_mask = attention_mask
        self.labels = labels

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        input_ids_tensor = torch.tensor(self.input_ids[idx]).to(device)
        attention_mask_tensor = torch.tensor(self.attention_mask[idx]).to(device)
        label_tensor = torch.tensor(self.labels[idx]).to(device)
        return input_ids_tensor, attention_mask_tensor, label_tensor

# BERT classification model
class EarthworkNetTransformerBert(torch.nn.Module):
    def __init__(self, num_labels):
        super(EarthworkNetTransformerBert, self).__init__()
        self.bert = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=num_labels, output_attentions=True)

    def forward(self, input_ids, attention_mask):
        outputs = self.bert(input_ids, attention_mask=attention_mask)
        return outputs['logits'], outputs['attentions']
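# Hedged sketch (not called anywhere): what the wrapper above returns, assuming the
# 'bert-base-uncased' weights can be loaded. The sample sentence is a made-up
# placeholder, and the shape comments assume bert-base (12 layers, 12 heads).
def _bert_wrapper_example():
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    model = EarthworkNetTransformerBert(num_labels=4).to(device)
    enc = tokenizer('cut slope 0.5 fill slope 1.2', return_tensors='pt')
    logits, attentions = model(enc['input_ids'].to(device), enc['attention_mask'].to(device))
    print(logits.shape)          # torch.Size([1, 4])
    print(len(attentions))       # 12, one attention tensor per encoder layer
    print(attentions[-1].shape)  # torch.Size([1, 12, seq_len, seq_len])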
def run_bert(model_file):
    # prepare dataset
    data_dir = './dataset'
    geom_list = load_train_chunk_data(data_dir)
    input_feature_dims = update_feature_dims_freq(geom_list)
    # input_feature_dims = update_feature_dims_token(geom_list)
    label_kinds = update_onehot_encoding(geom_list)
    num_labels = len(label_kinds)
    max_input_string = max(len(d['feature_text']) for d in geom_list)

    train_raw_dataset = geom_list[:int(len(geom_list) * 0.8)]
    test_raw_dataset = geom_list[int(len(geom_list) * 0.8):]
    print(f'total data count: {len(geom_list)}')
    print(f'train data count: {len(train_raw_dataset)}, test data count: {len(test_raw_dataset)}')

    # tokenize and pad sequences
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    max_length = max_input_string
    encoding = tokenizer([d['feature_text'] for d in train_raw_dataset], padding=True, truncation=True, max_length=max_length)
    input_ids = encoding['input_ids']  # TBD. shape is 50?
    attention_mask = encoding['attention_mask']

    label2id = {label: i for i, label in enumerate(sorted(set(d['label'] for d in train_raw_dataset)))}
    id2label = {v: k for k, v in label2id.items()}
    labels = [label2id[d['label']] for d in train_raw_dataset]  # convert labels to numerical format

    # initialize model
    model = EarthworkNetTransformerBert(num_labels=len(label2id)).to(device)

    epochs = 150  # 50
    batch_size = 32
    params = {
        'model': 'BERT',
        'input_dim': len(input_feature_dims),
        'hidden_dim': 512,
        'output_dim': len(label2id),
        'batch_size': batch_size,
        'epochs': epochs,
        'lr': 1e-5,
    }

    dataset = EarthworkBertDataset(input_ids, attention_mask, labels)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    # test the model
    logger.debug('model : bert')
    model_file = './' + model_file
    model.load_state_dict(torch.load(model_file))
    model.eval()

    for i, test_raw in enumerate(test_raw_dataset):
        label = test_raw['label']
        input_text = test_raw['feature_text']
        encoding = tokenizer(input_text, return_tensors='pt', padding=True, truncation=True, max_length=max_length)
        input_ids = encoding['input_ids'].to(device)
        attention_mask = encoding['attention_mask'].to(device)

        output, att = model(input_ids, attention_mask)
        predicted_label = id2label[output.argmax().item()]
        feature_dims = input_text.split(' ')
        logger.debug(f'{params["model"]} Equal : {predicted_label == label}, Label: {label}, Predicted: {predicted_label}, Geom: {feature_dims}')

        # plot the last head of the last attention layer as a heatmap
        attention_matrix = att[-1]
        attention_layer = attention_matrix[-1]
        attention_mat = attention_layer[-1]
        # for j, attention_mat in enumerate(attention_layer):
        att_mat = attention_mat.detach().cpu().numpy()
        fig, ax = plt.subplots()
        cax = ax.matshow(att_mat, cmap='viridis')
        fig.colorbar(cax)
        plt.savefig(f'./graph/bert_attention_{i}.png')
        plt.close()

    print(f'test data count: {len(test_raw_dataset)}')
    encoding = tokenizer([d['feature_text'] for d in test_raw_dataset], padding=True, truncation=True, max_length=max_length)
    input_ids = encoding['input_ids']
    attention_mask = encoding['attention_mask']

    label2id = {label: i for i, label in enumerate(sorted(set(d['label'] for d in test_raw_dataset)))}
    id2label = {v: k for k, v in label2id.items()}
    labels = [label2id[d['label']] for d in test_raw_dataset]  # convert labels to numerical format

    test_dataset = EarthworkBertDataset(input_ids, attention_mask, labels)
    test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=True)

    correct = 0
    total = 0
    accuracies = []
    with torch.no_grad():
        for i, (input_ids, attention_mask, labels) in enumerate(tqdm(test_dataloader, desc="test")):
            outputs, att = model(input_ids, attention_mask)
            _, predicted = torch.max(outputs, 1)
            total += len(labels)
            correct += (predicted == labels).sum().item()
            accuracies.append(correct / total)

            y_score = torch.nn.functional.softmax(outputs, dim=1)

    average_accuracy = correct / total
    print(f'Accuracy of the network on the test data: {average_accuracy:.4f}')

if __name__ == '__main__':
    models = ['earthwork_model_20240503_1650.pth', 'earthwork_model_20240503_1714.pth',
              'earthwork_model_20240503_1716.pth', 'earthwork_model_20240503_1718.pth']
    run_MLP_LSTM(models, 'MLP')

    models = ['earthwork_model_20240503_1730.pth', 'earthwork_model_20240503_1732.pth',
              'earthwork_model_20240503_1734.pth']
    run_MLP_LSTM(models, 'LSTM')
    models = ['earthwork_trans_model_20240503_2003.pth', 'earthwork_trans_model_20240503_2014.pth',
              'earthwork_trans_model_20240503_2021.pth']
    run_transform(models)

    run_bert('earthwork_trans_model_20240504_0103.pth')
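# Optional hedged helper (not wired into any of the runs above): confusion_matrix and
# seaborn are imported at the top of this file but unused; this sketch shows how
# collected predictions could be plotted as a confusion-matrix heatmap. y_true and
# y_pred are assumed to be flat lists of class indices, label_names a list of label strings.
def plot_confusion_matrix(y_true, y_pred, label_names, out_file='./graph/confusion_matrix.png'):
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=label_names, yticklabels=label_names)
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.savefig(out_file)
    plt.close()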