xain_fl.coordinator package

Defines constants to be used throughout the module

Submodules

xain_fl.coordinator.coordinator module

XAIN FL Coordinator

class xain_fl.coordinator.coordinator.Coordinator(store=<xain_fl.coordinator.store.NullObjectStore object>, metrics_store=<xain_fl.coordinator.metrics_store.NullObjectMetricsStore object>, num_rounds=1, minimum_participants_in_round=1, fraction_of_participants=1.0, weights=array([], dtype=float64), epochs=1, epoch_base=0, aggregator=<xain_fl.fl.coordinator.aggregate.WeightedAverageAggregator object>, controller=<xain_fl.fl.coordinator.controller.RandomController object>)

Bases: object

Class implementing the main Coordinator logic. It is implemented as a state machine that reacts to received messages.

The states of the Coordinator are:
  • STANDBY: The coordinator is in standby mode, typically when waiting for participants to connect. In this mode the only messages that the coordinator can receive are RendezvousRequest and HeartbeatRequest.

  • ROUND: A round is currently in progress. During a round the important messages the coordinator can receive are StartTrainingRoundRequest and EndTrainingRoundRequest. Since participants may or may not be selected for a round, the coordinator advertises ROUND to the selected participants and STANDBY to the others. Round numbers start from 0.

  • FINISHED: The training session has ended and participants should disconnect from the coordinator.

States are exchanged during heartbeats so that both the coordinator and the participants can react to each other's state changes.

The flow of the Coordinator:
  1. The coordinator is started and waits for enough participants to join. STANDBY.

  2. Once enough participants are connected the coordinator starts the rounds. ROUND.

  3. Repeat step 2 for the given number of rounds.

  4. The training session is over and the coordinator is ready to shut down. FINISHED.
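
The flow above can be sketched as a small state machine. This is an illustrative sketch only, not the library's implementation: State, FlowSketch and its two event methods are hypothetical names, and message handling is reduced to "a participant joined" and "a round finished".

```python
from enum import Enum


class State(Enum):
    STANDBY = "STANDBY"
    ROUND = "ROUND"
    FINISHED = "FINISHED"


class FlowSketch:
    """Hypothetical, simplified stand-in for the Coordinator flow."""

    def __init__(self, num_rounds: int, minimum_connected: int) -> None:
        self.num_rounds = num_rounds
        self.minimum_connected = minimum_connected
        self.connected = 0
        self.current_round = 0  # round numbers start from 0
        self.state = State.STANDBY

    def on_rendezvous(self) -> None:
        # Steps 1 and 2: leave STANDBY once enough participants have joined.
        self.connected += 1
        if self.state is State.STANDBY and self.connected >= self.minimum_connected:
            self.state = State.ROUND

    def on_round_finished(self) -> None:
        # Steps 3 and 4: repeat rounds, then end the training session.
        if self.state is not State.ROUND:
            return
        self.current_round += 1
        if self.current_round >= self.num_rounds:
            self.state = State.FINISHED


flow = FlowSketch(num_rounds=2, minimum_connected=2)
flow.on_rendezvous()      # one participant: still STANDBY
flow.on_rendezvous()      # two participants: ROUND
flow.on_round_finished()  # first round done: still ROUND
flow.on_round_finished()  # second round done: FINISHED
```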

Note

RendezvousRequest is always allowed regardless of which state the coordinator is in.

Parameters
  • num_rounds (int) – The number of rounds of the training session.

  • minimum_participants_in_round (int) – The minimum number of participants that participate in a round.

  • fraction_of_participants (float) – The fraction of total connected participants to be selected in a single round. Defaults to 1.0, meaning that all connected participants will be selected. It must be in the (0.0, 1.0] interval.

  • weights (ndarray) – The weights of the global model.

  • epochs (int) – Number of training iterations local to Participant.

  • epoch_base (int) – Global number of epochs as of the last round.

  • aggregator (Aggregator) – The type of aggregation to perform at the end of each round. Defaults to WeightedAverageAggregator.

  • controller (Controller) – Controls how the Participants are selected at the start of each round. Defaults to RandomController.
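
To illustrate how fraction_of_participants and a random selection strategy could interact, here is a minimal sketch. The function select_fraction, its seed parameter, and the choice to round up are assumptions made for this example; they do not describe the actual RandomController.

```python
import math
import random
from typing import List, Optional


def select_fraction(
    participant_ids: List[str], fraction: float, seed: Optional[int] = None
) -> List[str]:
    """Pick a random subset holding `fraction` of the connected participants."""
    if not 0.0 < fraction <= 1.0:
        raise ValueError("fraction must be in the (0.0, 1.0] interval")
    # Assumption: round up, so at least one participant is selected
    # whenever any participant is connected.
    count = math.ceil(fraction * len(participant_ids))
    return random.Random(seed).sample(participant_ids, count)


connected = ["p1", "p2", "p3", "p4"]
selected = select_fraction(connected, fraction=0.5, seed=0)  # two of the four ids
```

With the default fraction of 1.0 the sketch returns every connected participant, in random order.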

get_minimum_connected_participants()

Calculates how many participants are needed so that we can select a specific fraction of them.

Returns

Minimum number of participants needed to be connected to start a round.

Return type

int
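
One plausible way to compute this value is shown below. The rounding behaviour of the library is not stated here, so the ceiling used in this sketch is an assumption, and minimum_connected_participants is a hypothetical free function rather than the method itself.

```python
import math


def minimum_connected_participants(
    minimum_participants_in_round: int, fraction_of_participants: float
) -> int:
    """Number of connected participants needed so that
    fraction_of_participants * connected >= minimum_participants_in_round.
    """
    if not 0.0 < fraction_of_participants <= 1.0:
        raise ValueError("fraction_of_participants must be in (0.0, 1.0]")
    # Assumption: round up so the selected fraction never falls short.
    return math.ceil(minimum_participants_in_round / fraction_of_participants)


# Selecting half of the connected participants and needing 3 per round
# requires 6 connected participants under this assumption.
needed = minimum_connected_participants(3, 0.5)
```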

on_message(message, participant_id)

Coordinator method that implements the state machine.

Parameters
  • message (GeneratedProtocolMessageType) – A protobuf message.

  • participant_id (str) – The id of the participant making the request.

Return type

GeneratedProtocolMessageType

Returns

The response sent back to the participant.

Raises
remove_participant(participant_id)

Remove a participant from the list of accepted participants.

This method is to be called when it is detected that a participant has disconnected. After a participant is removed, if the number of remaining participants is less than the minimum number of participants that need to be connected, the Coordinator will transition to STANDBY state.

Parameters

participant_id (str) – The id of the participant to remove.

Return type

None

select_participant_ids_and_init_round()

Selects the participant ids and initiates a Round.

Return type

None

xain_fl.coordinator.coordinator_grpc module

XAIN FL gRPC Coordinator

class xain_fl.coordinator.coordinator_grpc.CoordinatorGrpc(coordinator)

Bases: xain_proto.fl.coordinator_pb2_grpc.CoordinatorServicer

The Coordinator gRPC service.

The main logic for the Coordinator is decoupled from gRPC and implemented in the xain_fl.coordinator.coordinator.Coordinator class. The gRPC service only handles client requests and forwards the messages to xain_fl.coordinator.coordinator.Coordinator.

Parameters

coordinator (Coordinator) – The Coordinator state machine.
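
The decoupling described above can be sketched without gRPC installed. StubCoordinator, ForwardingService and FakeContext are hypothetical stand-ins; the only real gRPC detail assumed is that a servicer context exposes peer(), which this sketch uses as the participant id.

```python
class StubCoordinator:
    """Hypothetical stand-in for the Coordinator state machine."""

    def on_message(self, message, participant_id):
        # A real coordinator would dispatch on the message type and its state.
        return {"request": message, "participant_id": participant_id}


class ForwardingService:
    """Thin service layer: no training logic, only forwarding."""

    def __init__(self, coordinator):
        self.coordinator = coordinator

    def _forward(self, request, context):
        # context.peer() identifies the caller of the RPC.
        return self.coordinator.on_message(request, context.peer())

    # Every RPC method delegates to the same entry point.
    Rendezvous = _forward
    Heartbeat = _forward
    StartTrainingRound = _forward
    EndTrainingRound = _forward


class FakeContext:
    """Minimal test double exposing only the peer() method."""

    def peer(self):
        return "ipv4:127.0.0.1:50051"


service = ForwardingService(StubCoordinator())
response = service.Heartbeat("heartbeat-request", FakeContext())
```

Keeping the service layer this thin means the state machine can be exercised in tests without starting a server.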

EndTrainingRound(request, context)

The EndTrainingRound gRPC method.

Once a participant has finished training for the round, it calls this method to submit the updated weights to the xain_fl.coordinator.coordinator.Coordinator.

Parameters
  • request (EndTrainingRoundRequest) – The participant’s request. The request contains the updated weights resulting from the training as well as any metrics helpful for the xain_fl.coordinator.coordinator.Coordinator.

  • context (ServicerContext) – The context associated with the gRPC request.

Return type

EndTrainingRoundResponse

Returns

The response to the participant’s request. The response is just an acknowledgment that the xain_fl.coordinator.coordinator.Coordinator successfully received the updated weights.

Heartbeat(request, context)

The Heartbeat gRPC method.

Participants periodically send a heartbeat so that the Coordinator can detect failures.

Parameters
  • request (HeartbeatRequest) – The participant’s request. It contains the participant’s current State and round number.

  • context (ServicerContext) – The context associated with the gRPC request.

Return type

HeartbeatResponse

Returns

The response to the participant’s request. The response contains both the State and the current round the coordinator is on. If a training session has not started yet, the round number defaults to 0.

Rendezvous(request, context)

The Rendezvous gRPC method.

A participant contacts the coordinator and the coordinator adds the participant to its list of participants. If the coordinator already has all the participants it needs, it tells the participant to try again later.

Parameters
  • request (RendezvousRequest) – The participant’s request.

  • context (ServicerContext) – The context associated with the gRPC request.

Return type

RendezvousResponse

Returns

The response to the participant’s request. The response is an enum containing either:

StartTrainingRound(request, context)

The StartTrainingRound gRPC method.

Once a participant is notified that the xain_fl.coordinator.coordinator.Coordinator is in a round (through the state advertised in the HeartbeatResponse), the participant should call this method to get the global model weights and start the training for the round.

Parameters
  • request (StartTrainingRoundRequest) – The participant’s request.

  • context (ServicerContext) – The context associated with the gRPC request.

Return type

StartTrainingRoundResponse

Returns

The response to the participant’s request. The response contains the global model weights.

xain_fl.coordinator.heartbeat module

XAIN FL Heartbeats

xain_fl.coordinator.heartbeat.monitor_heartbeats(coordinator, terminate_event)

Monitors the heartbeat of participants.

If a participant's heartbeat expires, the participant is removed from the Participants.

Note

This is meant to be run inside a thread and expects an Event that signals when it should terminate.

Parameters
  • coordinator (Coordinator) – The coordinator to monitor for heartbeats.

  • terminate_event (Event) – A threading event to signal that this method should terminate.

Return type

None
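
The thread-plus-Event pattern described in the note can be sketched as follows. This is not the library's implementation: monitor_expirations, the plain dictionary of expiration times, and the poll_interval parameter are assumptions made to keep the example self-contained.

```python
import threading
import time
from typing import Dict


def monitor_expirations(
    expirations: Dict[str, float],
    lock: threading.Lock,
    terminate_event: threading.Event,
    poll_interval: float = 0.05,
) -> None:
    """Remove entries whose expiration time has passed, until told to stop."""
    while not terminate_event.is_set():
        now = time.time()
        with lock:
            expired = [pid for pid, expires in expirations.items() if expires < now]
            for pid in expired:
                del expirations[pid]
        # Event.wait doubles as a sleep that ends early on termination.
        terminate_event.wait(timeout=poll_interval)


expirations = {"alive": time.time() + 60.0, "stale": time.time() - 1.0}
lock = threading.Lock()
terminate_event = threading.Event()
thread = threading.Thread(
    target=monitor_expirations, args=(expirations, lock, terminate_event)
)
thread.start()
time.sleep(0.2)        # give the monitor time to run at least once
terminate_event.set()  # ask the monitor to stop
thread.join()
```

Waiting on the event instead of calling time.sleep lets the thread exit promptly when termination is requested.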

xain_fl.coordinator.metrics_store module

XAIN FL Metric Store

class xain_fl.coordinator.metrics_store.AbstractMetricsStore

Bases: abc.ABC

An abstract metric store.

abstract write_metrics(participant_id, metrics)

Write the participant metrics on behalf of the participant with the given participant_id into a metric store.

Parameters
  • participant_id (str) – The ID of the participant.

  • metrics (Dict[str, ndarray]) – The metrics of the participant with the given participant_id.

Return type

None

Returns

True, on success, otherwise False.

class xain_fl.coordinator.metrics_store.MetricsStore(config)

Bases: xain_fl.coordinator.metrics_store.AbstractMetricsStore

A metric store that uses InfluxDB to store the metrics.

write_metrics(participant_id, metrics)

Write the participant metrics on behalf of the participant with the given participant_id into InfluxDB.

Parameters
  • participant_id (str) – The ID of the participant.

  • metrics (Dict[str, ndarray]) – The metrics of the participant with the given participant_id.

Return type

None

Returns

True, on success, otherwise False.

Raises

MetricsStoreError – If the writing of the metrics to InfluxDB failed.

exception xain_fl.coordinator.metrics_store.MetricsStoreError

Bases: Exception

Raised when the writing of the metrics failed.

class xain_fl.coordinator.metrics_store.NullObjectMetricsStore

Bases: xain_fl.coordinator.metrics_store.AbstractMetricsStore

A metric store that does nothing.

write_metrics(participant_id, metrics)

A method that has no effect.

Parameters
  • participant_id (str) – The ID of the participant.

  • metrics (Dict[str, ndarray]) – The metrics of the participant with the given participant_id.

Return type

None

Returns

Always True.

xain_fl.coordinator.metrics_store.transform_metrics_to_influx_data_points(participant_id, metrics)

Transform the metrics of a participant into InfluxDB data points.

Parameters
  • participant_id (str) – The ID of the participant.

  • metrics (Dict[str, ndarray]) – The metrics of the participant with the given participant_id.

Return type

List[dict]

Returns

The metrics of the participant as InfluxDB data points.
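
A data point in the JSON-style format accepted by InfluxDB Python clients is a dictionary with measurement, tags, fields and time keys. The sketch below shows one possible mapping; the layout (one point per metric value, an index tag, a field named value) is an assumption and may differ from what this function produces. Plain sequences of floats stand in for ndarray to avoid a NumPy dependency.

```python
import time
from typing import Dict, List, Sequence


def metrics_to_data_points(
    participant_id: str, metrics: Dict[str, Sequence[float]]
) -> List[dict]:
    """Build one InfluxDB-style data point per metric value."""
    timestamp_ns = time.time_ns()
    points = []
    for name, values in metrics.items():
        for index, value in enumerate(values):
            points.append(
                {
                    "measurement": name,
                    # Tags are strings; the index tag keeps the points distinct.
                    "tags": {"participant_id": participant_id, "index": str(index)},
                    "fields": {"value": float(value)},
                    "time": timestamp_ns,
                }
            )
    return points


points = metrics_to_data_points("participant-1", {"loss": [0.5, 0.25]})
```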

xain_fl.coordinator.participants module

XAIN FL Participants

class xain_fl.coordinator.participants.ParticipantContext(participant_id)

Bases: object

Class to store state about each participant. Currently it only stores the participant_id and the time when the next heartbeat_expires.

In the future we may store more information, such as the state a participant is in, e.g. IDLE, RUNNING, …

Parameters

participant_id (str) – The id of the participant. Typically a host:port or public key when using SSL.

class xain_fl.coordinator.participants.Participants

Bases: object

This class provides methods to handle all the participants connected to a coordinator in a thread-safe manner by protecting access to the participants list with a lock.

add(participant_id)

Adds a new participant to the list of participants.

Parameters

participant_id (str) – The id of the participant to add.

Return type

None

ids()

Get the ids of the participants.

Return type

List[str]

Returns

The list of participant ids.

len()

Get the number of participants.

Return type

int

Returns

The number of participants in the list.

next_expiration()

Helper method to check which heartbeat expires next.

Currently used by monitor_heartbeats to determine how long it should sleep until the next check.

Return type

float

Returns

The next heartbeat to expire.

remove(participant_id)

Removes a participant from the list of participants.

This is typically used after a participant has disconnected from the coordinator.

Parameters

participant_id (str) – The id of the participant to remove.

Return type

None

update_expires(participant_id)

Updates the heartbeat expiration time for a participant.

This is currently called by the xain_fl.coordinator.coordinator.Coordinator every time a participant sends a heartbeat.

Parameters

participant_id (str) – The id of the participant whose expiration time is updated.

Return type

None
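
The lock-protected access described for this class can be sketched as below. ParticipantsSketch and HEARTBEAT_TIMEOUT are hypothetical; the timeout value and the dictionary-based storage are assumptions, not the library's internals.

```python
import threading
import time
from typing import Dict, List

HEARTBEAT_TIMEOUT = 10.0  # seconds; hypothetical value for this sketch


class ParticipantsSketch:
    """Hypothetical registry that guards every access with a lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._expires: Dict[str, float] = {}

    def add(self, participant_id: str) -> None:
        with self._lock:
            self._expires[participant_id] = time.time() + HEARTBEAT_TIMEOUT

    def remove(self, participant_id: str) -> None:
        with self._lock:
            # Removing an unknown id is a no-op in this sketch.
            self._expires.pop(participant_id, None)

    def update_expires(self, participant_id: str) -> None:
        with self._lock:
            if participant_id in self._expires:
                self._expires[participant_id] = time.time() + HEARTBEAT_TIMEOUT

    def next_expiration(self) -> float:
        with self._lock:
            # With no participants there is nothing to expire.
            return min(self._expires.values(), default=float("inf"))

    def ids(self) -> List[str]:
        with self._lock:
            return list(self._expires)

    def len(self) -> int:
        with self._lock:
            return len(self._expires)


registry = ParticipantsSketch()
registry.add("p1")
registry.add("p2")
registry.remove("p1")
```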

xain_fl.coordinator.round module

XAIN FL Rounds

class xain_fl.coordinator.round.Round(participant_ids)

Bases: object

Class to manage the state of a single round. This class contains the logic to handle all updates sent by the participants during a round and performs sanity checks, such as preventing the same participant from submitting multiple updates during a single round.

Parameters

participant_ids (List[str]) – The list of IDs of the participants selected to participate in this round.

add_updates(participant_id, model_weights, aggregation_data, metrics)

Validates and adds a participant’s update for the round.

Parameters
  • participant_id (str) – The id of the participant making the request.

  • model_weights (ndarray) – The updated model weights.

  • aggregation_data (int) – Metadata for aggregation.

  • metrics (Dict[str, ndarray]) – A dictionary mapping each metric name to its values as an ndarray.

Raises

DuplicatedUpdateError – If the participant already submitted their update this round.

Return type

None

get_weight_updates()

Get a list of all participants’ weight updates. This list will usually be used by the aggregation function.

Return type

Tuple[List[ndarray], List[int]]

Returns

The lists of model weights and aggregation metadata from all participants.
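
As an illustration of how an aggregation function might consume these two lists, here is a weighted-average sketch. It assumes that each aggregation metadata value is a weighting factor such as a number of training samples; that interpretation, the function name weighted_average, and the use of plain lists instead of ndarray are all assumptions.

```python
from typing import List, Sequence


def weighted_average(
    weight_updates: Sequence[Sequence[float]], aggregation_data: Sequence[int]
) -> List[float]:
    """Average the updates element-wise, weighting each by its metadata value."""
    total = sum(aggregation_data)
    if total == 0:
        raise ValueError("aggregation data must not sum to zero")
    size = len(weight_updates[0])
    return [
        sum(update[i] * factor for update, factor in zip(weight_updates, aggregation_data))
        / total
        for i in range(size)
    ]


# The second update counts three times as much as the first.
averaged = weighted_average([[1.0, 2.0], [3.0, 4.0]], [1, 3])  # [2.5, 3.5]
```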

is_finished()

Check if all the required participants submitted their updates this round. If all participants submitted their updates the round is considered finished.

Return type

bool

Returns

True if all participants submitted their updates this round. False otherwise.
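
The bookkeeping described for this class can be sketched as follows. RoundSketch is a hypothetical simplification: the metrics argument is omitted, weights are plain lists, and the DuplicatedUpdateError defined here is only a stand-in for the exception named above.

```python
from typing import Dict, List, Tuple


class DuplicatedUpdateError(Exception):
    """Stand-in for the exception raised on a repeated update."""


class RoundSketch:
    """Hypothetical, simplified bookkeeping for a single round."""

    def __init__(self, participant_ids: List[str]) -> None:
        self.participant_ids = participant_ids
        self.updates: Dict[str, Tuple[List[float], int]] = {}

    def add_updates(
        self, participant_id: str, model_weights: List[float], aggregation_data: int
    ) -> None:
        # Sanity check: one update per participant per round.
        if participant_id in self.updates:
            raise DuplicatedUpdateError(f"{participant_id} already submitted an update")
        self.updates[participant_id] = (model_weights, aggregation_data)

    def is_finished(self) -> bool:
        return len(self.updates) == len(self.participant_ids)

    def get_weight_updates(self) -> Tuple[List[List[float]], List[int]]:
        weights = [weights for weights, _ in self.updates.values()]
        data = [data for _, data in self.updates.values()]
        return weights, data


round_sketch = RoundSketch(["p1", "p2"])
round_sketch.add_updates("p1", [1.0], 10)
round_sketch.add_updates("p2", [3.0], 20)
```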

xain_fl.coordinator.store module

This module provides classes for weights storage. It currently only works with services that provide the AWS S3 APIs.

class xain_fl.coordinator.store.AbstractStore

Bases: abc.ABC

An abstract class that defines the API a store must implement.

abstract read_weights(participant_id, round)

Read the weights computed by the given participant for the given round.

Parameters
  • participant_id (str) – ID of the participant whose weights are read.

  • round (int) – A round number the weights correspond to.

Return type

ndarray

abstract write_weights(round, weights)

Store the given weights, corresponding to the given round.

Parameters
  • round (int) – A round number the weights correspond to.

  • weights (ndarray) – The weights to store.

Return type

None
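
To show what implementing this API can look like, here is a minimal in-memory store. StoreInterfaceSketch mirrors the two abstract methods documented above so that the example runs on its own, and InMemoryStore is hypothetical; weights are plain lists rather than ndarray, and participant_id is ignored because write_weights stores only one set of weights per round.

```python
from abc import ABC, abstractmethod
from typing import Dict, List


class StoreInterfaceSketch(ABC):
    """Local mirror of the store API, so the example is self-contained."""

    @abstractmethod
    def write_weights(self, round: int, weights: List[float]) -> None:
        """Store the weights for the given round."""

    @abstractmethod
    def read_weights(self, participant_id: str, round: int) -> List[float]:
        """Read the weights for the given round."""


class InMemoryStore(StoreInterfaceSketch):
    """Hypothetical store that keeps the weights in a dictionary."""

    def __init__(self) -> None:
        self._weights: Dict[int, List[float]] = {}

    def write_weights(self, round: int, weights: List[float]) -> None:
        # Copy the list so later changes by the caller do not affect the store.
        self._weights[round] = list(weights)

    def read_weights(self, participant_id: str, round: int) -> List[float]:
        # participant_id is unused in this sketch; a missing round raises KeyError.
        return list(self._weights[round])


store = InMemoryStore()
store.write_weights(0, [0.1, 0.2])
restored = store.read_weights("p1", 0)
```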

class xain_fl.coordinator.store.NullObjectStore

Bases: xain_fl.coordinator.store.AbstractStore

A store that does nothing.

read_weights(participant_id, round)

A dummy method that has no effect.

Parameters
  • participant_id (str) – ID of the participant whose weights are read. Not used.

  • round (int) – A round number the weights correspond to. Not used.

Return type

ndarray

write_weights(round, weights)

A dummy method that has no effect.

Parameters
  • round (int) – A round number the weights correspond to. Not used.

  • weights (ndarray) – The weights to store. Not used.

Return type

None

class xain_fl.coordinator.store.S3Store(config)

Bases: xain_fl.coordinator.store.AbstractStore

A store for services that offer the AWS S3 API.

Parameters

config (StorageConfig) – The storage configuration (endpoint URL, credentials, etc.).

read_weights(participant_id, round)

Download the weights computed by the given participant for the given round, from an S3 bucket.

Parameters
  • participant_id (str) – ID of the participant whose weights are read. Not used.

  • round (int) – A round number the weights correspond to.

Return type

ndarray

Returns

The weights read from the S3 bucket.

write_weights(round, weights)

Store the given weights, corresponding to the given round.

Parameters
  • round (int) – A round number the weights correspond to.

  • weights (ndarray) – The weights to store.