fedbiomed.common.training_plans
Module: fedbiomed.common.training_plans
The fedbiomed.common.training_plans module includes the training plan classes that are used for federated training.
Classes
BaseTrainingPlan
BaseTrainingPlan()
Base class for training plan
All concrete, framework- and/or model-specific training plans should inherit from this class, and implement:

* the post_init method: to process model and training hyper-parameters
* the training_routine method: to train the model for one round
* the predict method: to compute predictions over a given batch
* (opt.) the testing_step method: to override the evaluation behavior and compute a batch-wise (set of) metric(s)
Attributes:
Name | Type | Description |
---|---|---|
dataset_path | Union[str, None] | The path that indicates where the dataset has been stored |
pre_processes | Dict[str, Dict[ProcessTypes, Union[str, Callable[..., Any]]]] | Preprocess functions that will be applied to the training data at the beginning of the training routine. |
training_data_loader | Union[DataLoader, NPDataLoader, None] | Data loader used in the training routine. |
testing_data_loader | Union[DataLoader, NPDataLoader, None] | Data loader used in the validation routine. |
Source code in fedbiomed/common/training_plans/_base_training_plan.py
def __init__(self) -> None:
"""Construct the base training plan."""
self._dependencies: List[str] = []
self.dataset_path: Union[str, None] = None
self.pre_processes: Dict[
str, Dict[ProcessTypes, Union[str, Callable[..., Any]]]
] = OrderedDict()
self.training_data_loader: Union[DataLoader, NPDataLoader, None] = None
self.testing_data_loader: Union[DataLoader, NPDataLoader, None] = None
Attributes
dataset_path instance-attribute
dataset_path: Union[str, None] = None
pre_processes instance-attribute
pre_processes: Dict[
str, Dict[ProcessTypes, Union[str, Callable[..., Any]]]
] = OrderedDict()
testing_data_loader instance-attribute
testing_data_loader: Union[
DataLoader, NPDataLoader, None
] = None
training_data_loader instance-attribute
training_data_loader: Union[
DataLoader, NPDataLoader, None
] = None
Functions
add_dependency(dep)
add_dependency(dep)
Add new dependencies to the TrainingPlan.
These dependencies are used while creating a python module.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dep | List[str] | Dependencies to add. Dependencies should be indicated as import statement strings, e.g. `"from torch import nn"`. | required |
Source code in fedbiomed/common/training_plans/_base_training_plan.py
def add_dependency(self, dep: List[str]) -> None:
"""Add new dependencies to the TrainingPlan.
These dependencies are used while creating a python module.
Args:
dep: Dependencies to add. Dependencies should be indicated as
import statement strings, e.g. `"from torch import nn"`.
"""
for val in dep:
if val not in self._dependencies:
self._dependencies.append(val)
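For illustration, a minimal sketch of how a user-defined training plan might register extra import statements; the subclass name and the specific imports are hypothetical, not part of the library.

```python
from fedbiomed.common.training_plans import TorchTrainingPlan

class MyTrainingPlan(TorchTrainingPlan):  # hypothetical user-defined plan
    def __init__(self):
        super().__init__()
        # Each entry is an import statement string that will be written at
        # the top of the exported training plan module (duplicates are skipped).
        self.add_dependency([
            "import numpy as np",
            "from torchvision import models",
        ])
```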
add_preprocess(method, process_type)
add_preprocess(method, process_type)
Register a pre-processing method to be executed on training data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
method | Callable | Pre-processing method to be run before training. | required |
process_type | ProcessTypes | Type of pre-processing that will be run. The expected signature of `method` and the arguments passed to it depend on this parameter. | required |
Source code in fedbiomed/common/training_plans/_base_training_plan.py
def add_preprocess(
self,
method: Callable,
process_type: ProcessTypes
) -> None:
"""Register a pre-processing method to be executed on training data.
Args:
method: Pre-processing method to be run before training.
process_type: Type of pre-processing that will be run.
The expected signature of `method` and the arguments
passed to it depend on this parameter.
"""
if not callable(method):
msg = (
f"{ErrorNumbers.FB605.value}: error while adding "
"preprocess, `method` should be callable."
)
logger.critical(msg)
raise FedbiomedTrainingPlanError(msg)
if not isinstance(process_type, ProcessTypes):
msg = (
f"{ErrorNumbers.FB605.value}: error while adding "
"preprocess, `process_type` should be an instance "
"of `fedbiomed.common.constants.ProcessTypes`."
)
logger.critical(msg)
raise FedbiomedTrainingPlanError(msg)
# NOTE: this may be revised into a list rather than OrderedDict
self.pre_processes[method.__name__] = {
'method': method,
'process_type': process_type
}
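A hedged sketch of registering a preprocessing hook; the `normalize` function and the use of `ProcessTypes.DATA_LOADER` are illustrative assumptions, and the exact signature expected from the hook depends on the chosen process type, as noted above.

```python
from fedbiomed.common.constants import ProcessTypes
from fedbiomed.common.training_plans import TorchTrainingPlan

class MyTrainingPlan(TorchTrainingPlan):  # hypothetical user-defined plan
    def __init__(self):
        super().__init__()
        # Register a hook that will be applied to the training data at the
        # beginning of the training routine.
        self.add_preprocess(self.normalize, ProcessTypes.DATA_LOADER)

    def normalize(self, data):
        # Hypothetical preprocessing step operating on the loaded data.
        return (data - data.mean()) / data.std()
```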
get_learning_rate()
get_learning_rate()
Source code in fedbiomed/common/training_plans/_base_training_plan.py
def get_learning_rate(self) -> List[float]:
raise FedbiomedTrainingPlanError("method not implemented")
get_model_params()
get_model_params()
Retrieves parameters from a model defined in a training plan. Output format depends on the nature of the training plan (OrderedDict for a PyTorch training plan, np.ndarray for a sklearn training plan)
Returns:
Type | Description |
---|---|
Union[OrderedDict, Dict] | Model parameters. The object type depends on the nature of the training plan (OrderedDict for a PyTorch training plan, np.ndarray for a sklearn training plan). |
Source code in fedbiomed/common/training_plans/_base_training_plan.py
def get_model_params(self) -> Union[OrderedDict, Dict]:
"""
Retrieves parameters from a model defined in a training plan.
Output format depends on the nature of the training plan (OrderedDict for
a PyTorch training plan, np.ndarray for a sklearn training plan)
Returns:
Union[OrderedDict, np.ndarray]: model parameters. Object type depends on
the nature of the TrainingPlan
"""
msg = ErrorNumbers.FB303.value + ": get_model_params method must be implemented in the TrainingPlan"
logger.critical(msg)
raise FedbiomedTrainingPlanError(msg)
init_dependencies()
init_dependencies()
Default method returning the training plan's dependencies.
Returns:
Type | Description |
---|---|
List | Empty list as default |
Source code in fedbiomed/common/training_plans/_base_training_plan.py
def init_dependencies(self) -> List:
"""Default method where dependencies are returned
Returns:
Empty list as default
"""
return []
optimizer_args()
optimizer_args()
Retrieves optimizer arguments (to be overridden by child classes).
Returns:
Type | Description |
---|---|
Dict | Empty dictionary (to be overridden in child classes) |
Source code in fedbiomed/common/training_plans/_base_training_plan.py
def optimizer_args(self) -> Dict:
"""Retrieves optimizer arguments (to be overridden
by children classes)
Returns:
Empty dictionary: (to be overridden in children classes)
"""
logger.warning("`optimizer_args` method not defined in training_plan!")
return {}
post_init(model_args, training_args, aggregator_args=None)
abstractmethod
post_init(model_args, training_args, aggregator_args=None)
Process model, training and optimizer arguments.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_args | Dict[str, Any] | Arguments defined to instantiate the wrapped model. | required |
training_args | Dict[str, Any] | Arguments that are used in training routines such as epoch, dry_run etc. Please see `TrainingArgs`. | required |
aggregator_args | Optional[Dict[str, Any]] | Arguments managed by and shared with the researcher-side aggregator. | None |
Source code in fedbiomed/common/training_plans/_base_training_plan.py
@abstractmethod
def post_init(
self,
model_args: Dict[str, Any],
training_args: Dict[str, Any],
aggregator_args: Optional[Dict[str, Any]] = None,
) -> None:
"""Process model, training and optimizer arguments.
Args:
model_args: Arguments defined to instantiate the wrapped model.
training_args: Arguments that are used in training routines
such as epoch, dry_run etc.
Please see [`TrainingArgs`][fedbiomed.common.training_args.TrainingArgs]
aggregator_args: Arguments managed by and shared with the
researcher-side aggregator.
"""
return None
predict(data)
abstractmethod
predict(data)
Return model predictions for a given batch of input features.
This method is called as part of testing_routine
, to compute predictions based on which evaluation metrics are computed. It will however be skipped if a testing_step
method is attached to the training plan, which wraps together a custom routine to compute an output metric directly from a (data, target) batch.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | Any | Array-like (or tensor) structure containing batched input features. | required |
Returns:
Type | Description |
---|---|
np.ndarray | Output predictions, converted to a numpy array (as per the `fedbiomed.common.metrics.Metrics` specs). |
Source code in fedbiomed/common/training_plans/_base_training_plan.py
@abstractmethod
def predict(
self,
data: Any,
) -> np.ndarray:
"""Return model predictions for a given batch of input features.
This method is called as part of `testing_routine`, to compute
predictions based on which evaluation metrics are computed. It
will however be skipped if a `testing_step` method is attached
to the training plan, which wraps together a custom routine to
compute an output metric directly from a (data, target) batch.
Args:
data: Array-like (or tensor) structure containing batched
input features.
Returns:
Output predictions, converted to a numpy array (as per the
`fedbiomed.common.metrics.Metrics` specs).
"""
return NotImplemented
save_code(filepath)
save_code(filepath)
Saves the source code of the training plan class that is created by the user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filepath | str | path to the destination file | required |
Raises:
Type | Description |
---|---|
FedbiomedTrainingPlanError | raised when the source of the model class cannot be accessed |
FedbiomedTrainingPlanError | raised when the model file cannot be created/opened/edited |
Source code in fedbiomed/common/training_plans/_base_training_plan.py
def save_code(self, filepath: str) -> None:
"""Saves the class source/codes of the training plan class that is created byuser.
Args:
filepath: path to the destination file
Raises:
FedbiomedTrainingPlanError: raised when the source of the model class cannot be accessed
FedbiomedTrainingPlanError: raised when the model file cannot be created/opened/edited
"""
try:
class_source = get_class_source(self.__class__)
except FedbiomedError as e:
msg = ErrorNumbers.FB605.value + \
" : error while getting source of the model class - " + \
str(e)
logger.critical(msg)
raise FedbiomedTrainingPlanError(msg)
# Preparing content of the module
content = ""
for s in self._dependencies:
content += s + "\n"
content += "\n"
content += class_source
try:
# should we write it in binary (for the sake of space optimization)?
with open(filepath, "w") as file:
file.write(content)
logger.debug("Model file has been saved: " + filepath)
except PermissionError:
_msg = ErrorNumbers.FB605.value + f" : Unable to read {filepath} due to unsatisfactory privileges" + \
", can't write the model content into it"
logger.critical(_msg)
raise FedbiomedTrainingPlanError(_msg)
except MemoryError:
_msg = ErrorNumbers.FB605.value + f" : Can't write model file on {filepath}: out of memory!"
logger.critical(_msg)
raise FedbiomedTrainingPlanError(_msg)
except OSError:
_msg = ErrorNumbers.FB605.value + f" : Can't open file {filepath} to write model content"
logger.critical(_msg)
raise FedbiomedTrainingPlanError(_msg)
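A short usage sketch (the destination path is hypothetical): the exported file contains the registered dependency imports followed by the training plan's class source.

```python
from fedbiomed.common.training_plans import FedPerceptron

plan = FedPerceptron()
plan.save_code("/tmp/fed_perceptron_plan.py")  # writes imports + class source
```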
set_aggregator_args(aggregator_args)
set_aggregator_args(aggregator_args)
Source code in fedbiomed/common/training_plans/_base_training_plan.py
def set_aggregator_args(self, aggregator_args: Dict[str, Any]):
raise FedbiomedTrainingPlanError("method not implemented and needed")
set_data_loaders(train_data_loader, test_data_loader)
set_data_loaders(train_data_loader, test_data_loader)
Sets data loaders
Parameters:
Name | Type | Description | Default |
---|---|---|---|
train_data_loader | Union[DataLoader, NPDataLoader, None] | Data loader for training routine/loop | required |
test_data_loader | Union[DataLoader, NPDataLoader, None] | Data loader for validation routine | required |
Source code in fedbiomed/common/training_plans/_base_training_plan.py
def set_data_loaders(
self,
train_data_loader: Union[DataLoader, NPDataLoader, None],
test_data_loader: Union[DataLoader, NPDataLoader, None]
) -> None:
"""Sets data loaders
Args:
train_data_loader: Data loader for training routine/loop
test_data_loader: Data loader for validation routine
"""
self.training_data_loader = train_data_loader
self.testing_data_loader = test_data_loader
set_dataset_path(dataset_path)
set_dataset_path(dataset_path)
Dataset path setter for TrainingPlan
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_path | str | The path where data is saved on the node. This method is called by the node that executes the training. | required |
Source code in fedbiomed/common/training_plans/_base_training_plan.py
def set_dataset_path(self, dataset_path: str) -> None:
"""Dataset path setter for TrainingPlan
Args:
dataset_path: The path where data is saved on the node.
This method is called by the node that executes the training.
"""
self.dataset_path = dataset_path
logger.debug(f"Dataset path has been set as {self.dataset_path}")
testing_routine(metric, metric_args, history_monitor, before_train)
testing_routine(metric, metric_args, history_monitor, before_train)
Evaluation routine, to be called once per round.
Note
If the training plan implements a testing_step
method (the signature of which is func(data, target) -> metrics) then it will be used rather than the input metric.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
metric | Optional[MetricTypes] | The metric used for validation. If None, use MetricTypes.ACCURACY. | required |
metric_args | Dict[str, Any] | Keyword arguments forwarded to the metric-computation function. | required |
history_monitor | Optional[HistoryMonitor] | HistoryMonitor instance, used to record computed metrics and communicate them to the researcher (server). | required |
before_train | bool | Whether the evaluation is being performed before local training occurs, or afterwards. This is merely reported back through `history_monitor`. | required |
Source code in fedbiomed/common/training_plans/_base_training_plan.py
def testing_routine(
self,
metric: Optional[MetricTypes],
metric_args: Dict[str, Any],
history_monitor: Optional['HistoryMonitor'],
before_train: bool
) -> None:
"""Evaluation routine, to be called once per round.
!!! info "Note"
If the training plan implements a `testing_step` method
(the signature of which is func(data, target) -> metrics)
then it will be used rather than the input metric.
Args:
metric: The metric used for validation.
If None, use MetricTypes.ACCURACY.
history_monitor: HistoryMonitor instance,
used to record computed metrics and communicate them to
the researcher (server).
before_train: Whether the evaluation is being performed
before local training occurs, or afterwards. This is merely
reported back through `history_monitor`.
"""
# TODO: Add preprocess option for testing_data_loader.
if self.testing_data_loader is None:
msg = f"{ErrorNumbers.FB605.value}: no validation dataset was set."
logger.critical(msg)
raise FedbiomedTrainingPlanError(msg)
n_batches = len(self.testing_data_loader)
n_samples = len(self.testing_data_loader.dataset)
# Set up a batch-wise metrics-computation function.
# Either use an optionally-implemented custom training routine.
if hasattr(self, "testing_step"):
evaluate = getattr(self, "testing_step")
metric_name = "Custom"
# Or use the provided `metric` (or its default value).
else:
if metric is None:
metric = MetricTypes.ACCURACY
metric_controller = Metrics()
def evaluate(data, target):
nonlocal metric, metric_args, metric_controller
output = self.predict(data)
if isinstance(target, torch.Tensor):
target = target.numpy()
return metric_controller.evaluate(
target, output, metric=metric, **metric_args
)
metric_name = metric.name
# Iterate over the validation dataset and run the defined routine.
num_samples_observed_till_now: int = 0
for idx, (data, target) in enumerate(self.testing_data_loader, 1):
num_samples_observed_till_now += self._infer_batch_size(data)
# Run the evaluation step; catch and raise exceptions.
try:
m_value = evaluate(data, target)
except Exception as exc:
msg = (
f"{ErrorNumbers.FB605.value}: An error occurred "
f"while computing the {metric_name} metric: {exc}"
)
logger.critical(msg)
raise FedbiomedTrainingPlanError(msg)
# Log the computed value.
logger.debug(
f"Validation: Batch {idx}/{n_batches} "
f"| Samples {num_samples_observed_till_now}/{n_samples} "
f"| Metric[{metric_name}]: {m_value}"
)
# Further parse, and report it (provided a monitor is set).
if history_monitor is not None:
m_dict = self._create_metric_result_dict(m_value, metric_name)
history_monitor.add_scalar(
metric=m_dict,
iteration=idx,
epoch=None,
test=True,
test_on_local_updates=(not before_train),
test_on_global_updates=before_train,
total_samples=n_samples,
batch_samples=num_samples_observed_till_now,
num_batches=n_batches
)
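Since a `testing_step` method, when present, takes precedence over the provided metric, here is a hedged sketch of what such a custom step could look like in a PyTorch-based plan (the class and the metric choice are illustrative, not prescribed by the library):

```python
import torch
from fedbiomed.common.training_plans import TorchTrainingPlan

class MyTrainingPlan(TorchTrainingPlan):  # hypothetical user-defined plan
    def testing_step(self, data, target):
        # Custom batch-wise metric computed directly from a (data, target)
        # batch; the returned value is logged and reported per batch.
        output = self.model()(data)
        return torch.mean((output - target.float()) ** 2).item()
```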
training_data()
training_data()
All subclasses must provide a training_data routine; the purpose of this base implementation is to detect when it has not been provided.
Raises:
Type | Description |
---|---|
FedbiomedTrainingPlanError | if called without being overridden in the subclass |
Source code in fedbiomed/common/training_plans/_base_training_plan.py
def training_data(self):
"""All subclasses must provide a training_data routine the purpose of this actual code is to detect
that it has been provided
Raises:
FedbiomedTrainingPlanError: if called and not inherited
"""
msg = ErrorNumbers.FB303.value + ": training_data must be implemented"
logger.critical(msg)
raise FedbiomedTrainingPlanError(msg)
training_routine(history_monitor=None, node_args=None)
abstractmethod
training_routine(history_monitor=None, node_args=None)
Training routine, to be called once per round.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
history_monitor | Optional[HistoryMonitor] | optional HistoryMonitor instance, recording training metadata. | None |
node_args | Optional[Dict[str, Any]] | Command line arguments for node. These arguments can specify GPU use; however, this is not supported for scikit-learn models and thus will be ignored. | None |
Source code in fedbiomed/common/training_plans/_base_training_plan.py
@abstractmethod
def training_routine(
self,
history_monitor: Optional['HistoryMonitor'] = None,
node_args: Optional[Dict[str, Any]] = None
) -> None:
"""Training routine, to be called once per round.
Args:
history_monitor: optional HistoryMonitor
instance, recording training metadata.
node_args: Command line arguments for node.
These arguments can specify GPU use; however, this is not
supported for scikit-learn models and thus will be ignored.
"""
return None
FedPerceptron
FedPerceptron()
Bases: FedSGDClassifier
Fed-BioMed training plan for scikit-learn Perceptron models.
This class inherits from FedSGDClassifier, and forces the wrapped scikit-learn SGDClassifier model to use a "perceptron" loss, which makes it equivalent to an actual scikit-learn Perceptron model.
Source code in fedbiomed/common/training_plans/_sklearn_models.py
def __init__(self) -> None:
"""Class constructor."""
super().__init__()
self._model.set_params(loss="perceptron")
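As a hedged illustration, a node-side training plan built on FedPerceptron typically only needs a `training_data` method; the CSV layout and the `DataManager` keyword names used here are assumptions to check against the data API.

```python
import pandas as pd
from fedbiomed.common.training_plans import FedPerceptron
from fedbiomed.common.data import DataManager

class MyPerceptronPlan(FedPerceptron):  # hypothetical user-defined plan
    def training_data(self):
        # Hypothetical sketch: dataset_path is set by the node before training.
        df = pd.read_csv(self.dataset_path)
        features = df.iloc[:, :-1].values
        target = df.iloc[:, -1].values
        return DataManager(dataset=features, target=target)
```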
Functions
post_init(model_args, training_args, aggregator_args=None)
post_init(model_args, training_args, aggregator_args=None)
Source code in fedbiomed/common/training_plans/_sklearn_models.py
def post_init(
self,
model_args: Dict[str, Any],
training_args: Dict[str, Any],
aggregator_args: Optional[Dict[str, Any]] = None,
) -> None:
model_args["loss"] = "perceptron"
super().post_init(model_args, training_args)
FedSGDClassifier
FedSGDClassifier()
Bases: SKLearnTrainingPlanPartialFit
Fed-BioMed training plan for scikit-learn SGDClassifier models.
Source code in fedbiomed/common/training_plans/_sklearn_models.py
def __init__(self) -> None:
"""Initialize the sklearn SGDClassifier training plan."""
super().__init__()
self._is_classification = True
Functions
get_learning_rate()
get_learning_rate()
Source code in fedbiomed/common/training_plans/_sklearn_models.py
def get_learning_rate(self) -> List[float]:
return self._model.eta0
set_init_params()
set_init_params()
Initialize the model's trainable parameters.
Source code in fedbiomed/common/training_plans/_sklearn_models.py
def set_init_params(self) -> None:
"""Initialize the model's trainable parameters."""
# Set up zero-valued start weights, for binary or multiclass classification.
n_classes = self._model_args["n_classes"]
if n_classes == 2:
init_params = {
"intercept_": np.zeros((1,)),
"coef_": np.zeros((1, self._model_args["n_features"]))
}
else:
init_params = {
"intercept_": np.zeros((n_classes,)),
"coef_": np.zeros((n_classes, self._model_args["n_features"]))
}
# Assign these initialization parameters and retain their names.
self._param_list = list(init_params.keys())
for key, val in init_params.items():
setattr(self._model, key, val)
# Also initialize the "classes_" slot with unique predictable labels.
# FIXME: this assumes target values are integers in range(n_classes).
setattr(self._model, "classes_", np.arange(n_classes))
FedSGDRegressor
FedSGDRegressor()
Bases: SKLearnTrainingPlanPartialFit
Fed-BioMed training plan for scikit-learn SGDRegressor models.
Source code in fedbiomed/common/training_plans/_sklearn_models.py
def __init__(self) -> None:
"""Initialize the sklearn SGDRegressor training plan."""
super().__init__()
self._is_regression = True
Functions
get_learning_rate()
get_learning_rate()
Source code in fedbiomed/common/training_plans/_sklearn_models.py
def get_learning_rate(self) -> List[float]:
return self._model.eta0
set_init_params()
set_init_params()
Initialize the model's trainable parameters.
Source code in fedbiomed/common/training_plans/_sklearn_models.py
def set_init_params(self) -> None:
"""Initialize the model's trainable parameters."""
init_params = {
'intercept_': np.array([0.]),
'coef_': np.array([0.] * self._model_args['n_features'])
}
self._param_list = list(init_params.keys())
for key, val in init_params.items():
setattr(self._model, key, val)
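For reference, a hedged sketch of the researcher-side model_args such a regressor plan expects: set_init_params (above) only needs n_features to build the zero-valued coefficients, while the remaining keys are forwarded to the underlying scikit-learn SGDRegressor (the specific values are illustrative).

```python
model_args = {
    "n_features": 10,   # used by set_init_params to shape coef_
    "eta0": 0.05,       # initial learning rate, also read by get_learning_rate
    "max_iter": 1000,   # forwarded to the wrapped SGDRegressor
}
```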
SKLearnTrainingPlan
SKLearnTrainingPlan()
Bases: BaseTrainingPlan
Base class for Fed-BioMed wrappers of sklearn classes.
Classes that inherit from this abstract class must:

- Specify a _model_cls class attribute that defines the type of scikit-learn model being wrapped for training.
- Implement a set_init_params method that:
    - sets and assigns the model's initial trainable weights attributes.
    - populates the _param_list attribute with the names of these attributes.
- Implement a _training_routine method that performs a training round based on self.training_data_loader (which is a NPDataLoader).
Attributes:
Name | Type | Description |
---|---|---|
dataset_path | Optional[str] | The path that indicates where the dataset has been stored |
pre_processes | Dict[str, Dict[ProcessTypes, Union[str, Callable[..., Any]]]] | Preprocess functions that will be applied to the training data at the beginning of the training routine. |
training_data_loader | Union[DataLoader, NPDataLoader, None] | Data loader used in the training routine. |
testing_data_loader | Union[DataLoader, NPDataLoader, None] | Data loader used in the validation routine. |
Source code in fedbiomed/common/training_plans/_sklearn_training_plan.py
def __init__(self) -> None:
"""Initialize the SKLearnTrainingPlan."""
super().__init__()
self._model = self._model_cls()
self._model_args = {} # type: Dict[str, Any]
self._training_args = {} # type: Dict[str, Any]
self._param_list = [] # type: List[str]
self.__type = TrainingPlans.SkLearnTrainingPlan
self._is_classification = False
self._batch_maxnum = 0
self.dataset_path: Optional[str] = None
self.add_dependency([
"import inspect",
"import numpy as np",
"import pandas as pd",
"from fedbiomed.common.training_plans import SKLearnTrainingPlan",
"from fedbiomed.common.data import DataManager",
])
self.add_dependency(list(self._model_dep))
Attributes
dataset_path instance-attribute
dataset_path: Optional[str] = None
Functions
after_training_params()
after_training_params()
Return the wrapped model's trainable parameters' current values.
This method returns a dict containing parameters that need to be reported back and aggregated in a federated learning setting.
Returns:
Type | Description |
---|---|
Dict[str, np.ndarray] | The trained parameters to aggregate. |
Source code in fedbiomed/common/training_plans/_sklearn_training_plan.py
def after_training_params(self) -> Dict[str, np.ndarray]:
"""Return the wrapped model's trainable parameters' current values.
This method returns a dict containing parameters that need
to be reported back and aggregated in a federated learning
setting.
Returns:
dict[str, np.ndarray]: the trained parameters to aggregate.
"""
return {key: getattr(self._model, key) for key in self._param_list}
get_learning_rate(lr_key='eta0')
get_learning_rate(lr_key='eta0')
Source code in fedbiomed/common/training_plans/_sklearn_training_plan.py
def get_learning_rate(self, lr_key: str = 'eta0') -> List[float]:
lr = self._model_args.get(lr_key)
if lr is None:
# get the default value
lr = self._model.__dict__.get(lr_key)
if lr is None:
raise FedbiomedTrainingPlanError("Cannot retrieve learning rate. As a quick fix, specify it in the Model_args")
return [lr]
get_model_params()
get_model_params()
Source code in fedbiomed/common/training_plans/_sklearn_training_plan.py
def get_model_params(self) -> Dict:
return self.after_training_params()
load(filename, to_params=False)
load(filename, to_params=False)
Load a scikit-learn model dump, overwriting the wrapped model.
This method uses the joblib.load function, which in turn uses pickle to deserialize the model. Note that unpickling objects can lead to arbitrary code execution; hence use with care.
This function updates the _model
private attribute with the loaded instance, and returns either that same model or a dict wrapping its trainable parameters.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename | str | The path to the pickle file to load. | required |
to_params | bool | Whether to return the model's parameters wrapped as a dict rather than the model instance. | False |
Notes
Load can be called from a Job or Round:
* From Round it is called to return the model.
* From Job it is called to return its parameters dict.
Returns:
Type | Description |
---|---|
Union[BaseEstimator, Dict[str, Dict[str, np.ndarray]]] | The loaded model instance, or a dict wrapping its parameters if `to_params` is True. |
Source code in fedbiomed/common/training_plans/_sklearn_training_plan.py
def load(
self,
filename: str,
to_params: bool = False
) -> Union[BaseEstimator, Dict[str, Dict[str, np.ndarray]]]:
"""Load a scikit-learn model dump, overwriting the wrapped model.
This method uses the joblib.load function, which in turn uses
pickle to deserialize the model. Note that unpickling objects
can lead to arbitrary code execution; hence use with care.
This function updates the `_model` private attribute with the
loaded instance, and returns either that same model or a dict
wrapping its trainable parameters.
Args:
filename: The path to the pickle file to load.
to_params: Whether to return the model's parameters
wrapped as a dict rather than the model instance.
Notes:
Load can be called from a Job or Round:
* From Round it is called to return the model.
* From Job it is called to return its parameters dict.
Returns:
Dictionary with the loaded parameters.
"""
# Deserialize the dump, type-check the instance and assign it.
with open(filename, "rb") as file:
model = joblib.load(file)
if not isinstance(model, self._model_cls):
msg = (
f"{ErrorNumbers.FB304.value}: reloaded model does not conform "
f"to expectations: should be of type {self._model_cls}, not "
f"{type(model)}."
)
logger.critical(msg)
raise FedbiomedTrainingPlanError(msg)
self._model = model
# Optionally return the model's pseudo state dict instead of it.
if to_params:
params = {k: getattr(self._model, k) for k in self._param_list}
return {"model_params": params}
return self._model
model()
model()
Retrieve the wrapped scikit-learn model instance.
Returns:
Type | Description |
---|---|
BaseEstimator | Scikit-learn model instance |
Source code in fedbiomed/common/training_plans/_sklearn_training_plan.py
def model(self) -> BaseEstimator:
"""Retrieve the wrapped scikit-learn model instance.
Returns:
Scikit-learn model instance
"""
return self._model
model_args()
model_args()
post_init(model_args, training_args, aggregator_args=None)
post_init(model_args, training_args, aggregator_args=None)
Process model, training and optimizer arguments.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_args | Dict[str, Any] | Arguments defined to instantiate the wrapped model. | required |
training_args | Dict[str, Any] | Arguments that are used in training routines such as epoch, dry_run etc. Please see `TrainingArgs`. | required |
aggregator_args | Optional[Dict[str, Any]] | Arguments managed by and shared with the researcher-side aggregator. | None |
Source code in fedbiomed/common/training_plans/_sklearn_training_plan.py
def post_init(
self,
model_args: Dict[str, Any],
training_args: Dict[str, Any],
aggregator_args: Optional[Dict[str, Any]] = None,
) -> None:
"""Process model, training and optimizer arguments.
Args:
model_args: Arguments defined to instantiate the wrapped model.
training_args: Arguments that are used in training routines
such as epoch, dry_run etc.
Please see [`TrainingArgs`][fedbiomed.common.training_args.TrainingArgs]
aggregator_args: Arguments managed by and shared with the
researcher-side aggregator.
"""
self._model_args = model_args
self._aggregator_args = aggregator_args or {}
self._model_args.setdefault("verbose", 1)
self._training_args = training_args.pure_training_arguments()
self._batch_maxnum = self._training_args.get('batch_maxnum', self._batch_maxnum)
# Add dependencies
self._configure_dependencies()
# Override default model parameters based on `self._model_args`.
params = {
key: self._model_args.get(key, val)
for key, val in self._model.get_params().items()
}
self._model.set_params(**params)
# Set up additional parameters (normally created by `self._model.fit`).
self.set_init_params()
predict(data)
predict(data)
Return model predictions for a given batch of input features.
This method is called as part of testing_routine
, to compute predictions based on which evaluation metrics are computed. It will however be skipped if a testing_step
method is attached to the training plan, which wraps together a custom routine to compute an output metric directly from a (data, target) batch.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | Any | Array-like (or tensor) structure containing batched input features. | required |
Returns:
Type | Description |
---|---|
np.ndarray | Output predictions, converted to a numpy array (as per the `fedbiomed.common.metrics.Metrics` specs). |
Source code in fedbiomed/common/training_plans/_sklearn_training_plan.py
def predict(
self,
data: Any,
) -> np.ndarray:
"""Return model predictions for a given batch of input features.
This method is called as part of `testing_routine`, to compute
predictions based on which evaluation metrics are computed. It
will however be skipped if a `testing_step` method is attached
to the training plan, which wraps together a custom routine to
compute an output metric directly from a (data, target) batch.
Args:
data: Array-like (or tensor) structure containing batched
input features.
Returns:
Output predictions, converted to a numpy array (as per the
`fedbiomed.common.metrics.Metrics` specs).
"""
return self._model.predict(data)
save(filename, params=None)
save(filename, params=None)
Save the wrapped model and its trainable parameters.
This method is designed for parameter communication. It uses the joblib.dump function, which in turn uses pickle to serialize the model. Note that unpickling objects can lead to arbitrary code execution; hence use with care.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename | str | Path to the output file. | required |
params | Union[None, Dict[str, np.ndarray], Dict[str, Any]] | Model parameters to enforce and save. This may either be a {name: array} parameters dict, or a nested dict that stores such a parameters dict under the 'model_params' key (in the context of the Round class). | None |
Notes
Save can be called from Job or Round:
* From Round it is called with params (as a complex dict).
* From Job it is called with no params in the constructor, and with params in update_parameters.
Source code in fedbiomed/common/training_plans/_sklearn_training_plan.py
def save(
self,
filename: str,
params: Union[None, Dict[str, np.ndarray], Dict[str, Any]] = None
) -> None:
"""Save the wrapped model and its trainable parameters.
This method is designed for parameter communication. It
uses the joblib.dump function, which in turn uses pickle
to serialize the model. Note that unpickling objects can
lead to arbitrary code execution; hence use with care.
Args:
filename: Path to the output file.
params: Model parameters to enforce and save.
This may either be a {name: array} parameters dict, or a
nested dict that stores such a parameters dict under the
'model_params' key (in the context of the Round class).
Notes:
Save can be called from Job or Round.
* From Round it is called with params (as a complex dict).
* From Job it is called with no params in constructor, and
with params in update_parameters.
"""
# Optionally overwrite the wrapped model's weights.
if params:
if isinstance(params.get('model_params'), dict): # in a Round
params = params["model_params"]
for key, val in params.items():
setattr(self._model, key, val)
# Save the wrapped model (using joblib, hence pickle).
with open(filename, "wb") as file:
joblib.dump(self._model, file)
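A hedged roundtrip sketch combining `save` and `load` (the file path is hypothetical; as noted above, both rely on joblib/pickle, so only load files you trust). In a real workflow `post_init` would normally have run first, so that `_param_list` is populated before `to_params=True` is used.

```python
from fedbiomed.common.training_plans import FedPerceptron

plan = FedPerceptron()
plan.save("/tmp/model.joblib")                 # dumps the wrapped estimator
model = plan.load("/tmp/model.joblib")         # returns the estimator itself
state = plan.load("/tmp/model.joblib", to_params=True)  # {'model_params': {...}}
```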
set_data_loaders(train_data_loader, test_data_loader)
set_data_loaders(train_data_loader, test_data_loader)
Sets data loaders
Parameters:
Name | Type | Description | Default |
---|---|---|---|
train_data_loader | Union[DataLoader, NPDataLoader, None] | Data loader for training routine/loop | required |
test_data_loader | Union[DataLoader, NPDataLoader, None] | Data loader for validation routine | required |
Source code in fedbiomed/common/training_plans/_sklearn_training_plan.py
def set_data_loaders(
self,
train_data_loader: Union[DataLoader, NPDataLoader, None],
test_data_loader: Union[DataLoader, NPDataLoader, None]
) -> None:
"""Sets data loaders
Args:
train_data_loader: Data loader for training routine/loop
test_data_loader: Data loader for validation routine
"""
args = (train_data_loader, test_data_loader)
if not all(isinstance(data, NPDataLoader) for data in args):
msg = (
f"{ErrorNumbers.FB310.value}: SKLearnTrainingPlan expects "
"NPDataLoader instances as training and testing data "
f"loaders, but received {type(train_data_loader)} "
f"and {type(test_data_loader)} respectively."
)
logger.error(msg)
raise FedbiomedTrainingPlanError(msg)
self.training_data_loader = train_data_loader
self.testing_data_loader = test_data_loader
set_init_params()
abstractmethod
set_init_params()
Initialize the model's trainable parameters.
Source code in fedbiomed/common/training_plans/_sklearn_training_plan.py
@abstractmethod
def set_init_params(self) -> None:
"""Initialize the model's trainable parameters."""
testing_routine(metric, metric_args, history_monitor, before_train)
testing_routine(metric, metric_args, history_monitor, before_train)
Evaluation routine, to be called once per round.
Note
If the training plan implements a testing_step
method (the signature of which is func(data, target) -> metrics) then it will be used rather than the input metric.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
metric | Optional[MetricTypes] | The metric used for validation. If None, use MetricTypes.ACCURACY. | required |
metric_args | Dict[str, Any] | Keyword arguments forwarded to the metric-computation function. | required |
history_monitor | Optional[HistoryMonitor] | HistoryMonitor instance, used to record computed metrics and communicate them to the researcher (server). | required |
before_train | bool | Whether the evaluation is being performed before local training occurs, or afterwards. This is merely reported back through `history_monitor`. | required |
Source code in fedbiomed/common/training_plans/_sklearn_training_plan.py
def testing_routine(
self,
metric: Optional[MetricTypes],
metric_args: Dict[str, Any],
history_monitor: Optional['HistoryMonitor'],
before_train: bool
) -> None:
"""Evaluation routine, to be called once per round.
!!! info "Note"
If the training plan implements a `testing_step` method
(the signature of which is func(data, target) -> metrics)
then it will be used rather than the input metric.
Args:
metric: The metric used for validation.
If None, use MetricTypes.ACCURACY.
history_monitor: HistoryMonitor instance,
used to record computed metrics and communicate them to
the researcher (server).
before_train: Whether the evaluation is being performed
before local training occurs, or afterwards. This is merely
reported back through `history_monitor`.
"""
# Check that the testing data loader is of proper type.
if not isinstance(self.testing_data_loader, NPDataLoader):
msg = (
f"{ErrorNumbers.FB310.value}: SKLearnTrainingPlan cannot be "
"evaluated without a NPDataLoader as `testing_data_loader`."
)
logger.error(msg)
raise FedbiomedTrainingPlanError(msg)
# If required, make up for the lack of specifications regarding target
# classification labels.
if self._is_classification and not hasattr(self._model, 'classes_'):
classes = self._classes_from_concatenated_train_test()
setattr(self._model, 'classes_', classes)
# If required, select the default metric (accuracy or mse).
if metric is None:
if self._is_classification:
metric = MetricTypes.ACCURACY
else:
metric = MetricTypes.MEAN_SQUARE_ERROR
# Delegate the actual evaluation routine to the parent class.
super().testing_routine(
metric, metric_args, history_monitor, before_train
)
training_args()
training_args()
training_routine(history_monitor=None, node_args=None)
training_routine(history_monitor=None, node_args=None)
Training routine, to be called once per round.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
history_monitor | Optional[HistoryMonitor] | optional HistoryMonitor instance, recording training metadata. Defaults to None. | None |
node_args | Optional[Dict[str, Any]] | command line arguments for node. These arguments can specify GPU use; however, this is not supported for scikit-learn models and thus will be ignored. | None |
Source code in fedbiomed/common/training_plans/_sklearn_training_plan.py
def training_routine(
self,
history_monitor: Optional['HistoryMonitor'] = None,
node_args: Optional[Dict[str, Any]] = None
) -> None:
"""Training routine, to be called once per round.
Args:
history_monitor: optional HistoryMonitor
instance, recording training metadata. Defaults to None.
node_args: command line arguments for node.
These arguments can specify GPU use; however, this is not
supported for scikit-learn models and thus will be ignored.
"""
if self._model is None:
raise FedbiomedTrainingPlanError('model is None')
if not isinstance(self._model, BaseEstimator):
msg = (
f"{ErrorNumbers.FB320.value}: model should be a scikit-learn "
f"estimator, but is of type {type(self._model)}"
)
logger.critical(msg)
raise FedbiomedTrainingPlanError(msg)
if not isinstance(self.training_data_loader, NPDataLoader):
msg = (
f"{ErrorNumbers.FB310.value}: SKLearnTrainingPlan cannot "
"be trained without a NPDataLoader as `training_data_loader`."
)
logger.critical(msg)
raise FedbiomedTrainingPlanError(msg)
# Run preprocessing operations.
self._preprocess()
# Warn if GPU-use was expected (as it is not supported).
if node_args is not None and node_args.get('gpu_only', False):
logger.warning(
'Node would like to force GPU usage, but sklearn training '
'plan does not support it. Training on CPU.'
)
# Run the model-specific training routine.
try:
self._training_routine(history_monitor)
except Exception as exc:
msg = (
f"{ErrorNumbers.FB605.value}: error while fitting "
f"the model: {exc}"
)
logger.critical(msg)
raise FedbiomedTrainingPlanError(msg)
type()
type()
Getter for training plan type
Source code in fedbiomed/common/training_plans/_sklearn_training_plan.py
def type(self) -> TrainingPlans:
"""Getter for training plan type """
return self.__type
TorchTrainingPlan
TorchTrainingPlan()
Bases: BaseTrainingPlan
, ABC
Implements a TrainingPlan for the PyTorch NN framework.
An abstraction over a PyTorch module to run PyTorch models and scripts on the node side. The researcher's model (resp. params) will be:
- saved to a '.py' (resp. '.pt') file,
- uploaded to an HTTP server (network layer),
- then downloaded from the HTTP server on the node side,
- finally, read and executed on the node side.
The researcher must define/override:
- a training_data() function
- a training_step() function

The researcher may have to add extra dependencies/python imports, by using the add_dependency method.
Attributes:
Name | Type | Description |
---|---|---|
dataset_path | | The path that indicates where the dataset has been stored |
pre_processes | | Preprocess functions that will be applied to the training data at the beginning of the training routine. |
training_data_loader | | Data loader used in the training routine. |
testing_data_loader | | Data loader used in the validation routine. |
correction_state | OrderedDict | An OrderedDict of {'parameter name': torch.Tensor} where the keys correspond to the names of the model parameters contained in self._model.named_parameters(), and the values correspond to the correction to be applied to that parameter. |
Source code in fedbiomed/common/training_plans/_torchnn.py
def __init__(self):
""" Construct training plan """
super().__init__()
self.__type = TrainingPlans.TorchTrainingPlan
# Differential privacy support
self._dp_controller = None
self._optimizer = None
self._model = None
self._training_args = None
self._model_args = None
self._optimizer_args = None
self._use_gpu = False
self._batch_maxnum = 100
self._fedprox_mu = None
self._log_interval = 10
self._epochs = 1
self._dry_run = False
self._num_updates = None
self.correction_state = OrderedDict()
self.aggregator_name = None
# TODO : add random seed init
# self.random_seed_params = None
# self.random_seed_shuffling_data = None
# device to use: cpu/gpu
# - all operations except training only use cpu
# - researcher doesn't request to use gpu by default
self._device_init = "cpu"
self._device = self._device_init
# list dependencies of the model
self.add_dependency(["import torch",
"import torch.nn as nn",
"import torch.nn.functional as F",
"from fedbiomed.common.training_plans import TorchTrainingPlan",
"from fedbiomed.common.data import DataManager",
"from fedbiomed.common.constants import ProcessTypes",
"from torch.utils.data import DataLoader",
"from torchvision import datasets, transforms"
])
# Aggregated model parameters
self._init_params: List[torch.Tensor] = None
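To make the expected overrides concrete, a hedged end-to-end sketch of a user-defined plan; the network, the dataset loading, and the `DataManager` keyword arguments are assumptions for illustration, not prescribed by the library.

```python
import torch.nn as nn
import torch.nn.functional as F
from torchvision import datasets, transforms
from fedbiomed.common.training_plans import TorchTrainingPlan
from fedbiomed.common.data import DataManager

class MyTorchPlan(TorchTrainingPlan):  # hypothetical user-defined plan
    def init_model(self):
        # Assumed contract: return the torch module to be trained.
        return nn.Sequential(
            nn.Flatten(),
            nn.Linear(28 * 28, 64),
            nn.ReLU(),
            nn.Linear(64, 10),
        )

    def training_data(self):
        # Hypothetical node-side dataset; dataset_path is set by the node.
        dataset = datasets.MNIST(
            self.dataset_path, train=True, transform=transforms.ToTensor()
        )
        return DataManager(dataset=dataset, batch_size=32, shuffle=True)

    def training_step(self, data, target):
        # Return the batch loss used for backpropagation.
        output = self.model()(data)
        return F.cross_entropy(output, target)
```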
Attributes
aggregator_name instance-attribute
aggregator_name = None
correction_state instance-attribute
correction_state = OrderedDict()
Functions
after_training_params()
after_training_params()
Retrieve parameters after training is done
Calls the user-defined postprocess function:
- if provided, the function is part of the pytorch model defined by the researcher
- and expects the model parameters as argument
Returns:
Type | Description |
---|---|
dict | The state_dict of the model, or the modified state_dict if a postprocess method is present |
Source code in fedbiomed/common/training_plans/_torchnn.py
def after_training_params(self) -> dict:
"""Retrieve parameters after training is done
Call the user defined postprocess function:
- if provided, the function is part of pytorch model defined by the researcher
- and expect the model parameters as argument
Returns:
The state_dict of the model, or the modified state_dict if a postprocess method is present
"""
# Check whether postprocess method exists, and use it
params = self._model.state_dict()
if hasattr(self, 'postprocess'):
logger.debug("running model.postprocess() method")
try:
params = self.postprocess(self._model.state_dict()) # Post process
except Exception as e:
raise FedbiomedTrainingPlanError(f"{ErrorNumbers.FB605.value}: Error while running post process "
f"{e}")
params = self._dp_controller.after_training(params)
return params
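A hedged sketch of the optional `postprocess` hook mentioned above: when defined on the plan, it receives the model's state_dict and whatever it returns is what gets reported back (the transformation below is purely illustrative).

```python
from fedbiomed.common.training_plans import TorchTrainingPlan

class MyTorchPlan(TorchTrainingPlan):  # hypothetical user-defined plan
    def postprocess(self, params):
        # Illustrative post-processing applied before parameters are shared,
        # e.g. detaching/cloning tensors or masking some entries.
        return {name: tensor.clone() for name, tensor in params.items()}
```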
get_learning_rate()
get_learning_rate()
Gets learning rate from value set in optimizer.
Warning
This function gathers the base learning rate applied to the model weights, including alterations due to any LR scheduler. However, it does not catch any adaptive component, e.g. due to RMSProp, Adam or such.
Returns:
Type | Description |
---|---|
List[float] | List containing a single learning rate, or multiple learning rates (one per parameter group defined in the optimizer) |
Source code in fedbiomed/common/training_plans/_torchnn.py
def get_learning_rate(self) -> List[float]:
"""Gets learning rate from value set in optimizer.
!!! warning
This function gathers the base learning rate applied to the model weights,
including alterations due to any LR scheduler. However, it does not catch
any adaptive component, e.g. due to RMSProp, Adam or such.
Returns:
List[float]: list of single learning rate or multiple learning rates
(as many as the number of the layers contained in the model)
"""
if self._optimizer is None:
raise FedbiomedTrainingPlanError(f"{ErrorNumbers.FB605.value}: Optimizer not found, please call "
f"`init_optimizer` beforehand")
learning_rates = []
params = self._optimizer.param_groups
for param in params:
learning_rates.append(param['lr'])
return learning_rates
get_model_params()
get_model_params()
Source code in fedbiomed/common/training_plans/_torchnn.py
def get_model_params(self) -> OrderedDict:
return self._model.state_dict()
init_model()
abstractmethod
init_model()
Abstract method where model should be defined
Source code in fedbiomed/common/training_plans/_torchnn.py
@abstractmethod
def init_model(self):
"""Abstract method where model should be defined """
pass
init_optimizer()
init_optimizer()
Default method declaring the optimizer (Adam); may be overridden by the researcher.
Source code in fedbiomed/common/training_plans/_torchnn.py
def init_optimizer(self):
"""Abstract method for declaring optimizer by default """
try:
self._optimizer = torch.optim.Adam(self._model.parameters(), **self._optimizer_args)
except AttributeError as e:
raise FedbiomedTrainingPlanError(f"{ErrorNumbers.FB605.value}: Invalid argument for default "
f"optimizer Adam. Error: {e}")
return self._optimizer
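The default optimizer can be swapped by overriding `init_optimizer`; a hedged sketch using plain SGD, reading the learning rate from the researcher-provided optimizer arguments (the `lr` key and its default value here are assumptions):

```python
import torch
from fedbiomed.common.training_plans import TorchTrainingPlan

class MyTorchPlan(TorchTrainingPlan):  # hypothetical user-defined plan
    def init_optimizer(self):
        # Called after the model has been configured; optimizer arguments
        # come from the researcher-side training arguments.
        lr = self._optimizer_args.get("lr", 0.01)
        self._optimizer = torch.optim.SGD(self.model().parameters(), lr=lr)
        return self._optimizer
```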
initial_parameters()
initial_parameters()
Returns initial parameters without DP or training applied
Returns:
Type | Description |
---|---|
Dict | State dictionary of torch Module |
Source code in fedbiomed/common/training_plans/_torchnn.py
def initial_parameters(self) -> Dict:
"""Returns initial parameters without DP or training applied
Returns:
State dictionary of torch Module
"""
return self._init_params
load(filename, to_params=False)
load(filename, to_params=False)
Load torch training parameters from a file, either into this training plan or into a data structure.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename | str | path to the source file | required |
to_params | bool | if False, load params to this pytorch object; if True load params to a data structure | False |
Returns:
Type | Description |
---|---|
dict | The loaded parameters |
Source code in fedbiomed/common/training_plans/_torchnn.py
def load(self, filename: str, to_params: bool = False) -> dict:
"""Load the torch training parameters to this training plan or to a data structure from a file
Args:
filename: path to the source file
to_params: if False, load params to this pytorch object; if True load params to a data structure
Returns:
Contains parameters
"""
params = torch.load(filename)
if to_params is False:
self._model.load_state_dict(params)
return params
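A hedged roundtrip sketch pairing `load` with `save` (documented further below); `plan_a` and `plan_b` stand for two hypothetical, already-configured plans sharing the same model architecture.

```python
plan_a.save("/tmp/params.pt")                   # writes model.state_dict()
state = plan_b.load("/tmp/params.pt")           # loads weights into plan_b's model
weights = plan_b.load("/tmp/params.pt", to_params=True)  # returns dict, no assignment
```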
model()
model()
Source code in fedbiomed/common/training_plans/_torchnn.py
def model(self):
return self._model
model_args()
model_args()
Retrieves model args
Returns:
Type | Description |
---|---|
Dict | Model arguments |
Source code in fedbiomed/common/training_plans/_torchnn.py
def model_args(self) -> Dict:
"""Retrieves model args
Returns:
Model arguments
"""
return self._model_args
optimizer()
optimizer()
Source code in fedbiomed/common/training_plans/_torchnn.py
def optimizer(self):
return self._optimizer
optimizer_args()
optimizer_args()
Retrieves optimizer arguments
Returns:
Type | Description |
---|---|
Dict | Optimizer arguments |
Source code in fedbiomed/common/training_plans/_torchnn.py
def optimizer_args(self) -> Dict:
"""Retrieves optimizer arguments
Returns:
Optimizer arguments
"""
self.update_optimizer_args() # update `optimizer_args` (eg after training)
return self._optimizer_args
post_init(model_args, training_args, aggregator_args=None)
post_init(model_args, training_args, aggregator_args=None)
Process model, training and optimizer arguments.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_args | Dict[str, Any] | Arguments defined to instantiate the wrapped model. | required |
training_args | TrainingArgs | Arguments that are used in training routines such as epoch, dry_run etc. Please see `TrainingArgs`. | required |
aggregator_args | Optional[Dict[str, Any]] | Arguments managed by and shared with the researcher-side aggregator. | None |
Raises:
Type | Description |
---|---|
FedbiomedTrainingPlanError | If the provided arguments do not match expectations, or if the optimizer, model and dependencies configuration goes wrong. |
Source code in fedbiomed/common/training_plans/_torchnn.py
def post_init(
self,
model_args: Dict[str, Any],
training_args: TrainingArgs,
aggregator_args: Optional[Dict[str, Any]] = None,
) -> None:
"""Process model, training and optimizer arguments.
Args:
model_args: Arguments defined to instantiate the wrapped model.
training_args: Arguments that are used in training routines
such as epoch, dry_run etc.
Please see [`TrainingArgs`][fedbiomed.common.training_args.TrainingArgs]
aggregator_args: Arguments managed by and shared with the
researcher-side aggregator.
Raises:
FedbiomedTrainingPlanError: If the provided arguments do not
match expectations, or if the optimizer, model and dependencies
configuration goes wrong.
"""
self._model_args = model_args
self._optimizer_args = training_args.optimizer_arguments() or {}
self._training_args = training_args.pure_training_arguments()
self._use_gpu = self._training_args.get('use_gpu')
self._batch_maxnum = self._training_args.get('batch_maxnum')
self._log_interval = self._training_args.get('log_interval')
self._epochs = self._training_args.get('epochs')
self._num_updates = self._training_args.get('num_updates', 1)
self._dry_run = self._training_args.get('dry_run')
# aggregator args
self._fedprox_mu = self._training_args.get('fedprox_mu')
# TODO: put fedprox mu inside strategy_args
self._aggregator_args = aggregator_args or {}
self.set_aggregator_args(self._aggregator_args)
# self.aggregator_name = self._aggregator_args.get('aggregator_name')
# FIXME: we should have a AggregatorHandler that handles aggregator args
self._dp_controller = DPController(training_args.dp_arguments() or None)
# Add dependencies
self._configure_dependencies()
# Configure model and optimizer
self._configure_model_and_optimizer()
predict(data)
predict(data)
Return model predictions for a given batch of input features.
This method is called as part of testing_routine
, to compute predictions based on which evaluation metrics are computed. It will however be skipped if a testing_step
method is attached to the training plan, which wraps together a custom routine to compute an output metric directly from a (data, target) batch.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | Any | Array-like (or tensor) structure containing batched input features. | required |
Returns:
Type | Description |
---|---|
np.ndarray | Output predictions, converted to a numpy array (as per the `fedbiomed.common.metrics.Metrics` specs). |
Source code in fedbiomed/common/training_plans/_torchnn.py
def predict(
self,
data: Any,
) -> np.ndarray:
"""Return model predictions for a given batch of input features.
This method is called as part of `testing_routine`, to compute
predictions based on which evaluation metrics are computed. It
will however be skipped if a `testing_step` method is attached
to the training plan, which wraps together a custom routine to
compute an output metric directly from a (data, target) batch.
Args:
data: Array-like (or tensor) structure containing batched
input features.
Returns:
np.ndarray: Output predictions, converted to a numpy array
(as per the `fedbiomed.common.metrics.Metrics` specs).
"""
with torch.no_grad():
pred = self._model(data)
return pred.numpy()
save(filename, params=None)
save(filename, params=None)
Save the torch training parameters from this training plan or from given params
to a file
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename | str | Path to the destination file | required |
params | dict | Parameters to save to a file, should be structured as a torch state_dict() | None |
Source code in fedbiomed/common/training_plans/_torchnn.py
def save(self, filename: str, params: dict = None) -> None:
"""Save the torch training parameters from this training plan or from given `params` to a file
Args:
filename (str): Path to the destination file
params (dict): Parameters to save to a file, should be structured as a torch state_dict()
"""
if params is not None:
return torch.save(params, filename)
else:
return torch.save(self._model.state_dict(), filename)
send_to_device(to_send, device)
send_to_device(to_send, device)
Send inputs to correct device for training.
Recursively traverses lists, tuples and dicts until it meets a torch Tensor, then sends the Tensor to the specified device.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
to_send | Union[torch.Tensor, list, tuple, dict] | the data to be sent to the device. | required |
device | torch.device | the device to send the data to. | required |
Raises:
Type | Description |
---|---|
FedbiomedTrainingPlanError | when to_send is not the correct type |
Source code in fedbiomed/common/training_plans/_torchnn.py
def send_to_device(self,
to_send: Union[torch.Tensor, list, tuple, dict],
device: torch.device
):
"""Send inputs to correct device for training.
Recursively traverses lists, tuples and dicts until it meets a torch Tensor, then sends the Tensor
to the specified device.
Args:
to_send: the data to be sent to the device.
device: the device to send the data to.
Raises:
FedbiomedTrainingPlanError: when to_send is not the correct type
"""
if isinstance(to_send, torch.Tensor):
return to_send.to(device)
elif isinstance(to_send, dict):
return {key: self.send_to_device(val, device) for key, val in to_send.items()}
elif isinstance(to_send, tuple):
return tuple(self.send_to_device(d, device) for d in to_send)
elif isinstance(to_send, list):
return [self.send_to_device(d, device) for d in to_send]
else:
raise FedbiomedTrainingPlanError(f'{ErrorNumbers.FB310.value} cannot send data to device. '
f'Data must be a torch Tensor or a list, tuple or dict '
f'ultimately containing Tensors.')
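A hedged usage sketch: `send_to_device` is convenient when a batch is a nested structure rather than a single tensor. The stub subclass below exists only so the plan can be instantiated; its model and data-loading are placeholders.

```python
import torch
from fedbiomed.common.training_plans import TorchTrainingPlan

class MyTorchPlan(TorchTrainingPlan):  # hypothetical stub, just for illustration
    def init_model(self):
        return torch.nn.Linear(4, 1)

    def training_data(self):
        raise NotImplementedError  # not needed for this illustration

plan = MyTorchPlan()
batch = {
    "image": torch.randn(4, 3, 8, 8),
    "meta": [torch.zeros(4), torch.ones(4)],
}
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
batch_on_device = plan.send_to_device(batch, device)  # same nesting, tensors moved
```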
set_aggregator_args(aggregator_args)
set_aggregator_args(aggregator_args)
Handles and loads aggregator arguments sent through MQTT and the file exchange system. If sent through the file exchange system, loads the arguments.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
aggregator_args | Dict[str, Any] | dictionary mapping aggregator argument names to their values (e.g. 'aggregator_correction' with correction states) | required |
Source code in fedbiomed/common/training_plans/_torchnn.py
def set_aggregator_args(self, aggregator_args: Dict[str, Any]):
"""Handles and loads aggregators arguments sent through MQTT and
file exchanged system. If sent through file exchanged system, loads the arguments.
Args:
aggregator_args (Dict[str, Any]): dictionary mapping aggregator argument name with its value (eg
'aggregator_correction' with correction states)
"""
self.aggregator_name = aggregator_args.get('aggregator_name') or self.aggregator_name
for arg_name, aggregator_arg in aggregator_args.items():
if arg_name == 'aggregator_correction' and aggregator_arg.get('param_path', False):
# FIXME: this is too specific to Scaffold. Should be redesigned, or handled
# by an aggregator handler that contains all keys for all strategies implemented
# in fedbiomed
# setattr(self, arg_name, aggregator_arg)
# here we are loading all args that have been sent from the file exchange system
self.correction_state = torch.load(aggregator_arg.get('param_path'))
testing_routine(metric, metric_args, history_monitor, before_train)
testing_routine(metric, metric_args, history_monitor, before_train)
Evaluation routine, to be called once per round.
Note
If the training plan implements a testing_step
method (the signature of which is func(data, target) -> metrics) then it will be used rather than the input metric.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
metric | Optional[MetricTypes] | The metric used for validation. If None, use MetricTypes.ACCURACY. | required |
history_monitor | Optional[HistoryMonitor] | HistoryMonitor instance, used to record computed metrics and communicate them to the researcher (server). | required |
before_train | bool | Whether the evaluation is being performed before local training occurs, or afterwards. This is merely reported back through `history_monitor`. | required |
Source code in fedbiomed/common/training_plans/_torchnn.py
def testing_routine(
self,
metric: Optional[MetricTypes],
metric_args: Dict[str, Any],
history_monitor: Optional['HistoryMonitor'],
before_train: bool
) -> None:
"""Evaluation routine, to be called once per round.
!!! info "Note"
If the training plan implements a `testing_step` method
(the signature of which is func(data, target) -> metrics)
then it will be used rather than the input metric.
Args:
metric: The metric used for validation.
If None, use MetricTypes.ACCURACY.
history_monitor: HistoryMonitor instance,
used to record computed metrics and communicate them to
the researcher (server).
before_train: Whether the evaluation is being performed
before local training occurs, or afterwards. This is merely
reported back through `history_monitor`.
"""
if not isinstance(self._model, torch.nn.Module):
msg = (
f"{ErrorNumbers.FB320.value}: model should be a torch "
f"nn.Module, but is of type {type(self._model)}"
)
logger.critical(msg)
raise FedbiomedTrainingPlanError(msg)
try:
self._model.eval() # pytorch switch for model inference-mode
with torch.no_grad():
super().testing_routine(
metric, metric_args, history_monitor, before_train
)
finally:
self._model.train() # restore training behaviors
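As a hedged sketch of the note above, a subclass could override the evaluation behavior by providing a `testing_step` with the documented signature; the model usage and metric choice below are illustrative, and the other methods a concrete plan must define are omitted:

```python
import torch
from fedbiomed.common.training_plans import TorchTrainingPlan

class MyTrainingPlan(TorchTrainingPlan):  # illustrative; other required methods omitted
    def testing_step(self, data, target):
        # Called once per validation batch instead of the built-in metric.
        output = self._model(data)
        loss = torch.nn.functional.cross_entropy(output, target)
        # A dict allows one or several named metrics to be reported for this batch.
        return {"CrossEntropy": loss.item()}
```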
training_args()
training_args()
Retrieves training args
Returns:
Type | Description |
---|---|
Dict | Training arguments |
Source code in fedbiomed/common/training_plans/_torchnn.py
def training_args(self) -> Dict:
"""Retrieves training args
Returns:
Training arguments
"""
return self._training_args
training_data()
abstractmethod
training_data()
Abstract method to return training data
Source code in fedbiomed/common/training_plans/_torchnn.py
@abstractmethod
def training_data(self):
"""Abstract method to return training data"""
pass
training_routine(history_monitor=None, node_args=None)
training_routine(history_monitor=None, node_args=None)
Training routine procedure.
End-users should define:
- a `training_data()` function defining how data in the node's dataset is sampled and handled. It should return a generator yielding `(batch_idx, (data, targets))` tuples, one per batch.
- a `training_step()` function defining how the cost is computed. It should output loss values usable for backpropagation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
history_monitor | Any | Monitor handler for real-time feed. Defined by the Node and can't be overwritten | None |
node_args | Union[dict, None] | Command line arguments for the node. Can include `gpu`, `gpu_num` and `gpu_only` (see the docstring below). | None |
Source code in fedbiomed/common/training_plans/_torchnn.py
def training_routine(self,
history_monitor: Any = None,
node_args: Union[dict, None] = None,
):
# FIXME: add betas parameters for ADAM solver + momentum for SGD
# FIXME 2: remove parameters specific for validation specified in the
# training routine
"""Training routine procedure.
End-users should define:
- a `training_data()` function defining how data in the node's dataset is sampled and handled. It should
return a generator yielding `(batch_idx, (data, targets))` tuples, one per batch.
- a `training_step()` function defining how the cost is computed. It should output loss values usable for backpropagation.
Args:
history_monitor: Monitor handler for real-time feed. Defined by the Node and can't be overwritten
node_args: command line arguments for node. Can include:
- `gpu (bool)`: propose using a GPU device if any is available. Default False.
- `gpu_num (Union[int, None])`: if not None, use the specified GPU device instead of default
GPU device if this GPU device is available. Default None.
- `gpu_only (bool)`: force use of a GPU device if any is available, even if the researcher
doesn't request using a GPU. Default False.
"""
self._model.train() # pytorch switch for training
# set correct type for node args
node_args = {} if not isinstance(node_args, dict) else node_args
# send the whole model to the device, so that all the requested tensors are available there
self._set_device(self._use_gpu, node_args)
self._model.to(self._device)
# Run preprocess when everything is ready before the training
self._preprocess()
# initial aggregated model parameters
self._init_params = deepcopy(list(self._model.parameters()))
# DP actions
self._model, self._optimizer, self.training_data_loader = \
self._dp_controller.before_training(self._model, self._optimizer, self.training_data_loader)
# set number of training loop iterations
iterations_accountant = MiniBatchTrainingIterationsAccountant(self)
# Training loop iterations
for epoch in iterations_accountant.iterate_epochs():
training_data_iter: Iterator = iter(self.training_data_loader)
for batch_idx in iterations_accountant.iterate_batches():
# retrieve data and target
data, target = next(training_data_iter)
# update accounting for number of observed samples
batch_size = self._infer_batch_size(data)
iterations_accountant.increment_sample_counters(batch_size)
# handle training on accelerator devices
data, target = self.send_to_device(data, self._device), self.send_to_device(target, self._device)
# train this batch
corrected_loss, loss = self._train_over_batch(data, target)
# Reporting
if iterations_accountant.should_log_this_batch():
# Retrieve reporting information: semantics differ whether num_updates or epochs were specified
num_samples, num_samples_max = iterations_accountant.reporting_on_num_samples()
num_iter, num_iter_max = iterations_accountant.reporting_on_num_iter()
epoch_to_report = iterations_accountant.reporting_on_epoch()
logger.debug('Train {}| '
'Iteration {}/{} | '
'Samples {}/{} ({:.0f}%)\tLoss: {:.6f}'.format(
f'Epoch: {epoch_to_report} ' if epoch_to_report is not None else '',
num_iter,
num_iter_max,
num_samples,
num_samples_max,
100. * num_iter / num_iter_max,
loss.item()))
# Send scalar values via general/feedback topic
if history_monitor is not None:
# the researcher only sees the average value of samples observed until now
history_monitor.add_scalar(metric={'Loss': loss.item()},
iteration=num_iter,
epoch=epoch_to_report,
train=True,
num_samples_trained=num_samples,
num_batches=num_iter_max,
total_samples=num_samples_max,
batch_samples=batch_size)
# Handle dry run mode
if self._dry_run:
self._model.to(self._device_init)
torch.cuda.empty_cache()
return
# release gpu usage as much as possible though:
# - it should be done by deleting the object
# - and some gpu memory remains used until process (cuda kernel ?) finishes
self._model.to(self._device_init)
torch.cuda.empty_cache()
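For reference, a hedged example of the `node_args` options documented above, with illustrative values:

```python
# Illustrative node_args dictionary matching the options documented above.
node_args = {
    "gpu": True,        # propose using a GPU device if one is available
    "gpu_num": 0,       # prefer GPU device 0 when it is available
    "gpu_only": False,  # do not force GPU use if the researcher did not request it
}
# The node, not the researcher, supplies this dictionary when it invokes
# training_routine(history_monitor=..., node_args=node_args).
```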
training_step()
abstractmethod
training_step()
Abstract method, all subclasses must provide a training_step.
Source code in fedbiomed/common/training_plans/_torchnn.py
@abstractmethod
def training_step(self):
"""Abstract method, all subclasses must provide a training_step.
"""
pass
type()
type()
Gets training plan type
Source code in fedbiomed/common/training_plans/_torchnn.py
def type(self) -> TrainingPlans.TorchTrainingPlan:
""" Gets training plan type"""
return self.__type
update_optimizer_args()
update_optimizer_args()
Updates the `_optimizer_args` attribute. This is useful for retrieving optimizer parameters after a model has been trained, since some of them (e.g. the learning rate) may have changed during training.
Updated arguments
- learning_rate
Returns:
Name | Type | Description |
---|---|---|
Dict | Dict | updated `_optimizer_args` |
Source code in fedbiomed/common/training_plans/_torchnn.py
def update_optimizer_args(self) -> Dict:
"""
Updates the `_optimizer_args` attribute. Useful for retrieving optimizer
parameters after a model has been trained, since some of them may have
changed during training (e.g. the learning rate).
Updated arguments:
- learning_rate
Returns:
Dict: updated `_optimizer_args`
"""
if self._optimizer_args is None:
self._optimizer_args = {}
self._optimizer_args['lr'] = self.get_learning_rate()
return self._optimizer_args
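A brief usage sketch, assuming `training_plan` is an already-trained training plan instance (hypothetical here):

```python
# Hypothetical: `training_plan` is an instantiated TorchTrainingPlan subclass
# that has already run its training routine.
updated_args = training_plan.update_optimizer_args()
print(updated_args["lr"])  # the learning rate in effect at the end of training
```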
MiniBatchTrainingIterationsAccountant
Accounting class for keeping track of training iterations.
This class has the following responsibilities:
- manage iterators for epochs and batches
- provide up-to-date values for reporting
- handle different semantics in case the researcher asked for num_updates or epochs
We assume that the underlying training loop is always implemented in terms of epochs and batches, so the primary purpose of this class is to correctly convert a requested number of updates into epochs and batches.
For reporting purposes, when `num_updates` is specified we think of the training as a single big loop, whereas with epochs and batches we think of it as two nested loops. This changes the meaning of the values output by the reporting functions (see their docstrings for more details).
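A minimal sketch of the conversion this implies when the researcher requests a number of updates (the helper below is illustrative, not the actual implementation used by this class):

```python
# Illustrative conversion of a requested number of updates into the
# epoch/batch bookkeeping described above: full epochs plus one extra,
# possibly empty, epoch holding the remaining batches.
def split_num_updates(num_updates: int, num_batches_per_epoch: int):
    full_epochs, remainder = divmod(num_updates, num_batches_per_epoch)
    epochs = full_epochs + 1                 # one additional, possibly empty, epoch
    num_batches_in_last_epoch = remainder    # can be zero
    return epochs, num_batches_in_last_epoch

# e.g. 250 updates with 100 batches per epoch -> 3 epochs, 50 batches in the last one
assert split_num_updates(250, 100) == (3, 50)
```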
Attributes:
Name | Type | Description |
---|---|---|
_training_plan | a reference to the training plan executing the training iterations | |
cur_epoch | int | the index of the current epoch during iterations |
cur_batch | int | the index of the current batch during iterations |
epochs | int | the total number of epochs to be performed (we always perform one additional -- possibly empty -- epoch) |
num_batches_per_epoch | int | the number of iterations per epoch |
num_batches_in_last_epoch | int | the number of iterations in the last epoch (can be zero) |
num_samples_observed_in_epoch | int | a counter for the number of samples observed in the current epoch, for reporting |
num_samples_observed_in_total | int | a counter for the number of samples observed total, for reporting |
Parameters:
Name | Type | Description | Default |
---|---|---|---|
training_plan | TBaseTrainingPlan | a reference to the training plan that is executing the training iterations | required |
Source code in fedbiomed/common/training_plans/_training_iterations.py
def __init__(self, training_plan: TBaseTrainingPlan):
"""Initialize the class.
Arguments:
training_plan: a reference to the training plan that is executing the training iterations
"""
self._training_plan = training_plan
self.cur_epoch: int = 0
self.cur_batch: int = 0
self.epochs: int = 0
self.num_batches_per_epoch: int = 0
self.num_batches_in_last_epoch: int = 0
self.num_samples_observed_in_epoch: int = 0
self.num_samples_observed_in_total: int = 0
self._n_training_iterations()
Attributes
cur_batch instance-attribute
cur_batch: int = 0
cur_epoch instance-attribute
cur_epoch: int = 0
epochs instance-attribute
epochs: int = 0
num_batches_in_last_epoch instance-attribute
num_batches_in_last_epoch: int = 0
num_batches_per_epoch instance-attribute
num_batches_per_epoch: int = 0
num_samples_observed_in_epoch instance-attribute
num_samples_observed_in_epoch: int = 0
num_samples_observed_in_total instance-attribute
num_samples_observed_in_total: int = 0
Classes
BatchIter
BatchIter(accountant)
Iterator over batches.
Attributes:
Name | Type | Description |
---|---|---|
_accountant | an instance of the class that created this iterator |
Source code in fedbiomed/common/training_plans/_training_iterations.py
def __init__(self, accountant: TTrainingIterationsAccountant):
self._accountant = accountant
EpochsIter
EpochsIter(accountant)
Iterator over epochs.
Attributes:
Name | Type | Description |
---|---|---|
_accountant | an instance of the class that created this iterator |
Source code in fedbiomed/common/training_plans/_training_iterations.py
def __init__(self, accountant: TTrainingIterationsAccountant):
self._accountant = accountant
Functions
increment_sample_counters(n_samples)
increment_sample_counters(n_samples)
Increments the internal counters for the number of observed samples
Source code in fedbiomed/common/training_plans/_training_iterations.py
def increment_sample_counters(self, n_samples: int):
"""Increments internal counter for numbers of observed samples"""
self.num_samples_observed_in_epoch += n_samples
self.num_samples_observed_in_total += n_samples
iterate_batches()
iterate_batches()
Returns an instance of a batches iterator.
Source code in fedbiomed/common/training_plans/_training_iterations.py
def iterate_batches(self):
"""Returns an instance of a batches iterator."""
return MiniBatchTrainingIterationsAccountant.BatchIter(self)
iterate_epochs()
iterate_epochs()
Returns an instance of an epochs iterator.
Source code in fedbiomed/common/training_plans/_training_iterations.py
def iterate_epochs(self):
"""Returns an instance of an epochs iterator."""
return MiniBatchTrainingIterationsAccountant.EpochsIter(self)
num_batches_in_this_epoch()
num_batches_in_this_epoch()
Returns the number of iterations to be performed in the current epoch
Source code in fedbiomed/common/training_plans/_training_iterations.py
def num_batches_in_this_epoch(self) -> int:
"""Returns the number of iterations to be performed in the current epoch"""
if self.cur_epoch == self.epochs:
return self.num_batches_in_last_epoch
else:
return self.num_batches_per_epoch
reporting_on_epoch()
reporting_on_epoch()
Returns the optional index of the current epoch, for reporting.
Source code in fedbiomed/common/training_plans/_training_iterations.py
def reporting_on_epoch(self) -> Optional[int]:
"""Returns the optional index of the current epoch, for reporting."""
if self._training_plan.training_args()['num_updates'] is not None:
return None
else:
return self.cur_epoch
reporting_on_num_iter()
reporting_on_num_iter()
Outputs useful reporting information about the number of iterations
If the researcher specified `num_updates`, then the iteration number will be the cumulative total, and similarly the maximum number of iterations will be equal to the requested number of updates. If the researcher specified epochs, then the iteration number will be the batch index in the current epoch, while the maximum number of iterations will be computed specifically for the current epoch.
Returns:
Type | Description |
---|---|
int | the iteration number |
int | the maximum number of iterations to be reported |
Source code in fedbiomed/common/training_plans/_training_iterations.py
def reporting_on_num_iter(self) -> Tuple[int, int]:
"""Outputs useful reporting information about the number of iterations
If the researcher specified num_updates, then the iteration number will be the cumulated total, and
similarly the maximum number of iterations will be equal to the requested number of updates.
If the researcher specified epochs, then the iteration number will be the batch index in the current epoch,
while the maximum number of iterations will be computed specifically for the current epoch.
Returns:
the iteration number
the maximum number of iterations to be reported
"""
if self._training_plan.training_args()['num_updates'] is not None:
num_iter = (self.cur_epoch - 1) * self.num_batches_per_epoch + self.cur_batch
total_batches_to_be_observed = (self.epochs - 1) * self.num_batches_per_epoch + \
self.num_batches_in_last_epoch
num_iter_max = total_batches_to_be_observed
else:
num_iter = self.cur_batch
num_iter_max = self.num_batches_per_epoch
return num_iter, num_iter_max
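As a worked illustration of the two semantics (the numbers are arbitrary):

```python
# Suppose num_batches_per_epoch = 100, epochs = 3, num_batches_in_last_epoch = 50
# (i.e. 250 requested updates), and the loop is at cur_epoch = 2, cur_batch = 30.
#
# num_updates semantics:  num_iter     = (2 - 1) * 100 + 30     = 130
#                         num_iter_max = (3 - 1) * 100 + 50     = 250
# epochs semantics:       num_iter     = 30
#                         num_iter_max = num_batches_per_epoch  = 100
```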
reporting_on_num_samples()
reporting_on_num_samples()
Outputs useful reporting information about the number of observed samples
If the researcher specified num_updates, then the number of observed samples will be the grand total, and similarly the maximum number of samples will be the grand total over all iterations. If the researcher specified epochs, then both values will be specific to the current epoch.
Returns:
Type | Description |
---|---|
int | the number of samples observed until the current iteration |
int | the maximum number of samples to be observed |
Source code in fedbiomed/common/training_plans/_training_iterations.py
def reporting_on_num_samples(self) -> Tuple[int, int]:
"""Outputs useful reporting information about the number of observed samples
If the researcher specified num_updates, then the number of observed samples will be the grand total, and
similarly the maximum number of samples will be the grand total over all iterations.
If the researcher specified epochs, then both values will be specific to the current epoch.
Returns:
the number of samples observed until the current iteration
the maximum number of samples to be observed
"""
if self._training_plan.training_args()['num_updates'] is not None:
num_samples = self.num_samples_observed_in_total
total_batches_to_be_observed = (self.epochs - 1) * self.num_batches_per_epoch + \
self.num_batches_in_last_epoch
total_n_samples_to_be_observed = \
self._training_plan.training_args()['batch_size'] * total_batches_to_be_observed
num_samples_max = total_n_samples_to_be_observed
else:
num_samples = self.num_samples_observed_in_epoch
num_samples_max = self._training_plan.training_args()['batch_size']*self.num_batches_in_this_epoch() if \
self.cur_batch < self.num_batches_in_this_epoch() else num_samples
return num_samples, num_samples_max
should_log_this_batch()
should_log_this_batch()
Whether the current batch should be logged or not.
A batch shall be logged if at least one of the following conditions is True:
- the cumulative batch index is a multiple of the logging interval
- the dry_run condition was specified by the researcher
- it is the last batch of the epoch
- it is the first batch of the epoch
Source code in fedbiomed/common/training_plans/_training_iterations.py
def should_log_this_batch(self) -> bool:
"""Whether the current batch should be logged or not.
A batch shall be logged if at least one of the following conditions is True:
- the cumulative batch index is a multiple of the logging interval
- the dry_run condition was specified by the researcher
- it is the last batch of the epoch
- it is the first batch of the epoch
"""
current_iter = (self.cur_epoch - 1) * self.num_batches_per_epoch + self.cur_batch
return (current_iter % self._training_plan.training_args()['log_interval'] == 0 or
self._training_plan.training_args()['dry_run'] or
self.cur_batch >= self.num_batches_in_this_epoch() or # last batch
self.cur_batch == 1) # first batch
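For illustration, with `log_interval = 10`, 25 batches per epoch and `dry_run` disabled, the rule above behaves as follows during the first epoch:

```python
# cur_batch = 1   -> logged (first batch of the epoch)
# cur_batch = 10  -> logged (cumulative iteration 10 is a multiple of log_interval)
# cur_batch = 17  -> not logged
# cur_batch = 25  -> logged (last batch of the epoch)
```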