PyTorch aggregation methods in Fed-BioMed
Difficulty level: advanced
Introduction
This tutorial shows how to deal with heterogeneous datasets by changing the Aggregator. Fed-BioMed provides several aggregation methods, and selecting an appropriate one can be critical when confronted with unbalanced or heterogeneous datasets.
Aggregators provide a way to merge the local models sent by Nodes into a global, more generalized model. Please note that designing Node sampling Strategies can also help when working with heterogeneous datasets.
For more information about the Aggregator object in Fed-BioMed, and on how to create your own Aggregator, please see Aggregators in the User Guide.
Before you start
For this tutorial, we will use the heterogeneous Fed-IXI dataset, provided by FLamby. FLamby comes with several medical datasets that have heterogeneous data properties. Please have a look at the notebooks on how to use FLamby in the Fed-BioMed tutorials before starting.
1. Defining an Experiment using FedAverage Aggregator
First, let's re-use the TorchTrainingPlan that is defined in the FLamby tutorials. FedAveraging was introduced by McMahan et al. as the first aggregation method in the Federated Learning literature. It computes a weighted sum of all Nodes' local model parameters in order to obtain a global model.
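To make the weighted sum concrete, here is a minimal sketch in plain Python. It assumes, as in McMahan et al., that each Node's weight is its share of the total number of training samples. The function name fed_average, the parameter dictionaries, and the sample counts are hypothetical and only serve the illustration. This is not Fed-BioMed's own implementation.

```python
def fed_average(local_params, sample_counts):
    """Weighted average of per-node parameter dicts (illustrative only)."""
    total = sum(sample_counts)
    # assumption: a node's weight is its share of the total sample count
    weights = [n / total for n in sample_counts]
    averaged = {}
    for name in local_params[0]:
        size = len(local_params[0][name])
        averaged[name] = [
            sum(w * params[name][i] for w, params in zip(weights, local_params))
            for i in range(size)
        ]
    return averaged


node_a = {"layer.weight": [1.0, 2.0]}  # hypothetical node holding 30 samples
node_b = {"layer.weight": [3.0, 6.0]}  # hypothetical node holding 10 samples
global_params = fed_average([node_a, node_b], sample_counts=[30, 10])
```

In this sketch, the node with more samples pulls the global parameters closer to its own values, which is why unbalanced datasets can dominate the result.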
In this tutorial, we will keep the same TrainingPlan (and thus the same model) for all the Experiments; only the Aggregator changes.
from fedbiomed.common.training_plans import TorchTrainingPlan
from flamby.datasets.fed_ixi import Baseline, BaselineLoss, Optimizer
from fedbiomed.common.data import FlambyDataset, DataManager

class MyTrainingPlan(TorchTrainingPlan):
    def init_model(self, model_args):
        # FLamby's baseline model for Fed-IXI
        return Baseline()

    def init_optimizer(self, optimizer_args):
        return Optimizer(self.model().parameters(), lr=optimizer_args["lr"])

    def init_dependencies(self):
        # imports that each Node needs in order to run this training plan
        return ["from flamby.datasets.fed_ixi import Baseline, BaselineLoss, Optimizer",
                "from fedbiomed.common.data import FlambyDataset, DataManager"]

    def training_step(self, data, target):
        output = self.model().forward(data)
        return BaselineLoss().forward(output, target)

    def training_data(self, batch_size=2):
        dataset = FlambyDataset()
        loader_arguments = {'batch_size': batch_size, 'shuffle': True}
        return DataManager(dataset, **loader_arguments)
We define below the parameters for the Experiment to be used with vanilla FedAverage.
model_args = {}
training_args = {
    'batch_size': 8,
    'optimizer_args': {
        "lr" : 1e-3
    },
    'epochs': 1,
    'dry_run': False,
    'num_updates': 50
}
Activate Tensorboard
%load_ext tensorboard
from fedbiomed.researcher.environ import environ
import os
fedavg_tensorboard_dir = os.path.join(environ['ROOT_DIR'], 'fedavg_runs')
os.makedirs(fedavg_tensorboard_dir, exist_ok=True)
environ['TENSORBOARD_RESULTS_DIR'] = fedavg_tensorboard_dir
%tensorboard --logdir "$fedavg_tensorboard_dir"
We then import the FedAverage Aggregator from Fed-BioMed's Aggregators.
from fedbiomed.researcher.experiment import Experiment
from fedbiomed.researcher.aggregators import FedAverage
from fedbiomed.researcher.strategies.default_strategy import DefaultStrategy
tags = ['flixi']
rounds = 3
exp_fed_avg = Experiment()
exp_fed_avg.set_training_plan_class(training_plan_class=MyTrainingPlan)
exp_fed_avg.set_model_args(model_args=model_args)
exp_fed_avg.set_training_args(training_args=training_args)
exp_fed_avg.set_tags(tags = tags)
exp_fed_avg.set_aggregator(aggregator=FedAverage())
exp_fed_avg.set_round_limit(rounds)
exp_fed_avg.set_training_data(training_data=None, from_tags=True)
exp_fed_avg.set_job()
exp_fed_avg.set_strategy(node_selection_strategy=DefaultStrategy)
exp_fed_avg.set_tensorboard(True)
exp_fed_avg.run(increase=True)
2. Defining an Experiment using FedProx Aggregator
In order to improve our results, we can replace FedAverage with FedProx. Since FedProx is a FedAverage aggregator with a regularization term, we re-use the FedAverage Aggregator and add fedprox_mu, the regularization parameter, to the training_args.
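As a rough illustration of what the regularization term does, the sketch below follows the FedProx formulation, in which each Node minimizes its task loss plus (mu / 2) times the squared distance between its local weights and the current global weights. The function fedprox_loss and the numeric values are hypothetical. How Fed-BioMed applies fedprox_mu internally may differ in detail.

```python
def fedprox_loss(task_loss, local_params, global_params, mu):
    """Task loss plus the proximal penalty (mu / 2) * ||w - w_global||^2."""
    squared_distance = sum(
        (w - w_g) ** 2 for w, w_g in zip(local_params, global_params)
    )
    return task_loss + 0.5 * mu * squared_distance


# Hypothetical values: same task loss, with and without drift from the global model.
loss_no_drift = fedprox_loss(1.0, [0.5, 0.5], [0.5, 0.5], mu=0.1)
loss_drifted = fedprox_loss(1.0, [1.5, 0.5], [0.5, 0.5], mu=0.1)
```

The penalty is zero when the local weights equal the global ones and grows as they move apart, so a larger mu keeps local training closer to the global model.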
# let's create a new folder for storing tensorboard results for FedProx aggregator
import os
from fedbiomed.researcher.environ import environ
fedprox_tensorboard_dir = os.path.join(environ['ROOT_DIR'], 'fedprox_runs')
os.makedirs(fedprox_tensorboard_dir, exist_ok=True)
environ['TENSORBOARD_RESULTS_DIR'] = fedprox_tensorboard_dir
%reload_ext tensorboard
%tensorboard --logdir "$fedprox_tensorboard_dir"
model_args = {}
training_args_fedprox = {
    'batch_size': 8,
    'optimizer_args': {
        "lr" : 1e-3
    },
    'epochs': 1,
    'dry_run': False,
    'num_updates': 50,
    'fedprox_mu': .1  # This parameter indicates that we are going to use FedProx
}
from fedbiomed.researcher.experiment import Experiment
from fedbiomed.researcher.aggregators import FedAverage
from fedbiomed.researcher.strategies.default_strategy import DefaultStrategy
tags = ['flixi']
rounds = 3
exp_fedprox = Experiment()
exp_fedprox.set_training_plan_class(training_plan_class=MyTrainingPlan)
exp_fedprox.set_model_args(model_args=model_args)
exp_fedprox.set_training_args(training_args=training_args_fedprox)
exp_fedprox.set_tags(tags = tags)
exp_fedprox.set_aggregator(aggregator=FedAverage())
exp_fedprox.set_round_limit(rounds)
exp_fedprox.set_training_data(training_data=None, from_tags=True)
exp_fedprox.set_job()
exp_fedprox.set_strategy(node_selection_strategy=DefaultStrategy)
exp_fedprox.set_tensorboard(True)
exp_fedprox.run(increase=True)
3. Defining an Experiment using SCAFFOLD Aggregator
The purpose of Scaffold is to limit the so-called client drift that may happen when dealing with heterogeneous datasets across Nodes.
In order to use Scaffold, we will have to import another Aggregator from the fedbiomed.researcher.aggregators module, as you can see below.
Scaffold takes server_lr and fds as arguments.
- server_lr is the server learning rate (in Scaffold, it is used to perform a gradient descent on the global model's updates)
- fds is the Federated Dataset containing information about the Nodes connected to the network after issuing a TrainRequest
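The role of server_lr can be sketched as follows. The two functions below follow the update rules described in the SCAFFOLD paper: local gradients are shifted by the difference between a global and a per-node correction term, and the server moves the global model along the mean local update, scaled by the server learning rate. All names and values are hypothetical, and this is a simplified sketch rather than Fed-BioMed's Scaffold implementation.

```python
def corrected_local_step(params, grad, node_correction, global_correction, lr):
    """One local SGD step where the gradient is shifted by (c - c_i)."""
    return [
        w - lr * (g - c_i + c)
        for w, g, c_i, c in zip(params, grad, node_correction, global_correction)
    ]


def server_update(global_params, local_params_per_node, server_lr):
    """Move the global model along the mean local update, scaled by server_lr."""
    n_nodes = len(local_params_per_node)
    new_params = []
    for i, w_global in enumerate(global_params):
        mean_delta = sum(p[i] - w_global for p in local_params_per_node) / n_nodes
        new_params.append(w_global + server_lr * mean_delta)
    return new_params


global_params = [0.0, 0.0]
node_results = [[1.0, 2.0], [3.0, 2.0]]  # hypothetical local models after training
updated = server_update(global_params, node_results, server_lr=0.5)
```

In this sketch, a server_lr of 1.0 reduces the server step to a plain (unweighted) average of the local models, while smaller values take a more cautious step toward it.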
Please note that it is possible to use Scaffold with a regularization parameter, as suggested in FedProx. To do so, just specify fedprox_mu in the training_args dictionary, as shown in the FedProx example.
# let's create a new folder for storing tensorboard results for SCAFFOLD aggregator
scaffold_tensorboard_dir = os.path.join(environ['ROOT_DIR'], 'scaffold_runs')
os.makedirs(scaffold_tensorboard_dir, exist_ok=True)
environ['TENSORBOARD_RESULTS_DIR'] = scaffold_tensorboard_dir
%reload_ext tensorboard
%tensorboard --logdir "$scaffold_tensorboard_dir"
from fedbiomed.researcher.aggregators import Scaffold
from fedbiomed.researcher.strategies.default_strategy import DefaultStrategy
server_lr = .8
exp_scaffold = Experiment()
exp_scaffold.set_training_plan_class(training_plan_class=MyTrainingPlan)
exp_scaffold.set_model_args(model_args=model_args)
exp_scaffold.set_training_args(training_args=training_args)
exp_scaffold.set_tags(tags = tags)
exp_scaffold.set_aggregator(aggregator=FedAverage())
exp_scaffold.set_round_limit(rounds)
exp_scaffold.set_training_data(training_data=None, from_tags=True)
exp_scaffold.set_job()
exp_scaffold.set_strategy(node_selection_strategy=DefaultStrategy)
exp_scaffold.set_tensorboard(True)
# replace the FedAverage aggregator set above with Scaffold
exp_scaffold.set_aggregator(Scaffold(server_lr=server_lr))
exp_scaffold.run(increase=True)
4. Going further
In this tutorial we presented 3 important Aggregators that can be found in the Federated Learning literature. If you want to create your own custom Aggregator, please check our Aggregation User Guide.
You may have noticed that, thanks to Fed-BioMed's modular structure, it is possible to switch from one aggregator to another while conducting an Experiment. For instance, you may start with the SCAFFOLD Aggregator for the first 3 rounds, and then switch to the FedAverage Aggregator for the remaining rounds, as shown in the example below:
from fedbiomed.researcher.aggregators import Scaffold, FedAverage
from fedbiomed.researcher.strategies.default_strategy import DefaultStrategy
server_lr = .8
exp_multi_agg = Experiment()
# selecting how many rounds of each aggregator we will perform
rounds_scaffold = 3
rounds_fedavg = 1
exp_multi_agg.set_training_plan_class(training_plan_class=MyTrainingPlan)
exp_multi_agg.set_model_args(model_args=model_args)
exp_multi_agg.set_training_args(training_args=training_args)
exp_multi_agg.set_tags(tags = tags)
exp_multi_agg.set_aggregator(aggregator=FedAverage())
exp_multi_agg.set_round_limit(rounds_scaffold + rounds_fedavg)
exp_multi_agg.set_training_data(training_data=None, from_tags=True)
exp_multi_agg.set_job()
exp_multi_agg.set_strategy(node_selection_strategy=DefaultStrategy)
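# first phase: run the Scaffold rounds
exp_multi_agg.set_aggregator(Scaffold(server_lr=server_lr))
exp_multi_agg.run(rounds=rounds_scaffold)
# second phase: switch to FedAverage for the remaining rounds
exp_multi_agg.set_aggregator(FedAverage())
exp_multi_agg.run(rounds=rounds_fedavg)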