Note: If you want more demos showcasing what you can do with PySyft, you can follow @theoryffel and @openminedorg on Twitter. Thanks for all the feedback!

Many works in Privacy-Preserving ML currently explore Federated Learning and Differential Privacy, but it turns out to be difficult to use them together, and there are very few open-source examples showing how to do so.

We present here a very simple example of combining Federated Learning (FL) with Differential Privacy (DP), which can serve as an interesting baseline for experimenting with these great technologies. More specifically, we show how Opacus, the DP library released by the PyTorch team, can be used in PySyft FL workflows with very little overhead.

Disclaimer: there are tons of ways to improve this, and I highly encourage you to do so if you want to get your hands dirty!

Learn More: If you want to learn more about DP, I have put a list of OpenMined blog articles at the end of this post which explain these notions in more depth =)

Setup

We are showcasing differentially private, federated training of a simple convolutional model on MNIST. The model used is the same as in Kritika's recent blog post.

We consider here that the dataset is partitioned across two workers, which each train a model on their own partition in a differentially private way for one epoch, before we aggregate by averaging the two models; this round is repeated for 5 epochs. There are millions of ways to improve this of course, including splitting the dataset in a heterogeneous way, adding secure aggregation, and more.

Imports

We do the classic PyTorch imports, plus the PrivacyEngine from Opacus that we will be using.

from tqdm import tqdm

import torch as th
from torchvision import datasets, transforms
from opacus import PrivacyEngine 

Next come the PySyft imports, with our two workers alice & bob!

import syft as sy

hook = sy.TorchHook(th)
alice = sy.VirtualWorker(hook, id="alice")
bob = sy.VirtualWorker(hook, id="bob")
workers = [alice, bob]

# this is done so that the local worker (you, on your notebook!) has a registry
# of objects like every other worker; this is disabled by default but needed here
sy.local_worker.is_client_worker = False

Federated setup

We will now simulate that each worker holds a partition of the dataset by actually sending the partitions to them, which is done using the .federate method. In a real-world setting, the workers would come with their own data, and we would simply request a pointer to it.

train_datasets = datasets.MNIST('../mnist',
                 train=True, download=True,
                 transform=transforms.Compose([transforms.ToTensor(),
                 transforms.Normalize((0.1307,), (0.3081,)),])
                 ).federate(*workers)

Next, we create a copy of the model for each worker and send it to them. For each worker we also create a dedicated optimizer and an Opacus privacy engine attached to it: this is what makes the training differentially private on each worker and tracks the privacy budget spent. Note that we could even set different privacy requirements for each partition of the dataset, as sketched right after the code below!

We also create a local_model that we will use for model aggregation, as you will see shortly.

def make_model():
    return th.nn.Sequential(
        th.nn.Conv2d(1, 16, 8, 2, padding=3),
        th.nn.ReLU(),
        th.nn.MaxPool2d(2, 1),
        th.nn.Conv2d(16, 32, 4, 2),
        th.nn.ReLU(),
        th.nn.MaxPool2d(2, 1),
        th.nn.Flatten(), 
        th.nn.Linear(32 * 4 * 4, 32),
        th.nn.ReLU(),
        th.nn.Linear(32, 10)
    )

# the local version that we will use to do the aggregation
local_model = make_model()

models, dataloaders, optimizers, privacy_engines = [], [], [], []
for worker in workers:
    model = make_model()
    optimizer = th.optim.SGD(model.parameters(), lr=0.1)
    model.send(worker)
    dataset = train_datasets[worker.id]
    dataloader = th.utils.data.DataLoader(dataset, batch_size=128, shuffle=True, drop_last=True)
    privacy_engine = PrivacyEngine(model,
                                   batch_size=128, 
                                   sample_size=len(dataset), 
                                   alphas=range(2,32), 
                                   noise_multiplier=1.2,
                                   max_grad_norm=1.0)
    privacy_engine.attach(optimizer)
    
    models.append(model)
    dataloaders.append(dataloader)
    optimizers.append(optimizer)
    privacy_engines.append(privacy_engine)
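
For instance, here is a minimal, hypothetical tweak of the loop above for per-partition privacy requirements. The per-worker noise multipliers are made-up values for illustration: a larger multiplier injects more noise, buying a smaller ε for that partition at some cost in accuracy.

# hypothetical per-worker noise multipliers (made-up values)
noise_multipliers = {"alice": 1.2, "bob": 1.5}

# inside the loop above, we would build the engine with the
# worker-specific value instead of the shared 1.2:
privacy_engine = PrivacyEngine(model,
                               batch_size=128,
                               sample_size=len(dataset),
                               alphas=range(2, 32),
                               noise_multiplier=noise_multipliers[worker.id],
                               max_grad_norm=1.0)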

Last, we need the functionality to aggregate the remote models and send out the new updates. We have split this into two functions: send_new_models sends the current version of the local_model to all parties, while federated_aggregation averages all the remote models and stores the result in the local_model. Note that we could easily improve the aggregation by taking a weighted average depending on the size of each dataset (see the sketch right after the code below), but here the split is homogeneous so there is no need.

def send_new_models(local_model, models):
    with th.no_grad():
        for remote_model in models:
            for new_param, remote_param in zip(local_model.parameters(), remote_model.parameters()):
                worker = remote_param.location
                remote_value = new_param.send(worker)
                remote_param.set_(remote_value)

            
def federated_aggregation(local_model, models):
    with th.no_grad():
        for local_param, *remote_params in zip(*([local_model.parameters()] + [model.parameters() for model in models])):
            param_stack = th.zeros(*remote_params[0].shape)
            for remote_param in remote_params:
                param_stack += remote_param.copy().get()
            param_stack /= len(remote_params)
            local_param.set_(param_stack)
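
As promised, here is a minimal sketch of that weighted variant, assuming a dataset_sizes list aligned with models. This helper is hypothetical and not part of the demo:

# hypothetical: weight each remote model by its dataset size,
# useful when the federated split is heterogeneous
def weighted_federated_aggregation(local_model, models, dataset_sizes):
    total = sum(dataset_sizes)
    with th.no_grad():
        for local_param, *remote_params in zip(
                *([local_model.parameters()] + [m.parameters() for m in models])):
            param_stack = th.zeros(*remote_params[0].shape)
            for remote_param, size in zip(remote_params, dataset_sizes):
                param_stack += remote_param.copy().get() * (size / total)
            local_param.set_(param_stack)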

Training

Now comes the training! It is performed in 3 steps: first, we send the latest version of the model to each worker. Second, we train the models remotely for one epoch and read off the privacy budget spent. Note that this loop over the workers could be run in parallel instead of sequentially. Last, we aggregate the models together.

def train(epoch, delta):
        
    # 1. Send new version of the model
    send_new_models(local_model, models)

    # 2. Train remotely the models
    for i, worker in enumerate(workers):
        dataloader = dataloaders[i]
        model = models[i]
        optimizer = optimizers[i]
        
        model.train()
        criterion = th.nn.CrossEntropyLoss()
        losses = []
        # note: we avoid re-using `i` here, which would shadow the worker index
        for data, target in tqdm(dataloader):
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            losses.append(loss.get().item()) 

        sy.local_worker.clear_objects()
        epsilon, best_alpha = optimizer.privacy_engine.get_privacy_spent(delta) 
        print(
            f"[{worker.id}]\t"
            f"Train Epoch: {epoch} \t"
            f"Loss: {sum(losses)/len(losses):.4f} "
            f"(ε = {epsilon:.2f}, δ = {delta}) for α = {best_alpha}")

    # 3. Federated aggregation of the updated models
    federated_aggregation(local_model, models)

for epoch in range(5):
    train(epoch, delta=1e-5)

100%|██████████| 235/235 [00:49<00:00,  4.76it/s]
[alice]	Train Epoch: 0 	Loss: 0.6405 (ε = 0.86, δ = 1e-05) for α = 15
100%|██████████| 235/235 [00:48<00:00,  4.86it/s]
[bob]	Train Epoch: 0 	Loss: 0.5508 (ε = 0.86, δ = 1e-05) for α = 15
100%|██████████| 235/235 [00:47<00:00,  4.93it/s]
[alice]	Train Epoch: 1 	Loss: 0.1169 (ε = 0.90, δ = 1e-05) for α = 15
100%|██████████| 235/235 [00:47<00:00,  4.91it/s]
[bob]	Train Epoch: 1 	Loss: 0.1080 (ε = 0.90, δ = 1e-05) for α = 15
100%|██████████| 235/235 [00:47<00:00,  4.98it/s]
[alice]	Train Epoch: 2 	Loss: 0.0792 (ε = 0.94, δ = 1e-05) for α = 15
100%|██████████| 235/235 [00:46<00:00,  5.09it/s]
[bob]	Train Epoch: 2 	Loss: 0.0776 (ε = 0.94, δ = 1e-05) for α = 15
100%|██████████| 235/235 [00:59<00:00,  3.96it/s]
[alice]	Train Epoch: 3 	Loss: 0.0619 (ε = 0.97, δ = 1e-05) for α = 15
100%|██████████| 235/235 [00:49<00:00,  4.70it/s]
[bob]	Train Epoch: 3 	Loss: 0.0632 (ε = 0.97, δ = 1e-05) for α = 15
100%|██████████| 235/235 [00:48<00:00,  4.89it/s]
[alice]	Train Epoch: 4 	Loss: 0.0521 (ε = 1.01, δ = 1e-05) for α = 15
100%|██████████| 235/235 [00:46<00:00,  5.07it/s]
[bob]	Train Epoch: 4 	Loss: 0.0510 (ε = 1.01, δ = 1e-05) for α = 15

And you can observe that the loss indeed decreases!
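
If you want a sanity check beyond the training loss, here is a minimal sketch, not part of the original demo, for evaluating the aggregated local_model on the MNIST test set. Since local_model lives on the local worker, plain PyTorch evaluation works:

# evaluate the aggregated model on the (local) MNIST test set
test_loader = th.utils.data.DataLoader(
    datasets.MNIST('../mnist', train=False, download=True,
                   transform=transforms.Compose([
                       transforms.ToTensor(),
                       transforms.Normalize((0.1307,), (0.3081,))])),
    batch_size=128)

local_model.eval()
correct = 0
with th.no_grad():
    for data, target in test_loader:
        pred = local_model(data).argmax(dim=1)
        correct += (pred == target).sum().item()
print(f"Test accuracy: {correct / len(test_loader.dataset):.4f}")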

That's all you need to know, now feel free to experiment by yourself and improve this demo!

Differential Privacy Articles in OpenMined Blog


Star PySyft on GitHub

You can also help our community by starring the repositories! This helps raise awareness of the cool tools we're building.

Join our Slack!

The best way to keep up to date on the latest advancements is to join our community!

Let's put it together

Here is the full code 🙂

from tqdm import tqdm

import torch as th
from torchvision import datasets, transforms
from opacus import PrivacyEngine 
import syft as sy

hook = sy.TorchHook(th)
alice = sy.VirtualWorker(hook, id="alice")
bob = sy.VirtualWorker(hook, id="bob")
workers = [alice, bob]

sy.local_worker.is_client_worker = False

train_datasets = datasets.MNIST('../mnist',
                 train=True, download=True,
                 transform=transforms.Compose([transforms.ToTensor(),
                 transforms.Normalize((0.1307,), (0.3081,)),])
                 ).federate(*workers)

def make_model():
    return th.nn.Sequential(
        th.nn.Conv2d(1, 16, 8, 2, padding=3),
        th.nn.ReLU(),
        th.nn.MaxPool2d(2, 1),
        th.nn.Conv2d(16, 32, 4, 2),
        th.nn.ReLU(),
        th.nn.MaxPool2d(2, 1),
        th.nn.Flatten(), 
        th.nn.Linear(32 * 4 * 4, 32),
        th.nn.ReLU(),
        th.nn.Linear(32, 10)
    )

# the local version that we will use to do the aggregation
local_model = make_model()

models, dataloaders, optimizers, privacy_engines = [], [], [], []
for worker in workers:
    model = make_model()
    optimizer = th.optim.SGD(model.parameters(), lr=0.1)
    model.send(worker)
    dataset = train_datasets[worker.id]
    dataloader = th.utils.data.DataLoader(dataset, batch_size=128, shuffle=True, drop_last=True)
    privacy_engine = PrivacyEngine(model,
                                   batch_size=128, 
                                   sample_size=len(dataset), 
                                   alphas=range(2,32), 
                                   noise_multiplier=1.2,
                                   max_grad_norm=1.0)
    privacy_engine.attach(optimizer)
    
    models.append(model)
    dataloaders.append(dataloader)
    optimizers.append(optimizer)
    privacy_engines.append(privacy_engine)
    
def send_new_models(local_model, models):
    with th.no_grad():
        for remote_model in models:
            for new_param, remote_param in zip(local_model.parameters(), remote_model.parameters()):
                worker = remote_param.location
                remote_value = new_param.send(worker)
                remote_param.set_(remote_value)

            
def federated_aggregation(local_model, models):
    with th.no_grad():
        for local_param, *remote_params in zip(*([local_model.parameters()] + [model.parameters() for model in models])):
            param_stack = th.zeros(*remote_params[0].shape)
            for remote_param in remote_params:
                param_stack += remote_param.copy().get()
            param_stack /= len(remote_params)
            local_param.set_(param_stack)

def train(epoch, delta):
        
    # 1. Send new version of the model
    send_new_models(local_model, models)

    # 2. Train remotely the models
    for i, worker in enumerate(workers):
        dataloader = dataloaders[i]
        model = models[i]
        optimizer = optimizers[i]
        
        model.train()
        criterion = th.nn.CrossEntropyLoss()
        losses = []
        # note: we avoid re-using `i` here, which would shadow the worker index
        for data, target in tqdm(dataloader):
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            losses.append(loss.get().item()) 

        sy.local_worker.clear_objects()
        epsilon, best_alpha = optimizer.privacy_engine.get_privacy_spent(delta) 
        print(
            f"[{worker.id}]\t"
            f"Train Epoch: {epoch} \t"
            f"Loss: {sum(losses)/len(losses):.4f} "
            f"(ε = {epsilon:.2f}, δ = {delta}) for α = {best_alpha}")

    # 3. Federated aggregation of the updated models
    federated_aggregation(local_model, models)

for epoch in range(5):
    train(epoch, delta=1e-5)

A big thanks to Kritika Prakash for proofreading this blog post =)