RAG Sync
Classes
AdminServer
Admin API server for RAG sync service monitoring.
Exposes a /status endpoint that reports service health, pipeline metrics, and connection status for the admin dashboard.
Constructor
constructor(metricsCollector: MetricsCollector, connectionConfig: ConnectionConfig)
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| metricsCollector | MetricsCollector | Yes | |
| connectionConfig | ConnectionConfig | Yes | |
Methods
getApp
Get the Express application for testing
getApp(): express.Application
Returns:
express.Application - The Express application instance
start
Start the admin server on the specified port
start(port: number): Promise<void>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| port | number | Yes | The port number to listen on |
Returns:
Promise<void> - Promise that resolves when the server is listening
stop
Stop the admin server
stop(): Promise<void>
Returns:
Promise<void> - Promise that resolves when the server is stopped
MetricsCollector
Collects and tracks metrics for the RAG sync service.
Maintains in-memory counters updated by SyncOrchestrator during processing. Provides a snapshot of current metrics for the admin API endpoint.
Constructor
constructor()
Methods
recordProcessed
Record a successfully processed document
recordProcessed(collection: string): void
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| collection | string | Yes | The collection name (e.g., 'tasks', 'discussions') |
recordSkipped
Record a skipped document (unchanged content hash)
recordSkipped(collection: string): void
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| collection | string | Yes | The collection name (e.g., 'tasks', 'discussions') |
recordError
Record a processing error
recordError(documentId: string, error: string, attempts: number): void
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| documentId | string | Yes | The ID of the document that failed |
| error | string | Yes | The error message |
| attempts | number | Yes | The number of retry attempts made |
recordRetry
Record a retry attempt
recordRetry(): void
setQueueDepth
Update current queue depth
setQueueDepth(depth: number): void
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| depth | number | Yes | The current number of documents waiting to be processed |
setActiveProcessing
Update active processing count
setActiveProcessing(count: number): void
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| count | number | Yes | The number of documents currently being processed |
setMaxConcurrency
Update max concurrency setting
setMaxConcurrency(max: number): void
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| max | number | Yes | The maximum number of concurrent document processors |
getMetrics
Get current metrics snapshot
getMetrics(): MetricsSnapshot
Returns:
MetricsSnapshot - A complete snapshot of current service and pipeline metrics
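As a rough illustration of the in-memory counter pattern described for this class, the sketch below keeps per-collection counts and returns a copied snapshot. `SketchSnapshot` and `SketchMetricsCollector` are hypothetical names, and the snapshot fields are assumptions: the real `MetricsSnapshot` shape is not shown in this reference.

```typescript
// Hypothetical snapshot shape; the real MetricsSnapshot type is not documented here.
interface SketchSnapshot {
  processed: Record<string, number>;
  skipped: Record<string, number>;
  errors: { documentId: string; error: string; attempts: number }[];
  retries: number;
  queueDepth: number;
}

// Copies a counter table so callers cannot mutate the live counters.
function copyCounts(counts: Record<string, number>): Record<string, number> {
  const out: Record<string, number> = {};
  Object.keys(counts).forEach((key) => {
    out[key] = counts[key];
  });
  return out;
}

class SketchMetricsCollector {
  private processed: Record<string, number> = {};
  private skipped: Record<string, number> = {};
  private errors: SketchSnapshot["errors"] = [];
  private retries = 0;
  private queueDepth = 0;

  recordProcessed(collection: string): void {
    this.processed[collection] = (this.processed[collection] ?? 0) + 1;
  }

  recordSkipped(collection: string): void {
    this.skipped[collection] = (this.skipped[collection] ?? 0) + 1;
  }

  recordError(documentId: string, error: string, attempts: number): void {
    this.errors.push({ documentId, error, attempts });
  }

  recordRetry(): void {
    this.retries += 1;
  }

  setQueueDepth(depth: number): void {
    this.queueDepth = depth;
  }

  // Returns a point-in-time copy of every counter.
  getMetrics(): SketchSnapshot {
    return {
      processed: copyCounts(this.processed),
      skipped: copyCounts(this.skipped),
      errors: this.errors.map((e) => ({ ...e })),
      retries: this.retries,
      queueDepth: this.queueDepth,
    };
  }
}
```

Returning copies rather than the live objects is one way to keep a snapshot stable while processing continues; whether the real collector does this is not stated here.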
SyncOrchestrator
Main orchestrator for RAG sync operations.
Coordinates the flow of data between RxDB, AMS, and SurrealDB:
- Monitors RxDB collections for document changes
- Monitors AMS for new agent memories
- Processes documents into chunks with embeddings
- Stores embeddings in SurrealDB for semantic search
Constructor
constructor(config: {
  rxdb: { url: string; database: string; collections: string[]; token?: string | undefined; };
  surrealdb: { url: string; database: string; namespace: string; auth: { username: string; password: string; }; };
  redis: { host: string; port: number; db: number; keyPrefix: string; password?: string | undefined; };
  ollama: { baseUrl: string; embeddingModel: string; timeout: number; maxRetries: number; };
  ams: { url: string; pollInterval: number; auth?: { token?: string | undefined; } | undefined; };
  sync: { batchSize: number; syncInterval: number; embedBatchSize: number; retryAttempts: number; retryDelay: number; };
  adminPort: number;
  logLevel: "debug" | "info" | "warn" | "error";
})
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| config | { rxdb: { url: string; database: string; collections: string[]; token?: string \| undefined; }; surrealdb: { url: string; database: string; namespace: string; auth: { username: string; password: string; }; }; redis: { host: string; port: number; db: number; keyPrefix: string; password?: string \| undefined; }; ollama: { baseUrl: string; embeddingModel: string; timeout: number; maxRetries: number; }; ams: { url: string; pollInterval: number; auth?: { token?: string \| undefined; } \| undefined; }; sync: { batchSize: number; syncInterval: number; embedBatchSize: number; retryAttempts: number; retryDelay: number; }; adminPort: number; logLevel: "debug" \| "info" \| "warn" \| "error"; } | Yes | Complete configuration for all services |
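Because the config type is long, a filled-in object may be easier to read. The sketch below restates the shape from the constructor signature as a hypothetical `SyncConfig` interface and populates it with placeholder values. Every URL, port, name, and number is an assumption for illustration, and the units of `timeout`, `pollInterval`, `syncInterval`, and `retryDelay` are not stated in this reference.

```typescript
// Shape restated from the constructor signature; the interface name is hypothetical.
interface SyncConfig {
  rxdb: { url: string; database: string; collections: string[]; token?: string };
  surrealdb: {
    url: string;
    database: string;
    namespace: string;
    auth: { username: string; password: string };
  };
  redis: { host: string; port: number; db: number; keyPrefix: string; password?: string };
  ollama: { baseUrl: string; embeddingModel: string; timeout: number; maxRetries: number };
  ams: { url: string; pollInterval: number; auth?: { token?: string } };
  sync: {
    batchSize: number;
    syncInterval: number;
    embedBatchSize: number;
    retryAttempts: number;
    retryDelay: number;
  };
  adminPort: number;
  logLevel: "debug" | "info" | "warn" | "error";
}

// Placeholder values only; none of these are documented defaults.
const exampleConfig: SyncConfig = {
  rxdb: {
    url: "ws://localhost:3002/ws/replication",
    database: "example-db",
    collections: ["tasks", "projects"],
  },
  surrealdb: {
    url: "ws://localhost:8000/rpc",
    database: "rag",
    namespace: "flowstate",
    auth: { username: "root", password: "root" },
  },
  redis: { host: "localhost", port: 6379, db: 0, keyPrefix: "example:" },
  ollama: {
    baseUrl: "http://localhost:11434",
    embeddingModel: "nomic-embed-text",
    timeout: 30000,
    maxRetries: 3,
  },
  ams: { url: "http://localhost:8001", pollInterval: 5000 },
  sync: { batchSize: 100, syncInterval: 60000, embedBatchSize: 16, retryAttempts: 3, retryDelay: 1000 },
  adminPort: 3003,
  logLevel: "info",
};
```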
Methods
start
Start the orchestrator and begin monitoring for changes.
Connects to all services and registers event handlers:
- Connects to SurrealDB vector database
- Creates local RxDB database for replication
- Starts native RxDB replication
- Subscribes to AMS memory events
start(): Promise<void>
Returns:
Promise<void>
stop
Stop the orchestrator and disconnect from all services.
Performs graceful shutdown:
- Unsubscribes from AMS events
- Stops RxDB replication
- Closes local database
- Closes SurrealDB connection
Safe to call multiple times and during partial startup failures.
stop(): Promise<void>
Returns:
Promise<void>
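One common way to make a shutdown safe to repeat and safe after a partial startup is to record each resource as it opens and close only what was recorded, newest first. The sketch below shows that pattern. `Closable` and `SketchLifecycle` are hypothetical names, and this is not necessarily how SyncOrchestrator implements stop().

```typescript
// Hypothetical resource handle: a label plus an async close function.
interface Closable {
  name: string;
  close: () => Promise<void>;
}

class SketchLifecycle {
  private opened: Closable[] = [];

  // Call after each startup step succeeds, so a failed start leaves a partial list.
  register(resource: Closable): void {
    this.opened.push(resource);
  }

  // Closes whatever was opened, newest first. Each resource is removed from the
  // list before it is closed, so repeated or overlapping calls never close it twice.
  async stop(): Promise<void> {
    while (this.opened.length > 0) {
      const resource = this.opened.pop();
      if (!resource) break;
      try {
        await resource.close();
      } catch (err) {
        // A failing close should not prevent the remaining resources from closing.
        console.error(`Failed to close ${resource.name}:`, err);
      }
    }
  }
}
```

Closing in reverse order of opening matches the lists above: start() connects SurrealDB first and subscribes to AMS last, while stop() unsubscribes from AMS first and closes SurrealDB last.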
isRunning
Check if orchestrator is currently running.
isRunning(): boolean
Returns:
boolean - true if running, false otherwise
handleRxDBChange
Handle RxDB document change events.
Processes document changes from RxDB collections:
- INSERT/UPDATE: Process document into chunks and store embeddings
- DELETE: Remove all chunks from vector database
Uses content hashing to skip unchanged documents. Uses retry logic for transient embedding/storage failures.
handleRxDBChange(event: DocumentChangeEvent): Promise<void>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| event | DocumentChangeEvent | Yes | Document change event from RxDB |
Returns:
Promise<void>
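The skip-if-unchanged check and the retry wrapper mentioned above can be sketched as follows. The hash function here is a simple FNV-1a variant over UTF-16 code units, used only as a dependency-free stand-in; the actual hash algorithm, retry count, and delay policy are not documented in this reference. `contentHash`, `hasChanged`, and `withRetry` are hypothetical helper names.

```typescript
// Stand-in content hash (FNV-1a, 32-bit, over UTF-16 code units).
function contentHash(text: string): string {
  let hash = 0x811c9dc5;
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193);
  }
  return (hash >>> 0).toString(16);
}

// True when the document is new or its content differs from the stored hash.
function hasChanged(docId: string, content: string, storedHashes: Map<string, string>): boolean {
  return storedHashes.get(docId) !== contentHash(content);
}

// Runs task up to `attempts` times (attempts >= 1), waiting delayMs between failures.
async function withRetry<T>(task: () => Promise<T>, attempts: number, delayMs: number): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await task();
    } catch (err) {
      lastError = err;
      if (attempt < attempts) {
        await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
      }
    }
  }
  throw lastError;
}
```

In this sketch the caller would store the new hash only after processing succeeds, so a failed attempt is not mistaken for an unchanged document on the next event.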
handleAMSMemoryEvent
Handle AMS memory events.
Processes memory events from Agent Memory Server:
- create/update: Process memory into chunks and store embeddings
- delete: Remove memory chunks from vector database
Memory events are treated as documents in the 'memories' collection.
handleAMSMemoryEvent(event: MemoryEvent): Promise<void>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| event | MemoryEvent | Yes | Memory event from AMS |
Returns:
Promise<void>
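The routing rule above (create/update become an upsert, delete becomes a removal, always in the 'memories' collection) can be expressed as a small pure function. In this sketch, `SketchMemoryEvent`, its `memoryId` and `text` fields, and `toSyncAction` are assumptions; only the `type` field with the value 'create' appears in this reference's examples.

```typescript
// Hypothetical event shape; the real MemoryEvent type is not shown in this reference.
interface SketchMemoryEvent {
  type: "create" | "update" | "delete";
  memoryId: string;
  text?: string;
}

type SyncAction =
  | { kind: "upsert"; collection: "memories"; docId: string; content: string }
  | { kind: "delete"; collection: "memories"; docId: string };

// Maps a memory event onto the document operation it should trigger.
function toSyncAction(event: SketchMemoryEvent): SyncAction {
  if (event.type === "delete") {
    return { kind: "delete", collection: "memories", docId: event.memoryId };
  }
  // create and update are handled identically: re-chunk and re-embed the content.
  return {
    kind: "upsert",
    collection: "memories",
    docId: event.memoryId,
    content: event.text ?? "",
  };
}
```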
getMetricsCollector
Get the metrics collector instance for admin API
getMetricsCollector(): MetricsCollector
Returns:
MetricsCollector - The metrics collector instance used by the admin API
Examples:
const orchestrator = new SyncOrchestrator(config);
// Start monitoring and processing
await orchestrator.start();
// Later, shutdown gracefully
await orchestrator.stop();
AMSMemoryClient
Client for subscribing to and fetching AMS (Agent Memory Server) memories
AMS stores conversation memories in Redis using JSON. This client provides:
- Real-time subscription to memory events via Redis pub/sub
- Bulk fetching of stored memories by namespace
- Automatic JSON parsing of Redis data
Constructor
constructor(config: AMSMemoryClientConfig)
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| config | AMSMemoryClientConfig | Yes | Client configuration |
Methods
subscribe
Subscribe to AMS memory events via Redis pub/sub
Events are published to 'ams:memory:*' channels when memories are created, updated, or deleted. The handler will be called for each event.
subscribe(handler: MemoryEventHandler): Promise<void>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| handler | MemoryEventHandler | Yes | Function to process each memory event |
Returns:
Promise<void>
unsubscribe
Unsubscribe from memory events and close subscriber connection
unsubscribe(): Promise<void>
Returns:
Promise<void>
isSubscribed
Check if client is currently subscribed to memory events
isSubscribed(): boolean
Returns:
boolean - true if subscribed, false otherwise
getMemoriesForNamespace
Fetch all long-term memories for a given namespace
Scans Redis for keys matching 'long_term_memory:{namespace}:*' and retrieves each memory as JSON.
getMemoriesForNamespace(namespace: string): Promise<Memory[]>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| namespace | string | Yes | Namespace to fetch memories for (e.g., user ID) |
Returns:
Promise<Memory[]> - Array of memories in the namespace
Examples:
const memories = await client.getMemoriesForNamespace('user_123');
console.log(`Found ${memories.length} memories`);
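To make the key pattern concrete, the sketch below builds 'long_term_memory:{namespace}:*' for a namespace and applies it to an in-memory map that stands in for Redis. `memoryKeyPattern`, `matchesTrailingWildcard`, and `collectMemories` are hypothetical helpers; the real client scans Redis rather than a local map.

```typescript
// Builds the documented key pattern for one namespace.
function memoryKeyPattern(namespace: string): string {
  return `long_term_memory:${namespace}:*`;
}

// Minimal matcher for patterns ending in a single trailing '*', which is all this sketch needs.
function matchesTrailingWildcard(key: string, pattern: string): boolean {
  if (pattern.charAt(pattern.length - 1) !== "*") return key === pattern;
  return key.indexOf(pattern.slice(0, -1)) === 0;
}

// Parses every JSON value whose key belongs to the namespace.
function collectMemories(store: Map<string, string>, namespace: string): unknown[] {
  const pattern = memoryKeyPattern(namespace);
  const memories: unknown[] = [];
  store.forEach((json, key) => {
    if (matchesTrailingWildcard(key, pattern)) {
      memories.push(JSON.parse(json));
    }
  });
  return memories;
}
```

Because the pattern keeps the ':' after the namespace, a lookup for 'user_1' does not pick up keys that belong to 'user_12'.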
disconnect
Disconnect from Redis and cleanup resources
This will unsubscribe from events and close all connections.
disconnect(): Promise<void>
Returns:
Promise<void>
Examples:
const client = new AMSMemoryClient({
  redisUrl: 'redis://localhost:6379'
});
// Subscribe to memory events
await client.subscribe(async (event) => {
  console.log('Memory event:', event);
  if (event.type === 'create') {
    // Process new memory
  }
});
// Fetch existing memories
const memories = await client.getMemoriesForNamespace('user_123');
// Cleanup
await client.disconnect();
NativeRxDBClient
Native RxDB Replication Client
Uses RxDB's native replicateWithWebsocketServer to sync data from the remote rxdb-server to a local in-memory database. Subscribes to local collection change events to process ALL documents (initial sync and real-time updates).
Constructor
constructor(database: any, config: NativeRxDBClientConfig)
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| database | any | Yes | |
| config | NativeRxDBClientConfig | Yes | |
Methods
onDocumentChange
Register a handler for document change events
Handlers are called for ALL documents in the database, including:
- Documents from initial replication
- Real-time updates from the server
Multiple handlers can be registered and will be called sequentially. If a handler throws an error, it is logged but other handlers will still be called.
onDocumentChange(handler: DocumentChangeHandler): void
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| handler | DocumentChangeHandler | Yes | Function to process document change events |
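The dispatch behavior described above (handlers run sequentially, and a throwing handler is logged without stopping the rest) might look like the following sketch. `SketchChangeEvent`, `SketchHandler`, and `SketchHandlerRegistry` are hypothetical names, and the event fields are taken from the examples in this reference rather than from the real type definitions.

```typescript
// Hypothetical event shape based on the fields used in this reference's examples.
interface SketchChangeEvent {
  operation: "INSERT" | "UPDATE" | "DELETE";
  documentId: string;
}

type SketchHandler = (event: SketchChangeEvent) => Promise<void> | void;

class SketchHandlerRegistry {
  private handlers: SketchHandler[] = [];

  onDocumentChange(handler: SketchHandler): void {
    this.handlers.push(handler);
  }

  // Awaits each handler in registration order and returns how many handlers failed.
  async dispatch(event: SketchChangeEvent): Promise<number> {
    let failures = 0;
    for (const handler of this.handlers) {
      try {
        await handler(event);
      } catch (err) {
        // Log and continue so one faulty handler cannot block the others.
        failures += 1;
        console.error(`Handler failed for ${event.documentId}:`, err);
      }
    }
    return failures;
  }
}
```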
startReplication
Start replication for all collections in the database
Initiates WebSocket replication for each collection, waits for initial sync to complete, and subscribes to change events.
startReplication(): Promise<void>
Returns:
Promise<void>
stopReplication
Stop all replications and clean up
Unsubscribes from all observables, cancels all replication states, and releases resources. Safe to call multiple times.
stopReplication(): Promise<void>
Returns:
Promise<void>
isRunning
Check if replication is currently running
isRunning(): boolean
Returns:
boolean - true if replication is active, false otherwise
getDatabase
Get the local database instance
getDatabase(): any
Returns:
any - The RxDB database instance
Examples:
const client = new NativeRxDBClient(database, {
  wsUrl: 'wss://localhost:7443/ws/replication',
  token: 'service-token-here',
  batchSize: 100,
});
// Register handler for all document changes
client.onDocumentChange(async (event) => {
  console.log(`${event.operation}: ${event.documentId}`);
  if (event.document) {
    await indexDocument(event.document);
  }
});
// Start replication
await client.startReplication();
// Later, stop and clean up
await client.stopReplication();
RxDBReplicationClient
RxDB Replication Client for receiving real-time document changes
Constructor
constructor(config: RxDBReplicationClientConfig)
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| config | RxDBReplicationClientConfig | Yes | Client configuration |
Methods
connect
Connect to the RxDB server via WebSocket
Establishes WebSocket connection and subscribes to configured collections. Resolves when connection is established, rejects on connection error.
connect(): Promise<void>
Returns:
Promise<void>
disconnect
Disconnect from the RxDB server
Closes WebSocket connection and cancels any pending reconnection attempts. Safe to call multiple times.
disconnect(): void
isConnected
Check if client is currently connected to the server
isConnected(): boolean
Returns:
boolean - true if connected, false otherwise
onDocumentChange
Register a handler for document change events
Multiple handlers can be registered. Each will be called for every document change event. Handlers are called in registration order.
If a handler throws an error, the error is logged but other handlers will still be called.
onDocumentChange(handler: DocumentChangeHandler): void
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| handler | DocumentChangeHandler | Yes | Function to process document change events |
Examples:
client.onDocumentChange(async (event) => {
  if (event.operation === 'INSERT') {
    await indexNewDocument(event.document);
  }
});
Examples:
const client = new RxDBReplicationClient({
  wsUrl: 'ws://localhost:3002/ws/replication',
  collections: ['tasks', 'projects', 'notes'],
  token: 'service-token-here'
});
// Register handler for document changes
client.onDocumentChange(async (event) => {
  console.log(`Document ${event.operation}: ${event.documentId}`);
  if (event.operation === 'INSERT' || event.operation === 'UPDATE') {
    // Process document for indexing
    await indexDocument(event.document);
  } else if (event.operation === 'DELETE') {
    // Remove from index
    await removeFromIndex(event.documentId);
  }
});
// Connect and start receiving events
await client.connect();
// Later, disconnect when done (disconnect() is synchronous and returns void)
client.disconnect();
SurrealDBClient
SurrealDB client for vector storage and retrieval
Provides an interface to store and query document embeddings for Retrieval-Augmented Generation (RAG) use cases.
Constructor
constructor(config: SurrealDBConfig)
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| config | SurrealDBConfig | Yes | |
Methods
connect
Establish connection to SurrealDB and initialize schema
connect(): Promise<void>
Returns:
Promise<void>
disconnect
Close connection to SurrealDB
disconnect(): Promise<void>
Returns:
Promise<void>
isConnected
Check if client is connected to database
isConnected(): boolean
Returns:
boolean - true if connected, false otherwise
getDocumentHash
Get the content hash for a document if it exists
Used to check if a document has changed before re-processing.
getDocumentHash(docId: string): Promise<string | null>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| docId | string | Yes | Document ID to check |
Returns:
Promise<string | null> - The content hash if found, null otherwise
upsertDocument
Insert or update a document chunk with its embedding
Documents are identified by docId + chunkIndex. If a document with the same ID already exists, it will be updated.
upsertDocument(doc: RAGDocument): Promise<void>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| doc | RAGDocument | Yes | Document to upsert |
Returns:
Promise<void>
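The docId + chunkIndex identity rule can be illustrated with an in-memory store keyed on both values. This is a sketch only: `SketchChunk`, `SketchChunkStore`, and the '#' key format are hypothetical, and the real client persists records in SurrealDB with an ID scheme that is not shown in this reference.

```typescript
// Subset of the RAGDocument fields used in this reference's examples.
interface SketchChunk {
  docId: string;
  chunkIndex: number;
  content: string;
}

class SketchChunkStore {
  private chunks = new Map<string, SketchChunk>();

  // Hypothetical composite key; one entry per (docId, chunkIndex) pair.
  private static key(docId: string, chunkIndex: number): string {
    return `${docId}#${chunkIndex}`;
  }

  // Inserts a new chunk or overwrites the existing one with the same identity.
  upsert(chunk: SketchChunk): void {
    this.chunks.set(SketchChunkStore.key(chunk.docId, chunk.chunkIndex), chunk);
  }

  get(docId: string, chunkIndex: number): SketchChunk | undefined {
    return this.chunks.get(SketchChunkStore.key(docId, chunkIndex));
  }

  // Removes every chunk that belongs to docId and returns how many were removed.
  deleteDocument(docId: string): number {
    const doomed: string[] = [];
    this.chunks.forEach((chunk, key) => {
      if (chunk.docId === docId) doomed.push(key);
    });
    doomed.forEach((key) => {
      this.chunks.delete(key);
    });
    return doomed.length;
  }

  size(): number {
    return this.chunks.size;
  }
}
```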
deleteDocument
Delete all chunks for a document
deleteDocument(docId: string): Promise<void>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| docId | string | Yes | Document ID to delete |
Returns:
Promise<void>
vectorSearch
Perform vector similarity search
Searches for documents with embeddings similar to the query vector using cosine similarity. Results are filtered by workspace/org if specified.
vectorSearch(options: VectorSearchOptions): Promise<SearchResult[]>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| options | VectorSearchOptions | Yes | Search options |
Returns:
Promise<SearchResult[]> - Array of matching documents with similarity scores
Examples:
const results = await client.vectorSearch({
  queryVector: embeddings,
  workspaceId: 'ws_1',
  collections: ['tasks', 'notes'],
  limit: 10,
  minScore: 0.7
});
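For readers unfamiliar with cosine-similarity ranking, the sketch below reproduces the filtering and scoring steps over an in-memory array. It only illustrates the semantics of `workspaceId`, `collections`, `limit`, and `minScore`; the real search runs inside SurrealDB. `SketchRecord`, `SketchSearchOptions`, `searchInMemory`, and the defaults (limit 10, minScore 0) are assumptions.

```typescript
interface SketchRecord {
  docId: string;
  workspaceId: string;
  collection: string;
  embedding: number[];
}

interface SketchSearchOptions {
  queryVector: number[];
  workspaceId?: string;
  collections?: string[];
  limit?: number;
  minScore?: number;
}

// Cosine similarity: dot(a, b) / (|a| * |b|); defined as 0 when either vector is all zeros.
function cosineSimilarity(a: number[], b: number[]): number {
  if (a.length !== b.length) throw new Error("Vectors must have the same length");
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  if (normA === 0 || normB === 0) return 0;
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

// Filters by workspace and collection, scores, drops low scores, and returns the best matches first.
function searchInMemory(
  records: SketchRecord[],
  options: SketchSearchOptions
): { docId: string; score: number }[] {
  const limit = options.limit ?? 10; // assumed default
  const minScore = options.minScore ?? 0; // assumed default
  return records
    .filter((r) => options.workspaceId === undefined || r.workspaceId === options.workspaceId)
    .filter((r) => options.collections === undefined || options.collections.indexOf(r.collection) !== -1)
    .map((r) => ({ docId: r.docId, score: cosineSimilarity(options.queryVector, r.embedding) }))
    .filter((hit) => hit.score >= minScore)
    .sort((x, y) => y.score - x.score)
    .slice(0, limit);
}
```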
Examples:
const client = new SurrealDBClient({
  url: 'ws://localhost:8000/rpc',
  username: 'root',
  password: 'root',
  namespace: 'flowstate',
  database: 'rag'
});
await client.connect();
await client.upsertDocument({
  collection: 'tasks',
  docId: 'task_123',
  content: 'Task description...',
  chunkIndex: 0,
  embedding: [0.1, 0.2, ...],
  metadata: { title: 'My Task' },
  orgId: 'org_1',
  workspaceId: 'ws_1'
});
const results = await client.vectorSearch({
  queryVector: [0.1, 0.2, ...],
  workspaceId: 'ws_1',
  limit: 10
});
LocalDatabaseError
Error thrown when local database creation fails
Constructor
constructor(message: string)
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| message | string | Yes | |
DocumentProcessor
Processes documents into chunks with embeddings for vector search. Handles document chunking, embedding generation, and metadata extraction based on collection-specific configurations.
Constructor
constructor(embeddingService: EmbeddingService)
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| embeddingService | EmbeddingService | Yes | |
Methods
process
Processes a document into searchable chunks with embeddings.
process(collection: string, doc: Record<string, unknown>): Promise<ProcessedDocument>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| collection | string | Yes | The collection type (e.g., 'tasks', 'projects', 'memories') |
| doc | Record<string, unknown> | Yes | The document to process; must contain id, orgId, and workspaceId |
Returns:
Promise<ProcessedDocument> - A processed document with chunks and embeddings
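This reference does not describe how DocumentProcessor splits text, so the following is only a generic illustration of chunking: a fixed-size splitter with overlap that assigns each piece a `chunkIndex`. `TextChunk`, `chunkText`, and the `maxChars` and `overlap` parameters are hypothetical; the real chunking rules are collection-specific.

```typescript
interface TextChunk {
  chunkIndex: number;
  content: string;
}

// Splits text into windows of at most maxChars characters, each sharing
// `overlap` characters with the previous window.
function chunkText(text: string, maxChars: number, overlap: number): TextChunk[] {
  if (maxChars <= 0 || overlap < 0 || overlap >= maxChars) {
    throw new Error("Require maxChars > 0 and 0 <= overlap < maxChars");
  }
  const chunks: TextChunk[] = [];
  const step = maxChars - overlap;
  for (let start = 0; start < text.length; start += step) {
    chunks.push({ chunkIndex: chunks.length, content: text.slice(start, start + maxChars) });
    // Stop once a window has reached the end, to avoid a trailing fragment
    // that is already fully contained in the previous chunk.
    if (start + maxChars >= text.length) break;
  }
  return chunks;
}
```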
EmbeddingService
Service for generating text embeddings using Ollama
This service wraps the Ollama API to provide a simple interface for generating embeddings from text. It supports both single text and batch embedding operations.
Constructor
constructor(config: EmbeddingServiceConfig)
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| config | EmbeddingServiceConfig | Yes | |
Methods
embed
Generate embedding for a single text
embed(text: string): Promise<number[]>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| text | string | Yes | Input text to embed |
Returns:
Promise<number[]> - Array of numbers representing the embedding vector
embedBatch
Generate embeddings for multiple texts in a batch
This is more efficient than calling embed() multiple times as it sends all texts to Ollama in a single request.
embedBatch(texts: string[]): Promise<number[][]>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| texts | string[] | Yes | Array of input texts to embed |
Returns:
Promise<number[][]> - Array of embedding vectors, one for each input text
Examples:
const service = new EmbeddingService({
  ollamaUrl: 'http://localhost:11434',
  model: 'nomic-embed-text',
});
// Generate single embedding
const embedding = await service.embed('Hello, world!');
// Generate batch embeddings
const embeddings = await service.embedBatch(['Text 1', 'Text 2']);
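When there are more texts than should go into one request, a caller could split them into groups and call embedBatch once per group. The sketch below shows that approach with a stand-in function type so it runs without Ollama. `EmbedBatchFn`, `toBatches`, and `embedAll` are hypothetical, and whether the service uses the `embedBatchSize` setting this way is an assumption.

```typescript
// Stand-in for EmbeddingService.embedBatch so the sketch has no external dependency.
type EmbedBatchFn = (texts: string[]) => Promise<number[][]>;

// Splits items into consecutive groups of at most batchSize elements.
function toBatches<T>(items: T[], batchSize: number): T[][] {
  if (batchSize < 1) throw new Error("batchSize must be at least 1");
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += batchSize) {
    batches.push(items.slice(i, i + batchSize));
  }
  return batches;
}

// Embeds all texts with one embedBatch call per group, preserving input order.
async function embedAll(
  texts: string[],
  batchSize: number,
  embedBatch: EmbedBatchFn
): Promise<number[][]> {
  const vectors: number[][] = [];
  for (const batch of toBatches(texts, batchSize)) {
    const result = await embedBatch(batch);
    if (result.length !== batch.length) {
      throw new Error("Expected one vector per input text");
    }
    for (const vector of result) {
      vectors.push(vector);
    }
  }
  return vectors;
}
```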