Edward 3 files · 0 subfolders
Copy to Workspace Orchestration Layer Shared from "Edward" on Inkdown
Overview
Think of it like a conductor leading an orchestra:
Conductor = Orchestration layer
Musicians = LLM, Docker sandbox, file system, package manager, build tools
Music = The generated code
The orchestration layer makes sure everyone plays at the right time, in the right order.
2. The 5-Minute Overview
The Big Picture Plain text
User sends message in chat
↓
API validates + admits the request
↓
Queues work to background worker
↓
Worker starts stream session
↓
LLM streams response (chunk by chunk)
↓
Parser reads chunks → produces events
↓
Events trigger actions (write files, install deps, run commands)
↓
Loop continues until done
↓
Finalize + save results
The 3 Main Phases Plain text
┌─────────────────────────────────────────────────────────┐
│ PHASE 1: ADMISSION (API) │
│ - Validate user, check limits │
│ - Create run record │
│ - Queue to worker │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ PHASE 2: EXECUTION (Worker) │
│ - Stream from LLM │
│ - Parse events │
│ - Execute side effects (files, installs, commands) │
│ - Multi-turn loop │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ PHASE 3: FINALIZE (Worker) │
│ - Apply fixes │
│ - Validate output │
│ - Persist assistant message │
└─────────────────────────────────────────────────────────┘
3. Step-by-Step Flow
Step 1: User Sends Message File: apps/web/stores/chatStream/useStartStream.ts
Plain text
User types: "Create a todo app with React"
↓
Frontend sends POST /chat/message
↓
Opens SSE (Server-Sent Events) connection for streaming
Step 2: API Admission File: apps/api/services/runs/messageOrchestrator.service.ts
JavaScript
async function unifiedSendMessage (req, res ) {
const admissionWindow = await getRunAdmissionWindow ();
if (admissionWindow.overloaded ) {
return error ("System busy, try again" );
}
const userData = await getUserWithApiKey (userId);
const decryptedKey = decrypt (userData.apiKey );
const { chatId } = await getOrCreateChat (userId, body.chatId );
const userMessageId = await saveMessage (chatId, userId, "user" , content);
const workflow = await createWorkflow (userId, chatId, {
userRequest : content,
mode : "GENERATE" ,
});
const run = await createAdmittedRun ({
chatId,
userId,
userMessageId,
metadata : { workflow, model, ... },
});
await enqueueAgentRunJob ({ runId : run.id });
await streamRunEventsFromPersistence ({ res, runId : run.id });
}
Admission control prevents overload (global + per-user + per-chat limits)
Run is persisted BEFORE execution (durable)
Browser gets runId immediately for tracking
Step 3: Worker Picks Up Job File: apps/api/services/runs/agent-run-worker/processor.ts
JavaScript
async function processAgentRunJob (runId, publisher ) {
const run = await getRunById (runId);
if (isTerminalRunStatus (run.status )) {
return ;
}
const cancelSub = createRedisClient ();
await cancelSub.subscribe (`edward:run-cancel:${runId} ` );
cancelSub.on ("message" , () => {
workerAbort.abort ();
});
const userData = await getUserWithApiKey (run.userId );
const decryptedKey = decrypt (userData.apiKey );
await markRunRunningIfAdmissible (runId);
const capturedRes = createRunEventCaptureResponse (async (event) => {
await persistRunEvent (runId, event, publisher);
});
await runStreamSession ({
res : capturedRes,
workflow : metadata.workflow ,
decryptedApiKey : decryptedKey,
historyMessages,
});
await finalizeSuccessfulRun ({...});
}
Worker is independent (can restart without losing progress)
Cancel signal via Redis pub/sub (fast)
Events persisted to DB as they happen (resumable)
Step 4: Stream Session Setup File: apps/api/services/chat/session/orchestrator/runStreamSession.orchestrator.ts
JavaScript
async function runStreamSession (params ) {
let framework = await resolveFramework ({
workflow,
userRequest : userTextContent,
});
const { baseMessages } = await prepareBaseMessages ({
userTextContent,
isFollowUp,
historyMessages,
projectContext,
});
const systemPrompt = composePrompt ({
framework,
complexity,
mode,
profile : COMPACT ,
});
const tokenUsage = await computeTokenUsage ({
apiKey : decryptedApiKey,
systemPrompt,
messages : baseMessages,
model,
});
if (isOverContextLimit (tokenUsage)) {
sendError ("Context window exceeded" );
return ;
}
const loopResult = await runAgentLoop ({
decryptedApiKey,
initialMessages : baseMessages,
systemPrompt,
framework,
abortController,
generatedFiles : new Map (),
declaredPackages : [],
});
await applyDeterministicPostgenAutofixes ({
framework,
generatedFiles,
sandboxId : workflow.sandboxId ,
});
const retryResult = await maybeRunStrictPostgenRetry ({
violations : getBlockingPostgenViolations ({...}),
});
const finalized = await finalizeStreamSession ({
fullRawResponse : loopResult.fullRawResponse ,
generatedFiles,
});
}
Framework resolved before LLM call (better prompts)
Token budget checked BEFORE calling LLM (fail fast)
Post-generation validation + autofix (quality control)
Step 5: Agent Loop (Multi-Turn) File: apps/api/services/chat/session/loop/agentLoop.runner.ts
JavaScript
async function runAgentLoop (params ) {
let agentMessages = params.initialMessages ;
let agentTurn = 0 ;
let fullRawResponse = "" ;
let loopStopReason = null ;
while (agentTurn < MAX_AGENT_TURNS ) {
agentTurn += 1 ;
const turnTokenUsage = await computeTokenUsage ({
apiKey : decryptedApiKey,
systemPrompt,
messages : agentMessages,
model,
});
if (isOverContextLimit (turnTokenUsage)) {
loopStopReason = "CONTEXT_LIMIT_EXCEEDED" ;
break ;
}
const turnResult = await executeAgentTurnStream ({
decryptedApiKey,
agentMessages,
systemPrompt,
abortController,
turn : agentTurn,
});
fullRawResponse = turnResult.fullRawResponse ;
const outcome = await resolveTurnOutcome ({
agentTurn,
turnRawResponse : turnResult.turnRawResponse ,
toolResultsThisTurn : turnResult.toolResultsThisTurn ,
budgetState : turnResult.budgetState ,
});
if (outcome.action === "break" ) {
loopStopReason = outcome.loopStopReason ;
break ;
}
agentMessages = outcome.agentMessages ;
}
return {
fullRawResponse,
agentTurn,
loopStopReason,
};
}Plain text
Turn 1: LLM says "I'll create App.jsx"
→ Writes App.jsx
→ No <done> tag yet
Turn 2: LLM says "Now I'll add styles.css"
→ Writes styles.css
→ Still no <done>
Turn 3: LLM says "Done!"
→ Emits <edward_done />
→ Loop exits
Tools were called but no file output yet
No <done> tag received
Under turn budget
Code/file output detected
<done> tag received
Tool budget exceeded
Max turns reached
Client aborted
Step 6: Turn Execution (Stream + Parse) File: apps/api/services/chat/session/loop/agentLoop.stream.ts
JavaScript
async function executeAgentTurnStream (params ) {
const parser = createStreamParser ();
const stream = streamResponse (
params.decryptedApiKey ,
params.agentMessages ,
params.abortController .signal ,
params.systemPrompt ,
params.framework ,
params.model ,
);
let turnRawResponse = "" ;
const toolResultsThisTurn = [];
for await (const chunk of stream) {
if (params.abortController .signal .aborted ) {
break ;
}
turnRawResponse += chunk;
const events = parser.process (chunk);
await processParserEvents ({
events,
turnState,
budgetState,
toolResultsThisTurn,
context : parserContext,
});
if (hasAnyTurnBudgetExceeded (budgetState)) {
break ;
}
}
return {
fullRawResponse : turnRawResponse,
toolResultsThisTurn,
budgetState,
};
}
Chunks processed as they arrive (not waiting for full response)
Parser converts raw text → structured events
Events trigger immediate side effects
Step 7: Parser State Machine File: apps/api/lib/llm/parser.ts
JavaScript
function createStreamParser ( ) {
const context = {
state : "TEXT" ,
buffer : "" ,
};
function process (chunk ) {
context.buffer += chunk;
let events = [];
let iterations = 0 ;
while (context.buffer .length > 0 && iterations < MAX_ITERATIONS ) {
handleState (events);
iterations++;
}
return events;
}
function handleState (events ) {
switch (context.state ) {
case "TEXT" :
if (buffer.includes ("<thinking>" )) {
context.state = "THINKING" ;
events.push ({ type : "THINKING_START" });
}
if (buffer.includes ("<edward_sandbox>" )) {
context.state = "SANDBOX" ;
events.push ({ type : "SANDBOX_START" });
}
if (buffer.includes ("<file path=" )) {
context.state = "FILE" ;
const path = extractPath (buffer);
events.push ({ type : "FILE_START" , path });
}
break ;
case "THINKING" :
if (buffer.includes ("</thinking>" )) {
context.state = "TEXT" ;
events.push ({ type : "THINKING_END" });
}
break ;
case "FILE" :
if (buffer.includes ("</file>" )) {
context.state = "TEXT" ;
events.push ({ type : "FILE_END" });
}
break ;
}
}
return { process, flush };
}State Trigger Exit TEXT Default <thinking>, <edward_sandbox>, <file>THINKING <thinking></thinking>SANDBOX <edward_sandbox></edward_sandbox>FILE <file path="..."></file>INSTALL <install></install>
Chunks can split tags across boundaries
Need to handle incomplete output safely
Can't just regex over full string
Step 8: Event Handler (Side Effects) File: apps/api/services/chat/session/events/handler.ts
JavaScript
async function handleParserEvent (ctx, event ) {
switch (event.type ) {
case "SANDBOX_START" :
if (!ctx.workflow .sandboxId ) {
await ensureSandbox (ctx.workflow );
}
break ;
case "FILE_START" :
await prepareSandboxFile (ctx.workflow .sandboxId , event.path );
ctx.currentFilePath = event.path ;
ctx.generatedFiles .set (event.path , "" );
break ;
case "FILE_CONTENT" :
await handleFileContent (
ctx.workflow .sandboxId ,
ctx.currentFilePath ,
event.content ,
ctx.isFirstFileChunk ,
);
ctx.generatedFiles .set (
ctx.currentFilePath ,
ctx.generatedFiles .get (ctx.currentFilePath ) + event.content
);
break ;
case "FILE_END" :
await sanitizeSandboxFile (ctx.workflow .sandboxId , ctx.currentFilePath );
ctx.currentFilePath = undefined ;
break ;
case "SANDBOX_END" :
await flushSandbox (ctx.workflow .sandboxId );
break ;
case "INSTALL_CONTENT" :
ctx.installTaskQueue .enqueue (async () => {
await handleInstallContent (ctx, event.dependencies );
});
break ;
case "COMMAND" :
await ctx.installTaskQueue ?.waitForIdle ();
await handleCommandEvent (ctx, event.command , event.args );
break ;
case "WEB_SEARCH" :
await handleWebSearchEvent (ctx, event.query , event.maxResults );
break ;
}
}Event Action SANDBOX_START Provision Docker container FILE_START Prepare file path FILE_CONTENT Buffer to Redis FILE_END Sanitize file SANDBOX_END Flush buffers to disk INSTALL_CONTENT Queue npm install COMMAND Run shell command WEB_SEARCH Search web
Step 9: Sandbox Write Flow (Buffered) File: apps/api/services/sandbox/write/buffer.ts + flush.ts
Write (Buffered to Redis) JavaScript
async function writeSandboxFile (sandboxId, filePath, content ) {
const bufferKey = `buffer:${sandboxId} :${filePath} ` ;
const filesSetKey = `files:${sandboxId} ` ;
const pipeline = redis.pipeline ();
pipeline.append (bufferKey, content);
pipeline.sadd (filesSetKey, filePath);
await pipeline.exec ();
scheduleSandboxFlush (sandboxId);
}
Flush (Redis → Container) JavaScript
async function flushSandbox (sandboxId ) {
const handle = await acquireDistributedLock (`flush:${sandboxId} ` );
const filePaths = await redis.smembers (`files:${sandboxId} ` );
for (const filePath of filePaths) {
const content = await redis.get (`buffer:${sandboxId} :${filePath} ` );
const exec = await container.exec ({
Cmd : ["sh" , "-c" , `cat >> '/app/${filePath} '` ],
AttachStdin : true ,
});
const stream = await exec.start ({ hijack : true });
stream.write (content);
stream.end ();
await redis.del (`buffer:${sandboxId} :${filePath} ` );
}
await releaseDistributedLock (handle);
}Plain text
Without buffering:
Write chunk 1 → Docker exec
Write chunk 2 → Docker exec
Write chunk 3 → Docker exec
(Slow, many round trips)
With buffering:
Write chunk 1 → Redis (fast)
Write chunk 2 → Redis (fast)
Write chunk 3 → Redis (fast)
Flush once → Docker exec
(Fast, one round trip)
Resilient to partial failures
Can batch multiple writes
Can replay/repair on failure
Step 10: Install Task Queue File: apps/api/services/chat/session/loop/agentLoop.runner.ts
JavaScript
let installQueueTail = Promise .resolve ();
const installTaskQueue = {
enqueue (task ) {
const queuedTask = installQueueTail.then (task, task);
installQueueTail = queuedTask.catch (() => undefined );
},
async waitForIdle ( ) {
await installQueueTail;
},
};
case "INSTALL_CONTENT" :
installTaskQueue.enqueue (async () => {
await execCommand (sandboxId, "npm install react" );
});
break ;
case "COMMAND" :
await installTaskQueue.waitForIdle ();
await execCommand (sandboxId, "npm run build" );
break ;Plain text
Without serialization:
npm install react (concurrent)
npm install lodash (concurrent)
→ Race conditions, lock file conflicts
With serialization:
npm install react (wait...)
npm install lodash (wait...)
→ Clean, sequential installs
Step 11: Turn Outcome Decision File: apps/api/services/chat/session/loop/agentLoop.turnOutcome.ts
JavaScript
async function resolveTurnOutcome (params ) {
if (toolBudgetExceededThisTurn) {
return { action : "break" , reason : "TOOL_BUDGET_EXCEEDED" };
}
if (codeOutputDetected) {
return { action : "break" , reason : "DONE" };
}
if (toolResultsThisTurn.length > 0 && !codeOutputDetected) {
const continuationPrompt = buildAgentContinuationPrompt (
userContent,
turnRawResponse,
toolResultsThisTurn,
);
return {
action : "continue" ,
agentMessages : [{ role : "user" , content : continuationPrompt }]
};
}
if (doneTagDetectedThisTurn) {
return { action : "break" , reason : "DONE" };
}
if (isConversationalReply) {
return { action : "break" , reason : "DONE" };
}
if (noProgressContinuations < MAX_NO_PROGRESS_CONTINUATIONS ) {
const nudgePrompt = buildNoProgressContinuationPrompt ();
return {
action : "continue" ,
agentMessages : [{ role : "user" , content : nudgePrompt }]
};
}
return { action : "break" , reason : "NO_TOOL_RESULTS" };
}Plain text
┌─────────────────┐
│ Turn Complete │
└────────┬────────┘
│
┌────────▼────────┐
│ Budget Exceeded?│──Yes──→ BREAK
└────────┬────────┘
│ No
┌────────▼────────┐
│ Code Output? │──Yes──→ BREAK
└────────┬────────┘
│ No
┌────────▼────────┐
│ Tools Called? │──Yes──→ CONTINUE (with results)
└────────┬────────┘
│ No
┌────────▼────────┐
│ <done> Tag? │──Yes──→ BREAK
└────────┬────────┘
│ No
┌────────▼────────┐
│ Conversational? │──Yes──→ BREAK
└────────┬────────┘
│ No
┌────────▼────────┐
│ Can Nudge? │──Yes──→ CONTINUE (nudge)
└────────┬────────┘
│ No
↓
BREAK
Step 12: Finalize File: apps/api/services/chat/session/orchestrator/runStreamSession.finalize.ts
JavaScript
async function finalizeStreamSession (params ) {
const assistantContent = buildAssistantMessageContent ({
fullRawResponse : params.fullRawResponse ,
generatedFiles : params.generatedFiles ,
declaredPackages : params.declaredPackages ,
});
await saveMessage (
params.chatId ,
params.userId ,
"assistant" ,
assistantContent,
);
emitMeta ({
phase : "SESSION_COMPLETE" ,
outputTokens : params.outputTokens ,
duration : Date .now () - params.messageStartTime ,
});
return { storedAssistantContent : assistantContent };
}
4. Deep Dive: Each Layer
Layer 1: Message Orchestrator Purpose: Admission control + queue + stream handoff
unifiedSendMessage() - Entry point
createAdmittedRun() - Create run with limits
enqueueAdmittedRun() - Queue to worker
streamRunEventsFromPersistence() - SSE to browser
API key decryption fails
Model/provider mismatch
Run admission rejected (limits)
Queue enqueue fails
Layer 2: Stream Session Purpose: Framework resolve + message prep + token budget + finalize
resolveFramework() - Detect/prefer framework
prepareBaseMessages() - Build LLM context
composePrompt() - System prompt
computeTokenUsage() - Budget check
finalizeStreamSession() - Persist results
Context limit exceeded
Framework detection fails
Finalize persistence fails
Layer 3: Agent Loop Purpose: Multi-turn execution + outcome decisions
runAgentLoop() - Main loop
executeAgentTurnStream() - Single turn
resolveTurnOutcome() - Continue/stop decision
Turn budget exceeded
Max turns reached
Abort signal received
Continuation prompt fails
Layer 4: Parser Purpose: Chunk → event conversion
createStreamParser() - State machine
process() - Parse chunk
flush() - Handle incomplete output
Tag split across chunks
Incomplete output
State machine stuck
Layer 5: Event Handler Purpose: Side effect execution
handleParserEvent() - Dispatch by type
handleFileContent() - Buffer writes
handleInstallContent() - Queue installs
handleCommandEvent() - Run commands
Sandbox not provisioned
File write fails
Install conflicts
Command timeout
Layer 6: Sandbox Write Purpose: Buffered writes to container
writeSandboxFile() - Buffer to Redis
flushSandbox() - Redis → container
scheduleSandboxFlush() - Debounced flush
Redis unavailable
Docker exec fails
Lock acquisition fails
Container stopped
5. Key Files to Read
Core Orchestration File Purpose apps/api/services/runs/messageOrchestrator.service.tsEntry point apps/api/services/runs/agent-run-worker/processor.tsWorker execution apps/api/services/chat/session/orchestrator/runStreamSession.orchestrator.tsStream session apps/api/services/chat/session/loop/agentLoop.runner.tsAgent loop apps/api/services/chat/session/loop/agentLoop.stream.tsTurn execution
Parser + Events File Purpose apps/api/lib/llm/parser.tsState machine parser apps/api/services/chat/session/events/handler.tsEvent side effects apps/api/services/chat/session/loop/events.tsEvent processing apps/api/services/chat/session/loop/agentLoop.turnOutcome.tsContinue/stop logic
Sandbox File Purpose apps/api/services/sandbox/write/buffer.tsRedis buffering apps/api/services/sandbox/write/flush.tsFlush to container apps/api/services/sandbox/write/flush.scheduler.tsDebounced flush apps/api/services/chat/file.handlers.tsFile content handling
6. Common Questions
Q: Why multi-turn loop instead of one LLM call? A: Complex tasks need multiple steps:
Plain text
One call approach:
User: "Build a todo app"
LLM: [tries to output everything at once]
→ Context overflow, messy output
Multi-turn approach:
Turn 1: Create App.jsx
Turn 2: Create styles.css
Turn 3: Create utils.js
Turn 4: <done>
→ Clean, bounded, verifiable
Q: Why buffer writes to Redis instead of writing directly?
Resilience: If Docker fails, buffer survives in Redis
Performance: One flush vs many small writes
Batching: Multiple chunks → one file write
Q: Why serialize installs? A: Prevent race conditions:
Plain text
Concurrent installs:
npm install react &
npm install lodash &
→ package-lock.json conflicts
→ Corrupted node_modules
Serialized installs:
npm install react (wait)
npm install lodash (wait)
→ Clean state
Q: How does cancellation work?
Redis pub/sub (fast):
Plain text
Browser → POST /cancel → Redis publish
Worker subscribes → receives signal → abort
DB polling (backup):
Plain text
Worker polls run.status every N seconds
If status = CANCELLED → abort
Q: What happens if worker crashes mid-turn? A: Checkpoint system allows resume:
Plain text
Turn 1: Complete ✓ (checkpoint saved)
Turn 2: Worker crashes ✗
↓
Worker restarts → loads checkpoint → resumes from Turn 3
Q: How are tokens budgeted? Plain text
Level 1: Context window
- computeTokenUsage() before LLM call
- Fail if over limit
Level 2: Turn tool budget
- MAX_AGENT_TOOL_CALLS_PER_TURN (e.g., 5)
- Break turn if exceeded
Level 3: Run tool budget
- MAX_AGENT_TOOL_CALLS_PER_RUN (e.g., 20)
- Break run if exceeded
Level 4: Turn count
- MAX_AGENT_TURNS (e.g., 10)
- Break loop if exceeded
Q: How does framework detection work? Plain text
1. User explicit request:
"Create a Next.js app" → framework = "nextjs"
2. Existing sandbox:
Sandbox has package.json with "next" → framework = "nextjs"
3. Workflow inference:
Planning workflow suggests framework based on request
Q: What's the difference between run_event and message? Plain text
message table:
- User/assistant conversation history
- Final output visible to user
- Queried for chat UI
run_event table:
- Execution trace (stream events)
- Used for replay/resume
- Debugging + audit trail
7. Debugging Guide
Trace a Turn Plain text
1. Check worker logs for turn start
→ "Agent turn 1 started"
2. Check LLM stream chunks
→ Chunk 1, Chunk 2, ...
3. Check parser events
→ FILE_START, FILE_CONTENT, FILE_END
4. Check event handler
→ "Writing file: App.jsx"
5. Check sandbox flush
→ "Flushed 3 files to sandbox"
6. Check turn outcome
→ "Turn 1 complete: codeOutputDetected=true"
Common Failures Symptom Likely Cause Fix Context limit exceeded Too much history Truncate context Tool budget exceeded Too many tool calls Reduce per-turn limit Turn stuck in loop No code output detected Check parser, tags Files not written Flush failed Check Redis, Docker Install conflicts Concurrent installs Check queue serialization
8. Summary
The Orchestration Flow in One Diagram Plain text
┌─────────────────────────────────────────────────────────────┐
│ USER sends message │
└────────────────────┬────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ API: Admission Control │
│ - Validate, check limits, create run, enqueue │
└────────────────────┬────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ WORKER: Process Agent Run │
│ - Load context, subscribe to cancel, mark running │
└────────────────────┬────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ STREAM SESSION: Setup │
│ - Resolve framework, prepare messages, check tokens │
└────────────────────┬────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ AGENT LOOP: Multi-Turn Execution │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ Turn 1: Stream → Parse → Execute → Decide │ │
│ │ Turn 2: Stream → Parse → Execute → Decide │ │
│ │ Turn 3: Stream → Parse → Execute → Decide │ │
│ │ ... continue until done ... │ │
│ └───────────────────────────────────────────────────────┘ │
└────────────────────┬────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ POST-GENERATION: Validate + Fix │
│ - Apply autofixes, validate, maybe retry │
└────────────────────┬────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ FINALIZE: Persist Results │
│ - Save assistant message, emit metrics │
└─────────────────────────────────────────────────────────────┘
Key Takeaways
Orchestration is layered - Each layer has a clear responsibility
Multi-turn is essential - Complex tasks need iteration
Buffering matters - Redis buffers make writes resilient
Budgets prevent runaway - Token, tool, turn limits
Events are durable - Persisted for replay/resume
Cancellation is dual - Pub/sub + DB polling