Architecture Overview
The Agent package (`packages/agent`) provides:
- Thread Management: Persistent conversation threads
- Streaming: Real-time AI response streaming
- Tool Support: Function calling for AI agents
- RAG Integration: Vector search and embeddings
- Model Flexibility: Support for multiple AI providers
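In code, these capabilities map onto a handful of entry points. A brief orientation sketch, collecting the imports used throughout this guide:

```typescript
// Entry points from @packages/agent used in the sections below:
import {
  createThread, // thread management
  saveMessage, // persist a message to a thread
  streamText, // streaming generation with tool support
  createTool, // define a tool for function calling
  generateAndSaveEmbeddings, // RAG: write embeddings
  fetchContextMessages, // RAG: vector search
} from "@packages/agent";
import { syncStreams } from "@packages/agent/react"; // client-side stream sync
```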
Setting Up an Agent
Creating a Thread
Threads store conversation history:

```typescript
import { createThread } from "@packages/agent";
import { v } from "convex/values";
import { components } from "../_generated/api";

// authenticatedMutation is the project's custom mutation wrapper
export const startChat = authenticatedMutation({
  args: {
    prompt: v.string(),
    serverDiscordId: v.optional(v.string()),
  },
  handler: async (ctx, args) => {
    // userId is injected into args by the authenticatedMutation wrapper
    const userId = args.userId;

    // Create a new thread
    const threadId = await createThread(ctx, components.agent, { userId });

    // Save metadata
    await ctx.db.insert("chatThreadMetadata", {
      threadId,
      serverDiscordId: args.serverDiscordId,
      modelId: "anthropic/claude-3.5-sonnet",
    });

    return { threadId };
  },
});
```
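On the client, the mutation is invoked like any other Convex mutation. A minimal sketch, assuming `startChat` is exported from the same `chat/mutations.ts` module:

```tsx
"use client";

import { useMutation } from "convex/react";
import { api } from "@packages/database/convex/_generated/api";

export function NewChatButton() {
  const startChat = useMutation(api.chat.mutations.startChat);

  const handleClick = async () => {
    // Creates the thread and returns its id, e.g. for navigation
    const { threadId } = await startChat({ prompt: "Hello!" });
    console.log("Created thread", threadId);
  };

  return <button onClick={handleClick}>New chat</button>;
}
```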
Sending Messages
From `packages/database/convex/chat/mutations.ts`:
```typescript
import { saveMessage } from "@packages/agent";
import { v } from "convex/values";
import { components, internal } from "../_generated/api";
// vModelId validates supported model ids; it lives alongside the model
// configuration in shared/models.ts
import { vModelId } from "../shared/models";

export const sendMessage = anonOrAuthenticatedMutation({
  args: {
    threadId: v.string(),
    prompt: v.string(),
    modelId: v.optional(vModelId),
  },
  handler: async (ctx, args) => {
    const modelId = args.modelId ?? "anthropic/claude-3.5-sonnet";

    // Save user message
    await saveMessage(ctx, components.agent, {
      threadId: args.threadId,
      message: {
        role: "user",
        content: args.prompt,
      },
    });

    // Schedule AI response generation as a background action
    await ctx.scheduler.runAfter(0, internal.chat.actions.generateResponse, {
      threadId: args.threadId,
      modelId,
    });

    return { threadId: args.threadId };
  },
});
```
Streaming Responses
The Agent package supports streaming for real-time UX.

Server-Side Streaming
```typescript
import { streamText } from "@packages/agent";
import { anthropic } from "@ai-sdk/anthropic";
import { v } from "convex/values";
import { z } from "zod";
import { components, internal } from "../_generated/api";
import { internalAction } from "../_generated/server";

export const generateResponse = internalAction({
  args: {
    threadId: v.string(),
    modelId: v.string(),
  },
  handler: async (ctx, args) => {
    // Get conversation history
    const messages = await ctx.runQuery(
      internal.chat.queries.getThreadMessages,
      { threadId: args.threadId }
    );

    // Stream AI response
    const result = await streamText(ctx, components.agent, {
      threadId: args.threadId,
      model: anthropic("claude-3-5-sonnet-20241022"),
      messages,
      system: "You are a helpful assistant for Discord communities.",
      tools: {
        searchDiscord: {
          description: "Search Discord messages",
          parameters: z.object({
            query: z.string(),
          }),
          execute: async ({ query }) => {
            // Search implementation elided; return the matching messages
            const results: unknown[] = [];
            return results;
          },
        },
      },
    });

    return result;
  },
});
```
Client-Side Streaming
From `apps/main-site/src/components/chat/chat-interface.tsx`:
"use client";
import { useQuery, useMutation } from "convex/react";
import { api } from "@packages/database/convex/_generated/api";
import { syncStreams } from "@packages/agent/react";
export function ChatInterface({ threadId }: { threadId: string }) {
const messages = useQuery(api.chat.queries.listUIMessages, {
threadId,
});
const sendMessage = useMutation(api.chat.mutations.sendMessage);
// Sync streaming updates
syncStreams({
threadId,
api,
});
const handleSubmit = async (prompt: string) => {
await sendMessage({
threadId,
prompt,
});
};
return (
<div className="flex flex-col h-full">
<div className="flex-1 overflow-y-auto">
{messages?.map((message) => (
<div key={message.id} className={`message ${message.role}`}>
{message.content}
{/* Show streaming indicator */}
{message.status === "streaming" && (
<span className="animate-pulse">...</span>
)}
</div>
))}
</div>
<ChatInput onSubmit={handleSubmit} />
</div>
);
}
Tools and Function Calling
Tools let AI agents perform actions.

Defining Tools
```typescript
import { createTool } from "@packages/agent";
import { z } from "zod";

const searchTool = createTool({
  name: "search_discord",
  description: "Search Discord messages for relevant information",
  parameters: z.object({
    query: z.string().describe("The search query"),
    channelId: z.string().optional().describe("Specific channel to search"),
  }),
  execute: async (ctx, { query, channelId }) => {
    // Full-text search over message content (channelId filtering elided)
    const results = await ctx.db
      .query("messages")
      .withSearchIndex("search_content", (q) => q.search("content", query))
      .take(5);

    return results.map((msg) => ({
      content: msg.content,
      author: msg.authorId.toString(),
      timestamp: msg.createdTimestamp,
    }));
  },
});
```
Using Tools in Agents
```typescript
import { streamText } from "@packages/agent";
import { anthropic } from "@ai-sdk/anthropic";
import { v } from "convex/values";
import { components } from "../_generated/api";
import { internalAction } from "../_generated/server";

export const chatWithTools = internalAction({
  args: { threadId: v.string(), prompt: v.string() },
  handler: async (ctx, args) => {
    const result = await streamText(ctx, components.agent, {
      threadId: args.threadId,
      model: anthropic("claude-3-5-sonnet-20241022"),
      messages: [{ role: "user", content: args.prompt }],
      tools: {
        searchDiscord: searchTool,
        createGitHubIssue: createIssueTool, // defined the same way as searchTool
      },
      maxSteps: 5, // Allow multi-step tool use
    });

    return result;
  },
});
```
RAG (Retrieval-Augmented Generation)
Use vector embeddings for semantic search.

Generating Embeddings
```typescript
import { generateAndSaveEmbeddings } from "@packages/agent";
import { v } from "convex/values";
import { components, internal } from "../_generated/api";
import { internalAction } from "../_generated/server";

export const indexMessage = internalAction({
  args: {
    messageId: v.string(),
  },
  handler: async (ctx, args) => {
    const message = await ctx.runQuery(internal.messages.getMessage, {
      messageId: args.messageId,
    });
    if (!message) return;

    // Generate embeddings and persist them for vector search
    await generateAndSaveEmbeddings(ctx, components.agent, {
      documents: [
        {
          id: message._id,
          content: message.content,
          metadata: {
            authorId: message.authorId,
            channelId: message.channelId,
          },
        },
      ],
    });
  },
});
```
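Indexing is typically triggered from the write path. A minimal sketch, assuming a hypothetical `createMessage` mutation and that `indexMessage` lives under `internal.messages`:

```typescript
import { v } from "convex/values";
import { internal } from "../_generated/api";
import { mutation } from "../_generated/server";

// Hypothetical write path: insert the message, then schedule indexing
// so embedding generation never blocks the mutation.
export const createMessage = mutation({
  args: { content: v.string(), channelId: v.string() },
  handler: async (ctx, args) => {
    const messageId = await ctx.db.insert("messages", {
      content: args.content,
      channelId: args.channelId,
    });

    await ctx.scheduler.runAfter(0, internal.messages.indexMessage, {
      messageId,
    });

    return messageId;
  },
});
```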
Semantic Search
```typescript
import { fetchContextMessages } from "@packages/agent";
import { v } from "convex/values";
import { components } from "../_generated/api";
import { internalAction } from "../_generated/server";

export const searchSimilarMessages = internalAction({
  args: {
    query: v.string(),
    limit: v.optional(v.number()),
  },
  handler: async (ctx, args) => {
    const results = await fetchContextMessages(ctx, components.agent, {
      query: args.query,
      limit: args.limit ?? 10,
    });
    return results;
  },
});
```
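Retrieved context is usually folded into the prompt before generation. A hedged sketch of that wiring; `answerWithContext` is hypothetical and the exact shape of `fetchContextMessages` results (here assumed to carry a `content` field) may differ:

```typescript
import { fetchContextMessages, streamText } from "@packages/agent";
import { anthropic } from "@ai-sdk/anthropic";
import { v } from "convex/values";
import { components } from "../_generated/api";
import { internalAction } from "../_generated/server";

export const answerWithContext = internalAction({
  args: { threadId: v.string(), prompt: v.string() },
  handler: async (ctx, args) => {
    // Fetch semantically similar messages for the user's prompt
    const context = await fetchContextMessages(ctx, components.agent, {
      query: args.prompt,
      limit: 5,
    });

    // Fold the retrieved snippets into the system prompt
    const contextBlock = context.map((doc) => `- ${doc.content}`).join("\n");

    return await streamText(ctx, components.agent, {
      threadId: args.threadId,
      model: anthropic("claude-3-5-sonnet-20241022"),
      messages: [{ role: "user", content: args.prompt }],
      system: `You are a helpful assistant for Discord communities.\nRelevant past messages:\n${contextBlock}`,
    });
  },
});
```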
Model Configuration
Support multiple AI models:

```typescript
// packages/database/convex/shared/models.ts
import { anthropic } from "@ai-sdk/anthropic";
import { openai } from "@ai-sdk/openai";

export const models = {
  "anthropic/claude-3.5-sonnet": {
    name: "Claude 3.5 Sonnet",
    provider: "anthropic",
    model: anthropic("claude-3-5-sonnet-20241022"),
    requiresSignIn: false,
  },
  "openai/gpt-4o": {
    name: "GPT-4o",
    provider: "openai",
    model: openai("gpt-4o"),
    requiresSignIn: true,
  },
} as const;

export const defaultModelId = "anthropic/claude-3.5-sonnet";

export function getModelById(modelId: string) {
  // The cast lets arbitrary strings fall back to the default model
  return models[modelId as keyof typeof models] ?? models[defaultModelId];
}
```
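A hedged sketch of how the lookup plugs into generation, resolving the scheduled `modelId` string to a concrete provider model (the action name is hypothetical):

```typescript
import { streamText } from "@packages/agent";
import { v } from "convex/values";
import { components } from "../_generated/api";
import { internalAction } from "../_generated/server";
import { getModelById } from "../shared/models";

export const generateWithConfiguredModel = internalAction({
  args: { threadId: v.string(), prompt: v.string(), modelId: v.string() },
  handler: async (ctx, args) => {
    // Resolve the string id (e.g. "openai/gpt-4o") to a provider model
    const { model } = getModelById(args.modelId);

    return await streamText(ctx, components.agent, {
      threadId: args.threadId,
      model,
      messages: [{ role: "user", content: args.prompt }],
    });
  },
});
```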
Usage Tracking
Track AI usage for rate limiting and billing:

```typescript
import type { MutationCtx } from "../_generated/server";

export const checkAndConsumeMessage = async (
  ctx: MutationCtx,
  userId: string
) => {
  const usage = await ctx.db
    .query("userAIChatMessageUsage")
    .withIndex("by_userId", (q) => q.eq("userId", userId))
    .first();

  const limit = 10; // Free tier limit

  if (!usage) {
    // First message of the period: create a usage record
    await ctx.db.insert("userAIChatMessageUsage", {
      userId,
      periodStart: Date.now(),
      periodEnd: Date.now() + 30 * 24 * 60 * 60 * 1000, // 30 days
      subscriptionMessagesUsed: 1,
      purchasedCredits: 0,
    });
    return { allowed: true };
  }

  if (usage.subscriptionMessagesUsed >= limit) {
    return {
      allowed: false,
      reason: `You've reached your limit of ${limit} messages per month.`,
    };
  }

  await ctx.db.patch(usage._id, {
    subscriptionMessagesUsed: usage.subscriptionMessagesUsed + 1,
  });
  return { allowed: true };
};
```
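The check is meant to run inside the message mutation before any work is scheduled. A minimal sketch of the call site (`sendMessageWithQuota` is hypothetical; in practice this belongs inside `sendMessage` itself):

```typescript
export const sendMessageWithQuota = anonOrAuthenticatedMutation({
  args: { threadId: v.string(), prompt: v.string() },
  handler: async (ctx, args) => {
    // userId is injected into args by the mutation wrapper
    const quota = await checkAndConsumeMessage(ctx, args.userId);
    if (!quota.allowed) {
      // Surface the reason to the client instead of generating a response
      throw new Error(quota.reason);
    }

    // ...save the message and schedule generation as in sendMessage above
  },
});
```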
Testing AI Features
Mocking Models
```typescript
import { mockModel, streamText } from "@packages/agent";
import { expect, test } from "vitest";

test("generates response", async () => {
  const mock = mockModel({
    responses: [{ role: "assistant", content: "Hello! How can I help?" }],
  });

  // ctx and components come from your test harness setup
  const result = await streamText(ctx, components.agent, {
    threadId: "test-thread",
    model: mock,
    messages: [{ role: "user", content: "Hi" }],
  });

  expect(result.content).toBe("Hello! How can I help?");
});
```
Testing Tool Execution
test("uses search tool", async () => {
const searchSpy = vi.fn().mockResolvedValue([{ content: "Result" }]);
const tool = createTool({
name: "search",
description: "Search",
parameters: z.object({ query: z.string() }),
execute: searchSpy,
});
await streamText(ctx, components.agent, {
threadId: "test",
model: mockModel({
toolCalls: [{ name: "search", args: { query: "test" } }],
}),
messages: [{ role: "user", content: "Search for test" }],
tools: { search: tool },
});
expect(searchSpy).toHaveBeenCalledWith(expect.anything(), { query: "test" });
});
Troubleshooting
Streaming Not Working
Ensure you're calling `syncStreams` on the client:

```typescript
import { syncStreams } from "@packages/agent/react";

syncStreams({ threadId, api });
```
Rate Limiting
Implement exponential backoff:

```typescript
import { Duration, Effect, Schedule } from "effect";

// Retry up to 3 times with exponentially growing delays (1s, 2s, 4s)
const retrySchedule = Schedule.exponential(Duration.seconds(1), 2).pipe(
  Schedule.compose(Schedule.recurs(3))
);

// generateResponse must be an Effect value here,
// e.g. Effect.tryPromise wrapping the model call
const result = await Effect.runPromise(
  Effect.retry(generateResponse, retrySchedule)
);
```
Token Limits
Truncate conversation history:

```typescript
const recentMessages = messages.slice(-10); // Keep only the last 10 messages
```
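A slightly fuller sketch that keeps the system message (if present) while truncating the rest of the history; the 10-message window is an arbitrary choice:

```typescript
type ChatMessage = { role: "system" | "user" | "assistant"; content: string };

// Keep the system message plus the most recent turns so the model
// retains its instructions even after truncation.
function truncateHistory(messages: ChatMessage[], keep = 10): ChatMessage[] {
  const system = messages.filter((m) => m.role === "system");
  const rest = messages.filter((m) => m.role !== "system");
  return [...system, ...rest.slice(-keep)];
}
```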
Next Steps
- Explore Testing strategies
- Learn about Deployment
- Check Database patterns