PyTorch aggregation methods in Fed-BioMed
Difficulty level: advanced
Introduction
This tutorial focuses on how to deal with heterogeneous datasets by changing the Aggregator of an Experiment. Fed-BioMed provides different methods for aggregation. Selecting an appropriate aggregation method can be critical when confronted with unbalanced or heterogeneous datasets.
Aggregators provide a way to merge the local models sent by Nodes into a global, more generalized model. Please note that designing Node sampling Strategies could also help when working on heterogeneous datasets.
For more information about Aggregator objects in Fed-BioMed, and on how to create your own Aggregator, please see Aggregators in the User Guide.
Before you start
For this tutorial, we will be using the heterogeneous Fed-IXI dataset, provided by FLamby. FLamby comes with a few medical datasets that have heterogeneous data properties. Before starting, please have a look at the notebooks on how to use FLamby in the Fed-BioMed tutorials: you will need to set up FLamby before running this tutorial.
1. Defining an Experiment using FedAverage Aggregator
First, let's re-use the TorchTrainingPlan that is defined in the FLamby tutorials. FedAveraging was introduced by McMahan et al. as the first aggregation method in the Federated Learning literature. It computes a weighted sum of all Nodes' local model parameters in order to obtain a global model.
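The weighted sum described above can be illustrated with a minimal, framework-free sketch. It is not Fed-BioMed's implementation: the function name `fed_average`, the dictionary-of-lists parameter layout, and the choice of weights proportional to each Node's number of samples (as in McMahan et al.) are assumptions made for illustration only.

```python
from typing import Dict, List

# Hypothetical parameter layout: parameter name -> flat list of values
Params = Dict[str, List[float]]


def fed_average(local_params: List[Params], n_samples: List[int]) -> Params:
    """Weighted average of local parameters.

    Assumption: each Node's weight is its share of the total sample count.
    """
    if not local_params or len(local_params) != len(n_samples):
        raise ValueError("need exactly one sample count per local model")
    total = sum(n_samples)
    weights = [n / total for n in n_samples]

    aggregated: Params = {}
    for name in local_params[0]:
        size = len(local_params[0][name])
        aggregated[name] = [
            sum(w * params[name][i] for w, params in zip(weights, local_params))
            for i in range(size)
        ]
    return aggregated


# Two hypothetical Nodes; the second holds three times more samples,
# so its parameters weigh three times more in the global model.
node_a = {"layer.weight": [0.0, 4.0]}
node_b = {"layer.weight": [4.0, 0.0]}
global_params = fed_average([node_a, node_b], n_samples=[10, 30])
print(global_params)
```

With unbalanced Nodes, the global model is pulled towards the Nodes that hold the most data, which is one reason why the choice of Aggregator matters on heterogeneous datasets.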
In this tutorial, we will keep the same TrainingPlan (and thus the same model) for all the Experiments; we will only change the Aggregator.
from fedbiomed.common.training_plans import TorchTrainingPlan
from flamby.datasets.fed_ixi import Baseline, BaselineLoss, Optimizer
from fedbiomed.common.data import FlambyDataset, DataManager


class MyTrainingPlan(TorchTrainingPlan):
    def init_model(self, model_args):
        return Baseline()

    def init_optimizer(self, optimizer_args):
        return Optimizer(self.model().parameters(), lr=optimizer_args["lr"])

    def init_dependencies(self):
        return ["from flamby.datasets.fed_ixi import Baseline, BaselineLoss, Optimizer",
                "from fedbiomed.common.data import FlambyDataset, DataManager"]

    def training_step(self, data, target):
        output = self.model().forward(data)
        return BaselineLoss().forward(output, target)

    def training_data(self, batch_size=2):
        dataset = FlambyDataset()
        loader_arguments = {'batch_size': batch_size, 'shuffle': True}
        return DataManager(dataset, **loader_arguments)
We define hereafter the parameters of the Experiment to be used with vanilla FedAverage.
model_args = {}
training_args = {
    'batch_size': 8,
    'optimizer_args': {
        "lr": 1e-3
    },
    'dry_run': False,
    'num_updates': 50
}
Activate TensorBoard.
%load_ext tensorboard
from fedbiomed.researcher.environ import environ
import os
fedavg_tensorboard_dir = os.path.join(environ['ROOT_DIR'], 'fedavg_runs')
os.makedirs(fedavg_tensorboard_dir, exist_ok=True)
environ['TENSORBOARD_RESULTS_DIR'] = fedavg_tensorboard_dir
%tensorboard --logdir "$fedavg_tensorboard_dir"
We then import the FedAverage Aggregator from Fed-BioMed's Aggregators.
from fedbiomed.researcher.experiment import Experiment
from fedbiomed.researcher.aggregators import FedAverage
from fedbiomed.researcher.strategies.default_strategy import DefaultStrategy
tags = ['flixi']
rounds = 3  # number of training rounds, used by set_round_limit below
exp_fed_avg = Experiment()
exp_fed_avg.set_training_plan_class(training_plan_class=MyTrainingPlan)
exp_fed_avg.set_model_args(model_args=model_args)
exp_fed_avg.set_training_args(training_args=training_args)
exp_fed_avg.set_tags(tags = tags)
exp_fed_avg.set_aggregator(aggregator=FedAverage())
exp_fed_avg.set_round_limit(rounds)
exp_fed_avg.set_training_data(training_data=None, from_tags=True)
exp_fed_avg.set_job()
exp_fed_avg.set_strategy(node_selection_strategy=DefaultStrategy)
exp_fed_avg.set_tensorboard(True)
exp_fed_avg.run(increase=True)
2. Defining an Experiment using FedProx Aggregator
In order to improve our results, we can change our aggregation method from FedAverage to FedProx. Since FedProx is FedAverage with an additional regularization term, we re-use the FedAverage Aggregator and add fedprox_mu, the regularization parameter, to the training_args.
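To give an intuition of what the regularization term does, here is a small sketch of the proximal penalty as formulated in the FedProx paper: the local loss is augmented with (mu / 2) times the squared L2 distance between the local and the global parameters. The function names and the flat-list parameter layout are hypothetical, and this is not a description of how Fed-BioMed applies fedprox_mu internally.

```python
from typing import List


def fedprox_loss(task_loss: float, local_w: List[float],
                 global_w: List[float], mu: float) -> float:
    """Task loss plus (mu / 2) * ||local_w - global_w||^2 (illustrative only)."""
    squared_dist = sum((lw - gw) ** 2 for lw, gw in zip(local_w, global_w))
    return task_loss + 0.5 * mu * squared_dist


def fedprox_grad(task_grad: List[float], local_w: List[float],
                 global_w: List[float], mu: float) -> List[float]:
    """Gradient of the penalized loss: task gradient plus mu * (local_w - global_w)."""
    return [g + mu * (lw - gw) for g, lw, gw in zip(task_grad, local_w, global_w)]


# Hypothetical values: the local model has drifted away from the global one
local_w = [1.0, 2.0]
global_w = [0.0, 0.0]
penalized_loss = fedprox_loss(1.0, local_w, global_w, mu=0.1)
penalized_grad = fedprox_grad([0.0, 0.0], local_w, global_w, mu=0.1)
print(penalized_loss, penalized_grad)
```

The extra gradient term always points back towards the global parameters, so a larger mu keeps local models closer to the global model during local training; with mu set to 0 the penalty vanishes and plain FedAverage is recovered.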
# let's create a new folder for storing TensorBoard results for the FedProx aggregator
import os
from fedbiomed.researcher.environ import environ
fedprox_tensorboard_dir = os.path.join(environ['ROOT_DIR'], 'fedprox_runs')
os.makedirs(fedprox_tensorboard_dir, exist_ok=True)
environ['TENSORBOARD_RESULTS_DIR'] = fedprox_tensorboard_dir
%reload_ext tensorboard
%tensorboard --logdir "$fedprox_tensorboard_dir"
model_args = {}
training_args_fedprox = {
    'batch_size': 8,
    'optimizer_args': {
        "lr": 1e-3
    },
    'dry_run': False,
    'num_updates': 50,
    'fedprox_mu': 0.1  # This parameter indicates that we are going to use FedProx
}
from fedbiomed.researcher.experiment import Experiment
from fedbiomed.researcher.aggregators import FedAverage
from fedbiomed.researcher.strategies.default_strategy import DefaultStrategy
tags = ['flixi']
rounds = 3
exp_fedprox = Experiment()
exp_fedprox.set_training_plan_class(training_plan_class=MyTrainingPlan)
exp_fedprox.set_model_args(model_args=model_args)
exp_fedprox.set_training_args(training_args=training_args_fedprox)
exp_fedprox.set_tags(tags = tags)
exp_fedprox.set_aggregator(aggregator=FedAverage())
exp_fedprox.set_round_limit(rounds)
exp_fedprox.set_training_data(training_data=None, from_tags=True)
exp_fedprox.set_job()
exp_fedprox.set_strategy(node_selection_strategy=DefaultStrategy)
exp_fedprox.set_tensorboard(True)
exp_fedprox.run(increase=True)
3. Defining an Experiment using SCAFFOLD Aggregator
The purpose of Scaffold is to limit the so-called client drift that may happen when dealing with heterogeneous datasets across Nodes.
In order to use Scaffold, we will have to import another Aggregator from the fedbiomed.researcher.aggregators module, as you can see below.
Scaffold takes server_lr and fds as arguments.
server_lr is the server learning rate (in Scaffold, it is used to perform a gradient descent step on the global model's updates).
fds is the Federated Dataset, which contains information about the Nodes connected to the network after issuing a TrainRequest.
Please note that it is possible to use Scaffold with a regularization parameter as suggested in FedProx. For that, you just have to specify fedprox_mu in the training_args dictionary, as shown in the FedProx example.
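As a rough illustration of the two ideas above (drift correction on the Nodes and a server learning rate on the global update), here is a simplified sketch following the update rules of the SCAFFOLD paper by Karimireddy et al. The function names, the flat-list parameters and the unweighted mean over Nodes are assumptions made for this example; Fed-BioMed's Scaffold Aggregator may differ in its details, and the update of the control variates themselves is left out.

```python
from typing import List

Vector = List[float]


def scaffold_local_step(y: Vector, grad: Vector, c_local: Vector,
                        c_global: Vector, lr: float) -> Vector:
    """One corrected local step: y <- y - lr * (grad - c_local + c_global).

    c_local and c_global are the Node and server control variates; their
    difference compensates for the drift of the local gradient.
    """
    return [yi - lr * (g - cl + cg)
            for yi, g, cl, cg in zip(y, grad, c_local, c_global)]


def scaffold_server_update(x: Vector, local_models: List[Vector],
                           server_lr: float) -> Vector:
    """Global step: x <- x + server_lr * mean_i(y_i - x) (unweighted mean assumed)."""
    n = len(local_models)
    mean_delta = [sum(y[j] - x[j] for y in local_models) / n
                  for j in range(len(x))]
    return [xj + server_lr * dj for xj, dj in zip(x, mean_delta)]


# Hypothetical one-parameter Node: the correction term changes the step taken
corrected = scaffold_local_step(y=[1.0], grad=[2.0], c_local=[0.5],
                                c_global=[1.0], lr=0.5)

# Hypothetical global model and two local models returned by the Nodes
new_global = scaffold_server_update(x=[1.0, 1.0],
                                    local_models=[[2.0, 1.0], [0.0, 3.0]],
                                    server_lr=0.5)
print(corrected, new_global)
```

In this sketch, a server learning rate of 1 makes the server step equal to the plain mean of the local models, while smaller values move the global model only part of the way towards that mean.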
# let's create a new folder for storing TensorBoard results for the SCAFFOLD aggregator
scaffold_tensorboard_dir = os.path.join(environ['ROOT_DIR'], 'scaffold_runs')
os.makedirs(scaffold_tensorboard_dir, exist_ok=True)
environ['TENSORBOARD_RESULTS_DIR'] = scaffold_tensorboard_dir
%reload_ext tensorboard
%tensorboard --logdir "$scaffold_tensorboard_dir"
from fedbiomed.researcher.aggregators import Scaffold
from fedbiomed.researcher.strategies.default_strategy import DefaultStrategy
server_lr = .8
exp_scaffold = Experiment()
exp_scaffold.set_training_plan_class(training_plan_class=MyTrainingPlan)
exp_scaffold.set_model_args(model_args=model_args)
exp_scaffold.set_training_args(training_args=training_args)
exp_scaffold.set_tags(tags = tags)
exp_scaffold.set_aggregator(aggregator=FedAverage())
exp_scaffold.set_round_limit(rounds)
exp_scaffold.set_training_data(training_data=None, from_tags=True)
exp_scaffold.set_job()
exp_scaffold.set_strategy(node_selection_strategy=DefaultStrategy)
exp_scaffold.set_tensorboard(True)
# replace the FedAverage aggregator set above with Scaffold before running
exp_scaffold.set_aggregator(Scaffold(server_lr=server_lr))
exp_scaffold.run(increase=True)
4. Going further
In this tutorial we presented three important Aggregators that can be found in the Federated Learning literature. If you want to create your own custom Aggregator, please check our Aggregation User Guide.
You may have noticed that, thanks to Fed-BioMed's modular structure, it is possible to switch from one Aggregator to another while conducting an Experiment. For instance, you may start with the SCAFFOLD Aggregator for the first 3 rounds, and then switch to the FedAverage Aggregator for the remaining rounds, as shown in the example below:
from fedbiomed.researcher.aggregators import Scaffold, FedAverage
from fedbiomed.researcher.strategies.default_strategy import DefaultStrategy
server_lr = .8
exp_multi_agg = Experiment()
# selecting how many rounds of each aggregator we will perform
rounds_scaffold = 3
rounds_fedavg = 1
exp_multi_agg.set_training_plan_class(training_plan_class=MyTrainingPlan)
exp_multi_agg.set_model_args(model_args=model_args)
exp_multi_agg.set_training_args(training_args=training_args)
exp_multi_agg.set_tags(tags = tags)
exp_multi_agg.set_aggregator(aggregator=FedAverage())
exp_multi_agg.set_round_limit(rounds_scaffold + rounds_fedavg)
exp_multi_agg.set_training_data(training_data=None, from_tags=True)
exp_multi_agg.set_job()
exp_multi_agg.set_strategy(node_selection_strategy=DefaultStrategy)
exp_multi_agg.set_aggregator(Scaffold(server_lr=server_lr))
exp_multi_agg.run(rounds=rounds_scaffold)
exp_multi_agg.set_aggregator(FedAverage())
exp_multi_agg.run(rounds=rounds_fedavg)