How to Build an AI Agent
How to Build an AI Agent
Back to Tasks

Compact → Tasks

s06 (205 LOC) → s07 (207 LOC)

LOC Delta

+2 lines

New Tools

4

task_create, task_update, task_list, task_get
New Classes

1

TaskManager
New Functions

0

Compact

Three-Layer Compression

205 LOC

5 tools: bash, read_file, write_file, edit_file, compact

memory

Tasks

Task Graph + Dependencies

207 LOC

8 tools: bash, read_file, write_file, edit_file, task_create, task_update, task_list, task_get

planning

Source Code Diff

s06 (s06_context_compact.py) -> s07 (s07_task_system.py)
11#!/usr/bin/env python3
22"""
3-s06_context_compact.py - Compact
3+s07_task_system.py - Tasks
44
5-Three-layer compression pipeline so the agent can work forever:
5+Tasks persist as JSON files in .tasks/ so they survive context compression.
6+Each task has a dependency graph (blockedBy/blocks).
67
7- Every turn:
8- +------------------+
9- | Tool call result |
10- +------------------+
11- |
12- v
13- [Layer 1: micro_compact] (silent, every turn)
14- Replace tool_result content older than last 3
15- with "[Previous: used {tool_name}]"
16- |
17- v
18- [Check: tokens > 50000?]
19- | |
20- no yes
21- | |
22- v v
23- continue [Layer 2: auto_compact]
24- Save full transcript to .transcripts/
25- Ask LLM to summarize conversation.
26- Replace all messages with [summary].
27- |
28- v
29- [Layer 3: compact tool]
30- Model calls compact -> immediate summarization.
31- Same as auto, triggered manually.
8+ .tasks/
9+ task_1.json {"id":1, "subject":"...", "status":"completed", ...}
10+ task_2.json {"id":2, "blockedBy":[1], "status":"pending", ...}
11+ task_3.json {"id":3, "blockedBy":[2], "blocks":[], ...}
3212
33-Key insight: "The agent can forget strategically and keep working forever."
13+ Dependency resolution:
14+ +----------+ +----------+ +----------+
15+ | task 1 | --> | task 2 | --> | task 3 |
16+ | complete | | blocked | | blocked |
17+ +----------+ +----------+ +----------+
18+ | ^
19+ +--- completing task 1 removes it from task 2's blockedBy
20+
21+Key insight: "State that survives compression -- because it's outside the conversation."
3422"""
3523
3624import json
3725import os
3826import subprocess
39-import time
4027from pathlib import Path
4128
4229from anthropic import Anthropic
4330from dotenv import load_dotenv
4431
4532load_dotenv(override=True)
4633
4734if os.getenv("ANTHROPIC_BASE_URL"):
4835 os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)
4936
5037WORKDIR = Path.cwd()
5138client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
5239MODEL = os.environ["MODEL_ID"]
40+TASKS_DIR = WORKDIR / ".tasks"
5341
54-SYSTEM = f"You are a coding agent at {WORKDIR}. Use tools to solve tasks."
42+SYSTEM = f"You are a coding agent at {WORKDIR}. Use task tools to plan and track work."
5543
56-THRESHOLD = 50000
57-TRANSCRIPT_DIR = WORKDIR / ".transcripts"
58-KEEP_RECENT = 3
5944
45+# -- TaskManager: CRUD with dependency graph, persisted as JSON files --
46+class TaskManager:
47+ def __init__(self, tasks_dir: Path):
48+ self.dir = tasks_dir
49+ self.dir.mkdir(exist_ok=True)
50+ self._next_id = self._max_id() + 1
6051
61-def estimate_tokens(messages: list) -> int:
62- """Rough token count: ~4 chars per token."""
63- return len(str(messages)) // 4
52+ def _max_id(self) -> int:
53+ ids = [int(f.stem.split("_")[1]) for f in self.dir.glob("task_*.json")]
54+ return max(ids) if ids else 0
6455
56+ def _load(self, task_id: int) -> dict:
57+ path = self.dir / f"task_{task_id}.json"
58+ if not path.exists():
59+ raise ValueError(f"Task {task_id} not found")
60+ return json.loads(path.read_text())
6561
66-# -- Layer 1: micro_compact - replace old tool results with placeholders --
67-def micro_compact(messages: list) -> list:
68- # Collect (msg_index, part_index, tool_result_dict) for all tool_result entries
69- tool_results = []
70- for msg_idx, msg in enumerate(messages):
71- if msg["role"] == "user" and isinstance(msg.get("content"), list):
72- for part_idx, part in enumerate(msg["content"]):
73- if isinstance(part, dict) and part.get("type") == "tool_result":
74- tool_results.append((msg_idx, part_idx, part))
75- if len(tool_results) <= KEEP_RECENT:
76- return messages
77- # Find tool_name for each result by matching tool_use_id in prior assistant messages
78- tool_name_map = {}
79- for msg in messages:
80- if msg["role"] == "assistant":
81- content = msg.get("content", [])
82- if isinstance(content, list):
83- for block in content:
84- if hasattr(block, "type") and block.type == "tool_use":
85- tool_name_map[block.id] = block.name
86- # Clear old results (keep last KEEP_RECENT)
87- to_clear = tool_results[:-KEEP_RECENT]
88- for _, _, result in to_clear:
89- if isinstance(result.get("content"), str) and len(result["content"]) > 100:
90- tool_id = result.get("tool_use_id", "")
91- tool_name = tool_name_map.get(tool_id, "unknown")
92- result["content"] = f"[Previous: used {tool_name}]"
93- return messages
62+ def _save(self, task: dict):
63+ path = self.dir / f"task_{task['id']}.json"
64+ path.write_text(json.dumps(task, indent=2))
9465
66+ def create(self, subject: str, description: str = "") -> str:
67+ task = {
68+ "id": self._next_id, "subject": subject, "description": description,
69+ "status": "pending", "blockedBy": [], "blocks": [], "owner": "",
70+ }
71+ self._save(task)
72+ self._next_id += 1
73+ return json.dumps(task, indent=2)
9574
96-# -- Layer 2: auto_compact - save transcript, summarize, replace messages --
97-def auto_compact(messages: list) -> list:
98- # Save full transcript to disk
99- TRANSCRIPT_DIR.mkdir(exist_ok=True)
100- transcript_path = TRANSCRIPT_DIR / f"transcript_{int(time.time())}.jsonl"
101- with open(transcript_path, "w") as f:
102- for msg in messages:
103- f.write(json.dumps(msg, default=str) + "\n")
104- print(f"[transcript saved: {transcript_path}]")
105- # Ask LLM to summarize
106- conversation_text = json.dumps(messages, default=str)[:80000]
107- response = client.messages.create(
108- model=MODEL,
109- messages=[{"role": "user", "content":
110- "Summarize this conversation for continuity. Include: "
111- "1) What was accomplished, 2) Current state, 3) Key decisions made. "
112- "Be concise but preserve critical details.\n\n" + conversation_text}],
113- max_tokens=2000,
114- )
115- summary = response.content[0].text
116- # Replace all messages with compressed summary
117- return [
118- {"role": "user", "content": f"[Conversation compressed. Transcript: {transcript_path}]\n\n{summary}"},
119- {"role": "assistant", "content": "Understood. I have the context from the summary. Continuing."},
120- ]
75+ def get(self, task_id: int) -> str:
76+ return json.dumps(self._load(task_id), indent=2)
12177
78+ def update(self, task_id: int, status: str = None,
79+ add_blocked_by: list = None, add_blocks: list = None) -> str:
80+ task = self._load(task_id)
81+ if status:
82+ if status not in ("pending", "in_progress", "completed"):
83+ raise ValueError(f"Invalid status: {status}")
84+ task["status"] = status
85+ # When a task is completed, remove it from all other tasks' blockedBy
86+ if status == "completed":
87+ self._clear_dependency(task_id)
88+ if add_blocked_by:
89+ task["blockedBy"] = list(set(task["blockedBy"] + add_blocked_by))
90+ if add_blocks:
91+ task["blocks"] = list(set(task["blocks"] + add_blocks))
92+ # Bidirectional: also update the blocked tasks' blockedBy lists
93+ for blocked_id in add_blocks:
94+ try:
95+ blocked = self._load(blocked_id)
96+ if task_id not in blocked["blockedBy"]:
97+ blocked["blockedBy"].append(task_id)
98+ self._save(blocked)
99+ except ValueError:
100+ pass
101+ self._save(task)
102+ return json.dumps(task, indent=2)
122103
123-# -- Tool implementations --
104+ def _clear_dependency(self, completed_id: int):
105+ """Remove completed_id from all other tasks' blockedBy lists."""
106+ for f in self.dir.glob("task_*.json"):
107+ task = json.loads(f.read_text())
108+ if completed_id in task.get("blockedBy", []):
109+ task["blockedBy"].remove(completed_id)
110+ self._save(task)
111+
112+ def list_all(self) -> str:
113+ tasks = []
114+ for f in sorted(self.dir.glob("task_*.json")):
115+ tasks.append(json.loads(f.read_text()))
116+ if not tasks:
117+ return "No tasks."
118+ lines = []
119+ for t in tasks:
120+ marker = {"pending": "[ ]", "in_progress": "[>]", "completed": "[x]"}.get(t["status"], "[?]")
121+ blocked = f" (blocked by: {t['blockedBy']})" if t.get("blockedBy") else ""
122+ lines.append(f"{marker} #{t['id']}: {t['subject']}{blocked}")
123+ return "\n".join(lines)
124+
125+
126+TASKS = TaskManager(TASKS_DIR)
127+
128+
129+# -- Base tool implementations --
124130def safe_path(p: str) -> Path:
125131 path = (WORKDIR / p).resolve()
126132 if not path.is_relative_to(WORKDIR):
127133 raise ValueError(f"Path escapes workspace: {p}")
128134 return path
129135
130136def run_bash(command: str) -> str:
131137 dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
132138 if any(d in command for d in dangerous):
133139 return "Error: Dangerous command blocked"
134140 try:
135141 r = subprocess.run(command, shell=True, cwd=WORKDIR,
136142 capture_output=True, text=True, timeout=120)
137143 out = (r.stdout + r.stderr).strip()
138144 return out[:50000] if out else "(no output)"
139145 except subprocess.TimeoutExpired:
140146 return "Error: Timeout (120s)"
141147
142148def run_read(path: str, limit: int = None) -> str:
143149 try:
144150 lines = safe_path(path).read_text().splitlines()
145151 if limit and limit < len(lines):
146152 lines = lines[:limit] + [f"... ({len(lines) - limit} more)"]
147153 return "\n".join(lines)[:50000]
148154 except Exception as e:
149155 return f"Error: {e}"
150156
151157def run_write(path: str, content: str) -> str:
152158 try:
153159 fp = safe_path(path)
154160 fp.parent.mkdir(parents=True, exist_ok=True)
155161 fp.write_text(content)
156162 return f"Wrote {len(content)} bytes"
157163 except Exception as e:
158164 return f"Error: {e}"
159165
160166def run_edit(path: str, old_text: str, new_text: str) -> str:
161167 try:
162168 fp = safe_path(path)
163- content = fp.read_text()
164- if old_text not in content:
169+ c = fp.read_text()
170+ if old_text not in c:
165171 return f"Error: Text not found in {path}"
166- fp.write_text(content.replace(old_text, new_text, 1))
172+ fp.write_text(c.replace(old_text, new_text, 1))
167173 return f"Edited {path}"
168174 except Exception as e:
169175 return f"Error: {e}"
170176
171177
172178TOOL_HANDLERS = {
173- "bash": lambda **kw: run_bash(kw["command"]),
174- "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
175- "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
176- "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
177- "compact": lambda **kw: "Manual compression requested.",
179+ "bash": lambda **kw: run_bash(kw["command"]),
180+ "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
181+ "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
182+ "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
183+ "task_create": lambda **kw: TASKS.create(kw["subject"], kw.get("description", "")),
184+ "task_update": lambda **kw: TASKS.update(kw["task_id"], kw.get("status"), kw.get("addBlockedBy"), kw.get("addBlocks")),
185+ "task_list": lambda **kw: TASKS.list_all(),
186+ "task_get": lambda **kw: TASKS.get(kw["task_id"]),
178187}
179188
180189TOOLS = [
181190 {"name": "bash", "description": "Run a shell command.",
182191 "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
183192 {"name": "read_file", "description": "Read file contents.",
184193 "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
185194 {"name": "write_file", "description": "Write content to file.",
186195 "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
187196 {"name": "edit_file", "description": "Replace exact text in file.",
188197 "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
189- {"name": "compact", "description": "Trigger manual conversation compression.",
190- "input_schema": {"type": "object", "properties": {"focus": {"type": "string", "description": "What to preserve in the summary"}}}},
198+ {"name": "task_create", "description": "Create a new task.",
199+ "input_schema": {"type": "object", "properties": {"subject": {"type": "string"}, "description": {"type": "string"}}, "required": ["subject"]}},
200+ {"name": "task_update", "description": "Update a task's status or dependencies.",
201+ "input_schema": {"type": "object", "properties": {"task_id": {"type": "integer"}, "status": {"type": "string", "enum": ["pending", "in_progress", "completed"]}, "addBlockedBy": {"type": "array", "items": {"type": "integer"}}, "addBlocks": {"type": "array", "items": {"type": "integer"}}}, "required": ["task_id"]}},
202+ {"name": "task_list", "description": "List all tasks with status summary.",
203+ "input_schema": {"type": "object", "properties": {}}},
204+ {"name": "task_get", "description": "Get full details of a task by ID.",
205+ "input_schema": {"type": "object", "properties": {"task_id": {"type": "integer"}}, "required": ["task_id"]}},
191206]
192207
193208
194209def agent_loop(messages: list):
195210 while True:
196- # Layer 1: micro_compact before each LLM call
197- micro_compact(messages)
198- # Layer 2: auto_compact if token estimate exceeds threshold
199- if estimate_tokens(messages) > THRESHOLD:
200- print("[auto_compact triggered]")
201- messages[:] = auto_compact(messages)
202211 response = client.messages.create(
203212 model=MODEL, system=SYSTEM, messages=messages,
204213 tools=TOOLS, max_tokens=8000,
205214 )
206215 messages.append({"role": "assistant", "content": response.content})
207216 if response.stop_reason != "tool_use":
208217 return
209218 results = []
210- manual_compact = False
211219 for block in response.content:
212220 if block.type == "tool_use":
213- if block.name == "compact":
214- manual_compact = True
215- output = "Compressing..."
216- else:
217- handler = TOOL_HANDLERS.get(block.name)
218- try:
219- output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
220- except Exception as e:
221- output = f"Error: {e}"
221+ handler = TOOL_HANDLERS.get(block.name)
222+ try:
223+ output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
224+ except Exception as e:
225+ output = f"Error: {e}"
222226 print(f"> {block.name}: {str(output)[:200]}")
223227 results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(output)})
224228 messages.append({"role": "user", "content": results})
225- # Layer 3: manual compact triggered by the compact tool
226- if manual_compact:
227- print("[manual compact]")
228- messages[:] = auto_compact(messages)
229229
230230
231231if __name__ == "__main__":
232232 history = []
233233 while True:
234234 try:
235- query = input("\033[36ms06 >> \033[0m")
235+ query = input("\033[36ms07 >> \033[0m")
236236 except (EOFError, KeyboardInterrupt):
237237 break
238238 if query.strip().lower() in ("q", "exit", ""):
239239 break
240240 history.append({"role": "user", "content": query})
241241 agent_loop(history)
242242 response_content = history[-1]["content"]
243243 if isinstance(response_content, list):
244244 for block in response_content:
245245 if hasattr(block, "text"):
246246 print(block.text)
247247 print()