Skip to content

Commit

Permalink
[Example] Update the example of distributed GNN training of RGCN (dmlc#2709)
Browse files Browse the repository at this point in the history

* support gpu training.

* remove unnecessary arguments.

* update README.

* update time measurement.

* add zero_grad.

Co-authored-by: Ubuntu <[email protected]>
Co-authored-by: Jinjing Zhou <[email protected]>
  • Loading branch information
3 people authored Mar 1, 2021
1 parent 668bd92 commit d0638b1
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 28 deletions.
2 changes: 1 addition & 1 deletion examples/pytorch/rgcn/experimental/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ python3 ~/workspace/dgl/tools/launch.py \
--num_samplers 4 \
--part_config data/ogbn-mag.json \
--ip_config ip_config.txt \
"python3 entity_classify_dist.py --graph-name ogbn-mag --dataset ogbn-mag --fanout='25,25' --batch-size 512 --n-hidden 64 --lr 0.01 --eval-batch-size 16 --low-mem --dropout 0.5 --use-self-loop --n-bases 2 --n-epochs 3 --layer-norm --ip-config ip_config.txt --num-workers 4 --num-servers 1 --sparse-embedding --sparse-lr 0.06 --node-feats"
"python3 entity_classify_dist.py --graph-name ogbn-mag --dataset ogbn-mag --fanout='25,25' --batch-size 1024 --n-hidden 64 --lr 0.01 --eval-batch-size 1024 --low-mem --dropout 0.5 --use-self-loop --n-bases 2 --n-epochs 3 --layer-norm --ip-config ip_config.txt --num-workers 4 --num-servers 1 --sparse-embedding --sparse-lr 0.06 --num_gpus 1"
```

We can get the performance score at the second epoch:
Expand Down
59 changes: 34 additions & 25 deletions examples/pytorch/rgcn/experimental/entity_classify_dist.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,12 +371,22 @@ def run(args, device, data):
low_mem=args.low_mem,
layer_norm=args.layer_norm)
model = model.to(device)

if not args.standalone:
model = th.nn.parallel.DistributedDataParallel(model)
# If there are dense parameters in the embedding layer
# or we use PyTorch sparse embeddings.
if len(embed_layer.node_projs) > 0 or not args.dgl_sparse:
embed_layer = DistributedDataParallel(embed_layer, device_ids=None, output_device=None)
if args.num_gpus == -1:
model = DistributedDataParallel(model)
# If there are dense parameters in the embedding layer
# or we use PyTorch sparse embeddings.
if len(embed_layer.node_projs) > 0 or not args.dgl_sparse:
embed_layer = DistributedDataParallel(embed_layer)
else:
dev_id = g.rank() % args.num_gpus
model = DistributedDataParallel(model, device_ids=[dev_id], output_device=dev_id)
# If there are dense parameters in the embedding layer
# or we use PyTorch sparse embeddings.
if len(embed_layer.node_projs) > 0 or not args.dgl_sparse:
embed_layer = embed_layer.to(device)
embed_layer = DistributedDataParallel(embed_layer, device_ids=[dev_id], output_device=dev_id)

if args.sparse_embedding:
if args.dgl_sparse and args.standalone:
Expand All @@ -391,14 +401,14 @@ def run(args, device, data):
else:
emb_optimizer = th.optim.SparseAdam(list(embed_layer.module.node_embeds.parameters()), lr=args.sparse_lr)
print('optimize Pytorch sparse embedding:', embed_layer.module.node_embeds)

dense_params = list(model.parameters())
if args.node_feats:
if args.standalone:
dense_params += list(embed_layer.node_projs.parameters())
print('optimize dense projection:', embed_layer.node_projs)
else:
dense_params += list(embed_layer.module.node_projs.parameters())
print('optimize dense projection:', embed_layer.module.node_projs)
if args.standalone:
dense_params += list(embed_layer.node_projs.parameters())
print('optimize dense projection:', embed_layer.node_projs)
else:
dense_params += list(embed_layer.module.node_projs.parameters())
print('optimize dense projection:', embed_layer.module.node_projs)
optimizer = th.optim.Adam(dense_params, lr=args.lr, weight_decay=args.l2norm)
else:
all_params = list(model.parameters()) + list(embed_layer.parameters())
Expand Down Expand Up @@ -439,7 +449,7 @@ def run(args, device, data):
for block in blocks:
gen_norm(block)
feats = embed_layer(blocks[0].srcdata[dgl.NID], blocks[0].srcdata[dgl.NTYPE])
label = labels[seeds]
label = labels[seeds].to(device)
copy_time = time.time()
feat_copy_t.append(copy_time - tic_step)

Expand All @@ -450,17 +460,17 @@ def run(args, device, data):

# backward
optimizer.zero_grad()
if args.sparse_embedding and not args.dgl_sparse:
if args.sparse_embedding:
emb_optimizer.zero_grad()
loss.backward()
optimizer.step()
if args.sparse_embedding:
emb_optimizer.step()
compute_end = time.time()
forward_t.append(forward_end - copy_time)
backward_t.append(compute_end - forward_end)

# Aggregate gradients in multiple nodes.
# Update model parameters
optimizer.step()
if args.sparse_embedding:
emb_optimizer.step()
update_t.append(time.time() - compute_end)
step_t = time.time() - start
step_time.append(step_t)
Expand Down Expand Up @@ -504,7 +514,10 @@ def main(args):
g.rank(), len(train_nid), len(np.intersect1d(train_nid.numpy(), local_nid)),
len(val_nid), len(np.intersect1d(val_nid.numpy(), local_nid)),
len(test_nid), len(np.intersect1d(test_nid.numpy(), local_nid))))
device = th.device('cpu')
if args.num_gpus == -1:
device = th.device('cpu')
else:
device = th.device('cuda:'+str(g.rank() % args.num_gpus))
labels = g.nodes['paper'].data['labels'][np.arange(g.number_of_nodes('paper'))]
all_val_nid = th.LongTensor(np.nonzero(g.nodes['paper'].data['val_mask'][np.arange(g.number_of_nodes('paper'))])).squeeze()
all_test_nid = th.LongTensor(np.nonzero(g.nodes['paper'].data['test_mask'][np.arange(g.number_of_nodes('paper'))])).squeeze()
Expand All @@ -524,8 +537,8 @@ def main(args):
parser.add_argument('--num-servers', type=int, default=1, help='Server count on each machine.')

# rgcn related
parser.add_argument("--gpu", type=str, default='0',
help="gpu")
parser.add_argument('--num_gpus', type=int, default=-1,
help="the number of GPU device. Use -1 for CPU training")
parser.add_argument("--dropout", type=float, default=0,
help="dropout probability")
parser.add_argument("--n-hidden", type=int, default=16,
Expand Down Expand Up @@ -561,14 +574,10 @@ def main(args):
help="Number of workers for distributed dataloader.")
parser.add_argument("--low-mem", default=False, action='store_true',
help="Whether use low mem RelGraphCov")
parser.add_argument("--mix-cpu-gpu", default=False, action='store_true',
help="Whether store node embeddins in cpu")
parser.add_argument("--sparse-embedding", action='store_true',
help='Use sparse embedding for node embeddings.')
parser.add_argument("--dgl-sparse", action='store_true',
help='Whether to use DGL sparse embedding')
parser.add_argument('--node-feats', default=False, action='store_true',
help='Whether use node features')
parser.add_argument('--layer-norm', default=False, action='store_true',
help='Use layer norm')
parser.add_argument('--local_rank', type=int, help='get rank of the process')
Expand Down
20 changes: 18 additions & 2 deletions python/dgl/distributed/sparse_emb.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ def __call__(self, idx):
self._trace.append((idx, emb))
return emb

def reset_trace(self):
    '''Discard every recorded (index, embedding) pair.

    Rebinds the trace to a fresh empty list so that gradient
    bookkeeping starts over from a clean state.
    '''
    self._trace = list()

class SparseAdagradUDF:
''' The UDF to update the embeddings with sparse Adagrad.
Expand Down Expand Up @@ -151,6 +156,7 @@ class SparseAdagrad:
def __init__(self, params, lr):
self._params = params
self._lr = lr
self._clean_grad = False
# We need to register a state sum for each embedding in the kvstore.
for emb in params:
assert isinstance(emb, DistEmbedding), 'SparseAdagrad only supports DistEmbeding'
Expand Down Expand Up @@ -185,5 +191,15 @@ def step(self):
# after we push them.
grads = F.cat(grads, 0)
kvstore.push(name, idxs, grads)
# Clean up the old traces.
emb._trace = []

if self._clean_grad:
# clean gradient track
for emb in self._params:
emb.reset_trace()
self._clean_grad = False


def zero_grad(self):
    """Request that cached gradient traces be cleared.

    Only sets a flag; the actual cleanup of each embedding's trace is
    deferred until the next ``step()`` call inspects ``_clean_grad``.
    """
    self._clean_grad = True

0 comments on commit d0638b1

Please sign in to comment.