305 lines
11 KiB
Python
Executable File
305 lines
11 KiB
Python
Executable File
# Courtesy of https://github.com/ldeecke/gmm-torch
|
|
|
|
import torch
|
|
import numpy as np
|
|
|
|
from math import pi
|
|
|
|
|
|
class GaussianMixture(torch.nn.Module):
|
|
"""
|
|
Fits a mixture of k=1,..,K Gaussians to the input data. Input tensors are expected to be flat with dimensions (n: number of samples, d: number of features).
|
|
The model then extends them to (n, k: number of components, d).
|
|
The model parametrization (mu, sigma) is stored as (1, k, d), and probabilities are shaped (n, k, 1) if they relate to an individual sample, or (1, k, 1) if they assign membership probabilities to one of the mixture components.
|
|
"""
|
|
def __init__(self, n_components, n_features, mu_init=None, var_init=None, eps=1.e-6):
|
|
"""
|
|
Initializes the model and brings all tensors into their required shape. The class expects data to be fed as a flat tensor in (n, d). The class owns:
|
|
x: torch.Tensor (n, k, d)
|
|
mu: torch.Tensor (1, k, d)
|
|
var: torch.Tensor (1, k, d)
|
|
pi: torch.Tensor (1, k, 1)
|
|
eps: float
|
|
n_components: int
|
|
n_features: int
|
|
score: float
|
|
args:
|
|
n_components: int
|
|
n_features: int
|
|
mu_init: torch.Tensor (1, k, d)
|
|
var_init: torch.Tensor (1, k, d)
|
|
eps: float
|
|
"""
|
|
|
|
super(GaussianMixture, self).__init__()
|
|
|
|
self.eps = eps
|
|
self.n_components = n_components
|
|
self.n_features = n_features
|
|
self.log_likelihood = -np.inf
|
|
|
|
self.mu_init = mu_init
|
|
self.var_init = var_init
|
|
|
|
self._init_params()
|
|
|
|
|
|
def _init_params(self):
|
|
if self.mu_init is not None:
|
|
assert self.mu_init.size() == (1, self.n_components, self.n_features), "Input mu_init does not have required tensor dimensions (1, %i, %i)" % (self.n_components, self.n_features)
|
|
# (1, k, d)
|
|
self.mu = torch.nn.Parameter(self.mu_init, requires_grad=False)
|
|
else:
|
|
self.mu = torch.nn.Parameter(torch.randn(1, self.n_components, self.n_features), requires_grad=False)
|
|
|
|
if self.var_init is not None:
|
|
assert self.var_init.size() == (1, self.n_components, self.n_features), "Input var_init does not have required tensor dimensions (1, %i, %i)" % (self.n_components, self.n_features)
|
|
# (1, k, d)
|
|
self.var = torch.nn.Parameter(self.var_init, requires_grad=False)
|
|
else:
|
|
self.var = torch.nn.Parameter(torch.ones(1, self.n_components, self.n_features), requires_grad=False)
|
|
|
|
# (1, k, 1)
|
|
self.pi = torch.nn.Parameter(torch.Tensor(1, self.n_components, 1), requires_grad=False).fill_(1./self.n_components)
|
|
|
|
self.params_fitted = False
|
|
|
|
|
|
def bic(self, x):
|
|
"""
|
|
Bayesian information criterion for samples x.
|
|
args:
|
|
x: torch.Tensor (n, d) or (n, k, d)
|
|
returns:
|
|
bic: float
|
|
"""
|
|
n = x.shape[0]
|
|
|
|
if len(x.size()) == 2:
|
|
# (n, d) --> (n, k, d)
|
|
x = x.unsqueeze(1).expand(n, self.n_components, x.size(1))
|
|
|
|
bic = -2. * self.__score(self.pi, self.__p_k(x, self.mu, self.var), sum_data=True) * n + self.n_components * np.log(n)
|
|
|
|
return bic
|
|
|
|
|
|
def fit(self, x, warm_start=False, delta=1e-8, n_iter=1000):
|
|
"""
|
|
Public method that fits data to the model.
|
|
args:
|
|
n_iter: int
|
|
delta: float
|
|
"""
|
|
|
|
if not warm_start and self.params_fitted:
|
|
self._init_params()
|
|
|
|
if len(x.size()) == 2:
|
|
# (n, d) --> (n, k, d)
|
|
x = x.unsqueeze(1).expand(x.size(0), self.n_components, x.size(1))
|
|
|
|
i = 0
|
|
j = np.inf
|
|
|
|
while (i <= n_iter) and (j >= delta):
|
|
|
|
log_likelihood_old = self.log_likelihood
|
|
mu_old = self.mu
|
|
var_old = self.var
|
|
|
|
self.__em(x)
|
|
self.log_likelihood = self.__score(self.pi, self.__p_k(x, self.mu, self.var))
|
|
|
|
if (self.log_likelihood.abs() == float("Inf")) or (self.log_likelihood == float("nan")):
|
|
# when the log-likelihood assumes inane values, reinitialize model
|
|
self.__init__(self.n_components, self.n_features)
|
|
|
|
i += 1
|
|
j = self.log_likelihood - log_likelihood_old
|
|
if j <= delta:
|
|
# when the score decreases, revert to old parameters
|
|
self.__update_mu(mu_old)
|
|
self.__update_var(var_old)
|
|
|
|
self.params_fitted = True
|
|
|
|
|
|
def predict(self, x, probs=False):
|
|
"""
|
|
Assigns input data to one of the mixture components by evaluating the likelihood under each. If probs=True returns normalized probabilities of class membership instead.
|
|
args:
|
|
x: torch.Tensor (n, d) or (n, k, d)
|
|
probs: bool
|
|
returns:
|
|
y: torch.LongTensor (n)
|
|
"""
|
|
|
|
if len(x.size()) == 2:
|
|
# (n, d) --> (n, k, d)
|
|
x = x.unsqueeze(1).expand(x.size(0), self.n_components, x.size(1))
|
|
|
|
p_k = self.__p_k(x, self.mu, self.var)
|
|
if probs:
|
|
return p_k / (p_k.sum(1, keepdim=True) + self.eps)
|
|
else:
|
|
_, predictions = torch.max(p_k, 1)
|
|
return torch.squeeze(predictions).type(torch.LongTensor)
|
|
|
|
def predict_proba(self, x):
|
|
"""
|
|
Returns normalized probabilities of class membership.
|
|
args:
|
|
x: torch.Tensor (n, d) or (n, k, d)
|
|
returns:
|
|
y: torch.LongTensor (n)
|
|
"""
|
|
return self.predict(x, probs=True)
|
|
|
|
|
|
def score_samples(self, x):
|
|
"""
|
|
Computes log-likelihood of data (x) under the current model.
|
|
args:
|
|
x: torch.Tensor (n, d) or (n, k, d)
|
|
returns:
|
|
score: torch.LongTensor (n)
|
|
"""
|
|
if len(x.size()) == 2:
|
|
# (n, d) --> (n, k, d)
|
|
x = x.unsqueeze(1).expand(x.size(0), self.n_components, x.size(1))
|
|
|
|
score = self.__score(self.pi, self.__p_k(x, self.mu, self.var), sum_data=False)
|
|
return score
|
|
|
|
|
|
def __p_k(self, x, mu, var):
|
|
"""
|
|
Returns a tensor with dimensions (n, k, 1) indicating the likelihood of data belonging to the k-th Gaussian.
|
|
args:
|
|
x: torch.Tensor (n, k, d)
|
|
mu: torch.Tensor (1, k, d)
|
|
var: torch.Tensor (1, k, d)
|
|
returns:
|
|
p_k: torch.Tensor (n, k, 1)
|
|
"""
|
|
|
|
# (1, k, d) --> (n, k, d)
|
|
mu = mu.expand(x.size(0), self.n_components, self.n_features)
|
|
var = var.expand(x.size(0), self.n_components, self.n_features)
|
|
|
|
# (n, k, d) --> (n, k, 1)
|
|
exponent = torch.exp(-.5 * torch.sum((x - mu) * (x - mu) / var, 2, keepdim=True))
|
|
# (n, k, d) --> (n, k, 1)
|
|
prefactor = torch.rsqrt(((2. * pi) ** self.n_features) * torch.prod(var, dim=2, keepdim=True) + self.eps)
|
|
|
|
return prefactor * exponent
|
|
|
|
|
|
def __e_step(self, pi, p_k):
|
|
"""
|
|
Computes weights that indicate the probabilistic belief that a data point was generated by one of the k mixture components. This is the so-called expectation step of the EM-algorithm.
|
|
args:
|
|
pi: torch.Tensor (1, k, 1)
|
|
p_k: torch.Tensor (n, k, 1)
|
|
returns:
|
|
weights: torch.Tensor (n, k, 1)
|
|
"""
|
|
|
|
weights = pi * p_k
|
|
return torch.div(weights, torch.sum(weights, 1, keepdim=True) + self.eps)
|
|
|
|
|
|
def __m_step(self, x, weights):
|
|
"""
|
|
Updates the model's parameters. This is the maximization step of the EM-algorithm.
|
|
args:
|
|
x: torch.Tensor (n, k, d)
|
|
weights: torch.Tensor (n, k, 1)
|
|
returns:
|
|
pi_new: torch.Tensor (1, k, 1)
|
|
mu_new: torch.Tensor (1, k, d)
|
|
var_new: torch.Tensor (1, k, d)
|
|
"""
|
|
|
|
# (n, k, 1) --> (1, k, 1)
|
|
n_k = torch.sum(weights, 0, keepdim=True)
|
|
pi_new = torch.div(n_k, torch.sum(n_k, 1, keepdim=True) + self.eps)
|
|
# (n, k, d) --> (1, k, d)
|
|
mu_new = torch.div(torch.sum(weights * x, 0, keepdim=True), n_k + self.eps)
|
|
# (n, k, d) --> (1, k, d)
|
|
var_new = torch.div(torch.sum(weights * (x - mu_new) * (x - mu_new), 0, keepdim=True), n_k + self.eps)
|
|
|
|
return pi_new, mu_new, var_new
|
|
|
|
|
|
def __em(self, x):
|
|
"""
|
|
Performs one iteration of the expectation-maximization algorithm by calling the respective subroutines.
|
|
args:
|
|
x: torch.Tensor (n, k, d)
|
|
"""
|
|
|
|
weights = self.__e_step(self.pi, self.__p_k(x, self.mu, self.var))
|
|
pi_new, mu_new, var_new = self.__m_step(x, weights)
|
|
|
|
self.__update_pi(pi_new)
|
|
self.__update_mu(mu_new)
|
|
self.__update_var(var_new)
|
|
|
|
|
|
def __score(self, pi, p_k, sum_data=True):
|
|
"""
|
|
Computes the log-likelihood of the data under the model.
|
|
args:
|
|
pi: torch.Tensor (1, k, 1)
|
|
p_k: torch.Tensor (n, k, 1)
|
|
"""
|
|
|
|
weights = pi * p_k
|
|
if sum_data:
|
|
return torch.sum(torch.log(torch.sum(weights, 1) + self.eps))
|
|
else:
|
|
return torch.log(torch.sum(weights, 1) + self.eps)
|
|
|
|
|
|
def __update_mu(self, mu):
|
|
"""
|
|
Updates mean to the provided value.
|
|
args:
|
|
mu: torch.FloatTensor
|
|
"""
|
|
|
|
assert mu.size() in [(self.n_components, self.n_features), (1, self.n_components, self.n_features)], "Input mu does not have required tensor dimensions (%i, %i) or (1, %i, %i)" % (self.n_components, self.n_features, self.n_components, self.n_features)
|
|
|
|
if mu.size() == (self.n_components, self.n_features):
|
|
self.mu = mu.unsqueeze(0)
|
|
elif mu.size() == (1, self.n_components, self.n_features):
|
|
self.mu.data = mu
|
|
|
|
|
|
def __update_var(self, var):
|
|
"""
|
|
Updates variance to the provided value.
|
|
args:
|
|
var: torch.FloatTensor
|
|
"""
|
|
|
|
assert var.size() in [(self.n_components, self.n_features), (1, self.n_components, self.n_features)], "Input var does not have required tensor dimensions (%i, %i) or (1, %i, %i)" % (self.n_components, self.n_features, self.n_components, self.n_features)
|
|
|
|
if var.size() == (self.n_components, self.n_features):
|
|
self.var = var.unsqueeze(0)
|
|
elif var.size() == (1, self.n_components, self.n_features):
|
|
self.var.data = var
|
|
|
|
|
|
def __update_pi(self, pi):
|
|
"""
|
|
Updates pi to the provided value.
|
|
args:
|
|
pi: torch.FloatTensor
|
|
"""
|
|
|
|
assert pi.size() in [(1, self.n_components, 1)], "Input pi does not have required tensor dimensions (%i, %i, %i)" % (1, self.n_components, 1)
|
|
|
|
self.pi.data = pi |