PyTorch 並列トレーニング DistributedDataParallel の完全なコード例

WBOY
リリース: 2023-04-10 20:51:01
転載
1290 人が閲覧しました

大規模なデータセットを使用して大規模なディープ ニューラル ネットワーク (DNN) をトレーニングするという問題は、ディープ ラーニングの分野における大きな課題です。 DNN とデータセットのサイズが増加するにつれて、これらのモデルをトレーニングするための計算要件とメモリ要件も増加します。そのため、コンピューティング リソースが限られている 1 台のマシンでこれらのモデルをトレーニングすることが困難または不可能になります。大規模なデータセットを使用して大規模な DNN をトレーニングする際の主な課題には次のようなものがあります。

  • トレーニング時間が長い: モデルの複雑さとモデルのサイズに応じて、トレーニング プロセスが完了するまでに数週間、場合によっては数か月かかる場合があります。データセット 。
  • メモリ制限: 大規模な DNN では、トレーニング中にすべてのモデル パラメーター、勾配、中間アクティベーションを保存するために大量のメモリが必要になる場合があります。これにより、メモリ不足エラーが発生し、単一マシンでトレーニングできるモデルのサイズが制限される可能性があります。

これらの課題に対処するために、モデル並列処理、データ並列処理、ハイブリッド並列処理、ハードウェア、ソフトウェア、アルゴリズムの最適化。

この記事では、PyTorch を使用したデータ並列処理とモデル並列処理を示します。

PyTorch 並列トレーニング DistributedDataParallel の完全なコード例

私たちが並列処理と呼ぶものは、一般に、トレーニング時間を短縮するために、複数の GPU または複数のマシン上でディープ ニューラル ネットワーク (dnn) をトレーニングすることを指します。データ並列処理の背後にある基本的な考え方は、トレーニング データをより小さなチャンクに分割し、各 GPU またはマシンで個別のデータ チャンクを処理させることです。次に、各ノードの結果が結合され、モデル パラメーターの更新に使用されます。データ並列処理では、モデル アーキテクチャは各ノードで同じですが、モデル パラメーターはノード間で分割されます。各ノードは、割り当てられたデータ チャンクを使用して独自のローカル モデルをトレーニングし、各トレーニング反復の終了時にモデル パラメーターがすべてのノード間で同期されます。このプロセスは、モデルが満足のいく結果に収束するまで繰り返されます。

以下では、完全なコード例として ResNet50 および CIFAR10 データ セットを使用します:

データ並列処理では、モデル アーキテクチャは各ノードで同じままですが、モデル パラメーターはノード間で異なります。が完了すると、各ノードは割り当てられたデータ チャンクを使用して独自のローカル モデルをトレーニングします。

PyTorch の DistributedDataParallel ライブラリは、ノード間で勾配とモデル パラメーターを効率的に通信および同期して、分散トレーニングを実現します。この記事では、ResNet50 および CIFAR10 データセットを使用して PyTorch でデータ並列処理を実装する方法の例を示します。コードは複数の GPU またはマシン上で実行され、各マシンがトレーニング データのサブセットを処理します。トレーニング プロセスは、PyTorch の DistributedDataParallel ライブラリを使用して並列化されます。

必要なライブラリをインポートします

import os
 from datetime import datetime
 from time import time
 import argparse
 import torchvision
 import torchvision.transforms as transforms
 import torch
 import torch.nn as nn
 import torch.distributed as dist
 from torch.nn.parallel import DistributedDataParallel
ログイン後にコピー

次に、GPU を確認します。

import subprocess
 result = subprocess.run(['nvidia-smi'], stdout=subprocess.PIPE)
 print(result.stdout.decode())
ログイン後にコピー

複数のサーバーで実行する必要があるため、それらを 1 つずつ手動で実行するのは現実的ではないため、スケジューラーが必要です。ここでは、SLURM ファイルを使用してコードを実行します (slurmLinux および Unix 系カーネル用の無料のオープン ソース ジョブ スケジューラ)。

def main():
 
 # get distributed configuration from Slurm environment
 
 parser = argparse.ArgumentParser()
 parser.add_argument('-b', '--batch-size', default=128, type =int,
 help='batch size. it will be divided in mini-batch for each worker')
 parser.add_argument('-e','--epochs', default=2, type=int, metavar='N',
 help='number of total epochs to run')
 parser.add_argument('-c','--checkpoint', default=None, type=str,
 help='path to checkpoint to load')
 args = parser.parse_args()
 
 rank = int(os.environ['SLURM_PROCID'])
 local_rank = int(os.environ['SLURM_LOCALID'])
 size = int(os.environ['SLURM_NTASKS'])
 master_addr = os.environ["SLURM_SRUN_COMM_HOST"]
 port = "29500"
 node_id = os.environ['SLURM_NODEID']
 ddp_arg = [rank, local_rank, size, master_addr, port, node_id]
 train(args, ddp_arg)
ログイン後にコピー

次に、次を使用します。分散トレーニングを実行するための DistributedDataParallel ライブラリ。

def train(args, ddp_arg):
 
 rank, local_rank, size, MASTER_ADDR, port, NODE_ID = ddp_arg
 
 # display info
 if rank == 0:
 #print(">>> Training on ", len(hostnames), " nodes and ", size, " processes, master node is ", MASTER_ADDR)
 print(">>> Training on ", size, " GPUs, master node is ", MASTER_ADDR)
 #print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))
 
 print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))
 
 
 # configure distribution method: define address and port of the master node and initialise communication backend (NCCL)
 #dist.init_process_group(backend='nccl', init_method='env://', world_size=size, rank=rank)
 dist.init_process_group(
 backend='nccl',
 init_method='tcp://{}:{}'.format(MASTER_ADDR, port),
 world_size=size,
 rank=rank
)
 
 # distribute model
 torch.cuda.set_device(local_rank)
 gpu = torch.device("cuda")
 #model = ResNet18(classes=10).to(gpu)
 model = torchvision.models.resnet50(pretrained=False).to(gpu)
 ddp_model = DistributedDataParallel(model, device_ids=[local_rank])
 if args.checkpoint is not None:
 map_location = {'cuda:%d' % 0: 'cuda:%d' % local_rank}
 ddp_model.load_state_dict(torch.load(args.checkpoint, map_location=map_location))
 
 # distribute batch size (mini-batch)
 batch_size = args.batch_size
 batch_size_per_gpu = batch_size // size
 
 # define loss function (criterion) and optimizer
 criterion = nn.CrossEntropyLoss()
 optimizer = torch.optim.SGD(ddp_model.parameters(), 1e-4)
 
 
 transform_train = transforms.Compose([
 transforms.RandomCrop(32, padding=4),
 transforms.RandomHorizontalFlip(),
 transforms.ToTensor(),
 transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
 
 # load data with distributed sampler
 #train_dataset = torchvision.datasets.CIFAR10(root='./data',
 # train=True,
 # transform=transform_train,
 # download=False)
 
 # load data with distributed sampler
 train_dataset = torchvision.datasets.CIFAR10(root='./data',
train=True,
transform=transform_train,
download=False)
 
 train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset,
 num_replicas=size,
 rank=rank)
 
 train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
batch_size=batch_size_per_gpu,
shuffle=False,
num_workers=0,
pin_memory=True,
sampler=train_sampler)
 
 # training (timers and display handled by process 0)
 if rank == 0: start = datetime.now()
 total_step = len(train_loader)
 
 for epoch in range(args.epochs):
 if rank == 0: start_dataload = time()
 
 for i, (images, labels) in enumerate(train_loader):
 
 # distribution of images and labels to all GPUs
 images = images.to(gpu, non_blocking=True)
 labels = labels.to(gpu, non_blocking=True)
 
 if rank == 0: stop_dataload = time()
 
 if rank == 0: start_training = time()
 
 # forward pass
 outputs = ddp_model(images)
 loss = criterion(outputs, labels)
 
 # backward and optimize
 optimizer.zero_grad()
 loss.backward()
 optimizer.step()
 
 if rank == 0: stop_training = time()
 if (i + 1) % 10 == 0 and rank == 0:
 print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Time data load: {:.3f}ms, Time training: {:.3f}ms'.format(epoch + 1, args.epochs,
 i + 1, total_step, loss.item(), (stop_dataload - start_dataload)*1000,
(stop_training - start_training)*1000))
 if rank == 0: start_dataload = time()
 
 #Save checkpoint at every end of epoch
 if rank == 0:
 torch.save(ddp_model.state_dict(), './checkpoint/{}GPU_{}epoch.checkpoint'.format(size, epoch+1))
 
 if rank == 0:
 print(">>> Training complete in: " + str(datetime.now() - start))
 
 
 if __name__ == '__main__':
 
 main()
ログイン後にコピー

コードは、データとモデルを複数の GPU に分割し、分散方式でモデルを更新します。コードの説明をいくつか示します。

train(args, ddp_arg) には、args と ddp_arg という 2 つのパラメータがあります。args はスクリプトに渡されるコマンド ライン パラメータで、ddp_arg には分散トレーニング関連のパラメータが含まれます。

rank、local_rank、size、MASTER_ADDR、port、NODE_ID = ddp_arg: ddp_arg 内の分散トレーニング関連パラメーターを解凍します。

ランクが 0 の場合、現在使用されている GPU の数とマスター ノードの IP アドレス情報を出力します。

dist.init_process_group(backend='nccl', init_method='tcp://{}:{}'.format(MASTER_ADDR, port), world_size=size, Rank=rank): NCCL バックエンドを使用します。分散プロセスグループ。

torch.cuda.set_device(local_rank): このプロセスに指定された GPU を選択します。

model = torchvision.models.ResNet50 (pretrained=False).to(gpu): torchvision モデルから ResNet50 モデルをロードし、指定された GPU に移動します。

ddp_model = DistributedDataParallel(model, device_ids=[local_rank]): モデルを DistributedDataParallel モジュールでラップします。これは、分散トレーニングを実行できることを意味します

CIFAR-10 データのロード データの収集と適用拡張変換。

train_sampler=torch.utils.data.distributed.DistributedSampler(train_dataset,num_replicas=size,rank=rank): DistributedSampler オブジェクトを作成して、データ セットを複数の GPU に分割します。

train_loader =torch.utils.data.DataLoader(dataset=train_dataset,batch_size=batch_size_per_gpu,shuffle=False,num_workers=0,pin_memory=True,sampler=train_sampler): DataLoader オブジェクトを作成すると、データは次のようになります。モデルでは、分散データ サンプリング DistributedSampler が追加されることを除いて、これは通常のトレーニング ステップと一致します。

指定されたエポック数の間モデルをトレーニングし、optimizer.step() を使用して分散方式で重みを更新します。

rank0 は、各ラウンドの終了時にチェックポイントを保存します。

rank0 は、10 バッチごとの損失とトレーニング時間を示します。

トレーニングの終了時には、トレーニング モデルの印刷に費やされた合計時間もランク 0 になります。

コードテスト

トレーニングは、1/2/3/4 GPU を備えた 1 ノード、6/8 GPU を備えた 2 ノード、および 3/4 GPU を備えた各ノードを使用して実施されました。 Cifar10 でのテストを以下の図に示しますが、各テストのバッチ サイズは変わりません。各テストの完了にかかった時間を秒単位で記録しました。使用する GPU の数が増えると、テストの完了に必要な時間は短くなります。 8 つの GPU を使用した場合、完了までに 320 秒かかりました。これは記録された最速の時間です。これは確かですが、Resnet50 が比較的小規模なモデルであり、並列トレーニングを必要としないため、GPU の数が増加してもトレーニング速度が直線的に増加しないことがわかります。

PyTorch 並列トレーニング DistributedDataParallel の完全なコード例

複数の GPU でデータ並列処理を使用すると、特定のデータセットでディープ ニューラル ネットワーク (DNN) をトレーニングするのに必要な時間を大幅に短縮できます。 GPU の数が増えると、トレーニング プロセスを完了するのに必要な時間が短縮され、DNN をより効率的に並行してトレーニングできることがわかります。

このアプローチは、大規模なデータ セットまたは複雑な DNN アーキテクチャを扱う場合に特に役立ちます。複数の GPU を活用することにより、トレーニング プロセスが高速化され、より高速なモデルの反復と実験が可能になります。ただし、データ並列処理によって達成されるパフォーマンスの向上は、通信オーバーヘッドや GPU メモリの制限などの要因によって制限される可能性があり、最良の結果を得るには慎重な調整が必要であることに注意してください。

以上がPyTorch 並列トレーニング DistributedDataParallel の完全なコード例の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ソース:51cto.com
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
最新の問題
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート