Source code for sgnlp.models.sentic_gcn.utils

import argparse
import json
import logging
import pickle
import random
import pathlib
import requests
import urllib.parse
import math
import tempfile
import shutil
from typing import Dict, List, Tuple, Union

import numpy as np
import spacy
import torch
from torch.utils.data import random_split, Dataset
from transformers import PreTrainedTokenizer
from transformers.tokenization_utils_base import BatchEncoding

from .data_class import SenticGCNTrainArgs


def parse_args_and_load_config(
    config_path: str = "config/senticnet_gcn_config.json",
) -> SenticGCNTrainArgs:
    """Get config from config file using argparse.

    Args:
        config_path (str, optional): default path to the config file, relative to this module.
            Can be overridden via the '--config' command line argument.

    Returns:
        SenticGCNTrainArgs: SenticGCNTrainArgs instance populated from config
    """
    parser = argparse.ArgumentParser(description="SenticASGCN Training")
    parser.add_argument("--config", type=str, default=config_path)
    args = parser.parse_args()
    cfg_path = pathlib.Path(__file__).parent / args.config
    with open(cfg_path, "r") as cfg_file:
        cfg = json.load(cfg_file)
    sentic_asgcn_args = SenticGCNTrainArgs(**cfg)
    return sentic_asgcn_args

def set_random_seed(seed: int = 776) -> None:
    """Helper method to set random seeds for python, numpy and torch.

    Args:
        seed (int, optional): seed value to set. Defaults to 776.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

def download_tokenizer_files(
    base_url: str,
    save_folder: Union[str, pathlib.Path],
    files: List[str] = ["special_tokens_map.json", "tokenizer_config.json", "vocab.pkl"],
) -> None:
    """
    Helper method to download tokenizer files from online storage.

    Args:
        base_url (str): Url string to storage folder.
        save_folder (Union[str, pathlib.Path]): Local folder to save downloaded files.
            Folder will be created if it does not exist.
        files (List[str], optional): List of file names to download from base_url.
    """
    file_paths = [urllib.parse.urljoin(base_url, file_name) for file_name in files]
    for file_path in file_paths:
        download_url_file(file_path, save_folder)

def download_url_file(url: str, save_folder: Union[str, pathlib.Path]) -> None:
    """
    Helper method to download and save a url file.

    Args:
        url (str): Url of file to download.
        save_folder (Union[str, pathlib.Path]): Folder to save downloaded file.
            Will be created if it does not exist.
    """
    save_folder_path = pathlib.Path(save_folder) if not isinstance(save_folder, pathlib.Path) else save_folder
    save_folder_path.mkdir(exist_ok=True)
    fn_start_pos = url.rfind("/") + 1
    file_name = url[fn_start_pos:]
    save_file_path = save_folder_path.joinpath(file_name)
    req = requests.get(url)
    if req.status_code == requests.codes.ok:
        with open(save_file_path, "wb") as f:
            for data in req:
                f.write(data)
    else:
        logging.error(f"Failed to download file from {url}.")

def pad_and_truncate(
    sequence: List[float],
    max_len: int,
    dtype: str = "int64",
    padding: str = "post",
    truncating: str = "post",
    value: int = 0,
) -> np.ndarray:
    """
    Helper method for padding and truncating text and aspect segments.

    Args:
        sequence (List[float]): input sequence of indices.
        max_len (int): maximum length to pad to.
        dtype (str, optional): data type to cast indices. Defaults to "int64".
        padding (str, optional): type of padding, 'pre' or 'post'. Defaults to "post".
        truncating (str, optional): type of truncating, 'pre' or 'post'. Defaults to "post".
        value (int, optional): value used for padding. Defaults to 0.

    Returns:
        np.ndarray: ndarray padded to max_len.
    """
    seq_arr = (np.ones(max_len) * value).astype(dtype)
    trunc = sequence[-max_len:] if truncating == "pre" else sequence[:max_len]
    trunc = np.asarray(trunc, dtype=dtype)
    if padding == "post":
        seq_arr[: len(trunc)] = trunc
    else:
        seq_arr[-len(trunc) :] = trunc
    return seq_arr

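# Illustrative sketch (not part of the original module): shows how pad_and_truncate
# behaves for 'post' padding and 'pre' truncation. The index values are made up.
def _example_pad_and_truncate() -> None:
    short_seq = [5, 12, 7]
    # Post-padding fills the tail with zeros: [5, 12, 7, 0, 0, 0]
    print(pad_and_truncate(short_seq, max_len=6))
    # Pre-truncation keeps the last max_len indices: [4, 5, 6, 7, 8, 9]
    print(pad_and_truncate(list(range(10)), max_len=6, truncating="pre"))
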
def load_word_vec(word_vec_file_path: str, vocab: Dict[str, int], embed_dim: int = 300) -> Dict[str, np.ndarray]:
    """
    Helper method to load word vectors from file (e.g. GloVe) for each word in vocab.

    Args:
        word_vec_file_path (str): full file path to word vectors.
        vocab (Dict[str, int]): dictionary with vocab words as keys and word indices as values.
        embed_dim (int, optional): embedding dimension. Defaults to 300.

    Returns:
        Dict[str, np.ndarray]: dictionary with words as keys and word vectors as values.
    """
    with open(word_vec_file_path, "r", encoding="utf-8", newline="\n", errors="ignore") as fin:
        word_vec = {}
        for line in fin:
            tokens = line.rstrip().split()
            word, vec = " ".join(tokens[:-embed_dim]), tokens[-embed_dim:]
            if word in vocab.keys():
                word_vec[word] = np.asarray(vec, dtype="float32")
    return word_vec

def build_embedding_matrix(
    word_vec_file_path: str,
    vocab: Dict[str, int],
    embed_dim: int = 300,
    save_embed_matrix: bool = False,
    save_embed_file_path: str = None,
) -> np.ndarray:
    """
    Helper method to generate an embedding matrix.

    Args:
        word_vec_file_path (str): full file path to word vectors.
        vocab (Dict[str, int]): dictionary with vocab words as keys and word indices as values.
        embed_dim (int, optional): embedding dimension. Defaults to 300.
        save_embed_matrix (bool, optional): flag to indicate if the embedding matrix should be saved to file.
            Defaults to False.
        save_embed_file_path (str, optional): full file path to save the embedding matrix to. Defaults to None.

    Returns:
        np.ndarray: numpy array of the embedding matrix.
    """
    embedding_matrix = np.zeros((len(vocab), embed_dim))
    embedding_matrix[1, :] = np.random.uniform(-1 / np.sqrt(embed_dim), 1 / np.sqrt(embed_dim), (1, embed_dim))
    word_vec = load_word_vec(word_vec_file_path, vocab, embed_dim)
    for word, idx in vocab.items():
        vec = word_vec.get(word)
        if vec is not None:
            embedding_matrix[idx] = vec
    if save_embed_matrix:
        save_file_path = pathlib.Path(save_embed_file_path)
        if not save_file_path.exists():
            save_file_path.parent.mkdir(exist_ok=True)
        with open(save_file_path, "wb") as fout:
            pickle.dump(embedding_matrix, fout)
    return embedding_matrix

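# Illustrative sketch (not part of the original module): builds a tiny embedding matrix
# from a toy word-vector file. The file name 'toy_vectors.txt', its contents and the vocab
# are made-up values; real usage would point word_vec_file_path at a GloVe file.
def _example_build_embedding_matrix() -> np.ndarray:
    toy_file = pathlib.Path("toy_vectors.txt")
    # Each line follows the layout expected by load_word_vec: word followed by embed_dim floats.
    toy_file.write_text("great 0.1 0.2 0.3\nfood 0.4 0.5 0.6\n", encoding="utf-8")
    vocab = {"<pad>": 0, "<unk>": 1, "great": 2, "food": 3}
    # Row 0 ('<pad>') stays zero, row 1 ('<unk>') is randomly initialised,
    # known words receive their vectors from the file.
    return build_embedding_matrix(str(toy_file), vocab, embed_dim=3)
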
def load_and_process_senticnet(
    senticnet_file_path: str = None,
    save_preprocessed_senticnet: bool = False,
    saved_preprocessed_senticnet_file_path: str = "senticnet.pkl",
) -> Dict[str, float]:
    """
    Helper method to load and process senticnet. Default is SenticNet 5.0.
    If a saved preprocessed senticnet file is available and the save flag is set to False,
    it will be loaded from file instead.
    Source: https://github.com/BinLiang-NLP/Sentic-GCN/tree/main/senticnet-5.0

    Args:
        senticnet_file_path (str): File path to senticnet 5.0 file.
        save_preprocessed_senticnet (bool): Flag to indicate if processed senticnet should be saved.
        saved_preprocessed_senticnet_file_path (str): File path to saved preprocessed senticnet file.

    Returns:
        Dict[str, float]: dictionary with concept words as keys and intensity as values.
    """
    saved_senticnet_file_path = pathlib.Path(saved_preprocessed_senticnet_file_path)
    if saved_senticnet_file_path.exists() and not save_preprocessed_senticnet:
        with open(saved_senticnet_file_path, "rb") as f:
            sentic_dict = pickle.load(f)
    else:
        senticnet_file_path = pathlib.Path(senticnet_file_path)
        sentic_dict = {}
        with open(senticnet_file_path, "r") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                items = line.split("\t")
                if "_" in items[0] or "CONCEPT" == items[0]:
                    continue  # skip the header row and concepts containing '_'
                sentic_dict[items[0]] = items[-1]
        if save_preprocessed_senticnet:
            saved_senticnet_file_path.parent.mkdir(exist_ok=True)
            with open(saved_senticnet_file_path, "wb") as f:
                pickle.dump(sentic_dict, f)
    return sentic_dict

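# Illustrative sketch (not part of the original module): shows the tab-separated layout that
# load_and_process_senticnet expects. 'toy_senticnet.txt', 'toy_senticnet.pkl' and the rows
# below are made-up values; the real file comes from the SenticNet 5.0 source linked above.
def _example_load_and_process_senticnet() -> Dict[str, float]:
    toy_file = pathlib.Path("toy_senticnet.txt")
    # The header row ('CONCEPT ...') and multi-word concepts (containing '_') are skipped;
    # only the last column (intensity) of single-word concepts is kept.
    toy_file.write_text(
        "CONCEPT\tINTENSITY\n"
        "great\t0.9\n"
        "great_food\t0.8\n"
        "bad\t-0.75\n",
        encoding="utf-8",
    )
    # Returns {"great": "0.9", "bad": "-0.75"}; intensities are kept as strings here and
    # cast to float downstream in generate_dependency_adj_matrix.
    return load_and_process_senticnet(
        senticnet_file_path=str(toy_file),
        saved_preprocessed_senticnet_file_path="toy_senticnet.pkl",
    )
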
def generate_dependency_adj_matrix(text: str, aspect: str, senticnet: Dict[str, float], spacy_pipeline) -> np.ndarray:
    """
    Helper method to generate a senticnet dependency adjacency matrix.

    Args:
        text (str): input text to process.
        aspect (str): aspect from input text.
        senticnet (Dict[str, float]): dictionary of preprocessed senticnet. See load_and_process_senticnet().
        spacy_pipeline: spaCy pretrained pipeline (e.g. 'en_core_web_sm').

    Returns:
        np.ndarray: ndarray representing the adjacency matrix.
    """
    document = spacy_pipeline(text)
    seq_len = len(text.split())
    matrix = np.zeros((seq_len, seq_len)).astype("float32")
    for token in document:
        sentic = float(senticnet[str(token)]) + 1.0 if str(token) in senticnet else 0
        if str(token) in aspect:
            sentic += 1.0
        if token.i < seq_len:
            matrix[token.i][token.i] = 1.0 * sentic
            for child in token.children:
                if str(child) in aspect:
                    sentic += 1.0
                if child.i < seq_len:
                    matrix[token.i][child.i] = 1.0 * sentic
                    matrix[child.i][token.i] = 1.0 * sentic
    return matrix

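# Illustrative sketch (not part of the original module): builds the adjacency matrix for a
# short sentence. Assumes the 'en_core_web_sm' spaCy model is installed; the sentence,
# aspect and toy senticnet entries are made-up values.
def _example_generate_dependency_adj_matrix() -> np.ndarray:
    nlp = spacy.load("en_core_web_sm")
    toy_senticnet = {"great": "0.9", "food": "0.5"}
    # Returns a (seq_len x seq_len) float32 matrix, where seq_len is the number of
    # whitespace tokens; dependency-linked token pairs get weights boosted by
    # senticnet intensity and aspect membership.
    return generate_dependency_adj_matrix("the food was great", "food", toy_senticnet, nlp)
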
class SenticGCNDataset(Dataset):
    """
    Data class for SenticGCN dataset.
    """

    def __init__(self, data: List[Dict[str, torch.Tensor]]) -> None:
        self.data = data

    def __getitem__(self, index: int) -> Dict[str, torch.Tensor]:
        return self.data[index]

    def __len__(self):
        return len(self.data)

class SenticGCNDatasetGenerator:
    """
    Main dataset generator class to preprocess raw dataset files.
    Set mode to 'train' to generate datasets for training.
    Set mode to 'test' to generate datasets for evaluation from eval_args.
    """

    def __init__(self, config: SenticGCNTrainArgs, tokenizer: PreTrainedTokenizer, mode: str = "train") -> None:
        self.config = config
        self.senticnet = self._load_senticnet(mode)
        self.spacy_pipeline = spacy.load(
            config.spacy_pipeline if mode == "train" else config.eval_args["spacy_pipeline"]
        )
        self.tokenizer = tokenizer

    def _load_senticnet(self, mode: str) -> Dict[str, float]:
        if mode == "train":
            senticnet_ = load_and_process_senticnet(
                self.config.senticnet_word_file_path,
                self.config.save_preprocessed_senticnet,
                self.config.saved_preprocessed_senticnet_file_path,
            )
        else:
            if self.config.eval_args["senticnet"].startswith("https://") or self.config.eval_args[
                "senticnet"
            ].startswith("http://"):
                with tempfile.TemporaryDirectory() as tmpdir:
                    temp_dir = pathlib.Path(tmpdir)
                    download_url_file(self.config.eval_args["senticnet"], temp_dir)
                    saved_path = temp_dir.joinpath("senticnet.pickle")
                    senticnet_ = load_and_process_senticnet(saved_preprocessed_senticnet_file_path=saved_path)
                    shutil.rmtree(temp_dir, ignore_errors=True)
            elif self.config.eval_args["senticnet"].endswith(".pkl") or self.config.eval_args["senticnet"].endswith(
                ".pickle"
            ):
                senticnet_ = load_and_process_senticnet(
                    saved_preprocessed_senticnet_file_path=self.config.eval_args["senticnet"]
                )
            else:
                raise ValueError(
                    """
                    Error initializing SenticNet!
                    Please only provide url to pickle file cloud storage location or local file path.
                    """
                )
        return senticnet_

    def _read_raw_dataset(self, files_path: List[str]) -> List[str]:
        """
        Private helper method to read raw dataset files based on requested type (e.g. Train or Test).

        Args:
            files_path (List[str]): file paths to the dataset files.

        Returns:
            List[str]: list of str consisting of the full text, aspect and polarity index.
        """
        all_lines = []
        for dataset_file in files_path:
            with open(dataset_file, "r", encoding="utf-8", newline="\n", errors="ignore") as f:
                lines = f.readlines()
                all_lines = all_lines + lines
        return all_lines

    def _generate_senticgcn_dataset(self, raw_data: List[str]) -> List[Dict[str, List]]:
        """
        Data preprocess method to generate all indices required for SenticGCN model training.

        Args:
            raw_data (List[str]): list of text, aspect word and polarity read from raw dataset file.

        Returns:
            List[Dict[str, List]]: list of dictionaries of dataset sub-types and their values.
        """
        all_data = []
        for i in range(0, len(raw_data), 3):
            # Process full text, aspect and polarity index
            text_left, _, text_right = [s.lower().strip() for s in raw_data[i].partition("$T$")]
            aspect = raw_data[i + 1].lower().strip()
            full_text = f"{text_left} {aspect} {text_right}"
            polarity = raw_data[i + 2].strip()

            # Process indices
            text_indices = self.tokenizer(
                full_text,
                return_tensors=None,
                return_attention_mask=False,
                return_token_type_ids=False,
            )
            aspect_indices = self.tokenizer(
                aspect,
                return_tensors=None,
                return_attention_mask=False,
                return_token_type_ids=False,
            )
            left_indices = self.tokenizer(
                text_left,
                return_tensors=None,
                return_attention_mask=False,
                return_token_type_ids=False,
            )
            polarity = int(polarity) + 1
            graph = generate_dependency_adj_matrix(full_text, aspect, self.senticnet, self.spacy_pipeline)
            all_data.append(
                {
                    "text_indices": text_indices["input_ids"],
                    "aspect_indices": aspect_indices["input_ids"],
                    "left_indices": left_indices["input_ids"],
                    "polarity": polarity,
                    "sdat_graph": graph,
                }
            )
        return all_data

    def _generate_senticgcnbert_dataset(self, raw_data: List[str]) -> List[Dict[str, torch.Tensor]]:
        """
        Data preprocess method to generate all indices required for SenticGCNBert model training.

        Args:
            raw_data (List[str]): list of text, aspect word and polarity read from raw dataset file.

        Returns:
            List[Dict[str, torch.Tensor]]: list of dictionaries of dataset sub-types and their values.
        """
        all_data = []
        max_len = self.config.max_len
        for i in range(0, len(raw_data), 3):
            # Process full text, aspect and polarity index
            text_left, _, text_right = [s.lower().strip() for s in raw_data[i].partition("$T$")]
            aspect = raw_data[i + 1].lower().strip()
            polarity = raw_data[i + 2].strip()
            full_text = f"{text_left} {aspect} {text_right}"
            full_text_with_bert_tokens = f"[CLS] {full_text} [SEP] {aspect} [SEP]"

            # Process indices
            text_indices = self.tokenizer(
                full_text,
                max_length=max_len,
                padding="max_length",
                truncation=True,
                add_special_tokens=False,
                return_tensors=None,
                return_attention_mask=False,
                return_token_type_ids=False,
            )
            aspect_indices = self.tokenizer(
                aspect,
                max_length=max_len,
                padding="max_length",
                truncation=True,
                add_special_tokens=False,
                return_tensors=None,
                return_attention_mask=False,
                return_token_type_ids=False,
            )
            left_indices = self.tokenizer(
                text_left,
                max_length=max_len,
                padding="max_length",
                truncation=True,
                add_special_tokens=False,
                return_tensors=None,
                return_attention_mask=False,
                return_token_type_ids=False,
            )
            polarity = int(polarity) + 1

            # Process bert related indices
            text_bert_indices = self.tokenizer(
                full_text_with_bert_tokens,
                max_length=max_len,
                padding="max_length",
                truncation=True,
                add_special_tokens=False,
                return_tensors=None,
                return_attention_mask=False,
                return_token_type_ids=False,
            )
            text_len = np.sum(np.asarray(text_indices["input_ids"]) != 0)
            aspect_len = np.sum(np.asarray(aspect_indices["input_ids"]) != 0)

            # array of [0] for text (including [CLS] and [SEP]) and [1] for aspect and the ending [SEP]
            concat_segment_indices = [0] * (text_len + 2) + [1] * (aspect_len + 1)
            concat_segment_indices = pad_and_truncate(concat_segment_indices, max_len)

            # Process graph
            graph = generate_dependency_adj_matrix(full_text, aspect, self.senticnet, self.spacy_pipeline)
            sdat_graph = np.pad(
                graph,
                (
                    (0, max_len - graph.shape[0]),
                    (0, max_len - graph.shape[0]),
                ),
                "constant",
            )

            all_data.append(
                {
                    "text_indices": torch.tensor(text_indices["input_ids"]),
                    "aspect_indices": torch.tensor(aspect_indices["input_ids"]),
                    "left_indices": torch.tensor(left_indices["input_ids"]),
                    "text_bert_indices": torch.tensor(text_bert_indices["input_ids"]),
                    "bert_segment_indices": torch.tensor(concat_segment_indices),
                    "polarity": torch.tensor(polarity),
                    "sdat_graph": torch.tensor(sdat_graph),
                }
            )
        return all_data

    def generate_datasets(self) -> Tuple[SenticGCNDataset, SenticGCNDataset, SenticGCNDataset]:
        """
        Main wrapper method to generate datasets for both SenticGCN and SenticGCNBert based on config.

        Returns:
            Tuple[SenticGCNDataset, SenticGCNDataset, SenticGCNDataset]: return SenticGCNDataset instances
                for train/val/test data.
        """
        # Read raw data from dataset files
        raw_train_data = self._read_raw_dataset(self.config.dataset_train)
        raw_test_data = self._read_raw_dataset(self.config.dataset_test)

        # Generate dataset dictionary
        if self.config.model == "senticgcn":
            train_data = self._generate_senticgcn_dataset(raw_train_data)
            test_data = self._generate_senticgcn_dataset(raw_test_data)
        else:
            train_data = self._generate_senticgcnbert_dataset(raw_train_data)
            test_data = self._generate_senticgcnbert_dataset(raw_test_data)

        # Train/Val/Test split
        if self.config.valset_ratio > 0:
            valset_len = int(len(train_data) * self.config.valset_ratio)
            train_data, val_data = random_split(train_data, (len(train_data) - valset_len, valset_len))
        else:
            val_data = test_data
        return SenticGCNDataset(train_data), SenticGCNDataset(val_data), SenticGCNDataset(test_data)

    def generate_test_datasets(self) -> SenticGCNDataset:
        """
        Main wrapper method to generate test datasets for both SenticGCN and SenticGCNBert based on eval config.

        Returns:
            SenticGCNDataset: return SenticGCNDataset instance for test datasets
        """
        raw_data = self._read_raw_dataset(self.config.eval_args["test_filename"])
        if self.config.eval_args["model"] == "senticgcn":
            test_data = self._generate_senticgcn_dataset(raw_data)
        else:
            test_data = self._generate_senticgcnbert_dataset(raw_data)
        return SenticGCNDataset(test_data)

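# Illustrative usage sketch (not part of the original module): wires the helpers above together
# for training. Assumes the default config file shipped with the module exists and that
# 'tokenizer' is a pretrained tokenizer compatible with the configured model; both are
# assumptions of this sketch, not guarantees of the library.
def _example_generate_datasets(
    tokenizer: PreTrainedTokenizer,
) -> Tuple[SenticGCNDataset, SenticGCNDataset, SenticGCNDataset]:
    cfg = parse_args_and_load_config()
    generator = SenticGCNDatasetGenerator(cfg, tokenizer, mode="train")
    # Returns train/val/test SenticGCNDataset instances split according to cfg.valset_ratio.
    return generator.generate_datasets()
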
class BucketIterator:
    """
    Iterator class for use with the non-bert version of SenticGCN.
    """

    def __init__(
        self,
        data: List[Dict[str, BatchEncoding]],
        batch_size: int,
        sort_key: str = "text_indices",
        shuffle=True,
        sort=True,
    ):
        self.shuffle = shuffle
        self.sort = sort
        self.sort_key = sort_key
        self.batches = self._sort_and_pad(data, batch_size)
        self.batch_len = len(self.batches)

    def _sort_and_pad(self, data: List[Dict[str, List]], batch_size: int) -> List[Dict[str, List[torch.Tensor]]]:
        """
        Private method to sort and pad input dataset.

        Args:
            data (List[Dict[str, List]]): input dataset
            batch_size (int): batch size to split dataset

        Returns:
            List[Dict[str, List[torch.Tensor]]]: return list of dictionaries of dataset batches
        """
        num_batch = int(math.ceil(len(data) / batch_size))
        if self.sort:
            sorted_data = sorted(data, key=lambda x: len(x[self.sort_key]))
        else:
            sorted_data = data
        batches = []
        for i in range(num_batch):
            batches.append(self._pad_data(sorted_data[i * batch_size : (i + 1) * batch_size]))
        return batches

    def _pad_data(self, batch_data: Dict[str, List]) -> Dict[str, List[torch.Tensor]]:
        """
        Private method to pad each sub dataset to the max length of its specific batch.

        Args:
            batch_data (Dict[str, List]): dictionary of sub dataset and their list of values

        Returns:
            Dict[str, List[torch.Tensor]]: return a dictionary of list of tensor values
        """
        batch_text_indices = []
        batch_aspect_indices = []
        batch_left_indices = []
        batch_polarity = []
        batch_sdat_graph = []
        max_len = max([len(t[self.sort_key]) for t in batch_data])
        for item in batch_data:
            (text_indices, aspect_indices, left_indices, polarity, sdat_graph) = (
                item["text_indices"],
                item["aspect_indices"],
                item["left_indices"],
                item["polarity"],
                item["sdat_graph"],
            )
            # Calculate padding length
            text_padding = [0] * (max_len - len(text_indices))
            aspect_padding = [0] * (max_len - len(aspect_indices))
            left_padding = [0] * (max_len - len(left_indices))
            batch_text_indices.append(text_indices + text_padding)
            batch_aspect_indices.append(aspect_indices + aspect_padding)
            batch_left_indices.append(left_indices + left_padding)
            batch_polarity.append(polarity)
            batch_sdat_graph.append(
                np.pad(sdat_graph, ((0, max_len - len(text_indices)), (0, max_len - len(text_indices))), "constant")
            )
        return {
            "text_indices": torch.tensor(batch_text_indices),
            "aspect_indices": torch.tensor(batch_aspect_indices),
            "left_indices": torch.tensor(batch_left_indices),
            "polarity": torch.tensor(batch_polarity),
            "sdat_graph": torch.tensor(np.array(batch_sdat_graph)),
        }

    def __iter__(self):
        if self.shuffle:
            random.shuffle(self.batches)
        for idx in range(self.batch_len):
            yield self.batches[idx]

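# Illustrative usage sketch (not part of the original module): batches the output of
# SenticGCNDatasetGenerator._generate_senticgcn_dataset (non-bert variant) for training.
# 'train_data' is assumed to be that list of per-sample dictionaries.
def _example_bucket_iterator(train_data: List[Dict[str, List]]) -> None:
    iterator = BucketIterator(train_data, batch_size=32, sort_key="text_indices", shuffle=True)
    for batch in iterator:
        # Each batch is a dict of padded tensors keyed by text_indices, aspect_indices,
        # left_indices, polarity and sdat_graph.
        print(batch["text_indices"].shape, batch["sdat_graph"].shape)
        break
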