fedbiomed.node.round
Module:fedbiomed.node.round
implementation of Round class of the node component
Classes
Round
Round(
model_kwargs=None,
training_kwargs=None,
training=True,
dataset=None,
training_plan_url=None,
training_plan_class=None,
params_url=None,
job_id=None,
researcher_id=None,
history_monitor=None,
node_args=None,
dlp_and_loading_block_metadata=None,
)
This class represents the training part execute by a node in a given round
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_kwargs | dict | contains model args | None |
training_kwargs | dict | contains training arguments | None |
dataset | dict | dataset details to use in this round. It contains the dataset name, dataset's id, data path, its shape, its description... | None |
training_plan_url | str | url from which to download training plan file | None |
training_plan_class | str | name of the training plan (eg 'MyTrainingPlan') | None |
params_url | str | url from which to upload/download model params | None |
job_id | str | job id | None |
researcher_id | str | researcher id | None |
history_monitor | HistoryMonitor | Sends real-time feed-back to end-user during training | None |
node_args | Union[dict, None] | command line arguments for node. Can include: - | None |
Source code in fedbiomed/node/round.py
def __init__(self,
model_kwargs: dict = None,
training_kwargs: dict = None,
training: bool = True,
dataset: dict = None,
training_plan_url: str = None,
training_plan_class: str = None,
params_url: str = None,
job_id: str = None,
researcher_id: str = None,
history_monitor: HistoryMonitor = None,
node_args: Union[dict, None] = None,
dlp_and_loading_block_metadata: Optional[Tuple[dict, List[dict]]] = None):
"""Constructor of the class
Args:
model_kwargs: contains model args
training_kwargs: contains training arguments
dataset: dataset details to use in this round. It contains the dataset name, dataset's id,
data path, its shape, its description...
training_plan_url: url from which to download training plan file
training_plan_class: name of the training plan (eg 'MyTrainingPlan')
params_url: url from which to upload/download model params
job_id: job id
researcher_id: researcher id
history_monitor: Sends real-time feed-back to end-user during training
node_args: command line arguments for node. Can include:
- `gpu (bool)`: propose use a GPU device if any is available.
- `gpu_num (Union[int, None])`: if not None, use the specified GPU device instead of default
GPU device if this GPU device is available.
- `gpu_only (bool)`: force use of a GPU device if any available, even if researcher
doesn't request for using a GPU.
"""
self.dataset = dataset
self.training_plan_url = training_plan_url
self.training_plan_class = training_plan_class
self.params_url = params_url
self.job_id = job_id
self.researcher_id = researcher_id
self.history_monitor = history_monitor
self.tp_security_manager = TrainingPlanSecurityManager()
self.node_args = node_args
self.repository = Repository(environ['UPLOADS_URL'], environ['TMP_DIR'], environ['CACHE_DIR'])
self.training_plan = None
self.training = training
self._dlp_and_loading_block_metadata = dlp_and_loading_block_metadata
self.training_kwargs = training_kwargs
self.model_arguments = model_kwargs
self.testing_arguments = None
self.loader_arguments = None
self.training_arguments = None
Attributes
dataset instance-attribute
dataset = dataset
history_monitor instance-attribute
history_monitor = history_monitor
job_id instance-attribute
job_id = job_id
loader_arguments instance-attribute
loader_arguments = None
model_arguments instance-attribute
model_arguments = model_kwargs
node_args instance-attribute
node_args = node_args
params_url instance-attribute
params_url = params_url
repository instance-attribute
repository = Repository(
environ["UPLOADS_URL"],
environ["TMP_DIR"],
environ["CACHE_DIR"],
)
researcher_id instance-attribute
researcher_id = researcher_id
testing_arguments instance-attribute
testing_arguments = None
tp_security_manager instance-attribute
tp_security_manager = TrainingPlanSecurityManager()
training instance-attribute
training = training
training_arguments instance-attribute
training_arguments = None
training_kwargs instance-attribute
training_kwargs = training_kwargs
training_plan instance-attribute
training_plan = None
training_plan_class instance-attribute
training_plan_class = training_plan_class
training_plan_url instance-attribute
training_plan_url = training_plan_url
Functions
initialize_validate_training_arguments()
initialize_validate_training_arguments()
Validates and separates training argument for experiment round
Source code in fedbiomed/node/round.py
def initialize_validate_training_arguments(self) -> None:
"""Validates and separates training argument for experiment round"""
self.training_arguments = TrainingArgs(self.training_kwargs, only_required=False)
self.testing_arguments = self.training_arguments.testing_arguments()
self.loader_arguments = self.training_arguments.loader_arguments()
run_model_training()
run_model_training()
This method downloads training plan file; then runs the training of a model and finally uploads model params to the file repository
Returns:
Type | Description |
---|---|
dict[str, Any] | Returns the corresponding node message, training reply instance |
Source code in fedbiomed/node/round.py
def run_model_training(self) -> dict[str, Any]:
"""This method downloads training plan file; then runs the training of a model
and finally uploads model params to the file repository
Returns:
Returns the corresponding node message, training reply instance
"""
is_failed = False
# Initialize and validate requested experiment/training arguments
try:
self.initialize_validate_training_arguments()
except FedbiomedUserInputError as e:
return self._send_round_reply(success=False, message=str(e))
except Exception as e:
msg = 'Unexpected error while validating training argument'
logger.debug(f"{msg}: {e}")
return self._send_round_reply(success=False, message=f'{msg}. Please contact system provider')
try:
# module name cannot contain dashes
import_module = 'training_plan_' + str(uuid.uuid4().hex)
status, _ = self.repository.download_file(self.training_plan_url,
import_module + '.py')
if status != 200:
error_message = "Cannot download training plan file: " + self.training_plan_url
return self._send_round_reply(success=False, message=error_message)
else:
if environ["TRAINING_PLAN_APPROVAL"]:
approved, training_plan_ = self.tp_security_manager.check_training_plan_status(
os.path.join(environ["TMP_DIR"], import_module + '.py'),
TrainingPlanApprovalStatus.APPROVED)
if not approved:
error_message = f'Requested training plan is not approved by the node: {environ["NODE_ID"]}'
return self._send_round_reply(success=False, message=error_message)
else:
logger.info(f'Training plan has been approved by the node {training_plan_["name"]}')
if not is_failed:
status, params_path = self.repository.download_file(
self.params_url,
'training_plan_' + str(uuid.uuid4()) + '.pt')
if (status != 200) or params_path is None:
error_message = f"Cannot download param file: {self.params_url}"
return self._send_round_reply(success=False, message=error_message)
except Exception as e:
is_failed = True
# FIXME: this will trigger if model is not approved by node
error_message = f"Cannot download training plan files: {str(e)}"
return self._send_round_reply(success=False, message=error_message)
# import module, declare the training plan, load parameters
try:
sys.path.insert(0, environ['TMP_DIR'])
module = importlib.import_module(import_module)
train_class = getattr(module, self.training_plan_class)
self.training_plan = train_class()
sys.path.pop(0)
except Exception as e:
error_message = f"Cannot instantiate training plan object: {str(e)}"
return self._send_round_reply(success=False, message=error_message)
try:
self.training_plan.post_init(model_args=self.model_arguments,
training_args=self.training_arguments)
except Exception as e:
error_message = f"Can't initialize training plan with the arguments: {e}"
return self._send_round_reply(success=False, message=error_message)
# import model params into the training plan instance
try:
self.training_plan.load(params_path, to_params=False)
except Exception as e:
error_message = f"Cannot initialize model parameters: f{str(e)}"
return self._send_round_reply(success=False, message=error_message)
# Split training and validation data
try:
self._set_training_testing_data_loaders()
except FedbiomedError as e:
error_message = f"Can not create validation/train data: {str(e)}"
return self._send_round_reply(success=False, message=error_message)
except Exception as e:
error_message = f"Undetermined error while creating data for training/validation. Can not create " \
f"validation/train data: {str(e)}"
return self._send_round_reply(success=False, message=error_message)
# Validation Before Training
if self.testing_arguments.get('test_on_global_updates', False) is not False:
# Last control to make sure validation data loader is set.
if self.training_plan.testing_data_loader is not None:
try:
self.training_plan.testing_routine(metric=self.testing_arguments.get('test_metric', None),
metric_args=self.testing_arguments.get('test_metric_args', {}),
history_monitor=self.history_monitor,
before_train=True)
except FedbiomedError as e:
logger.error(f"{ErrorNumbers.FB314}: During the validation phase on global parameter updates; "
f"{str(e)}")
except Exception as e:
logger.error(f"Undetermined error during the testing phase on global parameter updates: "
f"{e}")
else:
logger.error(f"{ErrorNumbers.FB314}: Can not execute validation routine due to missing testing dataset"
f"Please make sure that `test_ratio` has been set correctly")
#
# If training is activated.
if self.training:
if self.training_plan.training_data_loader is not None:
try:
results = {}
rtime_before = time.perf_counter()
ptime_before = time.process_time()
self.training_plan.training_routine(history_monitor=self.history_monitor,
node_args=self.node_args
)
rtime_after = time.perf_counter()
ptime_after = time.process_time()
except Exception as e:
error_message = f"Cannot train model in round: {str(e)}"
return self._send_round_reply(success=False, message=error_message)
# Validation after training
if self.testing_arguments.get('test_on_local_updates', False) is not False:
if self.training_plan.testing_data_loader is not None:
try:
self.training_plan.testing_routine(metric=self.testing_arguments.get('test_metric', None),
metric_args=self.testing_arguments.get('test_metric_args',
{}),
history_monitor=self.history_monitor,
before_train=False)
except FedbiomedError as e:
logger.error(
f"{ErrorNumbers.FB314.value}: During the validation phase on local parameter updates; "
f"{str(e)}")
except Exception as e:
logger.error(f"Undetermined error during the validation phase on local parameter updates"
f"{e}")
else:
logger.error(
f"{ErrorNumbers.FB314.value}: Can not execute validation routine due to missing testing "
f"dataset please make sure that test_ratio has been set correctly")
# Upload results
results['researcher_id'] = self.researcher_id
results['job_id'] = self.job_id
results['model_params'] = self.training_plan.after_training_params()
results['node_id'] = environ['NODE_ID']
try:
# TODO : should validation status code but not yet returned
# by upload_file
filename = environ['TMP_DIR'] + '/node_params_' + str(uuid.uuid4()) + '.pt'
self.training_plan.save(filename, results)
res = self.repository.upload_file(filename)
logger.info("results uploaded successfully ")
except Exception as e:
is_failed = True
error_message = f"Cannot upload results: {str(e)}"
return self._send_round_reply(success=False, message=error_message)
# end : clean the namespace
try:
del self.training_plan
del import_module
except Exception as e:
logger.debug(f'Exception raise while deleting training plan instance: {e}')
pass
return self._send_round_reply(success=True,
timing={'rtime_training': rtime_after - rtime_before,
'ptime_training': ptime_after - ptime_before},
params_url=res['file'])
else:
# Only for validation
return self._send_round_reply(success=True)