Parameter Aggregation in Fed-BioMed
Aggregation of model parameters plays an important role in federated learning, where data heterogeneity is the norm. Every Node training the same model produces parameters of the same size and structure, but the data behind them differ: the number of samples, their quality and their distribution can vary from one Node to another. In Fed-BioMed, we are working on providing various solutions to this heterogeneity. Currently, we support FedAverage, which performs the standard aggregation scheme in federated learning: federated averaging. We also provide the FedProx and SCAFFOLD aggregation methods.
Fed-BioMed Aggregators: the Aggregators available in Fed-BioMed are showcased in the following tutorial.
Federated Averaging (FedAveraging)
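FedAveraging is the default Aggregator in Fed-BioMed, introduced by McMahan et al. It computes a weighted mean of the local model parameters, where each Node's weight is proportional to the size of its dataset. This operation occurs after each round of training in the Nodes:
\[ w_{t+1} = \sum_{k=1}^{K} \frac{n_k}{n} \, w_{t+1}^{k} \]
where \( w_{t} \) are the global weights at round \(t\), \( w_{t+1}^{k} \) are the weights returned by the \(k\)-th Node after training on its local data starting from \( w_t \), \(K\) is the number of Nodes participating at round \(t\), and \( n_k \) and \( n = \sum_{k=1}^{K} n_k \) are the number of samples of the \(k\)-th Node and the total number of samples across participating Nodes, respectively.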
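As a quick sanity check of the formula, the sketch below applies it to a hypothetical federation of two Nodes holding 100 and 300 samples, each returning a single scalar parameter. The function name federated_average is illustrative and not part of Fed-BioMed.

```python
from typing import List


def federated_average(local_weights: List[float], n_samples: List[int]) -> float:
    """Weighted mean sum_k (n_k / n) * w^k for scalar parameters (illustrative only)."""
    n = sum(n_samples)  # total number of samples across participating Nodes
    return sum(n_k / n * w_k for n_k, w_k in zip(n_samples, local_weights))


# Hypothetical round: Node 1 has 100 samples, Node 2 has 300.
w_next = federated_average(local_weights=[1.0, 2.0], n_samples=[100, 300])
print(w_next)  # 1.75
```

Node 2 holds 75% of the samples, so its value dominates the average (0.25 × 1.0 + 0.75 × 2.0).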
FedProx
Similar to FedAveraging, FedProx performs a weighted average of local model parameters. FedProx, however, adds an \(\mathcal{L}_2\) regularization term to the local objective in order to tackle statistical heterogeneity. At round \(t\), each Node \(k\) minimizes the reformulated loss:
\[ \min_{w} \; F_k(w) + \frac{\mu}{2} \lVert w - w_t \rVert_2^2 \]
using the same notation as above, with \(\mu\) the regularization parameter (setting \(\mu=0\) recovers FedAveraging) and \(F_k\) the local objective function of the \(k\)-th Node.
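The regularized objective can be evaluated directly. The sketch below assumes the parameters are flattened into NumPy vectors and that the local loss value \(F_k(w)\) has already been computed; fedprox_objective is a hypothetical helper, not a Fed-BioMed function.

```python
import numpy as np


def fedprox_objective(local_loss: float, w: np.ndarray, w_global: np.ndarray, mu: float) -> float:
    """F_k(w) + (mu / 2) * ||w - w_t||_2^2 for flattened parameter vectors."""
    proximal_term = 0.5 * mu * float(np.sum((w - w_global) ** 2))
    return local_loss + proximal_term


w_local = np.array([1.0, 2.0])  # current local parameters w
w_global = np.zeros(2)          # global parameters w_t received at the start of the round
print(fedprox_objective(1.0, w_local, w_global, mu=0.1))  # 1.25
print(fedprox_objective(1.0, w_local, w_global, mu=0.0))  # 1.0 (plain FedAveraging objective)
```

The proximal term grows as \(w\) drifts away from \(w_t\), which is what limits the influence of heterogeneous local data on each Node's update.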
To use FedProx, use FedAverage from fedbiomed.researcher.aggregators and set a value for \(\mu\) in the training arguments (training_args) under the key fedprox_mu.
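A minimal sketch of the training arguments for a FedProx run is shown below. Only the fedprox_mu key comes from the text above; the value 0.01 is an arbitrary example, and the other training arguments your experiment needs are deliberately left out.

```python
# Hypothetical FedProx configuration: only `fedprox_mu` is taken from the text above.
training_args = {
    "fedprox_mu": 0.01,  # mu > 0 adds the proximal term; mu = 0 falls back to FedAveraging
    # ... other training arguments required by your training plan go here
}

# The aggregator stays the standard FedAverage class, e.g.:
#     from fedbiomed.researcher.aggregators import FedAverage
#     aggregator = FedAverage()
# Both `training_args` and `aggregator` are then passed to the experiment.
print(training_args["fedprox_mu"])  # 0.01
```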
SCAFFOLD
SCAFFOLD stands for Stochastic Controlled Averaging for Federated Learning. It introduces a correction term (a control variate) to tackle client drift: when data are heterogeneous across Nodes, each Node pushes the model in a different direction of the optimization space, and the global model does not converge towards the true optimum. In Fed-BioMed, only Option II of the SCAFFOLD paper (for updating the Node correction terms) has been implemented. Additional details about the implementation can be found in the developer API reference.
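The corrected loss function used to update the model on the \(k\)-th Node, and its gradient, are:
\[ \tilde{F}_k(w) = F_k(w) + \langle c - c_k, \, w \rangle, \qquad \nabla \tilde{F}_k(w) = \nabla F_k(w) - c_k + c \]
where \(c_k\) is the Node correction term, \(c = \frac{1}{K}\sum_{k=1}^K{c_k}\) is the server's correction term, and \(K\) is the total number of participating Nodes, as above. Each local gradient step therefore uses the corrected gradient \(\nabla F_k(w) - c_k + c\).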
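The Node-side computation can be sketched with NumPy as follows, assuming flattened parameters, a user-supplied gradient function grad_fn for \(F_k\), and plain SGD. The correction-term update follows Option II of the SCAFFOLD paper, \(c_k^{+} = c_k - c + \frac{1}{\tau \eta_l}(w_t - w)\), where \(\tau\) is the number of local steps (num_updates) and \(\eta_l\) the local learning rate. The function name and signature are hypothetical and do not reproduce Fed-BioMed's internal implementation.

```python
from typing import Callable, Tuple

import numpy as np


def scaffold_local_training(
    w_global: np.ndarray,
    c_k: np.ndarray,
    c: np.ndarray,
    grad_fn: Callable[[np.ndarray], np.ndarray],
    lr: float,
    num_updates: int,
) -> Tuple[np.ndarray, np.ndarray]:
    """Run `num_updates` corrected SGD steps on one Node, then update its
    correction term with Option II of the SCAFFOLD paper (illustrative sketch)."""
    w = w_global.copy()
    for _ in range(num_updates):
        # Corrected gradient of the local loss: grad F_k(w) - c_k + c
        w = w - lr * (grad_fn(w) - c_k + c)
    # Option II: c_k+ = c_k - c + (w_t - w) / (num_updates * lr)
    c_k_new = c_k - c + (w_global - w) / (num_updates * lr)
    return w, c_k_new


# Toy quadratic objective F_k(w) = 0.5 * ||w||^2, whose gradient is w itself.
w_local, c_k_new = scaffold_local_training(
    w_global=np.array([1.0]), c_k=np.zeros(1), c=np.zeros(1),
    grad_fn=lambda w: w, lr=0.1, num_updates=1,
)
# With a single step, c_k_new equals grad F_k(w_t), here [1.0] up to rounding.
print(w_local, c_k_new)
```

With a single local step, Option II reduces algebraically to \(c_k^{+} = \nabla F_k(w_t)\) whatever the previous values of \(c_k\) and \(c\); with more steps it becomes the average corrected-free gradient along the local trajectory, which is why the exact number of steps matters.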
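On the Researcher side, the global model is updated by performing a gradient step with the averaged Node updates:
\[ w_{t+1} = w_t - \eta_g \, \frac{1}{K} \sum_{k=1}^{K} \left( w_t - w_{t+1}^{k} \right) \]
where \(w_{t+1}^{k}\) is the model returned by the \(k\)-th Node after local training and \(\eta_g\) is the Researcher's learning rate (server_lr).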
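A matching sketch of the Researcher-side step is given below, again with flattened NumPy parameters. It averages the Node updates uniformly, takes a step of size server_lr in that direction, and recomputes the server correction term as the mean of the Node terms \(c_k\), following the definition of \(c\) given earlier. The helper name is hypothetical.

```python
from typing import List, Tuple

import numpy as np


def scaffold_server_update(
    w_global: np.ndarray,
    local_models: List[np.ndarray],
    node_corrections: List[np.ndarray],
    server_lr: float,
) -> Tuple[np.ndarray, np.ndarray]:
    """Global step w_{t+1} = w_t - server_lr * mean_k(w_t - w_{t+1}^k) and
    server correction c = mean_k(c_k) (illustrative sketch)."""
    mean_delta = np.mean([w_global - w_k for w_k in local_models], axis=0)
    w_next = w_global - server_lr * mean_delta
    c = np.mean(node_corrections, axis=0)
    return w_next, c


w_next, c = scaffold_server_update(
    w_global=np.array([1.0, 1.0]),
    local_models=[np.array([0.0, 1.0]), np.array([1.0, 0.0])],
    node_corrections=[np.array([1.0, 0.0]), np.array([0.0, 1.0])],
    server_lr=1.0,
)
print(w_next, c)  # [0.5 0.5] [0.5 0.5]
```

With server_lr = 1.0 the new global model is simply the uniform mean of the local models; smaller values damp the global step.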
Additional parameters are needed when working with SCAFFOLD:
- server_lr: the Researcher's learning rate for performing a gradient step on the global model.
- num_updates: the number of updates (i.e., gradient descent optimizer steps) to be performed on each Node. Relying only on n_epochs could lead to inconsistencies in the computation of the correction term.
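One likely source of such inconsistencies: the Option II correction term divides the model change by the number of local steps times the learning rate, so each Node must perform a known number of optimizer steps. When training is driven by epochs instead, that number depends on each Node's dataset size and batch size. The arithmetic below uses hypothetical dataset sizes and assumes the last, incomplete batch of each epoch is kept.

```python
import math


def optimizer_steps(n_samples: int, batch_size: int, epochs: int) -> int:
    """SGD steps performed over `epochs` full passes, keeping the last partial batch."""
    return epochs * math.ceil(n_samples / batch_size)


# Hypothetical federation: same epochs and batch size, different dataset sizes.
dataset_sizes = {"node_a": 320, "node_b": 1000}
steps = {node: optimizer_steps(n, batch_size=32, epochs=1) for node, n in dataset_sizes.items()}
print(steps)  # {'node_a': 10, 'node_b': 32}
```

Fixing num_updates instead gives every Node the same, known number of steps.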
Please note that:
- SCAFFOLD should be used only with the SGD optimizer. Using other Optimizers in Fed-BioMed is possible, but without any convergence guarantees.
- SCAFFOLD can only be used with the PyTorch framework at the moment.
- SCAFFOLD requires the num_updates training argument to control the number of training iterations.
How to Create Your Custom Aggregator
Designing your own Aggregator class: the aggregation method
The FedAverage aggregation process is shown below:
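import copy
from typing import Dict, List, Tuple, Union

import numpy as np
import torch


def initialize(val: Union[torch.Tensor, np.ndarray]) -> Tuple[str, Union[torch.Tensor, np.ndarray]]:
    # Local stand-in for the `initialize` helper this snippet relies on: returns the
    # parameter type ('tensor' or 'array') and a zero-filled container of the same shape.
    if isinstance(val, torch.Tensor):
        return 'tensor', torch.zeros_like(val).float()
    return 'array', np.zeros(np.shape(val), dtype=float)


def aggregation(model_params: List[Dict[str, Union[torch.Tensor, np.ndarray]]],
                weights: List[float],
                *args, **kwargs) -> Dict[str, Union[torch.Tensor, np.ndarray]]:
    """
    Args:
        model_params (List[Dict[str, Union[torch.Tensor, np.ndarray]]]): list of
            dictionaries (one per node) mapping parameter names to the model
            weights trained on that node
        weights (List[float]): number of samples contained in each node's
            dataset (for FedAvg, used to perform the weighted sum required
            for aggregation)
    Returns:
        avg_params (Dict[str, Union[torch.Tensor, np.ndarray]]): averaged parameters
    """
    assert len(model_params) > 0, 'An empty list of models was passed.'
    assert len(weights) == len(model_params), 'List with number of observations must have ' \
                                               'the same number of elements as the list of models.'
    # Compute proportions n_k / n
    proportions = [n_k / sum(weights) for n_k in weights]
    # Zero-filled parameter dictionary; `t` records the parameter type
    # (all parameters are assumed to share the same type)
    avg_params = copy.deepcopy(model_params[0])
    for key, val in avg_params.items():
        (t, avg_params[key]) = initialize(val)
    if t == 'tensor':
        # Accumulate the proportion-weighted sum of tensors
        for model, weight in zip(model_params, proportions):
            for key in avg_params.keys():
                avg_params[key] += weight * model[key]
    if t == 'array':
        # np.average normalizes the raw sample counts itself
        for key in avg_params.keys():
            matr = np.array([d[key] for d in model_params])
            avg_params[key] = np.average(matr, weights=np.array(weights), axis=0)
    return avg_params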
The aggregation method expects at least the model_params and weights arguments. Additional arguments can be passed through *args and **kwargs, depending on the values needed by your Aggregator.
You can create a custom aggregator by writing a new class that inherits from the Aggregator class, fedbiomed.researcher.aggregators.aggregator.Aggregator.
from typing import Dict


class Aggregator:
    """
    Defines methods for an aggregation strategy
    (e.g. FedAvg, FedProx, SCAFFOLD, ...).
    """
    def __init__(self):
        pass

    @staticmethod
    def normalize_weights(weights) -> list:
        # Load list of weights assigned to each node and
        # normalize these weights so they sum up to 1
        norm = [w / sum(weights) for w in weights]
        return norm

    def aggregate(self, model_params: list, weights: list, *args, **kwargs) -> Dict:  # pragma: no cover
        """Strategy to aggregate models"""
        pass
Your child class should override the aggregate method, which receives model parameters and weights as arguments. The model parameters are those that were locally updated on each Node during the last round. The weights represent the ratio between the number of samples on each Node and the total number of samples. Your custom aggregator class should return the aggregated parameters.
You should also pay attention to how the parameters are stored: they may be held, for example, in a dictionary of tensors or in plain arrays. As the following excerpt of the aggregation method shows, the aggregator first checks the data type of the parameters and then performs the averaging.
if t == 'tensor':
    # torch tensors: accumulate the proportion-weighted sum
    for model, weight in zip(model_params, proportions):
        for key in avg_params.keys():
            avg_params[key] += weight * model[key]
if t == 'array':
    # NumPy arrays: np.average normalizes the sample counts itself
    for key in avg_params.keys():
        matr = np.array([d[key] for d in model_params])
        avg_params[key] = np.average(matr, weights=np.array(weights), axis=0)
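To make the subclassing pattern concrete, here is a minimal custom aggregator computing a sample-weighted average of NumPy parameters. So that the sketch runs without Fed-BioMed installed, it defines a local stand-in mirroring the Aggregator excerpt shown earlier; in a real project you would inherit from fedbiomed.researcher.aggregators.aggregator.Aggregator instead. The class name WeightedArrayAverage is hypothetical, and the aggregate signature follows the excerpt above, which may differ between Fed-BioMed versions.

```python
from typing import Dict, List

import numpy as np


class Aggregator:
    """Local stand-in mirroring the base-class excerpt, so the sketch runs
    without Fed-BioMed. Inherit from the real Fed-BioMed class in practice."""

    @staticmethod
    def normalize_weights(weights) -> list:
        return [w / sum(weights) for w in weights]

    def aggregate(self, model_params: list, weights: list, *args, **kwargs) -> Dict:
        raise NotImplementedError


class WeightedArrayAverage(Aggregator):
    """Hypothetical custom aggregator: sample-weighted average of NumPy parameters."""

    def aggregate(self, model_params: List[Dict[str, np.ndarray]],
                  weights: List[float], *args, **kwargs) -> Dict[str, np.ndarray]:
        proportions = self.normalize_weights(weights)
        # For each parameter name, sum the proportion-weighted Node values.
        return {
            key: sum(p * np.asarray(params[key], dtype=float)
                     for p, params in zip(proportions, model_params))
            for key in model_params[0]
        }


aggregator = WeightedArrayAverage()
result = aggregator.aggregate([{"w": np.array([1.0, 2.0])}, {"w": np.array([3.0, 4.0])}], [1, 3])
print(result["w"])  # [2.5 3.5]
```

Reusing normalize_weights from the parent class keeps the weighting logic in one place, so the subclass only has to describe how parameters are combined.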
Designing your own Aggregator class: the create_aggregator_args method
For some advanced Aggregators, you may need to send some arguments to the Nodes so that they can update the local model. For instance, the SCAFFOLD Aggregator sends a specific correction term to each of the Nodes involved in the training.
The method responsible for this is create_aggregator_args, defined as follows in the fedbiomed.researcher.aggregators.aggregator.Aggregator class:
from typing import Tuple


# Method of the Aggregator class, shown here outside its class body
def create_aggregator_args(self, *args, **kwargs) -> Tuple[dict, dict]:
    """Returns aggregator arguments that are expected by the nodes

    Returns:
        dict: contains `Aggregator` parameters that will be sent through the MQTT
            message service
        dict: contains parameters that will be sent through the file exchange service.
        Both dictionaries map node_id to `Aggregator` parameters specific
        to each Node.
    """
    return self._aggregator_args or {}, {}
create_aggregator_args returns two dictionaries: the first one contains Aggregator parameters sent through the MQTT message service, and the second one contains Aggregator parameters exchanged through the file exchange service. The latter is designed for transmitting large amounts of data, e.g., the correction terms in SCAFFOLD. Each dictionary maps Node ids to a dictionary of parameters to be sent to the corresponding Node.
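The sketch below shows the shape of this return value for a hypothetical aggregator that sends a small scalar through the message channel and a larger array through the file exchange channel. The class, the Node ids and the keys server_lr, correction and weight are illustrative assumptions, not the parameters the SCAFFOLD Aggregator actually sends.

```python
from typing import Dict, List, Tuple

import numpy as np


class CorrectionSendingAggregator:
    """Hypothetical aggregator illustrating the (message args, file args) pair
    returned by create_aggregator_args. Not a Fed-BioMed class."""

    def __init__(self, node_ids: List[str], server_lr: float, n_params: int):
        self.node_ids = node_ids
        self.server_lr = server_lr
        # One (hypothetical) correction array per Node, keyed by parameter name.
        self.corrections: Dict[str, Dict[str, np.ndarray]] = {
            node_id: {"weight": np.zeros(n_params)} for node_id in node_ids
        }

    def create_aggregator_args(self, *args, **kwargs) -> Tuple[dict, dict]:
        # Small values: suitable for the message service.
        message_args = {node_id: {"server_lr": self.server_lr} for node_id in self.node_ids}
        # Large arrays: suitable for the file exchange service.
        file_args = {node_id: {"correction": self.corrections[node_id]} for node_id in self.node_ids}
        return message_args, file_args


aggregator = CorrectionSendingAggregator(["node_1", "node_2"], server_lr=1.0, n_params=3)
message_args, file_args = aggregator.create_aggregator_args()
print(message_args)  # {'node_1': {'server_lr': 1.0}, 'node_2': {'server_lr': 1.0}}
```

Both dictionaries share the same Node ids as keys, so each Node can look up its own entries in either channel.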
Conclusions
This article explained the aggregation process. Fed-BioMed currently provides three aggregation methods: FedAverage (vanilla federated averaging), FedProx and SCAFFOLD. Beyond these, Fed-BioMed lets you create a custom aggregator from the Aggregator parent class, so that aggregation can be tailored to your problem. You can define it in your notebook or Python script and pass it to the experiment as an argument.