import numpy as np
import json
import tensorflow as tf
from scipy.spatial.distance import cdist
from sklearn.neighbors import NearestNeighbors
from malaya.text.jarowinkler import JaroWinkler
from malaya.function import check_file
from malaya.path import PATH_WORDVECTOR, S3_PATH_WORDVECTOR
from malaya.function import get_device, generate_session
from malaya.text.calculator import Calculator
from herpetologist import check_type
from typing import List, Tuple
_wordvector_availability = {
'news': {
'Size (MB)': 200.2,
'Vocab size': 195466,
'lowercase': True,
'Description': 'pretrained on cleaned Malay news size 256',
},
'wikipedia': {
'Size (MB)': 781.7,
'Vocab size': 763350,
'lowercase': True,
'Description': 'pretrained on Malay wikipedia word2vec size 256',
},
'socialmedia': {
'Size (MB)': 1300,
'Vocab size': 1294638,
'lowercase': True,
'Description': 'pretrained on cleaned Malay twitter and Malay instagram size 256',
},
'combine': {
'Size (MB)': 1900,
'Vocab size': 1903143,
'lowercase': True,
'Description': 'pretrained on cleaned Malay news + Malay social media + Malay wikipedia size 256',
},
}
def available_wordvector():
"""
    List available word vector models.
"""
from malaya.function import describe_availability
return describe_availability(_wordvector_availability)
@check_type
def load(model: str = 'wikipedia', **kwargs):
"""
Return malaya.wordvector.WordVector object.
Parameters
----------
model : str, optional (default='wikipedia')
Model architecture supported. Allowed values:
* ``'news'`` - pretrained on cleaned Malay news size 256.
* ``'wikipedia'`` - pretrained on Malay wikipedia word2vec size 256.
* ``'socialmedia'`` - pretrained on cleaned Malay twitter and Malay instagram size 256.
* ``'combine'`` - pretrained on cleaned Malay news + Malay social media + Malay wikipedia size 256.
Returns
-------
vocabulary: indices dictionary for `vector`.
vector: np.array, 2D.
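
    Examples
    --------
    A minimal usage sketch; assumes the pretrained files can be downloaded:

    >>> import malaya
    >>> vocab, vector = malaya.wordvector.load(model='wikipedia')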
"""
model = model.lower()
if model not in _wordvector_availability:
raise ValueError(
'model not supported, please check supported models from `malaya.wordvector.available_wordvector()`.'
)
path = check_file(PATH_WORDVECTOR[model], S3_PATH_WORDVECTOR[model], **kwargs)
with open(path['vocab']) as fopen:
vocab = json.load(fopen)
vector = np.load(path['model'])
return vocab, vector
class WordVector:
@check_type
def __init__(self, embed_matrix, dictionary: dict, **kwargs):
"""
Parameters
----------
embed_matrix: numpy array
dictionary: dictionary
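
        Examples
        --------
        A minimal sketch, pairing this class with `malaya.wordvector.load`:

        >>> vocab, vector = malaya.wordvector.load(model='wikipedia')
        >>> wv = malaya.wordvector.WordVector(vector, vocab)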
"""
self._embed_matrix = embed_matrix
self._dictionary = dictionary
self._reverse_dictionary = {v: k for k, v in dictionary.items()}
self.words = list(dictionary.keys())
self._jarowinkler = JaroWinkler()
device = get_device(**kwargs)
_graph = tf.Graph()
with _graph.as_default():
with tf.device(device):
self._embedding = tf.compat.v1.placeholder(
tf.float32, self._embed_matrix.shape
)
self._x = tf.compat.v1.placeholder(
tf.float32, [None, self._embed_matrix.shape[1]]
)
normed_embedding = tf.nn.l2_normalize(self._embedding, axis=1)
normed_array = tf.nn.l2_normalize(self._x, axis=1)
self._cosine_similarity = tf.matmul(
normed_array, tf.transpose(normed_embedding, [1, 0])
)
self._sess = generate_session(_graph, **kwargs)
    @check_type
def get_vector_by_name(
self, word: str, soft: bool = False, topn_soft: int = 5
):
"""
get vector based on string.
Parameters
----------
word: str
        soft: bool, (default=False)
            if True, a word not in the dictionary will be replaced with the nearest word by JaroWinkler ratio.
            if False, throw an exception for any word not in the dictionary.
        topn_soft: int, (default=5)
            if the word is not found in the dictionary, return the `topn_soft` most similar words by JaroWinkler.
Returns
-------
vector: np.array, 1D
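
        Examples
        --------
        A minimal sketch; assumes `wv` is a loaded `WordVector` instance:

        >>> v = wv.get_vector_by_name('najib')
        >>> # with soft=True, an out-of-vocabulary word returns nearest candidates instead
        >>> wv.get_vector_by_name('najibb', soft=True)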
"""
if word not in self._dictionary:
arr = np.array(
[self._jarowinkler.similarity(word, k) for k in self.words]
)
idx = (-arr).argsort()[:topn_soft]
words = [self.words[i] for i in idx]
if soft:
return words
else:
                strings = ', '.join(words)
                raise Exception(
                    'input not found in dictionary, nearest words are [%s]'
                    % strings
                )
return self._embed_matrix[self._dictionary[word]]
    @check_type
def tree_plot(
self, labels, figsize: Tuple[int, int] = (7, 7), annotate: bool = True
):
"""
plot a tree plot based on output from calculator / n_closest / analogy.
Parameters
----------
labels : list
output from calculator / n_closest / analogy.
        annotate : bool, (default=True)
            if True, annotate the heatmap cells with similarity values.
figsize : tuple, (default=(7, 7))
figure size for plot.
Returns
-------
embed: np.array, 2D.
labelled: labels for X / Y axis.
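
        Examples
        --------
        A minimal sketch; assumes `wv` is a loaded `WordVector` instance:

        >>> similar = wv.n_closest('najib', num_closest=8)
        >>> embed, labelled = wv.tree_plot(similar)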
"""
try:
import matplotlib.pyplot as plt
import seaborn as sns
sns.set()
        except ImportError:
            raise ModuleNotFoundError(
                'matplotlib and seaborn not installed. Please install them and try again.'
            )
if not isinstance(labels, list):
raise ValueError('input must be a list')
if not isinstance(figsize, tuple):
raise ValueError('figsize must be a tuple')
if not isinstance(annotate, bool):
raise ValueError('annotate must be a boolean')
idx = [
self.words.index(e[0] if isinstance(e, list) else e) for e in labels
]
embed = self._embed_matrix[idx]
embed = embed.dot(embed.T)
embed = (embed - embed.min()) / (embed.max() - embed.min())
labelled = []
for label in labels:
label = (
'%s, %.3f' % (label[0], label[1])
if isinstance(label, list)
else label
)
labelled.append(label)
        # sns.clustermap creates its own figure, so pass figsize directly
        g = sns.clustermap(
            embed,
            cmap='Blues',
            xticklabels=labelled,
            yticklabels=labelled,
            annot=annotate,
            figsize=figsize,
        )
plt.show()
return embed, labelled
    @check_type
def scatter_plot(
self,
labels,
centre: str = None,
figsize: Tuple[int, int] = (7, 7),
plus_minus: int = 25,
handoff: float = 5e-5,
):
"""
plot a scatter plot based on output from calculator / n_closest / analogy.
Parameters
----------
labels : list
output from calculator / n_closest / analogy
centre : str, (default=None)
centre label, if a str, it will annotate in a red color.
        figsize : tuple, (default=(7, 7))
            figure size for plot.
        plus_minus: int, (default=25)
            number of neighbouring embedding rows to project around each label.
        handoff: float, (default=5e-5)
            margin applied to the axis limits.
Returns
-------
tsne: np.array, 2D.
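
        Examples
        --------
        A minimal sketch; assumes `wv` is a loaded `WordVector` instance:

        >>> similar = wv.n_closest('anwar', num_closest=8)
        >>> tsne = wv.scatter_plot(similar, centre='anwar')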
"""
try:
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
import seaborn as sns
sns.set()
        except ImportError:
            raise ModuleNotFoundError(
                'matplotlib and seaborn not installed. Please install them and try again.'
            )
idx = [
self.words.index(e[0] if isinstance(e, list) else e) for e in labels
]
if centre:
idx.append(self.words.index(centre))
cp_idx = idx[:]
for i in idx:
cp_idx.extend(np.arange(i - plus_minus, i).tolist())
cp_idx.extend(np.arange(i, i + plus_minus).tolist())
tsne = TSNE(n_components=2, random_state=0).fit_transform(
self._embed_matrix[cp_idx]
)
plt.figure(figsize=figsize)
plt.scatter(tsne[:, 0], tsne[:, 1])
for label, x, y in zip(
labels, tsne[: len(labels), 0], tsne[: len(labels), 1]
):
label = (
'%s, %.3f' % (label[0], label[1])
if isinstance(label, list)
else label
)
plt.annotate(
label,
xy=(x, y),
xytext=(0, 0),
textcoords='offset points',
)
if centre:
plt.annotate(
centre,
xy=(tsne[len(labels), 0], tsne[len(labels), 1]),
xytext=(0, 0),
textcoords='offset points',
color='red',
)
        # pad the axis limits by `handoff` on both sides
        plt.xlim(
            tsne[: len(idx), 0].min() - handoff,
            tsne[: len(idx), 0].max() + handoff,
        )
        plt.ylim(
            tsne[: len(idx), 1].min() - handoff,
            tsne[: len(idx), 1].max() + handoff,
        )
plt.xticks([])
plt.yticks([])
plt.show()
return tsne
    def _calculate(self, equation):
        def vector_string(word):
            # map the token to its closest dictionary word by JaroWinkler
            # similarity, then serialize that word's vector for Calculator
            row = self._dictionary[
                self.words[
                    np.argmax(
                        [self._jarowinkler.similarity(word, k) for k in self.words]
                    )
                ]
            ]
            return ','.join(self._embed_matrix[row, :].astype('str').tolist())

        tokens, temp = [], ''
        for char in equation:
            if char == ' ':
                continue
            if char not in '()*+-':
                temp += char
            else:
                if len(temp):
                    tokens.append(vector_string(temp))
                    temp = ''
                tokens.append(char)
        if len(temp):
            tokens.append(vector_string(temp))
        return Calculator(tokens).exp()
def _batch_process(self, batch, num_closest=5, return_similarity=True):
top_k = tf.nn.top_k(self._cosine_similarity, k=num_closest)
results = self._sess.run(
top_k,
feed_dict={self._x: batch, self._embedding: self._embed_matrix},
)
indices = results.indices
values = results.values
words = []
if not return_similarity:
for result in indices:
words.append([self._reverse_dictionary[i] for i in result])
else:
            # iterate over the batch; len(results) would always be 2, because
            # tf.nn.top_k returns a (values, indices) pair
            for no in range(len(indices)):
words.append(
[
(
self._reverse_dictionary[indices[no, i]],
values[no, i],
)
for i in range(len(indices[no]))
]
)
return words
    @check_type
def batch_calculator(
self,
equations: List[str],
num_closest: int = 5,
return_similarity: bool = False,
):
"""
batch calculator parser for word2vec using tensorflow.
Parameters
----------
        equations: list of str
            Eg, ['(mahathir + najib) - rosmah']
        num_closest: int, (default=5)
            number of words closest to the result.
        return_similarity: bool, (default=False)
            if True, return a similarity score between 0 and 1 alongside each word.
Returns
-------
word_list: list of nearest words
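
        Examples
        --------
        A minimal sketch; assumes `wv` is a loaded `WordVector` instance:

        >>> equations = ['(mahathir + najib) - rosmah', 'mahathir + anwar']
        >>> wv.batch_calculator(equations, num_closest=5)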
"""
batches = np.array([self._calculate(eq) for eq in equations])
return self._batch_process(
batches,
num_closest=num_closest,
return_similarity=return_similarity,
)
    @check_type
def calculator(
self,
equation: str,
num_closest: int = 5,
metric: str = 'cosine',
return_similarity: bool = True,
):
"""
calculator parser for word2vec.
Parameters
----------
equation: str
Eg, '(mahathir + najib) - rosmah'
num_closest: int, (default=5)
number of words closest to the result.
metric: str, (default='cosine')
vector distance algorithm.
        return_similarity: bool, (default=True)
            if True, return a similarity score between 0 and 1 alongside each word.
Returns
-------
word_list: list of nearest words
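
        Examples
        --------
        A minimal sketch; assumes `wv` is a loaded `WordVector` instance:

        >>> wv.calculator('(mahathir + najib) - rosmah', num_closest=5)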
"""
calculated = self._calculate(equation)
if return_similarity:
            # newer scikit-learn requires n_neighbors as a keyword argument
            nn = NearestNeighbors(n_neighbors=num_closest + 1, metric=metric).fit(
self._embed_matrix
)
distances, idx = nn.kneighbors(calculated.reshape((1, -1)))
word_list = []
for i in range(1, idx.shape[1]):
word_list.append(
[self._reverse_dictionary[idx[0, i]], 1 - distances[0, i]]
)
return word_list
else:
closest_indices = self.closest_row_indices(
calculated, num_closest + 1, metric
)
word_list = []
for i in closest_indices:
word_list.append(self._reverse_dictionary[i])
return word_list
    @check_type
def batch_n_closest(
self,
words: List[str],
num_closest: int = 5,
return_similarity: bool = False,
soft: bool = True,
):
"""
find nearest words based on a batch of words using Tensorflow.
Parameters
----------
words: list
Eg, ['najib','anwar']
num_closest: int, (default=5)
number of words closest to the result.
        return_similarity: bool, (default=False)
            if True, return a similarity score between 0 and 1 alongside each word.
        soft: bool, (default=True)
            if True, a word not in the dictionary will be replaced with the nearest word by JaroWinkler ratio.
            if False, throw an exception for any word not in the dictionary.
Returns
-------
word_list: list of nearest words
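
        Examples
        --------
        A minimal sketch; assumes `wv` is a loaded `WordVector` instance:

        >>> wv.batch_n_closest(['najib', 'anwar'], num_closest=5)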
"""
if soft:
for i in range(len(words)):
if words[i] not in self.words:
words[i] = self.words[
np.argmax(
[
self._jarowinkler.similarity(words[i], k)
for k in self.words
]
)
]
else:
for i in range(len(words)):
if words[i] not in self.words:
raise Exception(
'%s not in dictionary, please use another word or set `soft` = True'
% (words[i])
)
batches = np.array([self.get_vector_by_name(w) for w in words])
return self._batch_process(
batches,
num_closest=num_closest,
return_similarity=return_similarity,
)
    @check_type
def n_closest(
self,
word: str,
num_closest: int = 5,
metric: str = 'cosine',
return_similarity: bool = True,
):
"""
find nearest words based on a word.
Parameters
----------
word: str
Eg, 'najib'
num_closest: int, (default=5)
number of words closest to the result.
metric: str, (default='cosine')
vector distance algorithm.
        return_similarity: bool, (default=True)
            if True, return a similarity score between 0 and 1 alongside each word.
Returns
-------
word_list: list of nearest words
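
        Examples
        --------
        A minimal sketch; assumes `wv` is a loaded `WordVector` instance:

        >>> wv.n_closest('najib', num_closest=5, metric='cosine')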
"""
if return_similarity:
            # newer scikit-learn requires n_neighbors as a keyword argument
            nn = NearestNeighbors(n_neighbors=num_closest + 1, metric=metric).fit(
self._embed_matrix
)
distances, idx = nn.kneighbors(
self.get_vector_by_name(word).reshape((1, -1))
)
word_list = []
for i in range(1, idx.shape[1]):
word_list.append(
[self._reverse_dictionary[idx[0, i]], 1 - distances[0, i]]
)
return word_list
else:
wv = self.get_vector_by_name(word)
closest_indices = self.closest_row_indices(
wv, num_closest + 1, metric
)
word_list = []
for i in closest_indices:
word_list.append(self._reverse_dictionary[i])
if word in word_list:
word_list.remove(word)
return word_list
def closest_row_indices(self, wv, num, metric):
dist_array = np.ravel(
cdist(self._embed_matrix, wv.reshape((1, -1)), metric=metric)
)
sorted_indices = np.argsort(dist_array)
return sorted_indices[:num]
    @check_type
def analogy(
self, a: str, b: str, c: str, num: int = 1, metric: str = 'cosine'
):
"""
analogy calculation, vb - va + vc.
Parameters
----------
a: str
b: str
c: str
num: int, (default=1)
metric: str, (default='cosine')
vector distance algorithm.
Returns
-------
word_list: list of nearest words.
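
        Examples
        --------
        A minimal sketch; assumes `wv` is a loaded `WordVector` instance and the word choices are illustrative:

        >>> wv.analogy('anwar', 'penjara', 'kerajaan', num=5)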
"""
        if a not in self._dictionary:
            raise ValueError('a not in dictionary')
        if b not in self._dictionary:
            raise ValueError('b not in dictionary')
        if c not in self._dictionary:
            raise ValueError('c not in dictionary')
va = self.get_vector_by_name(a)
vb = self.get_vector_by_name(b)
vc = self.get_vector_by_name(c)
vd = vb - va + vc
closest_indices = self.closest_row_indices(vd, num, metric)
d_word_list = []
for i in closest_indices:
d_word_list.append(self._reverse_dictionary[i])
return d_word_list
    @check_type
def project_2d(self, start: int, end: int):
"""
project word2vec into 2d dimension.
Parameters
----------
start: int
end: int
Returns
-------
embed_2d: TSNE decomposition
word_list: words in between `start` and `end`.
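
        Examples
        --------
        A minimal sketch; assumes `wv` is a loaded `WordVector` instance:

        >>> embed_2d, word_list = wv.project_2d(0, 100)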
"""
        # TSNE is only imported locally elsewhere, so import it here as well
        from sklearn.manifold import TSNE

        tsne = TSNE(n_components=2)
        embed_2d = tsne.fit_transform(self._embed_matrix[start:end, :])
word_list = []
for i in range(start, end):
word_list.append(self._reverse_dictionary[i])
return embed_2d, word_list
    @check_type
def network(
self,
word: str,
num_closest: int = 8,
depth: int = 4,
min_distance: float = 0.5,
iteration: int = 300,
figsize: Tuple[int, int] = (15, 15),
node_color: str = '#72bbd0',
node_factor: int = 50,
):
"""
        plot a social network based on the given word.
Parameters
----------
word : str
centre of social network.
num_closest: int, (default=8)
number of words closest to the node.
depth: int, (default=4)
            depth of social network. Deeper networks are more expensive to calculate, O(num_closest ** depth).
min_distance: float, (default=0.5)
minimum distance among nodes. Increase the value to increase the distance among nodes.
        iteration: int, (default=300)
            number of loops to train the social network to fit min_distance.
figsize: tuple, (default=(15, 15))
figure size for plot.
node_color: str, (default='#72bbd0')
color for nodes.
        node_factor: int, (default=50)
            size factor for depth nodes. Increasing this value increases node sizes based on depth.
Returns
-------
G: networkx graph object
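
        Examples
        --------
        A minimal sketch; assumes `wv` is a loaded `WordVector` instance:

        >>> g = wv.network('mahathir', num_closest=8, depth=2)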
"""
try:
import pandas as pd
import networkx as nx
import matplotlib.pyplot as plt
        except ImportError:
            raise ModuleNotFoundError(
                'matplotlib, networkx and pandas not installed. Please install them and try again.'
            )
def get_follower(
centre,
top,
max_depth=depth,
current_depth=0,
accepted_list=[],
data=[],
):
if current_depth == max_depth:
return data, accepted_list
if centre in accepted_list:
return data, accepted_list
else:
accepted_list.append(centre)
            closest = self.n_closest(
centre, num_closest=num_closest, return_similarity=False
)
d = {
'name': centre,
'followers_ids': closest,
'centre': top,
'power': max_depth - current_depth,
}
data.append(d)
cd = current_depth
if cd + 1 < max_depth:
for fid in closest:
data, accepted_list = get_follower(
fid,
fid,
max_depth=max_depth,
current_depth=cd + 1,
accepted_list=accepted_list,
data=data,
)
return data, accepted_list
data, accepted_list = get_follower(word, word)
df = pd.DataFrame(data)
id_unique = np.unique(df['centre']).tolist()
followers = []
for i in range(df.shape[0]):
followers += df.followers_ids.iloc[i]
followers = list(set(followers))
followers = [i for i in followers if i in id_unique]
true_followers = []
for i in range(df.shape[0]):
follows = df.followers_ids.iloc[i]
follows = [k for k in follows if k in followers]
true_followers.append(follows)
df['true_followers'] = true_followers
G = nx.Graph()
for i in range(df.shape[0]):
size = df.power.iloc[i] * node_factor
G.add_node(df.centre.iloc[i], name=df.centre.iloc[i], size=size)
for i in range(df.shape[0]):
for k in df.true_followers.iloc[i]:
G.add_edge(df.centre.iloc[i], k)
        # networkx >= 2.4 removed Graph.node in favour of Graph.nodes
        sizes = [G.nodes[node]['size'] for node in G]
        names = [G.nodes[node]['name'] for node in G]
labeldict = dict(zip(G.nodes(), names))
plt.figure(figsize=figsize)
plt.axis('equal')
nx.draw(
G,
node_color=node_color,
labels=labeldict,
            with_labels=True,
node_size=sizes,
pos=nx.spring_layout(G, k=min_distance, iterations=iteration),
)
plt.show()
return G