xain_fl.coordinator package
Defines constants to be used throughout the module.
Submodules
xain_fl.coordinator.coordinator module
XAIN FL Coordinator
class xain_fl.coordinator.coordinator.Coordinator(num_rounds=1, minimum_participants_in_round=1, fraction_of_participants=1.0, weights=[], epochs=1, epoch_base=0, aggregator=None, controller=None)
Bases: object
Class implementing the main Coordinator logic. It is implemented as a state machine that reacts to received messages.
The states of the Coordinator are:
STANDBY: The coordinator is in standby mode, typically while waiting for participants to connect. In this mode the only messages that the coordinator can receive are RendezvousRequest and HeartbeatRequest.
ROUND: A round is currently in progress. During a round the important messages the coordinator can receive are StartTrainingRoundRequest and EndTrainingRoundRequest. Since not every participant is necessarily selected for a given round, each participant is advertised either ROUND or STANDBY depending on whether it was selected.
FINISHED: The training session has ended and participants should disconnect from the coordinator.
States are exchanged during heartbeats so that both the coordinator and the participants can react to each other’s state changes.
The flow of the Coordinator:
1. The coordinator is started and waits for enough participants to join (STANDBY).
2. Once enough participants are connected, the coordinator starts the rounds (ROUND N).
3. Step 2 is repeated for the configured number of rounds (num_rounds).
4. The training session is over and the coordinator is ready to shut down (FINISHED).
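The flow above can be pictured as a small state machine. This is a toy model under stated assumptions, not the xain_fl implementation: the `State` enum, the `CoordinatorFlow` class, its `min_connected` threshold and its `on_participant_joined` / `on_round_finished` hooks are all hypothetical, and participant selection and disconnections are left out.

```python
from enum import Enum


class State(Enum):
    """Hypothetical mirror of the coordinator states."""

    STANDBY = "STANDBY"
    ROUND = "ROUND"
    FINISHED = "FINISHED"


class CoordinatorFlow:
    """Toy model of the STANDBY -> ROUND N -> FINISHED flow (not xain_fl code)."""

    def __init__(self, num_rounds: int, min_connected: int) -> None:
        self.num_rounds = num_rounds
        self.min_connected = min_connected  # hypothetical: participants needed to start
        self.connected = 0
        self.current_round = 0  # 0 until the training session starts
        self.state = State.STANDBY  # step 1: wait for participants

    def on_participant_joined(self) -> None:
        self.connected += 1
        # Step 2: start the first round once enough participants are connected.
        if self.state is State.STANDBY and self.connected >= self.min_connected:
            self.state = State.ROUND
            self.current_round = 1

    def on_round_finished(self) -> None:
        if self.state is not State.ROUND:
            raise RuntimeError("no round in progress")
        if self.current_round < self.num_rounds:
            # Step 3: start the next round.
            self.current_round += 1
        else:
            # Step 4: all rounds are done, the session is over.
            self.state = State.FINISHED
```

With num_rounds=2 and two required participants, the model stays in STANDBY after the first join, enters round 1 after the second, moves to round 2, and reaches FINISHED after the second round completes.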
Note: RendezvousRequest is always allowed, regardless of which state the coordinator is in.
Parameters:
num_rounds (int) – The number of rounds of the training session.
minimum_participants_in_round (int) – The minimum number of participants that participate in a round.
fraction_of_participants (float) – The fraction of total connected participants to be selected in a single round. Defaults to 1.0, meaning that all connected participants will be selected. It must be in the (0.0, 1.0] interval.
epochs (int) – Number of training iterations local to each participant.
epoch_base – Global number of epochs as of the last round.
aggregator (Optional[Aggregator]) – The type of aggregation to perform at the end of each round. Defaults to FederatedAveragingAgg.
controller (Optional[Controller]) – Controls how the participants are selected at the start of each round. Defaults to RandomController.
DEFAULT_AGGREGATOR = <xain_fl.fl.coordinator.aggregate.FederatedAveragingAgg object>
If no Aggregator instance is provided during initialisation, then FederatedAveragingAgg is used.
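For reference, the textbook federated averaging rule weights each participant's model by its share of the training samples, w = Σ_k (n_k / n) w_k. The sketch below applies that rule to flat lists of floats. It assumes each participant reports a sample count, and the function name `federated_average` is hypothetical, so it should not be read as the implementation of FederatedAveragingAgg.

```python
from typing import List, Sequence


def federated_average(
    weight_updates: Sequence[Sequence[float]], num_samples: Sequence[int]
) -> List[float]:
    """Average flat weight vectors, weighting each by its participant's sample count."""
    if not weight_updates or len(weight_updates) != len(num_samples):
        raise ValueError("need a non-empty list of updates and one sample count per update")
    total = sum(num_samples)
    if total <= 0:
        raise ValueError("the total number of samples must be positive")
    size = len(weight_updates[0])
    averaged = [0.0] * size
    for weights, n in zip(weight_updates, num_samples):
        if len(weights) != size:
            raise ValueError("all weight vectors must have the same length")
        for i, w in enumerate(weights):
            averaged[i] += w * n / total  # participant k contributes (n_k / n) * w_k
    return averaged
```

With sample counts 1 and 3, the vectors [1.0, 2.0] and [3.0, 4.0] average to [2.5, 3.5].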
DEFAULT_CONTROLLER = <xain_fl.fl.coordinator.controller.RandomController object>
If no Controller instance is provided during initialisation, then RandomController is used.
get_minimum_connected_participants()
Calculates how many participants need to be connected so that the configured fraction of them can be selected for a round.
Return type: int
Returns: The minimum number of participants that need to be connected to start a round.
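One plausible way to compute this value, assuming the coordinator needs the smallest number of connected participants N for which the selected fraction still reaches minimum_participants_in_round, is N = ceil(minimum_participants_in_round / fraction_of_participants). The stand-alone function below is a hypothetical sketch of that formula, not the method's actual body.

```python
import math


def minimum_connected_participants(
    minimum_participants_in_round: int, fraction_of_participants: float
) -> int:
    """Smallest N such that fraction_of_participants * N >= minimum_participants_in_round."""
    if not 0.0 < fraction_of_participants <= 1.0:
        raise ValueError("fraction_of_participants must be in the (0.0, 1.0] interval")
    if minimum_participants_in_round < 1:
        raise ValueError("minimum_participants_in_round must be at least 1")
    # Assumed formula: round up so that the selected fraction is never too small.
    return math.ceil(minimum_participants_in_round / fraction_of_participants)
```

For example, with minimum_participants_in_round=3 and fraction_of_participants=0.5, at least 6 participants must be connected.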
on_message(message, participant_id)
Coordinator method that implements the state machine.
Parameters:
message (GeneratedProtocolMessageType) – A protobuf message.
participant_id (str) – The id of the participant making the request.
Return type: GeneratedProtocolMessageType
Returns: The response sent back to the participant.
Raises:
UnknownParticipantError – If it receives a request from an unknown participant, typically a participant that has not performed a rendezvous with the Coordinator.
InvalidRequestError – If it receives a request that is not allowed in the current Coordinator state.
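The request checks that on_message performs can be sketched as a lookup of which request types each state accepts. Everything here is a hypothetical stand-in: the empty dataclasses replace the protobuf messages, the exception classes are local copies of the documented names, and the `ALLOWED` table and `check_request` function are illustrative. In particular, accepting HeartbeatRequest in FINISHED is an assumption.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Set


class UnknownParticipantError(Exception):
    """Local stand-in for the documented exception."""


class InvalidRequestError(Exception):
    """Local stand-in for the documented exception."""


class State(Enum):
    STANDBY = "STANDBY"
    ROUND = "ROUND"
    FINISHED = "FINISHED"


# Hypothetical plain-Python stand-ins for the protobuf request messages.
@dataclass
class RendezvousRequest:
    pass


@dataclass
class HeartbeatRequest:
    pass


@dataclass
class StartTrainingRoundRequest:
    pass


@dataclass
class EndTrainingRoundRequest:
    pass


# Request types accepted in each state (assumed from the state descriptions).
ALLOWED: Dict[State, Set[type]] = {
    State.STANDBY: {HeartbeatRequest},
    State.ROUND: {HeartbeatRequest, StartTrainingRoundRequest, EndTrainingRoundRequest},
    State.FINISHED: {HeartbeatRequest},
}


def check_request(state: State, message: object, participant_id: str, known_ids: Set[str]) -> None:
    """Raise if the message may not be handled; return None if it may."""
    # RendezvousRequest is always allowed, whatever the state and sender.
    if isinstance(message, RendezvousRequest):
        return
    if participant_id not in known_ids:
        raise UnknownParticipantError(participant_id)
    if type(message) not in ALLOWED[state]:
        raise InvalidRequestError(f"{type(message).__name__} is not allowed in {state.name}")
```

Checking the sender before the state means an unknown participant gets UnknownParticipantError even when its request type would also be invalid.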
remove_participant(participant_id)
Remove a participant from the list of accepted participants.
This method is to be called when it is detected that a participant has disconnected. After a participant is removed, if the number of remaining participants is less than the minimum number of participants that need to be connected, the Coordinator will transition to the STANDBY state.
Parameters:
participant_id (str) – The id of the participant to remove.
Return type: None
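A minimal sketch of the fallback to STANDBY, assuming the coordinator tracks connected ids in a set and compares their count with a precomputed minimum. `MiniCoordinator` and `add_participant` are hypothetical names, and states are plain strings for brevity.

```python
from typing import Set


class MiniCoordinator:
    """Hypothetical model of the STANDBY fallback in remove_participant."""

    def __init__(self, min_connected: int) -> None:
        self.min_connected = min_connected
        self.participant_ids: Set[str] = set()
        self.state = "STANDBY"

    def add_participant(self, participant_id: str) -> None:
        self.participant_ids.add(participant_id)
        if self.state == "STANDBY" and len(self.participant_ids) >= self.min_connected:
            self.state = "ROUND"

    def remove_participant(self, participant_id: str) -> None:
        self.participant_ids.discard(participant_id)
        # Too few participants left to run a round: go back to waiting.
        # Assumption: only an active round falls back; FINISHED is left alone.
        if self.state == "ROUND" and len(self.participant_ids) < self.min_connected:
            self.state = "STANDBY"
```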
select_participant_ids_and_init_round()
Selects the participant ids and initiates a Round.
Return type: None
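Participant selection with the default RandomController can be pictured as random sampling without replacement. In the sketch below, the function name `select_participant_ids`, rounding down when computing how many participants to pick, and the at-least-one guard are assumptions rather than documented behavior.

```python
import math
import random
from typing import List, Optional


def select_participant_ids(
    connected_ids: List[str],
    fraction_of_participants: float,
    rng: Optional[random.Random] = None,
) -> List[str]:
    """Randomly pick about fraction_of_participants of the connected ids."""
    if not connected_ids:
        raise ValueError("no connected participants to select from")
    if rng is None:
        rng = random.Random()
    # Assumption: round down, but always select at least one participant.
    count = max(1, math.floor(len(connected_ids) * fraction_of_participants))
    # sample() draws without replacement, so no participant is picked twice.
    return rng.sample(connected_ids, count)
```

Passing a seeded random.Random makes the selection reproducible, which is convenient in tests.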
xain_fl.coordinator.coordinator_grpc module
XAIN FL gRPC Coordinator
class xain_fl.coordinator.coordinator_grpc.CoordinatorGrpc(coordinator, store)
Bases: xain_proto.fl.coordinator_pb2_grpc.CoordinatorServicer
The Coordinator gRPC service.
The main logic for the Coordinator is decoupled from gRPC and implemented in the xain_fl.coordinator.coordinator.Coordinator class. The gRPC service only handles client requests and forwards the messages to xain_fl.coordinator.coordinator.Coordinator.
Parameters:
coordinator (Coordinator) – The Coordinator state machine.
store (Store) – The Store from which the coordinator fetches the models trained by the participants and to which it saves the aggregated models.
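The decoupling described above amounts to a thin adapter: every RPC handler forwards the request to the state machine's on_message. The sketch avoids importing grpc so that it runs stand-alone. `ThinServicer`, `FakeCoordinator` and `FakeContext` are hypothetical, and using `context.peer()` (a method that grpc.ServicerContext does provide) as the participant id is an assumption.

```python
from typing import Any, List, Tuple


class ThinServicer:
    """Hypothetical transport adapter: each RPC forwards to the state machine."""

    def __init__(self, coordinator: Any) -> None:
        self.coordinator = coordinator

    def _forward(self, request: Any, context: Any) -> Any:
        # Assumption: the caller's peer string identifies the participant.
        return self.coordinator.on_message(request, context.peer())

    def Rendezvous(self, request: Any, context: Any) -> Any:
        return self._forward(request, context)

    def Heartbeat(self, request: Any, context: Any) -> Any:
        return self._forward(request, context)

    def StartTrainingRound(self, request: Any, context: Any) -> Any:
        return self._forward(request, context)

    def EndTrainingRound(self, request: Any, context: Any) -> Any:
        return self._forward(request, context)


# Test doubles so the sketch runs without grpc or protobuf.
class FakeCoordinator:
    def __init__(self) -> None:
        self.calls: List[Tuple[Any, str]] = []

    def on_message(self, message: Any, participant_id: str) -> str:
        self.calls.append((message, participant_id))
        return "response"


class FakeContext:
    def peer(self) -> str:
        return "ipv4:127.0.0.1:50051"
```

Keeping the servicer this thin means the state machine can be unit-tested without starting a gRPC server.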
EndTrainingRound(request, context)
The EndTrainingRound gRPC method.
Once a participant has finished training for the round, it calls this method to submit the updated weights to the xain_fl.coordinator.coordinator.Coordinator.
Parameters:
request (EndTrainingRoundRequest) – The participant’s request. It contains the weights updated by the training, as well as any metrics helpful for the xain_fl.coordinator.coordinator.Coordinator.
context (ServicerContext) – The context associated with the gRPC request.
Return type: EndTrainingRoundResponse
Returns: The response to the participant’s request. The response is just an acknowledgment that the xain_fl.coordinator.coordinator.Coordinator successfully received the updated weights.
Heartbeat(request, context)
The Heartbeat gRPC method.
Participants periodically send a heartbeat so that the Coordinator can detect failures.
Parameters:
request (HeartbeatRequest) – The participant’s request. It contains the current State and the round number the participant is on.
context (ServicerContext) – The context associated with the gRPC request.
Return type: HeartbeatResponse
Returns: The response to the participant’s request. The response contains both the State and the current round the coordinator is on. If a training session has not started yet, the round number defaults to 0.
Rendezvous(request, context)
The Rendezvous gRPC method.
A participant contacts the coordinator, and the coordinator adds the participant to its list of participants. If the coordinator already has all the participants it needs, it tells the participant to try again later.
Parameters:
request (RendezvousRequest) – The participant’s request.
context (ServicerContext) – The context associated with the gRPC request.
Return type: RendezvousResponse
Returns: The response to the participant’s request. The response is an enum containing either:
ACCEPT: If the xain_fl.coordinator.coordinator.Coordinator does not have enough participants.
LATER: If the xain_fl.coordinator.coordinator.Coordinator already has enough participants.
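The ACCEPT/LATER decision can be sketched as below, assuming the number of participants the coordinator needs is known in advance (for instance the value returned by get_minimum_connected_participants). `RendezvousReply` and `rendezvous` are hypothetical stand-ins for the protobuf enum and the coordinator logic.

```python
from enum import Enum
from typing import Set


class RendezvousReply(Enum):
    """Hypothetical stand-in for the RendezvousResponse enum."""

    ACCEPT = "ACCEPT"
    LATER = "LATER"


def rendezvous(participant_ids: Set[str], participant_id: str, needed: int) -> RendezvousReply:
    """Register the participant while fewer than `needed` are connected."""
    if len(participant_ids) < needed:
        participant_ids.add(participant_id)
        return RendezvousReply.ACCEPT
    # The coordinator already has enough participants: ask to retry later.
    return RendezvousReply.LATER
```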
StartTrainingRound(request, context)
The StartTrainingRound gRPC method.
Once a participant is notified that the xain_fl.coordinator.coordinator.Coordinator is in a round (through the state advertised in the HeartbeatResponse), the participant should call this method to get the global model weights and start training for the round.
Parameters:
request (StartTrainingRoundRequest) – The participant’s request.
context (ServicerContext) – The context associated with the gRPC request.
Return type: StartTrainingRoundResponse
Returns: The response to the participant’s request. The response contains the global model weights.
xain_fl.coordinator.heartbeat module
XAIN FL Heartbeats
xain_fl.coordinator.heartbeat.monitor_heartbeats(coordinator, terminate_event)
Monitors the heartbeats of participants.
If a heartbeat expires, the participant is removed from the Participants.
Note: This is meant to be run inside a thread and expects an Event to know when it should terminate.
Parameters:
coordinator (Coordinator) – The coordinator to monitor for heartbeats.
terminate_event (Event) – A threading event to signal that this method should terminate.
Return type: None
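A heartbeat monitor of this kind is typically a loop that removes expired participants and then sleeps on the Event, so that setting the event wakes it immediately instead of after a full sleep. The sketch assumes expiration deadlines are kept in a dict guarded by a lock and measured with time.monotonic(); the function name `monitor_heartbeats_sketch` and its parameters are hypothetical and differ from monitor_heartbeats(coordinator, terminate_event).

```python
import threading
import time
from typing import Callable, Dict


def monitor_heartbeats_sketch(
    expirations: Dict[str, float],
    lock: threading.Lock,
    on_expired: Callable[[str], None],
    terminate_event: threading.Event,
    default_sleep: float = 1.0,
) -> None:
    """Drop participants whose heartbeat deadline has passed until terminated."""
    while not terminate_event.is_set():
        now = time.monotonic()
        with lock:
            expired = [pid for pid, deadline in expirations.items() if deadline <= now]
            for pid in expired:
                del expirations[pid]
            # Earliest remaining deadline, or a default nap when nobody is connected.
            next_deadline = min(expirations.values(), default=now + default_sleep)
        # Call back outside the lock so the callback may take other locks safely.
        for pid in expired:
            on_expired(pid)  # e.g. the coordinator's remove_participant
        # Sleep until the next deadline, but wake up immediately on termination.
        terminate_event.wait(timeout=max(0.0, next_deadline - now))
```

It would be started with threading.Thread(target=monitor_heartbeats_sketch, args=(...)) and stopped by calling set() on the event.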
xain_fl.coordinator.participants module
XAIN FL Participants
class xain_fl.coordinator.participants.ParticipantContext(participant_id)
Bases: object
Class to store state about each participant. Currently it only stores the participant_id and heartbeat_expires, the time when the next heartbeat expires.
In the future it may store more information, such as the state a participant is in (e.g. IDLE, RUNNING, …).
Parameters:
participant_id (str) – The id of the participant. Typically a host:port, or a public key when using SSL.
class xain_fl.coordinator.participants.Participants
Bases: object
This class provides methods to handle all the participants connected to a coordinator in a thread-safe manner, by protecting access to the participants list with a lock.
add(participant_id)
Adds a new participant to the list of participants.
Parameters:
participant_id (str) – The id of the participant to add.
Return type: None
len()
Get the number of participants.
Return type: int
Returns: The number of participants in the list.
next_expiration()
Helper method that returns when the next heartbeat expires.
Currently used by monitor_heartbeats to determine how long it should sleep until the next check.
Returns: The next heartbeat to expire.
remove(participant_id)
Removes a participant from the list of participants.
This is typically used after a participant has disconnected from the coordinator.
Parameters:
participant_id (str) – The id of the participant to remove.
Return type: None
update_expires(participant_id)
Updates the heartbeat expiration time for a participant.
This is currently called by the xain_fl.coordinator.coordinator.Coordinator every time a participant sends a heartbeat.
Parameters:
participant_id (str) – The id of the participant whose expiration time is updated.
Return type: None
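The lock-protected design of Participants and ParticipantContext can be sketched as follows. The class names, the HEARTBEAT_TIMEOUT value and the injectable clock are hypothetical; the real classes may store different fields and may handle an empty list differently in next_expiration (here it returns None).

```python
import threading
import time
from typing import Callable, Dict, Optional

HEARTBEAT_TIMEOUT = 10.0  # seconds; hypothetical value


class ParticipantContextSketch:
    """Per-participant state: the id and when its next heartbeat expires."""

    def __init__(self, participant_id: str, now: float) -> None:
        self.participant_id = participant_id
        self.heartbeat_expires = now + HEARTBEAT_TIMEOUT


class ParticipantsSketch:
    """Participant registry whose every access is serialized by one lock."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._lock = threading.Lock()
        self._participants: Dict[str, ParticipantContextSketch] = {}

    def add(self, participant_id: str) -> None:
        with self._lock:
            self._participants[participant_id] = ParticipantContextSketch(
                participant_id, self._clock()
            )

    def remove(self, participant_id: str) -> None:
        with self._lock:
            self._participants.pop(participant_id, None)  # ignore unknown ids

    def len(self) -> int:
        with self._lock:
            # Inside a method body, `len` still resolves to the builtin.
            return len(self._participants)

    def next_expiration(self) -> Optional[float]:
        with self._lock:
            if not self._participants:
                return None  # assumption: nothing to wait for
            return min(p.heartbeat_expires for p in self._participants.values())

    def update_expires(self, participant_id: str) -> None:
        with self._lock:
            # Raises KeyError for ids that were never added.
            self._participants[participant_id].heartbeat_expires = (
                self._clock() + HEARTBEAT_TIMEOUT
            )
```

Injecting the clock lets the expiration logic be tested deterministically without sleeping.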
xain_fl.coordinator.round module
XAIN FL Rounds
class xain_fl.coordinator.round.Round(participant_ids)
Bases: object
Class to manage the state of a single round. This class contains the logic to handle all updates sent by the participants during a round and performs some sanity checks, such as preventing the same participant from submitting multiple updates during a single round.
Parameters:
participant_ids (List[str]) – The list of IDs of the participants selected to participate in this round.
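add_updates(participant_id, weight_update, metrics)
Validates a participant’s update and adds it to the round.
Parameters:
participant_id – The id of the participant submitting the update.
weight_update – The participant’s updated weights.
metrics – The metrics reported by the participant.
Raises:
DuplicatedUpdateError – If the participant already submitted an update this round.
Return type: None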
get_weight_updates()
Get a list of all participants’ weight updates. This list is usually used by the aggregation function.
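A minimal sketch of the per-round bookkeeping, assuming updates are kept in a dict keyed by participant id so that a second submission can be detected. `RoundSketch` is hypothetical, and the local DuplicatedUpdateError only mirrors the name of the documented exception.

```python
from typing import Any, Dict, List, Tuple


class DuplicatedUpdateError(Exception):
    """Local stand-in mirroring the documented exception name."""


class RoundSketch:
    """Collects one update per participant during a single round."""

    def __init__(self, participant_ids: List[str]) -> None:
        self.participant_ids = list(participant_ids)
        self.updates: Dict[str, Tuple[Any, Dict[str, Any]]] = {}

    def add_updates(self, participant_id: str, weight_update: Any, metrics: Dict[str, Any]) -> None:
        # Sanity check: a participant may submit only once per round.
        if participant_id in self.updates:
            raise DuplicatedUpdateError(f"{participant_id} already submitted an update this round")
        self.updates[participant_id] = (weight_update, metrics)

    def get_weight_updates(self) -> List[Any]:
        # Dicts preserve insertion order, so updates come back in arrival order.
        return [weight_update for weight_update, _ in self.updates.values()]
```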