from typing import Any
import networkx as nx
from npap.interfaces import (
AggregationMode,
AggregationProfile,
DataLoadingStrategy,
EdgePropertyStrategy,
NodePropertyStrategy,
PartitioningStrategy,
PartitionResult,
PhysicalAggregationStrategy,
TopologyStrategy,
)
from npap.logging import LogCategory, log_debug, log_info, log_warning
from npap.partitioning import VAElectricalDistancePartitioning
[docs]
class PartitioningManager:
"""Manages partitioning strategies."""
[docs]
def __init__(self):
self._strategies: dict[str, PartitioningStrategy] = {}
self._register_default_strategies()
[docs]
def register_strategy(self, name: str, strategy: PartitioningStrategy) -> None:
"""Register a new partitioning strategy."""
self._strategies[name] = strategy
log_debug(f"Registered partitioning strategy: {name}", LogCategory.MANAGER)
[docs]
def partition(self, graph: nx.DiGraph, method: str, **kwargs) -> dict[int, list[Any]]:
"""Execute partitioning using specified strategy."""
if method not in self._strategies:
available = ", ".join(self._strategies.keys())
raise ValueError(f"Unknown partitioning strategy: {method}. Available: {available}")
log_info(f"Partitioning graph with strategy: {method}", LogCategory.PARTITIONING)
return self._strategies[method].partition(graph, **kwargs)
def _register_default_strategies(self) -> None:
"""Register built-in partitioning strategies."""
from npap.partitioning.electrical import ElectricalDistancePartitioning
from npap.partitioning.geographical import GeographicalPartitioning
from npap.partitioning.va_geographical import (
VAGeographicalConfig,
VAGeographicalPartitioning,
)
# Geographical distance partitioning strategies
self._strategies["geographical_kmeans"] = GeographicalPartitioning(
algorithm="kmeans", distance_metric="euclidean"
)
self._strategies["geographical_kmedoids_euclidean"] = GeographicalPartitioning(
algorithm="kmedoids", distance_metric="euclidean"
)
self._strategies["geographical_kmedoids_haversine"] = GeographicalPartitioning(
algorithm="kmedoids", distance_metric="haversine"
)
self._strategies["geographical_dbscan_euclidean"] = GeographicalPartitioning(
algorithm="dbscan", distance_metric="euclidean"
)
self._strategies["geographical_dbscan_haversine"] = GeographicalPartitioning(
algorithm="dbscan", distance_metric="haversine"
)
self._strategies["geographical_hierarchical"] = GeographicalPartitioning(
algorithm="hierarchical", distance_metric="euclidean"
)
self._strategies["geographical_hdbscan_euclidean"] = GeographicalPartitioning(
algorithm="hdbscan", distance_metric="euclidean"
)
self._strategies["geographical_hdbscan_haversine"] = GeographicalPartitioning(
algorithm="hdbscan", distance_metric="haversine"
)
# Electrical distance partitioning strategies
self._strategies["electrical_kmeans"] = ElectricalDistancePartitioning(algorithm="kmeans")
self._strategies["electrical_kmedoids"] = ElectricalDistancePartitioning(
algorithm="kmedoids"
)
# Voltage-Aware Geographical partitioning strategies - Standard mode
self._strategies["va_geographical_kmedoids_euclidean"] = VAGeographicalPartitioning(
algorithm="kmedoids", distance_metric="euclidean"
)
self._strategies["va_geographical_kmedoids_haversine"] = VAGeographicalPartitioning(
algorithm="kmedoids", distance_metric="haversine"
)
self._strategies["va_geographical_hierarchical"] = VAGeographicalPartitioning(
algorithm="hierarchical", distance_metric="haversine"
)
# Voltage-Aware Geographical partitioning strategies - Proportional mode
self._strategies["va_geographical_proportional_kmedoids_euclidean"] = (
VAGeographicalPartitioning(
algorithm="kmedoids",
distance_metric="euclidean",
config=VAGeographicalConfig(proportional_clustering=True),
)
)
self._strategies["va_geographical_proportional_kmedoids_haversine"] = (
VAGeographicalPartitioning(
algorithm="kmedoids",
distance_metric="haversine",
config=VAGeographicalConfig(proportional_clustering=True),
)
)
self._strategies["va_geographical_proportional_hierarchical"] = VAGeographicalPartitioning(
algorithm="hierarchical",
distance_metric="haversine",
config=VAGeographicalConfig(proportional_clustering=True),
)
# Voltage-Aware Electrical partitioning strategies
self._strategies["va_electrical_kmedoids"] = VAElectricalDistancePartitioning(
algorithm="kmedoids"
)
self._strategies["va_electrical_hierarchical"] = VAElectricalDistancePartitioning(
algorithm="hierarchical"
)
log_debug("Registered default partitioning strategies", LogCategory.MANAGER)
[docs]
class AggregationManager:
"""
Manages aggregation strategies and orchestrates the aggregation process.
Aggregation is a 3-step process:
1. Topology creation (graph structure)
2. Physical aggregation (electrical laws)
3. Statistical property aggregation (independent properties)
"""
[docs]
def __init__(self):
# Topology strategies (how graph structure is reduced)
self._topology_strategies: dict[str, TopologyStrategy] = {}
# Physical aggregation strategies (electrical laws)
self._physical_strategies: dict[str, PhysicalAggregationStrategy] = {}
# Statistical property aggregation strategies
self._node_strategies: dict[str, NodePropertyStrategy] = {}
self._edge_strategies: dict[str, EdgePropertyStrategy] = {}
self._register_default_strategies()
[docs]
def register_topology_strategy(self, name: str, strategy: TopologyStrategy) -> None:
"""Register a topology strategy."""
self._topology_strategies[name] = strategy
log_debug(f"Registered topology strategy: {name}", LogCategory.MANAGER)
[docs]
def register_physical_strategy(self, name: str, strategy: PhysicalAggregationStrategy) -> None:
"""Register a physical aggregation strategy."""
self._physical_strategies[name] = strategy
log_debug(f"Registered physical strategy: {name}", LogCategory.MANAGER)
[docs]
def register_node_strategy(self, name: str, strategy: NodePropertyStrategy) -> None:
"""Register a node property aggregation strategy."""
self._node_strategies[name] = strategy
[docs]
def register_edge_strategy(self, name: str, strategy: EdgePropertyStrategy) -> None:
"""Register an edge property aggregation strategy."""
self._edge_strategies[name] = strategy
[docs]
@staticmethod
def get_mode_profile(mode: AggregationMode, **overrides) -> AggregationProfile:
"""
Get pre-defined aggregation profile for a given mode.
Parameters
----------
mode : AggregationMode
Aggregation mode.
**overrides : dict
Override specific profile parameters.
Returns
-------
AggregationProfile
AggregationProfile configured for the mode.
"""
from npap.aggregation.modes import get_mode_profile
return get_mode_profile(mode, **overrides)
[docs]
def aggregate(
self,
graph: nx.DiGraph,
partition_map: dict[int, list[Any]],
profile: AggregationProfile = None,
) -> nx.DiGraph | nx.MultiDiGraph:
"""
Execute aggregation using the specified profile.
Aggregation is a 3-step process:
1. Create topology (nodes + edge structure)
2. Apply physical aggregation (if specified)
3. Aggregate remaining properties statistically
Parameters
----------
graph : nx.DiGraph
Graph to aggregate.
partition_map : dict[int, list[Any]]
Mapping of cluster_id to list of node_ids.
profile : AggregationProfile, optional
Aggregation profile (uses defaults if not provided).
Returns
-------
nx.DiGraph or nx.MultiDiGraph
Aggregated graph. Returns a ``MultiDiGraph`` when
``profile.edge_type_properties`` is populated.
"""
if profile is None:
profile = AggregationProfile()
log_info(
f"Aggregating graph: {len(partition_map)} clusters, "
f"topology={profile.topology_strategy}",
LogCategory.AGGREGATION,
)
# Validate strategies exist
self._validate_profile(profile)
# Pre-compute mappings for efficient aggregation
from npap.aggregation.basic_strategies import (
build_cluster_edge_map,
build_node_to_cluster_map,
)
node_to_cluster = build_node_to_cluster_map(partition_map)
cluster_edge_map = build_cluster_edge_map(graph, node_to_cluster)
# Step 1: Create topology
topology_strategy = self._topology_strategies[profile.topology_strategy]
aggregated = topology_strategy.create_topology(graph, partition_map)
log_info(
f"Created topology: {aggregated.number_of_nodes()} nodes, "
f"{aggregated.number_of_edges()} edges",
LogCategory.AGGREGATION,
)
# Track which properties are handled by physical aggregation
physical_modified_properties = set()
# Step 2: Apply physical aggregation (if specified)
if profile.physical_strategy:
physical_strategy = self._physical_strategies[profile.physical_strategy]
# Validate topology compatibility
if topology_strategy.__class__.__name__ != physical_strategy.required_topology:
log_warning(
f"Physical strategy '{profile.physical_strategy}' recommends '{physical_strategy.required_topology}' topology, "
f"but '{profile.topology_strategy}' is being used. Results may be incorrect.",
LogCategory.AGGREGATION,
)
# Apply physical aggregation
aggregated = physical_strategy.aggregate(
graph,
partition_map,
aggregated,
profile.physical_properties,
profile.physical_parameters,
)
# Mark properties as modified by physical strategy
physical_modified_properties = set(physical_strategy.modifies_properties)
# Warn user if they tried to override physical properties
self._check_property_conflicts(profile, physical_modified_properties)
# Step 3: Aggregate node properties
self._aggregate_node_properties(
graph, partition_map, aggregated, profile, physical_modified_properties
)
# Step 4: Aggregate edge properties
if profile.edge_type_properties:
aggregated = self._aggregate_typed_edge_properties(
graph,
partition_map,
aggregated,
profile,
physical_modified_properties,
)
else:
self._aggregate_edge_properties(
graph,
partition_map,
aggregated,
profile,
physical_modified_properties,
cluster_edge_map,
)
log_info(
f"Aggregation complete: {aggregated.number_of_nodes()} nodes, "
f"{aggregated.number_of_edges()} edges",
LogCategory.AGGREGATION,
)
return aggregated
[docs]
def aggregate_parallel_edges(
self,
graph: nx.MultiDiGraph,
edge_properties: dict[str, str] = None,
default_strategy: str = "sum",
warn_on_defaults: bool = True,
) -> nx.DiGraph:
"""
Collapse parallel edges in a MultiDiGraph to produce a simple DiGraph.
This method aggregates all parallel edges between the same directed node pairs
using the specified edge property strategies, converting a MultiDiGraph
to a simple DiGraph that can be partitioned.
For directed graphs, edges (A->B) and (B->A) are treated as separate edges
and are aggregated independently.
Parameters
----------
graph : nx.MultiDiGraph
MultiDiGraph with potential parallel edges.
edge_properties : dict[str, str], optional
Dict mapping property names to aggregation strategies.
Example: ``{"reactance": "average", "length": "sum"}``.
default_strategy : str
Strategy to use for properties not specified.
warn_on_defaults : bool
Whether to warn when using default strategy.
Returns
-------
nx.DiGraph
Simple DiGraph with parallel edges aggregated.
Raises
------
ValueError
If graph is not a MultiDiGraph.
AggregationError
If aggregation fails.
"""
from collections import defaultdict
from npap.exceptions import AggregationError
# Validate input
if not isinstance(graph, nx.MultiDiGraph):
raise ValueError(
f"Expected MultiDiGraph, got {type(graph).__name__}. "
"This method is only for collapsing parallel edges in directed multigraphs."
)
log_info(
f"Aggregating parallel edges in MultiDiGraph with {graph.number_of_edges()} edges",
LogCategory.AGGREGATION,
)
edge_properties = edge_properties or {}
# Validate strategies exist
for prop, strategy in edge_properties.items():
if strategy not in self._edge_strategies:
available = ", ".join(self._edge_strategies.keys())
raise ValueError(
f"Unknown edge strategy '{strategy}' for property '{prop}'. "
f"Available: {available}"
)
if default_strategy not in self._edge_strategies:
available = ", ".join(self._edge_strategies.keys())
raise ValueError(
f"Unknown default edge strategy '{default_strategy}'. Available: {available}"
)
try:
# Create simple directed graph with all nodes
simple_graph = nx.DiGraph()
simple_graph.add_nodes_from(graph.nodes(data=True))
# Group edges by (u, v) and collect all properties
edge_groups: dict[tuple, list[dict]] = defaultdict(list)
all_properties: set[str] = set()
for u, v, data in graph.edges(data=True):
edge_groups[(u, v)].append(data)
all_properties.update(data.keys())
# Pre-resolve strategies for each property
property_strategies = self._resolve_property_strategies(
properties=all_properties,
user_specified=edge_properties,
available_strategies=self._edge_strategies,
default_strategy=default_strategy,
warn_on_defaults=warn_on_defaults,
property_type="Edge",
)
# Process each unique edge group
parallel_count = 0
for (u, v), parallel_edges_data in edge_groups.items():
if len(parallel_edges_data) > 1:
parallel_count += 1
# Aggregate all properties using pre-resolved strategies
aggregated_attrs = {
prop: strategy.aggregate_property(parallel_edges_data, prop)
for prop, strategy in property_strategies.items()
}
simple_graph.add_edge(u, v, **aggregated_attrs)
log_info(
f"Parallel edge aggregation complete: {parallel_count} parallel edge groups collapsed",
LogCategory.AGGREGATION,
)
return simple_graph
except Exception as e:
raise AggregationError(
f"Failed to aggregate parallel edges: {e}",
strategy="parallel_edge_aggregation",
) from e
def _validate_profile(self, profile: AggregationProfile) -> None:
"""Validate that all strategies in profile exist."""
if profile.topology_strategy not in self._topology_strategies:
available = ", ".join(self._topology_strategies.keys())
raise ValueError(
f"Unknown topology strategy: {profile.topology_strategy}. Available: {available}"
)
if profile.physical_strategy and profile.physical_strategy not in self._physical_strategies:
available = ", ".join(self._physical_strategies.keys())
raise ValueError(
f"Unknown physical strategy: {profile.physical_strategy}. Available: {available}"
)
# Validate node property strategies
for prop, strategy in profile.node_properties.items():
if strategy not in self._node_strategies:
available = ", ".join(self._node_strategies.keys())
raise ValueError(
f"Unknown node strategy '{strategy}' for property '{prop}'. "
f"Available: {available}"
)
# Validate edge property strategies
for prop, strategy in profile.edge_properties.items():
if strategy not in self._edge_strategies:
available = ", ".join(self._edge_strategies.keys())
raise ValueError(
f"Unknown edge strategy '{strategy}' for property '{prop}'. "
f"Available: {available}"
)
# Validate per-type edge property strategies
for edge_type, type_strategies in profile.edge_type_properties.items():
for prop, strategy in type_strategies.items():
if strategy not in self._edge_strategies:
available = ", ".join(self._edge_strategies.keys())
raise ValueError(
f"Unknown edge strategy '{strategy}' for property '{prop}' "
f"in edge type '{edge_type}'. Available: {available}"
)
@staticmethod
def _check_property_conflicts(profile: AggregationProfile, physical_properties: set) -> None:
"""Check if user tried to override properties handled by physical strategy."""
for prop in profile.node_properties:
if prop in physical_properties:
log_warning(
f"Node property '{prop}' is modified by physical strategy "
f"'{profile.physical_strategy}'. User statistical aggregation for this property will be IGNORED.",
LogCategory.AGGREGATION,
)
for prop in profile.edge_properties:
if prop in physical_properties:
log_warning(
f"Edge property '{prop}' is modified by physical strategy "
f"'{profile.physical_strategy}'. User statistical aggregation for this property will be IGNORED.",
LogCategory.AGGREGATION,
)
@staticmethod
def _resolve_property_strategies(
properties: set[str],
user_specified: dict[str, str],
available_strategies: dict[str, Any],
default_strategy: str,
warn_on_defaults: bool,
property_type: str,
) -> dict[str, Any]:
"""
Resolve strategies for a set of properties.
Parameters
----------
properties : set[str]
Set of property names to resolve.
user_specified : dict[str, str]
User-specified property to strategy name mapping.
available_strategies : dict[str, Any]
Available strategy name to strategy object mapping.
default_strategy : str
Default strategy name to use.
warn_on_defaults : bool
Whether to warn when using default strategy.
property_type : str
"Node" or "Edge" for warning messages.
Returns
-------
dict[str, Any]
Dict mapping property name to strategy object (or None for fallback).
"""
property_strategies: dict[str, Any] = {}
warned_properties: set[str] = set()
for prop in properties:
if prop in user_specified:
strategy_name = user_specified[prop]
property_strategies[prop] = available_strategies[strategy_name]
elif default_strategy in available_strategies:
if warn_on_defaults and prop not in warned_properties:
log_warning(
f"{property_type} property '{prop}' not specified. "
f"Using default strategy '{default_strategy}'",
LogCategory.AGGREGATION,
)
warned_properties.add(prop)
property_strategies[prop] = available_strategies[default_strategy]
else:
property_strategies[prop] = None # Will use fallback
return property_strategies
def _aggregate_node_properties(
self,
graph: nx.DiGraph,
partition_map: dict[int, list[Any]],
aggregated: nx.DiGraph,
profile: AggregationProfile,
skip_properties: set = None,
) -> None:
"""Aggregate node properties statistically."""
skip_properties = skip_properties or set()
# Collect all possible properties in single pass
all_properties: set[str] = set()
for nodes in partition_map.values():
for node in nodes:
all_properties.update(graph.nodes[node].keys())
# Remove properties to skip
properties_to_aggregate = all_properties - skip_properties
# Pre-resolve strategies
property_strategies = self._resolve_property_strategies(
properties=properties_to_aggregate,
user_specified=profile.node_properties,
available_strategies=self._node_strategies,
default_strategy=profile.default_node_strategy,
warn_on_defaults=profile.warn_on_defaults,
property_type="Node",
)
# Aggregate properties for each cluster
for cluster_id, nodes in partition_map.items():
node_attrs = {}
for prop, strategy in property_strategies.items():
if strategy is not None:
node_attrs[prop] = strategy.aggregate_property(graph, nodes, prop)
elif nodes:
# Fallback to first value
node_attrs[prop] = graph.nodes[nodes[0]].get(prop)
# Batch update node attributes
aggregated.nodes[cluster_id].update(node_attrs)
def _aggregate_edge_properties(
self,
graph: nx.DiGraph,
partition_map: dict[int, list[Any]],
aggregated: nx.DiGraph,
profile: AggregationProfile,
skip_properties: set = None,
cluster_edge_map: dict[tuple[int, int], list[dict[str, Any]]] = None,
) -> None:
"""Aggregate edge properties statistically."""
skip_properties = skip_properties or set()
# Build cluster_edge_map if not provided
if cluster_edge_map is None:
from npap.aggregation.basic_strategies import (
build_cluster_edge_map,
build_node_to_cluster_map,
)
node_to_cluster = build_node_to_cluster_map(partition_map)
cluster_edge_map = build_cluster_edge_map(graph, node_to_cluster)
# Collect all possible edge properties in single pass
all_properties: set[str] = set()
for edge_list in cluster_edge_map.values():
for edge_data in edge_list:
all_properties.update(edge_data.keys())
# Remove properties to skip
properties_to_aggregate = all_properties - skip_properties
# Pre-resolve strategies
property_strategies = self._resolve_property_strategies(
properties=properties_to_aggregate,
user_specified=profile.edge_properties,
available_strategies=self._edge_strategies,
default_strategy=profile.default_edge_strategy,
warn_on_defaults=profile.warn_on_defaults,
property_type="Edge",
)
# Aggregate properties for each edge using pre-computed mapping (O(1) lookup)
for edge in aggregated.edges():
cluster1, cluster2 = edge
original_edges = cluster_edge_map.get((cluster1, cluster2), [])
if not original_edges:
continue
# Aggregate properties
edge_attrs = {}
for prop, strategy in property_strategies.items():
if strategy is not None:
edge_attrs[prop] = strategy.aggregate_property(original_edges, prop)
elif original_edges:
# Fallback to first value
edge_attrs[prop] = original_edges[0].get(prop)
# Batch update edge attributes
aggregated.edges[edge].update(edge_attrs)
def _aggregate_typed_edge_properties(
self,
graph: nx.DiGraph,
partition_map: dict[int, list[Any]],
aggregated: nx.DiGraph,
profile: AggregationProfile,
skip_properties: set = None,
) -> nx.MultiDiGraph:
"""Aggregate edge properties per edge type, returning a MultiDiGraph.
When ``profile.edge_type_properties`` is populated, edges are grouped
by their ``"type"`` attribute and aggregated independently using
per-type strategy dicts. The result is a ``MultiDiGraph`` so that
multiple edges (one per type) can connect the same cluster pair.
Parameters
----------
graph : nx.DiGraph
Original graph.
partition_map : dict[int, list[Any]]
Cluster mapping.
aggregated : nx.DiGraph
Topology graph with aggregated node properties.
profile : AggregationProfile
Aggregation profile with ``edge_type_properties``.
skip_properties : set, optional
Properties already handled by physical aggregation.
Returns
-------
nx.MultiDiGraph
Aggregated graph with typed edges.
"""
skip_properties = skip_properties or set()
from npap.aggregation.basic_strategies import (
build_node_to_cluster_map,
build_typed_cluster_edge_map,
)
type_attr = profile.edge_type_attribute
node_to_cluster = build_node_to_cluster_map(partition_map)
typed_map = build_typed_cluster_edge_map(graph, node_to_cluster, type_attribute=type_attr)
# Convert the DiGraph topology to a MultiDiGraph, preserving nodes
multi = nx.MultiDiGraph()
multi.add_nodes_from(aggregated.nodes(data=True))
for edge_type, cluster_edges in typed_map.items():
# Determine which strategy dict to use for this type
user_specified = profile.edge_type_properties.get(edge_type, profile.edge_properties)
# Collect all properties across edges of this type
all_properties: set[str] = set()
for edge_list in cluster_edges.values():
for edge_data in edge_list:
all_properties.update(edge_data.keys())
# Exclude the type attribute itself — it is set explicitly below
properties_to_aggregate = all_properties - skip_properties - {type_attr}
# Resolve strategies
property_strategies = self._resolve_property_strategies(
properties=properties_to_aggregate,
user_specified=user_specified,
available_strategies=self._edge_strategies,
default_strategy=profile.default_edge_strategy,
warn_on_defaults=profile.warn_on_defaults,
property_type=f"Edge[{edge_type}]",
)
for (c1, c2), original_edges in cluster_edges.items():
edge_attrs: dict[str, Any] = {type_attr: edge_type}
for prop, strategy in property_strategies.items():
if strategy is not None:
edge_attrs[prop] = strategy.aggregate_property(original_edges, prop)
elif original_edges:
edge_attrs[prop] = original_edges[0].get(prop)
multi.add_edge(c1, c2, **edge_attrs)
log_info(
f"Typed edge aggregation: {multi.number_of_edges()} edges "
f"across {len(typed_map)} types",
LogCategory.AGGREGATION,
)
return multi
def _register_default_strategies(self) -> None:
"""Register built-in aggregation strategies."""
from npap.aggregation.basic_strategies import (
AverageEdgeStrategy,
AverageNodeStrategy,
ElectricalTopologyStrategy,
EquivalentReactanceStrategy,
FirstEdgeStrategy,
FirstNodeStrategy,
SimpleTopologyStrategy,
SumEdgeStrategy,
SumNodeStrategy,
)
from npap.aggregation.physical_strategies import KronReductionStrategy
# Topology strategies
self._topology_strategies["simple"] = SimpleTopologyStrategy()
self._topology_strategies["electrical"] = ElectricalTopologyStrategy()
# Physical strategies
self._physical_strategies["kron_reduction"] = KronReductionStrategy()
# Node property strategies
self._node_strategies["sum"] = SumNodeStrategy()
self._node_strategies["average"] = AverageNodeStrategy()
self._node_strategies["first"] = FirstNodeStrategy()
# Edge property strategies
self._edge_strategies["sum"] = SumEdgeStrategy()
self._edge_strategies["average"] = AverageEdgeStrategy()
self._edge_strategies["first"] = FirstEdgeStrategy()
self._edge_strategies["equivalent_reactance"] = EquivalentReactanceStrategy()
log_debug("Registered default aggregation strategies", LogCategory.MANAGER)
[docs]
class PartitionAggregatorManager:
"""Main orchestrator - the primary class users interact with."""
[docs]
def __init__(self):
self._current_graph: nx.DiGraph | None = None
self._current_graph_hash: str | None = None
self._current_partition: PartitionResult | None = None
# Managers for strategies
self.input_manager = InputDataManager()
self.partitioning_manager = PartitioningManager()
self.aggregation_manager = AggregationManager()
log_debug("PartitionAggregatorManager initialized", LogCategory.MANAGER)
[docs]
def load_data(self, strategy: str, **kwargs) -> nx.DiGraph | nx.MultiDiGraph:
"""Load data using specified strategy."""
self._current_graph = self.input_manager.load(strategy, **kwargs)
self._current_graph_hash = self._compute_graph_hash(self._current_graph)
self._current_partition = None # Clear any existing partition
return self._current_graph
[docs]
def partition(self, strategy: str, **kwargs) -> PartitionResult:
"""Partition current graph and store result."""
if not self._current_graph:
raise ValueError("No graph loaded. Call load_data() first.")
# Check if graph is MultiDiGraph
if isinstance(self._current_graph, nx.MultiDiGraph):
raise ValueError(
"Cannot partition MultiDiGraph directly. MultiDiGraphs contain parallel edges "
"that must be aggregated first. Please call manager.aggregate_parallel_edges() before partitioning."
)
mapping = self.partitioning_manager.partition(self._current_graph, strategy, **kwargs)
self._current_partition = PartitionResult(
mapping=mapping,
original_graph_hash=self._current_graph_hash,
strategy_name=strategy,
strategy_metadata={},
n_clusters=len(set(mapping.keys())),
)
log_info(
f"Partitioned into {self._current_partition.n_clusters} clusters",
LogCategory.PARTITIONING,
)
return self._current_partition
[docs]
def aggregate(
self,
partition_result: PartitionResult = None,
profile: AggregationProfile = None,
mode: AggregationMode = None,
**overrides,
) -> nx.DiGraph | nx.MultiDiGraph:
"""
Aggregate using partition result and profile.
Parameters
----------
partition_result : PartitionResult, optional
Partition to use (or use stored partition).
profile : AggregationProfile, optional
Aggregation profile (custom configuration).
mode : AggregationMode, optional
Aggregation mode (pre-defined configuration).
**overrides : dict
Override specific profile parameters when using mode.
Returns
-------
nx.DiGraph or nx.MultiDiGraph
Aggregated graph. Returns a ``MultiDiGraph`` when
``profile.edge_type_properties`` is populated.
Notes
-----
If both profile and mode are provided, profile takes precedence.
"""
if not self._current_graph:
raise ValueError("No graph loaded.")
# Use provided partition or stored partition
partition_to_use = partition_result or self._current_partition
if not partition_to_use:
raise ValueError(
"No partition available. Call partition() first or provide partition_result."
)
# Validate partition compatibility
from npap.utils import validate_graph_compatibility
validate_graph_compatibility(partition_to_use, self._current_graph_hash)
# Determine profile to use
if profile is None and mode is not None:
profile = self.aggregation_manager.get_mode_profile(mode, **overrides)
elif profile is None:
profile = AggregationProfile() # Default
# Aggregate and update current graph
self._current_graph = self.aggregation_manager.aggregate(
self._current_graph, partition_to_use.mapping, profile
)
# Update hash and clear partition since graph has changed
self._current_graph_hash = self._compute_graph_hash(self._current_graph)
self._current_partition = None
return self._current_graph
[docs]
def aggregate_parallel_edges(
self,
edge_properties: dict[str, str] = None,
default_strategy: str = "sum",
warn_on_defaults: bool = True,
) -> nx.DiGraph:
"""
Convert current MultiDiGraph to simple DiGraph by aggregating parallel edges.
This method aggregates all parallel edges between the same directed node pairs
into single edges, using the specified aggregation strategies. The graph
must be a MultiDiGraph for this operation to be meaningful.
For directed graphs, edges (A->B) and (B->A) are treated as separate edges.
Parameters
----------
edge_properties : dict[str, str], optional
Dict mapping property names to aggregation strategies.
Example: ``{"reactance": "average", "length": "sum"}``.
default_strategy : str
Strategy to use for properties not specified.
warn_on_defaults : bool
Whether to warn when using default strategy.
Returns
-------
nx.DiGraph
Simple DiGraph with parallel edges aggregated.
Raises
------
ValueError
If no graph is loaded or graph is not a MultiDiGraph.
"""
if not self._current_graph:
raise ValueError("No graph loaded. Call load_data() first.")
if not isinstance(self._current_graph, nx.MultiDiGraph):
raise ValueError(
f"Current graph is not a MultiDiGraph (it's {type(self._current_graph).__name__}). "
"This method is only for aggregating parallel edges in MultiDiGraphs."
)
# Aggregate parallel edges
self._current_graph = self.aggregation_manager.aggregate_parallel_edges(
self._current_graph,
edge_properties=edge_properties,
default_strategy=default_strategy,
warn_on_defaults=warn_on_defaults,
)
# Update hash since graph has changed
self._current_graph_hash = self._compute_graph_hash(self._current_graph)
# Clear partition since graph structure changed
self._current_partition = None
return self._current_graph
[docs]
def full_workflow(
self,
data_strategy: str,
partition_strategy: str,
aggregation_profile: AggregationProfile = None,
aggregation_mode: AggregationMode = None,
**kwargs,
) -> nx.DiGraph:
"""
Execute complete workflow without storing intermediates.
If a MultiDiGraph is loaded, parallel edges will be automatically aggregated
before partitioning. If a voltage-aware partitioning strategy is selected,
voltage levels will be grouped first in 220kV and 380kV voltage levels.
Parameters
----------
data_strategy : str
Data loading strategy name.
partition_strategy : str
Partitioning strategy name.
aggregation_profile : AggregationProfile, optional
Aggregation profile (custom).
aggregation_mode : AggregationMode, optional
Aggregation mode (pre-defined).
**kwargs : dict
Parameters for data loading, parallel edge aggregation, and partitioning.
Returns
-------
nx.DiGraph
Aggregated graph.
Examples
--------
>>> manager = PartitionAggregatorManager()
>>> result = manager.full_workflow(
... data_strategy="csv_files",
... partition_strategy="geographical_kmeans",
... node_file="buses.csv",
... edge_file="lines.csv",
... n_clusters=10
... )
"""
log_info("Starting full workflow", LogCategory.MANAGER)
# Parameters for data loading
data_params = {
k: v
for k, v in kwargs.items()
if k
in [
"node_file",
"edge_file",
"line_file",
"transformer_file",
"graph",
"connection_string",
"table_prefix",
"delimiter",
"decimal",
"node_id_col",
"edge_from_col",
"edge_to_col",
"bidirectional",
]
}
# Parameters for parallel edge aggregation (if MultiDiGraph)
parallel_edge_params = {
k: v
for k, v in kwargs.items()
if k
in [
"edge_properties",
"parallel_edge_default_strategy",
"parallel_edge_warn_on_defaults",
]
}
# Parameters for partitioning
partition_params = {
k: v
for k, v in kwargs.items()
if k not in data_params and k not in parallel_edge_params
}
# Step 1: Load data
self.load_data(data_strategy, **data_params)
# Step 2: If MultiDiGraph, aggregate parallel edges first
if isinstance(self._current_graph, nx.MultiDiGraph):
log_info(
"MultiDiGraph detected, auto-aggregating parallel edges",
LogCategory.MANAGER,
)
self.aggregate_parallel_edges(**parallel_edge_params)
# For voltage-aware strategies, group voltages first
if partition_strategy.startswith("va_"):
self.group_by_voltage_levels([220, 380])
# Step 3: Partition
partition_result = self.partition(partition_strategy, **partition_params)
# Step 4: Aggregate clusters
result = self.aggregate(partition_result, aggregation_profile, aggregation_mode)
log_info("Full workflow complete", LogCategory.MANAGER)
return result
[docs]
def get_current_graph(self) -> nx.DiGraph | None:
"""
Get the current graph.
Returns
-------
nx.DiGraph or None
Current graph, or None if no graph is loaded.
"""
return self._current_graph
[docs]
def get_current_partition(self) -> PartitionResult | None:
"""
Get the current partition result.
Returns
-------
PartitionResult or None
Current partition result, or None if not partitioned.
"""
return self._current_partition
@staticmethod
def _compute_graph_hash(graph: nx.DiGraph) -> str:
"""
Compute hash for graph validation.
Parameters
----------
graph : nx.DiGraph
Graph to compute hash for.
Returns
-------
str
Hash string for the graph.
"""
from npap.utils import compute_graph_hash
return compute_graph_hash(graph)
[docs]
def plot_network(
self,
style: str = "simple",
graph: nx.DiGraph = None,
show: bool = True,
config=None,
**kwargs,
):
"""
Plot the network on an interactive map.
Parameters
----------
style : str
Plot style:
- 'simple': All edges same color (fast, minimal)
- 'voltage_aware': Edges colored by type (line/trafo/dc_link)
- 'clustered': Nodes colored by cluster assignment
graph : nx.DiGraph, optional
Graph to plot (uses current graph if not provided).
show : bool
Whether to display immediately.
config : PlotConfig, optional
PlotConfig instance to override defaults. If provided,
kwargs will further override values from this config.
**kwargs : dict
Additional configuration options (see PlotConfig for full list).
Returns
-------
go.Figure
Plotly Figure object.
Raises
------
ValueError
If no graph is loaded or clustered style without partition.
Examples
--------
>>> manager.plot_network(style="voltage_aware", title="My Network")
>>> manager.plot_network(style="clustered") # After partitioning
"""
from npap.visualization import plot_network
target_graph = graph if graph is not None else self._current_graph
if target_graph is None:
raise ValueError("No graph loaded. Call load_data() first.")
# For clustered style, pass the partition map
partition_map = None
if style == "clustered":
if self._current_partition is None:
raise ValueError(
"Cannot create clustered plot without partitioning. Call partition() first."
)
partition_map = self._current_partition.mapping
log_info(f"Creating network plot with style: {style}", LogCategory.VISUALIZATION)
return plot_network(
graph=target_graph,
style=style,
partition_map=partition_map,
show=show,
config=config,
**kwargs,
)
[docs]
def group_by_voltage_levels(
self,
target_levels: list[float],
voltage_attr: str = "voltage",
store_original: bool = True,
handle_missing: str = "infer",
) -> dict[str, Any]:
"""
Group bus voltage levels to predefined target values.
This method reassigns each node's voltage to the nearest target voltage level,
creating clean voltage "islands" for subsequent voltage-aware partitioning.
Parameters
----------
target_levels : list[float]
List of target voltage levels (kV) to harmonize to.
Example: ``[220, 380]`` for European transmission grid.
voltage_attr : str
Node attribute containing voltage level.
store_original : bool
If True, stores original voltage in 'original_{voltage_attr}'.
handle_missing : str
How to handle nodes without voltage data:
- 'infer': Infer from connected neighbors
- 'nearest': Assign to nearest target level
- 'error': Raise an error
- 'skip': Leave as None
Returns
-------
dict[str, Any]
Summary dict with:
- 'total_nodes': Total number of nodes processed
- 'reassignments': Dict mapping original_voltage to (target_voltage, count)
- 'missing_handled': Number of nodes with missing voltage
- 'voltage_distribution': Dict mapping target_voltage to node_count
Raises
------
ValueError
If no graph loaded, target_levels empty, or handle_missing='error'
with missing voltages.
Examples
--------
>>> manager.group_by_voltage_levels([220, 380])
{'total_nodes': 6000, 'voltage_distribution': {220: 3500, 380: 2500}, ...}
"""
if not self._current_graph:
raise ValueError("No graph loaded. Call load_data() first.")
if not target_levels:
raise ValueError("target_levels cannot be empty.")
target_levels = sorted(target_levels)
graph = self._current_graph
log_info(
f"Grouping voltages to {len(target_levels)} target levels: {target_levels}",
LogCategory.MANAGER,
)
# Statistics tracking
reassignments: dict[Any, tuple[float, int]] = {} # original -> (target, count)
missing_nodes: list[Any] = []
voltage_distribution: dict[float, int] = dict.fromkeys(target_levels, 0)
# First pass: identify missing voltages and reassign known ones
for node in graph.nodes():
node_data = graph.nodes[node]
original_voltage = node_data.get(voltage_attr)
# Try fallback attributes
if original_voltage is None:
original_voltage = node_data.get("voltage", node_data.get("v_nom"))
if original_voltage is None:
missing_nodes.append(node)
continue
# Store original if requested
if store_original:
node_data[f"original_{voltage_attr}"] = original_voltage
# Find nearest target level
if isinstance(original_voltage, (int, float)):
target = min(target_levels, key=lambda t: abs(t - original_voltage))
else:
# Non-numeric voltage - try to parse or skip
try:
numeric_v = float(original_voltage)
target = min(target_levels, key=lambda t: abs(t - numeric_v))
except (ValueError, TypeError):
missing_nodes.append(node)
continue
# Reassign
node_data[voltage_attr] = target
voltage_distribution[target] += 1
# Track reassignment
orig_key = (
round(original_voltage, 1)
if isinstance(original_voltage, float)
else original_voltage
)
if orig_key not in reassignments:
reassignments[orig_key] = (target, 0)
reassignments[orig_key] = (
reassignments[orig_key][0],
reassignments[orig_key][1] + 1,
)
# Handle missing voltages
if missing_nodes:
if handle_missing == "error":
raise ValueError(
f"{len(missing_nodes)} nodes have missing voltage data. "
f"First few: {missing_nodes[:5]}"
)
elif handle_missing == "infer":
# Infer from connected neighbors
inferred_count = 0
still_missing = []
for node in missing_nodes:
neighbor_voltages = []
for neighbor in graph.neighbors(node):
nv = graph.nodes[neighbor].get(voltage_attr)
if nv is not None and nv in target_levels:
neighbor_voltages.append(nv)
# Also check predecessors for directed graphs
if hasattr(graph, "predecessors"):
for pred in graph.predecessors(node):
pv = graph.nodes[pred].get(voltage_attr)
if pv is not None and pv in target_levels:
neighbor_voltages.append(pv)
if neighbor_voltages:
# Use most common neighbor voltage
from collections import Counter
target = Counter(neighbor_voltages).most_common(1)[0][0]
graph.nodes[node][voltage_attr] = target
if store_original:
graph.nodes[node][f"original_{voltage_attr}"] = None
voltage_distribution[target] += 1
inferred_count += 1
else:
still_missing.append(node)
if still_missing:
log_warning(
f"{len(still_missing)} nodes could not be inferred. "
f"Assigning to nearest target level.",
LogCategory.MANAGER,
)
default_target = target_levels[0]
for node in still_missing:
graph.nodes[node][voltage_attr] = default_target
if store_original:
graph.nodes[node][f"original_{voltage_attr}"] = None
voltage_distribution[default_target] += 1
log_info(
f"Inferred voltage for {inferred_count} nodes from neighbors",
LogCategory.MANAGER,
)
elif handle_missing == "nearest":
# Assign to first target level (arbitrary)
default_target = target_levels[0]
for node in missing_nodes:
graph.nodes[node][voltage_attr] = default_target
if store_original:
graph.nodes[node][f"original_{voltage_attr}"] = None
voltage_distribution[default_target] += 1
elif handle_missing == "skip":
log_warning(
f"{len(missing_nodes)} nodes have no voltage (will cluster separately)",
LogCategory.MANAGER,
)
# Update graph hash since we modified node attributes
self._current_graph_hash = self._compute_graph_hash(self._current_graph)
return {
"total_nodes": len(list(graph.nodes())),
"reassignments": reassignments,
"missing_handled": len(missing_nodes),
"voltage_distribution": voltage_distribution,
"target_levels": target_levels,
}