Upsert Trigger

async AsyncCogniteClient.workflows.triggers.upsert(
workflow_trigger: WorkflowTriggerUpsert,
client_credentials: ClientCredentials | dict | None = None,
) WorkflowTrigger

Create or update a trigger for a workflow.

Parameters:
  • workflow_trigger (WorkflowTriggerUpsert) – The workflow trigger specification.

  • client_credentials (ClientCredentials | dict | None) – Specific credentials that should be used to trigger the workflow execution. When passed will take precedence over the current credentials.

Returns:

The created or updated workflow trigger specification.

Return type:

WorkflowTrigger

Examples

Create or update a scheduled trigger for a workflow:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.workflows import (
...     WorkflowTriggerUpsert,
...     WorkflowScheduledTriggerRule,
... )
>>> from zoneinfo import ZoneInfo
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient()  # another option
>>> client.workflows.triggers.upsert(
...     WorkflowTriggerUpsert(
...         external_id="my_trigger",
...         trigger_rule=WorkflowScheduledTriggerRule(
...             cron_expression="0 0 * * *", timezone=ZoneInfo("UTC")
...         ),
...         workflow_external_id="my_workflow",
...         workflow_version="1",
...         input={"a": 1, "b": 2},
...         metadata={"key": "value"},
...     )
... )

Create or update a data modeling trigger for a workflow:

>>> from cognite.client.data_classes.workflows import (
...     WorkflowDataModelingTriggerRule,
...     WorkflowTriggerDataModelingQuery,
... )
>>> from cognite.client.data_classes.data_modeling.query import (
...     NodeResultSetExpression,
...     Select,
...     SourceSelector,
... )
>>> from cognite.client.data_classes.data_modeling import ViewId
>>> from cognite.client.data_classes.filters import Equals
>>> view_id = ViewId("my_space_id", "view_external_id", "v1")
>>> client.workflows.triggers.upsert(
...     WorkflowTriggerUpsert(
...         external_id="my_trigger",
...         trigger_rule=WorkflowDataModelingTriggerRule(
...             data_modeling_query=WorkflowTriggerDataModelingQuery(
...                 with_={
...                     "timeseries": NodeResultSetExpression(
...                         filter=Equals(
...                             view_id.as_property_ref("name"), value="my_name"
...                         )
...                     )
...                 },
...                 select={"timeseries": Select([SourceSelector(view_id, ["name"])])},
...             ),
...             batch_size=500,
...             batch_timeout=300,
...         ),
...         workflow_external_id="my_workflow",
...         workflow_version="1",
...     )
... )