fedbiomed.researcher.experiment
Module: fedbiomed.researcher.experiment
Code of the researcher. Implements the experiment orchestration.
Attributes
TrainingPlan module-attribute
TrainingPlan = TypeVar(
"TrainingPlan", TorchTrainingPlan, SKLearnTrainingPlan
)
Type_TrainingPlan module-attribute
Type_TrainingPlan = TypeVar(
"Type_TrainingPlan",
Type[TorchTrainingPlan],
Type[SKLearnTrainingPlan],
)
training_plans module-attribute
training_plans = (TorchTrainingPlan, SKLearnTrainingPlan)
Classes
Experiment
Experiment(
tags=None,
nodes=None,
training_data=None,
aggregator=None,
node_selection_strategy=None,
round_limit=None,
training_plan_class=None,
training_plan_path=None,
model_args={},
training_args=None,
save_breakpoints=False,
tensorboard=False,
experimentation_folder=None,
use_secagg=False,
secagg_timeout=0,
)
Bases: object
This class represents the orchestrator managing the federated training
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tags | Union[List[str], str, None] | list of string with data tags or string with one data tag. Empty list of tags ([]) means any dataset is accepted, it is different from None (tags not set, cannot search for training_data yet). | None |
nodes | Union[List[str], None] | list of node_ids to filter the nodes to be involved in the experiment. Defaults to None (no filtering). | None |
training_data | Union[FederatedDataSet, dict, None] | training data for the experiment. If it is a FederatedDataSet object, it is used as-is. If it is a dict, a FederatedDataSet is created from it (node ids as keys, values being lists of dicts, each dict describing a dataset on a node). If it is None and `tags` is not None, training data is set by querying the nodes with `tags` and `nodes`; if both are None, training data stays unset and the experiment cannot be launched yet. | None |
aggregator | Union[Aggregator, Type[Aggregator], None] | object or class defining the method for aggregating local updates. Defaults to None (use FedAverage for aggregation). | None |
node_selection_strategy | Union[Strategy, Type[Strategy], None] | object or class defining how nodes are sampled at each round for training, and how non-responding nodes are managed. Defaults to None: use DefaultStrategy if training_data is initialized, else strategy stays None and the experiment cannot be launched yet. | None |
round_limit | Union[int, None] | the maximum number of training rounds (nodes <-> central server) that should be executed for the experiment. None means that no limit is defined. Defaults to None. | None |
training_plan_class | Union[Type_TrainingPlan, str, None] | name of the training plan class (str) when `training_plan_path` is set, or the training plan class itself (Type_TrainingPlan) when it is passed directly as argument. Defaults to None (no training plan class defined yet). | None |
training_plan_path | Union[str, None] | path to a file containing training plan code, or None (no file, `training_plan_class` must then be a class matching Type_TrainingPlan). Defaults to None. | None |
model_args | dict | contains model arguments passed to the constructor of the training plan when instantiating it: output and input feature dimension, etc. | {} |
training_args | Union[TypeVar(TrainingArgs), dict, None] | contains training arguments passed to the `training_routine` of the training plan when launching it: lr, epochs, batch_size... | None |
save_breakpoints | bool | whether to save breakpoints or not after each training round. Breakpoints can be used for resuming a crashed experiment. | False |
tensorboard | bool | whether to save scalar values for displaying in Tensorboard during training for each node. Currently, it is only used for loss values. If True, the monitor instantiates a Monitor object that writes scalar logs into the ./runs directory; if False, monitoring is stopped if it was active. | False |
experimentation_folder | Union[str, None] | choose a specific name for the folder where experimentation result files and breakpoints are stored. This should just contain the name of the folder, not a path. The name is used as a subdirectory of environ[EXPERIMENTS_DIR]. Defaults to None (auto-choose a folder name). | None |
use_secagg | bool | whether to set up a secure aggregation context for this experiment, and use it to send encrypted updates from nodes to researcher. Defaults to False. | False |
secagg_timeout | float | when use_secagg is True, maximum duration for the setup phase of each secagg context element (server key and biprime), so the total secagg setup takes at most twice the timeout. Defaults to environ['TIMEOUT'] if unset or equal to 0. | 0 |
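A minimal construction sketch is shown below. `MyTrainingPlan` stands for a user-defined training plan class (a `TorchTrainingPlan` or `SKLearnTrainingPlan` subclass) and the tag, model and training argument values are illustrative, not part of this module:

```python
from fedbiomed.researcher.experiment import Experiment
from fedbiomed.researcher.aggregators import FedAverage

# `MyTrainingPlan` is assumed to be defined elsewhere by the user;
# tags, model_args and training_args values are illustrative.
exp = Experiment(
    tags=['#my-data'],
    training_plan_class=MyTrainingPlan,
    model_args={'in_features': 15, 'out_features': 1},
    training_args={'lr': 1e-3, 'epochs': 1, 'batch_size': 32},
    round_limit=3,
    aggregator=FedAverage(),
    node_selection_strategy=None,   # falls back to DefaultStrategy
    save_breakpoints=True,
)
exp.run()   # run the 3 rounds defined by round_limit
```

Any argument left to None can also be set later through the corresponding `set_*` method before calling `run()`.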
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def __init__(self,
tags: Union[List[str], str, None] = None,
nodes: Union[List[str], None] = None,
training_data: Union[FederatedDataSet, dict, None] = None,
aggregator: Union[Aggregator, Type[Aggregator], None] = None,
node_selection_strategy: Union[Strategy, Type[Strategy], None] = None,
round_limit: Union[int, None] = None,
training_plan_class: Union[Type_TrainingPlan, str, None] = None,
training_plan_path: Union[str, None] = None,
model_args: dict = {},
training_args: Union[TypeVar("TrainingArgs"), dict, None] = None,
save_breakpoints: bool = False,
tensorboard: bool = False,
experimentation_folder: Union[str, None] = None,
use_secagg: bool = False,
secagg_timeout: float = 0
):
"""Constructor of the class.
Args:
tags: list of string with data tags or string with one data tag. Empty list of tags ([]) means any dataset
is accepted, it is different from None (tags not set, cannot search for training_data yet).
nodes: list of node_ids to filter the nodes to be involved in the experiment. Defaults to None (no
filtering).
training_data:
* If it is a FederatedDataSet object, use this value as training_data.
* else if it is a dict, create and use a FederatedDataSet object from the dict and use this value as
training_data. The dict should use node ids as keys, values being list of dicts (each dict
representing a dataset on a node).
* else if it is None (no training data provided)
- if `tags` is not None, set training_data by
searching for datasets with a query to the nodes using `tags` and `nodes`
- if `tags` is None, set training_data to None (no training_data set yet,
experiment is not fully initialized and cannot be launched)
Defaults to None (query nodes for dataset if `tags` is not None, set training_data
to None else)
aggregator: object or class defining the method for aggregating local updates. Default to None (use
[`FedAverage`][fedbiomed.researcher.aggregators.FedAverage] for aggregation)
node_selection_strategy:object or class defining how nodes are sampled at each round for training, and how
non-responding nodes are managed. Defaults to None:
- use [`DefaultStrategy`][fedbiomed.researcher.strategies.DefaultStrategy] if training_data is
initialized
- else strategy is None (cannot be initialized), experiment cannot be launched yet
round_limit: the maximum number of training rounds (nodes <-> central server) that should be executed for
the experiment. `None` means that no limit is defined. Defaults to None.
training_plan_class: name of the training plan class [`str`][str] or training plan class
(`Type_TrainingPlan`) to use for training.
For experiment to be properly and fully defined `training_plan_class` needs to be:
- a [`str`][str] when `training_plan_class_path` is not None (training plan class comes from a file).
- a `Type_TrainingPlan` when `training_plan_class_path` is None (training plan class passed
as argument).
Defaults to None (no training plan class defined yet)
training_plan_path: path to a file containing training plan code [`str`][str] or None (no file containing
training plan code, `training_plan` needs to be a class matching `Type_TrainingPlan`) Defaults to None.
model_args: contains model arguments passed to the constructor of the training plan when instantiating it :
output and input feature dimension, etc.
training_args: contains training arguments passed to the `training_routine` of the training plan when
launching it: lr, epochs, batch_size...
save_breakpoints: whether to save breakpoints or not after each training round. Breakpoints can be used for
resuming a crashed experiment.
tensorboard: whether to save scalar values for displaying in Tensorboard during training for each node.
Currently, it is only used for loss values.
- If it is true, monitor instantiates a `Monitor` object that write scalar logs into `./runs` directory.
- If it is False, it stops monitoring if it was active.
experimentation_folder: choose a specific name for the folder where experimentation result files and
breakpoints are stored. This should just contain the name for the folder not a path. The name is used
as a subdirectory of `environ[EXPERIMENTS_DIR])`. Defaults to None (auto-choose a folder name)
- Caveat : if using a specific name this experimentation will not be automatically detected as the last
experimentation by `load_breakpoint`
- Caveat : do not use a `experimentation_folder` name finishing with numbers ([0-9]+) as this would
confuse the last experimentation detection heuristic by `load_breakpoint`.
use_secagg: whether to setup a secure aggregation context for this experiment, and use it
to send encrypted updates from nodes to researcher. Defaults to `False`
secagg_timeout: when `use_secagg` is `True`, maximum duration for the setup phase of each
secagg context element (server key and biprime), thus total secagg setup is twice the `timeout`.
Defaults to `environ['TIMEOUT']` if unset or equals 0.
"""
# predefine all class variables, so no need to write try/except
# block each time we use it
self._fds = None
self._node_selection_strategy = None
self._job = None
self._round_limit = None
self._training_plan_path = None
self._reqs = None
self._training_args = None
self._node_selection_strategy = None
self._tags = None
self._monitor = None
self._experimentation_folder = None
self.aggregator_args = {}
self._aggregator = None
self._global_model = None
self._client_correction_states_dict = {}
self._client_states_dict = {}
self._server_state = None
# training_data: Union[FederatedDataSet, dict, None] = None,
# aggregator: Union[Aggregator, Type[Aggregator], None] = None,
# node_selection_strategy: Union[Strategy, Type[Strategy], None] = None,
# model_class: Union[Type_TrainingPlan, str, None] = None,
# model_args: dict = {},
# training_args: dict = {},
# save_breakpoints: bool = False,
# tensorboard: bool = False,
# experimentation_folder: Union[str, None] = None
self._use_secagg = False
self._secagg_servkey = None
self._secagg_biprime = None
# use_secagg: bool = False,
# secagg_timeout: float = 0
# set self._tags and self._nodes
self.set_tags(tags)
self.set_nodes(nodes)
# set self._model_args and self._training_args to dict
self.set_model_args(model_args)
self.set_training_args(training_args)
# Useless to add a param and setter/getter for Requests() as it is a singleton ?
self._reqs = Requests()
# set self._fds: type Union[FederatedDataSet, None]
self.set_training_data(training_data, True)
# set self._aggregator : type Aggregator
self.set_aggregator(aggregator)
# set self._node_selection_strategy: type Union[Strategy, None]
self.set_strategy(node_selection_strategy)
# "current" means number of rounds already trained
self._set_round_current(0)
self.set_round_limit(round_limit)
# set self._experimentation_folder: type str
self.set_experimentation_folder(experimentation_folder)
# Note: currently keep this parameter as it cannot be updated in Job()
# without refactoring Job() first
# sets self._training_plan_is_defined: bool == is the training plan properly defined ?
# with current version of jobs, a correctly defined model requires:
# - either training_plan_path to None + training_plan_class is the class a training plan
# - or training_plan_path not None + training_plan_class is a name (str) of a training plan
#
# note: no need to set self._training_plan_is_defined before calling `set_training_plan_class`
self.set_training_plan_class(training_plan_class)
self.set_training_plan_path(training_plan_path)
# set self._job to Union[Job, None]
self.set_job()
# TODO: rewrite after experiment results refactoring
self._aggregated_params = {}
self.set_save_breakpoints(save_breakpoints)
# always create a monitoring process
self._monitor = Monitor()
self._reqs.add_monitor_callback(self._monitor.on_message_handler)
self.set_tensorboard(tensorboard)
self.set_use_secagg(use_secagg, secagg_timeout)
Attributes
aggregator_args instance-attribute
aggregator_args = {}
Functions
aggregated_params()
aggregated_params()
Retrieves all aggregated parameters of each round of training
Returns:
Type | Description |
---|---|
dict | Dictionary of aggregated parameters; keys are the training rounds |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def aggregated_params(self) -> dict:
"""Retrieves all aggregated parameters of each round of training
Returns:
Dictionary of aggregated parameters keys stand for each round of training
"""
return self._aggregated_params
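For example, after training has run, the parameters aggregated at the last round can be read back as sketched below (assuming `exp` has completed at least one round; the `params`/`params_path` keys follow the structure written by `run_once`):

```python
history = exp.aggregated_params()
last_round = max(history)                           # rounds are the dictionary keys
final_params = history[last_round]['params']        # aggregated model parameters
params_file = history[last_round]['params_path']    # file where they were saved
```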
aggregator()
aggregator()
Retrieves aggregator class that will be used for aggregating model parameters.
To set or update aggregator: set_aggregator.
Returns:
Type | Description |
---|---|
Aggregator | A class or an object that is an instance of Aggregator |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def aggregator(self) -> Aggregator:
""" Retrieves aggregator class that will be used for aggregating model parameters.
To set or update aggregator: [`set_aggregator`][fedbiomed.researcher.experiment.Experiment.set_aggregator].
Returns:
A class or an object that is an instance of [Aggregator][fedbiomed.researcher.aggregators.Aggregator]
"""
return self._aggregator
breakpoint()
breakpoint()
Saves a breakpoint with the state of the training at the current round. The following Experiment attributes will be saved:
- round_current
- round_limit
- tags
- experimentation_folder
- aggregator
- node_selection_strategy
- training_data
- training_args
- model_args
- training_plan_path
- training_plan_class
- aggregated_params
- job (attributes returned by the Job, aka job state)
- use_secagg
- secagg_servkey
- secagg_biprime
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | experiment not fully defined, experiment did not run any round yet, or error when saving breakpoint |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def breakpoint(self) -> None:
"""
Saves breakpoint with the state of the training at a current round. The following Experiment attributes will
be saved:
- round_current
- round_limit
- tags
- experimentation_folder
- aggregator
- node_selection_strategy
- training_data
- training_args
- model_args
- training_plan_path
- training_plan_class
- aggregated_params
- job (attributes returned by the Job, aka job state)
- use_secagg
- secagg_servkey
- secagg_biprime
Raises:
FedbiomedExperimentError: experiment not fully defined, experiment did not run any round yet, or error when
saving breakpoint
"""
# at this point, we run the constructor so all object variables are defined
# check pre-requisistes for saving a breakpoint
#
# need to have run at least 1 round to save a breakpoint
if self._round_current < 1:
msg = ErrorNumbers.FB413.value + \
' - need to run at least 1 round before saving a breakpoint'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
elif self._fds is None:
msg = ErrorNumbers.FB413.value + \
' - need to define `training_data` for saving a breakpoint'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
elif self._node_selection_strategy is None:
msg = ErrorNumbers.FB413.value + \
' - need to define `strategy` for saving a breakpoint'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
elif self._job is None:
msg = ErrorNumbers.FB413.value + \
' - need to define `job` for saving a breakpoint'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
# conditions are met, save breakpoint
breakpoint_path, breakpoint_file_name = \
choose_bkpt_file(self._experimentation_folder, self._round_current - 1)
# prepare secagg contexts for saving
if isinstance(self._secagg_servkey, SecaggContext):
secagg_servkey = self._secagg_servkey.save_state()
else:
secagg_servkey = None
if isinstance(self._secagg_biprime, SecaggContext):
secagg_biprime = self._secagg_biprime.save_state()
else:
secagg_biprime = None
state = {
'training_data': self._fds.data(),
'training_args': self._training_args.dict(),
'model_args': self._model_args,
'training_plan_path': self._job.training_plan_file,  # in Job, the model is always saved to a file
# with current version
'training_plan_class': self._job.training_plan_name, # not always available properly
# formatted in Experiment with current version
'round_current': self._round_current,
'round_limit': self._round_limit,
'experimentation_folder': self._experimentation_folder,
'aggregator': self._aggregator.save_state(self._job.training_plan, breakpoint_path, global_model=self._global_model), # aggregator state
'node_selection_strategy': self._node_selection_strategy.save_state(),
# strategy state
'tags': self._tags,
'aggregated_params': self._save_aggregated_params(
self._aggregated_params, breakpoint_path),
'job': self._job.save_state(breakpoint_path), # job state
'use_secagg': self._use_secagg,
'secagg_servkey': secagg_servkey,
'secagg_biprime': secagg_biprime
}
# rewrite paths in breakpoint : use the links in breakpoint directory
state['training_plan_path'] = create_unique_link(
breakpoint_path,
# - Need a file with a restricted characters set in name to be able to import as module
'model_' + str("{:04d}".format(self._round_current - 1)), '.py',
# - Prefer relative path, eg for using experiment result after
# experiment in a different tree
os.path.join('..', os.path.basename(state["training_plan_path"]))
)
# save state into a json file.
breakpoint_file_path = os.path.join(breakpoint_path, breakpoint_file_name)
try:
with open(breakpoint_file_path, 'w') as bkpt:
json.dump(state, bkpt)
logger.info(f"breakpoint for round {self._round_current - 1} saved at " +
os.path.dirname(breakpoint_file_path))
except (OSError, ValueError, TypeError, RecursionError) as e:
# - OSError: heuristic for catching open() and write() errors
# - see json.dump() documentation for documented errors for this call
msg = ErrorNumbers.FB413.value + f' - save failed with message {str(e)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
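Breakpoints are written automatically after each round when `save_breakpoints` is enabled; the method can also be called explicitly. A sketch assuming `exp` is fully defined:

```python
exp.set_save_breakpoints(True)   # write a breakpoint after every following round
exp.run_once()                   # trains one round, then saves a breakpoint

# a breakpoint can also be written explicitly, once at least one round has run
exp.breakpoint()
```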
check_training_plan_status()
check_training_plan_status()
Method for checking training plan status, i.e. whether it is approved or not by the nodes
Returns:
Type | Description |
---|---|
Responses | Training plan status for answering nodes |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad argument type |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def check_training_plan_status(self) -> Responses:
""" Method for checking training plan status, ie whether it is approved or not by the nodes
Returns:
Training plan status for answering nodes
Raises:
FedbiomedExperimentError: bad argument type
"""
# at this point, self._job exists (initialized in constructor)
if self._job is None:
# cannot check training plan status if job not defined
msg = ErrorNumbers.FB412.value + \
', in method `check_training_plan_status` : no `job` defined for experiment'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
# always returns a `Responses()` object
responses = self._job.check_training_plan_is_approved_by_nodes()
return responses
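A sketch of checking approval on the nodes before launching training (assuming `exp` is fully defined, i.e. its `job` exists):

```python
responses = exp.check_training_plan_status()
# `responses` holds one reply per answering node with its approval status
print(responses)
```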
experimentation_folder()
experimentation_folder()
Retrieves the folder name where experiment data/result are saved.
Please see also set_experimentation_folder.
Returns:
Type | Description |
---|---|
str | File name where experiment related files are saved |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def experimentation_folder(self) -> str:
"""Retrieves the folder name where experiment data/result are saved.
Please see also[`set_experimentation_folder`]
[fedbiomed.researcher.experiment.Experiment.set_experimentation_folder]
Returns:
File name where experiment related files are saved
"""
return self._experimentation_folder
experimentation_path()
experimentation_path()
Retrieves the file path where experimentation folder is located and experiment related files are saved.
Returns:
Type | Description |
---|---|
str | Experiment directory where all experiment related files are saved |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def experimentation_path(self) -> str:
"""Retrieves the file path where experimentation folder is located and experiment related files are saved.
Returns:
Experiment directory where all experiment related files are saved
"""
return os.path.join(environ['EXPERIMENTS_DIR'], self._experimentation_folder)
info()
info()
Prints out the information about the current status of the experiment.
Lists all the parameters/arguments of the experiment and informs whether the experiment can be run.
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | Inconsistent experiment due to missing variables |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def info(self) -> None:
"""Prints out the information about the current status of the experiment.
Lists all the parameters/arguments of the experiment and informs whether the experiment can be run.
Raises:
FedbiomedExperimentError: Inconsistent experiment due to missing variables
"""
# at this point all attributes are initialized (in constructor)
info = {
'Arguments': [
'Tags',
'Nodes filter',
'Training Data',
'Aggregator',
'Strategy',
'Job',
'Training Plan Path',
'Training Plan Class',
'Model Arguments',
'Training Arguments',
'Rounds already run',
'Rounds total',
'Experiment folder',
'Experiment Path',
'Breakpoint State',
'Secure Aggregation'
],
# max 60 characters per column for values - can we do that with tabulate() ?
'Values': ['\n'.join(findall('.{1,60}',
str(e))) for e in [
self._tags,
self._nodes,
self._fds,
self._aggregator.aggregator_name if self._aggregator is not None else None,
self._node_selection_strategy,
self._job,
self._training_plan_path,
self._training_plan_class,
self._model_args,
self._training_args,
self._round_current,
self._round_limit,
self._experimentation_folder,
self.experimentation_path(),
self._save_breakpoints,
f'- Using: {self._use_secagg}\n- Server key context: {self._secagg_servkey}\n' \
f'- Biprime context: {self._secagg_biprime}'
]
]
}
print(tabulate(info, headers='keys'))
# definitions that may be missing for running the experiment
# (value None == not defined yet for _fds et _job,
# False == no valid model for _training_plan_is_defined )
may_be_missing = {
'_fds': 'Training Data',
'_node_selection_strategy': 'Strategy',
'_training_plan_is_defined': 'Training Plan',
'_job': 'Job'
}
# definitions found missing
missing = ''
for key, value in may_be_missing.items():
try:
if eval('self.' + key) is None or eval('self.' + key) is False:
missing += f'- {value}\n'
except Exception:
# should not happen, all eval variables should be defined
msg = ErrorNumbers.FB400.value + \
f', in method `info` : self.{key} not defined for experiment'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
if missing:
print(f'\nExperiment cannot be run (not fully defined), missing :\n{missing}')
else:
print('\nExperiment can be run now (fully defined)')
job()
job()
Retrieves the Job that manages training rounds.
Returns:
Type | Description |
---|---|
Union[Job, None] | Initialized Job object. None if it isn't declared yet or there is no information to set the job. Please see set_job. |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def job(self) -> Union[Job, None]:
"""Retrieves the [`Job`][fedbiomed.researcher.job] that manages training rounds.
Returns:
Initialized `Job` object. None, if it isn't declared yet or not information to set to job. Please see
[`set_job`][fedbiomed.researcher.experiment.Experiment.set_job].
"""
return self._job
load_breakpoint(breakpoint_folder_path=None)
classmethod
load_breakpoint(breakpoint_folder_path=None)
Loads a breakpoint (provided one has been saved) so the experiment can be resumed. Useful if training crashed on the researcher side or if the user wants to resume the experiment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cls | Type[_E] | Experiment class | required |
breakpoint_folder_path | Union[str, None] | path of the breakpoint folder. Path can be absolute or relative eg: "var/experiments/Experiment_xxxx/breakpoints_xxxx". If None, loads latest breakpoint of the latest experiment. Defaults to None. | None |
Returns:
Type | Description |
---|---|
_E | Reinitialized experiment object. With the returned object, the user can then use the `.run()` method to pursue model training. |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad argument type, error when reading breakpoint or bad loaded breakpoint content (corrupted) |
Source code in fedbiomed/researcher/experiment.py
@classmethod
@exp_exceptions
def load_breakpoint(cls: Type[_E],
breakpoint_folder_path: Union[str, None] = None) -> _E:
"""
Loads breakpoint (provided a breakpoint has been saved)
so the experiment can be resumed. Useful if training has crashed
researcher side or if user wants to resume experiment.
Args:
cls: Experiment class
breakpoint_folder_path: path of the breakpoint folder. Path can be absolute or relative eg:
"var/experiments/Experiment_xxxx/breakpoints_xxxx". If None, loads latest breakpoint of the latest
experiment. Defaults to None.
Returns:
Reinitialized experiment object. With given object, user can then use `.run()` method to pursue model
training.
Raises:
FedbiomedExperimentError: bad argument type, error when reading breakpoint or bad loaded breakpoint
content (corrupted)
"""
# check parameters type
if not isinstance(breakpoint_folder_path, str) and breakpoint_folder_path is not None:
msg = ErrorNumbers.FB413.value + ' - load failed, ' + \
f'`breakpoint_folder_path` has bad type {type(breakpoint_folder_path)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
# get breakpoint folder path (if it is None) and state file
breakpoint_folder_path, state_file = find_breakpoint_path(breakpoint_folder_path)
breakpoint_folder_path = os.path.abspath(breakpoint_folder_path)
try:
with open(os.path.join(breakpoint_folder_path, state_file), "r") as f:
saved_state = json.load(f)
except (json.JSONDecodeError, OSError) as e:
# OSError: heuristic for catching file access issues
msg = ErrorNumbers.FB413.value + ' - load failed, ' + \
f'reading breakpoint file failed with message {str(e)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
if not isinstance(saved_state, dict):
msg = ErrorNumbers.FB413.value + ' - load failed, ' + \
f'breakpoint file seems corrupted. Type should be `dict` not {type(saved_state)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
# retrieve breakpoint training data
bkpt_fds = saved_state.get('training_data')
# keeping bkpt_fds a dict so that FederatedDataSet will be instantiated
# in Experiment.__init__() applying some type checks.
# More checks to verify the structure/content of saved_state.get('training_data')
# should be added in FederatedDataSet.__init__() when refactoring it
bkpt_fds = FederatedDataSet(bkpt_fds)
# retrieve breakpoint sampling strategy
bkpt_sampling_strategy_args = saved_state.get("node_selection_strategy")
bkpt_sampling_strategy = cls._create_object(bkpt_sampling_strategy_args, data=bkpt_fds)
# initializing experiment
loaded_exp = cls(tags=saved_state.get('tags'),
nodes=None, # list of previous nodes is contained in training_data
training_data=bkpt_fds,
# aggregator=bkpt_aggregator,
node_selection_strategy=bkpt_sampling_strategy,
round_limit=saved_state.get("round_limit"),
training_plan_class=saved_state.get("training_plan_class"),
training_plan_path=saved_state.get("training_plan_path"),
model_args=saved_state.get("model_args"),
training_args=saved_state.get("training_args"),
save_breakpoints=True,
experimentation_folder=saved_state.get('experimentation_folder')
)
# nota: we are initializing experiment with no aggregator: hence, by default,
# `loaded_exp` will be loaded with FedAverage.
# changing `Experiment` attributes
loaded_exp._set_round_current(saved_state.get('round_current'))
# TODO: checks when loading parameters
training_plan = loaded_exp.training_plan()
if training_plan is None:
msg = ErrorNumbers.FB413.value + ' - load failed, ' + \
'breakpoint file seems corrupted, `training_plan` is None'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
else:
loaded_exp._aggregated_params = loaded_exp._load_aggregated_params(
saved_state.get('aggregated_params'),
training_plan.load
)
# retrieve and change federator
bkpt_aggregator_args = saved_state.get("aggregator")
bkpt_aggregator = loaded_exp._create_object(bkpt_aggregator_args, training_plan= training_plan)
loaded_exp.set_aggregator(bkpt_aggregator)
# changing `Job` attributes
loaded_exp._job.load_state(saved_state.get('job'))
# nota: exceptions should be handled in Job, when refactoring it
# changing secagg attributes
bkpt_secagg_servkey_args = saved_state.get("secagg_servkey")
if bkpt_secagg_servkey_args:
loaded_exp._secagg_servkey = cls._create_object(
bkpt_secagg_servkey_args,
parties = bkpt_secagg_servkey_args['parties'],
job_id = bkpt_secagg_servkey_args['job_id']
)
bkpt_secagg_biprime_args = saved_state.get("secagg_biprime")
if bkpt_secagg_biprime_args:
loaded_exp._secagg_biprime = cls._create_object(
bkpt_secagg_biprime_args,
parties = bkpt_secagg_biprime_args['parties']
)
loaded_exp._use_secagg = saved_state.get('use_secagg')
logger.info(f"Experimentation reload from {breakpoint_folder_path} successful!")
return loaded_exp
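A resume sketch; the breakpoint folder path below is illustrative, and passing `None` loads the latest breakpoint of the latest experiment:

```python
from fedbiomed.researcher.experiment import Experiment

# illustrative relative path; use None to load the latest breakpoint automatically
loaded_exp = Experiment.load_breakpoint(
    'var/experiments/Experiment_0001/breakpoints_0002')
loaded_exp.run(rounds=2)   # continue training from where the experiment stopped
```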
model_args()
model_args()
Retrieves model arguments.
Please see also set_model_args
Returns:
Type | Description |
---|---|
dict | The arguments that are going to be passed to the training plan class at build time on the node side. |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def model_args(self) -> dict:
"""Retrieves model arguments.
Please see also [`set_model_args`][fedbiomed.researcher.experiment.Experiment.set_model_args]
Returns:
The arguments that are going to be passed to [`training_plans`][fedbiomed.common.training_plans]
classes in built time on the node side.
"""
return self._model_args
monitor()
monitor()
Retrieves the monitor object.
Monitor is responsible for receiving and parsing real-time training and validation feedback from each node participating in the federated training. See Monitor
Returns:
Type | Description |
---|---|
Monitor | Monitor object that always exists with the experiment to retrieve feedback from the nodes. |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def monitor(self) -> Monitor:
"""Retrieves the monitor object
Monitor is responsible for receiving and parsing real-time training and validation feed-back from each node
participate to federated training. See [`Monitor`][fedbiomed.researcher.monitor.Monitor]
Returns:
Monitor object that will always exist with experiment to retrieve feed-back from the nodes.
"""
return self._monitor
nodes()
nodes()
Retrieves the nodes that are chosen for federated training.
Please see set_nodes to set nodes.
Returns:
Type | Description |
---|---|
Union[List[str], None] | List of node IDs involved in the experiment. None if nodes are not set. |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def nodes(self) -> Union[List[str], None]:
"""Retrieves the `nodes` that are chosen for federated training.
Please see [`set_nodes`][fedbiomed.researcher.experiment.Experiment.set_nodes] to set `nodes`.
Returns:
Object that contains meta-data for the datasets of each node. `None` if nodes are not set.
"""
return self._nodes
round_current()
round_current()
Retrieves the round where the experiment is at.
Returns:
Type | Description |
---|---|
int | Indicates the round number that the experiment will perform next. |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def round_current(self) -> int:
"""Retrieves the round where the experiment is at.
Returns:
Indicates the round number that the experiment will perform next.
"""
return self._round_current
round_limit()
round_limit()
Retrieves the round limit from the experiment object.
Please see also set_round_limit to change or set round limit.
Returns:
Type | Description |
---|---|
Union[int, None] | Round limit that shows maximum number of rounds that can be performed. |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def round_limit(self) -> Union[int, None]:
"""Retrieves the round limit from the experiment object.
Please see also [`set_round_limit`][fedbiomed.researcher.experiment.Experiment.set_round_limit] to change
or set round limit.
Returns:
Round limit that shows maximum number of rounds that can be performed. `None` if it isn't declared yet.
"""
return self._round_limit
run(rounds=None, increase=False)
run(rounds=None, increase=False)
Run one or more rounds of an experiment, continuing from the point the experiment had reached.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
rounds | Union[int, None] | Number of experiment rounds to run in this call. None means run all remaining rounds (round_limit minus round_current); it does nothing and issues a warning if round_limit is None. An int >= 1 means run at most that many rounds, capped by round_limit unless increase is True. Defaults to None. | None |
increase | bool | automatically increase the round_limit of the experiment for executing the specified number of rounds. Does nothing if round_limit is None or rounds is None. Defaults to False. | False |
Returns:
Type | Description |
---|---|
int | Number of rounds that have been run |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad argument type or value |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def run(self, rounds: Union[int, None] = None, increase: bool = False) -> int:
"""Run one or more rounds of an experiment, continuing from the point the
experiment had reached.
Args:
rounds: Number of experiment rounds to run in this call.
* `None` means "run all the rounds remaining in the experiment" computed as
maximum rounds (`round_limit` for this experiment) minus the number of
rounds already run rounds (`round_current` for this experiment).
It does nothing and issues a warning if `round_limit` is `None` (no
round limit defined for the experiment)
* `int` >= 1 means "run at most `rounds` rounds".
If `round_limit` is `None` for the experiment, run exactly `rounds` rounds.
If a `round_limit` is set for the experiment and the number or rounds would
increase beyond the `round_limit` of the experiment:
- if `increase` is True, increase the `round_limit` to
(`round_current` + `rounds`) and run `rounds` rounds
- if `increase` is False, run (`round_limit` - `round_current`)
rounds, don't modify the maximum `round_limit` of the experiment
and issue a warning.
increase: automatically increase the `round_limit`
of the experiment for executing the specified number of `rounds`.
Does nothing if `round_limit` is `None` or `rounds` is None.
Defaults to False
Returns:
Number of rounds have been run
Raises:
FedbiomedExperimentError: bad argument type or value
"""
# check rounds is a >=1 integer or None
if rounds is None:
pass
elif isinstance(rounds, int):
if rounds < 1:
msg = ErrorNumbers.FB410.value + \
f', in method `run` param `rounds` : value {rounds}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
else:
# bad type
msg = ErrorNumbers.FB410.value + \
f', in method `run` param `rounds` : type {type(rounds)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
# check increase is a boolean
if not isinstance(increase, bool):
msg = ErrorNumbers.FB410.value + \
f', in method `run` param `increase` : type {type(increase)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
# compute number of rounds to run + updated rounds limit
if rounds is None:
if isinstance(self._round_limit, int):
# run all remaining rounds in the experiment
rounds = self._round_limit - self._round_current
if rounds == 0:
# limit already reached
logger.warning(f'Round limit of {self._round_limit} already reached '
'for this experiment, do nothing.')
return 0
else:
# cannot run if no number of rounds given and no round limit exists
logger.warning('Cannot run, please specify a number of `rounds` to run or '
'set a `round_limit` to the experiment')
return 0
else:
# at this point, rounds is an int >= 1
if isinstance(self._round_limit, int):
if (self._round_current + rounds) > self._round_limit:
if increase:
# dont change rounds, but extend self._round_limit as necessary
logger.debug(f'Auto increasing total rounds for experiment from {self._round_limit} '
f'to {self._round_current + rounds}')
self._round_limit = self._round_current + rounds
else:
new_rounds = self._round_limit - self._round_current
if new_rounds == 0:
# limit already reached
logger.warning(f'Round limit of {self._round_limit} already reached '
'for this experiment, do nothing.')
return 0
else:
# reduce the number of rounds to run in the experiment
logger.warning(f'Limit of {self._round_limit} rounds for the experiment '
f'will be reached, reducing the number of rounds for this '
f'run from {rounds} to {new_rounds}')
rounds = new_rounds
# At this point `rounds` is an int > 0 (not None)
# run the rounds
for _ in range(rounds):
if isinstance(self._round_limit, int) and self._round_current == (self._round_limit - 1) \
and self._training_args['test_on_global_updates'] is True:
# Do "validation after a round" only if this a round limit is defined and we reached it
# and validation is active on global params
# When this condition is met, it also means we are running the last of
# the `rounds` rounds in this function
test_after = True
else:
test_after = False
increment = self.run_once(increase=False, test_after=test_after)
if increment == 0:
# should not happen
msg = ErrorNumbers.FB400.value + \
f', in method `run` method `run_once` returns {increment}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
return rounds
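A sketch of the different calling modes (assuming `exp` was created with `round_limit=3` and has not run any round yet):

```python
exp.run()                         # run all remaining rounds up to round_limit (3)
exp.run(rounds=2)                 # round_limit already reached: warns and runs nothing
exp.run(rounds=2, increase=True)  # raises round_limit to 5 and runs 2 more rounds
```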
run_once(increase=False, test_after=False)
run_once(increase=False, test_after=False)
Run at most one round of an experiment, continuing from the point the experiment had reached.
If round_limit is None for the experiment (no round limit defined), run one round. If round_limit is not None and the round_limit of the experiment is already reached:
- if increase is False, do nothing and issue a warning
- if increase is True, increment the total number of rounds (round_limit) and run one round
Parameters:
Name | Type | Description | Default |
---|---|---|---|
increase | bool | automatically increase the round_limit of the experiment if needed. Does nothing if round_limit is None. Defaults to False. | False |
test_after | bool | if True, do a second request to the nodes after the round, only for validation on aggregated params. Intended to be used after the last training round of an experiment. Defaults to False. | False |
Returns:
Type | Description |
---|---|
int | Number of rounds really run |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad argument type or value |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def run_once(self, increase: bool = False, test_after: bool = False) -> int:
"""Run at most one round of an experiment, continuing from the point the
experiment had reached.
If `round_limit` is `None` for the experiment (no round limit defined), run one round.
If `round_limit` is not `None` and the `round_limit` of the experiment is already reached:
* if `increase` is False, do nothing and issue a warning
* if `increase` is True, increment total number of round `round_limit` and run one round
Args:
increase: automatically increase the `round_limit` of the experiment if needed. Does nothing if
`round_limit` is `None`. Defaults to False
test_after: if True, do a second request to the nodes after the round, only for validation on aggregated
params. Intended to be used after the last training round of an experiment. Defaults to False.
Returns:
Number of rounds really run
Raises:
FedbiomedExperimentError: bad argument type or value
"""
# check increase is a boolean
if not isinstance(increase, bool):
msg = ErrorNumbers.FB410.value + \
f', in method `run_once` param `increase` : type {type(increase)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
# nota: we should never have self._round_current > self._round_limit, only ==
if self._round_limit is not None and self._round_current >= self._round_limit:
if increase is True:
logger.debug(f'Auto increasing total rounds for experiment from {self._round_limit} '
f'to {self._round_current + 1}')
self._round_limit = self._round_current + 1
else:
logger.warning(f'Round limit of {self._round_limit} was reached, do nothing')
return 0
# at this point, self._aggregator always exists and is not None
# self.{_node_selection_strategy,_job} exist but may be None
# check pre-requisites are met for running a round
# for component in (self._node_selection_strategy, self._job):
if self._node_selection_strategy is None:
msg = ErrorNumbers.FB411.value + ', missing `node_selection_strategy`'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
elif self._job is None:
msg = ErrorNumbers.FB411.value + ', missing `job`'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
# Ready to execute a training round using the job, strategy and aggregator
if self._global_model is None:
self._global_model = self._job.training_plan.get_model_params() # initial server state, before optimization/aggregation
self._aggregator.set_training_plan_type(self._job.training_plan.type())
# Sample nodes using strategy (if given)
self._job.nodes = self._node_selection_strategy.sample_nodes(self._round_current)
# check aggregator parameter(s) before starting a round
self._aggregator.check_values(n_updates=self._training_args.get('num_updates'),
training_plan=self._job.training_plan)
logger.info('Sampled nodes in round ' + str(self._round_current) + ' ' + str(self._job.nodes))
aggr_args_thr_msg, aggr_args_thr_file = self._aggregator.create_aggregator_args(self._global_model,
self._job._nodes)
# Trigger training round on sampled nodes
_ = self._job.start_nodes_training_round(round=self._round_current,
aggregator_args_thr_msg=aggr_args_thr_msg,
aggregator_args_thr_files=aggr_args_thr_file,
do_training=True)
# refining/normalizing model weights received from nodes
model_params, weights = self._node_selection_strategy.refine(
self._job.training_replies[self._round_current], self._round_current)
self._aggregator.set_fds(self._fds)
# aggregate models from nodes to a global model
aggregated_params = self._aggregator.aggregate(model_params,
weights,
global_model = self._global_model,
training_plan=self._job.training_plan,
training_replies=self._job.training_replies,
node_ids=self._job.nodes,
n_updates=self._training_args.get('num_updates'),
n_round=self._round_current)
# write results of the aggregated model in a temp file
self._global_model = aggregated_params # update global model
aggregated_params_path, _ = self._job.update_parameters(aggregated_params)
logger.info(f'Saved aggregated params for round {self._round_current} '
f'in {aggregated_params_path}')
self._aggregated_params[self._round_current] = {'params': aggregated_params,
'params_path': aggregated_params_path}
self._round_current += 1
# Update round in monitor for the next round
self._monitor.set_round(round_=self._round_current + 1)
if self._save_breakpoints:
self.breakpoint()
# do final validation after saving breakpoint :
# not saved in breakpoint for current round, but more simple
if test_after:
# FIXME: should we sample nodes here too?
aggr_args_thr_msg, aggr_args_thr_file = self._aggregator.create_aggregator_args(self._global_model,
self._job._nodes)
self._job.start_nodes_training_round(round=self._round_current,
aggregator_args_thr_msg=aggr_args_thr_msg,
aggregator_args_thr_files=aggr_args_thr_file,
do_training=False)
return 1
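A sketch of stepping through rounds one at a time, e.g. to inspect intermediate results between rounds:

```python
for _ in range(3):
    if exp.run_once(increase=True) == 0:   # increase=True extends round_limit if needed
        break
    print(f"finished round {exp.round_current() - 1}",
          list(exp.aggregated_params().keys()))
```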
save_breakpoints()
save_breakpoints()
Retrieves the status of saving breakpoint after each round of training.
Returns:
Type | Description |
---|---|
bool | `True` if saving breakpoints is active, `False` otherwise. |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def save_breakpoints(self) -> bool:
"""Retrieves the status of saving breakpoint after each round of training.
Returns:
`True`, If saving breakpoint is active. `False`, vice versa.
"""
return self._save_breakpoints
secagg_context()
secagg_context()
Retrieves the secure aggregation context of the experiment.
Returns:
Type | Description |
---|---|
Tuple[Union[SecaggServkeyContext, None], Union[SecaggBiprimeContext, None]] | a tuple of the server key secagg component (or None if it doesn't exist), and the biprime secagg component (or None if it doesn't exist). |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def secagg_context(self) -> Tuple[Union[SecaggServkeyContext, None], Union[SecaggBiprimeContext, None]]:
"""Retrieves the secure aggregation context of the experiment.
Returns:
a tuple of the server key secagg component (or None if it doesn't exist), and the
biprime secagg component (or None if it doesn't exist).
"""
return self._secagg_servkey, self._secagg_biprime
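For instance, when the experiment was created with `use_secagg=True`, the negotiated context elements can be inspected as follows (a sketch):

```python
# assuming `exp` was created with use_secagg=True
servkey, biprime = exp.secagg_context()
if servkey is not None and biprime is not None:
    print("secure aggregation context is ready")
```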
set_aggregator(aggregator)
set_aggregator(aggregator)
Sets aggregator + verification on arguments type
Parameters:
Name | Type | Description | Default |
---|---|---|---|
aggregator | Union[Aggregator, Type[Aggregator], None] | Object or class defining the method for aggregating local updates. Defaults to None (use FedAverage for aggregation). | required |
Returns:
Type | Description |
---|---|
Aggregator | aggregator (Aggregator) |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad aggregator type |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def set_aggregator(self, aggregator: Union[Aggregator, Type[Aggregator], None]) -> \
Aggregator:
"""Sets aggregator + verification on arguments type
Args:
aggregator: Object or class defining the method for aggregating local updates. Default to None
(use `FedAverage` for aggregation)
Returns:
aggregator (Aggregator)
Raises:
FedbiomedExperimentError : bad aggregator type
"""
if aggregator is None:
# default aggregator
self._aggregator = FedAverage()
elif inspect.isclass(aggregator):
# a class is provided, need to instantiate an object
if issubclass(aggregator, Aggregator):
self._aggregator = aggregator()
else:
# bad argument
msg = ErrorNumbers.FB410.value + f' `aggregator` : {aggregator} class'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
elif isinstance(aggregator, Aggregator):
# an object of a proper class is provided, nothing to do
self._aggregator = aggregator
else:
# other bad type or object
msg = ErrorNumbers.FB410.value + f' `aggregator` : {type(aggregator)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
# at this point self._aggregator is (non-None) aggregator object
self.aggregator_args["aggregator_name"] = self._aggregator.aggregator_name
if self._fds is not None:
self._aggregator.set_fds(self._fds)
return self._aggregator
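A sketch of the accepted argument forms, using the `FedAverage` aggregator referenced in this module:

```python
from fedbiomed.researcher.aggregators import FedAverage

exp.set_aggregator(FedAverage())   # an Aggregator instance ...
exp.set_aggregator(FedAverage)     # ... or an Aggregator subclass, instantiated internally
exp.set_aggregator(None)           # None falls back to the default FedAverage
```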
set_experimentation_folder(experimentation_folder)
set_experimentation_folder(experimentation_folder)
Sets experimentation_folder, the folder name where experiment data/result are saved.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
experimentation_folder | Union[str, None] | File name where experiment related files are saved | required |
Returns:
Type | Description |
---|---|
str | experimentation_folder (str) |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad experimentation_folder type |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def set_experimentation_folder(self, experimentation_folder: Union[str, None]) -> str:
"""Sets `experimentation_folder`, the folder name where experiment data/result are saved.
Args:
experimentation_folder: File name where experiment related files are saved
Returns:
experimentation_folder (str)
Raises:
FedbiomedExperimentError : bad `experimentation_folder` type
"""
if experimentation_folder is None:
self._experimentation_folder = create_exp_folder()
elif isinstance(experimentation_folder, str):
sanitized_folder = sanitize_filename(experimentation_folder, platform='auto')
self._experimentation_folder = create_exp_folder(sanitized_folder)
if (sanitized_folder != experimentation_folder):
logger.warning(f'`experimentation_folder` was sanitized from '
f'{experimentation_folder} to {sanitized_folder}')
else:
msg = ErrorNumbers.FB410.value + \
f' `experimentation_folder` : {type(experimentation_folder)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
# at this point self._experimentation_folder is a str valid for a foldername
# _job doesn't always exist at this point
if self._job is not None:
logger.debug('Experimentation folder changed, you may need to update `job`')
return self._experimentation_folder
set_job()
set_job()
Setter for job. It verifies that the prerequisites are met for creating a job attached to this experiment. If they are, it instantiates a job; if not, it returns None.
Returns:
Type | Description |
---|---|
Union[Job, None] | The object that is initialized for creating round jobs. |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def set_job(self) -> Union[Job, None]:
"""Setter for job, it verifies pre-requisites are met for creating a job
attached to this experiment. If yes, instantiate a job ; if no, return None.
Returns:
The object that is initialized for creating round jobs.
"""
# at this point all are defined among:
# self.{_reqs,_fds,_training_plan_is_defined,_training_plan,_training_plan_path,_model_args,_training_args}
# self._experimentation_folder => self.experimentation_path()
# self._round_current
if self._job is not None:
# a job is already defined, and it may also have run some rounds
logger.debug('Experimentation `job` changed after running '
f'{self._round_current} rounds, may give inconsistent results')
# note:
# if self._secagg_servkey != None, then it should be redefined
if self._training_plan_is_defined is not True:
# training plan not properly defined yet
self._job = None
logger.debug('Experiment not fully configured yet: no job. Missing proper training plan '
f'definition (training_plan={self._training_plan_class} '
f'training_plan_path={self._training_plan_path})')
elif self._fds is None:
# not training data yet
self._job = None
logger.debug('Experiment not fully configured yet: no job. Missing training data')
else:
# meeting requisites for instantiating a job
self._job = Job(reqs=self._reqs,
training_plan_class=self._training_plan_class,
training_plan_path=self._training_plan_path,
model_args=self._model_args,
training_args=self._training_args,
data=self._fds,
keep_files_dir=self.experimentation_path())
return self._job
set_model_args(model_args)
set_model_args(model_args)
Sets model_args + verification on arguments type
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_args | dict | contains model arguments passed to the constructor of the training plan when instantiating it : output and input feature dimension, etc. | required |
Returns:
Type | Description |
---|---|
dict | Model arguments that have been set. |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad model_args type |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def set_model_args(self, model_args: dict) -> dict:
"""Sets `model_args` + verification on arguments type
Args:
model_args (dict): contains model arguments passed to the constructor
of the training plan when instantiating it : output and input feature
dimension, etc.
Returns:
Model arguments that have been set.
Raises:
FedbiomedExperimentError : bad model_args type
"""
if isinstance(model_args, dict):
self._model_args = model_args
else:
# bad type
msg = ErrorNumbers.FB410.value + f' `model_args` : {type(model_args)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
# self._model_args always exist at this point
if self._job is not None:
logger.debug('Experimentation model_args changed, you may need to update `job`')
return self._model_args
set_nodes(nodes)
set_nodes(nodes)
Sets nodes + verification on argument type
Parameters:
Name | Type | Description | Default |
---|---|---|---|
nodes | Union[List[str], None] | List of node_ids to filter the nodes to be involved in the experiment. | required |
Returns:
Type | Description |
---|---|
Union[List[str], None] | List of node IDs that are set. None, if the argument `nodes` is None. |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | Bad nodes type |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def set_nodes(self, nodes: Union[List[str], None]) -> Union[List[str], None]:
"""Sets for nodes + verifications on argument type
Args:
nodes: List of node_ids to filter the nodes to be involved in the experiment.
Returns:
List of tags that are set. None, if the argument `nodes` is None.
Raises:
FedbiomedExperimentError : Bad nodes type
"""
if isinstance(nodes, list):
self._nodes = nodes
for node in nodes:
if not isinstance(node, str):
msg = ErrorNumbers.FB410.value + f' `nodes` : list of {type(node)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
elif nodes is None:
self._nodes = nodes
else:
msg = ErrorNumbers.FB410.value + f' `nodes` : {type(nodes)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
# self._nodes always exist at this point
if self._fds is not None:
logger.debug('Experimentation nodes filter changed, you may need to update `training_data`')
return self._nodes
set_round_limit(round_limit)
set_round_limit(round_limit)
Sets round_limit + verification on arguments type
Parameters:
Name | Type | Description | Default |
---|---|---|---|
round_limit | Union[int, None] | the maximum number of training rounds (nodes <-> central server) that should be executed for the experiment. | required |
Returns:
Type | Description |
---|---|
Union[int, None] | Round limit for experiment of federated learning |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad rounds type or value |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def set_round_limit(self, round_limit: Union[int, None]) -> Union[int, None]:
"""Sets `round_limit` + verification on arguments type
Args:
round_limit: the maximum number of training rounds (nodes <-> central server) that should be executed
for the experiment. `None` means that no limit is defined.
Returns:
Round limit for experiment of federated learning
Raises:
FedbiomedExperimentError : bad rounds type or value
"""
# at this point round_current exists and is an int >= 0
if round_limit is None:
# no limit for training rounds
self._round_limit = None
elif isinstance(round_limit, int):
# at this point round_limit is an int
if round_limit < 0:
msg = ErrorNumbers.FB410.value + f' `round_limit` can not be negative: {round_limit}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
elif round_limit < self._round_current:
# self._round_limit can't be less than current round
logger.error(f'cannot set `round_limit` to less than the number of already run rounds '
f'({self._round_current})')
else:
self._round_limit = round_limit
else:
msg = ErrorNumbers.FB410.value + f' `round_limit` : {type(round_limit)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
# at this point self._round_limit is a Union[int, None]
return self._round_limit
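A sketch of adjusting the limit while an experiment progresses (assuming a round limit is already set, so `round_limit()` is not None):

```python
exp.set_round_limit(10)                      # allow up to 10 rounds in total
exp.run()                                    # run the remaining rounds
exp.set_round_limit(exp.round_limit() + 5)   # later, extend the experiment by 5 rounds
exp.run()
```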
set_save_breakpoints(save_breakpoints)
set_save_breakpoints(save_breakpoints)
Setter for save_breakpoints + verification on arguments type
Parameters:
Name | Type | Description | Default |
---|---|---|---|
save_breakpoints | bool | whether to save breakpoints or not after each training round. Breakpoints can be used for resuming a crashed experiment. | required |
Returns:
Type | Description |
---|---|
bool | Status of saving breakpoints |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad save_breakpoints type |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def set_save_breakpoints(self, save_breakpoints: bool) -> bool:
""" Setter for save_breakpoints + verification on arguments type
Args:
save_breakpoints (bool): whether to save breakpoints or
not after each training round. Breakpoints can be used for resuming
a crashed experiment.
Returns:
Status of saving breakpoints
Raises:
FedbiomedExperimentError: bad save_breakpoints type
"""
if isinstance(save_breakpoints, bool):
self._save_breakpoints = save_breakpoints
# no warning if done during experiment, we may change breakpoint policy at any time
else:
msg = ErrorNumbers.FB410.value + f' `save_breakpoints` : {type(save_breakpoints)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
return self._save_breakpoints
set_strategy(node_selection_strategy)
set_strategy(node_selection_strategy)
Sets node_selection_strategy + verification on arguments type
Parameters:
Name | Type | Description | Default |
---|---|---|---|
node_selection_strategy | Union[Strategy, Type[Strategy], None] | object or class defining how nodes are sampled at each round for training, and how non-responding nodes are managed. Defaults to None: use DefaultStrategy if training_data is initialized, else strategy stays None (cannot be initialized) and the experiment cannot be launched yet. | required |
Returns:
Type | Description |
---|---|
Union[Strategy, None] | node selection strategy object, or None if it cannot be initialized yet |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad strategy type |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def set_strategy(self, node_selection_strategy: Union[Strategy, Type[Strategy], None]) -> \
Union[Strategy, None]:
"""Sets for `node_selection_strategy` + verification on arguments type
Args:
node_selection_strategy: object or class defining how nodes are sampled at each round for training, and
how non-responding nodes are managed. Defaults to None:
- use `DefaultStrategy` if training_data is initialized
- else strategy is None (cannot be initialized), experiment cannot
be launched yet
Returns:
node selection strategy class
Raises:
FedbiomedExperimentError : bad strategy type
"""
if self._fds is not None:
if node_selection_strategy is None:
# default node_selection_strategy
self._node_selection_strategy = DefaultStrategy(self._fds)
elif inspect.isclass(node_selection_strategy):
# a class is provided, need to instantiate an object
if issubclass(node_selection_strategy, Strategy):
self._node_selection_strategy = node_selection_strategy(self._fds)
else:
# bad argument
msg = ErrorNumbers.FB410.value + \
f' `node_selection_strategy` : {node_selection_strategy} class'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
elif isinstance(node_selection_strategy, Strategy):
# an object of a proper class is provided, nothing to do
self._node_selection_strategy = node_selection_strategy
else:
# other bad type or object
msg = ErrorNumbers.FB410.value + \
f' `node_selection_strategy` : {type(node_selection_strategy)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
else:
# cannot initialize strategy if not FederatedDataSet yet
self._node_selection_strategy = None
logger.debug('Experiment not fully configured yet: no node selection strategy')
# at this point self._node_selection_strategy is a Union[Strategy, None]
return self._node_selection_strategy
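A minimal usage sketch, assuming exp already holds training data; the import path for DefaultStrategy is an assumption:
from fedbiomed.researcher.strategies.default_strategy import DefaultStrategy
exp.set_strategy(DefaultStrategy)                        # pass a Strategy subclass, instantiated with the federated dataset
exp.set_strategy(DefaultStrategy(exp.training_data()))   # or pass an already instantiated Strategy object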
set_tags(tags)
set_tags(tags)
Sets tags + verification on argument type
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tags | Union[List[str], str, None] | List of string with data tags or string with one data tag. Empty list of tags ([]) means any dataset is accepted, it is different from None (tags not set, cannot search for training_data yet). | required |
Returns:
Type | Description |
---|---|
Union[List[str], None] | List of tags that are set. None, if the argument tags is None. |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | Bad tags type |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def set_tags(self, tags: Union[List[str], str, None]) -> Union[List[str], None]:
"""Sets tags + verifications on argument type
Args:
tags: List of string with data tags or string with one data tag. Empty list
of tags ([]) means any dataset is accepted, it is different from None (tags not set, cannot search
for training_data yet).
Returns:
List of tags that are set. None, if the argument `tags` is None.
Raises:
FedbiomedExperimentError : Bad tags type
"""
if isinstance(tags, list):
for tag in tags:
if not isinstance(tag, str):
msg = ErrorNumbers.FB410.value + f' `tags` : list of {type(tag)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
self._tags = tags
elif isinstance(tags, str):
self._tags = [tags]
elif tags is None:
self._tags = tags
else:
msg = ErrorNumbers.FB410.value + f' `tags` : {type(tags)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
# self._tags always exist at this point
if self._fds is not None:
logger.debug('Experimentation tags changed, you may need to update `training_data`')
return self._tags
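A minimal usage sketch (the tag names are illustrative):
exp.set_tags(['#MNIST', '#dataset'])   # a list of data tags
exp.set_tags('#MNIST')                 # a single string is stored as a one-element list
exp.set_tags([])                       # empty list: any dataset is accepted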
set_tensorboard(tensorboard)
set_tensorboard(tensorboard)
Sets the tensorboard flag
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tensorboard | bool | If True, tensorboard log files will be written after receiving training feedback. | required |
Returns:
Type | Description |
---|---|
bool | Status of tensorboard |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def set_tensorboard(self, tensorboard: bool) -> bool:
"""
Sets the tensorboard flag
Args:
tensorboard: If `True` tensorboard log files will be written after receiving training feedback
Returns:
Status of tensorboard
"""
if isinstance(tensorboard, bool):
self._tensorboard = tensorboard
self._monitor.set_tensorboard(tensorboard)
else:
msg = ErrorNumbers.FB410.value + f' `tensorboard` : {type(tensorboard)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
return self._tensorboard
set_test_metric(metric, metric_args)
set_test_metric(metric, metric_args)
Sets a metric for federated model validation
Parameters:
Name | Type | Description | Default |
---|---|---|---|
metric | Union[MetricTypes, str, None] | An instance of MetricTypes, or a str referring to one of the metrics provided as attributes in MetricTypes. None if it isn't declared yet. | required |
**metric_args | dict | A dictionary that contains arguments for the metric function. Arguments should be compatible with the corresponding metrics in sklearn.metrics. | {} |
Returns:
Type | Description |
---|---|
Tuple[Union[str, None], Dict[str, Any]] | Metric and metric args as tuple |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | Invalid type for metric argument |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def set_test_metric(self, metric: Union[MetricTypes, str, None], **metric_args: dict) -> \
Tuple[Union[str, None], Dict[str, Any]]:
""" Sets a metric for federated model validation
Args:
metric: A class as an instance of [`MetricTypes`][fedbiomed.common.metrics.MetricTypes]. [`str`][str] for
referring one of metric which provided as attributes in [`MetricTypes`]
[fedbiomed.common.metrics.MetricTypes]. None, if it isn't declared yet.
**metric_args: A dictionary that contains arguments for metric function. Arguments
should be compatible with corresponding metrics in [`sklearn.metrics`][sklearn.metrics].
Returns:
Metric and metric args as tuple
Raises:
FedbiomedExperimentError: Invalid type for `metric` argument
"""
self._training_args['test_metric'] = metric
# using **metric_args, we know `test_metric_args` is a Dict[str, Any]
self._training_args['test_metric_args'] = metric_args
if self._job is not None:
# job setter function exists, use it
self._job.training_args = self._training_args
logger.debug('Experimentation training_args updated for `job`')
return metric, metric_args
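A minimal usage sketch; the metric name and keyword argument are illustrative and must match a metric of MetricTypes and the corresponding sklearn.metrics signature:
from fedbiomed.common.metrics import MetricTypes
exp.set_test_metric(MetricTypes.F1_SCORE, average='weighted')   # extra keyword arguments are forwarded to the metric
exp.set_test_metric('ACCURACY')                                 # a metric can also be referred to by name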
set_test_on_global_updates(flag=True)
set_test_on_global_updates(flag=True)
Setter for test_on_global_updates, which indicates whether nodes validate the aggregated (global) model parameters they receive at the beginning of a round, before training the model locally.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flag | bool | whether to perform model validation on global updates. Defaults to True. | True |
Returns:
Type | Description |
---|---|
bool | Value of the flag test_on_global_updates |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad flag type |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def set_test_on_global_updates(self, flag: bool = True) -> bool:
"""
Setter for test_on_global_updates, that indicates whether to perform a validation on the federated model
updates on the node side before training model locally where aggregated model parameters are received.
Args:
flag (bool, optional): whether to perform model validation on global updates. Defaults to True.
Returns:
Value of the flag `test_on_global_updates`.
Raises:
FedbiomedExperimentError : bad flag type
"""
self._training_args['test_on_global_updates'] = flag
if self._job is not None:
# job setter function exists, use it
self._job.training_args = self._training_args
logger.debug('Experimentation training_args updated for `job`')
return self._training_args['test_on_global_updates']
set_test_on_local_updates(flag=True)
set_test_on_local_updates(flag=True)
Setter for test_on_local_updates, which indicates whether nodes validate the model on locally updated parameters at the end of training in each round.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flag | bool | whether to perform model validation on local updates. Defaults to True. | True |
Returns:
Type | Description |
---|---|
bool | Value of the flag test_on_local_updates |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad flag type |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def set_test_on_local_updates(self, flag: bool = True) -> bool:
"""
Setter for `test_on_local_updates`, that indicates whether to perform a validation on the federated model on the
node side where model parameters are updated locally after training in each node.
Args:
flag (bool, optional): whether to perform model validation on local updates. Defaults to True.
Returns:
value of the flag `test_on_local_updates`
Raises:
FedbiomedExperimentError: bad flag type
"""
self._training_args['test_on_local_updates'] = flag
if self._job is not None:
# job setter function exists, use it
self._job.training_args = self._training_args
logger.debug('Experimentation training_args updated for `job`')
return self._training_args['test_on_local_updates']
set_test_ratio(ratio)
set_test_ratio(ratio)
Sets the validation ratio for model validation.
When test_ratio is set, nodes allocate a (1 - test_ratio) fraction of their data for training and the remainder for validating the model. This can be useful for validating the model once every round, as well as for controlling overfitting and early stopping.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ratio | float | validation ratio. Must be within interval [0,1]. | required |
Returns:
Type | Description |
---|---|
float | Validation ratio that is set |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad data type |
FedbiomedExperimentError | ratio is not within interval [0, 1] |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def set_test_ratio(self, ratio: float) -> float:
""" Sets validation ratio for model validation.
When setting test_ratio, nodes will allocate (1 - `test_ratio`) fraction of data for training and the
remaining for validating model. This could be useful for validating the model, once every round, as well as
controlling overfitting, doing early stopping, ....
Args:
ratio: validation ratio. Must be within interval [0,1].
Returns:
Validation ratio that is set
Raises:
FedbiomedExperimentError: bad data type
FedbiomedExperimentError: ratio is not within interval [0, 1]
"""
self._training_args['test_ratio'] = ratio
if self._job is not None:
# job setter function exists, use it
self._job.training_args = self._training_args
logger.debug('Experimentation training_args updated for `job`')
return ratio
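A minimal usage sketch: reserve 20% of each node's data for validation and enable validation on both global and local updates:
exp.set_test_ratio(0.2)
exp.set_test_on_global_updates(True)   # validate aggregated parameters at the beginning of each round
exp.set_test_on_local_updates(True)    # validate locally updated parameters at the end of each round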
set_training_args(training_args, reset=True)
set_training_args(training_args, reset=True)
Sets training_args + verification on arguments type
Parameters:
Name | Type | Description | Default |
---|---|---|---|
training_args | dict | contains training arguments passed to the training_routine of the training plan when launching it: lr, epochs, batch_size... | required |
reset | bool | whether to reset the training_args (if previous training_args has already been set), or to update them with training_args. Defaults to True. | True |
Returns:
Type | Description |
---|---|
dict | Training arguments |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad training_args type |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def set_training_args(self, training_args: dict, reset: bool = True) -> dict:
""" Sets `training_args` + verification on arguments type
Args:
training_args (dict): contains training arguments passed to the `training_routine` of the
[`fedbiomed.common.training_plans`][fedbiomed.common.training_plans] when launching it:
lr, epochs, batch_size...
reset (bool, optional): whether to reset the training_args (if previous training_args has already been
set), or to update them with training_args. Defaults to True.
Returns:
Training arguments
Raises:
FedbiomedExperimentError : bad training_args type
"""
if isinstance(training_args, TrainingArgs):
self._training_args = deepcopy(training_args)
else:
self._training_args = TrainingArgs(training_args, only_required=False)
return self._training_args.dict()
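A minimal usage sketch; the argument names shown are common examples and must match what the training plan's training_routine expects:
exp.set_training_args({'batch_size': 32, 'epochs': 1, 'lr': 1e-3})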
set_training_data(training_data, from_tags=False)
set_training_data(training_data, from_tags=False)
Sets training data for federated training + verification on arguments type
Parameters:
Name | Type | Description | Default |
---|---|---|---|
training_data | Union[FederatedDataSet, dict, None] | a FederatedDataSet object used as-is; or a dict (node ids as keys, lists of dataset dicts as values) from which a FederatedDataSet is created; or None, in which case datasets are searched on the nodes if from_tags is True and tags are set, otherwise training_data stays unset and the experiment cannot be launched yet. | required |
from_tags | bool | If True, query nodes for datasets when no training_data is provided. Not used when training_data is provided. | False |
Returns:
Type | Description |
---|---|
Union[FederatedDataSet, None] | Nodes and dataset with meta-data |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad training_data type |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def set_training_data(
self,
training_data: Union[FederatedDataSet, dict, None],
from_tags: bool = False) -> \
Union[FederatedDataSet, None]:
"""Sets training data for federated training + verification on arguments type
Args:
training_data:
* If it is a FederatedDataSet object, use this value as training_data.
* else if it is a dict, create and use a FederatedDataSet object from the dict
and use this value as training_data. The dict should use node ids as keys,
values being list of dicts (each dict representing a dataset on a node).
* else if it is None (no training data provided)
- if `from_tags` is True and `tags` is not None, set training_data by
searching for datasets with a query to the nodes using `tags` and `nodes`
- if `from_tags` is False or `tags` is None, set training_data to None (no training_data set yet,
experiment is not fully initialized and cannot be launched)
from_tags: Specificities; If True, query nodes for datasets when no `training_data` is provided.
Not used when `training_data` is provided.
Returns:
Nodes and dataset with meta-data
Raises:
FedbiomedExperimentError : bad training_data type
"""
# we can trust _reqs _tags _nodes are existing and properly typed/formatted
if not isinstance(from_tags, bool):
msg = ErrorNumbers.FB410.value + f' `from_tags` : got {type(from_tags)} but expected a boolean'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
# case where no training data are passed
if training_data is None and from_tags is True:
# cannot search for training_data if tags not initialized;
# nodes can be None (no filtering on nodes by default)
if self._tags is not None:
training_data = self._reqs.search(self._tags, self._nodes)
if isinstance(training_data, FederatedDataSet):
self._fds = training_data
elif isinstance(training_data, dict):
# TODO: FederatedDataSet constructor should verify typing and format
self._fds = FederatedDataSet(training_data)
elif training_data is not None:
msg = ErrorNumbers.FB410.value + f' `training_data` has incorrect type: {type(training_data)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
else:
self._fds = None
logger.debug('Experiment not fully configured yet: no training data')
# at this point, self._fds is either None or a FederatedDataSet object
if self._node_selection_strategy is not None:
logger.debug('Training data changed, '
'you may need to update `node_selection_strategy`')
if self._job is not None:
logger.debug('Training data changed, you may need to update `job`')
if self._aggregator is not None:
logger.debug('Training data changed, you may need to update `aggregator`')
if self._secagg_servkey is not None or self._secagg_biprime is not None:
logger.debug('Training data changed, you may need to update `use_secagg`')
return self._fds
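A minimal usage sketch; the node and dataset identifiers are made up for illustration:
exp.set_training_data(None, from_tags=True)                              # query nodes for datasets matching the experiment tags
exp.set_training_data({'node_1234': [{'dataset_id': 'dataset_5678'}]})   # or pass dataset meta-data directly as a dict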
set_training_plan_class(training_plan_class)
set_training_plan_class(training_plan_class)
Sets training_plan_class + verification on arguments type
Parameters:
Name | Type | Description | Default |
---|---|---|---|
training_plan_class | Union[Type_TrainingPlan, str, None] | name of the training plan class (str) or training plan class (Type_TrainingPlan) to use for training. For the experiment to be properly and fully defined, it needs to be a str when training_plan_path is not None (training plan class comes from a file), and a Type_TrainingPlan when training_plan_path is None (training plan class passed as argument). | required |
Returns:
Type | Description |
---|---|
Union[Type_TrainingPlan, str, None] | training_plan_class that is set for the experiment |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad training_plan_class type |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def set_training_plan_class(self, training_plan_class: Union[Type_TrainingPlan, str, None]) -> \
Union[Type_TrainingPlan, str, None]:
"""Sets `training_plan` + verification on arguments type
Args:
training_plan_class: name of the training plan class (`str`) or training plan class as one
of [`TrainingPlans`] [fedbiomed.common.training_plans] to use for training.
For experiment to be properly and fully defined `training_plan_class` needs to be:
- a `str` when `training_plan_path` is not None (training plan class comes from a file).
- a `Type_TrainingPlan` when `training_plan_path` is None (training plan class passed
as argument).
Returns:
`training_plan_class` that is set for experiment
Raises:
FedbiomedExperimentError : bad training_plan_class type
"""
if training_plan_class is None:
self._training_plan_class = None
self._training_plan_is_defined = False
elif isinstance(training_plan_class, str):
if str.isidentifier(training_plan_class):
# correct python identifier
self._training_plan_class = training_plan_class
# training_plan_class_path may not be defined at this point
self._training_plan_is_defined = isinstance(self._training_plan_path, str)
else:
# bad identifier
msg = ErrorNumbers.FB410.value + f' `training_plan_class` : {training_plan_class} bad identifier'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
elif inspect.isclass(training_plan_class):
# training_plan_class must be a subclass of a valid training plan
if issubclass(training_plan_class, training_plans):
# valid class
self._training_plan_class = training_plan_class
# training_plan_class_path may not be defined at this point
self._training_plan_is_defined = self._training_plan_path is None
else:
# bad class
msg = ErrorNumbers.FB410.value + f' `training_plan_class` : {training_plan_class} class'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
else:
# bad type
msg = ErrorNumbers.FB410.value + f' `training_plan_class` of type: {type(training_plan_class)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
# self._training_plan_is_defined and self._training_plan_class always exist at this point
if not self._training_plan_is_defined:
logger.debug(f'Experiment not fully configured yet: no valid training plan, '
f'training_plan_class={self._training_plan_class} '
f'training_plan_class_path={self._training_plan_path}')
if self._job is not None:
logger.debug('Experimentation training_plan changed, you may need to update `job`')
return self._training_plan_class
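A minimal usage sketch, assuming MyTrainingPlan is a training plan class you defined (a subclass of one of the supported training plans):
exp.set_training_plan_class(MyTrainingPlan)     # pass the class itself when it is defined in the current session
exp.set_training_plan_class('MyTrainingPlan')   # or pass its name as a string, together with set_training_plan_path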
set_training_plan_path(training_plan_path)
set_training_plan_path(training_plan_path)
Sets training_plan_path + verification on arguments type.
Training plan path is the path where training plan class is saved as python script/module externally.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
training_plan_path | Union[str, None] | path to a file containing training plan code (str), or None (no file containing training plan code; training_plan_class then needs to be a class matching one of the supported training plans). | required |
Returns:
Type | Description |
---|---|
Union[str, None] | The path that is set for retrieving module where training plan class is defined |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad training_plan_path type |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def set_training_plan_path(self, training_plan_path: Union[str, None]) -> Union[str, None]:
"""Sets `training_plan_path` + verification on arguments type.
Training plan path is the path where training plan class is saved as python script/module externally.
Args:
training_plan_path (Union[str, None]) : path to a file containing training plan code (`str`) or None
(no file containing training plan code, `training_plan` needs to be a class matching one
of [`training_plans`][fedbiomed.common.training_plans]
Returns:
The path that is set for retrieving module where training plan class is defined
Raises:
FedbiomedExperimentError : bad training_plan_path type
"""
# self._training_plan and self._training_plan_is_defined already exist when entering this function
if training_plan_path is None:
self._training_plan_path = None
# .. so training plan is defined if it is a class (+ then, it has been tested as valid)
self._training_plan_is_defined = inspect.isclass(self._training_plan_class)
elif isinstance(training_plan_path, str):
if sanitize_filepath(training_plan_path, platform='auto') == training_plan_path \
and os.path.isfile(training_plan_path):
# provided training plan path is a sane path to an existing file
self._training_plan_path = training_plan_path
# if providing a training plan path, we expect a training plan class name (not a class)
self._training_plan_is_defined = isinstance(self._training_plan_class, str)
else:
# bad filepath
msg = ErrorNumbers.FB410.value + \
f' `training_plan_path` : {training_plan_path} is not a sane path to an existing file'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
else:
# bad type
msg = ErrorNumbers.FB410.value + f' `training_plan_path` must be string, but got type: {type(training_plan_path)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
# self._training_plan_path is also defined at this point
if not self._training_plan_is_defined:
logger.debug(f'Experiment not fully configured yet: no valid training plan, '
f'training_plan={self._training_plan_class} training_plan_path={self._training_plan_path}')
if self._job is not None:
logger.debug('Experimentation training_plan_path changed, you may need to update `job`')
return self._training_plan_path
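A minimal usage sketch; the file path and class name are illustrative:
exp.set_training_plan_path('/path/to/my_training_plan.py')   # file that contains the training plan class
exp.set_training_plan_class('MyTrainingPlan')                # name of the class defined in that file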
set_use_secagg(use_secagg=False, timeout=0)
set_use_secagg(use_secagg=False, timeout=0)
Sets use of secure aggregation and creates a secure aggregation context if it doesn't exist.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
use_secagg | bool | if True, sets secure aggregation to be used for next rounds and establishes a secagg context if it doesn't exist. | False |
timeout | float | maximum duration for the setup phase of each secagg context element (server key and biprime), thus total secagg setup is twice the timeout. Defaults to environ['TIMEOUT'] if unset or equal to 0. | 0 |
Returns:
Type | Description |
---|---|
bool | True if the secure aggregation context could be established |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | Bad argument type |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def set_use_secagg(self, use_secagg: bool = False, timeout: float = 0) -> bool:
"""Sets use of secure aggregation and create secure aggregation context if it doesn't exist.
Args:
use_secagg: if `True` sets secure aggregation to be used for next rounds and
establish secagg context if it doesn't exist
timeout: maximum duration for the setup phase of each secagg context element
(server key and biprime), thus total secagg setup is twice the `timeout`.
Defaults to `environ['TIMEOUT']` if unset or equals 0.
Returns:
`True` if secure aggregation context could be established
Raises:
FedbiomedExperimentError : Bad argument type
"""
if not isinstance(use_secagg, bool):
msg = ErrorNumbers.FB410.value + f' `use_secagg` : {type(use_secagg)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
if isinstance(timeout, int):
timeout = float(timeout) # accept int (and bool...)
if not isinstance(timeout, float):
errmess = f'{ErrorNumbers.FB410.value}: `timeout` : {type(timeout)}'
logger.error(errmess)
raise FedbiomedExperimentError(errmess)
# at this point _fds variable exist
if use_secagg:
# TODO: extend to support non-default strategy with a changing set of selected nodes
# using self._node_selection_strategy.sample_nodes(self._round_current)
if self._fds:
node_parties = self._fds.node_ids()
else:
node_parties = []
parties = [environ['RESEARCHER_ID']] + node_parties
if not self._secagg_servkey:
# a secagg servkey element must be attached to a job_id
if self._job:
self._secagg_servkey = SecaggServkeyContext(parties, self._job.id)
if self._secagg_servkey and not self._secagg_servkey.status():
self._secagg_servkey.setup(timeout)
if not self._secagg_biprime:
self._secagg_biprime = SecaggBiprimeContext(parties)
if not self._secagg_biprime.status():
self._secagg_biprime.setup(timeout)
if self._secagg_servkey and self._secagg_servkey.status() and self._secagg_biprime.status():
self._use_secagg = True
logger.warning("SECURE AGGREGATION NOT IMPLEMENTED YET, DO NOTHING")
else:
logger.debug('Experiment not fully configured yet: no secure aggregation')
else:
self._use_secagg = use_secagg
return self._use_secagg
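A minimal usage sketch (note that, as the source above indicates, secure aggregation is not fully implemented yet):
exp.set_use_secagg(True, timeout=10)   # request a secagg context setup with a 10 second timeout per context element
exp.set_use_secagg(False)              # disable secure aggregation for the next rounds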
strategy()
strategy()
Retrieves the class that represents the node selection strategy.
Please see also set_strategy
to set or update node selection strategy.
Returns:
Type | Description |
---|---|
Union[Strategy, None] | A class or object as an instance of Strategy. None if it is not declared yet, meaning the node selection strategy will be DefaultStrategy. |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def strategy(self) -> Union[Strategy, None]:
"""Retrieves the class that represents the node selection strategy.
Please see also [`set_strategy`][fedbiomed.researcher.experiment.Experiment.set_strategy] to set or update
node selection strategy.
Returns:
A class or object as an instance of [`Strategy`][fedbiomed.researcher.strategies.Strategy]. `None` if
it is not declared yet. It means that node selection strategy will be
[`DefaultStrategy`][fedbiomed.researcher.strategies.DefaultStrategy].
"""
return self._node_selection_strategy
tags()
tags()
Retrieves the tags from the experiment object.
Please see set_tags
to set tags.
Returns:
Type | Description |
---|---|
Union[List[str], None] | List of tags that have been set. None if they aren't declared yet. |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def tags(self) -> Union[List[str], None]:
"""Retrieves the tags from the experiment object.
Please see [`set_tags`][fedbiomed.researcher.experiment.Experiment.set_tags] to set tags.
Returns:
List of tags that have been set. `None` if they aren't declared yet.
"""
return self._tags
test_metric()
test_metric()
Retrieves the metric for validation routine.
Please see also set_test_metric
to change/set test_metric
Returns:
Type | Description |
---|---|
Union[MetricTypes, str, None] | An instance of MetricTypes, or a str referring to one of the metrics provided as attributes in MetricTypes. None if it isn't declared yet. |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def test_metric(self) -> Union[MetricTypes, str, None]:
"""Retrieves the metric for validation routine.
Please see also [`set_test_metric`][fedbiomed.researcher.experiment.Experiment.set_test_metric]
to change/set `test_metric`
Returns:
A class as an instance of [`MetricTypes`][fedbiomed.common.metrics.MetricTypes]. [`str`][str] for referring
one of metric which provided as attributes in [`MetricTypes`][fedbiomed.common.metrics.MetricTypes].
None, if it isn't declared yet.
"""
return self._training_args['test_metric']
test_metric_args()
test_metric_args()
Retrieves the metric argument for the metric function that is going to be used.
Please see also set_test_metric
to change/set test_metric
and get more information on the arguments that can be used.
Returns:
Type | Description |
---|---|
Dict[str, Any] | A dictionary that contains arguments for the metric function. See set_test_metric. |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def test_metric_args(self) -> Dict[str, Any]:
"""Retrieves the metric argument for the metric function that is going to be used.
Please see also [`set_test_metric`][fedbiomed.researcher.experiment.Experiment.set_test_metric] to change/set
`test_metric` and get more information on the arguments that can be used.
Returns:
A dictionary that contains arguments for metric function. See [`set_test_metric`]
[fedbiomed.researcher.experiment.Experiment.set_test_metric]
"""
return self._training_args['test_metric_args']
test_on_global_updates()
test_on_global_updates()
Retrieves the status of whether validation will be performed on globally updated (aggregated) parameters by the nodes at the beginning of each round.
Please see also set_test_on_global_updates.
Returns:
Type | Description |
---|---|
bool | True if validation is active on globally updated (aggregated) parameters, False otherwise. |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def test_on_global_updates(self) -> bool:
""" Retrieves the status of whether validation will be performed on globally updated (aggregated)
parameters by the nodes at the beginning of each round.
Please see also [`set_test_on_global_updates`]
[fedbiomed.researcher.experiment.Experiment.set_test_on_global_updates].
Returns:
True, if validation is active on globally updated (aggregated) parameters. False for vice versa.
"""
return self._training_args['test_on_global_updates']
test_on_local_updates()
test_on_local_updates()
Retrieves the status of whether validation will be performed on locally updated parameters by the nodes at the end of each round.
Please see also set_test_on_local_updates.
Returns:
Type | Description |
---|---|
bool | True if validation is active on locally updated parameters, False otherwise. |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def test_on_local_updates(self) -> bool:
"""Retrieves the status of whether validation will be performed on locally updated parameters by
the nodes at the end of each round.
Please see also
[`set_test_on_local_updates`][fedbiomed.researcher.experiment.Experiment.set_test_on_local_updates].
Returns:
True, if validation is active on locally updated parameters. False for vice versa.
"""
return self._training_args['test_on_local_updates']
test_ratio()
test_ratio()
Retrieves the ratio for validation partition of entire dataset.
Please see also set_test_ratio
to change/set test_ratio
Returns:
Type | Description |
---|---|
float | The ratio for the validation part; 1 - test_ratio is the ratio for the training set. |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def test_ratio(self) -> float:
"""Retrieves the ratio for validation partition of entire dataset.
Please see also [`set_test_ratio`][fedbiomed.researcher.experiment.Experiment.set_test_ratio] to
change/set `test_ratio`
Returns:
The ratio for validation part, `1 - test_ratio` is ratio for training set.
"""
return self._training_args['test_ratio']
training_args()
training_args()
Retrieves training arguments.
Please see also set_training_args
Returns:
Type | Description |
---|---|
dict | The arguments that are going to be passed to the training_routine of the training plan classes to perform training on the node side. |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def training_args(self) -> dict:
"""Retrieves training arguments.
Please see also [`set_training_args`][fedbiomed.researcher.experiment.Experiment.set_training_args]
Returns:
The arguments that are going to be passed to `training_routine` of [`training_plans`]
[fedbiomed.common.training_plans] classes to perform training on the node side.
An example training routine: [`TorchTrainingPlan.training_routine`]
[fedbiomed.common.training_plans.TorchTrainingPlan.training_routine]
"""
return self._training_args.dict()
training_data()
training_data()
Retrieves the training data which is an instance of FederatedDataset
Please see set_training_data
to set or update training data.
Returns:
Type | Description |
---|---|
Union[FederatedDataSet, None] | Object that contains meta-data for the datasets of each node. |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def training_data(self) -> Union[FederatedDataSet, None]:
"""Retrieves the training data which is an instance of [`FederatedDataset`]
[fedbiomed.researcher.datasets.FederatedDataSet]
Please see [`set_training_data`][fedbiomed.researcher.experiment.Experiment.set_training_data] to set or
update training data.
Returns:
Object that contains meta-data for the datasets of each node. `None` if it isn't set yet.
"""
return self._fds
training_plan()
training_plan()
Retrieves the training plan instance that has been built and sent to the nodes through the HTTP RESTful service for each round of training.
Loading aggregated parameters
After retrieving the training plan instance aggregated parameters should be loaded. Example:
training_plan = exp.training_plan()
training_plan.model.load_state_dict(exp.aggregated_params()[rounds - 1]['params'])
Returns:
Type | Description |
---|---|
Union[TrainingPlan, None] | Training plan object which is an instance of one of training_plans. |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def training_plan(self) -> Union[TrainingPlan, None]:
""" Retrieves training plan instance that has been built and send the nodes through HTTP restfull service
for each round of training.
!!! info "Loading aggregated parameters"
After retrieving the training plan instance aggregated parameters should be loaded.
Example:
```python
training_plan = exp.training_plan()
training_plan.model.load_state_dict(exp.aggregated_params()[rounds - 1]['params'])
```
Returns:
Training plan object which is an instance of one of [training_plans][fedbiomed.common.training_plans].
"""
# at this point `job` is defined but may be None
if self._job is None:
logger.error('No `job` defined for experiment, cannot get `training_plan`')
return None
else:
return self._job.training_plan
training_plan_approve(training_plan, description='no description provided', nodes=[], timeout=5)
training_plan_approve(training_plan, description='no description provided', nodes=[], timeout=5)
Sends a training plan and an ApprovalRequest message to node(s).
This is a simple redirect to the Requests.training_plan_approve() method.
If a list of node id(s) is provided, the message will be individually sent to all nodes of the list. If the node id(s) list is None (default), the message is broadcast to all nodes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
training_plan | BaseTrainingPlan | the training plan to upload and send to the nodes for approval. It can be: - a path_name (str) - a training_plan (class) - an instance of a training plan | required |
nodes | list | list of nodes (specified by their UUID) | [] |
description | str | Description for training plan approve request | 'no description provided' |
timeout | int | maximum waiting time for the answers | 5 |
Returns:
Type | Description |
---|---|
dict | a dictionary of pairs (node_id: status), where status indicates to the researcher that the training plan has been correctly downloaded on the node side. Warning: status does not mean that the training plan is approved, only that it has been added to the "approval queue" on the node side. |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def training_plan_approve(self,
training_plan: 'BaseTrainingPlan',
description: str = "no description provided",
nodes: list = [],
timeout: int = 5) -> dict:
"""Send a training plan and a ApprovalRequest message to node(s).
This is a simple redirect to the Requests.training_plan_approve() method.
If a list of node id(s) is provided, the message will be individually sent
to all nodes of the list.
If the node id(s) list is None (default), the message is broadcast to all nodes.
Args:
training_plan: the training plan to upload and send to the nodes for approval.
It can be:
- a path_name (str)
- a training_plan (class)
- an instance of a training plan
nodes: list of nodes (specified by their UUID)
description: Description for training plan approve request
timeout: maximum waiting time for the answers
Returns:
a dictionary of pairs (node_id: status), where status indicates to the researcher
that the training plan has been correctly downloaded on the node side.
Warning: status does not mean that the training plan is approved, only that it has been added
to the "approval queue" on the node side.
"""
return self._reqs.training_plan_approve(training_plan,
description,
nodes,
timeout)
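A minimal usage sketch; the training plan class, description and node id are illustrative:
statuses = exp.training_plan_approve(MyTrainingPlan, description='my training plan', nodes=['node_1234'], timeout=10)
# statuses maps each node id to whether the training plan was received, not whether it was approved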
training_plan_class()
training_plan_class()
Retrieves the training plan (training plan class) that is created for training.
Please see also set_training_plan_class.
Returns:
Type | Description |
---|---|
Union[Type_TrainingPlan, str, None] | Training plan class as one of Type_TrainingPlan. None if it isn't declared yet; a str if a training_plan_path referring to an externally created training plan class is provided. |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def training_plan_class(self) -> Union[Type_TrainingPlan, str, None]:
"""Retrieves the training plan (training plan class) that is created for training.
Please see also [`set_training_plan_class`][fedbiomed.researcher.experiment.Experiment.set_training_plan_class].
Returns:
Training plan class as one of [`Type_TrainingPlan`][fedbiomed.researcher.experiment.Type_TrainingPlan]. None
if it isn't declared yet. [`str`][str] if [`training_plan_path`]
[fedbiomed.researcher.experiment.Experiment.training_plan_path]that represents training plan class
created externally is provided.
"""
return self._training_plan_class
training_plan_file(display=True)
training_plan_file(display=True)
This method displays the saved final training plan for the experiment that will be sent to the nodes for training, and returns its path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
display | bool | If True, prints the content of the training plan file. Default is True. | True |
Returns:
Type | Description |
---|---|
str | Path to training plan file |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad argument type, or cannot read training plan file content |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def training_plan_file(self, display: bool = True) -> str:
""" This method displays saved final training plan for the experiment
that will be sent to the nodes for training.
Args:
display: If `True`, prints content of the training plan file. Default is `True`
Returns:
Path to training plan file
Raises:
FedbiomedExperimentError: bad argument type, or cannot read training plan file content
"""
if not isinstance(display, bool):
# bad type
msg = ErrorNumbers.FB410.value + \
f', in method `training_plan_file` param `display` : type {type(display)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
# at this point, self._job exists (initialized in constructor)
if self._job is None:
# cannot check training plan file if job not defined
msg = ErrorNumbers.FB412.value + \
', in method `training_plan_file` : no `job` defined for experiment'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
training_plan_file = self._job.training_plan_file
# Display content so researcher can copy
try:
if display:
with open(training_plan_file) as file:
content = file.read()
file.close()
print(content)
except OSError as e:
# cannot read training plan file content
msg = ErrorNumbers.FB412.value + \
f', in method `training_plan_file` : error when reading training plan file - {e}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
return self._job.training_plan_file
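A minimal usage sketch:
path = exp.training_plan_file(display=False)   # get the path without printing the file content
exp.training_plan_file()                       # print the final training plan code that will be sent to the nodes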
training_plan_path()
training_plan_path()
Retrieves training plan path where training plan class is saved as python script externally.
Please see also set_training_plan_path.
Returns:
Type | Description |
---|---|
Union[str, None] | Path to the python script (.py) where the training plan class is created. None if it isn't declared yet. |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def training_plan_path(self) -> Union[str, None]:
"""Retrieves training plan path where training plan class is saved as python script externally.
Please see also [`set_training_plan_path`][fedbiomed.researcher.experiment.Experiment.set_training_plan_path].
Returns:
Path to python script (`.py`) where training plan class (training plan) is created. None if it isn't
declared yet.
"""
return self._training_plan_path
training_replies()
training_replies()
Retrieves training replies of each round of training.
Training replies contain timing statistics and the file paths/URLs that have been received after each round.
Returns:
Type | Description |
---|---|
Union[dict, None] | Dictionary of training replies whose keys stand for each round of training. None if the Job isn't declared, or an empty dict if no training round has been run. |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def training_replies(self) -> Union[dict, None]:
"""Retrieves training replies of each round of training.
Training replies contain timing statistics and the file paths/URLs that have been received after each round.
Returns:
Dictionary of training replies whose keys stand for each round of training. None, if
[Job][fedbiomed.researcher.job] isn't declared, or an empty dict if no training round has been run.
"""
# at this point `job` is defined but may be None
if self._job is None:
logger.error('No `job` defined for experiment, cannot get `training_replies`')
return None
else:
return self._job.training_replies
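A minimal usage sketch; the exact content of each reply depends on the Fed-BioMed version:
replies = exp.training_replies()      # None if no job is defined for the experiment yet
print(replies[max(replies.keys())])   # show the replies of the most recent round (assumes at least one round has run)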
use_secagg()
use_secagg()
Retrieves the status of whether secure aggregation will be used for next rounds.
Please see also set_use_secagg
Returns:
Type | Description |
---|---|
bool | True if secure aggregation will be used for next rounds. |
Source code in fedbiomed/researcher/experiment.py
@exp_exceptions
def use_secagg(self) -> bool:
"""Retrieves the status of whether secure aggregation will be used for next rounds.
Please see also [`set_use_secagg`]
[fedbiomed.researcher.experiment.Experiment.set_use_secagg]
Returns:
True if secure aggregation will be used for next rounds.
"""
return self._use_secagg
Functions
exp_exceptions(function)
exp_exceptions(function)
Decorator for handling all exceptions in the Experiment class: pretty-print a message for the user and quit the Experiment.
Source code in fedbiomed/researcher/experiment.py
def exp_exceptions(function):
"""
Decorator for handling all exceptions in the Experiment class() :
pretty print a message for the user, quit Experiment.
"""
# wrap the original function catching the exceptions
@functools.wraps(function)
def payload(*args, **kwargs):
code = 0
try:
ret = function(*args, **kwargs)
except FedbiomedSilentTerminationError:
# handle the case of nested calls with exception decorator
raise
except SystemExit as e:
# handle the sys.exit() from other clauses
sys.exit(e)
except KeyboardInterrupt:
code = 1
print(
'\n--------------------',
'Fed-BioMed researcher stopped due to keyboard interrupt',
'--------------------',
sep=os.linesep)
logger.critical('Fed-BioMed researcher stopped due to keyboard interrupt')
except FedbiomedError as e:
code = 1
print(
'\n--------------------',
f'Fed-BioMed researcher stopped due to exception:\n{str(e)}',
'--------------------',
sep=os.linesep)
# redundant, should be already logged when raising exception
# logger.critical(f'Fed-BioMed researcher stopped due to exception:\n{str(e)}')
except BaseException as e:
code = 3
print(
'\n--------------------',
f'Fed-BioMed researcher stopped due to unknown error:\n{str(e)}',
'More details in the backtrace extract below',
'--------------------',
sep=os.linesep)
# at most 5 backtrace entries to avoid too long output
traceback.print_exc(limit=5, file=sys.stdout)
print('--------------------')
logger.critical(f'Fed-BioMed stopped due to unknown error:\n{str(e)}')
if code != 0:
if is_ipython():
# raise a silent specific exception, don't exit the interactive kernel
raise FedbiomedSilentTerminationError
else:
# exit the process
sys.exit(code)
return ret
return payload
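A minimal sketch of how the decorator is applied; the wrapped function below is illustrative:
@exp_exceptions
def my_step(exp):
    # a FedbiomedError raised inside is pretty printed, then the process exits (or a silent termination is raised in IPython)
    exp.set_test_ratio(0.1)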