bayesiancnn/Mixtures/gmm.py

305 lines
11 KiB
Python
Executable File

# Courtesy of https://github.com/ldeecke/gmm-torch
import torch
import numpy as np
from math import pi
class GaussianMixture(torch.nn.Module):
"""
Fits a mixture of k=1,..,K Gaussians to the input data. Input tensors are expected to be flat with dimensions (n: number of samples, d: number of features).
The model then extends them to (n, k: number of components, d).
The model parametrization (mu, sigma) is stored as (1, k, d), and probabilities are shaped (n, k, 1) if they relate to an individual sample, or (1, k, 1) if they assign membership probabilities to one of the mixture components.
"""
def __init__(self, n_components, n_features, mu_init=None, var_init=None, eps=1.e-6):
"""
Initializes the model and brings all tensors into their required shape. The class expects data to be fed as a flat tensor in (n, d). The class owns:
x: torch.Tensor (n, k, d)
mu: torch.Tensor (1, k, d)
var: torch.Tensor (1, k, d)
pi: torch.Tensor (1, k, 1)
eps: float
n_components: int
n_features: int
score: float
args:
n_components: int
n_features: int
mu_init: torch.Tensor (1, k, d)
var_init: torch.Tensor (1, k, d)
eps: float
"""
super(GaussianMixture, self).__init__()
self.eps = eps
self.n_components = n_components
self.n_features = n_features
self.log_likelihood = -np.inf
self.mu_init = mu_init
self.var_init = var_init
self._init_params()
def _init_params(self):
if self.mu_init is not None:
assert self.mu_init.size() == (1, self.n_components, self.n_features), "Input mu_init does not have required tensor dimensions (1, %i, %i)" % (self.n_components, self.n_features)
# (1, k, d)
self.mu = torch.nn.Parameter(self.mu_init, requires_grad=False)
else:
self.mu = torch.nn.Parameter(torch.randn(1, self.n_components, self.n_features), requires_grad=False)
if self.var_init is not None:
assert self.var_init.size() == (1, self.n_components, self.n_features), "Input var_init does not have required tensor dimensions (1, %i, %i)" % (self.n_components, self.n_features)
# (1, k, d)
self.var = torch.nn.Parameter(self.var_init, requires_grad=False)
else:
self.var = torch.nn.Parameter(torch.ones(1, self.n_components, self.n_features), requires_grad=False)
# (1, k, 1)
self.pi = torch.nn.Parameter(torch.Tensor(1, self.n_components, 1), requires_grad=False).fill_(1./self.n_components)
self.params_fitted = False
def bic(self, x):
"""
Bayesian information criterion for samples x.
args:
x: torch.Tensor (n, d) or (n, k, d)
returns:
bic: float
"""
n = x.shape[0]
if len(x.size()) == 2:
# (n, d) --> (n, k, d)
x = x.unsqueeze(1).expand(n, self.n_components, x.size(1))
bic = -2. * self.__score(self.pi, self.__p_k(x, self.mu, self.var), sum_data=True) * n + self.n_components * np.log(n)
return bic
def fit(self, x, warm_start=False, delta=1e-8, n_iter=1000):
"""
Public method that fits data to the model.
args:
n_iter: int
delta: float
"""
if not warm_start and self.params_fitted:
self._init_params()
if len(x.size()) == 2:
# (n, d) --> (n, k, d)
x = x.unsqueeze(1).expand(x.size(0), self.n_components, x.size(1))
i = 0
j = np.inf
while (i <= n_iter) and (j >= delta):
log_likelihood_old = self.log_likelihood
mu_old = self.mu
var_old = self.var
self.__em(x)
self.log_likelihood = self.__score(self.pi, self.__p_k(x, self.mu, self.var))
if (self.log_likelihood.abs() == float("Inf")) or (self.log_likelihood == float("nan")):
# when the log-likelihood assumes inane values, reinitialize model
self.__init__(self.n_components, self.n_features)
i += 1
j = self.log_likelihood - log_likelihood_old
if j <= delta:
# when the score decreases, revert to old parameters
self.__update_mu(mu_old)
self.__update_var(var_old)
self.params_fitted = True
def predict(self, x, probs=False):
"""
Assigns input data to one of the mixture components by evaluating the likelihood under each. If probs=True returns normalized probabilities of class membership instead.
args:
x: torch.Tensor (n, d) or (n, k, d)
probs: bool
returns:
y: torch.LongTensor (n)
"""
if len(x.size()) == 2:
# (n, d) --> (n, k, d)
x = x.unsqueeze(1).expand(x.size(0), self.n_components, x.size(1))
p_k = self.__p_k(x, self.mu, self.var)
if probs:
return p_k / (p_k.sum(1, keepdim=True) + self.eps)
else:
_, predictions = torch.max(p_k, 1)
return torch.squeeze(predictions).type(torch.LongTensor)
def predict_proba(self, x):
"""
Returns normalized probabilities of class membership.
args:
x: torch.Tensor (n, d) or (n, k, d)
returns:
y: torch.LongTensor (n)
"""
return self.predict(x, probs=True)
def score_samples(self, x):
"""
Computes log-likelihood of data (x) under the current model.
args:
x: torch.Tensor (n, d) or (n, k, d)
returns:
score: torch.LongTensor (n)
"""
if len(x.size()) == 2:
# (n, d) --> (n, k, d)
x = x.unsqueeze(1).expand(x.size(0), self.n_components, x.size(1))
score = self.__score(self.pi, self.__p_k(x, self.mu, self.var), sum_data=False)
return score
def __p_k(self, x, mu, var):
"""
Returns a tensor with dimensions (n, k, 1) indicating the likelihood of data belonging to the k-th Gaussian.
args:
x: torch.Tensor (n, k, d)
mu: torch.Tensor (1, k, d)
var: torch.Tensor (1, k, d)
returns:
p_k: torch.Tensor (n, k, 1)
"""
# (1, k, d) --> (n, k, d)
mu = mu.expand(x.size(0), self.n_components, self.n_features)
var = var.expand(x.size(0), self.n_components, self.n_features)
# (n, k, d) --> (n, k, 1)
exponent = torch.exp(-.5 * torch.sum((x - mu) * (x - mu) / var, 2, keepdim=True))
# (n, k, d) --> (n, k, 1)
prefactor = torch.rsqrt(((2. * pi) ** self.n_features) * torch.prod(var, dim=2, keepdim=True) + self.eps)
return prefactor * exponent
def __e_step(self, pi, p_k):
"""
Computes weights that indicate the probabilistic belief that a data point was generated by one of the k mixture components. This is the so-called expectation step of the EM-algorithm.
args:
pi: torch.Tensor (1, k, 1)
p_k: torch.Tensor (n, k, 1)
returns:
weights: torch.Tensor (n, k, 1)
"""
weights = pi * p_k
return torch.div(weights, torch.sum(weights, 1, keepdim=True) + self.eps)
def __m_step(self, x, weights):
"""
Updates the model's parameters. This is the maximization step of the EM-algorithm.
args:
x: torch.Tensor (n, k, d)
weights: torch.Tensor (n, k, 1)
returns:
pi_new: torch.Tensor (1, k, 1)
mu_new: torch.Tensor (1, k, d)
var_new: torch.Tensor (1, k, d)
"""
# (n, k, 1) --> (1, k, 1)
n_k = torch.sum(weights, 0, keepdim=True)
pi_new = torch.div(n_k, torch.sum(n_k, 1, keepdim=True) + self.eps)
# (n, k, d) --> (1, k, d)
mu_new = torch.div(torch.sum(weights * x, 0, keepdim=True), n_k + self.eps)
# (n, k, d) --> (1, k, d)
var_new = torch.div(torch.sum(weights * (x - mu_new) * (x - mu_new), 0, keepdim=True), n_k + self.eps)
return pi_new, mu_new, var_new
def __em(self, x):
"""
Performs one iteration of the expectation-maximization algorithm by calling the respective subroutines.
args:
x: torch.Tensor (n, k, d)
"""
weights = self.__e_step(self.pi, self.__p_k(x, self.mu, self.var))
pi_new, mu_new, var_new = self.__m_step(x, weights)
self.__update_pi(pi_new)
self.__update_mu(mu_new)
self.__update_var(var_new)
def __score(self, pi, p_k, sum_data=True):
"""
Computes the log-likelihood of the data under the model.
args:
pi: torch.Tensor (1, k, 1)
p_k: torch.Tensor (n, k, 1)
"""
weights = pi * p_k
if sum_data:
return torch.sum(torch.log(torch.sum(weights, 1) + self.eps))
else:
return torch.log(torch.sum(weights, 1) + self.eps)
def __update_mu(self, mu):
"""
Updates mean to the provided value.
args:
mu: torch.FloatTensor
"""
assert mu.size() in [(self.n_components, self.n_features), (1, self.n_components, self.n_features)], "Input mu does not have required tensor dimensions (%i, %i) or (1, %i, %i)" % (self.n_components, self.n_features, self.n_components, self.n_features)
if mu.size() == (self.n_components, self.n_features):
self.mu = mu.unsqueeze(0)
elif mu.size() == (1, self.n_components, self.n_features):
self.mu.data = mu
def __update_var(self, var):
"""
Updates variance to the provided value.
args:
var: torch.FloatTensor
"""
assert var.size() in [(self.n_components, self.n_features), (1, self.n_components, self.n_features)], "Input var does not have required tensor dimensions (%i, %i) or (1, %i, %i)" % (self.n_components, self.n_features, self.n_components, self.n_features)
if var.size() == (self.n_components, self.n_features):
self.var = var.unsqueeze(0)
elif var.size() == (1, self.n_components, self.n_features):
self.var.data = var
def __update_pi(self, pi):
"""
Updates pi to the provided value.
args:
pi: torch.FloatTensor
"""
assert pi.size() in [(1, self.n_components, 1)], "Input pi does not have required tensor dimensions (%i, %i, %i)" % (1, self.n_components, 1)
self.pi.data = pi