Source code for tclust._tclust

from ._iteration import Iteration

import numpy as np
from sklearn.base import BaseEstimator, ClusterMixin
from sklearn.utils.validation import check_is_fitted

from numba import jit  # to speed up some routines

import warnings
warnings.filterwarnings('ignore')


[docs]class TClust(ClusterMixin, BaseEstimator): """ General Trimming Approach to Robust Cluster Analysis TClust searches for k (or less) clusters with different covariance structures in a data matrix x. To make the estimation robust, a proportion alpha of observations may be trimmed. This iterative algorithm initializes k clusters randomly and performs "concentration steps" in order to improve the current cluster assignment. The number of maximum concentration steps to be performed is given by iter_max. For approximately obtaining the global optimum, the system is initialized n_inits times and concentration steps are performed until convergence or ksteps is reached. When processing more complex data sets, higher values of n_inits and ksteps have to be specified (obviously implying extra computation time). However, if more than half of the iterations do not converge, a warning message is issued, indicating that n_inits has to be increased. The parameter restr_cov_var defines the cluster’s shape restrictions, which are applied to all clusters during each iteration. Options "eigen"/"deter" restrict the ratio between the maximum and minimum eigenvalue/determinant of all cluster’s covariance structures to parameter restr_fact. Setting restr_fact=1 yields the strongest restriction, forcing all eigenvalues/determinants to be equal and so the method looks for similarly scattered (respectively spherical) clusters. Option "sigma" is a simpler restriction, which averages the covariance structures during each iteration (weighted by cluster sizes) in order to get similar (equal) cluster scatters. .. note:: The trimmed k-means method (tkmeans) can be obtained by setting parameters restr="eigen", restr_fact=1 and equal_weights = True. Parameters ---------- k : int, default=2 The number of clusters alpha : float, default=0.05 The proportion of observations to be trimmed n_inits : int, default=20 The number of random intializations to be performed ksteps : int, default=40 The maximum number of concentration steps to be performed restr_cov_value : string, default='eigen' The type of restriction to be applied on the cluster scatter matrices. Valid values are {"eigen", "deter", "sigma"}. equal_weights : bool, default=False Specifying whether cluster weights are equal. zero_tol : float, default=1e-16 The zero tolerance used. maxfact_e : float, default=5 Level of eigen constraints. maxfact_d : float, default=5 Level of determinant constraints. m : float, default=2. Fuzzy power parameter. opt : string, default='hard' Type of assignment. Accepted values are {'hard', 'mixture', 'fuzzy'} sol_ini : object of class Iteration, default=None Initial solution provided by the user. tk : bool, default=False Whether to use tkmeans initialization. verbose : bool, default=True Whether to print the progress of the objective function throughout the iterations. Example -------- >>> from tclust import TClust >>> import numpy as np >>> X = np.array([[1, 2], [1, 4], [1, 0], ... [10, 2], [10, 4], [10, 0]]) >>> clustering = TClust(k=2).fit(X) >>> clustering.labels_ array([2, 2, 2, 1, 1, 1], dtype=int64) >>> clustering.iter.center array([[10., 2.], [ 1., 2.]]) """ def __init__(self, k=2, alpha=0.05, n_inits=20, ksteps=10, equal_weights=False, restr_cov_value='eigen', maxfact_e=5., maxfact_d=5, m=2., zero_tol=1e-16, opt='hard', sol_ini=None, tk=False, verbose=True): """ Initialise :param k: Number of clusters initially searched for. :param alpha: Proportion of observations to be trimmed. :param n_inits: Number of random initializations to be performed. :param ksteps: Maximum number of concentration steps to be performed. The concentration steps are stopped whenever two consecutive steps lead to the same data partition. :param equal_weights: Specifies whether equal cluster weights (True) or not (False) shall be considered in the concentration and assignment steps. :param restr_cov_value: Type of restriction to be applied on the cluster scatter matrices. Valid values are "eigen" (default) , "deter" and "sigma". :param maxfact_e: level of eigen constraints :param maxfact_d: level of determinant constraints :param m: fuzzy power parameter :param zero_tol: The zero tolerance used. Default = 1e-16. :param verbose: Defines the verbosity level (default = True). If True, it gives additional information on the iteratively decreaseing objective function's value. :param opt: type of assignment ('hard', 'mixture', or 'fuzzy'). Default='hard'. :param sol_ini: Initial solution provided by the user. Default=None (i.e., random initialization). :param tk: Whether to use random tkmeans initialization. """ self.k = k # number of clusters self.alpha = alpha # level of trimming self.n_inits = n_inits # number of random initializations self.ksteps = ksteps # number of iterations within each initialization self.equal_weights = equal_weights # equal population proportions for all clusters self.zero_tol = zero_tol # zero tolerance (to be implemented) self.m = m # fuzzy power parameter self.restr_deter = None self.restr_cov_value = restr_cov_value self.maxfact_e = maxfact_e # level eigen constraints self.maxfact_d = maxfact_d # level determinant constraints assert opt.lower() in ['mixture', 'hard', 'fuzzy'], "opt must be 'mixture' for a mixture model, " \ "'hard' for a hard clustering assignment, " \ "or 'fuzzy' for fuzzy clustering" self.opt = opt.lower() # estimated model ('mixture' for a mixture model; 'hard' for hard assignment; 'fuzzy' for fuzzy clustering) self.sol_ini = sol_ini # initial solution (provided by the user); needs to be an Iteration object self.tk = tk # tkmeans self.iter = None self.best_iter = Iteration() self.no_trim = None self.verbose = verbose
[docs] def fit(self, X, y=None): """ Compute tclust clustering. Parameters ----------- X : {array-like, sparse matrix} of shape (n_samples, n_features) Training instances to cluster. The observations should be given row-wise. y : Ignored Not used, present for API consistency with scikit-learn by convention. Returns -------- self Fitted estimator. """ n_samples, n_features = X.shape # number of observations (n_samples); number of dimensions (n_features) self._check_params(n_samples, n_features) # Start algorithm for j in range(self.n_inits): if self.verbose and j > 0 and (j + 1) % 50 == 0: print('Initialisation = {}/{}'.format(j + 1, self.n_inits)) if self.sol_ini is not None: # if user provided initial solution self.n_inits = 1 self.iter = self.sol_ini else: # random initial solution self.init_clusters(X) for i in range(self.ksteps): self.f_restr(n_features) # restricting the clusters' scatter structure if self.iter.code == 0: # all eigenvalues are 0 if i > 0: self.calc_obj_function(X) self.treatSingularity() return else: for k in range(self.k): self.iter.sigma[:, :, k] = np.identity(n_features) if self.opt == 'fuzzy': # estimates the cluster's assignment and TRIMMING (fuzzy assignment) self.findFuzzyClusterLabels(X) else: # estimates the cluster's assignment and TRIMMING (mixture models and hard assignment) self.findClusterLabels(X) if self.iter.code == 2 or i == self.ksteps - 1: # if findClusterLabels returned 1, meaning that the cluster assignment has not changed or we're in the last concentration step: break # break the for loop: we finished this iteration! Don't re-estimate cluster parameters this time self.estimClustPar(X) # estimates the cluster's parameters self.calc_obj_function(X) # calculates the objetive function value if self.iter.obj > self.best_iter.obj: if self.verbose: print('(%d/%d) New iteration is better! Old obj: %.2f; New obj: %.2f' % ( j + 1, self.n_inits, self.best_iter.obj, self.iter.obj)) self.best_iter.update(obj=self.iter.obj, labels_=self.iter.labels_, csize=self.iter.csize, cw=self.iter.cw, sigma=self.iter.sigma, center=self.iter.center, z_ij=self.iter.z_ij, lambd=self.iter.lambd, code=self.iter.code) self.labels_ = self.best_iter.labels_.copy() return self
[docs] def predict(self, X): """ Predict the closest cluster each sample in X belongs to. :param X: {array-like, sparse matrix} of shape (n_samples, n_features) New data to predict. :return: labels_: ndarray of shape (n_samples, ) Index of the cluster each sample belongs to. """ check_is_fitted(self) if self.opt in ['hard', 'mixture']: self.findClusterLabels(X) else: self.findFuzzyClusterLabels(X) return iter.labels_
[docs] def fit_predict(self, X, y=None): """ Compute cluster centers and predict cluster index for each sample. Convenience method; equivalent to calling fit(X) followed by predict(X). Parameters ---------- X : {array-like, sparse matrix} of shape (n_samples, n_features) New data to transform. y : Ignored Not used, present here for API consistency by convention. Returns ------- labels_ : ndarray of shape (n_samples,) Index of the cluster each sample belongs to. """ return self.fit(X).labels_
[docs] def calc_obj_function(self, X): """ Calculates the objective function value for mixture, hard, and fuzzy assignments :param X: array of shape=(nsamples, nfeatures) containing the data :return: N/A """ ww_m = np.zeros(shape=(X.shape[0], 1)) # mixture ww = np.zeros(shape=(X.shape[0], 1)) # hard or fuzzy for k in range(self.k): # for each cluster if self.tk: w_m = self.iter.cw[k] * dmnorm_tk(x=X, mu=self.iter.center[k, :], lambd=self.iter.lambd[k]) else: w_m = self.iter.cw[k] * dmnorm(x=X, mu=self.iter.center[k, :], sigma=np.asarray(self.iter.sigma[:, :, k])) w_m = w_m.reshape((-1, 1)) # column vector, 1 value per sample ww_m += w_m * (w_m >= 0) # calculates each individual contribution for the obj funct (mixture assignment) if self.opt == 'hard': w = w_m * (self.iter.labels_ == (k + 1)).reshape((-1, 1)) w = w.reshape((-1, 1)) ww += w * (w >= 0) # calculates each individual contribution for the obj funct (hard assignment) elif self.opt == 'fuzzy': z_ij_flattened = self.iter.z_ij[:, k].reshape((-1, 1)) w = z_ij_flattened * np.log(w_m * (w_m >= 0) + 1 * (z_ij_flattened == 0)) ww += w # calculates each individual contribution for the obj funct (fuzzy assignment) assert ww.shape[-1] == ww_m.shape[-1] == 1 ww_m = ww_m * (ww_m >= 0) if self.opt == 'mixture': self.iter.obj = np.sum(np.log(ww_m[self.iter.labels_ > 0])) elif self.opt == 'hard': ww = ww * (ww >= 0) self.iter.obj = np.sum(np.log(ww[self.iter.labels_ > 0])) elif self.opt == 'fuzzy': self.iter.obj = np.sum(ww[self.iter.labels_ > 0]) return
[docs] def estimClustPar(self, X): """ Function to estimate model parameters :param X: array of shape=(nsamples, nfeatures) containing the data :return: N/A """ for k in range(self.k): # for each cluster if self.iter.csize[k] > self.zero_tol: # if cluster size is > 0 self.iter.center[k, :] = (self.iter.z_ij[:, k].T).dot(X) / self.iter.csize[k] X_c = X - self.iter.center[k, :] if not self.tk: # CHECKED OK self.iter.sigma[:, :, k] = \ np.matmul(np.multiply(X_c, self.iter.z_ij[:, k].reshape(-1, 1)).T, X_c) / self.iter.csize[k] else: self.iter.lambd[k] = \ np.mean(np.sum(np.matmul(self.iter.z_ij[:, k].T, X_c ** 2) / self.iter.csize[k], axis=0)) else: # this cluster's size is 0 if self.tk: self.iter.lambd[k] = 0 else: self.iter.sigma[:, :, k] = np.zeros((self.iter.sigma.shape[0], self.iter.sigma.shape[1]))
#################################################################################### ########### Functions for obtaining the assignment and trimming: ################### ########### - findClusterLabels: for mixture models and hard assignment ############ ########### - findFuzzyClusterLabels: for fuzzy assignment ######################### ####################################################################################
[docs] def findClusterLabels(self, X): """ Obtain the cluster assignment and trimming in the non-fuzzy case (i.e., mixture and hard assignments) :param X: array of shape=(nsamples, nfeatures) containing the data :return: N/A """ ll = self.get_ll(X) old_labels_ = self.iter.labels_.copy() self.iter.labels_ = np.argmax(ll, axis=1) + 1 # searching the cluster which fits each observation best pre_z_h = np.max(ll, axis=1) pre_z_m = np.sum(ll, axis=1) pre_z_ = np.tile(pre_z_m.reshape(-1, 1), (1, self.k)) assert pre_z_.shape[0] == X.shape[0] assert pre_z_.shape[1] == self.k # TODO: if we want to use this function to predict on new samples, we need to pass a new alpha to avoid/change # the trimming or an option to trim=False? # To obtain the trimming: tc_set is the non-trimming indicator if self.opt == 'mixture': tc_set = np.argsort(pre_z_m.argsort()) >= (np.floor(X.shape[0] * self.alpha)) elif self.opt == 'hard': tc_set = np.argsort(pre_z_h.argsort()) >= (np.floor(X.shape[0] * self.alpha)) # To obtain the self.iter.z_ij matrix (which contains the assignment and trimming): self.iter.labels_ = (np.argmax(ll, axis=1) + 1) * tc_set # hard assignnment including trimming (labels_ E [1,K]) self.iter.z_ij = ll / (pre_z_ + (pre_z_ == 0)) * tc_set.reshape((-1, 1)) # mixture assignment including trimming # Obtain the size of the clusters and the estimated weight of each population if self.opt == 'hard': self.iter.z_ij = 0 * self.iter.z_ij self.iter.z_ij[np.arange(X.shape[0]), 1 * (self.iter.labels_ == 0) + (self.iter.labels_ - 1)] = 1 self.iter.z_ij[~tc_set, :] = 0 # return 0 for trimmed samples self.iter.code = 2 * np.all(old_labels_ == self.iter.labels_) # setting the code, signaling whether the labels have changed --- it's the only stopping rule implemented self.iter.csize = np.asarray([self.iter.labels_.tolist().count(cl + 1) for cl in range(self.k)]) elif self.opt == 'mixture': self.iter.csize = np.sum(self.iter.z_ij, axis=0) if not self.equal_weights: self.iter.cw = self.iter.csize / np.sum(self.iter.csize) # obtain cluster weights
[docs] def findFuzzyClusterLabels(self, X): """ Obtain assignment and trimming in the fuzzy case :param X: array of shape=(nsamples, nfeatures) containing the data :return: N/A """ n = X.shape[0] ll = self.get_ll(X) # Obtain the cluster assignnment (self.iter.labels_) self.iter.labels_ = np.argmax(ll, axis=1) + 1 # searching the cluster which fits best for each observation ll_ = np.max(ll, axis=1) # Obtain the cluster assignment matrix (self.iter.z_ij) yy = np.nan * np.ones((n, self.k, self.k)) # Initialization ll_log = np.log(ll) for ii1 in range(self.k): for ii2 in range(self.k): yy[:, ii1, ii2] = (ll_log[:, ii1] / ll_log[:, ii2]) ** (1 / (self.m - 1)) yy_ = 1 / np.sum(yy, axis=2) # [nsamples, self.k] nm = self.k - np.sum(np.invert(np.isnan(yy_)), axis=1) where = (0 < nm) & (nm < self.k) if np.any(where): yy1_ = yy_[where, :].copy() yy1_[np.isnan(yy_[where, :])] = 0 yy_[where, :] = yy1_ yy_[nm == self.k, :] = 1 / self.k self.iter.z_ij = np.zeros((n, self.k)) # Initialization self.iter.z_ij[np.arange(n), self.iter.labels_ - 1] = 1 self.iter.z_ij[ll_ <= 1, :] = yy_[ll_ <= 1, :] ** self.m # Obtain the trimming: tc_set is the non trimming indicator pre_z = np.nansum(self.iter.z_ij * ll_log, axis=1) tc_set = np.argsort(pre_z.argsort()) >= np.floor(n * self.alpha) # Obtain the assignment iter$labels_ iter$z_ij including trimming self.iter.labels_ = (np.argmax(ll, axis=1) + 1) * tc_set self.iter.z_ij[~tc_set, :] = 0 # Obtain the size of the clusters and the estimated weight of each population self.iter.csize = np.sum(self.iter.z_ij, axis=0) if not self.equal_weights: self.iter.cw = self.iter.csize / np.sum(self.iter.csize)
################################################################ ########### Functions for random initialization ################ ################################################################
[docs] def getini(self): """ Calculates the initial cluster sizes :return: array, number of samples in each cluster """ if self.k == 1: return np.array(self.no_trim) pi_ini = np.random.uniform(low=0, high=1, size=self.k) # sample from random uniform distribution ni_ini = np.random.choice(self.k, self.no_trim, replace=True, p=pi_ini / np.sum(pi_ini)) + 1 return np.asarray([ni_ini.tolist().count(cl + 1) for cl in range(self.k)])
[docs] def init_clusters(self, X): """ Calculates the initial cluster assignment and initial values for the parameters :param X: 2D array of data [samples, dimensions] :return: N/A """ n, p = X.shape for k in range(self.k): # Select observations randomly for the current initialisation cluster idx = np.random.choice(range(n), size=p + 1, replace=False) X_ini = X[idx, :] self.iter.center[k] = np.mean(X_ini, axis=0) # calculate the center if self.tk: X_c = X_ini - self.iter.center[k, :] # .reshape(p + 1, p) self.iter.lambd[k] = np.mean(np.sum(np.ones(shape=(1, p + 1)).dot(X_c ** 2) / (p + 1), axis=0)) else: cc = p / (p + 1) * np.cov(X_ini.T) # calculating sigma (cc = current cov) self.iter.sigma[:, :, k] = cc if self.equal_weights: # if we're considering equal weights, cw is set here AND NEVER CHANGED self.iter.csize = np.asarray([self.no_trim / self.k] * self.k) else: self.iter.csize = self.getini() self.iter.cw = self.iter.csize / self.no_trim
[docs] def treatSingularity(self): """ To manage singular situations. :return: N/A """ if self.restr_deter or self.restr_cov_value == 'sigma': print("WARNING: Points in the dataset are concentrated in k subspaces after trimming") else: print("WARNING: Points in the dataset are concentrated in k points after trimming")
[docs] def get_ll(self, X): """ Extracted this function to avoid repetition in the code :param X: array; 2D array of data, of shape [nsamp, nfeat] :return: array of shape (nsamp, k); ll """ ll = np.nan * np.ones((X.shape[0], self.k)) if self.tk: for k in range(self.k): ll[:, k] = self.iter.cw[k] * dmnorm_tk(x=X, mu=self.iter.center[k, :], lambd=self.iter.lambd[k]) else: for k in range(self.k): ll[:, k] = self.iter.cw[k] * dmnorm(x=X, mu=self.iter.center[k, :], sigma=np.asarray(self.iter.sigma[:, :, k])) return ll
##################################################################################### ########### Functions to apply constraints to covariance matrices ################### #####################################################################################
[docs] def restr2_eigenv(self, autovalues, ni_ini, factor_e, zero_tol): """ Function for applying eigen constraints. These are the typical constraints. :param autovalues: matrix containing eigenvalues :param ni_ini: current sample size of the clusters :param factor_e: level of the constraints :param zero_tol: tolerance level :return: ? """ assert factor_e > 0 # Initialization d = np.copy(autovalues.T) p, k = autovalues.shape n = np.sum(ni_ini) if p > k: nis = np.tile(np.array(ni_ini).reshape((1, -1)), (k, p))[:k, :p] else: nis = np.tile(np.array(ni_ini).reshape((-1, 1)), (k, p))[:k, :p] # d_ is the ordered set of values in which the restriction objective function change the definition # points in d_ correspond to the frontiers for the intervals in which this objective function # has the same definition # ed is a set with the middle points of these intervals d_ = list(np.sort(np.concatenate((d.flatten(), d.flatten() / factor_e)))) ed = (np.asarray(d_ + [2 * d_[-1]]) + np.asarray([0] + d_)) / 2 dim = ed.shape[0] # The only relevant eigenvalues are those that belong to clusters with sample size > 0. # Eigenvalues corresponding to clusters with 0 individuals have no influence in the objective function. # If all the eigenvalues are 0 during the smart initialization, we assign to all the eigenvalues the value 1. if np.max(d[nis > 0]) <= zero_tol: return np.zeros((p, k)) # solution corresponds to matrix of 0s # Check if the eigenvalues verify the restrictions if np.min(d[nis > 0]) == 0: # avoiding runtime warning when dividing by 0 denom = 1e-16 else: denom = np.min(d[nis > 0]) if np.abs(np.max(d[nis > 0]) / denom) <= factor_e: d[nis == 0] = np.mean(d[nis > 0]) return d.T # the solution corresponds to the input because it verifies the constraints # we build the sol array, which contains the critical values of the interval functions which define the objective function # we use the centers of the interval to get a definition for the function in each interval # this set with the critical values (in the array sol) contains the optimum m value t = np.zeros((k, dim)) s = np.zeros((k, dim)) r = np.zeros((k, dim)) sol = np.zeros(dim) sal = np.zeros(dim) for mp_ in range(dim): r[:, mp_] = np.sum(d < ed[mp_], axis=1) + np.sum(d > (ed[mp_] * factor_e), axis=1) s[:, mp_] = np.sum(d * (d < ed[mp_]), axis=1) t[:, mp_] = np.sum(d * (d > (ed[mp_] * factor_e)), axis=1) sol[mp_] = np.sum(ni_ini / n * (s[:, mp_] + t[:, mp_] / factor_e)) / (np.sum(ni_ini / n * r[:, mp_])) # this can be NAN e = sol[mp_] * (d < sol[mp_]) + d * (d >= sol[mp_]) * (d <= factor_e * sol[mp_]) + (factor_e * sol[mp_]) * (d > factor_e * sol[mp_]) sal[mp_] = np.sum(-0.5 * nis / n * (np.log(e) + d / e)) # m is the optimum value for the eigenvalues procedure m = sol[np.nanargmax(sal)] # based on the m value we get the restricted eigenvalues temp = m * (d < m) + d * (d >= m) * (d <= factor_e * m) + (factor_e * m) * (d > factor_e * m) return temp.T # the return value
[docs] def restr2_deter_(self, autovalues, ni_ini, factor_d, factor_e, zero_tol=1e-16): """ Function for applying constraints to the determinants. Used when p>1 (multivariate case) -- in the univariate case the constraints can be obtained with restr2_eigenv() In order to avoid the instability in the current release of this function implemented in the CRAN, it is better to apply these constraints, at the desired level, joint to eigenvalue constraints at very low level (factor_e=1e10). In this way eigenvalues are not constrained in practice, but numerical issues are avoided. :param autovalues: matrix containing eigenvalues :param ni_ini: current sample size of the clusters :param factor_d: constraint level for the determinants :param factor_e: constraint level for the eigenvalues :param zero_tol: tolerance level :return: ? """ nrows, ncols = autovalues.shape autovalues[autovalues < 1e-16] = 0 # TODO: it's like this in the R code, but should probably be passed as a variable instead autovalues_ = np.copy(autovalues) for k_ in range(autovalues.shape[1]): autovalues_[:, k_] = self.restr2_eigenv(autovalues=autovalues[:, k_].reshape(-1, 1), ni_ini=1, factor_e=factor_e, zero_tol=zero_tol).reshape(-1) es = np.prod(autovalues_, axis=0) es[es == 0] = 1 temp = np.tile((es ** (1 / nrows)).reshape(1, -1), (nrows, 1)) assert temp.shape[0] == nrows assert temp.shape[1] == ncols gm = autovalues_ / temp d_ = np.sum(autovalues / gm, axis=0).reshape(1, -1) / nrows d_[np.isnan(d_)] = 0 dfin = self.restr2_eigenv(autovalues=d_, ni_ini=ni_ini, factor_e=factor_d ** (1 / nrows), zero_tol=zero_tol) d__ = np.tile(dfin.reshape(1, -1), (nrows, 1)) * (gm * (gm > 0) + 1 * (gm == 0)) return d__
[docs] def restr_diffax(self, p): """ Function which manages the application of constraints (deter, eigen) :param p: int, number of features of the data :return: N/A """ u = np.nan * np.ones((p, p, self.k)) d = np.nan * np.ones((p, self.k)) if not self.tk: for k in range(self.k): d[:, k], u[:, :, k] = np.linalg.eig(self.iter.sigma[:, :, k]) else: d = np.tile(self.iter.lambd, (d.shape[0], 1)) d[d < 0] = 0 # all eigenvalues < 0 are assigned to 0, this happens sometimes due to numerical errors if self.restr_deter and p > 1: d = self.restr2_deter_(autovalues=d, ni_ini=self.iter.csize, factor_d=self.maxfact_d, factor_e=1 if self.tk else self.maxfact_e, zero_tol=self.zero_tol) else: d = self.restr2_eigenv(autovalues=d, ni_ini=self.iter.csize, factor_e=self.maxfact_e, zero_tol=self.zero_tol) # Checking for singularity in all clusters: self.iter.code = np.max(d) > self.zero_tol if not self.iter.code: return if not self.tk: for k in range(self.k): # recomposing the sigmas self.iter.sigma[:, :, k] = np.dot(np.dot(u[:, :, k], np.diag(d[:, k])), u[:, :, k].T) else: self.iter.lambd = d[0, :].copy()
[docs] def restr_avgcov(self, p): """ Restricts the clusters' covariance matrices to be equal. Simple function to get the pooled within group covariance matrix. :param p: int, number of dimensions of the data :return: N/A """ s_all = np.zeros((p, p)) for k in range(self.k): s_all += self.iter.sigma[:, :, k] * self.iter.csize[k] / np.sum(self.iter.csize) for k in range(self.k): self.iter.sigma[:, :, k] = s_all.copy() self.iter.code = int(np.sum(np.diag(s_all)) > self.zero_tol)
def _check_params(self, n, p): """ Checks the parameters for the execution and completes missing variables. :param n: int, number of observations :param p: int, number of dimensions :return: N/A """ self.iter = Iteration().fill(nobs=n, ndim=p, k=self.k) if self.restr_cov_value == 'sigma': self.f_restr = self.restr_avgcov self.restr_deter = False if p == 1: self.f_restr = self.restr_diffax self.restr_deter = False else: if self.restr_cov_value == 'eigen': self.f_restr = self.restr_diffax self.restr_deter = False elif self.restr_cov_value == 'deter': self.f_restr = self.restr_diffax self.restr_deter = True self.no_trim = int(np.floor(n * (1 - self.alpha))) # number of observations which are considered not outliers
##################################################### ######## Shortcut for trimmed k-means ############### #####################################################
[docs]def tkmeans(X, k, alpha=0.05, niter=20, ksteps=10, equal_weights=False, maxfact_d=5, m=2., zero_tol=1e-16, opt="hard", sol_ini=None, verbose=False): """ Convenient function to run trimmed k-means on the data. Parameters ---------- X : array Data to be clustered. shape=(n_observations, n_dimensions) k : int The number of clusters alpha : float, default=0.05 The proportion of observations to be trimmed niter : int, default=20 The number of random intializations to be performed ksteps : int, default=10 The maximum number of concentration steps to be performed equal_weights : bool, default=False Specifying whether cluster weights are equal. maxfact_d : float, default=5 Level of determinant constraints. m : float, default=2. Fuzzy power parameter. zero_tol : float, default=1e-16 The zero tolerance used. opt : string, default='hard' Type of assignment. Accepted values are {'hard', 'mixture', 'fuzzy'} sol_ini : object of class Iteration, default=None Initial solution provided by the user. None is used for random initializations. verbose : bool, default=True Whether to print the progress of the objective function throughout the iterations. Returns ------- Fitted TClust estimator. """ if X.shape[1] > 1: return TClust(k=k, alpha=alpha, n_inits=niter, ksteps=ksteps, equal_weights=equal_weights, verbose=verbose, restr_cov_value='deter', maxfact_e=1, maxfact_d=maxfact_d, m=m, zero_tol=zero_tol, opt=opt, sol_ini=sol_ini, tk=True).fit(X) elif X.shape[1] == 1: return TClust(k=k, alpha=alpha, n_inits=niter, ksteps=ksteps, equal_weights=equal_weights, verbose=verbose, restr_cov_value='deter', maxfact_e=1, maxfact_d=maxfact_d, m=m, zero_tol=zero_tol, opt=opt, sol_ini=sol_ini, tk=False).fit(X)
###################################################### ########### Miscelaneous functions ################### ######################################################
[docs]@jit(nopython=True) def dmnorm(x, mu, sigma): """ Multivariate normal density :param x: array of shape=(nsamples, nfeatures) containing the data :param mu: center of the cluster [features, ] :param sigma: :return: ? """ centered_array = x - mu inv_cov = np.linalg.inv(sigma) mahal_dist = [] for i in range(centered_array.shape[0]): mahal_dist.append(np.dot(np.dot(centered_array[i, :], inv_cov), centered_array[i, :])) assert len(mahal_dist) == x.shape[0] result = ((2 * np.pi) ** (-0.5 * len(mu))) * (np.linalg.det(sigma) ** (-1 / 2)) * np.exp(-0.5 * np.asarray(mahal_dist)) return result
[docs]@jit(nopython=True) def dmnorm_tk(x, mu, lambd): """ Multivariate normal density sigma=lambd*ID :param x: array of shape=(nsamples, nfeatures) containing the data :param mu: center of the cluster [features, ] :param lambd: one number - diagonal value for tkmeans (whatever that means) :return: ? """ a = ((2 * np.pi) ** (-0.5 * len(mu))) * (lambd ** (-len(mu) / 2)) b = (np.multiply(np.ones((x.shape[0], 1)), mu) - x) ** 2 c = np.exp(-(0.5 / lambd) * np.sum(b, axis=1)) return a * c