"""Define graph partition book."""
import numpy as np
from .. import backend as F
from ..base import NID, EID
from .. import utils
from .shared_mem_utils import _to_shared_mem, _get_ndata_path, _get_edata_path, DTYPE_DICT
from .._ffi.ndarray import empty_shared_mem
from ..ndarray import exist_shared_mem_array
def _move_metadata_to_shared_mem(graph_name, num_nodes, num_edges, part_id,
                                 num_partitions, node_map, edge_map, is_range_part):
    """Copy the partition book metadata into shared memory.

    Three shared-memory arrays are written, all named after ``graph_name``:
    a 5-element header, the node map and the edge map.

    Parameters
    ----------
    graph_name : str
        The graph name used to derive the shared-memory array names.
    num_nodes : int
        The total number of nodes.
    num_edges : int
        The total number of edges.
    part_id : int
        The ID of the current partition.
    num_partitions : int
        The number of partitions.
    node_map : tensor
        The node map of the partition book.
    edge_map : tensor
        The edge map of the partition book.
    is_range_part : bool
        Whether the maps belong to a range partition book.

    Returns
    -------
    tuple
        ``(meta, node_map, edge_map)`` as returned by ``_to_shared_mem``.
    """
    # Header layout: is_range_part, num_nodes, num_edges, num_partitions, part_id.
    # `_get_shared_mem_metadata` unpacks the header in exactly this order.
    header = F.tensor([int(is_range_part), num_nodes, num_edges,
                       num_partitions, part_id])
    shared_meta = _to_shared_mem(header, _get_ndata_path(graph_name, 'meta'))
    shared_node_map = _to_shared_mem(node_map, _get_ndata_path(graph_name, 'node_map'))
    shared_edge_map = _to_shared_mem(edge_map, _get_edata_path(graph_name, 'edge_map'))
    return shared_meta, shared_node_map, shared_edge_map
def _get_shared_mem_metadata(graph_name):
    """Read the partition book metadata of a graph back from shared memory.

    Parameters
    ----------
    graph_name : str
        The graph name used to derive the shared-memory array names.

    Returns
    -------
    tuple
        ``(is_range_part, part_id, num_partitions, node_map, edge_map)``.
        The first three come from the header array; the two maps are tensors
        built from the shared-memory arrays.
    """
    int64_dtype = DTYPE_DICT[F.int64]

    def _attach(path, shape):
        # NOTE(review): the `False` flag presumably means "open an existing array
        # rather than create one" -- confirm against `empty_shared_mem`.
        shared = empty_shared_mem(path, False, shape, int64_dtype)
        return F.zerocopy_from_dlpack(shared.to_dlpack())

    # The header has 5 elements: is_range_part, num_nodes, num_edges,
    # num_partitions, part_id. The list may be extended in the future.
    header = F.asnumpy(_attach(_get_ndata_path(graph_name, 'meta'), (5,)))
    is_range_part, num_nodes, num_edges, num_partitions, part_id = header

    # A range partition book stores one entry per partition; otherwise the maps
    # hold one entry per node / per edge.
    if is_range_part:
        node_map_len, edge_map_len = num_partitions, num_partitions
    else:
        node_map_len, edge_map_len = num_nodes, num_edges

    node_map = _attach(_get_ndata_path(graph_name, 'node_map'), (node_map_len,))
    edge_map = _attach(_get_edata_path(graph_name, 'edge_map'), (edge_map_len,))
    return is_range_part, part_id, num_partitions, node_map, edge_map
def get_shared_mem_partition_book(graph_name, graph_part):
    '''Rebuild a graph partition book from shared memory.

    A partition book that was moved to shared memory can be reconstructed here.

    Parameters
    ----------
    graph_name : str
        The name of the graph.
    graph_part : DGLGraph
        The graph structure of a partition. It is only used when the stored
        book is not a range partition book.

    Returns
    -------
    GraphPartitionBook or None
        A graph partition book for a particular partition, or None when no
        metadata array exists in shared memory for ``graph_name``.
    '''
    meta_path = _get_ndata_path(graph_name, 'meta')
    if exist_shared_mem_array(meta_path):
        is_range_part, part_id, num_parts, node_map, edge_map = \
            _get_shared_mem_metadata(graph_name)
        if is_range_part == 1:
            return RangePartitionBook(part_id, num_parts, node_map, edge_map)
        return BasicPartitionBook(part_id, num_parts, node_map, edge_map, graph_part)
    return None
class GraphPartitionBook:
    """ The base class of the graph partition book.

    For distributed training, a graph is partitioned into multiple parts and is loaded
    in multiple machines. The partition book contains all necessary information to locate
    nodes and edges in the cluster.

    The partition book contains various partition information, including

    * the number of partitions,
    * the partition ID that a node or edge belongs to,
    * the node IDs and the edge IDs that a partition has.
    * the local IDs of nodes and edges in a partition.

    Currently, there are two classes that implement `GraphPartitionBook`:
    `BasicPartitionBook` and `RangePartitionBook`. `BasicPartitionBook`
    stores the mappings between every individual node/edge ID and partition ID on
    every machine, which usually consumes a lot of memory, while `RangePartitionBook`
    calculates the mapping between node/edge IDs and partition IDs based on some small
    metadata because nodes/edges have been relabeled to have IDs in the same partition
    fall in a contiguous ID range. `RangePartitionBook` is usually a preferred way to
    provide mappings between node/edge IDs and partition IDs.

    A graph partition book is constructed automatically when a graph is partitioned.
    When a graph partition is loaded, a graph partition book is loaded as well.
    Please see :py:meth:`~dgl.distributed.partition.partition_graph`,
    :py:meth:`~dgl.distributed.partition.load_partition` and
    :py:meth:`~dgl.distributed.partition.load_partition_book` for more details.

    The methods of this class have no implementation (they return None);
    subclasses are expected to override all of them.
    """

    def shared_memory(self, graph_name):
        """Move the partition book to shared memory.

        Parameters
        ----------
        graph_name : str
            The graph name. This name will be used to read the partition book from shared
            memory in another process.
        """

    def num_partitions(self):
        """Return the number of partitions.

        Returns
        -------
        int
            number of partitions
        """

    def metadata(self):
        """Return the partition meta data.

        Returns
        -------
        list of dict
            One entry per partition, as built by the concrete partition book.
        """

    def _num_nodes(self):
        """Return the total number of nodes across all partitions.

        Returns
        -------
        int
            number of nodes
        """

    def _num_edges(self):
        """Return the total number of edges across all partitions.

        Returns
        -------
        int
            number of edges
        """

    def nid2partid(self, nids):
        """From global node IDs to partition IDs

        Parameters
        ----------
        nids : tensor
            global node IDs

        Returns
        -------
        tensor
            partition IDs
        """

    def eid2partid(self, eids):
        """From global edge IDs to partition IDs

        Parameters
        ----------
        eids : tensor
            global edge IDs

        Returns
        -------
        tensor
            partition IDs
        """

    def partid2nids(self, partid):
        """From partition id to global node IDs

        Parameters
        ----------
        partid : int
            partition id

        Returns
        -------
        tensor
            node IDs
        """

    def partid2eids(self, partid):
        """From partition id to global edge IDs

        Parameters
        ----------
        partid : int
            partition id

        Returns
        -------
        tensor
            edge IDs
        """

    def nid2localnid(self, nids, partid):
        """Get local node IDs within the given partition.

        Parameters
        ----------
        nids : tensor
            global node IDs
        partid : int
            partition ID

        Returns
        -------
        tensor
            local node IDs
        """

    def eid2localeid(self, eids, partid):
        """Get the local edge ids within the given partition.

        Parameters
        ----------
        eids : tensor
            global edge ids
        partid : int
            partition ID

        Returns
        -------
        tensor
            local edge ids
        """

    @property
    def partid(self):
        """Get the current partition id

        Returns
        -------
        int
            The partition id of current machine
        """
class BasicPartitionBook(GraphPartitionBook):
    """This provides the most flexible way to store partition information.

    The partition book maintains the mapping of every single node IDs and edge IDs to
    partition IDs. This is very flexible at the cost of large memory consumption.
    On a large graph, the mapping consumes significant memory and this partition book
    is not recommended.

    Parameters
    ----------
    part_id : int
        partition id of current partition book
    num_parts : int
        number of total partitions
    node_map : tensor
        global node id mapping to partition id
    edge_map : tensor
        global edge id mapping to partition id
    part_graph : DGLGraph
        The graph partition structure.
    """

    def __init__(self, part_id, num_parts, node_map, edge_map, part_graph):
        assert part_id >= 0, 'part_id cannot be a negative number.'
        assert num_parts > 0, 'num_parts must be greater than zero.'
        self._part_id = int(part_id)
        self._num_partitions = int(num_parts)
        self._nid2partid = F.tensor(node_map)
        assert F.dtype(self._nid2partid) == F.int64, \
            'the node map must be stored in an integer array'
        self._eid2partid = F.tensor(edge_map)
        assert F.dtype(self._eid2partid) == F.int64, \
            'the edge map must be stored in an integer array'

        nid2partid_np = F.asnumpy(self._nid2partid)
        eid2partid_np = F.asnumpy(self._eid2partid)
        # Count with bincount rather than np.unique: np.unique omits partitions
        # that own no node/edge, which would shift every later count onto the
        # wrong partition ID (or run off the end of the count array).
        nid_count = np.bincount(nid2partid_np, minlength=self._num_partitions)
        eid_count = np.bincount(eid2partid_np, minlength=self._num_partitions)

        # Get meta data of the partition book.
        self._partition_meta_data = [
            {
                'machine_id': partid,
                'num_nodes': int(nid_count[partid]),
                'num_edges': int(eid_count[partid]),
            }
            for partid in range(self._num_partitions)
        ]

        # Global node/edge IDs owned by each partition.
        self._partid2nids = self._group_ids_by_part(nid2partid_np, nid_count)
        self._partid2eids = self._group_ids_by_part(eid2partid_np, eid_count)

        # Global-to-local lookup tables; only the current partition's is built.
        self._nidg2l = [None] * self._num_partitions
        self._nidg2l[self._part_id] = self._build_global_to_local(part_graph.ndata[NID])
        self._eidg2l = [None] * self._num_partitions
        self._eidg2l[self._part_id] = self._build_global_to_local(part_graph.edata[EID])

        # node size and edge size
        self._edge_size = len(self.partid2eids(self._part_id))
        self._node_size = len(self.partid2nids(self._part_id))

    @staticmethod
    def _group_ids_by_part(id2partid, counts):
        """Split global IDs into one tensor per partition.

        Parameters
        ----------
        id2partid : numpy.ndarray
            Partition ID of every global ID.
        counts : numpy.ndarray
            Number of IDs in each partition, indexed by partition ID.

        Returns
        -------
        list of tensor
            ``result[p]`` holds the global IDs assigned to partition ``p``.
        """
        # Sorting by partition ID makes each partition's IDs one contiguous slice.
        sorted_ids = F.tensor(np.argsort(id2partid))
        groups = []
        start = 0
        for count in counts:
            count = int(count)
            groups.append(sorted_ids[start:start + count])
            start += count
        return groups

    @staticmethod
    def _build_global_to_local(global_id):
        """Build a lookup table from global IDs to local IDs.

        Parameters
        ----------
        global_id : tensor
            The global ID of every local node (or edge); position ``i`` holds the
            global ID of local ID ``i``.

        Returns
        -------
        tensor
            A table indexed by global ID. Entries for global IDs that do not
            appear in ``global_id`` are left at zero.
        """
        num_local = len(global_id)
        # np.amax raises on an empty array, so an empty partition gets an empty table.
        size = int(np.amax(F.asnumpy(global_id))) + 1 if num_local > 0 else 0
        # TODO(chao): support int32 index
        g2l = F.zeros((size,), F.int64, F.context(global_id))
        return F.scatter_row(g2l, global_id, F.arange(0, num_local))

    def shared_memory(self, graph_name):
        """Move data to shared memory.

        Parameters
        ----------
        graph_name : str
            The graph name used to name the shared-memory arrays.
        """
        self._meta, self._nid2partid, self._eid2partid = _move_metadata_to_shared_mem(
            graph_name, self._num_nodes(), self._num_edges(), self._part_id,
            self._num_partitions, self._nid2partid, self._eid2partid, False)

    def num_partitions(self):
        """Return the number of partitions.
        """
        return self._num_partitions

    def metadata(self):
        """Return the partition meta data.

        Returns
        -------
        list of dict
            One dict per partition with the keys 'machine_id', 'num_nodes'
            and 'num_edges'.
        """
        return self._partition_meta_data

    def _num_nodes(self):
        """ The total number of nodes
        """
        return len(self._nid2partid)

    def _num_edges(self):
        """ The total number of edges
        """
        return len(self._eid2partid)

    def nid2partid(self, nids):
        """From global node IDs to partition IDs
        """
        return F.gather_row(self._nid2partid, nids)

    def eid2partid(self, eids):
        """From global edge IDs to partition IDs
        """
        return F.gather_row(self._eid2partid, eids)

    def partid2nids(self, partid):
        """From partition id to global node IDs
        """
        return self._partid2nids[partid]

    def partid2eids(self, partid):
        """From partition id to global edge IDs
        """
        return self._partid2eids[partid]

    def nid2localnid(self, nids, partid):
        """Get local node IDs within the given partition.

        Raises
        ------
        RuntimeError
            If ``partid`` is not the partition of this partition book.
        """
        if partid != self._part_id:
            raise RuntimeError('Now GraphPartitionBook does not support '
                               'getting remote tensor of nid2localnid.')
        return F.gather_row(self._nidg2l[partid], nids)

    def eid2localeid(self, eids, partid):
        """Get the local edge ids within the given partition.

        Raises
        ------
        RuntimeError
            If ``partid`` is not the partition of this partition book.
        """
        if partid != self._part_id:
            raise RuntimeError('Now GraphPartitionBook does not support '
                               'getting remote tensor of eid2localeid.')
        return F.gather_row(self._eidg2l[partid], eids)

    @property
    def partid(self):
        """Get the current partition id
        """
        return self._part_id
class RangePartitionBook(GraphPartitionBook):
    """This partition book supports more efficient storage of partition information.

    This partition book is used if the nodes and edges of a graph partition are assigned
    with contiguous IDs. It uses very small amount of memory to store the partition
    information.

    Parameters
    ----------
    part_id : int
        partition id of current partition book
    num_parts : int
        number of total partitions
    node_map : tensor
        One entry per partition: ``node_map[i]`` is the exclusive end of the
        global node ID range of partition ``i``. Partition ``i`` starts at
        ``node_map[i - 1]`` (0 for the first partition), so the last entry is
        the total number of nodes.
    edge_map : tensor
        Same layout as ``node_map``, for global edge IDs.
    """

    def __init__(self, part_id, num_parts, node_map, edge_map):
        assert part_id >= 0, 'part_id cannot be a negative number.'
        assert num_parts > 0, 'num_parts must be greater than zero.'
        # Store plain Python ints, as BasicPartitionBook does: the values read
        # back from shared memory are numpy scalars.
        self._partid = int(part_id)
        self._num_partitions = int(num_parts)
        if not isinstance(node_map, np.ndarray):
            node_map = F.asnumpy(node_map)
        if not isinstance(edge_map, np.ndarray):
            edge_map = F.asnumpy(edge_map)
        self._node_map = node_map
        self._edge_map = edge_map

        # Get meta data of the partition book
        self._partition_meta_data = []
        for partid in range(self._num_partitions):
            num_nodes = node_map[partid] - self._range_start(node_map, partid)
            num_edges = edge_map[partid] - self._range_start(edge_map, partid)
            self._partition_meta_data.append({
                'machine_id': partid,
                'num_nodes': int(num_nodes),
                'num_edges': int(num_edges),
            })

    @staticmethod
    def _range_start(range_map, partid):
        """Return the first global ID of partition ``partid`` in ``range_map``."""
        return range_map[partid - 1] if partid > 0 else 0

    def shared_memory(self, graph_name):
        """Move data to shared memory.

        Parameters
        ----------
        graph_name : str
            The graph name used to name the shared-memory arrays.
        """
        # `_meta` holds the whole (meta, node_map, edge_map) tuple returned by
        # the helper.
        self._meta = _move_metadata_to_shared_mem(
            graph_name, self._num_nodes(), self._num_edges(), self._partid,
            self._num_partitions, F.tensor(self._node_map), F.tensor(self._edge_map), True)

    def num_partitions(self):
        """Return the number of partitions.
        """
        return self._num_partitions

    def _num_nodes(self):
        """ The total number of nodes
        """
        return int(self._node_map[-1])

    def _num_edges(self):
        """ The total number of edges
        """
        return int(self._edge_map[-1])

    def metadata(self):
        """Return the partition meta data.

        Returns
        -------
        list of dict
            One dict per partition with the keys 'machine_id', 'num_nodes'
            and 'num_edges'.
        """
        return self._partition_meta_data

    def nid2partid(self, nids):
        """From global node IDs to partition IDs
        """
        nids = utils.toindex(nids)
        # side='right': an ID equal to a range end belongs to the next partition.
        ret = np.searchsorted(self._node_map, nids.tonumpy(), side='right')
        ret = utils.toindex(ret)
        return ret.tousertensor()

    def eid2partid(self, eids):
        """From global edge IDs to partition IDs
        """
        eids = utils.toindex(eids)
        # side='right': an ID equal to a range end belongs to the next partition.
        ret = np.searchsorted(self._edge_map, eids.tonumpy(), side='right')
        ret = utils.toindex(ret)
        return ret.tousertensor()

    def partid2nids(self, partid):
        """From partition id to global node IDs
        """
        # TODO do we need to cache it?
        start = self._range_start(self._node_map, partid)
        end = self._node_map[partid]
        return F.arange(start, end)

    def partid2eids(self, partid):
        """From partition id to global edge IDs
        """
        # TODO do we need to cache it?
        start = self._range_start(self._edge_map, partid)
        end = self._edge_map[partid]
        return F.arange(start, end)

    def nid2localnid(self, nids, partid):
        """Get local node IDs within the given partition.

        Raises
        ------
        RuntimeError
            If ``partid`` is not the partition of this partition book.
        """
        if partid != self._partid:
            raise RuntimeError('Now RangePartitionBook does not support '
                               'getting remote tensor of nid2localnid.')
        nids = utils.toindex(nids)
        nids = nids.tousertensor()
        # Local IDs are offsets from the start of the partition's range.
        start = self._range_start(self._node_map, partid)
        return nids - int(start)

    def eid2localeid(self, eids, partid):
        """Get the local edge ids within the given partition.

        Raises
        ------
        RuntimeError
            If ``partid`` is not the partition of this partition book.
        """
        if partid != self._partid:
            raise RuntimeError('Now RangePartitionBook does not support '
                               'getting remote tensor of eid2localeid.')
        eids = utils.toindex(eids)
        eids = eids.tousertensor()
        # Local IDs are offsets from the start of the partition's range.
        start = self._range_start(self._edge_map, partid)
        return eids - int(start)

    @property
    def partid(self):
        """Get the current partition id
        """
        return self._partid
# Names of the two supported partition policies.
NODE_PART_POLICY = 'node'
EDGE_PART_POLICY = 'edge'


class PartitionPolicy(object):
    """This defines a partition policy for a distributed tensor or distributed embedding.

    When DGL shards tensors and stores them in a cluster of machines, it requires
    partition policies that map rows of the tensors to machines in the cluster.

    Although an arbitrary partition policy can be defined, DGL currently supports
    two partition policies for mapping nodes and edges to machines. To define a partition
    policy from a graph partition book, users need to specify the policy name ('node' or 'edge').

    Parameters
    ----------
    policy_str : str
        Partition policy name, e.g., 'edge' or 'node'.
    partition_book : GraphPartitionBook
        A graph partition book
    """

    def __init__(self, policy_str, partition_book):
        # TODO(chao): support more policies for HeteroGraph
        assert policy_str in (EDGE_PART_POLICY, NODE_PART_POLICY), \
            'policy_str must be \'edge\' or \'node\'.'
        self._policy_str = policy_str
        self._part_id = partition_book.partid
        self._partition_book = partition_book

    @property
    def policy_str(self):
        """Get the policy name

        Returns
        -------
        str
            The name of the partition policy.
        """
        return self._policy_str

    @property
    def part_id(self):
        """Get partition ID

        Returns
        -------
        int
            The partition ID
        """
        return self._part_id

    @property
    def partition_book(self):
        """Get partition book

        Returns
        -------
        GraphPartitionBook
            The graph partition book
        """
        return self._partition_book

    def to_local(self, id_tensor):
        """Mapping global ID to local ID.

        Parameters
        ----------
        id_tensor : tensor
            Global ID tensor

        Returns
        -------
        tensor
            local ID tensor

        Raises
        ------
        RuntimeError
            If the stored policy name is neither 'edge' nor 'node'.
        """
        if self._policy_str == EDGE_PART_POLICY:
            return self._partition_book.eid2localeid(id_tensor, self._part_id)
        elif self._policy_str == NODE_PART_POLICY:
            return self._partition_book.nid2localnid(id_tensor, self._part_id)
        else:
            raise RuntimeError('Cannot support policy: %s ' % self._policy_str)

    def to_partid(self, id_tensor):
        """Mapping global ID to partition ID.

        Parameters
        ----------
        id_tensor : tensor
            Global ID tensor

        Returns
        -------
        tensor
            partition ID

        Raises
        ------
        RuntimeError
            If the stored policy name is neither 'edge' nor 'node'.
        """
        if self._policy_str == EDGE_PART_POLICY:
            return self._partition_book.eid2partid(id_tensor)
        elif self._policy_str == NODE_PART_POLICY:
            return self._partition_book.nid2partid(id_tensor)
        else:
            raise RuntimeError('Cannot support policy: %s ' % self._policy_str)

    def get_part_size(self):
        """Get data size of current partition.

        Returns
        -------
        int
            data size

        Raises
        ------
        RuntimeError
            If the stored policy name is neither 'edge' nor 'node'.
        """
        if self._policy_str == EDGE_PART_POLICY:
            return len(self._partition_book.partid2eids(self._part_id))
        elif self._policy_str == NODE_PART_POLICY:
            return len(self._partition_book.partid2nids(self._part_id))
        else:
            raise RuntimeError('Cannot support policy: %s ' % self._policy_str)

    def get_size(self):
        """Get the full size of the data.

        Returns
        -------
        int
            data size

        Raises
        ------
        RuntimeError
            If the stored policy name is neither 'edge' nor 'node'.
        """
        if self._policy_str == EDGE_PART_POLICY:
            return self._partition_book._num_edges()
        elif self._policy_str == NODE_PART_POLICY:
            return self._partition_book._num_nodes()
        else:
            raise RuntimeError('Cannot support policy: %s ' % self._policy_str)