Sync With File Cache

async AsyncCogniteClient.data_modeling.instances.sync_with_file_cache(
query: QuerySync,
*,
file_external_id: str,
security_category: int,
backup_every: timedelta | None = timedelta(seconds=900),
backup_on_exit: bool = False,
) → SyncSessionWithCache

Create a managed sync session with persistent backup to a CDF file.

Returns a SyncSessionWithCache that you use as an async context manager. On entry the session downloads the backup file from CDF and restores the instance data and previous cursor positions if the query hash matches, allowing you to immediately continue syncing instances from where your last session left off (no need for a full backfill).
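The restore-or-backfill decision hinges on that query hash. A minimal sketch of the idea, assuming a stable hash over the query's serialized form (the SDK's actual serialization and hash algorithm are internal details; all names here are illustrative):

```python
import hashlib
import json

def query_hash(query_dump: dict) -> str:
    # Stable hash over a canonical JSON representation of the query,
    # so key order does not affect the result.
    canonical = json.dumps(query_dump, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode()).hexdigest()

def should_restore(backup_hash: str, query_dump: dict) -> bool:
    # Restore cached instances and cursors only when the query is unchanged;
    # any change to the query means a full backfill is required.
    return backup_hash == query_hash(query_dump)
```

This is why editing the query between sessions (including its flags) discards the cached state and triggers a fresh backfill.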

We require a security category for the file to prevent users who lack access to the underlying instances from bypassing those restrictions by reading the backup file directly.

Note

The underlying /sync endpoint (which this session invokes) must be called frequently enough to keep cursors alive. Cursors expire after 3 days (the soft-delete retention period). If a cursor expires, the session raises an error; call await session.invalidate() followed by await session.sync_until_live() to start fresh from a full backfill.

If you genuinely don't care about missing deleted instances, QuerySync accepts allow_expired_cursors_and_accept_missed_deletes=True, which lets you continue from an expired cursor. Be careful: you cannot change this setting mid-session, as the hash of your query would no longer match the cached state and a full backfill would be triggered.

Warning

This functionality is in alpha and only the async context manager interface is currently available. The API and behaviour may change without notice.

Parameters:
  • query (QuerySync) – The sync query.

  • file_external_id (str) – External ID of the CDF file used as the durable backup store. The file is created automatically on the first backup if it does not yet exist.

  • security_category (int) – The security category to apply to the backup file. The file is created (and re-uploaded on each backup) with this security category set, preventing users who lack access to the underlying instances from reading the backup directly.

  • backup_every (timedelta | None) – How often to upload state to CDF while the session is active. If None, periodic backups are disabled; combine with backup_on_exit=True to upload only on context-manager exit.

  • backup_on_exit (bool) – Whether to upload state to CDF on context-manager exit.
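The backup_every cadence behaves like a timer loop that uploads state until the session ends. A minimal sketch of that pattern (periodic_backup, upload, and the stop event are hypothetical stand-ins for the session's internals, not SDK API):

```python
import asyncio
from datetime import timedelta
from typing import Callable

async def periodic_backup(
    upload: Callable[[], None], every: timedelta, stop: asyncio.Event
) -> int:
    """Upload state on a fixed cadence until `stop` is set; returns the backup count."""
    count = 0
    while not stop.is_set():
        try:
            # Sleep for one interval, but wake immediately if the session ends
            await asyncio.wait_for(stop.wait(), timeout=every.total_seconds())
        except asyncio.TimeoutError:
            upload()
            count += 1
    return count
```

Structuring the wait around the stop event (rather than a bare sleep) lets the session exit promptly instead of blocking for up to a full interval.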

Raises:
  • ValueError – If query already has cursors set (cursors are managed internally).

  • ValueError – If any result-set expression referenced in select does not have an explicit limit set.

  • ValueError – If the given security_category does not exist in this project.

  • ValueError – If backup_every is set to a value smaller than 1 minute.

Returns:

The context manager for managing the sync session.

Return type:

SyncSessionWithCache

Examples

One-off job that loads state from the Files API, syncs until it has caught up with all live changes, does some work that requires large amounts of instance data, and then backs up its progress on exit (from the context manager):

>>> import asyncio
>>> from datetime import timedelta
>>> from cognite.client import AsyncCogniteClient
>>> from cognite.client.data_classes.data_modeling import NodeList
>>> from cognite.client.data_classes.data_modeling.query import (
...     QuerySync,
...     SelectSync,
...     NodeResultSetExpressionSync,
... )
>>> client = AsyncCogniteClient()
>>> query = QuerySync(
...     with_={"my_nodes": NodeResultSetExpressionSync(limit=1000)},
...     select={"my_nodes": SelectSync()},
... )
>>> session = client.data_modeling.instances.sync_with_file_cache(
...     query,
...     file_external_id="my_backup_file",
...     security_category=123,
...     backup_every=None,  # Only backup on exit
...     backup_on_exit=True,
... )
>>> def do_work(nodes: NodeList) -> None:
...     print(len(nodes))  # ¯\_(ツ)_/¯
>>>
>>> async with session:  
...     await session.sync_until_live()
...     do_work(session.get_nodes("my_nodes"))

Longer-running job with periodic backups, e.g. regularly computing statistics for a dashboard:

>>> session = client.data_modeling.instances.sync_with_file_cache(
...     query,
...     file_external_id="my_backup_file",
...     security_category=123,
...     backup_every=timedelta(minutes=15),
... )
>>> async with session:  
...     while True:
...         await session.sync_until_live()
...         do_work(session.get_nodes("my_nodes"))
...         await asyncio.sleep(60)