gigl.distributed.DistLinkPredictionDataPartitioner
- class gigl.distributed.dist_link_prediction_data_partitioner.DistLinkPredictionDataPartitioner(should_assign_edges_by_src_node: bool = False, node_ids: Tensor | Dict[NodeType, Tensor] | None = None, node_features: Tensor | Dict[NodeType, Tensor] | None = None, edge_index: Tensor | Dict[EdgeType, Tensor] | None = None, edge_features: Tensor | Dict[EdgeType, Tensor] | None = None, positive_labels: Tensor | Dict[EdgeType, Tensor] | None = None, negative_labels: Tensor | Dict[EdgeType, Tensor] | None = None)
Bases: object
This class is based on GLT's DistRandomPartitioner class (alibaba/graphlearn-for-pytorch) and has been optimized for greater flexibility and better memory management. It assumes that init_rpc() and init_worker_group() have already been called to initialize the RPC layer and the distributed context, respectively. The class partitions homogeneous and heterogeneous input data, such as nodes, node features, edges, edge features, and supervision labels, across multiple machines. It also produces partition books for nodes and edges: 1-d tensors that indicate, for each node id or edge id, the rank on which it is stored. For example, the node partition book
[0, 0, 1, 2]
means that node 0 is on rank 0, node 1 is on rank 0, node 2 is on rank 1, and node 3 is on rank 2.
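A partition book maps each global id (its index) to the rank that owns it. The pure-Python sketch below uses a plain list in place of the 1-d tensor and inverts the example partition book above into the ids owned by each rank; the helper name `ids_per_rank` is hypothetical and not part of GiGL.

```python
from collections import defaultdict
from typing import Dict, List


def ids_per_rank(partition_book: List[int]) -> Dict[int, List[int]]:
    """Invert a partition book (index = global id, value = owning rank)."""
    owned: Dict[int, List[int]] = defaultdict(list)
    for global_id, rank in enumerate(partition_book):
        owned[rank].append(global_id)
    return dict(owned)


node_partition_book = [0, 0, 1, 2]
print(ids_per_rank(node_partition_book))  # {0: [0, 1], 1: [2], 2: [3]}
# Looking up the owner of a single node is a direct index into the book.
print(node_partition_book[2])  # 1
```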
In this class, node and edge id and feature tensors can be passed in either through the constructor or through the public register functions. All tensors must be registered before they are partitioned. For optimal memory management, we recommend deleting your own references to these large tensors after registering them but before partitioning, since keeping both the original and the intermediate tensors alive can cause out-of-memory (OOM) errors. Registration is available through both the constructor and the register functions to support the different ways partitioning can be used:
Option 1: User wants to partition just the nodes of a graph
```
import gc

from gigl.distributed.dist_link_prediction_data_partitioner import DistLinkPredictionDataPartitioner

# node_ids: this rank's node id tensor, already loaded by the caller.
# Only nodes are partitioned, so nothing else needs to be passed to the constructor.
partitioner = DistLinkPredictionDataPartitioner()
partitioner.register_node_ids(node_ids)
# Delete the outside reference to node_ids so the partitioner can free it after partitioning.
del node_ids
node_partition_book = partitioner.partition_node()
# Optionally call gc.collect() to clean up lingering memory, which may happen when only
# a subset of inputs is partitioned (e.g. no features or labels).
gc.collect()
```
Option 2: User wants to partition all parts of a graph together and in sequence
```
import gc

from gigl.distributed.dist_link_prediction_data_partitioner import DistLinkPredictionDataPartitioner

# Registration happens in __init__, so the register functions don't need to be called separately.
# Pass tensors by keyword: the first positional parameter is should_assign_edges_by_src_node.
partitioner = DistLinkPredictionDataPartitioner(
    node_ids=node_ids,
    node_features=node_features,
    edge_index=edge_index,
    edge_features=edge_features,
    positive_labels=pos_labels,
    negative_labels=neg_labels,
)
# Delete the outside references so the partitioner can free the tensors during partitioning.
del node_ids, edge_index, node_features, edge_features, pos_labels, neg_labels
partition_output = partitioner.partition()
# Optionally call gc.collect() to clean up lingering memory (e.g. when no features or labels are used).
gc.collect()
```
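Deleting the caller's reference matters because CPython frees an object only when its last reference goes away. The stub below is hypothetical (it is not GiGL code) and uses a plain object in place of a large tensor. It shows that once the caller runs `del` and the partitioner drops its own reference after partitioning, the buffer is actually released, whereas a lingering outer reference keeps it alive.

```python
import gc
import weakref
from typing import Optional


class LargeBuffer:
    """Stand-in for a large tensor; plain classes support weak references."""

    def __init__(self, size: int) -> None:
        self.data = bytearray(size)


class StubPartitioner:
    """Hypothetical stub mimicking register-then-partition memory behavior."""

    def __init__(self) -> None:
        self._node_ids: Optional[LargeBuffer] = None

    def register_node_ids(self, node_ids: LargeBuffer) -> None:
        self._node_ids = node_ids

    def partition_node(self) -> int:
        assert self._node_ids is not None, "node ids must be registered first"
        result = len(self._node_ids.data)  # placeholder for the real partitioning work
        self._node_ids = None  # drop the registered input once it has been consumed
        return result


def buffer_freed(delete_outer_reference: bool) -> bool:
    node_ids = LargeBuffer(10_000_000)
    tracker = weakref.ref(node_ids)  # observes the buffer without keeping it alive
    partitioner = StubPartitioner()
    partitioner.register_node_ids(node_ids)
    if delete_outer_reference:
        del node_ids
    partitioner.partition_node()
    gc.collect()  # optional here: reference counting already freed the buffer in CPython
    return tracker() is None


print(buffer_freed(delete_outer_reference=True))   # True
print(buffer_freed(delete_outer_reference=False))  # False
```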
Partitioning a single entity type, as in Option 1, can be useful when you want to further parallelize parts of the workload, since GLT's original DistRandomPartitioner only exposes partition(), which partitions all entities in sequence.
Once all desired tensors are registered, you can either call partition() to partition every registered field, or partition each field individually through the public partition_{entity_type} functions. When using partition(), fields that were not registered are returned as None. Each entity type should be partitioned only once, because registered fields are cleaned up after partitioning to reduce memory usage.
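The per-entity dependencies documented below (the node partition book is needed for edges, node features, and labels; the edge partition book is needed for edge features) can be captured in a small driver. This is a sketch: `partition_step_by_step` is a hypothetical helper that only calls the public methods documented on this page, it assumes every field was registered, and it is exercised here with a recording stub instead of a real, RPC-initialized partitioner.

```python
from typing import Any, Dict, List


def partition_step_by_step(partitioner: Any) -> Dict[str, Any]:
    """Call the per-entity partition methods in dependency order (hypothetical helper)."""
    node_partition_book = partitioner.partition_node()
    # Homogeneous: (graph, book); heterogeneous: (Dict[EdgeType, graph], Dict[EdgeType, book]).
    graph, edge_partition_book = partitioner.partition_edge(node_partition_book)
    return {
        "graph": graph,
        "node_features": partitioner.partition_node_features(node_partition_book),
        "edge_features": partitioner.partition_edge_features(edge_partition_book),
        "positive_labels": partitioner.partition_labels(node_partition_book, is_positive=True),
        "negative_labels": partitioner.partition_labels(node_partition_book, is_positive=False),
    }


class RecordingStub:
    """Stand-in that records call order; it does no real partitioning."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def partition_node(self):
        self.calls.append("partition_node")
        return "node_pb"

    def partition_edge(self, node_partition_book):
        self.calls.append("partition_edge")
        return "graph", "edge_pb"

    def partition_node_features(self, node_partition_book):
        self.calls.append("partition_node_features")
        return ("node_feats", node_partition_book)

    def partition_edge_features(self, edge_partition_book):
        self.calls.append("partition_edge_features")
        return ("edge_feats", edge_partition_book)

    def partition_labels(self, node_partition_book, is_positive):
        self.calls.append(f"partition_labels(is_positive={is_positive})")
        return ("labels", is_positive)


stub = RecordingStub()
outputs = partition_step_by_step(stub)
print(stub.calls)
```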
- From GLT’s description of DistRandomPartitioner:
Each distributed partitioner will process a part of the full graph and feature data and partition them. A distributed partitioner's rank corresponds to a partition index, and the number of distributed partitioners must equal the number of output partitions. During partitioning, the partitioned results will be sent to other distributed partitioners according to their ranks. After partitioning, each distributed partitioner will own a partitioned graph with its corresponding rank and further save the partitioned results into the local output directory.
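The exchange step in that description can be simulated in a single process: each rank groups the ids it holds locally by the rank that owns them according to the partition book, and every rank then collects the groups addressed to it. This sketch uses plain lists, and `shuffle_ids` is a hypothetical name; in GLT and GiGL the real exchange happens over RPC between processes.

```python
from typing import Dict, List


def shuffle_ids(local_ids_per_rank: List[List[int]], partition_book: List[int]) -> List[List[int]]:
    """Simulate sending every id from the rank that holds it to the rank that owns it."""
    num_partitions = len(local_ids_per_rank)  # one partitioner per output partition
    received: List[List[int]] = [[] for _ in range(num_partitions)]
    for source_rank, local_ids in enumerate(local_ids_per_rank):
        # Bucket this rank's ids by destination rank (what it would send to each peer).
        outgoing: Dict[int, List[int]] = {}
        for global_id in local_ids:
            outgoing.setdefault(partition_book[global_id], []).append(global_id)
        # Deliver each bucket to its destination rank.
        for dest_rank, ids in outgoing.items():
            received[dest_rank].extend(ids)
    return [sorted(ids) for ids in received]


# Input ids as initially held by three ranks (rank 2 starts empty).
partition_book = [0, 0, 1, 2]
print(shuffle_ids([[3, 0], [1, 2], []], partition_book))  # [[0, 1], [2], [3]]
```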
Methods
- __init__: Initializes the parameters of the partitioner and optionally registers node and edge tensors.
- partition(): Calls partition on all registered fields.
- partition_edge(node_partition_book): Partitions edges of a graph.
- partition_edge_features(edge_partition_book): Partitions edge features of a graph.
- partition_labels(node_partition_book, is_positive): Partitions positive or negative labels of a graph.
- partition_node(): Partitions nodes of a graph.
- partition_node_features(node_partition_book): Partitions node features of a graph.
- register_edge_features(edge_features): Registers the edge features to the partitioner.
- register_edge_index(edge_index): Registers the edge_index to the partitioner.
- register_labels(label_edge_index, is_positive): Registers the positive or negative labels to the partitioner.
- register_node_features(node_features): Registers the node features to the partitioner.
- register_node_ids(node_ids): Registers the node ids to the partitioner.
- __init__(should_assign_edges_by_src_node: bool = False, node_ids: Tensor | Dict[NodeType, Tensor] | None = None, node_features: Tensor | Dict[NodeType, Tensor] | None = None, edge_index: Tensor | Dict[EdgeType, Tensor] | None = None, edge_features: Tensor | Dict[EdgeType, Tensor] | None = None, positive_labels: Tensor | Dict[EdgeType, Tensor] | None = None, negative_labels: Tensor | Dict[EdgeType, Tensor] | None = None)
Initializes the parameters of the partitioner. Also optionally takes in node and edge tensors as arguments and registers them to the partitioner. Registered entities should be a Dict[NodeType, torch.Tensor] or Dict[EdgeType, torch.Tensor] if heterogeneous, or a torch.Tensor if homogeneous. This class assumes that the distributed context has already been initialized outside of this class with glt.distributed.init_worker_group() and that RPC has been initialized with glt.distributed.init_rpc(). Args:
should_assign_edges_by_src_node (bool): Whether edges should be assigned to the machine of their source nodes during partitioning
node_ids (Optional[Union[torch.Tensor, Dict[NodeType, torch.Tensor]]]): Optionally registered node ids from input. Tensors should be of shape [num_nodes_on_current_rank]
node_features (Optional[Union[torch.Tensor, Dict[NodeType, torch.Tensor]]]): Optionally registered node features from input. Tensors should be of shape [num_nodes_on_current_rank, node_feat_dim]
edge_index (Optional[Union[torch.Tensor, Dict[EdgeType, torch.Tensor]]]): Optionally registered edge indices from input. Tensors should be of shape [2, num_edges_on_current_rank]
edge_features (Optional[Union[torch.Tensor, Dict[EdgeType, torch.Tensor]]]): Optionally registered edge features from input. Tensors should be of shape [num_edges_on_current_rank, edge_feat_dim]
positive_labels (Optional[Union[torch.Tensor, Dict[EdgeType, torch.Tensor]]]): Optionally registered positive labels from input. Tensors should be of shape [2, num_pos_labels_on_current_rank]
negative_labels (Optional[Union[torch.Tensor, Dict[EdgeType, torch.Tensor]]]): Optionally registered negative labels from input. Tensors should be of shape [2, num_neg_labels_on_current_rank]
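To make `should_assign_edges_by_src_node` concrete, the sketch below assigns each edge in a [2, num_edges] edge index to the rank that owns one of its endpoints in the node partition book. It is a plain-Python illustration, not GiGL's implementation, and it assumes that when the flag is False, edges follow their destination node; the False case is not described above.

```python
from typing import List


def assign_edges(
    edge_index: List[List[int]], node_partition_book: List[int], by_src_node: bool
) -> List[int]:
    """Return the owning rank of every edge, in edge order (an edge partition book)."""
    src_nodes, dst_nodes = edge_index  # row 0: source ids, row 1: destination ids
    # Assumption: when by_src_node is False, edges follow their destination node.
    anchor_nodes = src_nodes if by_src_node else dst_nodes
    return [node_partition_book[node] for node in anchor_nodes]


node_partition_book = [0, 0, 1, 2]
edge_index = [[0, 2, 3], [3, 1, 2]]  # edges 0->3, 2->1, 3->2
print(assign_edges(edge_index, node_partition_book, by_src_node=True))   # [0, 1, 2]
print(assign_edges(edge_index, node_partition_book, by_src_node=False))  # [2, 0, 1]
```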
- partition() → PartitionOutput
Calls partition on all registered fields. Note that, at minimum, nodes and edges must be registered when using this function. Returns:
PartitionOutput: Reshuffled outputs of partitioning
- partition_edge(node_partition_book: PartitionBook | Dict[NodeType, PartitionBook]) → Tuple[GraphPartitionData, PartitionBook] | Tuple[Dict[EdgeType, GraphPartitionData], Dict[EdgeType, PartitionBook]]
Partitions edges of a graph. If heterogeneous, partitions edges for all edge types. partition_node must be called first to get the node partition book as input. Args:
node_partition_book (Union[PartitionBook, Dict[NodeType, PartitionBook]]): The computed Node Partition Book
- Returns:
Union[Tuple[GraphPartitionData, PartitionBook], Tuple[Dict[EdgeType, GraphPartitionData], Dict[EdgeType, PartitionBook]]]: Partitioned graph data and the corresponding edge partition book; both are dictionaries if heterogeneous
- partition_edge_features(edge_partition_book: PartitionBook | Dict[EdgeType, PartitionBook]) → FeaturePartitionData | Dict[EdgeType, FeaturePartitionData]
Partitions edge features of a graph. If heterogeneous, partitions edge features for all edge types. partition_edge must be called first to get the edge partition book as input. Args:
edge_partition_book (Union[PartitionBook, Dict[EdgeType, PartitionBook]]): The computed Edge Partition Book
- Returns:
Union[FeaturePartitionData, Dict[EdgeType, FeaturePartitionData]]: Feature Partition Data of ids and features or Dict if heterogeneous.
- partition_labels(node_partition_book: PartitionBook | Dict[NodeType, PartitionBook], is_positive: bool) → Tensor | Dict[EdgeType, Tensor]
Partitions positive or negative labels of a graph. If heterogeneous, partitions labels for all edge types. partition_node must be called first to get the node partition book as input. Args:
node_partition_book (Union[PartitionBook, Dict[NodeType, PartitionBook]]): The computed Node Partition Book
is_positive (bool): Whether positive labels are being partitioned. If False, negative labels will be partitioned.
- Returns:
Union[torch.Tensor, Dict[EdgeType, torch.Tensor]]: The edge indices of the partitioned positive or negative labels, depending on the is_positive flag
- partition_node() → PartitionBook | Dict[NodeType, PartitionBook]
Partitions nodes of a graph. If heterogeneous, partitions nodes for all node types.
- Returns:
Union[PartitionBook, Dict[NodeType, PartitionBook]]: Partition Book of input nodes or Dict if heterogeneous
- partition_node_features(node_partition_book: PartitionBook | Dict[NodeType, PartitionBook]) → FeaturePartitionData | Dict[NodeType, FeaturePartitionData]
Partitions node features of a graph. If heterogeneous, partitions features for all node types. partition_node must be called first to get the node partition book as input.
- Args:
node_partition_book (Union[PartitionBook, Dict[NodeType, PartitionBook]]): The Computed Node Partition Book
- Returns:
Union[FeaturePartitionData, Dict[NodeType, FeaturePartitionData]]: Feature Partition Data of ids and features or Dict if heterogeneous.
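Feature rows follow the partition book: each row travels with its id to the owning rank, consistent with the returned data pairing ids with features. The sketch below shows that grouping for one rank's input in plain Python, and the same idea applies to edge features with the edge partition book. `group_features_by_rank` is hypothetical; the real method operates on tensors and exchanges rows across ranks.

```python
from typing import Dict, List, Tuple


def group_features_by_rank(
    ids: List[int], features: List[List[float]], partition_book: List[int]
) -> Dict[int, Tuple[List[int], List[List[float]]]]:
    """Group (id, feature row) pairs by the rank that owns each id."""
    if len(ids) != len(features):
        raise ValueError("expected one feature row per id")
    grouped: Dict[int, Tuple[List[int], List[List[float]]]] = {}
    for global_id, row in zip(ids, features):
        owner_ids, owner_rows = grouped.setdefault(partition_book[global_id], ([], []))
        owner_ids.append(global_id)
        owner_rows.append(row)
    return grouped


partition_book = [0, 0, 1, 2]
node_ids = [2, 0, 3]  # ids held by this rank, in input order
node_features = [[0.2, 2.0], [0.0, 0.0], [0.3, 3.0]]  # shape [3, node_feat_dim=2]
print(group_features_by_rank(node_ids, node_features, partition_book))
# {1: ([2], [[0.2, 2.0]]), 0: ([0], [[0.0, 0.0]]), 2: ([3], [[0.3, 3.0]])}
```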
- register_edge_features(edge_features: Tensor | Dict[EdgeType, Tensor]) → None
Registers the edge features to the partitioner.
For optimal memory management, it is recommended that the reference to the edge_features tensor be deleted after calling this function using del <tensor>, as maintaining both original and intermediate tensors can cause OOM concerns. We do not need to perform all_gather calls here since register_edge_index is responsible for determining the total number of edges across all ranks and inferring edge ids. Args:
edge_features (Union[torch.Tensor, Dict[EdgeType, torch.Tensor]]): Input edge features, which are either a torch.Tensor if homogeneous or a Dict if heterogeneous
- register_edge_index(edge_index: Tensor | Dict[EdgeType, Tensor]) → None
Registers the edge_index to the partitioner. Also computes additional fields for partitioning, such as the total number of edges across all ranks and the number of edges on the current rank.
For optimal memory management, it is recommended that the reference to edge_index tensor be deleted after calling this function using del <tensor>, as maintaining both original and intermediate tensors can cause OOM concerns. Args:
edge_index (Union[torch.Tensor, Dict[EdgeType, torch.Tensor]]): Input edge index which is either a torch.Tensor if homogeneous or a Dict if heterogeneous
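register_edge_index computes the total edge count across ranks and the count on the current rank, and the other register functions rely on it to infer edge ids. One common way to do this, shown here as an assumption rather than GiGL's confirmed scheme, is to give each rank a contiguous block of global edge ids starting at the sum of the edge counts on lower ranks (an exclusive prefix sum of the gathered counts). The helper name `edge_id_ranges` is hypothetical.

```python
from itertools import accumulate
from typing import List


def edge_id_ranges(num_edges_per_rank: List[int]) -> List[range]:
    """Assign each rank a contiguous block of global edge ids (assumed scheme)."""
    # Exclusive prefix sum: rank r starts right after all edges held by ranks 0..r-1.
    starts = [0] + list(accumulate(num_edges_per_rank))[:-1]
    return [range(start, start + count) for start, count in zip(starts, num_edges_per_rank)]


# Edge counts as they might look after gathering each rank's local count.
counts = [3, 0, 2]
ranges = edge_id_ranges(counts)
print(sum(counts))                # 5 total edges across all ranks
print([list(r) for r in ranges])  # [[0, 1, 2], [], [3, 4]]
```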
- register_labels(label_edge_index: Tensor | Dict[EdgeType, Tensor], is_positive: bool) → None
Registers the positive or negative labels to the partitioner. Note that in the heterogeneous case, all edge types of the graph must be present in the label edge index dictionary.
For optimal memory management, it is recommended that the reference to the label tensor be deleted after calling this function using del <tensor>, as maintaining both original and intermediate tensors can cause OOM concerns. We do not need to perform all_gather calls here since register_edge_index is responsible for determining total number of edges across all ranks and inferring edge ids. Args:
label_edge_index (Union[torch.Tensor, Dict[EdgeType, torch.Tensor]]): Input positive or negative labels, which are either a torch.Tensor if homogeneous or a Dict if heterogeneous
is_positive (bool): Whether positive labels are currently being registered. If False, labels will be registered as negative
- register_node_features(node_features: Tensor | Dict[NodeType, Tensor]) → None
Registers the node features to the partitioner.
For optimal memory management, it is recommended that the reference to node_features tensor be deleted after calling this function using del <tensor>, as maintaining both original and intermediate tensors can cause OOM concerns. We do not need to perform all_gather calls here since register_node_ids is responsible for determining total number of nodes across all ranks. Args:
node_features (Union[torch.Tensor, Dict[NodeType, torch.Tensor]]): Input node features, which are either a torch.Tensor if homogeneous or a Dict if heterogeneous
- register_node_ids(node_ids: Tensor | Dict[NodeType, Tensor]) → None
Registers the node ids to the partitioner. Also computes additional fields for partitioning such as the total number of nodes across all ranks.
For optimal memory management, it is recommended that the reference to the node_ids tensor be deleted after calling this function using del <tensor>, as maintaining both original and intermediate tensors can cause OOM concerns. Args:
node_ids (Union[torch.Tensor, Dict[NodeType, torch.Tensor]]): Input node_ids which is either a torch.Tensor if homogeneous or a Dict if heterogeneous