"""
Distance calculation functions.
Authors
-------
1. Szymon Moliński | @SimonMolinsky
"""
from typing import Dict, Union, Hashable
import geopandas as gpd
import numpy as np
import pandas as pd
from scipy.spatial.distance import cdist
def _calc_b2b_dist_from_dataframe(
ps_blocks: Union[pd.DataFrame, gpd.GeoDataFrame],
lon_col_name: Union[str, Hashable],
lat_col_name: Union[str, Hashable],
val_col_name: Union[str, Hashable],
block_id_col_name: Union[str, Hashable]
) -> pd.DataFrame:
"""Fully vectorized version - fastest approach."""
unique_blocks = ps_blocks[block_id_col_name].unique()
n_blocks = len(unique_blocks)
# Pre-compute block data once
block_data = {}
for block_id in unique_blocks:
mask = ps_blocks[block_id_col_name] == block_id
coords = ps_blocks.loc[mask, [lon_col_name, lat_col_name]].values
weights = ps_blocks.loc[mask, val_col_name].values
block_data[block_id] = (coords, weights)
# Initialize distance matrix
distance_matrix = np.zeros((n_blocks, n_blocks))
# Compute distances for upper triangle only
for i in range(n_blocks):
for j in range(i, n_blocks):
if i == j:
distance_matrix[i, j] = 0.0
else:
coords_i, weights_i = block_data[unique_blocks[i]]
coords_j, weights_j = block_data[unique_blocks[j]]
# Use scipy's cdist for pairwise distances
dist_matrix = cdist(coords_i, coords_j)
weight_matrix = np.outer(weights_i, weights_j)
distance = np.sum(dist_matrix * weight_matrix) / np.sum(
weight_matrix)
distance_matrix[i, j] = distance
distance_matrix[j, i] = distance # Symmetric
# Create DataFrame
return pd.DataFrame(
distance_matrix,
index=unique_blocks,
columns=unique_blocks
)
# def _calc_b2b_dist_from_dataframe(
# ps_blocks: Union[pd.DataFrame, gpd.GeoDataFrame],
# lon_col_name: Union[str, Hashable],
# lat_col_name: Union[str, Hashable],
# val_col_name: Union[str, Hashable],
# block_id_col_name: Union[str, Hashable],
# verbose=False
# ) -> pd.DataFrame:
# r"""
# Function calculates distances between the blocks' point supports.
#
# Parameters
# ----------
# ps_blocks : Union[pd.DataFrame, gpd.GeoDataFrame]
# DataFrame with point supports and block indexes.
#
# lon_col_name : Union[str, Hashable]
# Longitude or x coordinate.
#
# lat_col_name : Union[str, Hashable]
# Latitude or y coordinate.
#
# val_col_name : Union[str, Hashable]
# The point support values column.
#
# block_id_col_name : Union[str, Hashable]
# Column with block names / indexes.
#
# verbose : bool, default = False
# Show progress bar.
#
# Returns
# -------
# block_distances : DataFrame
# Indexes and columns are block indexes, cells are distances.
#
# """
# calculated_pairs = set()
# unique_blocks = list(ps_blocks[block_id_col_name].unique())
#
# col_set = [lon_col_name, lat_col_name, val_col_name]
#
# results = []
#
# for block_i in tqdm(unique_blocks, disable=not verbose):
# for block_j in unique_blocks:
# # Check if it was estimated
# if not (block_i, block_j) in calculated_pairs:
# if block_i == block_j:
# results.append([block_i, block_j, 0])
# else:
# i_value = ps_blocks[
# ps_blocks[block_id_col_name] == block_i
# ]
# j_value = ps_blocks[
# ps_blocks[block_id_col_name] == block_j
# ]
# value = _calculate_block_to_block_distance(
# i_value[col_set].to_numpy(),
# j_value[col_set].to_numpy()
# )
# results.append([block_i, block_j, value])
# results.append([block_j, block_i, value])
# calculated_pairs.add((block_i, block_j))
# calculated_pairs.add((block_j, block_i))
#
# # Create output dataframe
# df = pd.DataFrame(data=results, columns=['block_i', 'block_j', 'z'])
# df = df.pivot_table(
# values='z',
# index='block_i',
# columns='block_j'
# )
#
# # sort
# df = df.reindex(columns=unique_blocks)
# df = df.reindex(index=unique_blocks)
#
# return df
# noinspection PyUnresolvedReferences
def _calc_b2b_dist_from_ps(ps_blocks: 'PointSupport') -> Dict:
r"""
Function calculates distances between the blocks' point supports.
Parameters
----------
ps_blocks : PointSupport
PointSupport object with parsed blocks and their respective point
supports.
Returns
-------
block_distances : DataFrame
Indexes and columns are block indexes, cells are distances.
"""
block_distances = _calc_b2b_dist_from_dataframe(
ps_blocks=ps_blocks.point_support,
lon_col_name=ps_blocks.lon_col_name,
lat_col_name=ps_blocks.lat_col_name,
val_col_name=ps_blocks.value_column_name,
block_id_col_name=ps_blocks.point_support_blocks_index_name
)
return block_distances
# noinspection PyUnresolvedReferences
[docs]
def calc_block_to_block_distance(
ps_blocks: Union[pd.DataFrame, 'PointSupport'],
lon_col_name=None,
lat_col_name=None,
val_col_name=None,
block_id_col_name=None
) -> pd.DataFrame:
r"""
Function calculates distances between the blocks' point supports.
Parameters
----------
ps_blocks : Union[pd.DataFrame, PointSupport]
The point support of polygons.
* ``DataFrame``, then ``lon_col_name``, ``lat_col_name``,
``val_col_name``, ``block_id_col_name`` parameters must be
provided,
* ``PointSupport``.
lon_col_name : optional
Longitude or x coordinate.
lat_col_name : optional
Latitude or y coordinate.
val_col_name : optional
The point support values column.
block_id_col_name : optional
Column with block names / indexes.
Returns
-------
block_distances : DataFrame
Indexes and columns representing unique blocks, values in cells
are weighted distances between blocks.
Raises
------
AttributeError
Blocks are provided as DataFrame but column names were not given.
Examples
--------
>>> import os
>>> import geopandas as gpd
>>> from pyinterpolate import (
... calc_block_to_block_distance,
... Blocks,
... ExperimentalVariogram,
... PointSupport,
... TheoreticalVariogram
... )
>>>
>>>
>>> FILENAME = 'cancer_data.gpkg'
>>> LAYER_NAME = 'areas'
>>> DS = gpd.read_file(FILENAME, layer=LAYER_NAME)
>>> AREA_VALUES = 'rate'
>>> AREA_INDEX = 'FIPS'
>>> AREA_GEOMETRY = 'geometry'
>>> PS_LAYER_NAME = 'points'
>>> PS_VALUES = 'POP10'
>>> PS_GEOMETRY = 'geometry'
>>> PS = gpd.read_file(FILENAME, layer=PS_LAYER_NAME)
>>>
>>> CANCER_DATA = {
... 'ds': DS,
... 'index_column_name': AREA_INDEX,
... 'value_column_name': AREA_VALUES,
... 'geometry_column_name': AREA_GEOMETRY
... }
>>> POINT_SUPPORT_DATA = {
... 'ps': PS,
... 'value_column_name': PS_VALUES,
... 'geometry_column_name': PS_GEOMETRY
... }
>>> BLOCKS = Blocks(**CANCER_DATA)
>>> indexes = BLOCKS.block_indexes
>>>
>>> PS = PointSupport(
... points=POINT_SUPPORT_DATA['ps'],
... ps_blocks=BLOCKS,
... points_value_column=POINT_SUPPORT_DATA['value_column_name'],
... points_geometry_column=POINT_SUPPORT_DATA['geometry_column_name']
... )
>>> distances = calc_block_to_block_distance(PS)
>>> print(distances.head())
42049.0 42039.0 ... 23009.0 23029.0
42049.0 0.000000 48885.422652 ... 9.780781e+05 1.061496e+06
42039.0 48885.422652 0.000000 ... 9.952927e+05 1.079528e+06
42085.0 91350.566468 52240.566481 ... 1.030860e+06 1.115642e+06
42073.0 120259.614440 78853.981609 ... 1.042438e+06 1.127643e+06
42007.0 152110.979345 109955.408914 ... 1.055898e+06 1.141472e+06
"""
if isinstance(ps_blocks, pd.DataFrame):
if (
(lon_col_name is None) or
(lat_col_name is None) or
(val_col_name is None) or
(block_id_col_name is None)
):
raise AttributeError(
'Please provide the required column names '
'if you pass DataFrame into the function'
)
block_distances = _calc_b2b_dist_from_dataframe(
ps_blocks,
lon_col_name=lon_col_name,
lat_col_name=lat_col_name,
val_col_name=val_col_name,
block_id_col_name=block_id_col_name
)
else:
block_distances = _calc_b2b_dist_from_ps(ps_blocks)
return block_distances
# def _calculate_block_to_block_distance(ps_block_1: np.ndarray,
# ps_block_2: np.ndarray) -> float:
# r"""
# Function calculates distance between two blocks' point supports.
#
# Parameters
# ----------
# ps_block_1 : numpy array
# Point support of the first block.
#
# ps_block_2 : numpy array
# Point support of the second block.
#
# Returns
# -------
# weighted_distances : float
# Weighted point-support distance between blocks.
#
# Notes
# -----
# The weighted distance between blocks is derived from the equation given
# in publication [1] from References. This distance is weighted by
#
# References
# ----------
# .. [1] Goovaerts, P. Kriging and Semivariogram Deconvolution in the
# Presence of Irregular Geographical Units.
# Math Geosci 40, 101–128 (2008).
# https://doi.org/10.1007/s11004-007-9129-1
#
# TODO
# ----
# * Add reference equation to the special part of the documentation.
# """
#
# a_shape = ps_block_1.shape[0]
# b_shape = ps_block_2.shape[0]
# ax = ps_block_1[:, 0].reshape(1, a_shape)
# bx = ps_block_2[:, 0].reshape(b_shape, 1)
# dx = ax - bx
# ay = ps_block_1[:, 1].reshape(1, a_shape)
# by = ps_block_2[:, 1].reshape(b_shape, 1)
# dy = ay - by
# aval = ps_block_1[:, -1].reshape(1, a_shape)
# bval = ps_block_2[:, -1].reshape(b_shape, 1)
# w = aval * bval
#
# dist = np.sqrt(dx ** 2 + dy ** 2, dtype=float, casting='unsafe')
#
# wdist = dist * w
# distances_sum = np.sum(wdist) / np.sum(w)
# return distances_sum
def select_neighbors_in_range(data: pd.DataFrame,
current_lag: float,
previous_lag: float):
"""
Function selects the neighbors of each block within a range given by
previous and current lags.
Parameters
----------
data : DataFrame
Distances between blocks, columns and rows are block indexes.
current_lag : float
Actual maximum distance.
previous_lag : float
Previous maximum distance.
Returns
-------
neighbors : Dict
block index: [list of neighbor indexes within a range
given by previous and current lags]
"""
neighbors = data.reset_index()
neighbors = neighbors.melt(id_vars=neighbors.columns[0], value_vars=neighbors.columns[1:])
neighbors = neighbors[
(neighbors['value'] > previous_lag) & (neighbors['value'] <= current_lag)
]
dneighbors = neighbors.dropna()
if len(dneighbors) == 0:
return {}
else:
pneighbors = dneighbors.drop(columns='value')
other_neighbors_col = pneighbors.columns[-1]
pneighbors = pneighbors.groupby(pneighbors.columns[0])[
other_neighbors_col
].apply(list)
output = pneighbors.to_dict()
return output