How to Build an AI Agent
How to Build an AI Agent
Back to Background Tasks

TasksBackground Tasks

s07 (207 LOC) → s08 (198 LOC)

LOC Delta

-9lines

New Tools

2

background_runcheck_background
New Classes

1

BackgroundManager
New Functions

0

Tasks

Task Graph + Dependencies

207 LOC

8 tools: bash, read_file, write_file, edit_file, task_create, task_update, task_list, task_get

planning

Background Tasks

Background Threads + Notifications

198 LOC

6 tools: bash, read_file, write_file, edit_file, background_run, check_background

concurrency

Source Code Diff

s07 (s07_task_system.py) -> s08 (s08_background_tasks.py)
11#!/usr/bin/env python3
22"""
3-s07_task_system.py - Tasks
3+s08_background_tasks.py - Background Tasks
44
5-Tasks persist as JSON files in .tasks/ so they survive context compression.
6-Each task has a dependency graph (blockedBy/blocks).
5+Run commands in background threads. A notification queue is drained
6+before each LLM call to deliver results.
77
8- .tasks/
9- task_1.json {"id":1, "subject":"...", "status":"completed", ...}
10- task_2.json {"id":2, "blockedBy":[1], "status":"pending", ...}
11- task_3.json {"id":3, "blockedBy":[2], "blocks":[], ...}
8+ Main thread Background thread
9+ +-----------------+ +-----------------+
10+ | agent loop | | task executes |
11+ | ... | | ... |
12+ | [LLM call] <---+------- | enqueue(result) |
13+ | ^drain queue | +-----------------+
14+ +-----------------+
1215
13- Dependency resolution:
14- +----------+ +----------+ +----------+
15- | task 1 | --> | task 2 | --> | task 3 |
16- | complete | | blocked | | blocked |
17- +----------+ +----------+ +----------+
18- | ^
19- +--- completing task 1 removes it from task 2's blockedBy
16+ Timeline:
17+ Agent ----[spawn A]----[spawn B]----[other work]----
18+ | |
19+ v v
20+ [A runs] [B runs] (parallel)
21+ | |
22+ +-- notification queue --> [results injected]
2023
21-Key insight: "State that survives compression -- because it's outside the conversation."
24+Key insight: "Fire and forget -- the agent doesn't block while the command runs."
2225"""
2326
24-import json
2527import os
2628import subprocess
29+import threading
30+import uuid
2731from pathlib import Path
2832
2933from anthropic import Anthropic
3034from dotenv import load_dotenv
3135
3236load_dotenv(override=True)
3337
3438if os.getenv("ANTHROPIC_BASE_URL"):
3539 os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)
3640
3741WORKDIR = Path.cwd()
3842client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
3943MODEL = os.environ["MODEL_ID"]
40-TASKS_DIR = WORKDIR / ".tasks"
4144
42-SYSTEM = f"You are a coding agent at {WORKDIR}. Use task tools to plan and track work."
45+SYSTEM = f"You are a coding agent at {WORKDIR}. Use background_run for long-running commands."
4346
4447
45-# -- TaskManager: CRUD with dependency graph, persisted as JSON files --
46-class TaskManager:
47- def __init__(self, tasks_dir: Path):
48- self.dir = tasks_dir
49- self.dir.mkdir(exist_ok=True)
50- self._next_id = self._max_id() + 1
48+# -- BackgroundManager: threaded execution + notification queue --
49+class BackgroundManager:
50+ def __init__(self):
51+ self.tasks = {} # task_id -> {status, result, command}
52+ self._notification_queue = [] # completed task results
53+ self._lock = threading.Lock()
5154
52- def _max_id(self) -> int:
53- ids = [int(f.stem.split("_")[1]) for f in self.dir.glob("task_*.json")]
54- return max(ids) if ids else 0
55+ def run(self, command: str) -> str:
56+ """Start a background thread, return task_id immediately."""
57+ task_id = str(uuid.uuid4())[:8]
58+ self.tasks[task_id] = {"status": "running", "result": None, "command": command}
59+ thread = threading.Thread(
60+ target=self._execute, args=(task_id, command), daemon=True
61+ )
62+ thread.start()
63+ return f"Background task {task_id} started: {command[:80]}"
5564
56- def _load(self, task_id: int) -> dict:
57- path = self.dir / f"task_{task_id}.json"
58- if not path.exists():
59- raise ValueError(f"Task {task_id} not found")
60- return json.loads(path.read_text())
65+ def _execute(self, task_id: str, command: str):
66+ """Thread target: run subprocess, capture output, push to queue."""
67+ try:
68+ r = subprocess.run(
69+ command, shell=True, cwd=WORKDIR,
70+ capture_output=True, text=True, timeout=300
71+ )
72+ output = (r.stdout + r.stderr).strip()[:50000]
73+ status = "completed"
74+ except subprocess.TimeoutExpired:
75+ output = "Error: Timeout (300s)"
76+ status = "timeout"
77+ except Exception as e:
78+ output = f"Error: {e}"
79+ status = "error"
80+ self.tasks[task_id]["status"] = status
81+ self.tasks[task_id]["result"] = output or "(no output)"
82+ with self._lock:
83+ self._notification_queue.append({
84+ "task_id": task_id,
85+ "status": status,
86+ "command": command[:80],
87+ "result": (output or "(no output)")[:500],
88+ })
6189
62- def _save(self, task: dict):
63- path = self.dir / f"task_{task['id']}.json"
64- path.write_text(json.dumps(task, indent=2))
65-
66- def create(self, subject: str, description: str = "") -> str:
67- task = {
68- "id": self._next_id, "subject": subject, "description": description,
69- "status": "pending", "blockedBy": [], "blocks": [], "owner": "",
70- }
71- self._save(task)
72- self._next_id += 1
73- return json.dumps(task, indent=2)
74-
75- def get(self, task_id: int) -> str:
76- return json.dumps(self._load(task_id), indent=2)
77-
78- def update(self, task_id: int, status: str = None,
79- add_blocked_by: list = None, add_blocks: list = None) -> str:
80- task = self._load(task_id)
81- if status:
82- if status not in ("pending", "in_progress", "completed"):
83- raise ValueError(f"Invalid status: {status}")
84- task["status"] = status
85- # When a task is completed, remove it from all other tasks' blockedBy
86- if status == "completed":
87- self._clear_dependency(task_id)
88- if add_blocked_by:
89- task["blockedBy"] = list(set(task["blockedBy"] + add_blocked_by))
90- if add_blocks:
91- task["blocks"] = list(set(task["blocks"] + add_blocks))
92- # Bidirectional: also update the blocked tasks' blockedBy lists
93- for blocked_id in add_blocks:
94- try:
95- blocked = self._load(blocked_id)
96- if task_id not in blocked["blockedBy"]:
97- blocked["blockedBy"].append(task_id)
98- self._save(blocked)
99- except ValueError:
100- pass
101- self._save(task)
102- return json.dumps(task, indent=2)
103-
104- def _clear_dependency(self, completed_id: int):
105- """Remove completed_id from all other tasks' blockedBy lists."""
106- for f in self.dir.glob("task_*.json"):
107- task = json.loads(f.read_text())
108- if completed_id in task.get("blockedBy", []):
109- task["blockedBy"].remove(completed_id)
110- self._save(task)
111-
112- def list_all(self) -> str:
113- tasks = []
114- for f in sorted(self.dir.glob("task_*.json")):
115- tasks.append(json.loads(f.read_text()))
116- if not tasks:
117- return "No tasks."
90+ def check(self, task_id: str = None) -> str:
91+ """Check status of one task or list all."""
92+ if task_id:
93+ t = self.tasks.get(task_id)
94+ if not t:
95+ return f"Error: Unknown task {task_id}"
96+ return f"[{t['status']}] {t['command'][:60]}\n{t.get('result') or '(running)'}"
11897 lines = []
119- for t in tasks:
120- marker = {"pending": "[ ]", "in_progress": "[>]", "completed": "[x]"}.get(t["status"], "[?]")
121- blocked = f" (blocked by: {t['blockedBy']})" if t.get("blockedBy") else ""
122- lines.append(f"{marker} #{t['id']}: {t['subject']}{blocked}")
123- return "\n".join(lines)
98+ for tid, t in self.tasks.items():
99+ lines.append(f"{tid}: [{t['status']}] {t['command'][:60]}")
100+ return "\n".join(lines) if lines else "No background tasks."
124101
102+ def drain_notifications(self) -> list:
103+ """Return and clear all pending completion notifications."""
104+ with self._lock:
105+ notifs = list(self._notification_queue)
106+ self._notification_queue.clear()
107+ return notifs
125108
126-TASKS = TaskManager(TASKS_DIR)
127109
110+BG = BackgroundManager()
128111
129-# -- Base tool implementations --
112+
113+# -- Tool implementations --
130114def safe_path(p: str) -> Path:
131115 path = (WORKDIR / p).resolve()
132116 if not path.is_relative_to(WORKDIR):
133117 raise ValueError(f"Path escapes workspace: {p}")
134118 return path
135119
136120def run_bash(command: str) -> str:
137121 dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
138122 if any(d in command for d in dangerous):
139123 return "Error: Dangerous command blocked"
140124 try:
141125 r = subprocess.run(command, shell=True, cwd=WORKDIR,
142126 capture_output=True, text=True, timeout=120)
143127 out = (r.stdout + r.stderr).strip()
144128 return out[:50000] if out else "(no output)"
145129 except subprocess.TimeoutExpired:
146130 return "Error: Timeout (120s)"
147131
148132def run_read(path: str, limit: int = None) -> str:
149133 try:
150134 lines = safe_path(path).read_text().splitlines()
151135 if limit and limit < len(lines):
152136 lines = lines[:limit] + [f"... ({len(lines) - limit} more)"]
153137 return "\n".join(lines)[:50000]
154138 except Exception as e:
155139 return f"Error: {e}"
156140
157141def run_write(path: str, content: str) -> str:
158142 try:
159143 fp = safe_path(path)
160144 fp.parent.mkdir(parents=True, exist_ok=True)
161145 fp.write_text(content)
162146 return f"Wrote {len(content)} bytes"
163147 except Exception as e:
164148 return f"Error: {e}"
165149
166150def run_edit(path: str, old_text: str, new_text: str) -> str:
167151 try:
168152 fp = safe_path(path)
169153 c = fp.read_text()
170154 if old_text not in c:
171155 return f"Error: Text not found in {path}"
172156 fp.write_text(c.replace(old_text, new_text, 1))
173157 return f"Edited {path}"
174158 except Exception as e:
175159 return f"Error: {e}"
176160
177161
178162TOOL_HANDLERS = {
179- "bash": lambda **kw: run_bash(kw["command"]),
180- "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
181- "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
182- "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
183- "task_create": lambda **kw: TASKS.create(kw["subject"], kw.get("description", "")),
184- "task_update": lambda **kw: TASKS.update(kw["task_id"], kw.get("status"), kw.get("addBlockedBy"), kw.get("addBlocks")),
185- "task_list": lambda **kw: TASKS.list_all(),
186- "task_get": lambda **kw: TASKS.get(kw["task_id"]),
163+ "bash": lambda **kw: run_bash(kw["command"]),
164+ "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
165+ "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
166+ "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
167+ "background_run": lambda **kw: BG.run(kw["command"]),
168+ "check_background": lambda **kw: BG.check(kw.get("task_id")),
187169}
188170
189171TOOLS = [
190- {"name": "bash", "description": "Run a shell command.",
172+ {"name": "bash", "description": "Run a shell command (blocking).",
191173 "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
192174 {"name": "read_file", "description": "Read file contents.",
193175 "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
194176 {"name": "write_file", "description": "Write content to file.",
195177 "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
196178 {"name": "edit_file", "description": "Replace exact text in file.",
197179 "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
198- {"name": "task_create", "description": "Create a new task.",
199- "input_schema": {"type": "object", "properties": {"subject": {"type": "string"}, "description": {"type": "string"}}, "required": ["subject"]}},
200- {"name": "task_update", "description": "Update a task's status or dependencies.",
201- "input_schema": {"type": "object", "properties": {"task_id": {"type": "integer"}, "status": {"type": "string", "enum": ["pending", "in_progress", "completed"]}, "addBlockedBy": {"type": "array", "items": {"type": "integer"}}, "addBlocks": {"type": "array", "items": {"type": "integer"}}}, "required": ["task_id"]}},
202- {"name": "task_list", "description": "List all tasks with status summary.",
203- "input_schema": {"type": "object", "properties": {}}},
204- {"name": "task_get", "description": "Get full details of a task by ID.",
205- "input_schema": {"type": "object", "properties": {"task_id": {"type": "integer"}}, "required": ["task_id"]}},
180+ {"name": "background_run", "description": "Run command in background thread. Returns task_id immediately.",
181+ "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
182+ {"name": "check_background", "description": "Check background task status. Omit task_id to list all.",
183+ "input_schema": {"type": "object", "properties": {"task_id": {"type": "string"}}}},
206184]
207185
208186
209187def agent_loop(messages: list):
210188 while True:
189+ # Drain background notifications and inject as system message before LLM call
190+ notifs = BG.drain_notifications()
191+ if notifs and messages:
192+ notif_text = "\n".join(
193+ f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs
194+ )
195+ messages.append({"role": "user", "content": f"<background-results>\n{notif_text}\n</background-results>"})
196+ messages.append({"role": "assistant", "content": "Noted background results."})
211197 response = client.messages.create(
212198 model=MODEL, system=SYSTEM, messages=messages,
213199 tools=TOOLS, max_tokens=8000,
214200 )
215201 messages.append({"role": "assistant", "content": response.content})
216202 if response.stop_reason != "tool_use":
217203 return
218204 results = []
219205 for block in response.content:
220206 if block.type == "tool_use":
221207 handler = TOOL_HANDLERS.get(block.name)
222208 try:
223209 output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
224210 except Exception as e:
225211 output = f"Error: {e}"
226212 print(f"> {block.name}: {str(output)[:200]}")
227213 results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(output)})
228214 messages.append({"role": "user", "content": results})
229215
230216
231217if __name__ == "__main__":
232218 history = []
233219 while True:
234220 try:
235- query = input("\033[36ms07 >> \033[0m")
221+ query = input("\033[36ms08 >> \033[0m")
236222 except (EOFError, KeyboardInterrupt):
237223 break
238224 if query.strip().lower() in ("q", "exit", ""):
239225 break
240226 history.append({"role": "user", "content": query})
241227 agent_loop(history)
242228 response_content = history[-1]["content"]
243229 if isinstance(response_content, list):
244230 for block in response_content:
245231 if hasattr(block, "text"):
246232 print(block.text)
247233 print()