import itertools
from collections import defaultdict
from typing import Any
import networkx as nx
import numpy as np
from npap.exceptions import AggregationError
from npap.interfaces import EdgePropertyStrategy, NodePropertyStrategy, TopologyStrategy
from npap.logging import LogCategory, log_debug, log_warning
# ============================================================================
# PRECOMPUTATION UTILITIES - Build mappings for fast lookups
# ============================================================================
[docs]
def build_node_to_cluster_map(partition_map: dict[int, list[Any]]) -> dict[Any, int]:
"""
Build a reverse mapping from node ID to cluster ID.
Args:
partition_map: Dict mapping cluster_id -> list of node IDs
Returns
-------
Dict mapping node_id -> cluster_id for O(1) lookups
"""
node_to_cluster = {}
for cluster_id, nodes in partition_map.items():
for node in nodes:
node_to_cluster[node] = cluster_id
return node_to_cluster
[docs]
def build_cluster_edge_map(
graph: nx.DiGraph, node_to_cluster: dict[Any, int]
) -> dict[tuple[int, int], list[dict[str, Any]]]:
"""
Build a mapping from cluster pairs to their original edge data.
Single pass over all edges - O(E) complexity.
Args:
graph: Original NetworkX graph
node_to_cluster: Mapping from node_id -> cluster_id
Returns
-------
Dict mapping (source_cluster, target_cluster) -> list of edge attribute dicts
"""
cluster_edges: dict[tuple[int, int], list[dict[str, Any]]] = defaultdict(list)
for u, v, data in graph.edges(data=True):
cluster_u = node_to_cluster.get(u)
cluster_v = node_to_cluster.get(v)
# Skip edges where nodes aren't in the partition (shouldn't happen normally)
if cluster_u is None or cluster_v is None:
continue
# Skip self-loops at cluster level (internal edges)
if cluster_u != cluster_v:
cluster_edges[(cluster_u, cluster_v)].append(data)
return dict(cluster_edges)
[docs]
def build_typed_cluster_edge_map(
graph: nx.DiGraph,
node_to_cluster: dict[Any, int],
type_attribute: str = "type",
) -> dict[str, dict[tuple[int, int], list[dict[str, Any]]]]:
"""
Build cluster edge maps grouped by edge type.
Like ``build_cluster_edge_map`` but returns a nested dict keyed first by the
value of the *type_attribute* on each edge, then by ``(source_cluster,
target_cluster)`` pair. Edges without the type attribute are collected under
the key ``"_untyped"``.
Single pass over all edges – O(E) complexity.
Parameters
----------
graph : nx.DiGraph
Original NetworkX graph.
node_to_cluster : dict[Any, int]
Mapping from node_id to cluster_id.
type_attribute : str
Edge attribute that stores the type label (default ``"type"``).
Returns
-------
dict[str, dict[tuple[int, int], list[dict[str, Any]]]]
Nested mapping: edge_type -> (src_cluster, tgt_cluster) -> [edge_data].
"""
typed_edges: dict[str, dict[tuple[int, int], list[dict[str, Any]]]] = {}
for u, v, data in graph.edges(data=True):
cluster_u = node_to_cluster.get(u)
cluster_v = node_to_cluster.get(v)
if cluster_u is None or cluster_v is None or cluster_u == cluster_v:
continue
edge_type = data.get(type_attribute, "_untyped")
if edge_type not in typed_edges:
typed_edges[edge_type] = defaultdict(list)
typed_edges[edge_type][(cluster_u, cluster_v)].append(data)
# Convert inner defaultdicts to plain dicts
return {t: dict(m) for t, m in typed_edges.items()}
[docs]
def build_cluster_connectivity_set(
graph: nx.DiGraph, node_to_cluster: dict[Any, int]
) -> set[tuple[int, int]]:
"""
Build a set of connected cluster pairs from original graph edges.
Single pass over all edges - O(E) complexity.
Args:
graph: Original NetworkX graph
node_to_cluster: Mapping from node_id -> cluster_id
Returns
-------
Set of (source_cluster, target_cluster) tuples where edges exist
"""
connected_clusters: set[tuple[int, int]] = set()
for u, v in graph.edges():
cluster_u = node_to_cluster.get(u)
cluster_v = node_to_cluster.get(v)
if cluster_u is not None and cluster_v is not None and cluster_u != cluster_v:
connected_clusters.add((cluster_u, cluster_v))
return connected_clusters
# ============================================================================
# TOPOLOGY STRATEGIES - Define graph structure
# ============================================================================
[docs]
class SimpleTopologyStrategy(TopologyStrategy):
"""
Simple topology: one node per cluster, edges only where original connections exist.
This is the most basic topology strategy:
- Creates one node per cluster
- Creates a directed edge between two clusters only if there was at least one
directed edge between nodes in those clusters in the original graph
- Preserves edge direction from original graph
- Does NOT create new edges
"""
[docs]
def create_topology(self, graph: nx.DiGraph, partition_map: dict[int, list[Any]]) -> nx.DiGraph:
"""Create aggregated topology with basic node and edge mapping."""
try:
aggregated = nx.DiGraph()
# Step 1: Create nodes (one per cluster)
aggregated.add_nodes_from(partition_map.keys())
# Step 2: Build node-to-cluster mapping
node_to_cluster = build_node_to_cluster_map(partition_map)
# Step 3: Find connected cluster pairs in single pass
connected_clusters = build_cluster_connectivity_set(graph, node_to_cluster)
# Step 4: Add edges for connected clusters
aggregated.add_edges_from(connected_clusters)
log_debug(
f"SimpleTopology: {aggregated.number_of_nodes()} nodes, "
f"{aggregated.number_of_edges()} edges",
LogCategory.AGGREGATION,
)
return aggregated
except Exception as e:
raise AggregationError(
f"Failed to create simple topology: {e}", strategy="simple_topology"
) from e
@property
def can_create_new_edges(self) -> bool:
"""Simple topology does not create new edges."""
return False
[docs]
class ElectricalTopologyStrategy(TopologyStrategy):
"""
Electrical topology: may create fully connected or partially connected topology
for subsequent physical aggregation (e.g., Kron reduction).
This topology strategy is designed for electrical networks where:
- Physical coupling may exist even without direct edges
- Kron reduction or other physical methods will determine final connectivity
- May start with fully connected graph and let physical strategy prune
"""
[docs]
def __init__(self, initial_connectivity: str = "existing"):
"""
Initialize electrical topology strategy.
Args:
initial_connectivity: How to initialize edges
- "existing": Only edges where original connections exist (like simple)
- "full": Fully connected (all cluster pairs connected)
"""
self.initial_connectivity = initial_connectivity
[docs]
def create_topology(self, graph: nx.DiGraph, partition_map: dict[int, list[Any]]) -> nx.DiGraph:
"""Create topology suitable for electrical aggregation."""
try:
aggregated = nx.DiGraph()
# Step 1: Create nodes
aggregated.add_nodes_from(partition_map.keys())
# Step 2: Create edges based on connectivity mode
if self.initial_connectivity == "full":
# Add all permutations as edges (excluding self-loops)
aggregated.add_edges_from(itertools.permutations(partition_map.keys(), 2))
log_debug(
f"ElectricalTopology (full): {len(partition_map)} nodes, "
f"{aggregated.number_of_edges()} edges",
LogCategory.AGGREGATION,
)
elif self.initial_connectivity == "existing":
# Build node-to-cluster mapping
node_to_cluster = build_node_to_cluster_map(partition_map)
# Find connected cluster pairs in single pass
connected_clusters = build_cluster_connectivity_set(graph, node_to_cluster)
# Add edges
aggregated.add_edges_from(connected_clusters)
log_debug(
f"ElectricalTopology (existing): {len(partition_map)} nodes, "
f"{aggregated.number_of_edges()} edges",
LogCategory.AGGREGATION,
)
else:
raise AggregationError(
f"Unknown connectivity mode: {self.initial_connectivity}",
strategy="electrical_topology",
)
return aggregated
except Exception as e:
if isinstance(e, AggregationError):
raise
raise AggregationError(
f"Failed to create electrical topology: {e}",
strategy="electrical_topology",
) from e
@property
def can_create_new_edges(self) -> bool:
"""Electrical topology may create new edges depending on connectivity mode."""
return self.initial_connectivity == "full"
# ============================================================================
# NODE PROPERTY STRATEGIES - Statistical aggregation for node properties
# ============================================================================
def _extract_numeric_node_values(
graph: nx.DiGraph, nodes: list[Any], property_name: str
) -> np.ndarray:
"""
Extract numeric values for a property from a list of nodes.
Uses list comprehension for speed, then converts to NumPy array.
Returns
-------
NumPy array of numeric values (may be empty)
"""
values = [
graph.nodes[node][property_name]
for node in nodes
if property_name in graph.nodes[node]
and isinstance(graph.nodes[node][property_name], (int, float))
]
return np.array(values, dtype=np.float64) if values else np.array([], dtype=np.float64)
[docs]
class SumNodeStrategy(NodePropertyStrategy):
"""Sum numerical properties across nodes in a cluster."""
[docs]
def aggregate_property(self, graph: nx.DiGraph, nodes: list[Any], property_name: str) -> Any:
"""Sum property values across nodes using NumPy."""
try:
values = _extract_numeric_node_values(graph, nodes, property_name)
return float(np.sum(values)) if len(values) > 0 else 0.0
except Exception as e:
raise AggregationError(
f"Failed to sum node property '{property_name}': {e}", strategy="sum"
) from e
[docs]
class AverageNodeStrategy(NodePropertyStrategy):
"""Average numerical properties across nodes in a cluster."""
[docs]
def aggregate_property(self, graph: nx.DiGraph, nodes: list[Any], property_name: str) -> Any:
"""Average property values across nodes using NumPy."""
try:
values = _extract_numeric_node_values(graph, nodes, property_name)
if len(values) > 0:
return float(np.mean(values))
# No numeric values found — fall back to first available value
first_value = FirstNodeStrategy().aggregate_property(graph, nodes, property_name)
if first_value is not None:
log_warning(
f"No numeric values found for node property '{property_name}'. "
f"Falling back to first available value.",
LogCategory.AGGREGATION,
warn_user=True,
)
return first_value
except Exception as e:
raise AggregationError(
f"Failed to average node property '{property_name}': {e}",
strategy="average",
) from e
[docs]
class FirstNodeStrategy(NodePropertyStrategy):
"""Take the first available value for non-numerical properties."""
[docs]
def aggregate_property(self, graph: nx.DiGraph, nodes: list[Any], property_name: str) -> Any:
"""Take first available property value."""
try:
for node in nodes:
if property_name in graph.nodes[node]:
return graph.nodes[node][property_name]
return None
except Exception as e:
raise AggregationError(
f"Failed to get first node property '{property_name}': {e}",
strategy="first",
) from e
# ============================================================================
# EDGE PROPERTY STRATEGIES - Statistical aggregation for edge properties
# ============================================================================
def _extract_numeric_edge_values(
original_edges: list[dict[str, Any]], property_name: str
) -> np.ndarray:
"""
Extract numeric values for a property from a list of edge data dicts.
Uses list comprehension for speed, then converts to NumPy array.
Returns
-------
NumPy array of numeric values (may be empty)
"""
values = [
edge_data[property_name]
for edge_data in original_edges
if property_name in edge_data and isinstance(edge_data[property_name], (int, float))
]
return np.array(values, dtype=np.float64) if values else np.array([], dtype=np.float64)
[docs]
class SumEdgeStrategy(EdgePropertyStrategy):
"""Sum numerical properties across edges."""
[docs]
def aggregate_property(self, original_edges: list[dict[str, Any]], property_name: str) -> Any:
"""Sum property values across edges using NumPy."""
try:
values = _extract_numeric_edge_values(original_edges, property_name)
return float(np.sum(values)) if len(values) > 0 else 0.0
except Exception as e:
raise AggregationError(
f"Failed to sum edge property '{property_name}': {e}", strategy="sum"
) from e
[docs]
class AverageEdgeStrategy(EdgePropertyStrategy):
"""Average numerical properties across edges."""
[docs]
def aggregate_property(self, original_edges: list[dict[str, Any]], property_name: str) -> Any:
"""Average property values across edges using NumPy."""
try:
values = _extract_numeric_edge_values(original_edges, property_name)
return float(np.mean(values)) if len(values) > 0 else 0.0
except Exception as e:
raise AggregationError(
f"Failed to average edge property '{property_name}': {e}",
strategy="average",
) from e
[docs]
class FirstEdgeStrategy(EdgePropertyStrategy):
"""Take the first available value for non-numerical properties."""
[docs]
def aggregate_property(self, original_edges: list[dict[str, Any]], property_name: str) -> Any:
"""Take first available property value."""
try:
for edge_data in original_edges:
if property_name in edge_data:
return edge_data[property_name]
return None
except Exception as e:
raise AggregationError(
f"Failed to get first edge property '{property_name}': {e}",
strategy="first",
) from e
[docs]
class EquivalentReactanceStrategy(EdgePropertyStrategy):
"""
Aggregates reactance for a set of parallel edges.
This strategy calculates the equivalent reactance by first converting
each edge's reactance (x) to susceptance (b = 1/x), summing the
susceptances, and then converting the total susceptance back
to an equivalent reactance (x_eq = 1 / b_eq).
This correctly models the physics of parallel lines as:
b_eq = b_1 + b_2 + ...
x_eq = 1 / b_eq
"""
[docs]
def aggregate_property(self, original_edges: list[dict[str, Any]], property_name: str) -> Any:
"""
Calculate the equivalent reactance for the given parallel edges.
Uses NumPy for vectorized susceptance calculation.
Returns
-------
Equivalent reactance value, or float('inf') if non-numeric values present
Raises
------
AggregationError
If no edges have the property at all (entirely missing attribute).
"""
try:
values = _extract_numeric_edge_values(original_edges, property_name)
if len(values) == 0:
# Distinguish: property exists but non-numeric vs. entirely missing
has_property = any(property_name in edge for edge in original_edges)
if has_property:
log_warning(
f"No numeric values found for edge property '{property_name}'. "
f"Returning inf (open circuit equivalent).",
LogCategory.AGGREGATION,
warn_user=False,
)
return float("inf")
raise AggregationError(
f"No edges have the '{property_name}' attribute. Cannot calculate "
f"equivalent reactance.",
strategy="equivalent_reactance",
)
epsilon = 1e-10
# Check for zero reactance (short circuit)
if np.any(np.abs(values) < epsilon):
return 0.0
# Vectorized susceptance calculation: b = 1/x
susceptances = 1.0 / values
total_susceptance = np.sum(susceptances)
if abs(total_susceptance) < epsilon:
return float("inf")
return float(1.0 / total_susceptance)
except Exception as e:
if isinstance(e, AggregationError):
raise
raise AggregationError(
f"Failed to calculate equivalent reactance for '{property_name}': {e}",
strategy="equivalent_reactance",
) from e