Task Queue Architecture

Overview

The task scheduling system lets users create recurring or one-time tasks that execute in the background. It uses Google Cloud Tasks for reliable scheduling and Firebase (Firestore) for persistence.


Architecture

Plain text
User Request
    ↓
Task Detection (LLM)
    ↓
┌─────────────────────────────────────────────────────────────────┐
│                   TASK CREATION FLOW                            │
│                                                                 │
│  1. LLM decides: TASK_CREATION / TASK_UPDATE / REFUSAL          │
│  2. Generate task details (title, description, purpose)         │
│  3. Build schedule using LLM + RRULE                            │
│  4. Schedule in Google Cloud Tasks                              │
│       ├─ ≤30 days: Schedule at exact time                       │
│       └─ >30 days: Schedule "wake-up" in 30 days                │
│  5. Save to Firebase (tasks collection)                         │
└─────────────────────────────────────────────────────────────────┘
    ↓
┌─────────────────────────────────────────────────────────────────┐
│                   TASK EXECUTION FLOW                           │
│                                                                 │
│  Google Cloud Tasks                                             │
│       ↓                                                         │
│  HTTP POST to /v1/tasks/execute                                 │
│       ↓                                                         │
│  Auth via secret token                                          │
│       ↓                                                         │
│  Reconstruct conversation context                               │
│       ↓                                                         │
│  Execute via main orchestrator                                  │
│       ↓                                                         │
│  Save result to Firebase (executions subcollection)             │
│       ↓                                                         │
│  Schedule next occurrence (recurring tasks)                     │
└─────────────────────────────────────────────────────────────────┘

Google Cloud Tasks Setup

File: src/server/services/gcloudTasks.ts

TypeScript
import { CloudTasksClient } from "@google-cloud/tasks";

const PROJECT_ID = "foyer-work";
const LOCATION = "us-west1";
const QUEUE_NAME = "merlin-tasks";

export const tasksQueue = new CloudTasksClient({
	projectId: PROJECT_ID,
});

export const CLOUD_TASKS_QUEUE_PATH = tasksQueue.queuePath(
	PROJECT_ID,
	LOCATION,
	QUEUE_NAME,
);

export const CLOUD_TASK_NAME_PREFIX = `projects/${PROJECT_ID}/locations/${LOCATION}/queues/${QUEUE_NAME}/tasks`;
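
Cloud Tasks addresses each task by a fully qualified name under its queue, which is what `CLOUD_TASK_NAME_PREFIX` encodes. The following is a minimal sketch of building and splitting such names. `buildTaskName` and `extractTaskId` are hypothetical helpers introduced for illustration; they are not part of the file shown above.

```typescript
// Constants mirror the setup above.
const PROJECT_ID = "foyer-work";
const LOCATION = "us-west1";
const QUEUE_NAME = "merlin-tasks";
const CLOUD_TASK_NAME_PREFIX = `projects/${PROJECT_ID}/locations/${LOCATION}/queues/${QUEUE_NAME}/tasks`;

// Hypothetical helper: build the fully qualified name for a task ID.
const buildTaskName = (taskId: string): string =>
	`${CLOUD_TASK_NAME_PREFIX}/${taskId}`;

// Hypothetical helper: recover the short ID from a fully qualified name.
// Returns null when the name does not belong to this queue.
const extractTaskId = (taskName: string): string | null => {
	const prefix = `${CLOUD_TASK_NAME_PREFIX}/`;
	if (taskName.indexOf(prefix) !== 0) return null;
	const id = taskName.slice(prefix.length);
	return id.length > 0 && id.indexOf("/") === -1 ? id : null;
};
```

Keeping the prefix in one constant means a stored `currentCloudTaskId` can be checked against the queue it is supposed to live in before any delete or reschedule call.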

Task Creation Flow

File: src/server/utilities/tasks/baseUtils.ts:303

TypeScript
export const createOrUpdateTaskUsingChatId = async (input: {
	chatId: string;
	chatFragment: TPromptMessage[];
	query: TPromptMessage;
	chatStateManager: ChatStateManager;
	pastTasks?: TTaskDB[];
}): Promise<TTaskDB> => {
	const { user, userInstance, request, assistantMessageNode, eventManager } =
		requestContext.get();

	// Get user's local time
	const currentUserDate = getDateFromRequest();

	// Prepare LLM input
	const taskLLMInput = {
		messageHistory: input.chatFragment
			.map(
				(fragment) =>
					`${fragment.role}: ${Message.getMessageV2ContentAsString(fragment.content)}`,
			)
			.join("\n"),
		finalUserQuery: Message.getMessageV2ContentAsString(input.query.content),
		currentTimeInMillis: Date.now(),
		pastTasks: input.pastTasks,
		currentTime: currentUserDate,
	};

	// PHASE 1: Decide task flow
	const taskDecisionEvent = eventManager.createEvent(
		"Understanding Your Task",
		"BOLT",
	);
	const taskFlowDecisionResponse = await decideTaskFlow(taskLLMInput);
	taskDecisionEvent.end();

	// Show past tasks if updating
	if (input.pastTasks && input.pastTasks.length > 0) {
		const pastTasksEvent = eventManager.createEvent(
			"Understanding Past Tasks",
			"SEARCH",
		);
		pastTasksEvent.send(
			input.pastTasks.map((task) => ({ value: task.title, type: "TEXT" })),
		);
		pastTasksEvent.end();
	}

	// Handle refusal (ambiguous request)
	if (taskFlowDecisionResponse.refusal) {
		throw new PromptError(
			ErrorType.INCOMPLETE_TASK,
			taskFlowDecisionResponse.inferenceFailureDetails,
		);
	}

	// Calculate query cost for this task
	const modelConfig = getLLMModel(schema.schema.model);
	const queryPerExecution = calculateQueryPerExecution(
		modelConfig,
		input.chatStateManager,
	);

	// PHASE 2A: TASK UPDATE
	if (taskFlowDecisionResponse.mode === "TASK_UPDATE") {
		return await handleTaskUpdate(
			taskFlowDecisionResponse,
			input,
			currentUserDate,
			queryPerExecution,
		);
	}

	// PHASE 2B: TASK CREATION
	if (taskFlowDecisionResponse.mode === "TASK_CREATION") {
		return await handleTaskCreation(
			taskLLMInput,
			input,
			currentUserDate,
			queryPerExecution,
		);
	}

	throw new Error("Invalid task flow decision mode");
};
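
The branching in this function can be read as a small router over the LLM's decision. Below is a minimal sketch, assuming the decision can be modelled as a discriminated union. The `TaskFlowDecision` type and `routeDecision` function are illustrative only; the real object returned by `decideTaskFlow` is not shown in this document and may carry more fields.

```typescript
// Illustrative model of the decision; not the project's actual type.
type TaskFlowDecision =
	| { refusal: true; inferenceFailureDetails: string }
	| { refusal: false; mode: "TASK_CREATION" }
	| { refusal: false; mode: "TASK_UPDATE" };

type Route = "REJECT" | "UPDATE" | "CREATE";

const routeDecision = (decision: TaskFlowDecision): Route => {
	// Refusals are handled first, before any mode is inspected.
	if (decision.refusal === true) return "REJECT";
	if (decision.mode === "TASK_UPDATE") return "UPDATE";
	if (decision.mode === "TASK_CREATION") return "CREATE";
	// Unreachable for well-typed input; guards against malformed LLM output.
	throw new Error("Invalid task flow decision mode");
};
```

Checking the refusal before the mode mirrors the order in the function above: an ambiguous request is rejected before any query cost is calculated.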

Task Creation Handler

TypeScript
const handleTaskCreation = async (
	taskLLMInput,
	input,
	currentUserDate,
	queryPerExecution,
) => {
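	// Request-scoped values, read the same way as in createOrUpdateTaskUsingChatId
	const { user, userInstance, request, assistantMessageNode, eventManager } =
		requestContext.get();

	const taskCreationEvent = eventManager.createEvent("Creating A Task", "BOLT");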

	// Parallel: Generate task + Build schedule
	const [createdTask, generatedTaskSchedule] = await settleAllOrThrow([
		generateTask(taskLLMInput), // LLM generates title, description, purpose
		buildTaskScheduleUsingLLM({
			// LLM creates RRULE
			currentTime: taskLLMInput.currentTime,
			currentTimeInMillis: taskLLMInput.currentTimeInMillis,
			finalUserQuery: taskLLMInput.finalUserQuery,
			messageHistory: taskLLMInput.messageHistory,
		}),
	]);

	// Handle refusals
	if (createdTask.refusal) {
		throw new PromptError(
			ErrorType.INCOMPLETE_TASK,
			createdTask.inferenceFailureDetails,
		);
	}
	if (generatedTaskSchedule.refusal) {
		throw new PromptError(
			ErrorType.INCOMPLETE_TASK,
			generatedTaskSchedule.inferenceFailureDetails,
		);
	}

	// Calculate next execution time
	const nextExecutionTime = generatedTaskSchedule?.dtstart
		? getZoneAdjustedTimestampFromISO(
				generatedTaskSchedule.dtstart,
				currentUserDate.zone.name,
			)
		: Timestamp.fromMillis(currentUserDate.toMillis() + FIVE_MINS_MS); // Default: 5 min

	// Validate minimum schedule time
	const now = new Date();
	const diffMs = nextExecutionTime.toDate().getTime() - now.getTime();
	if (diffMs < MIN_ALLOWED_TASK_SCHEDULE_MS) {
		throw new PromptError(ErrorType.INCOMPLETE_TASK, {
			fields: ["nextExecutionTime"],
			reason: `Setting execution time less than ${MIN_ALLOWED_TASK_SCHEDULE_MS}ms`,
		});
	}

	// Build task object
	const task: TTaskDB = {
		id: randomUUID(),
		title: createdTask.taskTitle,
		description: createdTask.taskDescription,
		isOneTimeExecution: generatedTaskSchedule.isOneTimeExecution,
		status: createdTask.taskStatus,
		createdAt: Timestamp.fromDate(currentUserDate.toJSDate()),
		updatedAt: Timestamp.fromDate(currentUserDate.toJSDate()),
		createdByUserId: user.uid,
		rruleExpression: generatedTaskSchedule.rruleExpression,
		nextExecutionTime,
		createdByChatId: input.chatId,
		purposeOfTask: createdTask.purposeOfTask,
		category: createdTask?.inferredCategory ?? "OTHERS",
		currentCloudTaskId: `${CLOUD_TASK_NAME_PREFIX}/${randomUUID()}`,
		currentPayload: request,
		delivery: ["EMAIL", "WEB", "MOBILE"],
		deliveryTitle: createdTask.deliveryTitle,
		queryPerExecution,
		currentChatMessageId: assistantMessageNode.id,
		timezone: currentUserDate.zone.name,
	};

	// Show task in UI
	taskCreationEvent.send([{ value: task.title, type: "TEXT" }]);

	// Schedule in Google Cloud Tasks
	const isWakeUpCall = diffMs > MAX_CLOUD_TASK_SCHEDULE_MS; // >30 days
	const scheduleTime = isWakeUpCall
		? Timestamp.fromMillis(
				DateTime.now()
					.setZone(task.timezone)
					.plus({ milliseconds: MAX_CLOUD_TASK_SCHEDULE_MS })
					.toMillis(),
			)
		: task.nextExecutionTime;

	const requestBody = getScheduledTaskRequestBody({
		...task.currentPayload,
		message: {
			...task.currentPayload.message,
			context: "",
			content: `Task Purpose: ${task.purposeOfTask}`,
			childId: randomUUID(),
			id: randomUUID(),
		},
		metadata: { ...task.currentPayload.metadata, webAccess: true },
		decodedToken: {
			uid: userInstance.decodedToken.uid,
			name: userInstance.decodedToken.name,
		},
		secret: TASKS_EXECUTOR_API.SECRET,
		taskId: task.id,
		chatId: input.chatId,
		purposeOfTask: task.purposeOfTask,
		scheduleTime,
		isAWakeUpCall: isWakeUpCall,
	});

	const [cloudTask] = await tasksQueue.createTask(requestBody);
	task.currentCloudTaskId =
		cloudTask.name ?? `${CLOUD_TASK_NAME_PREFIX}/${randomUUID()}`;

	// Save to Firebase
	await getTaskRef(task.id).set(task);
	taskCreationEvent.end();

	return task;
};
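
The first-execution logic in the handler (default to five minutes out, then reject anything too close to now) can be isolated as a pure function. This is a sketch under stated assumptions: `resolveFirstExecutionMs` is a hypothetical name, times are plain epoch milliseconds rather than Firestore `Timestamp` values, and the value of `MIN_ALLOWED_TASK_SCHEDULE_MS` is a placeholder because the real constant is not shown here.

```typescript
const FIVE_MINS_MS = 5 * 60 * 1000;
// Placeholder value for illustration; the real constant is not shown.
const MIN_ALLOWED_TASK_SCHEDULE_MS = 60 * 1000;

// Returns the first execution time in epoch milliseconds, or throws when
// it is closer to `nowMs` than the minimum allowed lead time.
const resolveFirstExecutionMs = (nowMs: number, dtstartMs?: number): number => {
	// No explicit start from the schedule: default to five minutes from now.
	const nextMs = dtstartMs !== undefined ? dtstartMs : nowMs + FIVE_MINS_MS;
	if (nextMs - nowMs < MIN_ALLOWED_TASK_SCHEDULE_MS) {
		throw new Error(
			`Execution time must be at least ${MIN_ALLOWED_TASK_SCHEDULE_MS}ms from now`,
		);
	}
	return nextMs;
};
```

Passing `nowMs` in, instead of reading the clock inside the function, keeps the check deterministic and easy to test.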

RRULE Schedule Generation

File: src/server/utilities/tasks/functions/iCalSchedule.ts

TypeScript
export const buildTaskScheduleUsingLLM = async ({
	currentTime,
	currentTimeInMillis,
	finalUserQuery,
	messageHistory,
}: ScheduleInput) => {
	// LLM generates natural language schedule
	const llmResponse = await generateScheduleFromLLM({
		currentTime,
		currentTimeInMillis,
		finalUserQuery,
		messageHistory,
	});

	if (llmResponse.refusal) {
		return {
			refusal: true,
			inferenceFailureDetails: llmResponse.inferenceFailureDetails,
		};
	}

	// Convert to RRULE
	const rruleString = parseScheduleToRRule(llmResponse.schedule, currentTime);

	return {
		isOneTimeExecution: llmResponse.isOneTimeExecution,
		rruleExpression: rruleString,
		dtstart: llmResponse.dtstart, // ISO string
		refusal: false,
	};
};

// Parse natural language to RRULE
const parseScheduleToRRule = (
	schedule: string,
	currentTime: DateTime,
): string => {
	// Examples:
	// "every day at 9am" → "FREQ=DAILY;BYHOUR=9;BYMINUTE=0"
	// "every Monday and Friday" → "FREQ=WEEKLY;BYDAY=MO,FR"
	// "on the 1st of every month" → "FREQ=MONTHLY;BYMONTHDAY=1"

	const rrule = parse(schedule); // Uses rrule library
	return rrule.toString();
};

RRULE Examples:

TypeScript
// Daily at 9:00 AM
"FREQ=DAILY;BYHOUR=9;BYMINUTE=0";

// Weekly on Monday and Friday
"FREQ=WEEKLY;BYDAY=MO,FR;BYHOUR=9;BYMINUTE=0";

// Monthly on the 1st
"FREQ=MONTHLY;BYMONTHDAY=1;BYHOUR=9;BYMINUTE=0";

// One-time (no RRULE)
"";
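
To make the structure of these expressions concrete, here is a minimal sketch that splits an RRULE body into its `KEY=value` parts. It only tokenizes and does not validate against RFC 5545, and `parseRRuleParts` is a hypothetical helper, not something the project or the rrule library provides.

```typescript
type RRuleParts = { [key: string]: string[] };

// Hypothetical helper: split "FREQ=WEEKLY;BYDAY=MO,FR" into its parts.
const parseRRuleParts = (expression: string): RRuleParts => {
	const parts: RRuleParts = {};
	// Tolerate an optional "RRULE:" prefix.
	const body = expression.replace(/^RRULE:/i, "").trim();
	if (body === "") return parts; // one-time task: no recurrence rule
	body.split(";").forEach((segment) => {
		const eq = segment.indexOf("=");
		if (eq <= 0) throw new Error(`Malformed RRULE segment: "${segment}"`);
		const key = segment.slice(0, eq).toUpperCase();
		// Comma-separated values (e.g. BYDAY=MO,FR) become arrays.
		parts[key] = segment.slice(eq + 1).split(",");
	});
	return parts;
};
```

For the weekly example above, this yields `FREQ` as `["WEEKLY"]` and `BYDAY` as `["MO", "FR"]`; the empty one-time expression yields an object with no keys.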

Wake-Up Call Pattern

Problem: Google Cloud Tasks cannot schedule a task more than 30 days ahead.

Solution: For longer schedules, chain "wake-up" calls, each at most 30 days out, until the real execution time is within reach.

Plain text
User: "Remind me every year on my birthday"
    ↓
Schedule: "FREQ=YEARLY;BYMONTH=5;BYMONTHDAY=15"
    ↓
Next execution: May 15, 2025 (>30 days away)
    ↓
Action:
  1. Schedule "wake-up" 30 days from now
  2. Wake-up reschedules the real task
  3. If still >30 days, chain another wake-up

Timeline:
Now ──30 days──► Wake-up 1 ──30 days──► Wake-up 2 ──...──► Actual Task

Code:

TypeScript
const MAX_CLOUD_TASK_SCHEDULE_MS = 30 * 24 * 60 * 60 * 1000; // 30 days

const now = new Date();
const diffMs = task.nextExecutionTime.toDate().getTime() - now.getTime();

const isWakeUpCall = diffMs > MAX_CLOUD_TASK_SCHEDULE_MS;
const scheduleTime = isWakeUpCall
	? Timestamp.fromMillis(
			DateTime.now()
				.setZone(task.timezone)
				.plus({ milliseconds: MAX_CLOUD_TASK_SCHEDULE_MS })
				.toMillis(),
		)
	: task.nextExecutionTime;

// In task execution handler
if (input.isAWakeUpCall) {
	// Just reschedule the real task
	await scheduleNextExecution(task);
	return;
}
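
The chain described in the timeline can be simulated to see how many Cloud Tasks a far-future schedule produces. The sketch below is an illustration only: `planHops` is a hypothetical function, times are epoch milliseconds, and it assumes every wake-up fires exactly at its scheduled time.

```typescript
const MAX_CLOUD_TASK_SCHEDULE_MS = 30 * 24 * 60 * 60 * 1000; // 30 days

type Hop = { scheduleTimeMs: number; isAWakeUpCall: boolean };

// Lists every Cloud Task created on the way from nowMs to targetMs.
const planHops = (nowMs: number, targetMs: number): Hop[] => {
	const hops: Hop[] = [];
	let cursor = nowMs;
	// Same comparison as above: only a gap of more than 30 days needs a wake-up.
	while (targetMs - cursor > MAX_CLOUD_TASK_SCHEDULE_MS) {
		cursor += MAX_CLOUD_TASK_SCHEDULE_MS;
		hops.push({ scheduleTimeMs: cursor, isAWakeUpCall: true });
	}
	// The last hop is the real execution.
	hops.push({ scheduleTimeMs: targetMs, isAWakeUpCall: false });
	return hops;
};
```

For a target 100 days out, this gives three wake-ups (at days 30, 60 and 90) followed by the real task on day 100. A target exactly 30 days out needs no wake-up.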

Task Execution

File: src/server/endpoints/tasks/executeTask.ts

TypeScript
export const executeTaskEndpoint = endpoint({
	method: "post",
	path: "/v1/tasks/execute",
	middlewares: [authMiddlewareForTaskExecutionContext],
	...taskExecutionSchema,
	handler: async ({ input }) => {
		const { taskId, chatId, purposeOfTask, isAWakeUpCall } = input;

		// Get task from Firebase
		const task = await getTask(taskId);

		// Handle wake-up call
		if (isAWakeUpCall) {
			await scheduleNextExecution(task);
			return { success: true, isWakeUpCall: true };
		}

		// Reconstruct conversation context
		const chatStateManager = new ChatStateManager(input.request);
		chatStateManager.setMode("TASK_EXECUTION");

		// Execute via orchestrator
		const result = await executeChat({
			messages: [
				{
					role: "user",
					content: [{ type: "TEXT", text: purposeOfTask }],
				},
			],
			chatStateManager,
			isTaskExecution: true,
		});

		// Save execution record
		// (assumes addTaskExecution resolves to the new execution's ID)
		const executionId = await addTaskExecution({
			taskId,
			content: result.content,
			chatId,
			messageId: result.messageId,
			userId: task.createdByUserId,
			queries: result.usage.queries,
			progress: result.progress,
			isDelivered: false, // User notification pending
		});

		// Schedule next occurrence (recurring)
		if (!task.isOneTimeExecution) {
			await scheduleNextExecution(task);
		} else {
			await updateTask({ id: taskId, status: "COMPLETED" });
		}

		return { success: true, executionId };
	},
});
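
The endpoint relies on `authMiddlewareForTaskExecutionContext`, which is not shown. Since the scheduled payload carries a `secret` field, one plausible shape for the check is a comparison of that field against the configured secret. The sketch below is an assumption about that shape, not the project's middleware; `constantTimeEquals` and `isAuthorizedTaskRequest` are hypothetical names.

```typescript
// Compare two strings without returning early on the first mismatch.
const constantTimeEquals = (a: string, b: string): boolean => {
	let diff = a.length ^ b.length;
	const len = Math.max(a.length, b.length);
	for (let i = 0; i < len; i++) {
		// charCodeAt returns NaN past the end of a string; `| 0` maps NaN to 0.
		diff |= (a.charCodeAt(i) | 0) ^ (b.charCodeAt(i) | 0);
	}
	return diff === 0;
};

// Hypothetical check: the body must carry a non-empty secret that matches.
const isAuthorizedTaskRequest = (
	body: { secret?: unknown },
	expectedSecret: string,
): boolean =>
	expectedSecret.length > 0 &&
	typeof body.secret === "string" &&
	constantTimeEquals(body.secret, expectedSecret);
```

JavaScript gives no hard timing guarantees, so in a Node.js service `crypto.timingSafeEqual` on equal-length buffers is the more usual choice; the loop above only illustrates the idea.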

Next Execution Calculation

File: src/server/utilities/tasks/baseUtils.ts:109

TypeScript
const getNextRunTimeFromCron = (
	rruleExpression: string,
	prevTask: TTaskDB,
	dtstart?: Timestamp,
): Date => {
	if (prevTask.isOneTimeExecution) {
		// One-time: use stored execution time
		return DateTime.fromJSDate(prevTask.nextExecutionTime.toDate())
			.setZone(prevTask.timezone)
			.toUTC()
			.toJSDate();
	}

	// Parse RRULE
	const dtStartTimestamp = dtstart ?? prevTask.nextExecutionTime;
	const dtStartWithZone = DateTime.fromJSDate(
		dtStartTimestamp.toDate(),
	).setZone(prevTask.timezone);
	const dtStartUTCFormat = dtStartWithZone.toUTC().toJSDate();

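	// Note: setZone() only changes the zone used for display and arithmetic;
	// the JS Date produced below is the same instant as prevTask.nextExecutionTime.
	const nextExecutionTimeInUTC = DateTime.fromJSDate(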
		prevTask.nextExecutionTime.toDate(),
	)
		.setZone(prevTask.timezone)
		.toJSDate();

	const rule = rrule.rrulestr(rruleExpression, { dtstart: dtStartUTCFormat });
	const date = rule.after(nextExecutionTimeInUTC);

	if (!date) throw new Error("Date not found");
	return date;
};
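
The key behaviour here is "strictly after": rrule's `after` excludes the reference date by default, so a task that has just run is never rescheduled for the same instant. As a library-free illustration of that rule, the sketch below handles only the simplest case, a daily schedule at a fixed UTC hour and minute. `nextDailyOccurrenceMs` is a hypothetical helper and ignores time zones other than UTC.

```typescript
const DAY_MS = 24 * 60 * 60 * 1000;

// Next UTC occurrence of hour:minute strictly after `afterMs`, which is
// how FREQ=DAILY;BYHOUR=h;BYMINUTE=m behaves when evaluated in UTC.
const nextDailyOccurrenceMs = (
	afterMs: number,
	hour: number,
	minute: number,
): number => {
	const after = new Date(afterMs);
	// Candidate: today's occurrence (UTC calendar day of `afterMs`).
	const candidate = Date.UTC(
		after.getUTCFullYear(),
		after.getUTCMonth(),
		after.getUTCDate(),
		hour,
		minute,
	);
	// An occurrence equal to afterMs is skipped, so a task that just ran
	// at 09:00 is next scheduled for 09:00 on the following day.
	return candidate > afterMs ? candidate : candidate + DAY_MS;
};
```

Real schedules should go through the rrule library as shown above, since local-time rules also have to account for daylight saving transitions.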

Scheduling Request Format

File: src/server/utilities/tasks/baseUtils.ts:253

TypeScript
export const getScheduledTaskRequestBody = (
	payload: TTaskPayload & { scheduleTime: Timestamp },
): google.cloud.tasks.v2.ICreateTaskRequest => {
	const { scheduleTime, ...rest } = payload;

	return {
		parent: CLOUD_TASKS_QUEUE_PATH,
		task: {
			httpRequest: {
				httpMethod: "POST",
				url: TASKS_EXECUTOR_API.ENDPOINT,
				body: Buffer.from(JSON.stringify(rest)).toString("base64"),
				headers: {
					"Content-Type": "application/json",
					"x-merlin-version": "merlin-web",
				},
			},
			scheduleTime: {
				seconds: scheduleTime.seconds,
			},
		},
	};
};
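
The request body above sets only `seconds` on `scheduleTime`, so any sub-second part of the timestamp is dropped. Protobuf timestamps also carry a `nanos` field; the sketch below shows the conversion from epoch milliseconds to both fields. `toScheduleTime` is a hypothetical helper, and whether sub-second precision matters for this queue is an open question.

```typescript
type ScheduleTime = { seconds: number; nanos: number };

// Hypothetical helper: split epoch milliseconds into whole seconds and
// the remaining fraction expressed in nanoseconds.
const toScheduleTime = (epochMs: number): ScheduleTime => {
	const seconds = Math.floor(epochMs / 1000);
	const nanos = (epochMs - seconds * 1000) * 1000000;
	return { seconds, nanos };
};
```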

Task Data Model

Firestore Schema:

TypeScript
interface TTaskDB {
	id: string; // UUID
	title: string; // User-friendly title
	description: string; // Detailed description
	purposeOfTask: string; // Execution prompt

	// Schedule
	isOneTimeExecution: boolean; // One-time vs recurring
	rruleExpression: string; // RRULE string (if recurring)
	nextExecutionTime: Timestamp; // Next scheduled run
	timezone: string; // User's timezone

	// Metadata
	status: "ACTIVE" | "PAUSED" | "COMPLETED" | "FAILED";
	category: "REMINDER" | "REPORT" | "ANALYSIS" | "OTHERS";
	delivery: ("EMAIL" | "WEB" | "MOBILE")[];
	deliveryTitle: string; // Custom notification title

	// Cost tracking
	queryPerExecution: number; // Estimated query cost

	// Relations
	createdByUserId: string; // Firebase UID
	createdByChatId: string; // Source chat
	currentChatMessageId: string; // Message where task was created

	// Cloud Tasks
	currentCloudTaskId: string; // Google Cloud Task ID
	currentPayload: object; // Execution payload snapshot

	// Timestamps
	createdAt: Timestamp;
	updatedAt: Timestamp;
}

Executions Subcollection:

TypeScript
interface TTaskExecution {
	id: string; // UUID
	taskId: string; // Parent task
	createdAt: Timestamp;
	content: string; // Generated content
	chatId: string; // Execution chat
	messageId: string; // Result message
	createdByUserId: string;
	queries: number; // Actual query cost
	tokens: number; // Token usage
	status: "SUCCESS" | "FAILURE";
	progress: TMessageProgressStep[]; // Execution progress
	isDelivered: boolean; // Notification sent
}
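
Because the task stores an estimated `queryPerExecution` and each execution records its actual `queries`, the two can be compared across a task's history. The sketch below is illustrative: `summarizeExecutions` and `ExecutionSummary` are hypothetical names, and the input uses only the fields from the interfaces above that the calculation needs.

```typescript
type ExecutionSummary = {
	runs: number;
	failures: number;
	actualQueries: number;
	estimatedQueries: number;
};

// Hypothetical helper: compare estimated and actual query cost for a task.
const summarizeExecutions = (
	queryPerExecution: number,
	executions: { queries: number; status: "SUCCESS" | "FAILURE" }[],
): ExecutionSummary => ({
	runs: executions.length,
	failures: executions.filter((e) => e.status === "FAILURE").length,
	// Sum of the cost actually recorded on each execution.
	actualQueries: executions.reduce((sum, e) => sum + e.queries, 0),
	// What the task was expected to cost for the same number of runs.
	estimatedQueries: queryPerExecution * executions.length,
});
```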

Summary

The task queue system:

  1. LLM-Powered: Natural language → structured tasks
  2. RRULE Scheduling: Complex recurrence patterns
  3. Wake-Up Pattern: Handle >30 day schedules
  4. Time Zone Aware: User's local time, UTC storage
  5. Cost Tracking: Estimated + actual query costs
  6. Multi-Channel Delivery: Email, web, mobile push
  7. Execution History: Full audit trail
  8. Chain Scheduling: Recurring tasks auto-reschedule

Key Principle: Background tasks should feel like natural conversation continuations, scheduled and executed autonomously.