RxDB Server

Classes

RemoteTokenManager

Remote authentication token manager

Manages JWT tokens for authenticating with remote RxDB servers. Provides token validation, expiration checking, and automatic refresh capabilities (refresh implementation is application-specific).

Constructor

constructor(initialToken?: string | undefined)

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| initialToken | string \| undefined | No | Initial JWT token to manage (optional) |

Methods

setToken

Sets a new JWT token

Parses the token, extracts expiration time, and validates the structure. Logs a warning if the token is expired or expires soon.

setToken(token: string): void

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| token | string | Yes | JWT token string |

Examples:

try {
  manager.setToken('eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...')
  console.log('Token set successfully')
} catch (error) {
  console.error('Invalid token:', error.message)
}
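
The parsing step described for setToken can be illustrated with a minimal sketch. It assumes a standard three-segment JWT whose claims are ASCII, and it does not verify the signature. The names `SketchPayload`, `decodeJwtPayload`, and `expiryMs` are hypothetical and are not part of RemoteTokenManager.

```typescript
// Hypothetical payload shape for illustration; the real JWTPayload type may differ.
interface SketchPayload {
  exp?: number // expiry in seconds since the Unix epoch (standard JWT claim)
  [claim: string]: unknown
}

// Decode the payload segment of a JWT WITHOUT verifying its signature.
function decodeJwtPayload(token: string): SketchPayload {
  const parts = token.split('.')
  if (parts.length !== 3) {
    throw new Error('Invalid JWT: expected header.payload.signature')
  }
  // JWT segments are base64url; convert to standard base64 and restore padding
  const base64 = parts[1].replace(/-/g, '+').replace(/_/g, '/')
  const padded = base64 + '='.repeat((4 - (base64.length % 4)) % 4)
  const payload: unknown = JSON.parse(atob(padded))
  if (typeof payload !== 'object' || payload === null) {
    throw new Error('Invalid JWT: payload is not a JSON object')
  }
  return payload as SketchPayload
}

// Convert the `exp` claim to milliseconds, or null when the claim is absent.
function expiryMs(payload: SketchPayload): number | null {
  return typeof payload.exp === 'number' ? payload.exp * 1000 : null
}
```

Decoding alone proves nothing about authenticity; signature checks belong to the server-side validator.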

getToken

Gets the current token if valid

getToken(): string | null

Returns:

string | null - JWT token string, or null if no token is set or token is expired

Examples:

const token = manager.getToken()

if (token) {
  // Use token for authentication
  fetch(url, {
    headers: { 'Authorization': `Bearer ${token}` }
  })
} else {
  // Token expired or not set - need to refresh
  await manager.refreshToken(refreshEndpoint) // refreshEndpoint: URL of your token refresh endpoint
}

getPayload

Gets the decoded token payload

getPayload(): JWTPayload | null

Returns:

JWTPayload | null - Decoded JWT payload, or null if no token is set

Examples:

const payload = manager.getPayload()

if (payload) {
  console.log(`Domain: ${payload.domainId}`)
  console.log(`Scopes: ${payload.scopes?.join(', ')}`)
  console.log(`Expires: ${new Date(payload.exp * 1000).toISOString()}`)
}

isValid

Checks if the current token is valid (set and not expired)

isValid(): boolean

Returns:

boolean - True if token exists and is not expired

Examples:

if (manager.isValid()) {
  console.log('Token is valid')
} else {
  console.log('Token expired or not set')
  await manager.refreshToken(refreshEndpoint) // refreshEndpoint: URL of your token refresh endpoint
}

getTimeToExpiry

Gets the number of milliseconds until token expiration

getTimeToExpiry(): number | null

Returns:

number | null - Milliseconds until expiration, or null if no expiration is set

Examples:

const ttl = manager.getTimeToExpiry()

if (ttl === null) {
  console.log('Token has no expiration')
} else if (ttl < 0) {
  console.log('Token is expired')
} else {
  console.log(`Token expires in ${Math.floor(ttl / 1000 / 60)} minutes`)
}

expiresWithin

Checks if token will expire within the specified time

expiresWithin(milliseconds: number): boolean

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| milliseconds | number | Yes | Time window to check (in milliseconds) |

Returns:

boolean - True if token will expire within the specified time

Examples:

// Check if token expires in next 5 minutes
if (manager.expiresWithin(5 * 60 * 1000)) {
  console.log('Token expires soon - should refresh')
  await manager.refreshToken(refreshEndpoint) // refreshEndpoint: URL of your token refresh endpoint
}
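
The arithmetic behind getTimeToExpiry and expiresWithin can be sketched as standalone functions. This is an illustration only: the functions below are hypothetical, they take the `exp` claim (seconds since the Unix epoch) directly, and treating a token without `exp` as never expiring is an assumption.

```typescript
// Milliseconds until expiry, or null when the token carries no `exp` claim.
// A negative result means the token has already expired.
function timeToExpiry(expSeconds: number | undefined, nowMs: number = Date.now()): number | null {
  if (expSeconds === undefined) return null
  return expSeconds * 1000 - nowMs
}

// True when the token expires within `windowMs` (or already has).
// Assumption: a token without `exp` is treated as never expiring.
function expiresWithin(
  expSeconds: number | undefined,
  windowMs: number,
  nowMs: number = Date.now()
): boolean {
  const ttl = timeToExpiry(expSeconds, nowMs)
  return ttl !== null && ttl <= windowMs
}
```

Passing `nowMs` explicitly keeps the functions deterministic, which makes them easy to test.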

refreshToken

Refreshes the token using a refresh endpoint

refreshToken(refreshEndpoint: string, refreshToken?: string | undefined): Promise<boolean>

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| refreshEndpoint | string | Yes | URL of the token refresh endpoint |
| refreshToken | string \| undefined | No | Refresh token to use (optional) |

Returns:

Promise<boolean> - Promise resolving to true if refresh succeeded

Examples:

// Custom refresh implementation
class CustomTokenManager extends RemoteTokenManager {
  async refreshToken(endpoint: string, refreshToken?: string): Promise<boolean> {
    const response = await fetch(endpoint, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ refresh_token: refreshToken })
    })

    if (response.ok) {
      const data = await response.json()
      this.setToken(data.access_token)
      return true
    }

    return false
  }
}
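
The methods above can be combined into a "refresh before use" helper. The sketch below is hypothetical: `TokenManagerLike` is a structural stand-in that lists only the documented method signatures, and `ensureFreshToken` is not part of the library.

```typescript
// Structural view of the RemoteTokenManager methods used here.
interface TokenManagerLike {
  getToken(): string | null
  expiresWithin(milliseconds: number): boolean
  refreshToken(refreshEndpoint: string, refreshToken?: string): Promise<boolean>
}

// Hypothetical helper: return a usable token, refreshing first when the
// current one is missing or about to expire.
async function ensureFreshToken(
  manager: TokenManagerLike,
  refreshEndpoint: string,
  windowMs: number = 5 * 60 * 1000
): Promise<string | null> {
  const current = manager.getToken()
  if (current !== null && !manager.expiresWithin(windowMs)) {
    return current // still valid with enough time left
  }
  const refreshed = await manager.refreshToken(refreshEndpoint)
  return refreshed ? manager.getToken() : null
}
```

A caller would invoke this before each authenticated request and fall back to re-authentication when it returns null.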

clearToken

Clears the current token

clearToken(): void

Examples:

// On logout
manager.clearToken()
console.log('Token cleared')

Examples:

const tokenManager = new RemoteTokenManager('<JWT_TOKEN>')

// Check if token is valid
if (tokenManager.isValid()) {
  const token = tokenManager.getToken()
  // Use token for authentication
} else {
  console.error('Token expired or invalid')
  // Refresh token or re-authenticate
}

// Get token details (getPayload() returns null when no token is set)
const payload = tokenManager.getPayload()
if (payload) {
  console.log(`Token expires at: ${new Date(payload.exp * 1000)}`)
}

AuthValidator

Constructor

constructor(config: AuthValidatorConfig)

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| config | AuthValidatorConfig | Yes | |

Methods

validateToken

validateToken(authToken: string): Promise<AuthData>

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| authToken | string | Yes | |

Returns:

Promise<AuthData>

RemoteReplicationClient

Remote RxDB server replication client

Manages WebSocket connections to a remote RxDB server and handles bidirectional synchronization for all configured collections. Supports:

  • Live replication (real-time sync via WebSocket)
  • Periodic polling (interval-based sync)
  • Domain-based filtering (multi-tenant isolation)
  • Collection filtering (include/exclude specific collections)
  • Automatic retry on connection failure

Methods

setupReplication

Sets up remote replication for all applicable collections

Creates WebSocket replication connections for each collection in the database, filtered by the configuration's include/exclude lists if specified.

setupReplication(database: EpicFlowDatabase, config: RemoteReplicationConfig): Promise<void>

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| database | EpicFlowDatabase | Yes | Local RxDB database instance |
| config | RemoteReplicationConfig | Yes | Remote replication configuration |

Returns:

Promise<void>

Examples:

const config = {
  url: 'wss://remote.example.com/ws/replication',
  authToken: 'jwt-token',
  domainId: 'production',
  live: true,
  collections: ['tasks', 'projects'], // Optional: only these
  excludeCollections: ['audit_logs']   // Optional: exclude these
}

await client.setupReplication(database, config)
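
The include/exclude filtering mentioned above might look like the following sketch. The helper `selectCollections` is hypothetical. It assumes that an absent or empty include list means "all collections" and that the exclude list is applied after the include list; the actual precedence is not stated in this reference.

```typescript
// Hypothetical helper: decide which collections to replicate.
// Assumptions: an absent or empty `include` list means "all collections",
// and `exclude` is applied after `include`.
function selectCollections(
  available: string[],
  include?: string[],
  exclude: string[] = []
): string[] {
  const included = include && include.length > 0
    ? available.filter((name) => include.includes(name))
    : available
  return included.filter((name) => !exclude.includes(name))
}
```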

getReplicationStatus

Gets the current replication status for all collections

getReplicationStatus(): CollectionReplicationStatus[]

Returns:

CollectionReplicationStatus[] - Array of collection replication statuses

Examples:

const statuses = client.getReplicationStatus()

for (const status of statuses) {
  console.log(`${status.collectionName}: ${status.pushed} pushed, ${status.pulled} pulled`)
  if (status.lastError) {
    console.error(`  Error: ${status.lastError}`)
  }
}

pause

Pauses replication for all collections

Stops active sync. The underlying replication states are cancelled, so call resume() to recreate the connections later.

pause(): Promise<void>

Returns:

Promise<void>

Examples:

// Pause during maintenance
await client.pause()

// Do maintenance work...

// Resume
await client.resume()

resume

Resumes replication for all collections

Recreates replication connections after a previous pause() call. This is necessary because pause() calls cancel(), which terminally stops the replication states.

resume(): Promise<void>

Returns:

Promise<void>

Examples:

await client.resume()
console.log('Replication resumed')
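
Why resume() has to recreate connections can be shown with a toy model. Everything below is hypothetical (`SketchReplication`, `SketchClient`, and their methods); it only illustrates the pattern of remembering what was set up so that cancelled states can be replaced rather than restarted.

```typescript
// Stand-in for a replication state that cannot be restarted once cancelled.
class SketchReplication {
  collectionName: string
  cancelled = false
  constructor(collectionName: string) {
    this.collectionName = collectionName
  }
  async cancel(): Promise<void> {
    this.cancelled = true
  }
}

// Hypothetical client: it remembers which collections were set up so that
// resume() can build fresh replication states after pause() cancelled the old ones.
class SketchClient {
  private collectionNames: string[] = []
  private active = new Map<string, SketchReplication>()

  async setup(collectionNames: string[]): Promise<void> {
    this.collectionNames = [...collectionNames]
    await this.resume()
  }

  async pause(): Promise<void> {
    await Promise.all(Array.from(this.active.values()).map((r) => r.cancel()))
    this.active.clear() // cancelled states are discarded, not reused
  }

  async resume(): Promise<void> {
    for (const name of this.collectionNames) {
      if (!this.active.has(name)) {
        this.active.set(name, new SketchReplication(name))
      }
    }
  }

  activeCount(): number {
    return this.active.size
  }
}
```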

close

Closes all replication connections

Cancels all active replications and cleans up resources. Should be called during application shutdown.

close(): Promise<void>

Returns:

Promise<void>

Examples:

// During shutdown
process.on('SIGTERM', async () => {
  await client.close()
  process.exit(0)
})

isInitialized

Checks if replication is initialized

isInitialized(): boolean

Returns:

boolean - True if replication has been set up

Examples:

const client = new RemoteReplicationClient()

const config = {
  url: 'wss://remote-server.example.com/ws/replication',
  authToken: '<JWT_TOKEN>',
  domainId: 'production',
  live: true
}

await client.setupReplication(database, config)

// Get status
const status = client.getReplicationStatus()
console.log(`Syncing ${status.length} collections`)

// Later, close connections
await client.close()

Logger

Constructor

constructor(config?: Partial<LoggerConfig>)

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| config | Partial<LoggerConfig> | No | |

Methods

error

error(message: string, args?: any[]): void

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| message | string | Yes | |
| args | any[] | No | |

warn

warn(message: string, args?: any[]): void

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| message | string | Yes | |
| args | any[] | No | |

info

info(message: string, args?: any[]): void

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| message | string | Yes | |
| args | any[] | No | |

debug

debug(message: string, args?: any[]): void

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| message | string | Yes | |
| args | any[] | No | |

child

child(prefix: string): Logger

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| prefix | string | Yes | |

Returns:

Logger

ChangeTracker

Manages change tracking for a storage instance.

Responsibilities:

  • Track document changes with sequence numbers
  • Emit change events through Observable stream
  • Support checkpoint-based pagination for replication
  • Generate EventBulk objects for RxDB compatibility

Constructor

constructor(databaseName: string, collectionName: string, primaryPath: string)

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| databaseName | string | Yes | |
| collectionName | string | Yes | |
| primaryPath | string | Yes | |

Properties

| Property | Type | Required | Description |
| --- | --- | --- | --- |
| changes$ | Observable<EventBulk<RxStorageChangeEvent<RxDocumentData<RxDocType>>, any>> | Yes | Observable stream of change events. |

Methods

recordChange

Records a document change and emits it to subscribers.

recordChange(change: DocumentChange<RxDocType>): void

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| change | DocumentChange<RxDocType> | Yes | The document change to record |

Examples:

tracker.recordChange({
  document: { id: '123', name: 'Alice' },
  metadata: { sequence: 42, lastModified: '...', deleted: false },
  operation: 'INSERT'
});

recordBatchChanges

Records multiple document changes in a batch and emits a single event bulk.

recordBatchChanges(changes: DocumentChange<RxDocType>[]): void

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| changes | DocumentChange<RxDocType>[] | Yes | Array of document changes |

getChangedDocumentsSince

Retrieves changes since a checkpoint.

Used by RxDB's replication protocol to fetch incremental updates. Returns changes in ascending sequence order.

getChangedDocumentsSince(checkpoint: Checkpoint | undefined, limit: number, allDocuments: Map<string, { document: RxDocumentData<RxDocType>; metadata: DocumentMetadata; }>): { documents: Array<RxDocumentData<RxDocType>>; checkpoint: Checkpoint | null; }

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| checkpoint | Checkpoint \| undefined | Yes | Starting checkpoint (exclusive) |
| limit | number | Yes | Maximum number of changes to return |
| allDocuments | Map<string, { document: RxDocumentData<RxDocType>; metadata: DocumentMetadata; }> | Yes | All documents in the collection (for full scan if needed) |

Returns:

{ documents: Array<RxDocumentData<RxDocType>>; checkpoint: Checkpoint | null; } - Array of changes and the new checkpoint

Examples:

const result = tracker.getChangedDocumentsSince(
  { sequence: 100, id: 'doc-100' },
  50,
  allDocs
);
// Returns up to 50 changes after sequence 100
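
Checkpoint-based pagination as described here can be sketched over a plain array of changes. The shapes `SketchCheckpoint` and `SketchChange` and the function `changesSince` are hypothetical, modelled on the example above. Returning the caller's checkpoint when nothing changed is an assumption.

```typescript
// Hypothetical shapes modelled on the example above.
interface SketchCheckpoint { sequence: number; id: string }
interface SketchChange { id: string; sequence: number }

// Return up to `limit` changes strictly after `checkpoint`, in ascending
// sequence order, together with the checkpoint to pass on the next call.
function changesSince(
  changes: SketchChange[],
  checkpoint: SketchCheckpoint | undefined,
  limit: number
): { changes: SketchChange[]; checkpoint: SketchCheckpoint | null } {
  const after = checkpoint ? checkpoint.sequence : -Infinity
  const page = changes
    .filter((c) => c.sequence > after) // exclusive lower bound
    .sort((a, b) => a.sequence - b.sequence)
    .slice(0, limit)
  const last = page[page.length - 1]
  return {
    changes: page,
    // Assumption: when nothing changed, hand back the caller's checkpoint (or null)
    checkpoint: last ? { sequence: last.sequence, id: last.id } : (checkpoint ?? null)
  }
}
```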

clearBuffer

Clears the change buffer.

Used during cleanup operations or when buffer needs to be reset.

clearBuffer(): void

close

Completes the change stream and prevents further emissions.

Call this when the storage instance is closed.

close(): void

FilesystemStorageInstance

Filesystem-based RxStorageInstance implementation.

Stores each document as a separate JSON file on disk with atomic writes and proper locking. Supports real-time change streams, checkpoints, and multi-instance coordination.
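
A common way to get atomic file replacement is to write to a temporary sibling file and then rename it over the target. Whether FilesystemStorageInstance does exactly this is not stated here, so treat the following as a sketch of the general technique. The helper names are hypothetical, and locking is deliberately left out.

```typescript
import { mkdirSync, mkdtempSync, readdirSync, readFileSync, renameSync, writeFileSync } from 'node:fs'
import { tmpdir } from 'node:os'
import { dirname, join } from 'node:path'

// Write JSON so that readers see either the old file or the new one, never a
// partial write: write to a temporary sibling, then rename over the target.
function writeJsonAtomic(filePath: string, data: unknown): void {
  mkdirSync(dirname(filePath), { recursive: true })
  const tmpPath = `${filePath}.${process.pid}.${Date.now()}.tmp`
  writeFileSync(tmpPath, JSON.stringify(data), 'utf8')
  renameSync(tmpPath, filePath) // atomic on POSIX when both paths share a filesystem
}

function readJson<T>(filePath: string): T {
  return JSON.parse(readFileSync(filePath, 'utf8')) as T
}

// Usage: one file per document inside a temporary directory
const dir = mkdtempSync(join(tmpdir(), 'atomic-sketch-'))
const docPath = join(dir, 'users', 'doc-1.json')
writeJsonAtomic(docPath, { id: 'doc-1', name: 'Alice' })
writeJsonAtomic(docPath, { id: 'doc-1', name: 'Bob' }) // replaces the file in one step
const stored = readJson<{ id: string; name: string }>(docPath)
const leftovers = readdirSync(dirname(docPath)).filter((f) => f.endsWith('.tmp'))
```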

Constructor

constructor(params: RxStorageInstanceCreationParams<RxDocType, FilesystemStorageOptions>)

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| params | RxStorageInstanceCreationParams<RxDocType, FilesystemStorageOptions> | Yes | Instance creation parameters from RxDB |

Properties

| Property | Type | Required | Description |
| --- | --- | --- | --- |
| primaryPath | StringKeys<RxDocumentData<RxDocType>> | Yes | |
| schema | any | Yes | |
| internals | Readonly<FilesystemStorageOptions> | Yes | |
| options | Readonly<FilesystemStorageOptions> | Yes | |
| databaseName | string | Yes | |
| collectionName | string | Yes | |

Methods

bulkWrite

Writes multiple documents atomically with conflict detection.

bulkWrite(documentWrites: BulkWriteRow<RxDocType>[], context: string): Promise<RxStorageBulkWriteResponse<RxDocType>>

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| documentWrites | BulkWriteRow<RxDocType>[] | Yes | Array of documents to write with previous revisions |
| context | string | Yes | Context string for debugging |

Returns:

Promise<RxStorageBulkWriteResponse<RxDocType>> - Result with successful writes and conflicts

Examples:

const result = await instance.bulkWrite([
  {
    document: { id: '1', name: 'Alice', _rev: '2-abc', _deleted: false },
    previous: { id: '1', name: 'Bob', _rev: '1-xyz', _deleted: false }
  }
], 'replication');
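
Revision-based conflict detection can be sketched in memory. The shapes and the function `detectConflicts` below are hypothetical simplifications. The sketch assumes a write succeeds only when `previous._rev` matches the stored revision, or when the document is new and no `previous` is given; the real implementation may apply additional rules.

```typescript
// Hypothetical minimal shapes; real RxDB rows carry more fields.
interface SketchDoc { id: string; _rev: string; _deleted: boolean; [key: string]: unknown }
interface SketchWriteRow { document: SketchDoc; previous?: SketchDoc }

// Classify each write as a success or a conflict and apply the successes.
function detectConflicts(
  stored: Map<string, SketchDoc>,
  rows: SketchWriteRow[]
): { success: SketchDoc[]; conflicts: SketchDoc[] } {
  const success: SketchDoc[] = []
  const conflicts: SketchDoc[] = []
  for (const row of rows) {
    const current = stored.get(row.document.id)
    const isInsert = current === undefined && row.previous === undefined
    const isUpdate =
      current !== undefined &&
      row.previous !== undefined &&
      current._rev === row.previous._rev
    if (isInsert || isUpdate) {
      stored.set(row.document.id, row.document)
      success.push(row.document)
    } else {
      conflicts.push(row.document) // the writer's view of the document is stale
    }
  }
  return { success, conflicts }
}
```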

findDocumentsById

Finds documents by their primary keys.

findDocumentsById(ids: string[], withDeleted: boolean): Promise<RxDocumentData<RxDocType>[]>

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| ids | string[] | Yes | Array of primary key values |
| withDeleted | boolean | Yes | |

Returns:

Promise<RxDocumentData<RxDocType>[]> - Array of document data

Examples:

const docs = await instance.findDocumentsById(['1', '2', '3'], false);

query

Executes a Mango query against the collection.

query(preparedQuery: PreparedQuery<RxDocType>): Promise<RxStorageQueryResult<RxDocType>>

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| preparedQuery | PreparedQuery<RxDocType> | Yes | The prepared query to execute |

Returns:

Promise<RxStorageQueryResult<RxDocType>> - Query results

Examples:

const results = await instance.query(preparedQuery);

count

Counts documents matching a query selector.

count(preparedQuery: PreparedQuery<RxDocType>): Promise<RxStorageCountResult>

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| preparedQuery | PreparedQuery<RxDocType> | Yes | The prepared query with selector |

Returns:

Promise<RxStorageCountResult> - Count result

getAttachmentData

Retrieves attachment data for a document.

Note: This implementation stores documents only, not separate attachments. Attachments can be stored as base64 data within documents.

getAttachmentData(documentId: string, attachmentId: string, digest: string): Promise<string>

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| documentId | string | Yes | Document ID |
| attachmentId | string | Yes | Attachment ID |
| digest | string | Yes | |

Returns:

Promise<string> - Attachment data as string

getChangedDocumentsSince

Retrieves documents changed since a checkpoint.

Used by replication protocol for incremental sync.

getChangedDocumentsSince(limit: number, checkpoint?: RxStorageDefaultCheckpoint | undefined): Promise<{ documents: Array<RxDocumentData<RxDocType>>; checkpoint: RxStorageDefaultCheckpoint; }>

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| limit | number | Yes | Maximum number of documents to return |
| checkpoint | RxStorageDefaultCheckpoint \| undefined | No | Starting checkpoint (exclusive) |

Returns:

Promise<{ documents: Array<RxDocumentData<RxDocType>>; checkpoint: RxStorageDefaultCheckpoint; }> - Changed documents and new checkpoint

Examples:

const result = await instance.getChangedDocumentsSince(100, checkpoint);
// Get next batch
const next = await instance.getChangedDocumentsSince(100, result.checkpoint);
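
The two-call example above generalises to a loop that drains all changes in batches. The sketch below is hypothetical: `ChangeSource` is a structural stand-in for the documented method signature, and stopping when a batch comes back shorter than the batch size is an assumption about how the end of the stream is signalled.

```typescript
// Structural view of the method used here; `Doc` and `Checkpoint` are placeholders.
interface ChangeSource<Doc, Checkpoint> {
  getChangedDocumentsSince(
    limit: number,
    checkpoint?: Checkpoint
  ): Promise<{ documents: Doc[]; checkpoint: Checkpoint }>
}

// Pull every change in batches of `batchSize`.
// Assumption: a batch shorter than `batchSize` means the source is drained.
async function pullAllChanges<Doc, Checkpoint>(
  source: ChangeSource<Doc, Checkpoint>,
  batchSize: number,
  start?: Checkpoint
): Promise<{ documents: Doc[]; checkpoint: Checkpoint | undefined }> {
  if (batchSize < 1) throw new Error('batchSize must be at least 1')
  const documents: Doc[] = []
  let checkpoint = start
  while (true) {
    const batch = await source.getChangedDocumentsSince(batchSize, checkpoint)
    documents.push(...batch.documents)
    checkpoint = batch.checkpoint // feed the returned checkpoint into the next call
    if (batch.documents.length < batchSize) break
  }
  return { documents, checkpoint }
}
```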

changeStream

Observable stream of change events.

Emits EventBulk objects whenever documents are modified.

changeStream(): Observable<EventBulk<RxStorageChangeEvent<RxDocType>, any>>

Returns:

Observable<EventBulk<RxStorageChangeEvent<RxDocType>, any>> - Observable of change events

cleanup

Removes deleted documents (tombstones) older than a minimum deleted time.

cleanup(minDeletedTime: number): Promise<boolean>

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| minDeletedTime | number | Yes | Minimum deleted timestamp (delete older tombstones) |

Returns:

Promise<boolean> - Cleanup result

close

Closes the storage instance and releases resources.

close(): Promise<void>

Returns:

Promise<void> - Cleanup promise

remove

Removes the entire collection from disk.

remove(): Promise<void>

Returns:

Promise<void> - Removal promise

MultiInstanceCoordinator

Manages cross-process communication via the filesystem.

Events are written to .events/ directory and polled by other processes. Each event file contains document IDs and sequence ranges affected.

Constructor

constructor(collectionPath: string)

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| collectionPath | string | Yes | |

Methods

broadcastChange

Broadcasts a change event to other processes.

Writes an event file that other processes will discover via polling.

broadcastChange(documentIds: string[], sequenceRange: { start: number; end: number; }): void

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| documentIds | string[] | Yes | IDs of documents that were modified |
| sequenceRange | { start: number; end: number; } | Yes | Sequence number range affected |

Examples:

coordinator.broadcastChange(
  ['doc-1', 'doc-2'],
  { start: 100, end: 101 }
);

startPolling

Starts polling for changes from other processes.

startPolling(callback: (documentIds: string[]) => void, intervalMs?: number): void

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| callback | (documentIds: string[]) => void | Yes | Function to call when external changes are detected |
| intervalMs | number | No | Polling interval in milliseconds (default: 1000ms) |

Examples:

coordinator.startPolling((documentIds) => {
  console.log('External changes:', documentIds);
  // Reload affected documents from disk
}, 1000);
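
The event-file mechanism (write a file under .events/, discover it by polling) can be sketched with synchronous filesystem calls. The file naming scheme, the `SketchEvent` shape, and the standalone functions below are assumptions for illustration. A real poller would also run on a timer and skip events written by its own process.

```typescript
import { mkdirSync, mkdtempSync, readdirSync, readFileSync, writeFileSync } from 'node:fs'
import { tmpdir } from 'node:os'
import { join } from 'node:path'

interface SketchEvent {
  documentIds: string[]
  sequenceRange: { start: number; end: number }
}

let eventCounter = 0

// Write one event file into `<collectionPath>/.events/`.
// The file naming scheme is an assumption made for this sketch.
function broadcastChange(collectionPath: string, event: SketchEvent): string {
  const eventsDir = join(collectionPath, '.events')
  mkdirSync(eventsDir, { recursive: true })
  const fileName = `${Date.now()}-${process.pid}-${eventCounter++}.json`
  writeFileSync(join(eventsDir, fileName), JSON.stringify(event), 'utf8')
  return fileName
}

// One polling pass: read event files not seen before and collect their document IDs.
function pollOnce(collectionPath: string, seen: Set<string>): string[] {
  const eventsDir = join(collectionPath, '.events')
  mkdirSync(eventsDir, { recursive: true })
  const documentIds: string[] = []
  for (const fileName of readdirSync(eventsDir).sort()) {
    if (seen.has(fileName)) continue
    seen.add(fileName)
    const event = JSON.parse(readFileSync(join(eventsDir, fileName), 'utf8')) as SketchEvent
    documentIds.push(...event.documentIds)
  }
  return documentIds
}

// Usage: broadcast once, then poll twice
const collectionPath = mkdtempSync(join(tmpdir(), 'events-sketch-'))
const seen = new Set<string>()
broadcastChange(collectionPath, {
  documentIds: ['doc-1', 'doc-2'],
  sequenceRange: { start: 100, end: 101 }
})
const firstPass = pollOnce(collectionPath, seen) // ['doc-1', 'doc-2']
const secondPass = pollOnce(collectionPath, seen) // [] because nothing new arrived
```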

stopPolling

Stops polling for changes.

stopPolling(): void

close

Cleans up resources.

Call this when the storage instance is closed.

close(): void

FilesystemStorage

Filesystem-based RxStorage implementation.

Stores documents as individual JSON files on disk with support for:

  • Atomic writes with file locking
  • Real-time change streams
  • Checkpoint-based replication
  • Multi-instance coordination (cross-process sync)
  • In-memory caching with LRU eviction
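
The LRU eviction named in the last bullet can be sketched with a `Map`, which iterates in insertion order. This is a generic illustration of the technique; the class `LruCache` is hypothetical and says nothing about how FilesystemStorage sizes or keys its cache.

```typescript
// Minimal LRU cache: the least recently used entry is evicted when full.
class LruCache<K, V> {
  private entries = new Map<K, V>()
  private capacity: number

  constructor(capacity: number) {
    if (capacity < 1) throw new Error('capacity must be at least 1')
    this.capacity = capacity
  }

  get(key: K): V | undefined {
    if (!this.entries.has(key)) return undefined
    const value = this.entries.get(key) as V
    this.entries.delete(key)
    this.entries.set(key, value) // re-insert to mark as most recently used
    return value
  }

  set(key: K, value: V): void {
    this.entries.delete(key)
    this.entries.set(key, value)
    if (this.entries.size > this.capacity) {
      // Map iterates in insertion order, so the first key is the least recently used
      const oldest = this.entries.keys().next().value as K
      this.entries.delete(oldest)
    }
  }

  has(key: K): boolean {
    return this.entries.has(key)
  }

  get size(): number {
    return this.entries.size
  }
}
```

For example, with a capacity of 2, setting `a` and `b`, reading `a`, and then setting `c` evicts `b`, because `a` was touched more recently.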

Constructor

constructor(options: FilesystemStorageOptions)

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| options | FilesystemStorageOptions | Yes | Filesystem storage options |

Properties

| Property | Type | Required | Description |
| --- | --- | --- | --- |
| name | "filesystem" | Yes | Storage name identifier. |
| rxdbVersion | "16.19.1" | Yes | RxDB version this storage is compatible with. |
| options | FilesystemStorageOptions | Yes | Configuration options for this storage instance. |

Methods

createStorageInstance

Creates a storage instance for a collection.

This method is called by RxDB when creating a collection. Each collection gets its own storage instance with isolated data.

createStorageInstance(params: RxStorageInstanceCreationParams<RxDocType, FilesystemStorageOptions>): Promise<FilesystemStorageInstance<RxDocType>>

Parameters:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| params | RxStorageInstanceCreationParams<RxDocType, FilesystemStorageOptions> | Yes | Creation parameters from RxDB |

Returns:

Promise<FilesystemStorageInstance<RxDocType>> - Storage instance for the collection

Examples:

// Called internally by RxDB:
const instance = await storage.createStorageInstance({
  databaseName: 'mydb',
  collectionName: 'users',
  schema: userSchema,
  options: { basePath: './data' },
  multiInstance: true
});