Subscribe
- async AsyncCogniteClient.data_modeling.instances.subscribe(
- query: QuerySync,
- callback: Callable[[QueryResult], None | Awaitable[None]],
- poll_delay_seconds: float = 30,
- throttle_seconds: float = 1,
Subscribe to a query and get updates when the result set changes.
This runs the sync() method in a background task. We do not support chaining result sets when subscribing to a query.
Tip
For a practical guide on using this method to create a live local replica of your data, see this example of syncing instances to a local SQLite database.
- Parameters:
query (QuerySync) – The query to subscribe to.
callback (Callable[[QueryResult], None | Awaitable[None]]) – The callback function to call when the result set changes. Can be a regular or async function.
poll_delay_seconds (float) – The time to wait between polls when no data is present. Defaults to 30 seconds.
throttle_seconds (float) – The time to wait between polls despite data being present.
- Returns:
An object that can be used to inspect and cancel the subscription.
- Return type:
SubscriptionContext
Examples
Subscribe to a given query and process the results in your own callback function (here we just print the result for illustration):
>>> from cognite.client import CogniteClient >>> from cognite.client.data_classes.data_modeling.query import ( ... QuerySync, ... QueryResult, ... NodeResultSetExpressionSync, ... SelectSync, ... SourceSelector, ... ) >>> from cognite.client.data_classes.data_modeling import ViewId >>> from cognite.client.data_classes.filters import Equals >>> >>> client = CogniteClient() >>> def just_print_the_result(result: QueryResult) -> None: ... print(result) >>> >>> view_id = ViewId("someSpace", "someView", "v1") >>> filter = Equals(view_id.as_property_ref("myAsset"), "Il-Tempo-Gigante") >>> query = QuerySync( ... with_={"work_orders": NodeResultSetExpressionSync(filter=filter)}, ... select={"work_orders": SelectSync([SourceSelector(view_id, ["*"])])}, ... ) >>> subscription_context = client.data_modeling.instances.subscribe( ... query, callback=just_print_the_result ... ) >>> # Use the returned subscription_context to manage the subscription, e.g. to cancel it: >>> subscription_context.cancel()