How to Build an AI Agent
How to Build an AI Agent
s08

Tarefas em Segundo Plano

Concorrencia

Background Threads + Notifications

198 LOC6 ferramentasBackgroundManager + notification queue
Run slow operations in the background; the agent keeps thinking ahead

s01 > s02 > s03 > s04 > s05 > s06 | s07 > [ s08 ] s09 > s10 > s11 > s12

"Execute operações lentas em segundo plano; o agente continua pensando" -- threads daemon executam comandos, injetam notificações na conclusão.

Problema

Alguns comandos levam minutos: npm install, pytest, docker build. Com um loop bloqueante, o modelo fica ocioso esperando. Se o usuário perguntar "instale dependências e enquanto isso roda, crie o arquivo de configuração," o agente faz eles sequencialmente, não em paralelo.

Solução

Main thread                Background thread
+-----------------+        +-----------------+
| agent loop      |        | subprocess runs |
| ...             |        | ...             |
| [LLM call] <---+------- | enqueue(result) |
|  ^drain queue   |        +-----------------+
+-----------------+

Timeline:
Agent --[spawn A]--[spawn B]--[other work]----
             |          |
             v          v
          [A runs]   [B runs]      (parallel)
             |          |
             +-- results injected before next LLM call --+

Como Funciona

  1. BackgroundManager rastreia tarefas com uma fila de notificação thread-safe.
class BackgroundManager:
    def __init__(self):
        self.tasks = {}
        self._notification_queue = []
        self._lock = threading.Lock()
  1. run() inicia uma thread daemon e retorna imediatamente.
def run(self, command: str) -> str:
    task_id = str(uuid.uuid4())[:8]
    self.tasks[task_id] = {"status": "running", "command": command}
    thread = threading.Thread(
        target=self._execute, args=(task_id, command), daemon=True)
    thread.start()
    return f"Background task {task_id} started"
  1. Quando o subprocesso termina, seu resultado vai para a fila de notificação.
def _execute(self, task_id, command):
    try:
        r = subprocess.run(command, shell=True, cwd=WORKDIR,
            capture_output=True, text=True, timeout=300)
        output = (r.stdout + r.stderr).strip()[:50000]
    except subprocess.TimeoutExpired:
        output = "Error: Timeout (300s)"
    with self._lock:
        self._notification_queue.append({
            "task_id": task_id, "result": output[:500]})
  1. O loop do agente drena notificações antes de cada chamada LLM.
def agent_loop(messages: list):
    while True:
        notifs = BG.drain_notifications()
        if notifs:
            notif_text = "\n".join(
                f"[bg:{n['task_id']}] {n['result']}" for n in notifs)
            messages.append({"role": "user",
                "content": f"<background-results>\n{notif_text}\n"
                           f"</background-results>"})
            messages.append({"role": "assistant",
                "content": "Noted background results."})
        response = client.messages.create(...)

O loop permanece single-threaded. Apenas subprocess I/O é paralelizado.

O Que Mudou Desde s07

ComponenteAntes (s07)Depois (s08)
Ferramentas86 (base + background_run + check)
ExecuçãoApenas bloqueanteBloqueante + threads em segundo plano
NotificaçãoNenhumFila drenada por loop
ConcorrênciaNenhumThreads daemon

Experimente

python agents/s08_background_tasks.py
  1. Execute "sleep 5 && echo done" em segundo plano, depois crie um arquivo enquanto ele roda
  2. Inicie 3 tarefas em segundo plano: "sleep 2", "sleep 4", "sleep 6". Verifique seus status.
  3. Execute pytest em segundo plano e continue trabalhando em outras coisas