fedbiomed.researcher.aggregators
Module: fedbiomed.researcher.aggregators
Classes
Aggregator
Aggregator()
Defines methods for an aggregation strategy (e.g. FedAvg, FedProx, SCAFFOLD, ...).
Source code in fedbiomed/researcher/aggregators/aggregator.py
def __init__(self):
    self._aggregator_params = None
Functions
aggregate(model_params, weights)
aggregate(model_params, weights)
Strategy to aggregate models
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_params | list | List of model parameters received from each node | required |
weights | list | Weight for each node-model-parameter set | required |
Raises:
Type | Description |
---|---|
FedbiomedAggregatorError | If the method is not overridden by the inheriting class |
Source code in fedbiomed/researcher/aggregators/aggregator.py
def aggregate(self, model_params: list, weights: list) -> Dict:
    """
    Strategy to aggregate models

    Args:
        model_params: List of model parameters received from each node
        weights: Weight for each node-model-parameter set

    Raises:
        FedbiomedAggregatorError: If the method is not defined by inheritor
    """
    msg = ErrorNumbers.FB401.value + \
        ": aggregate method should be overloaded by the chosen strategy"
    logger.critical(msg)
    raise FedbiomedAggregatorError(msg)
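Concrete strategies are expected to subclass Aggregator and override aggregate. The following is an illustrative sketch only, not part of the library source: MeanAggregator is a hypothetical name, and the import path is assumed from the source location shown above.
from typing import Dict

from fedbiomed.researcher.aggregators.aggregator import Aggregator


class MeanAggregator(Aggregator):
    """Toy strategy: unweighted mean of the nodes' parameters (hypothetical example)."""

    def aggregate(self, model_params: list, weights: list) -> Dict:
        # Overriding is mandatory: the base method only raises FedbiomedAggregatorError.
        agg = {}
        for key in model_params[0]:
            agg[key] = sum(params[key] for params in model_params) / len(model_params)
        return agg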
load_state(state=None)
load_state(state=None)
Used for breakpoints; loads the aggregator state.
Source code in fedbiomed/researcher/aggregators/aggregator.py
def load_state(self, state: Dict[str, Any] = None):
    """
    use for breakpoints. load the aggregator state
    """
    self._aggregator_params = state['parameters']
normalize_weights(weights)
staticmethod
normalize_weights(weights)
Normalizes the list of weights assigned to each node so that they sum up to 1, assuming that all values are >= 0.0.
Source code in fedbiomed/researcher/aggregators/aggregator.py
@staticmethod
def normalize_weights(weights) -> list:
    """
    Load list of weights assigned to each node and
    normalize these weights so they sum up to 1
    assuming that all values are >= 0.0
    """
    _l = len(weights)
    if _l == 0:
        return []
    _s = sum(weights)
    if _s == 0:
        norm = [1.0 / _l] * _l
    else:
        norm = [_w / _s for _w in weights]
    return norm
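Usage sketch (illustrative, not part of the library source; the import path is assumed from the source file above). Since normalize_weights is a staticmethod it can be called without instantiating an Aggregator.
from fedbiomed.researcher.aggregators.aggregator import Aggregator

print(Aggregator.normalize_weights([100, 300]))   # [0.25, 0.75]
print(Aggregator.normalize_weights([0, 0, 0]))    # all-zero input -> uniform weights [1/3, 1/3, 1/3]
print(Aggregator.normalize_weights([]))           # empty input -> []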
save_state()
save_state()
Used for breakpoints; saves the aggregator state.
Source code in fedbiomed/researcher/aggregators/aggregator.py
def save_state(self) -> Dict[str, Any]:
    """
    use for breakpoints. save the aggregator state
    """
    state = {
        "class": type(self).__name__,
        "module": self.__module__,
        "parameters": self._aggregator_params
    }
    return state
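Illustrative round-trip of save_state / load_state as used for breakpoints (not part of the library source; FedAverage is used here only as a concrete subclass, and its import path is assumed from the source location documented below).
from fedbiomed.researcher.aggregators.fedavg import FedAverage

agg = FedAverage()
state = agg.save_state()
# state records the class name, module and aggregator parameters, e.g.
# {'class': 'FedAverage', 'module': 'fedbiomed.researcher.aggregators.fedavg', 'parameters': None}

restored = FedAverage()
restored.load_state(state)   # restores restored._aggregator_params from state['parameters']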
FedAverage
FedAverage()
Bases: Aggregator
Defines the Federated averaging strategy
Source code in fedbiomed/researcher/aggregators/fedavg.py
def __init__(self):
    """Construct `FedAverage` object as an instance of [`Aggregator`]
    [fedbiomed.researcher.aggregators.Aggregator].
    """
    super(FedAverage, self).__init__()
    self.aggregator_name = "FedAverage"
Attributes
aggregator_name instance-attribute
aggregator_name = 'FedAverage'
Functions
aggregate(model_params, weights)
aggregate(model_params, weights)
Aggregates local models sent by participating nodes into a global model, following the Federated Averaging strategy.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_params | list | list of local model parameters, one entry per node, each mapping layer names to layer weights | required |
weights | list | aggregation weight assigned to each node's parameter set | required |
Returns:
Type | Description |
---|---|
Dict | Aggregated parameters |
Source code in fedbiomed/researcher/aggregators/fedavg.py
def aggregate(self, model_params: list, weights: list) -> Dict:
    """ Aggregates local models sent by participating nodes into a global model, following Federated Averaging
    strategy.

    Args:
        model_params: contains each model layers
        weights: contains all weights of a given layer.

    Returns:
        Aggregated parameters
    """
    weights = self.normalize_weights(weights)
    return federated_averaging(model_params, weights)
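Illustrative call (not part of the library source; assumes torch is installed and imports FedAverage from the source module shown above). Two nodes send their local parameters together with per-node weights such as dataset sizes; the weights are normalized before averaging.
import torch
from fedbiomed.researcher.aggregators.fedavg import FedAverage

node_params = [
    {'fc.weight': torch.tensor([1.0, 2.0])},   # parameters from node 1
    {'fc.weight': torch.tensor([3.0, 4.0])},   # parameters from node 2
]
dataset_sizes = [100, 300]                     # normalized internally to [0.25, 0.75]

global_params = FedAverage().aggregate(node_params, dataset_sizes)
# global_params['fc.weight'] == tensor([2.5000, 3.5000])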
Functions
federated_averaging(model_params, weights)
federated_averaging(model_params, weights)
Defines Federated Averaging (FedAvg) strategy for model aggregation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_params | List[Dict[str, torch.Tensor]] | list that contains nodes' model parameters; each model is stored as an OrderedDict (maps model layer name to the model weights) | required |
weights | List[float] | weights for performing weighted sum in FedAvg strategy (depending on the dataset size of each node). Items in the list must always sum up to 1 | required |
Returns:
Type | Description |
---|---|
Dict[str, torch.Tensor] | Final model with aggregated layers, as an OrderedDict object. |
Source code in fedbiomed/researcher/aggregators/functional.py
def federated_averaging(model_params: List[Dict[str, torch.Tensor]],
                        weights: List[float]) -> Dict[str, torch.Tensor]:
    """Defines Federated Averaging (FedAvg) strategy for model aggregation.

    Args:
        model_params: list that contains nodes' model parameters; each model is stored as an OrderedDict (maps
            model layer name to the model weights)
        weights: weights for performing weighted sum in FedAvg strategy (depending on the dataset size of each node).
            Items in the list must always sum up to 1

    Returns:
        Final model with aggregated layers, as an OrderedDict object.
    """
    assert len(model_params) > 0, 'An empty list of models was passed.'
    assert len(weights) == len(model_params), 'List with number of observations must have ' \
                                              'the same number of elements that list of models.'

    # Compute proportions
    proportions = [n_k / sum(weights) for n_k in weights]

    # Empty model parameter dictionary
    avg_params = copy.deepcopy(model_params[0])
    for key, val in avg_params.items():
        (t, avg_params[key]) = initialize(val)

    if t == 'tensor':
        for model, weight in zip(model_params, proportions):
            for key in avg_params.keys():
                avg_params[key] += weight * model[key]

    if t == 'array':
        for key in avg_params.keys():
            matr = np.array([d[key] for d in model_params])
            avg_params[key] = np.average(matr, weights=np.array(weights), axis=0)

    return avg_params
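Illustrative call exercising the 'array' branch (not part of the library source; assumes numpy is installed and the import path follows the source file above). Parameters stored as numpy arrays are averaged with np.average using the per-node weights.
import numpy as np
from fedbiomed.researcher.aggregators.functional import federated_averaging

node_params = [
    {'coef': np.array([1.0, 2.0])},   # parameters from node 1
    {'coef': np.array([3.0, 4.0])},   # parameters from node 2
]
weights = [0.25, 0.75]                # per-node weights, summing to 1

aggregated = federated_averaging(node_params, weights)
# aggregated['coef'] == array([2.5, 3.5])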
initialize(val)
initialize(val)
Initialize tensor or array vector.
Source code in fedbiomed/researcher/aggregators/functional.py
def initialize(val):
    """Initialize tensor or array vector. """
    if isinstance(val, torch.Tensor):
        return ('tensor', torch.zeros_like(val).float())

    elif isinstance(val, np.ndarray) or isinstance(val, list):
        return ('array', np.zeros(len(val), dtype=float))
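Illustrative calls (same import assumptions as above): initialize returns a type tag together with a zeroed container matching the input type.
import numpy as np
import torch
from fedbiomed.researcher.aggregators.functional import initialize

print(initialize(torch.tensor([1.0, 2.0])))   # ('tensor', tensor([0., 0.]))
print(initialize(np.array([1.0, 2.0])))       # ('array', array([0., 0.]))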