Managers
The manager classes orchestrate the partitioning and aggregation workflow.
PartitionAggregatorManager
- class npap.managers.PartitionAggregatorManager
Bases: object
Main orchestrator - the primary class users interact with.
Methods
aggregate([partition_result, profile, mode]) – Aggregate using partition result and profile.
aggregate_parallel_edges([edge_properties, ...]) – Convert current MultiDiGraph to simple DiGraph by aggregating parallel edges.
full_workflow(data_strategy, partition_strategy) – Execute complete workflow without storing intermediates.
get_current_graph() – Get the current graph.
get_current_partition() – Get the current partition result.
group_by_voltage_levels(target_levels[, ...]) – Group bus voltage levels to predefined target values.
load_data(strategy, **kwargs) – Load data using specified strategy.
partition(strategy, **kwargs) – Partition current graph and store result.
plot_network([style, graph, show, config]) – Plot the network on an interactive map.
See npap.PartitionAggregatorManager for full member documentation.
- __init__()
- load_data(strategy, **kwargs)
Load data using specified strategy.
- Return type:
- Parameters:
strategy (str)
- partition(strategy, **kwargs)
Partition current graph and store result.
- Return type:
- Parameters:
strategy (str)
- aggregate(partition_result=None, profile=None, mode=None, **overrides)
Aggregate using partition result and profile.
- Parameters:
partition_result (PartitionResult, optional) – Partition to use (or use stored partition).
profile (AggregationProfile, optional) – Aggregation profile (custom configuration).
mode (AggregationMode, optional) – Aggregation mode (pre-defined configuration).
**overrides (dict) – Override specific profile parameters when using mode.
- Returns:
Aggregated graph. Returns a MultiDiGraph when profile.edge_type_properties is populated.
- Return type:
nx.DiGraph or nx.MultiDiGraph
Notes
If both profile and mode are provided, profile takes precedence.
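The precedence rule can be illustrated with a minimal standalone sketch. It does not use npap: `Mode`, `Profile`, `MODE_PROFILES`, `resolve_profile`, and the field names are hypothetical stand-ins, and ignoring overrides when an explicit profile is passed is an assumption based on the parameter description.

```python
from dataclasses import dataclass, replace
from enum import Enum


class Mode(Enum):  # hypothetical stand-in for AggregationMode
    SIMPLE = "simple"
    CONSERVATIVE = "conservative"


@dataclass(frozen=True)
class Profile:  # hypothetical stand-in for AggregationProfile
    default_node_strategy: str = "average"
    default_edge_strategy: str = "sum"


# Hypothetical table of pre-defined profiles, one per mode.
MODE_PROFILES = {
    Mode.SIMPLE: Profile(),
    Mode.CONSERVATIVE: Profile(default_node_strategy="sum"),
}


def resolve_profile(profile=None, mode=None, **overrides):
    """Pick the profile to aggregate with; an explicit profile wins over a mode."""
    if profile is not None:
        return profile
    if mode is not None:
        # Assumption: overrides only apply on top of a mode's pre-defined profile.
        return replace(MODE_PROFILES[mode], **overrides)
    return Profile()


custom = Profile(default_edge_strategy="average")
chosen = resolve_profile(profile=custom, mode=Mode.CONSERVATIVE)
from_mode = resolve_profile(mode=Mode.CONSERVATIVE, default_edge_strategy="first")
```

Here `chosen` is the custom profile even though a mode was also given, while `from_mode` starts from the mode's profile and changes only the overridden field.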
- aggregate_parallel_edges(edge_properties=None, default_strategy='sum', warn_on_defaults=True)
Convert current MultiDiGraph to simple DiGraph by aggregating parallel edges.
This method aggregates all parallel edges between the same directed node pairs into single edges, using the specified aggregation strategies. The graph must be a MultiDiGraph for this operation to be meaningful.
For directed graphs, edges (A->B) and (B->A) are treated as separate edges.
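- Parameters:
edge_properties (dict[str, str], optional) – Dict mapping property names to aggregation strategies.
default_strategy (str) – Strategy to use for properties not specified.
warn_on_defaults (bool) – Whether to warn when using default strategy.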
- Returns:
Simple DiGraph with parallel edges aggregated.
- Return type:
nx.DiGraph
- Raises:
ValueError – If no graph is loaded or graph is not a MultiDiGraph.
- full_workflow(data_strategy, partition_strategy, aggregation_profile=None, aggregation_mode=None, **kwargs)
Execute complete workflow without storing intermediates.
If a MultiDiGraph is loaded, parallel edges are aggregated automatically before partitioning. If a voltage-aware partitioning strategy is selected, bus voltages are first grouped into the 220 kV and 380 kV levels.
- Parameters:
data_strategy (str) – Data loading strategy name.
partition_strategy (str) – Partitioning strategy name.
aggregation_profile (AggregationProfile, optional) – Aggregation profile (custom).
aggregation_mode (AggregationMode, optional) – Aggregation mode (pre-defined).
**kwargs (dict) – Parameters for data loading, parallel edge aggregation, and partitioning.
- Returns:
Aggregated graph.
- Return type:
nx.DiGraph
Examples
>>> manager = PartitionAggregatorManager()
>>> result = manager.full_workflow(
...     data_strategy="csv_files",
...     partition_strategy="geographical_kmeans",
...     node_file="buses.csv",
...     edge_file="lines.csv",
...     n_clusters=10
... )
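The conditional steps described for full_workflow can be sketched as a plan of stage names. This is a standalone illustration, not npap code: `plan_workflow` and `VOLTAGE_AWARE` are hypothetical, the strategy name in the set is a placeholder, and running parallel edge aggregation before voltage grouping is an assumption, since the text only states that both happen before partitioning.

```python
# Hypothetical set of voltage-aware strategy names (placeholder value).
VOLTAGE_AWARE = {"voltage_aware_example"}


def plan_workflow(is_multigraph: bool, partition_strategy: str) -> list[str]:
    """Return the stage names that would run, in order, for one workflow call."""
    steps = ["load_data"]
    if is_multigraph:
        # A MultiDiGraph is collapsed to a simple DiGraph before partitioning.
        steps.append("aggregate_parallel_edges")
    if partition_strategy in VOLTAGE_AWARE:
        # Voltage-aware strategies get voltages grouped to 220 kV and 380 kV first.
        steps.append("group_by_voltage_levels")
    steps += ["partition", "aggregate"]
    return steps


plain = plan_workflow(False, "geographical_kmeans")
full = plan_workflow(True, "voltage_aware_example")
```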
- get_current_graph()
Get the current graph.
- Returns:
Current graph, or None if no graph is loaded.
- Return type:
nx.DiGraph or None
- get_current_partition()
Get the current partition result.
- Returns:
Current partition result, or None if not partitioned.
- Return type:
PartitionResult or None
- plot_network(style='simple', graph=None, show=True, config=None, **kwargs)
Plot the network on an interactive map.
- Parameters:
style (str) – Plot style:
'simple': All edges same color (fast, minimal)
'voltage_aware': Edges colored by type (line/trafo/dc_link)
'clustered': Nodes colored by cluster assignment
graph (nx.DiGraph, optional) – Graph to plot (uses current graph if not provided).
show (bool) – Whether to display immediately.
config (PlotConfig, optional) – PlotConfig instance to override defaults. If provided, kwargs will further override values from this config.
**kwargs (dict) – Additional configuration options (see PlotConfig for full list).
- Returns:
Plotly Figure object.
- Return type:
go.Figure
- Raises:
ValueError – If no graph is loaded, or if the clustered style is requested without a partition.
Examples
>>> manager.plot_network(style="voltage_aware", title="My Network")
>>> manager.plot_network(style="clustered")  # After partitioning
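The layering of config and kwargs described for plot_network can be sketched without npap or Plotly. `SketchPlotConfig` and `effective_config` are hypothetical names; `title` mirrors the keyword used in the example above, while `edge_width` is an invented field for illustration only.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class SketchPlotConfig:  # hypothetical stand-in for PlotConfig
    title: str = "Network"
    edge_width: float = 1.0  # invented field, for illustration only


def effective_config(config=None, **kwargs):
    """Start from the given config (or defaults), then let kwargs override it."""
    base = config if config is not None else SketchPlotConfig()
    return replace(base, **kwargs)


base = SketchPlotConfig(title="Base", edge_width=2.0)
cfg = effective_config(base, title="My Network")
```

In this sketch `cfg` keeps `edge_width=2.0` from the config object and takes `title` from the keyword argument.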
- group_by_voltage_levels(target_levels, voltage_attr='voltage', store_original=True, handle_missing='infer')
Group bus voltage levels to predefined target values.
This method reassigns each node’s voltage to the nearest target voltage level, creating clean voltage “islands” for subsequent voltage-aware partitioning.
- Parameters:
target_levels (list[float]) – List of target voltage levels (kV) to harmonize to. Example: [220, 380] for European transmission grid.
voltage_attr (str) – Node attribute containing voltage level.
store_original (bool) – If True, stores original voltage in 'original_{voltage_attr}'.
handle_missing (str) – How to handle nodes without voltage data:
'infer': Infer from connected neighbors
'nearest': Assign to nearest target level
'error': Raise an error
'skip': Leave as None
- Returns:
Summary dict with:
'total_nodes': Total number of nodes processed
'reassignments': Dict mapping original_voltage to (target_voltage, count)
'missing_handled': Number of nodes with missing voltage
'voltage_distribution': Dict mapping target_voltage to node_count
- Return type:
dict[str, Any]
- Raises:
ValueError – If no graph is loaded, target_levels is empty, or handle_missing='error' and some voltages are missing.
Examples
>>> manager.group_by_voltage_levels([220, 380])
{'total_nodes': 6000, 'voltage_distribution': {220: 3500, 380: 2500}, ...}
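The nearest-level reassignment can be sketched in plain Python. This is a standalone illustration, not the npap implementation: `group_voltages` is a hypothetical helper, every node is assumed to have a voltage (no missing-value handling), and ties go to the level listed first.

```python
from collections import Counter


def group_voltages(voltages, target_levels):
    """Map each node's voltage to the nearest target level and count the results."""
    if not target_levels:
        raise ValueError("target_levels must not be empty")
    grouped = {
        # min() returns the first level with the smallest distance, so ties
        # resolve to whichever level appears earlier in target_levels.
        node: min(target_levels, key=lambda level: abs(level - v))
        for node, v in voltages.items()
    }
    return grouped, dict(Counter(grouped.values()))


voltages = {"a": 225.0, "b": 400.0, "c": 110.0, "d": 380.0}
grouped, distribution = group_voltages(voltages, [220, 380])
# grouped      -> {"a": 220, "b": 380, "c": 220, "d": 380}
# distribution -> {220: 2, 380: 2}
```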
InputDataManager
- class npap.managers.InputDataManager
Bases: object
Manages data loading from different sources.
Methods
load(strategy_name, **kwargs) – Load data using specified strategy.
register_strategy(name, strategy) – Register a new data loading strategy.
- register_strategy(name, strategy)
Register a new data loading strategy.
- Return type:
- Parameters:
name (str)
strategy (DataLoadingStrategy)
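The register-then-dispatch pattern behind register_strategy and load can be sketched as a name-to-object registry. This is a standalone illustration under assumptions: `SketchDataManager` and `InMemoryStrategy` are hypothetical, and the `load(**kwargs)` method on the strategy object is an assumed interface, not the documented DataLoadingStrategy API.

```python
class SketchDataManager:
    """Hypothetical manager that dispatches load calls to registered strategies."""

    def __init__(self):
        self._strategies = {}

    def register_strategy(self, name, strategy):
        # A later registration under the same name replaces the earlier one.
        self._strategies[name] = strategy

    def load(self, strategy_name, **kwargs):
        if strategy_name not in self._strategies:
            raise ValueError(f"Unknown strategy: {strategy_name!r}")
        return self._strategies[strategy_name].load(**kwargs)


class InMemoryStrategy:
    """Hypothetical strategy that wraps already-loaded node and edge lists."""

    def load(self, nodes, edges):
        return {"nodes": list(nodes), "edges": list(edges)}


manager = SketchDataManager()
manager.register_strategy("in_memory", InMemoryStrategy())
data = manager.load("in_memory", nodes=["a", "b"], edges=[("a", "b")])
```

The same registry shape would apply to the partitioning and aggregation managers, which expose analogous registration methods.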
PartitioningManager
- class npap.managers.PartitioningManager
Bases: object
Manages partitioning strategies.
Methods
partition(graph, method, **kwargs) – Execute partitioning using specified strategy.
register_strategy(name, strategy) – Register a new partitioning strategy.
- register_strategy(name, strategy)
Register a new partitioning strategy.
- Return type:
- Parameters:
name (str)
strategy (PartitioningStrategy)
AggregationManager
- class npap.managers.AggregationManager
Bases: object
Manages aggregation strategies and orchestrates the aggregation process.
Aggregation is a 3-step process:
1. Topology creation (graph structure)
2. Physical aggregation (electrical laws)
3. Statistical property aggregation (independent properties)
Methods
aggregate(graph, partition_map[, profile]) – Execute aggregation using the specified profile.
aggregate_parallel_edges(graph[, ...]) – Collapse parallel edges in a MultiDiGraph to produce a simple DiGraph.
get_mode_profile(mode, **overrides) – Get pre-defined aggregation profile for a given mode.
register_edge_strategy(name, strategy) – Register an edge property aggregation strategy.
register_node_strategy(name, strategy) – Register a node property aggregation strategy.
register_physical_strategy(name, strategy) – Register a physical aggregation strategy.
register_topology_strategy(name, strategy) – Register a topology strategy.
- register_topology_strategy(name, strategy)
Register a topology strategy.
- Return type:
- Parameters:
name (str)
strategy (TopologyStrategy)
- register_physical_strategy(name, strategy)
Register a physical aggregation strategy.
- Return type:
- Parameters:
name (str)
strategy (PhysicalAggregationStrategy)
- register_node_strategy(name, strategy)
Register a node property aggregation strategy.
- Return type:
- Parameters:
name (str)
strategy (NodePropertyStrategy)
- register_edge_strategy(name, strategy)
Register an edge property aggregation strategy.
- Return type:
- Parameters:
name (str)
strategy (EdgePropertyStrategy)
- static get_mode_profile(mode, **overrides)
Get pre-defined aggregation profile for a given mode.
- Parameters:
mode (AggregationMode) – Aggregation mode.
**overrides (dict) – Override specific profile parameters.
- Returns:
AggregationProfile configured for the mode.
- Return type:
AggregationProfile
- aggregate(graph, partition_map, profile=None)
Execute aggregation using the specified profile.
Aggregation is a 3-step process:
1. Create topology (nodes + edge structure)
2. Apply physical aggregation (if specified)
3. Aggregate remaining properties statistically
- Parameters:
graph (nx.DiGraph) – Graph to aggregate.
partition_map (dict[int, list[Any]]) – Mapping of cluster_id to list of node_ids.
profile (AggregationProfile, optional) – Aggregation profile (uses defaults if not provided).
- Returns:
Aggregated graph. Returns a MultiDiGraph when profile.edge_type_properties is populated.
- Return type:
nx.DiGraph or nx.MultiDiGraph
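The three steps can be sketched on plain dictionaries to show how a partition_map of the documented shape (cluster_id to list of node_ids) drives the result. This is a simplified standalone illustration, not the npap implementation: `aggregate_sketch` and the `load` and `capacity` properties are hypothetical, the physical step is skipped, and every property is simply summed.

```python
def aggregate_sketch(nodes, edges, partition_map):
    """nodes: {node_id: {"load": float}}; edges: {(u, v): {"capacity": float}}."""
    # Reverse lookup: node_id -> cluster_id.
    cluster_of = {n: cid for cid, members in partition_map.items() for n in members}

    # Step 1: topology. One node per cluster, one edge per connected cluster pair.
    agg_nodes = {cid: {} for cid in partition_map}
    edge_groups = {}
    for (u, v), attrs in edges.items():
        cu, cv = cluster_of[u], cluster_of[v]
        if cu != cv:  # edges inside a cluster disappear
            edge_groups.setdefault((cu, cv), []).append(attrs)

    # Step 2: physical aggregation is optional and omitted in this sketch.

    # Step 3: statistical aggregation, here a plain sum for every property.
    for cid, members in partition_map.items():
        agg_nodes[cid]["load"] = sum(nodes[n]["load"] for n in members)
    agg_edges = {
        pair: {"capacity": sum(a["capacity"] for a in group)}
        for pair, group in edge_groups.items()
    }
    return agg_nodes, agg_edges


nodes = {"a": {"load": 1.0}, "b": {"load": 2.0}, "c": {"load": 4.0}}
edges = {
    ("a", "b"): {"capacity": 10.0},
    ("a", "c"): {"capacity": 5.0},
    ("b", "c"): {"capacity": 7.0},
}
agg_nodes, agg_edges = aggregate_sketch(nodes, edges, {0: ["a", "b"], 1: ["c"]})
# agg_nodes -> {0: {"load": 3.0}, 1: {"load": 4.0}}
# agg_edges -> {(0, 1): {"capacity": 12.0}}
```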
- aggregate_parallel_edges(graph, edge_properties=None, default_strategy='sum', warn_on_defaults=True)
Collapse parallel edges in a MultiDiGraph to produce a simple DiGraph.
This method aggregates all parallel edges between the same directed node pairs using the specified edge property strategies, converting a MultiDiGraph to a simple DiGraph that can be partitioned.
For directed graphs, edges (A->B) and (B->A) are treated as separate edges and are aggregated independently.
- Parameters:
graph (nx.MultiDiGraph) – MultiDiGraph with potential parallel edges.
edge_properties (dict[str, str], optional) – Dict mapping property names to aggregation strategies. Example: {"reactance": "average", "length": "sum"}.
default_strategy (str) – Strategy to use for properties not specified.
warn_on_defaults (bool) – Whether to warn when using default strategy.
- Returns:
Simple DiGraph with parallel edges aggregated.
- Return type:
nx.DiGraph
- Raises:
ValueError – If graph is not a MultiDiGraph.
AggregationError – If aggregation fails.
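Per-property collapsing of parallel edges can be sketched on a plain edge list, without NetworkX or npap. `collapse_parallel_edges` and the `STRATEGIES` table are hypothetical; only the strategy names "sum" and "average" come from the text above, and warnings for defaulted properties are left out.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical strategy table; only these two names appear in the text above.
STRATEGIES = {"sum": sum, "average": mean}


def collapse_parallel_edges(edges, edge_properties=None, default_strategy="sum"):
    """edges: list of (u, v, attrs). Returns {(u, v): attrs}, one per directed pair."""
    edge_properties = edge_properties or {}
    grouped = defaultdict(list)
    for u, v, attrs in edges:
        grouped[(u, v)].append(attrs)  # (u, v) and (v, u) stay separate keys

    collapsed = {}
    for pair, group in grouped.items():
        names = {name for attrs in group for name in attrs}
        collapsed[pair] = {
            # Look up the strategy for this property, falling back to the default.
            name: STRATEGIES[edge_properties.get(name, default_strategy)](
                [attrs[name] for attrs in group if name in attrs]
            )
            for name in names
        }
    return collapsed


edges = [
    ("A", "B", {"reactance": 0.5, "length": 10.0}),
    ("A", "B", {"reactance": 1.5, "length": 12.0}),
    ("B", "A", {"reactance": 0.1, "length": 10.0}),
]
result = collapse_parallel_edges(edges, {"reactance": "average", "length": "sum"})
```

In this sketch the two A->B edges merge into one with the averaged reactance and the summed length, while the single B->A edge is kept as its own entry.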