import copy
from typing import Collection, Dict, Union
from numpy.typing import ArrayLike
import numpy as np
import pandas as pd
from prettytable import PrettyTable
from scipy.stats import skew, kurtosis
from pyinterpolate.core.data_models.points import VariogramPoints
from pyinterpolate.semivariogram.experimental.classes.experimental_variogram import (
ExperimentalVariogram)
from pyinterpolate.transform.statistical import remove_outliers
def build_swarmplot_input(data: Dict, bins=None):
"""
Function prepares data for lagged beeswarm plot.
Parameters
----------
data : Dict
``{lag: [values]}``
bins : int, optional
If ``None`` given then number of bins per lag is chosen automatically.
Returns
-------
: numpy array
[lags (x-coordinates), values]
"""
xs_arr = []
ys_arr = []
lags = list(data.keys())
step_sizes = [x - lags[idx] for idx, x in enumerate(lags[1:])]
ss = [lags[0]].copy()
ss.extend(step_sizes)
for idx, dict_obj in enumerate(data.items()):
center = dict_obj[0]
values = dict_obj[1]
step_size = ss[idx]
# bin values
# TODO: set max points per level
if bins is None:
histogram, bin_edges = np.histogram(values, bins='auto')
else:
histogram, bin_edges = np.histogram(values, bins=bins)
# Now prepare x-coordinates per lag
x_indexes = []
y_values = []
# Define limits
max_no = np.max(histogram)
limits = [
step_size * _get_bin_width(x, max_no) for x in histogram
]
lower_limits = [center - l for l in limits]
upper_limits = [center + l for l in limits]
for jdx, no_points in enumerate(histogram):
xc = np.linspace(lower_limits[jdx],
upper_limits[jdx],
no_points)
yc = [bin_edges[jdx+1] for _ in xc]
x_indexes.extend(xc)
y_values.extend(yc)
xs_arr.extend(x_indexes)
ys_arr.extend(y_values)
arr = np.array([xs_arr, ys_arr]).transpose()
return arr
[docs]
class VariogramCloud:
"""
Class calculates Variogram Point Cloud and presents it in a scatterplot,
boxplot or violinplot.
Parameters
----------
ds : numpy array
``[x, y, value]``
values : ArrayLike, optional
Observation in the i-th geometry (from ``geometries``). Optional
parameter, if not given then ``ds`` must be provided.
geometries : ArrayLike, optional
Array or similar structure with geometries. It must have the same
length as ``values``. Optional parameter, if not given then ``ds``
must be provided. Point type geometry.
step_size : float
The fixed distance between lags grouping point neighbors.
max_range : float
The maximum distance at which the semivariance is calculated.
direction : float, optional
Direction of semivariogram, values from 0 to 360 degrees:
- 0 or 180: is E-W,
- 90 or 270 is N-S,
- 45 or 225 is NE-SW,
- 135 or 315 is NW-SE.
tolerance : float, optional
If ``tolerance`` is 0 then points must be placed at a single line with
the beginning in the origin of the coordinate system and the
direction given by y-axis and direction parameter.
If ``tolerance`` is ``> 0`` then the bin is selected as an elliptical
area with major axis pointed in the same direction as the line for
``0`` tolerance.
* The major axis size == ``step_size``.
* The minor axis size is ``tolerance * step_size``
* The baseline point is at a center of the ellipse.
* The ``tolerance == 1`` creates an omnidirectional semivariogram.
custom_bins : numpy array, optional
Custom bins for semivariance calculation. If provided, then parameter
``step_size`` is ignored and ``max_range`` is set to the final bin
distance.
custom_weights : numpy array, optional
Custom weights assigned to points. Only semivariance values are
weighted.
Attributes
----------
semivariances : Dict
Lag - all semivariances between point pairs:
``{lag: [semivariances], }``.
lags : ArrayLike
Lags.
direction : float
See ``direction`` parameter.
tolerance : float
See ``tolerance`` parameter.
Methods
-------
average_semivariance()
Returns ``lag, average semivariance, number of point pairs``
as a numpy array.
describe()
The point cloud statistics.
plot()
Plots scatterplot, boxplot or violinplot of the point cloud.
remove_outliers()
Removes outliers from the semivariance plots.
See Also
--------
ExperimentalVariogram : class that calculates experimental semivariogram,
experimental covariogram and data variance.
Examples
--------
>>> import numpy as np
>>> from pyinterpolate import VariogramCloud
>>>
>>>
>>> REFERENCE_INPUT = np.array([
... [0, 0, 8],
... [1, 0, 6],
... [2, 0, 4],
... [3, 0, 3],
... [4, 0, 6],
... [5, 0, 5],
... [6, 0, 7],
... [7, 0, 2],
... [8, 0, 8],
... [9, 0, 9],
... [10, 0, 5],
... [11, 0, 6],
... [12, 0, 3]
... ])
>>> STEP_SIZE = 1
>>> MAX_RANGE = 4
>>> vc = VariogramCloud(
... values=REFERENCE_INPUT[:, -1],
... geometries=REFERENCE_INPUT[:, :-1],
... step_size=STEP_SIZE,
... max_range=MAX_RANGE
... )
>>> stats = vc.describe()
>>> print(stats[1]['count'])
24
>>> print(stats[2]['median'])
9
"""
def __init__(self,
ds: Union[ArrayLike, VariogramPoints] = None,
values: ArrayLike = None,
geometries: ArrayLike = None,
step_size: float = None,
max_range: float = None,
direction: float = None,
tolerance: float = None,
custom_bins: Union[np.ndarray, Collection] = None,
custom_weights: np.ndarray = None):
self._experimental_variogram = ExperimentalVariogram(
ds=ds,
values=values,
geometries=geometries,
step_size=step_size,
max_range=max_range,
direction=direction,
tolerance=tolerance,
custom_bins=custom_bins,
custom_weights=custom_weights,
is_semivariance=True,
is_covariance=False,
as_cloud=True
)
self.semivariances = self._experimental_variogram.point_cloud_semivariances
self.lags = self._experimental_variogram.lags
self.direction = direction
self.tolerance = tolerance
# Additional params
self._stats_names = ['lag',
'count',
'mean',
'std',
'min',
'25%',
'median',
'75%',
'max',
'skewness',
'kurtosis']
[docs]
def average_semivariance(self):
"""
Returns mean of semivariances for each lag - which is equal to
the experimental semivariogram output.
Returns
-------
: numpy array
Mean of semivariances for each lag.
"""
ds = []
for l in self.lags:
ds.append(
[
l,
np.mean(self.semivariances[l]),
len(self.semivariances[l])
]
)
return np.array(ds)
[docs]
def describe(self, as_dataframe=False) -> Union[Dict, pd.DataFrame]:
"""
Method calculates basic statistics. Includes count (point pairs
number), average semivariance, standard deviation, minimum,
1st quartile, median, 3rd quartile, maximum, skewness, and kurtosis.
Returns
-------
statistics : Dict
>>> statistics = {
... lag_number:
... {
... 'count': point pairs count,
... 'mean': mean semivariance,
... 'std': standard deviation,
... 'min': minimal variance,
... '25%': first quartile of variances,
... 'median': second quartile of variances,
... '75%': third quartile of variances,
... 'max': max variance,
... 'skewness': skewness,
... 'kurtosis': kurtosis
... }
... }
"""
# Get stats per lag
statistics = {}
for lag in self.lags:
lag_dict = {}
data = self.semivariances[lag]
pairs_count = len(data)
lag_dict['count'] = pairs_count
avg_semi = np.mean(data) / 2
lag_dict['mean'] = avg_semi
std_cloud = np.std(data)
lag_dict['std'] = std_cloud
min_cloud = np.min(data)
lag_dict['min'] = min_cloud
max_cloud = np.max(data)
lag_dict['max'] = max_cloud
first_q = np.quantile(data, q=0.25)
lag_dict['25%'] = first_q
med_cloud = np.median(data)
lag_dict['median'] = med_cloud
third_q = np.quantile(data, q=0.75)
lag_dict['75%'] = third_q
skew_cloud = skew(data)
lag_dict['skewness'] = skew_cloud
kurt_cloud = kurtosis(data)
lag_dict['kurtosis'] = kurt_cloud
lag_dict['lag'] = lag
# Update statistics
statistics[lag] = lag_dict
if as_dataframe:
return pd.DataFrame(statistics)
else:
return statistics
[docs]
def experimental_semivariances(self) -> ExperimentalVariogram:
"""
Returns experimental semivariogram.
Returns
-------
: ExperimentalVariogram
Experimental semivariogram object.
"""
return self._experimental_variogram
[docs]
def plot(self, kind='scatter'):
"""
Method plots variogram point cloud.
Parameters
----------
kind : string, default='scatter'
available plot types: 'scatter', 'box', 'violin'
Returns
-------
: bool
``True`` if Figure was plotted.
"""
if kind == 'scatter':
self._scatter_plot()
elif kind == 'box':
self._box_plot()
elif kind == 'violin':
self._violin_plot()
else:
msg = (f'Plot kind {kind} is not available. '
f'Use "scatter", "box" or "violin" instead.')
raise KeyError(msg)
return True
[docs]
def remove_outliers(self,
method='zscore',
z_lower_limit=-3,
z_upper_limit=3,
iqr_lower_limit=1.5,
iqr_upper_limit=1.5,
inplace=False):
"""
Method cleans semivariogram point cloud from outliers.
Parameters
----------
method : str, default='zscore'
Method used to detect outliers. Can be 'zscore' or 'iqr'.
z_lower_limit : float
Number of standard deviations from the mean to the left side
of a distribution. Must be lower than 0.
z_upper_limit : float
Number of standard deviations from the mean to the right side
of a distribution. Must be greater than 0.
iqr_lower_limit : float
Number of standard deviations from the 1st quartile into
the lowest values. Must be greater or equal to zero.
iqr_upper_limit : float
Number of standard deviations from the 3rd quartile into
the largest values. Must be greater or equal to zero.
inplace : bool, default = False
Overwrite semivariances or return new object.
Returns
-------
: VariogramCloud
If ``inplace`` is set to ``False`` then method returns new
instance of an object with cleaned semivariances.
Raises
------
RunetimeError
The attribute experimental_point_cloud is not calculated.
"""
if inplace:
self.semivariances = remove_outliers(
self.semivariances,
method=method,
z_lower_limit=z_lower_limit,
z_upper_limit=z_upper_limit,
iqr_lower_limit=iqr_lower_limit,
iqr_upper_limit=iqr_upper_limit)
return None
else:
new_instance = copy.deepcopy(self)
new_instance.semivariances = remove_outliers(
self.semivariances,
method=method,
z_lower_limit=z_lower_limit,
z_upper_limit=z_upper_limit,
iqr_lower_limit=iqr_lower_limit,
iqr_upper_limit=iqr_upper_limit
)
return new_instance
def _box_plot(self):
title_box = 'Boxplot of Variogram Point Cloud per lag.'
self.__dist_plots(title_box, 'box')
def _scatter_plot(self):
import matplotlib.pyplot as plt
ds = self.__prep_scatterplot_data()
plt.figure(figsize=(14, 8))
xs = ds[:, 0]
ys = ds[:, 1]
plt.scatter(x=xs,
y=ys,
s=0.2)
plt.title('Variogram Point Cloud per lag.')
plt.show()
def _violin_plot(self):
title_violin = 'Variogram Point Cloud distributions per lag.'
self.__dist_plots(title_violin, 'violin')
def __str__(self):
pretty_table = PrettyTable()
pretty_table.field_names = self._stats_names
pretty_table.add_rows(self.__desc_str())
table = '\n\n' + pretty_table.get_string()
return table
def __dist_plots(self, title_label: str, kind='box'):
import matplotlib.pyplot as plt
plot_data_values, plabels = self.__prep_distplot_data()
fig, ax = plt.subplots(figsize=(14, 8))
if kind == 'box':
ax.boxplot(plot_data_values, showfliers=False)
elif kind == 'violin':
vplot = ax.violinplot(plot_data_values,
showmeans=True,
showmedians=True,
showextrema=True)
vplot['cmeans'].set_color('orange')
vplot['cmedians'].set(color='black', ls='dashed')
vplot['cmins'].set(color='black')
vplot['cmaxes'].set(color='black')
ax.legend([vplot['cmeans'],
vplot['cmedians'],
vplot['cmins'],
vplot['cmaxes']], ['mean', 'median', 'min & max'],
loc='upper left')
str_plabels = [str(f'{plabel:.2f}') for plabel in plabels]
ax.set_xticks(np.arange(1, len(plabels) + 1))
ax.set_xticklabels(str_plabels)
plt.title(title_label)
plt.show()
def __desc_str(self):
description = self.describe()
rows = []
for lag in self.lags:
vals = description[lag]
row = []
for fname in self._stats_names:
row.append(vals[fname])
rows.append(row)
return rows
def __prep_distplot_data(self):
data = list(self.semivariances.values())
x_tick_labels = list(self.semivariances.keys())
return data, x_tick_labels
def __prep_scatterplot_data(self) -> np.array:
ds = build_swarmplot_input(data=self.semivariances)
return ds
def _get_bin_width(no, max_no, max_width=0.3):
"""
Function gets bin width on a plot.
Parameters
----------
no : int
The number of points within bin.
max_no : int
The maximum number of points within all bins.
max_width : float
The maximum deviation from the lag center in percent.
Returns
-------
: float
The deviation from the lag center.
"""
bin_width = (no * max_width) / max_no
return bin_width