Pipeline Parallelism in PyTorch
A complementary quickstart to PyTorch’s PiPPy library
As LLMs keep growing in size, I started wondering about the possibility of deploying a model across many machines. While training is routinely done in a distributed way, inference rarely goes beyond one machine, since serving a model has far lower resource requirements than training it.
When the model is big, many of us would simply request a single-GPU machine with enough memory to serve it. As a rule of thumb, inference needs at least twice as many gigabytes of GPU memory as the model has billions of parameters, because in half precision (float16) each parameter takes 2 bytes. For example, LLaMA 13B has 13 billion parameters, hence it requires about 26 GB of GPU memory. Choose an NVIDIA A100 40GB and we are good to go.
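As a back-of-the-envelope check, here is that rule of thumb as a tiny Python sketch; the helper name is mine, and the result is only a lower bound since activations and framework overhead come on top.
# memory_rule_of_thumb.py (illustrative sketch, not part of the example)
def estimate_weight_memory_gb(params_billions: float, bytes_per_param: int = 2) -> float:
    # params_billions * 1e9 parameters * bytes_per_param bytes ~= result in GB
    return params_billions * bytes_per_param

print(estimate_weight_memory_gb(13))     # LLaMA 13B in float16 -> ~26 GB
print(estimate_weight_memory_gb(13, 4))  # same model in float32 -> ~52 GB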
If the model is too big for one GPU, a tool like Hugging Face Accelerate can help us run it on a single machine with multiple GPUs. Most cloud providers let you attach up to eight devices to one machine.
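For reference, this is roughly what that looks like with Transformers plus Accelerate: a minimal sketch, assuming the weights fit across the attached GPUs; the checkpoint name is just an example.
# accelerate_sketch.py (illustrative sketch, not part of the example)
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

# device_map="auto" lets Accelerate spread the layers over all visible GPUs
model = AutoModelForCausalLM.from_pretrained(
    "huggyllama/llama-13b",     # example checkpoint
    torch_dtype=torch.float16,  # 2 bytes per parameter
    device_map="auto",
)
tokenizer = AutoTokenizer.from_pretrained("huggyllama/llama-13b")

inputs = tokenizer("Hello, my name is", return_tensors="pt").to(model.device)
print(tokenizer.decode(model.generate(**inputs, max_new_tokens=20)[0]))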
Then there are projects like Alpa that enable the deployment of really big models like OPT 175B and BLOOM 176B on multi-machine multi-device clusters. However, there is a catch: the model needs to be supported by the project.
Doing further research, I came across PyTorch’s PiPPy project. It features automatic splitting of model code, meaning you do not have to modify the model for parallelism to work; that is great news because it potentially supports any model.
While consulting the PiPPy docs and source code, I worked through the following exercise to get a basic grasp of the tool and of pipeline parallelism in general:
Run a model across two Docker containers
The idea is eventually to deploy heavyweight models, but for the sake of simplicity let’s use this tiny model.
# example.py
import os

import pippy
import torch
from torch import nn
from torch.distributed import rpc


class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(128, 128)
        self.fc2 = nn.Linear(128, 8)

    def forward(self, x):
        x = self.fc1(x)
        x = nn.functional.relu(x)
        x = self.fc2(x)
        return x


net = Net()
net.eval()
Let’s assume that we have a bunch of machines, all on the same network. The same script runs on every node; there is one head node and there are worker nodes. Workers must be able to reach the head node at a host:port address. Each node has a rank number, and the total count of nodes is the world size.
# example.py (continuation)
RANK = int(os.environ["RANK"])
WORLD = int(os.environ["WORLD"])
HOST = os.environ["HOST"]
PORT = os.environ["PORT"]

print(f"My rank is {RANK}")

# first thing to do is to init RPC
print("Waiting for all nodes...")
rpc.init_rpc(
    f"worker{RANK}",  # just an identifier
    rank=RANK,
    world_size=WORLD,
    rpc_backend_options=rpc.TensorPipeRpcBackendOptions(
        num_worker_threads=8,
        rpc_timeout=10,  # seconds
        init_method=f"tcp://{HOST}:{PORT}",  # head node's address and port
    ),
)

# split the model; each process materializes its pipeline stage
driver, stage = pippy.all_compile(
    net,
    num_ranks=WORLD,
    num_chunks=WORLD,  # microbatching
    schedule="FillDrain",  # feed chunks through the pipeline sequentially
    split_policy=pippy.split_into_equal_size(WORLD),  # split the model into the specified number of equal-size stages
)
print(stage)

if RANK == 0:
    x = torch.randn(4, 128)
    y = driver(x)  # only rank 0 is able to call the pipeline's driver
    print(y)

rpc.shutdown()
print("Bye!")
In a terminal:
$ docker build -t example .
$ docker network create rpc
$ docker run -e RANK=0 -e WORLD=2 -e HOST=head -e PORT=3000 \
--net rpc --name head --rm -it example
Notice how the HOST value is the head container’s name; on a user-defined Docker network, container names resolve as hostnames.
In another terminal:
$ docker run -e RANK=1 -e WORLD=2 -e HOST=head -e PORT=3000 \
--net rpc --name worker --rm -it example
This container is named worker; HOST points to head, the rank 0 container.
Head output, after all processes join.
My rank is 0
Waiting for all nodes...
PipeStageModule(
(fc1): Linear(in_features=128, out_features=128, bias=True)
)
def forward(self, x):
fc1 = self.fc1(x); x = None
return fc1
tensor([[-0.1043, ..., -0.0093],
...
[-0.1566, ..., -0.0765]])
Bye!
Worker output.
My rank is 1
Waiting for all nodes...
PipeStageModule(
(fc2): Linear(in_features=128, out_features=8, bias=True)
)
def forward(self, fc1):
relu = nn.functional.relu(fc1, inplace = False); fc1 = None
fc2 = self.fc2(relu); relu = None
return fc2
Bye!
Mission accomplished 🌟. This simple model was automatically split into two submodules and then pipelined across separate environments.
Next steps in this topic are:
- use a GPU (see the sketch after this list),
- more than one GPU device per machine,
- deploy on local Kubernetes,
- deploy on Google Cloud GKE,
- manage sizeable models.
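For the GPU item, a natural starting point is the RPC layer: TensorPipe can move tensors between devices when you declare a device map. Below is a minimal sketch of how the options in example.py might be extended, assuming one GPU per container; it only covers the RPC side, not how PiPPy places each stage on a device.
# gpu_sketch.py (illustrative sketch, builds on example.py's RANK/WORLD/HOST/PORT)
options = rpc.TensorPipeRpcBackendOptions(
    num_worker_threads=8,
    rpc_timeout=10,
    init_method=f"tcp://{HOST}:{PORT}",
    devices=["cuda:0"],  # devices this rank exposes to RPC
)
# map our cuda:0 to each peer's cuda:0 so tensors travel GPU-to-GPU
for peer_rank in range(WORLD):
    if peer_rank != RANK:
        options.set_device_map(f"worker{peer_rank}", {"cuda:0": "cuda:0"})
rpc.init_rpc(f"worker{RANK}", rank=RANK, world_size=WORLD, rpc_backend_options=options)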
# requirements.txt - CPU-only PyTorch
packaging # missing dependency from PiPPy
numpy
--index-url https://download.pytorch.org/whl/cpu
torch
# Dockerfile
FROM python:3
WORKDIR /usr/src/app
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
RUN git clone https://github.com/pytorch/PiPPy.git \
&& cd PiPPy \
&& python setup.py install \
&& cd ..
COPY . .
ENTRYPOINT ["python", "example.py"]