Source code for npap.partitioning.va_electrical

from collections import defaultdict
from dataclasses import dataclass
from typing import Any

import networkx as nx
import numpy as np
from scipy.linalg import LinAlgError, solve

from npap.exceptions import PartitioningError, ValidationError
from npap.interfaces import EdgeType, PartitioningStrategy
from npap.logging import LogCategory, log_debug, log_info, log_warning
from npap.utils import (
    create_partition_map,
    run_hierarchical,
    run_kmedoids,
    validate_partition,
    with_runtime_config,
)


[docs] @dataclass class VAElectricalConfig: """ Configuration for voltage-aware electrical distance partitioning. Attributes ---------- zero_reactance_replacement : float Reactance value used when edge reactance is zero. Default is 1e-5. regularization_factor : float Small value added to B matrix diagonal for numerical stability. Set to 0.0 to disable regularization. Default 1e-10 provides mild regularization that prevents singular matrix issues. infinite_distance : float Value used to represent "infinite" distance between nodes in different voltage levels, AC islands, or disconnected components. Using a large finite value instead of np.inf to avoid numerical issues in clustering. voltage_tolerance : float Tolerance for voltage comparison (kV). Nodes with voltages within this tolerance are considered the same voltage level. Default is 1.0 kV. hierarchical_linkage : str Linkage criterion for hierarchical clustering. Options: 'complete', 'average', 'single'. Default is 'complete'. """ zero_reactance_replacement: float = 1e-5 regularization_factor: float = 1e-10 infinite_distance: float = 1e4 voltage_tolerance: float = 1.0 hierarchical_linkage: str = "complete"
[docs] class VAElectricalDistancePartitioning(PartitioningStrategy): """ Voltage-Aware Electrical Distance Partitioning. This strategy partitions nodes based on electrical distance (PTDF) computed over the complete AC network (lines AND transformers), while enforcing voltage level boundaries as hard clustering constraints via post-processing. Notes ----- **Physical Rationale** In power systems, PTDF (Power Transfer Distribution Factor) describes how power injections affect line flows within an AC network. This strategy: 1. Computes PTDF for the complete AC network (lines + transformers) per AC island, capturing full electrical coupling including through transformers. 2. Post-processes distances to enforce voltage level separation: nodes at different voltage levels receive infinite distance, ensuring they never cluster together. 3. Transformer edges and buses at different voltage levels are set to infinite distance, preserving voltage hierarchy in the partitioning. This approach is physically meaningful because: - The full PTDF captures how power flows through the entire AC network - Voltage level boundaries are then enforced as hard clustering constraints - Transformers become inter-cluster edges after aggregation **Constraint Hierarchy** 1. AC Islands: Nodes in different AC islands have infinite distance (computed via separate PTDF matrices). 2. Voltage Levels: Nodes at different voltage levels have infinite distance (enforced via post-processing). **Algorithm** 1. Group nodes by AC island 2. For each AC island: a. Extract full AC subgraph (lines + transformers, exclude DC links) b. Select ONE slack bus for the entire AC island c. Compute PTDF including ALL AC elements d. Calculate electrical distances from PTDF columns 3. Combine into full distance matrix with infinite inter-island distances 4. Post-process: Set infinite distance for node pairs at different voltage levels 5. Run clustering algorithm on the distance matrix **Supported Algorithms** - ``kmedoids``: K-Medoids clustering (works with precomputed distance matrices) - ``hierarchical``: Agglomerative clustering with precomputed distances See Also -------- ElectricalDistancePartitioning : Standard electrical partitioning (AC-island aware) VAGeographicalPartitioning : Voltage-aware geographical partitioning """ SUPPORTED_ALGORITHMS = ["kmedoids", "hierarchical"] SUPPORTED_LINKAGES = ["complete", "average", "single"] # Edge types that participate in AC power flow (have reactance) AC_EDGE_TYPES = {EdgeType.LINE.value, EdgeType.TRAFO.value} # Config parameter names for runtime override detection _CONFIG_PARAMS = { "zero_reactance_replacement", "regularization_factor", "infinite_distance", "voltage_tolerance", "hierarchical_linkage", }
[docs] def __init__( self, algorithm: str = "kmedoids", slack_bus: Any | None = None, voltage_attr: str = "voltage", ac_island_attr: str = "ac_island", config: VAElectricalConfig | None = None, ): """ Initialize voltage-aware electrical distance partitioning strategy. Parameters ---------- algorithm : str, default='kmedoids' Clustering algorithm ('kmedoids', 'hierarchical'). slack_bus : Any, optional Specific node to use as slack bus, or None for auto-selection. voltage_attr : str, default='voltage' Node attribute name containing voltage level. ac_island_attr : str, default='ac_island' Node attribute name containing AC island ID. config : VAElectricalConfig, optional Configuration parameters for distance calculations. Raises ------ ValueError If unsupported algorithm or linkage is specified. """ self.algorithm = algorithm self.slack_bus = slack_bus self.voltage_attr = voltage_attr self.ac_island_attr = ac_island_attr self.config = config or VAElectricalConfig() if algorithm not in self.SUPPORTED_ALGORITHMS: raise ValueError( f"Unsupported algorithm: {algorithm}. " f"Supported: {', '.join(self.SUPPORTED_ALGORITHMS)}" ) # Validate hierarchical linkage if self.config.hierarchical_linkage not in self.SUPPORTED_LINKAGES: raise ValueError( f"Unsupported hierarchical linkage: {self.config.hierarchical_linkage}. " f"Supported: {', '.join(self.SUPPORTED_LINKAGES)}" ) log_debug( f"Initialized VAElectricalDistancePartitioning: algorithm={algorithm}, " f"voltage_attr={voltage_attr}, ac_island_attr={ac_island_attr}", LogCategory.PARTITIONING, )
@property def required_attributes(self) -> dict[str, list[str]]: """Required attributes for voltage-aware electrical partitioning.""" return { "nodes": [self.voltage_attr, self.ac_island_attr], "edges": [], # Reactance validated separately for AC edges } def _get_strategy_name(self) -> str: """Get descriptive strategy name for error messages.""" return f"va_electrical_{self.algorithm}"
[docs] @with_runtime_config(VAElectricalConfig, _CONFIG_PARAMS) def partition(self, graph: nx.DiGraph, **kwargs) -> dict[int, list[Any]]: """ Partition nodes based on voltage-aware electrical distance using PTDF. Computes PTDF for the full AC network (lines + transformers) per AC island, then enforces voltage level separation via post-processing. Parameters ---------- graph : nx.DiGraph NetworkX DiGraph with voltage and ac_island on nodes, reactance on AC edges (lines and transformers). **kwargs : dict Additional parameters including: - ``n_clusters`` (int): Number of clusters (required). - ``config`` (VAElectricalConfig, optional): Override instance config. - ``hierarchical_linkage`` (str, optional): Override linkage for hierarchical clustering. - Individual config parameters to override. Returns ------- dict[int, list[Any]] Dictionary mapping cluster_id -> list of node_ids. Raises ------ PartitioningError If partitioning fails. ValidationError If required attributes are missing. ValueError If unsupported hierarchical_linkage is specified. """ try: effective_config = kwargs.get("_effective_config", self.config) effective_slack = kwargs.get("slack_bus", self.slack_bus) n_clusters = kwargs.get("n_clusters", 0) # Validate hierarchical_linkage if overridden if effective_config.hierarchical_linkage not in self.SUPPORTED_LINKAGES: raise ValueError( f"Unsupported hierarchical linkage: {effective_config.hierarchical_linkage}. " f"Supported: {', '.join(self.SUPPORTED_LINKAGES)}" ) if n_clusters is None or n_clusters <= 0: raise PartitioningError( "Voltage-aware electrical partitioning requires a positive " "'n_clusters' parameter.", strategy=self._get_strategy_name(), ) nodes = list(graph.nodes()) n_nodes = len(nodes) if n_clusters > n_nodes: raise PartitioningError( f"Cannot create {n_clusters} clusters from {n_nodes} nodes.", strategy=self._get_strategy_name(), ) # Validate required attributes self._validate_node_attributes(graph, nodes) # Group nodes by AC island ac_islands = self._group_nodes_by_ac_island(graph, nodes) # Build voltage key array for post-processing voltage_keys = self._build_voltage_key_array(graph, nodes, effective_config) n_unique_voltages = len(np.unique(voltage_keys)) if n_unique_voltages <= 1: raise ValidationError( "Voltage-aware electrical partitioning requires multiple voltage " "levels, but only one was found. Use standard " "ElectricalDistancePartitioning for single-voltage networks.", strategy=self._get_strategy_name(), ) log_info( f"Starting VA electrical partitioning (PTDF): {self.algorithm}, " f"n_clusters={n_clusters}, ac_islands={len(ac_islands)}, " f"voltage_levels={n_unique_voltages}", LogCategory.PARTITIONING, ) # Calculate electrical distance matrix (full PTDF per AC island) distance_matrix = self._calculate_electrical_distance_matrix( graph, nodes, ac_islands, effective_config, effective_slack ) # Post-process: enforce voltage level constraints self._apply_voltage_constraints(distance_matrix, voltage_keys, effective_config) # Perform clustering labels = self._run_clustering(distance_matrix, effective_config, **kwargs) # Create and validate partition partition_map = create_partition_map(nodes, labels) validate_partition(partition_map, n_nodes, self._get_strategy_name()) # Validate cluster consistency self._validate_cluster_consistency(graph, partition_map, effective_config) log_info( f"VA electrical partitioning complete: {len(partition_map)} clusters", LogCategory.PARTITIONING, ) return partition_map except Exception as e: if isinstance(e, (PartitioningError, ValidationError)): raise raise PartitioningError( f"Voltage-aware electrical partitioning failed: {e}", strategy=self._get_strategy_name(), graph_info={ "nodes": len(list(graph.nodes())), "edges": len(graph.edges()), }, ) from e
def _run_clustering( self, distance_matrix: np.ndarray, config: VAElectricalConfig, **kwargs, ) -> np.ndarray: """ Dispatch to appropriate clustering algorithm. Parameters ---------- distance_matrix : np.ndarray Precomputed distance matrix (n_nodes x n_nodes). config : VAElectricalConfig Configuration with hierarchical_linkage. **kwargs : dict Additional parameters including n_clusters. Returns ------- np.ndarray Cluster labels for each node. """ n_clusters = kwargs.get("n_clusters") if self.algorithm == "kmedoids": log_debug( f"Running K-medoids clustering with {n_clusters} clusters", LogCategory.PARTITIONING, ) return run_kmedoids(distance_matrix, n_clusters) elif self.algorithm == "hierarchical": log_debug( f"Running hierarchical clustering with {n_clusters} clusters, " f"linkage={config.hierarchical_linkage}", LogCategory.PARTITIONING, ) return run_hierarchical(distance_matrix, n_clusters, config.hierarchical_linkage) else: raise PartitioningError( f"Unknown algorithm: {self.algorithm}", strategy=self._get_strategy_name(), ) # ========================================================================= # NODE ATTRIBUTE VALIDATION # ========================================================================= def _validate_node_attributes( self, graph: nx.DiGraph, nodes: list[Any], ) -> None: """ Validate that all nodes have required ac_island and voltage attributes. Parameters ---------- graph : nx.DiGraph NetworkX DiGraph with node attributes. nodes : list[Any] List of nodes to validate. Raises ------ ValidationError If any node is missing required attributes. """ missing_dc = [] missing_voltage = [] for node in nodes: node_data = graph.nodes[node] if self.ac_island_attr not in node_data: missing_dc.append(node) if self.voltage_attr not in node_data: missing_voltage.append(node) if missing_dc: sample = missing_dc[:5] raise ValidationError( f"VA electrical partitioning requires '{self.ac_island_attr}' attribute " f"on all nodes. {len(missing_dc)} node(s) missing (first few: {sample}). " f"Use 'va_loader' to automatically detect AC islands.", missing_attributes={"nodes": [self.ac_island_attr]}, strategy=self._get_strategy_name(), ) if missing_voltage: sample = missing_voltage[:5] raise ValidationError( f"VA electrical partitioning requires '{self.voltage_attr}' attribute " f"on all nodes. {len(missing_voltage)} node(s) missing (first few: {sample}). " f"Use 'va_loader' to load voltage-aware data.", missing_attributes={"nodes": [self.voltage_attr]}, strategy=self._get_strategy_name(), ) def _validate_ac_edge_attributes(self, graph: nx.DiGraph) -> None: """ Validate that AC edges (lines and transformers) have reactance attribute. Parameters ---------- graph : nx.DiGraph NetworkX DiGraph to validate. Raises ------ ValidationError If any AC edge is missing the 'x' attribute. """ missing_edges = [] for u, v, data in graph.edges(data=True): edge_type = data.get("type", EdgeType.LINE.value) if edge_type in self.AC_EDGE_TYPES and "x" not in data: missing_edges.append((u, v)) if missing_edges: sample = missing_edges[:5] raise ValidationError( f"AC edges (lines/transformers) require 'x' (reactance) attribute. " f"{len(missing_edges)} edge(s) missing (first few: {sample}).", missing_attributes={"edges": ["x"]}, strategy=self._get_strategy_name(), ) # ========================================================================= # AC ISLAND GROUPING # ========================================================================= def _group_nodes_by_ac_island( self, graph: nx.DiGraph, nodes: list[Any], ) -> dict[Any, list[Any]]: """ Group nodes by AC island attribute. Parameters ---------- graph : nx.DiGraph NetworkX DiGraph with ac_island attribute on nodes. nodes : list[Any] List of nodes to group. Returns ------- dict[Any, list[Any]] Dictionary mapping ac_island_id -> list of nodes. """ islands: dict[Any, list[Any]] = defaultdict(list) for node in nodes: island_id = graph.nodes[node].get(self.ac_island_attr) islands[island_id].append(node) return dict(islands) # ========================================================================= # VOLTAGE KEY COMPUTATION # ========================================================================= def _build_voltage_key_array( self, graph: nx.DiGraph, nodes: list[Any], config: VAElectricalConfig, ) -> np.ndarray: """ Build array of voltage keys for all nodes. Voltage keys are integer indices that group nodes by voltage level (within tolerance). This enables efficient vectorized comparison. Parameters ---------- graph : nx.DiGraph NetworkX DiGraph with voltage attribute on nodes. nodes : list[Any] Ordered list of nodes. config : VAElectricalConfig Configuration with voltage_tolerance. Returns ------- np.ndarray Integer array of shape (n_nodes,) where each element is the voltage group index for that node. """ n_nodes = len(nodes) voltages = np.empty(n_nodes, dtype=np.float64) for i, node in enumerate(nodes): voltage = graph.nodes[node].get(self.voltage_attr) voltages[i] = float(voltage) if isinstance(voltage, (int, float)) else np.nan # Quantize voltages to tolerance-based keys tolerance = max(config.voltage_tolerance, 0.1) voltage_keys_float = np.round(voltages / tolerance) * tolerance # Convert to integer indices for efficient comparison unique_keys, voltage_indices = np.unique(voltage_keys_float, return_inverse=True) log_debug( f"Built voltage key array: {len(unique_keys)} unique voltage levels", LogCategory.PARTITIONING, ) return voltage_indices @staticmethod def _get_voltage_key(voltage: Any, config: VAElectricalConfig) -> Any: """Get voltage key for tolerance-based matching (scalar version).""" if isinstance(voltage, (int, float)): tolerance = max(config.voltage_tolerance, 0.1) return round(voltage / tolerance) * tolerance return voltage # ========================================================================= # ELECTRICAL DISTANCE MATRIX CALCULATION # ========================================================================= def _calculate_electrical_distance_matrix( self, graph: nx.DiGraph, nodes: list[Any], ac_islands: dict[Any, list[Any]], config: VAElectricalConfig, slack_bus: Any | None, ) -> np.ndarray: """ Calculate electrical distance matrix with per-AC-island PTDF. Computes PTDF including ALL AC elements (lines + transformers) for each AC island, then combines into a full distance matrix with infinite inter-island distances. Parameters ---------- graph : nx.DiGraph NetworkX DiGraph with reactance on AC edges. nodes : list[Any] Ordered list of all nodes. ac_islands : dict[Any, list[Any]] Mapping of ac_island_id -> list of nodes. config : VAElectricalConfig Configuration parameters. slack_bus : Any, optional User-specified slack bus. Returns ------- np.ndarray Distance matrix (n_nodes x n_nodes) with infinite distances between different AC islands. """ node_to_idx = {node: idx for idx, node in enumerate(nodes)} n_nodes = len(nodes) # Initialize with infinite distances distance_matrix = np.full((n_nodes, n_nodes), config.infinite_distance) np.fill_diagonal(distance_matrix, 0.0) # Process each AC island for island_id, island_nodes in ac_islands.items(): log_debug( f"Processing AC island {island_id}: {len(island_nodes)} nodes", LogCategory.PARTITIONING, ) # Skip single-node islands if len(island_nodes) == 1: log_debug( f"AC island {island_id} has single node, distance = 0", LogCategory.PARTITIONING, ) continue # Extract full AC subgraph (lines + transformers, exclude DC links) ac_subgraph = self._extract_ac_subgraph(graph, island_nodes) # Validate AC edges have reactance self._validate_ac_subgraph_reactance(ac_subgraph, island_id) # Validate island connectivity self._validate_island_connectivity(ac_subgraph, island_id, island_nodes) # Select ONE slack bus for the entire AC island island_slack = self._select_slack_bus(ac_subgraph, island_nodes, slack_bus, island_id) # Compute PTDF-based distances for this island island_distances = self._compute_island_ptdf_distances( ac_subgraph, island_nodes, island_slack, config ) # Insert into full matrix using vectorized indexing island_indices = np.array([node_to_idx[n] for n in island_nodes]) distance_matrix[np.ix_(island_indices, island_indices)] = island_distances log_info( f"Processed {len(ac_islands)} AC island(s) for PTDF computation", LogCategory.PARTITIONING, ) return distance_matrix def _extract_ac_subgraph( self, graph: nx.DiGraph, island_nodes: list[Any], ) -> nx.DiGraph: """ Extract AC subgraph containing lines AND transformers (exclude DC links). Parameters ---------- graph : nx.DiGraph Full NetworkX DiGraph. island_nodes : list[Any] Nodes in this AC island. Returns ------- nx.DiGraph Subgraph with only AC edges (lines + transformers). """ island_node_set = set(island_nodes) subgraph = nx.DiGraph() # Add nodes with attributes for node in island_nodes: subgraph.add_node(node, **graph.nodes[node]) # Add AC edges (lines and transformers, exclude DC links) for u, v, data in graph.edges(data=True): if u in island_node_set and v in island_node_set: edge_type = data.get("type", EdgeType.LINE.value) if edge_type in self.AC_EDGE_TYPES: subgraph.add_edge(u, v, **data) return subgraph def _validate_ac_subgraph_reactance( self, ac_subgraph: nx.DiGraph, island_id: Any, ) -> None: """ Validate that all edges in AC subgraph have reactance attribute. Parameters ---------- ac_subgraph : nx.DiGraph AC-only subgraph. island_id : Any AC island identifier. Raises ------ ValidationError If any edge is missing the 'x' attribute. """ missing_edges = [] for u, v, data in ac_subgraph.edges(data=True): if "x" not in data: missing_edges.append((u, v)) if missing_edges: sample = missing_edges[:5] raise ValidationError( f"AC edges in AC island {island_id} require 'x' (reactance) " f"attribute. {len(missing_edges)} edge(s) missing (first few: {sample}).", missing_attributes={"edges": ["x"]}, strategy=self._get_strategy_name(), ) def _validate_island_connectivity( self, ac_subgraph: nx.DiGraph, island_id: Any, island_nodes: list[Any], ) -> None: """ Validate that a AC island's AC subgraph is connected. Parameters ---------- ac_subgraph : nx.DiGraph AC-only subgraph for the island. island_id : Any AC island identifier. island_nodes : list[Any] Nodes in this island. Raises ------ PartitioningError If AC subgraph is not weakly connected. """ if ac_subgraph.number_of_edges() == 0: raise PartitioningError( f"AC island {island_id} has no AC edges (lines/transformers). " f"Cannot compute electrical distances without AC connectivity.", strategy=self._get_strategy_name(), graph_info={"island_id": island_id, "n_nodes": len(island_nodes)}, ) if not nx.is_weakly_connected(ac_subgraph): n_components = nx.number_weakly_connected_components(ac_subgraph) raise PartitioningError( f"AC island {island_id} is not AC-connected. Found {n_components} " f"disconnected AC components within the island. This may indicate " f"missing line/transformer data or incorrect AC island assignment.", strategy=self._get_strategy_name(), graph_info={ "island_id": island_id, "n_nodes": len(island_nodes), "n_ac_edges": ac_subgraph.number_of_edges(), "n_components": n_components, }, ) # ========================================================================= # VOLTAGE CONSTRAINT POST-PROCESSING # ========================================================================= @staticmethod def _apply_voltage_constraints( distance_matrix: np.ndarray, voltage_keys: np.ndarray, config: VAElectricalConfig, ) -> None: """ Set infinite distance between nodes at different voltage levels. This is a post-processing step that enforces voltage level boundaries after PTDF distances have been computed. Uses broadcasting for O(n²) memory but O(1) NumPy operations, which is highly efficient for large graphs due to SIMD vectorization. Parameters ---------- distance_matrix : np.ndarray Distance matrix to modify in-place (n_nodes x n_nodes). voltage_keys : np.ndarray Integer array of voltage group indices for each node. config : VAElectricalConfig Configuration with infinite_distance value. """ # Create boolean mask where voltage levels differ # Broadcasting: voltage_keys[:, None] has shape (n, 1) # voltage_keys[None, :] has shape (1, n) # Result has shape (n, n) different_voltage_mask = voltage_keys[:, np.newaxis] != voltage_keys[np.newaxis, :] # Apply infinite distance where voltage levels differ distance_matrix[different_voltage_mask] = config.infinite_distance n_modified = np.sum(different_voltage_mask) log_debug( f"Applied voltage constraints: {n_modified} pairs set to infinite distance", LogCategory.PARTITIONING, ) def _validate_cluster_consistency( self, graph: nx.DiGraph, partition_map: dict[int, list[Any]], config: VAElectricalConfig, ) -> None: """ Validate that clusters don't mix incompatible AC islands or voltage levels. Parameters ---------- graph : nx.DiGraph Original NetworkX graph. partition_map : dict[int, list[Any]] Resulting partition mapping. config : VAElectricalConfig Configuration with voltage_tolerance. """ for cluster_id, cluster_nodes in partition_map.items(): ac_islands_in_cluster = set() voltages_in_cluster = set() for node in cluster_nodes: node_data = graph.nodes[node] ac_island = node_data.get(self.ac_island_attr) if ac_island is not None: ac_islands_in_cluster.add(ac_island) voltage = node_data.get(self.voltage_attr) if voltage is not None: voltages_in_cluster.add(self._get_voltage_key(voltage, config)) if len(ac_islands_in_cluster) > 1: log_warning( f"Cluster {cluster_id} contains nodes from multiple AC islands: " f"{ac_islands_in_cluster}. This should not happen with infinite distances.", LogCategory.PARTITIONING, warn_user=False, ) if len(voltages_in_cluster) > 1: log_warning( f"Cluster {cluster_id} contains multiple voltage levels: " f"{voltages_in_cluster}. This should not happen with infinite distances.", LogCategory.PARTITIONING, warn_user=False, ) # ========================================================================= # SLACK BUS SELECTION # ========================================================================= @staticmethod def _select_slack_bus( ac_subgraph: nx.DiGraph, island_nodes: list[Any], user_slack: Any | None, island_id: Any, ) -> Any: """ Select slack bus for a AC island. Parameters ---------- ac_subgraph : nx.DiGraph AC subgraph for this AC island. island_nodes : list[Any] Nodes in this AC island. user_slack : Any, optional User-specified slack bus. island_id : Any AC island identifier for logging. Returns ------- Any Selected slack bus node. """ island_node_set = set(island_nodes) if user_slack is not None and user_slack in island_node_set: log_debug( f"Using user-specified slack bus {user_slack} for AC island {island_id}", LogCategory.PARTITIONING, ) return user_slack # Auto-select: highest total degree in AC subgraph degrees = {n: ac_subgraph.in_degree(n) + ac_subgraph.out_degree(n) for n in island_nodes} selected = max(island_nodes, key=lambda n: degrees[n]) log_debug( f"Auto-selected slack bus {selected} for AC island {island_id} " f"(degree={degrees[selected]})", LogCategory.PARTITIONING, ) return selected # ========================================================================= # PTDF COMPUTATION # ========================================================================= def _compute_island_ptdf_distances( self, ac_subgraph: nx.DiGraph, island_nodes: list[Any], slack_bus: Any, config: VAElectricalConfig, ) -> np.ndarray: """ Compute PTDF-based electrical distances for a AC island. Parameters ---------- ac_subgraph : nx.DiGraph AC subgraph (lines + transformers) for this AC island. island_nodes : list[Any] Ordered list of nodes in this island. slack_bus : Any Slack bus node for this island. config : VAElectricalConfig Configuration parameters. Returns ------- np.ndarray Distance matrix for this island (n x n). """ # Build PTDF matrix ptdf_matrix, active_nodes = self._build_ptdf_matrix( ac_subgraph, island_nodes, slack_bus, config ) log_debug( f"Built PTDF matrix: shape {ptdf_matrix.shape}", LogCategory.PARTITIONING, ) # Compute distances from PTDF columns distance_matrix_active = self._compute_distances_from_ptdf(ptdf_matrix) # Integrate slack bus distances return self._integrate_slack_bus( distance_matrix_active, ptdf_matrix, island_nodes, slack_bus, active_nodes ) def _build_ptdf_matrix( self, ac_subgraph: nx.DiGraph, nodes: list[Any], slack_bus: Any, config: VAElectricalConfig, ) -> tuple[np.ndarray, list[Any]]: """ Build the Power Transfer Distribution Factor (PTDF) matrix. PTDF = diag{b} * K_sba * (K_sba^T * diag{b} * K_sba)^(-1) Parameters ---------- ac_subgraph : nx.DiGraph AC subgraph (lines + transformers). nodes : list[Any] Ordered list of nodes in island. slack_bus : Any Slack bus node. config : VAElectricalConfig Configuration parameters. Returns ------- tuple[np.ndarray, list[Any]] PTDF matrix (n_edges x n_active) and list of active nodes. """ # Extract edges and susceptances from AC subgraph edges, susceptances = self._extract_susceptances(ac_subgraph, config) if len(edges) == 0: raise PartitioningError( "No valid AC edges found for PTDF matrix construction.", strategy=self._get_strategy_name(), ) # Build slack-bus-adjusted incidence matrix active_nodes = [n for n in nodes if n != slack_bus] n_active = len(active_nodes) n_edges = len(edges) node_to_idx = {node: idx for idx, node in enumerate(active_nodes)} # Pre-compute indices for assignment u_indices = np.array([node_to_idx.get(u, -1) for u, v in edges], dtype=np.int32) v_indices = np.array([node_to_idx.get(v, -1) for u, v in edges], dtype=np.int32) edge_indices = np.arange(n_edges, dtype=np.int32) # Build incidence matrix K_sba K_sba = np.zeros((n_edges, n_active), dtype=np.float64) # Assignment for source nodes (u -> edge leaves u) valid_u = u_indices >= 0 K_sba[edge_indices[valid_u], u_indices[valid_u]] = -1.0 # Assignment for target nodes (v -> edge enters v) valid_v = v_indices >= 0 K_sba[edge_indices[valid_v], v_indices[valid_v]] = 1.0 # Build susceptance matrix B = K_sba^T @ diag(b) @ K_sba # Efficient: (K.T * b) @ K avoids explicit diagonal matrix K_scaled = K_sba.T * susceptances B_matrix = K_scaled @ K_sba B_matrix = (B_matrix + B_matrix.T) / 2.0 # Ensure symmetry # Apply Tikhonov regularization for numerical stability if config.regularization_factor > 0: B_matrix += config.regularization_factor * np.eye(n_active) # Compute PTDF: solve B @ X = A^T where A = diag(b) @ K_sba A = susceptances[:, np.newaxis] * K_sba try: # Using assume_a='sym' enables faster symmetric solver X = solve(B_matrix, A.T, assume_a="sym") ptdf_matrix = X.T except LinAlgError: log_warning( "B matrix is singular, using least-squares solution.", LogCategory.PARTITIONING, ) X, _, _, _ = np.linalg.lstsq(B_matrix, A.T, rcond=None) ptdf_matrix = X.T return ptdf_matrix, active_nodes def _extract_susceptances( self, ac_subgraph: nx.DiGraph, config: VAElectricalConfig, ) -> tuple[list[tuple[Any, Any]], np.ndarray]: """ Extract edges and susceptances from AC subgraph. Parameters ---------- ac_subgraph : nx.DiGraph AC subgraph (lines + transformers). config : VAElectricalConfig Configuration parameters. Returns ------- tuple[list[tuple[Any, Any]], np.ndarray] List of edges and array of susceptances (b = 1/x). """ edges = [] susceptances = [] zero_count = 0 for u, v, data in ac_subgraph.edges(data=True): reactance = data.get("x") if reactance is None: raise PartitioningError( f"AC edge ({u}, {v}) missing reactance 'x'", strategy=self._get_strategy_name(), ) if not isinstance(reactance, (int, float)): raise PartitioningError( f"Edge ({u}, {v}) reactance must be numeric, got {type(reactance)}", strategy=self._get_strategy_name(), ) if reactance == 0: zero_count += 1 reactance = config.zero_reactance_replacement edges.append((u, v)) susceptances.append(1.0 / reactance) if zero_count > 0: log_warning( f"{zero_count} edge(s) with zero reactance replaced with " f"{config.zero_reactance_replacement}", LogCategory.PARTITIONING, ) return edges, np.array(susceptances, dtype=np.float64) def _compute_distances_from_ptdf(self, ptdf_matrix: np.ndarray) -> np.ndarray: """ Compute electrical distances from PTDF column differences. d_ij = ||PTDF[:,i] - PTDF[:,j]||_2 Uses identity: ||a-b||^2 = ||a||^2 + ||b||^2 - 2<a,b> This enables O(n²) computation via optimized BLAS matrix multiply. Parameters ---------- ptdf_matrix : np.ndarray PTDF matrix (n_edges x n_active). Returns ------- np.ndarray Distance matrix (n_active x n_active). """ # Use float32 for faster computation on large matrices X = ptdf_matrix.T.astype(np.float32, copy=False) # Squared norms: ||x_i||^2 for each row norms_sq = np.einsum("ij,ij->i", X, X) # Gram matrix: <x_i, x_j> for all pairs (BLAS-optimized) gram = X @ X.T # ||a - b||^2 = ||a||^2 + ||b||^2 - 2<a,b> dist_sq = norms_sq[:, np.newaxis] + norms_sq - 2.0 * gram # Handle numerical errors (small negative values) np.maximum(dist_sq, 0.0, out=dist_sq) # Square root in-place distances = np.sqrt(dist_sq, out=dist_sq) np.fill_diagonal(distances, 0.0) if np.any(np.isnan(distances)): raise PartitioningError( "NaN values in distance matrix after PTDF calculation.", strategy=self._get_strategy_name(), ) return distances.astype(np.float64) @staticmethod def _integrate_slack_bus( dist_active: np.ndarray, ptdf_matrix: np.ndarray, island_nodes: list[Any], slack_bus: Any, active_nodes: list[Any], ) -> np.ndarray: """ Integrate slack bus into distance matrix. Slack bus has implicit PTDF column of zeros (reference bus). Distance from slack to node i = ||PTDF[:,i]||_2 Parameters ---------- dist_active : np.ndarray Distance matrix for active nodes (n_active x n_active). ptdf_matrix : np.ndarray PTDF matrix (n_edges x n_active). island_nodes : list[Any] All nodes in island. slack_bus : Any Slack bus node. active_nodes : list[Any] Active nodes (excluding slack). Returns ------- np.ndarray Full distance matrix including slack bus (n_island x n_island). """ n_island = len(island_nodes) full_dist = np.zeros((n_island, n_island), dtype=np.float64) slack_idx = island_nodes.index(slack_bus) active_indices = np.array([island_nodes.index(node) for node in active_nodes]) # Copy active distances using fancy indexing full_dist[np.ix_(active_indices, active_indices)] = dist_active # Slack bus distances: ||PTDF[:,i]||_2 (L2 norm of each column) slack_dists = np.linalg.norm(ptdf_matrix, axis=0) full_dist[slack_idx, active_indices] = slack_dists full_dist[active_indices, slack_idx] = slack_dists return full_dist