RAG Sync

Classes

AdminServer

Admin API server for RAG sync service monitoring.

Exposes /status endpoint with service health, pipeline metrics, and connection status for the admin dashboard.
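
For illustration, a minimal sketch of polling the endpoint from a monitoring script; the port and response fields below are placeholders, not a documented schema:

// Sketch: poll the admin /status endpoint (port and response shape are assumptions)
const res = await fetch('http://localhost:9090/status');
if (res.ok) {
  const status = await res.json();
  console.log('pipeline metrics:', status);
}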

Constructor

constructor(metricsCollector: MetricsCollector, connectionConfig: ConnectionConfig)

Parameters:

  • metricsCollector: MetricsCollector (required)
  • connectionConfig: ConnectionConfig (required)

Methods

getApp

Get the Express application for testing

getApp(): express.Application

Returns:

express.Application - The Express application instance

start

Start the admin server on the specified port

start(port: number): Promise<void>

Parameters:

  • port: number (required) - The port number to listen on

Returns:

Promise<void> - Promise that resolves when the server is listening

stop

Stop the admin server

stop(): Promise<void>

Returns:

Promise<void> - Promise that resolves when the server is stopped
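
A brief usage sketch, assuming a MetricsCollector is already available and treating the ConnectionConfig value as opaque (its fields are defined by the service's types and not shown here):

const metrics = new MetricsCollector();
const admin = new AdminServer(metrics, connectionConfig); // connectionConfig: ConnectionConfig (shape not shown here)

await admin.start(9090); // expose /status on a placeholder port
// ... later, during shutdown
await admin.stop();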

MetricsCollector

Collects and tracks metrics for the RAG sync service.

Maintains in-memory counters updated by SyncOrchestrator during processing. Provides a snapshot of current metrics for the admin API endpoint.

Constructor

constructor()

Methods

recordProcessed

Record a successfully processed document

recordProcessed(collection: string): void

Parameters:

  • collection: string (required) - The collection name (e.g., 'tasks', 'discussions')

recordSkipped

Record a skipped document (unchanged content hash)

recordSkipped(collection: string): void

Parameters:

  • collection: string (required) - The collection name (e.g., 'tasks', 'discussions')

recordError

Record a processing error

recordError(documentId: string, error: string, attempts: number): void

Parameters:

  • documentId: string (required) - The ID of the document that failed
  • error: string (required) - The error message
  • attempts: number (required) - The number of retry attempts made

recordRetry

Record a retry attempt

recordRetry(): void

setQueueDepth

Update current queue depth

setQueueDepth(depth: number): void

Parameters:

  • depth: number (required) - The current number of documents waiting to be processed

setActiveProcessing

Update active processing count

setActiveProcessing(count: number): void

Parameters:

  • count: number (required) - The number of documents currently being processed

setMaxConcurrency

Update max concurrency setting

setMaxConcurrency(max: number): void

Parameters:

  • max: number (required) - The maximum number of concurrent document processors

getMetrics

Get current metrics snapshot

getMetrics(): MetricsSnapshot

Returns:

MetricsSnapshot - A complete snapshot of current service and pipeline metrics
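
A short sketch of how the counters are driven and read; in production SyncOrchestrator makes these calls during processing, shown here directly for illustration:

const metrics = new MetricsCollector();

metrics.setMaxConcurrency(4);
metrics.setQueueDepth(12);
metrics.recordProcessed('tasks');        // document indexed successfully
metrics.recordSkipped('discussions');    // unchanged content hash
metrics.recordError('task_123', 'embedding timeout', 3);

const snapshot = metrics.getMetrics();   // MetricsSnapshot for the admin API
console.log(snapshot);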

SyncOrchestrator

Main orchestrator for RAG sync operations.

Coordinates the flow of data between RxDB, AMS, and SurrealDB:

  • Monitors RxDB collections for document changes
  • Monitors AMS for new agent memories
  • Processes documents into chunks with embeddings
  • Stores embeddings in SurrealDB for semantic search

Constructor

constructor(config: {
  rxdb: { url: string; database: string; collections: string[]; token?: string | undefined };
  surrealdb: { url: string; database: string; namespace: string; auth: { username: string; password: string } };
  redis: { host: string; port: number; db: number; keyPrefix: string; password?: string | undefined };
  ollama: { baseUrl: string; embeddingModel: string; timeout: number; maxRetries: number };
  ams: { url: string; pollInterval: number; auth?: { token?: string | undefined } | undefined };
  sync: { batchSize: number; syncInterval: number; embedBatchSize: number; retryAttempts: number; retryDelay: number };
  adminPort: number;
  logLevel: "debug" | "info" | "warn" | "error";
})

Parameters:

  • config: object (required) - Complete configuration for all services (see the constructor signature above for the full shape)
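
For readability, an example configuration object matching the shape above; every URL, credential, and tuning value is a placeholder:

const config = {
  rxdb: { url: 'ws://localhost:3002', database: 'app', collections: ['tasks', 'discussions'] },
  surrealdb: {
    url: 'ws://localhost:8000/rpc',
    database: 'rag',
    namespace: 'flowstate',
    auth: { username: 'root', password: 'root' },
  },
  redis: { host: 'localhost', port: 6379, db: 0, keyPrefix: 'ams:' },
  ollama: { baseUrl: 'http://localhost:11434', embeddingModel: 'nomic-embed-text', timeout: 30000, maxRetries: 3 },
  ams: { url: 'http://localhost:8001', pollInterval: 5000 },
  sync: { batchSize: 100, syncInterval: 5000, embedBatchSize: 16, retryAttempts: 3, retryDelay: 1000 },
  adminPort: 9090,
  logLevel: 'info' as const,
};

const orchestrator = new SyncOrchestrator(config);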

Methods

start

Start the orchestrator and begin monitoring for changes.

Connects to all services and registers event handlers:

  • Connects to SurrealDB vector database
  • Creates local RxDB database for replication
  • Starts native RxDB replication
  • Subscribes to AMS memory events

start(): Promise<void>

Returns:

Promise<void> - Resolves when all connections are established and monitoring has started

stop

Stop the orchestrator and disconnect from all services.

Performs graceful shutdown:

  • Unsubscribes from AMS events
  • Stops RxDB replication
  • Closes local database
  • Closes SurrealDB connection

Safe to call multiple times and during partial startup failures.

stop(): Promise<void>

Returns:

Promise<void> - Resolves when shutdown is complete

isRunning

Check if orchestrator is currently running.

isRunning(): boolean

Returns:

boolean - true if running, false otherwise

handleRxDBChange

Handle RxDB document change events.

Processes document changes from RxDB collections:

  • INSERT/UPDATE: Process document into chunks and store embeddings
  • DELETE: Remove all chunks from vector database

Uses content hashing to skip unchanged documents. Uses retry logic for transient embedding/storage failures.

handleRxDBChange(event: DocumentChangeEvent): Promise<void>

Parameters:

  • event: DocumentChangeEvent (required) - Document change event from RxDB

Returns:

Promise<void>
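
Change events normally reach this handler through the orchestrator's own replication setup; a sketch of the wiring, assuming a NativeRxDBClient instance and an orchestrator are already constructed, shown only to illustrate the data flow:

// Forward DocumentChangeEvent objects from replication into the pipeline
rxdbClient.onDocumentChange((event) => orchestrator.handleRxDBChange(event));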

handleAMSMemoryEvent

Handle AMS memory events.

Processes memory events from Agent Memory Server:

  • create/update: Process memory into chunks and store embeddings
  • delete: Remove memory chunks from vector database

Memory events are treated as documents in the 'memories' collection.

handleAMSMemoryEvent(event: MemoryEvent): Promise<void>

Parameters:

  • event: MemoryEvent (required) - Memory event from AMS

Returns:

Promise<void>
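
Similarly, memory events typically arrive through AMSMemoryClient's subscription; a one-line wiring sketch, assuming both clients are already constructed:

await amsClient.subscribe((event) => orchestrator.handleAMSMemoryEvent(event));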

getMetricsCollector

Get the metrics collector instance for admin API

getMetricsCollector(): MetricsCollector

Returns:

MetricsCollector - The metrics collector instance
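
This is typically how the admin API connects to the orchestrator; a sketch with a placeholder port and an opaque ConnectionConfig value:

const admin = new AdminServer(orchestrator.getMetricsCollector(), connectionConfig);
await admin.start(9090);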

Examples:

const orchestrator = new SyncOrchestrator(config);

// Start monitoring and processing
await orchestrator.start();

// Later, shutdown gracefully
await orchestrator.stop();

AMSMemoryClient

Client for subscribing to and fetching AMS (Agent Memory Server) memories

AMS stores conversation memories in Redis using JSON. This client provides:

  • Real-time subscription to memory events via Redis pub/sub
  • Bulk fetching of stored memories by namespace
  • Automatic JSON parsing of Redis data

Constructor

constructor(config: AMSMemoryClientConfig)

Parameters:

  • config: AMSMemoryClientConfig (required) - Client configuration

Methods

subscribe

Subscribe to AMS memory events via Redis pub/sub

Events are published to 'ams:memory:*' channels when memories are created, updated, or deleted. The handler will be called for each event.

subscribe(handler: MemoryEventHandler): Promise<void>

Parameters:

  • handler: MemoryEventHandler (required) - Function to process each memory event

Returns:

Promise<void>

unsubscribe

Unsubscribe from memory events and close subscriber connection

unsubscribe(): Promise<void>

Returns:

Promise<void>

isSubscribed

Check if client is currently subscribed to memory events

isSubscribed(): boolean

Returns:

boolean - true if subscribed, false otherwise

getMemoriesForNamespace

Fetch all long-term memories for a given namespace

Scans Redis for keys matching 'long_term_memory:{namespace}:*' and retrieves each memory as JSON.

getMemoriesForNamespace(namespace: string): Promise<Memory[]>

Parameters:

  • namespace: string (required) - Namespace to fetch memories for (e.g., user ID)

Returns:

Promise<Memory[]> - Array of memories in the namespace

Examples:

const memories = await client.getMemoriesForNamespace('user_123');
console.log(`Found ${memories.length} memories`);

disconnect

Disconnect from Redis and cleanup resources

This will unsubscribe from events and close all connections.

disconnect(): Promise<void>

Returns:

Promise<void>

Examples:

const client = new AMSMemoryClient({
  redisUrl: 'redis://localhost:6379'
});

// Subscribe to memory events
await client.subscribe(async (event) => {
  console.log('Memory event:', event);
  if (event.type === 'create') {
    // Process new memory
  }
});

// Fetch existing memories
const memories = await client.getMemoriesForNamespace('user_123');

// Cleanup
await client.disconnect();

NativeRxDBClient

Native RxDB Replication Client

Uses RxDB's native replicateWithWebsocketServer to sync data from the remote rxdb-server to a local in-memory database. Subscribes to local collection change events to process ALL documents (initial sync and real-time updates).
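
The database argument is the local RxDB instance that replication writes into. A hedged sketch of creating an in-memory database with RxDB's public API; the database name and collection setup are assumptions:

import { createRxDatabase } from 'rxdb';
import { getRxStorageMemory } from 'rxdb/plugins/storage-memory';

const database = await createRxDatabase({
  name: 'ragsync-local',          // placeholder name
  storage: getRxStorageMemory(),  // in-memory storage for the replicated copy
});
// database.addCollections({ ... }) with the replicated collection schemas would follow here

const client = new NativeRxDBClient(database, {
  wsUrl: 'wss://localhost:7443/ws/replication',
  token: 'service-token-here',
  batchSize: 100,
});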

Constructor

constructor(database: any, config: NativeRxDBClientConfig)

Parameters:

  • database: any (required) - The local RxDB database instance that replication writes into
  • config: NativeRxDBClientConfig (required) - Client configuration

Methods

onDocumentChange

Register a handler for document change events

Handlers are called for ALL documents in the database, including:

  • Documents from initial replication
  • Real-time updates from the server

Multiple handlers can be registered and will be called sequentially. If a handler throws an error, it is logged but other handlers will still be called.

onDocumentChange(handler: DocumentChangeHandler): void

Parameters:

  • handler: DocumentChangeHandler (required) - Function to process document change events

startReplication

Start replication for all collections in the database

Initiates WebSocket replication for each collection, waits for initial sync to complete, and subscribes to change events.

startReplication(): Promise<void>

Returns:

Promise<void> - Resolves when initial sync has completed for all collections

stopReplication

Stop all replications and clean up

Unsubscribes from all observables, cancels all replication states, and releases resources. Safe to call multiple times.

stopReplication(): Promise<void>

Returns:

Promise<void>

isRunning

Check if replication is currently running

isRunning(): boolean

Returns:

boolean - true if replication is active, false otherwise

getDatabase

Get the local database instance

getDatabase(): any

Returns:

any - The RxDB database instance

Examples:

const client = new NativeRxDBClient(database, {
  wsUrl: 'wss://localhost:7443/ws/replication',
  token: 'service-token-here',
  batchSize: 100,
});

// Register handler for all document changes
client.onDocumentChange(async (event) => {
  console.log(`${event.operation}: ${event.documentId}`);
  if (event.document) {
    await indexDocument(event.document);
  }
});

// Start replication
await client.startReplication();

// Later, stop and clean up
await client.stopReplication();

RxDBReplicationClient

RxDB Replication Client for receiving real-time document changes

Constructor

constructor(config: RxDBReplicationClientConfig)

Parameters:

  • config: RxDBReplicationClientConfig (required) - Client configuration

Methods

connect

Connect to the RxDB server via WebSocket

Establishes WebSocket connection and subscribes to configured collections. Resolves when connection is established, rejects on connection error.

connect(): Promise<void>

Returns:

Promise<void> - Resolves when the connection is established; rejects on connection error

disconnect

Disconnect from the RxDB server

Closes WebSocket connection and cancels any pending reconnection attempts. Safe to call multiple times.

disconnect(): void

isConnected

Check if client is currently connected to the server

isConnected(): boolean

Returns:

boolean - true if connected, false otherwise

onDocumentChange

Register a handler for document change events

Multiple handlers can be registered. Each will be called for every document change event. Handlers are called in registration order.

If a handler throws an error, the error is logged but other handlers will still be called.

onDocumentChange(handler: DocumentChangeHandler): void

Parameters:

  • handler: DocumentChangeHandler (required) - Function to process document change events

Examples:

client.onDocumentChange(async (event) => {
  if (event.operation === 'INSERT') {
    await indexNewDocument(event.document);
  }
});

Examples:

const client = new RxDBReplicationClient({
  wsUrl: 'ws://localhost:3002/ws/replication',
  collections: ['tasks', 'projects', 'notes'],
  token: 'service-token-here'
});

// Register handler for document changes
client.onDocumentChange(async (event) => {
  console.log(`Document ${event.operation}: ${event.documentId}`);
  if (event.operation === 'INSERT' || event.operation === 'UPDATE') {
    // Process document for indexing
    await indexDocument(event.document);
  } else if (event.operation === 'DELETE') {
    // Remove from index
    await removeFromIndex(event.documentId);
  }
});

// Connect and start receiving events
await client.connect();

// Later, disconnect when done
await client.disconnect();

SurrealDBClient

SurrealDB client for vector storage and retrieval

Provides an interface to store and query document embeddings for Retrieval-Augmented Generation (RAG) use cases.

Constructor

constructor(config: SurrealDBConfig)

Parameters:

  • config: SurrealDBConfig (required)

Methods

connect

Establish connection to SurrealDB and initialize schema

connect(): Promise<void>

Returns:

Promise<void> - Resolves when the connection is established and the schema is initialized

disconnect

Close connection to SurrealDB

disconnect(): Promise<void>

Returns:

Promise<void>

isConnected

Check if client is connected to database

isConnected(): boolean

Returns:

boolean - true if connected, false otherwise

getDocumentHash

Get the content hash for a document if it exists

Used to check if a document has changed before re-processing.

getDocumentHash(docId: string): Promise<string | null>

Parameters:

  • docId: string (required) - Document ID to check

Returns:

Promise<string | null> - The content hash if found, null otherwise
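
A sketch of the skip-unchanged pattern this enables; the SHA-256 hashing below is an assumption made for illustration, not necessarily the scheme the sync service uses:

import { createHash } from 'node:crypto';

const doc = { id: 'task_123', title: 'Write docs' };  // placeholder document
const newHash = createHash('sha256').update(JSON.stringify(doc)).digest('hex');

const storedHash = await client.getDocumentHash('task_123');
if (storedHash === newHash) {
  // content unchanged - skip re-chunking and re-embedding
} else {
  // re-process the document and upsert its chunks
}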

upsertDocument

Insert or update a document chunk with its embedding

Documents are identified by docId + chunkIndex. If a document with the same ID already exists, it will be updated.

upsertDocument(doc: RAGDocument): Promise<void>

Parameters:

  • doc: RAGDocument (required) - Document to upsert

Returns:

Promise<void>

deleteDocument

Delete all chunks for a document

deleteDocument(docId: string): Promise<void>

Parameters:

  • docId: string (required) - Document ID to delete

Returns:

Promise<void>

vectorSearch

Perform vector similarity search

Searches for documents with embeddings similar to the query vector using cosine similarity. Results are filtered by workspace/org if specified.

vectorSearch(options: VectorSearchOptions): Promise<SearchResult[]>

Parameters:

  • options: VectorSearchOptions (required) - Search options

Returns:

Promise<SearchResult[]> - Array of matching documents with similarity scores

Examples:

const results = await client.vectorSearch({
  queryVector: embeddings,
  workspaceId: 'ws_1',
  collections: ['tasks', 'notes'],
  limit: 10,
  minScore: 0.7
});

Examples:

const client = new SurrealDBClient({
  url: 'ws://localhost:8000/rpc',
  username: 'root',
  password: 'root',
  namespace: 'flowstate',
  database: 'rag'
});

await client.connect();

await client.upsertDocument({
  collection: 'tasks',
  docId: 'task_123',
  content: 'Task description...',
  chunkIndex: 0,
  embedding: [0.1, 0.2, ...],
  metadata: { title: 'My Task' },
  orgId: 'org_1',
  workspaceId: 'ws_1'
});

const results = await client.vectorSearch({
  queryVector: [0.1, 0.2, ...],
  workspaceId: 'ws_1',
  limit: 10
});

LocalDatabaseError

Error thrown when local database creation fails

Constructor

constructor(message: string)

Parameters:

  • message: string (required)

DocumentProcessor

Processes documents into chunks with embeddings for vector search. Handles document chunking, embedding generation, and metadata extraction based on collection-specific configurations.

Constructor

constructor(embeddingService: EmbeddingService)

Parameters:

  • embeddingService: EmbeddingService (required)

Methods

process

Processes a document into searchable chunks with embeddings.

process(collection: string, doc: Record<string, unknown>): Promise<ProcessedDocument>

Parameters:

  • collection: string (required) - The collection type (e.g., 'tasks', 'projects', 'memories')
  • doc: Record<string, unknown> (required) - The document to process; must contain id, orgId, and workspaceId

Returns:

Promise<ProcessedDocument> - A processed document with chunks and embeddings
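
A usage sketch combining the processor with EmbeddingService and a connected SurrealDBClient; the chunk fields read from the result (chunks, content, chunkIndex, embedding, metadata) are assumptions about ProcessedDocument's shape:

const processor = new DocumentProcessor(
  new EmbeddingService({ ollamaUrl: 'http://localhost:11434', model: 'nomic-embed-text' })
);

const processed = await processor.process('tasks', {
  id: 'task_123',
  orgId: 'org_1',
  workspaceId: 'ws_1',
  title: 'Write docs',
  description: 'Document the RAG sync service',
});

// Persist each chunk through a connected SurrealDBClient (`surreal`);
// the field names on `processed.chunks` are assumed for illustration
for (const chunk of processed.chunks) {
  await surreal.upsertDocument({
    collection: 'tasks',
    docId: 'task_123',
    chunkIndex: chunk.chunkIndex,
    content: chunk.content,
    embedding: chunk.embedding,
    metadata: chunk.metadata,
    orgId: 'org_1',
    workspaceId: 'ws_1',
  });
}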

EmbeddingService

Service for generating text embeddings using Ollama

This service wraps the Ollama API to provide a simple interface for generating embeddings from text. It supports both single text and batch embedding operations.

Constructor

constructor(config: EmbeddingServiceConfig)

Parameters:

  • config: EmbeddingServiceConfig (required)

Methods

embed

Generate embedding for a single text

embed(text: string): Promise<number[]>

Parameters:

  • text: string (required) - Input text to embed

Returns:

Promise<number[]> - Array of numbers representing the embedding vector

embedBatch

Generate embeddings for multiple texts in a batch

This is more efficient than calling embed() multiple times as it sends all texts to Ollama in a single request.

embedBatch(texts: string[]): Promise<number[][]>

Parameters:

  • texts: string[] (required) - Array of input texts to embed

Returns:

Promise<number[][]> - Array of embedding vectors, one for each input text

Examples:

const service = new EmbeddingService({
  ollamaUrl: 'http://localhost:11434',
  model: 'nomic-embed-text',
});

// Generate single embedding
const embedding = await service.embed('Hello, world!');

// Generate batch embeddings
const embeddings = await service.embedBatch(['Text 1', 'Text 2']);