Source code for sgnlp.models.sentic_gcn.preprocess

import logging
import pathlib
import shutil
import string
import tempfile
import urllib.parse
from collections import namedtuple
from typing import Dict, List, Tuple, Union

import numpy as np
import spacy
import torch
from transformers import PreTrainedTokenizer, PreTrainedTokenizerFast, PretrainedConfig, PreTrainedModel

from .config import SenticGCNEmbeddingConfig, SenticGCNBertEmbeddingConfig
from .modeling import SenticGCNEmbeddingModel, SenticGCNBertEmbeddingModel
from .tokenization import SenticGCNTokenizer, SenticGCNBertTokenizer
from .utils import (
    load_and_process_senticnet,
    download_tokenizer_files,
    download_url_file,
    pad_and_truncate,
    generate_dependency_adj_matrix,
)


logging.basicConfig(level=logging.DEBUG)


SenticGCNData = namedtuple(
    "SenticGCNData", ["full_text", "aspect", "left_text", "full_text_tokens", "aspect_token_indexes"]
)
SenticGCNBertData = namedtuple(
    "SenticGCNBertData",
    ["full_text", "aspect", "left_text", "full_text_with_bert_tokens", "full_text_tokens", "aspect_token_indexes"],
)


class SenticGCNBasePreprocessor:
    """
    Base preprocessor class that provides initialization for spacy, SenticNet, the tokenizer and the embedding model.
    This class is only meant to be inherited by derived preprocessors.
    """

    def __init__(
        self,
        tokenizer: Union[str, PreTrainedTokenizer, PreTrainedTokenizerFast],
        embedding_model: Union[str, PreTrainedModel],
        tokenizer_class: PreTrainedTokenizer,
        embedding_config_class: PretrainedConfig,
        embedding_model_class: PreTrainedModel,
        config_filename: str = "config.json",
        model_filename: str = "pytorch_model.bin",
        spacy_pipeline: str = "en_core_web_sm",
        senticnet: Union[
            str, Dict[str, float]
        ] = "https://storage.googleapis.com/sgnlp/models/sentic_gcn/senticnet.pickle",
        device: str = "cpu",
    ) -> None:
        # Set device
        self.device = (
            torch.device("cuda" if torch.cuda.is_available() else "cpu") if not device else torch.device(device)
        )
        self.spacy_pipeline = spacy.load(spacy_pipeline)

        try:
            # Load SenticNet
            if isinstance(senticnet, dict):
                senticnet_ = senticnet
            elif senticnet.startswith("https://") or senticnet.startswith("http://"):
                # Download preprocessed pickle file from cloud storage to a temp dir, then load it
                with tempfile.TemporaryDirectory() as tmpdir:
                    temp_dir = pathlib.Path(tmpdir)
                    download_url_file(senticnet, temp_dir)
                    saved_path = temp_dir.joinpath("senticnet.pickle")
                    senticnet_ = load_and_process_senticnet(saved_preprocessed_senticnet_file_path=saved_path)
                shutil.rmtree(temp_dir, ignore_errors=True)
            elif senticnet.endswith(".pkl") or senticnet.endswith(".pickle"):
                senticnet_ = load_and_process_senticnet(saved_preprocessed_senticnet_file_path=senticnet)
            elif senticnet.endswith(".txt"):
                senticnet_ = load_and_process_senticnet(senticnet_file_path=senticnet)
            else:
                raise ValueError(
                    """
                    Error initializing SenticNet!
                    To download from cloud storage, please provide the url to the pickle file location
                    (i.e. a string url starting with https:// or http://).
                    For a processed SenticNet dictionary, please provide the pickle file location
                    (i.e. a file with .pkl or .pickle extension).
                    For the raw SenticNet-5.0 file, please provide the text file path (i.e. a file with .txt extension).
                    For an externally created SenticNet dictionary, please provide a dictionary with words as keys
                    and sentic scores as values.
                    """
                )
            self.senticnet = senticnet_
        except Exception as e:
            logging.error(e)
            raise Exception(
                """
                Error initializing SenticNet!
                Please ensure that the input is either a dictionary, a str path to a saved pickle file,
                an url to cloud storage or a str path to the raw SenticNet file.
                """
            )

        try:
            # Init tokenizer
            if isinstance(tokenizer, (PreTrainedTokenizer, PreTrainedTokenizerFast)):
                # Load from external instance
                tokenizer_ = tokenizer
            else:
                if tokenizer.startswith("https://") or tokenizer.startswith("http://"):
                    # Load from cloud: download tokenizer files to a temp dir first
                    with tempfile.TemporaryDirectory() as tmpdir:
                        temp_dir = pathlib.Path(tmpdir)
                        download_tokenizer_files(tokenizer, temp_dir)
                        tokenizer_ = tokenizer_class.from_pretrained(temp_dir)
                    shutil.rmtree(temp_dir, ignore_errors=True)
                else:
                    # Load from local directory or from HuggingFace model repository
                    tokenizer_ = tokenizer_class.from_pretrained(tokenizer)
            self.tokenizer = tokenizer_
        except Exception as e:
            logging.error(e)
            raise Exception(
                """
                Error initializing tokenizer!
                Please ensure that the input tokenizer is either a PreTrainedTokenizer instance,
                an url to a cloud storage folder, a local folder or a HuggingFace model name.
                """
            )

        try:
            # Init embedding model
            if isinstance(embedding_model, PreTrainedModel):
                # Load from external instance
                embed_model = embedding_model
            else:
                if embedding_model.startswith("https://") or embedding_model.startswith("http://"):
                    # Load from cloud
                    config_url = urllib.parse.urljoin(embedding_model, config_filename)
                    model_url = urllib.parse.urljoin(embedding_model, model_filename)
                    embed_config = embedding_config_class.from_pretrained(config_url)
                    embed_model = embedding_model_class.from_pretrained(model_url, config=embed_config)
                else:
                    embed_model_name = pathlib.Path(embedding_model)
                    if embed_model_name.is_dir():
                        # Load from local folder
                        config_path = embed_model_name.joinpath(config_filename)
                        model_path = embed_model_name.joinpath(model_filename)
                        embed_config = embedding_config_class.from_pretrained(config_path)
                        embed_model = embedding_model_class.from_pretrained(model_path, config=embed_config)
                    else:
                        # Load from HuggingFace model repository
                        embed_config = embedding_config_class.from_pretrained(embedding_model)
                        embed_model = embedding_model_class.from_pretrained(embedding_model, config=embed_config)
            self.embedding_model = embed_model
            self.embedding_model.to(self.device)
        except Exception as e:
            logging.error(e)
            raise Exception(
                """
                Error initializing embedding model!
                Please ensure that the input embedding model is either a PreTrainedModel instance,
                an url to a cloud storage folder, a local folder or a HuggingFace model name.
                """
            )
class SenticGCNPreprocessor(SenticGCNBasePreprocessor):
    """
    Class for preprocessing sentence(s) and their aspect(s) into a batch of tensors for the SenticGCNModel
    to predict on.
    """

    def __init__(
        self,
        tokenizer: Union[
            str, PreTrainedTokenizer
        ] = "https://storage.googleapis.com/sgnlp/models/sentic_gcn/senticgcn_tokenizer/",
        embedding_model: Union[
            str, PreTrainedModel
        ] = "https://storage.googleapis.com/sgnlp/models/sentic_gcn/senticgcn_embedding_model/",
        config_filename: str = "config.json",
        model_filename: str = "pytorch_model.bin",
        spacy_pipeline: str = "en_core_web_sm",
        senticnet: Union[
            str, Dict[str, float]
        ] = "https://storage.googleapis.com/sgnlp/models/sentic_gcn/senticnet.pickle",
        device: str = "cpu",
    ) -> None:
        super().__init__(
            tokenizer=tokenizer,
            embedding_model=embedding_model,
            tokenizer_class=SenticGCNTokenizer,
            embedding_config_class=SenticGCNEmbeddingConfig,
            embedding_model_class=SenticGCNEmbeddingModel,
            config_filename=config_filename,
            model_filename=model_filename,
            spacy_pipeline=spacy_pipeline,
            senticnet=senticnet,
            device=device,
        )

    def _process_indices(self, data_batch: List[SenticGCNData]) -> List[torch.Tensor]:
        """
        Private helper method to generate all indices and embeddings required for model input
        from a list of input data.

        Args:
            data_batch (List[SenticGCNData]): list of processed inputs as SenticGCNData

        Returns:
            List[torch.Tensor]: return a list of tensors for model input
        """
        all_text_indices = []
        all_aspect_indices = []
        all_left_indices = []
        all_sdat_graph = []
        all_data = []
        max_len = 0
        for data in data_batch:
            text_indices = self.tokenizer(
                data.full_text,
                return_tensors=None,
                return_attention_mask=False,
                return_token_type_ids=False,
            )
            aspect_indices = self.tokenizer(
                data.aspect,
                return_tensors=None,
                return_attention_mask=False,
                return_token_type_ids=False,
            )
            left_indices = self.tokenizer(
                data.left_text,
                return_tensors=None,
                return_attention_mask=False,
                return_token_type_ids=False,
            )
            graph = generate_dependency_adj_matrix(data.full_text, data.aspect, self.senticnet, self.spacy_pipeline)
            all_data.append(
                {
                    "text_indices": text_indices["input_ids"],
                    "aspect_indices": aspect_indices["input_ids"],
                    "left_indices": left_indices["input_ids"],
                    "sdat_graph": graph,
                }
            )
            if max_len < len(text_indices["input_ids"]):
                max_len = len(text_indices["input_ids"])

        for item in all_data:
            text_indices, aspect_indices, left_indices, sdat_graph = (
                item["text_indices"],
                item["aspect_indices"],
                item["left_indices"],
                item["sdat_graph"],
            )

            # Pad all indices and the adjacency matrix to the longest text sequence in the batch
            text_padding = [0] * (max_len - len(text_indices))
            aspect_padding = [0] * (max_len - len(aspect_indices))
            left_padding = [0] * (max_len - len(left_indices))

            sdat_graph = np.pad(
                sdat_graph,
                ((0, max_len - len(text_indices)), (0, max_len - len(text_indices))),
                "constant",
            )

            all_text_indices.append(text_indices + text_padding)
            all_aspect_indices.append(aspect_indices + aspect_padding)
            all_left_indices.append(left_indices + left_padding)
            all_sdat_graph.append(sdat_graph)

        all_text_indices = torch.tensor(all_text_indices).to(self.device)
        text_embeddings = self.embedding_model(all_text_indices)

        return [
            all_text_indices,
            torch.tensor(all_aspect_indices).to(self.device),
            torch.tensor(all_left_indices).to(self.device),
            text_embeddings,
            torch.tensor(all_sdat_graph).to(self.device),
        ]

    def _process_inputs(self, data_batch: List[Dict[str, Union[str, List[str]]]]) -> List[SenticGCNData]:
        """
        Private helper method to process the input data batch.
        Input entries are repeated for each input aspect.
        If an input aspect occurs multiple times in the sentence, each occurrence is processed as a separate entry.

        Args:
            data_batch (List[Dict[str, Union[str, List[str]]]]): list of dictionaries with 2 keys, 'sentence' and
                'aspects'. 'sentence' values are strings and 'aspects' values are lists of accompanying aspects.

        Returns:
            List[SenticGCNData]: return list of processed inputs as SenticGCNData
        """
        processed_inputs = []
        for batch in data_batch:
            full_text = batch["sentence"].lower().strip()
            full_text_tokens = batch["sentence"].split()
            for aspect in batch["aspects"]:
                aspect = aspect.lower().strip()
                aspect_tokens = aspect.translate(str.maketrans("", "", string.punctuation)).split()
                aspect_indexes = []
                for idx in range(len(full_text_tokens)):
                    try:
                        if (
                            " ".join(full_text_tokens[idx : idx + len(aspect_tokens)])
                            .translate(str.maketrans("", "", string.punctuation))
                            .lower()
                            == aspect
                        ):
                            aspect_indexes.append(list(map(lambda x: idx + x, [*range(len(aspect_tokens))])))
                    except IndexError:
                        continue

                aspect_idxs = [index for index in range(len(full_text)) if full_text.startswith(aspect, index)]
                for aspect_index, aspect_token_indexes in zip(aspect_idxs, aspect_indexes):
                    left_text = full_text[:aspect_index].strip()
                    processed_inputs.append(
                        SenticGCNData(
                            full_text=full_text,
                            aspect=aspect,
                            left_text=left_text,
                            full_text_tokens=full_text_tokens,
                            aspect_token_indexes=aspect_token_indexes,
                        )
                    )
        return processed_inputs
    def __call__(
        self, data_batch: List[Dict[str, Union[str, List[str]]]]
    ) -> Tuple[List[SenticGCNData], List[torch.Tensor]]:
        """
        Method to generate a list of input tensors from a list of sentences and their accompanying lists of aspects.

        Args:
            data_batch (List[Dict[str, Union[str, List[str]]]]): list of dictionaries with 2 keys, 'sentence' and
                'aspects'. 'sentence' values are strings and 'aspects' values are lists of accompanying aspects.

        Returns:
            Tuple[List[SenticGCNData], List[torch.Tensor]]: return the list of processed inputs and an ordered list
                of tensors for 'text_indices', 'aspect_indices', 'left_indices', 'text_embeddings' and 'sdat_graph'.
        """
        processed_inputs = self._process_inputs(data_batch)
        return processed_inputs, self._process_indices(processed_inputs)
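# Illustrative usage (not part of the original module): a minimal sketch of calling SenticGCNPreprocessor
# with its default cloud-hosted tokenizer, embedding model and SenticNet. The sentence and aspects are
# hypothetical; each aspect occurrence yields one SenticGCNData entry.
#
#   preprocessor = SenticGCNPreprocessor(device="cpu")
#   processed_inputs, processed_indices = preprocessor(
#       [{"sentence": "The soup is tasty but the service was slow.", "aspects": ["soup", "service"]}]
#   )
#   # processed_inputs: List[SenticGCNData]
#   # processed_indices: [text_indices, aspect_indices, left_indices, text_embeddings, sdat_graph]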
class SenticGCNBertPreprocessor(SenticGCNBasePreprocessor):
    """
    Class for preprocessing sentence(s) and their aspect(s) into a batch of tensors for the SenticGCNBertModel
    to predict on.
    """

    def __init__(
        self,
        tokenizer: Union[str, PreTrainedTokenizer] = "bert-base-uncased",
        embedding_model: Union[str, PreTrainedModel] = "bert-base-uncased",
        config_filename: str = "config.json",
        model_filename: str = "pytorch_model.bin",
        spacy_pipeline: str = "en_core_web_sm",
        senticnet: Union[
            str, Dict[str, float]
        ] = "https://storage.googleapis.com/sgnlp/models/sentic_gcn/senticnet.pickle",
        max_len: int = 85,
        device: str = "cpu",
    ) -> None:
        super().__init__(
            tokenizer=tokenizer,
            embedding_model=embedding_model,
            tokenizer_class=SenticGCNBertTokenizer,
            embedding_config_class=SenticGCNBertEmbeddingConfig,
            embedding_model_class=SenticGCNBertEmbeddingModel,
            config_filename=config_filename,
            model_filename=model_filename,
            spacy_pipeline=spacy_pipeline,
            senticnet=senticnet,
            device=device,
        )
        self.max_len = max_len

    def _process_indices(self, data_batch: List[SenticGCNBertData]) -> List[torch.Tensor]:
        """
        Private helper method to generate all indices and embeddings required for model input
        from a list of input data.

        Args:
            data_batch (List[SenticGCNBertData]): list of processed inputs as SenticGCNBertData

        Returns:
            List[torch.Tensor]: return a list of tensors for model input
        """
        all_text_indices = []
        all_aspect_indices = []
        all_left_indices = []
        all_text_bert_indices = []
        all_bert_segment_indices = []
        all_sdat_graph = []
        for data in data_batch:
            text_indices = self.tokenizer(
                data.full_text,
                max_length=self.max_len,
                padding="max_length",
                truncation=True,
                add_special_tokens=False,
                return_tensors=None,
                return_attention_mask=False,
                return_token_type_ids=False,
            )
            aspect_indices = self.tokenizer(
                data.aspect,
                max_length=self.max_len,
                padding="max_length",
                truncation=True,
                add_special_tokens=False,
                return_tensors=None,
                return_attention_mask=False,
                return_token_type_ids=False,
            )
            left_indices = self.tokenizer(
                data.left_text,
                max_length=self.max_len,
                padding="max_length",
                truncation=True,
                add_special_tokens=False,
                return_tensors=None,
                return_attention_mask=False,
                return_token_type_ids=False,
            )
            text_bert_indices = self.tokenizer(
                data.full_text_with_bert_tokens,
                max_length=self.max_len,
                padding="max_length",
                truncation=True,
                add_special_tokens=False,
                return_tensors=None,
                return_attention_mask=False,
                return_token_type_ids=False,
            )
            text_len = np.sum(text_indices["input_ids"] != 0)
            aspect_len = np.sum(aspect_indices["input_ids"] != 0)
            # Segment ids: 0 for '[CLS] <text> [SEP]', 1 for '<aspect> [SEP]'
            concat_segment_indices = [0] * (text_len + 2) + [1] * (aspect_len + 1)
            concat_segment_indices = pad_and_truncate(concat_segment_indices, self.max_len)

            graph = generate_dependency_adj_matrix(data.full_text, data.aspect, self.senticnet, self.spacy_pipeline)
            sdat_graph = np.pad(
                graph,
                (
                    (0, self.max_len - graph.shape[0]),
                    (0, self.max_len - graph.shape[0]),
                ),
                "constant",
            )

            all_text_indices.append(text_indices["input_ids"])
            all_aspect_indices.append(aspect_indices["input_ids"])
            all_left_indices.append(left_indices["input_ids"])
            all_text_bert_indices.append(text_bert_indices["input_ids"])
            all_bert_segment_indices.append(concat_segment_indices)
            all_sdat_graph.append(sdat_graph)

        all_text_bert_indices = torch.tensor(all_text_bert_indices).to(self.device)
        all_bert_segment_indices = torch.tensor(np.array(all_bert_segment_indices)).to(self.device)
        text_embeddings = self.embedding_model(all_text_bert_indices, token_type_ids=all_bert_segment_indices)[
            "last_hidden_state"
        ]

        return [
            torch.tensor(all_text_indices).to(self.device),
            torch.tensor(all_aspect_indices).to(self.device),
            torch.tensor(all_left_indices).to(self.device),
            text_embeddings,
            torch.tensor(all_sdat_graph).to(self.device),
        ]

    def _process_inputs(self, data_batch: List[Dict[str, Union[str, List[str]]]]) -> List[SenticGCNBertData]:
        """
        Private helper method to process the input data batch.
        Input entries are repeated for each input aspect.
        If an input aspect occurs multiple times in the sentence, each occurrence is processed as a separate entry.

        Args:
            data_batch (List[Dict[str, Union[str, List[str]]]]): list of dictionaries with 2 keys, 'sentence' and
                'aspects'. 'sentence' values are strings and 'aspects' values are lists of accompanying aspects.

        Returns:
            List[SenticGCNBertData]: return list of processed inputs as SenticGCNBertData
        """
        processed_inputs = []
        for batch in data_batch:
            full_text = batch["sentence"].lower().strip()
            full_text_tokens = batch["sentence"].split()
            for aspect in batch["aspects"]:
                aspect = aspect.lower().strip()
                aspect_tokens = aspect.translate(str.maketrans("", "", string.punctuation)).split()
                aspect_indexes = []
                for idx in range(len(full_text_tokens)):
                    try:
                        if (
                            " ".join(full_text_tokens[idx : idx + len(aspect_tokens)])
                            .translate(str.maketrans("", "", string.punctuation))
                            .lower()
                            == aspect
                        ):
                            aspect_indexes.append(list(map(lambda x: idx + x, [*range(len(aspect_tokens))])))
                    except IndexError:
                        continue

                aspect_idxs = [index for index in range(len(full_text)) if full_text.startswith(aspect, index)]
                for aspect_index, aspect_token_indexes in zip(aspect_idxs, aspect_indexes):
                    left_text = full_text[:aspect_index].strip()
                    full_text_with_bert_tokens = f"[CLS] {full_text} [SEP] {aspect} [SEP]"
                    processed_inputs.append(
                        SenticGCNBertData(
                            full_text=full_text,
                            aspect=aspect,
                            left_text=left_text,
                            full_text_with_bert_tokens=full_text_with_bert_tokens,
                            full_text_tokens=full_text_tokens,
                            aspect_token_indexes=aspect_token_indexes,
                        )
                    )
        return processed_inputs
    def __call__(
        self, data_batch: List[Dict[str, Union[str, List[str]]]]
    ) -> Tuple[List[SenticGCNBertData], List[torch.Tensor]]:
        """
        Method to generate a list of input tensors from a list of sentences and their accompanying lists of aspects.

        Args:
            data_batch (List[Dict[str, Union[str, List[str]]]]): list of dictionaries with 2 keys, 'sentence' and
                'aspects'. 'sentence' values are strings and 'aspects' values are lists of accompanying aspects.

        Returns:
            Tuple[List[SenticGCNBertData], List[torch.Tensor]]: return the list of processed inputs and an ordered
                list of tensors for 'text_indices', 'aspect_indices', 'left_indices', 'text_embeddings' and
                'sdat_graph'.
        """
        processed_inputs = self._process_inputs(data_batch)
        return processed_inputs, self._process_indices(processed_inputs)
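# Illustrative usage (not part of the original module): a minimal sketch of calling SenticGCNBertPreprocessor
# with its "bert-base-uncased" defaults. Sequences are padded/truncated to `max_len`, and the text embeddings
# are taken from the BERT embedding model's last hidden state. The sentence and aspects are hypothetical.
#
#   bert_preprocessor = SenticGCNBertPreprocessor(max_len=85, device="cpu")
#   processed_inputs, processed_indices = bert_preprocessor(
#       [{"sentence": "The screen is sharp but the battery drains fast.", "aspects": ["screen", "battery"]}]
#   )
#   # processed_inputs: List[SenticGCNBertData]
#   # processed_indices: [text_indices, aspect_indices, left_indices, text_embeddings, sdat_graph]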