torch.nn.parallel

Data parallelism is a way to process multiple data batches across multiple devices simultaneously to achieve better performance. In PyTorch, the DistributedSampler ensures each device gets a non-overlapping input batch. The model is replicated on all the devices; each replica calculates gradients and simultaneously synchronizes with the others using the ring all-reduce algorithm(梯度在設備間環狀傳遞、求和、更新).

DataParallel(DP) vs. Distributed Data Parallel(DDP)

DataParallel DistributedDataParallel
More overhead; model is replicated and destroyed at each forward pass Model is replicated only once
Only supports single-node parallelism Supports scaling to multiple machines
Slower; uses multithreading on a single process and runs into Global Interpreter Lock (GIL) contention Faster (no GIL contention) because it uses multiprocessing
特性 Multithreading (多线程) Multiprocessing (多进程)
适用场景 I/O密集型任务(如文件操作、网络请求等) CPU密集型任务(如科学计算、数据处理等)
执行方式 线程共享同一内存空间,多线程在同一进程内执行 每个进程拥有独立内存空间,多个进程并行执行
性能 由于 GIL,不能真正并行执行 CPU 密集型任务 能充分利用多核 CPU,适合 CPU 密集型任务
内存开销 内存共享,开销较小 每个进程拥有独立内存,内存消耗较大
创建和销毁开销 线程较轻量,创建和销毁开销较小 进程较重,创建和销毁的开销较大
进程/线程间通信 线程间共享内存,通信高效但可能存在线程安全问题 进程间需要使用 IPC(如队列、管道),通信较复杂
调试难度 难度较大,尤其是在多线程竞争和死锁问题上 相对较简单,进程独立,错误隔离更好

並發vs.並行

Ring-Allreduce

https://andrew.gibiansky.com/blog/machine-learning/baidu-allreduce/
上面這邊博客比PyTorch DDP官方教程裡提供的講得清楚很多。

算法名

所謂Ring是指將GPU構成邏輯環,每個GPU只跟兩個GPUs連接,一個left neighbour一個right neighbour,chunk只會從左邊接收發給右邊。

所謂Allreduce是指相比1個reducer而言,所有的GPUs都作為聚合者reducer。

Array求和操作GPU應遠快過CPU,所以如果5個GPUs會這樣分配

所有GPUs都參與聚合操作

算法

Ring-Allreduce以計算一個求和任務為例(exactly what we need in backprop),有Scatter Reduce-Allgather兩個階段:

Partitioning of an array into N chunks

  1. 首先假如有5個GPUs,所有GPU上的array就都被分成5個chunks。
  2. Scatter Reduce 階段每個第 N 個 GPU 都會發生 N-1 個 iterations,每次迭代的操作是求和:
    1. 1st iteration 發送第 n 個 chunk,接收第 n-1 個 chunk;
    2. 之後的所有 iterations 裡都是發送剛接收到的 chunk。
  3. Allgather 階段每個第 N 個 GPU 也都會發生 N-1 個 iterations,每次迭代只是用接收的 chunk 覆蓋原本 chunk:
    1. 1st iteration 發送第 n+1 個 chunk,接收第 n 個 chunk;
    2. 之後的所有 iterations 裡都是發送剛接收到的 chunk。(同上)
Scatter-reduce data transfers (iteration 1) Scatter-reduce data transfers (iteration 2) Scatter-reduce data transfers (iteration 3) Scatter-reduce data transfers (iteration 4) Final state after all scatter-reduce transfers

每個 GPU 上有一個求和結束的 chunk,每個 GPU 都作為 reducer 完成了工作。

Allgather data transfers (iteration 1) Allgather data transfers (iteration 2) Allgather data transfers (iteration 3) Allgather data transfers (iteration 4) Final state after all allgather transfers

Bandwidth vs. Latency:
Bandwidth帶寬是每秒通過A點的數據量;
Latency延遲是一個數據從A點到B點需要的時間。

帶寬遠大於延遲時 Ring-Allreduce 這種與 GPU 數量無關的算法會很快,上面第一種方法因為壓力會給到帶寬(reducer要挨個給每個GPU返回幾個G的梯度數據更新參數)所以耗時跟 GPU 數量是線性增長關係。

MapReduce的reduce

reduce算法中reduce的意思不是減少,而是“合併”(併是動詞,並是其他副詞連詞詞性)、“聚合”。

假如用MapReduce算法做統計文本的單詞頻率這個任務,以“hello world hello”為例,有Map-Shuffle-Reduce三個階段:

  1. Map:文本分割成單詞,構造鍵值對。
    ("hello", 1)
    ("world", 1)
    ("hello", 1)
  2. Shuffle:Map的輸出會經過Shuffle把Key一樣的值聚集到一起。
    ("hello", [1, 1])
    ("world", [1])
  3. Reduce:對Shuffle的結果聚合,這個任務是求和,有的任務是統計Key出現的次數,有的是求Value最大最小值,有的是把Value合併成一個數據結構。
    ("hello", 2)
    ("world", 1)

DDP

  1. 構建 Process Group(進程組,DDP是multiprocessing,DP是multithreading)。
    def ddp_setup(rank: int, world_size: int):
    """
    Args:
    rank: Unique identifier of each process
    world_size: Total number of processes
    """
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    torch.cuda.set_device(rank) # sets the default GPU for each process.
    # 分佈式進程組 distributed process group 包含mp.spawn出來的所有能交流和同步的進程
    torch.distributed.init_process_group(backend="nccl", rank=rank, world_size=world_size) # 默認用 TCP 協議初始化 distributed process group,backend 的選擇上,GPU分佈式訓練用NCCL,CPU分佈式訓練用Gloo
  2. 構建 DDP model。
    self.model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[gpu_id])
  3. Distributing Input Data(分發輸入數據)。
    train_data = torch.utils.data.DataLoader(
    dataset=train_dataset,
    batch_size=32,
    shuffle=False, # We don't shuffle
    sampler=torch.utils.data.distributed.DistributedSampler(train_dataset), # 為所有 distributed process 隨機採樣不重複的數據,GPUs/Processe 之間不重複,共有 32 * nprocs 個有效 samples,如果用 torch.distributed,DistributedSampler 會自動獲取 rank 所以不用顯式指定?
    )
    def _run_epoch(self, epoch):
    b_sz = len(next(iter(self.train_data))[0])
    self.train_data.sampler.set_epoch(epoch) # 每個 epoch 重新打亂一次,否則每個 epoch 隨機抽樣順序一樣每個 process 可能拿到同樣的數據
    for source, targets in self.train_data:
    ...
    self._run_batch(source, targets)
  4. 保存模型 checkpoints。
    - ckp = self.model.state_dict()
    + ckp = self.model.module.state_dict()
    ...
    ...
    - if epoch % self.save_every == 0:
    + if self.gpu_id == 0 and epoch % self.save_every == 0: # 只有rank0主進程保存ckp,因為根據上面的ring-allreduce算法每個gpu上的參數相同,集合調用 Collective calls 要在這之前是什麼意思?
    self._save_checkpoint(epoch)
  5. 運行分佈式訓練(distributed training)。
    - def main(device, total_epochs, save_every):
    + def main(rank, world_size, total_epochs, save_every):
    + ddp_setup(rank, world_size)
    dataset, model, optimizer = load_train_objs()
    train_data = prepare_dataloader(dataset, batch_size=32)
    - trainer = Trainer(model, train_data, optimizer, device, save_every)
    + trainer = Trainer(model, train_data, optimizer, rank, save_every) # 原來是 device,現在是 rank
    trainer.train(total_epochs)
    + torch.distributed.destroy_process_group() # 結束訓練要釋放 process group

    if __name__ == "__main__":
    import sys
    total_epochs = int(sys.argv[1])
    save_every = int(sys.argv[2])
    - device = 0 # shorthand for cuda:0
    - main(device, total_epochs, save_every)
    + world_size = torch.cuda.device_count() # world_size 不一定等於 gpus 數?
    # torch.multiprocessing is a PyTorch wrapper around Python’s native multiprocessing
    + torch.multiprocessing.mp.spawn(main, args=(world_size, total_epochs, save_every,), nprocs=world_size) # mp.spawn 調用時會由 DDP 自動分配 rank 給 process

在今年的結尾想玩好大戰略遊戲

wikipedia定義裡正好就是我在玩的鋼鐵雄心、歐陸風雲、全面戰爭,Verdun能真實代入這三款遊戲裡的兵卒。

生產速度下降,下崗工人增加,社會就會不穩定。銀行貸款要增加是很容易的,無非就是多發票子,但後果嚴重得不得了。同志們,所有的部長都知道在銀行貸款問題上,我是摳得最緊的,因為我知道這裡面風險最大。銀行貸款一撒開後果不得了,你別看現在是通貨緊縮,要變通貨膨脹也容易啊。因此,只有通過發國債來增加投資。當然,發國債説到底也是利用銀行貸款,也是利用銀行裡閒置的存款,但我認為這樣做沒有什麼風險。——〈朱鎔基在一九九九年中央經濟工作會議上的講話〉

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×