Source code for topobench.transforms.liftings.graph2hypergraph.modularity_maximization_lifting

"""This module implements the ModularityMaximizationLifting class."""

import torch
import torch_geometric

from topobench.transforms.liftings.graph2hypergraph.base import (
    Graph2HypergraphLifting,
)


[docs] class ModularityMaximizationLifting(Graph2HypergraphLifting): r"""Lifts graphs to hypergraph domain using modularity maximization and community detection. This method creates hyperedges based on the community structure of the graph and k-nearest neighbors within each community. Parameters ---------- num_communities : int, optional The number of communities to detect. Default is 2. k_neighbors : int, optional The number of nearest neighbors to consider within each community. Default is 3. **kwargs : optional Additional arguments for the base class. """ def __init__(self, num_communities=2, k_neighbors=3, **kwargs): super().__init__(**kwargs) self.num_communities = num_communities self.k_neighbors = k_neighbors
[docs] def modularity_matrix(self, data): r"""Compute the modularity matrix B of the graph. B_ij = A_ij - (k_i * k_j) / (2m) Parameters ---------- data : torch_geometric.data.Data The input graph data. Returns ------- torch.Tensor The modularity matrix B. """ a = torch.zeros((data.num_nodes, data.num_nodes)) a[data.edge_index[0], data.edge_index[1]] = 1 k = a.sum(dim=1) m = data.edge_index.size(1) / 2 return a - torch.outer(k, k) / (2 * m)
[docs] def kmeans(self, x, n_clusters, n_iterations=100): r"""Perform k-means clustering on the input data. Note: This implementation uses random initialization, so results may vary between runs even for the same input data. Parameters ---------- x : torch.Tensor The input data to cluster. n_clusters : int The number of clusters to form. n_iterations : int, optional The maximum number of iterations. Default is 100. Returns ------- torch.Tensor The cluster assignments for each input point. Warning ------- Due to random initialization of centroids, the resulting hyperedges may differ each time the code is run, even with the same input. """ # Initialize cluster centers randomly centroids = x[ torch.randperm( x.shape[0], )[:n_clusters] ] cluster_assignments = torch.zeros(x.shape[0], dtype=torch.long) for _ in range(n_iterations): # Assign points to the nearest centroid distances = torch.cdist(x, centroids) cluster_assignments = torch.argmin(distances, dim=1) # Update centroids new_centroids = torch.stack( [ x[cluster_assignments == k].mean(dim=0) for k in range(n_clusters) ] ) if torch.allclose(centroids, new_centroids): break centroids = new_centroids return cluster_assignments
[docs] def detect_communities(self, b): r"""Detect communities using spectral clustering on the modularity matrix. Parameters ---------- b : torch.Tensor The modularity matrix. Returns ------- torch.Tensor The community assignments for each node. """ eigvals, eigvecs = torch.linalg.eigh(b) leading_eigvecs = eigvecs[ :, torch.argsort(eigvals, descending=True)[: self.num_communities] ] # Use implemented k-means clustering on the leading eigenvectors return self.kmeans(leading_eigvecs, self.num_communities)
[docs] def lift_topology(self, data: torch_geometric.data.Data) -> dict: r"""Lift the graph topology to a hypergraph based on community structure and k-nearest neighbors. Parameters ---------- data : torch_geometric.data.Data The input graph data. Returns ------- dict A dictionary containing the incidence matrix of the hypergraph, number of hyperedges, and the original node features. """ b = self.modularity_matrix(data) community_assignments = self.detect_communities(b) num_nodes = data.x.shape[0] num_hyperedges = num_nodes incidence_matrix = torch.zeros(num_nodes, num_nodes) for i in range(num_nodes): # Find nodes in the same community same_community = ( (community_assignments == community_assignments[i]) .nonzero() .view(-1) ) # Calculate distances to nodes in the same community distances = torch.norm( data.x[i].unsqueeze(0) - data.x[same_community], dim=1 ) # Select k nearest neighbors within the community k = min(self.k_neighbors, len(same_community)) _, nearest_indices = torch.topk(distances, k, largest=False) nearest_neighbors = same_community[nearest_indices] # Create a hyperedge incidence_matrix[i, nearest_neighbors] = 1 incidence_matrix[i, i] = 1 # Include the node itself incidence_matrix = incidence_matrix.to_sparse_coo() return { "incidence_hyperedges": incidence_matrix, "num_hyperedges": num_hyperedges, "x_0": data.x, }