Source code for cde.gmix

# Copyright 2020 Reid Swanson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module provides methods for a mixture of
Gaussian distributions where the weights, means, and standard deviations are
known. The accompanying module :mod:`cde.kmn` provides helper functions for
learning these parameters using a neural network.
"""
# Python Modules
import logging
import math
import operator as op

from functools import partial, reduce
from typing import List, Optional, Tuple, Union

# 3rd Party Modules
import numpy as np
import pandas as pd
import plotnine as pn

from scipy.optimize import brentq
from scipy.stats import describe, norm

# Project Modules

log = logging.getLogger(__name__)


# region Constants
SQRT_2PI = math.sqrt(2.0 * math.pi)
# endregion Constants


# region Distribution Methods
[docs]def pdf(y: float, weights: np.ndarray, means: np.ndarray, sigmas: np.ndarray): """ The probability density function of the Gaussian mixture at a single point `y`. :param y: The value where the density will be estimated. :type y: float :param weights: The weights for each Gaussian in the mixture. :type weights: np.ndarray :param means: The mean values for each Gaussian in the mixture. :type means: np.ndarray :param sigmas: The sigma values for each Gaussian. :type sigmas: np.ndarray :return: The estimated density for each set of parameters. :rtype: np.ndarray """ axis = 0 if np.ndim(weights) == 1 else 1 return np.sum(weights * norm.pdf(y, loc=means, scale=sigmas), axis=axis)
[docs]def cdf(y: float, weights: np.ndarray, means: np.ndarray, sigmas: np.ndarray): """ The estimated cumulative density at the given target value :code:`y`. :param y: The target value where the cumulative density will be estimated. :type y: float :param weights: The weights for each Gaussian in the mixture. :type weights: np.ndarray :param means: The mean values for each Gaussian in the mixture. :type means: np.ndarray :param sigmas: The sigma values for each Gaussian. :type sigmas: np.ndarray :return: The cumulative density estimation for each set of predicted parameters. :rtype: np.ndarray """ axis = 0 if np.ndim(weights) == 1 else 1 return np.sum(weights * norm.cdf(y, loc=means, scale=sigmas), axis=axis)
[docs]def ppf( p: float, weights: np.ndarray, means: np.ndarray, sigmas: np.ndarray, search_lower_bound: float = -1000, search_upper_bound: float = 1000 ): """ The `percent point function <https://en.wikipedia.org/wiki/Quantile_function>`_. There is no analytic solution to finding the `ppf` so a heuristic search is performed following the recipe in this `Stack Exchange solution <https://stats.stackexchange.com/a/14484>`_. :param p: The desired probability. :type p: float (between 0 and 1) :param weights: The weights for each Gaussian in the mixture. :type weights: np.ndarray :param means: The mean values for each Gaussian in the mixture. :type means: np.ndarray :param sigmas: The sigma values for each Gaussian. :type sigmas: np.ndarray :param search_lower_bound: The lower bound for the search. :type search_lower_bound: numeric :param search_upper_bound: The upper bound for the search. :type search_upper_bound: :return: The value such that its probability is less than or equal to :code:`p`. :rtype: float """ # The optimization function def f(x, w, mu, sigma): return cdf(x, w.reshape(1, -1), mu.reshape(1, -1), sigma.reshape(1, -1)) - p weights = _make_2d(weights) means = _make_2d(means) sigmas = _make_2d(sigmas) weights_shape = np.shape(weights) means_shape = np.shape(means) sigmas_shape = np.shape(sigmas) if not (weights_shape == means_shape == sigmas_shape): log.error( f"The shapes of the distribution parameters must all be the same. " f"weights: {weights_shape}, means: {means_shape}, sigmas: {sigmas_shape}" ) raise ValueError("The shapes of the distribution parameters must all be the same.") roots = [ brentq(f, search_lower_bound, search_upper_bound, args=(weights[i], means[i], sigmas[i])) for i in range(weights.shape[0]) ] return np.array(roots)
[docs]def quantile( p: float, weights: np.ndarray, means: np.ndarray, sigmas: np.ndarray, search_lower_bound: float = -1000, search_upper_bound: float = 1000 ): """ This is just an alias of :meth:`cde.gmix.ppf`. :param p: :type p: :param weights: :type weights: :param means: :type means: :param sigmas: :type sigmas: :param search_lower_bound: :type search_lower_bound: :param search_upper_bound: :type search_upper_bound: :return: :rtype: """ return ppf(p, weights, means, sigmas, search_lower_bound, search_upper_bound)
[docs]def mean(weights: np.ndarray, means: np.ndarray): """ Return the mean value of the density estimation using the given parameters. :param weights: The weights for each Gaussian in the mixture. :type weights: np.ndarray :param means: The mean values for each Gaussian in the mixture. :type means: np.ndarray :return: The mean values for the given parameters. :rtype: np.ndarray """ return np.sum(weights * means, axis=1)
[docs]def median( weights: np.ndarray, means: np.ndarray, sigmas: np.ndarray, search_lower_bound: float = -1000, search_upper_bound: float = 1000 ): """ Return the median value of the density estimation using the given parameters. :param weights: The weights for each Gaussian in the mixture. :type weights: np.ndarray :param means: The mean values for each Gaussian in the mixture. :type means: np.ndarray :param sigmas: The sigma values for each Gaussian. :type sigmas: np.ndarray :return: The mean values for the given parameters. :param search_lower_bound: :type search_lower_bound: :param search_upper_bound: :type search_upper_bound: :rtype: np.ndarray """ return ppf(0.5, weights, means, sigmas, search_lower_bound, search_upper_bound)
[docs]def mode( weights: np.ndarray, means: np.ndarray, sigmas: np.ndarray, method: str = 'cnewton', threshold: float = 0.25, **kwargs ): if method == 'line': modes = _mode_line_search(weights, means, sigmas, **kwargs) elif method == 'newton': modes = _mode_newtons_method(weights, means, sigmas) elif method == 'cnewton': modes = _mode_cnewtons_method(weights, means, sigmas) else: log.error("Unknown mode search type. Use one of: (line, newton)") raise ValueError("Unknown search type.") result = [] for i in range(len(modes)): dens = np.array([m[1] for m in modes[i]]) ss = describe(dens) # noinspection PyUnresolvedReferences modes_i = [m for m in modes[i] if m[1] > threshold * ss.minmax[1]] result.append(modes_i) return result
[docs]def random( n: int, weights: np.ndarray, means: np.ndarray, sigmas: np.ndarray, rng: np.random.RandomState = None, method: str = 'inverse_transform' ): if method == 'inverse_transform': return _random_inverse_transform(n, weights, means, sigmas, rng) raise NotImplementedError()
# endregion Distribution Methods # region Plotting Methods # noinspection PyTypeChecker
[docs]def plot( weights: np.ndarray, means: np.ndarray, sigmas: np.ndarray, row_names: Union[list, np.ndarray, None] = None, x_min: Optional[float] = None, x_max: Optional[float] = None, x_ticks: Optional[List[float]] = None, n_points: int = 1000, show_mixture: bool = False, show_mean: bool = False, show_median: bool = False, show_mode: bool = False, percent_interval: Optional[float] = 0.9, ncol: int = None, figure_size: Tuple[float, float] = None ): # if not (np.ndim(weights) == np.ndim(means) == np.ndim(sigmas) == 1): # log.error("You can only plot one distribution at a time. (ndim != 1)") # raise ValueError("You can only plot one distribution at a time.") # Make sure the parameters are the right shape weights = _make_2d(weights) means = _make_2d(means) sigmas = _make_2d(sigmas) if row_names is None: row_names = np.arange(weights.shape[0]) else: row_names = np.array(row_names) # row_names = np.array(row_names) or np.arange(weights.shape[0]) row_names = ( np.char.mod('%03d', row_names) if all([isinstance(n, (int, np.int, np.int16, np.int32, np.int64)) for n in row_names]) else row_names ) # A simple utility method for making a data frame for each parameter array # of the correct shape. def make_df(a: np.ndarray, value_name: str): t = pd.DataFrame(a.transpose()) t.columns = row_names t['center_no'] = np.arange(a.shape[1]) return t.melt('center_no', var_name='group', value_name=value_name) def make_point_df(x_values): y_values = [ pdf(xv, weights[i], means[i], sigmas[i]) for i, xv in enumerate(np.nditer(x_values)) ] return pd.DataFrame({'x': x_values, 'y': y_values, 'group': row_names}) # Merge each individual dataframe into a single one. dfs = (make_df(weights, 'weight'), make_df(means, 'mu'), make_df(sigmas, 'sigma')) param_df = reduce(lambda a, b: pd.merge(a, b, how='left', on=['center_no', 'group']), dfs) # Get reasonable limits on the x-axis if they aren't specified as parameters min_mu, max_mu = np.min(means), np.max(means) max_sigma = np.max(sigmas) x_min = min_mu - 2 * max_sigma if x_min is None else x_min x_max = max_mu + 2 * max_sigma if x_max is None else x_max x = np.linspace(x_min, x_max, num=n_points) # Compute the weighted Normal pdf values for each row in the param_df and # store them in a new data frame. normal_df = pd.DataFrame( np.repeat(param_df.values, len(x), axis=0), columns=param_df.columns, index=np.tile(x, len(param_df)) ).rename_axis('x').reset_index() # Calculate the pdf values for each data point normal_df['y'] = normal_df.apply( lambda r: r['weight'] * norm.pdf(r['x'], r['mu'], r['sigma']), axis=1 ) # Convert the center_no and group columns to categorical data types cat_columns = ['center_no', 'group'] normal_df[cat_columns] = normal_df[cat_columns].apply(lambda t: t.astype('category')) # Reorder the columns to put y next to x (for human readability) normal_df = normal_df[['group', 'center_no', 'weight', 'mu', 'sigma', 'x', 'y']] gmix_df = normal_df[['group', 'x', 'y']].groupby( ['group', 'x'] ).agg({ 'y': 'sum' }).reset_index() # Add some end points so that the area under the curve gets filled in # completely along the x-axis by the geom_polygon. end_point_data = [] for grp in row_names: end_point_data.append({'group': grp, 'x': x_min - 0.1, 'y': 0}) end_point_data.append({'group': grp, 'x': x_max + 0.1, 'y': 0}) end_point_df = pd.DataFrame(end_point_data) gmix_df = gmix_df.append(end_point_df) gmix_df.sort_values(['group', 'x'], inplace=True) p = pn.ggplot() p += pn.geom_polygon( data=gmix_df, mapping=pn.aes('x', 'y', fill='group'), alpha=0.25, color='black', show_legend=len(row_names) > 1 ) if show_mean: mean_df = make_point_df(mean(weights, means)) segment_aes = pn.aes(x='x', xend='x', y=0, yend='y', color='group') text_aes = pn.aes(x='x', y=0, label='x') text_kw = {'format_string': 'μ: {:0.3f}', 'ha': 'left', 'va': 'top'} p += pn.geom_segment(mean_df, segment_aes, size=1.25, show_legend=len(row_names) > 1) p += pn.geom_text(mean_df, text_aes, **text_kw) if show_median or percent_interval: median_df = make_point_df(median(weights, means, sigmas)) segment_aes = pn.aes(x='x', xend='x', y=0, yend='y', color='group') text_aes = pn.aes(x='x', y='y', label='x') text_kw = {'format_string': 'x̃: {:0.3f}', 'ha': 'left', 'va': 'bottom'} p += pn.geom_segment( median_df, segment_aes, linetype='dashed', size=1.25, show_legend=len(row_names) > 1 ) p += pn.geom_text(median_df, text_aes, **text_kw) if show_mode: modes = [ mode(weights[i], means[i], sigmas[i], method='cnewton') for i in range(weights.shape[0]) ] d = [ dict(group=row_names[g_idx], x=x, y=y) for g_idx, m in enumerate(modes) for m_list in m for x, y in m_list ] mode_df = pd.DataFrame(d) segment_aes = pn.aes(x='x', xend='x', y=0, yend='y', color='group') text_aes = pn.aes(x='x', y='y/2', label='x', angle=-90) text_kw = {'format_string': 'x̂: {:0.3f}', 'ha': 'left', 'va': 'center', 'nudge_y': -0.005} p += pn.geom_segment( mode_df, segment_aes, linetype='dotted', size=1.25, show_legend=len(row_names) > 1 ) p += pn.geom_text(mode_df, text_aes, **text_kw) if percent_interval: if not 0 < percent_interval < 1: log.error(f"The percent interval must be between 0 and 100: {percent_interval}") raise ValueError("The percent interval must be between 0 and 100") def make_limit_df(bound_df, is_lower: bool): fn = op.le if is_lower else op.ge start = -1 if is_lower else 0 stop = None if is_lower else 1 # Select all the rows from the gmix_df that are outside the PI # for each group df_list = [ gmix_df.loc[(fn(gmix_df.x, row.x)) & (gmix_df.group == row.group), ] for _, row in bound_df.iterrows() ] tmp_df = pd.concat(df_list) # We need to drop the polygon to zero at the interval boundary, # so create a new row that has the same x values of the last point # of the lower bound and the first point of the upper bound, but # set the y value to 0. last_point = tmp_df.groupby('group').apply( lambda g: pd.DataFrame(g.iloc[start:stop, ].values, columns=g.columns) ) last_point['y'] = 0 tmp_df = tmp_df.append(last_point) tmp_df.reset_index(drop=True, inplace=True) # Make sure the columns are of the correct type (for some reason they # get reset to 'object' above. tmp_df['group'] = tmp_df.group.astype('category') tmp_df[['x', 'y']] = tmp_df[['x', 'y']].apply(lambda c: c.astype('float')) return tmp_df h = (1 - percent_interval) / 2 lower_bound = h upper_bound = percent_interval + h lower_bound_df = make_point_df(ppf(lower_bound, weights, means, sigmas)) upper_bound_df = make_point_df(ppf(upper_bound, weights, means, sigmas)) lower_df = make_limit_df(lower_bound_df, True) upper_df = make_limit_df(upper_bound_df, False) poly_kw = {'alpha': 0.6, 'show_legend': len(row_names) > 1} p += pn.geom_polygon(lower_df, pn.aes('x', 'y', fill='group'), **poly_kw) p += pn.geom_polygon(upper_df, pn.aes('x', 'y', fill='group'), **poly_kw) if show_mixture: p += pn.geom_path( data=normal_df, mapping=pn.aes( 'x', 'y', color='group', ), alpha=0.5, show_legend=len(row_names) > 1 ) if ncol and len(row_names) > 1: p += pn.facet_wrap('~ group', ncol=ncol) if weights.shape[0] == 1: if percent_interval: title = f"{row_names[0]} With {percent_interval:0.3f} PI" else: title = f"{row_names[0]}" p += pn.scale_fill_grey() p += pn.scale_color_grey() else: if percent_interval: title = f"Density Plot With {percent_interval:0.3f} PI" else: title = "Density Plot" palette = 'Dark2' p += pn.scale_fill_brewer(palette=palette, type='qualitative') p += pn.scale_color_brewer(palette=palette, type='qualitative') p + pn.scale_x_continuous(breaks=x_ticks) if x_ticks else p # p += pn.scale_y_continuous(limits=(-0.015, None)) p += pn.ylab('Density') p += pn.ggtitle(title) p += pn.theme_bw() if figure_size: p += pn.theme(figure_size=figure_size) return p
# endregion Plotting Methods # region Helper Methods
[docs]def make_centers( data: np.ndarray, n_centers: int, extend_lower: float = 0, extend_upper: float = 0, method: str = 'jenks' ): r""" Return an array of kernel centers given some target :code:`data`. There are currently two supported methods for creating the array. The first method uses the `Jenks natural breaks <https://en.wikipedia.org/wiki/Jenks_natural_breaks_optimization>`_ algorithm and the second simply divides the space up uniformly. If you suspect your training data will not adequately cover the range of values in the target distribution you can optionally extend the lower and upper bounds by a given proportion :math:`p` (specified as a floating point number between 0 and 1). The lower range will be extended by: :math:`lower\_bound = min(data) - p \cdot (max(data) - min(data))` While the upper range will be extended by: :math:`upper\_bound = max(data) + p \cdot (max(data) - min(data))` The *uniform* method splits the range (:math:`min - max`) using :func:`np.linspace` and then returns the mid points between the breaks. The *jenks* method simply uses the split points returned by the algorithm as the kernel centers, including the extended lower and upper bounds. It may be more precise to use the mid points between the breaks, but in practice it should not make much difference for many applications. If desired, this can easily be accomplished on the returned results. :param data: A 1-dimensional array of data used to identify the kernel centers. :param n_centers: The number of centers to return. :param extend_lower: The proportion to extend the lower bound by. :param extend_upper: THe proportion to extend the upper bound by. :param method: The method used to find kernel centers from the given data. The choices are *jenks* or *uniform*. :return: An array representing the center point of each kernel. """ log.debug(f"Making kernel centers using {method} method") if method == 'jenks': return _make_jenks_centers(data, n_centers, extend_lower, extend_upper) elif method == 'uniform': return _make_uniform_centers(data, n_centers, extend_lower, extend_upper) else: raise ValueError(f"Unknown method kernel centering method: {method}")
# endregion Helper Methods # region Utility Methods # region Common def _make_2d(a: np.ndarray): return a[:, np.newaxis].transpose() if np.ndim(a) == 1 else a # endregion Common # region Make Centers def _make_jenks_centers(data: np.ndarray, n_centers: int, extend_left: float, extend_right: float): from jenkspy import jenks_breaks data = _extend_data(data, extend_left, extend_right) breaks = jenks_breaks(data, nb_class=n_centers-1) return np.array(breaks) def _make_uniform_centers( data: np.ndarray, n_centers: int, extend_left: float, extend_right: float ): data = _extend_data(data, extend_left, extend_right) y_min, y_max = min(data), max(data) breaks = np.linspace(y_min, y_max, num=n_centers) return breaks def _extend_data(data: np.ndarray, extend_left: float, extend_right: float): if not 0 <= extend_left <= 1: raise ValueError(f"extend_left must be >= 0 and <= 1 ({extend_left})") if not 0 <= extend_right <= 1: raise ValueError(f"extend_right must be >= 0 and <= 1 ({extend_right})") if extend_left > 0 or extend_right > 0: min_val, max_val = np.min(data), np.max(data) span = max_val - min_val return np.append(data, [min_val - span * extend_left, max_val + span * extend_right]) return data # endregion Make Centers # region Random def _random_inverse_transform( n: int, weights: np.ndarray, means: np.ndarray, sigmas: np.ndarray, rng: np.random.RandomState ): # noinspection PyArgumentList uniform_rand = rng.rand(n) if rng else np.random.rand(n) return np.array([ppf(u, weights, means, sigmas) for u in uniform_rand]).transpose() # endregion Random # region Mode def _gmix_derivative_1(x: float, weights: np.ndarray, means: np.ndarray, sigmas: np.ndarray): # Note the values learned by the model and stored in `sigmas` may actually be sigma^2 return np.sum( weights * -(x - means) * norm.pdf(x, means, sigmas) / sigmas**2 ) def _gmix_derivative_2(x: float, weights: np.ndarray, means: np.ndarray, sigmas: np.ndarray): fx = norm.pdf(x, means, sigmas) return np.sum( weights * ( (-fx / (sigmas**2)) + ((x - means)**2 * fx / sigmas**4) ) ) def _mode_newtons_method( weights: np.ndarray, means: np.ndarray, sigmas: np.ndarray, max_itr: int = 1000, min_diff: float = 1e-4, min_grad: float = 1e-9 ): # This paper describes a more complicated method for finding the modes # that involves both Newton's method and gradient ascent. It combines the # methods, because Newton's method only works when the Hessian is # negative definite. However, in the univariate case it doesn't seem # to be a problem, although I could easily be wrong about this. # http://faculty.ucmerced.edu/mcarreira-perpinan/papers/cs-99-03.pdf def local_modes(loc_weights, loc_means, loc_sigmas): kwargs = {'weights': loc_weights, 'means': loc_means, 'sigmas': loc_sigmas} pdf_x = partial(pdf, **kwargs) deriv_1 = partial(_gmix_derivative_1, **kwargs) deriv_2 = partial(_gmix_derivative_2, **kwargs) loc_modes = [] for m in range(loc_weights.shape[0]): itr = 0 x_new = loc_means[m] while True: g1 = deriv_1(x_new) g2 = deriv_2(x_new) if np.isclose(g2, 0): break x_old = x_new x_new = x_old - (g1 / g2) if itr >= max_itr or math.fabs(g1) < min_grad: break itr += 1 # Only consider maxima if deriv_2(x_new) < 0: # Don't add the new value if we already found it from another # start point. close = [c for c in loc_modes if math.fabs(c - x_new) < min_diff] if len(close) == 0: loc_modes.append(x_new) return [(x, pdf_x(x)) for x in loc_modes] modes = [] weights = _make_2d(weights) means = _make_2d(means) sigmas = _make_2d(sigmas) n_groups = weights.shape[0] for g in range(n_groups): g_weights, g_means, g_sigmas = weights[g], means[g], sigmas[g] g_modes = local_modes(g_weights, g_means, g_sigmas) modes.append(g_modes) return modes def _mode_cnewtons_method( weights: np.ndarray, means: np.ndarray, sigmas: np.ndarray, max_itr: int = 1000, min_diff: float = 1e-4, min_grad: float = 1e-9 ): import cgmix weights = _make_2d(weights) means = _make_2d(means) sigmas = _make_2d(sigmas) n_groups = weights.shape[0] modes = [] for g in range(n_groups): g_weights, g_means, g_sigmas = weights[g].tolist(), means[g].tolist(), sigmas[g].tolist() result = cgmix.find_modes_newton(g_weights, g_means, g_sigmas, max_itr, min_diff, min_grad) result = [(t1, t2) for t1, t2 in result] modes.append(result) return modes def _mode_line_search( weights: np.ndarray, means: np.ndarray, sigmas: np.ndarray, precision: float = 0.005 ): log.debug(f"Using line search with precision: {precision}") weights = _make_2d(weights) means = _make_2d(means) sigmas = _make_2d(sigmas) def local_modes(lweights, lmeans, lsigmas): lweights = _make_2d(lweights) lmeans = _make_2d(lmeans) lsigmas = _make_2d(lsigmas) pdf_x = partial(pdf, weights=lweights, means=lmeans, sigmas=lsigmas) min_val = np.min(lmeans, axis=1) max_val = np.max(lmeans, axis=1) span = max_val - min_val min_val, max_val = min_val - span * 0.1, max_val + span * 0.1 x = np.arange(min_val, max_val, precision) values = np.array([pdf_x(x_val) for x_val in x]) diff = np.diff(values, axis=0) sign = np.sign(diff) rle = [_mode_rle(sign[:, i])[0] for i in range(sign.shape[1])] # noinspection PyTypeChecker indexes = [np.cumsum(a) for a in rle] lmodes = [ [ (x[item[j]], values[item[j]][i]) for j in range(0 if sign[i][0] > 0 else 1, len(item), 2) ] for i, item in enumerate(indexes) ] return lmodes[0] return [local_modes(weights[g], means[g], sigmas[g]) for g in range(weights.shape[0])] def _mode_rle(a: np.ndarray): ia = np.asarray(a) # force numpy n = len(ia) if n == 0: return None, None, None else: y = np.array(ia[1:] != ia[:-1]) # pairwise unequal (string safe) i = np.append(np.where(y), n - 1) # must include last element position z = np.diff(np.append(-1, i)) # run lengths p = np.cumsum(np.append(0, z))[:-1] # positions return z, p, ia[i] # endregion Mode # endregion Utility Methods