import random

import numpy as np
import paddle
import gradio as gr
from paddlenlp.transformers import SkepTokenizer, SkepModel
from seqeval.metrics.sequence_labeling import get_entities

label_ext_path = "./data/data121190/label_ext.dict"
label_cls_path = "./data/data121242/label_cls.dict"
ext_model_path = "./best_ext.pdparams"
cls_model_path = "./best_cls.pdparams"

def set_seed(seed):
    paddle.seed(seed)
    random.seed(seed)
    np.random.seed(seed)

def decoding(text, tag_seq):
    """Decode a BIO tag sequence into groups of [aspect, opinion, ...] words."""
    assert len(text) == len(tag_seq), f"text len: {len(text)}, tag_seq len: {len(tag_seq)}"

    # split the text into clause-level segments on punctuation
    puncs = list(",.?;!,。?;!")
    splits = [idx for idx in range(len(text)) if text[idx] in puncs]
    prev = 0
    sub_texts, sub_tag_seqs = [], []
    for split in splits:
        sub_tag_seqs.append(tag_seq[prev:split])
        sub_texts.append(text[prev:split])
        prev = split
    sub_tag_seqs.append(tag_seq[prev:])
    sub_texts.append(text[prev:])

    # collect Aspect/Opinion entity spans per segment
    ents_list = []
    for sub_text, sub_tag_seq in zip(sub_texts, sub_tag_seqs):
        ents = get_entities(sub_tag_seq, suffix=False)
        ents_list.append((sub_text, ents))

    # pair each aspect with the opinion words around it
    aps = []
    no_a_words = []
    for sub_text, ent_list in ents_list:
        sub_aps = []
        sub_no_a_words = []
        for ent in ent_list:
            ent_name, start, end = ent
            if ent_name == "Aspect":
                aspect = sub_text[start:end + 1]
                sub_aps.append([aspect])
                if len(sub_no_a_words) > 0:
                    sub_aps[-1].extend(sub_no_a_words)
                    sub_no_a_words.clear()
            else:  # ent_name == "Opinion"
                opinion = sub_text[start:end + 1]
                if len(sub_aps) > 0:
                    sub_aps[-1].append(opinion)
                else:
                    sub_no_a_words.append(opinion)

        if sub_aps:
            aps.extend(sub_aps)
            if len(no_a_words) > 0:
                aps[-1].extend(no_a_words)
                no_a_words.clear()
        elif sub_no_a_words:
            if len(aps) > 0:
                aps[-1].extend(sub_no_a_words)
            else:
                no_a_words.extend(sub_no_a_words)

    # opinion words that never found an aspect go under a "None" placeholder
    if no_a_words:
        no_a_words.insert(0, "None")
        aps.append(no_a_words)
    return aps
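
# Illustrative (hypothetical) example of what decoding() returns, assuming the
# extraction model tags aspects and opinions with a BIO scheme:
#
#   decoding("味道好", ["B-Aspect", "I-Aspect", "B-Opinion"])  ->  [["味道", "好"]]
#
# Each group is [aspect, opinion, opinion, ...].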

def is_aspect_first(text, aspect, opinion_word):
    return text.find(aspect) <= text.find(opinion_word)


def concate_aspect_and_opinion(text, aspect, opinion_words):
    """Join an aspect with each opinion word, preserving their surface order."""
    aspect_text = ""
    for opinion_word in opinion_words:
        if is_aspect_first(text, aspect, opinion_word):
            aspect_text += aspect + opinion_word + ","
        else:
            aspect_text += opinion_word + aspect + ","
    aspect_text = aspect_text[:-1]  # drop the trailing comma
    return aspect_text
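
# Hypothetical example: with text "味道好", aspect "味道" and opinion_words
# ["好"], the call returns "味道好"; if an opinion precedes its aspect in the
# text, the pair is emitted in that order instead (e.g. "好味道").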

def format_print(results):
    """Print each result and return all of them as one newline-joined string."""
    lines = []
    for result in results:
        aspect, opinions, sentiment = result["aspect"], result["opinions"], result["sentiment"]
        lines.append(f"aspect: {aspect}, opinions: {opinions}, sentiment: {sentiment}")
        print(lines[-1])
    # return every triple, not just the last one, so the Gradio output is complete
    return "\n".join(lines)

def is_target_first(text, target, word):
    # unused duplicate of is_aspect_first
    return text.find(target) <= text.find(word)

def load_dict(dict_path):
    """Load a label dict (one label per line) into label<->id mappings."""
    with open(dict_path, "r", encoding="utf-8") as f:
        words = [word.strip() for word in f.readlines()]
    word2id = dict(zip(words, range(len(words))))
    id2word = dict((v, k) for k, v in word2id.items())
    return word2id, id2word
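
# The dict files are assumed to hold one label per line; e.g. label_ext.dict
# (hypothetical contents) might be:
#
#   O
#   B-Aspect
#   I-Aspect
#   B-Opinion
#   I-Opinion
#
# giving word2id = {"O": 0, "B-Aspect": 1, ...} and the inverse id2word.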

def read(data_path):
    """Yield examples from a tab-separated file: label, target_text, text."""
    with open(data_path, "r", encoding="utf-8") as f:
        for line in f:
            items = line.strip().split("\t")
            assert len(items) == 3
            example = {"label": int(items[0]), "target_text": items[1], "text": items[2]}
            yield example
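
# A data line is assumed to look like (tab-separated, hypothetical values):
#
#   1<TAB>味道好<TAB>这家店味道很好
#
# which yields {"label": 1, "target_text": "味道好", "text": "这家店味道很好"}.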

def convert_example_to_feature(example, tokenizer, label2id, max_seq_len=512, is_test=False):
    # note: label2id is accepted for interface parity but unused here
    encoded_inputs = tokenizer(
        example["target_text"], text_pair=example["text"], max_seq_len=max_seq_len, return_length=True)
    if not is_test:
        label = example["label"]
        return encoded_inputs["input_ids"], encoded_inputs["token_type_ids"], encoded_inputs["seq_len"], label
    return encoded_inputs["input_ids"], encoded_inputs["token_type_ids"], encoded_inputs["seq_len"]
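
# Sketch of the pair encoding, assuming PaddleNLP's usual convention:
# input_ids is [CLS] target_text [SEP] text [SEP], token_type_ids marks the two
# segments with 0 and 1, and seq_len is the total token count.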

class SkepForTokenClassification(paddle.nn.Layer):
    """SKEP encoder with a per-token head, used for aspect/opinion extraction."""

    def __init__(self, skep, num_classes=2, dropout=None):
        super(SkepForTokenClassification, self).__init__()
        self.num_classes = num_classes
        self.skep = skep
        self.dropout = paddle.nn.Dropout(
            dropout if dropout is not None else self.skep.config["hidden_dropout_prob"])
        self.classifier = paddle.nn.Linear(
            self.skep.config["hidden_size"], num_classes)

    def forward(self, input_ids, token_type_ids=None, position_ids=None, attention_mask=None):
        sequence_output, _ = self.skep(
            input_ids, token_type_ids=token_type_ids, position_ids=position_ids, attention_mask=attention_mask)
        sequence_output = self.dropout(sequence_output)
        logits = self.classifier(sequence_output)
        return logits
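
# Shape sketch (batch size B, sequence length T, hidden size H): sequence_output
# is [B, T, H], so the extraction logits are [B, T, num_classes], one label
# distribution per token.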

class SkepForSequenceClassification(paddle.nn.Layer):
    """SKEP encoder with a pooled-output head, used for sentiment classification."""

    def __init__(self, skep, num_classes=2, dropout=None):
        super(SkepForSequenceClassification, self).__init__()
        self.num_classes = num_classes
        self.skep = skep
        self.dropout = paddle.nn.Dropout(
            dropout if dropout is not None else self.skep.config["hidden_dropout_prob"])
        self.classifier = paddle.nn.Linear(
            self.skep.config["hidden_size"], num_classes)

    def forward(self, input_ids, token_type_ids=None, position_ids=None, attention_mask=None):
        _, pooled_output = self.skep(input_ids, token_type_ids=token_type_ids,
                                     position_ids=position_ids, attention_mask=attention_mask)
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)
        return logits
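
# Here the pooled [CLS] representation ([B, H]) is classified instead, giving
# one sentiment distribution per (aspect_text, full-text) pair: [B, num_classes].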

# load dicts and tokenizer
model_name = "skep_ernie_1.0_large_ch"  # base checkpoint name; loading below uses the local dirs
target1_dir = "./skepTokenizer"
target2_dir = "./skepModel"
max_seq_len = 1024  # note: the SKEP checkpoint's position embeddings may cap usable length at 512
ext_label2id, ext_id2label = load_dict(label_ext_path)
cls_label2id, cls_id2label = load_dict(label_cls_path)
tokenizer = SkepTokenizer.from_pretrained(target1_dir)
print("label dict loaded.")

# load ext model
ext_state_dict = paddle.load(ext_model_path)
ext_skep = SkepModel.from_pretrained(target2_dir)
ext_model = SkepForTokenClassification(ext_skep, num_classes=len(ext_label2id))
ext_model.load_dict(ext_state_dict)
print("extraction model loaded.")

# load cls model on its own backbone: sharing ext_skep would let cls_state_dict
# overwrite the extraction model's encoder weights
cls_state_dict = paddle.load(cls_model_path)
cls_skep = SkepModel.from_pretrained(target2_dir)
cls_model = SkepForSequenceClassification(cls_skep, num_classes=len(cls_label2id))
cls_model.load_dict(cls_state_dict)
print("classification model loaded.")

@paddle.no_grad()
def predict(input_text):
    ext_model.eval()
    cls_model.eval()

    # process the input text; truncate so the tag sequence (minus [CLS]/[SEP])
    # stays aligned character-for-character with the text passed to decoding()
    input_text = input_text[:max_seq_len - 2]
    encoded_inputs = tokenizer(list(input_text), is_split_into_words=True, max_seq_len=max_seq_len)
    input_ids = paddle.to_tensor([encoded_inputs["input_ids"]])
    token_type_ids = paddle.to_tensor([encoded_inputs["token_type_ids"]])

    # extract aspect and opinion words
    logits = ext_model(input_ids, token_type_ids=token_type_ids)
    predictions = logits.argmax(axis=2).numpy()[0]
    tag_seq = [ext_id2label[idx] for idx in predictions][1:-1]  # drop [CLS]/[SEP] positions
    aps = decoding(input_text, tag_seq)

    # predict sentiment for each aspect with cls_model
    results = []
    for ap in aps:
        aspect = ap[0]
        opinion_words = list(set(ap[1:]))
        aspect_text = concate_aspect_and_opinion(input_text, aspect, opinion_words)
        encoded_inputs = tokenizer(aspect_text, text_pair=input_text, max_seq_len=max_seq_len, return_length=True)
        input_ids = paddle.to_tensor([encoded_inputs["input_ids"]])
        token_type_ids = paddle.to_tensor([encoded_inputs["token_type_ids"]])
        logits = cls_model(input_ids, token_type_ids=token_type_ids)
        prediction = logits.argmax(axis=1).numpy()[0]
        result = {"aspect": aspect, "opinions": opinion_words, "sentiment": cls_id2label[prediction]}
        results.append(result)

    # print and return the formatted results
    return format_print(results)
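
# Quick sanity check without the web UI (hypothetical input and output; actual
# spans and labels depend on the trained checkpoints and dict files):
#
#   print(predict("这家店味道很好,但是服务很差"))
#   # aspect: 味道, opinions: ['很好'], sentiment: 正向
#   # aspect: 服务, opinions: ['很差'], sentiment: 负向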

gr.Interface(fn=predict, inputs="text", outputs="text").launch()