Sync instances

async AsyncCogniteClient.data_modeling.instances.sync(
query: QuerySync,
include_typing: bool = False,
debug: DebugParameters | None = None,
) QueryResult

Subscription to changes for nodes/edges.

Subscribe to changes for nodes and edges in a project, matching a supplied filter.

Note

Each result set expression accepts a sync_mode controlling how the initial data is loaded. The default, "two_phase", runs a backfill phase followed by live updates and is recommended for queries with custom filters; the backfill phase’s filters and sort must be backed by a cursorable index. Use "one_phase" when fetching all instances (or all in specific spaces) for better throughput, or "no_backfill" to skip the initial load and only receive live updates.

When using "two_phase", the backfill phase is over when the number of instances returned is smaller than the limit (including 0).

Parameters:
  • query (QuerySync) – The query for instances.

  • include_typing (bool) – Return property type information as part of the result.

  • debug (DebugParameters | None) – Debug settings for profiling and troubleshooting.

Returns:

The resulting instances from the query.

Return type:

QueryResult

Examples

Sync all assets in a given space using one-phase mode (recommended for full fetches):

>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> from cognite.client.data_classes.data_modeling.query import (
...     QuerySync,
...     SelectSync,
...     NodeResultSetExpressionSync,
...     SourceSelector,
... )
>>> from cognite.client.data_classes.filters import SpaceFilter
>>> from cognite.client.data_classes.data_modeling.ids import ViewId
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient()  # another option
>>> asset_view = ViewId("mySpace", "myAssets", "v1")
>>> query = QuerySync(
...     with_={
...         "pumps": NodeResultSetExpressionSync(
...             filter=SpaceFilter("mySpace"),
...             sync_mode="one_phase",
...         ),
...     },
...     select={"pumps": SelectSync([SourceSelector(asset_view, ["*"])])},
... )
>>> res = client.data_modeling.instances.sync(query)
>>> # Later: keep up with changes by following the cursor:
>>> query.cursors = res.cursors
>>> res_new = client.data_modeling.instances.sync(query)

Sync all pumps connected to open work orders:

>>> from cognite.client.data_classes.data_modeling.query import (
...     EdgeResultSetExpressionSync,
... )
>>> from cognite.client.data_classes.filters import Equals
>>> work_order_view = ViewId("mySpace", "myWorkOrder", "v1")
>>> pump_view = ViewId("mySpace", "myPump", "v1")
>>> query = QuerySync(
...     with_={
...         "work_orders": NodeResultSetExpressionSync(
...             filter=Equals(work_order_view.as_property_ref("status"), "open"),
...             sync_mode="two_phase",
...         ),
...         "work_orders_to_pumps": EdgeResultSetExpressionSync(
...             from_="work_orders",
...             filter=Equals(
...                 ["edge", "type"],
...                 {"space": work_order_view.space, "externalId": "WorkOrder.pump"},
...             ),
...             sync_mode="two_phase",
...         ),
...         "pumps": NodeResultSetExpressionSync(
...             from_="work_orders_to_pumps",
...             sync_mode="two_phase",
...         ),
...     },
...     select={
...         "pumps": SelectSync([SourceSelector(pump_view, ["*"])]),
...     },
... )
>>> res = client.data_modeling.instances.sync(query)

To debug and/or profile your query, you can use the debug parameter:

>>> from cognite.client.data_classes.data_modeling.debug import DebugParameters
>>> debug_params = DebugParameters(
...     emit_results=False,
...     include_plan=True,  # Include the postgres execution plan
...     include_translated_query=True,  # Include the internal representation of the query.
...     profile=True,
... )
>>> res = client.data_modeling.instances.sync(query, debug=debug_params)
>>> print(res.debug)