import json, os, re, logging
import math    # used by PositionalEncoding
import random  # used by generate_random_text
import string  # used by generate_random_text

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR, ReduceLROnPlateau
from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertForSequenceClassification, BertConfig, BertModel
from sklearn.metrics import confusion_matrix
from collections import defaultdict
from datetime import datetime
from tqdm import tqdm

from ena_dataset import load_train_chunk_data, update_feature_dims_freq, update_onehot_encoding

logging.basicConfig(filename='./ewnet_logs.txt', level=logging.DEBUG,
                    format='%(asctime)s %(levelname)s %(message)s', datefmt='%Y%m%d %H:%M')
logger = logging.getLogger('ewnet')

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'device: {device}')

# Hyperparameters of the model configuration currently under evaluation.
hyperparam = None
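
# Records returned by load_train_chunk_data are expected to carry the keys
# read below: 'feature_dims' (float feature vector), 'label_onehot' (one-hot
# label vector), 'feature_text' (textual form of the geometry), and 'label'
# (label string).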


class EarthworkNetMLP(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, dropout_ratio=0.2):
        super(EarthworkNetMLP, self).__init__()

        # One Linear -> ReLU -> BatchNorm -> Dropout block per hidden width.
        layers = []
        layers.append(nn.Linear(input_dim, hidden_dim[0]))
        layers.append(nn.ReLU())
        layers.append(nn.BatchNorm1d(hidden_dim[0]))
        layers.append(nn.Dropout(dropout_ratio))

        for i in range(1, len(hidden_dim)):
            layers.append(nn.Linear(hidden_dim[i - 1], hidden_dim[i]))
            layers.append(nn.ReLU())
            layers.append(nn.BatchNorm1d(hidden_dim[i]))
            layers.append(nn.Dropout(dropout_ratio))

        layers.append(nn.Linear(hidden_dim[-1], output_dim))
        self.layers = nn.Sequential(*layers)

    def forward(self, x):
        return self.layers(x)
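

# A minimal smoke test for EarthworkNetMLP. The dimensions here are
# illustrative assumptions, not values from the training pipeline.
def _demo_mlp_forward():
    demo = EarthworkNetMLP(input_dim=16, hidden_dim=[64, 32], output_dim=5)
    demo.eval()  # put BatchNorm1d/Dropout into inference mode
    x = torch.randn(4, 16)  # a batch of 4 feature vectors
    return demo(x)  # logits of shape (4, 5)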


class EarthworkNetLSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers=2, dropout_ratio=0.2):
        super(EarthworkNetLSTM, self).__init__()

        # A stacked LSTM front end followed by linear layers applied per timestep.
        layers = []
        layers.append(nn.LSTM(input_dim, hidden_dim[0], num_layers, batch_first=True, dropout=dropout_ratio))
        for i in range(1, len(hidden_dim)):
            layers.append(nn.Linear(hidden_dim[i - 1], hidden_dim[i]))

        layers.append(nn.Linear(hidden_dim[-1], output_dim))
        self.layers = nn.Sequential(*layers)

    def forward(self, x):
        # nn.Sequential cannot route the LSTM's (output, state) tuple, so the
        # layers are applied manually and the hidden state is discarded.
        for layer in self.layers:
            if isinstance(layer, nn.LSTM):
                x, _ = layer(x)
            else:
                x = layer(x)

        return x
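

# A hedged sketch of the LSTM variant's expected shapes (all sizes are
# illustrative assumptions): with batch_first=True the model maps
# (batch, seq_len, input_dim) to per-timestep logits.
def _demo_lstm_forward():
    demo = EarthworkNetLSTM(input_dim=16, hidden_dim=[64, 32], output_dim=5)
    demo.eval()
    x = torch.randn(4, 10, 16)  # (batch, seq_len, features)
    return demo(x)  # logits of shape (4, 10, 5)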


class EarthworkDataset(Dataset):
    def __init__(self, raw_data):
        self.raw_dataset = raw_data

    def __len__(self):
        return len(self.raw_dataset)

    def __getitem__(self, idx):
        features = self.raw_dataset[idx]['feature_dims']
        label = self.raw_dataset[idx]['label_onehot']
        features = torch.tensor(features, dtype=torch.float32).to(device)
        label = torch.tensor(label, dtype=torch.float32).to(device)
        return features, label
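

# A minimal usage sketch with a hypothetical record shaped like the keys
# __getitem__ reads.
def _demo_earthwork_dataset():
    record = {'feature_dims': [0.0, 1.5, 2.0], 'label_onehot': [0.0, 1.0]}
    ds = EarthworkDataset([record])
    features, label = ds[0]  # tensors moved to the global `device`
    return features.shape, label.shape  # torch.Size([3]), torch.Size([2])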


def decode_data_to_geom(input_dataset, predictions, labels, input_feature_dims, label_kinds):
    global hyperparam
    match_count = 0
    for i in range(len(input_dataset)):
        input_geom_features = input_dataset[i].cpu().numpy()
        prediction_index = predictions[i].item()
        label_index = labels[i].cpu().numpy()

        # Render only the non-zero feature dimensions as 'name(value)' strings.
        geom_features = []
        for j in range(len(input_feature_dims)):
            if input_geom_features[j] == 0.0:
                continue
            geom_features.append(f'{input_feature_dims[j]}({input_geom_features[j]:.2f})')

        prediction_label = label_kinds[prediction_index]
        label = label_kinds[label_index]

        match = prediction_label == label
        if match:
            match_count += 1
        logger.debug(f'{hyperparam["model"]} {hyperparam["hidden_dim"]} Equal : {match}, Label: {label}, Predicted: {prediction_label}, Geom: {geom_features}')

    return match_count


def test_mlp_model(model, batch_size, test_raw_dataset, input_feature_dims, label_kinds):
    print(f'test data count: {len(test_raw_dataset)}')
    test_dataset = EarthworkDataset(test_raw_dataset)
    test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

    accuracies = []
    correct = 0
    total = 0
    total_match = 0
    with torch.no_grad():
        for i, (data, labels) in enumerate(test_dataloader):
            outputs = model(data)
            # argmax over logits and over the one-hot labels gives class indices.
            _, predicted = torch.max(outputs.data, 1)
            _, labels = torch.max(labels.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            accuracies.append(correct / total)  # running accuracy after each batch

            match_count = decode_data_to_geom(data, predicted, labels, input_feature_dims, label_kinds)
            total_match += match_count

    average_accuracy = correct / total
    print(f'Match count: {total_match}, Total count: {total}')
    print(f'Accuracy of the network on the test data: {average_accuracy:.4f}')
    return accuracies, average_accuracy


def run_MLP_LSTM(model_file_list, base_model):
    global hyperparam

    data_dir = './dataset'
    geom_list = load_train_chunk_data(data_dir)
    input_feature_dims = update_feature_dims_freq(geom_list)
    label_kinds = update_onehot_encoding(geom_list)

    # 80/20 split; the same split is assumed to have been used at training time.
    train_raw_dataset = geom_list[:int(len(geom_list) * 0.8)]
    test_raw_dataset = geom_list[int(len(geom_list) * 0.8):]
    print(f'total data count: {len(geom_list)}')
    print(f'train data count: {len(train_raw_dataset)}, test data count: {len(test_raw_dataset)}')

    # One checkpoint per hidden-layer configuration, in the same order as model_file_list.
    param_layers = [[128], [128, 64, 32], [256, 128, 64]]
    if base_model == 'MLP':
        param_layers = [[128, 64, 32], [64, 128, 64], [64, 128, 64, 32], [32, 64, 32]]
    for index, param_layer in enumerate(param_layers):
        logger.debug(f'model : {base_model}')

        params = {
            'model': base_model,
            'input_dim': len(input_feature_dims),
            'hidden_dim': param_layer,
            'output_dim': len(label_kinds),
            'batch_size': 32,
            'epochs': 150,
            'lr': 0.001
        }
        hyperparam = params

        model = EarthworkNetMLP(params['input_dim'], params['hidden_dim'], params['output_dim']).to(device)
        if base_model == 'LSTM':
            model = EarthworkNetLSTM(params['input_dim'], params['hidden_dim'], params['output_dim']).to(device)
        model_file = './' + model_file_list[index]
        model.load_state_dict(torch.load(model_file, map_location=device))
        model.eval()

        accuracies, acc = test_mlp_model(model, params['batch_size'], test_raw_dataset, input_feature_dims, label_kinds)


def generate_random_text(label_index, length=100):
    # Pads a fixed label sentence with random alphanumerics up to `length` characters.
    base_text = f'This is text for label R{label_index + 1}. '
    random_text_length = max(0, length - len(base_text))
    random_text = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(random_text_length))
    return base_text + random_text
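
# For example, generate_random_text(0, length=40) yields
# 'This is text for label R1. ' (27 characters) followed by 13 random
# lowercase letters or digits.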


class EarthworkTransformDataset(Dataset):
    def __init__(self, input_ids, attention_mask, labels):
        self.input_ids = input_ids
        self.attention_mask = attention_mask
        self.labels = labels

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        input_ids_tensor = torch.tensor(self.input_ids[idx]).to(device)
        attention_mask_tensor = torch.tensor(self.attention_mask[idx]).to(device)
        label_tensor = torch.tensor(self.labels[idx]).to(device)
        return input_ids_tensor, attention_mask_tensor, label_tensor


class PositionalEncoding(nn.Module):
    def __init__(self, d_model, vocab_size=5000, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(vocab_size, d_model)
        position = torch.arange(0, vocab_size, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float()
            * (-math.log(10000.0) / d_model)
        )
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer("pe", pe)

    def forward(self, x):
        x = x + self.pe[:, : x.size(1), :]
        return self.dropout(x)
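
# For reference, the buffer built above realizes the sinusoidal encoding of
# Vaswani et al. (2017):
#   PE(pos, 2i)   = sin(pos / 10000^(2i / d_model))
#   PE(pos, 2i+1) = cos(pos / 10000^(2i / d_model))
# where div_term = 10000^(-2i / d_model), computed via exp/log for stability.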


class EarthworkNetTransformer(nn.Module):
    def __init__(
        self,
        input_feature_size,
        d_model,
        num_labels,
        nhead=8,
        dim_feedforward=2048,
        dim_fc=[64, 32],
        num_layers=6,
        dropout=0.1,
        activation="relu",
        classifier_dropout=0.1,
    ):
        super().__init__()

        self.d_model = d_model

        # Project the token-id vector (length input_feature_size) into d_model.
        self.input_fc = nn.Linear(input_feature_size, d_model)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout
        )

        self.src_mask = None
        self.nhead = nhead
        self.transformer_encoder = nn.TransformerEncoder(
            encoder_layer,
            num_layers=num_layers,
        )
        # nn.ModuleList (rather than a plain Python list) registers the fully
        # connected heads, so their weights appear in state_dict() and move
        # with .to(device).
        self.fc_layers = nn.ModuleList()
        fc_layers_dims = [d_model] + dim_fc + [num_labels]
        for i in range(1, len(fc_layers_dims)):
            self.fc_layers.append(nn.Linear(fc_layers_dims[i - 1], fc_layers_dims[i]))

        self.init_weights()

    def generate_square_subsequent_mask(self, sz):
        mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
        mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
        return mask

    def init_weights(self):
        initrange = 0.1
        for fc in self.fc_layers:
            fc.bias.data.zero_()
            fc.weight.data.uniform_(-initrange, initrange)

    def forward(self, x, attention_mask):
        # attention_mask is accepted for interface parity with the BERT model
        # but is not used; a square subsequent mask is built instead.
        if self.src_mask is None or self.src_mask.size(0) != len(x):
            mask = self.generate_square_subsequent_mask(len(x)).to(x.device)
            self.src_mask = mask

        x = x.float()
        x = self.input_fc(x)
        x = self.transformer_encoder(x, mask=self.src_mask)

        for fc in self.fc_layers:
            x = fc(x)

        return x
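

# A hedged shape sketch for EarthworkNetTransformer. Values are illustrative
# assumptions; note that run_transform feeds 2D (batch, max_length) token-id
# tensors, which recent PyTorch treats as a single unbatched sequence inside
# nn.TransformerEncoder.
def _demo_transformer_forward():
    demo = EarthworkNetTransformer(input_feature_size=320, d_model=512, num_labels=7).to(device)
    ids = torch.randint(0, 30522, (4, 320), device=device)  # fake BERT token ids
    mask = torch.ones(4, 320, device=device)  # accepted but unused by forward
    return demo(ids, mask)  # logits of shape (4, 7)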


def run_transform(model_file_list):
    data_dir = './dataset'
    geom_list = load_train_chunk_data(data_dir)
    input_feature_dims = update_feature_dims_freq(geom_list)
    label_kinds = update_onehot_encoding(geom_list)
    num_labels = len(label_kinds)
    max_input_string = max(len(d['feature_text']) for d in geom_list)
    max_input_string = 320  # fixed length; overrides the measured maximum above

    train_raw_dataset = geom_list[:int(len(geom_list) * 0.8)]
    test_raw_dataset = geom_list[int(len(geom_list) * 0.8):]
    print(f'total data count: {len(geom_list)}')
    print(f'train data count: {len(train_raw_dataset)}, test data count: {len(test_raw_dataset)}')

    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    max_length = max_input_string

    # One checkpoint per batch size, in the same order as model_file_list.
    batch_sizes = [32, 64, 128]
    for index, batch_size in enumerate(batch_sizes):
        encoding = {'input_ids': [], 'attention_mask': []}
        for d in train_raw_dataset:
            token_text = tokenizer(d['feature_text'], padding='max_length', truncation=True, max_length=max_length)
            # Defensive re-padding; padding='max_length' should already do this.
            if len(token_text['input_ids']) < max_length:
                token_text['input_ids'] += [tokenizer.pad_token_id] * (max_length - len(token_text['input_ids']))
                token_text['attention_mask'] += [0] * (max_length - len(token_text['attention_mask']))
            encoding['input_ids'].append(token_text['input_ids'])
            encoding['attention_mask'].append(token_text['attention_mask'])

        input_ids = encoding['input_ids']
        attention_mask = encoding['attention_mask']

        label2id = {label: i for i, label in enumerate(sorted(set(d['label'] for d in train_raw_dataset)))}
        id2label = {v: k for k, v in label2id.items()}
        labels = [label2id[d['label']] for d in train_raw_dataset]

        logger.debug('model : transformer')

        params = {
            'model': 'transformer',
            'input_dim': len(input_feature_dims),
            'hidden_dim': [64],
            'output_dim': len(label2id),
            'batch_size': batch_size,
            'epochs': 300,
            'lr': 1e-5
        }

        dim_fc = params['hidden_dim']
        epochs = params['epochs']

        model = EarthworkNetTransformer(input_feature_size=max_length, d_model=512, num_labels=len(label2id), dim_fc=dim_fc).to(device)
        dataset = EarthworkTransformDataset(input_ids, attention_mask, labels)
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

        model_file = './' + model_file_list[index]
        model.load_state_dict(torch.load(model_file, map_location=device))
        model.eval()

        # Per-sample predictions, logged with the decoded geometry text.
        for i, test_raw in enumerate(test_raw_dataset):
            label = test_raw['label']
            input_text = test_raw['feature_text']
            encoding = tokenizer(input_text, return_tensors='pt', padding='max_length', truncation=True, max_length=max_length)
            input_ids = encoding['input_ids'].to(device)
            attention_mask = encoding['attention_mask'].to(device)
            output = model(input_ids, attention_mask)
            predicted_label = id2label[output.argmax().item()]

            feature_dims = input_text.split(' ')
            logger.debug(f'{params["model"]} {params["batch_size"]} Equal : {predicted_label == label}, Label: {label}, Predicted: {predicted_label}, Geom: {feature_dims}')

        # Batched accuracy over the test split. Note that the label vocabulary
        # is rebuilt from the test split here, which only matches training if
        # both splits contain the same label set.
        print(f'test data count: {len(test_raw_dataset)}')
        encoding = tokenizer([d['feature_text'] for d in test_raw_dataset], padding='max_length', truncation=True, max_length=max_length)
        input_ids = encoding['input_ids']
        attention_mask = encoding['attention_mask']

        label2id = {label: i for i, label in enumerate(sorted(set(d['label'] for d in test_raw_dataset)))}
        id2label = {v: k for k, v in label2id.items()}
        labels = [label2id[d['label']] for d in test_raw_dataset]

        test_dataset = EarthworkTransformDataset(input_ids, attention_mask, labels)
        test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=True)

        correct = 0
        total = 0
        accuracies = []
        with torch.no_grad():
            for i, (input_ids, attention_mask, labels) in enumerate(tqdm(test_dataloader, desc="test")):
                outputs = model(input_ids, attention_mask)
                _, predicted = torch.max(outputs, 1)
                total += len(labels)
                correct += (predicted == labels).sum().item()
                accuracies.append(correct / total)

        average_accuracy = correct / total
        print(f'Accuracy of the network on the test data: {average_accuracy:.4f}')


class EarthworkBertDataset(Dataset):
    def __init__(self, input_ids, attention_mask, labels):
        self.input_ids = input_ids
        self.attention_mask = attention_mask
        self.labels = labels

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        input_ids_tensor = torch.tensor(self.input_ids[idx]).to(device)
        attention_mask_tensor = torch.tensor(self.attention_mask[idx]).to(device)
        label_tensor = torch.tensor(self.labels[idx]).to(device)
        return input_ids_tensor, attention_mask_tensor, label_tensor


class EarthworkNetTransformerBert(torch.nn.Module):
    def __init__(self, num_labels):
        super(EarthworkNetTransformerBert, self).__init__()
        # output_attentions=True so per-head attention maps can be visualized.
        self.bert = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=num_labels, output_attentions=True)

    def forward(self, input_ids, attention_mask):
        outputs = self.bert(input_ids, attention_mask=attention_mask)
        return outputs['logits'], outputs['attentions']
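

# A hedged sketch of the wrapper's outputs; the input sentence is made up,
# and shapes assume bert-base-uncased (12 layers, 12 heads).
def _demo_bert_outputs():
    tok = BertTokenizer.from_pretrained('bert-base-uncased')
    demo = EarthworkNetTransformerBert(num_labels=3).to(device)
    enc = tok('cut slope 1:1.5', return_tensors='pt').to(device)
    logits, attentions = demo(enc['input_ids'], enc['attention_mask'])
    # logits: (1, 3); attentions: tuple of 12 tensors, each (1, 12, seq, seq)
    return logits, attentions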


def run_bert(model_file):
    data_dir = './dataset'
    geom_list = load_train_chunk_data(data_dir)
    input_feature_dims = update_feature_dims_freq(geom_list)
    label_kinds = update_onehot_encoding(geom_list)
    num_labels = len(label_kinds)
    max_input_string = max(len(d['feature_text']) for d in geom_list)

    train_raw_dataset = geom_list[:int(len(geom_list) * 0.8)]
    test_raw_dataset = geom_list[int(len(geom_list) * 0.8):]
    print(f'total data count: {len(geom_list)}')
    print(f'train data count: {len(train_raw_dataset)}, test data count: {len(test_raw_dataset)}')

    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    max_length = max_input_string

    encoding = tokenizer([d['feature_text'] for d in train_raw_dataset], padding=True, truncation=True, max_length=max_length)
    input_ids = encoding['input_ids']
    attention_mask = encoding['attention_mask']

    label2id = {label: i for i, label in enumerate(sorted(set(d['label'] for d in train_raw_dataset)))}
    id2label = {v: k for k, v in label2id.items()}
    labels = [label2id[d['label']] for d in train_raw_dataset]

    model = EarthworkNetTransformerBert(num_labels=len(label2id)).to(device)

    epochs = 150
    batch_size = 32
    params = {
        'model': 'BERT',
        'input_dim': len(input_feature_dims),
        'hidden_dim': 512,
        'output_dim': len(label2id),
        'batch_size': batch_size,
        'epochs': epochs,
        'lr': 1e-5,
    }

    dataset = EarthworkBertDataset(input_ids, attention_mask, labels)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    logger.debug('model : bert')

    model_file = './' + model_file
    model.load_state_dict(torch.load(model_file, map_location=device))
    model.eval()

    os.makedirs('./graph', exist_ok=True)  # attention heatmaps are saved here
    # Per-sample predictions, logged alongside an attention-map visualization.
    for i, test_raw in enumerate(test_raw_dataset):
        label = test_raw['label']
        input_text = test_raw['feature_text']
        encoding = tokenizer(input_text, return_tensors='pt', padding=True, truncation=True, max_length=max_length)
        input_ids = encoding['input_ids'].to(device)
        attention_mask = encoding['attention_mask'].to(device)
        output, att = model(input_ids, attention_mask)
        predicted_label = id2label[output.argmax().item()]

        feature_dims = input_text.split(' ')
        logger.debug(f'{params["model"]} Equal : {predicted_label == label}, Label: {label}, Predicted: {predicted_label}, Geom: {feature_dims}')

        # Last encoder layer -> last batch element -> last attention head.
        attention_matrix = att[-1]
        attention_layer = attention_matrix[-1]
        attention_mat = attention_layer[-1]

        att_mat = attention_mat.detach().cpu().numpy()
        fig, ax = plt.subplots()
        cax = ax.matshow(att_mat, cmap='viridis')
        fig.colorbar(cax)
        plt.savefig(f'./graph/bert_attention_{i}.png')
        plt.close()

    # Batched accuracy over the test split. As in run_transform, the label
    # vocabulary is rebuilt from the test split here.
    print(f'test data count: {len(test_raw_dataset)}')
    encoding = tokenizer([d['feature_text'] for d in test_raw_dataset], padding=True, truncation=True, max_length=max_length)
    input_ids = encoding['input_ids']
    attention_mask = encoding['attention_mask']

    label2id = {label: i for i, label in enumerate(sorted(set(d['label'] for d in test_raw_dataset)))}
    id2label = {v: k for k, v in label2id.items()}
    labels = [label2id[d['label']] for d in test_raw_dataset]

    test_dataset = EarthworkBertDataset(input_ids, attention_mask, labels)
    test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=True)

    correct = 0
    total = 0
    accuracies = []
    with torch.no_grad():
        for i, (input_ids, attention_mask, labels) in enumerate(tqdm(test_dataloader, desc="test")):
            outputs, att = model(input_ids, attention_mask)
            _, predicted = torch.max(outputs, 1)
            total += len(labels)
            correct += (predicted == labels).sum().item()
            accuracies.append(correct / total)
            y_score = F.softmax(outputs, dim=1)  # class probabilities (not used further)

    average_accuracy = correct / total
    print(f'Accuracy of the network on the test data: {average_accuracy:.4f}')


if __name__ == '__main__':
    models = ['earthwork_model_20240503_1650.pth', 'earthwork_model_20240503_1714.pth', 'earthwork_model_20240503_1716.pth', 'earthwork_model_20240503_1718.pth']
    run_MLP_LSTM(models, 'MLP')

    models = ['earthwork_model_20240503_1730.pth', 'earthwork_model_20240503_1732.pth', 'earthwork_model_20240503_1734.pth']
    run_MLP_LSTM(models, 'LSTM')

    models = ['earthwork_trans_model_20240503_2003.pth', 'earthwork_trans_model_20240503_2014.pth', 'earthwork_trans_model_20240503_2021.pth']
    run_transform(models)

    run_bert('earthwork_trans_model_20240504_0103.pth')