Source code for topobench.transforms.liftings.pointcloud2hypergraph.mogmst_lifting

"""Mixture of Gaussians and Minimum Spanning Tree (MoGMST) Lifting."""

import numpy as np
import torch
import torch_geometric
from networkx import from_numpy_array, minimum_spanning_tree
from sklearn.metrics import pairwise_distances
from sklearn.mixture import GaussianMixture

from topobench.transforms.liftings.pointcloud2hypergraph.base import (
    PointCloud2HypergraphLifting,
)


[docs] class MoGMSTLifting(PointCloud2HypergraphLifting): r"""Lift a point cloud to a hypergraph. We find a Mixture of Gaussians and then create a Minimum Spanning Tree (MST) between the means of the Gaussians. Parameters ---------- min_components : int or None, optional The minimum number of components for the Mixture of Gaussians model. It needs to be at least 1 (default: None). max_components : int or None, optional The maximum number of components for the Mixture of Gaussians model. It needs to be greater or equal than min_components (default: None). random_state : int, optional The random state for the Mixture of Gaussians model (default: None). **kwargs : optional Additional arguments for the class. """ def __init__( self, min_components=None, max_components=None, random_state=None, **kwargs, ): super().__init__(**kwargs) self.min_components = min_components self.max_components = max_components self.random_state = random_state
[docs] def lift_topology(self, data: torch_geometric.data.Data) -> dict: """Lift the topology of a graph to a hypergraph. Parameters ---------- data : torch_geometric.data.Data The input data to be lifted. Returns ------- dict The lifted topology. """ # Find a mix of Gaussians number_of_points = data.x.shape[0] labels, num_components, means = self.find_mog(data.x.numpy()) # If no labels are found, return a single hyperedge with all the nodes if labels is None: incidence = torch.ones((number_of_points, 1)) incidence = incidence.to_sparse_coo() return { "incidence_hyperedges": incidence, "num_hyperedges": 1, "x_0": data.x, } # Create MST distance_matrix = pairwise_distances(means) original_graph = from_numpy_array(distance_matrix) mst = minimum_spanning_tree(original_graph) # Create hypergraph incidence incidence = torch.zeros((number_of_points, 2 * num_components)) # Add to which Gaussian the points belong to nodes = torch.arange(0, number_of_points, dtype=torch.int32) lbls = torch.tensor(labels, dtype=torch.int32) values = torch.ones(number_of_points) incidence[nodes, lbls] = values # Add neighbours in MST for i, j in mst.edges(): mask_i = labels == i mask_j = labels == j incidence[mask_i, num_components + j] = 1 incidence[mask_j, num_components + i] = 1 incidence = incidence.clone().detach().to_sparse_coo() return { "incidence_hyperedges": incidence, "num_hyperedges": 2 * num_components, "x_0": data.x, }
[docs] def find_mog(self, data) -> tuple[np.ndarray, int, np.ndarray]: """Find the best number of components for a Mixture of Gaussians model. Parameters ---------- data : np.ndarray The input data to be fitted. Returns ------- tuple[np.ndarray, int, np.ndarray] The labels of the data, the number of components and the means of the components. """ possible_num_components = [ self.min_components if self.min_components is not None else 1 ] if self.min_components is not None and self.max_components is not None: possible_num_components = range( self.min_components, self.max_components + 1 ) elif self.min_components is None and self.max_components is None: possible_num_components = [ 2**i for i in range(1, int(np.log2(data.shape[0] / 2)) + 1) ] best_score = float("inf") best_labels = None best_num_components = 0 means = None for i in possible_num_components: gm = GaussianMixture( n_components=i, random_state=self.random_state ) labels = gm.fit_predict(data) score = gm.aic(data) if score < best_score: best_score = score best_labels = labels best_num_components = i means = gm.means_ return best_labels, best_num_components, means