Upsert Trigger
- async AsyncCogniteClient.workflows.triggers.upsert(workflow_trigger: WorkflowTriggerUpsert, client_credentials: ClientCredentials | dict | None = None)
Create or update a trigger for a workflow.
- Parameters:
workflow_trigger (WorkflowTriggerUpsert) – The workflow trigger specification.
client_credentials (ClientCredentials | dict | None) – Specific credentials to use when triggering the workflow execution. When passed, these take precedence over the current credentials.
- Returns:
The created or updated workflow trigger specification.
- Return type:
Examples
Create or update a scheduled trigger for a workflow:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.workflows import (
...     WorkflowTriggerUpsert,
...     WorkflowScheduledTriggerRule,
... )
>>> from zoneinfo import ZoneInfo
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient()  # another option
>>> client.workflows.triggers.upsert(
...     WorkflowTriggerUpsert(
...         external_id="my_trigger",
...         trigger_rule=WorkflowScheduledTriggerRule(
...             cron_expression="0 0 * * *", timezone=ZoneInfo("UTC")
...         ),
...         workflow_external_id="my_workflow",
...         workflow_version="1",
...         input={"a": 1, "b": 2},
...         metadata={"key": "value"},
...     )
... )
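The `cron_expression` in the scheduled example has five whitespace-separated fields. As a minimal sketch, assuming the conventional five-field cron layout (minute, hour, day of month, month, day of week), a string can be sanity-checked before it is placed in a trigger rule. `describe_cron` and `CRON_FIELDS` are hypothetical helpers, not part of the SDK, and the check covers field count only, not the syntax within each field.

```python
# Hypothetical helper, not part of cognite-sdk: names each field of a
# five-field cron expression and rejects strings with the wrong field count.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression: str) -> dict[str, str]:
    """Map each cron field name to its value in `expression`."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(
            f"expected {len(CRON_FIELDS)} fields, got {len(parts)}: {expression!r}"
        )
    return dict(zip(CRON_FIELDS, parts))


# "0 0 * * *" means minute 0 of hour 0 on every day, i.e. daily at midnight
# in the timezone given to the trigger rule.
print(describe_cron("0 0 * * *"))
```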
Create or update a data modeling trigger for a workflow:
>>> from cognite.client.data_classes.workflows import (
...     WorkflowDataModelingTriggerRule,
...     WorkflowTriggerDataModelingQuery,
... )
>>> from cognite.client.data_classes.data_modeling.query import (
...     NodeResultSetExpression,
...     Select,
...     SourceSelector,
... )
>>> from cognite.client.data_classes.data_modeling import ViewId
>>> from cognite.client.data_classes.filters import Equals
>>> view_id = ViewId("my_space_id", "view_external_id", "v1")
>>> client.workflows.triggers.upsert(
...     WorkflowTriggerUpsert(
...         external_id="my_trigger",
...         trigger_rule=WorkflowDataModelingTriggerRule(
...             data_modeling_query=WorkflowTriggerDataModelingQuery(
...                 with_={
...                     "timeseries": NodeResultSetExpression(
...                         filter=Equals(
...                             view_id.as_property_ref("name"), value="my_name"
...                         )
...                     )
...                 },
...                 select={"timeseries": Select([SourceSelector(view_id, ["name"])])},
...             ),
...             batch_size=500,
...             batch_timeout=300,
...         ),
...         workflow_external_id="my_workflow",
...         workflow_version="1",
...     )
... )
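Neither example above passes `client_credentials`. The following is a minimal sketch of one way to supply it, assuming the credentials are stored in environment variables. The variable names `TRIGGER_CLIENT_ID` and `TRIGGER_CLIENT_SECRET` and the helper `read_trigger_credentials` are hypothetical. The SDK call is shown only in comments because it needs a configured client.

```python
import os
from typing import Mapping, Optional, Tuple


def read_trigger_credentials(
    env: Mapping[str, str] = os.environ,
) -> Optional[Tuple[str, str]]:
    """Return (client_id, client_secret) if both are set, otherwise None.

    The environment variable names are illustrative assumptions.
    """
    client_id = env.get("TRIGGER_CLIENT_ID")
    client_secret = env.get("TRIGGER_CLIENT_SECRET")
    if client_id and client_secret:
        return client_id, client_secret
    return None


# Usage with the SDK (not executed here; requires cognite-sdk, a configured
# `client`, and a WorkflowTriggerUpsert instance named `workflow_trigger`):
#
# from cognite.client.data_classes import ClientCredentials
# creds = read_trigger_credentials()
# client.workflows.triggers.upsert(
#     workflow_trigger,
#     client_credentials=ClientCredentials(*creds) if creds else None,
# )
```

When the helper returns `None`, the call falls back to the default of `client_credentials=None`, so the client's current credentials are used.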