How to Build an AI Agent
How to Build an AI Agent
s08

Task in Background

Concorrenza

Background Threads + Notifications

198 LOC6 strumentiBackgroundManager + notification queue
Run slow operations in the background; the agent keeps thinking ahead

s01 > s02 > s03 > s04 > s05 > s06 | s07 > [ s08 ] s09 > s10 > s11 > s12

"Esegui operazioni lente in background; l'agente continua a pensare" -- thread daemon eseguono comandi, iniettano notifiche al completamento.

Problema

Alcuni comandi richiedono minuti: npm install, pytest, docker build. Con un loop bloccante, il modello resta inatteso in attesa. Se l'utente chiede "installa le dipendenze e mentre quello gira, crea il file di configurazione", l'agente li fa sequenzialmente, non in parallelo.

Soluzione

Main thread                Background thread
+-----------------+        +-----------------+
| agent loop      |        | subprocess runs |
| ...             |        | ...             |
| [LLM call] <---+------- | enqueue(result) |
|  ^drain queue   |        +-----------------+
+-----------------+

Timeline:
Agent --[spawn A]--[spawn B]--[other work]----
             |          |
             v          v
          [A runs]   [B runs]      (parallel)
             |          |
             +-- results injected before next LLM call --+

Come Funziona

  1. BackgroundManager traccia i task con una coda di notifica thread-safe.
class BackgroundManager:
    def __init__(self):
        self.tasks = {}
        self._notification_queue = []
        self._lock = threading.Lock()
  1. run() avvia un thread daemon e ritorna immediatamente.
def run(self, command: str) -> str:
    task_id = str(uuid.uuid4())[:8]
    self.tasks[task_id] = {"status": "running", "command": command}
    thread = threading.Thread(
        target=self._execute, args=(task_id, command), daemon=True)
    thread.start()
    return f"Background task {task_id} started"
  1. Quando il subprocess termina, il suo risultato va nella coda di notifica.
def _execute(self, task_id, command):
    try:
        r = subprocess.run(command, shell=True, cwd=WORKDIR,
            capture_output=True, text=True, timeout=300)
        output = (r.stdout + r.stderr).strip()[:50000]
    except subprocess.TimeoutExpired:
        output = "Error: Timeout (300s)"
    with self._lock:
        self._notification_queue.append({
            "task_id": task_id, "result": output[:500]})
  1. Il loop dell'agente scarica le notifiche prima di ogni chiamata LLM.
def agent_loop(messages: list):
    while True:
        notifs = BG.drain_notifications()
        if notifs:
            notif_text = "\n".join(
                f"[bg:{n['task_id']}] {n['result']}" for n in notifs)
            messages.append({"role": "user",
                "content": f"<background-results>\n{notif_text}\n"
                           f"</background-results>"})
            messages.append({"role": "assistant",
                "content": "Noted background results."})
        response = client.messages.create(...)

Il loop rimane single-threaded. Solo l'I/O del subprocess è parallelizzato.

Cosa è Cambiato da s07

ComponentePrima (s07)Dopo (s08)
Strumenti86 (base + background_run + check)
EsecuzioneSolo bloccanteBloccante + thread in background
NotificaNessunoCoda scaricata per loop
ConcorrenzaNessunoThread daemon

Provalo

python agents/s08_background_tasks.py
  1. Esegui "sleep 5 && echo done" in background, poi crea un file mentre gira
  2. Avvia 3 task in background: "sleep 2", "sleep 4", "sleep 6". Controlla il loro stato.
  3. Esegui pytest in background e continua a lavorare su altre cose