# earthwork-net-model / ena_run_model.py
# title: ENA model runner
# author: Taewook Kang, Kyubyung Kang
# date: 2024.3.27
# description: ENA model test and evaluation
# license: MIT
# version
# 0.1. 2024.3.27. create file
#
import json, os, re, logging, math, random, string
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.optim.lr_scheduler import StepLR, ReduceLROnPlateau
from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertForSequenceClassification, BertConfig, BertModel
from sklearn.metrics import confusion_matrix
from collections import defaultdict
from datetime import datetime
from tqdm import tqdm
from ena_dataset import load_train_chunk_data, update_feature_dims_freq, update_onehot_encoding
# write log file using logger
logging.basicConfig(filename='./ewnet_logs.txt', level=logging.DEBUG, format='%(asctime)s %(levelname)s %(message)s', datefmt='%Y%m%d %H:%M')
logger = logging.getLogger('ewnet')
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'device: {device}')
# global hyperparameter record for the current run
hyperparam = None
# MLP model: earthwork feature vector -> label logits
class EarthworkNetMLP(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, dropout_ratio=0.2):
        super(EarthworkNetMLP, self).__init__()
        models = []
        models.append(nn.Linear(input_dim, hidden_dim[0]))
        models.append(nn.ReLU())
        models.append(nn.BatchNorm1d(hidden_dim[0]))  # batch normalization after activation
        models.append(nn.Dropout(dropout_ratio))
        for i in range(1, len(hidden_dim)):
            models.append(nn.Linear(hidden_dim[i-1], hidden_dim[i]))
            models.append(nn.ReLU())
            models.append(nn.BatchNorm1d(hidden_dim[i]))
            models.append(nn.Dropout(dropout_ratio))
        models.append(nn.Linear(hidden_dim[-1], output_dim))
        self.layers = nn.Sequential(*models)

    def forward(self, x):
        # print("Shape of x:", x.shape)
        x = self.layers(x)
        return x
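# Example (sketch, hypothetical sizes): quick shape check for EarthworkNetMLP.
#   m = EarthworkNetMLP(input_dim=10, hidden_dim=[64, 32], output_dim=4)
#   logits = m(torch.randn(8, 10))   # -> tensor of shape (8, 4)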
# LSTM model for sequential earthwork features
class EarthworkNetLSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers=2, dropout_ratio=0.2):
        super(EarthworkNetLSTM, self).__init__()
        # sequence series data. ex) token pattern(slope angle). top(0.5), bottom(0.5), top(0.6), bottom(0.6)...
        # time series features = (token_type, curve_angle)
        # label = (label_onehot)
        models = []
        models.append(nn.LSTM(input_dim, hidden_dim[0], num_layers, batch_first=True, dropout=dropout_ratio))
        for i in range(1, len(hidden_dim)):
            models.append(nn.Linear(hidden_dim[i-1], hidden_dim[i]))
        models.append(nn.Linear(hidden_dim[-1], output_dim))
        self.layers = nn.Sequential(*models)

    def forward(self, x):
        for layer in self.layers:
            if isinstance(layer, nn.LSTM):
                x, _ = layer(x)  # keep the output sequence, drop the (h_n, c_n) state
            else:
                x = layer(x)
        return x
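# Example (sketch, hypothetical sizes): with batch_first=True the LSTM expects
# (batch, seq_len, input_dim) and the model returns one logit vector per step.
#   m = EarthworkNetLSTM(input_dim=2, hidden_dim=[64], output_dim=4)
#   y = m(torch.randn(8, 20, 2))   # -> tensor of shape (8, 20, 4)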
# create dataset. earthwork_feature -> label
class EarthworkDataset(Dataset):
    def __init__(self, raw_data):
        self.raw_dataset = raw_data

    def __len__(self):
        return len(self.raw_dataset)

    def __getitem__(self, idx):
        features = self.raw_dataset[idx]['feature_dims']  # already tokenized from 'feature_text'
        label = self.raw_dataset[idx]['label_onehot']
        features = torch.tensor(features, dtype=torch.float32).to(device)
        label = torch.tensor(label, dtype=torch.float32).to(device)
        return features, label
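# Example (sketch): wrapping the raw records in a DataLoader; each record is assumed
# to be a dict with 'feature_dims' and 'label_onehot' keys as produced by ena_dataset.
#   ds = EarthworkDataset(train_raw_dataset)
#   dl = DataLoader(ds, batch_size=32, shuffle=True)
#   features, labels = next(iter(dl))   # (32, input_dim), (32, num_labels)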
def decode_data_to_geom(input_dataset, predictions, labels, input_feature_dims, label_kinds):
    global hyperparam
    match_count = 0
    for i in range(len(input_dataset)):  # batch size
        input_geom_features = input_dataset[i].cpu().numpy()
        prediction_index = predictions[i].item()
        label_index = labels[i].item()

        # rebuild a readable geometry description from the non-zero features
        geom_features = []
        for j in range(len(input_feature_dims)):
            if input_geom_features[j] == 0.0:
                continue
            geom_features.append(f'{input_feature_dims[j]}({input_geom_features[j]:.2f})')

        prediction_label = label_kinds[prediction_index]
        label = label_kinds[label_index]
        match = prediction_label == label
        if match:
            match_count += 1
        logger.debug(f'{hyperparam["model"]} {hyperparam["hidden_dim"]} Equal : {match}, Label: {label}, Predicted: {prediction_label}, Geom: {geom_features}')
    return match_count
def test_mlp_model(model, batch_size, test_raw_dataset, input_feature_dims, label_kinds):
    print(f'test data count: {len(test_raw_dataset)}')
    test_dataset = EarthworkDataset(test_raw_dataset)
    test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

    # test model
    accuracies = []
    correct = 0
    total = 0
    total_match = 0
    with torch.no_grad():
        for i, (data, labels) in enumerate(test_dataloader):
            outputs = model(data)
            _, predicted = torch.max(outputs.data, 1)
            _, labels = torch.max(labels.data, 1)  # one-hot -> class index
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            accuracies.append(correct / total)  # running accuracy after each batch
            match_count = decode_data_to_geom(data, predicted, labels, input_feature_dims, label_kinds)
            total_match += match_count
    average_accuracy = correct / total
    print(f'Match count: {total_match}, Total count: {total}')
    print(f'Accuracy of the network on the test data: {average_accuracy:.4f}')
    return accuracies, average_accuracy
def run_MLP_LSTM(model_file_list, base_model):
    global hyperparam

    # prepare train dataset
    data_dir = './dataset'
    geom_list = load_train_chunk_data(data_dir)
    input_feature_dims = update_feature_dims_freq(geom_list)  # or update_feature_dims_token(geom_list)
    label_kinds = update_onehot_encoding(geom_list)

    train_raw_dataset = geom_list[:int(len(geom_list) * 0.8)]
    test_raw_dataset = geom_list[int(len(geom_list) * 0.8):]
    print(f'total data count: {len(geom_list)}')
    print(f'train data count: {len(train_raw_dataset)}, test data count: {len(test_raw_dataset)}')

    # evaluate one saved model file per layer configuration
    param_layers = [[128], [128, 64, 32], [256, 128, 64]]
    if base_model == 'MLP':
        param_layers = [[128, 64, 32], [64, 128, 64], [64, 128, 64, 32], [32, 64, 32]]
    for index, param_layer in enumerate(param_layers):
        logger.debug(f'model : {base_model}')
        params = {
            'model': base_model,
            'input_dim': len(input_feature_dims),
            'hidden_dim': param_layer,
            'output_dim': len(label_kinds),
            'batch_size': 32,
            'epochs': 150,
            'lr': 0.001
        }
        hyperparam = params

        # create model and load the trained weights
        model = EarthworkNetMLP(params['input_dim'], params['hidden_dim'], params['output_dim']).to(device)
        if base_model == 'LSTM':
            model = EarthworkNetLSTM(params['input_dim'], params['hidden_dim'], params['output_dim']).to(device)
        model_file = './' + model_file_list[index]
        model.load_state_dict(torch.load(model_file, map_location=device))
        model.eval()
        accuracies, acc = test_mlp_model(model, params['batch_size'], test_raw_dataset, input_feature_dims, label_kinds)
# Generate random training data (for quick synthetic tests)
def generate_random_text(label_index, length=100):
    base_text = f'This is text for label R{label_index + 1}. '
    random_text_length = max(0, length - len(base_text))  # length of the random tail to generate
    random_text = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(random_text_length))
    return base_text + random_text
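# Example (sketch): generate_random_text(0, length=40) returns a string such as
# 'This is text for label R1. 4k2q9x7p1m3z8' (the random tail varies per call).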
# Define dataset class for the custom transformer: tokenized text -> label id
class EarthworkTransformDataset(Dataset):
    def __init__(self, input_ids, attention_mask, labels):
        self.input_ids = input_ids
        self.attention_mask = attention_mask
        self.labels = labels

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        input_ids_tensor = torch.tensor(self.input_ids[idx]).to(device)
        attention_mask_tensor = torch.tensor(self.attention_mask[idx]).to(device)
        label_tensor = torch.tensor(self.labels[idx]).to(device)
        return input_ids_tensor, attention_mask_tensor, label_tensor
# custom transformer
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, vocab_size=5000, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        # standard sinusoidal position table, shape (1, vocab_size, d_model)
        pe = torch.zeros(vocab_size, d_model)
        position = torch.arange(0, vocab_size, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float()
            * (-math.log(10000.0) / d_model)
        )
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer("pe", pe)

    def forward(self, x):
        x = x + self.pe[:, : x.size(1), :]
        return self.dropout(x)
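# Example (sketch): PositionalEncoding is defined here but commented out in
# EarthworkNetTransformer below; if enabled it would add position information like so:
#   pe = PositionalEncoding(d_model=512)
#   y = pe(torch.zeros(1, 10, 512))   # same shape; the first 10 position encodings are added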
class EarthworkNetTransformer(nn.Module):
    def __init__(
        self,
        input_feature_size,
        d_model,
        num_labels,
        nhead=8,
        dim_feedforward=2048,
        dim_fc=[64, 32],
        num_layers=6,
        dropout=0.1,
        activation="relu",
        classifier_dropout=0.1,
    ):
        super().__init__()
        self.d_model = d_model
        # self.pos_encoder = PositionalEncoding(d_model=d_model, dropout=dropout, vocab_size=vocab_size)
        self.input_fc = nn.Linear(input_feature_size, d_model)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout
        )
        self.src_mask = None
        self.nhead = nhead
        self.transformer_encoder = nn.TransformerEncoder(
            encoder_layer,
            num_layers=num_layers,
            # TBD. output_attentions=True
        )
        # ModuleList registers the FC weights with the module; a plain Python list
        # would keep them out of parameters() and state_dict()
        self.fc_layers = nn.ModuleList()
        fc_layers_dims = [d_model] + dim_fc + [num_labels]
        for i in range(1, len(fc_layers_dims)):
            self.fc_layers.append(nn.Linear(fc_layers_dims[i-1], fc_layers_dims[i]))
        self.init_weights()

    def generate_square_subsequent_mask(self, sz):
        mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
        mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
        return mask
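    # Example (sketch): generate_square_subsequent_mask(3) returns
    #   [[0., -inf, -inf],
    #    [0.,   0., -inf],
    #    [0.,   0.,   0.]]
    # i.e. position i may only attend to positions <= i.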
    def init_weights(self):
        initrange = 0.1
        for fc in self.fc_layers:
            fc.bias.data.zero_()
            fc.weight.data.uniform_(-initrange, initrange)

    def forward(self, x, attention_mask):
        # x = self.pos_encoder(x)
        if self.src_mask is None or self.src_mask.size(0) != len(x):
            mask = self.generate_square_subsequent_mask(len(x)).to(x.device)
            self.src_mask = mask
        # batch_size = x.shape[0]
        # mask = torch.tril(torch.ones(self.nhead, batch_size, batch_size)).to(x.device)
        x = x.float()
        x = self.input_fc(x)
        x = self.transformer_encoder(x, mask=self.src_mask)  # attention_mask is currently unused
        # x = x.mean(dim=1)
        for fc in self.fc_layers:
            x = fc(x)
        return x
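# Example (sketch, hypothetical sizes): the custom transformer consumes each padded
# token-id row as a feature vector of length input_feature_size.
#   m = EarthworkNetTransformer(input_feature_size=320, d_model=512, num_labels=4)
#   y = m(torch.randint(0, 100, (8, 320)), attention_mask=None)   # -> (8, 4) logits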
def run_transform(model_file_list):
    # prepare dataset
    data_dir = './dataset'
    geom_list = load_train_chunk_data(data_dir)
    input_feature_dims = update_feature_dims_freq(geom_list)  # or update_feature_dims_token(geom_list)
    label_kinds = update_onehot_encoding(geom_list)
    num_labels = len(label_kinds)

    max_input_string = max(len(d['feature_text']) for d in geom_list)
    max_input_string = 320  # fixed to 320 (nhead=8, 320 = 8 * 40); overrides the measured maximum above

    train_raw_dataset = geom_list[:int(len(geom_list) * 0.8)]
    test_raw_dataset = geom_list[int(len(geom_list) * 0.8):]
    print(f'total data count: {len(geom_list)}')
    print(f'train data count: {len(train_raw_dataset)}, test data count: {len(test_raw_dataset)}')

    # Tokenize and pad sequences
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    max_length = max_input_string
    batch_sizes = [32, 64, 128]
    for index, batch_size in enumerate(batch_sizes):
        encoding = {'input_ids': [], 'attention_mask': []}
        for d in train_raw_dataset:
            token_text = tokenizer(d['feature_text'], padding='max_length', truncation=True, max_length=max_length)
            if len(token_text['input_ids']) < max_length:  # fill the rest with padding tokens
                token_text['input_ids'] += [tokenizer.pad_token_id] * (max_length - len(token_text['input_ids']))
                token_text['attention_mask'] += [0] * (max_length - len(token_text['attention_mask']))
            encoding['input_ids'].append(token_text['input_ids'])
            encoding['attention_mask'].append(token_text['attention_mask'])
        input_ids = encoding['input_ids']
        attention_mask = encoding['attention_mask']

        label2id = {label: i for i, label in enumerate(sorted(set(d['label'] for d in train_raw_dataset)))}
        id2label = {v: k for k, v in label2id.items()}
        labels = [label2id[d['label']] for d in train_raw_dataset]  # convert labels to numerical format

        # hyperparameters
        logger.debug('model : transformer')
        params = {
            'model': 'transformer',
            'input_dim': len(input_feature_dims),
            'hidden_dim': [64],
            'output_dim': len(label2id),
            'batch_size': batch_size,
            'epochs': 300,
            'lr': 1e-5
        }
        dim_fc = params['hidden_dim']
        epochs = params['epochs']

        # model
        model = EarthworkNetTransformer(input_feature_size=max_length, d_model=512, num_labels=len(label2id), dim_fc=dim_fc).to(device)
        dataset = EarthworkTransformDataset(input_ids, attention_mask, labels)
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

        # load and test the model. note: checkpoints saved before the fc_layers
        # ModuleList fix may lack the FC weights and need strict=False here.
        model_file = './' + model_file_list[index]
        model.load_state_dict(torch.load(model_file, map_location=device))
        model.eval()
        for i, test_raw in enumerate(test_raw_dataset):
            label = test_raw['label']
            input_text = test_raw['feature_text']
            encoding = tokenizer(input_text, return_tensors='pt', padding='max_length', truncation=True, max_length=max_length)
            input_ids = encoding['input_ids'].to(device)
            attention_mask = encoding['attention_mask'].to(device)

            output = model(input_ids, attention_mask)
            predicted_label = id2label[output.argmax().item()]
            feature_dims = input_text.split(' ')
            logger.debug(f'{params["model"]} {params["batch_size"]} Equal : {predicted_label == label}, Label: {label}, Predicted: {predicted_label}, Geom: {feature_dims}')

        print(f'test data count: {len(test_raw_dataset)}')
        encoding = tokenizer([d['feature_text'] for d in test_raw_dataset], padding='max_length', truncation=True, max_length=max_length)
        input_ids = encoding['input_ids']
        attention_mask = encoding['attention_mask']
        label2id = {label: i for i, label in enumerate(sorted(set(d['label'] for d in test_raw_dataset)))}
        id2label = {v: k for k, v in label2id.items()}
        labels = [label2id[d['label']] for d in test_raw_dataset]  # convert labels to numerical format

        test_dataset = EarthworkTransformDataset(input_ids, attention_mask, labels)
        test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=True)
        correct = 0
        total = 0
        accuracies = []
        with torch.no_grad():
            for i, (input_ids, attention_mask, labels) in enumerate(tqdm(test_dataloader, desc="test")):
                outputs = model(input_ids, attention_mask)
                _, predicted = torch.max(outputs, 1)
                total += len(labels)
                correct += (predicted == labels).sum().item()
                accuracies.append(correct / total)
        average_accuracy = correct / total
        print(f'Accuracy of the network on the test data: {average_accuracy:.4f}')
# BERT model
class EarthworkBertDataset(Dataset):
    def __init__(self, input_ids, attention_mask, labels):
        self.input_ids = input_ids
        self.attention_mask = attention_mask
        self.labels = labels

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        input_ids_tensor = torch.tensor(self.input_ids[idx]).to(device)
        attention_mask_tensor = torch.tensor(self.attention_mask[idx]).to(device)
        label_tensor = torch.tensor(self.labels[idx]).to(device)
        return input_ids_tensor, attention_mask_tensor, label_tensor
# Define EarthworkNetTransformerBert: a thin wrapper around BertForSequenceClassification
class EarthworkNetTransformerBert(torch.nn.Module):
    def __init__(self, num_labels):
        super(EarthworkNetTransformerBert, self).__init__()
        self.bert = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=num_labels, output_attentions=True)

    def forward(self, input_ids, attention_mask):
        outputs = self.bert(input_ids, attention_mask=attention_mask)
        return outputs['logits'], outputs['attentions']
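# Example (sketch): the wrapper returns logits plus per-layer attentions.
#   m = EarthworkNetTransformerBert(num_labels=4)
#   enc = BertTokenizer.from_pretrained('bert-base-uncased')('some text', return_tensors='pt')
#   logits, atts = m(enc['input_ids'], enc['attention_mask'])
#   # logits: (1, 4); atts: tuple of 12 tensors, each (1, num_heads, seq, seq)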
def run_bert(model_file):
    # prepare train dataset
    data_dir = './dataset'
    geom_list = load_train_chunk_data(data_dir)
    input_feature_dims = update_feature_dims_freq(geom_list)  # or update_feature_dims_token(geom_list)
    label_kinds = update_onehot_encoding(geom_list)
    num_labels = len(label_kinds)
    max_input_string = max(len(d['feature_text']) for d in geom_list)

    train_raw_dataset = geom_list[:int(len(geom_list) * 0.8)]
    test_raw_dataset = geom_list[int(len(geom_list) * 0.8):]
    print(f'total data count: {len(geom_list)}')
    print(f'train data count: {len(train_raw_dataset)}, test data count: {len(test_raw_dataset)}')

    # Tokenize and pad sequences
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    max_length = max_input_string
    encoding = tokenizer([d['feature_text'] for d in train_raw_dataset], padding=True, truncation=True, max_length=max_length)
    input_ids = encoding['input_ids']
    attention_mask = encoding['attention_mask']

    label2id = {label: i for i, label in enumerate(sorted(set(d['label'] for d in train_raw_dataset)))}
    id2label = {v: k for k, v in label2id.items()}
    labels = [label2id[d['label']] for d in train_raw_dataset]  # convert labels to numerical format

    # Initialize model
    model = EarthworkNetTransformerBert(num_labels=len(label2id)).to(device)
    epochs = 150
    batch_size = 32
    params = {
        'model': 'BERT',
        'input_dim': len(input_feature_dims),
        'hidden_dim': 512,
        'output_dim': len(label2id),
        'batch_size': batch_size,
        'epochs': epochs,
        'lr': 1e-5,
    }
    dataset = EarthworkBertDataset(input_ids, attention_mask, labels)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    # load and test the model
    logger.debug('model : bert')
    model_file = './' + model_file
    model.load_state_dict(torch.load(model_file, map_location=device))
    model.eval()
    os.makedirs('./graph', exist_ok=True)  # attention heatmaps are saved here
    for i, test_raw in enumerate(test_raw_dataset):
        label = test_raw['label']
        input_text = test_raw['feature_text']
        encoding = tokenizer(input_text, return_tensors='pt', padding=True, truncation=True, max_length=max_length)
        input_ids = encoding['input_ids'].to(device)
        attention_mask = encoding['attention_mask'].to(device)

        output, att = model(input_ids, attention_mask)
        predicted_label = id2label[output.argmax().item()]
        feature_dims = input_text.split(' ')
        logger.debug(f'{params["model"]} Equal : {predicted_label == label}, Label: {label}, Predicted: {predicted_label}, Geom: {feature_dims}')

        # plot the last head of the last layer's attention as a heatmap
        attention_matrix = att[-1]              # last encoder layer: (batch, heads, seq, seq)
        attention_layer = attention_matrix[-1]  # last sample in batch: (heads, seq, seq)
        attention_mat = attention_layer[-1]     # last head: (seq, seq)
        att_mat = attention_mat.detach().cpu().numpy()
        fig, ax = plt.subplots()
        cax = ax.matshow(att_mat, cmap='viridis')
        fig.colorbar(cax)
        plt.savefig(f'./graph/bert_attention_{i}.png')
        plt.close()

    print(f'test data count: {len(test_raw_dataset)}')
    encoding = tokenizer([d['feature_text'] for d in test_raw_dataset], padding=True, truncation=True, max_length=max_length)
    input_ids = encoding['input_ids']
    attention_mask = encoding['attention_mask']
    label2id = {label: i for i, label in enumerate(sorted(set(d['label'] for d in test_raw_dataset)))}
    id2label = {v: k for k, v in label2id.items()}
    labels = [label2id[d['label']] for d in test_raw_dataset]  # convert labels to numerical format

    test_dataset = EarthworkBertDataset(input_ids, attention_mask, labels)
    test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=True)
    correct = 0
    total = 0
    accuracies = []
    with torch.no_grad():
        for i, (input_ids, attention_mask, labels) in enumerate(tqdm(test_dataloader, desc="test")):
            outputs, att = model(input_ids, attention_mask)
            _, predicted = torch.max(outputs, 1)
            total += len(labels)
            correct += (predicted == labels).sum().item()
            accuracies.append(correct / total)
            y_score = torch.nn.functional.softmax(outputs, dim=1)  # class probabilities (currently unused)
    average_accuracy = correct / total
    print(f'Accuracy of the network on the test data: {average_accuracy:.4f}')
if __name__ == '__main__':
    models = ['earthwork_model_20240503_1650.pth', 'earthwork_model_20240503_1714.pth', 'earthwork_model_20240503_1716.pth', 'earthwork_model_20240503_1718.pth']
    run_MLP_LSTM(models, 'MLP')
    models = ['earthwork_model_20240503_1730.pth', 'earthwork_model_20240503_1732.pth', 'earthwork_model_20240503_1734.pth']
    run_MLP_LSTM(models, 'LSTM')
    models = ['earthwork_trans_model_20240503_2003.pth', 'earthwork_trans_model_20240503_2014.pth', 'earthwork_trans_model_20240503_2021.pth']
    run_transform(models)
    run_bert('earthwork_trans_model_20240504_0103.pth')