Data Workflows

Workflows

Upsert Workflows

async WorkflowAPI.upsert(
workflow: WorkflowUpsert | Sequence[WorkflowUpsert],
mode: Literal['replace'] = 'replace',
) Workflow | WorkflowList

Create one or more workflow(s).

Note this is an upsert endpoint, so workflows that already exist will be updated, and new ones will be created.

Parameters:
  • workflow (WorkflowUpsert | Sequence[WorkflowUpsert]) – The workflow(s) to upsert.

  • mode (Literal['replace']) – This is not an option for the API, but is included here to document that the upserts are always done in replace mode.

Returns:

The created workflow(s).

Return type:

Workflow | WorkflowList

Examples

Create one workflow with external id “my_workflow”:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import WorkflowUpsert
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient()  # another option
>>> wf = WorkflowUpsert(external_id="my_workflow", description="my workflow description")
>>> res = client.workflows.upsert(wf)

Create multiple workflows:

>>> wf2 = WorkflowUpsert(external_id="other", data_set_id=123)
>>> res = client.workflows.upsert([wf, wf2])

Retrieve Workflows

async WorkflowAPI.retrieve(
external_id: str | SequenceNotStr[str],
ignore_unknown_ids: bool = False,
) Workflow | WorkflowList | None

Retrieve one or more workflows.

Parameters:
  • external_id (str | SequenceNotStr[str]) – Identifier (or sequence of identifiers) for a Workflow. Must be unique.

  • ignore_unknown_ids (bool) – When requesting multiple workflows, whether to ignore external IDs that are not found rather than throwing an exception.

Returns:

If a single external ID is specified: the requested workflow, or None if it does not exist. If several external IDs are specified: the requested workflows.

Return type:

Workflow | WorkflowList | None

Examples

Retrieve workflow with external ID “my_workflow”:

>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient()  # another option
>>> workflow = client.workflows.retrieve("my_workflow")

Retrieve multiple workflows:

>>> workflow_list = client.workflows.retrieve(["foo", "bar"])

List Workflows

async WorkflowAPI.list(
limit: int | None = 25,
) WorkflowList

List workflows in the project.

Parameters:

limit (int | None) – Maximum number of results to return. Defaults to 25. Set to -1, float(“inf”) or None

Returns:

Workflows in the CDF project.

Return type:

WorkflowList

Examples

List all workflows:

>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient()  # another option
>>> res = client.workflows.list(limit=None)

Delete Workflows

async WorkflowAPI.delete(
external_id: str | SequenceNotStr[str],
ignore_unknown_ids: bool = False,
) None

Delete one or more workflows with versions.

Parameters:
  • external_id (str | SequenceNotStr[str]) – External id or list of external ids to delete.

  • ignore_unknown_ids (bool) – Ignore external ids that are not found rather than throw an exception.

Examples

Delete workflow with external_id “my_workflow”:

>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient()  # another option
>>> client.workflows.delete("my_workflow")

Workflow Versions

Upsert Workflow Versions

async WorkflowVersionAPI.upsert(
version: WorkflowVersionUpsert | Sequence[WorkflowVersionUpsert],
mode: Literal['replace'] = 'replace',
) WorkflowVersion | WorkflowVersionList

Create one or more workflow version(s).

Note this is an upsert endpoint, so workflow versions that already exist will be updated, and new ones will be created.

Parameters:
  • version (WorkflowVersionUpsert | Sequence[WorkflowVersionUpsert]) – The workflow version(s) to upsert.

  • mode (Literal['replace']) – This is not an option for the API, but is included here to document that the upserts are always done in replace mode.

Returns:

The created workflow version(s).

Return type:

WorkflowVersion | WorkflowVersionList

Examples

Create one workflow version with a single Function task:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import (
...     WorkflowVersionUpsert, WorkflowDefinitionUpsert,
...     WorkflowTask, FunctionTaskParameters,
... )
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient()  # another option
>>> function_task = WorkflowTask(
...     external_id="my_workflow-task1",
...     parameters=FunctionTaskParameters(
...         external_id="my_fn_xid",
...         data={"a": 1, "b": 2},
...     ),
... )
>>> new_version = WorkflowVersionUpsert(
...    workflow_external_id="my_workflow",
...    version="1",
...    workflow_definition=WorkflowDefinitionUpsert(
...        tasks=[function_task],
...        description="This workflow has one step",
...    ),
... )
>>> res = client.workflows.versions.upsert(new_version)

Retrieve Workflow Versions

async WorkflowVersionAPI.retrieve(
workflow_external_id: WorkflowVersionIdentifier | Sequence[WorkflowVersionIdentifier] | WorkflowIds,
*,
ignore_unknown_ids: bool = False,
) WorkflowVersion | WorkflowVersionList | None

Retrieve a workflow version.

Parameters:
  • workflow_external_id (WorkflowVersionIdentifier | Sequence[WorkflowVersionIdentifier] | WorkflowIds) – External id of the workflow.

  • ignore_unknown_ids (bool) – When requesting multiple, whether to ignore external IDs that are not found rather than throwing an exception.

Returns:

If a single identifier is specified: the requested workflow version, or None if it does not exist. If several ids are specified: the requested workflow versions.

Return type:

WorkflowVersion | WorkflowVersionList | None

Examples

Retrieve workflow version ‘v1’ of workflow “my_workflow”:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import WorkflowVersionId
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient()  # another option
>>> res = client.workflows.versions.retrieve(WorkflowVersionId("my_workflow", "v1"))

Retrieve multiple workflow versions and ignore unknown:

>>> res = client.workflows.versions.retrieve(
...     [WorkflowVersionId("my_workflow", "v1"), WorkflowVersionId("other", "v3.2")],
...     ignore_unknown_ids=True,
... )
>>> # A sequence of tuples is also accepted:
>>> res = client.workflows.versions.retrieve([("my_workflow", "v1"), ("other", "v3.2")])

List Workflow Versions

async WorkflowVersionAPI.list(
workflow_version_ids: WorkflowIdentifier | MutableSequence[WorkflowIdentifier] | None = None,
limit: int | None = 25,
) WorkflowVersionList

List workflow versions in the project

Parameters:
  • workflow_version_ids (WorkflowIdentifier | MutableSequence[WorkflowIdentifier] | None) – Workflow version id or list of workflow version ids to filter on.

  • limit (int | None) – Maximum number of results to return. Defaults to 25. Set to -1, float(“inf”) or None

Returns:

The requested workflow versions.

Return type:

WorkflowVersionList

Examples

Get all workflow version for workflows ‘my_workflow’ and ‘my_workflow_2’:

>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient()  # another option
>>> res = client.workflows.versions.list(["my_workflow", "my_workflow_2"])

Get all workflow versions for workflows ‘my_workflow’ and ‘my_workflow_2’ using the WorkflowVersionId class:

>>> from cognite.client.data_classes import WorkflowVersionId
>>> res = client.workflows.versions.list(
...     [WorkflowVersionId("my_workflow"), WorkflowVersionId("my_workflow_2")])

Get all workflow versions for workflows ‘my_workflow’ version ‘1’ and ‘my_workflow_2’ version ‘2’ using tuples:

>>> res = client.workflows.versions.list(
...     [("my_workflow", "1"), ("my_workflow_2", "2")])

Delete Workflow Versions

async WorkflowVersionAPI.delete(
workflow_version_id: WorkflowVersionIdentifier | MutableSequence[WorkflowVersionId] | MutableSequence[tuple[str, str]],
ignore_unknown_ids: bool = False,
) None

Delete a workflow version(s).

Parameters:
  • workflow_version_id (WorkflowVersionIdentifier | MutableSequence[WorkflowVersionId] | MutableSequence[tuple[str, str]]) – Workflow version id or list of workflow version ids to delete.

  • ignore_unknown_ids (bool) – Ignore external ids that are not found rather than throw an exception.

Examples

Delete workflow version “1” of workflow “my workflow” specified by using a tuple:

>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient()  # another option
>>> client.workflows.versions.delete(("my workflow", "1"))

Delete workflow version “1” of workflow “my workflow” and workflow version “2” of workflow “my workflow 2” using the WorkflowVersionId class:

>>> from cognite.client.data_classes import WorkflowVersionId
>>> client.workflows.versions.delete([WorkflowVersionId("my workflow", "1"), WorkflowVersionId("my workflow 2", "2")])

Workflow Executions

Run Workflow Execution

async WorkflowExecutionAPI.run(
workflow_external_id: str,
version: str,
input: dict | None = None,
metadata: dict | None = None,
client_credentials: ClientCredentials | None = None,
nonce: str | None = None,
) WorkflowExecution

Run a workflow execution.

Parameters:
  • workflow_external_id (str) – External id of the workflow.

  • version (str) – Version of the workflow.

  • input (dict | None) – The input to the workflow execution. This will be available for tasks that have specified it as an input with the string “${workflow.input}” See tip below for more information.

  • metadata (dict | None) – Application specific metadata. Keys have a maximum length of 32 characters, values a maximum of 255, and there can be a maximum of 10 key-value pairs.

  • client_credentials (ClientCredentials | None) – Specific credentials that should be used to trigger the workflow execution. When passed will take precedence over the current credentials.

  • nonce (str | None) – The nonce to use to bind the session. If not provided, a new session will be created using the given ‘client_credentials’. If this is not given, the current credentials will be used.

Tip

The workflow input can be available in the workflow tasks. For example, if you have a Task with function parameters then you can specify it as follows

>>> from cognite.client.data_classes import WorkflowTask, FunctionTaskParameters
>>> task = WorkflowTask(
...     external_id="my_workflow-task1",
...     parameters=FunctionTaskParameters(
...         external_id="cdf_deployed_function:my_function",
...         data={"workflow_data": "${workflow.input}"}))

Tip

You can create a session via the Sessions API, using the client.iam.session.create() method.

Returns:

The created workflow execution.

Return type:

WorkflowExecution

Examples

Trigger a workflow execution for the workflow “foo”, version 1:

>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient()  # another option
>>> res = client.workflows.executions.run("foo", "1")

Trigger a workflow execution with input data:

>>> res = client.workflows.executions.run("foo", "1", input={"a": 1, "b": 2})

Trigger a workflow execution using a specific set of client credentials (i.e. not your current credentials):

>>> import os
>>> from cognite.client.data_classes import ClientCredentials
>>> credentials = ClientCredentials("my-client-id", os.environ["MY_CLIENT_SECRET"])
>>> res = client.workflows.executions.run("foo", "1", client_credentials=credentials)

Retrieve detailed Workflow Execution

async WorkflowExecutionAPI.retrieve_detailed(
id: str,
) WorkflowExecutionDetailed | None

Retrieve a workflow execution with detailed information.

Parameters:

id (str) – The server-generated id of the workflow execution.

Returns:

The requested workflow execution if it exists, None otherwise.

Return type:

WorkflowExecutionDetailed | None

Examples

Retrieve workflow execution with id ‘000560bc-9080-4286-b242-a27bb4819253’:

>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient()  # another option
>>> res = client.workflows.executions.retrieve_detailed("000560bc-9080-4286-b242-a27bb4819253")

List workflow executions and retrieve detailed information for the first one:

>>> res = client.workflows.executions.list()
>>> res = client.workflows.executions.retrieve_detailed(res[0].id)

List Workflow Executions

async WorkflowExecutionAPI.list(
workflow_version_ids: WorkflowVersionIdentifier | MutableSequence[WorkflowVersionIdentifier] | None = None,
created_time_start: int | None = None,
created_time_end: int | None = None,
statuses: WorkflowStatus | MutableSequence[WorkflowStatus] | None = None,
limit: int | None = 25,
) WorkflowExecutionList

List workflow executions in the project.

Parameters:
  • workflow_version_ids (WorkflowVersionIdentifier | MutableSequence[WorkflowVersionIdentifier] | None) – Workflow version id or list of workflow version ids to filter on.

  • created_time_start (int | None) – Filter out executions that was created before this time. Time is in milliseconds since epoch.

  • created_time_end (int | None) – Filter out executions that was created after this time. Time is in milliseconds since epoch.

  • statuses (WorkflowStatus | MutableSequence[WorkflowStatus] | None) – Workflow status or list of workflow statuses to filter on.

  • limit (int | None) – Maximum number of results to return. Defaults to 25. Set to -1, float(“inf”) or None to return all items.

Returns:

The requested workflow executions.

Return type:

WorkflowExecutionList

Examples

Get all workflow executions for workflows ‘my_workflow’ version ‘1’:

>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient()  # another option
>>> res = client.workflows.executions.list(("my_workflow", "1"))

Get all workflow executions from the last 24 hours:

>>> from cognite.client.utils import timestamp_to_ms
>>> res = client.workflows.executions.list(
...     created_time_start=timestamp_to_ms("1d-ago"))

Cancel Workflow Execution

async WorkflowExecutionAPI.cancel(
id: str,
reason: str | None,
) WorkflowExecution

Cancel a workflow execution.

Note

Cancelling a workflow will immediately cancel the in_progress tasks, but not their spawned work in other services (like transformations and functions).

Parameters:
  • id (str) – The server-generated id of the workflow execution.

  • reason (str | None) – The reason for the cancellation, this will be put within the execution’s reasonForIncompletion field. It is defaulted to ‘cancelled’ if not provided.

Returns:

The canceled workflow execution.

Return type:

WorkflowExecution

Examples

Trigger a workflow execution for the workflow “foo”, version 1 and cancel it:

>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient()  # another option
>>> res = client.workflows.executions.run("foo", "1")
>>> client.workflows.executions.cancel(id="foo", reason="test cancellation")

Retry Workflow Execution

async WorkflowExecutionAPI.retry(
id: str,
client_credentials: ClientCredentials | None = None,
) WorkflowExecution

Retry a workflow execution.

Parameters:
  • id (str) – The server-generated id of the workflow execution.

  • client_credentials (ClientCredentials | None) – Specific credentials that should be used to retry the workflow execution. When passed will take precedence over the current credentials.

Returns:

The retried workflow execution.

Return type:

WorkflowExecution

Examples

Retry a workflow execution that has been cancelled or failed:

>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient()  # another option
>>> res = client.workflows.executions.run("foo", "1")
>>> client.workflows.executions.cancel(id=res.id, reason="test cancellation")
>>> client.workflows.executions.retry(res.id)

Workflow Tasks

Update status of async Task

async WorkflowTaskAPI.update(
task_id: str,
status: Literal['completed', 'failed', 'failed_with_terminal_error'],
output: dict | None = None,
) WorkflowTaskExecution

Update status of async task.

For tasks that has been marked with ‘is_async = True’, the status must be updated by calling this endpoint with either ‘completed’, ‘failed’ or ‘failed_with_terminal_error’.

Parameters:
  • task_id (str) – The server-generated id of the task.

  • status (Literal['completed', 'failed', 'failed_with_terminal_error']) – The new status of the task. Must be either ‘completed’, ‘failed’ or ‘failed_with_terminal_error’.

  • output (dict | None) – The output of the task. This will be available for tasks that has specified it as an output with the string “${<taskExternalId>.output}”

Returns:

The updated task execution.

Return type:

WorkflowTaskExecution

Examples

Update task with id ‘000560bc-9080-4286-b242-a27bb4819253’ to status ‘completed’:

>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient()  # another option
>>> res = client.workflows.tasks.update("000560bc-9080-4286-b242-a27bb4819253", "completed")

Update task with id ‘000560bc-9080-4286-b242-a27bb4819253’ to status ‘failed’ with output ‘{“a”: 1, “b”: 2}’:

>>> res = client.workflows.tasks.update("000560bc-9080-4286-b242-a27bb4819253", "failed", output={"a": 1, "b": 2})

Trigger workflow, retrieve detailed task execution and update status of the second task (assumed to be async) to ‘completed’:

>>> res = client.workflows.executions.run("my workflow", "1")
>>> res = client.workflows.executions.retrieve_detailed(res.id)
>>> res = client.workflows.tasks.update(res.tasks[1].id, "completed")

Workflow Triggers

Upsert Trigger

async WorkflowTriggerAPI.upsert(
workflow_trigger: WorkflowTriggerUpsert,
client_credentials: ClientCredentials | dict | None = None,
) WorkflowTrigger

Create or update a trigger for a workflow.

Parameters:
  • workflow_trigger (WorkflowTriggerUpsert) – The workflow trigger specification.

  • client_credentials (ClientCredentials | dict | None) – Specific credentials that should be used to trigger the workflow execution. When passed will take precedence over the current credentials.

Returns:

The created or updated workflow trigger specification.

Return type:

WorkflowTrigger

Examples

Create or update a scheduled trigger for a workflow:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.workflows import WorkflowTriggerUpsert, WorkflowScheduledTriggerRule
>>> from zoneinfo import ZoneInfo
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient()  # another option
>>> client.workflows.triggers.upsert(
...     WorkflowTriggerUpsert(
...         external_id="my_trigger",
...         trigger_rule=WorkflowScheduledTriggerRule(cron_expression="0 0 * * *", timezone=ZoneInfo("UTC")),
...         workflow_external_id="my_workflow",
...         workflow_version="1",
...         input={"a": 1, "b": 2},
...         metadata={"key": "value"},
...     )
... )

Create or update a data modeling trigger for a workflow:

>>> from cognite.client.data_classes.workflows import WorkflowDataModelingTriggerRule, WorkflowTriggerDataModelingQuery
>>> from cognite.client.data_classes.data_modeling.query import NodeResultSetExpression, Select, SourceSelector
>>> from cognite.client.data_classes.data_modeling import ViewId
>>> from cognite.client.data_classes.filters import Equals
>>> view_id = ViewId("my_space_id", "view_external_id", "v1")
>>> client.workflows.triggers.upsert(
...     WorkflowTriggerUpsert(
...         external_id="my_trigger",
...         trigger_rule=WorkflowDataModelingTriggerRule(
...             data_modeling_query=WorkflowTriggerDataModelingQuery(
...                 with_={"timeseries": NodeResultSetExpression(filter=Equals(view_id.as_property_ref("name"), value="my_name"))},
...                 select={"timeseries": Select([SourceSelector(view_id, ["name"])])},
...             ),
...             batch_size=500,
...             batch_timeout=300,
...         ),
...         workflow_external_id="my_workflow",
...         workflow_version="1",
...     )
... )

List Triggers

async WorkflowTriggerAPI.list(
limit: int | None = 25,
) WorkflowTriggerList

List the workflow triggers.

Parameters:

limit (int | None) – Maximum number of results to return. Defaults to 25. Set to -1, float(“inf”) or None to return all items.

Returns:

The list of triggers.

Return type:

WorkflowTriggerList

Examples

List all triggers:

>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient()  # another option
>>> res = client.workflows.triggers.list(limit=None)

List runs for a Trigger

async WorkflowTriggerAPI.list_runs(
external_id: str,
limit: int | None = 25,
) WorkflowTriggerRunList

List the history of runs for a trigger.

Parameters:
  • external_id (str) – The external id of the trigger to list runs for.

  • limit (int | None) – Maximum number of results to return. Defaults to 25. Set to -1, float(“inf”) or None to return all items.

Returns:

The requested trigger runs.

Return type:

WorkflowTriggerRunList

Examples

Get all runs for a trigger with external id ‘my_trigger’:

>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient()  # another option
>>> res = client.workflows.triggers.list_runs("my_trigger", limit=None)

Delete Triggers

async WorkflowTriggerAPI.delete(
external_id: str | SequenceNotStr[str],
) None

Delete one or more triggers for a workflow.

Parameters:

external_id (str | SequenceNotStr[str]) – The external id(s) of the trigger(s) to delete.

Examples

Delete a trigger with external id ‘my_trigger’:

>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient()  # another option
>>> client.workflows.triggers.delete("my_trigger")

Delete a list of triggers:

>>> client.workflows.triggers.delete(["my_trigger", "another_trigger"])

Pause Trigger

async WorkflowTriggerAPI.pause(external_id: str) None

Pause a workflow trigger.

When a trigger is paused, it will not trigger new workflow executions. This operation is idempotent - pausing an already paused trigger has no effect.

Parameters:

external_id (str) – The external id of the trigger to pause.

Examples

Pause a trigger:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient()  # another option
>>> client.workflows.triggers.pause("my_trigger")

Resume Trigger

async WorkflowTriggerAPI.resume(external_id: str) None

Resume a paused workflow trigger.

When a trigger is resumed, it will start triggering workflow executions again according to its trigger rule. This operation is idempotent - resuming an already active trigger has no effect.

Parameters:

external_id (str) – The external id of the trigger to resume.

Examples

Resume a trigger:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient()  # another option
>>> client.workflows.triggers.resume("my_trigger")

Data Workflows data classes

class cognite.client.data_classes.workflows.CDFTaskOutput(response: str | dict | None, status_code: int | None)

Bases: WorkflowTaskOutput

The CDF Request output is used to specify the output of a CDF Request.

Parameters:
  • response (str | dict | None) – The response of the CDF Request. Will be a JSON object if content-type is application/json, otherwise will be a string.

  • status_code (int | None) – The status code of the CDF Request.

class cognite.client.data_classes.workflows.CDFTaskParameters(
resource_path: str,
method: Literal['GET', 'POST', 'PUT', 'DELETE'] | str,
query_parameters: dict | str | None = None,
body: dict | str | None = None,
request_timeout_in_millis: int | str = 10000,
)

Bases: WorkflowTaskParameters

The CDF request parameters are used to specify a request to the Cognite Data Fusion API.

Parameters:
  • resource_path (str) – The resource path of the request. Note the path of the request which is prefixed by ‘{cluster}.cognitedata.com/api/v1/project/{project}’ based on the cluster and project of the request.

  • method (Literal['GET', 'POST', 'PUT', 'DELETE'] | str) – The HTTP method of the request.

  • query_parameters (dict | str | None) – The query parameters of the request. Defaults to None.

  • body (dict | str | None) – The body of the request. Defaults to None. Limited to 1024KiB in size

  • request_timeout_in_millis (int | str) – The timeout of the request in milliseconds. Defaults to 10000.

Examples

Call the asset/list endpoint with a limit of 10:

>>> from cognite.client.data_classes import WorkflowTask, CDFTaskParameters
>>> task = WorkflowTask(
...     external_id="task1",
...     parameters=CDFTaskParameters(
...         resource_path="/assets/list",
...         method="GET",
...         query_parameters={"limit": 10},
...     ),
... )
dump(camel_case: bool = True) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters:

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns:

A dictionary representation of the instance.

Return type:

dict[str, Any]

class cognite.client.data_classes.workflows.DynamicTaskOutput

Bases: WorkflowTaskOutput

The dynamic task output is used to specify the output of a dynamic task.

class cognite.client.data_classes.workflows.DynamicTaskParameters(
tasks: list[WorkflowTask] | str,
)

Bases: WorkflowTaskParameters

The dynamic task parameters are used to specify a dynamic task.

When the tasks and their order of execution are determined at runtime, we use dynamic tasks. It takes the tasks parameter which is a Reference to an array of function, transformation, and cdf task definitions. This array should be generated and returned by a previous step in the workflow, for instance, a Cognite Function task.

Tip

You can reference data from other tasks or the workflow. You do this by following the format ${prefix.jsonPath} in the expression. Some valid option are:

  • ${workflow.input}: The workflow input.

  • ${<taskExternalId>.output}: The output of the task with the given external id.

  • ${<taskExternalId>.input}: The input of the task with the given external id.

  • ${<taskExternalId>.input.someKey}: A specific key within the input of the task with the given external id.

Parameters:

tasks (list[WorkflowTask] | str) – The tasks to be dynamically executed. The dynamic task is a string that is evaluated during the workflow’s execution. When calling Version Upsert, the tasks parameter must be a Reference string. When calling Execution details, the tasks parameter will be a list of WorkflowTask objects.

dump(camel_case: bool = True) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters:

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns:

A dictionary representation of the instance.

Return type:

dict[str, Any]

class cognite.client.data_classes.workflows.FunctionTaskOutput(
call_id: int | None,
function_id: int | None,
response: dict | None,
)

Bases: WorkflowTaskOutput

The class represent the output of Cognite Function task.

Parameters:
  • call_id (int | None) – The call_id of the CDF Function call.

  • function_id (int | None) – The function_id of the CDF Function.

  • response (dict | None) – The response of the CDF Function call.

class cognite.client.data_classes.workflows.FunctionTaskParameters(
external_id: str,
data: dict | str | None = None,
is_async_complete: bool | None = None,
)

Bases: WorkflowTaskParameters

The function parameters are used to specify the Cognite Function to be called.

Parameters:
  • external_id (str) – The external ID of the function to be called.

  • data (dict | str | None) – The data to be passed to the function. Defaults to None. The data can be used to specify the input to the function from previous tasks or the workflow input. See the tip below for more information.

  • is_async_complete (bool | None) – Whether the function is asynchronous. Defaults to None, which the API will interpret as False.

If a function is asynchronous, you need to call the client.workflows.tasks.update() endpoint to update the status of the task. While synchronous tasks update the status automatically.

Tip

You can dynamically specify data from other tasks or the workflow. You do this by following the format ${prefix.jsonPath} in the expression. The valid are:

  • ${workflow.input}: The workflow input.

  • ${<taskExternalId>.output}: The output of the task with the given external id.

  • ${<taskExternalId>.input}: The input of the task with the given external id.

  • ${<taskExternalId>.input.someKey}: A specific key within the input of the task with the given external id.

For example, if you have a workflow containing two tasks, and the external_id of the first task is task1 then, you can specify the data for the second task as follows:

>>> from cognite.client.data_classes  import WorkflowTask, FunctionTaskParameters
>>> task = WorkflowTask(
...     external_id="task2",
...     parameters=FunctionTaskParameters(
...         external_id="cdf_deployed_function",
...         data={
...             "workflow_data": "${workflow.input}",
...             "task1_input": "${task1.input}",
...             "task1_output": "${task1.output}"
...         },
...     ),
... )
dump(camel_case: bool = True) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters:

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns:

A dictionary representation of the instance.

Return type:

dict[str, Any]

class cognite.client.data_classes.workflows.SimulationTaskOutput(
run_id: int | None,
log_id: int | None,
status_message: str | None,
)

Bases: WorkflowTaskOutput

The class represent the output of Simulation execution.

Parameters:
  • run_id (int | None) – The run ID of the simulation run.

  • log_id (int | None) – The log ID of the simulation run.

  • status_message (str | None) – Status message of the simulation execution.

class cognite.client.data_classes.workflows.SimulationTaskParameters(
routine_external_id: str,
run_time: int | None = None,
inputs: list[SimulationInputOverride] | None = None,
)

Bases: WorkflowTaskParameters

The simulation parameters are used to specify the simulation routine to be executed.

Parameters:
  • routine_external_id (str) – The external ID of the simulation routine to be executed.

  • run_time (int | None) – Reference timestamp used for data pre-processing and data sampling.

  • inputs (list[SimulationInputOverride] | None) – List of input overrides.

dump(camel_case: bool = True) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters:

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns:

A dictionary representation of the instance.

Return type:

dict[str, Any]

class cognite.client.data_classes.workflows.SubworkflowReferenceParameters(workflow_external_id: str, version: str)

Bases: WorkflowTaskParameters

The subworkflow task parameters are used to specify a subworkflow task. When a workflow is made of stages with dependencies between them, we can use subworkflow tasks for convenience. The subworkflow reference is used to specifying a reference to another workflow which will be embedded into the execution at start time.

Parameters:
  • workflow_external_id (str) – The external ID of the referenced workflow.

  • version (str) – The version of the referenced workflow.

dump(
camel_case: bool = True,
) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters:

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns:

A dictionary representation of the instance.

Return type:

dict[str, Any]

class cognite.client.data_classes.workflows.SubworkflowTaskOutput

Bases: WorkflowTaskOutput

The subworkflow task output is used to specify the output of a subworkflow task.

class cognite.client.data_classes.workflows.SubworkflowTaskParameters(
tasks: list[WorkflowTask],
)

Bases: WorkflowTaskParameters

The subworkflow task parameters are used to specify a subworkflow task.

When a workflow is made of stages with dependencies between them, we can use subworkflow tasks for convenience. It takes the tasks parameter which is an array of function, transformation, cdf, …, task definitions. This array needs to be statically set on the workflow definition (if it needs to be defined at runtime, use a dynamic task).

Parameters:

tasks (list[WorkflowTask]) – The tasks belonging to the subworkflow.

dump(
camel_case: bool = True,
) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters:

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns:

A dictionary representation of the instance.

Return type:

dict[str, Any]

class cognite.client.data_classes.workflows.TransformationTaskOutput(job_id: int | None)

Bases: WorkflowTaskOutput

The transformation output is used to specify the output of a transformation task.

Parameters:

job_id (int | None) – The job id of the transformation job.

class cognite.client.data_classes.workflows.TransformationTaskParameters(
external_id: str,
concurrency_policy: Literal['fail', 'restartAfterCurrent', 'waitForCurrent'] = 'fail',
use_transformation_credentials: bool = False,
)

Bases: WorkflowTaskParameters

The transformation parameters are used to specify the transformation to be called.

Parameters:
  • external_id (str) – The external ID of the transformation to be called.

  • concurrency_policy (Literal['fail', 'restartAfterCurrent', 'waitForCurrent']) – Determines the behavior of the task if the Transformation is already running. fail: The task fails if another instance of the Transformation is currently running. waitForCurrent: The task will pause and wait for the already running Transformation to complete. Once completed, the task is completed. This mode is useful for preventing redundant Transformation runs. restartAfterCurrent: The task waits for the ongoing Transformation to finish. After completion, the task restarts the Transformation. This mode ensures that the most recent data can be used by following tasks.

  • use_transformation_credentials (bool) – If set to true, the transformation will be run using the client credentials configured on the transformation. If set to false, the transformation will be run using the client credentials used to trigger the workflow.

dump(
camel_case: bool = True,
) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters:

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns:

A dictionary representation of the instance.

Return type:

dict[str, Any]

class cognite.client.data_classes.workflows.Workflow(
external_id: str,
created_time: int,
last_updated_time: int,
description: str | None = None,
data_set_id: int | None = None,
max_concurrent_executions: int | None = None,
)

Bases: WorkflowCore

This class represents a workflow. This is the read version, used when reading or listing workflows.

Parameters:
  • external_id (str) – The external ID provided by the client. Must be unique for the resource type.

  • created_time (int) – The time when the workflow was created. Unix timestamp in milliseconds.

  • last_updated_time (int) – The time when the workflow was last updated. Unix timestamp in milliseconds.

  • description (str | None) – Description of the workflow. Defaults to None.

  • data_set_id (int | None) – The id of the data set this workflow belongs to.

  • max_concurrent_executions (int | None) – Maximum concurrent executions for this workflow. Defaults to None, which means the workflow will use the project limit.

as_write() WorkflowUpsert

Returns this workflow in the writing format.

class cognite.client.data_classes.workflows.WorkflowCore(
external_id: str,
description: str | None = None,
data_set_id: int | None = None,
max_concurrent_executions: int | None = None,
)

Bases: WriteableCogniteResource[WorkflowUpsert], ABC

class cognite.client.data_classes.workflows.WorkflowDataModelingTriggerRule(
data_modeling_query: WorkflowTriggerDataModelingQuery,
batch_size: int | None = None,
batch_timeout: int | None = None,
)

Bases: WorkflowTriggerRule

This class represents a data modeling trigger rule.

Parameters:
  • data_modeling_query (WorkflowTriggerDataModelingQuery) – The data modeling query of the trigger.

  • batch_size (int | None) – The batch size of the trigger.

  • batch_timeout (int | None) – The batch timeout of the trigger.

dump(
camel_case: bool = True,
) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters:

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns:

A dictionary representation of the instance.

Return type:

dict[str, Any]

class cognite.client.data_classes.workflows.WorkflowDefinition(
hash_: str,
tasks: list[WorkflowTask],
description: str | None = None,
)

Bases: WorkflowDefinitionCore

This class represents a workflow definition. This represents the read version of a workflow definition.

A workflow definition defines the tasks and order/dependencies of these tasks.

Parameters:
  • hash (str) – The hash of the tasks and description. This is used to uniquely identify the workflow definition as you can overwrite a workflow version.

  • tasks (list[WorkflowTask]) – The tasks of the workflow definition.

  • description (str | None) – The description of the workflow definition. Defaults to None.

as_write() WorkflowDefinitionUpsert

Returns this WorkflowDefinition in its write format.

dump(camel_case: bool = True) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters:

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns:

A dictionary representation of the instance.

Return type:

dict[str, Any]

class cognite.client.data_classes.workflows.WorkflowDefinitionCore(
tasks: list[WorkflowTask],
description: str | None = None,
)

Bases: WriteableCogniteResource[WorkflowDefinitionUpsert], ABC

This class represents a workflow definition.

A workflow definition defines the tasks and order/dependencies of these tasks.

Parameters:
  • tasks (list[WorkflowTask]) – The tasks of the workflow definition.

  • description (str | None) – The description of the workflow definition. Note that when updating a workflow definition description, it will always be overwritten also if it is set to None. Meaning if the workflow definition already has a description, and you want to keep it, you need to provide the description when updating it.

dump(camel_case: bool = True) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters:

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns:

A dictionary representation of the instance.

Return type:

dict[str, Any]

class cognite.client.data_classes.workflows.WorkflowDefinitionUpsert(
tasks: list[WorkflowTask],
description: str | None = None,
)

Bases: WorkflowDefinitionCore

This class represents a workflow definition. This represents the write/update version of a workflow definition.

A workflow definition defines the tasks and order/dependencies of these tasks.

Parameters:
  • tasks (list[WorkflowTask]) – The tasks of the workflow definition.

  • description (str | None) – The description of the workflow definition. Note that when updating a workflow definition description, it will always be overwritten also if it is set to None. Meaning if the workflow definition already has a description, and you want to keep it, you need to provide the description when updating it.

as_write() WorkflowDefinitionUpsert

Returns this WorkflowDefinitionUpsert in its write format.

dump(camel_case: bool = True) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters:

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns:

A dictionary representation of the instance.

Return type:

dict[str, Any]

class cognite.client.data_classes.workflows.WorkflowExecution(
id: str,
workflow_external_id: str,
status: Literal['completed', 'failed', 'running', 'terminated', 'timed_out'],
created_time: int,
version: str | None = None,
start_time: int | None = None,
end_time: int | None = None,
reason_for_incompletion: str | None = None,
metadata: dict | None = None,
)

Bases: CogniteResource

This class represents a workflow execution.

Parameters:
  • id (str) – The server generated id of the workflow execution.

  • workflow_external_id (str) – The external ID of the workflow.

  • status (WorkflowStatus) – The status of the workflow execution.

  • created_time (int) – The time when the workflow execution was created. Unix timestamp in milliseconds.

  • version (str | None) – The version of the workflow. Defaults to None.

  • start_time (int | None) – The start time of the workflow execution. Unix timestamp in milliseconds. Defaults to None.

  • end_time (int | None) – The end time of the workflow execution. Unix timestamp in milliseconds. Defaults to None.

  • reason_for_incompletion (str | None) – Provides the reason if the workflow did not complete successfully. Defaults to None.

  • metadata (dict | None) – Application specific metadata.

class cognite.client.data_classes.workflows.WorkflowExecutionDetailed(
id: str,
workflow_external_id: str,
workflow_definition: WorkflowDefinition,
status: Literal['completed', 'failed', 'running', 'terminated', 'timed_out'],
executed_tasks: list[WorkflowTaskExecution],
created_time: int,
version: str | None = None,
start_time: int | None = None,
end_time: int | None = None,
reason_for_incompletion: str | None = None,
input: dict | None = None,
metadata: dict | None = None,
)

Bases: WorkflowExecution

This class represents a detailed workflow execution.

A detailed workflow execution contains the input and output of each task in the workflow execution. In addition, it contains the workflow definition of the workflow.

Parameters:
  • id (str) – The server generated id of the workflow execution.

  • workflow_external_id (str) – The external ID of the workflow.

  • workflow_definition (WorkflowDefinition) – The workflow definition of the workflow.

  • status (WorkflowStatus) – The status of the workflow execution.

  • executed_tasks (list[WorkflowTaskExecution]) – The executed tasks of the workflow execution.

  • created_time (int) – The time when the workflow execution was created. Unix timestamp in milliseconds.

  • version (str | None) – The version of the workflow. Defaults to None.

  • start_time (int | None) – The start time of the workflow execution. Unix timestamp in milliseconds. Defaults to None.

  • end_time (int | None) – The end time of the workflow execution. Unix timestamp in milliseconds. Defaults to None.

  • reason_for_incompletion (str | None) – Provides the reason if the workflow did not complete successfully. Defaults to None.

  • input (dict | None) – Input arguments the workflow was triggered with.

  • metadata (dict | None) – Metadata set when the workflow was triggered.

dump(
camel_case: bool = True,
) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters:

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns:

A dictionary representation of the instance.

Return type:

dict[str, Any]

class cognite.client.data_classes.workflows.WorkflowExecutionList(
resources: Sequence[T_CogniteResource],
)

Bases: CogniteResourceList[WorkflowExecution], InternalIdTransformerMixin

This class represents a list of workflow executions.

class cognite.client.data_classes.workflows.WorkflowIds(
workflow_ids: Collection[WorkflowVersionId],
)

Bases: UserList

This class represents a list of Workflow Version Identifiers.

class cognite.client.data_classes.workflows.WorkflowList(
resources: Sequence[T_CogniteResource],
)

Bases: WriteableCogniteResourceList[WorkflowUpsert, Workflow], ExternalIDTransformerMixin

This class represents a list of workflows.

as_write() WorkflowUpsertList

Returns these workflows in the writing format.

class cognite.client.data_classes.workflows.WorkflowScheduledTriggerRule(
cron_expression: str,
timezone: ZoneInfo | None = None,
)

Bases: WorkflowTriggerRule

This class represents a scheduled trigger rule.

Parameters:
  • cron_expression (str) – The cron specification for the scheduled trigger.

  • timezone (ZoneInfo | None) – The timezone in which the scheduled trigger should be evaluated. If not provided, UTC will be used as the default timezone on the server side.

dump(
camel_case: bool = True,
) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters:

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns:

A dictionary representation of the instance.

Return type:

dict[str, Any]

class cognite.client.data_classes.workflows.WorkflowTask(
external_id: str,
parameters: WorkflowTaskParameters,
name: str | None = None,
description: str | None = None,
retries: int = 3,
timeout: int = 3600,
on_failure: Literal['abortWorkflow', 'skipTask'] = 'abortWorkflow',
depends_on: list[str] | None = None,
)

Bases: CogniteResource

This class represents a workflow task.

Note

Tasks do not distinguish between write and read versions.

Parameters:
  • external_id (str) – The external ID provided by the client. Must be unique for the resource type.

  • parameters (WorkflowTaskParameters) – The parameters of the task.

  • name (str | None) – The name of the task. Defaults to None.

  • description (str | None) – The description of the task. Defaults to None.

  • retries (int) – The number of retries for the task. Defaults to 3.

  • timeout (int) – The timeout of the task in seconds. Defaults to 3600.

  • on_failure (Literal['abortWorkflow', 'skipTask']) – The policy to handle failures and timeouts. Defaults to abortWorkflow. skipTask: For both failures and timeouts, the task will retry until the retries are exhausted. After that, the Task is marked as COMPLETED_WITH_ERRORS and the subsequent tasks are executed. abortWorkflow: In case of failures, retries will be performed until exhausted. After which the task is marked as FAILED and the Workflow is marked the same. In the event of a timeout, no retries are undertaken; the task is marked as TIMED_OUT and the Workflow is marked as FAILED.

  • depends_on (list[str] | None) – The external ids of the tasks that this task depends on. Defaults to None.

dump(camel_case: bool = True) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters:

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns:

A dictionary representation of the instance.

Return type:

dict[str, Any]

class cognite.client.data_classes.workflows.WorkflowTaskExecution(
id: str,
external_id: str,
status: Literal['in_progress', 'cancelled', 'failed', 'failed_with_terminal_error', 'completed', 'completed_with_errors', 'timed_out', 'skipped'],
input: WorkflowTaskParameters,
output: WorkflowTaskOutput,
version: str | None = None,
start_time: int | None = None,
end_time: int | None = None,
reason_for_incompletion: str | None = None,
)

Bases: CogniteResource

This class represents a task execution.

Parameters:
  • id (str) – The server generated id of the task execution.

  • external_id (str) – The external ID provided by the client. Must be unique for the resource type.

  • status (TaskStatus) – The status of the task execution.

  • input (WorkflowTaskParameters) – The input parameters of the task execution.

  • output (WorkflowTaskOutput) – The output of the task execution.

  • version (str | None) – The version of the task execution. Defaults to None.

  • start_time (int | None) – The start time of the task execution. Unix timestamp in milliseconds. Defaults to None.

  • end_time (int | None) – The end time of the task execution. Unix timestamp in milliseconds. Defaults to None.

  • reason_for_incompletion (str | None) – Provides the reason if the workflow did not complete successfully. Defaults to None.

dump(camel_case: bool = True) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters:

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns:

A dictionary representation of the instance.

Return type:

dict[str, Any]

class cognite.client.data_classes.workflows.WorkflowTaskOutput

Bases: ABC

class cognite.client.data_classes.workflows.WorkflowTaskParameters

Bases: CogniteResource, ABC

class cognite.client.data_classes.workflows.WorkflowTrigger(
external_id: str,
trigger_rule: WorkflowTriggerRule,
workflow_external_id: str,
workflow_version: str,
is_paused: bool,
input: dict | None,
metadata: dict | None,
created_time: int,
last_updated_time: int,
)

Bases: WorkflowTriggerCore

This class represents a workflow trigger.

Parameters:
  • external_id (str) – The external ID provided by the client. Must be unique for the resource type.

  • trigger_rule (WorkflowTriggerRule) – The trigger rule of the workflow version trigger.

  • workflow_external_id (str) – The external ID of the workflow.

  • workflow_version (str) – The version of the workflow.

  • is_paused (bool) – Whether the trigger is paused.

  • input (dict | None) – The input data passed to the workflow when an execution is started.

  • metadata (dict | None) – Application specific metadata.

  • created_time (int) – The time when the workflow version trigger was created. Unix timestamp in milliseconds.

  • last_updated_time (int) – The time when the workflow version trigger was last updated. Unix timestamp in milliseconds.

as_write() WorkflowTriggerUpsert

Returns this workflow trigger instance.

dump(camel_case: bool = True) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters:

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns:

A dictionary representation of the instance.

Return type:

dict[str, Any]

class cognite.client.data_classes.workflows.WorkflowTriggerCore(
external_id: str,
trigger_rule: WorkflowTriggerRule,
workflow_external_id: str,
workflow_version: str,
)

Bases: WriteableCogniteResource[WorkflowTriggerUpsert], ABC

This class represents a base class for a workflow trigger.

Parameters:
  • external_id (str) – The external ID provided by the client. Must be unique for the resource type.

  • trigger_rule (WorkflowTriggerRule) – The trigger rule of the workflow version trigger.

  • workflow_external_id (str) – The external ID of the workflow.

  • workflow_version (str) – The version of the workflow.

class cognite.client.data_classes.workflows.WorkflowTriggerDataModelingQuery(
with_: dict[str, ResultSetExpression],
select: dict[str, Select],
)

Bases: Query

This class represents a data modeling trigger query.

Parameters:
  • with (dict[str, ResultSetExpression]) – A dictionary of result set expressions to use in the query. The keys are used to reference the result set expressions in the select.

  • select (dict[str, Select]) – A dictionary of select expressions to use in the query. The keys must match the keys in the with_ dictionary. The select expressions define which properties to include in the result set.

class cognite.client.data_classes.workflows.WorkflowTriggerList(
resources: Sequence[T_CogniteResource],
)

Bases: WriteableCogniteResourceList[WorkflowTriggerUpsert, WorkflowTrigger], ExternalIDTransformerMixin

This class represents a list of workflow triggers.

as_write() WorkflowTriggerUpsertList

Returns a WorkflowTriggerUpsertList object with the same data.

class cognite.client.data_classes.workflows.WorkflowTriggerRule

Bases: CogniteResource, ABC

This is the base class for all workflow trigger rules.

dump(camel_case: bool = True) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters:

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns:

A dictionary representation of the instance.

Return type:

dict[str, Any]

class cognite.client.data_classes.workflows.WorkflowTriggerRun(
external_id: str,
fire_time: int,
workflow_external_id: str,
workflow_version: str,
status: Literal['success', 'failed'],
workflow_execution_id: str | None,
reason_for_failure: str | None,
)

Bases: CogniteResource

This class represents a workflow trigger run.

dump(camel_case: bool = True) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters:

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns:

A dictionary representation of the instance.

Return type:

dict[str, Any]

class cognite.client.data_classes.workflows.WorkflowTriggerRunList(
resources: Sequence[T_CogniteResource],
)

Bases: CogniteResourceList[WorkflowTriggerRun], ExternalIDTransformerMixin

This class represents a list of workflow trigger runs.

class cognite.client.data_classes.workflows.WorkflowTriggerUpsert(
external_id: str,
trigger_rule: WorkflowTriggerRule,
workflow_external_id: str,
workflow_version: str,
input: dict | None = None,
metadata: dict | None = None,
)

Bases: WorkflowTriggerCore

This class represents a workflow trigger for upsertion.

Parameters:
  • external_id (str) – The external ID provided by the client. Must be unique for the resource type.

  • trigger_rule (WorkflowTriggerRule) – The trigger rule of the workflow version trigger.

  • workflow_external_id (str) – The external ID of the workflow.

  • workflow_version (str) – The version of the workflow.

  • input (dict | None) – The input data of the workflow version trigger. Defaults to None.

  • metadata (dict | None) – Application specific metadata. Defaults to None.

as_write() WorkflowTriggerUpsert

Returns this workflow trigger create instance.

dump(camel_case: bool = True) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters:

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns:

A dictionary representation of the instance.

Return type:

dict[str, Any]

class cognite.client.data_classes.workflows.WorkflowTriggerUpsertList(
resources: Sequence[T_CogniteResource],
)

Bases: CogniteResourceList[WorkflowTriggerUpsert], ExternalIDTransformerMixin

class cognite.client.data_classes.workflows.WorkflowUpsert(
external_id: str,
description: str | None = None,
data_set_id: int | None = None,
max_concurrent_executions: int | None = None,
)

Bases: WorkflowCore

This class represents a workflow. This is the write version, used when creating or updating a workflow.

Parameters:
  • external_id (str) – The external ID provided by the client. Must be unique for the resource type.

  • description (str | None) – Description of the workflow. Note that when updating a workflow, the description will always be overwritten also if it is set to None. Meaning if the workflow already has a description, and you want to keep it, you need to provide the description when updating the workflow.

  • data_set_id (int | None) – The id of the data set this workflow belongs to. If a dataSetId is provided, any operations on this workflow, or its versions, executions, and triggers will require appropriate access to the data set. More information on data sets and their configuration can be found here: https://docs.cognite.com/cdf/data_governance/concepts/datasets/

  • max_concurrent_executions (int | None) – Maximum concurrent executions for this workflow. Defaults to None, which means the workflow will use the project limit.

as_write() WorkflowUpsert

Returns this workflow instance.

class cognite.client.data_classes.workflows.WorkflowUpsertList(
resources: Sequence[T_CogniteResource],
)

Bases: CogniteResourceList[WorkflowUpsert], ExternalIDTransformerMixin

class cognite.client.data_classes.workflows.WorkflowVersion(
workflow_external_id: str,
version: str,
workflow_definition: WorkflowDefinition,
created_time: int,
last_updated_time: int,
)

Bases: WorkflowVersionCore

This class represents a workflow version. This is the read variant, used when retrieving/listing a workflow variant.

Parameters:
  • workflow_external_id (str) – The external ID of the workflow.

  • version (str) – The version of the workflow.

  • workflow_definition (WorkflowDefinition) – The workflow definition of the workflow version.

  • created_time (int) – The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.

  • last_updated_time (int) – The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.

as_write() WorkflowVersionUpsert

Returns a WorkflowVersionUpsert object with the same data.

dump(camel_case: bool = True) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters:

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns:

A dictionary representation of the instance.

Return type:

dict[str, Any]

class cognite.client.data_classes.workflows.WorkflowVersionCore(workflow_external_id: str, version: str)

Bases: WriteableCogniteResource[WorkflowVersionUpsert], ABC

This class represents a workflow version.

Parameters:
  • workflow_external_id (str) – The external ID of the workflow.

  • version (str) – The version of the workflow.

class cognite.client.data_classes.workflows.WorkflowVersionId(workflow_external_id: str, version: str | None = None)

Bases: object

This class represents a Workflow Version Identifier.

Parameters:
  • workflow_external_id (str) – The external ID of the workflow.

  • version (str, optional) – The version of the workflow. Defaults to None.

class cognite.client.data_classes.workflows.WorkflowVersionList(
resources: Sequence[T_CogniteResource],
)

Bases: WriteableCogniteResourceList[WorkflowVersionUpsert, WorkflowVersion]

This class represents a list of workflow versions.

as_ids() WorkflowIds

Returns a WorkflowIds object with the workflow version ids.

as_write() WorkflowVersionUpsertList

Returns a WorkflowVersionUpsertList object with the same data.

class cognite.client.data_classes.workflows.WorkflowVersionUpsert(
workflow_external_id: str,
version: str,
workflow_definition: WorkflowDefinitionUpsert,
)

Bases: WorkflowVersionCore

This class represents a workflow version. This is the write-variant, used when creating or updating a workflow variant.

Parameters:
  • workflow_external_id (str) – The external ID of the workflow.

  • version (str) – The version of the workflow.

  • workflow_definition (WorkflowDefinitionUpsert) – The workflow definition of the workflow version.

as_write() WorkflowVersionUpsert

Returns this WorkflowVersionUpsert instance.

dump(camel_case: bool = True) dict[str, Any]

Dump the instance into a json serializable Python data type.

Parameters:

camel_case (bool) – Use camelCase for attribute names. Defaults to True.

Returns:

A dictionary representation of the instance.

Return type:

dict[str, Any]

class cognite.client.data_classes.workflows.WorkflowVersionUpsertList(
resources: Sequence[T_CogniteResource],
)

Bases: CogniteResourceList[WorkflowVersionUpsert]

This class represents a list of workflow versions.

as_ids() WorkflowIds

Returns a WorkflowIds object with the workflow version ids.