fedbiomed.researcher.requests

Module: fedbiomed.researcher.requests

Implements the message exchanges from the researcher to the nodes

Classes

Requests

CLASS
Requests(mess=None)

Represents the requests addressed from the researcher to the nodes. It creates a task queue that stores the reply to each incoming message, starts a message queue, and wraps the messages to be sent in a Messaging object.

Parameters:

Name Type Description Default
mess Any

message to be sent by default.

None
Source code in fedbiomed/researcher/requests.py
def __init__(self, mess: Any = None):
    """
    Constructor of the class

    Args:
        mess: message to be sent by default.
    """
    # Need to ensure a unique message queue per researcher instance to avoid conflicts
    # in case several researcher instances (with the same researcher_id ?) are active,
    # e.g. a notebook that was not quit plus a launched script
    self.queue = TasksQueue(environ['MESSAGES_QUEUE_DIR'] + '_' + str(uuid.uuid4()), environ['TMP_DIR'])

    if mess is None or type(mess) is not Messaging:
        self.messaging = Messaging(self.on_message,
                                   ComponentType.RESEARCHER,
                                   environ['RESEARCHER_ID'],
                                   environ['MQTT_BROKER'],
                                   environ['MQTT_BROKER_PORT'])
        self.messaging.start(block=False)
    else:
        self.messaging = mess

    # defines the sequence used for ping protocol
    self._sequence = 0

    self._monitor_message_callback = None

Attributes

messaging instance-attribute
messaging = Messaging(
    self.on_message,
    ComponentType.RESEARCHER,
    environ["RESEARCHER_ID"],
    environ["MQTT_BROKER"],
    environ["MQTT_BROKER_PORT"],
)
queue instance-attribute
queue = TasksQueue(
    environ["MESSAGES_QUEUE_DIR"] + "_" + str(uuid.uuid4()),
    environ["TMP_DIR"],
)
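
Usage example: a minimal sketch of how a Requests instance is typically created on the researcher side (it assumes a configured researcher environment; instantiating the class also starts the underlying Messaging loop in the background):

from fedbiomed.researcher.requests import Requests

req = Requests()                   # starts the message queue and the Messaging loop
messaging = req.get_messaging()    # access the underlying Messaging object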

Functions

add_monitor_callback(callback)

Adds callback function for monitor messages

Parameters:

Name Type Description Default
callback Callable[[Dict], None]

Callback function for handling monitor messages that arrive through the 'general/monitoring' channel

required
Source code in fedbiomed/researcher/requests.py
def add_monitor_callback(self, callback: Callable[[Dict], None]):
    """ Adds callback function for monitor messages

    Args:
        callback: Callback function for handling monitor messages that arrive through the 'general/monitoring' channel
    """

    self._monitor_message_callback = callback
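
Example: a minimal sketch of registering a monitor callback, with req being a Requests instance as above. The handler below is a hypothetical function; any Callable[[Dict], None] can be used.

from typing import Dict

def handle_monitor_message(msg: Dict) -> None:
    # hypothetical handler: just report the node id of each monitoring message
    print(f"monitoring message received from node {msg.get('node_id')}")

req.add_monitor_callback(handle_monitor_message)
# ... later, stop handling monitor messages
req.remove_monitor_callback()
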
get_messages(commands=[], time=0.0)

Goes through the queue and gets the messages matching the specified commands

Parameters:

Name Type Description Default
commands list

Checks whether each message contains one of the expected commands (the message is discarded if it does not). Defaults to an empty list (no command check, meaning all incoming messages are considered).

[]
time float

Time to sleep, in seconds, before considering incoming messages. Defaults to 0.0.

0.0

Returns:

Type Description
Responses

Contains the corresponding answers

Source code in fedbiomed/researcher/requests.py
def get_messages(self, commands: list = [], time: float = .0) -> Responses:
    """Goes through the queue and gets messages with the specific command

    Args:
        commands: Checks whether each message contains one of the expected commands (the message is discarded
            if it does not). Defaults to an empty list (no command check, meaning all incoming messages are considered).
        time: Time to sleep, in seconds, before considering incoming messages. Defaults to 0.0.

    Returns:
        Contains the corresponding answers
    """
    sleep(time)

    answers = []
    for _ in range(self.queue.qsize()):
        try:
            item = self.queue.get(block=False)
            self.queue.task_done()

            if not commands or \
                    ('command' in item.keys() and item['command'] in commands):
                answers.append(item)
            else:
                # currently trash all other messages
                pass

        except FedbiomedTaskQueueError:
            # may happen on self.queue.get()
            # if the queue is empty -> we ignore it
            pass

    return Responses(answers)
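
Example: a minimal sketch draining the replies currently sitting in the queue for a given command, with req being a Requests instance. Unlike get_responses, this call does not keep waiting for late replies.

# sleep one second, then fetch whatever 'list' replies are already queued
replies = req.get_messages(commands=['list'], time=1.0)
for reply in replies:
    print(reply.get('node_id'), reply.get('command'))
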
get_messaging()

Retrieves Messaging object

Returns:

Type Description
Messaging

Messaging object

Source code in fedbiomed/researcher/requests.py
def get_messaging(self) -> Messaging:
    """Retrieves Messaging object

    Returns:
        Messaging object
    """
    return self.messaging
get_responses(look_for_commands, timeout=None, only_successful=True, while_responses=True)

Waits for all nodes' answers regarding a specific command, and returns the list of all node answers

Parameters:

Name Type Description Default
look_for_commands list

Instructions that have been sent to the nodes (see Message commands)

required
timeout float

Wait for a specific duration before collecting node messages. Defaults to None; if None, the value of the TIMEOUT environment variable is used instead.

None
only_successful bool

deal only with messages that have been tagged as successful (ie with field success=True). Defaults to True.

True
while_responses bool

If True, continue as long as at least one response is received every timeout seconds. If False, always terminate after timeout even if some responses were received.

True
Source code in fedbiomed/researcher/requests.py
def get_responses(self,
                  look_for_commands: list,
                  timeout: float = None,
                  only_successful: bool = True,
                  while_responses: bool = True) -> Responses:
    """Waits for all nodes' answers, regarding a specific command returns the list of all nodes answers

    Args:
        look_for_commands: instruction that has been sent to node (see `Message` commands)
        timeout: wait for a specific duration before collecting nodes messages. Defaults to None. If set to None;
            uses value in global variable TIMEOUT instead.
        only_successful: deal only with messages that have been tagged as successful (ie with field `success=True`).
            Defaults to True.
        while_responses: if `True`, continue while we get at least one response every
            `timeout` seconds. If False, always terminate after `timeout` even if we get some
            response.
    """
    timeout = timeout or environ['TIMEOUT']
    responses = []

    while True:
        sleep(timeout)
        new_responses = []
        for resp in self.get_messages(commands=look_for_commands, time=0):
            try:
                if not only_successful:
                    new_responses.append(resp)
                elif resp['success']:
                    # TODO: test if 'success' key exists
                    # what do we do if not ?
                    new_responses.append(resp)
            except Exception as e:
                logger.error(f"Incorrect message received: {resp} - error: {e}")
                pass

        if len(new_responses) == 0:
            "Timeout finished"
            break
        responses += new_responses
        if not while_responses:
            break

    return Responses(responses)
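
Example: a minimal sketch collecting the successful replies to a request that was already sent, with req being a Requests instance and 'list' as the expected reply command.

responses = req.get_responses(look_for_commands=['list'], timeout=5, only_successful=True)
for resp in responses:
    print(f"node {resp['node_id']} answered with databases: {resp.get('databases')}")
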
list(nodes=None, verbose=False)

Lists available data in each node

Parameters:

Name Type Description Default
nodes list

Restrict the listing to the given node ids. Defaults to None (all nodes).

None
verbose bool

If True, prints the datasets in a human-readable format

False
Source code in fedbiomed/researcher/requests.py
def list(self, nodes: list = None, verbose: bool = False) -> dict:
    """Lists available data in each node

    Args:
        nodes: Restrict the listing to the given node ids. Defaults to None (all nodes).
        verbose: If True, prints the datasets in a human-readable format
    """

    # If nodes list is provided
    if nodes:
        for node in nodes:
            self.messaging.send_message(
                ResearcherMessages.request_create({'researcher_id': environ['RESEARCHER_ID'],
                                                   "command": "list"}
                                                  ).get_dict(),
                client=node)
        logger.info(f'Listing datasets of the given list of nodes: {nodes}')
    else:
        self.messaging.send_message(
            ResearcherMessages.request_create({'researcher_id': environ['RESEARCHER_ID'],
                                               "command": "list"}).get_dict())
        logger.info('Listing available datasets in all nodes... ')

    # Get datasets from node responses
    data_found = {}
    for resp in self.get_responses(look_for_commands=['list']):
        if not nodes:
            data_found[resp.get('node_id')] = resp.get('databases')
        elif resp.get('node_id') in nodes:
            data_found[resp.get('node_id')] = resp.get('databases')

    # Print dataset tables using the data_found object
    if verbose:
        for node in data_found:
            if len(data_found[node]) > 0:
                rows = [row.values() for row in data_found[node]]
                headers = data_found[node][0].keys()
                info = '\n Node: {} | Number of Datasets: {} \n'.format(node, len(data_found[node]))
                logger.info(info + tabulate.tabulate(rows, headers, tablefmt="grid") + '\n')
            else:
                logger.info('\n Node: {} | Number of Datasets: {}'.format(node, len(data_found[node])) +
                            " No data has been set up for this node.")

    return data_found
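
Example: a minimal sketch listing the datasets available on the nodes, with req being a Requests instance; the node ids are placeholders.

# list datasets on all online nodes, with formatted tables in the logs
datasets = req.list(verbose=True)

# restrict the listing to two specific nodes (placeholder ids)
datasets = req.list(nodes=['node_1234', 'node_5678'])
for node_id, node_datasets in datasets.items():
    print(node_id, len(node_datasets))
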
on_message(msg, topic)

Handler called by the Messaging class when a message is received on the researcher side.

It is run in the communication process and must be as quick as possible:
- it deals with quick messages (e.g. ping/pong)
- it stores the replies of the nodes in the task queue; these messages will be treated by the main (computing) thread.

Parameters:

Name Type Description Default
msg Dict[str, Any]

De-serialized message

required
topic str

Topic on which the message was received (MQTT channel)

required
Source code in fedbiomed/researcher/requests.py
def on_message(self, msg: Dict[str, Any], topic: str):
    """ Handler called by the [`Messaging`][fedbiomed.common.messaging] class,  when a message is received on
    researcher side.

    It is run in the communication process and must ba as quick as possible:
    - it deals with quick messages (eg: ping/pong)
    - it stores the replies of the nodes to the task queue, the message will bee
    treated by the main (computing) thread.

    Args:
        msg: de-serialized msg
        topic: topic to publish message (MQTT channel)
    """

    if topic == "general/logger":
        #
        # forward the treatment to print_node_log_message() (same thread)
        self.print_node_log_message(ResearcherMessages.reply_create(msg).get_dict())
    elif topic == "general/researcher":
        #
        # *Reply messages (SearchReply, TrainReply) added to the TaskQueue
        self.queue.add(ResearcherMessages.reply_create(msg).get_dict())

        # we may trap FedbiomedTaskQueueError here then queue full
        # but what can we do except of quitting ?

    elif topic == "general/monitoring":
        if self._monitor_message_callback is not None:
            # Pass message to Monitor's on message handler
            self._monitor_message_callback(ResearcherMessages.reply_create(msg).get_dict())
    else:
        logger.error("message received on wrong topic (" + topic + ") - IGNORING")
ping_nodes()

Pings online nodes

Returns:

Type Description
list

List of ids of up-and-running nodes

Source code in fedbiomed/researcher/requests.py
def ping_nodes(self) -> list:
    """ Pings online nodes

    Returns:
        List of ids of up-and-running nodes
    """
    self.send_message(
        {'researcher_id': environ['RESEARCHER_ID'], 'command': 'ping'},
        add_sequence=True)

    # TODO: check sequence number in pong
    # TODO: (below, above) handle exceptions
    nodes_online = [resp['node_id'] for resp in self.get_responses(look_for_commands=['pong'])]
    return nodes_online
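
Example: a minimal sketch checking which nodes are currently up and running, with req being a Requests instance.

online_nodes = req.ping_nodes()
print(f"{len(online_nodes)} node(s) answered the ping: {online_nodes}")
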
print_node_log_message(log)
staticmethod

Prints logger messages coming from the node

It is run in the communication process and must be as quick as possible:
- all logs (coming from the nodes) are forwarded to the researcher logger (immediate display on console/file/whatever)

Source code in fedbiomed/researcher/requests.py
@staticmethod
def print_node_log_message(log: Dict[str, Any]):
    """Prints logger messages coming from the node

    It is run in the communication process and must be as quick as possible:
    - all logs (coming from the nodes) are forwarded to the researcher logger
    (immediate display on console/file/whatever)
    """

    # log contains the original message sent by the node
    # FIXME: we should use `fedbiomed.common.json.deserialize` method
    # instead of the json method when extracting json message
    original_msg = json.loads(log["msg"])

    # Logging fancy feedback for training
    logger.info("\033[1m{}\033[0m\n"
                "\t\t\t\t\t\033[1m NODE\033[0m {}\n"
                "\t\t\t\t\t\033[1m MESSAGE:\033[0m {}\033[0m\n"
                "{}".format(log["level"],
                            log["node_id"],
                            original_msg["message"],
                            5 * "-------------"))
remove_monitor_callback()

Removes callback function for Monitor class.

Source code in fedbiomed/researcher/requests.py
def remove_monitor_callback(self):
    """ Removes callback function for Monitor class. """

    self._monitor_message_callback = None
search(tags, nodes=None)

Searches available data by tags

Parameters:

Name Type Description Default
tags tuple

Tuple containing the tags associated with the data the researcher is looking for.

required
nodes list

Optionally filter nodes with this list. Defaults to None (no filtering, all nodes are considered)

None

Returns:

Type Description
dict

A dict with node_id as keys, and list of dicts describing available data as values

Source code in fedbiomed/researcher/requests.py
def search(self, tags: tuple, nodes: list = None) -> dict:
    """ Searches available data by tags

    Args:
        tags: Tuple containing the tags associated with the data the researcher is looking for.
        nodes: optionally filter nodes with this list. Defaults to None (no filtering, all nodes are considered)

    Returns:
        A dict with node_id as keys, and list of dicts describing available data as values
    """

    # Search datasets based on node specifications
    if nodes:
        logger.info(f'Searching dataset with data tags: {tags} on specified nodes: {nodes}')
        for node in nodes:
            self.messaging.send_message(
                ResearcherMessages.request_create({'tags': tags,
                                                   'researcher_id': environ['RESEARCHER_ID'],
                                                   "command": "search"}
                                                  ).get_dict(),
                client=node)
    else:
        logger.info(f'Searching dataset with data tags: {tags} for all nodes')
        self.messaging.send_message(
            ResearcherMessages.request_create({'tags': tags,
                                               'researcher_id': environ['RESEARCHER_ID'],
                                               "command": "search"}
                                              ).get_dict())

    data_found = {}
    for resp in self.get_responses(look_for_commands=['search']):
        if not nodes:
            data_found[resp.get('node_id')] = resp.get('databases')
        elif resp.get('node_id') in nodes:
            data_found[resp.get('node_id')] = resp.get('databases')

        logger.info('Node selected for training -> {}'.format(resp.get('node_id')))

    if not data_found:
        logger.info("No available dataset has found in nodes with tags: {}".format(tags))

    return data_found
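
Example: a minimal sketch searching for datasets by tags, with req being a Requests instance; the tag and node id are placeholders.

# search on all nodes for datasets tagged '#MNIST'
found = req.search(tags=('#MNIST',))

# restrict the search to one specific node (placeholder id)
found = req.search(tags=('#MNIST',), nodes=['node_1234'])
for node_id, node_datasets in found.items():
    print(node_id, [d.get('name') for d in node_datasets])
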
send_message(msg, client=None, add_sequence=False)

Ask the messaging class to send a new message (receivers are deduced from the message content)

Parameters:

Name Type Description Default
msg dict

the message to send to nodes

required
client str

defines the channel to which the message will be sent. Defaults to None (all nodes)

None
add_sequence bool

if True, add unique sequence number to the message

False

Returns:

Type Description
Union[int, None]

If add_sequence is True return the sequence number added to the message. If add_sequence is False, return None

Source code in fedbiomed/researcher/requests.py
def send_message(self, msg: dict, client: str = None, add_sequence: bool = False) -> \
        Union[int, None]:
    """
    Ask the messaging class to send a new message (receivers are
    deduced from the message content)

    Args:
        msg: the message to send to nodes
        client: defines the channel to which the message will be sent. Defaults to None (all nodes)
        add_sequence: if `True`, add unique sequence number to the message

    Returns:
        If `add_sequence` is True return the sequence number added to the message.
            If `add_sequence` is False, return None
    """
    logger.debug(str(environ['RESEARCHER_ID']))
    sequence = None
    if add_sequence:
        sequence = self._sequence
        self._sequence += 1
        msg['sequence'] = sequence

    self.messaging.send_message(
        ResearcherMessages.request_create(msg).get_dict(), 
        client=client)
    return sequence
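
Example: a minimal sketch sending a raw request to a single node and matching the reply by sequence number, with req being a Requests instance. The node id is a placeholder, and environ is assumed to be importable from fedbiomed.researcher.environ as in the module source.

from fedbiomed.researcher.environ import environ

seq = req.send_message(
    {'researcher_id': environ['RESEARCHER_ID'], 'command': 'ping'},
    client='node_1234',        # placeholder node id; client=None would broadcast to all nodes
    add_sequence=True)

for resp in req.get_responses(look_for_commands=['pong']):
    if resp.get('sequence') == seq:
        print(f"node {resp['node_id']} is alive")
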
training_plan_approve(training_plan, description='no description provided', nodes=[], timeout=5)

Send a training plan and an ApprovalRequest message to node(s).

If a list of node ids is provided, the message is sent individually to each node of the list. If the node id list is empty (default), the message is broadcast to all nodes.

Parameters:

Name Type Description Default
training_plan BaseTrainingPlan

The training plan to upload and send to the nodes for approval. It can be:
- a path name (str)
- a training plan class
- an instance of a training plan (TrainingPlan instance)

required
nodes list

list of nodes (specified by their UUID)

[]
description str

Description of training plan approval request

'no description provided'
timeout int

Maximum waiting time (in seconds) for the answers

5

Returns:

Type Description
dict

A dictionary of pairs (node_id: status), where status indicates to the researcher that the training plan has been correctly downloaded on the node side. Warning: status does not mean that the training plan is approved, only that it has been added to the "approval queue" on the node side.

Source code in fedbiomed/researcher/requests.py
def training_plan_approve(self,
                          training_plan: 'BaseTrainingPlan',
                          description: str = "no description provided",
                          nodes: list = [],
                          timeout: int = 5) -> dict:
    """Send a training plan and a ApprovalRequest message to node(s).

    If a list of node id(s) is provided, the message will be individually sent
    to all nodes of the list.
    If the node id(s) list is None (default), the message is broadcast to all nodes.


    Args:
        training_plan: the training plan to upload and send to the nodes for approval.
               It can be:
               - a path_name (str)
               - a training plan (class)
               - an instance of a training plan (TrainingPlan instance)
        nodes: list of nodes (specified by their UUID)
        description: Description of training plan approval request
        timeout: maximum waiting time for the answers

    Returns:
        a dictionary of pairs (node_id: status), where status indicates to the researcher
        that the training plan has been correctly downloaded on the node side.
        Warning: status does not mean that the training plan is approved, only that it has been added
        to the "approval queue" on the node side.
    """

    # first verify all arguments
    if not isinstance(nodes, list):
        logger.error("bad nodes argument, training plan not sent")
        return {}

    # verify the training plan and save it to a local file name if necessary
    if isinstance(training_plan, str):
        # training plan is provided as a file
        # TODO: verify that this file is a proper TrainingPlan
        if os.path.isfile(training_plan) and os.access(training_plan, os.R_OK):
            training_plan_file = training_plan
        else:
            logger.error(f"cannot access to the file ({training_plan})")
            return {}
    else:
        # we need a training plan instance in other cases
        if inspect.isclass(training_plan):
            # case if `training_plan` is a class
            try:
                training_plan_instance = training_plan()
                deps = training_plan_instance.init_dependencies()
                training_plan_instance.add_dependency(deps)
            except Exception as e:  # TODO: be more specific
                logger.error(f"cannot instantiate the given training plan ({e})")
                return {}
        else:
            # also handle case where training plan is already an instance of a class
            training_plan_instance = training_plan

        # then save this instance to a file
        training_plan_file = os.path.join(environ['TMP_DIR'],
                                          "training_plan_" + str(uuid.uuid4()) + ".py")

        try:
            training_plan_instance.save_code(training_plan_file)
        except Exception as e:  # TODO: be more specific
            logger.error(f"Cannot save the training plan to a file ({e})")
            logger.error(f"Are you sure that {training_plan} is a TrainingPlan ?")
            return {}

    # verify that the file can be minified before sending
    #
    # TODO: enforce a stronger check here (user story #179)
    try:
        with open(training_plan_file, "r") as f:
            content = f.read()
        minify(content,
               remove_annotations=False,
               combine_imports=False,
               remove_pass=False,
               hoist_literals=False,
               remove_object_base=True,
               rename_locals=False)
    except Exception as e:
        # minify does not provide any specific exception
        logger.error(f"This file is not a python file ({e})")
        return {}

    # create a repository instance and upload the training plan file
    repository = Repository(environ['UPLOADS_URL'],
                            environ['TMP_DIR'],
                            environ['CACHE_DIR'])

    upload_status = repository.upload_file(training_plan_file)

    logger.debug(f"training_plan_approve: upload_status = {upload_status}")

    # send message to node(s)
    message = {
        'researcher_id': environ['RESEARCHER_ID'],
        'description': str(description),
        'training_plan_url': upload_status['file'],
        'command': 'approval'}

    if nodes:
        # send message to each node
        for n in nodes:
            sequence = self.send_message(message, client=n, add_sequence=True)
    else:
        # broadcast message
        sequence = self.send_message(message, add_sequence=True)

    # wait for answers for a certain timeout
    result = {}
    for resp in self.get_responses(look_for_commands=['approval'],
                                   timeout=timeout):
        if sequence != resp['sequence']:
            logger.error("received an approval_reply with wrong sequence, ignoring it")
            continue

        n = resp['node_id']
        s = resp['success']
        result[n] = s

        if s:
            logger.info(f"node ({n}) has correctly downloaded the training plan")
        else:
            logger.info(f"node ({n}) has not correctly downloaded the training plan")

    # print info to the user regarding the result
    if not result or not any(result.values()):
        logger.info("no nodes have acknowledged correct training plan reception before the timeout")

    # if needed, complete the result with the expected nodes
    # (if the message was sent to specific nodes)
    for n in nodes:
        if n not in result:
            result[n] = False
            logger.info(f"node ({n}) has not acknowledge training plan reception before the timeout")

    # return the result
    return result
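
Example: a minimal sketch requesting approval of a training plan on specific nodes, with req being a Requests instance. The file path and node ids are placeholders; a TrainingPlan class or instance could be passed instead of a path.

# './my_training_plan.py' is a placeholder path to a file containing a training plan
results = req.training_plan_approve(
    './my_training_plan.py',
    description='hypothetical CNN training plan for MNIST',
    nodes=['node_1234', 'node_5678'],
    timeout=10)

for node_id, downloaded in results.items():
    status = 'queued for approval on the node' if downloaded else 'not received'
    print(f"{node_id}: {status}")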