How to Build an AI Agent
How to Build an AI Agent
s08

Tareas en Segundo Plano

Concurrencia

Background Threads + Notifications

198 LOC6 herramientasBackgroundManager + notification queue
Run slow operations in the background; the agent keeps thinking ahead

s01 > s02 > s03 > s04 > s05 > s06 | s07 > [ s08 ] s09 > s10 > s11 > s12

"Ejecuta operaciones lentas en segundo plano; el agente sigue pensando" -- los threads daemon ejecutan comandos, inyectan notificaciones al completar.

Problema

Algunos comandos toman minutos: npm install, pytest, docker build. Con un bucle bloqueante, el modelo queda inactivo esperando. Si el usuario pregunta "instalar dependencias y mientras eso corre, crear el archivo de configuración", el agente los hace secuencialmente, no en paralelo.

Solución

Hilo principal              Hilo en segundo plano
+-----------------+        +-----------------+
| agent loop      |        | subprocess runs |
| ...             |        | ...             |
| [LLM call] <---+------- | enqueue(result) |
|  ^drain queue   |        +-----------------+
+-----------------+

Timeline:
Agent --[spawn A]--[spawn B]--[other work]----
             |          |
             v          v
          [A runs]   [B runs]      (parallel)
             |          |
             +-- results injected before next LLM call --+

Cómo Funciona

  1. BackgroundManager rastrea tareas con una cola de notificaciones thread-safe.
class BackgroundManager:
    def __init__(self):
        self.tasks = {}
        self._notification_queue = []
        self._lock = threading.Lock()
  1. run() inicia un thread daemon y retorna inmediatamente.
def run(self, command: str) -> str:
    task_id = str(uuid.uuid4())[:8]
    self.tasks[task_id] = {"status": "running", "command": command}
    thread = threading.Thread(
        target=self._execute, args=(task_id, command), daemon=True)
    thread.start()
    return f"Background task {task_id} started"
  1. Cuando el subprocess termina, su resultado va a la cola de notificaciones.
def _execute(self, task_id, command):
    try:
        r = subprocess.run(command, shell=True, cwd=WORKDIR,
            capture_output=True, text=True, timeout=300)
        output = (r.stdout + r.stderr).strip()[:50000]
    except subprocess.TimeoutExpired:
        output = "Error: Timeout (300s)"
    with self._lock:
        self._notification_queue.append({
            "task_id": task_id, "result": output[:500]})
  1. El bucle del agente drena notificaciones antes de cada llamada LLM.
def agent_loop(messages: list):
    while True:
        notifs = BG.drain_notifications()
        if notifs:
            notif_text = "\n".join(
                f"[bg:{n['task_id']}] {n['result']}" for n in notifs)
            messages.append({"role": "user",
                "content": f"<background-results>\n{notif_text}\n"
                           f"</background-results>"})
            messages.append({"role": "assistant",
                "content": "Noted background results."})
        response = client.messages.create(...)

El bucle permanece mono-threaded. Solo el I/O de subprocess es paralelizado.

Qué Cambió Respecto a s07

ComponenteAntes (s07)Después (s08)
Herramientas86 (base + background_run + check)
EjecuciónSolo bloqueanteBloqueante + threads en segundo plano
NotificaciónNoneCola drenada por bucle
ConcurrenciaNoneThreads daemon

Pruébalo

python agents/s08_background_tasks.py
  1. Ejecutar "sleep 5 && echo done" en segundo plano, luego crear un archivo mientras corre
  2. Iniciar 3 tareas en segundo plano: "sleep 2", "sleep 4", "sleep 6". Verificar su estado.
  3. Ejecutar pytest en segundo plano y continuar trabajando en otras cosas