Data Workflows
Workflows
Upsert Workflow
- WorkflowAPI.upsert(workflow: WorkflowUpsert, mode: Literal['replace'] = 'replace') Workflow
-
Note this is an upsert endpoint, so if a workflow with the same external id already exists, it will be updated.
- Parameters
workflow (WorkflowUpsert) – The workflow to create or update.
mode (Literal['replace']) – This is not an option for the API, but is included here to document that the upserts are always done in replace mode.
- Returns
The created workflow.
- Return type
Examples
Create workflow my workflow:
>>> from cognite.client import CogniteClient >>> from cognite.client.data_classes import WorkflowUpsert >>> client = CogniteClient() >>> res = client.workflows.upsert(WorkflowUpsert(external_id="my workflow", description="my workflow description"))
Delete Workflow(s)
- WorkflowAPI.delete(external_id: Union[str, SequenceNotStr[str]], ignore_unknown_ids: bool = False) None
Delete one or more workflows with versions.
- Parameters
external_id (str | SequenceNotStr[str]) – External id or list of external ids to delete.
ignore_unknown_ids (bool) – Ignore external ids that are not found rather than throw an exception.
Examples
Delete workflow my workflow:
>>> from cognite.client import CogniteClient >>> client = CogniteClient() >>> client.workflows.delete("my workflow")
Retrieve Workflow
- WorkflowAPI.retrieve(external_id: str) cognite.client.data_classes.workflows.Workflow | None
-
- Parameters
external_id (str) – Identifier for a Workflow. Must be unique for the project.
- Returns
The requested workflow if it exists, None otherwise.
- Return type
Workflow | None
Examples
Retrieve workflow my workflow:
>>> from cognite.client import CogniteClient >>> client = CogniteClient() >>> res = client.workflows.retrieve("my workflow")
List Workflows
- WorkflowAPI.list(limit: int | None = 25) WorkflowList
List all workflows in the project.
- Parameters
limit (int | None) – Maximum number of results to return. Defaults to 25. Set to -1, float(“inf”) or None
- Returns
All workflows in the CDF project.
- Return type
Examples
List all workflows:
>>> from cognite.client import CogniteClient >>> client = CogniteClient() >>> res = client.workflows.list()
Workflow Versions
Upsert Workflow Version
- WorkflowVersionAPI.upsert(version: WorkflowVersionUpsert, mode: Literal['replace'] = 'replace') WorkflowVersion
-
Note this is an upsert endpoint, so if a workflow with the same version external id already exists, it will be updated.
Furthermore, if the workflow does not exist, it will be created.
- Parameters
version (WorkflowVersionUpsert) – The workflow version to create or update.
mode (Literal['replace']) – This is not an option for the API, but is included here to document that the upserts are always done in replace mode.
- Returns
The created workflow version.
- Return type
Examples
Create workflow version with one Function task:
>>> from cognite.client import CogniteClient >>> from cognite.client.data_classes import WorkflowVersionUpsert, WorkflowDefinitionUpsert, WorkflowTask, FunctionTaskParameters >>> client = CogniteClient() >>> new_version = WorkflowVersionUpsert( ... workflow_external_id="my_workflow", ... version="1", ... workflow_definition=WorkflowDefinitionUpsert( ... tasks=[ ... WorkflowTask( ... external_id="my_workflow-task1", ... parameters=FunctionTaskParameters( ... external_id="cdf_deployed_function:my_function", ... data={"a": 1, "b": 2}, ... ), ... ) ... ], ... description="This workflow has one step", ... ), ... ) >>> res = client.workflows.versions.upsert(new_version)
Delete Workflow Version(s)
- WorkflowVersionAPI.delete(workflow_version_id: cognite.client.data_classes.workflows.WorkflowVersionId | tuple[str, str] | collections.abc.MutableSequence[cognite.client.data_classes.workflows.WorkflowVersionId | tuple[str, str]], ignore_unknown_ids: bool = False) None
-
- Parameters
workflow_version_id (WorkflowVersionIdentifier | MutableSequence[WorkflowVersionIdentifier]) – Workflow version id or list of workflow version ids to delete.
ignore_unknown_ids (bool) – Ignore external ids that are not found rather than throw an exception.
Examples
Delete workflow version “1” of workflow “my workflow” specified by using a tuple:
>>> from cognite.client import CogniteClient >>> client = CogniteClient() >>> client.workflows.versions.delete(("my workflow", "1"))
Delete workflow version “1” of workflow “my workflow” and workflow version “2” of workflow “my workflow 2” using the WorkflowVersionId class:
>>> from cognite.client import CogniteClient >>> from cognite.client.data_classes import WorkflowVersionId >>> client = CogniteClient() >>> client.workflows.versions.delete([WorkflowVersionId("my workflow", "1"), WorkflowVersionId("my workflow 2", "2")])
Retrieve Workflow Version
- WorkflowVersionAPI.retrieve(workflow_external_id: str, version: str) cognite.client.data_classes.workflows.WorkflowVersion | None
-
- Parameters
workflow_external_id (str) – External id of the workflow.
version (str) – Version of the workflow.
- Returns
The requested workflow version if it exists, None otherwise.
- Return type
WorkflowVersion | None
Examples
Retrieve workflow version 1 of workflow my workflow:
>>> from cognite.client import CogniteClient >>> client = CogniteClient() >>> res = client.workflows.versions.retrieve("my workflow", "1")
List Workflow Versions
- WorkflowVersionAPI.list(workflow_version_ids: Optional[Union[WorkflowVersionId, tuple[str, str], str, MutableSequence[cognite.client.data_classes.workflows.WorkflowVersionId | tuple[str, str] | str]]] = None, limit: int = 25) WorkflowVersionList
List workflow versions in the project
- Parameters
workflow_version_ids (WorkflowIdentifier | MutableSequence[WorkflowIdentifier] | None) – Workflow version id or list of workflow version ids to filter on.
limit (int) – Maximum number of results to return. Defaults to 25. Set to -1, float(“inf”) or None
- Returns
The requested workflow versions.
- Return type
Examples
Get all workflow version for workflows ‘my_workflow’ and ‘my_workflow_2’:
>>> from cognite.client import CogniteClient >>> client = CogniteClient() >>> res = client.workflows.versions.list(["my_workflow", "my_workflow_2"])
Get all workflow versions for workflows ‘my_workflow’ and ‘my_workflow_2’ using the WorkflowVersionId class:
>>> from cognite.client import CogniteClient >>> from cognite.client.data_classes import WorkflowVersionId >>> client = CogniteClient() >>> res = client.workflows.versions.list([WorkflowVersionId("my_workflow"), WorkflowVersionId("my_workflow_2")])
Get all workflow versions for workflows ‘my_workflow’ version ‘1’ and ‘my_workflow_2’ version ‘2’ using tuples:
>>> from cognite.client import CogniteClient >>> client = CogniteClient() >>> res = client.workflows.versions.list([("my_workflow", "1"), ("my_workflow_2", "2")])
Workflow Executions
List Workflow Executions
- WorkflowExecutionAPI.list(workflow_version_ids: Optional[Union[WorkflowVersionId, tuple[str, str], MutableSequence[cognite.client.data_classes.workflows.WorkflowVersionId | tuple[str, str]]]] = None, created_time_start: Optional[int] = None, created_time_end: Optional[int] = None, statuses: Optional[Union[Literal['completed', 'failed', 'running', 'terminated', 'timed_out'], MutableSequence[Literal['completed', 'failed', 'running', 'terminated', 'timed_out']]]] = None, limit: int = 25) WorkflowExecutionList
List workflow executions in the project.
- Parameters
workflow_version_ids (WorkflowVersionIdentifier | MutableSequence[WorkflowVersionIdentifier] | None) – Workflow version id or list of workflow version ids to filter on.
created_time_start (int | None) – Filter out executions that was created before this time. Time is in milliseconds since epoch.
created_time_end (int | None) – Filter out executions that was created after this time. Time is in milliseconds since epoch.
statuses (WorkflowStatus | MutableSequence[WorkflowStatus] | None) – Workflow status or list of workflow statuses to filter on.
limit (int) – Maximum number of results to return. Defaults to 25. Set to -1, float(“inf”) or None to return all items.
- Returns
The requested workflow executions.
- Return type
Examples
Get all workflow executions for workflows ‘my_workflow’ version ‘1’:
>>> from cognite.client import CogniteClient >>> client = CogniteClient() >>> res = client.workflows.executions.list(("my_workflow", "1"))
Get all workflow executions for workflows after last 24 hours:
>>> from cognite.client import CogniteClient >>> from datetime import datetime, timedelta >>> client = CogniteClient() >>> res = client.workflows.executions.list(created_time_start=int((datetime.now() - timedelta(days=1)).timestamp() * 1000))
Retrieve Detailed Workflow Execution
- WorkflowExecutionAPI.retrieve_detailed(id: str) cognite.client.data_classes.workflows.WorkflowExecutionDetailed | None
Retrieve a workflow execution with detailed information.
- Parameters
id (str) – The server-generated id of the workflow execution.
- Returns
The requested workflow execution if it exists, None otherwise.
- Return type
WorkflowExecutionDetailed | None
Examples
Retrieve workflow execution with UUID ‘000560bc-9080-4286-b242-a27bb4819253’:
>>> from cognite.client import CogniteClient >>> client = CogniteClient() >>> res = client.workflows.executions.retrieve_detailed("000560bc-9080-4286-b242-a27bb4819253")
List workflow executions and retrieve detailed information for the first one:
>>> from cognite.client import CogniteClient >>> client = CogniteClient() >>> res = client.workflows.executions.list() >>> res = client.workflows.executions.retrieve_detailed(res[0].id)
Run Workflow Execution
- WorkflowExecutionAPI.run(workflow_external_id: str, version: str, input: dict | None = None, metadata: dict | None = None, client_credentials: ClientCredentials | None = None, nonce: str | None = None) WorkflowExecution
-
- Parameters
workflow_external_id (str) – External id of the workflow.
version (str) – Version of the workflow.
input (dict | None) – The input to the workflow execution. This will be available for tasks that have specified it as an input with the string “${workflow.input}” See tip below for more information.
metadata (dict | None) – Application specific metadata. Keys have a maximum length of 32 characters, values a maximum of 255, and there can be a maximum of 10 key-value pairs.
client_credentials (ClientCredentials | None) – Specific credentials that should be used to trigger the workflow execution. When passed will take precedence over the current credentials.
nonce (str | None) – The nonce to use to bind the session. If not provided, a new session will be created using the current credentials.
Tip
The workflow input can be available in the workflow tasks. For example, if you have a Task with function parameters then you can specify it as follows
>>> from cognite.client.data_classes import WorkflowTask, FunctionTaskParameters >>> task = WorkflowTask( ... external_id="my_workflow-task1", ... parameters=FunctionTaskParameters( ... external_id="cdf_deployed_function:my_function", ... data={"workflow_data": "${workflow.input}",}))
Tip
You can create a session via the Sessions API, using the client.iam.session.create() method.
- Returns
The created workflow execution.
- Return type
Examples
Trigger a workflow execution for the workflow “foo”, version 1:
>>> from cognite.client import CogniteClient >>> client = CogniteClient() >>> res = client.workflows.executions.run("foo", "1")
Trigger a workflow execution with input data:
>>> res = client.workflows.executions.run("foo", "1", input={"a": 1, "b": 2})
Trigger a workflow execution using a specific set of client credentials (i.e. not your current credentials):
>>> import os >>> from cognite.client.data_classes import ClientCredentials >>> credentials = ClientCredentials("my-client-id", os.environ["MY_CLIENT_SECRET"]) >>> res = client.workflows.executions.run("foo", "1", client_credentials=credentials)
Trigger Workflow Execution
- WorkflowExecutionAPI.trigger(workflow_external_id: str, version: str, input: dict | None = None, metadata: dict | None = None, client_credentials: ClientCredentials | None = None) WorkflowExecution
[DEPRECATED]Trigger a workflow execution.
This method is deprecated, use ‘.run’ instead. It will be completely removed October 2024.
- Parameters
workflow_external_id (str) – External id of the workflow.
version (str) – Version of the workflow.
input (dict | None) – The input to the workflow execution. This will be available for tasks that have specified it as an input with the string “${workflow.input}” See tip below for more information.
metadata (dict | None) – Application specific metadata. Keys have a maximum length of 32 characters, values a maximum of 255, and there can be a maximum of 10 key-value pairs.
client_credentials (ClientCredentials | None) – Specific credentials that should be used to trigger the workflow execution. When passed will take precedence over the current credentials.
- Returns
No description.
- Return type
Cancel Workflow Execution
- WorkflowExecutionAPI.cancel(id: str, reason: str | None) WorkflowExecution
-
- Parameters
id (str) – The server-generated id of the workflow execution.
reason (str | None) – The reason for the cancellation, this will be put within the execution’s reasonForIncompletion field. It is defaulted to ‘cancelled’ if not provided.
Note
Cancelling a workflow will immediately cancel the in_progress tasks, but not their spawned work in other services (like transformations and functions).
- Returns
The canceled workflow execution.
- Return type
Examples
Trigger a workflow execution for the workflow “foo”, version 1 and cancel it:
>>> from cognite.client import CogniteClient >>> client = CogniteClient() >>> res = client.workflows.executions.run("foo", "1") >>> client.workflows.executions.cancel(id="foo", reason="test cancellation")
Retry Workflow Execution
- WorkflowExecutionAPI.retry(id: str, client_credentials: ClientCredentials | None = None) WorkflowExecution
-
- Parameters
id (str) – The server-generated id of the workflow execution.
client_credentials (ClientCredentials | None) – Specific credentials that should be used to retry the workflow execution. When passed will take precedence over the current credentials.
- Returns
The retried workflow execution.
- Return type
Examples
Retry a workflow execution that has been cancelled or failed:
>>> from cognite.client import CogniteClient >>> client = CogniteClient() >>> res = client.workflows.executions.run("foo", "1") >>> client.workflows.executions.cancel(id=res.id, reason="test cancellation") >>> client.workflows.executions.retry(res.id)
Workflow Tasks
Update Status of Async Task
- WorkflowTaskAPI.update(task_id: str, status: Literal['completed', 'failed'], output: Optional[dict] = None) WorkflowTaskExecution
-
For tasks that has been marked with ‘is_async = True’, the status must be updated by calling this endpoint with either ‘completed’ or ‘failed’.
- Parameters
task_id (str) – The server-generated id of the task.
status (Literal['completed', 'failed']) – The new status of the task. Must be either ‘completed’ or ‘failed’.
output (dict | None) – The output of the task. This will be available for tasks that has specified it as an output with the string “${<taskExternalId>.output}”
- Returns
The updated task execution.
- Return type
Examples
Update task with UUID ‘000560bc-9080-4286-b242-a27bb4819253’ to status ‘completed’:
>>> from cognite.client import CogniteClient >>> client = CogniteClient() >>> res = client.workflows.tasks.update("000560bc-9080-4286-b242-a27bb4819253", "completed")
Update task with UUID ‘000560bc-9080-4286-b242-a27bb4819253’ to status ‘failed’ with output ‘{“a”: 1, “b”: 2}’:
>>> from cognite.client import CogniteClient >>> client = CogniteClient() >>> res = client.workflows.tasks.update("000560bc-9080-4286-b242-a27bb4819253", "failed", output={"a": 1, "b": 2})
Trigger workflow, retrieve detailed task execution and update status of the second task (assumed to be async) to ‘completed’:
>>> from cognite.client import CogniteClient >>> client = CogniteClient() >>> res = client.workflows.executions.run("my workflow", "1") >>> res = client.workflows.executions.retrieve_detailed(res.id) >>> res = client.workflows.tasks.update(res.tasks[1].id, "completed")
Workflow Triggers
Create or update triggers for workflow executions
- WorkflowTriggerAPI.upsert(workflow_trigger: WorkflowTriggerUpsert, client_credentials: ClientCredentials | dict | None = None) WorkflowTrigger
Create or update a trigger for a workflow.
- Parameters
workflow_trigger (WorkflowTriggerUpsert) – The workflow trigger specification.
client_credentials (ClientCredentials | dict | None) – Specific credentials that should be used to trigger the workflow execution. When passed will take precedence over the current credentials.
- Returns
The created or updated workflow trigger specification.
- Return type
Examples
Create or update a scheduled trigger for a workflow:
>>> from cognite.client import CogniteClient >>> from cognite.client.data_classes.workflows import WorkflowTriggerUpsert, WorkflowScheduledTriggerRule >>> client = CogniteClient() >>> client.workflows.triggers.upsert( ... WorkflowTriggerUpsert( ... external_id="my_trigger", ... trigger_rule=WorkflowScheduledTriggerRule(cron_expression="0 0 * * *"), ... workflow_external_id="my_workflow", ... workflow_version="1", ... input={"a": 1, "b": 2}, ... metadata={"key": "value"}, ... ) ... )
- Create or update a data modeling trigger for a workflow:
>>> from cognite.client.data_classes.workflows import WorkflowDataModelingTriggerRule, WorkflowTriggerDataModelingQuery >>> from cognite.client.data_classes.data_modeling.query import NodeResultSetExpression, Select, SourceSelector >>> from cognite.client.data_classes.data_modeling import ViewId >>> from cognite.client.data_classes.filters import Equals >>> from cognite.client import CogniteClient >>> client = CogniteClient() >>> view_id = ViewId("my_space_id", "view_external_id", "v1") >>> client.workflows.triggers.upsert( ... WorkflowTriggerUpsert( ... external_id="my_trigger", ... trigger_rule=WorkflowDataModelingTriggerRule( ... data_modeling_query=WorkflowTriggerDataModelingQuery( ... with_={"timeseries": NodeResultSetExpression(filter=Equals(view_id.as_property_ref("name"), value="my_name"))}, ... select={"timeseries": Select([SourceSelector(view_id, ["name"])])}, ... ), ... batch_size=500, ... batch_timeout=300, ... ), ... workflow_external_id="my_workflow", ... workflow_version="1", ... ) ... )
Create triggers for workflow executions
- WorkflowTriggerAPI.create(workflow_trigger: WorkflowTriggerCreate, client_credentials: ClientCredentials | dict | None = None) WorkflowTrigger
[DEPRECATED] Create or update a trigger for a workflow.
This method is deprecated, use ‘.upsert’ instead. It will be completely removed October 2024.
- Parameters
workflow_trigger (WorkflowTriggerCreate) – The workflow trigger specification.
client_credentials (ClientCredentials | dict | None) – Specific credentials that should be used to trigger the workflow execution. When passed will take precedence over the current credentials.
- Returns
The created or updated workflow trigger specification.
- Return type
Examples
Create or update a scheduled trigger for a workflow:
>>> from cognite.client import CogniteClient >>> from cognite.client.data_classes.workflows import WorkflowTriggerCreate, WorkflowScheduledTriggerRule >>> client = CogniteClient() >>> client.workflows.triggers.create( ... WorkflowTriggerCreate( ... external_id="my_trigger", ... trigger_rule=WorkflowScheduledTriggerRule(cron_expression="0 0 * * *"), ... workflow_external_id="my_workflow", ... workflow_version="1", ... input={"a": 1, "b": 2}, ... ) ... )
Delete triggers for workflow executions
- WorkflowTriggerAPI.delete(external_id: str) None
Delete a trigger for a workflow.
- Parameters
external_id (str) – The external id of the trigger to delete.
Examples
Delete a trigger with external id ‘my_trigger’:
>>> from cognite.client import CogniteClient >>> client = CogniteClient() >>> client.workflows.triggers.delete("my_trigger")
Get triggers for workflow executions
- WorkflowTriggerAPI.get_triggers(limit: int = 25) WorkflowTriggerList
-
- Parameters
limit (int) – Maximum number of results to return. Defaults to 25. Set to -1, float(“inf”) or None to return all items.
- Returns
The trigger list.
- Return type
Examples
Get all triggers:
>>> from cognite.client import CogniteClient >>> client = CogniteClient() >>> res = client.workflows.triggers.get_triggers()
Get trigger run history for a workflow trigger
- WorkflowTriggerAPI.get_trigger_run_history(external_id: str, limit: int = 25) WorkflowTriggerRunList
List the history of runs for a trigger.
- Parameters
external_id (str) – The external id of the trigger to list runs for.
limit (int) – Maximum number of results to return. Defaults to 25. Set to -1, float(“inf”) or None to return all items.
- Returns
The requested trigger runs.
- Return type
Examples
Get all runs for a trigger with external id ‘my_trigger’:
>>> from cognite.client import CogniteClient >>> client = CogniteClient() >>> res = client.workflows.triggers.get_trigger_run_history("my_trigger")
Data Workflows data classes
- class cognite.client.data_classes.workflows.CDFTaskOutput(response: str | dict | None, status_code: int | None)
Bases:
WorkflowTaskOutput
The CDF Request output is used to specify the output of a CDF Request.
- Parameters
response (str | dict | None) – The response of the CDF Request. Will be a JSON object if content-type is application/json, otherwise will be a string.
status_code (int | None) – The status code of the CDF Request.
- class cognite.client.data_classes.workflows.CDFTaskParameters(resource_path: str, method: Union[Literal['GET', 'POST', 'PUT', 'DELETE'], str], query_parameters: Optional[Union[dict, str]] = None, body: Optional[Union[dict, str]] = None, request_timeout_in_millis: int | str = 10000)
Bases:
WorkflowTaskParameters
The CDF request parameters are used to specify a request to the Cognite Data Fusion API.
- Parameters
resource_path (str) – The resource path of the request. Note the path of the request which is prefixed by ‘{cluster}.cognitedata.com/api/v1/project/{project}’ based on the cluster and project of the request.
method (Literal['GET', 'POST', 'PUT', 'DELETE'] | str) – The HTTP method of the request.
query_parameters (dict | str | None) – The query parameters of the request. Defaults to None.
body (dict | str | None) – The body of the request. Defaults to None. Limited to 1024KiB in size
request_timeout_in_millis (int | str) – The timeout of the request in milliseconds. Defaults to 10000.
Examples
Call the asset/list endpoint with a limit of 10:
>>> from cognite.client.data_classes import WorkflowTask, CDFTaskParameters >>> task = WorkflowTask( ... external_id="task1", ... parameters=CDFTaskParameters( ... resource_path="/assets/list", ... method="GET", ... query_parameters={"limit": 10}, ... ), ... )
- dump(camel_case: bool = True) dict[str, Any]
Dump the instance into a json serializable Python data type.
- Parameters
camel_case (bool) – Use camelCase for attribute names. Defaults to True.
- Returns
A dictionary representation of the instance.
- Return type
dict[str, Any]
- class cognite.client.data_classes.workflows.DynamicTaskOutput
Bases:
WorkflowTaskOutput
The dynamic task output is used to specify the output of a dynamic task.
- class cognite.client.data_classes.workflows.DynamicTaskParameters(tasks: list[cognite.client.data_classes.workflows.WorkflowTask] | str)
Bases:
WorkflowTaskParameters
The dynamic task parameters are used to specify a dynamic task.
When the tasks and their order of execution are determined at runtime, we use dynamic tasks. It takes the tasks parameter which is a Reference to an array of function, transformation, and cdf task definitions. This array should be generated and returned by a previous step in the workflow, for instance, a Cognite Function task.
Tip
You can reference data from other tasks or the workflow. You do this by following the format ${prefix.jsonPath} in the expression. Some valid option are:
${workflow.input}: The workflow input.
${<taskExternalId>.output}: The output of the task with the given external id.
${<taskExternalId>.input}: The input of the task with the given external id.
${<taskExternalId>.input.someKey}: A specific key within the input of the task with the given external id.
- Parameters
tasks (list[WorkflowTask] | str) – The tasks to be dynamically executed. The dynamic task is a string that is evaluated during the workflow’s execution. When calling Version Upsert, the tasks parameter must be a Reference string. When calling Execution details, the tasks parameter will be a list of WorkflowTask objects.
- dump(camel_case: bool = True) dict[str, Any]
Dump the instance into a json serializable Python data type.
- Parameters
camel_case (bool) – Use camelCase for attribute names. Defaults to True.
- Returns
A dictionary representation of the instance.
- Return type
dict[str, Any]
- class cognite.client.data_classes.workflows.FunctionTaskOutput(call_id: int | None, function_id: int | None, response: dict | None)
Bases:
WorkflowTaskOutput
The class represent the output of Cognite Function task.
- Parameters
call_id (int | None) – The call_id of the CDF Function call.
function_id (int | None) – The function_id of the CDF Function.
response (dict | None) – The response of the CDF Function call.
- class cognite.client.data_classes.workflows.FunctionTaskParameters(external_id: str, data: Optional[Union[dict, str]] = None, is_async_complete: Optional[bool] = None)
Bases:
WorkflowTaskParameters
The function parameters are used to specify the Cognite Function to be called.
- Parameters
external_id (str) – The external ID of the function to be called.
data (dict | str | None) – The data to be passed to the function. Defaults to None. The data can be used to specify the input to the function from previous tasks or the workflow input. See the tip below for more information.
is_async_complete (bool | None) – Whether the function is asynchronous. Defaults to None, which the API will interpret as False.
If a function is asynchronous, you need to call the client.workflows.tasks.update() endpoint to update the status of the task. While synchronous tasks update the status automatically.
Tip
You can dynamically specify data from other tasks or the workflow. You do this by following the format ${prefix.jsonPath} in the expression. The valid are:
${workflow.input}: The workflow input.
${<taskExternalId>.output}: The output of the task with the given external id.
${<taskExternalId>.input}: The input of the task with the given external id.
${<taskExternalId>.input.someKey}: A specific key within the input of the task with the given external id.
For example, if you have a workflow containing two tasks, and the external_id of the first task is task1 then, you can specify the data for the second task as follows:
>>> from cognite.client.data_classes import WorkflowTask, FunctionTaskParameters >>> task = WorkflowTask( ... external_id="task2", ... parameters=FunctionTaskParameters( ... external_id="cdf_deployed_function", ... data={ ... "workflow_data": "${workflow.input}", ... "task1_input": "${task1.input}", ... "task1_output": "${task1.output}" ... }, ... ), ... )
- dump(camel_case: bool = True) dict[str, Any]
Dump the instance into a json serializable Python data type.
- Parameters
camel_case (bool) – Use camelCase for attribute names. Defaults to True.
- Returns
A dictionary representation of the instance.
- Return type
dict[str, Any]
- class cognite.client.data_classes.workflows.SubworkflowReferenceParameters(workflow_external_id: str, version: str)
Bases:
WorkflowTaskParameters
The subworkflow task parameters are used to specify a subworkflow task. When a workflow is made of stages with dependencies between them, we can use subworkflow tasks for convenience. The subworkflow reference is used to specifying a reference to another workflow which will be embedded into the execution at start time.
- Parameters
workflow_external_id (str) – The external ID of the referenced workflow.
version (str) – The version of the referenced workflow.
- dump(camel_case: bool = True) dict[str, Any]
Dump the instance into a json serializable Python data type.
- Parameters
camel_case (bool) – Use camelCase for attribute names. Defaults to True.
- Returns
A dictionary representation of the instance.
- Return type
dict[str, Any]
- class cognite.client.data_classes.workflows.SubworkflowTaskOutput
Bases:
WorkflowTaskOutput
The subworkflow task output is used to specify the output of a subworkflow task.
- class cognite.client.data_classes.workflows.SubworkflowTaskParameters(tasks: list[cognite.client.data_classes.workflows.WorkflowTask])
Bases:
WorkflowTaskParameters
The subworkflow task parameters are used to specify a subworkflow task.
When a workflow is made of stages with dependencies between them, we can use subworkflow tasks for convenience. It takes the tasks parameter which is an array of function, transformation, cdf, …, task definitions. This array needs to be statically set on the workflow definition (if it needs to be defined at runtime, use a dynamic task).
- Parameters
tasks (list[WorkflowTask]) – The tasks belonging to the subworkflow.
- dump(camel_case: bool = True) dict[str, Any]
Dump the instance into a json serializable Python data type.
- Parameters
camel_case (bool) – Use camelCase for attribute names. Defaults to True.
- Returns
A dictionary representation of the instance.
- Return type
dict[str, Any]
- class cognite.client.data_classes.workflows.TransformationTaskOutput(job_id: int | None)
Bases:
WorkflowTaskOutput
The transformation output is used to specify the output of a transformation task.
- Parameters
job_id (int | None) – The job id of the transformation job.
- class cognite.client.data_classes.workflows.TransformationTaskParameters(external_id: str, concurrency_policy: Literal['fail', 'restartAfterCurrent', 'waitForCurrent'] = 'fail')
Bases:
WorkflowTaskParameters
The transformation parameters are used to specify the transformation to be called.
- Parameters
external_id (str) – The external ID of the transformation to be called.
concurrency_policy (Literal['fail', 'restartAfterCurrent', 'waitForCurrent']) – Determines the behavior of the task if the Transformation is already running.
fail
: The task fails if another instance of the Transformation is currently running.waitForCurrent
: The task will pause and wait for the already running Transformation to complete. Once completed, the task is completed. This mode is useful for preventing redundant Transformation runs.restartAfterCurrent
: The task waits for the ongoing Transformation to finish. After completion, the task restarts the Transformation. This mode ensures that the most recent data can be used by following tasks.
- dump(camel_case: bool = True) dict[str, Any]
Dump the instance into a json serializable Python data type.
- Parameters
camel_case (bool) – Use camelCase for attribute names. Defaults to True.
- Returns
A dictionary representation of the instance.
- Return type
dict[str, Any]
- class cognite.client.data_classes.workflows.Workflow(external_id: str, created_time: int, description: Optional[str] = None, data_set_id: Optional[int] = None)
Bases:
WorkflowCore
This class represents a workflow. This is the reading version, used when reading or listing a workflows.
- Parameters
external_id (str) – The external ID provided by the client. Must be unique for the resource type.
created_time (int) – The time when the workflow was created. Unix timestamp in milliseconds.
description (str | None) – Description of the workflow. Defaults to None.
data_set_id (int | None) – The id of the data set this workflow belongs to.
- as_write() WorkflowUpsert
Returns this workflow in the writing format.
- class cognite.client.data_classes.workflows.WorkflowCore(external_id: str, description: Optional[str] = None, data_set_id: Optional[int] = None)
Bases:
WriteableCogniteResource
[WorkflowUpsert
],ABC
- class cognite.client.data_classes.workflows.WorkflowDataModelingTriggerRule(data_modeling_query: WorkflowTriggerDataModelingQuery, batch_size: Optional[int] = None, batch_timeout: Optional[int] = None)
Bases:
WorkflowTriggerRule
This class represents a data modeling trigger rule.
- Parameters
data_modeling_query (WorkflowTriggerDataModelingQuery) – The data modeling query of the trigger.
batch_size (int | None) – The batch size of the trigger.
batch_timeout (int | None) – The batch timeout of the trigger.
- dump(camel_case: bool = True) dict[str, Any]
Dump the instance into a json serializable Python data type.
- Parameters
camel_case (bool) – Use camelCase for attribute names. Defaults to True.
- Returns
A dictionary representation of the instance.
- Return type
dict[str, Any]
- class cognite.client.data_classes.workflows.WorkflowDefinition(hash_: str, tasks: list[cognite.client.data_classes.workflows.WorkflowTask], description: Optional[str] = None)
Bases:
WorkflowDefinitionCore
This class represents a workflow definition. This represents the read version of a workflow definition.
A workflow definition defines the tasks and order/dependencies of these tasks.
- Parameters
hash (str) – The hash of the tasks and description. This is used to uniquely identify the workflow definition as you can overwrite a workflow version.
tasks (list[WorkflowTask]) – The tasks of the workflow definition.
description (str | None) – The description of the workflow definition. Defaults to None.
- as_write() WorkflowDefinitionUpsert
Returns this WorkflowDefinition in its write format.
- dump(camel_case: bool = True) dict[str, Any]
Dump the instance into a json serializable Python data type.
- Parameters
camel_case (bool) – Use camelCase for attribute names. Defaults to True.
- Returns
A dictionary representation of the instance.
- Return type
dict[str, Any]
- class cognite.client.data_classes.workflows.WorkflowDefinitionCore(tasks: list[cognite.client.data_classes.workflows.WorkflowTask], description: Optional[str] = None)
Bases:
WriteableCogniteResource
[WorkflowDefinitionUpsert
],ABC
This class represents a workflow definition.
A workflow definition defines the tasks and order/dependencies of these tasks.
- Parameters
tasks (list[WorkflowTask]) – The tasks of the workflow definition.
description (str | None) – The description of the workflow definition. Note that when updating a workflow definition description, it will always be overwritten also if it is set to None. Meaning if the workflow definition already has a description, and you want to keep it, you need to provide the description when updating it.
- dump(camel_case: bool = True) dict[str, Any]
Dump the instance into a json serializable Python data type.
- Parameters
camel_case (bool) – Use camelCase for attribute names. Defaults to True.
- Returns
A dictionary representation of the instance.
- Return type
dict[str, Any]
- class cognite.client.data_classes.workflows.WorkflowDefinitionUpsert(tasks: list[cognite.client.data_classes.workflows.WorkflowTask], description: Optional[str] = None)
Bases:
WorkflowDefinitionCore
This class represents a workflow definition. This represents the write/update version of a workflow definition.
A workflow definition defines the tasks and order/dependencies of these tasks.
- Parameters
tasks (list[WorkflowTask]) – The tasks of the workflow definition.
description (str | None) – The description of the workflow definition. Note that when updating a workflow definition description, it will always be overwritten also if it is set to None. Meaning if the workflow definition already has a description, and you want to keep it, you need to provide the description when updating it.
- as_write() WorkflowDefinitionUpsert
Returns this WorkflowDefinitionUpsert in its write format.
- dump(camel_case: bool = True) dict[str, Any]
Dump the instance into a json serializable Python data type.
- Parameters
camel_case (bool) – Use camelCase for attribute names. Defaults to True.
- Returns
A dictionary representation of the instance.
- Return type
dict[str, Any]
- class cognite.client.data_classes.workflows.WorkflowExecution(id: str, workflow_external_id: str, status: Literal['completed', 'failed', 'running', 'terminated', 'timed_out'], created_time: int, version: Optional[str] = None, start_time: Optional[int] = None, end_time: Optional[int] = None, reason_for_incompletion: Optional[str] = None, metadata: Optional[dict] = None)
Bases:
CogniteResource
This class represents a workflow execution.
- Parameters
id (str) – The server generated id of the workflow execution.
workflow_external_id (str) – The external ID of the workflow.
status (WorkflowStatus) – The status of the workflow execution.
created_time (int) – The time when the workflow execution was created. Unix timestamp in milliseconds.
version (str | None) – The version of the workflow. Defaults to None.
start_time (int | None) – The start time of the workflow execution. Unix timestamp in milliseconds. Defaults to None.
end_time (int | None) – The end time of the workflow execution. Unix timestamp in milliseconds. Defaults to None.
reason_for_incompletion (str | None) – Provides the reason if the workflow did not complete successfully. Defaults to None.
metadata (dict | None) – Application specific metadata.
- class cognite.client.data_classes.workflows.WorkflowExecutionDetailed(id: str, workflow_external_id: str, workflow_definition: WorkflowDefinition, status: Literal['completed', 'failed', 'running', 'terminated', 'timed_out'], executed_tasks: list[cognite.client.data_classes.workflows.WorkflowTaskExecution], created_time: int, version: Optional[str] = None, start_time: Optional[int] = None, end_time: Optional[int] = None, reason_for_incompletion: Optional[str] = None, input: Optional[dict] = None, metadata: Optional[dict] = None)
Bases:
WorkflowExecution
This class represents a detailed workflow execution.
A detailed workflow execution contains the input and output of each task in the workflow execution. In addition, it contains the workflow definition of the workflow.
- Parameters
id (str) – The server generated id of the workflow execution.
workflow_external_id (str) – The external ID of the workflow.
workflow_definition (WorkflowDefinition) – The workflow definition of the workflow.
status (WorkflowStatus) – The status of the workflow execution.
executed_tasks (list[WorkflowTaskExecution]) – The executed tasks of the workflow execution.
created_time (int) – The time when the workflow execution was created. Unix timestamp in milliseconds.
version (str | None) – The version of the workflow. Defaults to None.
start_time (int | None) – The start time of the workflow execution. Unix timestamp in milliseconds. Defaults to None.
end_time (int | None) – The end time of the workflow execution. Unix timestamp in milliseconds. Defaults to None.
reason_for_incompletion (str | None) – Provides the reason if the workflow did not complete successfully. Defaults to None.
input (dict | None) – Input arguments the workflow was triggered with.
metadata (dict | None) – Metadata set when the workflow was triggered.
- dump(camel_case: bool = True) dict[str, Any]
Dump the instance into a json serializable Python data type.
- Parameters
camel_case (bool) – Use camelCase for attribute names. Defaults to True.
- Returns
A dictionary representation of the instance.
- Return type
dict[str, Any]
- class cognite.client.data_classes.workflows.WorkflowExecutionList(resources: Iterable[Any], cognite_client: CogniteClient | None = None)
Bases:
CogniteResourceList
[WorkflowExecution
],InternalIdTransformerMixin
This class represents a list of workflow executions.
- class cognite.client.data_classes.workflows.WorkflowIds(workflow_ids: Collection[WorkflowVersionId])
Bases:
UserList
This class represents a list of Workflow Version Identifiers.
- class cognite.client.data_classes.workflows.WorkflowList(resources: Iterable[Any], cognite_client: CogniteClient | None = None)
Bases:
WriteableCogniteResourceList
[WorkflowUpsert
,Workflow
],ExternalIDTransformerMixin
This class represents a list of workflows.
- as_write() WorkflowUpsertList
Returns these workflows in the writing format.
- class cognite.client.data_classes.workflows.WorkflowScheduledTriggerRule(cron_expression: str)
Bases:
WorkflowTriggerRule
This class represents a scheduled trigger rule.
- Parameters
cron_expression (str) – The cron specification for the scheduled trigger.
- class cognite.client.data_classes.workflows.WorkflowTask(external_id: str, parameters: WorkflowTaskParameters, name: Optional[str] = None, description: Optional[str] = None, retries: int = 3, timeout: int = 3600, on_failure: Literal['abortWorkflow', 'skipTask'] = 'abortWorkflow', depends_on: Optional[list[str]] = None)
Bases:
CogniteResource
This class represents a workflow task.
Note
Tasks do not distinguish between write and read versions.
- Parameters
external_id (str) – The external ID provided by the client. Must be unique for the resource type.
parameters (WorkflowTaskParameters) – The parameters of the task.
name (str | None) – The name of the task. Defaults to None.
description (str | None) – The description of the task. Defaults to None.
retries (int) – The number of retries for the task. Defaults to 3.
timeout (int) – The timeout of the task in seconds. Defaults to 3600.
on_failure (Literal['abortWorkflow', 'skipTask']) – The policy to handle failures and timeouts. Defaults to abortWorkflow.
skipTask
: For both failures and timeouts, the task will retry until the retries are exhausted. After that, the Task is marked as COMPLETED_WITH_ERRORS and the subsequent tasks are executed.abortWorkflow
: In case of failures, retries will be performed until exhausted. After which the task is marked as FAILED and the Workflow is marked the same. In the event of a timeout, no retries are undertaken; the task is marked as TIMED_OUT and the Workflow is marked as FAILED.depends_on (list[str] | None) – The external ids of the tasks that this task depends on. Defaults to None.
- dump(camel_case: bool = True) dict[str, Any]
Dump the instance into a json serializable Python data type.
- Parameters
camel_case (bool) – Use camelCase for attribute names. Defaults to True.
- Returns
A dictionary representation of the instance.
- Return type
dict[str, Any]
- class cognite.client.data_classes.workflows.WorkflowTaskExecution(id: str, external_id: str, status: Literal['in_progress', 'cancelled', 'failed', 'failed_with_terminal_error', 'completed', 'completed_with_errors', 'timed_out', 'skipped'], input: WorkflowTaskParameters, output: WorkflowTaskOutput, version: Optional[str] = None, start_time: Optional[int] = None, end_time: Optional[int] = None, reason_for_incompletion: Optional[str] = None)
Bases:
CogniteObject
This class represents a task execution.
- Parameters
id (str) – The server generated id of the task execution.
external_id (str) – The external ID provided by the client. Must be unique for the resource type.
status (TaskStatus) – The status of the task execution.
input (WorkflowTaskParameters) – The input parameters of the task execution.
output (WorkflowTaskOutput) – The output of the task execution.
version (str | None) – The version of the task execution. Defaults to None.
start_time (int | None) – The start time of the task execution. Unix timestamp in milliseconds. Defaults to None.
end_time (int | None) – The end time of the task execution. Unix timestamp in milliseconds. Defaults to None.
reason_for_incompletion (str | None) – Provides the reason if the workflow did not complete successfully. Defaults to None.
- dump(camel_case: bool = True) dict[str, Any]
Dump the instance into a json serializable Python data type.
- Parameters
camel_case (bool) – Use camelCase for attribute names. Defaults to True.
- Returns
A dictionary representation of the instance.
- Return type
dict[str, Any]
- class cognite.client.data_classes.workflows.WorkflowTaskOutput
Bases:
ABC
- class cognite.client.data_classes.workflows.WorkflowTaskParameters
Bases:
CogniteObject
,ABC
- class cognite.client.data_classes.workflows.WorkflowTrigger(external_id: str, trigger_rule: WorkflowTriggerRule, workflow_external_id: str, workflow_version: str, input: Optional[dict] = None, metadata: Optional[dict] = None, created_time: Optional[int] = None, last_updated_time: Optional[int] = None)
Bases:
WorkflowTriggerCore
This class represents a workflow trigger.
- Parameters
external_id (str) – The external ID provided by the client. Must be unique for the resource type.
trigger_rule (WorkflowTriggerRule) – The trigger rule of the workflow version trigger.
workflow_external_id (str) – The external ID of the workflow.
workflow_version (str) – The version of the workflow.
input (dict | None) – The input data passed to the workflow when an execution is started. Defaults to None.
metadata (dict | None) – Application specific metadata. Defaults to None.
created_time (int | None) – The time when the workflow version trigger was created. Unix timestamp in milliseconds. Defaults to None.
last_updated_time (int | None) – The time when the workflow version trigger was last updated. Unix timestamp in milliseconds. Defaults to None.
- as_write() WorkflowTriggerUpsert
Returns this workflow trigger instance.
- dump(camel_case: bool = True) dict[str, Any]
Dump the instance into a json serializable Python data type.
- Parameters
camel_case (bool) – Use camelCase for attribute names. Defaults to True.
- Returns
A dictionary representation of the instance.
- Return type
dict[str, Any]
- class cognite.client.data_classes.workflows.WorkflowTriggerCore(external_id: str, trigger_rule: WorkflowTriggerRule, workflow_external_id: str, workflow_version: str, input: Optional[dict] = None, metadata: Optional[dict] = None)
Bases:
WriteableCogniteResource
[WorkflowTriggerUpsert
],ABC
This class represents a base class for a workflow trigger.
- Parameters
external_id (str) – The external ID provided by the client. Must be unique for the resource type.
trigger_rule (WorkflowTriggerRule) – The trigger rule of the workflow version trigger.
workflow_external_id (str) – The external ID of the workflow.
workflow_version (str) – The version of the workflow.
input (dict | None) – The input data of the workflow version trigger. Defaults to None.
metadata (dict | None) – Application specific metadata. Defaults to None.
- cognite.client.data_classes.workflows.WorkflowTriggerCreate
alias of
WorkflowTriggerUpsert
- class cognite.client.data_classes.workflows.WorkflowTriggerDataModelingQuery(with_: dict[str, cognite.client.data_classes.data_modeling.query.ResultSetExpression], select: dict[str, cognite.client.data_classes.data_modeling.query.Select])
Bases:
Query
This class represents a data modeling trigger query.
- Parameters
with (dict[str, ResultSetExpression]) – A dictionary of result set expressions to use in the query. The keys are used to reference the result set expressions in the select.
select (dict[str, Select]) – A dictionary of select expressions to use in the query. The keys must match the keys in the with_ dictionary. The select expressions define which properties to include in the result set.
- class cognite.client.data_classes.workflows.WorkflowTriggerList(resources: Iterable[Any], cognite_client: CogniteClient | None = None)
Bases:
WriteableCogniteResourceList
[WorkflowTriggerUpsert
,WorkflowTrigger
],ExternalIDTransformerMixin
This class represents a list of workflow triggers.
- as_write() WorkflowTriggerUpsertList
Returns a WorkflowTriggerUpsertList object with the same data.
- class cognite.client.data_classes.workflows.WorkflowTriggerRule
Bases:
CogniteObject
,ABC
This is the base class for all workflow trigger rules.
- dump(camel_case: bool = True) dict[str, Any]
Dump the instance into a json serializable Python data type.
- Parameters
camel_case (bool) – Use camelCase for attribute names. Defaults to True.
- Returns
A dictionary representation of the instance.
- Return type
dict[str, Any]
- class cognite.client.data_classes.workflows.WorkflowTriggerRun(external_id: str, fire_time: int, workflow_external_id: str, workflow_version: str, status: Literal['success', 'failed'], workflow_execution_id: Optional[str] = None, reason_for_failure: Optional[str] = None)
Bases:
CogniteResource
This class represents a workflow trigger run.
- dump(camel_case: bool = True) dict[str, Any]
Dump the instance into a json serializable Python data type.
- Parameters
camel_case (bool) – Use camelCase for attribute names. Defaults to True.
- Returns
A dictionary representation of the instance.
- Return type
dict[str, Any]
- class cognite.client.data_classes.workflows.WorkflowTriggerRunList(resources: Iterable[Any], cognite_client: CogniteClient | None = None)
Bases:
CogniteResourceList
[WorkflowTriggerRun
],ExternalIDTransformerMixin
This class represents a list of workflow trigger runs.
- class cognite.client.data_classes.workflows.WorkflowTriggerUpsert(external_id: str, trigger_rule: WorkflowTriggerRule, workflow_external_id: str, workflow_version: str, input: Optional[dict] = None, metadata: Optional[dict] = None)
Bases:
WorkflowTriggerCore
This class represents a workflow trigger for upsertion.
- Parameters
external_id (str) – The external ID provided by the client. Must be unique for the resource type.
trigger_rule (WorkflowTriggerRule) – The trigger rule of the workflow version trigger.
workflow_external_id (str) – The external ID of the workflow.
workflow_version (str) – The version of the workflow.
input (dict | None) – The input data of the workflow version trigger. Defaults to None.
metadata (dict | None) – Application specific metadata. Defaults to None.
- as_write() WorkflowTriggerUpsert
Returns this workflow trigger create instance.
- dump(camel_case: bool = True) dict[str, Any]
Dump the instance into a json serializable Python data type.
- Parameters
camel_case (bool) – Use camelCase for attribute names. Defaults to True.
- Returns
A dictionary representation of the instance.
- Return type
dict[str, Any]
- class cognite.client.data_classes.workflows.WorkflowTriggerUpsertList(resources: Iterable[Any], cognite_client: CogniteClient | None = None)
Bases:
CogniteResourceList
[WorkflowTriggerUpsert
],ExternalIDTransformerMixin
- class cognite.client.data_classes.workflows.WorkflowUpsert(external_id: str, description: Optional[str] = None, data_set_id: Optional[int] = None)
Bases:
WorkflowCore
This class represents a workflow. This is the write version, used when creating or updating a workflow.
- Parameters
external_id (str) – The external ID provided by the client. Must be unique for the resource type.
description (str | None) – Description of the workflow. Note that when updating a workflow, the description will always be overwritten also if it is set to None. Meaning if the workflow already has a description, and you want to keep it, you need to provide the description when updating the workflow.
data_set_id (int | None) – The id of the data set this workflow belongs to. If a dataSetId is provided, any operations on this workflow, or its versions, executions, and triggers will require appropriate access to the data set. More information on data sets and their configuration can be found here: https://docs.cognite.com/cdf/data_governance/concepts/datasets/
- as_write() WorkflowUpsert
Returns this workflow instance.
- class cognite.client.data_classes.workflows.WorkflowUpsertList(resources: Iterable[Any], cognite_client: CogniteClient | None = None)
Bases:
CogniteResourceList
[WorkflowUpsert
],ExternalIDTransformerMixin
- class cognite.client.data_classes.workflows.WorkflowVersion(workflow_external_id: str, version: str, workflow_definition: WorkflowDefinition)
Bases:
WorkflowVersionCore
This class represents a workflow version. This is the read variant, used when retrieving/listing a workflow variant.
- Parameters
workflow_external_id (str) – The external ID of the workflow.
version (str) – The version of the workflow.
workflow_definition (WorkflowDefinition) – The workflow definition of the workflow version.
- as_write() WorkflowVersionUpsert
Returns a WorkflowVersionUpsert object with the same data.
- dump(camel_case: bool = True) dict[str, Any]
Dump the instance into a json serializable Python data type.
- Parameters
camel_case (bool) – Use camelCase for attribute names. Defaults to True.
- Returns
A dictionary representation of the instance.
- Return type
dict[str, Any]
- class cognite.client.data_classes.workflows.WorkflowVersionCore(workflow_external_id: str, version: str)
Bases:
WriteableCogniteResource
[WorkflowVersionUpsert
],ABC
This class represents a workflow version.
- Parameters
workflow_external_id (str) – The external ID of the workflow.
version (str) – The version of the workflow.
- class cognite.client.data_classes.workflows.WorkflowVersionId(workflow_external_id: str, version: Optional[str] = None)
Bases:
object
This class represents a Workflow Version Identifier.
- Parameters
workflow_external_id (str) – The external ID of the workflow.
version (str, optional) – The version of the workflow. Defaults to None.
- class cognite.client.data_classes.workflows.WorkflowVersionList(resources: Iterable[Any], cognite_client: CogniteClient | None = None)
Bases:
WriteableCogniteResourceList
[WorkflowVersionUpsert
,WorkflowVersion
]This class represents a list of workflow versions.
- as_ids() WorkflowIds
Returns a WorkflowIds object with the workflow version ids.
- as_write() WorkflowVersionUpsertList
Returns a WorkflowVersionUpsertList object with the same data.
- class cognite.client.data_classes.workflows.WorkflowVersionUpsert(workflow_external_id: str, version: str, workflow_definition: WorkflowDefinitionUpsert)
Bases:
WorkflowVersionCore
This class represents a workflow version. This is the write-variant, used when creating or updating a workflow variant.
- Parameters
workflow_external_id (str) – The external ID of the workflow.
version (str) – The version of the workflow.
workflow_definition (WorkflowDefinitionUpsert) – The workflow definition of the workflow version.
- as_write() WorkflowVersionUpsert
Returns this WorkflowVersionUpsert instance.
- dump(camel_case: bool = True) dict[str, Any]
Dump the instance into a json serializable Python data type.
- Parameters
camel_case (bool) – Use camelCase for attribute names. Defaults to True.
- Returns
A dictionary representation of the instance.
- Return type
dict[str, Any]
- class cognite.client.data_classes.workflows.WorkflowVersionUpsertList(resources: Iterable[Any], cognite_client: CogniteClient | None = None)
Bases:
CogniteResourceList
[WorkflowVersionUpsert
]This class represents a list of workflow versions.
- as_ids() WorkflowIds
Returns a WorkflowIds object with the workflow version ids.