# Copyright 2020 Reid Swanson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module provides methods for a mixture of
Gaussian distributions where the weights, means, and standard deviations are
known. The accompanying module :mod:`cde.kmn` provides helper functions for
learning these parameters using a neural network.
"""
# Python Modules
import logging
import math
import operator as op
from functools import partial, reduce
from typing import List, Optional, Tuple, Union
# 3rd Party Modules
import numpy as np
import pandas as pd
import plotnine as pn
from scipy.optimize import brentq
from scipy.stats import describe, norm
# Project Modules
log = logging.getLogger(__name__)
# region Constants
SQRT_2PI = math.sqrt(2.0 * math.pi)
# endregion Constants
# region Distribution Methods
[docs]def pdf(y: float, weights: np.ndarray, means: np.ndarray, sigmas: np.ndarray):
"""
The probability density function of the Gaussian mixture at a single point
`y`.
:param y: The value where the density will be estimated.
:type y: float
:param weights: The weights for each Gaussian in the mixture.
:type weights: np.ndarray
:param means: The mean values for each Gaussian in the mixture.
:type means: np.ndarray
:param sigmas: The sigma values for each Gaussian.
:type sigmas: np.ndarray
:return: The estimated density for each set of parameters.
:rtype: np.ndarray
"""
axis = 0 if np.ndim(weights) == 1 else 1
return np.sum(weights * norm.pdf(y, loc=means, scale=sigmas), axis=axis)
[docs]def cdf(y: float, weights: np.ndarray, means: np.ndarray, sigmas: np.ndarray):
"""
The estimated cumulative density at the given target value :code:`y`.
:param y: The target value where the cumulative density will be
estimated.
:type y: float
:param weights: The weights for each Gaussian in the mixture.
:type weights: np.ndarray
:param means: The mean values for each Gaussian in the mixture.
:type means: np.ndarray
:param sigmas: The sigma values for each Gaussian.
:type sigmas: np.ndarray
:return: The cumulative density estimation for each set of predicted
parameters.
:rtype: np.ndarray
"""
axis = 0 if np.ndim(weights) == 1 else 1
return np.sum(weights * norm.cdf(y, loc=means, scale=sigmas), axis=axis)
[docs]def ppf(
p: float,
weights: np.ndarray,
means: np.ndarray,
sigmas: np.ndarray,
search_lower_bound: float = -1000,
search_upper_bound: float = 1000
):
"""
The `percent point function <https://en.wikipedia.org/wiki/Quantile_function>`_.
There is no analytic solution to finding the `ppf` so a heuristic search
is performed following the recipe in this
`Stack Exchange solution <https://stats.stackexchange.com/a/14484>`_.
:param p: The desired probability.
:type p: float (between 0 and 1)
:param weights: The weights for each Gaussian in the mixture.
:type weights: np.ndarray
:param means: The mean values for each Gaussian in the mixture.
:type means: np.ndarray
:param sigmas: The sigma values for each Gaussian.
:type sigmas: np.ndarray
:param search_lower_bound: The lower bound for the search.
:type search_lower_bound: numeric
:param search_upper_bound: The upper bound for the search.
:type search_upper_bound:
:return: The value such that its probability is less than or equal to
:code:`p`.
:rtype: float
"""
# The optimization function
def f(x, w, mu, sigma):
return cdf(x, w.reshape(1, -1), mu.reshape(1, -1), sigma.reshape(1, -1)) - p
weights = _make_2d(weights)
means = _make_2d(means)
sigmas = _make_2d(sigmas)
weights_shape = np.shape(weights)
means_shape = np.shape(means)
sigmas_shape = np.shape(sigmas)
if not (weights_shape == means_shape == sigmas_shape):
log.error(
f"The shapes of the distribution parameters must all be the same. "
f"weights: {weights_shape}, means: {means_shape}, sigmas: {sigmas_shape}"
)
raise ValueError("The shapes of the distribution parameters must all be the same.")
roots = [
brentq(f, search_lower_bound, search_upper_bound, args=(weights[i], means[i], sigmas[i]))
for i in range(weights.shape[0])
]
return np.array(roots)
[docs]def quantile(
p: float,
weights: np.ndarray,
means: np.ndarray,
sigmas: np.ndarray,
search_lower_bound: float = -1000,
search_upper_bound: float = 1000
):
"""
This is just an alias of :meth:`cde.gmix.ppf`.
:param p:
:type p:
:param weights:
:type weights:
:param means:
:type means:
:param sigmas:
:type sigmas:
:param search_lower_bound:
:type search_lower_bound:
:param search_upper_bound:
:type search_upper_bound:
:return:
:rtype:
"""
return ppf(p, weights, means, sigmas, search_lower_bound, search_upper_bound)
[docs]def mean(weights: np.ndarray, means: np.ndarray):
"""
Return the mean value of the density estimation using the given parameters.
:param weights: The weights for each Gaussian in the mixture.
:type weights: np.ndarray
:param means: The mean values for each Gaussian in the mixture.
:type means: np.ndarray
:return: The mean values for the given parameters.
:rtype: np.ndarray
"""
return np.sum(weights * means, axis=1)
[docs]def mode(
weights: np.ndarray,
means: np.ndarray,
sigmas: np.ndarray,
method: str = 'cnewton',
threshold: float = 0.25,
**kwargs
):
if method == 'line':
modes = _mode_line_search(weights, means, sigmas, **kwargs)
elif method == 'newton':
modes = _mode_newtons_method(weights, means, sigmas)
elif method == 'cnewton':
modes = _mode_cnewtons_method(weights, means, sigmas)
else:
log.error("Unknown mode search type. Use one of: (line, newton)")
raise ValueError("Unknown search type.")
result = []
for i in range(len(modes)):
dens = np.array([m[1] for m in modes[i]])
ss = describe(dens)
# noinspection PyUnresolvedReferences
modes_i = [m for m in modes[i] if m[1] > threshold * ss.minmax[1]]
result.append(modes_i)
return result
[docs]def random(
n: int,
weights: np.ndarray,
means: np.ndarray,
sigmas: np.ndarray,
rng: np.random.RandomState = None,
method: str = 'inverse_transform'
):
if method == 'inverse_transform':
return _random_inverse_transform(n, weights, means, sigmas, rng)
raise NotImplementedError()
# endregion Distribution Methods
# region Plotting Methods
# noinspection PyTypeChecker
[docs]def plot(
weights: np.ndarray,
means: np.ndarray,
sigmas: np.ndarray,
row_names: Union[list, np.ndarray, None] = None,
x_min: Optional[float] = None,
x_max: Optional[float] = None,
x_ticks: Optional[List[float]] = None,
n_points: int = 1000,
show_mixture: bool = False,
show_mean: bool = False,
show_median: bool = False,
show_mode: bool = False,
percent_interval: Optional[float] = 0.9,
ncol: int = None,
figure_size: Tuple[float, float] = None
):
# if not (np.ndim(weights) == np.ndim(means) == np.ndim(sigmas) == 1):
# log.error("You can only plot one distribution at a time. (ndim != 1)")
# raise ValueError("You can only plot one distribution at a time.")
# Make sure the parameters are the right shape
weights = _make_2d(weights)
means = _make_2d(means)
sigmas = _make_2d(sigmas)
if row_names is None:
row_names = np.arange(weights.shape[0])
else:
row_names = np.array(row_names)
# row_names = np.array(row_names) or np.arange(weights.shape[0])
row_names = (
np.char.mod('%03d', row_names)
if all([isinstance(n, (int, np.int, np.int16, np.int32, np.int64)) for n in row_names])
else row_names
)
# A simple utility method for making a data frame for each parameter array
# of the correct shape.
def make_df(a: np.ndarray, value_name: str):
t = pd.DataFrame(a.transpose())
t.columns = row_names
t['center_no'] = np.arange(a.shape[1])
return t.melt('center_no', var_name='group', value_name=value_name)
def make_point_df(x_values):
y_values = [
pdf(xv, weights[i], means[i], sigmas[i])
for i, xv in enumerate(np.nditer(x_values))
]
return pd.DataFrame({'x': x_values, 'y': y_values, 'group': row_names})
# Merge each individual dataframe into a single one.
dfs = (make_df(weights, 'weight'), make_df(means, 'mu'), make_df(sigmas, 'sigma'))
param_df = reduce(lambda a, b: pd.merge(a, b, how='left', on=['center_no', 'group']), dfs)
# Get reasonable limits on the x-axis if they aren't specified as parameters
min_mu, max_mu = np.min(means), np.max(means)
max_sigma = np.max(sigmas)
x_min = min_mu - 2 * max_sigma if x_min is None else x_min
x_max = max_mu + 2 * max_sigma if x_max is None else x_max
x = np.linspace(x_min, x_max, num=n_points)
# Compute the weighted Normal pdf values for each row in the param_df and
# store them in a new data frame.
normal_df = pd.DataFrame(
np.repeat(param_df.values, len(x), axis=0),
columns=param_df.columns,
index=np.tile(x, len(param_df))
).rename_axis('x').reset_index()
# Calculate the pdf values for each data point
normal_df['y'] = normal_df.apply(
lambda r: r['weight'] * norm.pdf(r['x'], r['mu'], r['sigma']),
axis=1
)
# Convert the center_no and group columns to categorical data types
cat_columns = ['center_no', 'group']
normal_df[cat_columns] = normal_df[cat_columns].apply(lambda t: t.astype('category'))
# Reorder the columns to put y next to x (for human readability)
normal_df = normal_df[['group', 'center_no', 'weight', 'mu', 'sigma', 'x', 'y']]
gmix_df = normal_df[['group', 'x', 'y']].groupby(
['group', 'x']
).agg({
'y': 'sum'
}).reset_index()
# Add some end points so that the area under the curve gets filled in
# completely along the x-axis by the geom_polygon.
end_point_data = []
for grp in row_names:
end_point_data.append({'group': grp, 'x': x_min - 0.1, 'y': 0})
end_point_data.append({'group': grp, 'x': x_max + 0.1, 'y': 0})
end_point_df = pd.DataFrame(end_point_data)
gmix_df = gmix_df.append(end_point_df)
gmix_df.sort_values(['group', 'x'], inplace=True)
p = pn.ggplot()
p += pn.geom_polygon(
data=gmix_df,
mapping=pn.aes('x', 'y', fill='group'),
alpha=0.25,
color='black',
show_legend=len(row_names) > 1
)
if show_mean:
mean_df = make_point_df(mean(weights, means))
segment_aes = pn.aes(x='x', xend='x', y=0, yend='y', color='group')
text_aes = pn.aes(x='x', y=0, label='x')
text_kw = {'format_string': 'μ: {:0.3f}', 'ha': 'left', 'va': 'top'}
p += pn.geom_segment(mean_df, segment_aes, size=1.25, show_legend=len(row_names) > 1)
p += pn.geom_text(mean_df, text_aes, **text_kw)
if show_median or percent_interval:
median_df = make_point_df(median(weights, means, sigmas))
segment_aes = pn.aes(x='x', xend='x', y=0, yend='y', color='group')
text_aes = pn.aes(x='x', y='y', label='x')
text_kw = {'format_string': 'x̃: {:0.3f}', 'ha': 'left', 'va': 'bottom'}
p += pn.geom_segment(
median_df,
segment_aes,
linetype='dashed',
size=1.25,
show_legend=len(row_names) > 1
)
p += pn.geom_text(median_df, text_aes, **text_kw)
if show_mode:
modes = [
mode(weights[i], means[i], sigmas[i], method='cnewton')
for i in range(weights.shape[0])
]
d = [
dict(group=row_names[g_idx], x=x, y=y)
for g_idx, m in enumerate(modes)
for m_list in m
for x, y in m_list
]
mode_df = pd.DataFrame(d)
segment_aes = pn.aes(x='x', xend='x', y=0, yend='y', color='group')
text_aes = pn.aes(x='x', y='y/2', label='x', angle=-90)
text_kw = {'format_string': 'x̂: {:0.3f}', 'ha': 'left', 'va': 'center', 'nudge_y': -0.005}
p += pn.geom_segment(
mode_df,
segment_aes,
linetype='dotted',
size=1.25,
show_legend=len(row_names) > 1
)
p += pn.geom_text(mode_df, text_aes, **text_kw)
if percent_interval:
if not 0 < percent_interval < 1:
log.error(f"The percent interval must be between 0 and 100: {percent_interval}")
raise ValueError("The percent interval must be between 0 and 100")
def make_limit_df(bound_df, is_lower: bool):
fn = op.le if is_lower else op.ge
start = -1 if is_lower else 0
stop = None if is_lower else 1
# Select all the rows from the gmix_df that are outside the PI
# for each group
df_list = [
gmix_df.loc[(fn(gmix_df.x, row.x)) & (gmix_df.group == row.group), ]
for _, row in bound_df.iterrows()
]
tmp_df = pd.concat(df_list)
# We need to drop the polygon to zero at the interval boundary,
# so create a new row that has the same x values of the last point
# of the lower bound and the first point of the upper bound, but
# set the y value to 0.
last_point = tmp_df.groupby('group').apply(
lambda g: pd.DataFrame(g.iloc[start:stop, ].values, columns=g.columns)
)
last_point['y'] = 0
tmp_df = tmp_df.append(last_point)
tmp_df.reset_index(drop=True, inplace=True)
# Make sure the columns are of the correct type (for some reason they
# get reset to 'object' above.
tmp_df['group'] = tmp_df.group.astype('category')
tmp_df[['x', 'y']] = tmp_df[['x', 'y']].apply(lambda c: c.astype('float'))
return tmp_df
h = (1 - percent_interval) / 2
lower_bound = h
upper_bound = percent_interval + h
lower_bound_df = make_point_df(ppf(lower_bound, weights, means, sigmas))
upper_bound_df = make_point_df(ppf(upper_bound, weights, means, sigmas))
lower_df = make_limit_df(lower_bound_df, True)
upper_df = make_limit_df(upper_bound_df, False)
poly_kw = {'alpha': 0.6, 'show_legend': len(row_names) > 1}
p += pn.geom_polygon(lower_df, pn.aes('x', 'y', fill='group'), **poly_kw)
p += pn.geom_polygon(upper_df, pn.aes('x', 'y', fill='group'), **poly_kw)
if show_mixture:
p += pn.geom_path(
data=normal_df,
mapping=pn.aes(
'x',
'y',
color='group',
),
alpha=0.5,
show_legend=len(row_names) > 1
)
if ncol and len(row_names) > 1:
p += pn.facet_wrap('~ group', ncol=ncol)
if weights.shape[0] == 1:
if percent_interval:
title = f"{row_names[0]} With {percent_interval:0.3f} PI"
else:
title = f"{row_names[0]}"
p += pn.scale_fill_grey()
p += pn.scale_color_grey()
else:
if percent_interval:
title = f"Density Plot With {percent_interval:0.3f} PI"
else:
title = "Density Plot"
palette = 'Dark2'
p += pn.scale_fill_brewer(palette=palette, type='qualitative')
p += pn.scale_color_brewer(palette=palette, type='qualitative')
p + pn.scale_x_continuous(breaks=x_ticks) if x_ticks else p
# p += pn.scale_y_continuous(limits=(-0.015, None))
p += pn.ylab('Density')
p += pn.ggtitle(title)
p += pn.theme_bw()
if figure_size:
p += pn.theme(figure_size=figure_size)
return p
# endregion Plotting Methods
# region Helper Methods
[docs]def make_centers(
data: np.ndarray,
n_centers: int,
extend_lower: float = 0,
extend_upper: float = 0,
method: str = 'jenks'
):
r"""
Return an array of kernel centers given some target :code:`data`. There
are currently two supported methods for creating the array. The first
method uses the `Jenks natural breaks
<https://en.wikipedia.org/wiki/Jenks_natural_breaks_optimization>`_
algorithm and the second simply divides the space up uniformly.
If you suspect your training data will not adequately cover the range
of values in the target distribution you can optionally extend the
lower and upper bounds by a given proportion :math:`p` (specified as a
floating point number between 0 and 1). The lower range will be
extended by:
:math:`lower\_bound = min(data) - p \cdot (max(data) - min(data))`
While the upper range will be extended by:
:math:`upper\_bound = max(data) + p \cdot (max(data) - min(data))`
The *uniform* method splits the range (:math:`min - max`) using
:func:`np.linspace` and then returns the mid points between the breaks.
The *jenks* method simply uses the split points returned by the
algorithm as the kernel centers, including the extended lower and upper
bounds. It may be more precise to use the mid points between the
breaks, but in practice it should not make much
difference for many applications. If desired, this can easily be
accomplished on the returned results.
:param data: A 1-dimensional array of data used to identify the kernel
centers.
:param n_centers: The number of centers to return.
:param extend_lower: The proportion to extend the lower bound by.
:param extend_upper: THe proportion to extend the upper bound by.
:param method: The method used to find kernel centers from the given
data. The choices are *jenks* or *uniform*.
:return: An array representing the center point of each kernel.
"""
log.debug(f"Making kernel centers using {method} method")
if method == 'jenks':
return _make_jenks_centers(data, n_centers, extend_lower, extend_upper)
elif method == 'uniform':
return _make_uniform_centers(data, n_centers, extend_lower, extend_upper)
else:
raise ValueError(f"Unknown method kernel centering method: {method}")
# endregion Helper Methods
# region Utility Methods
# region Common
def _make_2d(a: np.ndarray):
return a[:, np.newaxis].transpose() if np.ndim(a) == 1 else a
# endregion Common
# region Make Centers
def _make_jenks_centers(data: np.ndarray, n_centers: int, extend_left: float, extend_right: float):
from jenkspy import jenks_breaks
data = _extend_data(data, extend_left, extend_right)
breaks = jenks_breaks(data, nb_class=n_centers-1)
return np.array(breaks)
def _make_uniform_centers(
data: np.ndarray,
n_centers: int,
extend_left: float,
extend_right: float
):
data = _extend_data(data, extend_left, extend_right)
y_min, y_max = min(data), max(data)
breaks = np.linspace(y_min, y_max, num=n_centers)
return breaks
def _extend_data(data: np.ndarray, extend_left: float, extend_right: float):
if not 0 <= extend_left <= 1:
raise ValueError(f"extend_left must be >= 0 and <= 1 ({extend_left})")
if not 0 <= extend_right <= 1:
raise ValueError(f"extend_right must be >= 0 and <= 1 ({extend_right})")
if extend_left > 0 or extend_right > 0:
min_val, max_val = np.min(data), np.max(data)
span = max_val - min_val
return np.append(data, [min_val - span * extend_left, max_val + span * extend_right])
return data
# endregion Make Centers
# region Random
def _random_inverse_transform(
n: int,
weights: np.ndarray,
means: np.ndarray,
sigmas: np.ndarray,
rng: np.random.RandomState
):
# noinspection PyArgumentList
uniform_rand = rng.rand(n) if rng else np.random.rand(n)
return np.array([ppf(u, weights, means, sigmas) for u in uniform_rand]).transpose()
# endregion Random
# region Mode
def _gmix_derivative_1(x: float, weights: np.ndarray, means: np.ndarray, sigmas: np.ndarray):
# Note the values learned by the model and stored in `sigmas` may actually be sigma^2
return np.sum(
weights * -(x - means) * norm.pdf(x, means, sigmas) / sigmas**2
)
def _gmix_derivative_2(x: float, weights: np.ndarray, means: np.ndarray, sigmas: np.ndarray):
fx = norm.pdf(x, means, sigmas)
return np.sum(
weights
* (
(-fx / (sigmas**2))
+ ((x - means)**2 * fx / sigmas**4)
)
)
def _mode_newtons_method(
weights: np.ndarray,
means: np.ndarray,
sigmas: np.ndarray,
max_itr: int = 1000,
min_diff: float = 1e-4,
min_grad: float = 1e-9
):
# This paper describes a more complicated method for finding the modes
# that involves both Newton's method and gradient ascent. It combines the
# methods, because Newton's method only works when the Hessian is
# negative definite. However, in the univariate case it doesn't seem
# to be a problem, although I could easily be wrong about this.
# http://faculty.ucmerced.edu/mcarreira-perpinan/papers/cs-99-03.pdf
def local_modes(loc_weights, loc_means, loc_sigmas):
kwargs = {'weights': loc_weights, 'means': loc_means, 'sigmas': loc_sigmas}
pdf_x = partial(pdf, **kwargs)
deriv_1 = partial(_gmix_derivative_1, **kwargs)
deriv_2 = partial(_gmix_derivative_2, **kwargs)
loc_modes = []
for m in range(loc_weights.shape[0]):
itr = 0
x_new = loc_means[m]
while True:
g1 = deriv_1(x_new)
g2 = deriv_2(x_new)
if np.isclose(g2, 0):
break
x_old = x_new
x_new = x_old - (g1 / g2)
if itr >= max_itr or math.fabs(g1) < min_grad:
break
itr += 1
# Only consider maxima
if deriv_2(x_new) < 0:
# Don't add the new value if we already found it from another
# start point.
close = [c for c in loc_modes if math.fabs(c - x_new) < min_diff]
if len(close) == 0:
loc_modes.append(x_new)
return [(x, pdf_x(x)) for x in loc_modes]
modes = []
weights = _make_2d(weights)
means = _make_2d(means)
sigmas = _make_2d(sigmas)
n_groups = weights.shape[0]
for g in range(n_groups):
g_weights, g_means, g_sigmas = weights[g], means[g], sigmas[g]
g_modes = local_modes(g_weights, g_means, g_sigmas)
modes.append(g_modes)
return modes
def _mode_cnewtons_method(
weights: np.ndarray,
means: np.ndarray,
sigmas: np.ndarray,
max_itr: int = 1000,
min_diff: float = 1e-4,
min_grad: float = 1e-9
):
import cgmix
weights = _make_2d(weights)
means = _make_2d(means)
sigmas = _make_2d(sigmas)
n_groups = weights.shape[0]
modes = []
for g in range(n_groups):
g_weights, g_means, g_sigmas = weights[g].tolist(), means[g].tolist(), sigmas[g].tolist()
result = cgmix.find_modes_newton(g_weights, g_means, g_sigmas, max_itr, min_diff, min_grad)
result = [(t1, t2) for t1, t2 in result]
modes.append(result)
return modes
def _mode_line_search(
weights: np.ndarray,
means: np.ndarray,
sigmas: np.ndarray,
precision: float = 0.005
):
log.debug(f"Using line search with precision: {precision}")
weights = _make_2d(weights)
means = _make_2d(means)
sigmas = _make_2d(sigmas)
def local_modes(lweights, lmeans, lsigmas):
lweights = _make_2d(lweights)
lmeans = _make_2d(lmeans)
lsigmas = _make_2d(lsigmas)
pdf_x = partial(pdf, weights=lweights, means=lmeans, sigmas=lsigmas)
min_val = np.min(lmeans, axis=1)
max_val = np.max(lmeans, axis=1)
span = max_val - min_val
min_val, max_val = min_val - span * 0.1, max_val + span * 0.1
x = np.arange(min_val, max_val, precision)
values = np.array([pdf_x(x_val) for x_val in x])
diff = np.diff(values, axis=0)
sign = np.sign(diff)
rle = [_mode_rle(sign[:, i])[0] for i in range(sign.shape[1])]
# noinspection PyTypeChecker
indexes = [np.cumsum(a) for a in rle]
lmodes = [
[
(x[item[j]], values[item[j]][i])
for j in range(0 if sign[i][0] > 0 else 1, len(item), 2)
]
for i, item in enumerate(indexes)
]
return lmodes[0]
return [local_modes(weights[g], means[g], sigmas[g]) for g in range(weights.shape[0])]
def _mode_rle(a: np.ndarray):
ia = np.asarray(a) # force numpy
n = len(ia)
if n == 0:
return None, None, None
else:
y = np.array(ia[1:] != ia[:-1]) # pairwise unequal (string safe)
i = np.append(np.where(y), n - 1) # must include last element position
z = np.diff(np.append(-1, i)) # run lengths
p = np.cumsum(np.append(0, z))[:-1] # positions
return z, p, ia[i]
# endregion Mode
# endregion Utility Methods