Watch
in package
implements Executable, CommandSubscriber

Operation for creating a change stream with the aggregate command.

Note: the implementation of CommandSubscriber is an internal implementation detail and should not be considered part of the public API.
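
As a rough illustration of how this operation is normally reached, the sketch below opens a change stream through Collection::watch() instead of constructing Watch directly. It assumes the class is MongoDB\Operation\Watch from the mongodb/mongodb library, that Composer's autoloader has already been loaded, and that the deployment supports change streams. The function name watchUpdates, its parameters, and the 500 ms wait are hypothetical choices made for the example.

```php
<?php

use MongoDB\Collection;
use MongoDB\Operation\Watch;

/**
 * Collects up to $maxEvents change events from a collection.
 *
 * Hypothetical helper for illustration; not part of the library.
 */
function watchUpdates(Collection $collection, int $maxEvents, int $maxPolls = 100): array
{
    // Ask the server to include the current document for update events.
    $changeStream = $collection->watch([], [
        'fullDocument' => Watch::FULL_DOCUMENT_UPDATE_LOOKUP,
        'maxAwaitTimeMS' => 500,
    ]);

    $events = [];

    // rewind() runs the initial aggregate; next() polls the server again.
    $changeStream->rewind();

    for ($polls = 0; $polls < $maxPolls && count($events) < $maxEvents; $polls++) {
        if ($changeStream->valid()) {
            $events[] = $changeStream->current();
        }

        $changeStream->next();
    }

    return $events;
}
```

The poll limit is there only so the sketch terminates when no events arrive; a long-running consumer would typically loop until it decides to stop.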

Tags
see
Collection::watch()
see
https://docs.mongodb.com/manual/changeStreams/

Interfaces, Classes, Traits and Enums

Executable
CommandSubscriber

Table of Contents

FULL_DOCUMENT_DEFAULT  = 'default'
FULL_DOCUMENT_UPDATE_LOOKUP  = 'updateLookup'
$aggregate  : Aggregate
$aggregateOptions  : array<string|int, mixed>
$changeStreamOptions  : array<string|int, mixed>
$collectionName  : string|null
$databaseName  : string
$firstBatchSize  : int|null
$hasResumed  : bool
$manager  : Manager
$operationTime  : TimestampInterface
$pipeline  : array<string|int, mixed>
$postBatchResumeToken  : object|null
$wireVersionForStartAtOperationTime  : int
__construct()  : mixed
Constructs an aggregate command for creating a change stream.
execute()  : ChangeStream
Execute the operation.
createAggregate()  : Aggregate
Create the aggregate command for a change stream.
createChangeStreamIterator()  : ChangeStreamIterator
Create a ChangeStreamIterator by executing the aggregate command.
executeAggregate()  : Cursor
Execute the aggregate command.
getInitialResumeToken()  : array<string|int, mixed>|object|null
Return the initial resume token for creating the ChangeStreamIterator.
resume()  : ChangeStreamIterator
Resumes a change stream.
shouldCaptureOperationTime()  : bool
Determine whether to capture operation time from an aggregate response.

Constants

FULL_DOCUMENT_DEFAULT

public mixed FULL_DOCUMENT_DEFAULT = 'default'

FULL_DOCUMENT_UPDATE_LOOKUP

public mixed FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup'

Properties

$aggregateOptions

private array<string|int, mixed> $aggregateOptions

$changeStreamOptions

private array<string|int, mixed> $changeStreamOptions

$collectionName

private string|null $collectionName

$databaseName

private string $databaseName

$firstBatchSize

private int|null $firstBatchSize

$hasResumed

private bool $hasResumed = false

$manager

private Manager $manager

$operationTime

private TimestampInterface $operationTime

$pipeline

private array<string|int, mixed> $pipeline

$postBatchResumeToken

private object|null $postBatchResumeToken

$wireVersionForStartAtOperationTime

private static int $wireVersionForStartAtOperationTime = 7

Methods

__construct()

Constructs an aggregate command for creating a change stream.

public __construct(Manager $manager, string|null $databaseName, string|null $collectionName, array<string|int, mixed> $pipeline[, array<string|int, mixed> $options = [] ]) : mixed

Supported options:

  • batchSize (integer): The number of documents to return per batch.

  • collation (document): Specifies a collation.

  • fullDocument (string): Determines whether the "fullDocument" field will be populated for update operations. By default, change streams only return the delta of fields during the update operation (via the "updateDescription" field). To additionally return the most current majority-committed version of the updated document, specify "updateLookup" for this option. Defaults to "default".

    Insert and replace operations always include the "fullDocument" field, and delete operations omit it because the document no longer exists.

  • maxAwaitTimeMS (integer): The maximum amount of time, in milliseconds, for the server to wait on new documents to satisfy a change stream query.

  • readConcern (MongoDB\Driver\ReadConcern): Read concern.

  • readPreference (MongoDB\Driver\ReadPreference): Read preference. This will be used to select a new server when resuming. Defaults to a "primary" read preference.

  • resumeAfter (document): Specifies the logical starting point for the new change stream.

    Using this option in conjunction with "startAfter" and/or "startAtOperationTime" will result in a server error. The options are mutually exclusive.

  • session (MongoDB\Driver\Session): Client session.

  • startAfter (document): Specifies the logical starting point for the new change stream. Unlike "resumeAfter", this option can be used with a resume token from an "invalidate" event.

    Using this option in conjunction with "resumeAfter" and/or "startAtOperationTime" will result in a server error. The options are mutually exclusive.

  • startAtOperationTime (MongoDB\BSON\TimestampInterface): If specified, the change stream will only provide changes that occurred at or after the specified timestamp. Any command run against the server will return an operation time that can be used here. Alternatively, an operation time may be obtained from MongoDB\Driver\Server::getInfo().

    Using this option in conjunction with "resumeAfter" and/or "startAfter" will result in a server error. The options are mutually exclusive.

    This option is not supported for server versions earlier than 4.0.

  • typeMap (array): Type map for BSON deserialization. This will be applied to the returned Cursor (it is not sent to the server).

Note: A database-level change stream may be created by specifying null for the collection name. A cluster-level change stream may be created by specifying null for both the database and collection names.
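
The option list describes "resumeAfter", "startAfter", and "startAtOperationTime" as mutually exclusive and says that combining them produces a server error. The following is a minimal sketch of a client-side check that an application could run before building its options array. Both function names are hypothetical, the check is not something this page says the library performs, and the resume token value is a placeholder.

```php
<?php

/**
 * Returns the starting-point options that are present in $options.
 *
 * Hypothetical helper for illustration; not part of the library.
 */
function startingPointOptions(array $options): array
{
    $exclusive = ['resumeAfter', 'startAfter', 'startAtOperationTime'];

    // Keep only the exclusive option names that appear as keys in $options.
    return array_values(array_intersect($exclusive, array_keys($options)));
}

/**
 * Throws if more than one starting-point option is supplied.
 */
function assertSingleStartingPoint(array $options): void
{
    $present = startingPointOptions($options);

    if (count($present) > 1) {
        throw new InvalidArgumentException(
            'Mutually exclusive options given: ' . implode(', ', $present)
        );
    }
}

// Usage: a single starting point passes without an exception.
assertSingleStartingPoint([
    'resumeAfter' => ['_data' => 'placeholder-token'],
    'batchSize' => 10,
]);
```

Failing early in application code gives a clearer message than waiting for the server to reject the command, at the cost of duplicating a rule the server already enforces.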

Parameters
$manager : Manager

Manager instance from the driver

$databaseName : string|null

Database name

$collectionName : string|null

Collection name

$pipeline : array<string|int, mixed>

List of pipeline operations

$options : array<string|int, mixed> = []

Command options

Tags
throws
InvalidArgumentException

for parameter/option parsing errors

Return values
mixed

execute()

Execute the operation.

public execute(Server $server) : ChangeStream
Parameters
$server : Server
Tags
see
Executable::execute()
throws
UnsupportedException

if collation or read concern is used and unsupported

throws
RuntimeException

for other driver errors (e.g. connection errors)

Return values
ChangeStream

createAggregate()

Create the aggregate command for a change stream.

private createAggregate() : Aggregate

This method is also used to recreate the aggregate command when resuming.

Return values
Aggregate

createChangeStreamIterator()

Create a ChangeStreamIterator by executing the aggregate command.

private createChangeStreamIterator(Server $server) : ChangeStreamIterator
Parameters
$server : Server
Return values
ChangeStreamIterator

executeAggregate()

Execute the aggregate command.

private executeAggregate(Server $server) : Cursor

The command is executed with command monitoring (APM) enabled so that data can be captured from its response (e.g. the size of firstBatch and the postBatchResumeToken).

Parameters
$server : Server
Return values
Cursor
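
To make the captured data concrete, here is a small sketch of pulling the first-batch size and the post-batch resume token out of an aggregate reply. It assumes the reply has been decoded into plain objects with a cursor document holding firstBatch and postBatchResumeToken fields. The function name extractCursorMetadata is hypothetical and this is not the library's actual subscriber code.

```php
<?php

/**
 * Extracts the first-batch size and post-batch resume token from a
 * decoded aggregate reply.
 *
 * Hypothetical helper for illustration; not part of the library.
 */
function extractCursorMetadata(object $reply): array
{
    $cursor = $reply->cursor ?? null;

    // Either field may be missing, so default each one to null.
    $firstBatch = is_object($cursor) ? ($cursor->firstBatch ?? null) : null;
    $token = is_object($cursor) ? ($cursor->postBatchResumeToken ?? null) : null;

    return [
        'firstBatchSize' => is_array($firstBatch) ? count($firstBatch) : null,
        'postBatchResumeToken' => is_object($token) ? $token : null,
    ];
}

// Usage with a hand-built reply; the token value is a placeholder.
$reply = (object) [
    'cursor' => (object) [
        'id' => 1,
        'firstBatch' => [],
        'postBatchResumeToken' => (object) ['_data' => 'placeholder-token'],
    ],
];

$meta = extractCursorMetadata($reply);
```

The two returned values line up with the $firstBatchSize (int|null) and $postBatchResumeToken (object|null) properties listed above.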
