Source code for malaya.model.extractive_summarization

import numpy as np
import itertools
from malaya.text.function import (
    summary_textcleaning,
    split_into_sentences,
    simple_textcleaning,
    STOPWORDS,
)
from malaya.cluster import cluster_words
from malaya.graph.pagerank import pagerank
from sklearn.metrics.pairwise import cosine_similarity
from scipy.sparse import vstack


def create_ngram(string, ngram=10):
    splitted = string.split()
    ngram_list = []
    for i in range(0, len(splitted)):
        lower_bound = i - ngram if i - ngram > 0 else 0
        upper_bound = (
            i + ngram if i + ngram < len(splitted) else len(splitted) - 1
        )
        new_word = splitted[lower_bound:upper_bound]
        new_string = ' '.join(new_word)
        if new_string == '':
            new_string = ' '
        ngram_list.append(new_string)

    return ngram_list, splitted


def corpus_checker(corpus):
    if not isinstance(corpus, list) and not isinstance(corpus, str):
        raise ValueError('corpus must be a list or a string')
    if isinstance(corpus, list):
        if not isinstance(corpus[0], str):
            raise ValueError('corpus must be list of strings')
    if isinstance(corpus, str):
        corpus = split_into_sentences(corpus, minimum_length=20)
    else:
        corpus = '. '.join(corpus)
        corpus = split_into_sentences(corpus, minimum_length=20)
    return corpus


[docs]class SKLearn: def __init__(self, model, vectorizer): self.model = model self.vectorizer = vectorizer def _vectorize_word( self, corpus, isi_penting, window_size, important_words=10, **kwargs ): corpus = corpus_checker(corpus) splitted_fullstop = [summary_textcleaning(i) for i in corpus] original_strings = [i[0] for i in splitted_fullstop] cleaned_strings = [i[1] for i in splitted_fullstop] ngram_list, splitted = create_ngram( ' '.join(cleaned_strings), ngram=window_size ) splitted = ' '.join(original_strings).split() if isi_penting: isi_penting = [summary_textcleaning(isi_penting)[1]] else: isi_penting = [' '.join(cleaned_strings)] t = ngram_list + isi_penting self.vectorizer.fit(t) freq = self.vectorizer.transform(ngram_list) freq_isi_penting = self.vectorizer.transform(isi_penting) if important_words > 0: if hasattr(self.vectorizer, 'idf_'): indices = np.argsort(self.vectorizer.idf_)[::-1] else: indices = np.argsort(np.asarray(freq.sum(axis=0))[0])[::-1] features = self.vectorizer.get_feature_names_out() top_words = [features[i] for i in indices[:important_words]] else: top_words = [] t = vstack([freq, freq_isi_penting]) self.model.fit(t) vectors = self.model.transform(freq) vectors_isi_penting = self.model.transform(freq_isi_penting) similar_isi_penting = cosine_similarity(vectors, vectors_isi_penting) scores = similar_isi_penting[:, 0] ranked_sentences = sorted( ((scores[i], s, i) for i, s in enumerate(splitted)), reverse=True ) return (splitted, ranked_sentences, top_words, cluster_words(top_words)) def _vectorize_sentence( self, corpus, isi_penting, important_words=10, retry=5, **kwargs ): corpus = corpus_checker(corpus) splitted_fullstop = [summary_textcleaning(i) for i in corpus] original_strings = [i[0] for i in splitted_fullstop] cleaned_strings = [i[1] for i in splitted_fullstop] if isi_penting: isi_penting = [summary_textcleaning(isi_penting)[1]] t = cleaned_strings + isi_penting else: t = cleaned_strings self.vectorizer.fit(t) freq = self.vectorizer.transform(cleaned_strings) if isi_penting: freq_isi_penting = self.vectorizer.transform(isi_penting) if important_words > 0: if hasattr(self.vectorizer, 'idf_'): indices = np.argsort(self.vectorizer.idf_)[::-1] else: indices = np.argsort(np.asarray(freq.sum(axis=0))[0])[::-1] features = self.vectorizer.get_feature_names_out() top_words = [features[i] for i in indices[:important_words]] else: top_words = [] if isi_penting: t = vstack([freq, freq_isi_penting]) else: t = freq self.model.fit(t) vectors = self.model.transform(freq) if isi_penting: vectors_isi_penting = self.model.transform(freq_isi_penting) similar = cosine_similarity(vectors, vectors) if isi_penting: similar_isi_penting = cosine_similarity( vectors, vectors_isi_penting ) similar = similar * similar_isi_penting else: similar[similar >= 0.99] = 0 scores = pagerank(similar + 1e-6, retry, **kwargs) ranked_sentences = sorted( ((scores[i], s, i) for i, s in enumerate(original_strings)), reverse=True, ) return ( original_strings, ranked_sentences, top_words, cluster_words(top_words), )
[docs] def word_level( self, corpus, isi_penting: str = None, window_size: int = 10, important_words: int = 10, **kwargs ): """ Summarize list of strings / string on word level. Parameters ---------- corpus: str / List[str] isi_penting: str, optional (default=None) if not None, will put priority based on `isi_penting`. window_size: int, (default=10) window size for each word. important_words: int, (default=10) number of important words. Returns ------- dict: {'top-words', 'cluster-top-words', 'score'} """ splitted, ranked_sentences, top_words, cluster_top_words = self._vectorize_word( corpus, isi_penting=isi_penting, window_size=window_size, important_words=important_words, **kwargs ) for score, s, rank in ranked_sentences: s = (splitted[rank], score) splitted[rank] = s return { 'top-words': top_words, 'cluster-top-words': cluster_top_words, 'score': splitted, }
[docs] def sentence_level( self, corpus, isi_penting: str = None, top_k: int = 3, important_words: int = 10, **kwargs ): """ Summarize list of strings / string on sentence level. Parameters ---------- corpus: str / List[str] isi_penting: str, optional (default=None) if not None, will put priority based on `isi_penting`. top_k: int, (default=3) number of summarized strings. important_words: int, (default=10) number of important words. Returns ------- dict: {'summary', 'top-words', 'cluster-top-words', 'score'} """ original_strings, ranked_sentences, top_words, cluster_top_words = self._vectorize_sentence( corpus, isi_penting=isi_penting, important_words=important_words, **kwargs ) for score, s, rank in ranked_sentences: s = original_strings[rank].split() s = [(w, score) for w in s] original_strings[rank] = s merged = list(itertools.chain(*original_strings)) summary = [r[1] for r in ranked_sentences[:top_k]] return { 'summary': ' '.join(summary), 'top-words': top_words, 'cluster-top-words': cluster_top_words, 'score': merged, }
[docs]class Doc2Vec: def __init__(self, wordvector): self.wordvector = wordvector def _vectorize_word( self, corpus, isi_penting, window_size, aggregation=np.mean, soft=False, **kwargs, ): corpus = corpus_checker(corpus) splitted_fullstop = [summary_textcleaning(i) for i in corpus] original_strings = [i[0] for i in splitted_fullstop] cleaned_strings = [i[1] for i in splitted_fullstop] ngram_list, splitted = create_ngram( ' '.join(cleaned_strings), ngram=window_size ) splitted = ' '.join(original_strings).split() if isi_penting: isi_penting = summary_textcleaning(isi_penting)[1] else: isi_penting = ' '.join(cleaned_strings) vectors = [] for string in ngram_list: inside = [] for token in string.split(): if token in self.wordvector._dictionary: v = self.wordvector.get_vector_by_name(token) else: if not soft: v = np.zeros((self.wordvector._embed_matrix.shape[1])) else: arr = np.array( [ self.wordvector._jarowinkler.similarity( token, k ) for k in self.wordvector.words ] ) idx = (-arr).argsort()[0] v = self.wordvector.get_vector_by_name( self.wordvector.words[idx] ) inside.append(v) vectors.append(aggregation(inside, axis=0)) vectors = np.array(vectors) cleaned_isi_penting = isi_penting vectors_isi_penting = [] for token in cleaned_isi_penting.split(): if token in self.wordvector._dictionary: vectors_isi_penting.append( self.wordvector.get_vector_by_name(token) ) else: if not soft: vectors_isi_penting.append( np.zeros((self.wordvector._embed_matrix.shape[1])) ) else: arr = np.array( [ self.wordvector._jarowinkler.similarity(token, k) for k in self.wordvector.words ] ) idx = (-arr).argsort()[0] vectors_isi_penting.append( self.wordvector.get_vector_by_name( self.wordvector.words[idx] ) ) vectors_isi_penting = aggregation(vectors_isi_penting, axis=0) vectors_isi_penting = np.expand_dims(vectors_isi_penting, 0) similar_isi_penting = cosine_similarity(vectors, vectors_isi_penting) scores = similar_isi_penting[:, 0] ranked_sentences = sorted( ((scores[i], s, i) for i, s in enumerate(splitted)), reverse=True ) return (splitted, ranked_sentences) def _vectorize_sentence( self, corpus, isi_penting, aggregation=np.mean, soft=False, retry=5, **kwargs ): corpus = corpus_checker(corpus) splitted_fullstop = [summary_textcleaning(i) for i in corpus] original_strings = [i[0] for i in splitted_fullstop] cleaned_strings = [i[1] for i in splitted_fullstop] vectors = [] for string in cleaned_strings: inside = [] for token in string.split(): if token in self.wordvector._dictionary: v = self.wordvector.get_vector_by_name(token) else: if not soft: v = np.zeros((self.wordvector._embed_matrix.shape[1])) else: arr = np.array( [ self.wordvector._jarowinkler.similarity( token, k ) for k in self.wordvector.words ] ) idx = (-arr).argsort()[0] v = self.wordvector.get_vector_by_name( self.wordvector.words[idx] ) inside.append(v) vectors.append(aggregation(inside, axis=0)) vectors = np.array(vectors) if isi_penting: cleaned_isi_penting = summary_textcleaning(isi_penting)[1] vectors_isi_penting = [] for token in cleaned_isi_penting.split(): if token in self.wordvector._dictionary: v = self.wordvector.get_vector_by_name(token) else: if not soft: v = np.zeros((self.wordvector._embed_matrix.shape[1])) else: arr = np.array( [ self.wordvector._jarowinkler.similarity( token, k ) for k in self.wordvector.words ] ) idx = (-arr).argsort()[0] v = self.wordvector.get_vector_by_name( self.wordvector.words[idx] ) vectors_isi_penting.append(v) vectors_isi_penting = aggregation(vectors_isi_penting, axis=0) vectors_isi_penting = np.expand_dims(vectors_isi_penting, 0) similar = cosine_similarity(vectors, vectors) if isi_penting: similar_isi_penting = cosine_similarity( vectors, vectors_isi_penting ) similar = similar * similar_isi_penting else: similar[similar >= 0.99] = 0 scores = pagerank(similar + 1e-6, retry, **kwargs) ranked_sentences = sorted( ((scores[i], s, i) for i, s in enumerate(original_strings)), reverse=True, ) return (original_strings, ranked_sentences)
[docs] def word_level( self, corpus, isi_penting: str = None, window_size: int = 10, aggregation=np.mean, soft: bool = False, **kwargs, ): """ Summarize list of strings / string on sentence level. Parameters ---------- corpus: str / List[str] isi_penting: str, optional (default=None) if not None, will put priority based on `isi_penting`. window_size: int, (default=10) window size for each word. aggregation: Callable, optional (default=numpy.mean) Aggregation method for Doc2Vec. soft: bool, optional (default=False) soft: bool, (default=True) if True, a word not in the dictionary will be replaced with nearest JaroWinkler ratio. if False, it will returned embedding full with zeros. Returns ------- dict: {'score'} """ splitted, ranked_sentences = self._vectorize_word( corpus, isi_penting=isi_penting, window_size=window_size, aggregation=np.mean, soft=soft, ) for score, s, rank in ranked_sentences: s = (splitted[rank], score) splitted[rank] = s return {'score': splitted}
[docs] def sentence_level( self, corpus, isi_penting: str = None, top_k: int = 3, aggregation=np.mean, soft: bool = False, **kwargs ): """ Summarize list of strings / string on sentence level. Parameters ---------- corpus: str / List[str] isi_penting: str, optional (default=None) if not None, will put priority based on `isi_penting`. top_k: int, (default=3) number of summarized strings. aggregation: Callable, optional (default=numpy.mean) Aggregation method for Doc2Vec. soft: bool, optional (default=False) soft: bool, (default=True) if True, a word not in the dictionary will be replaced with nearest JaroWinkler ratio. if False, it will returned embedding full with zeros. Returns ------- dict: {'summary', 'score'} """ original_strings, ranked_sentences = self._vectorize_sentence( corpus, isi_penting=isi_penting, aggregation=aggregation, soft=soft, **kwargs, ) for score, s, rank in ranked_sentences: s = original_strings[rank].split() s = [(w, score) for w in s] original_strings[rank] = s merged = list(itertools.chain(*original_strings)) summary = [r[1] for r in ranked_sentences[:top_k]] return {'summary': ' '.join(summary), 'score': merged}
[docs]class Encoder: def __init__(self, vectorizer): self.vectorizer = vectorizer def _batching(self, l, batch_size=10, **kwargs): vs = [] for i in range(0, len(l), batch_size): index = min(i + batch_size, len(l)) batch_x = l[i:index] vectors = self.vectorizer.vectorize(batch_x, **kwargs) vs.append(vectors) return np.concatenate(vs, axis=0) def _vectorize_word( self, corpus, isi_penting, window_size=10, important_words=10, batch_size=10, **kwargs, ): corpus = corpus_checker(corpus) splitted_fullstop = [summary_textcleaning(i) for i in corpus] original_strings = [i[0] for i in splitted_fullstop] cleaned_strings = [i[1] for i in splitted_fullstop] ngram_list, splitted = create_ngram( ' '.join(cleaned_strings), ngram=window_size ) splitted = ' '.join(original_strings).split() if isi_penting: isi_penting = [isi_penting] else: isi_penting = original_strings vectors = self._batching(ngram_list, batch_size=batch_size, **kwargs) vectors_isi_penting = self._batching( isi_penting, batch_size=batch_size, **kwargs, ) if 'DeepSkipThought' in str(self.vectorizer): top_words = [] else: if hasattr(self.vectorizer, 'attention') and important_words > 0: attentions = self.vectorizer.attention(corpus, **kwargs) flatten = list(itertools.chain(*attentions)) r = {} for f in flatten: c = simple_textcleaning(f[0]) if c in STOPWORDS: continue if c not in r: r[c] = f[1] else: r[c] += f[1] top_words = sorted(r, key=r.get, reverse=True)[ :important_words ] else: top_words = [] vectors_isi_penting = np.mean(vectors_isi_penting, axis=0) vectors_isi_penting = np.expand_dims(vectors_isi_penting, axis=0) similar_isi_penting = cosine_similarity(vectors, vectors_isi_penting) scores = similar_isi_penting[:, 0] ranked_sentences = sorted( ((scores[i], s, i) for i, s in enumerate(splitted)), reverse=True ) return (splitted, ranked_sentences, top_words, cluster_words(top_words)) def _vectorize_sentence( self, corpus, isi_penting, important_words=10, batch_size=10, retry=5, **kwargs, ): corpus = corpus_checker(corpus) splitted_fullstop = [summary_textcleaning(i) for i in corpus] original_strings = [i[0] for i in splitted_fullstop] cleaned_strings = [i[1] for i in splitted_fullstop] vectors = self._batching(cleaned_strings, batch_size=batch_size, **kwargs) if isi_penting: vectors_isi_penting = self._batching( [isi_penting], batch_size=batch_size, **kwargs, ) if 'DeepSkipThought' in str(self.vectorizer): top_words = [] else: if hasattr(self.vectorizer, 'attention'): attentions = self.vectorizer.attention(corpus, **kwargs) flatten = list(itertools.chain(*attentions)) r = {} for f in flatten: c = simple_textcleaning(f[0]) if c in STOPWORDS: continue if c not in r: r[c] = f[1] else: r[c] += f[1] top_words = sorted(r, key=r.get, reverse=True)[ :important_words ] else: top_words = [] similar = cosine_similarity(vectors, vectors) if isi_penting: similar_isi_penting = cosine_similarity( vectors, vectors_isi_penting ) similar = similar * similar_isi_penting else: similar[similar >= 0.99] = 0 scores = pagerank(similar + 1e-6, retry, **kwargs) ranked_sentences = sorted( ((scores[i], s, i) for i, s in enumerate(original_strings)), reverse=True, ) return ( original_strings, ranked_sentences, top_words, cluster_words(top_words), )
[docs] def word_level( self, corpus, isi_penting: str = None, window_size: int = 10, important_words: int = 10, batch_size: int = 16, **kwargs ): """ Summarize list of strings / string on word level. Parameters ---------- corpus: str / List[str] isi_penting: str, optional (default=None) if not None, will put priority based on `isi_penting`. window_size: int, (default=10) window size for each word. important_words: int, (default=10) number of important words. batch_size: int, (default=16) for each feed-forward, we only feed N size of texts for each batch. This to prevent OOM. Returns ------- dict: {'summary', 'top-words', 'cluster-top-words', 'score'} """ splitted, ranked_sentences, top_words, cluster_top_words = self._vectorize_word( corpus, isi_penting=isi_penting, window_size=window_size, important_words=important_words, batch_size=batch_size, **kwargs ) for score, s, rank in ranked_sentences: s = (splitted[rank], score) splitted[rank] = s return { 'top-words': top_words, 'cluster-top-words': cluster_top_words, 'score': splitted, }
[docs] def sentence_level( self, corpus, isi_penting: str = None, top_k: int = 3, important_words: int = 10, batch_size: int = 16, **kwargs, ): """ Summarize list of strings / string on sentence level. Parameters ---------- corpus: str / List[str] isi_penting: str, optional (default=None) if not None, will put priority based on `isi_penting`. top_k: int, (default=3) number of summarized strings. important_words: int, (default=10) number of important words. batch_size: int, (default=16) for each feed-forward, we only feed N size of texts for each batch. This to prevent OOM. Returns ------- dict: {'summary', 'top-words', 'cluster-top-words', 'score'} """ original_strings, ranked_sentences, top_words, cluster_top_words = self._vectorize_sentence( corpus, isi_penting=isi_penting, important_words=important_words, batch_size=batch_size, **kwargs, ) for score, s, rank in ranked_sentences: s = original_strings[rank].split() s = [(w, score) for w in s] original_strings[rank] = s merged = list(itertools.chain(*original_strings)) summary = [r[1] for r in ranked_sentences[:top_k]] return { 'summary': ' '.join(summary), 'top-words': top_words, 'cluster-top-words': cluster_top_words, 'score': merged, }