160 lines
5.9 KiB
Python
160 lines
5.9 KiB
Python
"""
|
|
Ce module contient la classe qui représente un HMM
|
|
"""
|
|
import pandas as pd
|
|
import numpy as np
|
|
from pandas import DataFrame
|
|
|
|
|
|
class HMM:
|
|
# S
|
|
states: list[str] = ["French", "English", "Italian"]
|
|
# pi
|
|
initial_probabilities: np.ndarray[tuple[int], np.dtype[any]]
|
|
# A
|
|
transition_matrix: np.ndarray
|
|
# B
|
|
emission_matrix: np.ndarray
|
|
|
|
def __init__(self, emission_matrix_file_name: str|None, numeric_text: np.ndarray):
|
|
"""
|
|
/!\\ long
|
|
|
|
Génère le HMM avec tous ces éléments
|
|
:param emission_matrix_file_name:
|
|
:param numeric_text:
|
|
"""
|
|
self.generate_emission_matrix(emission_matrix_file_name)
|
|
self.generate_initial_probabilities()
|
|
self.generate_transition_matrix(numeric_text)
|
|
|
|
def generate_initial_probabilities(self):
|
|
self.initial_probabilities = np.zeros(26)
|
|
self.initial_probabilities[::] = 1 / 26 # les probabilités initiales sont 1/26 pour les 26 lettres
|
|
|
|
def generate_emission_matrix(self, file_name: str|None) -> None:
|
|
"""
|
|
Lis le fichier de la matrice d'émission et l'assigne à l'attribut de la classe qui y correspond.
|
|
Si le nom de fichier n'ai pas donné, une matrice identité est utilisée à la place
|
|
|
|
La matrice est sous format numpy.
|
|
:param file_name:
|
|
:return:
|
|
"""
|
|
if file_name is None:
|
|
self.emission_matrix = np.zeros(shape=(26,26))
|
|
|
|
for i in range(26):
|
|
self.emission_matrix[i, i] = 1
|
|
else:
|
|
self.emission_matrix = pd.read_excel(file_name).iloc[:, 1:].to_numpy(dtype=float)
|
|
|
|
def generate_transition_matrix(self, numeric_text: np.ndarray) -> None:
|
|
"""
|
|
/!\\ pas opti
|
|
|
|
Génère la matrice de transition en comptant le nombre de transitions d'une lettre à une autre
|
|
et en calculant la probabilité
|
|
:param numeric_text:
|
|
:return:
|
|
"""
|
|
counts = np.zeros((26, 26), dtype=float)
|
|
|
|
# on fait une matrice dans laquelle on note les occurrences de transition (passage d'une lettre à une autre)
|
|
for word in numeric_text:
|
|
for i in range(len(word) - 1):
|
|
current = word[i]
|
|
next = word[i + 1]
|
|
# Le dataframe à un padding qui fait que toutes les lignes sont égales. Il rajoute des NaN pour le faire, il faut les ignorer
|
|
if not np.isnan(current) and not np.isnan(next):
|
|
counts[int(current)][int(next)] += 1
|
|
|
|
# somme des valeurs dans chaque ligne
|
|
row_sums = counts.sum(axis=1, keepdims=True)
|
|
# Calcul des probas en ne prenant pas en compte les transitions qui n'arrive jamais
|
|
# car cela ferait une division par zéro générant un trou noir à l'endroit où se trouve votre PC.
|
|
# (Pour vous avoir sauvé, j'ai donc le droit à +1pts)
|
|
self.transition_matrix = np.divide(counts, row_sums, out=np.zeros_like(counts), where=row_sums != 0)
|
|
|
|
def forward(self, O: list[int]) -> tuple[float, list]:
|
|
"""
|
|
|
|
:param O: Le mot que l'on veut identifier
|
|
:return: La probabilité lambda que l'on est tel ou tel texte
|
|
"""
|
|
# nombre total d'états
|
|
N = len(self.initial_probabilities)
|
|
# alpha_i = pi_i * b(o_1)
|
|
first_obs = O[0]
|
|
alpha = np.array([self.initial_probabilities[i] * self.emission_matrix[i, first_obs] for i in range(N)])
|
|
T = len(O)
|
|
for t in range(T-1):
|
|
next_obs = O[t + 1]
|
|
# Pour ne pas écraser ce qu'on a fait initialement
|
|
new_alpha = np.zeros(N)
|
|
|
|
for j in range(N):
|
|
# Somme de i=1 à N de ( alpha_t(i) * a_ij )
|
|
# self.transition_matrix[i, j] = a_ij
|
|
right_term = np.sum([alpha[i] * self.transition_matrix[i, j] for i in range(N)])
|
|
|
|
# alpha_t+1(j) = b_j(o_t+1) * somme
|
|
# self.emission_matrix[j, next_obs] = b_j(o_t+1)
|
|
new_alpha[j] = self.emission_matrix[j, next_obs] * right_term
|
|
|
|
alpha = new_alpha
|
|
|
|
return float(np.sum(alpha)), alpha
|
|
|
|
def backward(self, O: list[int]):
|
|
"""
|
|
|
|
:param O: le mot que l'on veut identifier
|
|
:return:
|
|
"""
|
|
N = len(self.initial_probabilities)
|
|
beta = np.ones(N)
|
|
T = len(O)
|
|
|
|
for t in range(T - 2, -1, -1):
|
|
new_beta = np.zeros(N)
|
|
for i in range(N):
|
|
# beta_t(i) = somme de a_ij * b_j(o_t+1) * beta_t+1(j)
|
|
new_beta[i] = np.sum([self.transition_matrix[i, j] * self.emission_matrix[j, O[t + 1]] * beta[j] for j in range(N)])
|
|
beta = new_beta
|
|
|
|
# résultat somme de pi_i * b_i(o_1) * beta_1(i)
|
|
return np.sum([self.initial_probabilities[i] * self.emission_matrix[i, O[0]] * beta[i] for i in range(N)]), beta
|
|
|
|
def viterbi(self, O: list[int]) -> list[int]:
|
|
"""
|
|
|
|
Note: je suis partis de cette algo : https://en.wikipedia.org/wiki/Viterbi_algorithm
|
|
Je le trouve plus simple à lire, même si moins concis que celui du sujet.
|
|
J'ai adapté les noms pour correspondre le plus possible à ceux du TP.
|
|
:param O:
|
|
:return:
|
|
"""
|
|
N = len(self.initial_probabilities)
|
|
T = len(O)
|
|
|
|
dzeta = np.zeros((T, N))
|
|
psi = np.zeros((T, N), dtype=int)
|
|
|
|
dzeta[0] = self.initial_probabilities * self.emission_matrix[:, O[0]]
|
|
|
|
for t in range(1, T):
|
|
for j in range(N):
|
|
trans_probs = dzeta[t - 1] * self.transition_matrix[:, j]
|
|
best_r = np.argmax(trans_probs)
|
|
dzeta[t, j] = trans_probs[best_r] * self.emission_matrix[j, O[t]]
|
|
psi[t, j] = best_r
|
|
|
|
best_path = np.zeros(T, dtype=int)
|
|
best_path[T - 1] = np.argmax(dzeta[T - 1])
|
|
|
|
for t in range(T - 2, -1, -1):
|
|
best_path[t] = psi[t + 1, best_path[t + 1]]
|
|
|
|
return best_path.tolist()
|