Source code for malaya.spelling_correction.probability

import json
import re
from functools import partial
from collections import Counter
from malaya.text.function import case_of, check_ratio_upper_lower
from malaya.dictionary import is_english, is_malay
from malaya.text.rules import rules_normalizer
from malaya.text.bpe import SentencePieceTokenizer
from malaya.path import PATH_NGRAM, S3_PATH_NGRAM
from malaya.function import check_file
from malaya.spelling_correction.base import (
    _augment_vowel_prob,
    _augment_vowel,
    get_permulaan_hujung,
    norvig_method,
)
from malaya.text.tatabahasa import (
    alphabet,
    consonants,
    vowels,
    permulaan,
    hujung,
    stopword_tatabahasa,
)
from typing import List, Callable, Dict
import logging

logger = logging.getLogger(__name__)


class Spell:
    def __init__(
        self,
        sp_tokenizer,
        corpus,
        stemmer,
        add_norvig_method=True,
        replace_augmentation=False,
        validate_end_vowel=True,
        minlen=3,
        maxlen=15,
        **kwargs,
    ):
        """
        Base class for probability spelling correction.

        Parameters
        ----------
        add_norvig_method: bool, optional (default=True)
            Use the Norvig augmentation method.
        replace_augmentation: bool, optional (default=False)
            Also include the Norvig `replace` augmentations.
        validate_end_vowel: bool, optional (default=True)
            Validate candidate end vowels against the input word.
            A candidate is valid if,
            - its last vowel is the same as the input's last vowel.
            - its last vowel is `a` and the input's last vowel is one of `eo`.
        minlen: int, optional (default=3)
            minimum length of candidates.
        maxlen: int, optional (default=15)
            if the input word is longer than `maxlen`, candidate search is skipped.
        """
        self._sp_tokenizer = sp_tokenizer
        self._augment = _augment_vowel_prob
        self._add_norvig_method = add_norvig_method
        self._replace_augmentation = replace_augmentation
        self._corpus = corpus
        self._stemmer = stemmer
        self.WORDS = Counter(self._corpus)
        self.N = sum(self.WORDS.values())
        self.validate_end_vowel = validate_end_vowel
        self.minlen = minlen
        self.maxlen = maxlen

    def edit_step(self, word):
        """
        Generate possible combinations of an input word.
        """
        pseudo = _augment_vowel(word)
        deletes, transposes, inserts, replaces, fuzziness = [], [], [], [], []
        if all([len(w) < self.maxlen for w in pseudo]):
            if self._add_norvig_method:
                _, deletes_, transposes_, inserts_, replaces_ = norvig_method(word)
                deletes.extend(deletes_)
                transposes.extend(transposes_)
                inserts.extend(inserts_)
                replaces.extend(replaces_)
            pseudo.extend(
                self._augment(
                    word,
                    add_norvig_method=self._add_norvig_method,
                    sp_tokenizer=self._sp_tokenizer,
                )
            )
            if len(word):
                # berape -> berapa, mne -> mna
                if word[-1] == 'e':
                    inner = word[:-1] + 'a'
                    fuzziness.append(inner)
                    pseudo.extend(self._augment(inner, sp_tokenizer=self._sp_tokenizer))
                # pikir -> fikir
                if word[0] == 'p':
                    inner = 'f' + word[1:]
                    fuzziness.append(inner)
                    pseudo.extend(self._augment(inner, sp_tokenizer=self._sp_tokenizer))
            if len(word) > 2:
                # bapak -> bapa, mintak -> minta, mntak -> mnta
                if word[-2:] == 'ak':
                    inner = word[:-1]
                    fuzziness.append(inner)
                    pseudo.extend(self._augment(inner, sp_tokenizer=self._sp_tokenizer))
                # hnto -> hantar, bako -> bakar, sabo -> sabar
                # tido -> tidur
                if word[-1] in 'oa' and word[-2] in consonants:
                    inner = word[:-1] + 'ar'
                    fuzziness.append(inner)
                    pseudo.extend(self._augment(inner, sp_tokenizer=self._sp_tokenizer))
                    inner = word[:-1] + 'ur'
                    fuzziness.append(inner)
                    pseudo.extend(self._augment(inner, sp_tokenizer=self._sp_tokenizer))
                # antu -> hantu, antar -> hantar
                if word[0] == 'a' and word[1] in consonants:
                    inner = 'h' + word
                    fuzziness.append(inner)
                    pseudo.extend(_augment_vowel(inner))
                    pseudo.extend(self._augment(inner, sp_tokenizer=self._sp_tokenizer))
                # ptg -> ptng, dtg -> dtng
                if (
                    word[-3] in consonants
                    and word[-2] in consonants
                    and word[-1] == 'g'
                ):
                    inner = word[:-1] + 'ng'
                    fuzziness.append(inner)
                    pseudo.extend(self._augment(inner, sp_tokenizer=self._sp_tokenizer))
                # igt -> ingt
                if word[1] == 'g' and word[2] in consonants:
                    inner = word[0] + 'n' + word[1:]
                    fuzziness.append(inner)
                    pseudo.extend(self._augment(inner, sp_tokenizer=self._sp_tokenizer))
                # kecik -> kecil
                if word[-1] == 'k' and word[-2] in vowels:
                    inner = word[:-1] + 'l'
                    fuzziness.append(inner)
                    pseudo.extend(self._augment(inner, sp_tokenizer=self._sp_tokenizer))
        results = fuzziness + pseudo
        if self._add_norvig_method:
            if self._replace_augmentation:
                return set(results + deletes + transposes + inserts + replaces)
            else:
                return set(results + deletes + transposes + inserts)
        else:
            return set(results)
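
    # Illustrative sketch (not part of the library): on top of the Norvig
    # edits, the fuzzy rules above generate dialect-style candidates, e.g.
    #
    #     >>> spell = Probability({'tidur': 10, 'tidak': 5})
    #     >>> 'tidur' in spell.edit_step('tido')   # the o -> ur rule
    #     True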

    def edits2(self, word):
        """
        All edits that are two edits away from `word`.
        """
        return (e2 for e1 in self.edit_step(word) for e2 in self.edit_step(e1))

    def known(self, words):
        """
        The subset of `words` that appear in the dictionary of WORDS.
        """
        return set(w for w in words if w in self.WORDS or is_malay(w))

    def edit_candidates(self, word):
        """
        Generate candidates given a word.

        Parameters
        ----------
        word: str

        Returns
        -------
        result: List[str]
        """
        ttt = self.known(self.edit_step(word) if len(word) <= self.maxlen else [word])
        ttt = {i for i in ttt if not all([c in consonants for c in i])} or {word}
        ttt = {i for i in ttt if len(i) >= self.minlen and not is_english(i)}
        ttt = self.known([word]) | ttt
        if not len(ttt):
            ttt = {word}
        ttt = list(ttt)
        if self.validate_end_vowel:
            if word[-1] in vowels:
                ttt = [
                    w
                    for w in ttt
                    if w[-1] == word[-1]
                    or (w[-1] in 'a' and word[-1] in 'eo')
                    or (len(w) >= 2 and w[-2:] in 'arur' and word[-1] in 'o')
                ]
            if not len(ttt):
                return [word]
        return ttt
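
    # Illustrative sketch, assuming a toy corpus without the full Malay
    # dictionary (with the dictionary loaded, extra valid candidates may
    # appear): candidates are restricted to known words, pruned by `minlen`,
    # English words are dropped, and the end-vowel filter keeps only
    # candidates whose final vowel agrees with the input (`a` is also
    # accepted when the input ends in `e` or `o`):
    #
    #     >>> spell = Probability({'berapa': 10, 'berapi': 3})
    #     >>> spell.edit_candidates('berape')   # 'berapi' fails the end-vowel check
    #     ['berapa']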

    def correct_text(self, text: str):
        """
        Correct all the words within a text, returning the corrected text.

        Parameters
        ----------
        text: str

        Returns
        -------
        result: str
        """
        string = re.sub(r'[ ]+', ' ', text).strip()
        splitted = string.split()
        for no, word in enumerate(splitted):
            if not word.isupper() and check_ratio_upper_lower(word) < 0.5:
                word = re.sub('[a-zA-Z]+', self.correct_match, word)
            splitted[no] = word
        return ' '.join(splitted)
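
    # Note (descriptive): tokens that are fully upper-cased, or whose ratio
    # of upper-case characters is at least 0.5 (e.g. acronyms), are
    # deliberately left untouched by `correct_text`.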

    def correct_match(self, match):
        """
        Spell-correct word in re.match, and preserve proper upper, lower, title case.
        """
        word = match.group()
        return self.correct_word(word)

    def correct_word(self, word: str):
        """
        Spell-correct word, and preserve proper upper, lower and title case.

        Parameters
        ----------
        word: str

        Returns
        -------
        result: str
        """
        if len(word) < 2:
            return word
        return case_of(word)(self.correct(word.lower()))
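
    # Illustrative sketch: `case_of` returns a recasing function, so the
    # corrected word keeps the input's casing. Assuming `rules_normalizer`
    # maps `mkn` to `makan`:
    #
    #     >>> spell.correct_word('Mkn')   # title case in, title case out
    #     'Makan'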

# Token pattern for `Probability.tokens`; the constant is not defined in this
# module's imports, so a Norvig-style word pattern is assumed here.
REGEX_TOKEN = re.compile(r'\w+')


class Probability(Spell):
    """
    The SpellCorrector extends the functionality of Peter Norvig's
    spell corrector, http://norvig.com/spell-correct.html,
    and improves it using algorithms from `Normalization of noisy texts in Malaysian online reviews`,
    https://www.researchgate.net/publication/287050449_Normalization_of_noisy_texts_in_Malaysian_online_reviews
    Custom vowel augmentation is also added.
    """

    def __init__(self, corpus, sp_tokenizer=None, stemmer=None, **kwargs):
        Spell.__init__(self, sp_tokenizer, corpus, stemmer, **kwargs)

    @staticmethod
    def tokens(text):
        return REGEX_TOKEN.findall(text.lower())

    def P(self, word):
        """
        Probability of `word`.
        """
        return self.WORDS[word] / self.N
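
    # Example: `P` is a plain unigram maximum-likelihood estimate over the
    # corpus counts, e.g.
    #
    #     >>> spell = Probability({'makan': 3, 'makanan': 1})
    #     >>> spell.P('makan')   # 3 / 4
    #     0.75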

    def most_probable(self, words):
        _known = self.known(words)
        if _known:
            return max(_known, key=self.P)
        else:
            return []

    def correct(self, word: str, score_func=None, **kwargs):
        """
        Most probable spelling correction for word.

        Parameters
        ----------
        word: str

        Returns
        -------
        result: str
        """
        if is_english(word):
            return word
        if self._corpus.get(word, 0) > 5000:
            return word
        if is_malay(word):
            return word
        if word in stopword_tatabahasa:
            return word

        cp_word = word[:]
        word, hujung_result, permulaan_result = get_permulaan_hujung(word, stemmer=self._stemmer)
        if len(word) < 2:
            word = cp_word
            hujung_result = ''
            permulaan_result = ''

        combined = True
        if len(word):
            if word in rules_normalizer:
                word = rules_normalizer[word]
            elif self._corpus.get(word, 0) > 1000:
                pass
            else:
                candidates1 = self.edit_candidates(word)
                candidates2 = self.edit_candidates(cp_word)
                if score_func is None:
                    word1 = max(candidates1, key=self.P)
                    word2 = max(candidates2, key=self.P)

                    if self.P(word1) > self.P(word2):
                        word = word1
                    else:
                        word = word2
                        combined = False
                else:
                    candidates1_score = {w: score_func(w, **kwargs) for w in candidates1}
                    candidates2_score = {w: score_func(w, **kwargs) for w in candidates2}
                    word1 = max(candidates1_score, key=candidates1_score.get)
                    word2 = max(candidates2_score, key=candidates2_score.get)

                    if candidates1_score[word1] > candidates2_score[word2]:
                        word = word1
                    else:
                        word = word2
                        combined = False

        if len(hujung_result) and not word.endswith(hujung_result) and combined:
            word = word + hujung_result
        if (
            len(permulaan_result)
            and len(word) > 1
            and not word.startswith(permulaan_result)
            and combined
        ):
            if permulaan_result[-1] == word[0]:
                word = permulaan_result + word[1:]
            else:
                word = permulaan_result + word

        return word
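
    # Usage sketch (outputs depend on the loaded 1-gram corpus and the Malay
    # dictionary, so the value shown is indicative only):
    #
    #     >>> model = load()
    #     >>> model.correct('suke')   # e -> a rule, candidates ranked by P
    #     'suka'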

    def elong_normalized_candidates(self, word, acc=None):
        if acc is None:
            acc = []
        candidates = [w for w in set(word) if word.count(w) > 1]
        for c in candidates:
            _w = word.replace(c + c, c)
            if _w in acc:
                continue
            acc.append(_w)
            self.elong_normalized_candidates(_w, acc)
        return acc + [word]

    def best_elong_candidate(self, word):
        candidates = self.elong_normalized_candidates(word)
        best = self.most_probable(candidates)
        return best or word

    def normalize_elongated(self, word, **kwargs):
        return case_of(word)(self.best_elong_candidate(word.lower()))
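
    # Illustrative sketch: elongated words are collapsed by repeatedly
    # deduplicating doubled characters, then the most probable collapsed
    # form in the corpus wins:
    #
    #     >>> spell = Probability({'kenapa': 10})
    #     >>> spell.normalize_elongated('kenapaaa')
    #     'kenapa'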


class ProbabilityLM(Probability):
    """
    The SpellCorrector extends the functionality of Peter Norvig's
    spell corrector, http://norvig.com/spell-correct.html, with a language model,
    and improves it using algorithms from `Normalization of noisy texts in Malaysian online reviews`,
    https://www.researchgate.net/publication/287050449_Normalization_of_noisy_texts_in_Malaysian_online_reviews
    Custom vowel augmentation is also added.
    """

    def __init__(self, language_model, corpus, sp_tokenizer=None, stemmer=None, **kwargs):
        Spell.__init__(self, sp_tokenizer, corpus, stemmer, **kwargs)
        self._language_model = language_model

    def score(
        self,
        word,
        string,
        index: int = -1,
        lookback: int = 3,
        lookforward: int = 3,
        **kwargs,
    ):
        if lookback == -1:
            lookback = index
        elif lookback > index:
            lookback = index

        if lookforward == -1:
            lookforward = 9999999

        left_hand = string[index - lookback: index]
        right_hand = string[index + 1: index + 1 + lookforward]
        string = left_hand + [word] + right_hand
        score = self._language_model.score(' '.join(string))
        s = f'word: {word}, string: {string}, index: {index}, lookback: {lookback}, lookforward: {lookforward}, score: {score}'
        logger.debug(s)
        return score
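
    # Example: with string = ['saya', 'suka', 'makan', 'ayam'], index = 2,
    # lookback = 1 and lookforward = 1, the language model scores the window
    # 'suka <candidate> ayam' for each candidate of `makan`.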

    def correct(
        self,
        word: str,
        string: List[str],
        index: int = -1,
        lookback: int = 3,
        lookforward: int = 3,
        **kwargs,
    ):
        """
        Correct a word within a text, returning the corrected word.

        Parameters
        ----------
        word: str
        string: List[str]
            Tokenized string; `word` must be a word inside `string`.
        index: int, optional (default=-1)
            index of word in the string; if -1, will try to use `string.index(word)`.
        lookback: int, optional (default=3)
            N words on the left hand side.
            if -1, will take all words on the left hand side.
            a longer left hand side will take longer to compute.
        lookforward: int, optional (default=3)
            N words on the right hand side.
            if -1, will take all words on the right hand side.
            a longer right hand side will take longer to compute.

        Returns
        -------
        result: str
        """
        if index < 0:
            index = string.index(word)
        else:
            if word.lower() not in string[index].lower():
                raise ValueError(f'`{word}` is not a subset of or equal to `string[{index}]`')

        return super().correct(
            word=word,
            score_func=self.score,
            string=string,
            index=index,
            lookback=lookback,
            lookforward=lookforward,
            **kwargs,
        )
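
    # Usage sketch, given `lm_model = load(language_model=lm)` for some
    # language model `lm` (indicative output):
    #
    #     >>> splitted = 'sy suke makan ayam'.split()
    #     >>> lm_model.correct('suke', splitted, index=1)
    #     'suka'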

    def correct_text(
        self,
        text: str,
        lookback: int = 3,
        lookforward: int = 3,
    ):
        """
        Correct all the words within a text, returning the corrected text.

        Parameters
        ----------
        text: str
        lookback: int, optional (default=3)
            N words on the left hand side.
            if -1, will take all words on the left hand side.
            a longer left hand side will take longer to compute.
        lookforward: int, optional (default=3)
            N words on the right hand side.
            if -1, will take all words on the right hand side.
            a longer right hand side will take longer to compute.

        Returns
        -------
        result: str
        """
        string = re.sub(r'[ ]+', ' ', text).strip()
        splitted = string.split()
        strings = []
        for no, word in enumerate(splitted):
            if not word.isupper() and check_ratio_upper_lower(word) < 0.5:
                p = partial(
                    self.correct_match,
                    string=splitted,
                    index=no,
                    lookback=lookback,
                    lookforward=lookforward,
                )
                word = re.sub('[a-zA-Z]+', p, word)
            strings.append(word)
        return ' '.join(strings)
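
    # Usage sketch for whole-text correction with a language model; the KenLM
    # wrapper name is an assumption, any object with a `score` method works:
    #
    #     >>> lm = malaya.language_model.kenlm()
    #     >>> lm_model = load(language_model=lm)
    #     >>> lm_model.correct_text('sy suke mkn ayam')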

    def correct_word(
        self,
        word: str,
        string: List[str],
        index: int = -1,
        lookback: int = 3,
        lookforward: int = 3,
    ):
        """
        Spell-correct word, and preserve proper upper, lower and title case.

        Parameters
        ----------
        word: str
        string: List[str]
            Tokenized string; `word` must be a word inside `string`.
        index: int, optional (default=-1)
            index of word in the string; if -1, will try to use `string.index(word)`.
        lookback: int, optional (default=3)
            N words on the left hand side.
            if -1, will take all words on the left hand side.
            a longer left hand side will take longer to compute.
        lookforward: int, optional (default=3)
            N words on the right hand side.
            if -1, will take all words on the right hand side.
            a longer right hand side will take longer to compute.

        Returns
        -------
        result: str
        """
        return case_of(word)(
            self.correct(
                word.lower(),
                string=string,
                index=index,
                lookback=lookback,
                lookforward=lookforward,
            )
        )

    def correct_match(
        self,
        match,
        string: List[str],
        index: int = -1,
        lookback: int = 3,
        lookforward: int = 3,
    ):
        """
        Spell-correct word in re.match, and preserve proper upper, lower, title case.
        """
        word = match.group()
        if len(word) < 2:
            return word
        return case_of(word)(
            self.correct(
                word.lower(),
                string=string,
                index=index,
                lookback=lookback,
                lookforward=lookforward,
            )
        )

    def best_elong_candidate(self, word):
        candidates = self.elong_normalized_candidates(word)
        best = self.most_probable(candidates)
        return best or word

    def normalize_elongated(self, word):
        return case_of(word)(self.best_elong_candidate(word.lower()))


def load(
    language_model=None,
    sentence_piece: bool = False,
    stemmer=None,
    additional_words: Dict[str, int] = {'ni': 100000, 'pun': 100000, 'la': 100000},
    **kwargs,
):
    """
    Load a Probability Spell Corrector.

    Parameters
    ----------
    language_model: Callable, optional (default=None)
        If not None, must be an object with a `score` method.
    sentence_piece: bool, optional (default=False)
        if True, reduce possible augmentation states using sentencepiece.
    stemmer: Callable, optional (default=None)
        a Callable object; must have a `stem_word` method.
    additional_words: Dict[str, int], optional (default={'ni': 100000, 'pun': 100000, 'la': 100000})
        additional bias vocab.

    Returns
    -------
    result: model
        List of model classes:

        * if `language_model` is passed, will return `malaya.spelling_correction.probability.ProbabilityLM`.
        * else will return `malaya.spelling_correction.probability.Probability`.
    """
    if stemmer is not None:
        if not hasattr(stemmer, 'stem_word'):
            raise ValueError('stemmer must have a `stem_word` method')

    tokenizer = None
    if sentence_piece:
        path = check_file(
            PATH_NGRAM['sentencepiece'],
            S3_PATH_NGRAM['sentencepiece'],
            **kwargs,
        )
        vocab = path['vocab']
        vocab_model = path['model']
        tokenizer = SentencePieceTokenizer(vocab_file=vocab, spm_model_file=vocab_model)

    path = check_file(PATH_NGRAM[1], S3_PATH_NGRAM[1], **kwargs)
    with open(path['model']) as fopen:
        corpus = json.load(fopen)

    corpus = {**corpus, **additional_words}

    if language_model is not None:
        if not hasattr(language_model, 'score'):
            raise ValueError('`language_model` must have a `score` method.')
        return ProbabilityLM(language_model, corpus, tokenizer, stemmer, **kwargs)
    else:
        return Probability(corpus, tokenizer, stemmer, **kwargs)
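
# Usage sketch (illustrative): `additional_words` biases the unigram counts,
# e.g. to keep colloquial particles from being "corrected" away:
#
#     >>> model = load(additional_words={'ni': 100000, 'la': 100000})
#     >>> isinstance(model, Probability)
#     True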