Pipeline Parallelism in PyTorch

PyTorch’s PiPPy library complementary quickstart

Matías Battocchia
4 min readSep 1, 2023

As LLMs keep growing in size I started wondering about the possibility of deploying a model across many machines. While training is done in a distributed way, inference rarely goes beyond one machine as it has much less requirements to serve a model.

When the model is big, many of us would request a single-GPU machine with enough resources to serve the model. As a rule of thumb, you need at least twice GPU space for inference than the number of the model parameters. For example, LLaMA 13B has 13 giga-parameters (a giga is a billion), hence it requires 26 GB of GPU. Using float precision, it is 2 bytes per parameter, that is why. Choose a NVIDIA A100 40GB and we are good to go.

If the model is too big, then a tool like Hugging Face Accelerate can help us to run the model on a single machine with multiple GPUs. You are able to attach up to 8 devices to one machine in most cloud providers.

Then there are projects like Alpa that enable the deployment of really big models like OPT 175B and BLOOM 176B on multi-machine multi-device clusters. However, there is a catch: the model needs to be supported by the project.

Doing further research I came across PyTorch’s PiPPy project. It features automatic splitting of model code. It means that you do not have to make modifications to the model code to make parallelism work; that is great news because it potentially supports any model.

Pipeline parallelism consists on sequentially distributing the model across multiple GPU devices and/or machines. Image by author.

While consulting PiPPy docs and source code, I did the following exercise in order to grasp elemental insights from this tool and pipeline parallelism in general:

Run a model across two Docker containers

The idea is to deploy heavyweight models but for the sake of simplicity, let’s use this model.

# example.py

import os
import pippy
from torch.distributed import rpc
from torch import nn

class Net(nn.Module):
def __init__(self):
super().__init__()
self.fc1 = nn.Linear(128, 128)
self.fc2 = nn.Linear(128, 8)

def forward(self, x):
x = self.fc1(x)
x = nn.functional.relu(x)
x = self.fc2(x)

return x

net = Net()
net.eval()

Let’s assume that we have a bunch of machines, all on the same network. The same script runs on all the nodes, there are a head node and worker nodes. Worker nodes should be able to reach the head node at a host:port. Each node has a rank number and the total count of nodes is the world.

# example.py (continuation)

RANK = int(os.environ["RANK"])
WORLD = int(os.environ["WORLD"])
HOST = os.environ["HOST"]
PORT = os.environ["PORT"]
print(f"My rank is {RANK}")


# first thing to do is to init RCP
print("Waiting for all the nodes...")
rpc.init_rpc(
f"worker{RANK}", # just an identifier
rank=RANK,
world_size=WORLD,
rpc_backend_options=rpc.TensorPipeRpcBackendOptions(
num_worker_threads=8,
rpc_timeout=10, # seconds
init_method=f"tcp://{HOST}:{PORT}", # head node's address and port
)
)

# split the model, each process materializes its pipeline stage
driver, stage = pippy.all_compile(
net,
num_ranks=WORLD,
num_chunks=WORLD, # microbatching
schedule="FillDrain", # feed chunks through the pipeline sequentially
split_policy=pippy.split_into_equal_size(WORLD), # split the model into specified number of equal-size stages
)
print(stage)

if rank == 0:
x = torch.randn(4, 128)
y = driver(x) # only rank 0 is able the call the pipeline's driver
print(y)

rpc.shutdown()
print("Bye!")

In a terminal:

$ docker build -t example .
$ docker network create rpc
$ docker run -e RANK=0 -e WORLD=2 -e HOST=head -e PORT=3000 \
--net rpc --name head --rm -it example

Notice how the HOST address equals the container’s name.

In another terminal:

$ docker run -e RANK=1 -e WORLD=2 -e HOST=head -e PORT=3000 \
--net rpc --name worker --rm -it example

This container is named worker, HOST points to the rank 0 container head.

Head output, after all processes join.

My rank is 0
Waiting for all nodes...

PipeStageModule(
(fc1): Linear(in_features=128, out_features=128, bias=True)
)

def forward(self, x):
fc1 = self.fc1(x); x = None
return fc1

tensor([[-0.1043, ..., -0.0093],
...
[-0.1566, ..., -0.0765]])
Bye!

Worker output.

My rank is 1
Waiting for all nodes...

PipeStageModule(
(fc2): Linear(in_features=128, out_features=8, bias=True)
)

def forward(self, fc1):
relu = nn.functional.relu(fc1, inplace = False); fc1 = None
fc2 = self.fc2(relu); relu = None
return fc2

Bye!

Mission accomplished 🌟. This simple model was automatically split in two submodules and then pipelined across separate environments.

Next steps in this topic are

  • use GPU,
  • more than one GPU device per machine,
  • deploy on local Kubernetes,
  • deploy on Google Cloud GKE,
  • manage sizeable models.
# requirements.txt - CPU-only PyTorch

packaging # missing dependency from PiPPy
numpy
--index-url https://download.pytorch.org/whl/cpu
torch
# Dockerfile

FROM python:3

WORKDIR /usr/src/app

COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

RUN git clone https://github.com/pytorch/PiPPy.git \
&& cd PiPPy \
&& python setup.py install \
&& cd ..

COPY . .

ENTRYPOINT ["python", "example.py"]

--

--

Matías Battocchia
Matías Battocchia

Written by Matías Battocchia

I studied at Universidad de Buenos Aires. I live in Mendoza, Argentina. Interests: data, NLP, blockchain.

No responses yet