RxDB Server

Functions

createDatabase

Creates and initializes the Epic Flow RxDB database with all collections.

Signature:

createDatabase(databaseName: string): Promise<EpicFlowDatabase>

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| databaseName | string | Yes | |

Returns:

Promise<EpicFlowDatabase>

startReplicationClient

Starts a WebSocket replication client that pulls from a remote server. Uses RxDB's built-in WebSocket replication.

Signature:

startReplicationClient(options: ReplicationClientOptions): Promise<void>

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| options | ReplicationClientOptions | Yes | |

Returns:

Promise<void>

createAuthContext

Creates an auth context from request headers. Extracts domainId from the x-domain-id header (set by the Kong gateway) and throws MissingDomainIdError if the header is missing or holds a placeholder value. Kong handles authentication; this server only extracts the domain context for isolation.

Signature:

createAuthContext(headers: Record<string, string | string[] | undefined>): { data: { type: "service"; serviceAccountId: string; domainId: string; scopes: string[]; }; validUntil: number; }

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| headers | Record<string, string \| string[] \| undefined> | Yes | |

Returns:

{ data: { type: "service"; serviceAccountId: string; domainId: string; scopes: string[]; }; validUntil: number; }
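
The header handling described for createAuthContext might look roughly like the sketch below. It is an illustration, not the server's code: the placeholder string, the serviceAccountId and scopes values, and the one-hour validity window are all assumptions. Only the x-domain-id header, the error name, and the return shape come from the reference above.

```typescript
// Hypothetical re-implementation for illustration only.
class MissingDomainIdError extends Error {
  constructor() {
    super("x-domain-id header is missing or holds a placeholder value");
    this.name = "MissingDomainIdError";
  }
}

type HeaderMap = Record<string, string | string[] | undefined>;

// Assumption: the real placeholder value is not documented.
const ASSUMED_PLACEHOLDER = "placeholder";

function createAuthContextSketch(headers: HeaderMap) {
  const raw = headers["x-domain-id"];
  // Repeated headers can arrive as an array; use the first value.
  const domainId = Array.isArray(raw) ? raw[0] : raw;
  if (!domainId || domainId === ASSUMED_PLACEHOLDER) {
    throw new MissingDomainIdError();
  }
  return {
    data: {
      type: "service" as const,
      serviceAccountId: "kong-gateway", // assumed value
      domainId,
      scopes: [] as string[], // assumed value
    },
    validUntil: Date.now() + 60 * 60 * 1000, // assumed: valid for one hour
  };
}
```

Throwing early keeps requests without a domain from reaching any collection, which is the isolation guarantee the description refers to.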

createServer

Creates and configures the RxDB server with Express. Authentication is handled by the Kong gateway; this server only extracts the domain context for isolation.

Signature:

createServer(database: EpicFlowDatabase): Promise<ServerInstance>

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| database | EpicFlowDatabase | Yes | |

Returns:

Promise<ServerInstance>

startServerAndAttachWebSocket

Starts the server and attaches the WebSocket server after the endpoints are configured. Must be called AFTER setupReplicationEndpoints and setupRestEndpoints. Authentication is handled by the Kong gateway, so the WebSocket server trusts all connections.

Signature:

startServerAndAttachWebSocket(rxServer: any, database: EpicFlowDatabase): Promise<{ httpServer: any; wsServer: WebSocketServerState; }>

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| rxServer | any | Yes | |
| database | EpicFlowDatabase | Yes | |

Returns:

Promise<{ httpServer: any; wsServer: WebSocketServerState; }>
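
The call order required above can be sketched as follows. To stay self-contained, the sketch receives the five functions through a hypothetical ServerApi interface instead of importing them, and it assumes the value returned by createServer can be passed straight to the setup functions, which this reference does not confirm.

```typescript
// Hypothetical wrapper: in real code the functions would be imported
// from the package rather than injected.
interface ServerApi<Db, Srv, Result> {
  createDatabase(name: string): Promise<Db>;
  createServer(db: Db): Promise<Srv>;
  setupReplicationEndpoints(server: Srv, db: Db): void;
  setupRestEndpoints(server: Srv, db: Db): Promise<void>;
  startServerAndAttachWebSocket(server: Srv, db: Db): Promise<Result>;
}

async function bootstrap<Db, Srv, Result>(
  api: ServerApi<Db, Srv, Result>,
  databaseName: string,
): Promise<Result> {
  const database = await api.createDatabase(databaseName);
  const server = await api.createServer(database);
  // Both setup calls must finish before the server is started.
  api.setupReplicationEndpoints(server, database);
  await api.setupRestEndpoints(server, database);
  // Only now start the server and attach the WebSocket server.
  return api.startServerAndAttachWebSocket(server, database);
}
```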

setupReplicationEndpoints

Sets up replication endpoints for all collections. Note: Auth collections are NOT included in rxdb-server; they are handled separately by the auth system.

Signature:

setupReplicationEndpoints(server: any, database: EpicFlowDatabase): void

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| server | any | Yes | |
| database | EpicFlowDatabase | Yes | |

setupRestEndpoints

Sets up REST endpoints for all db-collections. Provides CRUD operations via a REST API instead of replication. Note: Auth collections are NOT included in rxdb-server; they are handled separately by the auth system.

Signature:

setupRestEndpoints(server: any, database: EpicFlowDatabase): Promise<void>

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| server | any | Yes | |
| database | EpicFlowDatabase | Yes | |

Returns:

Promise<void>

setupReplicationConfigEndpoints

Sets up replication configuration API endpoints.

Adds REST endpoints for monitoring and controlling remote replication:

  • GET /api/replication/status - Get current replication status
  • POST /api/replication/pause - Pause all replication
  • POST /api/replication/resume - Resume all replication

Signature:

setupReplicationConfigEndpoints(app: any, remoteClient: RemoteReplicationClient | null): void

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| app | any | Yes | Express application instance |
| remoteClient | RemoteReplicationClient \| null | Yes | Remote replication client instance |

Examples:

const app = express()
const remoteClient = new RemoteReplicationClient()

// Setup replication API
setupReplicationConfigEndpoints(app, remoteClient)

// Now endpoints are available:
// GET  http://localhost:3002/api/replication/status
// POST http://localhost:3002/api/replication/pause
// POST http://localhost:3002/api/replication/resume

setupAuthEndpoints

Sets up auth REST endpoints for token management.

Signature:

setupAuthEndpoints(server: any, storage: AuthStorageAdapter, tokenManager: TokenManager, mailService: MailService): void

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| server | any | Yes | |
| storage | AuthStorageAdapter | Yes | |
| tokenManager | TokenManager | Yes | |
| mailService | MailService | Yes | |

loadRemoteReplicationConfig

Loads remote replication configuration from environment variables.

Reads all configuration from process.env and returns a typed config object. Validates required fields and provides defaults for optional fields.

Signature:

loadRemoteReplicationConfig(): RemoteReplicationConfig

Returns:

RemoteReplicationConfig - Remote replication configuration

Examples:

const config = loadRemoteReplicationConfig()

if (config.enabled) {
  console.log(`Connecting to ${config.url}`)
  // ... setup replication
}
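
As a rough illustration of "validates required fields and provides defaults", the sketch below parses an environment map into a typed object. The variable names (REMOTE_REPLICATION_ENABLED, REMOTE_REPLICATION_URL, REMOTE_REPLICATION_BATCH_SIZE), the batchSize field, and its default of 100 are all hypothetical. Only the enabled and url fields appear in the example above.

```typescript
// Hypothetical shape; the real RemoteReplicationConfig may differ.
interface RemoteReplicationConfigSketch {
  enabled: boolean;
  url: string;
  batchSize: number; // assumed optional setting
}

function loadConfigSketch(
  env: Record<string, string | undefined>,
): RemoteReplicationConfigSketch {
  const enabled = env.REMOTE_REPLICATION_ENABLED === "true";
  const url = env.REMOTE_REPLICATION_URL ?? "";
  // Required field: enforced only when replication is switched on.
  if (enabled && url === "") {
    throw new Error("REMOTE_REPLICATION_URL is required when replication is enabled");
  }
  // Optional field: use the default when unset or not a positive number.
  const parsed = Number(env.REMOTE_REPLICATION_BATCH_SIZE);
  const batchSize = isFinite(parsed) && parsed > 0 ? parsed : 100;
  return { enabled, url, batchSize };
}
```

Taking the environment as a parameter rather than reading process.env directly keeps the sketch easy to test; a caller would pass process.env.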

createWebSocketAuthHandler

Creates a WebSocket authentication handler that validates JWT tokens during the upgrade handshake. Reuses the existing AuthValidator for token validation.

Supports authentication via:

  1. Authorization header (preferred)
  2. URL query parameter 'token' (fallback for WebSocket limitations)

Signature:

createWebSocketAuthHandler(authValidator: AuthValidator): (headers: Headers, url?: string) => Promise<WebSocketAuthContext>

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| authValidator | AuthValidator | Yes | The AuthValidator instance to use for token validation |

Returns:

(headers: Headers, url?: string) => Promise<WebSocketAuthContext> - Authentication handler function for WebSocket upgrade
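
The two token sources listed above could be checked in order as in this sketch. It covers only token extraction, not validation, and it assumes the header uses the Bearer scheme, which this reference does not state. The function name is hypothetical.

```typescript
// Returns the token from the Authorization header if present, otherwise
// from the `token` query parameter, otherwise undefined.
function extractTokenSketch(headers: Headers, url?: string): string | undefined {
  const auth = headers.get("authorization");
  if (auth) {
    // Assumption: "Authorization: Bearer <token>".
    const match = /^Bearer\s+(.+)$/i.exec(auth);
    if (match) return match[1].trim();
  }
  if (url) {
    // Upgrade requests carry a relative URL, so a dummy base is supplied.
    const token = new URL(url, "http://localhost").searchParams.get("token");
    if (token) return token;
  }
  return undefined;
}
```

The query-parameter fallback exists because browser WebSocket clients cannot set custom headers on the upgrade request.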

attachWebSocketServer

Attaches a WebSocket server to an existing HTTP server for RxDB replication. Uses RxDB's built-in WebSocket replication server on the same port as Express.

The WebSocket server:

  • Shares the HTTP server port with Express
  • Handles WebSocket upgrade requests at the configured path
  • Accepts all connections, because authentication is handled by the Kong gateway

Signature:

attachWebSocketServer(options: WebSocketServerOptions): WebSocketServerState

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| options | WebSocketServerOptions | Yes | Configuration for WebSocket server |

Returns:

WebSocketServerState - WebSocket server state with lifecycle methods

Examples:

const httpServer = http.createServer(app)

const wsState = attachWebSocketServer({
  httpServer,
  database,
  path: '/ws/replication'
})

// Later, during shutdown
await wsState.close()

parseCheckpoint

Converts RxDB's checkpoint format to internal Checkpoint format.

Signature:

parseCheckpoint(checkpoint: RxStorageDefaultCheckpoint | undefined): Checkpoint | undefined

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| checkpoint | RxStorageDefaultCheckpoint \| undefined | Yes | RxDB checkpoint (or undefined) |

Returns:

Checkpoint | undefined - Internal checkpoint format

serializeCheckpoint

Converts internal Checkpoint format to RxDB's checkpoint format.

Signature:

serializeCheckpoint(checkpoint: Checkpoint | null): RxStorageDefaultCheckpoint | undefined

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| checkpoint | Checkpoint \| null | Yes | Internal checkpoint |

Returns:

RxStorageDefaultCheckpoint | undefined - RxDB checkpoint format

sanitizeFilename

Sanitizes a document ID to make it safe for use as a filename.

Replaces special characters that are invalid in filenames with safe alternatives:

  • Pipe (|) → double-underscore (__)
  • Forward slash (/) → single-underscore (_)
  • Backslash (\) → single-underscore (_)
  • Colon (:) → dash (-)
  • Asterisk (*) → dash (-)
  • Question mark (?) → dash (-)
  • Quote (") → dash (-)
  • Less than (<) → dash (-)
  • Greater than (>) → dash (-)
  • Null character → removed

Signature:

sanitizeFilename(documentId: string): string

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| documentId | string | Yes | Document primary key to sanitize |

Returns:

string - Sanitized filename-safe string

Examples:

sanitizeFilename('collection|users-0'); // => 'collection__users-0'
sanitizeFilename('path/to/doc'); // => 'path_to_doc'
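
The replacement table above maps directly onto a chain of regular-expression replacements. The sketch below is one possible implementation under that reading, not necessarily how the package does it; the function name is hypothetical.

```typescript
function sanitizeFilenameSketch(documentId: string): string {
  return documentId
    .replace(/\|/g, "__")      // pipe -> double underscore
    .replace(/[\/\\]/g, "_")   // forward slash and backslash -> underscore
    .replace(/[:*?"<>]/g, "-") // remaining reserved characters -> dash
    .replace(/\u0000/g, "");   // null character -> removed
}
```

The mapping is not reversible: 'a/b' and 'a\b' both become 'a_b', so two distinct IDs can collide on the same filename.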

ensureDirectory

Ensures a directory exists, creating it recursively if needed.

Signature:

ensureDirectory(dirPath: string): void

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| dirPath | string | Yes | Path to the directory |

readJsonFile

Reads a JSON file from disk.

Signature:

readJsonFile(filePath: string): T

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| filePath | string | Yes | Path to the JSON file |

Returns:

T - Parsed JSON content

writeJsonFile

Writes a JSON file to disk atomically using a temporary file and rename. This ensures that concurrent readers never see partial writes.

Signature:

writeJsonFile(filePath: string, data: T): void

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| filePath | string | Yes | Path to the JSON file |
| data | T | Yes | Data to write |

Examples:

writeJsonFile('/path/to/doc.json', { id: '123', name: 'Test' });
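
The temporary-file-and-rename technique can be sketched with Node's synchronous fs API as below. The temp-file naming is an assumption, chosen to match the .tmp.* pattern that cleanupTempFiles removes. The real function may also create directories or call fsync, which this sketch omits.

```typescript
import * as fs from "node:fs";
import * as os from "node:os";
import * as path from "node:path";

function writeJsonFileSketch<T>(filePath: string, data: T): void {
  // Keep the temp file in the same directory so the rename never
  // crosses a filesystem boundary.
  const tmpPath = `${filePath}.tmp.${process.pid}.${Date.now()}`;
  fs.writeFileSync(tmpPath, JSON.stringify(data, null, 2), "utf8");
  // rename() swaps the file in one step: readers see either the old
  // content or the new content, never a partial write.
  fs.renameSync(tmpPath, filePath);
}

// Usage: write into a scratch directory and read the file back.
const dir = fs.mkdtempSync(path.join(os.tmpdir(), "json-sketch-"));
const target = path.join(dir, "doc.json");
writeJsonFileSketch(target, { id: "123", name: "Test" });
const roundTrip = JSON.parse(fs.readFileSync(target, "utf8"));
```

If the process dies between the write and the rename, the target file is untouched and only a stray temp file remains.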

readDocument

Reads a document from disk including its metadata.

Documents are stored in two files:

  • {id}.json - The document data
  • {id}.meta.json - Document metadata (sequence, lastModified, deleted)

Signature:

readDocument(collectionPath: string, documentId: string): { document: RxDocumentData<RxDocType>; metadata: DocumentMetadata; } | null

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| collectionPath | string | Yes | Base path for the collection |
| documentId | string | Yes | Document primary key |

Returns:

{ document: RxDocumentData<RxDocType>; metadata: DocumentMetadata; } | null - Document data and metadata, or null if not found

Examples:

const result = readDocument('/data/users', 'user-123');
if (result) {
  console.log(result.document, result.metadata);
}

writeDocument

Writes a document and its metadata to disk atomically.

Both the document and metadata files are written using atomic operations to ensure consistency.

Signature:

writeDocument(collectionPath: string, documentId: string, document: RxDocumentData<RxDocType>, metadata: DocumentMetadata): void

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| collectionPath | string | Yes | Base path for the collection |
| documentId | string | Yes | Document primary key |
| document | RxDocumentData<RxDocType> | Yes | Document data to write |
| metadata | DocumentMetadata | Yes | Document metadata to write |

Examples:

writeDocument(
  '/data/users',
  'user-123',
  { id: 'user-123', name: 'Alice' },
  { sequence: 42, lastModified: new Date().toISOString(), deleted: false }
);

deleteDocument

Deletes a document from disk (both data and metadata files).

Signature:

deleteDocument(collectionPath: string, documentId: string): void

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| collectionPath | string | Yes | Base path for the collection |
| documentId | string | Yes | Document primary key |

listDocumentIds

Lists all document IDs in a collection directory.

Scans the collection directory and returns all document IDs by reading the actual document data (since filenames are sanitized and may not match the original document IDs for documents with special characters).

Signature:

listDocumentIds(collectionPath: string): string[]

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| collectionPath | string | Yes | Base path for the collection |

Returns:

string[] - Array of document IDs (original, not sanitized)

Examples:

const ids = listDocumentIds('/data/users');
// ['user-123', 'collection|users-0', ...] // Original IDs, not sanitized

readCollectionMetadata

Reads collection metadata from .meta.json file.

Signature:

readCollectionMetadata(collectionPath: string): CollectionMetadata | null

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| collectionPath | string | Yes | Base path for the collection |

Returns:

CollectionMetadata | null - Collection metadata, or null if file doesn't exist

writeCollectionMetadata

Writes collection metadata to .meta.json file.

Signature:

writeCollectionMetadata(collectionPath: string, metadata: CollectionMetadata): void

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| collectionPath | string | Yes | Base path for the collection |
| metadata | CollectionMetadata | Yes | Collection metadata to write |

initializeCollectionMetadata

Initializes collection metadata if it doesn't exist.

Signature:

initializeCollectionMetadata(collectionPath: string, collectionName: string, schemaVersion: number): CollectionMetadata

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| collectionPath | string | Yes | Base path for the collection |
| collectionName | string | Yes | Name of the collection |
| schemaVersion | number | Yes | Schema version number |

Returns:

CollectionMetadata - Initialized or existing metadata

cleanupTempFiles

Cleans up temporary files in a directory. Removes any .tmp.* files that may have been left behind from failed writes.

Signature:

cleanupTempFiles(dirPath: string): void

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| dirPath | string | Yes | Directory to clean |

removeCollectionDirectory

Removes the entire collection directory and all its contents.

Signature:

removeCollectionDirectory(collectionPath: string): void

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| collectionPath | string | Yes | Base path for the collection |

getRxStorageFilesystem

Creates a filesystem-based RxDB storage instance.

This is the main factory function for creating the storage plugin. Use this function when configuring RxDB's storage option.

Signature:

getRxStorageFilesystem(options: FilesystemStorageOptions): RxStorage<FilesystemStorageOptions, any>

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| options | FilesystemStorageOptions | Yes | Storage configuration options |

Returns:

RxStorage<FilesystemStorageOptions, any> - RxStorage instance compatible with RxDB

Examples:

### Basic Usage
```typescript
import { getRxStorageFilesystem } from '@epicdm/flowstate-rxdb-server/storage/filesystem';
import { createRxDatabase } from 'rxdb';
import path from 'path';

const storage = getRxStorageFilesystem({
  basePath: path.join(__dirname, 'data')
});

const database = await createRxDatabase({
  name: 'myapp',
  storage,
  multiInstance: false
});

```

### Multi-Instance Mode
```typescript
// Enable multi-instance for cross-process synchronization
const database = await createRxDatabase({
  name: 'myapp',
  storage: getRxStorageFilesystem({
    basePath: './data'
  }),
  multiInstance: true // Enable polling for external changes
});

```

### With Collections
```typescript
const database = await createRxDatabase({
  name: 'myapp',
  storage: getRxStorageFilesystem({ basePath: './data' })
});

await database.addCollections({
  users: {
    schema: {
      version: 0,
      primaryKey: 'id',
      type: 'object',
      properties: {
        id: { type: 'string', maxLength: 100 },
        name: { type: 'string' },
        email: { type: 'string' }
      }
    }
  }
});

// Documents are now stored at: ./data/myapp/users/{id}.json

```

### Querying with Mango
```typescript
// All Mango query operators are supported
const results = await database.users.find({
  selector: {
    $and: [
      { age: { $gte: 18 } },
      { email: { $regex: '@example\\.com$' } } // dot escaped so it matches a literal '.'
    ]
  },
  sort: [{ name: 'asc' }],
  limit: 10,
  skip: 0
}).exec();

```

### Change Streams
```typescript
// Subscribe to real-time changes
database.users.$.subscribe(changeEvent => {
  console.log('Document changed:', changeEvent.documentId);
  console.log('Operation:', changeEvent.operation); // 'INSERT' | 'UPDATE' | 'DELETE'
});

```

### Replication with Checkpoints
```typescript
// The storage supports efficient incremental replication
const replicationState = database.users.syncGraphQL({
  url: 'https://api.example.com/graphql',
  pull: {
    queryBuilder: (checkpoint, limit) => ({
      query: `{
        users(checkpoint: ${JSON.stringify(checkpoint)}, limit: ${limit}) {
          documents { id name email }
          checkpoint { id lwt }
        }
      }`
    })
  }
});

```

matchesSelector

Evaluates whether a document matches a Mango query selector.

Signature:

matchesSelector(document: RxDocumentData<RxDocType>, selector: MangoQuerySelector<RxDocType>): boolean

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| document | RxDocumentData<RxDocType> | Yes | The document to test |
| selector | MangoQuerySelector<RxDocType> | Yes | The Mango query selector |

Returns:

boolean - true if document matches selector

Examples:

const matches = matchesSelector(
  { id: '1', age: 25, name: 'Alice' },
  { age: { $gt: 18 }, name: { $regex: '^A' } }
);
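
To show how selector evaluation of this kind can work, here is a minimal sketch that handles a subset of Mango operators ($eq, $ne, $gt, $gte, $lt, $lte, $regex, $and, $or) on top-level fields. It is not the package's implementation, which the storage documentation describes as supporting all operators. Nested field paths and every other operator are left out, and the function names are hypothetical.

```typescript
type PlainObject = Record<string, unknown>;

// Tests one field value against one condition.
function matchesConditionSketch(value: unknown, condition: unknown): boolean {
  // A non-object condition is shorthand for equality.
  if (typeof condition !== "object" || condition === null) {
    return value === condition;
  }
  const ops = condition as PlainObject;
  // Every operator in the condition object must hold.
  return Object.keys(ops).every((op) => {
    const arg = ops[op];
    switch (op) {
      case "$eq": return value === arg;
      case "$ne": return value !== arg;
      case "$gt": return (value as number) > (arg as number);
      case "$gte": return (value as number) >= (arg as number);
      case "$lt": return (value as number) < (arg as number);
      case "$lte": return (value as number) <= (arg as number);
      case "$regex":
        return typeof value === "string" && new RegExp(arg as string).test(value);
      default:
        throw new Error(`Operator not covered by this sketch: ${op}`);
    }
  });
}

// Top-level keys are combined with AND; $and / $or recurse.
function matchesSelectorSketch(doc: PlainObject, selector: PlainObject): boolean {
  return Object.keys(selector).every((key) => {
    const condition = selector[key];
    if (key === "$and") {
      return (condition as PlainObject[]).every((s) => matchesSelectorSketch(doc, s));
    }
    if (key === "$or") {
      return (condition as PlainObject[]).some((s) => matchesSelectorSketch(doc, s));
    }
    return matchesConditionSketch(doc[key], condition);
  });
}
```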

sortDocuments

Sorts an array of documents according to a Mango query sort specification.

Signature:

sortDocuments(documents: RxDocumentData<RxDocType>[], sort?: { [key: string]: "asc" | "desc"; }[] | undefined): void

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| documents | RxDocumentData<RxDocType>[] | Yes | Array of documents to sort (sorted in place) |
| sort | { [key: string]: "asc" \| "desc"; }[] \| undefined | No | Sort specification (array of {field: 'asc'\|'desc'}) |

Examples:

const docs = [{ age: 30 }, { age: 20 }];
sortDocuments(docs, [{ age: 'asc' }]);
// docs is now [{ age: 20 }, { age: 30 }]
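
A multi-key, in-place sort over this kind of specification might be written as in the sketch below. How the real function orders missing or null values is not documented here, so the sketch simply relies on JavaScript's < and > operators. The function name is hypothetical.

```typescript
type SortSpec = { [key: string]: "asc" | "desc" }[];

function sortDocumentsSketch<T extends Record<string, any>>(
  docs: T[],
  sort?: SortSpec,
): void {
  if (!sort || sort.length === 0) return;
  docs.sort((a, b) => {
    // Walk the sort keys in order; the first difference decides.
    for (const part of sort) {
      const field = Object.keys(part)[0];
      const direction = part[field] === "desc" ? -1 : 1;
      if (a[field] < b[field]) return -direction;
      if (a[field] > b[field]) return direction;
    }
    return 0; // equal on every sort key
  });
}
```

Later entries in the array act only as tie-breakers, so [{ age: 'asc' }, { name: 'desc' }] orders by age first and by name only among equal ages.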

executeQuery

Executes a full Mango query against an array of documents.

This function:

  1. Filters documents using the selector
  2. Sorts results
  3. Applies skip and limit

Signature:

executeQuery(documents: RxDocumentData<RxDocType>[], query: MangoQuery<RxDocType>): RxDocumentData<RxDocType>[]

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| documents | RxDocumentData<RxDocType>[] | Yes | Array of documents to query |
| query | MangoQuery<RxDocType> | Yes | Mango query object |

Returns:

RxDocumentData<RxDocType>[] - Filtered and sorted documents

Examples:

const results = executeQuery(allDocs, {
  selector: { age: { $gt: 18 } },
  sort: [{ name: 'asc' }],
  limit: 10,
  skip: 5
});
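
The three steps listed above (filter, sort, then skip and limit) can be sketched as a small pipeline. To stay short, the sketch reduces the selector to exact-match fields, so it illustrates only the order of operations, not the full Mango semantics. The names are hypothetical.

```typescript
type SortSpec = { [key: string]: "asc" | "desc" }[];

interface QuerySketch {
  selector: Record<string, unknown>; // simplified: exact-match fields only
  sort?: SortSpec;
  skip?: number;
  limit?: number;
}

function executeQuerySketch<T extends Record<string, any>>(
  docs: T[],
  query: QuerySketch,
): T[] {
  // 1. Filter: keep documents whose fields equal every selector entry.
  const keys = Object.keys(query.selector);
  const result = docs.filter((doc) =>
    keys.every((k) => doc[k] === query.selector[k]),
  );

  // 2. Sort: filter() returned a new array, so the input stays untouched.
  const sort = query.sort ?? [];
  result.sort((a, b) => {
    for (const part of sort) {
      const field = Object.keys(part)[0];
      const direction = part[field] === "desc" ? -1 : 1;
      if (a[field] < b[field]) return -direction;
      if (a[field] > b[field]) return direction;
    }
    return 0;
  });

  // 3. Skip first, then limit what remains.
  const start = query.skip ?? 0;
  const end = query.limit === undefined ? undefined : start + query.limit;
  return result.slice(start, end);
}
```

Sorting before skip and limit matters: paginating an unsorted result would return a different page each time the underlying order changes.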

countDocuments

Counts documents matching a Mango query selector.

Signature:

countDocuments(documents: RxDocumentData<RxDocType>[], selector: MangoQuerySelector<RxDocType>): number

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| documents | RxDocumentData<RxDocType>[] | Yes | Array of documents to query |
| selector | MangoQuerySelector<RxDocType> | Yes | Mango query selector |

Returns:

number - Number of matching documents

Examples:

const count = countDocuments(allDocs, { age: { $gt: 18 } });