How to Build an AI Agent
How to Build an AI Agent
s08

Tâches d'arrière-plan

Concurrence

Background Threads + Notifications

198 LOC6 outilsBackgroundManager + notification queue
Run slow operations in the background; the agent keeps thinking ahead

s01 > s02 > s03 > s04 > s05 > s06 | s07 > [ s08 ] s09 > s10 > s11 > s12

"Exécutez les opérations lentes en arrière-plan ; l'agent continue de penser" -- les threads daemon exécutent les commandes, injectent des notifications à la complétion.

Problème

Certaines commandes prennent des minutes : npm install, pytest, docker build. Avec une boucle bloquante, le modèle reste inactif en attendant. Si l'utilisateur demande "installer les dépendances et pendant ce temps créer le fichier de configuration", l'agent les fait séquentiellement, pas en parallèle.

Solution

Main thread                Background thread
+-----------------+        +-----------------+
| agent loop      |        | subprocess runs |
| ...             |        | ...             |
| [LLM call] <---+------- | enqueue(result) |
|  ^drain queue   |        +-----------------+
+-----------------+

Timeline:
Agent --[spawn A]--[spawn B]--[other work]----
             |          |
             v          v
          [A runs]   [B runs]      (parallel)
             |          |
             +-- results injected before next LLM call --+

Comment Ça Marche

  1. BackgroundManager suit les tâches avec une queue de notification thread-safe.
class BackgroundManager:
    def __init__(self):
        self.tasks = {}
        self._notification_queue = []
        self._lock = threading.Lock()
  1. run() démarre un thread daemon et retourne immédiatement.
def run(self, command: str) -> str:
    task_id = str(uuid.uuid4())[:8]
    self.tasks[task_id] = {"status": "running", "command": command}
    thread = threading.Thread(
        target=self._execute, args=(task_id, command), daemon=True)
    thread.start()
    return f"Background task {task_id} started"
  1. Quand le subprocess termine, son résultat va dans la queue de notification.
def _execute(self, task_id, command):
    try:
        r = subprocess.run(command, shell=True, cwd=WORKDIR,
            capture_output=True, text=True, timeout=300)
        output = (r.stdout + r.stderr).strip()[:50000]
    except subprocess.TimeoutExpired:
        output = "Error: Timeout (300s)"
    with self._lock:
        self._notification_queue.append({
            "task_id": task_id, "result": output[:500]})
  1. La boucle de l'agent draine les notifications avant chaque appel LLM.
def agent_loop(messages: list):
    while True:
        notifs = BG.drain_notifications()
        if notifs:
            notif_text = "\n".join(
                f"[bg:{n['task_id']}] {n['result']}" for n in notifs)
            messages.append({"role": "user",
                "content": f"<background-results>\n{notif_text}\n"
                           f"</background-results>"})
            messages.append({"role": "assistant",
                "content": "Noted background results."})
        response = client.messages.create(...)

La boucle reste mono-threadée. Seul le subprocess I/O est parallèle.

Qu'est-ce qui a Changé par Rapport à s07

ComposantAvant (s07)Après (s08)
Outils86 (base + background_run + check)
ExécutionBloquant seulementBloquant + threads arrière-plan
NotificationNoneQueue drainée par boucle
ConcurrenceNoneThreads daemon

Essayer

python agents/s08_background_tasks.py
  1. Exécuter "sleep 5 && echo done" en arrière-plan, puis créer un fichier pendant qu'il s'exécute
  2. Démarrer 3 tâches en arrière-plan : "sleep 2", "sleep 4", "sleep 6". Vérifier leur statut.
  3. Exécuter pytest en arrière-plan et continuer à travailler sur d'autres choses