RxDB Server
Classes
Classes
RemoteTokenManager
Remote authentication token manager
Manages JWT tokens for authenticating with remote RxDB servers. Provides token validation, expiration checking, and automatic refresh capabilities (refresh implementation is application-specific).
Constructor
constructor(initialToken?: string | undefined)
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
initialToken | string | undefined | No | - Initial JWT token to manage (optional) |
Methods
setToken
Sets a new JWT token
Parses the token, extracts expiration time, and validates the structure. Logs a warning if the token is expired or expires soon.
setToken(token: string): void
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
token | string | Yes | - JWT token string |
Examples:
try {
manager.setToken('eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...')
console.log('Token set successfully')
} catch (error) {
console.error('Invalid token:', error.message)
}
getToken
Gets the current token if valid
getToken(): string | null
Returns:
string \| null - JWT token string, or null if no token is set or token is expired
Examples:
const token = manager.getToken()
if (token) {
// Use token for authentication
fetch(url, {
headers: { 'Authorization': `Bearer ${token}` }
})
} else {
// Token expired or not set - need to refresh
await manager.refreshToken()
}
getPayload
Gets the decoded token payload
getPayload(): JWTPayload | null
Returns:
JWTPayload \| null - Decoded JWT payload, or null if no token is set
Examples:
const payload = manager.getPayload()
if (payload) {
console.log(`Domain: ${payload.domainId}`)
console.log(`Scopes: ${payload.scopes?.join(', ')}`)
console.log(`Expires: ${new Date(payload.exp * 1000).toISOString()}`)
}
isValid
Checks if the current token is valid (set and not expired)
isValid(): boolean
Returns:
boolean - True if token exists and is not expired
Examples:
if (manager.isValid()) {
console.log('Token is valid')
} else {
console.log('Token expired or not set')
await refreshToken()
}
getTimeToExpiry
Gets the number of milliseconds until token expiration
getTimeToExpiry(): number | null
Returns:
number \| null - Milliseconds until expiration, or null if no expiration is set
Examples:
const ttl = manager.getTimeToExpiry()
if (ttl === null) {
console.log('Token has no expiration')
} else if (ttl < 0) {
console.log('Token is expired')
} else {
console.log(`Token expires in ${Math.floor(ttl / 1000 / 60)} minutes`)
}
expiresWithin
Checks if token will expire within the specified time
expiresWithin(milliseconds: number): boolean
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
milliseconds | number | Yes | - Time window to check (in milliseconds) |
Returns:
boolean - True if token will expire within the specified time
Examples:
// Check if token expires in next 5 minutes
if (manager.expiresWithin(5 * 60 * 1000)) {
console.log('Token expires soon - should refresh')
await manager.refreshToken()
}
refreshToken
Refreshes the token using a refresh endpoint
refreshToken(refreshEndpoint: string, refreshToken?: string | undefined): Promise<boolean>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
refreshEndpoint | string | Yes | - URL of the token refresh endpoint |
refreshToken | string | undefined | No | - Refresh token to use (optional) |
Returns:
Promise<boolean> - Promise resolving to true if refresh succeeded
Examples:
// Custom refresh implementation
class CustomTokenManager extends RemoteTokenManager {
async refreshToken(endpoint: string, refreshToken: string) {
const response = await fetch(endpoint, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ refresh_token: refreshToken })
})
if (response.ok) {
const data = await response.json()
this.setToken(data.access_token)
return true
}
return false
}
}
clearToken
Clears the current token
clearToken(): void
Examples:
// On logout
manager.clearToken()
console.log('Token cleared')
Examples:
const tokenManager = new RemoteTokenManager('<JWT_TOKEN>')
// Check if token is valid
if (tokenManager.isValid()) {
const token = tokenManager.getToken()
// Use token for authentication
} else {
console.error('Token expired or invalid')
// Refresh token or re-authenticate
}
// Get token details
const payload = tokenManager.getPayload()
console.log(`Token expires at: ${new Date(payload.exp * 1000)}`)
AuthValidator
Constructor
constructor(config: AuthValidatorConfig)
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
config | AuthValidatorConfig | Yes |
Methods
validateToken
validateToken(authToken: string): Promise<AuthData>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
authToken | string | Yes |
Returns:
Promise<AuthData> -
RemoteReplicationClient
Remote RxDB server replication client
Manages WebSocket connections to a remote RxDB server and handles bidirectional synchronization for all configured collections. Supports:
- Live replication (real-time sync via WebSocket)
- Periodic polling (interval-based sync)
- Domain-based filtering (multi-tenant isolation)
- Collection filtering (include/exclude specific collections)
- Automatic retry on connection failure
Methods
setupReplication
Sets up remote replication for all applicable collections
Creates WebSocket replication connections for each collection in the database, filtered by the configuration's include/exclude lists if specified.
setupReplication(database: EpicFlowDatabase, config: RemoteReplicationConfig): Promise<void>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
database | EpicFlowDatabase | Yes | - Local RxDB database instance |
config | RemoteReplicationConfig | Yes | - Remote replication configuration |
Returns:
Promise<void> -
Examples:
const config = {
url: 'wss://remote.example.com/ws/replication',
authToken: 'jwt-token',
domainId: 'production',
live: true,
collections: ['tasks', 'projects'], // Optional: only these
excludeCollections: ['audit_logs'] // Optional: exclude these
}
await client.setupReplication(database, config)
getReplicationStatus
Gets the current replication status for all collections
getReplicationStatus(): CollectionReplicationStatus[]
Returns:
CollectionReplicationStatus[] - Array of collection replication statuses
Examples:
const statuses = client.getReplicationStatus()
for (const status of statuses) {
console.log(`${status.collectionName}: ${status.pushed} pushed, ${status.pulled} pulled`)
if (status.lastError) {
console.error(` Error: ${status.lastError}`)
}
}
pause
Pauses replication for all collections
Stops active sync but preserves connection state for later resumption.
pause(): Promise<void>
Returns:
Promise<void> -
Examples:
// Pause during maintenance
await client.pause()
// Do maintenance work...
// Resume
await client.resume()
resume
Resumes replication for all collections
Recreates replication connections after a previous {@link pause} call. This is necessary because pause() calls cancel() which terminally stops replication states.
resume(): Promise<void>
Returns:
Promise<void> -
Examples:
await client.resume()
console.log('Replication resumed')
close
Closes all replication connections
Cancels all active replications and cleans up resources. Should be called during application shutdown.
close(): Promise<void>
Returns:
Promise<void> -
Examples:
// During shutdown
process.on('SIGTERM', async () => {
await client.close()
process.exit(0)
})
isInitialized
Checks if replication is initialized
isInitialized(): boolean
Returns:
boolean - True if replication has been set up
Examples:
const client = new RemoteReplicationClient()
const config = {
url: 'wss://remote-server.example.com/ws/replication',
authToken: '<JWT_TOKEN>',
domainId: 'production',
live: true
}
await client.setupReplication(database, config)
// Get status
const status = client.getReplicationStatus()
console.log(`Syncing ${status.length} collections`)
// Later, close connections
await client.close()
Logger
Constructor
constructor(config?: Partial<LoggerConfig>)
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
config | Partial<LoggerConfig> | No |
Methods
error
error(message: string, args?: any[]): void
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
message | string | Yes | |
args | any[] | No |
warn
warn(message: string, args?: any[]): void
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
message | string | Yes | |
args | any[] | No |
info
info(message: string, args?: any[]): void
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
message | string | Yes | |
args | any[] | No |
debug
debug(message: string, args?: any[]): void
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
message | string | Yes | |
args | any[] | No |
child
child(prefix: string): Logger
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
prefix | string | Yes |
Returns:
Logger -
ChangeTracker
Manages change tracking for a storage instance.
Responsibilities:
- Track document changes with sequence numbers
- Emit change events through Observable stream
- Support checkpoint-based pagination for replication
- Generate EventBulk objects for RxDB compatibility
Constructor
constructor(databaseName: string, collectionName: string, primaryPath: string)
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
databaseName | string | Yes | |
collectionName | string | Yes | |
primaryPath | string | Yes |
Properties
| Property | Type | Required | Description |
|---|---|---|---|
changes$ | Observable<EventBulk<RxStorageChangeEvent<RxDocumentData<RxDocType>>, any>> | Yes | Observable stream of change events. |
Methods
recordChange
Records a document change and emits it to subscribers.
recordChange(change: DocumentChange<RxDocType>): void
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
change | DocumentChange<RxDocType> | Yes | - The document change to record |
Examples:
tracker.recordChange({
document: { id: '123', name: 'Alice' },
metadata: { sequence: 42, lastModified: '...', deleted: false },
operation: 'INSERT'
});
recordBatchChanges
Records multiple document changes in a batch and emits a single event bulk.
recordBatchChanges(changes: DocumentChange<RxDocType>[]): void
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
changes | DocumentChange<RxDocType>[] | Yes | - Array of document changes |
getChangedDocumentsSince
Retrieves changes since a checkpoint.
Used by RxDB's replication protocol to fetch incremental updates. Returns changes in ascending sequence order.
getChangedDocumentsSince(checkpoint: Checkpoint | undefined, limit: number, allDocuments: Map<string, { document: RxDocumentData<RxDocType>; metadata: DocumentMetadata; }>): { documents: Array<RxDocumentData<RxDocType>>; checkpoint: Checkpoint | null; }
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
checkpoint | Checkpoint | undefined | Yes | - Starting checkpoint (exclusive) |
limit | number | Yes | - Maximum number of changes to return |
allDocuments | Map<string, { document: RxDocumentData<RxDocType>; metadata: DocumentMetadata; }> | Yes | - All documents in the collection (for full scan if needed) |
Returns:
{ documents: Array<RxDocumentData<RxDocType>>; checkpoint: Checkpoint \| null; } - Array of changes and the new checkpoint
Examples:
const result = tracker.getChangedDocumentsSince(
{ sequence: 100, id: 'doc-100' },
50,
allDocs
);
// Returns up to 50 changes after sequence 100
clearBuffer
Clears the change buffer.
Used during cleanup operations or when buffer needs to be reset.
clearBuffer(): void
close
Completes the change stream and prevents further emissions.
Call this when the storage instance is closed.
close(): void
FilesystemStorageInstance
Filesystem-based RxStorageInstance implementation.
Stores each document as a separate JSON file on disk with atomic writes and proper locking. Supports real-time change streams, checkpoints, and multi-instance coordination.
Constructor
constructor(params: RxStorageInstanceCreationParams<RxDocType, FilesystemStorageOptions>)
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
params | RxStorageInstanceCreationParams<RxDocType, FilesystemStorageOptions> | Yes | - Instance creation parameters from RxDB |
Properties
| Property | Type | Required | Description |
|---|---|---|---|
primaryPath | StringKeys<RxDocumentData<RxDocType>> | Yes | |
schema | any | Yes | |
internals | Readonly<FilesystemStorageOptions> | Yes | |
options | Readonly<FilesystemStorageOptions> | Yes | |
databaseName | string | Yes | |
collectionName | string | Yes |
Methods
bulkWrite
Writes multiple documents atomically with conflict detection.
bulkWrite(documentWrites: BulkWriteRow<RxDocType>[], context: string): Promise<RxStorageBulkWriteResponse<RxDocType>>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
documentWrites | BulkWriteRow<RxDocType>[] | Yes | - Array of documents to write with previous revisions |
context | string | Yes | - Context string for debugging |
Returns:
Promise<RxStorageBulkWriteResponse<RxDocType>> - Result with successful writes and conflicts
Examples:
const result = await instance.bulkWrite([
{
document: { id: '1', name: 'Alice', _rev: '2-abc', _deleted: false },
previous: { id: '1', name: 'Bob', _rev: '1-xyz', _deleted: false }
}
], 'replication');
findDocumentsById
Finds documents by their primary keys.
findDocumentsById(ids: string[], withDeleted: boolean): Promise<RxDocumentData<RxDocType>[]>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
ids | string[] | Yes | - Array of primary key values |
withDeleted | boolean | Yes |
Returns:
Promise<RxDocumentData<RxDocType>[]> - Array of document data
Examples:
const docs = await instance.findDocumentsById(['1', '2', '3'], false);
query
Executes a Mango query against the collection.
query(preparedQuery: PreparedQuery<RxDocType>): Promise<RxStorageQueryResult<RxDocType>>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
preparedQuery | PreparedQuery<RxDocType> | Yes | - The prepared query to execute |
Returns:
Promise<RxStorageQueryResult<RxDocType>> - Query results
Examples:
const results = await instance.query(preparedQuery);
count
Counts documents matching a query selector.
count(preparedQuery: PreparedQuery<RxDocType>): Promise<RxStorageCountResult>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
preparedQuery | PreparedQuery<RxDocType> | Yes | - The prepared query with selector |
Returns:
Promise<RxStorageCountResult> - Count result
getAttachmentData
Retrieves attachment data for a document.
Note: This implementation stores documents only, not separate attachments. Attachments can be stored as base64 data within documents.
getAttachmentData(documentId: string, attachmentId: string, digest: string): Promise<string>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
documentId | string | Yes | - Document ID |
attachmentId | string | Yes | - Attachment ID |
digest | string | Yes |
Returns:
Promise<string> - Attachment data as string
getChangedDocumentsSince
Retrieves documents changed since a checkpoint.
Used by replication protocol for incremental sync.
getChangedDocumentsSince(limit: number, checkpoint?: RxStorageDefaultCheckpoint | undefined): Promise<{ documents: Array<RxDocumentData<RxDocType>>; checkpoint: RxStorageDefaultCheckpoint; }>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
limit | number | Yes | - Maximum number of documents to return |
checkpoint | RxStorageDefaultCheckpoint | undefined | No | - Starting checkpoint (exclusive) |
Returns:
Promise<{ documents: Array<RxDocumentData<RxDocType>>; checkpoint: RxStorageDefaultCheckpoint; }> - Changed documents and new checkpoint
Examples:
const result = await instance.getChangedDocumentsSince(100, checkpoint);
// Get next batch
const next = await instance.getChangedDocumentsSince(100, result.checkpoint);
changeStream
Observable stream of change events.
Emits EventBulk objects whenever documents are modified.
changeStream(): Observable<EventBulk<RxStorageChangeEvent<RxDocType>, any>>
Returns:
Observable<EventBulk<RxStorageChangeEvent<RxDocType>, any>> - Observable of change events
cleanup
Removes deleted documents (tombstones) older than a minimum deleted time.
cleanup(minDeletedTime: number): Promise<boolean>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
minDeletedTime | number | Yes | - Minimum deleted timestamp (delete older tombstones) |
Returns:
Promise<boolean> - Cleanup result
close
Closes the storage instance and releases resources.
close(): Promise<void>
Returns:
Promise<void> - Cleanup promise
remove
Removes the entire collection from disk.
remove(): Promise<void>
Returns:
Promise<void> - Removal promise
MultiInstanceCoordinator
Manages cross-process communication via the filesystem.
Events are written to .events/ directory and polled by other processes. Each event file contains document IDs and sequence ranges affected.
Constructor
constructor(collectionPath: string)
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
collectionPath | string | Yes |
Methods
broadcastChange
Broadcasts a change event to other processes.
Writes an event file that other processes will discover via polling.
broadcastChange(documentIds: string[], sequenceRange: { start: number; end: number; }): void
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
documentIds | string[] | Yes | - IDs of documents that were modified |
sequenceRange | { start: number; end: number; } | Yes | - Sequence number range affected |
Examples:
coordinator.broadcastChange(
['doc-1', 'doc-2'],
{ start: 100, end: 101 }
);
startPolling
Starts polling for changes from other processes.
startPolling(callback: (documentIds: string[]) => void, intervalMs?: number): void
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
callback | (documentIds: string[]) => void | Yes | - Function to call when external changes are detected |
intervalMs | number | No | - Polling interval in milliseconds (default: 1000ms) |
Examples:
coordinator.startPolling((documentIds) => {
console.log('External changes:', documentIds);
// Reload affected documents from disk
}, 1000);
stopPolling
Stops polling for changes.
stopPolling(): void
close
Cleans up resources.
Call this when the storage instance is closed.
close(): void
FilesystemStorage
Filesystem-based RxStorage implementation.
Stores documents as individual JSON files on disk with support for:
- Atomic writes with file locking
- Real-time change streams
- Checkpoint-based replication
- Multi-instance coordination (cross-process sync)
- In-memory caching with LRU eviction
Constructor
constructor(options: FilesystemStorageOptions)
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
options | FilesystemStorageOptions | Yes | - Filesystem storage options |
Properties
| Property | Type | Required | Description |
|---|---|---|---|
name | "filesystem" | Yes | Storage name identifier. |
rxdbVersion | "16.19.1" | Yes | RxDB version this storage is compatible with. |
options | FilesystemStorageOptions | Yes | Configuration options for this storage instance. |
Methods
createStorageInstance
Creates a storage instance for a collection.
This method is called by RxDB when creating a collection. Each collection gets its own storage instance with isolated data.
createStorageInstance(params: RxStorageInstanceCreationParams<RxDocType, FilesystemStorageOptions>): Promise<FilesystemStorageInstance<RxDocType>>
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
params | RxStorageInstanceCreationParams<RxDocType, FilesystemStorageOptions> | Yes | - Creation parameters from RxDB |
Returns:
Promise<FilesystemStorageInstance<RxDocType>> - Storage instance for the collection
Examples:
// Called internally by RxDB:
const instance = await storage.createStorageInstance({
databaseName: 'mydb',
collectionName: 'users',
schema: userSchema,
options: { basePath: './data' },
multiInstance: true
});