Watch
in package
implements
Executable, CommandSubscriber
Operation for creating a change stream with the aggregate command.
Note: the implementation of CommandSubscriber is an internal implementation detail and should not be considered part of the public API.
Tags
Interfaces, Classes, Traits and Enums
- Executable
- CommandSubscriber
Table of Contents
- FULL_DOCUMENT_DEFAULT = 'default'
- FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup'
- $aggregate : Aggregate
- $aggregateOptions : array<string|int, mixed>
- $changeStreamOptions : array<string|int, mixed>
- $collectionName : string|null
- $databaseName : string
- $firstBatchSize : int|null
- $hasResumed : bool
- $manager : Manager
- $operationTime : TimestampInterface
- $pipeline : array<string|int, mixed>
- $postBatchResumeToken : object|null
- $wireVersionForStartAtOperationTime : int
- __construct() : mixed
- Constructs an aggregate command for creating a change stream.
- execute() : ChangeStream
- Execute the operation.
- createAggregate() : Aggregate
- Create the aggregate command for a change stream.
- createChangeStreamIterator() : ChangeStreamIterator
- Create a ChangeStreamIterator by executing the aggregate command.
- executeAggregate() : Cursor
- Execute the aggregate command.
- getInitialResumeToken() : array<string|int, mixed>|object|null
- Return the initial resume token for creating the ChangeStreamIterator.
- resume() : ChangeStreamIterator
- Resumes a change stream.
- shouldCaptureOperationTime() : bool
- Determine whether to capture operation time from an aggregate response.
Constants
FULL_DOCUMENT_DEFAULT
public
mixed
FULL_DOCUMENT_DEFAULT
= 'default'
FULL_DOCUMENT_UPDATE_LOOKUP
public
mixed
FULL_DOCUMENT_UPDATE_LOOKUP
= 'updateLookup'
Properties
$aggregate
private
Aggregate
$aggregate
$aggregateOptions
private
array<string|int, mixed>
$aggregateOptions
$changeStreamOptions
private
array<string|int, mixed>
$changeStreamOptions
$collectionName
private
string|null
$collectionName
$databaseName
private
string
$databaseName
$firstBatchSize
private
int|null
$firstBatchSize
$hasResumed
private
bool
$hasResumed
= false
$manager
private
Manager
$manager
$operationTime
private
TimestampInterface
$operationTime
$pipeline
private
array<string|int, mixed>
$pipeline
$postBatchResumeToken
private
object|null
$postBatchResumeToken
$wireVersionForStartAtOperationTime
private
static int
$wireVersionForStartAtOperationTime
= 7
Methods
__construct()
Constructs an aggregate command for creating a change stream.
public
__construct(Manager $manager, string|null $databaseName, string|null $collectionName, array<string|int, mixed> $pipeline[, array<string|int, mixed> $options = [] ]) : mixed
Supported options:
-
batchSize (integer): The number of documents to return per batch.
-
collation (document): Specifies a collation.
-
fullDocument (string): Determines whether the "fullDocument" field will be populated for update operations. By default, change streams only return the delta of fields during the update operation (via the "updateDescription" field). To additionally return the most current majority-committed version of the updated document, specify "updateLookup" for this option. Defaults to "default".
Insert and replace operations always include the "fullDocument" field and delete operations omit the field as the document no longer exists.
-
maxAwaitTimeMS (integer): The maximum amount of time for the server to wait on new documents to satisfy a change stream query.
-
readConcern (MongoDB\Driver\ReadConcern): Read concern.
-
readPreference (MongoDB\Driver\ReadPreference): Read preference. This will be used to select a new server when resuming. Defaults to a "primary" read preference.
-
resumeAfter (document): Specifies the logical starting point for the new change stream.
Using this option in conjunction with "startAfter" and/or "startAtOperationTime" will result in a server error. The options are mutually exclusive.
-
session (MongoDB\Driver\Session): Client session.
-
startAfter (document): Specifies the logical starting point for the new change stream. Unlike "resumeAfter", this option can be used with a resume token from an "invalidate" event.
Using this option in conjunction with "resumeAfter" and/or "startAtOperationTime" will result in a server error. The options are mutually exclusive.
-
startAtOperationTime (MongoDB\BSON\TimestampInterface): If specified, the change stream will only provide changes that occurred at or after the specified timestamp. Any command run against the server will return an operation time that can be used here. Alternatively, an operation time may be obtained from MongoDB\Driver\Server::getInfo().
Using this option in conjunction with "resumeAfter" and/or "startAfter" will result in a server error. The options are mutually exclusive.
This option is not supported for server versions < 4.0.
-
typeMap (array): Type map for BSON deserialization. This will be applied to the returned Cursor (it is not sent to the server).
Note: A database-level change stream may be created by specifying null for the collection name. A cluster-level change stream may be created by specifying null for both the database and collection name.
Parameters
- $manager : Manager
-
Manager instance from the driver
- $databaseName : string|null
-
Database name
- $collectionName : string|null
-
Collection name
- $pipeline : array<string|int, mixed>
-
List of pipeline operations
- $options : array<string|int, mixed> = []
-
Command options
Tags
Return values
mixed —execute()
Execute the operation.
public
execute(Server $server) : ChangeStream
Parameters
- $server : Server
Tags
Return values
ChangeStream —createAggregate()
Create the aggregate command for a change stream.
private
createAggregate() : Aggregate
This method is also used to recreate the aggregate command when resuming.
Return values
Aggregate —createChangeStreamIterator()
Create a ChangeStreamIterator by executing the aggregate command.
private
createChangeStreamIterator(Server $server) : ChangeStreamIterator
Parameters
- $server : Server
Return values
ChangeStreamIterator —executeAggregate()
Execute the aggregate command.
private
executeAggregate(Server $server) : Cursor
The command will be executed using APM so that we can capture data from its response (e.g. firstBatch size, postBatchResumeToken).
Parameters
- $server : Server
Return values
Cursor —getInitialResumeToken()
Return the initial resume token for creating the ChangeStreamIterator.
private
getInitialResumeToken() : array<string|int, mixed>|object|null
Tags
Return values
array<string|int, mixed>|object|null —resume()
Resumes a change stream.
private
resume([array<string|int, mixed>|object|null $resumeToken = null ][, bool $hasAdvanced = false ]) : ChangeStreamIterator
Parameters
- $resumeToken : array<string|int, mixed>|object|null = null
- $hasAdvanced : bool = false
Tags
Return values
ChangeStreamIterator —shouldCaptureOperationTime()
Determine whether to capture operation time from an aggregate response.
private
shouldCaptureOperationTime(Server $server) : bool
Parameters
- $server : Server