RxDB Server
Functions
createDatabase
Creates and initializes the Epic Flow RxDB database with all collections.
Signature:
createDatabase(databaseName: string): Promise<EpicFlowDatabase>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| databaseName | string | Yes | |
Returns:
Promise<EpicFlowDatabase>
startReplicationClient
Starts a WebSocket replication client that pulls from a remote server. Uses RxDB's built-in WebSocket replication.
Signature:
startReplicationClient(options: ReplicationClientOptions): Promise<void>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| options | ReplicationClientOptions | Yes | |
Returns:
Promise<void>
createAuthContext
Creates an auth context from request headers. Extracts domainId from the x-domain-id header (set by the Kong gateway) and throws MissingDomainIdError if the header is missing or holds a placeholder value. Kong handles authentication; this server only extracts the domain context for isolation.
Signature:
createAuthContext(headers: Record<string, string | string[] | undefined>): { data: { type: "service"; serviceAccountId: string; domainId: string; scopes: string[]; }; validUntil: number; }
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| headers | Record<string, string \| string[] \| undefined> | Yes | |
Returns:
{ data: { type: "service"; serviceAccountId: string; domainId: string; scopes: string[]; }; validUntil: number; }
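The header handling described above can be sketched as follows. This is a minimal illustration, not the package's implementation: the set of placeholder values, the `serviceAccountId` and `scopes` defaults, and the one-minute `validUntil` window are all assumptions made for the example.

```typescript
type HeaderMap = Record<string, string | string[] | undefined>;

class MissingDomainIdError extends Error {
  constructor(message = 'x-domain-id header is missing or holds a placeholder value') {
    super(message);
    this.name = 'MissingDomainIdError';
  }
}

// Assumption: which values count as placeholders is not documented here.
const PLACEHOLDER_VALUES = new Set(['', 'undefined', 'null']);

function extractDomainId(headers: HeaderMap): string {
  const raw = headers['x-domain-id'];
  // Repeated headers may arrive as an array; use the first value.
  const value = (Array.isArray(raw) ? raw[0] : raw)?.trim() ?? '';
  if (PLACEHOLDER_VALUES.has(value)) {
    throw new MissingDomainIdError();
  }
  return value;
}

function createAuthContextSketch(headers: HeaderMap) {
  const domainId = extractDomainId(headers);
  return {
    data: {
      type: 'service' as const,
      serviceAccountId: 'unknown', // assumption: real value comes from elsewhere
      domainId,
      scopes: [] as string[], // assumption
    },
    validUntil: Date.now() + 60_000, // assumption: one-minute validity
  };
}

const sketchContext = createAuthContextSketch({ 'x-domain-id': 'acme' });
```

Failing fast on a missing header keeps requests without a domain from reaching any collection, which is the isolation guarantee the description refers to.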
createServer
Creates and configures the RxDB server with Express. Authentication is handled by the Kong gateway; this server only extracts the domain context for isolation.
Signature:
createServer(database: EpicFlowDatabase): Promise<ServerInstance>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| database | EpicFlowDatabase | Yes | |
Returns:
Promise<ServerInstance>
startServerAndAttachWebSocket
Starts the server and attaches the WebSocket server after the endpoints are configured. Must be called AFTER setupReplicationEndpoints and setupRestEndpoints. Authentication is handled by the Kong gateway, so the WebSocket server trusts all connections.
Signature:
startServerAndAttachWebSocket(rxServer: any, database: EpicFlowDatabase): Promise<{ httpServer: any; wsServer: WebSocketServerState; }>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| rxServer | any | Yes | |
| database | EpicFlowDatabase | Yes | |
Returns:
Promise<{ httpServer: any; wsServer: WebSocketServerState; }>
setupReplicationEndpoints
Sets up replication endpoints for all collections. Note: auth collections are NOT included in rxdb-server; the auth system handles them separately.
Signature:
setupReplicationEndpoints(server: any, database: EpicFlowDatabase): void
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| server | any | Yes | |
| database | EpicFlowDatabase | Yes | |
setupRestEndpoints
Sets up REST endpoints for all db-collections. Provides CRUD operations via a REST API instead of replication. Note: auth collections are NOT included in rxdb-server; the auth system handles them separately.
Signature:
setupRestEndpoints(server: any, database: EpicFlowDatabase): Promise<void>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| server | any | Yes | |
| database | EpicFlowDatabase | Yes | |
Returns:
Promise<void>
setupReplicationConfigEndpoints
Sets up replication configuration API endpoints.
Adds REST endpoints for monitoring and controlling remote replication:
- GET /api/replication/status - Get current replication status
- POST /api/replication/pause - Pause all replication
- POST /api/replication/resume - Resume all replication
Signature:
setupReplicationConfigEndpoints(app: any, remoteClient: RemoteReplicationClient | null): void
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| app | any | Yes | Express application instance |
| remoteClient | RemoteReplicationClient \| null | Yes | Remote replication client instance |
Examples:
const app = express()
const remoteClient = new RemoteReplicationClient()
// Setup replication API
setupReplicationConfigEndpoints(app, remoteClient)
// Now endpoints are available:
// GET http://localhost:3002/api/replication/status
// POST http://localhost:3002/api/replication/pause
// POST http://localhost:3002/api/replication/resume
setupAuthEndpoints
Sets up auth REST endpoints for token management.
Signature:
setupAuthEndpoints(server: any, storage: AuthStorageAdapter, tokenManager: TokenManager, mailService: MailService): void
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| server | any | Yes | |
| storage | AuthStorageAdapter | Yes | |
| tokenManager | TokenManager | Yes | |
| mailService | MailService | Yes | |
loadRemoteReplicationConfig
Loads remote replication configuration from environment variables
Reads all configuration from process.env and returns a typed config object. Validates required fields and provides defaults for optional fields.
Signature:
loadRemoteReplicationConfig(): RemoteReplicationConfig
Returns:
RemoteReplicationConfig - Remote replication configuration
Examples:
const config = loadRemoteReplicationConfig()
if (config.enabled) {
  console.log(`Connecting to ${config.url}`)
  // ... setup replication
}
createWebSocketAuthHandler
Creates a WebSocket authentication handler that validates JWT tokens during the upgrade handshake. Reuses the existing AuthValidator for token validation.
Supports authentication via:
- Authorization header (preferred)
- URL query parameter 'token' (fallback for WebSocket limitations)
Signature:
createWebSocketAuthHandler(authValidator: AuthValidator): (headers: Headers, url?: string) => Promise<WebSocketAuthContext>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| authValidator | AuthValidator | Yes | The AuthValidator instance to use for token validation |
Returns:
(headers: Headers, url?: string) => Promise<WebSocketAuthContext> - Authentication handler function for WebSocket upgrade
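The two token sources listed above can be sketched as a small lookup that prefers the Authorization header and falls back to the `token` query parameter. `extractToken` is a hypothetical helper name, and the `Bearer` scheme is an assumption; validation by the AuthValidator would happen after this step and is not shown.

```typescript
// Hypothetical helper: returns the raw token, or null when none is present.
function extractToken(headers: Headers, url?: string): string | null {
  const authorization = headers.get('authorization');
  if (authorization) {
    // Assumption: tokens are sent with the "Bearer" scheme.
    const headerToken = /^Bearer\s+(.+)$/i.exec(authorization.trim())?.[1];
    if (headerToken) return headerToken;
  }
  if (url) {
    // Upgrade requests carry a relative URL, so a dummy base is needed to parse it.
    const queryToken = new URL(url, 'http://localhost').searchParams.get('token');
    if (queryToken) return queryToken;
  }
  return null;
}

const fromHeader = extractToken(new Headers({ authorization: 'Bearer abc' }), '/ws/replication?token=zzz');
const fromQuery = extractToken(new Headers(), '/ws/replication?token=xyz');
const missingToken = extractToken(new Headers());
```

The query-parameter fallback exists because browser WebSocket clients cannot set custom headers on the upgrade request; tokens in URLs tend to end up in access logs, which is one reason the header is preferred.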
attachWebSocketServer
Attaches a WebSocket server to an existing HTTP server for RxDB replication. Uses RxDB's built-in WebSocket replication server on the same port as Express.
The WebSocket server:
- Shares the HTTP server port with Express
- Handles WebSocket upgrade requests at the configured path
- Accepts all connections, because authentication is handled by the Kong gateway
Signature:
attachWebSocketServer(options: WebSocketServerOptions): WebSocketServerState
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| options | WebSocketServerOptions | Yes | Configuration for WebSocket server |
Returns:
WebSocketServerState - WebSocket server state with lifecycle methods
Examples:
const httpServer = http.createServer(app)
const wsState = attachWebSocketServer({
  httpServer,
  database,
  path: '/ws/replication'
})
// Later, during shutdown
await wsState.close()
parseCheckpoint
Converts RxDB's checkpoint format to internal Checkpoint format.
Signature:
parseCheckpoint(checkpoint: RxStorageDefaultCheckpoint | undefined): Checkpoint | undefined
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| checkpoint | RxStorageDefaultCheckpoint \| undefined | Yes | RxDB checkpoint (or undefined) |
Returns:
Checkpoint | undefined - Internal checkpoint format
serializeCheckpoint
Converts internal Checkpoint format to RxDB's checkpoint format.
Signature:
serializeCheckpoint(checkpoint: Checkpoint | null): RxStorageDefaultCheckpoint | undefined
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| checkpoint | Checkpoint \| null | Yes | Internal checkpoint |
Returns:
RxStorageDefaultCheckpoint | undefined - RxDB checkpoint format
sanitizeFilename
Sanitizes a document ID to make it safe for use as a filename.
Replaces special characters that are invalid in filenames with safe alternatives:
- Pipe (|) → double-underscore (__)
- Forward slash (/) → single-underscore (_)
- Backslash (\) → single-underscore (_)
- Colon (:) → dash (-)
- Asterisk (*) → dash (-)
- Question mark (?) → dash (-)
- Quote (") → dash (-)
- Less than (<) → dash (-)
- Greater than (>) → dash (-)
- Null character → removed
Signature:
sanitizeFilename(documentId: string): string
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| documentId | string | Yes | Document primary key to sanitize |
Returns:
string - Sanitized filename-safe string
Examples:
sanitizeFilename('collection|users-0'); // => 'collection__users-0'
sanitizeFilename('path/to/doc'); // => 'path_to_doc'
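The replacement table above can be expressed as an ordered list of pattern and replacement pairs. The sketch below follows the documented mapping; the function name `sanitizeFilenameSketch` and the regex-based approach are illustrative choices, not necessarily how the package does it.

```typescript
// Ordered replacements taken from the documented mapping.
const FILENAME_REPLACEMENTS: ReadonlyArray<readonly [RegExp, string]> = [
  [/\|/g, '__'],       // pipe -> double underscore
  [/[\/\\]/g, '_'],    // forward slash and backslash -> underscore
  [/[:*?"<>]/g, '-'],  // colon, asterisk, question mark, quote, angle brackets -> dash
  [/\u0000/g, ''],     // null character -> removed
];

function sanitizeFilenameSketch(documentId: string): string {
  return FILENAME_REPLACEMENTS.reduce(
    (name, [pattern, replacement]) => name.replace(pattern, replacement),
    documentId,
  );
}

const sanitizedPipe = sanitizeFilenameSketch('collection|users-0');
const sanitizedPath = sanitizeFilenameSketch('path/to/doc');
const sanitizedMixed = sanitizeFilenameSketch('a:b*c\\d');
```

Under this mapping different IDs can collapse to the same filename (for example `a/b` and `a_b`), so the filename alone cannot be used to recover the original ID.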
ensureDirectory
Ensures a directory exists, creating it recursively if needed.
Signature:
ensureDirectory(dirPath: string): void
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| dirPath | string | Yes | Path to the directory |
readJsonFile
Reads a JSON file from disk.
Signature:
readJsonFile(filePath: string): T
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| filePath | string | Yes | Path to the JSON file |
Returns:
T - Parsed JSON content
writeJsonFile
Writes a JSON file to disk atomically using a temporary file and rename. This ensures that concurrent readers never see partial writes.
Signature:
writeJsonFile(filePath: string, data: T): void
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| filePath | string | Yes | Path to the JSON file |
| data | T | Yes | Data to write |
Examples:
writeJsonFile('/path/to/doc.json', { id: '123', name: 'Test' });
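The temporary-file-and-rename technique mentioned above can be sketched with Node's `fs` module. This is an illustration under assumptions: the `.tmp.<pid>.<timestamp>` suffix and the cleanup on failure are choices made for the example, and the rename is only atomic when the temp file and the target live on the same filesystem.

```typescript
import * as fs from 'node:fs';
import * as os from 'node:os';
import * as path from 'node:path';

// Hypothetical sketch of write-to-temp-then-rename; not the package's code.
function writeJsonFileSketch<T>(filePath: string, data: T): void {
  // Keep the temp file next to the target so the rename never crosses
  // a filesystem boundary. The suffix format is an assumption.
  const tmpPath = `${filePath}.tmp.${process.pid}.${Date.now()}`;
  try {
    fs.writeFileSync(tmpPath, JSON.stringify(data, null, 2), 'utf8');
    // After the rename, readers see either the old file or the complete new one.
    fs.renameSync(tmpPath, filePath);
  } catch (error) {
    fs.rmSync(tmpPath, { force: true }); // do not leave a partial temp file behind
    throw error;
  }
}

// Usage against a throwaway directory.
const demoDir = fs.mkdtempSync(path.join(os.tmpdir(), 'atomic-write-'));
const demoFile = path.join(demoDir, 'doc.json');
writeJsonFileSketch(demoFile, { id: '123', name: 'Test' });
const roundTrip = JSON.parse(fs.readFileSync(demoFile, 'utf8')) as { id: string; name: string };
```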
readDocument
Reads a document from disk including its metadata.
Documents are stored in two files:
- {id}.json - The document data
- {id}.meta.json - Document metadata (sequence, lastModified, deleted)
Signature:
readDocument(collectionPath: string, documentId: string): { document: RxDocumentData<RxDocType>; metadata: DocumentMetadata; } | null
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| collectionPath | string | Yes | Base path for the collection |
| documentId | string | Yes | Document primary key |
Returns:
{ document: RxDocumentData<RxDocType>; metadata: DocumentMetadata; } | null - Document data and metadata, or null if not found
Examples:
const result = readDocument('/data/users', 'user-123');
if (result) {
  console.log(result.document, result.metadata);
}
writeDocument
Writes a document and its metadata to disk atomically.
Both the document and metadata files are written using atomic operations to ensure consistency.
Signature:
writeDocument(collectionPath: string, documentId: string, document: RxDocumentData<RxDocType>, metadata: DocumentMetadata): void
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| collectionPath | string | Yes | Base path for the collection |
| documentId | string | Yes | Document primary key |
| document | RxDocumentData<RxDocType> | Yes | Document data to write |
| metadata | DocumentMetadata | Yes | Document metadata to write |
Examples:
writeDocument(
  '/data/users',
  'user-123',
  { id: 'user-123', name: 'Alice' },
  { sequence: 42, lastModified: new Date().toISOString(), deleted: false }
);
deleteDocument
Deletes a document from disk (both data and metadata files).
Signature:
deleteDocument(collectionPath: string, documentId: string): void
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| collectionPath | string | Yes | Base path for the collection |
| documentId | string | Yes | Document primary key |
listDocumentIds
Lists all document IDs in a collection directory.
Scans the collection directory and returns all document IDs by reading the actual document data (since filenames are sanitized and may not match the original document IDs for documents with special characters).
Signature:
listDocumentIds(collectionPath: string): string[]
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| collectionPath | string | Yes | Base path for the collection |
Returns:
string[] - Array of document IDs (original, not sanitized)
Examples:
const ids = listDocumentIds('/data/users');
// ['user-123', 'collection|users-0', ...] // Original IDs, not sanitized
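Because sanitized filenames are lossy, the IDs have to come from the file contents. The sketch below shows one way to do that scan; the `primaryKey` parameter and the rule for skipping `.meta.json` files are assumptions for illustration (the documented function takes only `collectionPath`).

```typescript
import * as fs from 'node:fs';
import * as os from 'node:os';
import * as path from 'node:path';

// Hypothetical sketch: recover original IDs by reading each document file.
function listDocumentIdsSketch(collectionPath: string, primaryKey = 'id'): string[] {
  if (!fs.existsSync(collectionPath)) return [];
  const ids: string[] = [];
  for (const file of fs.readdirSync(collectionPath)) {
    // Skip non-JSON entries and metadata files, including the
    // collection-level .meta.json.
    if (!file.endsWith('.json') || file.endsWith('.meta.json')) continue;
    const raw = fs.readFileSync(path.join(collectionPath, file), 'utf8');
    const id = (JSON.parse(raw) as Record<string, unknown>)[primaryKey];
    if (typeof id === 'string') ids.push(id);
  }
  return ids;
}

// Usage: the filename is sanitized, the stored document keeps the original ID.
const demoCollection = fs.mkdtempSync(path.join(os.tmpdir(), 'users-'));
fs.writeFileSync(
  path.join(demoCollection, 'collection__users-0.json'),
  JSON.stringify({ id: 'collection|users-0' }),
);
fs.writeFileSync(
  path.join(demoCollection, 'collection__users-0.meta.json'),
  JSON.stringify({ sequence: 1, lastModified: new Date().toISOString(), deleted: false }),
);
const listedIds = listDocumentIdsSketch(demoCollection);
```

Reading every file makes the listing proportional to the collection's size on disk, which is the cost of keeping filenames safe without a separate ID index.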
readCollectionMetadata
Reads collection metadata from .meta.json file.
Signature:
readCollectionMetadata(collectionPath: string): CollectionMetadata | null
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| collectionPath | string | Yes | Base path for the collection |
Returns:
CollectionMetadata | null - Collection metadata, or null if file doesn't exist
writeCollectionMetadata
Writes collection metadata to .meta.json file.
Signature:
writeCollectionMetadata(collectionPath: string, metadata: CollectionMetadata): void
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| collectionPath | string | Yes | Base path for the collection |
| metadata | CollectionMetadata | Yes | Collection metadata to write |
initializeCollectionMetadata
Initializes collection metadata if it doesn't exist.
Signature:
initializeCollectionMetadata(collectionPath: string, collectionName: string, schemaVersion: number): CollectionMetadata
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| collectionPath | string | Yes | Base path for the collection |
| collectionName | string | Yes | Name of the collection |
| schemaVersion | number | Yes | Schema version number |
Returns:
CollectionMetadata - Initialized or existing metadata
cleanupTempFiles
Cleans up temporary files in a directory. Removes any .tmp.* files that may have been left behind from failed writes.
Signature:
cleanupTempFiles(dirPath: string): void
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| dirPath | string | Yes | Directory to clean |
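A cleanup pass of this kind can be sketched as a directory scan that deletes entries whose names contain `.tmp.`. Treating "`.tmp.*` files" as "names containing `.tmp.`" is an assumption, and the sketch returns the number of removed files purely for illustration (the documented function returns void).

```typescript
import * as fs from 'node:fs';
import * as os from 'node:os';
import * as path from 'node:path';

// Hypothetical sketch: remove leftover temp files from interrupted writes.
function cleanupTempFilesSketch(dirPath: string): number {
  if (!fs.existsSync(dirPath)) return 0;
  let removed = 0;
  for (const entry of fs.readdirSync(dirPath)) {
    if (!entry.includes('.tmp.')) continue; // keep real documents and metadata
    fs.rmSync(path.join(dirPath, entry), { force: true });
    removed += 1;
  }
  return removed;
}

// Usage: one complete document and one leftover from a failed write.
const scratchDir = fs.mkdtempSync(path.join(os.tmpdir(), 'cleanup-'));
fs.writeFileSync(path.join(scratchDir, 'doc.json'), '{}');
fs.writeFileSync(path.join(scratchDir, 'doc.json.tmp.4242.1700000000000'), '{"partial":');
const removedCount = cleanupTempFilesSketch(scratchDir);
```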
removeCollectionDirectory
Removes the entire collection directory and all its contents.
Signature:
removeCollectionDirectory(collectionPath: string): void
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| collectionPath | string | Yes | Base path for the collection |
getRxStorageFilesystem
Creates a filesystem-based RxDB storage instance.
This is the main factory function for creating the storage plugin. Use this function when configuring RxDB's storage option.
Signature:
getRxStorageFilesystem(options: FilesystemStorageOptions): RxStorage<FilesystemStorageOptions, any>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| options | FilesystemStorageOptions | Yes | Storage configuration options |
Returns:
RxStorage<FilesystemStorageOptions, any> - RxStorage instance compatible with RxDB
Examples:
### Basic Usage
```typescript
import { getRxStorageFilesystem } from '@epicdm/flowstate-rxdb-server/storage/filesystem';
import { createRxDatabase } from 'rxdb';
import path from 'path';

const storage = getRxStorageFilesystem({
  basePath: path.join(__dirname, 'data')
});

const database = await createRxDatabase({
  name: 'myapp',
  storage,
  multiInstance: false
});
```
### Multi-Instance Mode
```typescript
// Enable multi-instance for cross-process synchronization
const database = await createRxDatabase({
  name: 'myapp',
  storage: getRxStorageFilesystem({
    basePath: './data'
  }),
  multiInstance: true // Enable polling for external changes
});
```
### With Collections
```typescript
const database = await createRxDatabase({
  name: 'myapp',
  storage: getRxStorageFilesystem({ basePath: './data' })
});

await database.addCollections({
  users: {
    schema: {
      version: 0,
      primaryKey: 'id',
      type: 'object',
      properties: {
        id: { type: 'string', maxLength: 100 },
        name: { type: 'string' },
        email: { type: 'string' }
      }
    }
  }
});
// Documents are now stored at: ./data/myapp/users/{id}.json
```
### Querying with Mango
```typescript
// All Mango query operators are supported
const results = await database.users.find({
  selector: {
    $and: [
      { age: { $gte: 18 } },
      // Escape the dot so it matches a literal "." rather than any character
      { email: { $regex: '@example\\.com$' } }
    ]
  },
  sort: [{ name: 'asc' }],
  limit: 10,
  skip: 0
}).exec();
```
### Change Streams
```typescript
// Subscribe to real-time changes
database.users.$.subscribe(changeEvent => {
  console.log('Document changed:', changeEvent.documentId);
  console.log('Operation:', changeEvent.operation); // 'INSERT' | 'UPDATE' | 'DELETE'
});
```
### Replication with Checkpoints
```typescript
// The storage supports efficient incremental replication
const replicationState = database.users.syncGraphQL({
  url: 'https://api.example.com/graphql',
  pull: {
    queryBuilder: (checkpoint, limit) => ({
      query: `{
        users(checkpoint: ${JSON.stringify(checkpoint)}, limit: ${limit}) {
          documents { id name email }
          checkpoint { id lwt }
        }
      }`
    })
  }
});
```
matchesSelector
Evaluates whether a document matches a Mango query selector.
Signature:
matchesSelector(document: RxDocumentData<RxDocType>, selector: MangoQuerySelector<RxDocType>): boolean
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| document | RxDocumentData<RxDocType> | Yes | The document to test |
| selector | MangoQuerySelector<RxDocType> | Yes | The Mango query selector |
Returns:
boolean - true if document matches selector
Examples:
const matches = matchesSelector(
  { id: '1', age: 25, name: 'Alice' },
  { age: { $gt: 18 }, name: { $regex: '^A' } }
);
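To make the selector semantics concrete, here is a minimal matcher covering a small subset of Mango operators. It is a sketch only: the operator subset, the comparison rules, and the lack of nested field paths are simplifications chosen for the example, and the names `matchesSelectorSketch`, `matchesCondition`, and `compareValues` are hypothetical.

```typescript
type PlainDoc = Record<string, unknown>;
type SelectorSketch = Record<string, unknown>;

// Numbers compare numerically, everything else by string value (a simplification).
function compareValues(a: unknown, b: unknown): number {
  if (typeof a === 'number' && typeof b === 'number') return a - b;
  const left = String(a);
  const right = String(b);
  return left < right ? -1 : left > right ? 1 : 0;
}

function matchesCondition(value: unknown, condition: unknown): boolean {
  // A primitive condition is shorthand for equality.
  if (condition === null || typeof condition !== 'object' || Array.isArray(condition)) {
    return value === condition;
  }
  // All operators on one field must hold.
  return Object.entries(condition as Record<string, unknown>).every(([operator, operand]) => {
    switch (operator) {
      case '$eq': return value === operand;
      case '$ne': return value !== operand;
      case '$gt': return value != null && compareValues(value, operand) > 0;
      case '$gte': return value != null && compareValues(value, operand) >= 0;
      case '$lt': return value != null && compareValues(value, operand) < 0;
      case '$lte': return value != null && compareValues(value, operand) <= 0;
      case '$in': return Array.isArray(operand) && operand.includes(value);
      case '$regex': {
        const pattern = operand instanceof RegExp ? operand : new RegExp(String(operand));
        return typeof value === 'string' && pattern.test(value);
      }
      default: throw new Error(`Unsupported operator in sketch: ${operator}`);
    }
  });
}

function matchesSelectorSketch(doc: PlainDoc, selector: SelectorSketch): boolean {
  // Top-level keys are combined with an implicit AND.
  return Object.entries(selector).every(([key, condition]) => {
    if (key === '$and') return (condition as SelectorSketch[]).every((s) => matchesSelectorSketch(doc, s));
    if (key === '$or') return (condition as SelectorSketch[]).some((s) => matchesSelectorSketch(doc, s));
    return matchesCondition(doc[key], condition);
  });
}

const alice = { id: '1', age: 25, name: 'Alice' };
const adultNamedA = matchesSelectorSketch(alice, { age: { $gt: 18 }, name: { $regex: '^A' } });
const olderThan30 = matchesSelectorSketch(alice, { age: { $gt: 30 } });
const minorOrAlice = matchesSelectorSketch(alice, { $or: [{ age: { $lt: 18 } }, { name: 'Alice' }] });
```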
sortDocuments
Sorts an array of documents according to a Mango query sort specification.
Signature:
sortDocuments(documents: RxDocumentData<RxDocType>[], sort?: { [key: string]: "asc" | "desc"; }[] | undefined): void
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| documents | RxDocumentData<RxDocType>[] | Yes | Array of documents to sort (sorted in place) |
| sort | { [key: string]: "asc" \| "desc"; }[] \| undefined | No | Sort specification (array of {field: 'asc' \| 'desc'}) |
Examples:
const docs = [{ age: 30 }, { age: 20 }];
sortDocuments(docs, [{ age: 'asc' }]);
// docs is now [{ age: 20 }, { age: 30 }]
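An in-place, multi-key sort of this shape can be sketched as below. The ordering of missing values (sorted first) and the string fallback for non-numeric values are assumptions made for the example; the actual comparison rules are not documented here.

```typescript
type SortSpecSketch = Array<Record<string, 'asc' | 'desc'>>;

// Assumption: null/undefined sort before everything else.
function compareFieldValues(a: unknown, b: unknown): number {
  const aMissing = a === undefined || a === null;
  const bMissing = b === undefined || b === null;
  if (aMissing || bMissing) return aMissing === bMissing ? 0 : aMissing ? -1 : 1;
  if (typeof a === 'number' && typeof b === 'number') return a - b;
  const left = String(a);
  const right = String(b);
  return left < right ? -1 : left > right ? 1 : 0;
}

// Sorts the array in place, like the documented function; returns nothing.
function sortDocumentsSketch<T extends Record<string, unknown>>(documents: T[], sort?: SortSpecSketch): void {
  if (!sort || sort.length === 0) return;
  documents.sort((a, b) => {
    // Earlier entries in the specification take precedence; later ones break ties.
    for (const part of sort) {
      for (const [field, direction] of Object.entries(part)) {
        const order = compareFieldValues(a[field], b[field]);
        if (order !== 0) return direction === 'desc' ? -order : order;
      }
    }
    return 0;
  });
}

const people = [
  { name: 'b', age: 30 },
  { name: 'a', age: 30 },
  { name: 'c', age: 20 },
];
sortDocumentsSketch(people, [{ age: 'asc' }, { name: 'desc' }]);
const sortedNames = people.map((p) => p.name);
```

Because the sort mutates its input, callers that need the original order should copy the array first.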
executeQuery
Executes a full Mango query against an array of documents.
This function:
- Filters documents using the selector
- Sorts results
- Applies skip and limit
Signature:
executeQuery(documents: RxDocumentData<RxDocType>[], query: MangoQuery<RxDocType>): RxDocumentData<RxDocType>[]
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| documents | RxDocumentData<RxDocType>[] | Yes | Array of documents to query |
| query | MangoQuery<RxDocType> | Yes | Mango query object |
Returns:
RxDocumentData<RxDocType>[] - Filtered and sorted documents
Examples:
const results = executeQuery(allDocs, {
  selector: { age: { $gt: 18 } },
  sort: [{ name: 'asc' }],
  limit: 10,
  skip: 5
});
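The order of the three steps matters: skip and limit apply to the filtered, sorted list, not to the raw input. The sketch below shows that pipeline with the selector and sort specification replaced by plain callbacks to keep it short; `runQueryPipeline` and `PipelineOptions` are hypothetical names, not part of the package.

```typescript
interface PipelineOptions<T> {
  matches: (doc: T) => boolean;       // stands in for selector matching
  compare?: (a: T, b: T) => number;   // stands in for the sort specification
  skip?: number;
  limit?: number;
}

function runQueryPipeline<T>(documents: readonly T[], options: PipelineOptions<T>): T[] {
  const { matches, compare, skip = 0, limit } = options;
  // 1. Filter (produces a new array, so the input is not mutated).
  const filtered = documents.filter((doc) => matches(doc));
  // 2. Sort the filtered copy.
  if (compare) filtered.sort(compare);
  // 3. Apply skip and limit last, as a window over the sorted results.
  const end = limit === undefined ? undefined : skip + limit;
  return filtered.slice(skip, end);
}

const allPeople = [
  { name: 'Cy', age: 40 },
  { name: 'Al', age: 17 },
  { name: 'Bo', age: 25 },
  { name: 'Di', age: 30 },
];
const page = runQueryPipeline(allPeople, {
  matches: (p) => p.age > 18,
  compare: (a, b) => a.name.localeCompare(b.name),
  skip: 1,
  limit: 1,
});
```

Here the adults sort to Bo, Cy, Di, so skipping one and taking one yields only Cy; applying the window before filtering or sorting would give a different page.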
countDocuments
Counts documents matching a Mango query selector.
Signature:
countDocuments(documents: RxDocumentData<RxDocType>[], selector: MangoQuerySelector<RxDocType>): number
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| documents | RxDocumentData<RxDocType>[] | Yes | Array of documents to query |
| selector | MangoQuerySelector<RxDocType> | Yes | Mango query selector |
Returns:
number - Number of matching documents
Examples:
const count = countDocuments(allDocs, { age: { $gt: 18 } });