# Source code for dgl.contrib.sampling.sampler

"""This file contains NodeFlow samplers."""

import sys
import numpy as np
import threading
import random
import traceback

from ..._ffi.function import _init_api
from ... import utils
from ...nodeflow import NodeFlow
from ... import backend as F
from ...utils import unwrap_to_ptr_list

try:
    import Queue as queue
except ImportError:
    import queue

__all__ = ['NeighborSampler', 'LayerSampler']

class SampledSubgraphLoader(object):
    """Iterator over NodeFlows produced by DGL's C sampling routines.

    NodeFlows are generated lazily: whenever the internal buffer is empty,
    :meth:`_prefetch` makes one call into the C sampler requesting
    ``num_workers`` batches, and the resulting NodeFlows are handed out one
    at a time by :meth:`__next__`. Iteration stops once such a call yields
    no NodeFlows.

    Parameters
    ----------
    g : DGLGraph
        The graph to sample from. Its graph index must be read-only.
    batch_size : int
        Batch size passed to the C sampler (the number of seed nodes per
        NodeFlow).
    sampler : str
        ``'neighbor'`` or ``'layer'``.
    expand_factor : int, optional
        Only used by the ``'neighbor'`` sampler; converted with ``int()``
        before being passed to the C sampler.
    num_hops : int, optional
        Only used by the ``'neighbor'`` sampler.
    layer_sizes : sequence of int, optional
        Only used by the ``'layer'`` sampler.
    neighbor_type : str, optional
        Passed through to the C sampler.
    node_prob : Tensor, optional
        Must be None; non-uniform sampling is not supported yet.
    seed_nodes : Tensor, optional
        Seed node IDs. Defaults to all nodes of ``g``.
    shuffle : bool, optional
        If True, the seed nodes are randomly permuted once, at construction.
    num_workers : int, optional
        Number of batches requested per call into the C sampler.
    add_self_loop : bool, optional
        Only used by the ``'neighbor'`` sampler.

    Raises
    ------
    NotImplementedError
        If ``g`` is not read-only, ``sampler`` is unknown, or ``node_prob``
        is not None.
    """

    def __init__(self, g, batch_size, sampler,
                 expand_factor=None, num_hops=1, layer_sizes=None,
                 neighbor_type='in', node_prob=None, seed_nodes=None,
                 shuffle=False, num_workers=1, add_self_loop=False):
        self._g = g
        if not g._graph.is_readonly():
            raise NotImplementedError("NodeFlow loader only support read-only graphs.")
        self._batch_size = batch_size
        self._sampler = sampler
        if sampler == 'neighbor':
            self._expand_factor = expand_factor
            self._num_hops = num_hops
        elif sampler == 'layer':
            self._layer_sizes = utils.toindex(layer_sizes)
        else:
            raise NotImplementedError('Invalid sampler option: "%s"' % sampler)
        self._node_prob = node_prob
        # Non-uniform sampling is not implemented. Once it is, node_prob must
        # hold one sampling probability per node of ``g``.
        if node_prob is not None:
            raise NotImplementedError('Non-uniform sampling is currently not supported.')
        self._add_self_loop = add_self_loop
        if seed_nodes is None:
            seed_nodes = F.arange(0, g.number_of_nodes())
        if shuffle:
            seed_nodes = F.rand_shuffle(seed_nodes)
        self._seed_nodes = utils.toindex(seed_nodes)
        self._num_workers = num_workers
        self._neighbor_type = neighbor_type
        self._nflows = []    # NodeFlows sampled but not yet returned by __next__
        self._seed_ids = []  # NOTE(review): not read anywhere in this class
        self._nflow_idx = 0  # batch id the next _prefetch call starts from

    def _prefetch(self):
        """Sample the next ``num_workers`` batches and append them to the buffer.

        Batch numbering continues from ``self._nflow_idx``, which is advanced
        by the number of NodeFlows actually returned by the C sampler.
        """
        if self._sampler == 'neighbor':
            handles = unwrap_to_ptr_list(_CAPI_UniformSampling(
                self._g._graph._handle,
                self._seed_nodes.todgltensor(),
                int(self._nflow_idx),    # start batch id
                int(self._batch_size),   # batch size
                int(self._num_workers),  # num batches
                int(self._expand_factor),
                int(self._num_hops),
                self._neighbor_type,
                self._add_self_loop))
        elif self._sampler == 'layer':
            handles = unwrap_to_ptr_list(_CAPI_LayerSampling(
                self._g._graph._handle,
                self._seed_nodes.todgltensor(),
                int(self._nflow_idx),    # start batch id
                int(self._batch_size),   # batch size
                int(self._num_workers),  # num batches
                self._layer_sizes.todgltensor(),
                self._neighbor_type))
        else:
            raise NotImplementedError('Invalid sampler option: "%s"' % self._sampler)
        nflows = [NodeFlow(self._g, hdl) for hdl in handles]
        self._nflows.extend(nflows)
        self._nflow_idx += len(nflows)

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next NodeFlow, sampling a new round if the buffer is empty."""
        # If we don't have prefetched NodeFlows, let's prefetch them.
        if not self._nflows:
            self._prefetch()
        # If the buffer is still empty, every batch has already been produced
        # and iteration is over.
        if not self._nflows:
            raise StopIteration
        return self._nflows.pop(0)

    def next(self):
        """Alias of :meth:`__next__` for callers using the ``.next()`` method."""
        return self.__next__()

class _Prefetcher(object):
    """Internal shared prefetcher logic. It can be sub-classed by a Thread-based
    implementation or Process-based implementation.

    A subclass must create ``_dataq``, ``_controlq`` and ``_errorq`` and then
    execute :meth:`run` in its worker. The queue protocol is:

    * The worker first puts exactly one startup status on ``_errorq``: None if
      ``iter(loader)`` succeeded, otherwise an ``(exception, traceback_str)``
      pair. The master consumes it in :meth:`_check_start`.
    * For every subsequent element the worker puts one status on ``_errorq``
      (None or an ``(exception, traceback_str)`` pair) followed by one value on
      ``_dataq`` (the element, or None if fetching it failed).
    * The master asks the worker to stop by putting None on ``_controlq``.
    """
    _dataq = None  # Data queue transmits prefetched elements
    _controlq = None  # Control queue to instruct thread / process shutdown
    _errorq = None  # Error queue to transmit exceptions from worker to master

    _checked_start = False  # True once startup has been checked by _check_start

    def __init__(self, loader, num_prefetch):
        super(_Prefetcher, self).__init__()
        self.loader = loader
        assert num_prefetch > 0, 'Unbounded Prefetcher is unsupported.'
        self.num_prefetch = num_prefetch

    def __iter__(self):
        """Return self, so the prefetcher works with ``iter()`` and ``for``."""
        return self

    def run(self):
        """Worker loop: fetch elements from the loader and publish them.

        Runs until None is received on the control queue. If creating the
        loader iterator fails, that error is reported as the startup status
        and the worker returns immediately.
        """
        # Startup - Master waits for this in _check_start.
        try:
            loader_iter = iter(self.loader)
        except Exception as e:  # pylint: disable=broad-except
            self._errorq.put((e, traceback.format_exc()))
            # Nothing to iterate. Continuing would call next() on an unbound
            # name, flood the bounded queues with spurious errors and leave
            # this worker blocked forever on a put().
            return
        self._errorq.put(None)

        while True:
            try:  # Check control queue
                c = self._controlq.get(False)
                if c is None:
                    break
                else:
                    raise RuntimeError('Got unexpected control code {}'.format(repr(c)))
            except queue.Empty:
                pass
            except RuntimeError as e:
                # Report the bad control code as an (error, None-data) pair.
                tb = traceback.format_exc()
                self._errorq.put((e, tb))
                self._dataq.put(None)

            try:
                data = next(loader_iter)
                error = None
            except Exception as e:  # pylint: disable=broad-except
                tb = traceback.format_exc()
                error = (e, tb)
                data = None
            finally:
                # Status first, then data; __next__ reads both for each element.
                self._errorq.put(error)
                self._dataq.put(data)

    def __next__(self):
        """Return the next prefetched element.

        On any worker error (including StopIteration from the loader) the
        worker is asked to stop; StopIteration is then raised, and any other
        error is re-raised.
        """
        next_item = self._dataq.get()
        next_error = self._errorq.get()

        if next_error is None:
            return next_item
        else:
            self._controlq.put(None)
            if isinstance(next_error[0], StopIteration):
                raise StopIteration
            else:
                return self._reraise(*next_error)

    def _reraise(self, e, tb):
        """Print the worker's traceback to stderr and raise its exception."""
        print('Reraising exception from Prefetcher', file=sys.stderr)
        print(tb, file=sys.stderr)
        raise e

    def _check_start(self):
        """Block until the worker reports startup; re-raise a startup failure."""
        assert not self._checked_start
        self._checked_start = True
        next_error = self._errorq.get(block=True)
        if next_error is not None:
            self._reraise(*next_error)

    def next(self):
        """Alias of :meth:`__next__` for callers using the ``.next()`` method."""
        return self.__next__()


class _ThreadPrefetcher(_Prefetcher, threading.Thread):
    """Prefetcher whose worker runs in a daemon ``threading.Thread``.

    The worker thread is started from the constructor. The constructor then
    waits for the worker's startup status, so a failure to create the loader
    iterator is re-raised here rather than on the first ``next()``.
    """

    def __init__(self, *args, **kwargs):
        super(_ThreadPrefetcher, self).__init__(*args, **kwargs)
        capacity = self.num_prefetch
        # Control messages are rare, so that queue is unbounded. Status and
        # data entries are produced in pairs and share the same bound.
        self._controlq = queue.Queue()
        self._dataq = queue.Queue(capacity)
        self._errorq = queue.Queue(capacity)
        # The daemon flag has to be set before start().
        self.daemon = True
        self.start()
        self._check_start()

class _PrefetchingLoader(object):
    """Wrap a loader so that its elements are produced ahead of time.

    Every ``iter()`` call creates a fresh ``_ThreadPrefetcher``. That worker
    thread pulls elements from the wrapped loader and keeps them in queues
    bounded by ``num_prefetch``. This can hide sampling latency at the cost
    of extra memory.

    Parameters
    ----------
    loader : an iterable
        Source loader.
    num_prefetch : int, default 1
        Number of elements to buffer ahead. Must be at least 1.

    Raises
    ------
    ValueError
        If ``num_prefetch`` is smaller than 1.
    """

    def __init__(self, loader, num_prefetch=1):
        if num_prefetch < 1:
            raise ValueError('num_prefetch must be greater 0.')
        self._num_prefetch = num_prefetch
        self._loader = loader

    def __iter__(self):
        prefetcher = _ThreadPrefetcher(self._loader, self._num_prefetch)
        return prefetcher

def NeighborSampler(g, batch_size, expand_factor, num_hops=1,
                    neighbor_type='in', node_prob=None, seed_nodes=None,
                    shuffle=False, num_workers=1, prefetch=False,
                    add_self_loop=False):
    '''Create a sampler that samples neighborhood.

    It returns an iterable of :class:`~dgl.NodeFlow`. This can be viewed as
    an analogy of *mini-batch training* on graph data -- the given graph
    represents the whole dataset and the returned iterable produces
    mini-batches (in the form of :class:`~dgl.NodeFlow` objects).

    A NodeFlow grows from sampled nodes. It first samples a set of nodes from
    the given ``seed_nodes`` (or all the nodes if not given), then samples
    their neighbors and extracts the subgraph. If the number of hops is
    :math:`k(>1)`, the process is repeated recursively, with the neighbor
    nodes just sampled becoming the new seed nodes. The result is a graph we
    defined as :class:`~dgl.NodeFlow` that contains :math:`k+1` layers. The
    last layer is the initial seed nodes. The sampled neighbor nodes in layer
    :math:`i+1` are in layer :math:`i`. All the edges are from nodes in layer
    :math:`i` to layer :math:`i+1`.

    TODO(minjie): give a figure here.

    As an analogy to mini-batch training, the ``batch_size`` here is equal to
    the number of the initial seed nodes (number of nodes in the last layer).
    The number of nodeflow objects (the number of batches) is calculated by
    ``len(seed_nodes) // batch_size`` (if ``seed_nodes`` is None, then it is
    equal to the set of all nodes in the graph).

    Parameters
    ----------
    g : DGLGraph
        The DGLGraph where we sample NodeFlows. It must be read-only.
    batch_size : int
        The batch size (i.e, the number of nodes in the last layer)
    expand_factor : int, float, str
        The number of neighbors sampled from the neighbor list of a vertex.
        The value of this parameter can be:

        * int: indicates the number of neighbors sampled from a neighbor list.
        * float: indicates the ratio of the sampled neighbors in a neighbor
          list.
        * str: indicates some common ways of calculating the number of
          sampled neighbors, e.g., ``sqrt(deg)``.

        Note that no matter how large the expand_factor, the max number of
        sampled neighbors is the neighborhood size.

        NOTE: the current loader converts this value with ``int()`` before
        sampling, so a float is truncated to an integer count and a str such
        as ``'sqrt(deg)'`` raises an error.
    num_hops : int, optional
        The number of hops to sample (i.e, the number of layers in the
        NodeFlow). Default: 1
    neighbor_type: str, optional
        Indicates the neighbors on different types of edges.

        * "in": the neighbors on the in-edges.
        * "out": the neighbors on the out-edges.
        * "both": the neighbors on both types of edges.

        Default: "in"
    node_prob : Tensor, optional
        A 1D tensor for the probability that a neighbor node is sampled.
        None means uniform sampling. Otherwise, the number of elements should
        be equal to the number of vertices in the graph. Non-uniform sampling
        is not implemented yet: passing anything other than None raises
        ``NotImplementedError``. Default: None
    seed_nodes : Tensor, optional
        A 1D tensor list of nodes where we sample NodeFlows from. If None,
        the seed vertices are all the vertices in the graph. Default: None
    shuffle : bool, optional
        If True, the seed nodes are randomly shuffled once when the sampler
        is created. Default: False
    num_workers : int, optional
        The number of worker threads that sample NodeFlows in parallel; also
        the number of batches requested per call into the C sampler.
        Default: 1
    prefetch : bool, optional
        If True, NodeFlows are sampled in a background thread that uses a
        queue of ``2 * num_workers`` entries. Default: False
    add_self_loop : bool, optional
        If true, add self loop to the sampled NodeFlow. The edge IDs of the
        self loop edges are -1. Default: False

    Returns
    -------
    iterable of NodeFlow
        When ``prefetch`` is False the result is an iterator, so ``next()``
        can be called on it directly. When ``prefetch`` is True it only
        supports ``iter()`` (e.g. a ``for`` loop), and each ``iter()`` call
        starts a new background thread.

    Raises
    ------
    NotImplementedError
        If ``g`` is not read-only or ``node_prob`` is not None.
    '''
    loader = SampledSubgraphLoader(g, batch_size, 'neighbor',
                                   expand_factor=expand_factor,
                                   num_hops=num_hops,
                                   neighbor_type=neighbor_type,
                                   node_prob=node_prob,
                                   seed_nodes=seed_nodes,
                                   shuffle=shuffle,
                                   num_workers=num_workers,
                                   add_self_loop=add_self_loop)
    if not prefetch:
        return loader
    else:
        return _PrefetchingLoader(loader, num_prefetch=num_workers*2)
def LayerSampler(g, batch_size, layer_sizes, neighbor_type='in',
                 node_prob=None, seed_nodes=None, shuffle=False,
                 num_workers=1, prefetch=False):
    '''Create a sampler that samples NodeFlows layer by layer.

    This creates a NodeFlow loader that samples subgraphs from the input graph
    with layer-wise sampling. This sampling method is implemented in C and can
    perform sampling very efficiently.

    The loader yields one NodeFlow at a time. Internally, each call into the C
    sampler requests ``num_workers`` batches at once.

    Parameters
    ----------
    g : DGLGraph
        The DGLGraph where we sample NodeFlows. It must be read-only.
    batch_size : int
        The batch size (the number of seed nodes per NodeFlow), passed to the
        C sampler in the same way as for :func:`NeighborSampler`.
    layer_sizes : list of int
        A list of layer sizes.
    neighbor_type : str, optional
        The neighbor type passed through to the C sampler. Default: "in"
    node_prob : Tensor, optional
        The probability that a neighbor node is sampled. Not implemented:
        passing anything other than None raises ``NotImplementedError``.
        Default: None
    seed_nodes : Tensor, optional
        A 1D tensor of nodes where we sample NodeFlows from. If None, the seed
        vertices are all vertices in the graph. Default: None
    shuffle : bool, optional
        If True, the seed nodes are randomly shuffled once when the sampler is
        created. Default: False
    num_workers : int, optional
        The number of worker threads that sample NodeFlows in parallel; also
        the number of batches requested per call into the C sampler.
        Default: 1
    prefetch : bool, optional
        If True, NodeFlows are sampled in a background thread that uses a
        queue of ``2 * num_workers`` entries. Default: False

    Returns
    -------
    iterable of NodeFlow
        When ``prefetch`` is False the result is an iterator, so ``next()``
        can be called on it directly. When ``prefetch`` is True it only
        supports ``iter()`` (e.g. a ``for`` loop), and each ``iter()`` call
        starts a new background thread.

    Raises
    ------
    NotImplementedError
        If ``g`` is not read-only or ``node_prob`` is not None.
    '''
    loader = SampledSubgraphLoader(g, batch_size, 'layer',
                                   layer_sizes=layer_sizes,
                                   neighbor_type=neighbor_type,
                                   node_prob=node_prob,
                                   seed_nodes=seed_nodes,
                                   shuffle=shuffle,
                                   num_workers=num_workers)
    if not prefetch:
        return loader
    else:
        return _PrefetchingLoader(loader, num_prefetch=num_workers*2)


def create_full_nodeflow(g, num_layers, add_self_loop=False):
    """Convert a full graph to NodeFlow to run a L-layer GNN model.

    Every node of ``g`` is used as a seed in a single batch. The expand
    factor is set to the number of nodes, which is never smaller than a
    neighborhood, so neighbor sampling (with the default ``'in'`` neighbor
    type) keeps every neighbor.

    Parameters
    ----------
    g : DGLGraph
        a DGL graph
    num_layers : int
        The number of layers
    add_self_loop : bool, default False
        Whether to add self loop to the sampled NodeFlow.
        If True, the edge IDs of the self loop edges are -1.

    Returns
    -------
    NodeFlow
        a NodeFlow with a specified number of layers.
    """
    batch_size = g.number_of_nodes()
    expand_factor = g.number_of_nodes()
    sampler = NeighborSampler(g, batch_size, expand_factor,
                              num_layers, add_self_loop=add_self_loop)
    return next(sampler)


_init_api('dgl.sampling', __name__)