Extending NPAP#

NPAP’s strategy pattern architecture makes it easy to add custom data loaders, partitioning algorithms, and aggregation strategies. This guide shows how to extend each component.

Tip

Want to contribute your custom strategy to NPAP? See the Contributing to NPAP guide for details on forking the repository, development setup, and submitting pull requests.

Architecture Overview#

        flowchart TB
    subgraph Interfaces
        DLS[DataLoadingStrategy]
        PS[PartitioningStrategy]
        TS[TopologyStrategy]
        PAS[PhysicalAggregationStrategy]
        NPS[NodePropertyStrategy]
        EPS[EdgePropertyStrategy]
    end

    subgraph Your Code
        CL[Custom Loader]
        CP[Custom Partitioner]
        CA[Custom Aggregator]
    end

    CL -->|inherits| DLS
    CP -->|inherits| PS
    CA -->|inherits| NPS
    CA -->|inherits| EPS

    style DLS fill:#2993B5,stroke:#1d6f8a,color:#fff
    style PS fill:#2993B5,stroke:#1d6f8a,color:#fff
    style TS fill:#2993B5,stroke:#1d6f8a,color:#fff
    style PAS fill:#2993B5,stroke:#1d6f8a,color:#fff
    style NPS fill:#2993B5,stroke:#1d6f8a,color:#fff
    style EPS fill:#2993B5,stroke:#1d6f8a,color:#fff
    style CL fill:#0fad6b,stroke:#076b3f,color:#fff
    style CP fill:#0fad6b,stroke:#076b3f,color:#fff
    style CA fill:#0fad6b,stroke:#076b3f,color:#fff
    

All strategies inherit from abstract base classes in npap.interfaces.

Custom Data Loading Strategy#

Interface#

from abc import ABC, abstractmethod
import networkx as nx

class DataLoadingStrategy(ABC):
    @abstractmethod
    def load(self, **kwargs) -> nx.DiGraph | nx.MultiDiGraph:
        """Load data and return a NetworkX directed graph."""
        pass

    @abstractmethod
    def validate_inputs(self, **kwargs) -> bool:
        """Validate input parameters before loading."""
        pass

Example: JSON Loader#

from npap.interfaces import DataLoadingStrategy
import networkx as nx
import json

class JSONFileStrategy(DataLoadingStrategy):
    """Load network from a JSON file."""

    def validate_inputs(self, **kwargs) -> bool:
        """Validate that file path is provided and file exists."""
        if "file_path" not in kwargs:
            return False

        import os
        return os.path.exists(kwargs["file_path"])

    def load(self, **kwargs) -> nx.DiGraph:
        """Load graph from JSON file.

        Parameters
        ----------
        file_path : str
            Path to JSON file with 'nodes' and 'edges' keys.

        Returns
        -------
        nx.DiGraph
            Loaded network graph.
        """
        file_path = kwargs["file_path"]

        with open(file_path, "r") as f:
            data = json.load(f)

        G = nx.DiGraph()

        # Add nodes with attributes
        for node in data["nodes"]:
            node_id = node.pop("id")
            G.add_node(node_id, **node)

        # Add edges with attributes
        for edge in data["edges"]:
            source = edge.pop("from")
            target = edge.pop("to")
            G.add_edge(source, target, **edge)

        return G

Registering the Strategy#

import npap

manager = npap.PartitionAggregatorManager()

# Register custom strategy
manager.input_manager.register_strategy("json_file", JSONFileStrategy())

# Use it
graph = manager.load_data("json_file", file_path="network.json")

Custom Partitioning Strategy#

Interface#

from abc import ABC, abstractmethod
import networkx as nx

class PartitioningStrategy(ABC):
    @property
    @abstractmethod
    def required_attributes(self) -> dict[str, list[str]]:
        """Required node and edge attributes.

        Returns
        -------
        dict
            {'nodes': ['attr1', 'attr2'], 'edges': ['attr3']}
        """
        pass

    @abstractmethod
    def partition(
        self,
        graph: nx.DiGraph,
        **kwargs
    ) -> dict[int, list]:
        """Partition the graph into clusters.

        Parameters
        ----------
        graph : nx.DiGraph
            The network to partition.
        **kwargs
            Algorithm-specific parameters.

        Returns
        -------
        dict[int, list]
            Mapping of cluster_id -> list of node IDs.
        """
        pass

Example: Degree-Based Partitioning#

from npap.interfaces import PartitioningStrategy
import networkx as nx

class DegreePartitioning(PartitioningStrategy):
    """Partition nodes based on their degree."""

    @property
    def required_attributes(self) -> dict[str, list[str]]:
        # No special attributes required
        return {"nodes": [], "edges": []}

    def partition(
        self,
        graph: nx.DiGraph,
        n_clusters: int = 3,
        **kwargs
    ) -> dict[int, list]:
        """Partition by node degree into n_clusters groups.

        Parameters
        ----------
        graph : nx.DiGraph
            Input graph.
        n_clusters : int
            Number of degree-based clusters.

        Returns
        -------
        dict[int, list]
            Cluster mapping.
        """
        import numpy as np

        # Get degrees
        degrees = dict(graph.degree())
        nodes = list(degrees.keys())
        degree_values = np.array(list(degrees.values()))

        # Create clusters based on degree percentiles
        percentiles = np.linspace(0, 100, n_clusters + 1)
        thresholds = np.percentile(degree_values, percentiles)

        # Assign nodes to clusters
        clusters = {i: [] for i in range(n_clusters)}
        for node, degree in degrees.items():
            for i in range(n_clusters):
                if thresholds[i] <= degree <= thresholds[i + 1]:
                    clusters[i].append(node)
                    break

        return clusters

Registering the Strategy#

manager = npap.PartitionAggregatorManager()
manager.partitioning_manager.register_strategy("degree", DegreePartitioning())

# Use it
partition = manager.partition("degree", n_clusters=5)

Using the Validation Decorator#

NPAP provides a decorator for automatic attribute validation:

from npap.interfaces import PartitioningStrategy
from npap.utils import validate_required_attributes

class MyPartitioning(PartitioningStrategy):
    @property
    def required_attributes(self) -> dict[str, list[str]]:
        return {"nodes": ["weight"], "edges": []}

    @validate_required_attributes
    def partition(self, graph, **kwargs):
        # Validation happens automatically before this runs
        # If 'weight' is missing, raises ValidationError
        ...

Custom Aggregation Strategies#

Node Property Strategy#

from npap.interfaces import NodePropertyStrategy
import networkx as nx

class MedianNodeStrategy(NodePropertyStrategy):
    """Aggregate node properties using median."""

    def aggregate_property(
        self,
        graph: nx.DiGraph,
        nodes: list,
        property_name: str
    ):
        """Compute median of property across nodes.

        Parameters
        ----------
        graph : nx.DiGraph
            The original graph.
        nodes : list
            Nodes in the cluster.
        property_name : str
            Name of property to aggregate.

        Returns
        -------
        float
            Median value.
        """
        import numpy as np

        values = [
            graph.nodes[n][property_name]
            for n in nodes
            if property_name in graph.nodes[n]
        ]

        if not values:
            return 0

        return float(np.median(values))

Edge Property Strategy#

from npap.interfaces import EdgePropertyStrategy

class MaxEdgeStrategy(EdgePropertyStrategy):
    """Aggregate edge properties using maximum value."""

    def aggregate_property(
        self,
        original_edges: list[dict[str, Any]],
        property_name: str
    ):
        """Return maximum property value across edges.

        Parameters
        ----------
        original_edges : list[dict[str, Any]]
            List of edge attribute dictionaries.
        property_name : str
            Name of property to aggregate.

        Returns
        -------
        float
            Maximum value.
        """
        values = [
            edge[property_name]
            for edge in original_edges
            if property_name in edge
        ]

        if not values:
            return 0

        return max(values)

Registering Aggregation Strategies#

manager = npap.PartitionAggregatorManager()

# Register node strategy
manager.aggregation_manager.register_node_strategy("median", MedianNodeStrategy())

# Register edge strategy
manager.aggregation_manager.register_edge_strategy("max", MaxEdgeStrategy())

# Use in profile
from npap import AggregationProfile

profile = AggregationProfile(
    node_properties={
        "load": "median"  # Use our custom strategy
    },
    edge_properties={
        "capacity": "max"  # Use our custom strategy
    }
)

aggregated = manager.aggregate(profile=profile)

Custom Topology Strategy#

For custom network reduction approaches:

from npap.interfaces import TopologyStrategy
import networkx as nx

class FullyConnectedTopology(TopologyStrategy):
    """Create fully connected aggregated network."""

    @property
    def can_create_new_edges(self) -> bool:
        """This strategy creates edges that didn't exist."""
        return True

    def create_topology(
        self,
        graph: nx.DiGraph,
        partition_map: dict[int, list]
    ) -> nx.DiGraph:
        """Create fully connected topology.

        Parameters
        ----------
        graph : nx.DiGraph
            Original graph.
        partition_map : dict[int, list]
            Cluster assignments.

        Returns
        -------
        nx.DiGraph
            Fully connected aggregated topology.
        """
        G = nx.DiGraph()

        # Add cluster nodes
        clusters = list(partition_map.keys())
        G.add_nodes_from(clusters)

        # Add all possible edges
        for i in clusters:
            for j in clusters:
                if i != j:
                    G.add_edge(i, j)

        return G

Best Practices#

1. Follow the Interface Contract#

Always implement all abstract methods:

class MyStrategy(PartitioningStrategy):
    @property
    def required_attributes(self):
        return {"nodes": [], "edges": []}  # Must implement

    def partition(self, graph, **kwargs):
        pass  # Must implement

2. Handle Edge Cases#

def partition(self, graph, n_clusters=10, **kwargs):
    # Handle empty graph
    if graph.number_of_nodes() == 0:
        return {}

    # Handle n_clusters > nodes
    n_clusters = min(n_clusters, graph.number_of_nodes())

    # Proceed with partitioning
    ...

3. Use Type Hints#

from typing import Any
import networkx as nx

def partition(
    self,
    graph: nx.DiGraph,
    n_clusters: int = 10,
    **kwargs: Any
) -> dict[int, list[Any]]:
    ...

4. Document Your Strategy#

class MyPartitioning(PartitioningStrategy):
    """Short description of the strategy.

    This strategy partitions networks based on [algorithm description].

    Parameters
    ----------
    param1 : type
        Description of parameter.

    Attributes
    ----------
    required_attributes : dict
        Requires 'lat', 'lon' on nodes.

    Examples
    --------
    >>> strategy = MyPartitioning()
    >>> result = strategy.partition(graph, n_clusters=5)
    """

5. Raise Appropriate Exceptions#

from npap import PartitioningError, ValidationError

class MyPartitioning(PartitioningStrategy):
    def partition(self, graph, n_clusters=10, **kwargs):
        if n_clusters < 1:
            raise ValidationError(
                "n_clusters must be positive",
                strategy="my_partitioning"
            )

        try:
            # Algorithm logic
            ...
        except Exception as e:
            raise PartitioningError(
                f"Partitioning failed: {e}",
                strategy="my_partitioning"
            )

Complete Example: LMP-Based Partitioning#

Here’s a complete example of a Locational Marginal Price (LMP) based partitioning strategy:

from npap.interfaces import PartitioningStrategy
from npap.utils import validate_required_attributes, create_partition_map
import networkx as nx
import numpy as np

class LMPPartitioning(PartitioningStrategy):
    """Partition based on Locational Marginal Prices.

    Nodes with similar LMPs are grouped together, as they
    have similar economic value for power injection.

    Required Attributes
    -------------------
    nodes : lmp
        Locational marginal price at each node.
    """

    @property
    def required_attributes(self) -> dict[str, list[str]]:
        return {"nodes": ["lmp"], "edges": []}

    @validate_required_attributes
    def partition(
        self,
        graph: nx.DiGraph,
        n_clusters: int = 10,
        **kwargs
    ) -> dict[int, list]:
        """Partition by LMP using k-means.

        Parameters
        ----------
        graph : nx.DiGraph
            Network with 'lmp' node attribute.
        n_clusters : int
            Number of price zones.

        Returns
        -------
        dict[int, list]
            Cluster mapping.
        """
        from sklearn.cluster import KMeans

        nodes = list(graph.nodes())
        lmps = np.array([
            [graph.nodes[n]["lmp"]]
            for n in nodes
        ])

        # Cluster by LMP
        kmeans = KMeans(n_clusters=n_clusters, random_state=42)
        labels = kmeans.fit_predict(lmps)

        return create_partition_map(nodes, labels)


# Usage
manager = npap.PartitionAggregatorManager()
manager.partitioning_manager.register_strategy("lmp", LMPPartitioning())

# Add LMP data to graph
for node in graph.nodes():
    graph.nodes[node]["lmp"] = compute_lmp(node)  # Your LMP calculation

manager.load_data("networkx_direct", graph=graph)
partition = manager.partition("lmp", n_clusters=10)

Testing Custom Strategies#

import pytest
import networkx as nx
from my_strategies import MyPartitioning

class TestMyPartitioning:
    def test_basic_partition(self):
        # Create test graph
        G = nx.DiGraph()
        G.add_nodes_from(range(10))
        for i in range(9):
            G.add_edge(i, i + 1)

        strategy = MyPartitioning()
        result = strategy.partition(G, n_clusters=3)

        # Verify result structure
        assert isinstance(result, dict)
        assert len(result) == 3

        # Verify all nodes assigned
        all_nodes = set()
        for nodes in result.values():
            all_nodes.update(nodes)
        assert all_nodes == set(range(10))

    def test_empty_graph(self):
        G = nx.DiGraph()
        strategy = MyPartitioning()
        result = strategy.partition(G, n_clusters=3)
        assert result == {}

    def test_missing_attributes(self):
        G = nx.DiGraph()
        G.add_node(1)  # Missing required attributes

        strategy = MyPartitioning()
        with pytest.raises(ValidationError):
            strategy.partition(G, n_clusters=1)