xain_fl.coordinator package¶
Defines constants to be used throughout the module
Submodules¶
xain_fl.coordinator.coordinator module¶
XAIN FL Coordinator
-
class
xain_fl.coordinator.coordinator.
Coordinator
(global_weights_writer, local_weights_reader, metrics_store=<xain_fl.coordinator.metrics_store.NullObjectMetricsStore object>, num_rounds=1, minimum_participants_in_round=1, fraction_of_participants=1.0, weights=array([], dtype=float64), epochs=1, epoch_base=0, aggregator=<xain_fl.fl.coordinator.aggregate.WeightedAverageAggregator object>, controller=<xain_fl.fl.coordinator.controller.RandomController object>, participants=None)¶ Bases:
object
Class implementing the main Coordinator logic. It is implemented as a state machine that reacts to received messages.
- The states of the Coordinator are:
STANDBY: The coordinator is in standby mode, typically while waiting for participants to connect. In this mode the only messages that the coordinator can receive are RendezvousRequest and HeartbeatRequest.
ROUND: A round is currently in progress. During a round the important messages the coordinator can receive are StartTrainingRoundRequest and EndTrainingRoundRequest. Since participants may or may not be selected for a round, they are advertised ROUND or STANDBY respectively. Round numbers start from 0.
FINISHED: The training session has ended and participants should disconnect from the coordinator.
States are exchanged during heartbeats so that both the coordinator and the participants can react to each other's state changes.
- The flow of the Coordinator:
1. The coordinator is started and waits for enough participants to join. STANDBY.
2. Once enough participants are connected, the coordinator starts the rounds. ROUND.
3. Repeat step 2 for the given number of rounds.
4. The training session is over and the coordinator is ready to shut down. FINISHED.
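The flow above can be sketched as a small state machine. This is only an illustration under stated assumptions, not the actual Coordinator implementation: SketchCoordinator, its attributes, and the use of plain strings in place of protobuf messages are hypothetical, and allowing heartbeats in the FINISHED state is an assumption based on states being exchanged during heartbeats.

```python
from enum import Enum


class State(Enum):
    STANDBY = "STANDBY"
    ROUND = "ROUND"
    FINISHED = "FINISHED"


# Messages accepted in each state. RendezvousRequest is always allowed.
# Accepting HeartbeatRequest in FINISHED is an assumption of this sketch.
ALLOWED = {
    State.STANDBY: {"RendezvousRequest", "HeartbeatRequest"},
    State.ROUND: {
        "RendezvousRequest",
        "HeartbeatRequest",
        "StartTrainingRoundRequest",
        "EndTrainingRoundRequest",
    },
    State.FINISHED: {"RendezvousRequest", "HeartbeatRequest"},
}


class SketchCoordinator:
    """Hypothetical, simplified stand-in for the Coordinator state machine."""

    def __init__(self, num_rounds=1, minimum_participants=1):
        self.num_rounds = num_rounds
        self.minimum_participants = minimum_participants
        self.state = State.STANDBY
        self.current_round = 0  # round numbers start from 0
        self.participants = set()
        self.pending = set()  # participants that still owe an update this round

    def on_message(self, message, participant_id):
        if message not in ALLOWED[self.state]:
            raise ValueError(f"{message} not allowed in state {self.state.name}")
        if message == "RendezvousRequest":
            self.participants.add(participant_id)
            enough = len(self.participants) >= self.minimum_participants
            if self.state is State.STANDBY and enough:
                self.state = State.ROUND
                self.pending = set(self.participants)
        elif message == "EndTrainingRoundRequest":
            self.pending.discard(participant_id)
            if not self.pending:
                self._finish_round()
        return self.state

    def _finish_round(self):
        self.current_round += 1
        if self.current_round >= self.num_rounds:
            self.state = State.FINISHED
        else:
            self.pending = set(self.participants)
```

In this sketch every connected participant takes part in every round; participant selection by fraction is left out to keep the state transitions visible.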
Note
RendezvousRequest is always allowed, regardless of which state the coordinator is in.
- Parameters
global_weights_writer (AbstractGlobalWeightsWriter) – Service for storing the global weights.
local_weights_reader (AbstractLocalWeightsReader) – Service for retrieving the local weights.
num_rounds (int) – The number of rounds of the training session.
minimum_participants_in_round (int) – The minimum number of participants that participate in a round.
fraction_of_participants (float) – The fraction of total connected participants to be selected in a single round. Defaults to 1.0, meaning that all connected participants will be selected. It must be in the (0.0, 1.0] interval.
weights (ndarray) – The weights of the global model.
epochs (int) – Number of training iterations local to a Participant.
epoch_base – The global epoch number for the start of the next training round.
aggregator (Aggregator) – The type of aggregation to perform at the end of each round. Defaults to WeightedAverageAggregator.
controller (Controller) – Controls how the Participants are selected at the start of each round. Defaults to RandomController.
-
get_minimum_connected_participants
()¶ Calculates how many participants are needed so that we can select a specific fraction of them.
- Returns
Minimum number of participants that need to be connected to start a round.
- Return type
int
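One plausible way to compute this number is sketched below. The formula is an assumption, not taken from the library: it picks the smallest number of connected participants such that selecting fraction_of_participants of them yields at least minimum_participants_in_round. The function name is hypothetical.

```python
import math


def minimum_connected_participants(minimum_participants_in_round,
                                   fraction_of_participants):
    """Assumed formula: ceil(minimum_participants_in_round / fraction)."""
    if not 0.0 < fraction_of_participants <= 1.0:
        raise ValueError("fraction_of_participants must be in (0.0, 1.0]")
    return math.ceil(minimum_participants_in_round / fraction_of_participants)
```

For instance, with a minimum of 3 participants per round and a fraction of 0.5, at least 6 participants would need to be connected under this assumption.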
-
on_message
(message, participant_id)¶ Coordinator method that implements the state machine.
- Parameters
message (GeneratedProtocolMessageType) – A protobuf message.
participant_id (str) – The id of the participant making the request.
- Return type
GeneratedProtocolMessageType
- Returns
The response sent back to the participant.
- Raises
UnknownParticipantError – If it receives a request from an unknown participant, typically a participant that has not performed a rendezvous with the Coordinator.
InvalidRequestError – If it receives a request that is not allowed in the current Coordinator state.
-
remove_participant
(participant_id)¶ Remove a participant from the list of accepted participants.
This method is to be called when it is detected that a participant has disconnected. After a participant is removed, if the number of remaining participants is less than the minimum number of participants that need to be connected, the Coordinator will transition to the STANDBY state.
- Parameters
participant_id (str) – The id of the participant to remove.
- Return type
None
-
select_participant_ids_and_init_round
()¶ Selects the participant ids and initiates a Round.
- Return type
None
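As an illustration of how a fraction of the connected participants might be picked at random for a round, consider the sketch below. It does not reproduce RandomController: the function name, the rounding rule (ceil), and the optional rng argument are assumptions made for the example.

```python
import math
import random


def select_participant_ids(participant_ids, fraction_of_participants, rng=None):
    """Randomly pick a fraction of the connected participants (assumed rule)."""
    rng = rng or random.Random()
    ids = sorted(participant_ids)  # stable order so a seeded rng is reproducible
    k = math.ceil(fraction_of_participants * len(ids))
    return rng.sample(ids, k)
```

Passing a seeded random.Random instance makes the selection reproducible, which can be convenient in tests.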
-
xain_fl.coordinator.coordinator.
pb_enum_to_str
(pb_enum, member_value)¶ Return the human-readable string of an enum member value.
xain_fl.coordinator.coordinator_grpc module¶
XAIN FL gRPC Coordinator
-
class
xain_fl.coordinator.coordinator_grpc.
CoordinatorGrpc
(coordinator)¶ Bases:
xain_proto.fl.coordinator_pb2_grpc.CoordinatorServicer
The Coordinator gRPC service.
The main logic for the Coordinator is decoupled from gRPC and implemented in the xain_fl.coordinator.coordinator.Coordinator class. The gRPC service only handles client requests and forwards the messages to xain_fl.coordinator.coordinator.Coordinator.
- Parameters
coordinator (Coordinator) – The Coordinator state machine.
-
EndTrainingRound
(request, context)¶ The EndTrainingRound gRPC method.
Once a participant has finished training for the round, it calls this method to submit the updated weights to the xain_fl.coordinator.coordinator.Coordinator.
- Parameters
request (EndTrainingRoundRequest) – The participant’s request. The request contains the updated weights resulting from the training, as well as any metrics helpful for the xain_fl.coordinator.coordinator.Coordinator.
context (ServicerContext) – The context associated with the gRPC request.
- Return type
EndTrainingRoundResponse
- Returns
The response to the participant’s request. The response is just an acknowledgment that the xain_fl.coordinator.coordinator.Coordinator successfully received the updated weights.
-
Heartbeat
(request, context)¶ The Heartbeat gRPC method.
Participants periodically send a heartbeat so that the Coordinator can detect failures.
- Parameters
request (HeartbeatRequest) – The participant’s request. It contains the current State and the round number the participant is on.
context (ServicerContext) – The context associated with the gRPC request.
- Return type
HeartbeatResponse
- Returns
The response to the participant’s request. The response contains both the State and the current round the coordinator is on. If a training session has not started yet, the round number defaults to 0.
-
Rendezvous
(request, context)¶ The Rendezvous gRPC method.
A participant contacts the coordinator and the coordinator adds the participant to its list of participants. If the coordinator already has all the participants it tells the participant to try again later.
- Parameters
request (RendezvousRequest) – The participant’s request.
context (ServicerContext) – The context associated with the gRPC request.
- Return type
RendezvousResponse
- Returns
The response to the participant’s request. The response is an enum containing either:
- ACCEPT: If the xain_fl.coordinator.coordinator.Coordinator does not have enough participants.
- LATER: If the xain_fl.coordinator.coordinator.Coordinator already has enough participants.
-
StartTrainingRound
(request, context)¶ The StartTrainingRound gRPC method.
Once a participant is notified that the xain_fl.coordinator.coordinator.Coordinator is in a round (through the state advertised in the HeartbeatResponse), the participant should call this method to get the global model weights and start training for the round.
- Parameters
request (StartTrainingRoundRequest) – The participant’s request.
context (ServicerContext) – The context associated with the gRPC request.
- Return type
StartTrainingRoundResponse
- Returns
The response to the participant’s request. The response contains the global model weights.
xain_fl.coordinator.heartbeat module¶
XAIN FL Heartbeats
-
xain_fl.coordinator.heartbeat.
monitor_heartbeats
(coordinator, terminate_event)¶ Monitors the heartbeat of participants.
If a heartbeat expires, the participant is removed from the Participants.
Note
This is meant to be run inside a thread and expects an Event to know when it should terminate.
- Parameters
coordinator (Coordinator) – The coordinator to monitor for heartbeats.
terminate_event (Event) – A threading event to signal that this method should terminate.
- Return type
None
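The thread-plus-Event pattern described in the note can be sketched as follows. This is not the library's monitor_heartbeats: the function monitor_heartbeats_sketch, the plain dictionary of expiration times, the poll_interval parameter, and run_demo are all hypothetical and exist only to show how a terminate event stops a monitoring loop.

```python
import threading
import time


def monitor_heartbeats_sketch(expirations, lock, terminate_event,
                              poll_interval=0.01):
    """Drop entries of `expirations` (id -> expiry time) once they expire."""
    while not terminate_event.is_set():
        now = time.monotonic()
        with lock:
            expired = [pid for pid, t in expirations.items() if t <= now]
            for participant_id in expired:
                del expirations[participant_id]
        # wait() returns early as soon as the event is set.
        terminate_event.wait(poll_interval)


def run_demo():
    """Start the monitor in a thread, let one heartbeat expire, then stop it."""
    lock = threading.Lock()
    expirations = {"participant-a": time.monotonic() + 0.05}
    terminate_event = threading.Event()
    thread = threading.Thread(
        target=monitor_heartbeats_sketch,
        args=(expirations, lock, terminate_event),
    )
    thread.start()
    time.sleep(0.3)  # long enough for the heartbeat to expire
    terminate_event.set()
    thread.join()
    return expirations, thread.is_alive()
```

Using Event.wait with a timeout instead of time.sleep lets the loop react promptly when termination is requested.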
xain_fl.coordinator.metrics_store module¶
XAIN FL Metric Store
-
class
xain_fl.coordinator.metrics_store.
AbstractMetricsStore
¶ Bases:
abc.ABC
An abstract metric store.
-
abstract
write_metrics
(owner, metrics, tags=None)¶ Write metrics into a metric store.
- Parameters
owner (str) – The name of the owner of the metrics, e.g. coordinator or participant.
metrics (Dict[str, Union[str, int, float]]) – A dictionary with the metric names as keys and the metric values as values.
tags (Optional[Dict[str, str]]) – A dictionary to append optional metadata to the metric. Defaults to None.
- Return type
None
-
abstract
write_received_participant_metrics
(metrics_as_json)¶ Write the participant metrics on behalf of the participant into a metric store.
- Parameters
metrics_as_json (str) – The metrics of a specific participant.
- Raises
MetricsStoreError – If the writing of the metrics has failed.
- Return type
None
-
class
xain_fl.coordinator.metrics_store.
MetricsStore
(config)¶ Bases:
xain_fl.coordinator.metrics_store.AbstractMetricsStore
A metric store that uses InfluxDB to store the metrics.
-
write_metrics
(owner, metrics, tags=None)¶ Write the metrics to InfluxDB that are collected on the coordinator site.
- Parameters
owner (str) – The name of the owner of the metrics, e.g. coordinator or participant.
metrics (Dict[str, Union[str, int, float]]) – A dictionary with the metric names as keys and the metric values as values.
tags (Optional[Dict[str, str]]) – A dictionary to append optional metadata to the metric. Defaults to None.
- Raises
MetricsStoreError – If the writing of the metrics to InfluxDB has failed.
- Return type
None
-
write_received_participant_metrics
(metrics_as_json)¶ Write the participant metrics on behalf of the participant into InfluxDB.
- Parameters
metrics_as_json (str) – The metrics of a specific participant.
- Raises
MetricsStoreError – If the writing of the metrics to InfluxDB has failed.
- Return type
None
-
-
exception
xain_fl.coordinator.metrics_store.
MetricsStoreError
¶ Bases:
Exception
Raised when the writing of the metrics has failed.
-
class
xain_fl.coordinator.metrics_store.
NullObjectMetricsStore
¶ Bases:
xain_fl.coordinator.metrics_store.AbstractMetricsStore
A metric store that does nothing.
-
write_metrics
(owner, metrics, tags=None)¶ A method that has no effect.
- Parameters
owner (str) – The name of the owner of the metrics, e.g. coordinator or participant.
metrics (Dict[str, Union[str, int, float]]) – A dictionary with the metric names as keys and the metric values as values.
tags (Optional[Dict[str, str]]) – A dictionary to append optional metadata to the metric. Defaults to None.
- Return type
None
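The relationship between the abstract store, the InfluxDB-backed store, and the null-object store can be illustrated with a reduced sketch. The classes below are hypothetical stand-ins that mirror only the write_metrics signature documented above; InMemoryStore replaces the InfluxDB backend so that the example runs without external services, and report_round is an invented caller.

```python
from abc import ABC, abstractmethod
from typing import Dict, Optional, Union

Metrics = Dict[str, Union[str, int, float]]


class SketchMetricsStore(ABC):
    """Hypothetical counterpart of AbstractMetricsStore."""

    @abstractmethod
    def write_metrics(self, owner: str, metrics: Metrics,
                      tags: Optional[Dict[str, str]] = None) -> None:
        """Write metrics into a metric store."""


class NullStore(SketchMetricsStore):
    """Null object: accepts the call and does nothing."""

    def write_metrics(self, owner, metrics, tags=None):
        return None


class InMemoryStore(SketchMetricsStore):
    """Keeps records in a list (stands in for a real backend)."""

    def __init__(self):
        self.records = []

    def write_metrics(self, owner, metrics, tags=None):
        self.records.append(
            {"owner": owner, "metrics": dict(metrics), "tags": dict(tags or {})}
        )


def report_round(store: SketchMetricsStore, round_number: int, loss: float) -> None:
    # The caller never needs to check whether metrics are enabled.
    store.write_metrics("coordinator", {"round": round_number, "loss": loss},
                        tags={"source": "sketch"})
```

The benefit of the null-object approach is that calling code such as report_round stays free of "is a metrics store configured?" checks.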
-
xain_fl.coordinator.participants module¶
XAIN FL Participants
-
class
xain_fl.coordinator.participants.
ParticipantContext
(participant_id, heartbeat_time, heartbeat_timeout)¶ Bases:
object
Class to store state about each participant. Currently it only stores the participant_id and the time when the next heartbeat expires.
In the future we may store more information, such as which state a participant is in, e.g. IDLE, RUNNING, …
- Parameters
participant_id (str) – The id of the participant. Typically a host:port or public key when using SSL.
-
class
xain_fl.coordinator.participants.
Participants
(heartbeat_time=10, heartbeat_timeout=5)¶ Bases:
object
This class provides some useful methods to handle all the participants connected to a coordinator in a thread safe manner by protecting access to the participants list with a lock.
-
add
(participant_id)¶ Adds a new participant to the list of participants.
- Parameters
participant_id (str) – The id of the participant to add.
- Return type
None
-
len
()¶ Get the number of participants.
- Return type
- Returns
The number of participants in the list.
-
next_expiration
()¶ Helper method to check what is the next heartbeat to expire.
Currently used by the heartbeat monitor to determine how long it should sleep until the next check.
- Return type
- Returns
The next heartbeat to expire.
-
remove
(participant_id)¶ Removes a participant from the list of participants.
This is typically used after a participant has disconnected from the coordinator.
- Parameters
participant_id (str) – The id of the participant to remove.
- Return type
None
-
update_expires
(participant_id)¶ Updates the heartbeat expiration time for a participant.
This is currently called by the xain_fl.coordinator.coordinator.Coordinator every time a participant sends a heartbeat.
- Parameters
participant_id (str) – The id of the participant whose expiration time is updated.
- Return type
None
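A lock-protected participant registry along these lines might look like the sketch below. It is not the library's Participants class: SketchParticipants is hypothetical, the expiry rule (current time plus heartbeat_time plus heartbeat_timeout) is an assumption, and the clock argument is added only so the behaviour can be checked without waiting.

```python
import threading
import time


class SketchParticipants:
    """Hypothetical thread-safe registry of participants and expiry times."""

    def __init__(self, heartbeat_time=10, heartbeat_timeout=5,
                 clock=time.monotonic):
        self._lock = threading.Lock()
        self._expires = {}  # participant_id -> expiration time
        # Assumed rule: a heartbeat is due every heartbeat_time seconds and
        # may be late by at most heartbeat_timeout seconds.
        self._ttl = heartbeat_time + heartbeat_timeout
        self._clock = clock

    def add(self, participant_id):
        with self._lock:
            self._expires[participant_id] = self._clock() + self._ttl

    def remove(self, participant_id):
        with self._lock:
            self._expires.pop(participant_id, None)

    def update_expires(self, participant_id):
        with self._lock:
            if participant_id in self._expires:
                self._expires[participant_id] = self._clock() + self._ttl

    def next_expiration(self):
        with self._lock:
            # With no participants there is nothing to expire.
            return min(self._expires.values(), default=float("inf"))

    def len(self):
        with self._lock:
            return len(self._expires)
```

Every method takes the same lock, so the gRPC handlers and the heartbeat monitor thread could share one instance safely in this sketch.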
-
xain_fl.coordinator.round module¶
XAIN FL Rounds
-
class
xain_fl.coordinator.round.
Round
(participant_ids)¶ Bases:
object
Class to manage the state of a single round. This class contains the logic to handle all updates sent by the participants during a round in a thread-safe manner and does some sanity checks like preventing the same participant from submitting multiple updates within a single round.
- Parameters
-
add_selected
(more_ids)¶ Add to the collection of selected participants.
-
add_updates
(participant_id, model_weights, aggregation_data)¶ Add a participant’s update for the round.
- Parameters
- Raises
DuplicatedUpdateError – If the participant already submitted their update this round.
- Return type
None
-
get_weight_updates
()¶ Get a list of all participants’ weight updates. This list will usually be used by the aggregation function.
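To illustrate what an aggregation function could do with such a list, here is a minimal weighted average over flat weight vectors. It is a sketch, not the WeightedAverageAggregator implementation: the function name is hypothetical, plain Python lists stand in for ndarrays, and treating the aggregation data as a per-participant sample count is an assumption.

```python
def weighted_average(weight_updates, num_samples):
    """Average flat weight vectors, weighting each by its sample count."""
    if not weight_updates or len(weight_updates) != len(num_samples):
        raise ValueError("need one sample count per weight update")
    total = sum(num_samples)
    if total <= 0:
        raise ValueError("total number of samples must be positive")
    size = len(weight_updates[0])
    return [
        sum(w[i] * n for w, n in zip(weight_updates, num_samples)) / total
        for i in range(size)
    ]
```

With updates [1.0, 2.0] and [3.0, 4.0] and sample counts 1 and 3, the second participant contributes three quarters of the result.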
-
is_finished
()¶ Check if all the required participants submitted their updates this round. If all participants submitted their updates the round is considered finished.
- Return type
- Returns
True if all participants submitted their updates this round. False otherwise.
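The bookkeeping described for a round (collecting one update per selected participant, rejecting duplicates, and detecting completion) can be sketched as below. SketchRound is a hypothetical simplification; the local DuplicatedUpdateError class and the rejection of participants that were not selected are assumptions of the example.

```python
import threading


class DuplicatedUpdateError(Exception):
    """Raised when a participant submits a second update in one round."""


class SketchRound:
    """Hypothetical, simplified version of the per-round state."""

    def __init__(self, participant_ids):
        self._lock = threading.Lock()
        self._selected = set(participant_ids)
        self._updates = {}  # participant_id -> (model_weights, aggregation_data)

    def add_selected(self, more_ids):
        with self._lock:
            self._selected.update(more_ids)

    def add_updates(self, participant_id, model_weights, aggregation_data):
        with self._lock:
            # Assumption: updates from non-selected participants are rejected.
            if participant_id not in self._selected:
                raise ValueError(f"{participant_id} was not selected")
            if participant_id in self._updates:
                raise DuplicatedUpdateError(participant_id)
            self._updates[participant_id] = (model_weights, aggregation_data)

    def is_finished(self):
        with self._lock:
            return len(self._updates) == len(self._selected)

    def get_weight_updates(self):
        with self._lock:
            return list(self._updates.values())
```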
xain_fl.coordinator.store module¶
This module provides classes for weights storage. It currently only works with services that provide the AWS S3 APIs.
-
class
xain_fl.coordinator.store.
AbstractGlobalWeightsWriter
¶ Bases:
abc.ABC
An abstract class that defines the API for storing the aggregated weights the coordinator computes.
-
class
xain_fl.coordinator.store.
AbstractLocalWeightsReader
¶ Bases:
abc.ABC
An abstract class that defines the API for retrieving the weights participants upload after their training round.
-
class
xain_fl.coordinator.store.
S3BaseClass
(config)¶ Bases:
object
A base class for implementing AWS S3 clients.
- Parameters
config (
StorageConfig
) – the storage configuration (endpoint URL, credentials, etc.)
-
class
xain_fl.coordinator.store.
S3GlobalWeightsWriter
(config)¶ Bases:
xain_fl.coordinator.store.AbstractGlobalWeightsWriter, xain_fl.coordinator.store.S3BaseClass
AbstractGlobalWeightsWriter implementor for AWS S3 storage backend.
-
class
xain_fl.coordinator.store.
S3LocalWeightsReader
(config)¶ Bases:
xain_fl.coordinator.store.AbstractLocalWeightsReader, xain_fl.coordinator.store.S3BaseClass
AbstractLocalWeightsReader implementor for AWS S3 storage backend.
-
read_weights
(participant_id, round)¶ Download the weights computed by the given participant for the given round, from an S3 bucket.
-
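The split between a writer for global weights and a reader for local weights can be illustrated without S3. In the sketch below, only read_weights(participant_id, round) follows a signature shown on this page; the abstract class names, the write_weights method and its arguments, and the in-memory backend are hypothetical, and plain lists stand in for weight arrays.

```python
from abc import ABC, abstractmethod


class SketchGlobalWeightsWriter(ABC):
    @abstractmethod
    def write_weights(self, round, weights):
        """Store the aggregated global weights of a round (assumed signature)."""


class SketchLocalWeightsReader(ABC):
    @abstractmethod
    def read_weights(self, participant_id, round):
        """Return the weights a participant uploaded for a round."""


class InMemoryWeightsStore(SketchGlobalWeightsWriter, SketchLocalWeightsReader):
    """Dictionary-backed stand-in for an S3 bucket."""

    def __init__(self):
        self.global_weights = {}  # round -> weights
        self.local_weights = {}   # (participant_id, round) -> weights

    # `round` shadows the builtin here only to match the documented parameter name.
    def write_weights(self, round, weights):
        self.global_weights[round] = list(weights)

    def read_weights(self, participant_id, round):
        return list(self.local_weights[(participant_id, round)])
```

Because the coordinator would depend only on the two abstract interfaces, a backend like this one could be swapped in for tests.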