"""Node embedding optimizers"""
import abc
from abc import abstractmethod
import torch as th
from ...utils import get_shared_mem_array, create_shared_mem_array
from ...nn.pytorch import NodeEmbedding
class SparseGradOptimizer(abc.ABC):
r''' The abstract sparse optimizer.
Note: DGL sparse optimizers only work with dgl.nn.NodeEmbedding
Parameters
----------
params : list of NodeEmbedding
The list of NodeEmbeddings.
lr : float
The learning rate.
'''
def __init__(self, params, lr):
self._params = params
self._lr = lr
self._rank = None
self._world_size = None
self._shared_cache = {}
self._clean_grad = False
self._opt_meta = {}
# Hold released shared memory to let other processes munmap it first;
# otherwise freeing it here will crash the training.
self.shmem_buffer_holder = []
for emb in params:
assert isinstance(emb, NodeEmbedding), \
'DGL SparseOptimizer only supports dgl.nn.NodeEmbedding'
if self._rank is None:
self._rank = emb.rank
self._world_size = emb.world_size
else:
assert self._rank == emb.rank, \
'MultiGPU rank for each embedding should be the same.'
assert self._world_size == emb.world_size, \
'MultiGPU world_size for each embedding should be the same.'
emb_name = emb.name
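# opt_meta is a (world_size x world_size) int32 matrix shared by all trainer
# processes: entry [i][j] records how many gradient rows process i sends to
# process j in the current step (written and read in step() below).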
if self._rank == 0: # the master gpu process
opt_meta = create_shared_mem_array(emb_name+'_opt_meta', \
(self._world_size, self._world_size), th.int32).zero_()
if self._rank == 0:
emb.store.set(emb_name+'_opt_meta', emb_name)
self._opt_meta[emb_name] = opt_meta
elif self._rank > 0:
# receive
emb.store.wait([emb_name+'_opt_meta'])
opt_meta = get_shared_mem_array(emb_name+'_opt_meta', \
(self._world_size, self._world_size), th.int32)
self._opt_meta[emb_name] = opt_meta
def step(self):
''' The step function.
The step function is invoked at the end of every batch to update the embeddings.
'''
with th.no_grad():
# Frequently allocating and freeing shared memory to hold intermediate tensors is expensive.
# We cache shared memory buffers in shared_emb.
shared_emb = {emb.name: ([], []) for emb in self._params}
# Go through all sparse embeddings
for emb in self._params: # pylint: disable=too-many-nested-blocks
emb_name = emb.name
# We need to combine gradients from multiple forward passes.
idx = []
grad = []
for i, data in emb._trace:
idx.append(i)
grad.append(data.grad.data)
idx = th.cat(idx, dim=0)
grad = th.cat(grad, dim=0)
device = grad.device
idx_dtype = idx.dtype
grad_dtype = grad.dtype
grad_dim = grad.shape[1]
if self._world_size > 1:
if emb_name not in self._shared_cache:
self._shared_cache[emb_name] = {}
# Each training process takes the responsibility of updating a range
# of node embeddings, so the gradient updates can run in parallel.
# The overall process is:
# 1. In each training process:
#    1.a Decide which process a node embedding belongs to according
#        to the formula: process_id = node_idx mod num_of_process(N)
#        (a worked example follows this list).
#    1.b Split the node index tensor and gradient tensor into N parts
#        according to step 1.a.
#    1.c Write each node index sub-tensor and gradient sub-tensor into
#        a different DGL shared memory buffer.
# 2. Cross training process synchronization.
# 3. In each training process:
#    3.a Collect node index sub-tensors and gradient sub-tensors.
#    3.b Do the gradient update.
# 4. Done
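# For illustration (values not from the original code): with N = 2 trainer
# processes and idx = [0, 3, 4, 7], idx mod 2 = [0, 1, 0, 1], so process 0
# owns the gradient rows for node indices [0, 4] and process 1 owns the
# rows for [3, 7].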
idx_split = th.remainder(idx, self._world_size).long()
for i in range(self._world_size):
mask = idx_split == i
idx_i = idx[mask]
grad_i = grad[mask]
if i == self._rank:
shared_emb[emb_name][0].append(idx_i)
shared_emb[emb_name][1].append(grad_i)
else:
# Currently NCCL does not support the alltoallv operation,
# so we use CPU shared memory to share gradients
# across processes.
idx_i = idx_i.to(th.device('cpu'))
grad_i = grad_i.to(th.device('cpu'))
idx_shmem_name = 'idx_{}_{}_{}'.format(emb_name, self._rank, i)
grad_shmem_name = 'grad_{}_{}_{}'.format(emb_name, self._rank, i)
# Create shared memory to hold temporary index and gradient tensor for
# cross-process send and recv.
if idx_shmem_name not in self._shared_cache[emb_name] or \
self._shared_cache[emb_name][idx_shmem_name].shape[0] \
< idx_i.shape[0]:
if idx_shmem_name in self._shared_cache[emb_name]:
self.shmem_buffer_holder.append(
self._shared_cache[emb_name][idx_shmem_name])
self.shmem_buffer_holder.append(
self._shared_cache[emb_name][grad_shmem_name])
# The total number of buffers is the number of NodeEmbeddings *
# world_size * (world_size - 1). The minimum buffer size is 128.
#
# We extend the buffer by idx_i.shape[0] * 2 to avoid
# frequent shared memory allocation.
# The overall buffer cost will be smaller than three times
# the maximum memory requirement for sharing gradients.
buffer_size = 128 if idx_i.shape[0] < 128 else idx_i.shape[0] * 2
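# For example, 100 rows to send -> buffer of 128 rows; 200 rows -> buffer of 400 rows.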
idx_shmem = create_shared_mem_array(idx_shmem_name, \
(buffer_size,), idx_dtype)
grad_shmem = create_shared_mem_array(grad_shmem_name, \
(buffer_size, grad_dim), grad_dtype)
self._shared_cache[emb_name][idx_shmem_name] = idx_shmem
self._shared_cache[emb_name][grad_shmem_name] = grad_shmem
# Fill the shared memory with the temporary index and gradient tensors
self._shared_cache[emb_name][idx_shmem_name][:idx_i.shape[0]] \
= idx_i
self._shared_cache[emb_name][grad_shmem_name][:idx_i.shape[0]] \
= grad_i
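# Record how many valid rows were written for rank i; after the barrier
# below, rank i reads this entry to know how much of the buffer to consume.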
self._opt_meta[emb_name][self._rank][i] = idx_i.shape[0]
else:
shared_emb[emb_name][0].append(idx)
shared_emb[emb_name][1].append(grad)
# make sure the idx shape is passed to each process through opt_meta
if self._world_size > 1:
th.distributed.barrier()
for emb in self._params: # pylint: disable=too-many-nested-blocks
emb_name = emb.name
if self._world_size > 1:
# gather gradients from all other processes
for i in range(self._world_size):
if i != self._rank:
idx_shmem_name = 'idx_{}_{}_{}'.format(emb_name, i, self._rank)
grad_shmem_name = 'grad_{}_{}_{}'.format(emb_name, i, self._rank)
size = self._opt_meta[emb_name][i][self._rank]
# Retrieve the shared memory holding the temporary index and gradient
# tensors sent to the current training process
if idx_shmem_name not in self._shared_cache[emb_name] or \
self._shared_cache[emb_name][idx_shmem_name].shape[0] < size:
buffer_size = 128 if size < 128 else size * 2
idx_shmem = get_shared_mem_array(idx_shmem_name, \
(buffer_size,), idx_dtype)
grad_shmem = get_shared_mem_array(grad_shmem_name, \
(buffer_size, grad_dim), grad_dtype)
self._shared_cache[emb_name][idx_shmem_name] = idx_shmem
self._shared_cache[emb_name][grad_shmem_name] = grad_shmem
idx_i = self._shared_cache[emb_name][idx_shmem_name][:size]
grad_i = self._shared_cache[emb_name][grad_shmem_name][:size]
shared_emb[emb_name][0].append(idx_i.to(device,
non_blocking=True))
shared_emb[emb_name][1].append(grad_i.to(device,
non_blocking=True))
if self._clean_grad:
# clear the gradient traces
for emb in self._params:
emb.reset_trace()
self._clean_grad = False
for emb in self._params:
emb_name = emb.name
idx = th.cat(shared_emb[emb_name][0], dim=0)
grad = th.cat(shared_emb[emb_name][1], dim=0)
self.update(idx, grad, emb)
# synchronized gradient update
if self._world_size > 1:
th.distributed.barrier()
@abstractmethod
def update(self, idx, grad, emb):
""" Update embeddings in a sparse manner
Sparse embeddings are updated in mini-batches. We maintain gradient states for
each embedding so they can be updated separately.
Parameters
----------
idx : tensor
Index of the embeddings to be updated.
grad : tensor
Gradient of each embedding.
emb : dgl.nn.NodeEmbedding
Sparse node embedding to update.
"""
def zero_grad(self):
"""clean grad cache
"""
self._clean_grad = True
class SparseAdagrad(SparseGradOptimizer):
r''' Node embedding optimizer using the Adagrad algorithm.
This optimizer implements a sparse version of the Adagrad algorithm for
optimizing :class:`dgl.nn.NodeEmbedding`. Being sparse means it only updates
the embeddings whose gradients have updates, which are usually a very
small portion of the total embeddings.
Adagrad maintains a :math:`G_{t,i,j}` for every parameter in the embeddings, where
:math:`G_{t,i,j}=G_{t-1,i,j} + g_{t,i,j}^2` and :math:`g_{t,i,j}` is the gradient of
the dimension :math:`j` of embedding :math:`i` at step :math:`t`.
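The embedding value :math:`e_{i,j}` is then updated as
:math:`e_{i,j} \leftarrow e_{i,j} - lr \cdot g_{t,i,j} / \sqrt{G_{t,i,j} + eps}`
(see :meth:`update` below).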
NOTE: Support for the sparse Adagrad optimizer is experimental.
Parameters
----------
params : list[dgl.nn.NodeEmbedding]
The list of dgl.nn.NodeEmbedding.
lr : float
The learning rate.
eps : float, Optional
The term added to the denominator to improve numerical stability
Default: 1e-10
Examples
--------
>>> def initializer(emb):
...     th.nn.init.xavier_uniform_(emb)
...     return emb
>>> emb = dgl.nn.NodeEmbedding(g.number_of_nodes(), 10, 'emb', init_func=initializer)
>>> optimizer = dgl.optim.SparseAdagrad([emb], lr=0.001)
>>> for blocks in dataloader:
... ...
... feats = emb(nids, gpu_0)
... loss = F.sum(feats + 1, 0)
... loss.backward()
... optimizer.step()
'''
def __init__(self, params, lr, eps=1e-10):
super(SparseAdagrad, self).__init__(params, lr)
self._eps = eps
# We need to register a state sum for each embedding in the kvstore.
for emb in params:
assert isinstance(emb, NodeEmbedding), \
'SparseAdagrad only supports dgl.nn.NodeEmbedding'
if self._rank <= 0:
emb_name = emb.name
state = create_shared_mem_array(emb_name+'_state', \
emb.emb_tensor.shape, th.float32).zero_()
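# state keeps the running sum of squared gradients (G in the formula above),
# one row per node embedding, in CPU shared memory.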
if self._rank == 0:
if self._world_size > 1:
emb.store.set(emb_name+'_opt', emb_name)
elif self._rank > 0:
# receive
emb_name = emb.name
emb.store.wait([emb_name+'_opt'])
state = get_shared_mem_array(emb_name+'_state', \
emb.emb_tensor.shape, th.float32)
emb.set_optm_state(state)
def update(self, idx, grad, emb):
""" Update embeddings in a sparse manner
Sparse embeddings are updated in mini-batches. We maintain gradient states for
each embedding so they can be updated separately.
Parameters
----------
idx : tensor
Index of the embeddings to be updated.
grad : tensor
Gradient of each embedding.
emb : dgl.nn.NodeEmbedding
Sparse embedding to update.
"""
eps = self._eps
clr = self._lr
# the update is non-linear so indices must be unique
grad_indices, inverse, cnt = th.unique(idx, return_inverse=True, return_counts=True)
grad_values = th.zeros((grad_indices.shape[0], grad.shape[1]), device=grad.device)
grad_values.index_add_(0, inverse, grad)
grad_values = grad_values / cnt.unsqueeze(1)
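# grad_values now holds the mean gradient for each unique index:
# duplicates coming from sampling are averaged rather than summed.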
grad_sum = (grad_values * grad_values)
state = emb.optm_state
state_dev = state.device
state_idx = grad_indices.to(state_dev)
grad_state = state[state_idx].to(grad.device)
grad_state += grad_sum
state[state_idx] = grad_state.to(state_dev)
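# Adagrad step: scale the averaged gradient by 1 / sqrt(G_t + eps)
# and apply it with the learning rate.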
std_values = grad_state.add_(eps).sqrt_()
tmp = clr * grad_values / std_values
emb.emb_tensor[state_idx] -= tmp.to(state_dev)
class SparseAdam(SparseGradOptimizer):
r''' Node embedding optimizer using the Adam algorithm.
This optimizer implements a sparse version of the Adam algorithm for
optimizing :class:`dgl.nn.NodeEmbedding`. Being sparse means it only
updates the embeddings whose gradients have updates, which are usually
a very small portion of the total embeddings.
Adam maintains a first moment :math:`Gm_{t,i,j}` and a second moment
:math:`Gp_{t,i,j}` for every parameter in the embeddings, where
:math:`Gm_{t,i,j} = \beta_1 Gm_{t-1,i,j} + (1-\beta_1) g_{t,i,j}`,
:math:`Gp_{t,i,j} = \beta_2 Gp_{t-1,i,j} + (1-\beta_2) g_{t,i,j}^2`, and
:math:`g_{t,i,j}` is the gradient of dimension :math:`j` of embedding :math:`i`
at step :math:`t`. The update applied to that dimension is
:math:`lr \cdot \frac{Gm_{t,i,j} / (1-\beta_1^t)}{\sqrt{Gp_{t,i,j} / (1-\beta_2^t)} + \epsilon}`.
NOTE: Support for the sparse Adam optimizer is experimental.
Parameters
----------
params : list[dgl.nn.NodeEmbedding]
The list of dgl.nn.NodeEmbeddings.
lr : float
The learning rate.
betas : tuple[float, float], Optional
Coefficients used for computing running averages of gradient and its square.
Default: (0.9, 0.999)
eps : float, Optional
The term added to the denominator to improve numerical stability
Default: 1e-8
Examples
--------
>>> def initializer(emb):
...     th.nn.init.xavier_uniform_(emb)
...     return emb
>>> emb = dgl.nn.NodeEmbedding(g.number_of_nodes(), 10, 'emb', init_func=initializer)
>>> optimizer = dgl.optim.SparseAdam([emb], lr=0.001)
>>> for blocks in dataloader:
... ...
... feats = emb(nids, gpu_0)
... loss = F.sum(feats + 1, 0)
... loss.backward()
... optimizer.step()
'''
def __init__(self, params, lr, betas=(0.9, 0.999), eps=1e-08):
super(SparseAdam, self).__init__(params, lr)
self._lr = lr
self._beta1 = betas[0]
self._beta2 = betas[1]
self._eps = eps
# We need to register the optimizer states (step count, first and second
# moments) for each embedding in the kvstore.
for emb in params:
assert isinstance(emb, NodeEmbedding), \
'SparseAdam only supports dgl.nn.NodeEmbedding'
if self._rank <= 0:
emb_name = emb.name
state_step = create_shared_mem_array(emb_name+'_step', \
(emb.emb_tensor.shape[0],), th.float32).zero_()
state_mem = create_shared_mem_array(emb_name+'_mem', \
emb.emb_tensor.shape, th.float32).zero_()
state_power = create_shared_mem_array(emb_name+'_power', \
emb.emb_tensor.shape, th.float32).zero_()
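# state_step counts per-row updates (used for bias correction); state_mem and
# state_power hold the first and second moment estimates, all in CPU shared memory.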
if self._rank == 0:
emb_name = emb.name
if self._world_size > 1:
emb.store.set(emb_name+'_opt', emb_name)
elif self._rank > 0:
# receive
emb_name = emb.name
emb.store.wait([emb_name+'_opt'])
state_step = get_shared_mem_array(emb_name+'_step', \
(emb.emb_tensor.shape[0],), th.float32)
state_mem = get_shared_mem_array(emb_name+'_mem', \
emb.emb_tensor.shape, th.float32)
state_power = get_shared_mem_array(emb_name+'_power', \
emb.emb_tensor.shape, th.float32)
state = (state_step, state_mem, state_power)
emb.set_optm_state(state)
def update(self, idx, grad, emb):
""" Update embeddings in a sparse manner
Sparse embeddings are updated in mini-batches. We maintain gradient states for
each embedding so they can be updated separately.
Parameters
----------
idx : tensor
Index of the embeddings to be updated.
grad : tensor
Gradient of each embedding.
emb : dgl.nn.NodeEmbedding
Sparse embedding to update.
"""
with th.no_grad():
beta1 = self._beta1
beta2 = self._beta2
eps = self._eps
clr = self._lr
state_step, state_mem, state_power = emb.optm_state
exec_dev = grad.device
state_dev = state_step.device
# There can be duplicated indices due to sampling.
# Thus we deduplicate them here and average the gradients.
grad_indices, inverse, cnt = th.unique(idx,
return_inverse=True,
return_counts=True)
state_idx = grad_indices.to(state_dev)
state_step[state_idx] += 1
state_step = state_step[state_idx].to(exec_dev, non_blocking=True)
orig_mem = state_mem[state_idx].to(exec_dev, non_blocking=True)
orig_power = state_power[state_idx].to(exec_dev, non_blocking=True)
grad_values = th.zeros((grad_indices.shape[0], grad.shape[1]), device=exec_dev)
grad_values.index_add_(0, inverse, grad)
grad_values = grad_values / cnt.unsqueeze(1)
grad_mem = grad_values
grad_power = grad_values * grad_values
update_mem = beta1 * orig_mem + (1.-beta1) * grad_mem
update_power = beta2 * orig_power + (1.-beta2) * grad_power
state_mem[state_idx] = update_mem.to(state_dev, non_blocking=True)
state_power[state_idx] = update_power.to(state_dev, non_blocking=True)
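# Bias-correct the moment estimates with 1 - beta^t, using the per-row step counts.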
update_mem_corr = update_mem / (1. - th.pow(th.tensor(beta1, device=exec_dev),
state_step)).unsqueeze(1)
update_power_corr = update_power / (1. - th.pow(th.tensor(beta2, device=exec_dev),
state_step)).unsqueeze(1)
std_values = clr * update_mem_corr / (th.sqrt(update_power_corr) + eps)
emb.emb_tensor[state_idx] -= std_values.to(state_dev)