Source code for dgl.dataloading.neighbor

"""Data loading components for neighbor sampling"""
from .dataloader import BlockSampler
from .. import sampling, subgraph, distributed
from .. import ndarray as nd
from .. import backend as F
from ..base import DGLError, ETYPE

class MultiLayerNeighborSampler(BlockSampler):
    """Sampler that builds computational dependency of node representations via
    neighbor sampling for multilayer GNN.

    This sampler will make every node gather messages from a fixed number of
    neighbors per edge type.  The neighbors are picked uniformly.

    Parameters
    ----------
    fanouts : list[int] or list[dict[etype, int] or None]
        List of neighbors to sample per edge type for each GNN layer, starting
        from the first layer.

        If the graph is homogeneous, only an integer is needed for each layer.

        If None is provided for one layer, all neighbors will be included
        regardless of edge types.

        If -1 is provided for one edge type on one layer, then all inbound edges
        of that edge type will be included.
    replace : bool, default False
        Whether to sample with replacement.
    return_eids : bool, default False
        Whether to return the edge IDs involved in message passing in the MFG.
        If True, the edge IDs will be stored as an edge feature named ``dgl.EID``.

    Examples
    --------
    To train a 3-layer GNN for node classification on a set of nodes ``train_nid`` on
    a homogeneous graph where each node takes messages from 5, 10, 15 neighbors for
    the first, second, and third layer respectively (assuming the backend is PyTorch):

    >>> sampler = dgl.dataloading.MultiLayerNeighborSampler([5, 10, 15])
    >>> collator = dgl.dataloading.NodeCollator(g, train_nid, sampler)
    >>> dataloader = torch.utils.data.DataLoader(
    ...     collator.dataset, collate_fn=collator.collate,
    ...     batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
    >>> for blocks in dataloader:
    ...     train_on(blocks)

    If training on a heterogeneous graph and you want a different number of neighbors
    for each edge type, one should instead provide a list of dicts.  Each dict would
    specify the number of neighbors to pick per edge type.

    >>> sampler = dgl.dataloading.MultiLayerNeighborSampler([
    ...     {('user', 'follows', 'user'): 5,
    ...      ('user', 'plays', 'game'): 4,
    ...      ('game', 'played-by', 'user'): 3}] * 3)

    Notes
    -----
    For the concept of MFGs, please refer to
    :ref:`User Guide Section 6 <guide-minibatch>` and
    :doc:`Minibatch Training Tutorials <tutorials/large/L0_neighbor_sampling_overview>`.
    """
    def __init__(self, fanouts, replace=False, return_eids=False):
        super().__init__(len(fanouts), return_eids)

        self.fanouts = fanouts
        self.replace = replace
        # Used to cache computations and memory allocations.
        # list[dgl.nd.NDArray]; each array stores the fan-outs of all edge types.
        self.fanout_arrays = []
        self.prob_arrays = None
    def sample_frontier(self, block_id, g, seed_nodes):
        fanout = self.fanouts[block_id]
        if isinstance(g, distributed.DistGraph):
            if fanout is None:
                # TODO(zhengda) There is a bug in the distributed version of in_subgraph.
                # Use sample_neighbors with a fan-out of -1 to replace in_subgraph for now.
                frontier = distributed.sample_neighbors(g, seed_nodes, -1, replace=False)
            else:
                if len(g.etypes) > 1:
                    # Heterogeneous distributed graph: the edge type is stored
                    # in g.edata[dgl.ETYPE].
                    assert isinstance(fanout, int), \
                        "For distributed training, we can only sample the same " \
                        "number of neighbors for each edge type"
                    frontier = distributed.sample_etype_neighbors(
                        g, seed_nodes, ETYPE, fanout, replace=self.replace)
                else:
                    frontier = distributed.sample_neighbors(
                        g, seed_nodes, fanout, replace=self.replace)
        else:
            if fanout is None:
                frontier = subgraph.in_subgraph(g, seed_nodes)
            else:
                self._build_fanout(block_id, g)
                self._build_prob_arrays(g)
                frontier = sampling.sample_neighbors(
                    g, seed_nodes, self.fanout_arrays[block_id],
                    replace=self.replace, prob=self.prob_arrays)
        return frontier
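    # A sketch (not part of the library source) of exercising the distributed
    # branch above directly.  The ``ip_config.txt`` file and the partitioned
    # graph name ``graph_name`` below are hypothetical placeholders:
    #
    #     import dgl
    #     import torch
    #     dgl.distributed.initialize('ip_config.txt')
    #     g = dgl.distributed.DistGraph('graph_name')
    #     sampler = dgl.dataloading.MultiLayerNeighborSampler([10, 25])
    #     frontier = sampler.sample_frontier(0, g, torch.tensor([0, 1]))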
    def _build_prob_arrays(self, g):
        # Build prob_arrays only once.
        if self.prob_arrays is None:
            self.prob_arrays = [nd.array([], ctx=nd.cpu())] * len(g.etypes)

    def _build_fanout(self, block_id, g):
        assert self.fanouts is not None, \
            "_build_fanout() should only be called when fanouts is not None"
        # Build fanout_arrays only once for each layer.
        while block_id >= len(self.fanout_arrays):
            for i in range(len(self.fanouts)):
                fanout = self.fanouts[i]
                if not isinstance(fanout, dict):
                    fanout_array = [int(fanout)] * len(g.etypes)
                else:
                    if len(fanout) != len(g.etypes):
                        raise DGLError('Fan-out must be specified for each edge type '
                                       'if a dict is provided.')
                    fanout_array = [None] * len(g.etypes)
                    for etype, value in fanout.items():
                        fanout_array[g.get_etype_id(etype)] = value
                self.fanout_arrays.append(
                    F.to_dgl_nd(F.tensor(fanout_array, dtype=F.int64)))
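A minimal sketch (not part of the library source) of how ``MultiLayerNeighborSampler``
resolves a per-edge-type fan-out dict: ``_build_fanout`` flattens each dict into an
int64 array indexed by ``g.get_etype_id``, which ``sample_frontier`` then passes to
``sampling.sample_neighbors``.  The small heterogeneous graph below is hypothetical:

>>> import dgl
>>> import torch
>>> g = dgl.heterograph({
...     ('user', 'follows', 'user'): (torch.tensor([0, 1]), torch.tensor([1, 2])),
...     ('user', 'plays', 'game'): (torch.tensor([0, 2]), torch.tensor([0, 1]))})
>>> sampler = dgl.dataloading.MultiLayerNeighborSampler(
...     [{('user', 'follows', 'user'): 2, ('user', 'plays', 'game'): 1}] * 2)
>>> frontier = sampler.sample_frontier(0, g, {'user': torch.tensor([1])})
>>> frontier.num_nodes('user') == g.num_nodes('user')  # frontiers keep all nodes
True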
class MultiLayerFullNeighborSampler(MultiLayerNeighborSampler):
    """Sampler that builds computational dependency of node representations by taking
    messages from all neighbors for multilayer GNN.

    This sampler will make every node gather messages from every single neighbor
    per edge type.

    Parameters
    ----------
    n_layers : int
        The number of GNN layers to sample.
    return_eids : bool, default False
        Whether to return the edge IDs involved in message passing in the MFG.
        If True, the edge IDs will be stored as an edge feature named ``dgl.EID``.

    Examples
    --------
    To train a 3-layer GNN for node classification on a set of nodes ``train_nid`` on
    a homogeneous graph where each node takes messages from all neighbors for the
    first, second, and third layer respectively (assuming the backend is PyTorch):

    >>> sampler = dgl.dataloading.MultiLayerFullNeighborSampler(3)
    >>> collator = dgl.dataloading.NodeCollator(g, train_nid, sampler)
    >>> dataloader = torch.utils.data.DataLoader(
    ...     collator.dataset, collate_fn=collator.collate,
    ...     batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
    >>> for blocks in dataloader:
    ...     train_on(blocks)

    Notes
    -----
    For the concept of MFGs, please refer to
    :ref:`User Guide Section 6 <guide-minibatch>` and
    :doc:`Minibatch Training Tutorials <tutorials/large/L0_neighbor_sampling_overview>`.
    """
    def __init__(self, n_layers, return_eids=False):
        super().__init__([None] * n_layers, return_eids=return_eids)
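As a quick sanity check of the equivalence above, a full-neighbor frontier should
contain exactly the edges of ``dgl.in_subgraph`` on the same seeds.  A sketch (not
part of the library source) on a small hypothetical homogeneous graph:

>>> import dgl
>>> import torch
>>> g = dgl.graph((torch.tensor([0, 1, 2, 3]), torch.tensor([1, 2, 3, 0])))
>>> sampler = dgl.dataloading.MultiLayerFullNeighborSampler(2)
>>> frontier = sampler.sample_frontier(0, g, torch.tensor([1]))
>>> frontier.num_edges() == dgl.in_subgraph(g, torch.tensor([1])).num_edges()
True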