Source code for dgl.dataloading.async_transferer

"""API for transferring data to the GPU over second stream."""

from .. import backend as F
from .. import ndarray
from .. import utils
from .._ffi.function import _init_api

class Transfer(object):
    """ Class for representing an asynchronous transfer. """
    def __init__(self, transfer_id, handle):
        """ Create a new Transfer object.

        Parameters
        ----------
        transfer_id : int
            The id of the asynchronous transfer.
        handle : DGLAsyncTransferer
            The handle of the DGLAsyncTransferer object that initiated the
            transfer.
        """

        self._transfer_id = transfer_id
        self._handle = handle

    def wait(self):
        """ Wait for this transfer to finish, and return the result.

        Returns
        -------
        Tensor
            The new tensor on the target context.
        """
        res_tensor = _CAPI_DGLAsyncTransfererWait(self._handle, self._transfer_id)
        return F.zerocopy_from_dgl_ndarray(res_tensor)



class AsyncTransferer(object):
    """ Class for initiating asynchronous copies to the GPU on a second
    GPU stream.

    To initiate a transfer to a GPU:

    >>> tensor_cpu = torch.ones(100000).pin_memory()
    >>> transferer = dgl.dataloading.AsyncTransferer(torch.device(0))
    >>> future = transferer.async_copy(tensor_cpu, torch.device(0))

    And then to wait for the transfer to finish and get a copy of the tensor
    on the GPU:

    >>> tensor_gpu = future.wait()

    """
    def __init__(self, device):
        """ Create a new AsyncTransferer object.

        Parameters
        ----------
        device : Device or context object.
            The context in which the second stream will be created. Must be a
            GPU context for the copy to be asynchronous.
        """
        if isinstance(device, ndarray.DGLContext):
            ctx = device
        else:
            ctx = utils.to_dgl_context(device)
        self._handle = _CAPI_DGLAsyncTransfererCreate(ctx)

    def async_copy(self, tensor, device):
        """ Initiate an asynchronous copy on the internal stream. For this
        call to be asynchronous, the context the AsyncTransferer is created
        with must be a GPU context, and the input tensor must be in pinned
        memory.

        Currently, only transfers to the GPU are supported.

        Parameters
        ----------
        tensor : Tensor
            The tensor to transfer.
        device : Device or context object.
            The context to transfer to.

        Returns
        -------
        Transfer
            A Transfer object that can be waited on to get the tensor in the
            new context.
        """
        if isinstance(device, ndarray.DGLContext):
            ctx = device
        else:
            ctx = utils.to_dgl_context(device)

        if ctx.device_type != ndarray.DGLContext.STR2MASK["gpu"]:
            raise ValueError("'device' must be a GPU device.")

        tensor = F.zerocopy_to_dgl_ndarray(tensor)
        transfer_id = _CAPI_DGLAsyncTransfererStartTransfer(
            self._handle, tensor, ctx)

        return Transfer(transfer_id, self._handle)
_init_api("dgl.dataloading.async_transferer")
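

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only; not part of the module). A minimal example
# of driving this API from a standalone script, assuming the PyTorch backend,
# a DGL build with CUDA support, and an available GPU. It mirrors the doctest
# in the AsyncTransferer docstring above.
#
#     import torch
#     import dgl
#
#     # The source tensor should be in pinned (page-locked) host memory so
#     # the copy can actually run asynchronously on the second stream.
#     tensor_cpu = torch.ones(100000).pin_memory()
#
#     transferer = dgl.dataloading.AsyncTransferer(torch.device(0))
#     future = transferer.async_copy(tensor_cpu, torch.device(0))
#
#     # Other host-side work can proceed here while the copy is in flight.
#
#     tensor_gpu = future.wait()  # blocks until the transfer has completed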