Source code for dgl.nodeflow

"""Class for NodeFlow data structure."""
from __future__ import absolute_import

from ._ffi.object import register_object, ObjectBase
from ._ffi.function import _init_api
from .base import ALL, is_all, DGLError
from . import backend as F
from .frame import Frame, FrameRef
from .graph import DGLBaseGraph
from .graph_index import transform_ids
from .runtime import ir, scheduler, Runtime
from . import utils
from .view import LayerView, BlockView

__all__ = ['NodeFlow']

@register_object('graph.NodeFlow')
class NodeFlowObject(ObjectBase):
    """NodeFlow object"""

    @property
    def graph(self):
        """The graph structure of this nodeflow.

        Returns
        -------
        GraphIndex
        """
        return _CAPI_NodeFlowGetGraph(self)

    @property
    def layer_offsets(self):
        """The offsets of each layer.

        Returns
        -------
        NDArray
        """
        return _CAPI_NodeFlowGetLayerOffsets(self)

    @property
    def block_offsets(self):
        """The offsets of each block.

        Returns
        -------
        NDArray
        """
        return _CAPI_NodeFlowGetBlockOffsets(self)

    @property
    def node_mapping(self):
        """Mapping array from nodeflow node id to parent graph

        Returns
        -------
        NDArray
        """
        return _CAPI_NodeFlowGetNodeMapping(self)

    @property
    def edge_mapping(self):
        """Mapping array from nodeflow edge id to parent graph

        Returns
        -------
        NDArray
        """
        return _CAPI_NodeFlowGetEdgeMapping(self)

class NodeFlow(DGLBaseGraph):
    """The NodeFlow class stores the results of neighbor sampling and
    layer-wise sampling.

    These sampling algorithms generate graphs with multiple layers. Edges only
    connect nodes in two adjacent layers; there are no edges between nodes in
    the same layer. All layers of the sampling result are stored in a single
    graph, together with extra information such as the node and edge mappings
    from the NodeFlow graph to the parent graph.

    DO NOT create a NodeFlow object directly. Use a sampling method to
    generate a NodeFlow instead.

    Parameters
    ----------
    parent : DGLGraph
        The parent graph.
    nfobj : NodeFlowObject
        The nodeflow object.
    """
    def __init__(self, parent, nfobj):
        super(NodeFlow, self).__init__(nfobj.graph)
        self._parent = parent
        self._node_mapping = utils.toindex(nfobj.node_mapping)
        self._edge_mapping = utils.toindex(nfobj.edge_mapping)
        self._layer_offsets = utils.toindex(nfobj.layer_offsets).tonumpy()
        self._block_offsets = utils.toindex(nfobj.block_offsets).tonumpy()
        # node/edge frames
        self._node_frames = [FrameRef(Frame(num_rows=self.layer_size(i)))
                             for i in range(self.num_layers)]
        self._edge_frames = [FrameRef(Frame(num_rows=self.block_size(i)))
                             for i in range(self.num_blocks)]
        # registered functions
        self._message_funcs = [None] * self.num_blocks
        self._reduce_funcs = [None] * self.num_blocks
        self._apply_node_funcs = [None] * self.num_blocks
        self._apply_edge_funcs = [None] * self.num_blocks

    def _get_layer_id(self, layer_id):
        """The layer Id might be negative. We need to convert it to the
        actual layer Id.
        """
        if layer_id >= 0:
            return layer_id
        else:
            return self.num_layers + layer_id

    def _get_block_id(self, block_id):
        """The block Id might be negative. We need to convert it to the
        actual block Id.
        """
        if block_id >= 0:
            return block_id
        else:
            return self.num_blocks + block_id

    def _get_node_frame(self, layer_id):
        return self._node_frames[layer_id]

    def _get_edge_frame(self, block_id):
        return self._edge_frames[block_id]

    @property
    def num_layers(self):
        """Get the number of layers.

        Returns
        -------
        int
            The number of layers.
        """
        return len(self._layer_offsets) - 1

    @property
    def num_blocks(self):
        """Get the number of blocks.

        Returns
        -------
        int
            The number of blocks.
        """
        return self.num_layers - 1

    @property
    def layers(self):
        """Return a LayerView of this NodeFlow.

        This is mainly for usage like:

        * `g.layers[2].data['h']` to get the node features of layer#2.
        """
        return LayerView(self)

    @property
    def blocks(self):
        """Return a BlockView of this NodeFlow.

        This is mainly for usage like:

        * `g.blocks[1].data['h']` to get the edge features of the block from
          layer#1 to layer#2.
        """
        return BlockView(self)

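    # Usage sketch (illustrative, not part of this module): a NodeFlow is
    # normally obtained from a sampler such as dgl.contrib.sampling.NeighborSampler
    # rather than constructed directly. Assuming a parent DGLGraph `g` whose nodes
    # carry a feature 'h', iteration might look like:
    #
    #     from dgl.contrib.sampling import NeighborSampler
    #     for nf in NeighborSampler(g, batch_size=32, expand_factor=4,
    #                               num_hops=2, shuffle=True):
    #         nf.copy_from_parent()             # pull 'h' into every layer
    #         print(nf.num_layers, nf.num_blocks)
    #         print(nf.layers[-1].data['h'])    # features of the seed layer
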
    def node_attr_schemes(self, layer_id):
        """Return the node feature schemes.

        Each feature scheme is a named tuple that stores the shape and data
        type of the node feature.

        Parameters
        ----------
        layer_id : int
            The specified layer to get the node data scheme.

        Returns
        -------
        dict of str to schemes
            The schemes of node feature columns.
        """
        layer_id = self._get_layer_id(layer_id)
        return self._node_frames[layer_id].schemes

    def edge_attr_schemes(self, block_id):
        """Return the edge feature schemes.

        Each feature scheme is a named tuple that stores the shape and data
        type of the edge feature.

        Parameters
        ----------
        block_id : int
            The specified block to get the edge data scheme.

        Returns
        -------
        dict of str to schemes
            The schemes of edge feature columns.
        """
        block_id = self._get_block_id(block_id)
        return self._edge_frames[block_id].schemes

    def layer_size(self, layer_id):
        """Return the number of nodes in a specified layer.

        Parameters
        ----------
        layer_id : int
            The specified layer to return the number of nodes.
        """
        layer_id = self._get_layer_id(layer_id)
        return int(self._layer_offsets[layer_id + 1]) - int(self._layer_offsets[layer_id])

    def block_size(self, block_id):
        """Return the number of edges in a specified block.

        Parameters
        ----------
        block_id : int
            The specified block to return the number of edges.
        """
        block_id = self._get_block_id(block_id)
        return int(self._block_offsets[block_id + 1]) - int(self._block_offsets[block_id])

    def copy_from_parent(self, node_embed_names=ALL, edge_embed_names=ALL,
                         ctx=None):
        """Copy node/edge features from the parent graph.

        Parameters
        ----------
        node_embed_names : a list of lists of strings, optional
            The names of embeddings in each layer.
        edge_embed_names : a list of lists of strings, optional
            The names of embeddings in each block.
        ctx : Context
            The device to copy tensors to. If None, features stay on their
            original device.
        """
        if self._parent._node_frame.num_rows != 0 and self._parent._node_frame.num_columns != 0:
            if is_all(node_embed_names):
                for i in range(self.num_layers):
                    nid = utils.toindex(self.layer_parent_nid(i))
                    self._node_frames[i] = FrameRef(Frame(_copy_frame(
                        self._parent._node_frame[nid], ctx)))
            elif node_embed_names is not None:
                assert isinstance(node_embed_names, list) \
                        and len(node_embed_names) == self.num_layers, \
                        "The specified embedding names should be the same as the number of layers."
                for i in range(self.num_layers):
                    nid = self.layer_parent_nid(i)
                    self._node_frames[i] = _get_frame(self._parent._node_frame,
                                                      node_embed_names[i], nid, ctx)

        if self._parent._edge_frame.num_rows != 0 and self._parent._edge_frame.num_columns != 0:
            if is_all(edge_embed_names):
                for i in range(self.num_blocks):
                    eid = utils.toindex(self.block_parent_eid(i))
                    self._edge_frames[i] = FrameRef(Frame(_copy_frame(
                        self._parent._edge_frame[eid], ctx)))
            elif edge_embed_names is not None:
                assert isinstance(edge_embed_names, list) \
                        and len(edge_embed_names) == self.num_blocks, \
                        "The specified embedding names should be the same as the number of blocks."
                for i in range(self.num_blocks):
                    eid = self.block_parent_eid(i)
                    self._edge_frames[i] = _get_frame(self._parent._edge_frame,
                                                      edge_embed_names[i], eid, ctx)

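    # Usage sketch (illustrative): with explicit names, copy_from_parent expects
    # one list of field names per layer/block. Assuming a 3-layer NodeFlow whose
    # parent has node feature 'h' and edge feature 'w':
    #
    #     nf.copy_from_parent(node_embed_names=[['h'], ['h'], ['h']],
    #                         edge_embed_names=[['w'], ['w']])
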
    def copy_to_parent(self, node_embed_names=ALL, edge_embed_names=ALL):
        """Copy node/edge embeddings to the parent graph.

        Note: if a node of the parent graph appears in multiple layers of the
        NodeFlow and those layers carry node data with the same name, the data
        in the higher layer overwrites the data written back from the lower
        layer. For example, if node 5 of the parent graph appears in layers 0
        and 1 and both layers have node data 'h', the data from layer 1
        overwrites the data from layer 0 when copying back. To avoid this,
        give the node data in each layer a different name.

        Parameters
        ----------
        node_embed_names : a list of lists of strings, optional
            The names of embeddings in each layer.
        edge_embed_names : a list of lists of strings, optional
            The names of embeddings in each block.
        """
        if self._parent._node_frame.num_rows != 0 and self._parent._node_frame.num_columns != 0:
            if is_all(node_embed_names):
                for i in range(self.num_layers):
                    nid = utils.toindex(self.layer_parent_nid(i))
                    # We should write data back directly.
                    self._parent._node_frame.update_rows(nid, self._node_frames[i], inplace=True)
            elif node_embed_names is not None:
                assert isinstance(node_embed_names, list) \
                        and len(node_embed_names) == self.num_layers, \
                        "The specified embedding names should be the same as the number of layers."
                for i in range(self.num_layers):
                    nid = utils.toindex(self.layer_parent_nid(i))
                    _update_frame(self._parent._node_frame, node_embed_names[i], nid,
                                  self._node_frames[i])

        if self._parent._edge_frame.num_rows != 0 and self._parent._edge_frame.num_columns != 0:
            if is_all(edge_embed_names):
                for i in range(self.num_blocks):
                    eid = utils.toindex(self.block_parent_eid(i))
                    self._parent._edge_frame.update_rows(eid, self._edge_frames[i], inplace=True)
            elif edge_embed_names is not None:
                assert isinstance(edge_embed_names, list) \
                        and len(edge_embed_names) == self.num_blocks, \
                        "The specified embedding names should be the same as the number of blocks."
                for i in range(self.num_blocks):
                    eid = utils.toindex(self.block_parent_eid(i))
                    _update_frame(self._parent._edge_frame, edge_embed_names[i], eid,
                                  self._edge_frames[i])

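    # Usage sketch (illustrative): to avoid the overwriting behaviour described
    # above, one option is to store each layer's output under a distinct name
    # before writing back, e.g. for a hypothetical 2-block NodeFlow:
    #
    #     nf.layers[1].data['h1'] = h1          # activations computed for layer 1
    #     nf.layers[2].data['h2'] = h2          # activations computed for layer 2
    #     nf.copy_to_parent(node_embed_names=[[], ['h1'], ['h2']],
    #                       edge_embed_names=None)
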
    def map_to_parent_nid(self, nid):
        """Map the child node Ids to the parent node Ids.

        Parameters
        ----------
        nid : tensor
            The node ID array in the NodeFlow graph.

        Returns
        -------
        Tensor
            The parent node id array.
        """
        nid = utils.toindex(nid)
        # TODO(minjie): should not directly use []
        return self._node_mapping.tousertensor()[nid.tousertensor()]

    def map_to_parent_eid(self, eid):
        """Map the child edge Ids to the parent edge Ids.

        Parameters
        ----------
        eid : tensor
            The edge ID array in the NodeFlow graph.

        Returns
        -------
        Tensor
            The parent edge id array.
        """
        eid = utils.toindex(eid)
        # TODO(minjie): should not directly use []
        return self._edge_mapping.tousertensor()[eid.tousertensor()]

    def map_from_parent_nid(self, layer_id, parent_nids, remap_local=False):
        """Map parent node Ids to NodeFlow node Ids in a certain layer.

        If `remap_local` is True, it returns node Ids local to the layer.
        Otherwise, the returned node Ids are unique in the NodeFlow.

        Parameters
        ----------
        layer_id : int
            The layer Id.
        parent_nids : list or Tensor
            Node Ids in the parent graph.
        remap_local : boolean
            Remap to layer/block-level local Ids if True; otherwise,
            NodeFlow-level Ids.

        Returns
        -------
        Tensor
            Node Ids in the NodeFlow.
        """
        layer_id = self._get_layer_id(layer_id)
        parent_nids = utils.toindex(parent_nids)
        layers = self._layer_offsets
        start = int(layers[layer_id])
        end = int(layers[layer_id + 1])
        # TODO(minjie): should not directly use []
        mapping = self._node_mapping.tousertensor()
        mapping = mapping[start:end]
        mapping = utils.toindex(mapping)
        nflow_ids = transform_ids(mapping, parent_nids)
        if remap_local:
            return nflow_ids.tousertensor()
        else:
            return nflow_ids.tousertensor() + int(self._layer_offsets[layer_id])

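    # Usage sketch (illustrative): given parent node ids `seeds` that are known
    # to lie in the last layer, the two mappings round-trip (assuming a
    # PyTorch-style tensor backend):
    #
    #     nf_ids = nf.map_from_parent_nid(-1, seeds, remap_local=False)
    #     assert bool((nf.map_to_parent_nid(nf_ids) == seeds).all())
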
    def layer_in_degree(self, layer_id):
        """Return the in-degree of the nodes in the specified layer.

        Parameters
        ----------
        layer_id : int
            The layer Id.

        Returns
        -------
        Tensor
            The in-degree of the nodes in the specified layer.
        """
        layer_id = self._get_layer_id(layer_id)
        return self._graph.in_degrees(utils.toindex(self.layer_nid(layer_id))).tousertensor()

    def layer_out_degree(self, layer_id):
        """Return the out-degree of the nodes in the specified layer.

        Parameters
        ----------
        layer_id : int
            The layer Id.

        Returns
        -------
        Tensor
            The out-degree of the nodes in the specified layer.
        """
        layer_id = self._get_layer_id(layer_id)
        return self._graph.out_degrees(utils.toindex(self.layer_nid(layer_id))).tousertensor()

    def layer_nid(self, layer_id):
        """Get the node Ids in the specified layer.

        The returned node Ids are unique in the NodeFlow.

        Parameters
        ----------
        layer_id : int
            The layer to get the node Ids.

        Returns
        -------
        Tensor
            The node ids.
        """
        layer_id = self._get_layer_id(layer_id)
        assert layer_id + 1 < len(self._layer_offsets)
        start = self._layer_offsets[layer_id]
        end = self._layer_offsets[layer_id + 1]
        return F.arange(int(start), int(end))

    def layer_parent_nid(self, layer_id):
        """Get the node Ids of the parent graph in the specified layer.

        layer_parent_nid(-1) returns the seed vertices of this NodeFlow.

        Parameters
        ----------
        layer_id : int
            The layer to get the node Ids.

        Returns
        -------
        Tensor
            The parent node id array.
        """
        layer_id = self._get_layer_id(layer_id)
        assert layer_id + 1 < len(self._layer_offsets)
        start = self._layer_offsets[layer_id]
        end = self._layer_offsets[layer_id + 1]
        # TODO(minjie): should not directly use []
        return self._node_mapping.tousertensor()[start:end]

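    # Usage sketch (illustrative): layer_parent_nid(-1) gives the parent ids of
    # the seed nodes, which is the usual way to fetch labels for a mini-batch
    # from a hypothetical label tensor indexed by parent node id:
    #
    #     batch_nids = nf.layer_parent_nid(-1)
    #     batch_labels = labels[batch_nids]
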
    def block_eid(self, block_id):
        """Get the edge Ids in the specified block.

        The returned edge Ids are unique in the NodeFlow.

        Parameters
        ----------
        block_id : int
            The specified block to get the edge Ids.

        Returns
        -------
        Tensor
            The edge ids of the block in the NodeFlow.
        """
        block_id = self._get_block_id(block_id)
        start = self._block_offsets[block_id]
        end = self._block_offsets[block_id + 1]
        return F.arange(int(start), int(end))

    def block_parent_eid(self, block_id):
        """Get the edge Ids of the parent graph in the specified block.

        Parameters
        ----------
        block_id : int
            The specified block to get the edge Ids.

        Returns
        -------
        Tensor
            The edge ids of the block in the parent graph.
        """
        block_id = self._get_block_id(block_id)
        start = self._block_offsets[block_id]
        end = self._block_offsets[block_id + 1]
        # TODO(minjie): should not directly use []
        ret = self._edge_mapping.tousertensor()[start:end]
        # If `add_self_loop` is enabled, the returned parent eid can be -1.
        # We have to make sure this case doesn't happen.
        assert F.asnumpy(F.sum(ret == -1, 0)) == 0, "The eid in the parent graph is invalid."
        return ret

    def block_edges(self, block_id, remap_local=False):
        """Return the edges in a block.

        If remap_local is True, the returned indices u, v, eid are remapped to
        local Ids (i.e. starting from 0) in the block or in the layer.
        Otherwise, u, v, eid are unique in the NodeFlow.

        Parameters
        ----------
        block_id : int
            The specified block to return the edges.
        remap_local : boolean
            Remap to layer/block-level local Ids if True; otherwise,
            NodeFlow-level Ids.

        Returns
        -------
        Tensor
            The src nodes.
        Tensor
            The dst nodes.
        Tensor
            The edge ids.
        """
        block_id = self._get_block_id(block_id)
        layer0_size = self._layer_offsets[block_id + 1] - self._layer_offsets[block_id]
        rst = _CAPI_NodeFlowGetBlockAdj(self._graph, "coo",
                                        int(layer0_size),
                                        int(self._layer_offsets[block_id + 1]),
                                        int(self._layer_offsets[block_id + 2]),
                                        remap_local)
        idx = utils.toindex(rst(0)).tousertensor()
        eid = utils.toindex(rst(1))
        num_edges = int(len(idx) / 2)
        assert len(eid) == num_edges
        return idx[num_edges:len(idx)], idx[0:num_edges], eid.tousertensor()

    def block_adjacency_matrix(self, block_id, ctx):
        """Return the adjacency matrix representation for a specific block in
        a NodeFlow.

        A row of the returned adjacency matrix represents the destination of
        an edge and the column represents the source.

        Parameters
        ----------
        block_id : int
            The specified block to return the adjacency matrix.
        ctx : context
            The context of the returned matrix.

        Returns
        -------
        SparseTensor
            The adjacency matrix.
        Tensor
            An index for data shuffling due to sparse format change. Return
            None if shuffle is not required.
        """
        block_id = self._get_block_id(block_id)
        fmt = F.get_preferred_sparse_format()
        # We need to extract two layers.
        layer0_size = self._layer_offsets[block_id + 1] - self._layer_offsets[block_id]
        rst = _CAPI_NodeFlowGetBlockAdj(self._graph, fmt,
                                        int(layer0_size),
                                        int(self._layer_offsets[block_id + 1]),
                                        int(self._layer_offsets[block_id + 2]),
                                        True)
        num_rows = self.layer_size(block_id + 1)
        num_cols = self.layer_size(block_id)

        if fmt == "csr":
            indptr = F.copy_to(utils.toindex(rst(0)).tousertensor(), ctx)
            indices = F.copy_to(utils.toindex(rst(1)).tousertensor(), ctx)
            shuffle = utils.toindex(rst(2))
            dat = F.ones(indices.shape, dtype=F.float32, ctx=ctx)
            return F.sparse_matrix(dat, ('csr', indices, indptr),
                                   (num_rows, num_cols))[0], shuffle.tousertensor()
        elif fmt == "coo":
            # FIXME(minjie): data type
            idx = F.copy_to(utils.toindex(rst(0)).tousertensor(), ctx)
            m = self.block_size(block_id)
            idx = F.reshape(idx, (2, m))
            dat = F.ones((m,), dtype=F.float32, ctx=ctx)
            adj, shuffle_idx = F.sparse_matrix(dat, ('coo', idx), (num_rows, num_cols))
            return adj, shuffle_idx
        else:
            raise Exception("unknown format")

    def block_incidence_matrix(self, block_id, typestr, ctx):
        """Return the incidence matrix representation of the block.

        An incidence matrix is an n-by-m sparse matrix, where n is the number
        of nodes and m is the number of edges. Each nonzero value indicates
        whether the edge is incident to the node. There are two types of
        incidence matrix `I`:

        * ``in``:

            - I[v, e] = 1 if e is the in-edge of v (or v is the dst node of e);
            - I[v, e] = 0 otherwise.

        * ``out``:

            - I[v, e] = 1 if e is the out-edge of v (or v is the src node of e);
            - I[v, e] = 0 otherwise.

        "both" isn't defined in the block of a NodeFlow.

        Parameters
        ----------
        block_id : int
            The specified block to return the incidence matrix.
        typestr : str
            Can be either "in" or "out".
        ctx : context
            The context of the returned incidence matrix.

        Returns
        -------
        SparseTensor
            The incidence matrix.
        Tensor
            An index for data shuffling due to sparse format change. Return
            None if shuffle is not required.
        """
        block_id = self._get_block_id(block_id)
        src, dst, eid = self.block_edges(block_id, remap_local=True)
        src = F.copy_to(src, ctx)  # the index of the ctx will be cached
        dst = F.copy_to(dst, ctx)  # the index of the ctx will be cached
        eid = F.copy_to(eid, ctx)  # the index of the ctx will be cached

        if typestr == 'in':
            n = self.layer_size(block_id + 1)
            m = self.block_size(block_id)
            row = F.unsqueeze(dst, 0)
            col = F.unsqueeze(eid, 0)
            idx = F.cat([row, col], dim=0)
            # FIXME(minjie): data type
            dat = F.ones((m,), dtype=F.float32, ctx=ctx)
            inc, shuffle_idx = F.sparse_matrix(dat, ('coo', idx), (n, m))
        elif typestr == 'out':
            n = self.layer_size(block_id)
            m = self.block_size(block_id)
            row = F.unsqueeze(src, 0)
            col = F.unsqueeze(eid, 0)
            idx = F.cat([row, col], dim=0)
            # FIXME(minjie): data type
            dat = F.ones((m,), dtype=F.float32, ctx=ctx)
            inc, shuffle_idx = F.sparse_matrix(dat, ('coo', idx), (n, m))
        else:
            raise DGLError('Invalid incidence matrix type: %s' % str(typestr))
        return inc, shuffle_idx

    def set_n_initializer(self, initializer, layer_id=ALL, field=None):
        """Set the initializer for empty node features.

        Initializer is a callable that returns a tensor given the shape, data
        type and device context.

        When a subset of the nodes are assigned a new feature, the initializer
        is used to create features for the rest of the nodes.

        Parameters
        ----------
        initializer : callable
            The initializer.
        layer_id : int
            The layer to set the initializer.
        field : str, optional
            The feature field name. By default, the initializer is set for all
            feature fields.
        """
        if is_all(layer_id):
            for i in range(self.num_layers):
                self._node_frames[i].set_initializer(initializer, field)
        else:
            layer_id = self._get_layer_id(layer_id)
            self._node_frames[layer_id].set_initializer(initializer, field)

    def set_e_initializer(self, initializer, block_id=ALL, field=None):
        """Set the initializer for empty edge features.

        Initializer is a callable that returns a tensor given the shape, data
        type and device context.

        When a subset of the edges are assigned a new feature, the initializer
        is used to create features for the rest of the edges.

        Parameters
        ----------
        initializer : callable
            The initializer.
        block_id : int
            The block to set the initializer.
        field : str, optional
            The feature field name. By default, the initializer is set for all
            feature fields.
        """
        if is_all(block_id):
            for i in range(self.num_blocks):
                self._edge_frames[i].set_initializer(initializer, field)
        else:
            block_id = self._get_block_id(block_id)
            self._edge_frames[block_id].set_initializer(initializer, field)

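    # Usage sketch (illustrative): a common initializer is dgl.init.zero_initializer,
    # which fills missing feature rows with zeros:
    #
    #     import dgl
    #     nf.set_n_initializer(dgl.init.zero_initializer)
    #     nf.set_e_initializer(dgl.init.zero_initializer)
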
    def register_message_func(self, func, block_id=ALL):
        """Register global message function for a block.

        Once registered, ``func`` will be used as the default message function
        in message passing operations, including :func:`block_compute` and
        :func:`prop_flow`.

        Parameters
        ----------
        func : callable
            Message function on the edges. The function should be an
            :mod:`Edge UDF <dgl.udf>`.
        block_id : int or ALL
            The block to register the message function.
        """
        if is_all(block_id):
            self._message_funcs = [func] * self.num_blocks
        else:
            block_id = self._get_block_id(block_id)
            self._message_funcs[block_id] = func

    def register_reduce_func(self, func, block_id=ALL):
        """Register global message reduce function for a block.

        Once registered, ``func`` will be used as the default message reduce
        function in message passing operations, including
        :func:`block_compute` and :func:`prop_flow`.

        Parameters
        ----------
        func : callable
            Reduce function on the node. The function should be a
            :mod:`Node UDF <dgl.udf>`.
        block_id : int or ALL
            The block to register the reduce function.
        """
        if is_all(block_id):
            self._reduce_funcs = [func] * self.num_blocks
        else:
            block_id = self._get_block_id(block_id)
            self._reduce_funcs[block_id] = func

    def register_apply_node_func(self, func, block_id=ALL):
        """Register global node apply function for a block.

        Once registered, ``func`` will be used as the default apply node
        function. Related operations include :func:`apply_layer`,
        :func:`block_compute` and :func:`prop_flow`.

        Parameters
        ----------
        func : callable
            Apply function on the nodes. The function should be a
            :mod:`Node UDF <dgl.udf>`.
        block_id : int or ALL
            The block to register the apply node function.
        """
        if is_all(block_id):
            self._apply_node_funcs = [func] * self.num_blocks
        else:
            block_id = self._get_block_id(block_id)
            self._apply_node_funcs[block_id] = func

    def register_apply_edge_func(self, func, block_id=ALL):
        """Register global edge apply function for a block.

        Once registered, ``func`` will be used as the default apply edge
        function in :func:`apply_block`.

        Parameters
        ----------
        func : callable
            Apply function on the edge. The function should be an
            :mod:`Edge UDF <dgl.udf>`.
        block_id : int or ALL
            The block to register the apply edge function.
        """
        if is_all(block_id):
            self._apply_edge_funcs = [func] * self.num_blocks
        else:
            block_id = self._get_block_id(block_id)
            self._apply_edge_funcs[block_id] = func

    def apply_layer(self, layer_id, func="default", v=ALL, inplace=False):
        """Apply node update function on the node embeddings in the specified
        layer.

        Parameters
        ----------
        layer_id : int
            The specified layer to update node embeddings.
        func : callable or None, optional
            Apply function on the nodes. The function should be a
            :mod:`Node UDF <dgl.udf>`.
        v : a list of vertex Ids or ALL.
            The vertex Ids (unique in the NodeFlow) to run the node update
            function.
        inplace : bool, optional
            If True, update will be done in place, but autograd will break.
        """
        layer_id = self._get_layer_id(layer_id)
        if func == "default":
            func = self._apply_node_funcs[layer_id]
        if is_all(v):
            v = utils.toindex(slice(0, self.layer_size(layer_id)))
        else:
            v = v - int(self._layer_offsets[layer_id])
            v = utils.toindex(v)
        with ir.prog() as prog:
            scheduler.schedule_nodeflow_apply_nodes(graph=self, layer_id=layer_id,
                                                    v=v, apply_func=func,
                                                    inplace=inplace)
            Runtime.run(prog)

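    # Usage sketch (illustrative): apply_layer runs a node UDF on one layer,
    # e.g. an element-wise ReLU on the last layer (assuming a PyTorch backend
    # and a feature named 'h'):
    #
    #     import torch
    #     nf.apply_layer(-1, lambda nodes: {'h': torch.relu(nodes.data['h'])})
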
    def apply_block(self, block_id, func="default", edges=ALL, inplace=False):
        """Apply edge update function on the edge embeddings in the specified
        block.

        Parameters
        ----------
        block_id : int
            The specified block to update edge embeddings.
        func : callable or None, optional
            Apply function on the edges. The function should be an
            :mod:`Edge UDF <dgl.udf>`.
        edges : a list of edge Ids or ALL.
            The edge Ids to run the edge update function.
        inplace : bool, optional
            If True, update will be done in place, but autograd will break.
        """
        block_id = self._get_block_id(block_id)
        if func == "default":
            func = self._apply_edge_funcs[block_id]
        assert func is not None

        if is_all(edges):
            u, v, _ = self.block_edges(block_id, remap_local=True)
            u = utils.toindex(u)
            v = utils.toindex(v)
            eid = utils.toindex(slice(0, self.block_size(block_id)))
        elif isinstance(edges, tuple):
            u, v = edges
            # Rewrite u, v to handle edge broadcasting and multigraph.
            u, v, eid = self._graph.edge_ids(utils.toindex(u), utils.toindex(v))
            u = utils.toindex(u.tousertensor() - int(self._layer_offsets[block_id]))
            v = utils.toindex(v.tousertensor() - int(self._layer_offsets[block_id + 1]))
            eid = utils.toindex(eid.tousertensor() - int(self._block_offsets[block_id]))
        else:
            eid = utils.toindex(edges)
            u, v, _ = self._graph.find_edges(eid)
            u = utils.toindex(u.tousertensor() - int(self._layer_offsets[block_id]))
            v = utils.toindex(v.tousertensor() - int(self._layer_offsets[block_id + 1]))
            eid = utils.toindex(edges - int(self._block_offsets[block_id]))

        with ir.prog() as prog:
            scheduler.schedule_nodeflow_apply_edges(graph=self, block_id=block_id,
                                                    u=u, v=v, eid=eid,
                                                    apply_func=func, inplace=inplace)
            Runtime.run(prog)

    def _glb2lcl_nid(self, nid, layer_id):
        layer_id = self._get_layer_id(layer_id)
        return nid - int(self._layer_offsets[layer_id])

    def _glb2lcl_eid(self, eid, block_id):
        block_id = self._get_block_id(block_id)
        return eid - int(self._block_offsets[block_id])

    def block_compute(self, block_id, message_func="default", reduce_func="default",
                      apply_node_func="default", v=ALL, inplace=False):
        """Perform the computation on the specified block. It's similar to
        `pull` in DGLGraph.

        On the given block i, it runs `pull` on the nodes in layer i+1, which
        generates messages on the edges in block i, then runs the reduce
        function and the node update function on the nodes in layer i+1.

        Parameters
        ----------
        block_id : int
            The block to run the computation.
        message_func : callable, optional
            Message function on the edges. The function should be an
            :mod:`Edge UDF <dgl.udf>`.
        reduce_func : callable, optional
            Reduce function on the node. The function should be a
            :mod:`Node UDF <dgl.udf>`.
        apply_node_func : callable, optional
            Apply function on the nodes. The function should be a
            :mod:`Node UDF <dgl.udf>`.
        v : a list of vertex Ids or ALL.
            The node Ids (unique in the NodeFlow) in layer block_id+1 to run
            the computation.
        inplace : bool, optional
            If True, update will be done in place, but autograd will break.
        """
        block_id = self._get_block_id(block_id)
        if message_func == "default":
            message_func = self._message_funcs[block_id]
        if reduce_func == "default":
            reduce_func = self._reduce_funcs[block_id]
        if apply_node_func == "default":
            apply_node_func = self._apply_node_funcs[block_id]
        assert message_func is not None
        assert reduce_func is not None

        if is_all(v):
            with ir.prog() as prog:
                scheduler.schedule_nodeflow_update_all(graph=self,
                                                       block_id=block_id,
                                                       message_func=message_func,
                                                       reduce_func=reduce_func,
                                                       apply_func=apply_node_func)
                Runtime.run(prog)
        else:
            dest_nodes = utils.toindex(v)
            u, v, eid = self._graph.in_edges(dest_nodes)
            assert len(u) > 0, "block_compute must run on edges"
            u = utils.toindex(self._glb2lcl_nid(u.tousertensor(), block_id))
            v = utils.toindex(self._glb2lcl_nid(v.tousertensor(), block_id + 1))
            dest_nodes = utils.toindex(
                self._glb2lcl_nid(dest_nodes.tousertensor(), block_id + 1))
            eid = utils.toindex(self._glb2lcl_eid(eid.tousertensor(), block_id))

            with ir.prog() as prog:
                scheduler.schedule_nodeflow_compute(graph=self,
                                                    block_id=block_id,
                                                    u=u, v=v, eid=eid,
                                                    dest_nodes=dest_nodes,
                                                    message_func=message_func,
                                                    reduce_func=reduce_func,
                                                    apply_func=apply_node_func,
                                                    inplace=inplace)
                Runtime.run(prog)

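    # Usage sketch (illustrative): one block of message passing with DGL's
    # built-in functions; the feature name 'h' assumes it was loaded by
    # copy_from_parent:
    #
    #     import dgl.function as fn
    #     nf.block_compute(0, message_func=fn.copy_src('h', 'm'),
    #                      reduce_func=fn.sum('m', 'h'))
    #     h1 = nf.layers[1].data['h']           # updated features of layer 1
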
    def prop_flow(self, message_funcs="default", reduce_funcs="default",
                  apply_node_funcs="default", flow_range=ALL, inplace=False):
        """Perform the computation on flows. By default, it runs on all
        blocks, one by one.

        On block i, it runs `pull` on the nodes in layer i+1, which generates
        messages on the edges in block i, then runs the reduce function and
        the node update function on the nodes in layer i+1.

        Users can specify a list of message functions, reduce functions and
        node apply functions, one for each block. When a list is given, its
        length should equal the number of blocks.

        Parameters
        ----------
        message_funcs : a callable, a list of callable, optional
            Message functions on the edges. The function should be an
            :mod:`Edge UDF <dgl.udf>`.
        reduce_funcs : a callable, a list of callable, optional
            Reduce functions on the node. The function should be a
            :mod:`Node UDF <dgl.udf>`.
        apply_node_funcs : a callable, a list of callable, optional
            Apply functions on the nodes. The function should be a
            :mod:`Node UDF <dgl.udf>`.
        flow_range : int or a slice or ALL.
            The specified blocks to run the computation.
        inplace : bool, optional
            If True, update will be done in place, but autograd will break.
        """
        if is_all(flow_range):
            flow_range = range(0, self.num_blocks)
        elif isinstance(flow_range, slice):
            if flow_range.step != 1:
                raise DGLError("We can't propagate flows and skip some of them")
            flow_range = range(flow_range.start, flow_range.stop)
        else:
            raise DGLError("unknown flow range")

        for i in flow_range:
            if message_funcs == "default":
                message_func = self._message_funcs[i]
            elif isinstance(message_funcs, list):
                message_func = message_funcs[i]
            else:
                message_func = message_funcs

            if reduce_funcs == "default":
                reduce_func = self._reduce_funcs[i]
            elif isinstance(reduce_funcs, list):
                reduce_func = reduce_funcs[i]
            else:
                reduce_func = reduce_funcs

            if apply_node_funcs == "default":
                apply_node_func = self._apply_node_funcs[i]
            elif isinstance(apply_node_funcs, list):
                apply_node_func = apply_node_funcs[i]
            else:
                apply_node_func = apply_node_funcs

            self.block_compute(i, message_func, reduce_func, apply_node_func,
                               inplace=inplace)

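    # Usage sketch (illustrative): prop_flow chains block_compute over all
    # blocks, so a full pass from the input layer to the seed layer (again
    # assuming a feature 'h' loaded by copy_from_parent) can be written as:
    #
    #     import dgl.function as fn
    #     nf.prop_flow(fn.copy_src('h', 'm'), fn.sum('m', 'h'))
    #     logits = nf.layers[-1].data['h']
    #     nf.copy_to_parent()                   # optionally write results back
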
def _copy_to_like(arr1, arr2):
    return F.copy_to(arr1, F.context(arr2))


def _get_frame(frame, names, ids, ctx):
    col_dict = {}
    for name in names:
        col = frame[name][_copy_to_like(ids, frame[name])]
        if ctx:
            col = F.copy_to(col, ctx)
        col_dict[name] = col
    if len(col_dict) == 0:
        return FrameRef(Frame(num_rows=len(ids)))
    else:
        return FrameRef(Frame(col_dict))


def _copy_frame(frame, ctx):
    new_frame = {}
    for name in frame:
        new_frame[name] = F.copy_to(frame[name], ctx) if ctx else frame[name]
    return new_frame


def _update_frame(frame, names, ids, new_frame):
    col_dict = {name: new_frame[name] for name in names}
    if len(col_dict) > 0:
        frame.update_rows(ids, FrameRef(Frame(col_dict)), inplace=True)


_init_api("dgl.nodeflow", __name__)