Parameter Aggregation in Fed-BioMed

Aggregating model parameters plays an important role in federated learning, where data heterogeneity arises naturally. Unlike in distributed learning, datasets are not stored as same-sized data blocks: both the number and the quality of the samples can vary from node to node. In Fed-BioMed, we are currently working on providing several solutions for this heterogeneity. For now, we support an aggregator called FedAverage, which performs the standard aggregation scheme in federated learning: federated averaging. It computes a weighted mean of the local model parameters, with weights based on the size of each node's dataset. This operation occurs after each round of training on the nodes.
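
As a minimal numeric illustration of this weighted mean (a sketch with made-up values, not Fed-BioMed code), consider two nodes holding 100 and 300 samples. The names node_params and n_samples are hypothetical.

```python
import numpy as np

# Hypothetical local parameters for one layer, as trained on two nodes
node_params = [np.array([1.0, 2.0]), np.array([3.0, 6.0])]
# Hypothetical dataset sizes of the two nodes
n_samples = [100, 300]

# Each node's share of the total number of samples
total = sum(n_samples)
proportions = [n / total for n in n_samples]  # [0.25, 0.75]

# Federated averaging: weighted mean of the local parameters
global_param = sum(p * w for p, w in zip(node_params, proportions))
print(global_param)  # [2.5 5. ]
```

The node with three times more samples pulls the aggregated parameters three times more strongly toward its own values.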

The FedAverage aggregation process is shown below:

import copy
from typing import Dict, List, Union

import numpy as np
import torch


def aggregation(model_params: List[Dict[str, Union[torch.Tensor, np.ndarray]]],
                weights: List[float]) -> Dict[str, Union[torch.Tensor, np.ndarray]]:
    """
    Args:
        model_params (List[Dict[str, Union[torch.Tensor, np.ndarray]]]): list with
            one dictionary per node, holding the model weights trained on that node
        weights (List[float]): number of samples contained in each node's
            dataset (for FedAvg, it is used to perform the weighted sum required
            for aggregation)

    Returns:
        avg_params (Dict[str, Union[torch.Tensor, np.ndarray]]): averaged parameters
    """
    assert len(model_params) > 0, 'An empty list of models was passed.'
    assert len(weights) == len(model_params), 'The list with the number of observations must have ' \
                                              'the same number of elements as the list of models.'

    # Compute proportions
    proportions = [n_k / sum(weights) for n_k in weights]

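    # Start from a copy of the first model and reset every entry with the
    # `initialize` helper (not shown here). Judging from its use below, it
    # returns the parameter type ('tensor' or 'array') together with a
    # zero-filled placeholder of the same shape.
    avg_params = copy.deepcopy(model_params[0])
    for key, val in avg_params.items():
        (t, avg_params[key]) = initialize(val)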
    if t == 'tensor':
        for model, weight in zip(model_params, proportions):
            for key in avg_params.keys():
                avg_params[key] += weight * model[key]

    if t == 'array':
        for key in avg_params.keys():
            matr = np.array([ d[key] for d in model_params ])
            avg_params[key] = np.average(matr,weights=np.array(weights),axis=0)

    return avg_params

How to Create Your Custom Aggregator

You can create a custom aggregator by defining a new class that inherits from the Aggregator class (fedbiomed.researcher.aggregators.aggregator.Aggregator).

from typing import Dict


class Aggregator:
    """
    Defines methods for an aggregation strategy
    (e.g. FedAvg, FedProx, SCAFFOLD, ...).
    """
    def __init__(self):
        pass

    @staticmethod
    def normalize_weights(weights) -> list:
        # Load list of weights assigned to each node and
        # normalize these weights so they sum up to 1
        norm = [w/sum(weights) for w in weights]
        return norm

    def aggregate(self, model_params: list, weights: list) -> Dict:  # pragma: no cover
        """Strategy to aggregate models"""
        pass

Your child class should override the aggregate method, which receives the model parameters and the weights as inputs. The model parameters are those that were locally updated on each node during the last round. The weights represent the ratio between the number of samples on each node and the total number of samples. Your custom aggregator should return the aggregated parameters.
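
A minimal sketch of such a child class could look as follows. To stay self-contained, it uses a simplified stand-in for the Aggregator base class; in a real project you would inherit from the Fed-BioMed class instead. The class name SimpleWeightedAverage is hypothetical, and the sketch assumes that every model is a dictionary of NumPy arrays.

```python
from typing import Dict, List

import numpy as np


class Aggregator:
    """Simplified stand-in for the Fed-BioMed Aggregator base class."""

    @staticmethod
    def normalize_weights(weights) -> list:
        # Rescale the weights so that they sum up to 1
        total = sum(weights)
        return [w / total for w in weights]

    def aggregate(self, model_params: list, weights: list) -> Dict:
        raise NotImplementedError


class SimpleWeightedAverage(Aggregator):
    """Hypothetical custom aggregator: weighted mean of NumPy parameters."""

    def aggregate(self, model_params: List[Dict[str, np.ndarray]],
                  weights: List[float]) -> Dict[str, np.ndarray]:
        # Make sure the weights sum to 1, whatever scale they arrive in
        proportions = self.normalize_weights(weights)
        # Accumulate the weighted sum layer by layer, starting from zeros
        aggregated = {key: np.zeros_like(val, dtype=float)
                      for key, val in model_params[0].items()}
        for model, proportion in zip(model_params, proportions):
            for key in aggregated:
                aggregated[key] += proportion * model[key]
        return aggregated


# Usage with the parameters of two hypothetical nodes
local_models = [{"layer": np.array([1.0, 2.0])},
                {"layer": np.array([3.0, 6.0])}]
result = SimpleWeightedAverage().aggregate(local_models, weights=[0.25, 0.75])
print(result["layer"])  # [2.5 5. ]
```

This sketch reproduces plain federated averaging; a custom scheme would only change the body of aggregate while keeping the same inputs and return value.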

You should also pay attention to the format in which the parameters arrive. For example, they may come as a dictionary that contains tensors or just NumPy arrays. As you can see from the following excerpt, the aggregator first checks the data type of the parameters and then performs the averaging.

    if t == 'tensor':
        for model, weight in zip(model_params, proportions):
            for key in avg_params.keys():
                avg_params[key] += weight * model[key]

    if t == 'array':
        for key in avg_params.keys():
            matr = np.array([ d[key] for d in model_params ])
            avg_params[key] = np.average(matr,weights=np.array(weights),axis=0)
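
One way to perform such a type check in your own aggregator is sketched below. The helper param_kind is hypothetical and not part of Fed-BioMed; it returns the same 'tensor' and 'array' tags used in the excerpt above. PyTorch is imported optionally so that the sketch also runs where only NumPy is installed.

```python
import numpy as np

try:
    import torch  # optional: only needed for tensor parameters
except ImportError:
    torch = None


def param_kind(value) -> str:
    """Return 'tensor' for a torch.Tensor and 'array' for a np.ndarray."""
    if torch is not None and isinstance(value, torch.Tensor):
        return "tensor"
    if isinstance(value, np.ndarray):
        return "array"
    # Fail early instead of silently averaging an unsupported type
    raise TypeError(f"Unsupported parameter type: {type(value).__name__}")


print(param_kind(np.zeros(3)))  # array
```

Raising an error for unknown types makes a misconfigured model fail at aggregation time rather than produce silently wrong parameters.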

Conclusions

This article explained the aggregation process. Currently, Fed-BioMed only supports the vanilla federated averaging scheme, called FedAverage, for the aggregation operation. However, Fed-BioMed also allows you to create a custom aggregator using the Aggregator parent class, which means you can define an aggregator tailored to your problem. You can define it in your notebook or Python script and pass it to the experiment as an argument.