xain_fl.coordinator package

Defines constants to be used throughout the module

Submodules

xain_fl.coordinator.coordinator module

XAIN FL Coordinator

class xain_fl.coordinator.coordinator.Coordinator(global_weights_writer, local_weights_reader, metrics_store=<xain_fl.coordinator.metrics_store.NullObjectMetricsStore object>, num_rounds=1, minimum_participants_in_round=1, fraction_of_participants=1.0, weights=array([], dtype=float64), epochs=1, epoch_base=0, aggregator=<xain_fl.fl.coordinator.aggregate.WeightedAverageAggregator object>, controller=<xain_fl.fl.coordinator.controller.RandomController object>, participants=None)

Bases: object

Class implementing the main Coordinator logic. It is implemented as a state machine that reacts to received messages.

The states of the Coordinator are:
  • STANDBY: The coordinator is in standby mode, typically when waiting for participants to connect. In this mode the only messages that the coordinator can receive are RendezvousRequest and HeartbeatRequest.

  • ROUND: A round is currently in progress. During a round the important messages the coordinator can receive are StartTrainingRoundRequest and EndTrainingRoundRequest. Since participants may or may not be selected for a round, the coordinator advertises ROUND to the selected participants and STANDBY to the others. Round numbers start from 0.

  • FINISHED: The training session has ended and participants should disconnect from the coordinator.

States are exchanged during heartbeats so that both the coordinator and the participants can react to each other's state changes.

The flow of the Coordinator:
  1. The coordinator is started and waits for enough participants to join. STANDBY.

  2. Once enough participants are connected the coordinator starts the rounds. ROUND.

  3. Repeat step 2 for the given number of rounds.

  4. The training session is over and the coordinator is ready to shutdown. FINISHED.

Note

RendezvousRequest is always allowed, regardless of which state the coordinator is in.
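The state flow described above can be illustrated with a small sketch. This is not the Coordinator implementation: the class name SessionSketch and its methods on_rendezvous and on_round_finished are hypothetical, and the rule for leaving STANDBY (a simple connected-participant count) is an assumption made for illustration.

```python
from enum import Enum


class State(Enum):
    STANDBY = "STANDBY"
    ROUND = "ROUND"
    FINISHED = "FINISHED"


class SessionSketch:
    """Hypothetical stand-in for the Coordinator's state handling."""

    def __init__(self, num_rounds: int, minimum_connected: int) -> None:
        self.num_rounds = num_rounds
        self.minimum_connected = minimum_connected
        self.state = State.STANDBY
        self.current_round = 0  # round numbers start from 0
        self.connected = 0

    def on_rendezvous(self) -> None:
        # A rendezvous is handled in every state.
        self.connected += 1
        # Assumption: rounds start as soon as enough participants are connected.
        if self.state is State.STANDBY and self.connected >= self.minimum_connected:
            self.state = State.ROUND

    def on_round_finished(self) -> None:
        if self.state is not State.ROUND:
            raise RuntimeError("no round in progress")
        if self.current_round + 1 >= self.num_rounds:
            # All rounds are done; participants should disconnect.
            self.state = State.FINISHED
        else:
            self.current_round += 1
```

With num_rounds=2 and minimum_connected=2, two rendezvous calls move the sketch from STANDBY to ROUND, and two finished rounds move it to FINISHED.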

Parameters
  • global_weights_writer (AbstractGlobalWeightsWriter) – service for storing global weights

  • local_weights_reader (AbstractLocalWeightsReader) – service for retrieving the local weights

  • num_rounds (int) – The number of rounds of the training session.

  • minimum_participants_in_round (int) – The minimum number of participants that participate in a round.

  • fraction_of_participants (float) – The fraction of total connected participants to be selected in a single round. Defaults to 1.0, meaning that all connected participants will be selected. It must be in the (0.0, 1.0] interval.

  • weights (ndarray) – The weights of the global model.

  • epochs (int) – Number of training iterations local to Participant.

  • epoch_base (int) – The global epoch number for the start of the next training round.

  • aggregator (Aggregator) – The type of aggregation to perform at the end of each round. Defaults to WeightedAverageAggregator.

  • controller (Controller) – Controls how the Participants are selected at the start of each round. Defaults to RandomController.
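As an illustration of how fraction_of_participants could drive a random selection, here is a minimal sketch. The function select_ids is hypothetical and is not the RandomController API; rounding the selection size up with math.ceil is an assumption.

```python
import math
import random
from typing import List, Optional


def select_ids(
    participant_ids: List[str],
    fraction_of_participants: float = 1.0,
    rng: Optional[random.Random] = None,
) -> List[str]:
    """Pick a random subset whose size is the given fraction of all ids."""
    if not 0.0 < fraction_of_participants <= 1.0:
        raise ValueError("fraction_of_participants must be in (0.0, 1.0]")
    rng = rng or random.Random()
    # Assumption: round the selection size up so at least one id is chosen.
    count = math.ceil(fraction_of_participants * len(participant_ids))
    return rng.sample(participant_ids, count)
```

Passing a seeded random.Random instance makes the selection reproducible in tests.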

get_minimum_connected_participants()

Calculates how many participants are needed so that we can select a specific fraction of them.

Returns

Minimum number of participants needed to be connected to start a round.

Return type

int
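One plausible way to derive this number from minimum_participants_in_round and fraction_of_participants is sketched below. The formula and the use of math.ceil are assumptions; the actual method may round differently.

```python
import math


def minimum_connected_participants(
    minimum_participants_in_round: int, fraction_of_participants: float
) -> int:
    """Smallest pool size from which the fraction yields enough participants."""
    if not 0.0 < fraction_of_participants <= 1.0:
        raise ValueError("fraction_of_participants must be in (0.0, 1.0]")
    # Assumption: round up so that fraction * pool >= minimum_participants_in_round.
    return math.ceil(minimum_participants_in_round / fraction_of_participants)
```

For example, a round that needs at least 10 participants and selects half of the connected ones requires 20 connected participants under this assumption.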

on_message(message, participant_id)

Coordinator method that implements the state machine.

Parameters
  • message (GeneratedProtocolMessageType) – A protobuf message.

  • participant_id (str) – The id of the participant making the request.

Return type

GeneratedProtocolMessageType

Returns

The response sent back to the participant.

Raises

remove_participant(participant_id)

Remove a participant from the list of accepted participants.

This method is to be called when it is detected that a participant has disconnected. After a participant is removed, if the number of remaining participants is less than the minimum number of participants that need to be connected, the Coordinator will transition to STANDBY state.

Parameters

participant_id (str) – The id of the participant to remove.

Return type

None

select_outstanding()

Selects participants outstanding for the round.

Return type

List[str]

select_participant_ids_and_init_round()

Selects the participant ids and initiates a Round.

Return type

None

xain_fl.coordinator.coordinator.pb_enum_to_str(pb_enum, member_value)

Return the human-readable string of an enum member value.

Parameters
  • pb_enum (EnumDescriptor) – The proto enum definition.

  • member_value (int) – The enum member value.

Return type

str

Returns

The human-readable string of an enum member value.

xain_fl.coordinator.coordinator_grpc module

XAIN FL gRPC Coordinator

class xain_fl.coordinator.coordinator_grpc.CoordinatorGrpc(coordinator)

Bases: xain_proto.fl.coordinator_pb2_grpc.CoordinatorServicer

The Coordinator gRPC service.

The main logic for the Coordinator is decoupled from gRPC and implemented in the xain_fl.coordinator.coordinator.Coordinator class. The gRPC service only handles client requests and forwards the messages to xain_fl.coordinator.coordinator.Coordinator.

Parameters

coordinator (Coordinator) – The Coordinator state machine.

EndTrainingRound(request, context)

The EndTrainingRound gRPC method.

Once a participant has finished training for the round, it calls this method to submit the updated weights to the xain_fl.coordinator.coordinator.Coordinator.

Parameters
  • request (EndTrainingRoundRequest) – The participant’s request. The request contains the updated weights as a result of the training as well as any metrics helpful for the xain_fl.coordinator.coordinator.Coordinator.

  • context (ServicerContext) – The context associated with the gRPC request.

Return type

EndTrainingRoundResponse

Returns

The response to the participant’s request. The response is just an acknowledgment that the xain_fl.coordinator.coordinator.Coordinator successfully received the updated weights.

Heartbeat(request, context)

The Heartbeat gRPC method.

Participants periodically send a heartbeat so that the Coordinator can detect failures.

Parameters
  • request (HeartbeatRequest) – The participant’s request. The participant’s request contains the current State and round number the participant is on.

  • context (ServicerContext) – The context associated with the gRPC request.

Return type

HeartbeatResponse

Returns

The response to the participant’s request. The response contains both the State and the current round the coordinator is on. If a training session has not started yet the round number defaults to 0.

Rendezvous(request, context)

The Rendezvous gRPC method.

A participant contacts the coordinator and the coordinator adds the participant to its list of participants. If the coordinator already has all the participants it needs, it tells the participant to try again later.

Parameters
  • request (RendezvousRequest) – The participant’s request.

  • context (ServicerContext) – The context associated with the gRPC request.

Return type

RendezvousResponse

Returns

The response to the participant’s request. The response is an enum containing either:

StartTrainingRound(request, context)

The StartTrainingRound gRPC method.

Once a participant is notified that the xain_fl.coordinator.coordinator.Coordinator is in a round (through the state advertised in the HeartbeatResponse), it should call this method to get the global model weights and start training for the round.

Parameters
  • request (StartTrainingRoundRequest) – The participant’s request.

  • context (ServicerContext) – The context associated with the gRPC request.

Return type

StartTrainingRoundResponse

Returns

The response to the participant’s request. The response contains the global model weights.

xain_fl.coordinator.heartbeat module

XAIN FL Heartbeats

xain_fl.coordinator.heartbeat.monitor_heartbeats(coordinator, terminate_event)

Monitors the heartbeat of participants.

If a heartbeat expires the participant is removed from the Participants.

Note

This is meant to be run inside a thread and expects an Event, to know when it should terminate.

Parameters
  • coordinator (Coordinator) – The coordinator to monitor for heartbeats.

  • terminate_event (Event) – A threading event to signal that this method should terminate.

Return type

None
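The thread-plus-Event pattern mentioned in the note can be sketched as follows. This is an illustration, not the body of monitor_heartbeats: the function monitor_expirations, its expirations dictionary and the max_sleep cap are hypothetical names chosen for this sketch.

```python
import threading
import time
from typing import Dict


def monitor_expirations(
    expirations: Dict[str, float],
    lock: threading.Lock,
    terminate_event: threading.Event,
    max_sleep: float = 1.0,
) -> None:
    """Drop entries whose expiration time has passed until asked to stop."""
    while True:
        now = time.monotonic()
        with lock:
            expired = [pid for pid, exp in expirations.items() if exp <= now]
            for pid in expired:
                del expirations[pid]
            next_exp = min(expirations.values(), default=now + max_sleep)
        # Sleep until the next expiration, but never longer than max_sleep.
        timeout = max(0.0, min(next_exp - now, max_sleep))
        # Event.wait returns True as soon as the event is set.
        if terminate_event.wait(timeout):
            break
```

A caller would run this in a threading.Thread and call terminate_event.set() followed by join() to stop it. Waiting on the event instead of calling time.sleep lets the thread exit promptly.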

xain_fl.coordinator.metrics_store module

XAIN FL Metric Store

class xain_fl.coordinator.metrics_store.AbstractMetricsStore

Bases: abc.ABC

An abstract metric store.

abstract write_metrics(owner, metrics, tags=None)

Write metrics into a metric store.

Parameters
  • owner (str) – The name of the owner of the metrics e.g. coordinator or participant.

  • metrics (Dict[str, Union[str, int, float]]) – A dictionary with the metric names as keys and the metric values as values.

  • tags (Optional[Dict[str, str]]) – A dictionary to append optional metadata to the metric. Defaults to None.

Return type

None

abstract write_received_participant_metrics(metrics_as_json)

Write the participant metrics on behalf of the participant into a metric store.

Parameters

metrics_as_json (str) – The metrics of a specific participant.

Raises

MetricsStoreError – If the writing of the metrics has failed.

Return type

None

class xain_fl.coordinator.metrics_store.MetricsStore(config)

Bases: xain_fl.coordinator.metrics_store.AbstractMetricsStore

A metric store that uses InfluxDB to store the metrics.

write_metrics(owner, metrics, tags=None)

Write the metrics that are collected on the coordinator side to InfluxDB.

Parameters
  • owner (str) – The name of the owner of the metrics e.g. coordinator or participant.

  • metrics (Dict[str, Union[str, int, float]]) – A dictionary with the metric names as keys and the metric values as values.

  • tags (Optional[Dict[str, str]]) – A dictionary to append optional metadata to the metric. Defaults to None.

Raises

MetricsStoreError – If the writing of the metrics to InfluxDB has failed.

Return type

None

write_received_participant_metrics(metrics_as_json)

Write the participant metrics on behalf of the participant into InfluxDB.

Parameters

metrics_as_json (str) – The metrics of a specific participant.

Raises

MetricsStoreError – If the writing of the metrics to InfluxDB has failed.

Return type

None

exception xain_fl.coordinator.metrics_store.MetricsStoreError

Bases: Exception

Raised when the writing of the metrics has failed.

class xain_fl.coordinator.metrics_store.NullObjectMetricsStore

Bases: xain_fl.coordinator.metrics_store.AbstractMetricsStore

A metric store that does nothing.

write_metrics(owner, metrics, tags=None)

A method that has no effect.

Parameters
  • owner (str) – The name of the owner of the metrics e.g. coordinator or participant.

  • metrics (Dict[str, Union[str, int, float]]) – A dictionary with the metric names as keys and the metric values as values.

  • tags (Optional[Dict[str, str]]) – A dictionary to append optional metadata to the metric. Defaults to None.

Return type

None

write_received_participant_metrics(metrics_as_json)

A method that has no effect.

Parameters

metrics_as_json (str) – The metrics of a specific participant.

Return type

None
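The relationship between the abstract store and its do-nothing variant follows the null object pattern, which the sketch below illustrates. The names MetricsSink, NullSink, InMemorySink and report_round are hypothetical and only mirror the write_metrics signature documented above.

```python
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Tuple, Union

Metrics = Dict[str, Union[str, int, float]]
Tags = Optional[Dict[str, str]]


class MetricsSink(ABC):
    """Hypothetical counterpart of an abstract metrics store."""

    @abstractmethod
    def write_metrics(self, owner: str, metrics: Metrics, tags: Tags = None) -> None:
        """Write metrics into a store."""


class NullSink(MetricsSink):
    def write_metrics(self, owner: str, metrics: Metrics, tags: Tags = None) -> None:
        """Accept the call and do nothing."""


class InMemorySink(MetricsSink):
    def __init__(self) -> None:
        self.records: List[Tuple[str, Metrics, Dict[str, str]]] = []

    def write_metrics(self, owner: str, metrics: Metrics, tags: Tags = None) -> None:
        self.records.append((owner, dict(metrics), dict(tags or {})))


def report_round(sink: MetricsSink, round_number: int) -> None:
    # The caller never has to check whether metrics are enabled.
    sink.write_metrics("coordinator", {"round": round_number}, tags={"source": "sketch"})
```

Because the null object satisfies the same interface, calling code can always write metrics without a conditional, which is presumably why the Coordinator defaults to NullObjectMetricsStore.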

xain_fl.coordinator.participants module

XAIN FL Participants

class xain_fl.coordinator.participants.ParticipantContext(participant_id, heartbeat_time, heartbeat_timeout)

Bases: object

Class to store state about each participant. Currently it only stores the participant_id and the time when the next heartbeat expires (heartbeat_expires).

In the future we may store more information, such as the state a participant is in, e.g. IDLE, RUNNING, …

Parameters

participant_id (str) – The id of the participant. Typically a host:port or public key when using SSL.

class xain_fl.coordinator.participants.Participants(heartbeat_time=10, heartbeat_timeout=5)

Bases: object

This class provides some useful methods to handle all the participants connected to a coordinator in a thread safe manner by protecting access to the participants list with a lock.

add(participant_id)

Adds a new participant to the list of participants.

Parameters

participant_id (str) – The id of the participant to add.

Return type

None

ids()

Get the ids of the participants.

Return type

List[str]

Returns

The list of participant ids.

len()

Get the number of participants.

Return type

int

Returns

The number of participants in the list.

next_expiration()

Helper method to check what is the next heartbeat to expire.

Currently used by xain_fl.coordinator.heartbeat.monitor_heartbeats to check how long it should sleep until the next check.

Return type

float

Returns

The next heartbeat to expire.

remove(participant_id)

Removes a participant from the list of participants.

This is typically used after a participant is disconnected from the coordinator.

Parameters

participant_id (str) – The id of the participant to remove.

Return type

None

update_expires(participant_id)

Updates the heartbeat expiration time for a participant.

This is currently called by the xain_fl.coordinator.coordinator.Coordinator every time a participant sends a heartbeat.

Parameters

participant_id (str) – The id of the participant whose expiration time is updated.

Return type

None
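A lock-protected registry along the lines of the Participants class could look like the sketch below. The class ParticipantRegistry is hypothetical. Computing the expiration as the last heartbeat plus heartbeat_time plus heartbeat_timeout, and raising KeyError for unknown ids, are assumptions made for illustration.

```python
import threading
import time
from typing import Dict, List


class ParticipantRegistry:
    """Hypothetical thread-safe registry keyed by participant id."""

    def __init__(self, heartbeat_time: float = 10, heartbeat_timeout: float = 5) -> None:
        # Assumption: a heartbeat is overdue after heartbeat_time + heartbeat_timeout.
        self._window = heartbeat_time + heartbeat_timeout
        self._expires: Dict[str, float] = {}
        self._lock = threading.Lock()

    def add(self, participant_id: str) -> None:
        with self._lock:
            self._expires[participant_id] = time.monotonic() + self._window

    def remove(self, participant_id: str) -> None:
        with self._lock:
            self._expires.pop(participant_id, None)

    def update_expires(self, participant_id: str) -> None:
        with self._lock:
            if participant_id not in self._expires:
                raise KeyError(participant_id)
            self._expires[participant_id] = time.monotonic() + self._window

    def ids(self) -> List[str]:
        with self._lock:
            return list(self._expires)

    def len(self) -> int:
        with self._lock:
            return len(self._expires)

    def next_expiration(self) -> float:
        with self._lock:
            # With no participants, report a full window from now.
            return min(self._expires.values(), default=time.monotonic() + self._window)
```

Every method takes the lock for the shortest possible span and returns copies, so callers such as a gRPC handler thread and a heartbeat monitor thread never share the underlying dictionary.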

xain_fl.coordinator.round module

XAIN FL Rounds

class xain_fl.coordinator.round.Round(participant_ids)

Bases: object

Class to manage the state of a single round. This class contains the logic to handle all updates sent by the participants during a round in a thread-safe manner and does some sanity checks like preventing the same participant from submitting multiple updates within a single round.

Parameters
  • participant_ids (List[str]) – The list of IDs of the participants selected to participate in this round.

  • updates – Dictionary of training updates indexed by participant ID.

add_selected(more_ids)

Add to the collection of selected participants.

Parameters

more_ids (List[str]) – ids of participants to add.

Return type

None

add_updates(participant_id, model_weights, aggregation_data)

Add a participant’s update for the round.

Parameters
  • participant_id (str) – The id of the participant making the request.

  • model_weights (ndarray) – The updated model weights.

  • aggregation_data (int) – Meta data for aggregation.

Raises

DuplicatedUpdateError – If the participant already submitted its update this round.

Return type

None

get_weight_updates()

Get a list of all participants' weight updates. This list will usually be used by the aggregation function.

Return type

Tuple[List[ndarray], List[int]]

Returns

The lists of model weights and aggregation meta data from all participants.
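To show how an aggregation function might consume these two lists, here is a minimal weighted-average sketch. It is not the WeightedAverageAggregator implementation. It assumes that the aggregation meta data is a per-participant weight such as the number of training samples, and it uses plain lists of floats instead of ndarray to stay dependency-free.

```python
from typing import List, Sequence


def weighted_average(
    weight_updates: Sequence[Sequence[float]], aggregation_data: Sequence[int]
) -> List[float]:
    """Average the updates element-wise, weighted by aggregation_data."""
    if not weight_updates or len(weight_updates) != len(aggregation_data):
        raise ValueError("need one aggregation value per weight update")
    total = sum(aggregation_data)
    if total <= 0:
        raise ValueError("aggregation data must sum to a positive number")
    size = len(weight_updates[0])
    result = [0.0] * size
    for update, count in zip(weight_updates, aggregation_data):
        if len(update) != size:
            raise ValueError("all weight updates must have the same length")
        for i, value in enumerate(update):
            # Each participant contributes in proportion to its share of the total.
            result[i] += value * count / total
    return result
```

With updates [1.0, 2.0] and [3.0, 4.0] and aggregation data 1 and 3, the second participant contributes three quarters of each averaged value.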

is_finished()

Check if all the required participants submitted their updates this round. If all participants submitted their updates the round is considered finished.

Return type

bool

Returns

True if all participants submitted their updates this round. False otherwise.

remove_selected(participant_id)

Remove from the collection of selected participants.

Parameters

participant_id (str) – id of participant to remove.

Return type

None
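The bookkeeping described for Round can be sketched as follows. RoundSketch and DuplicateUpdate are hypothetical stand-ins, not the library classes, and weights are plain sequences of floats rather than ndarray.

```python
import threading
from typing import Dict, List, Sequence, Set, Tuple


class DuplicateUpdate(Exception):
    """Hypothetical stand-in for DuplicatedUpdateError."""


class RoundSketch:
    def __init__(self, participant_ids: List[str]) -> None:
        self._selected: Set[str] = set(participant_ids)
        self._updates: Dict[str, Tuple[Sequence[float], int]] = {}
        self._lock = threading.Lock()

    def add_selected(self, more_ids: List[str]) -> None:
        with self._lock:
            self._selected.update(more_ids)

    def remove_selected(self, participant_id: str) -> None:
        with self._lock:
            self._selected.discard(participant_id)

    def add_updates(
        self, participant_id: str, model_weights: Sequence[float], aggregation_data: int
    ) -> None:
        with self._lock:
            # Sanity check: one update per participant and round.
            if participant_id in self._updates:
                raise DuplicateUpdate(participant_id)
            self._updates[participant_id] = (model_weights, aggregation_data)

    def is_finished(self) -> bool:
        with self._lock:
            return all(pid in self._updates for pid in self._selected)

    def get_weight_updates(self) -> Tuple[List[Sequence[float]], List[int]]:
        with self._lock:
            weights = [w for w, _ in self._updates.values()]
            data = [d for _, d in self._updates.values()]
        return weights, data
```

Holding a single lock around every read and write keeps the duplicate check and the insertion atomic, so two concurrent submissions from the same participant cannot both succeed.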

xain_fl.coordinator.store module

This module provides classes for weights storage. It currently only works with services that provide the AWS S3 APIs.

class xain_fl.coordinator.store.AbstractGlobalWeightsWriter

Bases: abc.ABC

An abstract class that defines the API for storing the aggregated weights the coordinator computes.

abstract write_weights(round, weights)

Store the given weights, corresponding to the given round.

Parameters
  • round (int) – A round number the weights correspond to.

  • weights (ndarray) – The weights to store.

Return type

None

class xain_fl.coordinator.store.AbstractLocalWeightsReader

Bases: abc.ABC

An abstract class that defines the API for retrieving the weights participants upload after their training round.

abstract read_weights(participant_id, round)

Retrieve the weights computed by the given participant for the given round.

Parameters
  • participant_id (str) – ID of the participant’s weights.

  • round (int) – A round number the weights correspond to.

Return type

ndarray

class xain_fl.coordinator.store.S3BaseClass(config)

Bases: object

A base class for implementing AWS S3 clients.

Parameters

config (StorageConfig) – the storage configuration (endpoint URL, credentials, etc.)

class xain_fl.coordinator.store.S3GlobalWeightsWriter(config)

Bases: xain_fl.coordinator.store.AbstractGlobalWeightsWriter, xain_fl.coordinator.store.S3BaseClass

AbstractGlobalWeightsWriter implementor for the AWS S3 storage backend.

write_weights(round, weights)

Store the given weights, corresponding to the given round.

Parameters
  • round (int) – A round number the weights correspond to.

  • weights (ndarray) – The weights to store.

Return type

None

class xain_fl.coordinator.store.S3LocalWeightsReader(config)

Bases: xain_fl.coordinator.store.AbstractLocalWeightsReader, xain_fl.coordinator.store.S3BaseClass

AbstractLocalWeightsReader implementor for the AWS S3 storage backend.

read_weights(participant_id, round)

Download the weights computed by the given participant for the given round, from an S3 bucket.

Parameters
  • participant_id (str) – ID of the participant’s weights

  • round (int) – round number the weights correspond to

Return type

ndarray

Returns

The weights read from the S3 bucket.
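Because the writer and reader are defined as abstract classes, other backends can stand in for S3, for example in tests. The sketch below shows in-memory variants. WeightsWriter, WeightsReader and both in-memory classes are hypothetical names that only mirror the documented method signatures, and weights are lists of floats rather than ndarray.

```python
from abc import ABC, abstractmethod
from typing import Dict, List, Tuple


class WeightsWriter(ABC):
    @abstractmethod
    def write_weights(self, round: int, weights: List[float]) -> None:
        """Store the global weights for the given round."""


class WeightsReader(ABC):
    @abstractmethod
    def read_weights(self, participant_id: str, round: int) -> List[float]:
        """Retrieve a participant's weights for the given round."""


class InMemoryGlobalWeightsWriter(WeightsWriter):
    def __init__(self) -> None:
        self.by_round: Dict[int, List[float]] = {}

    def write_weights(self, round: int, weights: List[float]) -> None:
        self.by_round[round] = list(weights)


class InMemoryLocalWeightsReader(WeightsReader):
    def __init__(self, uploads: Dict[Tuple[str, int], List[float]]) -> None:
        # Keys are (participant_id, round) pairs, an assumption of this sketch.
        self._uploads = uploads

    def read_weights(self, participant_id: str, round: int) -> List[float]:
        return list(self._uploads[(participant_id, round)])
```

Code that depends only on the abstract interfaces can then be exercised without any S3-compatible service running.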