fedbiomed.researcher.aggregators

Module: fedbiomed.researcher.aggregators

Classes

Aggregator

CLASS
Aggregator()

Defines the methods shared by aggregation strategies (e.g. FedAvg, FedProx, SCAFFOLD, ...).

Source code in fedbiomed/researcher/aggregators/aggregator.py
def __init__(self) -> None:
    """Create an aggregator that holds no strategy parameters yet."""
    # Overwritten by `load_state` and exported under "parameters" by `save_state`.
    self._aggregator_params = None

Functions

aggregate(model_params, weights)

Strategy to aggregate models

Parameters:

Name Type Description Default
model_params list

List of model parameters received from each node

required
weights list

Weight for each node-model-parameter set

required

Raises:

Type Description
FedbiomedAggregatorError

If the method is not defined by inheritor

Source code in fedbiomed/researcher/aggregators/aggregator.py
def aggregate(self, model_params: list, weights: list) -> Dict:
    """
    Strategy to aggregate models; every concrete strategy has to override it.

    This base implementation only logs a critical message and raises.

    Args:
        model_params: List of model parameters received from each node
        weights: Weight for each node-model-parameter set

    Raises:
        FedbiomedAggregatorError: Always, because the base class implements no strategy
    """
    reason = ": aggreate method should be overloaded by the choosen strategy"
    error_text = ErrorNumbers.FB401.value + reason
    # Log before raising so the failure is recorded even if the caller swallows the exception.
    logger.critical(error_text)
    raise FedbiomedAggregatorError(error_text)
load_state(state=None)

Used for breakpoints: loads the aggregator state.

Source code in fedbiomed/researcher/aggregators/aggregator.py
def load_state(self, state: Dict[str, Any] = None):
    """
    Used for breakpoints: load the aggregator state.

    Args:
        state: Mapping shaped like the one returned by `save_state`; only its
            `'parameters'` entry is read. When omitted or `None` there is
            nothing to restore and the current parameters are left untouched.

    Raises:
        KeyError: If `state` is given but has no `'parameters'` entry.
    """
    # The signature defaults to None, which cannot be subscripted.
    if state is None:
        return
    self._aggregator_params = state['parameters']
normalize_weights(weights)
staticmethod

Load list of weights assigned to each node and normalize these weights so they sum up to 1

assuming that all values are >= 0.0

Source code in fedbiomed/researcher/aggregators/aggregator.py
@staticmethod
def normalize_weights(weights) -> list:
    """
    Load list of weights assigned to each node and
    normalize these weights so they sum up to 1

    assuming that all values are >= 0.0
    """
    _l = len(weights)
    if _l == 0:
        return []
    _s = sum(weights)
    if _s == 0:
        norm = [ 1.0 / _l ] * _l
    else:
        norm = [_w / _s for _w in weights]
    return norm
save_state()

Used for breakpoints: saves the aggregator state.

Source code in fedbiomed/researcher/aggregators/aggregator.py
def save_state(self) -> Dict[str, Any]:
    """
    Used for breakpoints: export the aggregator state.

    Returns:
        A dict holding the concrete class name ("class"), the module it is
        defined in ("module") and the aggregator parameters ("parameters").
    """
    class_name = type(self).__name__
    return {
        "class": class_name,
        "module": self.__module__,
        "parameters": self._aggregator_params,
    }

FedAverage

CLASS
FedAverage()

Bases: Aggregator

Defines the Federated averaging strategy

Source code in fedbiomed/researcher/aggregators/fedavg.py
def __init__(self):
    """Construct `FedAverage` object as an instance of
    [`Aggregator`][fedbiomed.researcher.aggregators.Aggregator].
    """
    # Let the base class set up its own state before naming this strategy.
    super().__init__()
    self.aggregator_name = "FedAverage"

Attributes

aggregator_name instance-attribute
aggregator_name = 'FedAverage'

Functions

aggregate(model_params, weights)

Aggregates local models sent by participating nodes into a global model, following Federated Averaging strategy.

Parameters:

Name Type Description Default
model_params list

List of model parameters (layers) received from each node.

required
weights list

Weight assigned to each node's model parameters; the weights are normalized before averaging.

required

Returns:

Type Description
Dict

Aggregated parameters

Source code in fedbiomed/researcher/aggregators/fedavg.py
def aggregate(self, model_params: list, weights: list) -> Dict:
    """Aggregates local models sent by participating nodes into a global model, following Federated Averaging
    strategy.

    Args:
        model_params: model parameters of each node, one entry per node
        weights: weight of each entry of `model_params`; normalized with `normalize_weights` before averaging

    Returns:
        Aggregated parameters
    """
    normalized = self.normalize_weights(weights)
    aggregated = federated_averaging(model_params, normalized)
    return aggregated

Functions

federated_averaging(model_params, weights)

Defines Federated Averaging (FedAvg) strategy for model aggregation.

Parameters:

Name Type Description Default
model_params List[Dict[str, torch.Tensor]]

list that contains nodes' model parameters; each model is stored as an OrderedDict (maps model layer name to the model weights)

required
weights List[float]

weights for performing weighted sum in FedAvg strategy (depending on the dataset size of each node). Items in the list must always sum up to 1

required

Returns:

Type Description
Dict[str, torch.Tensor]

Final model with aggregated layers, as an OrderedDict object.

Source code in fedbiomed/researcher/aggregators/functional.py
def federated_averaging(model_params: List[Dict[str, torch.Tensor]],
                        weights: List[float]) -> Dict[str, torch.Tensor]:
    """Defines Federated Averaging (FedAvg) strategy for model aggregation.

    Every layer is averaged according to its own container kind (as reported by `initialize`), so a model
    may mix tensor and array layers, and a model without any layer yields an empty result.

    Args:
        model_params: list that contains nodes' model parameters; each model is stored as an OrderedDict (maps
            model layer name to the model weights)
        weights: weights for performing weighted sum in FedAvg strategy (depending on the dataset size of each node).
            They are rescaled by their sum here, so that sum is expected to be non-zero (typically 1)

    Returns:
        Final model with aggregated layers: a deep copy of the first model's mapping in which every layer
        has been replaced by its weighted average.

    Raises:
        AssertionError: if `model_params` is empty or does not have as many items as `weights`
    """
    assert len(model_params) > 0, 'An empty list of models was passed.'
    assert len(weights) == len(model_params), 'List with number of observations must have ' \
                                              'the same number of elements that list of models.'

    # Compute proportions; the total is loop-invariant, so it is summed only once
    total = sum(weights)
    proportions = [n_k / total for n_k in weights]

    # Start from a copy of the first model so the result keeps its keys and mapping type
    avg_params = copy.deepcopy(model_params[0])

    for key, val in avg_params.items():
        # Decide per layer: relying on the kind of the last layer only breaks on models that mix
        # tensors and arrays, and leaves the kind undefined when the model has no layer at all
        kind, accumulator = initialize(val)
        if kind == 'tensor':
            for model, weight in zip(model_params, proportions):
                accumulator += weight * model[key]
            avg_params[key] = accumulator
        elif kind == 'array':
            # np.average rescales by the sum of the weights itself, so the raw weights are passed
            matr = np.array([d[key] for d in model_params])
            avg_params[key] = np.average(matr, weights=np.array(weights), axis=0)

    return avg_params

initialize(val)

Initialize tensor or array vector.

Source code in fedbiomed/researcher/aggregators/functional.py
def initialize(val):
    """Initialize tensor or array vector.

    Args:
        val: a `torch.Tensor`, a `numpy.ndarray` or a (possibly nested) list of numbers.

    Returns:
        A `(kind, zeros)` tuple. `kind` is `'tensor'` for a tensor input, in which case `zeros` is a float
        tensor of zeros shaped like `val`; it is `'array'` for an array or list input, in which case `zeros`
        is a float numpy array of zeros with the same shape as `val`.

    Raises:
        TypeError: if `val` is none of the supported container types.
    """
    if isinstance(val, torch.Tensor):
        return ('tensor', torch.zeros_like(val).float())
    if isinstance(val, (np.ndarray, list)):
        # np.shape keeps every dimension and also accepts 0-d arrays, whereas len() only
        # sees the first axis and fails on unsized arrays
        return ('array', np.zeros(np.shape(val), dtype=float))
    # Fail loudly here instead of returning None, which callers cannot unpack
    raise TypeError(
        f"initialize() supports torch.Tensor, numpy.ndarray and list, got {type(val).__name__}"
    )