from dataclasses import dataclass
from typing import Any
import networkx as nx
import numpy as np
from npap.exceptions import PartitioningError
from npap.interfaces import PartitioningStrategy
from npap.logging import LogCategory, log_debug, log_info, log_warning
from npap.utils import (
compute_geographical_distances,
create_partition_map,
run_dbscan,
run_hdbscan,
run_hierarchical,
run_kmeans,
run_kmedoids,
validate_partition,
validate_required_attributes,
with_runtime_config,
)
[docs]
@dataclass
class GeographicalConfig:
"""
Configuration parameters for geographical partitioning.
Attributes
----------
random_state : int
Random seed for reproducibility in stochastic algorithms.
max_iter : int
Maximum iterations for iterative algorithms (K-means, K-medoids).
n_init : int
Number of initializations for K-means.
hierarchical_linkage : str
Linkage criterion for hierarchical clustering
('ward' for Euclidean, or 'complete'/'average'/'single').
infinite_distance : float
Value used to represent "infinite" distance between nodes in different
AC islands. Using a large finite value instead of np.inf to avoid
numerical issues in clustering algorithms.
"""
random_state: int = 42
max_iter: int = 300
n_init: int = 10
hierarchical_linkage: str = "ward"
infinite_distance: float = 1e4
[docs]
class GeographicalPartitioning(PartitioningStrategy):
"""
Partition nodes based on geographical distance.
This strategy clusters nodes based on their spatial coordinates using
various clustering algorithms. It supports both Euclidean distance
(for projected coordinates) and Haversine distance (for lat/lon coordinates).
AC-Island Awareness:
When the graph contains data with DC links (nodes have given
'ac_island' attribute), the strategy automatically respects AC island
boundaries by assigning infinite distance between nodes in different
AC islands.
Note: K-means and hierarchical with 'ward' linkage cannot support
AC-island awareness because they use raw coordinates instead of
precomputed distances. When AC islands are detected with these
algorithms, a warning is issued recommending alternative algorithms.
Supported algorithms:
- 'kmeans': K-Means clustering (Euclidean only, fast, no AC-island support)
- 'kmedoids': K-Medoids clustering (any metric, robust to outliers, AC-island aware)
- 'dbscan': DBSCAN density-based clustering (automatic cluster count, AC-island aware)
- 'hierarchical': Agglomerative hierarchical clustering (AC-island aware except ward)
- 'hdbscan': HDBSCAN density-based clustering (automatic cluster count, AC-island aware)
Configuration can be provided at:
- Instantiation time (via `config` parameter in __init__)
- Partition time (via `config` or individual parameters in partition())
Partition-time parameters override instance defaults for that call only.
"""
SUPPORTED_ALGORITHMS = ["kmeans", "kmedoids", "dbscan", "hierarchical", "hdbscan"]
SUPPORTED_DISTANCE_METRICS = ["euclidean", "haversine"]
# Algorithms that support AC-island awareness (use precomputed distances)
AC_ISLAND_AWARE_ALGORITHMS = ["kmedoids", "dbscan", "hdbscan"]
# Config parameter names for runtime override detection
_CONFIG_PARAMS = {
"random_state",
"max_iter",
"n_init",
"hierarchical_linkage",
"infinite_distance",
}
[docs]
def __init__(
self,
algorithm: str = "kmeans",
distance_metric: str = "euclidean",
ac_island_attr: str = "ac_island",
config: GeographicalConfig | None = None,
):
"""
Initialize geographical partitioning strategy.
Parameters
----------
algorithm : str, default='kmeans'
Clustering algorithm ('kmeans', 'kmedoids', 'dbscan',
'hierarchical', 'hdbscan').
distance_metric : str, default='euclidean'
Distance metric ('haversine', 'euclidean').
ac_island_attr : str, default='ac_island'
Node attribute name containing AC island ID.
config : GeographicalConfig, optional
Configuration parameters for the algorithm.
Raises
------
ValueError
If unsupported algorithm or distance metric specified.
"""
self.algorithm = algorithm
self.distance_metric = distance_metric
self.ac_island_attr: str = ac_island_attr
self.config = config or GeographicalConfig()
if algorithm not in self.SUPPORTED_ALGORITHMS:
raise ValueError(
f"Unsupported algorithm: {algorithm}. "
f"Supported: {', '.join(self.SUPPORTED_ALGORITHMS)}"
)
if distance_metric not in self.SUPPORTED_DISTANCE_METRICS:
raise ValueError(
f"Unsupported distance metric: {distance_metric}. "
f"Supported: {', '.join(self.SUPPORTED_DISTANCE_METRICS)}"
)
log_debug(
f"Initialized GeographicalPartitioning: algorithm={algorithm}, metric={distance_metric}",
LogCategory.PARTITIONING,
)
@property
def required_attributes(self) -> dict[str, list[str]]:
"""Required node attributes for geographical partitioning."""
return {"nodes": ["lat", "lon"], "edges": []}
def _get_strategy_name(self) -> str:
"""Get descriptive strategy name for error messages."""
return f"geographical_{self.algorithm}"
[docs]
@with_runtime_config(GeographicalConfig, _CONFIG_PARAMS)
@validate_required_attributes
def partition(self, graph: nx.Graph, **kwargs) -> dict[int, list[Any]]:
"""
Partition nodes based on geographical coordinates.
Automatically detects and respects AC island boundaries when the graph
contains voltage-aware data (nodes have 'ac_island' attribute). When
AC islands are detected, infinite distance is assigned between nodes
in different AC islands.
Parameters
----------
graph : nx.Graph
NetworkX graph with lat, lon attributes on nodes.
**kwargs : dict
Additional parameters:
- n_clusters : Number of clusters (required for kmeans, kmedoids, hierarchical)
- eps : Epsilon (required for dbscan)
- min_samples : Minimum samples (required for dbscan)
- min_cluster_size : Minimum cluster size for HDBSCAN (default: 5)
- config : GeographicalConfig instance to override instance config
- random_state : Override config parameter
- max_iter : Override config parameter
- n_init : Override config parameter
- hierarchical_linkage : Override config parameter
- infinite_distance : Override config parameter
Returns
-------
dict[int, list[Any]]
Dictionary mapping cluster_id -> list of node_ids.
Raises
------
PartitioningError
If partitioning fails.
"""
try:
# Get effective config (injected by decorator)
effective_config = kwargs.get("_effective_config", self.config)
n_clusters = kwargs.get("n_clusters")
# Extract coordinates
nodes = list(graph.nodes())
coordinates = self._extract_coordinates(graph, nodes)
# Auto-detect AC island data
ac_islands = None
has_ac_islands = self._has_ac_island_data(graph, nodes)
if has_ac_islands:
ac_islands = self._extract_ac_islands(graph, nodes)
n_ac_islands = len(set(ac_islands))
# Check if algorithm supports AC-island awareness
if not self._supports_ac_island_awareness(effective_config):
log_warning(
f"AC islands detected ({n_ac_islands} islands) but '{self.algorithm}' "
f"algorithm does not support AC-island-aware partitioning. "
f"Consider using 'kmedoids', 'dbscan', or 'hierarchical' (with non-ward linkage) for AC-island awareness.",
LogCategory.PARTITIONING,
)
ac_islands = None # Disable AC-island awareness for unsupported algorithms
else:
log_info(
f"Starting AC-island-aware geographical partitioning: {self.algorithm}, "
f"n_clusters={n_clusters}, metric={self.distance_metric}, "
f"ac_islands={n_ac_islands}",
LogCategory.PARTITIONING,
)
else:
log_info(
f"Starting geographical partitioning: {self.algorithm}, "
f"n_clusters={n_clusters}, metric={self.distance_metric}",
LogCategory.PARTITIONING,
)
log_debug(
f"Extracted coordinates for {len(nodes)} nodes",
LogCategory.PARTITIONING,
)
# Perform clustering (pass ac_islands for AC-island-aware clustering)
labels = self._run_clustering(
coordinates, effective_config, ac_islands=ac_islands, **kwargs
)
# Create and validate partition
partition_map = create_partition_map(nodes, labels)
validate_partition(partition_map, len(nodes), self._get_strategy_name())
# Validate AC island consistency if AC-island awareness was applied
if ac_islands is not None:
self._validate_cluster_ac_island_consistency(graph, partition_map)
log_info(
f"Geographical partitioning complete: {len(partition_map)} clusters",
LogCategory.PARTITIONING,
)
return partition_map
except Exception as e:
if isinstance(e, PartitioningError):
raise
raise PartitioningError(
f"Geographical partitioning failed: {e}",
strategy=self._get_strategy_name(),
graph_info={
"nodes": len(list(graph.nodes())),
"edges": len(graph.edges()),
},
) from e
def _supports_ac_island_awareness(self, config: GeographicalConfig) -> bool:
"""
Check if current algorithm configuration supports AC-island awareness.
AC-island awareness requires precomputed distance matrices. Algorithms
that work directly on raw coordinates (kmeans, hierarchical with ward)
cannot support this feature.
Parameters
----------
config : GeographicalConfig
Current configuration.
Returns
-------
bool
True if AC-island awareness is supported, False otherwise.
"""
if self.algorithm in self.AC_ISLAND_AWARE_ALGORITHMS:
return True
# Hierarchical supports AC-island awareness with non-ward linkage
if self.algorithm == "hierarchical" and config.hierarchical_linkage != "ward":
return True
return False
def _extract_coordinates(self, graph: nx.Graph, nodes: list[Any]) -> np.ndarray:
"""
Extract coordinates from graph nodes.
Parameters
----------
graph : nx.Graph
NetworkX graph with lat, lon attributes on nodes.
nodes : list[Any]
List of node IDs.
Returns
-------
np.ndarray
Array of coordinates (n x 2).
Raises
------
PartitioningError
If any node is missing latitude or longitude.
"""
coordinates = []
for node in nodes:
node_data = graph.nodes[node]
lat = node_data.get("lat")
lon = node_data.get("lon")
if lat is None or lon is None:
raise PartitioningError(
f"Node {node} missing latitude or longitude",
strategy=self._get_strategy_name(),
graph_info={"nodes": len(nodes), "edges": len(graph.edges())},
)
coordinates.append([lat, lon])
return np.array(coordinates)
def _run_clustering(
self, coordinates: np.ndarray, config: GeographicalConfig, **kwargs
) -> np.ndarray:
"""
Dispatch to appropriate clustering algorithm.
Parameters
----------
coordinates : np.ndarray
Array of coordinates (n x 2).
config : GeographicalConfig
Configuration parameters.
**kwargs : dict
Additional clustering parameters.
Returns
-------
np.ndarray
Array of cluster labels.
"""
if self.algorithm == "kmeans":
return self._kmeans_clustering(coordinates, config, **kwargs)
elif self.algorithm == "kmedoids":
return self._kmedoids_clustering(coordinates, config, **kwargs)
elif self.algorithm == "dbscan":
return self._dbscan_clustering(coordinates, config, **kwargs)
elif self.algorithm == "hierarchical":
return self._hierarchical_clustering(coordinates, config, **kwargs)
elif self.algorithm == "hdbscan":
return self._hdbscan_clustering(coordinates, config, **kwargs)
else:
raise PartitioningError(
f"Unknown algorithm: {self.algorithm}",
strategy=self._get_strategy_name(),
)
def _kmeans_clustering(
self, coordinates: np.ndarray, config: GeographicalConfig, **kwargs
) -> np.ndarray:
"""
Perform K-means clustering on geographical coordinates.
Parameters
----------
coordinates : np.ndarray
Array of coordinates (n x 2).
config : GeographicalConfig
Configuration parameters.
**kwargs : dict
Must include 'n_clusters'.
Returns
-------
np.ndarray
Array of cluster labels.
Raises
------
PartitioningError
If distance metric is not euclidean or n_clusters is invalid.
"""
# K-means requires Euclidean distance
if self.distance_metric != "euclidean":
raise PartitioningError(
f"K-means does not support {self.distance_metric} distance. "
"Use 'kmedoids' algorithm for non-Euclidean metrics.",
strategy=self._get_strategy_name(),
)
n_clusters = kwargs.get("n_clusters")
if n_clusters is None or n_clusters <= 0:
raise PartitioningError(
"K-means requires a positive 'n_clusters' parameter.",
strategy=self._get_strategy_name(),
)
cartesian_coordinates = self._convert_spherical_to_cartesian(coordinates)
log_debug(f"Running K-means with {n_clusters} clusters", LogCategory.PARTITIONING)
return run_kmeans(
cartesian_coordinates, n_clusters, config.random_state, config.max_iter, config.n_init
)
def _kmedoids_clustering(
self, coordinates: np.ndarray, config: GeographicalConfig, **kwargs
) -> np.ndarray:
"""
Perform K-medoids clustering on geographical coordinates.
Parameters
----------
coordinates : np.ndarray
Array of coordinates (n x 2).
config : GeographicalConfig
Configuration parameters.
**kwargs : dict
Must include 'n_clusters'. May include 'ac_islands'.
Returns
-------
np.ndarray
Array of cluster labels.
Raises
------
PartitioningError
If n_clusters is invalid.
"""
n_clusters = kwargs.get("n_clusters")
if n_clusters is None or n_clusters <= 0:
raise PartitioningError(
"K-medoids requires a positive 'n_clusters' parameter.",
strategy=self._get_strategy_name(),
)
ac_islands = kwargs.get("ac_islands")
if ac_islands is not None:
log_debug(
f"Running AC-island-aware K-medoids with {n_clusters} clusters, "
f"metric={self.distance_metric}",
LogCategory.PARTITIONING,
)
distance_matrix = self._build_ac_island_aware_distance_matrix(
coordinates, ac_islands, config
)
else:
log_debug(
f"Running K-medoids with {n_clusters} clusters, metric={self.distance_metric}",
LogCategory.PARTITIONING,
)
distance_matrix = compute_geographical_distances(coordinates, self.distance_metric)
return run_kmedoids(distance_matrix, n_clusters)
def _dbscan_clustering(
self, coordinates: np.ndarray, config: GeographicalConfig, **kwargs
) -> np.ndarray:
"""
Perform DBSCAN clustering on geographical coordinates.
Parameters
----------
coordinates : np.ndarray
Array of coordinates (n x 2).
config : GeographicalConfig
Configuration parameters.
**kwargs : dict
Must include 'eps' and 'min_samples'. May include 'ac_islands'.
Returns
-------
np.ndarray
Array of cluster labels.
Raises
------
PartitioningError
If eps or min_samples is missing.
"""
eps = kwargs.get("eps")
min_samples = kwargs.get("min_samples")
if eps is None or min_samples is None:
raise PartitioningError(
"DBSCAN requires 'eps' and 'min_samples' parameters.",
strategy=self._get_strategy_name(),
)
ac_islands = kwargs.get("ac_islands")
if ac_islands is not None:
log_debug(
f"Running AC-island-aware DBSCAN with eps={eps}, min_samples={min_samples}",
LogCategory.PARTITIONING,
)
distance_matrix = self._build_ac_island_aware_distance_matrix(
coordinates, ac_islands, config
)
else:
log_debug(
f"Running DBSCAN with eps={eps}, min_samples={min_samples}",
LogCategory.PARTITIONING,
)
distance_matrix = compute_geographical_distances(coordinates, self.distance_metric)
return run_dbscan(distance_matrix, eps, min_samples)
def _hierarchical_clustering(
self, coordinates: np.ndarray, config: GeographicalConfig, **kwargs
) -> np.ndarray:
"""
Perform Hierarchical Clustering on geographical coordinates.
Parameters
----------
coordinates : np.ndarray
Array of coordinates (n x 2).
config : GeographicalConfig
Configuration parameters.
**kwargs : dict
Must include 'n_clusters'. May include 'ac_islands'.
Returns
-------
np.ndarray
Array of cluster labels.
Raises
------
PartitioningError
If n_clusters is invalid or ward linkage used with non-euclidean.
"""
n_clusters = kwargs.get("n_clusters")
if n_clusters is None or n_clusters <= 0:
raise PartitioningError(
"Hierarchical clustering requires a positive 'n_clusters' parameter.",
strategy=self._get_strategy_name(),
)
linkage = config.hierarchical_linkage
ac_islands = kwargs.get("ac_islands")
# Ward linkage only works with Euclidean distance on raw features
if linkage == "ward":
if self.distance_metric != "euclidean":
raise PartitioningError(
"Ward linkage for Hierarchical Clustering requires Euclidean distance.",
strategy=self._get_strategy_name(),
)
log_debug(
f"Running hierarchical clustering with {n_clusters} clusters, linkage={linkage}",
LogCategory.PARTITIONING,
)
# Use sklearn's AgglomerativeClustering directly with ward linkage
from sklearn.cluster import AgglomerativeClustering
clustering = AgglomerativeClustering(
n_clusters=n_clusters, metric="euclidean", linkage="ward"
)
return clustering.fit_predict(coordinates)
else:
# For other linkages, use precomputed distance matrix
if ac_islands is not None:
log_debug(
f"Running AC-island-aware hierarchical clustering with {n_clusters} clusters, "
f"linkage={linkage}",
LogCategory.PARTITIONING,
)
distance_matrix = self._build_ac_island_aware_distance_matrix(
coordinates, ac_islands, config
)
else:
log_debug(
f"Running hierarchical clustering with {n_clusters} clusters, linkage={linkage}",
LogCategory.PARTITIONING,
)
distance_matrix = compute_geographical_distances(coordinates, self.distance_metric)
return run_hierarchical(distance_matrix, n_clusters, linkage)
def _hdbscan_clustering(
self, coordinates: np.ndarray, config: GeographicalConfig, **kwargs
) -> np.ndarray:
"""
Perform HDBSCAN clustering on geographical coordinates.
Parameters
----------
coordinates : np.ndarray
Array of coordinates (n x 2).
config : GeographicalConfig
Configuration parameters.
**kwargs : dict
May include 'min_cluster_size' and 'ac_islands'.
Returns
-------
np.ndarray
Array of cluster labels.
"""
min_cluster_size = kwargs.get("min_cluster_size", 5)
ac_islands = kwargs.get("ac_islands")
if ac_islands is not None:
log_debug(
f"Running AC-island-aware HDBSCAN with min_cluster_size={min_cluster_size}",
LogCategory.PARTITIONING,
)
distance_matrix = self._build_ac_island_aware_distance_matrix(
coordinates, ac_islands, config
)
# Use HDBSCAN with precomputed distances
import hdbscan
clusterer = hdbscan.HDBSCAN(min_cluster_size=min_cluster_size, metric="precomputed")
return clusterer.fit_predict(distance_matrix)
else:
log_debug(
f"Running HDBSCAN with min_cluster_size={min_cluster_size}, "
f"metric={self.distance_metric}",
LogCategory.PARTITIONING,
)
coords_rad = np.radians(coordinates)
return run_hdbscan(coords_rad, min_cluster_size, self.distance_metric)
# =========================================================================
# AC-ISLAND AWARENESS METHODS
# =========================================================================
def _has_ac_island_data(self, graph: nx.Graph, nodes: list[Any]) -> bool:
"""
Check if the graph has AC island data on nodes.
AC island data is automatically detected by checking for the 'ac_island'
attribute on nodes, which is set by the VoltageAwareStrategy (va_loader)
when loading power system data with DC links.
Parameters
----------
graph : nx.Graph
NetworkX graph.
nodes : list[Any]
List of node IDs.
Returns
-------
bool
True if all nodes have the ac_island attribute, False otherwise.
"""
for node in nodes:
if self.ac_island_attr not in graph.nodes[node]:
return False
return True
def _extract_ac_islands(self, graph: nx.Graph, nodes: list[Any]) -> np.ndarray:
"""
Extract AC island IDs from graph nodes.
Parameters
----------
graph : nx.Graph
NetworkX graph.
nodes : list[Any]
List of node IDs.
Returns
-------
np.ndarray
Array of AC island IDs for each node.
Raises
------
PartitioningError
If any node is missing the ac_island attribute.
"""
ac_islands = []
missing_nodes = []
for node in nodes:
ac_island = graph.nodes[node].get("ac_island")
if ac_island is None:
missing_nodes.append(node)
ac_islands.append(ac_island)
if missing_nodes:
sample = missing_nodes[:5]
raise PartitioningError(
f"AC-island-aware partitioning requires 'ac_island' attribute "
f"on all nodes. {len(missing_nodes)} node(s) are missing this attribute "
f"(first few: {sample}). Use 'va_loader' to automatically detect AC islands.",
strategy=self._get_strategy_name(),
)
return np.array(ac_islands)
def _build_ac_island_aware_distance_matrix(
self,
coordinates: np.ndarray,
ac_islands: np.ndarray,
config: GeographicalConfig,
) -> np.ndarray:
"""
Build distance matrix with AC island awareness.
Nodes in the same AC island use geographical distance.
Nodes in different AC islands get infinite distance.
Parameters
----------
coordinates : np.ndarray
Array of [lat, lon] coordinates (n x 2).
ac_islands : np.ndarray
Array of AC island IDs (n).
config : GeographicalConfig
Configuration instance.
Returns
-------
np.ndarray
Distance matrix (n x n) where:
- d[i,j] = geographical_distance if same AC island
- d[i,j] = infinite_distance if different AC islands
- d[i,i] = 0 (diagonal)
"""
# Calculate geographical distances
geo_distances = compute_geographical_distances(coordinates, self.distance_metric)
# same_island[i,j] = True if ac_islands[i] == ac_islands[j]
same_island_mask = ac_islands[:, np.newaxis] == ac_islands[np.newaxis, :]
# Build distance matrix: geo_distance if same island, infinite otherwise
distance_matrix = np.where(same_island_mask, geo_distances, config.infinite_distance)
# Ensure diagonal is zero (nodes have zero distance to themselves)
np.fill_diagonal(distance_matrix, 0.0)
# Log statistics
n_ac_islands = len(np.unique(ac_islands))
# Count compatible pairs (same island, excluding diagonal)
compatible_pairs = (np.sum(same_island_mask) - len(ac_islands)) // 2
log_debug(
f"Built AC-island-aware distance matrix: {n_ac_islands} AC island(s), "
f"{compatible_pairs} compatible pairs",
LogCategory.PARTITIONING,
)
return distance_matrix
@staticmethod
def _validate_cluster_ac_island_consistency(
graph: nx.Graph, partition_map: dict[int, list[Any]]
) -> None:
"""
Validate that clusters don't mix different AC islands.
With infinite distances between AC islands, clusters should never
contain nodes from multiple AC islands.
Parameters
----------
graph : nx.Graph
Original NetworkX graph.
partition_map : dict[int, list[Any]]
Resulting partition mapping.
"""
for cluster_id, nodes in partition_map.items():
ac_islands_in_cluster = set()
for node in nodes:
ac_island = graph.nodes[node].get("ac_island")
if ac_island is not None:
ac_islands_in_cluster.add(ac_island)
if len(ac_islands_in_cluster) > 1:
log_warning(
f"Cluster {cluster_id} contains nodes from multiple AC islands: "
f"{ac_islands_in_cluster}. This should not happen with infinite distances.",
LogCategory.PARTITIONING,
warn_user=False,
)
@staticmethod
def _convert_spherical_to_cartesian(coordinates: np.ndarray) -> np.ndarray:
"""
Convert spherical coordinates (lat, lon) to Cartesian (x, y, z).
This is used for K-means clustering which operates in Euclidean space.
Args:
coordinates: Array of [lat, lon] in degrees (n x 2)
Returns
-------
Array of [x, y, z] Cartesian coordinates (n x 3)
"""
coords = np.asarray(coordinates)
if coords.ndim != 2 or coords.shape[1] != 2:
raise PartitioningError(
"Coordinates must be a (n, 2) array of [lat, lon] pairs.",
strategy="geographical_conversion",
)
lat_rad = np.radians(coords[:, 0])
lon_rad = np.radians(coords[:, 1])
cos_lat = np.cos(lat_rad)
x = cos_lat * np.cos(lon_rad)
y = cos_lat * np.sin(lon_rad)
z = np.sin(lat_rad)
return np.column_stack((x, y, z))