# Source code for dgl.contrib.sampling.sampler

"""This file contains NodeFlow samplers."""

import sys
import numpy as np
import threading
from numbers import Integral
import traceback

from ..._ffi.function import _init_api
from ... import utils
from ...nodeflow import NodeFlow
from ... import backend as F
from ... import subgraph

try:
    import Queue as queue
except ImportError:
    import queue

__all__ = ['NeighborSampler', 'LayerSampler', 'EdgeSampler']

class SamplerIter(object):
    """Iterator that drains a sampler one "bunch" of batches at a time.

    The wrapped ``sampler`` must provide ``fetch(batch_idx)``, which returns a
    list of batches starting at batch index ``batch_idx``. An empty list means
    the sampler has nothing more to produce.

    Parameters
    ----------
    sampler : object
        The sampler whose ``fetch`` method supplies the batches.
    """

    def __init__(self, sampler):
        super(SamplerIter, self).__init__()
        self._sampler = sampler
        self._batches = []    # batches fetched but not handed out yet
        self._batch_idx = 0   # number of batches fetched so far

    def __iter__(self):
        """Return ``self`` so the object satisfies the full iterator protocol."""
        return self

    def prefetch(self):
        """Fetch the next bunch from the sampler and append it to the buffer."""
        batches = self._sampler.fetch(self._batch_idx)
        self._batches.extend(batches)
        self._batch_idx += len(batches)

    def __next__(self):
        """Return the next buffered batch, refilling the buffer when empty.

        Raises
        ------
        StopIteration
            If the buffer is empty and the sampler returns no more batches.
        """
        if not self._batches:
            self.prefetch()
        if not self._batches:
            # The refill produced nothing: the sampler is exhausted.
            raise StopIteration
        return self._batches.pop(0)

    def next(self):
        """Python 2 spelling of :meth:`__next__`."""
        return self.__next__()

class PrefetchingWrapper(object):
    """Internal shared prefetcher logic. It can be sub-classed by a Thread-based implementation
    or Process-based implementation.

    A subclass is expected to create the three queues below and arrange for
    :meth:`run` to execute in a worker. The worker pushes one entry to the
    error queue and one to the data queue per produced item; the consumer
    (:meth:`__next__`) pops one entry from each.

    Parameters
    ----------
    sampler_iter : iterator
        The iterator whose items are prefetched by the worker.
    num_prefetch : int
        Prefetch depth; must be positive.
    """
    _dataq = None  # Data queue transmits prefetched elements
    _controlq = None  # Control queue to instruct thread / process shutdown
    _errorq = None  # Error queue to transmit exceptions from worker to master

    _checked_start = False  # True once startup has been checked by _check_start
    _exhausted = False  # True once the consumer saw StopIteration or a worker error

    def __init__(self, sampler_iter, num_prefetch):
        super(PrefetchingWrapper, self).__init__()
        self.sampler_iter = sampler_iter
        assert num_prefetch > 0, 'Unbounded Prefetcher is unsupported.'
        self.num_prefetch = num_prefetch

    def run(self):
        """Method representing the process activity.

        Reports startup status on the error queue, then repeatedly pulls items
        from ``sampler_iter`` and publishes them until a shutdown request
        arrives on the control queue or the iterator raises.
        """
        # Startup - Master waits for this
        try:
            loader_iter = self.sampler_iter
            self._errorq.put(None)
        except Exception as e:  # pylint: disable=broad-except
            tb = traceback.format_exc()
            self._errorq.put((e, tb))
            # Startup failed; there is nothing to iterate over.
            return

        while True:
            try:  # Check control queue
                c = self._controlq.get(False)
                if c is None:
                    break
                else:
                    raise RuntimeError('Got unexpected control code {}'.format(repr(c)))
            except queue.Empty:
                pass
            except RuntimeError as e:
                tb = traceback.format_exc()
                self._errorq.put((e, tb))
                self._dataq.put(None)
                # The consumer stops reading after an error, so stop producing.
                break

            try:
                data = next(loader_iter)
                error = None
            except Exception as e:  # pylint: disable=broad-except
                tb = traceback.format_exc()
                error = (e, tb)
                data = None

            # Published outside of a ``finally`` clause so that stale values
            # from a previous iteration can never be enqueued.
            self._errorq.put(error)
            self._dataq.put(data)

            if error is not None:
                # Includes StopIteration: the iterator is done (or broken).
                # Looping again would only re-poll it and could block forever
                # on the bounded queues once the consumer has stopped reading.
                break

    def __iter__(self):
        """Return ``self`` so the object satisfies the full iterator protocol."""
        return self

    def __next__(self):
        """Return the next prefetched item.

        Raises
        ------
        StopIteration
            When the underlying iterator is exhausted, and on every later call
            after the first StopIteration or worker error.
        Exception
            Any other exception raised in the worker is re-raised here.
        """
        if self._exhausted:
            # The worker has stopped producing; waiting on the queues would hang.
            raise StopIteration

        next_item = self._dataq.get()
        next_error = self._errorq.get()

        if next_error is None:
            return next_item

        self._exhausted = True
        self._controlq.put(None)  # ask the worker to shut down
        if isinstance(next_error[0], StopIteration):
            raise StopIteration
        return self._reraise(*next_error)

    def _reraise(self, e, tb):
        """Print the worker's formatted traceback to stderr and raise ``e``."""
        print('Reraising exception from Prefetcher', file=sys.stderr)
        print(tb, file=sys.stderr)
        raise e

    def _check_start(self):
        """Block until the worker reports startup; re-raise a startup error.

        Must be called exactly once, after the worker has been started.
        """
        assert not self._checked_start
        self._checked_start = True
        next_error = self._errorq.get(block=True)
        if next_error is not None:
            self._reraise(*next_error)

    def next(self):
        """Python 2 spelling of :meth:`__next__`."""
        return self.__next__()

class ThreadPrefetchingWrapper(PrefetchingWrapper, threading.Thread):
    """Prefetcher whose worker loop runs on a daemon thread.

    Construction creates the queues, starts the thread and waits for the
    worker's startup report before returning.
    """

    def __init__(self, *args, **kwargs):
        super(ThreadPrefetchingWrapper, self).__init__(*args, **kwargs)
        depth = self.num_prefetch
        # The control queue is unbounded; data and error queues are bounded
        # by the prefetch depth.
        self._controlq = queue.Queue()
        self._errorq = queue.Queue(depth)
        self._dataq = queue.Queue(depth)
        self.daemon = True
        self.start()
        self._check_start()


class NodeFlowSampler(object):
    '''Common base for samplers that produce NodeFlows from a graph.

    Class properties
    ----------------
    immutable_only : bool
        If true, construction fails unless the graph is read-only.
        Subclasses may override it.

    Parameters
    ----------
    g : graph
        The graph to sample from.
    batch_size : int
        Batch size; converted with ``int``.
    seed_nodes : tensor or None
        Nodes to sample from. None selects every node of ``g``.
    shuffle : bool
        Whether to shuffle the seed nodes.
    num_prefetch : int
        Prefetch depth. A falsy value disables prefetching.
    prefetching_wrapper_class : class
        Wrapper used around the iterator when prefetching is enabled.
    '''
    immutable_only = False

    def __init__(
            self,
            g,
            batch_size,
            seed_nodes,
            shuffle,
            num_prefetch,
            prefetching_wrapper_class):
        self._g = g
        if self.immutable_only:
            if not g._graph.is_readonly():
                raise NotImplementedError("This loader only support read-only graphs.")

        self._batch_size = int(batch_size)

        # Default to all nodes of the graph when no seeds are supplied.
        seeds = F.arange(0, g.number_of_nodes()) if seed_nodes is None else seed_nodes
        if shuffle:
            seeds = F.rand_shuffle(seeds)
        self._seed_nodes = utils.toindex(seeds)

        self._num_prefetch = num_prefetch
        if num_prefetch:
            # Only recorded when prefetching is on; __iter__ reads it only then.
            self._prefetching_wrapper_class = prefetching_wrapper_class

    def fetch(self, current_nodeflow_index):
        '''
        Produce the next "bunch" of NodeFlows.

        Each worker contributes one NodeFlow built from one batch.
        Subclasses must override this method; the base version always raises.

        Parameters
        ----------
        current_nodeflow_index : int
            Number of NodeFlows generated by the sampler so far.

        Returns
        -------
        list[NodeFlow]
            The next "bunch" of nodeflows to be processed.

        Raises
        ------
        NotImplementedError
            Always, in this base class.
        '''
        raise NotImplementedError

    def __iter__(self):
        '''Return a fresh iterator, wrapped in a prefetcher when enabled.'''
        plain_iter = SamplerIter(self)
        if not self._num_prefetch:
            return plain_iter
        return self._prefetching_wrapper_class(plain_iter, self._num_prefetch)

    @property
    def g(self):
        '''The graph this sampler draws from.'''
        return self._g

    @property
    def seed_nodes(self):
        '''The seed nodes, as converted by ``utils.toindex``.'''
        return self._seed_nodes

    @property
    def batch_size(self):
        '''The batch size as an int.'''
        return self._batch_size

class NeighborSampler(NodeFlowSampler):
    r'''Create a sampler that samples neighborhood.

    It returns a generator of :class:`~dgl.NodeFlow`. This can be viewed as
    an analogy of *mini-batch training* on graph data -- the given graph represents
    the whole dataset and the returned generator produces mini-batches (in the form
    of :class:`~dgl.NodeFlow` objects).

    A NodeFlow grows from sampled nodes. It first samples a set of nodes from the given
    ``seed_nodes`` (or all the nodes if not given), then samples their neighbors
    and extracts the subgraph. If the number of hops is :math:`k(>1)`, the process is repeated
    recursively, with the neighbor nodes just sampled become the new seed nodes.
    The result is a graph we defined as :class:`~dgl.NodeFlow` that contains :math:`k+1`
    layers. The last layer is the initial seed nodes. The sampled neighbor nodes in
    layer :math:`i+1` are in layer :math:`i`. All the edges are from nodes
    in layer :math:`i` to layer :math:`i+1`.

    TODO(minjie): give a figure here.

    As an analogy to mini-batch training, the ``batch_size`` here is equal to the number
    of the initial seed nodes (number of nodes in the last layer).
    The number of nodeflow objects (the number of batches) is calculated by
    ``len(seed_nodes) // batch_size`` (if ``seed_nodes`` is None, then it is equal
    to the set of all nodes in the graph).

    Note: NeighborSampler currently only supports immutable graphs.

    Parameters
    ----------
    g : DGLGraph
        The DGLGraph where we sample NodeFlows.
    batch_size : int
        The batch size (i.e, the number of nodes in the last layer)
    expand_factor : int, float, str
        The number of neighbors sampled from the neighbor list of a vertex.
        The value of this parameter can be:

        * int: indicates the number of neighbors sampled from a neighbor list.
        * float: indicates the ratio of the sampled neighbors in a neighbor list.
        * str: indicates some common ways of calculating the number of sampled neighbors,
          e.g., ``sqrt(deg)``.

        Note that no matter how large the expand_factor, the max number of sampled neighbors
        is the neighborhood size.

        The constructor currently asserts that the value is an integer, so only
        the int form is accepted and the ``None`` default must be overridden.
    num_hops : int, optional
        The number of hops to sample (i.e, the number of layers in the NodeFlow).
        Default: 1
    neighbor_type: str, optional
        Indicates the neighbors on different types of edges.

        * "in": the neighbors on the in-edges.
        * "out": the neighbors on the out-edges.

        Default: "in"
    transition_prob : str or Tensor, optional
        A 1D tensor containing the (unnormalized) transition probability.

        The probability of a node v being sampled from a neighbor u is proportional to
        the edge weight, normalized by the sum over edge weights grouping by the
        destination node.

        In other words, given a node v, the probability of node u and edge (u, v)
        included in the NodeFlow layer preceding that of v is given by:

        .. math::

           p(u, v) = \frac{w_{u, v}}{\sum_{u', (u', v) \in E} w_{u', v}}

        If neighbor type is "out", then the probability is instead normalized by the sum
        grouping by source node:

        .. math::

           p(v, u) = \frac{w_{v, u}}{\sum_{u', (v, u') \in E} w_{v, u'}}

        If a str is given, the edge weight will be loaded from the edge feature column
        with the same name.  The feature column must be a scalar column in this case.

        Default: None
    seed_nodes : Tensor, optional
        A 1D tensor list of nodes where we sample NodeFlows from.
        If None, the seed vertices are all the vertices in the graph.
        Default: None
    shuffle : bool, optional
        Indicates the sampled NodeFlows are shuffled. Default: False
    num_workers : int, optional
        The number of worker threads that sample NodeFlows in parallel. Default: 1
    prefetch : bool, optional
        If true, prefetch the samples in the next batch. Default: False
    add_self_loop : bool, optional
        If true, add self loop to the sampled NodeFlow.
        The edge IDs of the self loop edges are -1. Default: False
    '''

    immutable_only = True

    def __init__(
            self,
            g,
            batch_size,
            expand_factor=None,
            num_hops=1,
            neighbor_type='in',
            transition_prob=None,
            seed_nodes=None,
            shuffle=False,
            num_workers=1,
            prefetch=False,
            add_self_loop=False):
        # The prefetch depth is two bunches' worth of batches when enabled.
        super(NeighborSampler, self).__init__(
            g, batch_size, seed_nodes, shuffle, num_workers * 2 if prefetch else 0,
            ThreadPrefetchingWrapper)

        assert g.is_readonly, "NeighborSampler doesn't support mutable graphs. " + \
            "Please turn it into an immutable graph with DGLGraph.readonly"
        assert isinstance(expand_factor, Integral), 'non-int expand_factor not supported'

        self._expand_factor = int(expand_factor)
        self._num_hops = int(num_hops)
        self._add_self_loop = add_self_loop
        self._num_workers = int(num_workers)
        self._neighbor_type = neighbor_type
        self._transition_prob = transition_prob

    def fetch(self, current_nodeflow_index):
        '''Sample the next bunch of NodeFlows through the C API.

        Parameters
        ----------
        current_nodeflow_index : int
            How many NodeFlows the sampler has generated so far.

        Returns
        -------
        list[NodeFlow]
            One NodeFlow per object returned by the C API call.
        '''
        if self._transition_prob is None:
            # An empty tensor is passed when no transition probability is set.
            prob = F.tensor([], F.float32)
        elif isinstance(self._transition_prob, str):
            # A string names the edge feature column holding the weights.
            prob = self.g.edata[self._transition_prob]
        else:
            prob = self._transition_prob

        nfobjs = _CAPI_NeighborSampling(
            self.g._graph,
            self.seed_nodes.todgltensor(),
            current_nodeflow_index,  # start batch id
            self.batch_size,         # batch size
            self._num_workers,       # num batches
            self._expand_factor,
            self._num_hops,
            self._neighbor_type,
            self._add_self_loop,
            F.zerocopy_to_dgl_ndarray(prob))

        nflows = [NodeFlow(self.g, obj) for obj in nfobjs]
        return nflows
class LayerSampler(NodeFlowSampler):
    '''Create a sampler that samples neighborhood.

    This creates a NodeFlow loader that samples subgraphs from the input graph
    with layer-wise sampling. This sampling method is implemented in C and can perform
    sampling very efficiently.

    The NodeFlow loader returns a list of NodeFlows.
    The size of the NodeFlow list is the number of workers.

    Note: LayerSampler currently only supports immutable graphs.

    Parameters
    ----------
    g : DGLGraph
        The DGLGraph where we sample NodeFlows.
    batch_size : int
        The batch size (i.e, the number of nodes in the last layer)
    layer_sizes : list of int
        A list of layer sizes.
    neighbor_type: str, optional
        Indicates the neighbors on different types of edges.

        * "in": the neighbors on the in-edges.
        * "out": the neighbors on the out-edges.

        Default: "in"
    node_prob : Tensor, optional
        A 1D tensor for the probability that a neighbor node is sampled.
        None means uniform sampling. Otherwise, the number of elements
        should be equal to the number of vertices in the graph.
        It's not implemented: the constructor asserts that it is None.
        Default: None
    seed_nodes : Tensor, optional
        A 1D tensor list of nodes where we sample NodeFlows from.
        If None, the seed vertices are all the vertices in the graph.
        Default: None
    shuffle : bool, optional
        Indicates the sampled NodeFlows are shuffled. Default: False
    num_workers : int, optional
        The number of worker threads that sample NodeFlows in parallel. Default: 1
    prefetch : bool, optional
        If true, prefetch the samples in the next batch. Default: False
    '''

    immutable_only = True

    def __init__(
            self,
            g,
            batch_size,
            layer_sizes,
            neighbor_type='in',
            node_prob=None,
            seed_nodes=None,
            shuffle=False,
            num_workers=1,
            prefetch=False):
        # The prefetch depth is two bunches' worth of batches when enabled.
        super(LayerSampler, self).__init__(
            g, batch_size, seed_nodes, shuffle, num_workers * 2 if prefetch else 0,
            ThreadPrefetchingWrapper)

        assert g.is_readonly, "LayerSampler doesn't support mutable graphs. " + \
            "Please turn it into an immutable graph with DGLGraph.readonly"
        assert node_prob is None, 'non-uniform node probability not supported'

        self._num_workers = int(num_workers)
        self._neighbor_type = neighbor_type
        self._layer_sizes = utils.toindex(layer_sizes)

    def fetch(self, current_nodeflow_index):
        '''Sample the next bunch of NodeFlows through the C API.

        Parameters
        ----------
        current_nodeflow_index : int
            How many NodeFlows the sampler has generated so far.

        Returns
        -------
        list[NodeFlow]
            One NodeFlow per object returned by the C API call.
        '''
        nfobjs = _CAPI_LayerSampling(
            self.g._graph,
            self.seed_nodes.todgltensor(),
            current_nodeflow_index,  # start batch id
            self.batch_size,         # batch size
            self._num_workers,       # num batches
            self._layer_sizes.todgltensor(),
            self._neighbor_type)

        nflows = [NodeFlow(self.g, obj) for obj in nfobjs]
        return nflows
class EdgeSampler(object):
    '''Edge sampler for link prediction.

    This samples edges from a given graph. The edges sampled for a batch are
    placed in a subgraph before returning. In many link prediction tasks,
    negative edges are required to train a model. A negative edge is constructed by
    corrupting an existing edge in the graph. The current implementation
    support two ways of corrupting an edge: corrupt the head node of
    an edge (by randomly selecting a node as the head node), or corrupt
    the tail node of an edge. When we corrupt the head node of an edge, we randomly
    sample a node from the entire graph as the head node. It's possible the constructed
    edge exists in the graph. By default, the implementation doesn't explicitly check
    if the sampled negative edge exists in a graph. However, a user can exclude
    positive edges from negative edges by specifying 'exclude_positive=True'.
    When negative edges are created, a batch of negative edges are also placed
    in a subgraph.

    Currently, negative_mode only supports 'head' and 'tail'.
    If negative_mode=='head', the negative edges are generated by corrupting
    head nodes; otherwise, the tail nodes are corrupted.

    Parameters
    ----------
    g : DGLGraph
        The DGLGraph where we sample edges.
    batch_size : int
        The batch size (i.e, the number of edges from the graph)
    seed_edges : tensor
        A list of edges where we sample from.
        If None, all edges of the graph are used.
    shuffle : bool
        whether randomly shuffle the list of edges where we sample from.
    num_workers : int
        The number of workers to sample edges in parallel.
    prefetch : bool, optional
        If true, prefetch the samples in the next batch. Default: False
    negative_mode : string
        The method used to construct negative edges. Possible values are 'head', 'tail'.
        The default empty string disables negative sampling.
    neg_sample_size : int
        The number of negative edges to sample for each edge.
    exclude_positive : bool
        Whether to exclude positive edges from the negative edges.

    Class properties
    ----------------
    immutable_only : bool
        Whether the sampler only works on immutable graphs.
        Subclasses can override this property.
    '''
    immutable_only = False

    def __init__(
            self,
            g,
            batch_size,
            seed_edges=None,
            shuffle=False,
            num_workers=1,
            prefetch=False,
            negative_mode="",
            neg_sample_size=0,
            exclude_positive=False):
        self._g = g
        if self.immutable_only and not g._graph.is_readonly():
            raise NotImplementedError("This loader only support read-only graphs.")

        self._batch_size = int(batch_size)

        if seed_edges is None:
            self._seed_edges = F.arange(0, g.number_of_edges())
        else:
            self._seed_edges = seed_edges
        if shuffle:
            self._seed_edges = F.rand_shuffle(self._seed_edges)
        self._seed_edges = utils.toindex(self._seed_edges)

        if prefetch:
            self._prefetching_wrapper_class = ThreadPrefetchingWrapper
        self._num_prefetch = num_workers * 2 if prefetch else 0

        self._num_workers = int(num_workers)
        self._negative_mode = negative_mode
        self._neg_sample_size = neg_sample_size
        self._exclude_positive = exclude_positive

    def fetch(self, current_index):
        '''
        It returns a list of subgraphs if it only samples positive edges.
        It returns a list of subgraph pairs if it samples both positive edges
        and negative edges.

        Parameters
        ----------
        current_index : int
            How many batches the sampler has generated so far.

        Returns
        -------
        list[DGLSubGraph] or list[(DGLSubGraph, DGLSubGraph)]
            Next "bunch" of edges to be processed.
        '''
        subgs = _CAPI_UniformEdgeSampling(
            self.g._graph,
            self.seed_edges.todgltensor(),
            current_index,      # start batch id
            self.batch_size,    # batch size
            self._num_workers,  # num batches
            self._negative_mode,
            self._neg_sample_size,
            self._exclude_positive)

        if len(subgs) == 0:
            return []

        if self._negative_mode == "":
            # If no negative subgraphs.
            return [subgraph.DGLSubGraph(self.g, subg) for subg in subgs]

        # The result holds the positive subgraphs followed by the matching
        # negative ones. Pair them by halves of what was actually returned
        # rather than by num_workers, so a final bunch with fewer batches than
        # workers does not trip an assertion.
        # NOTE(review): assumes the C API keeps the positives-then-negatives
        # layout for a short bunch -- confirm against the C implementation.
        assert len(subgs) % 2 == 0
        num_pos = len(subgs) // 2
        rets = []
        for i in range(num_pos):
            pos_subg = subgraph.DGLSubGraph(self.g, subgs[i])
            neg_subg = subgraph.DGLSubGraph(self.g, subgs[i + num_pos])
            rets.append((pos_subg, neg_subg))
        return rets

    def __iter__(self):
        '''Return a fresh iterator, wrapped in a prefetcher when enabled.'''
        it = SamplerIter(self)
        if self._num_prefetch:
            return self._prefetching_wrapper_class(it, self._num_prefetch)
        else:
            return it

    @property
    def g(self):
        '''The graph this sampler draws from.'''
        return self._g

    @property
    def seed_edges(self):
        '''The seed edges, as converted by ``utils.toindex``.'''
        return self._seed_edges

    @property
    def batch_size(self):
        '''The batch size as an int.'''
        return self._batch_size


def create_full_nodeflow(g, num_layers, add_self_loop=False):
    """Convert a full graph to NodeFlow to run a L-layer GNN model.

    Parameters
    ----------
    g : DGLGraph
        a DGL graph
    num_layers : int
        The number of layers
    add_self_loop : bool, default False
        Whether to add self loop to the sampled NodeFlow.
        If True, the edge IDs of the self loop edges are -1.

    Returns
    -------
    NodeFlow
        a NodeFlow with a specified number of layers.
    """
    # Using the node count as both batch size and expand factor puts every
    # node into a single batch and requests as many neighbors as there are
    # nodes; only the first NodeFlow of that sampler is returned.
    batch_size = g.number_of_nodes()
    expand_factor = g.number_of_nodes()
    sampler = NeighborSampler(g, batch_size, expand_factor,
                              num_layers, add_self_loop=add_self_loop)
    return next(iter(sampler))


_init_api('dgl.sampling', __name__)