import re
import dateparser
import itertools
import math
import numpy as np
from malaya.num2word import to_cardinal
from malaya.text.function import (
is_laugh,
is_mengeluh,
multireplace,
case_of,
PUNCTUATION,
)
from malaya.dictionary import is_english, is_malay, is_malaysia_location
from malaya.text.regex import (
_past_date_string,
_now_date_string,
_future_date_string,
_yesterday_tomorrow_date_string,
_depan_date_string,
_expressions,
_left_datetime,
_right_datetime,
_today_time,
_left_datetodaytime,
_right_datetodaytime,
_left_yesterdaydatetime,
_right_yesterdaydatetime,
_left_yesterdaydatetodaytime,
_right_yesterdaydatetodaytime,
)
from malaya.text.tatabahasa import (
date_replace,
consonants,
sounds,
bulan,
)
from malaya.text.normalization import (
_remove_postfix,
_normalize_title,
_is_number_regex,
_string_to_num,
_replace_compound,
cardinal,
digit_unit,
rom_to_int,
ordinal,
fraction,
money,
ignore_words,
digit,
unpack_english_contractions,
repeat_word,
replace_laugh,
replace_mengeluh,
replace_betul,
digits,
normalize_numbers_with_shortform,
)
from malaya.text.rules import rules_normalizer, rules_normalizer_rev
from malaya.cluster import cluster_words
from malaya.function import validator
from malaya.preprocessing import Tokenizer, demoji
from typing import Callable, List
import logging
logger = logging.getLogger(__name__)
def normalized_entity(normalized):
    """
    Extract date/time and money entities from a normalized string.

    Parameters
    ----------
    normalized : str
        text to scan with the date, time and money patterns.

    Returns
    -------
    result: Tuple[Dict, Dict]
        ``(dates, money)``. ``dates`` maps every matched date/time substring
        to whatever ``dateparser.parse`` returns for it. ``money`` maps every
        matched money substring to the second element returned by ``money``.
    """

    def _joined_pairs(pattern):
        # These patterns yield one tuple per match; the first two groups are
        # joined with a single space to form one candidate string.
        return [f'{i[0]} {i[1]}' for i in re.findall(pattern, normalized)]

    # Drop IC numbers before matching, presumably so they are not picked up
    # by the date / money patterns below.
    normalized = re.sub(_expressions['ic'], '', normalized)

    money_ = re.findall(_expressions['money'], normalized)
    money_ = [(s, money(s)[1]) for s in money_]

    dates_ = re.findall(_expressions['date'], normalized)

    past_date_string_ = re.findall(_past_date_string, normalized)
    logger.debug(f'past_date_string_: {past_date_string_}')
    now_date_string_ = re.findall(_now_date_string, normalized)
    logger.debug(f'now_date_string_: {now_date_string_}')
    future_date_string_ = re.findall(_future_date_string, normalized)
    logger.debug(f'future_date_string_: {future_date_string_}')
    yesterday_date_string_ = re.findall(
        _yesterday_tomorrow_date_string, normalized
    )
    logger.debug(f'yesterday_date_string_: {yesterday_date_string_}')
    depan_date_string_ = re.findall(_depan_date_string, normalized)
    logger.debug(f'depan_date_string_: {depan_date_string_}')
    today_time_ = re.findall(_today_time, normalized)
    logger.debug(f'today_time_: {today_time_}')
    time_ = re.findall(_expressions['time'], normalized)
    logger.debug(f'time_: {time_}')

    left_datetime_ = _joined_pairs(_left_datetime)
    logger.debug(f'left_datetime_: {left_datetime_}')
    right_datetime_ = _joined_pairs(_right_datetime)
    # Fixed: this message used to print `left_datetime_` by mistake.
    logger.debug(f'right_datetime_: {right_datetime_}')
    today_left_datetime_ = _joined_pairs(_left_datetodaytime)
    logger.debug(f'today_left_datetime_: {today_left_datetime_}')
    today_right_datetime_ = _joined_pairs(_right_datetodaytime)
    logger.debug(f'today_right_datetime_: {today_right_datetime_}')
    left_yesterdaydatetime_ = _joined_pairs(_left_yesterdaydatetime)
    logger.debug(f'left_yesterdaydatetime_: {left_yesterdaydatetime_}')
    right_yesterdaydatetime_ = _joined_pairs(_right_yesterdaydatetime)
    logger.debug(f'right_yesterdaydatetime_: {right_yesterdaydatetime_}')
    left_yesterdaydatetodaytime_ = _joined_pairs(_left_yesterdaydatetodaytime)
    logger.debug(f'left_yesterdaydatetodaytime_: {left_yesterdaydatetodaytime_}')
    right_yesterdaydatetodaytime_ = _joined_pairs(_right_yesterdaydatetodaytime)
    logger.debug(f'right_yesterdaydatetodaytime_: {right_yesterdaydatetodaytime_}')

    dates_ = (
        dates_
        + past_date_string_
        + now_date_string_
        + future_date_string_
        + yesterday_date_string_
        + depan_date_string_
        + time_
        + today_time_
        + left_datetime_
        + right_datetime_
        + today_left_datetime_
        + today_right_datetime_
        + left_yesterdaydatetime_
        + right_yesterdaydatetime_
        + left_yesterdaydatetodaytime_
        + right_yesterdaydatetodaytime_
    )
    # `re.findall` returns tuples for multi-group patterns; only plain string
    # matches are kept. `.` is rewritten to `:` (e.g. `2.30` -> `2:30`).
    dates_ = [d.replace('.', ':') for d in dates_ if not isinstance(d, tuple)]
    dates_ = [multireplace(s, date_replace) for s in dates_]
    dates_ = [re.sub(r'[ ]+', ' ', s).strip() for s in dates_]
    # NOTE(review): `cluster_words` presumably collapses candidates that are
    # substrings of longer candidates - confirm against malaya.cluster.
    dates_ = cluster_words(dates_)
    dates_ = {s: dateparser.parse(s) for s in dates_}
    money_ = {s[0]: s[1] for s in money_}
    return dates_, money_
def check_repeat(word):
    """
    Split a trailing repeat marker off a word, e.g. `rumah2` -> (`rumah`, 2).

    Parameters
    ----------
    word : str

    Returns
    -------
    result: Tuple[str, int]
        ``(word_without_marker, repeat)``. ``repeat`` is always at least 1.
        The word is returned untouched with ``repeat == 1`` when it is shorter
        than two characters, does not end with a single decimal digit, or ends
        with more than one digit (e.g. `2022`).
    """
    if len(word) < 2:
        return word, 1

    last = word[-1]
    # `str.isdigit()` is also true for characters such as superscript two,
    # which `int()` cannot parse and used to raise ValueError here.
    # `str.isdecimal()` accepts exactly the characters `int()` understands.
    if last.isdecimal() and not word[-2].isdigit():
        # A trailing `0` is stripped but never produces a repeat below 1.
        return word[:-1], max(int(last), 1)

    return word, 1
def groupby(string):
    """
    Cap every run of identical consecutive characters at two, word by word.

    Words that look like a number, a URL or a money expression are passed
    through unchanged. The input is split on whitespace and the processed
    words are re-joined with single spaces.
    """

    def _squeeze(token):
        # Same checks, in the same order, as the exclusion list: number,
        # url, money (lower-cased), number pattern.
        protected = (
            _is_number_regex(token)
            or re.findall(_expressions['url'], token)
            or re.findall(_expressions['money'], token.lower())
            or re.findall(_expressions['number'], token)
        )
        if protected:
            return token
        # itertools.groupby yields one run per stretch of equal characters;
        # keep at most the first two characters of each run.
        return ''.join(''.join(run)[:2] for _, run in itertools.groupby(token))

    return ' '.join(_squeeze(token) for token in string.split())
def put_spacing_num(string):
    """
    Spell out digits one by one and separate them from alphabetic runs.

    Every run of ASCII letters is padded with spaces, the text is split on
    whitespace, and each piece accepted by `_is_number_regex` is replaced by
    the space-joined `to_cardinal` form of its individual characters.
    Repeated spaces are collapsed in the returned string.
    """
    padded = re.sub('[A-Za-z]+', lambda match: f' {match.group(0)} ', string)
    spoken = [
        ' '.join(to_cardinal(int(ch)) for ch in piece)
        if _is_number_regex(piece)
        else piece
        for piece in padded.split()
    ]
    return re.sub(r'[ ]+', ' ', ' '.join(spoken)).strip()
class Normalizer:
    """
    Rule-based text normalizer.

    Wraps a tokenizer plus an optional speller and stemmer, and exposes
    `normalize`, which walks the tokens one by one and applies the first
    matching rule (punctuation, IC, URL, e-mail, phone, emoji, dictionary
    lookups, dates, money, units, numbers, ...).
    """

    def __init__(self, tokenizer, speller=None, stemmer=None):
        """
        Parameters
        ----------
        tokenizer : Callable
            called as ``tokenizer(string)`` and must return a list of tokens.
        speller : object, optional (default=None)
            if it has `normalize_elongated` it is used for elongated words;
            its `correct` method is used for spelling correction.
        stemmer : object, optional (default=None)
            forwarded to `_remove_postfix` to split suffixes.
        """
        self._tokenizer = tokenizer
        self._speller = speller
        self._stemmer = stemmer
        # Emoji lookup is loaded lazily on the first `normalize` call that needs it.
        self._demoji = None
        # Pre-compiled copy of every expression, keyed by lower-cased name.
        self._compiled = {
            name.lower(): re.compile(pattern) for name, pattern in _expressions.items()
        }

    def normalize(
        self,
        string: str,
        normalize_text: bool = True,
        normalize_url: bool = False,
        normalize_email: bool = False,
        normalize_year: bool = True,
        normalize_telephone: bool = True,
        normalize_date: bool = True,
        normalize_time: bool = True,
        normalize_emoji: bool = True,
        normalize_elongated: bool = True,
        normalize_hingga: bool = True,
        normalize_pada_hari_bulan: bool = True,
        normalize_fraction: bool = True,
        normalize_money: bool = True,
        normalize_units: bool = True,
        normalize_percent: bool = True,
        normalize_ic: bool = True,
        normalize_number: bool = True,
        normalize_x_kali: bool = True,
        normalize_cardinal: bool = True,
        normalize_ordinal: bool = True,
        normalize_entity: bool = True,
        expand_contractions: bool = True,
        check_english_func=is_english,
        check_malay_func=is_malay,
        translator: Callable = None,
        language_detection_word: Callable = None,
        acceptable_language_detection: List[str] = ['EN', 'CAPITAL', 'NOT_LANG'],
        segmenter=None,
        text_scorer=None,
        text_scorer_window: int = 2,
        not_a_word_threshold: float = 1e-4,
        dateparser_settings={'TIMEZONE': 'GMT+8'},
        **kwargs,
    ):
        """
        Normalize a string.

        Parameters
        ----------
        string : str
        normalize_text: bool, optional (default=True)
            if True, will try to replace shortforms with internal corpus.
        normalize_url: bool, optional (default=False)
            if True, replace `://` with empty and `.` with `dot`.
            `https://huseinhouse.com` -> `https huseinhouse dot com`.
        normalize_email: bool, optional (default=False)
            if True, replace `@` with `di`, `.` with `dot`.
            `husein.zol05@gmail.com` -> `husein dot zol kosong lima di gmail dot com`.
        normalize_year: bool, optional (default=True)
            if True, `tahun 1987` -> `tahun sembilan belas lapan puluh tujuh`.
            if True, `1970-an` -> `sembilan belas tujuh puluh an`.
            if False, `tahun 1987` -> `tahun seribu sembilan ratus lapan puluh tujuh`.
        normalize_telephone: bool, optional (default=True)
            if True, `no 012-1234567` -> `no kosong satu dua, satu dua tiga empat lima enam tujuh`
        normalize_date: bool, optional (default=True)
            if True, `01/12/2001` -> `satu disember dua ribu satu`.
            if True, `Jun 2017` -> `satu Jun dua ribu tujuh belas`.
            if True, `2017 Jun` -> `satu Jun dua ribu tujuh belas`.
            if False, `2017 Jun` -> `01/06/2017`.
            if False, `Jun 2017` -> `01/06/2017`.
        normalize_time: bool, optional (default=True)
            if True, `pukul 2.30` -> `pukul dua tiga puluh minit`.
            if False, `pukul 2.30` -> `'02:00:00'`
        normalize_emoji: bool, (default=True)
            if True, `🔥` -> `emoji api`
            Load from `malaya.preprocessing.demoji`.
        normalize_elongated: bool, optional (default=True)
            if True, `betuii` -> `betui`.
        normalize_hingga: bool, optional (default=True)
            if True, `2011 - 2019` -> `dua ribu sebelas hingga dua ribu sembilan belas`
        normalize_pada_hari_bulan: bool, optional (default=True)
            if True, `pada 10/4` -> `pada sepuluh hari bulan empat`
        normalize_fraction: bool, optional (default=True)
            if True, `10 /4` -> `sepuluh per empat`
        normalize_money: bool, optional (default=True)
            if True, `rm10.4m` -> `sepuluh juta empat ratus ribu ringgit`
        normalize_units: bool, optional (default=True)
            if True, `61.2 kg` -> `enam puluh satu perpuluhan dua kilogram`
        normalize_percent: bool, optional (default=True)
            if True, `0.8%` -> `kosong perpuluhan lapan peratus`
        normalize_ic: bool, optional (default=True)
            if True, `911111-01-1111` -> `sembilan satu satu satu satu satu sempang kosong satu sempang satu satu satu satu`
        normalize_number: bool, optional (default=True)
            if True `0123` -> `kosong satu dua tiga`
        normalize_x_kali: bool, optional (default=True)
            if True `10x` -> 'sepuluh kali'
        normalize_cardinal: bool, optional (default=True)
            if True, `123` -> `seratus dua puluh tiga`
        normalize_ordinal: bool, optional (default=True)
            if True, `ke-123` -> `keseratus dua puluh tiga`
        normalize_entity: bool, optional (default=True)
            normalize entities, only effect `date`, `datetime`, `time` and `money` patterns string only.
        expand_contractions: bool, optional (default=True)
            expand english contractions.
        check_english_func: Callable, optional (default=malaya.dictionary.is_english)
            function to check a word in english dictionary, default is malaya.dictionary.is_english.
            this parameter also will be use for malay text normalization.
        check_malay_func: Callable, optional (default=malaya.dictionary.is_malay)
            function to check a word in malay dictionary, default is malaya.dictionary.is_malay.
        translator: Callable, optional (default=None)
            function to translate EN word to MS word.
        language_detection_word: Callable, optional (default=None)
            function to detect language for each words to get better translation results.
        acceptable_language_detection: List[str], optional (default=['EN', 'CAPITAL', 'NOT_LANG'])
            only translate substrings if the results from `language_detection_word` is in `acceptable_language_detection`.
        segmenter: Callable, optional (default=None)
            function to segmentize word.
            If provide, it will expand a word, apaitu -> apa itu
        text_scorer: Callable, optional (default=None)
            function to validate upper word.
            If lower case score is higher or equal than upper case score, will choose lower case.
        text_scorer_window: int, optional (default=2)
            size of lookback and lookforward to validate upper word.
        not_a_word_threshold: float, optional (default=1e-4)
            assume a word is not a human word if score lower than `not_a_word_threshold`.
            only usable if passed `text_scorer` parameter.
        dateparser_settings: Dict, optional (default={'TIMEZONE': 'GMT+8'})
            default dateparser setting, check support settings at https://dateparser.readthedocs.io/en/latest/
            Used for both date and time tokens.
        **kwargs:
            forwarded to `speller.correct` during spelling correction.

        Returns
        -------
        result: {'normalize', 'date', 'money'}
        """
        # NOTE: `acceptable_language_detection` and `dateparser_settings` have
        # mutable defaults; this method only reads them, never mutates them.

        if normalize_emoji:
            if self._demoji is None:
                logger.info('caching malaya.preprocessing.demoji inside normalizer')
                self._demoji = demoji().demoji
            result_demoji = self._demoji(string)
        else:
            result_demoji = None

        if expand_contractions:
            logger.debug(f'before expand_contractions: {string}')
            string = unpack_english_contractions(string)
            logger.debug(f'after expand_contractions: {string}')

        # Tokenize and re-join so that the whitespace split below sees one
        # token per word.
        tokenized = self._tokenizer(string)
        s = f'tokenized: {tokenized}'
        logger.debug(s)
        string = ' '.join(tokenized)

        if normalize_elongated:
            logger.debug(f'before normalize_elongated: {string}')
            normalized = []
            got_speller = hasattr(self._speller, 'normalize_elongated')
            for word in string.split():
                word_lower = word.lower()
                # Only touch lower-case-initial words that contain a doubled
                # character and are not a structured token or english word.
                if (
                    len(re.findall(r'(.)\1{1}', word))
                    and not word[0].isupper()
                    and not word_lower.startswith('ke-')
                    and not len(re.findall(_expressions['email'], word))
                    and not len(re.findall(_expressions['url'], word))
                    and not len(re.findall(_expressions['hashtag'], word))
                    and not len(re.findall(_expressions['phone'], word))
                    and not len(re.findall(_expressions['money'], word))
                    and not len(re.findall(_expressions['date'], word))
                    and not len(re.findall(_expressions['ic'], word))
                    and not len(re.findall(_expressions['user'], word))
                    and not len(re.findall(_expressions['number'], word))
                    and not _is_number_regex(word)
                    and check_english_func is not None
                    and not check_english_func(word_lower)
                ):
                    word = self._compiled['normalize_elong'].sub(r'\1\1', groupby(word))
                    if got_speller:
                        word = self._speller.normalize_elongated(word)
                normalized.append(word)
            string = ' '.join(normalized)
            logger.debug(f'after normalize_elongated: {string}')

        if normalize_text:
            logger.debug(f'before normalize_text: {string}')
            string = replace_laugh(string)
            string = replace_mengeluh(string)
            string = replace_betul(string)
            string = _replace_compound(string)
            logger.debug(f'after normalize_text: {string}')

        # `result` is the output queue; `normalized` is a lightly normalized
        # copy of the tokens that is later scanned for date / money entities.
        result, normalized = [], []
        # position in `result` -> word to hand to the speller afterwards
        spelling_correction = {}
        # position in `result` -> [repeat, prefix, suffix] to re-apply afterwards
        spelling_correction_condition = {}

        tokenized = self._tokenizer(string)
        index = 0
        while index < len(tokenized):
            word = tokenized[index]
            word_lower = word.lower()
            word_upper = word.upper()
            word_title = word.title()
            first_c = word[0].isupper()

            s = f'index: {index}, word: {word}, queue: {result}'
            logger.debug(s)

            if word in PUNCTUATION:
                s = f'index: {index}, word: {word}, condition punct'
                logger.debug(s)
                result.append(word)
                index += 1
                continue

            normalized.append(rules_normalizer.get(word_lower, word_lower))

            if word_lower in ignore_words:
                s = f'index: {index}, word: {word}, condition ignore words'
                logger.debug(s)
                result.append(word)
                index += 1
                continue

            if re.findall(_expressions['ic'], word_lower):
                s = f'index: {index}, word: {word}, condition IC'
                logger.debug(s)
                if normalize_ic:
                    splitted = word.split('-')
                    ics = [digit(part) for part in splitted]
                    word = ' sempang '.join(ics)
                result.append(word)
                index += 1
                continue

            if re.findall(_expressions['hashtag'], word_lower):
                s = f'index: {index}, word: {word}, condition hashtag'
                logger.debug(s)
                result.append(word)
                index += 1
                continue

            if re.findall(_expressions['url'], word_lower):
                s = f'index: {index}, word: {word}, condition url'
                logger.debug(s)
                if normalize_url:
                    word = word.replace('://', ' ').replace('.', ' dot ')
                    word = put_spacing_num(word)
                    word = word.replace(
                        'https',
                        'HTTPS').replace(
                        'http',
                        'HTTP').replace(
                        'www',
                        'WWW')
                result.append(word)
                index += 1
                continue

            if re.findall(_expressions['email'], word_lower):
                s = f'index: {index}, word: {word}, condition email'
                logger.debug(s)
                if normalize_email:
                    word = (
                        word.replace('://', ' ')
                        .replace('.', ' dot ')
                        .replace('@', ' di ')
                    )
                    word = put_spacing_num(word)
                result.append(word)
                index += 1
                continue

            if re.findall(_expressions['phone'], word_lower):
                s = f'index: {index}, word: {word}, condition phone'
                logger.debug(s)
                if normalize_telephone:
                    splitted = word.split('-')
                    if len(splitted) == 2:
                        left = put_spacing_num(splitted[0])
                        right = put_spacing_num(splitted[1])
                        word = f'{left}, {right}'
                result.append(word)
                index += 1
                continue

            if re.findall(_expressions['user'], word_lower):
                s = f'index: {index}, word: {word}, condition user'
                logger.debug(s)
                result.append(word)
                index += 1
                continue

            if normalize_emoji and word_lower in result_demoji:
                s = f'index: {index}, word: {word}, condition emoji'
                # Fixed: this message used to be built but never logged.
                logger.debug(s)
                r = f'emoji {result_demoji[word_lower]}'
                # Surround the emoji description with commas unless the
                # neighbouring tokens already provide punctuation.
                if index - 1 >= 0:
                    if tokenized[index - 1] == '.':
                        r = r[0].upper() + r[1:]
                    elif len(result) and result[-1][-1] == ',':
                        pass
                    elif tokenized[index - 1] != ',':
                        r = f', {r}'
                if index + 1 < len(tokenized):
                    if tokenized[index + 1] == '.':
                        pass
                    elif tokenized[index + 1] != ',':
                        r = f'{r} ,'
                result.append(r)
                index += 1
                continue

            if text_scorer is not None:
                score = math.exp(text_scorer(word_lower))
                s = f'index: {index}, word: {word}, score: {score}, text_scorer is not None'
                logger.debug(s)
                if score <= not_a_word_threshold:
                    s = f'index: {index}, word: {word}, text_scorer(word_lower) <= not_a_word_threshold'
                    logger.debug(s)
                    result.append(word)
                    index += 1
                    continue

            if (
                first_c
                and not len(re.findall(_expressions['money'], word_lower))
                and not len(re.findall(_expressions['date'], word_lower))
            ):
                s = f'index: {index}, word: {word}, condition not in money and date'
                logger.debug(s)
                if word_lower in rules_normalizer and normalize_text:
                    result.append(case_of(word)(rules_normalizer[word_lower]))
                    index += 1
                    continue
                elif word_upper not in ['KE', 'PADA', 'RM', 'SEN', 'HINGGA']:
                    norm_title = _normalize_title(word) if normalize_text else word
                    if norm_title != word:
                        s = f'index: {index}, word: {word}, norm_title != word'
                        logger.debug(s)
                        result.append(norm_title)
                        index += 1
                        continue

                    # Capitalized words are kept as-is unless the scorer
                    # prefers the lower-case form in context.
                    titled = True
                    if len(word) > 1 and text_scorer is not None:
                        s = f'index: {index}, word: {word}, condition text_scorer is not None'
                        logger.debug(s)
                        l = ' '.join(result[-text_scorer_window:])
                        if len(l):
                            lower = f'{l} {word_lower}'
                            title = f'{l} {word_title}'
                            normal = f'{l} {word}'
                            upper = f'{l} {word_upper}'
                        else:
                            lower = word_lower
                            title = word_title
                            normal = word
                            upper = word_upper

                        if index + 1 < len(tokenized):
                            r = ' '.join(tokenized[index + 1: index + 1 + text_scorer_window])
                            if len(r):
                                lower = f'{lower} {r}'
                                title = f'{title} {r}'
                                normal = f'{normal} {r}'
                                upper = f'{upper} {r}'

                        lower_score = text_scorer(lower)
                        title_score = text_scorer(title)
                        normal_score = text_scorer(normal)
                        upper_score = text_scorer(upper)
                        s = f'index: {index}, word: {word}, lower: {lower} , normal: {normal} , lower_score: {lower_score}, title_score: {title_score}, normal_score: {normal_score}, upper_score: {upper_score}'
                        logger.debug(s)
                        scores = [lower_score, title_score, upper_score]
                        max_score = max(scores)
                        argmax = np.argmax(scores)
                        if max_score > normal_score:
                            s = f'index: {index}, word: {word}, max_score > normal_score'
                            logger.debug(s)
                            if argmax == 0:
                                # Lower-case wins: fall through to the rules
                                # below with the lower-cased word.
                                word = word_lower
                                titled = False
                            elif argmax == 1:
                                word = word_title
                            elif argmax == 2:
                                word = word_upper

                    if titled:
                        s = f'index: {index}, word: {word}, condition titled'
                        logger.debug(s)
                        result.append(word)
                        index += 1
                        continue

            if check_english_func is not None and len(word) > 1:
                s = f'index: {index}, word: {word}, condition check english'
                logger.debug(s)
                found = False
                word_, repeat = check_repeat(word)
                word_lower_ = word_.lower()
                selected_word = word_
                if check_english_func(word_lower_):
                    found = True
                # suree -> sure -> detect
                elif len(word_lower_) > 1 and len(word_) > 1 and word_lower_[-1] == word_lower_[-2] and check_english_func(word_lower_[:-1]):
                    found = True
                    selected_word = word_[:-1]

                if found:
                    if translator is not None and language_detection_word is None:
                        s = f'index: {index}, word: {word_}, condition to translate inside checking'
                        logger.debug(s)
                        translated = translator(selected_word)
                        # Reject translations that are suspiciously long or
                        # that contain the `, United States` marker.
                        if len(translated) >= len(selected_word) * 3:
                            logger.debug(f'reject translation, {selected_word} -> {translated}')
                        elif ', United States' in translated:
                            logger.debug(f'reject translation, {word_} -> {translated}')
                        else:
                            selected_word = translated

                    result.append(repeat_word(case_of(word)(selected_word), repeat))
                    index += 1
                    continue

            if check_malay_func is not None and len(word) > 1:
                s = f'index: {index}, word: {word}, condition check malay'
                logger.debug(s)
                # These words trigger dedicated rules further down, so they
                # must not be consumed here.
                if word_lower not in ['pada', 'ke', 'tahun', 'thun']:
                    if check_malay_func(word_lower):
                        result.append(word)
                        index += 1
                        continue
                    # kenapaa -> kenapa -> detect
                    elif len(word_lower) > 1 and word_lower[-1] == word_lower[-2] and check_malay_func(word_lower[:-1]):
                        result.append(word[:-1])
                        index += 1
                        continue

            if is_malaysia_location(word):
                s = f'index: {index}, word: {word}, is_malaysia_location'
                logger.debug(s)
                result.append(word_lower.title())
                index += 1
                continue

            if word_lower in rules_normalizer and normalize_text:
                s = f'index: {index}, word: {word}, condition in early rules normalizer'
                logger.debug(s)
                result.append(case_of(word)(rules_normalizer[word_lower]))
                index += 1
                continue

            if len(word) > 2 and normalize_text and check_english_func is not None and not check_english_func(
                    word):
                s = f'index: {index}, word: {word}, condition len(word) > 2 and norm text'
                logger.debug(s)
                # Trailing `e` after a consonant is rewritten to `a`.
                if word[-2] in consonants and word[-1] == 'e':
                    word = word[:-1] + 'a'

            if word[0] == 'x' and len(
                    word) > 1 and normalize_text and check_english_func is not None and not check_english_func(word):
                s = f'index: {index}, word: {word}, condition word[0] == `x` and len(word) > 1 and norm text'
                logger.debug(s)
                # Leading `x` is expanded to the prefix `tak `.
                result_string = 'tak '
                word = word[1:]
            else:
                s = f'index: {index}, word: {word}, condition else for (word[0] == `x` and len(word) > 1 and norm text)'
                logger.debug(s)
                result_string = ''

            if normalize_ordinal and word_lower == 'ke' and index < (len(tokenized) - 2):
                s = f'index: {index}, word: {word}, condition ke'
                logger.debug(s)
                if tokenized[index + 1] == '-' and _is_number_regex(
                    tokenized[index + 2]
                ):
                    result.append(
                        ordinal(
                            word + tokenized[index + 1] + tokenized[index + 2]
                        )
                    )
                    index += 3
                    continue
                elif tokenized[index + 1] == '-' and re.match(
                    '.*(V|X|I|L|D)', tokenized[index + 2]
                ):
                    result.append(
                        ordinal(
                            word
                            + tokenized[index + 1]
                            + str(rom_to_int(tokenized[index + 2]))
                        )
                    )
                    index += 3
                    continue
                else:
                    result.append('ke')
                    index += 1
                    continue

            if normalize_hingga and _is_number_regex(word) and index < (len(tokenized) - 2):
                s = f'index: {index}, word: {word}, condition hingga'
                logger.debug(s)
                if tokenized[index + 1] == '-' and _is_number_regex(
                    tokenized[index + 2]
                ):
                    result.append(
                        to_cardinal(_string_to_num(word))
                        + ' hingga '
                        + to_cardinal(_string_to_num(tokenized[index + 2]))
                    )
                    index += 3
                    continue

            if normalize_pada_hari_bulan and word_lower == 'pada' and index < (len(tokenized) - 3):
                s = f'index: {index}, word: {word}, condition pada hari bulan'
                logger.debug(s)
                if (
                    _is_number_regex(tokenized[index + 1])
                    and tokenized[index + 2] in '/-'
                    and _is_number_regex(tokenized[index + 3])
                ):
                    result.append(
                        'pada %s hari bulan %s'
                        % (
                            to_cardinal(_string_to_num(tokenized[index + 1])),
                            to_cardinal(_string_to_num(tokenized[index + 3])),
                        )
                    )
                    index += 4
                    continue

            if (
                word_lower in ['tahun', 'thun']
                and index < (len(tokenized) - 1)
                and normalize_year
            ):
                s = f'index: {index}, word: {word}, condition tahun'
                logger.debug(s)
                if (
                    _is_number_regex(tokenized[index + 1])
                    and len(tokenized[index + 1]) == 4
                ):
                    t = tokenized[index + 1]
                    # Years whose second digit is not `0` are read as two
                    # two-digit numbers; otherwise as one cardinal.
                    if t[1] != '0':
                        l = to_cardinal(int(t[:2]))
                        r = to_cardinal(int(t[2:]))
                        c = f'{l} {r}'
                    else:
                        c = to_cardinal(int(t))
                    if (
                        index < (len(tokenized) - 3)
                        and tokenized[index + 2] == '-'
                        and tokenized[index + 3].lower() == 'an'
                    ):
                        end = 'an'
                        plus = 4
                    else:
                        end = ''
                        plus = 2
                    result.append(f'tahun {c}{end}')
                    index += plus
                    continue

            # Fixed: the `NNNN-an` decade rule used to be reachable only when
            # `normalize_fraction` was True; it is now gated by
            # `normalize_year` alone. The look-ahead guard is shared.
            if _is_number_regex(word) and index < (len(tokenized) - 2):
                if normalize_fraction:
                    s = f'index: {index}, word: {word}, condition fraction'
                    logger.debug(s)
                    if tokenized[index + 1] == '/' and _is_number_regex(
                        tokenized[index + 2]
                    ):
                        result.append(
                            fraction(
                                word + tokenized[index + 1] + tokenized[index + 2]
                            )
                        )
                        index += 3
                        continue

                if (
                    tokenized[index + 1] == '-'
                    and tokenized[index + 2].lower() == 'an'
                    and normalize_year
                    and len(word) == 4
                ):
                    t = word
                    if t[1] != '0':
                        l = to_cardinal(int(t[:2]))
                        r = to_cardinal(int(t[2:]))
                        c = f'{l} {r}'
                    else:
                        c = to_cardinal(int(t))
                    result.append(f'{c}an')
                    index += 3
                    continue

            if re.findall(_expressions['money'], word_lower):
                s = f'index: {index}, word: {word}, condition money'
                logger.debug(s)
                if normalize_money:
                    money_, _ = money(word)
                    result.append(money_)
                    # Swallow a following `sen` / `cent` token as well.
                    if index < (len(tokenized) - 1):
                        if tokenized[index + 1].lower() in ('sen', 'cent'):
                            index += 2
                        else:
                            index += 1
                    else:
                        index += 1
                else:
                    result.append(word)
                    index += 1
                continue

            if (
                re.findall(_expressions['temperature'], word_lower)
                or re.findall(_expressions['distance'], word_lower)
                or re.findall(_expressions['volume'], word_lower)
                or re.findall(_expressions['duration'], word_lower)
                or re.findall(_expressions['weight'], word_lower)
            ):
                s = f'index: {index}, word: {word}, condition units'
                logger.debug(s)
                if normalize_units:
                    word = word.replace(' ', '')
                    word = digit_unit(word)
                result.append(word)
                index += 1
                continue

            if re.findall(_expressions['percent'], word_lower):
                s = f'index: {index}, word: {word}, condition percent'
                logger.debug(s)
                if normalize_percent:
                    word = word.replace('%', '')
                    word = cardinal(word) + ' peratus'
                result.append(word)
                index += 1
                continue

            if re.findall(_expressions['date'], word_lower):
                s = f'index: {index}, word: {word}, condition date'
                logger.debug(s)
                word = word_lower
                word = multireplace(word, date_replace)
                word = re.sub(r'[ ]+', ' ', word).strip()
                # Best effort: on any parsing failure the cleaned token is
                # emitted unchanged and the error is logged.
                try:
                    s = f'index: {index}, word: {word}, parsing date'
                    logger.debug(s)
                    parsed = dateparser.parse(word, settings=dateparser_settings)
                    if parsed:
                        word = parsed.strftime('%d/%m/%Y')
                        if normalize_date:
                            day, month, year = word.split('/')
                            day = cardinal(day)
                            month = bulan[int(month)].title()
                            year = cardinal(year)
                            word = f'{day} {month} {year}'
                except Exception as e:
                    logger.warning(str(e))
                result.append(word)
                index += 1
                continue

            if (
                re.findall(_expressions['time'], word_lower)
                or re.findall(_expressions['time_pukul'], word_lower)
            ):
                s = f'index: {index}, word: {word}, condition time'
                logger.debug(s)
                word = word_lower
                word = multireplace(word, date_replace)
                word = re.sub(r'[ ]+', ' ', word).strip()
                # Do not emit a second `pukul` when the previous token
                # already is one (or a shortform of it).
                if index - 1 >= 0 and tokenized[index - 1].lower() in ['pkul', 'pukul', 'pkl']:
                    prefix = ''
                else:
                    prefix = 'pukul '
                try:
                    s = f'index: {index}, word: {word}, parsing time'
                    logger.debug(s)
                    # Fixed: honour `dateparser_settings` here as well, in
                    # line with the date branch.
                    parsed = dateparser.parse(
                        word.replace('.', ':'), settings=dateparser_settings
                    )
                    if parsed:
                        word = parsed.strftime('%H:%M:%S')
                        hour, minute, second = word.split(':')
                        if normalize_time:
                            hour = cardinal(hour)
                            if int(minute) > 0:
                                minute = cardinal(minute)
                                minute = f'{minute} minit'
                            else:
                                minute = ''
                            if int(second) > 0:
                                second = cardinal(second)
                                second = f'{second} saat'
                            else:
                                second = ''
                            word = f'{prefix}{hour} {minute} {second}'
                        else:
                            pukul = f'{prefix}{hour}'
                            if int(minute) > 0:
                                pukul = f'{pukul}.{minute}'
                            if int(second) > 0:
                                pukul = f'{pukul}:{second}'
                            word = pukul
                        word = re.sub(r'[ ]+', ' ', word).strip()
                except Exception as e:
                    logger.warning(str(e))
                result.append(word)
                index += 1
                continue

            if (
                re.findall(_expressions['number'], word_lower)
                and word_lower[0] == '0'
                and '.' not in word_lower
            ):
                s = f'index: {index}, word: {word}, condition digit and word[0] == `0`'
                logger.debug(s)
                if normalize_number:
                    word = digit(word)
                result.append(word)
                index += 1
                continue

            if (
                len(word_lower) >= 2
                and word_lower[-1] == 'x'
                and re.findall(_expressions['number'], word_lower[:-1])
                and '.' not in word_lower
            ):
                s = f'index: {index}, word: {word}, condition x kali'
                logger.debug(s)
                word = word[:-1]
                if normalize_x_kali:
                    word = cardinal(word)
                word = f'{word} kali'
                result.append(word)
                index += 1
                continue

            if normalize_cardinal:
                cardinal_ = cardinal(word)
                if cardinal_ != word:
                    s = f'index: {index}, word: {word}, condition cardinal'
                    logger.debug(s)
                    result.append(cardinal_)
                    index += 1
                    continue

            if normalize_ordinal:
                normalized_ke = ordinal(word)
                if normalized_ke != word:
                    s = f'index: {index}, word: {word}, condition ordinal'
                    logger.debug(s)
                    result.append(normalized_ke)
                    index += 1
                    continue

            if len(re.findall(_expressions['number'], word)):
                s = f'index: {index}, word: {word}, condition is number'
                logger.debug(s)
                result.append(word)
                index += 1
                continue

            if re.findall(_expressions['number_with_shortform'], word_lower):
                s = f'index: {index}, word: {word_lower}, condition is number_with_shortform'
                logger.debug(s)
                if normalize_cardinal:
                    w = normalize_numbers_with_shortform(word_lower)
                else:
                    w = word
                result.append(w)
                index += 1
                continue

            if segmenter is not None:
                s = f'index: {index}, word: {word}, condition to segment'
                logger.debug(s)
                # Keep a trailing digit out of the segmenter and glue it back
                # on afterwards.
                if word[-1] in digits:
                    word_ = word[:-1]
                    d = word[-1]
                else:
                    word_ = word
                    d = ''
                segmentized = segmenter(word_) + d
                words = segmentized.split()
            else:
                words = [word]

            for no_word, word in enumerate(words):
                if self._stemmer is not None:
                    s = f'index: {index}, word: {word}, self._stemmer is not None'
                    # Fixed: this message used to be built but never logged.
                    logger.debug(s)
                    word, end_result_string = _remove_postfix(
                        word,
                        stemmer=self._stemmer,
                        validate_word=False,
                    )
                    # A digit at the start of the stripped suffix belongs to
                    # the word (it may be a repeat marker).
                    if len(end_result_string) and end_result_string[0] in digits:
                        word = word + end_result_string[0]
                        end_result_string = end_result_string[1:]
                else:
                    end_result_string = ''

                if normalize_text:
                    word, repeat = check_repeat(word)
                else:
                    repeat = 1

                s = f'index: {index}, word: {word}, end_result_string: {end_result_string}, repeat: {repeat}'
                logger.debug(s)

                if normalize_text:
                    s = f'index: {index}, word: {word}, condition normalize text'
                    logger.debug(s)
                    if word in sounds:
                        selected = sounds[word]
                    elif word in rules_normalizer:
                        selected = rules_normalizer[word]
                    # betuii -> betui -> betul
                    elif len(word) > 1 and word[-1] == word[-2] and word[:-1] in rules_normalizer:
                        selected = rules_normalizer[word[:-1]]
                    # betuii -> betui -> betul
                    elif len(word) > 1 and word[-1] == word[-2] and word[:-1] in rules_normalizer_rev:
                        selected = word[:-1]
                    else:
                        selected = word
                        if translator is not None and language_detection_word is None:
                            s = f'index: {index}, word: {word}, condition to translate'
                            logger.debug(s)
                            translated = translator(word)
                            if len(translated) >= len(word) * 3:
                                logger.debug(f'reject translation, {word} -> {translated}')
                            elif ', United States' in translated:
                                logger.debug(f'reject translation, {word} -> {translated}')
                            elif translated in PUNCTUATION:
                                logger.debug(f'reject translation, {word} -> {translated}')
                            else:
                                selected = translated

                        # Nothing changed the word: queue it for the speller,
                        # which runs once the whole sentence is known.
                        if selected == word and self._speller:
                            s = f'index: {index}, word: {word}, condition to spelling correction'
                            logger.debug(s)
                            spelling_correction[len(result)] = selected
                else:
                    selected = word

                selected = repeat_word(selected, repeat)
                spelling_correction_condition[len(result)] = [
                    repeat, result_string, end_result_string]
                result.append(result_string + selected + end_result_string)

            index += 1

        # Deferred spelling correction: the speller receives the full result
        # list and the position of the word as context.
        for index, selected in spelling_correction.items():
            logger.debug(f'spelling correction, index: {index}, selected: {selected}')
            selected = self._speller.correct(
                selected, string=result, index=index, **kwargs
            )
            repeat, result_string, end_result_string = spelling_correction_condition[index]
            selected = repeat_word(selected, repeat)
            selected = result_string + selected + end_result_string
            result[index] = selected

        result = ' '.join(result)
        normalized = ' '.join(normalized)
        result = re.sub(r'[ ]+', ' ', result).strip()
        normalized = re.sub(r'[ ]+', ' ', normalized).strip()

        if translator is not None and language_detection_word is not None:
            splitted = result.split()
            result_langs = language_detection_word(splitted)
            logger.debug(f'condition translator and language_detection_word, {result_langs}')
            # Consecutive acceptable words are buffered in `temp` and
            # translated as one phrase when the buffer contains an `EN` word.
            new_result, temp, temp_lang = [], [], []
            for no_r, r in enumerate(result_langs):
                s = f'index: {no_r}, label: {r}, word: {splitted[no_r]}, queue: {new_result}'
                logger.debug(s)
                if r in acceptable_language_detection and not is_laugh(
                        splitted[no_r]) and not is_mengeluh(splitted[no_r]):
                    temp.append(splitted[no_r])
                    temp_lang.append(r)
                else:
                    if len(temp):
                        if 'EN' in temp_lang:
                            logger.debug(
                                f'condition len(temp) and EN in temp_lang, {temp}, {temp_lang}')
                            translated = translator(' '.join(temp))
                            new_result.extend(translated.split())
                        else:
                            logger.debug(
                                f'condition len(temp) and EN not in temp_lang, {temp}, {temp_lang}')
                            new_result.extend(temp)
                        temp = []
                        temp_lang = []
                    new_result.append(splitted[no_r])

            # Flush whatever is still buffered at the end of the sentence.
            if len(temp):
                if 'EN' in temp_lang:
                    logger.debug(f'condition len(temp) and EN in temp_lang, {temp}, {temp_lang}')
                    translated = translator(' '.join(temp))
                    new_result.extend(translated.split())
                else:
                    logger.debug(
                        f'condition len(temp) and EN not in temp_lang, {temp}, {temp_lang}')
                    new_result.extend(temp)

            result = ' '.join(new_result)

        if normalize_entity:
            dates_, money_ = normalized_entity(normalized)
        else:
            dates_, money_ = {}, {}
        return {'normalize': result, 'date': dates_, 'money': money_}
def load(
    speller: Callable = None,
    stemmer: Callable = None,
    **kwargs,
):
    """
    Load a Normalizer using any spelling correction model.

    Parameters
    ----------
    speller: Callable, optional (default=None)
        function to correct spelling, must have `correct` or `normalize_elongated` method.
    stemmer: Callable, optional (default=None)
        function to stem, must have `stem_word` method.
        If provide stemmer, will accurately to stem kata imbuhan akhir.
    **kwargs:
        forwarded to the `Tokenizer` constructor.

    Returns
    -------
    result: malaya.normalizer.rules.Normalizer class

    Raises
    ------
    ValueError
        if `stemmer` is given but has no `stem_word` method.
    """
    validator.validate_object_methods(
        speller, ['correct', 'normalize_elongated'], 'speller'
    )
    if stemmer is not None and not hasattr(stemmer, 'stem_word'):
        raise ValueError('stemmer must have `stem_word` method')

    # The normalizer only needs the bound `tokenize` callable.
    tokenizer = Tokenizer(**kwargs).tokenize
    return Normalizer(tokenizer=tokenizer, speller=speller, stemmer=stemmer)