coordinator
Coordinator¶
¶
        coordinator
      ¶
        Coordinator
      ¶
        
__init__(self, port)        
  
      special
  
      ¶
init coordinator
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| port | int | coordinator serving port | required | 
Source code in fedvision/framework/coordinator/coordinator.py
          def __init__(self, port: int):
              """
              Create a coordinator with empty bookkeeping tables.

              Only in-memory state is initialised here; the gRPC server attribute
              is left as ``None``.

              Args:
                  port: coordinator serving port
              """
              # gRPC endpoint settings
              self._grpc_port = port
              self._grpc_server = None

              # flag consulted by Subscribe before enrolling a party
              self._serving = True
              # timeout (seconds) Subscribe waits on a party's queue before
              # re-checking whether that party has been closed
              self._check_interval = 0.5
              # integer counter starting at zero
              # NOTE(review): presumably used for proposal id generation -- confirm
              self._count_id = 0

              # party id -> task provider of that enrolled party
              self._enrolled: MutableMapping[str, _TaskProviderForEnrolledParty] = dict()
              # proposal uid -> proposal
              self._proposals: MutableMapping[str, _Proposal] = dict()
              # job type -> ids of the parties subscribed to that job type
              self._job_type_to_subscribes: MutableMapping[str, MutableSet[str]] = dict()
        
FetchTask(self, request, context)        
  
      async
  
      ¶
handle task fetch gRPC request
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| request | coordinator_pb2.FetchTask.REQ | task fetch request carrying the proposal_id and party_id | required |
| context | grpc.aio.ServicerContext | gRPC servicer context | required |
Returns:
| Type | Description | 
|---|---|
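| coordinator_pb2.FetchTask.REP | reply whose status is NOT_FOUND, NOT_ALLOW, TIMEOUT, CANCELED, RANDOM_OUT or READY (READY also carries the task) |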
Source code in fedvision/framework/coordinator/coordinator.py
          async def FetchTask(
              self, request: coordinator_pb2.FetchTask.REQ, context: grpc.aio.ServicerContext
          ) -> coordinator_pb2.FetchTask.REP:
              """
              handle task fetch gRPC request

              The calling party is registered as a responder of the proposal and
              the call then blocks until the proposal's open period is finished.

              Args:
                  request: fetch request; ``proposal_id`` and ``party_id`` are read
                  context: gRPC servicer context (not used by this handler)

              Returns:
                  a ``FetchTask.REP`` whose status is

                  * ``NOT_FOUND``  - the proposal id is unknown
                  * ``NOT_ALLOW``  - the party is not enrolled
                  * ``TIMEOUT``    - the open period had already finished
                  * ``CANCELED``   - the open period finished without reaching the goal
                  * ``RANDOM_OUT`` - the goal was reached but this party was not chosen
                  * ``READY``      - the party was chosen; ``task`` carries its task
              """
              if request.proposal_id not in self._proposals:
                  return coordinator_pb2.FetchTask.REP(
                      status=coordinator_pb2.FetchTask.NOT_FOUND
                  )

              if request.party_id not in self._enrolled:
                  return coordinator_pb2.FetchTask.REP(
                      status=coordinator_pb2.FetchTask.NOT_ALLOW
                  )

              proposal = self._proposals[request.proposal_id]

              # too late: responders are only accepted while the period is open
              if proposal.open_period_finished.is_set():
                  return coordinator_pb2.FetchTask.REP(
                      status=coordinator_pb2.FetchTask.TIMEOUT
                  )

              # register as responder, then wait for the Proposal handler to
              # close the open period
              proposal.add_responders(request.party_id)
              await proposal.open_period_finished.wait()

              if not proposal.goal_reached:
                  return coordinator_pb2.FetchTask.REP(
                      status=coordinator_pb2.FetchTask.CANCELED
                  )

              self.info(f"chosen: {proposal.chosen.keys()}")
              if request.party_id not in proposal.chosen:
                  return coordinator_pb2.FetchTask.REP(
                      status=coordinator_pb2.FetchTask.RANDOM_OUT
                  )

              # accepted, finally!
              success_rep = coordinator_pb2.FetchTask.REP(
                  status=coordinator_pb2.FetchTask.READY,
                  task=proposal.chosen[request.party_id],
              )
              return success_rep
        
Leave(self, request, context)        
  
      async
  
      ¶
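handle leave gRPC request: mark the party's subscription as closed, wait until its queued proposals have been processed, then remove its enrollment. Replies NOT_FOUND if the party is not enrolled, SUCCESS otherwise. (The .proto file has no documentation comment for this method.)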
Source code in fedvision/framework/coordinator/coordinator.py
          async def Leave(self, request, context):
              """
              handle leave gRPC request

              Marks the party's task provider as closed, waits until every
              proposal queued for it has been marked done, then removes the party
              from the enrollment table and from every job-type subscriber set.

              Args:
                  request: leave request; ``party_id`` is read
                  context: gRPC servicer context (not used by this handler)

              Returns:
                  a ``Leave.REP`` with status ``NOT_FOUND`` when the party is not
                  enrolled, ``SUCCESS`` otherwise
              """
              party_id = request.party_id
              if party_id not in self._enrolled:
                  return coordinator_pb2.Leave.REP(status=coordinator_pb2.Leave.NOT_FOUND)

              task_provider = self._enrolled[party_id]
              # tell the Subscribe stream to stop, then wait for it to drain
              task_provider.closed = True
              await task_provider.queue.join()

              # Drop the subscriptions together with the enrollment: Proposal looks
              # up `_enrolled[party_id]` for every subscribed id, so an id left
              # behind here would raise KeyError on the next dispatch and inflate
              # the count of alive parties.
              for subscribers in self._job_type_to_subscribes.values():
                  subscribers.discard(party_id)
              # tolerate a concurrent Leave that already removed the entry
              self._enrolled.pop(party_id, None)
              return coordinator_pb2.Leave.REP(status=coordinator_pb2.Leave.SUCCESS)
        
Proposal(self, request, context)        
  
      async
  
      ¶
handle job proposal gRPC request
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
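| request | coordinator_pb2.Proposal.REQ | job proposal request; its job_id and proposal_wait_time are read by the handler | required |
| context | grpc.aio.ServicerContext | gRPC servicer context | required |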
Returns:
| Type | Description | 
|---|---|
| coordinator_pb2.Proposal.REP | reply whose status is REJECT, NOT_ENOUGH_SUBSCRIBERS, NOT_ENOUGH_RESPONDERS or SUCCESS |
Source code in fedvision/framework/coordinator/coordinator.py
          async def Proposal(
              self, request: coordinator_pb2.Proposal.REQ, context: grpc.aio.ServicerContext
          ) -> coordinator_pb2.Proposal.REP:
              """
              handle job proposal gRPC request

              Validates the proposal against the current subscribers, dispatches it
              to every enrolled subscriber of its job type, sleeps for
              ``request.proposal_wait_time`` and finally closes the open period.

              Args:
                  request: proposal request; ``job_id`` and ``proposal_wait_time``
                      are read here, the rest is consumed by ``_Proposal.from_pb``
                  context: gRPC servicer context (not used by this handler)

              Returns:
                  a ``Proposal.REP`` whose status is

                  * ``REJECT``                 - nobody ever subscribed to the job type
                  * ``NOT_ENOUGH_SUBSCRIBERS`` - fewer enrolled subscribers than
                    ``minimum_acceptance``
                  * ``NOT_ENOUGH_RESPONDERS``  - too few responders after the wait
                  * ``SUCCESS``                - enough responders after the wait
              """
              uid = self._generate_proposal_id(request.job_id)
              proposal = _Proposal.from_pb(uid=uid, pb=request)

              if proposal.job_type not in self._job_type_to_subscribes:
                  self.info(
                      f"job type {proposal.job_type} not in {self._job_type_to_subscribes}, reject"
                  )
                  return coordinator_pb2.Proposal.REP(status=coordinator_pb2.Proposal.REJECT)

              # Snapshot the subscribers that are still enrolled. The snapshot keeps
              # the dispatch loop below safe if the subscriber set is modified while
              # this coroutine is suspended, and ignores ids without an enrollment.
              alive = [
                  party_id
                  for party_id in self._job_type_to_subscribes[proposal.job_type]
                  if party_id in self._enrolled
              ]
              if len(alive) < proposal.minimum_acceptance:
                  self.info(
                      f"not enough parties alive accept job type {proposal.job_type},"
                      f" required {proposal.minimum_acceptance}, "
                      f"{len(alive)} alive"
                  )
                  return coordinator_pb2.Proposal.REP(
                      status=coordinator_pb2.Proposal.NOT_ENOUGH_SUBSCRIBERS
                  )

              # Register only proposals that are actually dispatched, so rejected
              # ones are not kept in the table. It must happen before dispatch,
              # because FetchTask looks the proposal up by the id sent to parties.
              self._proposals[uid] = proposal

              # dispatch proposal
              for party_id in alive:
                  task_provider = self._enrolled.get(party_id)
                  if task_provider is None:
                      # party left while an earlier put was awaited
                      continue
                  await task_provider.queue.put(proposal)

              # sleep until timeout and then check if there are enough responders
              await asyncio.sleep(request.proposal_wait_time)
              if not proposal.has_enough_responders():
                  proposal.set_open_period_finished(goal_reached=False)
                  return coordinator_pb2.Proposal.REP(
                      status=coordinator_pb2.Proposal.NOT_ENOUGH_RESPONDERS
                  )
              proposal.set_open_period_finished(goal_reached=True)
              return coordinator_pb2.Proposal.REP(status=coordinator_pb2.Proposal.SUCCESS)
        
Subscribe(self, request, context)        
      ¶
handle subscribe gRPC request, responding with job proposals in a stream
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| request | coordinator_pb2.Subscribe.REQ | subscribe request carrying the party_id and the job_types to subscribe to | required |
| context | grpc.aio.ServicerContext | gRPC servicer context | required |
Returns:
| Type | Description | 
|---|---|
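| AsyncGenerator[coordinator_pb2.Subscribe.REP, None] | stream of replies: a single DUPLICATE_ENROLL or NOT_SERVING reply, or one SUCCESS reply (proposal_id, job_type) per dispatched proposal |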
Source code in fedvision/framework/coordinator/coordinator.py
          async def Subscribe(
              self, request: coordinator_pb2.Subscribe.REQ, context: grpc.aio.ServicerContext
          ) -> AsyncGenerator[coordinator_pb2.Subscribe.REP, None]:
              """
              handle subscribe gRPC request, streaming job proposals to the party

              The party is enrolled, subscribed to each requested job type, and
              then receives one ``SUCCESS`` reply per proposal placed on its queue
              until its task provider is flagged as closed.

              Args:
                  request: subscribe request; ``party_id`` and ``job_types`` are read
                  context: gRPC servicer context (not used by this handler)

              Returns:
                  an async generator of ``Subscribe.REP``; a single
                  ``DUPLICATE_ENROLL`` or ``NOT_SERVING`` reply ends the stream
                  immediately
              """
              party_id = request.party_id

              # refuse a second enrollment of the same party
              if party_id in self._enrolled:
                  yield coordinator_pb2.Subscribe.REP(
                      status=coordinator_pb2.Subscribe.DUPLICATE_ENROLL
                  )
                  return

              # refuse new parties when the coordinator is not serving
              if not self._serving:
                  yield coordinator_pb2.Subscribe.REP(
                      status=coordinator_pb2.Subscribe.NOT_SERVING
                  )
                  return

              # enroll the party and record its job type subscriptions
              provider = _TaskProviderForEnrolledParty()
              self._enrolled[party_id] = provider
              for job_type in request.job_types:
                  subscribers = self._job_type_to_subscribes.setdefault(job_type, set())
                  subscribers.add(party_id)

              while not provider.closed:
                  try:
                      # bounded wait, so the closed flag is re-checked regularly
                      # even when no proposal arrives
                      incoming = await asyncio.wait_for(
                          provider.queue.get(), timeout=self._check_interval
                      )
                  except asyncio.TimeoutError:
                      # nothing arrived within the interval: either there is no
                      # new proposal, or the provider was closed meanwhile
                      continue

                  yield coordinator_pb2.Subscribe.REP(
                      status=coordinator_pb2.Subscribe.SUCCESS,
                      proposal_id=incoming.uid,
                      job_type=incoming.job_type,
                  )
                  # acknowledge only after the reply has been handed to the stream
                  provider.queue.task_done()

              # discard proposals that were still queued when the provider closed,
              # marking each one done
              while not provider.queue.empty():
                  await provider.queue.get()
                  provider.queue.task_done()