Subscribe

async AsyncCogniteClient.data_modeling.instances.subscribe(
query: QuerySync,
callback: Callable[[QueryResult], None | Awaitable[None]],
poll_delay_seconds: float = 30,
throttle_seconds: float = 1,
) SubscriptionContext

Subscribe to a query and get updates when the result set changes.

This runs the sync() method in a background task. We do not support chaining result sets when subscribing to a query.

Tip

For a practical guide on using this method to create a live local replica of your data, see this example of syncing instances to a local SQLite database.

Parameters:
  • query (QuerySync) – The query to subscribe to.

  • callback (Callable[[QueryResult], None | Awaitable[None]]) – The callback function to call when the result set changes. Can be a regular or async function.

  • poll_delay_seconds (float) – The time to wait between polls when no data is present. Defaults to 30 seconds.

  • throttle_seconds (float) – The time to wait between polls despite data being present.

Returns:

An object that can be used to inspect and cancel the subscription.

Return type:

SubscriptionContext

Examples

Subscribe to a given query and process the results in your own callback function (here we just print the result for illustration):

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.data_modeling.query import (
...     QuerySync,
...     QueryResult,
...     NodeResultSetExpressionSync,
...     SelectSync,
...     SourceSelector,
... )
>>> from cognite.client.data_classes.data_modeling import ViewId
>>> from cognite.client.data_classes.filters import Equals
>>>
>>> client = CogniteClient()
>>> def just_print_the_result(result: QueryResult) -> None:
...     print(result)
>>>
>>> view_id = ViewId("someSpace", "someView", "v1")
>>> filter = Equals(view_id.as_property_ref("myAsset"), "Il-Tempo-Gigante")
>>> query = QuerySync(
...     with_={"work_orders": NodeResultSetExpressionSync(filter=filter)},
...     select={"work_orders": SelectSync([SourceSelector(view_id, ["*"])])},
... )
>>> subscription_context = client.data_modeling.instances.subscribe(
...     query, callback=just_print_the_result
... )
>>> # Use the returned subscription_context to manage the subscription, e.g. to cancel it:
>>> subscription_context.cancel()