Sync instances
async AsyncCogniteClient.data_modeling.instances.sync(query: QuerySync, include_typing: bool = False, debug: DebugParameters | None = None)
Subscription to changes for nodes/edges.
Subscribe to changes for nodes and edges in a project, matching a supplied filter.
Note
Each result set expression accepts a sync_mode controlling how the initial data is loaded. The default, "two_phase", runs a backfill phase followed by live updates and is recommended for queries with custom filters; the backfill phase's filters and sort must be backed by a cursorable index. Use "one_phase" when fetching all instances (or all in specific spaces) for better throughput, or "no_backfill" to skip the initial load and only receive live updates.
When using "two_phase", the backfill phase is over when the number of instances returned is smaller than the limit (including 0).
- Parameters:
query (QuerySync) – The query for instances.
include_typing (bool) – Return property type information as part of the result.
debug (DebugParameters | None) – Debug settings for profiling and troubleshooting.
- Returns:
The resulting instances from the query.
- Return type:
Examples
Sync all assets in a given space using one-phase mode (recommended for full fetches):
>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> from cognite.client.data_classes.data_modeling.query import (
...     QuerySync,
...     SelectSync,
...     NodeResultSetExpressionSync,
...     SourceSelector,
... )
>>> from cognite.client.data_classes.filters import SpaceFilter
>>> from cognite.client.data_classes.data_modeling.ids import ViewId
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient()  # another option
>>> asset_view = ViewId("mySpace", "myAssets", "v1")
>>> query = QuerySync(
...     with_={
...         "pumps": NodeResultSetExpressionSync(
...             filter=SpaceFilter("mySpace"),
...             sync_mode="one_phase",
...         ),
...     },
...     select={"pumps": SelectSync([SourceSelector(asset_view, ["*"])])},
... )
>>> res = client.data_modeling.instances.sync(query)
>>> # Later: keep up with changes by following the cursor:
>>> query.cursors = res.cursors
>>> res_new = client.data_modeling.instances.sync(query)
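The note on "two_phase" says the backfill phase is over once a response holds fewer instances than the limit (including 0). A minimal sketch of that stopping rule, written against a stand-in pager rather than the SDK: `fetch_page`, `drain_backfill` and `make_fake_source` are hypothetical names, and a single result set with a single string cursor is assumed. In real use, `fetch_page` would presumably wrap a call to `instances.sync(query)` and copy `res.cursors` back onto the query, as in the example above.

```python
from typing import Callable, Optional

# One page of results: (instances, cursor to pass to the next call).
Page = tuple[list[str], Optional[str]]


def drain_backfill(
    fetch_page: Callable[[Optional[str]], Page], limit: int
) -> tuple[list[str], Optional[str]]:
    """Follow the cursor until a page holds fewer than `limit` items."""
    items: list[str] = []
    cursor: Optional[str] = None
    while True:
        page, cursor = fetch_page(cursor)
        items.extend(page)
        # A short (or empty) page marks the end of the backfill phase.
        if len(page) < limit:
            return items, cursor


def make_fake_source(data: list[str], limit: int) -> Callable[[Optional[str]], Page]:
    """Hypothetical in-memory pager; the cursor is an offset stored as a string."""

    def fetch_page(cursor: Optional[str]) -> Page:
        start = int(cursor) if cursor is not None else 0
        page = data[start : start + limit]
        return page, str(start + len(page))

    return fetch_page


data = [f"pump-{i}" for i in range(7)]
backfilled, cursor = drain_backfill(make_fake_source(data, limit=3), limit=3)
```

The returned cursor is the one to keep for later calls, which would then pick up live updates only. Note that when the total is an exact multiple of the limit, one extra call returning 0 instances is needed before the backfill can be considered done.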
Sync all pumps connected to open work orders:
>>> from cognite.client.data_classes.data_modeling.query import (
...     EdgeResultSetExpressionSync,
... )
>>> from cognite.client.data_classes.filters import Equals
>>> work_order_view = ViewId("mySpace", "myWorkOrder", "v1")
>>> pump_view = ViewId("mySpace", "myPump", "v1")
>>> query = QuerySync(
...     with_={
...         "work_orders": NodeResultSetExpressionSync(
...             filter=Equals(work_order_view.as_property_ref("status"), "open"),
...             sync_mode="two_phase",
...         ),
...         "work_orders_to_pumps": EdgeResultSetExpressionSync(
...             from_="work_orders",
...             filter=Equals(
...                 ["edge", "type"],
...                 {"space": work_order_view.space, "externalId": "WorkOrder.pump"},
...             ),
...             sync_mode="two_phase",
...         ),
...         "pumps": NodeResultSetExpressionSync(
...             from_="work_orders_to_pumps",
...             sync_mode="two_phase",
...         ),
...     },
...     select={
...         "pumps": SelectSync([SourceSelector(pump_view, ["*"])]),
...     },
... )
>>> res = client.data_modeling.instances.sync(query)
To debug and/or profile your query, you can use the debug parameter:
>>> from cognite.client.data_classes.data_modeling.debug import DebugParameters
>>> debug_params = DebugParameters(
...     emit_results=False,
...     include_plan=True,  # Include the postgres execution plan
...     include_translated_query=True,  # Include the internal representation of the query.
...     profile=True,
... )
>>> res = client.data_modeling.instances.sync(query, debug=debug_params)
>>> print(res.debug)