Mercury Skills
v1.0.0 cosmicstack-labs

Agent Task Delegation & Load Balancing

Design and operate task delegation systems for multi-agent fleets. Covers workload distribution, load balancing, queue management, priority scheduling, and dynamic agent scaling for production agent systems.

Tags: task-delegation, load-balancing, queue-management, workload-distribution, agent-orchestration, scaling

Overview

A multi-agent system without delegation logic is a mob, not a team. Tasks must be routed to the right agent, prioritized correctly, and balanced across available capacity. This skill covers queue-based architectures, routing strategies, backpressure handling, and dynamic scaling for production agent workloads.


Core Concepts

Delegation Models

| Model | Description | Best For |
|-------|-------------|----------|
| Direct Assignment | Task is routed to a specific agent by name | Known, fixed responsibilities |
| Work Queue | Tasks go into a queue; agents pull when ready | Variable workloads, many agents |
| Router | A classifier decides which agent handles each task | Heterogeneous task types |
| Supervisor | An orchestrator delegates subtasks and synthesizes the results | Complex multi-step workflows |
| Broadcast | All agents receive the task; the first responder claims it | Redundancy, SLA-critical tasks |
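Of these models, Broadcast is the least obvious to wire up. A minimal asyncio sketch, assuming each agent is just an async callable that takes the task payload (`AgentFn` and `broadcast_first` are illustrative names, not part of this skill): dispatch to every agent, keep the first successful result, and cancel the rest.

```python
import asyncio
from typing import Any, Awaitable, Callable, Sequence

# Hypothetical agent shape: an async callable that takes the task payload
AgentFn = Callable[[dict], Awaitable[Any]]


async def broadcast_first(agents: Sequence[AgentFn], payload: dict, timeout: float = 30.0) -> Any:
    """Send payload to every agent; return the first successful result, cancel the rest."""
    pending = {asyncio.create_task(agent(payload)) for agent in agents}
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    try:
        while pending:
            remaining = deadline - loop.time()
            if remaining <= 0:
                raise TimeoutError("no agent responded before the deadline")
            done, pending = await asyncio.wait(
                pending, timeout=remaining, return_when=asyncio.FIRST_COMPLETED
            )
            # Check every finished task so no exception goes unretrieved
            winners = [t for t in done if t.exception() is None]
            if winners:
                return winners[0].result()  # first successful responder claims the task
            # Everything that finished so far failed; keep waiting on the others
        raise RuntimeError("every agent failed")
    finally:
        for task in pending:
            task.cancel()  # stop the losers so they release capacity
```

Cancelling the losing agents matters: without it, every broadcast task consumes capacity on all agents for its full duration.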

Load Balancing Strategies

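| Strategy | Algorithm | When to Use |
|----------|-----------|-------------|
| Round Robin | Cycle through agents in order | Identical agents, uniform tasks |
| Least Connections | Assign to the agent with the fewest active tasks | Variable task duration |
| Weighted | Assign in proportion to each agent's capacity or priority weight | Heterogeneous agent capabilities |
| Consistent Hashing | Hash a task key (e.g. a session ID) to an agent so the same key always lands on the same agent; adding or removing an agent remaps only a small share of keys | Session affinity, cache locality |
| Latency-Based | Route to the available agent with the lowest recent latency | Performance-sensitive tasks |
| Random | Pick an agent at random | Simple, symmetrical setups |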
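Consistent Hashing can be sketched as a hash ring with virtual nodes. This is an illustration only: the MD5-based ring position, 100 virtual nodes per agent, and the `HashRing` name are all assumptions. MD5 is used purely as a stable, well-distributed hash, because Python's built-in `hash()` for strings is randomized per process.

```python
import bisect
import hashlib


class HashRing:
    """Consistent-hash ring mapping task keys (e.g. session IDs) to agent IDs."""

    def __init__(self, agents=(), vnodes: int = 100):
        self.vnodes = vnodes                # virtual nodes per agent smooth the distribution
        self._hashes: list[int] = []        # sorted ring positions
        self._owners: dict[int, str] = {}   # ring position -> agent ID
        for agent in agents:
            self.add(agent)

    @staticmethod
    def _hash(key: str) -> int:
        # Stable across processes, unlike the built-in hash()
        return int.from_bytes(hashlib.md5(key.encode()).digest()[:8], "big")

    def add(self, agent: str) -> None:
        for i in range(self.vnodes):
            h = self._hash(f"{agent}#{i}")
            bisect.insort(self._hashes, h)
            self._owners[h] = agent

    def remove(self, agent: str) -> None:
        for i in range(self.vnodes):
            h = self._hash(f"{agent}#{i}")
            self._hashes.remove(h)
            del self._owners[h]

    def lookup(self, key: str) -> str:
        """Return the owner of the first ring position clockwise from the key."""
        if not self._hashes:
            raise LookupError("ring is empty")
        idx = bisect.bisect(self._hashes, self._hash(key)) % len(self._hashes)
        return self._owners[self._hashes[idx]]
```

Removing an agent remaps only the keys that agent owned; every other key keeps its agent, which is what preserves session affinity and cache locality.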

Step-by-Step Implementation

Step 1: Build the Task Queue

from dataclasses import dataclass
from enum import Enum
import asyncio
import time

class Priority(Enum):
    CRITICAL = 0
    HIGH = 1
    MEDIUM = 2
    LOW = 3

@dataclass
class Task:
    id: str
    agent_type: str
    payload: dict
    priority: Priority = Priority.MEDIUM
    created_at: float = None
    timeout: int = 30
    retry_count: int = 0
    max_retries: int = 3
    
    def __post_init__(self):
        if self.created_at is None:
            self.created_at = time.time()

class TaskQueue:
    """Priority-based task queue with timeout handling."""
    
    def __init__(self):
        self.queues = {
            Priority.CRITICAL: asyncio.Queue(),
            Priority.HIGH: asyncio.Queue(),
            Priority.MEDIUM: asyncio.Queue(),
            Priority.LOW: asyncio.Queue(),
        }
    
    async def enqueue(self, task: Task):
        """Add task to the appropriate priority queue."""
        await self.queues[task.priority].put(task)
    
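    async def dequeue(self) -> "Task | None":
        """Return the highest-priority unexpired task, or None if all queues are empty."""
        # Enum members don't support <, so sorted(Priority) raises TypeError;
        # sort by value instead (CRITICAL=0 comes first).
        for priority in sorted(Priority, key=lambda p: p.value):
            queue = self.queues[priority]
            # Loop rather than recurse so a backlog of expired tasks can't
            # exhaust the recursion limit
            while not queue.empty():
                task = queue.get_nowait()
                if time.time() - task.created_at > task.timeout:
                    continue  # Skip (drop) the expired task
                return task

        return None  # All queues empty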
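An alternative worth considering, sketched under the assumption that FIFO order within a priority level matters: a single `asyncio.PriorityQueue` keyed on `(priority, sequence)` gives the same highest-priority-first behaviour, and `await get()` blocks while the queue is empty instead of returning `None`. The `QueuedTask` and `SingleHeapTaskQueue` names, and the `expired` list standing in for a dead-letter queue, are illustrative.

```python
import asyncio
import itertools
import time
from dataclasses import dataclass, field
from typing import Any


@dataclass(order=True)
class QueuedTask:
    priority: int                        # lower value = more urgent, as with Priority
    seq: int                             # tie-breaker that preserves FIFO within a priority
    task_id: str = field(compare=False)
    payload: Any = field(compare=False, default=None)
    ttl: float = field(compare=False, default=30.0)
    created_at: float = field(compare=False, default_factory=time.monotonic)


class SingleHeapTaskQueue:
    """One priority heap instead of one queue per level; get() blocks while empty."""

    def __init__(self, maxsize: int = 0):
        self._q: asyncio.PriorityQueue = asyncio.PriorityQueue(maxsize=maxsize)
        self._seq = itertools.count()
        self.expired: list = []  # stand-in for a dead-letter queue

    async def put(self, task_id: str, priority: int, payload: Any = None, ttl: float = 30.0):
        await self._q.put(QueuedTask(priority, next(self._seq), task_id, payload, ttl))

    async def get(self) -> QueuedTask:
        while True:
            item = await self._q.get()
            if time.monotonic() - item.created_at <= item.ttl:
                return item
            self.expired.append(item)  # keep expired tasks visible instead of dropping them
```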

Step 2: Implement the Delegator

class AgentDelegator:
    """Routes tasks to the right agent with load balancing."""
    
    def __init__(self, task_queue: TaskQueue):
        self.queue = task_queue
        self.agents = {}  # agent_type -> list of agent instances
        self.active_tasks = {}  # agent_id -> count
        self.capacity = {}  # agent_id -> max concurrent tasks
        self._next_id = 0  # monotonic counter so IDs stay unique after scale-down

    def register_agent(self, agent_type: str, agent, capacity: int = 5):
        """Register an agent that can handle tasks."""
        # Deriving the ID from len(self.agents[agent_type]) would reuse an ID
        # after an agent is removed, silently sharing its task counters.
        agent_id = f"{agent_type}-{self._next_id}"
        self._next_id += 1
        agent.agent_id = agent_id
        self.agents.setdefault(agent_type, []).append(agent)
        self.active_tasks[agent_id] = 0
        self.capacity[agent_id] = capacity
    
    async def delegate(self, task: Task) -> str:
        """Assign task to the best available agent."""
        available = self._find_available(task.agent_type)
        
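        if not available:
            # Backpressure: park the task in the priority queue. Nothing in this
            # class drains it, so a separate worker must call self.queue.dequeue()
            # later, and callers must not mistake this marker for a real result.
            await self.queue.enqueue(task)
            return f"queued:{task.id}"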
        
        agent = self._select_agent(available)
        self.active_tasks[agent.agent_id] += 1
        
        try:
            result = await asyncio.wait_for(
                agent.run(task.payload),
                timeout=task.timeout
            )
            return result
        finally:
            self.active_tasks[agent.agent_id] -= 1
    
    def _find_available(self, agent_type: str) -> list:
        """Find agents with available capacity."""
        available = []
        for agent in self.agents.get(agent_type, []):
            if self.active_tasks[agent.agent_id] < self.capacity[agent.agent_id]:
                available.append(agent)
        return available
    
    def _select_agent(self, available: list):
        """Select the best agent using least-connections strategy."""
        return min(available, key=lambda a: self.active_tasks[a.agent_id])
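`Task` carries `retry_count` and `max_retries`, but `delegate` never uses them: a timeout simply propagates to the caller. One way to put them to work, sketched as a standalone helper with exponential backoff and an in-memory dead-letter list (the helper name, the backoff schedule, and treating `ConnectionError` as transient are all assumptions):

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional


async def run_with_retries(
    run: Callable[[], Awaitable[Any]],
    timeout: float,
    max_retries: int = 3,
    base_delay: float = 0.5,
    dead_letter: Optional[List[BaseException]] = None,
) -> Any:
    """Await run() with a per-attempt timeout, retrying transient failures with backoff."""
    last_error: Optional[BaseException] = None
    for attempt in range(max_retries + 1):  # one initial try plus max_retries retries
        try:
            return await asyncio.wait_for(run(), timeout=timeout)
        except (asyncio.TimeoutError, ConnectionError) as exc:  # assumed transient
            last_error = exc
            if attempt < max_retries:
                await asyncio.sleep(base_delay * 2 ** attempt)  # 0.5 s, 1 s, 2 s, ...
    # Retries exhausted: record the failure instead of letting it vanish
    if dead_letter is not None:
        dead_letter.append(last_error)
    raise last_error
```

`run` is a zero-argument factory, such as `lambda: agent.run(task.payload)`, because a coroutine object can only be awaited once and every retry needs a fresh one.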

Step 3: Add Backpressure & Rate Limiting

class BackpressureManager:
    """Prevent overload with backpressure mechanisms."""
    
    def __init__(self, max_queue_depth: int = 1000,
                 max_concurrent: int = 50):
        self.max_queue_depth = max_queue_depth
        self.max_concurrent = max_concurrent
        self.current_concurrent = 0
    
    async def acquire(self) -> bool:
        """Try to acquire a slot. Returns False if overloaded."""
        if self.current_concurrent >= self.max_concurrent:
            return False
        self.current_concurrent += 1
        return True
    
    def release(self):
        """Release a slot when task completes."""
        self.current_concurrent -= 1
    
    def is_overloaded(self, queue_depth: int) -> bool:
        """Check if the system is under backpressure."""
        return (queue_depth > self.max_queue_depth or 
                self.current_concurrent >= self.max_concurrent)
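asyncio already provides primitives for the two limits `BackpressureManager` tracks by hand. A minimal sketch, assuming load shedding (rejecting work) is acceptable when the queue is full: `asyncio.Queue(maxsize=...)` caps queue depth, and `put_nowait` raises `asyncio.QueueFull` instead of growing; `asyncio.Semaphore` caps concurrency, and `async with` releases the slot even if the task raises. `Admission` is an illustrative name.

```python
import asyncio
from typing import Any, Awaitable, Callable


class Admission:
    """Bounded queue for depth limits plus a semaphore for concurrency limits."""

    def __init__(self, max_queue_depth: int = 1000, max_concurrent: int = 50):
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=max_queue_depth)
        self.slots = asyncio.Semaphore(max_concurrent)

    def submit(self, item: Any) -> bool:
        """Enqueue without waiting; return False (shed load) when the queue is full."""
        try:
            self.queue.put_nowait(item)
            return True
        except asyncio.QueueFull:
            return False

    async def run(self, fn: Callable[[], Awaitable[Any]]) -> Any:
        """Run fn under the concurrency cap; the slot is released even on error."""
        async with self.slots:
            return await fn()
```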

class RateLimiter:
    """Token-bucket rate limiter for agent invocations."""
    
    def __init__(self, rate: float, burst: int):
        self.rate = rate  # tokens per second
        self.burst = burst
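        self.tokens = float(burst)
        # Monotonic clock: elapsed time can't go negative if the wall clock is adjusted
        self.last_refill = time.monotonic()

    async def wait_if_needed(self):
        """Block until a token is available, then consume it."""
        while True:
            self._refill()
            if self.tokens >= 1:
                self.tokens -= 1
                return
            # Sleep until the next whole token is due instead of polling every 50 ms
            await asyncio.sleep((1 - self.tokens) / self.rate)

    def _refill(self):
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
        self.last_refill = now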
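To make `rate` and `burst` concrete: with `rate=5` and `burst=10`, 15 simultaneous calls let the first 10 through immediately and then admit one every 1/5 s. The sketch below replays the same token-bucket arithmetic as `_refill` against a simulated clock, so the schedule can be checked without sleeping; `admission_times` is an illustrative helper, not part of the skill.

```python
def admission_times(arrivals: list[float], rate: float, burst: int) -> list[float]:
    """Return when each request (arrival times, sorted) is admitted by a token bucket."""
    tokens = float(burst)  # the bucket starts full, as in RateLimiter
    clock = 0.0            # time of the last refill
    admitted = []
    for arrival in arrivals:
        now = max(arrival, clock)  # a request can't be served before an earlier one
        tokens = min(burst, tokens + (now - clock) * rate)  # refill for elapsed time
        if tokens < 1:
            now += (1 - tokens) / rate  # wait until one whole token has accrued
            tokens = 1.0
        tokens -= 1
        clock = now
        admitted.append(now)
    return admitted
```

For 15 arrivals at t=0 this yields ten admissions at 0.0 followed by 0.2, 0.4, 0.6, 0.8 and 1.0 seconds (up to floating-point rounding).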

Step 4: Implement the Supervisor Pattern

class SupervisorAgent:
    """Orchestrator that decomposes tasks and delegates to specialists."""

    def __init__(self, delegator: AgentDelegator, llm):
        self.delegator = delegator
        self.llm = llm
        self.planner = TaskPlanner()  # application-specific planner; not defined in this skill

    async def process(self, user_task: str) -> str:
        """Break down task, delegate subtasks, synthesize results."""

        # Step 1: Plan. Decompose the task into steps.
        plan = await self.planner.create_plan(user_task)

        # Step 2: Delegate. Dispatch subtasks one at a time in dependency order.
        results = {}
        for step in plan.sorted_steps():
            task = Task(
                id=step.id,
                agent_type=step.agent_type,
                # Pass a copy so each subtask sees the results available at dispatch time
                payload={"instruction": step.instruction, "context": dict(results)},
                priority=step.priority,
                timeout=step.timeout
            )
            result = await self.delegator.delegate(task)
            results[step.id] = result

        # Step 3: Synthesize. Combine results into the final response.
        return await self._synthesize(plan, results)

    async def _synthesize(self, plan, results: dict) -> str:
        """Combine agent outputs into a cohesive response."""
        context = "\n\n".join([
            f"### {step.description}\n{results[step.id]}"
            for step in plan.steps
        ])
        
        return await self.llm.generate(
            f"Synthesize these results into a final response:\n\n{context}"
        )
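`process` awaits each step in turn, so independent subtasks never overlap. If the planner can expose dependencies, ready steps can run concurrently. A sketch using the standard library's `graphlib.TopologicalSorter` (Python 3.9+), assuming the plan is a dict of step ID to the set of prerequisite step IDs and `run_step` is whatever dispatches one step (for example a thin wrapper around `delegate`); both shapes and the `run_plan` name are hypothetical.

```python
import asyncio
from graphlib import TopologicalSorter
from typing import Any, Awaitable, Callable, Dict, Set


async def run_plan(
    deps: Dict[str, Set[str]],
    run_step: Callable[[str, Dict[str, Any]], Awaitable[Any]],
) -> Dict[str, Any]:
    """Run each step once its prerequisites finish; independent steps run in parallel."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the plan has a cycle
    results: Dict[str, Any] = {}
    running: Dict[asyncio.Task, str] = {}
    while sorter.is_active():
        for step_id in sorter.get_ready():
            # Each step receives a snapshot of the results finished so far
            task = asyncio.create_task(run_step(step_id, dict(results)))
            running[task] = step_id
        done, _ = await asyncio.wait(running, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            step_id = running.pop(task)
            # Re-raises if the step failed (this sketch does not cancel siblings)
            results[step_id] = task.result()
            sorter.done(step_id)
    return results
```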

Step 5: Dynamic Agent Scaling

import logging

logger = logging.getLogger(__name__)

class AutoScaler:
    """Scale agent pools up and down based on demand."""
    
    def __init__(self, delegator: AgentDelegator, min_agents: int = 2,
                 max_agents: int = 20, scale_up_threshold: float = 0.8,
                 scale_down_threshold: float = 0.2):
        self.delegator = delegator
        self.min_agents = min_agents
        self.max_agents = max_agents
        self.scale_up_threshold = scale_up_threshold
        self.scale_down_threshold = scale_down_threshold
    
    async def evaluate(self, agent_type: str):
        """Check metrics and scale if needed."""
        agents = self.delegator.agents.get(agent_type, [])
        current_count = len(agents)
        
        # Calculate utilization
        active = sum(
            self.delegator.active_tasks[a.agent_id] 
            for a in agents
        )
        capacity = sum(
            self.delegator.capacity[a.agent_id] 
            for a in agents
        )
        utilization = active / capacity if capacity > 0 else 0
        
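        # Scale up (also refill the pool if it has fallen below the minimum,
        # which utilization alone never triggers when the pool is empty)
        if current_count < self.min_agents or (
            utilization > self.scale_up_threshold and current_count < self.max_agents
        ):
            await self._add_agent(agent_type)

        # Scale down
        elif utilization < self.scale_down_threshold and current_count > self.min_agents:
            await self._remove_agent(agent_type)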
    
    async def _add_agent(self, agent_type: str):
        """Spin up a new agent instance."""
        # AgentFactory stands in for your own provisioning code; it is not defined here
        new_agent = await AgentFactory.create(agent_type)
        self.delegator.register_agent(agent_type, new_agent)
        logger.info(f"Scaled up {agent_type}: {len(self.delegator.agents[agent_type])} agents")

    async def _remove_agent(self, agent_type: str):
        """Gracefully remove an idle agent."""
        agents = self.delegator.agents[agent_type]
        # Only agents with zero in-flight tasks are candidates, so no work is lost
        idle_agents = [
            a for a in agents
            if self.delegator.active_tasks[a.agent_id] == 0
        ]
        if idle_agents:
            agent = idle_agents[0]
            agents.remove(agent)
            # Drop its bookkeeping so the counter dicts don't grow without bound
            self.delegator.active_tasks.pop(agent.agent_id, None)
            self.delegator.capacity.pop(agent.agent_id, None)
            logger.info(f"Scaled down {agent_type}: {len(agents)} agents")
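`evaluate` acts on every call, so a bursty workload can trigger a scale-up and a scale-down seconds apart. A common remedy is a cooldown between scaling actions. A minimal sketch of the decision logic alone, assuming the caller passes in the same counts `evaluate` computes; `ScaleDecider` and the 60-second default cooldown are illustrative.

```python
import time
from typing import Optional


class ScaleDecider:
    """Threshold-based scaling decision with a cooldown between actions."""

    def __init__(self, min_agents=2, max_agents=20, up=0.8, down=0.2, cooldown_s=60.0):
        self.min_agents, self.max_agents = min_agents, max_agents
        self.up, self.down = up, down
        self.cooldown_s = cooldown_s
        self._last_action_at: Optional[float] = None

    def decide(self, count: int, active: int, capacity: int, now: Optional[float] = None) -> int:
        """Return +1 (add an agent), -1 (remove one), or 0 (hold)."""
        now = time.monotonic() if now is None else now
        if count < self.min_agents:
            delta = 1  # refill to the floor regardless of cooldown
        elif self._last_action_at is not None and now - self._last_action_at < self.cooldown_s:
            return 0  # still cooling down from the previous action
        else:
            utilization = active / capacity if capacity else 0.0
            if utilization > self.up and count < self.max_agents:
                delta = 1
            elif utilization < self.down and count > self.min_agents:
                delta = -1
            else:
                return 0
        self._last_action_at = now
        return delta
```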

Queue Architecture

                    ┌──────────────────┐
                    │   Task Ingress   │
                    └────────┬─────────┘
                             │
                    ┌────────▼─────────┐
                    │   Rate Limiter   │
                    └────────┬─────────┘
                             │
                    ┌────────▼─────────┐
                    │    Task Queue    │
                    │  (Prioritized)   │
                    └────────┬─────────┘
                             │
                    ┌────────▼─────────┐
                    │ Agent Delegator  │
                    └──┬─────┬─────┬───┘
                       │     │     │
               ┌───────▼─┐ ┌─▼─┐ ┌─▼───────┐
               │ Agent A │ │ B │ │ Agent C │
               └───────┬─┘ └─┬─┘ └─┬───────┘
                       │     │     │
                    ┌──▼─────▼─────▼───┐
                    │    Result Bus    │
                    └──────────────────┘
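The middle of this diagram maps naturally onto the Work Queue model: agents pull from a shared queue when ready and publish to a result channel. A minimal asyncio sketch, assuming each agent is an async callable and using a plain `asyncio.Queue` as a stand-in for the result bus; `worker` and `run_pipeline` are illustrative names, and the rate-limiter stage is omitted for brevity.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Tuple

Agent = Callable[[Any], Awaitable[Any]]  # hypothetical agent shape


async def worker(name: str, agent: Agent, tasks: asyncio.Queue, results: asyncio.Queue):
    """Pull tasks until cancelled; publish (worker name, task, result) to the result bus."""
    while True:
        item = await tasks.get()
        try:
            results.put_nowait((name, item, await agent(item)))
        except Exception as exc:  # a failed task still reaches the bus instead of vanishing
            results.put_nowait((name, item, exc))
        finally:
            tasks.task_done()


async def run_pipeline(items: List[Any], agents: List[Agent]) -> List[Tuple[str, Any, Any]]:
    tasks: asyncio.Queue = asyncio.Queue()
    results: asyncio.Queue = asyncio.Queue()
    for item in items:
        tasks.put_nowait(item)
    workers = [
        asyncio.create_task(worker(f"agent-{i}", agent, tasks, results))
        for i, agent in enumerate(agents)
    ]
    await tasks.join()  # wait until every queued task has been processed
    for w in workers:
        w.cancel()  # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return [results.get_nowait() for _ in range(results.qsize())]
```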

Trigger Phrases

| Phrase | Action |
|--------|--------|
| "Delegate this task" | Route the task to the appropriate agent |
| "Show queue depth" | Report current queue size and priority breakdown |
| "Scale up agents" | Increase the agent pool for a type |
| "Which agent is overloaded?" | Show utilization per agent |
| "Set priority for this task" | Re-queue with a different priority level |
| "Check load distribution" | Show how tasks are balanced across agents |
| "Pause agent type X" | Stop routing new tasks to a specific type |
| "Drain agent X gracefully" | Let current tasks finish; don't assign new ones |

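Both the pause and drain phrases stop new assignments; draining additionally waits for in-flight tasks to finish. A minimal sketch of that bookkeeping, assuming a per-agent gate that the delegator could consult before assigning work (for instance inside `_find_available`); `AgentGate` is a hypothetical name.

```python
import asyncio


class AgentGate:
    """Tracks whether an agent accepts new work and how many tasks it is running."""

    def __init__(self):
        self.accepting = True
        self.in_flight = 0
        self._idle = asyncio.Event()
        self._idle.set()  # no tasks yet, so the agent starts idle

    def try_start(self) -> bool:
        """Claim a slot for a new task; refuse if paused or draining."""
        if not self.accepting:
            return False
        self.in_flight += 1
        self._idle.clear()
        return True

    def finish(self) -> None:
        self.in_flight -= 1
        if self.in_flight == 0:
            self._idle.set()

    def pause(self) -> None:
        self.accepting = False  # stop routing new tasks; in-flight tasks continue

    def resume(self) -> None:
        self.accepting = True

    async def drain(self, timeout: float = 60.0) -> None:
        """Pause, then wait for in-flight tasks (raises asyncio.TimeoutError on timeout)."""
        self.pause()
        await asyncio.wait_for(self._idle.wait(), timeout)
```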
Anti-Patterns

| Anti-Pattern | Why It Fails | Fix |
|--------------|--------------|-----|
| No backpressure | System collapses under load | Implement queue depth limits |
| Synchronous delegation | One slow agent blocks all tasks | Async dispatch with timeouts |
| Ignoring task affinity | Agents lose cache benefits | Consistent hashing for session stickiness |
| Infinite queue growth | Memory exhaustion, stale tasks | TTL on queued tasks, dead-letter queues |
| Over-provisioning agents | Wasted resources, unnecessary cost | Auto-scale based on real-time utilization |
| No dead-letter handling | Failed tasks disappear silently | Route failed tasks to a dead-letter queue; log failures and alert on patterns |
