from collections import defaultdict
from dataclasses import dataclass
from typing import Any
import networkx as nx
import numpy as np
from scipy.linalg import LinAlgError, solve
from npap.exceptions import PartitioningError, ValidationError
from npap.interfaces import EdgeType, PartitioningStrategy
from npap.logging import LogCategory, log_debug, log_info, log_warning
from npap.utils import (
create_partition_map,
run_kmeans,
run_kmedoids,
validate_partition,
with_runtime_config,
)
[docs]
@dataclass
class ElectricalDistanceConfig:
"""
Configuration parameters for electrical distance calculations.
Centralizes all magic numbers and tolerances used in the electrical
distance partitioning algorithm for better maintainability and tuning.
Attributes
----------
zero_reactance_replacement : float
Reactance value used when edge reactance is zero.
regularization_factor : float
Small value added to B matrix diagonal for numerical stability.
Set to 0.0 to disable regularization. Default 1e-10 provides
mild regularization that prevents singular matrix issues.
infinite_distance : float
Value used to represent "infinite" distance between AC islands.
"""
zero_reactance_replacement: float = 1e-5
regularization_factor: float = 1e-10
infinite_distance: float = 1e4
[docs]
class ElectricalDistancePartitioning(PartitioningStrategy):
"""
Partition nodes based on electrical distance using PTDF in power networks.
This strategy uses the Power Transfer Distribution Factor (PTDF) approach
to calculate electrical distances between nodes. The electrical distance
is derived from the Euclidean distance between PTDF column vectors, which
represents how similarly power injections at different nodes affect line flows.
Mathematical basis:
- Build incidence matrix K from directed network topology
- Remove slack bus column to get K_sba (slack-bus-adjusted)
- Calculate susceptance matrix B = K_sba^T · diag{b} · K_sba
- Compute PTDF = diag{b} · K_sba · B^(-1)
- Electrical distance: d_ij = ||PTDF[:,i] - PTDF[:,j]||_2
Multi AC-Island Support:
Networks with multiple AC islands (AC zones connected via HVDC) are handled
by computing PTDF matrices independently for each island:
1. Group nodes by AC island
2. Select/detect slack bus per island
3. Extract AC-only subgraph per island (lines + transformers, no DC links)
4. Compute PTDF and distances per island
5. Combine into block-diagonal distance matrix with infinite inter-island distances
This is physically correct because PTDF describes AC power flow behavior,
and DC links decouple the AC dynamics between islands.
Configuration can be provided at:
- Instantiation time (via `config` parameter in __init__)
- Partition time (via `config` or individual parameters in partition())
Partition-time parameters override instance defaults for that call only.
"""
SUPPORTED_ALGORITHMS = ["kmeans", "kmedoids"]
# Edge types that participate in AC power flow (have reactance)
AC_EDGE_TYPES = {EdgeType.LINE.value, EdgeType.TRAFO.value}
# Config parameter names for runtime override detection
_CONFIG_PARAMS = {
"zero_reactance_replacement",
"regularization_factor",
"infinite_distance",
}
[docs]
def __init__(
self,
algorithm: str = "kmeans",
slack_bus: Any | None = None,
ac_island_attr: str = "ac_island",
config: ElectricalDistanceConfig | None = None,
):
"""
Initialize electrical distance partitioning strategy.
Parameters
----------
algorithm : str, default='kmeans'
Clustering algorithm ('kmeans', 'kmedoids').
slack_bus : Any, optional
Specific node to use as slack bus (applied to its island),
or None for auto-selection per island.
ac_island_attr : str, default='ac_island'
Node attribute name containing AC island ID.
config : ElectricalDistanceConfig, optional
Configuration parameters for distance calculations.
Raises
------
ValueError
If unsupported algorithm is specified.
"""
self.algorithm = algorithm
self.slack_bus = slack_bus
self.ac_island_attr = ac_island_attr
self.config = config or ElectricalDistanceConfig()
if algorithm not in self.SUPPORTED_ALGORITHMS:
raise ValueError(
f"Unsupported algorithm: {algorithm}. "
f"Supported: {', '.join(self.SUPPORTED_ALGORITHMS)}"
)
log_debug(
f"Initialized ElectricalDistancePartitioning: algorithm={algorithm}, "
f"ac_island_attr={ac_island_attr}",
LogCategory.PARTITIONING,
)
@property
def required_attributes(self) -> dict[str, list[str]]:
"""Required attributes for electrical distance partitioning."""
return {
"nodes": [], # ac_island is validated separately with helpful message
"edges": ["x"], # Reactance attribute required on AC edges
}
def _get_strategy_name(self) -> str:
"""Get descriptive strategy name for error messages."""
return f"electrical_{self.algorithm}"
[docs]
@with_runtime_config(ElectricalDistanceConfig, _CONFIG_PARAMS)
def partition(self, graph: nx.DiGraph, **kwargs) -> dict[int, list[Any]]:
"""
Partition nodes based on electrical distance using PTDF.
Parameters
----------
graph : nx.DiGraph
NetworkX DiGraph with reactance data on AC edges and ac_island on nodes.
**kwargs : dict
Additional parameters:
- n_clusters : Number of clusters (required)
- random_state : Random seed for reproducibility
- max_iter : Maximum iterations for clustering
- config : ElectricalDistanceConfig instance to override instance config
- slack_bus : Override the slack bus for this partition call
- zero_reactance_replacement : Override config parameter
- regularization_factor : Override config parameter
- infinite_distance : Override config parameter
Returns
-------
dict[int, list[Any]]
Dictionary mapping cluster_id -> list of node_ids.
Raises
------
PartitioningError
If partitioning fails.
ValidationError
If ac_island attribute is missing.
"""
try:
# Get effective config (injected by decorator)
effective_config = kwargs.get("_effective_config", self.config)
# Resolve slack bus (kwargs override instance default)
effective_slack = kwargs.get("slack_bus", self.slack_bus)
# Validate AC island attributes
self._validate_ac_island_attributes(graph)
# Validate AC edges have reactance
self._validate_ac_edge_attributes(graph)
n_clusters = kwargs.get("n_clusters")
log_info(
f"Starting electrical distance partitioning (PTDF): {self.algorithm}, "
f"n_clusters={n_clusters}",
LogCategory.PARTITIONING,
)
if n_clusters is None or n_clusters <= 0:
raise PartitioningError(
"Electrical distance partitioning requires a positive 'n_clusters' parameter.",
strategy=self._get_strategy_name(),
)
nodes = list(graph.nodes())
n_nodes = len(nodes)
if n_clusters > n_nodes:
raise PartitioningError(
f"Cannot create {n_clusters} clusters from {n_nodes} nodes.",
strategy=self._get_strategy_name(),
)
# Calculate electrical distance matrix using per-island PTDF
log_debug(
"Computing per-island PTDF-based electrical distance matrix",
LogCategory.PARTITIONING,
)
distance_matrix = self._calculate_electrical_distance_matrix(
graph, nodes, effective_config, effective_slack
)
# Perform clustering
labels = self._run_clustering(distance_matrix, **kwargs)
# Create and validate partition
partition_map = create_partition_map(nodes, labels)
validate_partition(partition_map, n_nodes, self._get_strategy_name())
# Validate AC island consistency
self._validate_cluster_ac_island_consistency(graph, partition_map)
log_info(
f"Electrical partitioning complete: {len(partition_map)} clusters",
LogCategory.PARTITIONING,
)
return partition_map
except Exception as e:
if isinstance(e, (PartitioningError, ValidationError)):
raise
raise PartitioningError(
f"Electrical distance partitioning failed: {e}",
strategy=self._get_strategy_name(),
graph_info={
"nodes": len(list(graph.nodes())),
"edges": len(graph.edges()),
},
) from e
def _run_clustering(self, distance_matrix: np.ndarray, **kwargs) -> np.ndarray:
"""
Dispatch to appropriate clustering algorithm.
Parameters
----------
distance_matrix : np.ndarray
Precomputed distance matrix (n_nodes x n_nodes).
**kwargs : dict
Additional parameters including n_clusters, random_state, max_iter.
Returns
-------
np.ndarray
Array of cluster labels.
"""
n_clusters = kwargs.get("n_clusters")
random_state = kwargs.get("random_state", 42)
max_iter = kwargs.get("max_iter", 300)
if self.algorithm == "kmeans":
log_debug("Running K-means on distance matrix", LogCategory.PARTITIONING)
return run_kmeans(distance_matrix, n_clusters, random_state, max_iter)
elif self.algorithm == "kmedoids":
log_debug("Running K-medoids on distance matrix", LogCategory.PARTITIONING)
return run_kmedoids(distance_matrix, n_clusters)
else:
raise PartitioningError(
f"Unknown algorithm: {self.algorithm}",
strategy=self._get_strategy_name(),
)
# =========================================================================
# VALIDATION METHODS
# =========================================================================
def _validate_ac_island_attributes(self, graph: nx.DiGraph) -> None:
"""
Validate that all nodes have the AC island attribute.
Provides a helpful error message directing users to use va_loader
or manually add the ac_island attribute.
Parameters
----------
graph : nx.DiGraph
NetworkX DiGraph to validate.
Raises
------
ValidationError
If any node is missing the ac_island attribute.
"""
missing_nodes = []
for node in graph.nodes():
if self.ac_island_attr not in graph.nodes[node]:
missing_nodes.append(node)
if missing_nodes:
sample = missing_nodes[:5]
raise ValidationError(
f"Electrical distance partitioning requires '{self.ac_island_attr}' attribute "
f"on all nodes for AC island isolation. "
f"{len(missing_nodes)} node(s) are missing this attribute (first few: {sample}). "
f"Please use 'va_loader' data loading strategy to automatically detect AC islands, "
f"or manually add the '{self.ac_island_attr}' attribute to all nodes.",
missing_attributes={"nodes": [self.ac_island_attr]},
strategy=self._get_strategy_name(),
)
def _validate_ac_edge_attributes(self, graph: nx.DiGraph) -> None:
"""
Validate that AC edges (lines and transformers) have reactance attribute.
DC links are excluded from this validation as they don't participate
in AC power flow and don't require reactance.
Parameters
----------
graph : nx.DiGraph
NetworkX DiGraph to validate.
Raises
------
ValidationError
If any AC edge is missing the 'x' attribute.
"""
missing_edges = []
for u, v, data in graph.edges(data=True):
edge_type = data.get("type", EdgeType.LINE.value)
# Only validate AC edges
if edge_type in self.AC_EDGE_TYPES:
if "x" not in data:
missing_edges.append((u, v))
if missing_edges:
sample = missing_edges[:5]
raise ValidationError(
f"AC edges (lines/transformers) require 'x' (reactance) attribute. "
f"{len(missing_edges)} edge(s) are missing this attribute (first few: {sample}).",
missing_attributes={"edges": ["x"]},
strategy=self._get_strategy_name(),
)
def _validate_island_connectivity(
self, ac_subgraph: nx.DiGraph, island_id: Any, island_nodes: list[Any]
) -> None:
"""
Validate that an island's AC subgraph is connected.
Parameters
----------
ac_subgraph : nx.DiGraph
AC-only subgraph for the island.
island_id : Any
AC island identifier.
island_nodes : list[Any]
Nodes in this island.
Raises
------
PartitioningError
If AC subgraph is not weakly connected.
"""
if len(island_nodes) == 1:
# Single node is trivially connected
return
if ac_subgraph.number_of_edges() == 0:
raise PartitioningError(
f"AC island {island_id} has no AC edges (lines/transformers). "
f"Cannot compute electrical distances without AC connectivity.",
strategy=self._get_strategy_name(),
graph_info={"island_id": island_id, "n_nodes": len(island_nodes)},
)
if not nx.is_weakly_connected(ac_subgraph):
n_components = nx.number_weakly_connected_components(ac_subgraph)
raise PartitioningError(
f"AC island {island_id} is not AC-connected. Found {n_components} "
f"disconnected AC components within the island. This may indicate "
f"missing line/transformer data or incorrect AC island assignment.",
strategy=self._get_strategy_name(),
graph_info={
"island_id": island_id,
"n_nodes": len(island_nodes),
"n_ac_edges": ac_subgraph.number_of_edges(),
"n_components": n_components,
},
)
def _validate_cluster_ac_island_consistency(
self, graph: nx.DiGraph, partition_map: dict[int, list[Any]]
) -> None:
"""
Validate that clusters don't mix different AC islands.
With infinite distances between AC islands, clusters should never mix
nodes from different AC islands.
Parameters
----------
graph : nx.DiGraph
Original NetworkX graph.
partition_map : dict[int, list[Any]]
Resulting partition mapping.
"""
for cluster_id, nodes in partition_map.items():
ac_islands_in_cluster = set()
for node in nodes:
ac_island = graph.nodes[node].get(self.ac_island_attr)
if ac_island is not None:
ac_islands_in_cluster.add(ac_island)
if len(ac_islands_in_cluster) > 1:
log_warning(
f"Cluster {cluster_id} contains nodes from multiple AC islands: "
f"{ac_islands_in_cluster}. This should not happen with infinite distances.",
LogCategory.PARTITIONING,
warn_user=False,
)
# =========================================================================
# MULTI AC-ISLAND ELECTRICAL DISTANCE CALCULATION
# =========================================================================
def _calculate_electrical_distance_matrix(
self,
graph: nx.DiGraph,
nodes: list[Any],
config: ElectricalDistanceConfig,
slack_bus: Any | None,
) -> np.ndarray:
"""
Calculate electrical distance matrix with per-island PTDF computation.
For networks with multiple AC islands:
1. Group nodes by AC island
2. For each island: extract AC subgraph, select slack, compute PTDF distances
3. Combine into block-diagonal matrix with infinite inter-island distances
Parameters
----------
graph : nx.DiGraph
NetworkX DiGraph with reactance on AC edges.
nodes : list[Any]
Ordered list of all nodes.
config : ElectricalDistanceConfig
Configuration instance.
slack_bus : Any, optional
User-specified slack bus.
Returns
-------
np.ndarray
Distance matrix (n_nodes × n_nodes) with infinite distances between islands.
Raises
------
PartitioningError
If distance matrix calculation fails.
"""
# Group nodes by AC island
islands = self._group_nodes_by_ac_island(graph, nodes)
n_islands = len(islands)
log_info(
f"Processing {n_islands} AC island(s) for PTDF computation",
LogCategory.PARTITIONING,
)
# Build node index mapping for the full matrix
node_to_idx = {node: idx for idx, node in enumerate(nodes)}
n_nodes = len(nodes)
# Initialize full distance matrix with infinite distances
distance_matrix = np.full((n_nodes, n_nodes), config.infinite_distance)
np.fill_diagonal(distance_matrix, 0.0)
# Process each island independently
for island_id, island_nodes in islands.items():
log_debug(
f"Processing AC island {island_id}: {len(island_nodes)} nodes",
LogCategory.PARTITIONING,
)
# Handle single-node islands
if len(island_nodes) == 1:
log_debug(
f"Island {island_id} has single node, distance = 0",
LogCategory.PARTITIONING,
)
continue
# Extract AC-only subgraph for this island
ac_subgraph = self._extract_ac_subgraph(graph, island_nodes)
# Validate island connectivity
self._validate_island_connectivity(ac_subgraph, island_id, island_nodes)
# Select slack bus for this island
island_slack = self._select_slack_bus_for_island(
ac_subgraph, island_nodes, slack_bus, island_id
)
# Compute PTDF-based distances for this island
island_distances = self._compute_island_distances(
ac_subgraph, island_nodes, island_slack, config
)
# Insert island distances into full matrix
self._insert_island_distances(
distance_matrix, island_distances, island_nodes, node_to_idx
)
return distance_matrix
def _group_nodes_by_ac_island(
self, graph: nx.DiGraph, nodes: list[Any]
) -> dict[Any, list[Any]]:
"""
Group nodes by their AC island attribute.
Parameters
----------
graph : nx.DiGraph
NetworkX DiGraph with ac_island attribute on nodes.
nodes : list[Any]
List of nodes to group.
Returns
-------
dict[Any, list[Any]]
Dictionary mapping island_id -> list of nodes in that island.
"""
islands: dict[Any, list[Any]] = defaultdict(list)
for node in nodes:
island_id = graph.nodes[node].get(self.ac_island_attr)
islands[island_id].append(node)
return dict(islands)
def _extract_ac_subgraph(self, graph: nx.DiGraph, island_nodes: list[Any]) -> nx.DiGraph:
"""
Extract AC-only subgraph for a set of nodes.
Includes only lines and transformers, excluding DC links.
Parameters
----------
graph : nx.DiGraph
Full NetworkX DiGraph.
island_nodes : list[Any]
Nodes to include in subgraph.
Returns
-------
nx.DiGraph
Subgraph containing only AC edges between island nodes.
"""
island_node_set = set(island_nodes)
ac_subgraph = nx.DiGraph()
# Add nodes with attributes
for node in island_nodes:
ac_subgraph.add_node(node, **graph.nodes[node])
# Add only AC edges (lines and transformers)
for u, v, data in graph.edges(data=True):
if u in island_node_set and v in island_node_set:
edge_type = data.get("type", EdgeType.LINE.value)
if edge_type in self.AC_EDGE_TYPES:
ac_subgraph.add_edge(u, v, **data)
return ac_subgraph
@staticmethod
def _select_slack_bus_for_island(
ac_subgraph: nx.DiGraph,
island_nodes: list[Any],
user_slack: Any | None,
island_id: Any,
) -> Any:
"""
Select slack bus for a specific island.
If user specified a slack bus that's in this island, use it.
Otherwise, select the node with highest total degree in the AC subgraph.
Parameters
----------
ac_subgraph : nx.DiGraph
AC-only subgraph for this island.
island_nodes : list[Any]
Nodes in this island.
user_slack : Any, optional
User-specified slack bus (may be None or in different island).
island_id : Any
AC island identifier for logging.
Returns
-------
Any
Selected slack bus node for this island.
"""
island_node_set = set(island_nodes)
# Check if user-specified slack is in this island
if user_slack is not None and user_slack in island_node_set:
log_debug(
f"Using user-specified slack bus {user_slack} for island {island_id}",
LogCategory.PARTITIONING,
)
return user_slack
# Auto-select: use node with highest total degree in AC subgraph
degrees = {n: ac_subgraph.in_degree(n) + ac_subgraph.out_degree(n) for n in island_nodes}
selected = max(island_nodes, key=lambda n: degrees[n])
log_debug(
f"Auto-selected slack bus {selected} for island {island_id} (degree={degrees[selected]})",
LogCategory.PARTITIONING,
)
return selected
def _compute_island_distances(
self,
ac_subgraph: nx.DiGraph,
island_nodes: list[Any],
slack_bus: Any,
config: ElectricalDistanceConfig,
) -> np.ndarray:
"""
Compute PTDF-based electrical distances for a single island.
Parameters
----------
ac_subgraph : nx.DiGraph
AC-only subgraph for this island.
island_nodes : list[Any]
Ordered list of nodes in this island.
slack_bus : Any
Slack bus for this island.
config : ElectricalDistanceConfig
Configuration instance.
Returns
-------
np.ndarray
Distance matrix for this island (n_island × n_island).
"""
# Build PTDF matrix for this island
ptdf_matrix, active_nodes = self._build_ptdf_matrix(
ac_subgraph, island_nodes, slack_bus, config
)
log_debug(
f"Built island PTDF matrix: shape {ptdf_matrix.shape}",
LogCategory.PARTITIONING,
)
# Compute distances from PTDF columns
distance_matrix_active = self._compute_ptdf_distances(ptdf_matrix)
# Integrate slack bus into island distance matrix
distance_matrix_island = self._integrate_slack_bus_distances(
distance_matrix_active, ptdf_matrix, island_nodes, slack_bus, active_nodes
)
return distance_matrix_island
@staticmethod
def _insert_island_distances(
full_matrix: np.ndarray,
island_distances: np.ndarray,
island_nodes: list[Any],
node_to_idx: dict[Any, int],
) -> None:
"""
Insert island distance matrix into the full distance matrix.
Parameters
----------
full_matrix : np.ndarray
Full distance matrix (modified in place).
island_distances : np.ndarray
Distance matrix for this island.
island_nodes : list[Any]
Ordered list of nodes in this island.
node_to_idx : dict[Any, int]
Mapping from node to index in full matrix.
"""
# Build index array for island nodes in full matrix
island_indices = np.array([node_to_idx[node] for node in island_nodes])
# Use np.ix_ for efficient block assignment
full_matrix[np.ix_(island_indices, island_indices)] = island_distances
# =========================================================================
# PTDF MATRIX CONSTRUCTION
# =========================================================================
def _build_ptdf_matrix(
self,
ac_subgraph: nx.DiGraph,
island_nodes: list[Any],
slack_bus: Any,
config: ElectricalDistanceConfig,
) -> tuple[np.ndarray, list[Any]]:
"""
Build the Power Transfer Distribution Factor (PTDF) matrix for a dc-island.
PTDF = diag{b} · K_sba · (K_sba^T · diag{b} · K_sba)^(-1)
Where:
- K_sba is the slack-bus-adjusted incidence matrix
- b is the vector of susceptances (1/reactance)
- The resulting PTDF has shape (n_edges × n_active_nodes)
Instead of computing B^(-1) explicitly, we solve the linear system
directly which is significantly faster for large networks.
Parameters
----------
ac_subgraph : nx.DiGraph
AC-only DiGraph for this island.
island_nodes : list[Any]
Ordered list of nodes in this island.
slack_bus : Any
Slack bus node for this island.
config : ElectricalDistanceConfig
Configuration instance.
Returns
-------
tuple[np.ndarray, list[Any]]
Tuple of (PTDF matrix [n_edges × n_active], list of active nodes).
Raises
------
PartitioningError
If matrix construction fails.
"""
try:
# Extract edges and susceptances (AC edges only)
edges, susceptances = self._extract_edge_susceptances(ac_subgraph, config)
if len(edges) == 0:
raise PartitioningError(
"No valid AC edges found for PTDF matrix construction.",
strategy=self._get_strategy_name(),
)
# Build slack-bus-adjusted incidence matrix
K_sba, active_nodes = self._build_incidence_matrix(edges, island_nodes, slack_bus)
log_debug(
f"Built incidence matrix K_sba: shape {K_sba.shape}",
LogCategory.PARTITIONING,
)
# Build susceptance matrix B = K_sba^T @ diag(b) @ K_sba
B_matrix = self._compute_B_matrix(K_sba, susceptances)
log_debug(f"Built B matrix: shape {B_matrix.shape}", LogCategory.PARTITIONING)
# Compute PTDF using direct linear solve
ptdf_matrix = self._compute_ptdf(K_sba, susceptances, B_matrix, config)
return ptdf_matrix, active_nodes
except Exception as e:
if isinstance(e, PartitioningError):
raise
raise PartitioningError(
f"Failed to build PTDF matrix: {e}", strategy=self._get_strategy_name()
) from e
def _extract_edge_susceptances(
self, ac_subgraph: nx.DiGraph, config: ElectricalDistanceConfig
) -> tuple[list[tuple[Any, Any]], np.ndarray]:
"""
Extract AC edges and their susceptances from the subgraph.
Only processes lines and transformers (AC edges).
Susceptance b = 1/x where x is the reactance.
Parameters
----------
ac_subgraph : nx.DiGraph
AC-only subgraph.
config : ElectricalDistanceConfig
Configuration instance.
Returns
-------
tuple[list[tuple[Any, Any]], np.ndarray]
Tuple of (list of (from, to) edges, array of susceptances).
Raises
------
PartitioningError
If reactance values are invalid.
"""
edges = []
susceptances = []
zero_reactance_count = 0
for u, v, data in ac_subgraph.edges(data=True):
reactance = data.get("x")
if reactance is None:
raise PartitioningError(
f"AC edge ({u}, {v}) missing reactance attribute 'x'",
strategy=self._get_strategy_name(),
)
if not isinstance(reactance, (int, float)):
raise PartitioningError(
f"Edge ({u}, {v}) reactance must be numeric, got {type(reactance)}",
strategy=self._get_strategy_name(),
)
if reactance == 0:
zero_reactance_count += 1
reactance = config.zero_reactance_replacement
edges.append((u, v))
susceptances.append(1.0 / reactance)
# Emit single warning if any zero reactance edges found
if zero_reactance_count > 0:
log_warning(
f"{zero_reactance_count} edge(s) have zero reactance. "
f"Using replacement value: {config.zero_reactance_replacement}",
LogCategory.PARTITIONING,
)
return edges, np.array(susceptances)
@staticmethod
def _build_incidence_matrix(
edges: list[tuple[Any, Any]], nodes: list[Any], slack_bus: Any
) -> tuple[np.ndarray, list[Any]]:
"""
Build slack-bus-adjusted incidence matrix K_sba.
For each directed edge (u → v):
- K[edge_idx, u] = -1 (edge leaves u)
- K[edge_idx, v] = +1 (edge enters v)
The slack bus column is removed to make B invertible.
Parameters
----------
edges : list[tuple[Any, Any]]
List of (from_node, to_node) tuples.
nodes : list[Any]
Ordered list of all nodes in island.
slack_bus : Any
Node to exclude (slack bus).
Returns
-------
tuple[np.ndarray, list[Any]]
Tuple of (K_sba matrix [n_edges × n_active], list of active nodes).
"""
# Active nodes (without slack bus)
active_nodes = [n for n in nodes if n != slack_bus]
n_active = len(active_nodes)
n_edges = len(edges)
# Create node index mapping for active nodes
node_to_idx = {node: idx for idx, node in enumerate(active_nodes)}
# Build incidence matrix using vectorized assignment
K_sba = np.zeros((n_edges, n_active))
# Pre-compute indices for all edges (-1 means node not in active set)
u_indices = np.array([node_to_idx.get(u, -1) for u, v in edges])
v_indices = np.array([node_to_idx.get(v, -1) for u, v in edges])
edge_indices = np.arange(n_edges)
# Vectorized assignment for valid source nodes
valid_u = u_indices >= 0
K_sba[edge_indices[valid_u], u_indices[valid_u]] = -1.0
# Vectorized assignment for valid target nodes
valid_v = v_indices >= 0
K_sba[edge_indices[valid_v], v_indices[valid_v]] = 1.0
return K_sba, active_nodes
@staticmethod
def _compute_B_matrix(K_sba: np.ndarray, susceptances: np.ndarray) -> np.ndarray:
"""
Compute susceptance matrix B = K_sba^T @ diag(b) @ K_sba.
Uses efficient broadcasting: B = (K^T * b) @ K, avoiding explicit
diagonal matrix construction.
Parameters
----------
K_sba : np.ndarray
Slack-bus-adjusted incidence matrix (n_edges × n_active).
susceptances : np.ndarray
Array of susceptance values (n_edges,).
Returns
-------
np.ndarray
Symmetric susceptance matrix (n_active × n_active).
"""
# Efficient: (K.T * b) @ K where b is broadcast along columns
# K.T shape: (n_active, n_edges), susceptances shape: (n_edges,)
K_scaled = K_sba.T * susceptances
B_matrix = K_scaled @ K_sba
# Ensure symmetry (handles floating point errors)
return (B_matrix + B_matrix.T) / 2.0
@staticmethod
def _compute_ptdf(
K_sba: np.ndarray,
susceptances: np.ndarray,
B_matrix: np.ndarray,
config: ElectricalDistanceConfig,
) -> np.ndarray:
"""
Compute PTDF matrix by solving linear system directly.
To avoid computing B^(-1) explicitly, we solve:
B @ X = A^T where A = diag(b) @ K_sba
Then PTDF = X^T
This is mathematically equivalent but 3-5x faster for large matrices.
Parameters
----------
K_sba : np.ndarray
Slack-bus-adjusted incidence matrix (n_edges × n_active).
susceptances : np.ndarray
Array of susceptance values (n_edges,).
B_matrix : np.ndarray
Susceptance matrix (n_active × n_active).
config : ElectricalDistanceConfig
Configuration instance.
Returns
-------
np.ndarray
PTDF matrix (n_edges × n_active).
"""
# Apply Tikhonov regularization for numerical stability
if config.regularization_factor > 0:
B_matrix = B_matrix + config.regularization_factor * np.eye(B_matrix.shape[0])
# A = diag(b) @ K_sba, shape (n_edges, n_active)
A = susceptances[:, np.newaxis] * K_sba
try:
# Solve B @ X = A^T for X, then PTDF = X^T
# Using assume_a='sym' tells scipy B is symmetric -> uses faster algorithm
X = solve(B_matrix, A.T, assume_a="sym")
ptdf_matrix = X.T
except LinAlgError:
# Fallback to least-squares solution if matrix is singular
log_warning(
"B matrix is singular, using least-squares solution.",
LogCategory.PARTITIONING,
)
X, _, _, _ = np.linalg.lstsq(B_matrix, A.T, rcond=None)
ptdf_matrix = X.T
return ptdf_matrix
# =========================================================================
# DISTANCE CALCULATION METHODS
# =========================================================================
def _compute_ptdf_distances(self, ptdf_matrix: np.ndarray) -> np.ndarray:
"""
Compute electrical distances using Euclidean distance between PTDF columns.
Electrical distance: d_ij = ||PTDF[:,i] - PTDF[:,j]||_2
Uses the identity ||a-b||² = ||a||² + ||b||² - 2⟨a,b⟩ to compute all
pairwise distances via a single matrix multiplication (Gram matrix),
which is highly optimized by BLAS libraries.
Parameters
----------
ptdf_matrix : np.ndarray
PTDF matrix (n_edges × n_active).
Returns
-------
np.ndarray
Distance matrix for active nodes (n_active × n_active).
Raises
------
PartitioningError
If distance calculation produces invalid values.
"""
# Use float32 for faster computation
X = ptdf_matrix.T.astype(np.float32, copy=False)
# Compute squared norms for each node's PTDF profile
norms_sq = np.einsum("ij,ij->i", X, X)
# Compute Gram matrix (all pairwise dot products) - BLAS optimized
gram = X @ X.T
# ||a - b||² = ||a||² + ||b||² - 2⟨a,b⟩
dist_sq = norms_sq[:, np.newaxis] + norms_sq
dist_sq -= 2.0 * gram # In-place to save memory
# Handle numerical errors (small negative values from floating point)
np.maximum(dist_sq, 0.0, out=dist_sq)
# Take square root to get actual distances
distance_matrix = np.sqrt(dist_sq, out=dist_sq) # In-place
# Ensure diagonal is exactly zero
np.fill_diagonal(distance_matrix, 0.0)
if np.any(np.isnan(distance_matrix)):
raise PartitioningError(
"Distance matrix contains NaN values after PTDF distance calculation.",
strategy=self._get_strategy_name(),
)
return distance_matrix.astype(np.float64)
def _integrate_slack_bus_distances(
self,
distance_matrix_active: np.ndarray,
ptdf_matrix: np.ndarray,
island_nodes: list[Any],
slack_bus: Any,
active_nodes: list[Any],
) -> np.ndarray:
"""
Integrate slack bus into the island's distance matrix.
The slack bus has an implicit PTDF column of zeros (reference bus).
Distance from slack to node i = ||PTDF[:,i] - 0||_2 = ||PTDF[:,i]||_2
Parameters
----------
distance_matrix_active : np.ndarray
Distance matrix for active nodes (n_active × n_active).
ptdf_matrix : np.ndarray
PTDF matrix (n_edges × n_active).
island_nodes : list[Any]
Complete list of nodes in this island.
slack_bus : Any
Slack bus node for this island.
active_nodes : list[Any]
List of active nodes (excluding slack bus).
Returns
-------
np.ndarray
Full island distance matrix including slack bus (n_island × n_island).
"""
n_island = len(island_nodes)
# Create full island matrix
distance_matrix_island = np.zeros((n_island, n_island))
# Get slack index in island node list
slack_idx = island_nodes.index(slack_bus)
# Map active indices to island indices
active_to_island = np.array([island_nodes.index(n) for n in active_nodes])
# Copy active distances to island matrix using fancy indexing
distance_matrix_island[np.ix_(active_to_island, active_to_island)] = distance_matrix_active
# Calculate slack bus distances (vectorized)
# Distance from slack to node i = ||PTDF[:,i]||_2 (L2 norm of each column)
ptdf_column_norms = np.linalg.norm(ptdf_matrix, axis=0)
# Assign slack distances using vectorized indexing
distance_matrix_island[slack_idx, active_to_island] = ptdf_column_norms
distance_matrix_island[active_to_island, slack_idx] = ptdf_column_norms
# Validate island matrix
if np.any(np.isnan(distance_matrix_island)):
raise PartitioningError(
"Distance matrix contains NaN values after slack bus integration.",
strategy=self._get_strategy_name(),
)
return distance_matrix_island