Managers

The manager classes orchestrate the partitioning and aggregation workflow.

PartitionAggregatorManager

class npap.managers.PartitionAggregatorManager[source]

Bases: object

Main orchestrator - the primary class users interact with.

Methods

aggregate([partition_result, profile, mode])

Aggregate using partition result and profile.

aggregate_parallel_edges([edge_properties, ...])

Convert current MultiDiGraph to simple DiGraph by aggregating parallel edges.

full_workflow(data_strategy, partition_strategy)

Execute complete workflow without storing intermediates.

get_current_graph()

Get the current graph.

get_current_partition()

Get the current partition result.

group_by_voltage_levels(target_levels[, ...])

Group bus voltage levels to predefined target values.

load_data(strategy, **kwargs)

Load data using specified strategy.

partition(strategy, **kwargs)

Partition current graph and store result.

plot_network([style, graph, show, config])

Plot the network on an interactive map.

See npap.PartitionAggregatorManager for full member documentation.

__init__()[source]

load_data(strategy, **kwargs)[source]

Load data using specified strategy.

Return type:

DiGraph | MultiDiGraph

Parameters:

strategy (str)

partition(strategy, **kwargs)[source]

Partition current graph and store result.

Return type:

PartitionResult

Parameters:

strategy (str)

aggregate(partition_result=None, profile=None, mode=None, **overrides)[source]

Aggregate using partition result and profile.

Parameters:
  • partition_result (PartitionResult, optional) – Partition to use (or use stored partition).

  • profile (AggregationProfile, optional) – Aggregation profile (custom configuration).

  • mode (AggregationMode, optional) – Aggregation mode (pre-defined configuration).

  • **overrides (dict) – Override specific profile parameters when using mode.

Returns:

Aggregated graph. Returns a MultiDiGraph when profile.edge_type_properties is populated.

Return type:

nx.DiGraph or nx.MultiDiGraph

Notes

If both profile and mode are provided, profile takes precedence.
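The precedence rule can be sketched in plain Python. This is an illustration only, not npap's implementation: `Profile`, its fields, `MODE_PROFILES`, and `resolve_profile` are hypothetical stand-ins, and the sketch assumes that overrides apply only when a mode is used.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class Profile:
    """Hypothetical stand-in for an aggregation profile."""
    topology: str = "simple"
    default_edge_strategy: str = "sum"


# Hypothetical table of pre-defined profiles, keyed by mode name.
MODE_PROFILES = {
    "example_mode": Profile(topology="simple", default_edge_strategy="average"),
}


def resolve_profile(profile: Optional[Profile] = None,
                    mode: Optional[str] = None,
                    **overrides) -> Profile:
    """Pick the profile to aggregate with; an explicit profile wins over a mode."""
    if profile is not None:
        return profile  # mode and overrides are ignored (assumption)
    if mode is not None:
        # Start from the pre-defined profile and apply keyword overrides.
        return replace(MODE_PROFILES[mode], **overrides)
    return Profile()  # neither given: fall back to defaults


custom = Profile(topology="custom")
chosen = resolve_profile(profile=custom, mode="example_mode")
tweaked = resolve_profile(mode="example_mode", default_edge_strategy="sum")
```

Here `chosen` is the custom profile even though a mode was also passed, while `tweaked` is the pre-defined profile with one field overridden.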

aggregate_parallel_edges(edge_properties=None, default_strategy='sum', warn_on_defaults=True)[source]

Convert current MultiDiGraph to simple DiGraph by aggregating parallel edges.

This method aggregates all parallel edges between the same directed node pairs into single edges, using the specified aggregation strategies. The graph must be a MultiDiGraph for this operation to be meaningful.

For directed graphs, edges (A->B) and (B->A) are treated as separate edges.

Parameters:
  • edge_properties (dict[str, str], optional) – Dict mapping property names to aggregation strategies. Example: {"reactance": "average", "length": "sum"}.

  • default_strategy (str) – Strategy to use for properties not specified.

  • warn_on_defaults (bool) – Whether to warn when using default strategy.

Returns:

Simple DiGraph with parallel edges aggregated.

Return type:

nx.DiGraph

Raises:

ValueError – If no graph is loaded or graph is not a MultiDiGraph.
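As a rough illustration of per-property aggregation, the sketch below collapses parallel edges held in a plain list of `(source, target, attrs)` tuples. It does not use npap or networkx. The `STRATEGIES` table and `collapse_parallel_edges` are hypothetical names, only the "sum" and "average" strategies from the example above are modelled, and the warning behaviour of `warn_on_defaults` is left out.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical strategy table; the names mirror the example strings above.
STRATEGIES = {"sum": sum, "average": mean}


def collapse_parallel_edges(edges, edge_properties=None, default_strategy="sum"):
    """Merge edges that share the same directed (source, target) pair.

    `edges` is a list of (source, target, attrs) tuples.
    """
    edge_properties = edge_properties or {}
    grouped = defaultdict(list)
    for source, target, attrs in edges:
        grouped[(source, target)].append(attrs)  # direction matters

    collapsed = {}
    for pair, attr_dicts in grouped.items():
        merged = {}
        # Collect every property seen on any of the parallel edges.
        for name in sorted({key for attrs in attr_dicts for key in attrs}):
            values = [attrs[name] for attrs in attr_dicts if name in attrs]
            strategy = edge_properties.get(name, default_strategy)
            merged[name] = STRATEGIES[strategy](values)
        collapsed[pair] = merged
    return collapsed


edges = [
    ("A", "B", {"reactance": 0.25, "length": 10.0}),
    ("A", "B", {"reactance": 0.75, "length": 12.0}),
    ("B", "A", {"reactance": 0.5, "length": 10.0}),
]
simple = collapse_parallel_edges(edges, {"reactance": "average"})
```

The two A->B edges merge into one edge with averaged reactance and summed length (the default strategy), while the B->A edge stays separate.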

full_workflow(data_strategy, partition_strategy, aggregation_profile=None, aggregation_mode=None, **kwargs)[source]

Execute complete workflow without storing intermediates.

If a MultiDiGraph is loaded, parallel edges are aggregated automatically before partitioning. If a voltage-aware partitioning strategy is selected, voltage levels are first grouped into the 220 kV and 380 kV levels.

Parameters:
  • data_strategy (str) – Data loading strategy name.

  • partition_strategy (str) – Partitioning strategy name.

  • aggregation_profile (AggregationProfile, optional) – Aggregation profile (custom).

  • aggregation_mode (AggregationMode, optional) – Aggregation mode (pre-defined).

  • **kwargs (dict) – Parameters for data loading, parallel edge aggregation, and partitioning.

Returns:

Aggregated graph.

Return type:

nx.DiGraph

Examples

>>> manager = PartitionAggregatorManager()
>>> result = manager.full_workflow(
...     data_strategy="csv_files",
...     partition_strategy="geographical_kmeans",
...     node_file="buses.csv",
...     edge_file="lines.csv",
...     n_clusters=10
... )

get_current_graph()[source]

Get the current graph.

Returns:

Current graph, or None if no graph is loaded.

Return type:

nx.DiGraph or None

get_current_partition()[source]

Get the current partition result.

Returns:

Current partition result, or None if not partitioned.

Return type:

PartitionResult or None

plot_network(style='simple', graph=None, show=True, config=None, **kwargs)[source]

Plot the network on an interactive map.

Parameters:
  • style (str) –

    Plot style:

    • 'simple': All edges same color (fast, minimal)

    • 'voltage_aware': Edges colored by type (line/trafo/dc_link)

    • 'clustered': Nodes colored by cluster assignment

  • graph (nx.DiGraph, optional) – Graph to plot (uses current graph if not provided).

  • show (bool) – Whether to display immediately.

  • config (PlotConfig, optional) – PlotConfig instance to override defaults. If provided, kwargs will further override values from this config.

  • **kwargs (dict) – Additional configuration options (see PlotConfig for full list).

Returns:

Plotly Figure object.

Return type:

go.Figure

Raises:

ValueError – If no graph is loaded, or if the 'clustered' style is requested before the graph has been partitioned.

Examples

>>> manager.plot_network(style="voltage_aware", title="My Network")
>>> manager.plot_network(style="clustered")  # After partitioning

group_by_voltage_levels(target_levels, voltage_attr='voltage', store_original=True, handle_missing='infer')[source]

Group bus voltage levels to predefined target values.

This method reassigns each node’s voltage to the nearest target voltage level, creating clean voltage “islands” for subsequent voltage-aware partitioning.

Parameters:
  • target_levels (list[float]) – List of target voltage levels (kV) to harmonize to. Example: [220, 380] for European transmission grid.

  • voltage_attr (str) – Node attribute containing voltage level.

  • store_original (bool) – If True, stores original voltage in ‘original_{voltage_attr}’.

  • handle_missing (str) –

    How to handle nodes without voltage data:

    • 'infer': Infer from connected neighbors

    • 'nearest': Assign to nearest target level

    • 'error': Raise an error

    • 'skip': Leave as None

Returns:

Summary dict with:

  • 'total_nodes': Total number of nodes processed

  • 'reassignments': Dict mapping original_voltage to (target_voltage, count)

  • 'missing_handled': Number of nodes with missing voltage

  • 'voltage_distribution': Dict mapping target_voltage to node_count

Return type:

dict[str, Any]

Raises:

ValueError – If no graph is loaded, target_levels is empty, or handle_missing='error' and some nodes have no voltage data.

Examples

>>> manager.group_by_voltage_levels([220, 380])
{'total_nodes': 6000, 'voltage_distribution': {220: 3500, 380: 2500}, ...}
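The nearest-level reassignment can be sketched without npap as follows. This is a simplified illustration under stated assumptions: `snap_to_target_levels` is a hypothetical helper, voltages live in a plain dict rather than on graph nodes, ties go to the target listed first, and nodes with missing voltage are simply left as None (similar in spirit to 'skip').

```python
from collections import Counter


def snap_to_target_levels(voltages, target_levels):
    """Map each node's voltage (kV) to the nearest target level.

    `voltages` maps node id -> voltage. Nodes whose voltage is None are
    left as None; handling them is out of scope for this sketch.
    """
    if not target_levels:
        raise ValueError("target_levels must not be empty")

    grouped = {}
    for node, voltage in voltages.items():
        if voltage is None:
            grouped[node] = None
            continue
        # Nearest target by absolute difference; on a tie, min() keeps the
        # target that appears first in target_levels.
        grouped[node] = min(target_levels, key=lambda level: abs(level - voltage))

    distribution = Counter(v for v in grouped.values() if v is not None)
    return grouped, dict(distribution)


voltages = {"bus1": 225, "bus2": 400, "bus3": 150, "bus4": 330, "bus5": None}
grouped, distribution = snap_to_target_levels(voltages, [220, 380])
```

With targets [220, 380], the 150 kV and 225 kV buses land on 220 and the 330 kV and 400 kV buses land on 380, so `distribution` counts two nodes per level.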

InputDataManager

class npap.managers.InputDataManager[source]

Bases: object

Manages data loading from different sources.

Methods

load(strategy_name, **kwargs)

Load data using specified strategy.

register_strategy(name, strategy)

Register a new data loading strategy.

__init__()[source]

register_strategy(name, strategy)[source]

Register a new data loading strategy.

Return type:

None

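Parameters:
  • name – Name under which the strategy is registered.

  • strategy – Data loading strategy to register.

load(strategy_name, **kwargs)[source]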

Load data using specified strategy.

Return type:

DiGraph | MultiDiGraph

Parameters:

strategy_name (str)
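The register-then-dispatch pattern behind `register_strategy` and `load` might look roughly like the sketch below. It is not npap's code: `StrategyRegistry` and `load_from_pairs` are hypothetical, strategies are modelled as plain callables (the real strategy objects may expose a different interface), and raising ValueError for an unknown name is an assumption.

```python
from typing import Any, Callable, Dict


class StrategyRegistry:
    """Hypothetical name -> strategy registry with dispatch by name."""

    def __init__(self) -> None:
        self._strategies: Dict[str, Callable[..., Any]] = {}

    def register_strategy(self, name: str, strategy: Callable[..., Any]) -> None:
        self._strategies[name] = strategy

    def load(self, strategy_name: str, **kwargs: Any) -> Any:
        try:
            strategy = self._strategies[strategy_name]
        except KeyError:
            known = ", ".join(sorted(self._strategies)) or "none"
            raise ValueError(
                f"Unknown strategy {strategy_name!r}; registered: {known}"
            ) from None
        # Keyword arguments are forwarded to the strategy unchanged.
        return strategy(**kwargs)


def load_from_pairs(pairs):
    """Toy loader: build an adjacency dict from (source, target) pairs."""
    graph = {}
    for source, target in pairs:
        graph.setdefault(source, []).append(target)
        graph.setdefault(target, [])
    return graph


registry = StrategyRegistry()
registry.register_strategy("pairs", load_from_pairs)
graph = registry.load("pairs", pairs=[("A", "B"), ("A", "C")])
```

Keeping the lookup in one place means a new data source only needs a registration call, and callers keep using the same `load(name, **kwargs)` entry point.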

PartitioningManager

class npap.managers.PartitioningManager[source]

Bases: object

Manages partitioning strategies.

Methods

partition(graph, method, **kwargs)

Execute partitioning using specified strategy.

register_strategy(name, strategy)

Register a new partitioning strategy.

__init__()[source]

register_strategy(name, strategy)[source]

Register a new partitioning strategy.

Return type:

None

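Parameters:
  • name – Name under which the strategy is registered.

  • strategy – Partitioning strategy to register.

partition(graph, method, **kwargs)[source]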

Execute partitioning using specified strategy.

Return type:

dict[int, list[Any]]

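Parameters:
  • graph – Graph to partition.

  • method – Name of the partitioning strategy to use.

  • **kwargs – Strategy-specific parameters.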

AggregationManager

class npap.managers.AggregationManager[source]

Bases: object

Manages aggregation strategies and orchestrates the aggregation process.

Aggregation is a 3-step process:

  1. Topology creation (graph structure)

  2. Physical aggregation (electrical laws)

  3. Statistical property aggregation (independent properties)

Methods

aggregate(graph, partition_map[, profile])

Execute aggregation using the specified profile.

aggregate_parallel_edges(graph[, ...])

Collapse parallel edges in a MultiDiGraph to produce a simple DiGraph.

get_mode_profile(mode, **overrides)

Get pre-defined aggregation profile for a given mode.

register_edge_strategy(name, strategy)

Register an edge property aggregation strategy.

register_node_strategy(name, strategy)

Register a node property aggregation strategy.

register_physical_strategy(name, strategy)

Register a physical aggregation strategy.

register_topology_strategy(name, strategy)

Register a topology strategy.

__init__()[source]

register_topology_strategy(name, strategy)[source]

Register a topology strategy.

Return type:

None

Parameters:
  • name – Name under which the strategy is registered.

  • strategy – Topology strategy to register.

register_physical_strategy(name, strategy)[source]

Register a physical aggregation strategy.

Return type:

None

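Parameters:
  • name – Name under which the strategy is registered.

  • strategy – Physical aggregation strategy to register.

register_node_strategy(name, strategy)[source]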

Register a node property aggregation strategy.

Return type:

None

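Parameters:
  • name – Name under which the strategy is registered.

  • strategy – Node property aggregation strategy to register.

register_edge_strategy(name, strategy)[source]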

Register an edge property aggregation strategy.

Return type:

None

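Parameters:
  • name – Name under which the strategy is registered.

  • strategy – Edge property aggregation strategy to register.

static get_mode_profile(mode, **overrides)[source]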

Get pre-defined aggregation profile for a given mode.

Parameters:
  • mode (AggregationMode) – Aggregation mode.

  • **overrides (dict) – Override specific profile parameters.

Returns:

AggregationProfile configured for the mode.

Return type:

AggregationProfile

aggregate(graph, partition_map, profile=None)[source]

Execute aggregation using the specified profile.

Aggregation is a 3-step process:

  1. Create topology (nodes + edge structure)

  2. Apply physical aggregation (if specified)

  3. Aggregate remaining properties statistically

Parameters:
  • graph (nx.DiGraph) – Graph to aggregate.

  • partition_map (dict[int, list[Any]]) – Mapping of cluster_id to list of node_ids.

  • profile (AggregationProfile, optional) – Aggregation profile (uses defaults if not provided).

Returns:

Aggregated graph. Returns a MultiDiGraph when profile.edge_type_properties is populated.

Return type:

nx.DiGraph or nx.MultiDiGraph
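To make the role of `partition_map` concrete, here is a minimal stdlib-only sketch of steps 1 and 3 on dict-based data. It is an illustration under assumptions, not the library's algorithm: `aggregate_by_partition`, the "load" and "capacity" properties, and the choice of summing are all hypothetical, and the physical aggregation step is skipped entirely.

```python
def aggregate_by_partition(node_attrs, edges, partition_map):
    """Collapse clusters into single nodes and keep inter-cluster edges.

    node_attrs: node id -> {"load": float}
    edges: list of (source, target, {"capacity": float})
    partition_map: cluster id -> list of node ids
    """
    # Reverse lookup: which cluster does each node belong to?
    cluster_of = {
        node: cluster
        for cluster, nodes in partition_map.items()
        for node in nodes
    }

    # Step 1: topology. One node per cluster, one edge per connected
    # ordered cluster pair; edges inside a cluster disappear.
    agg_nodes = {cluster: {} for cluster in partition_map}
    agg_edges = {}
    for source, target, attrs in edges:
        pair = (cluster_of[source], cluster_of[target])
        if pair[0] != pair[1]:
            agg_edges.setdefault(pair, []).append(attrs)

    # Step 2 (physical aggregation) is skipped in this sketch.

    # Step 3: statistical aggregation, here a plain sum per property.
    for cluster, nodes in partition_map.items():
        agg_nodes[cluster]["load"] = sum(node_attrs[n]["load"] for n in nodes)
    agg_edges = {
        pair: {"capacity": sum(a["capacity"] for a in attr_list)}
        for pair, attr_list in agg_edges.items()
    }
    return agg_nodes, agg_edges


node_attrs = {"a": {"load": 1.0}, "b": {"load": 2.0}, "c": {"load": 4.0}}
edges = [
    ("a", "b", {"capacity": 100.0}),  # internal to cluster 0, dropped
    ("a", "c", {"capacity": 50.0}),
    ("b", "c", {"capacity": 25.0}),
]
partition_map = {0: ["a", "b"], 1: ["c"]}
agg_nodes, agg_edges = aggregate_by_partition(node_attrs, edges, partition_map)
```

Nodes a and b merge into cluster 0 with their loads summed, and the two edges crossing into cluster 1 become a single 0->1 edge with summed capacity.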

aggregate_parallel_edges(graph, edge_properties=None, default_strategy='sum', warn_on_defaults=True)[source]

Collapse parallel edges in a MultiDiGraph to produce a simple DiGraph.

This method aggregates all parallel edges between the same directed node pairs using the specified edge property strategies, converting a MultiDiGraph to a simple DiGraph that can be partitioned.

For directed graphs, edges (A->B) and (B->A) are treated as separate edges and are aggregated independently.

Parameters:
  • graph (nx.MultiDiGraph) – MultiDiGraph with potential parallel edges.

  • edge_properties (dict[str, str], optional) – Dict mapping property names to aggregation strategies. Example: {"reactance": "average", "length": "sum"}.

  • default_strategy (str) – Strategy to use for properties not specified.

  • warn_on_defaults (bool) – Whether to warn when using default strategy.

Returns:

Simple DiGraph with parallel edges aggregated.

Return type:

nx.DiGraph

Raises: