Sync With File Cache
async AsyncCogniteClient.data_modeling.instances.sync_with_file_cache(
    query: QuerySync,
    *,
    file_external_id: str,
    security_category: int,
    backup_every: timedelta | None = datetime.timedelta(seconds=900),
    backup_on_exit: bool = False,
)
Create a managed sync session with persistent backup to a CDF file.
Returns a SyncSessionWithCache that you use as an async context manager. On entry, the session downloads the backup file from CDF and restores the instance data and previous cursor positions if the query hash matches, allowing you to immediately continue syncing instances from where your last session left off (no need for a full backfill). We require a security category for the file to prevent users who lack access to the underlying instances from bypassing those restrictions by reading the backup file directly.
Note
This session (or rather /sync) must be invoked frequently enough to keep cursors alive. Cursors expire after 3 days (the soft-delete retention period). If a cursor expires, the session will raise an error; call await session.invalidate() followed by await session.sync_until_live() to start fresh from a full backfill.
If you really don’t care about missing deleted instances, there is allow_expired_cursors_and_accept_missed_deletes=True on QuerySync that will allow you to use an older cursor. Be careful: you cannot change this setting mid-session, as the hash of your query would no longer match the existing one, and a full backfill would be triggered.
Warning
This functionality is in alpha and only the async context manager interface is currently available. The API and behaviour may change without notice.
- Parameters:
query (QuerySync) – The sync query.
file_external_id (str) – External ID of the CDF file used as the durable backup store. The file is created automatically on the first backup if it does not yet exist.
security_category (int) – The security category to apply to the backup file. The file is created (and re-uploaded on each backup) with this security category set, preventing users who lack access to the underlying instances from reading the backup directly.
backup_every (timedelta | None) – How often to upload state to CDF during a session (when active). None uploads only on context-manager exit.
backup_on_exit (bool) – Whether to upload state to CDF on context-manager exit.
- Raises:
ValueError – If query already has cursors set (cursors are managed internally).
ValueError – If any result-set expression referenced in select does not have an explicit limit set.
ValueError – If the given security_category does not exist in this project.
ValueError – If backup_every is set to a value smaller than 1 minute.
- Returns:
The context manager for managing the sync session.
- Return type:
SyncSessionWithCache
Examples
One-off job that loads state from the Files API, syncs until it has caught up with all live changes, then does some work that requires huge amounts of instance data, then backs up the progress on exit (from the context manager):
>>> import asyncio
>>> from cognite.client import AsyncCogniteClient
>>> from cognite.client.data_classes.data_modeling import NodeList
>>> from cognite.client.data_classes.data_modeling.query import (
...     QuerySync,
...     SelectSync,
...     NodeResultSetExpressionSync,
... )
>>> client = AsyncCogniteClient()
>>> query = QuerySync(
...     with_={"my_nodes": NodeResultSetExpressionSync(limit=1000)},
...     select={"my_nodes": SelectSync()},
... )
>>> session = client.data_modeling.instances.sync_with_file_cache(
...     query,
...     file_external_id="my_backup_file",
...     security_category=123,
...     backup_every=None,  # Only back up on exit
...     backup_on_exit=True,
... )
>>> def do_work(nodes: NodeList) -> None:
...     print(len(nodes))  # ¯\_(ツ)_/¯
>>>
>>> async with session:
...     await session.sync_until_live()
...     do_work(session.get_nodes("my_nodes"))
Longer-running job with periodic backups, e.g. regularly computing statistics for a dashboard:
>>> from datetime import timedelta
>>> session = client.data_modeling.instances.sync_with_file_cache(
...     query,
...     file_external_id="my_backup_file",
...     security_category=123,
...     backup_every=timedelta(minutes=15),
... )
>>> async with session:
...     while True:
...         await session.sync_until_live()
...         do_work(session.get_nodes("my_nodes"))
...         await asyncio.sleep(60)
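Recovering after cursor expiry (see the Note above). A hedged sketch, reusing client, query and do_work from the examples above; it assumes the expiry surfaces as a CogniteAPIError, so check which exception your SDK version actually raises here:
>>> from cognite.client.exceptions import CogniteAPIError
>>> session = client.data_modeling.instances.sync_with_file_cache(
...     query,
...     file_external_id="my_backup_file",
...     security_category=123,
... )
>>> async with session:
...     try:
...         await session.sync_until_live()
...     except CogniteAPIError:
...         # Cursor expired (older than 3 days): discard the stale
...         # state and start over from a full backfill.
...         await session.invalidate()
...         await session.sync_until_live()
...     do_work(session.get_nodes("my_nodes"))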