Skip to content

Json

PipelineJobFromJsonConverter

Bases: ConverterInterface

Converts a json string into a Pipeline Job.

Example

from rtdip_sdk.pipelines.converters.pipeline_job_json import PipelineJobFromJsonConverter

convert_json_string_to_pipeline_job = PipelineJobFromJsonConverter(
    pipeline_json = "{JSON-STRING}"
)

convert_json_string_to_pipeline_job.convert()

Parameters:

Name Type Description Default
pipeline_json str

Json representing PipelineJob information, including tasks and related steps

required
Source code in src/sdk/python/rtdip_sdk/pipelines/converters/pipeline_job_json.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
class PipelineJobFromJsonConverter(ConverterInterface):
    """
    Converts a json string into a Pipeline Job.

    Example
    -------
    ```python
    from rtdip_sdk.pipelines.converters.pipeline_job_json import PipelineJobFromJsonConverter

    convert_json_string_to_pipeline_job = PipelineJobFromJsonConverter(
        pipeline_json = "{JSON-STRING}"
    )

    convert_json_string_to_pipeline_job.convert()
    ```

    Parameters:
        pipeline_json (str): Json representing PipelineJob information, including tasks and related steps
    """

    pipeline_json: str

    def __init__(self, pipeline_json: str):
        self.pipeline_json = pipeline_json

    def _try_convert_to_pipeline_secret(self, value):
        """
        Best-effort conversion of a parameter value into a PipelineSecret.

        If ``value`` contains a ``"pipeline_secret"`` entry, the entry's
        ``"type"`` name is resolved to the attribute of the same name in this
        module and the entry is parsed with ``PipelineSecret.parse_obj``.
        Any failure along the way (value is not a mapping, key missing, unknown
        type name, parsing error) results in ``value`` being returned as-is.

        Parameters:
            value: A decoded json value taken from a step's component parameters

        Returns:
            The parsed PipelineSecret, or ``value`` itself when it cannot be converted.
        """
        try:
            if "pipeline_secret" in value:
                # The type arrives as a name in the json; swap it in place for
                # the attribute of that name in this module's namespace.
                value["pipeline_secret"]["type"] = getattr(
                    sys.modules[__name__], value["pipeline_secret"]["type"]
                )
            return PipelineSecret.parse_obj(value["pipeline_secret"])
        except Exception:  # NOSONAR
            # Deliberate best-effort fallback. Catching Exception rather than
            # using a bare except lets KeyboardInterrupt/SystemExit propagate.
            return value

    def convert(self) -> PipelineJob:
        """
        Converts a json string to a Pipeline Job

        Returns:
            PipelineJob: built from the decoded json, with component names
            replaced by the attributes they name in this module and
            ``pipeline_secret`` entries replaced by PipelineSecret objects
            where conversion succeeds.
        """
        pipeline_job_dict = json.loads(self.pipeline_json)

        # convert string component to class
        # NOTE(review): names are looked up in this module's namespace, so any
        # attribute of this module can be referenced from the json - confirm
        # the json always comes from a trusted source.
        for task in pipeline_job_dict["task_list"]:
            for step in task["step_list"]:
                step["component"] = getattr(sys.modules[__name__], step["component"])
                component_parameters = step["component_parameters"]
                # Only existing keys are reassigned, so the dict size is stable
                # while it is being iterated.
                for param_key, param_value in component_parameters.items():
                    converted = self._try_convert_to_pipeline_secret(param_value)
                    component_parameters[param_key] = converted
                    if isinstance(converted, PipelineSecret) or not isinstance(
                        param_value, dict
                    ):
                        continue
                    # Not a secret itself: look one level deeper for secrets
                    # nested inside a dict-valued parameter.
                    for key, value in param_value.items():
                        converted[key] = self._try_convert_to_pipeline_secret(value)

        return PipelineJob(**pipeline_job_dict)

convert()

Converts a json string to a Pipeline Job

Source code in src/sdk/python/rtdip_sdk/pipelines/converters/pipeline_job_json.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def convert(self) -> PipelineJob:
    """
    Build a Pipeline Job from the json string held in ``self.pipeline_json``.

    Component names are replaced by the attribute of the same name in this
    module, and parameter values (plus the values one level inside dict-valued
    parameters) are passed through ``_try_convert_to_pipeline_secret``.
    """
    job_spec = json.loads(self.pipeline_json)

    for task_spec in job_spec["task_list"]:
        for step_spec in task_spec["step_list"]:
            # swap the component name for the attribute it names in this module
            step_spec["component"] = getattr(
                sys.modules[__name__], step_spec["component"]
            )
            params = step_spec["component_parameters"]
            for name, raw in params.items():
                converted = self._try_convert_to_pipeline_secret(raw)
                params[name] = converted
                if isinstance(converted, PipelineSecret) or not isinstance(raw, dict):
                    continue
                # dict-valued parameter that is not itself a secret:
                # convert any secrets nested one level down
                for inner_name, inner_raw in raw.items():
                    converted[inner_name] = self._try_convert_to_pipeline_secret(
                        inner_raw
                    )

    return PipelineJob(**job_spec)

PipelineJobToJsonConverter

Bases: ConverterInterface

Converts a Pipeline Job into a json string.

Example

from rtdip_sdk.pipelines.converters.pipeline_job_json import PipelineJobToJsonConverter

convert_pipeline_job_to_json_string = PipelineJobToJsonConverter(
    pipeline_job = pipeline_job  # an existing PipelineJob instance
)

convert_pipeline_job_to_json_string.convert()

Parameters:

Name Type Description Default
pipeline_job PipelineJob

A Pipeline Job consisting of tasks and steps

required
Source code in src/sdk/python/rtdip_sdk/pipelines/converters/pipeline_job_json.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class PipelineJobToJsonConverter(ConverterInterface):
    """
    Converts a Pipeline Job into a json string.

    Example
    -------
    ```python
    from rtdip_sdk.pipelines.converters.pipeline_job_json import PipelineJobToJsonConverter

    convert_pipeline_job_to_json_string = PipelineJobToJsonConverter(
        pipeline_job = pipeline_job  # an existing PipelineJob instance
    )

    convert_pipeline_job_to_json_string.convert()
    ```

    Parameters:
        pipeline_job (PipelineJob): A Pipeline Job consisting of tasks and steps
    """

    pipeline_job: PipelineJob

    def __init__(self, pipeline_job: PipelineJob):
        self.pipeline_job = pipeline_job

    def convert(self) -> str:
        """
        Converts a Pipeline Job to a json string

        Each step is serialised on its own and temporarily substituted into
        its task as a plain dict before the whole job is serialised. The
        original step objects are put back afterwards, so the Pipeline Job
        passed to the converter is left unchanged and ``convert`` can be
        called more than once.

        Returns:
            str: The json representation of the Pipeline Job, with ``None`` values excluded
        """
        original_step_lists = []
        try:
            # required because pydantic does not use encoders in subclasses
            for task in self.pipeline_job.task_list:
                # Remember the real steps before swapping in their dict form.
                original_step_lists.append((task, task.step_list))
                task.step_list = [
                    json.loads(step.json(models_as_dict=False, exclude_none=True))
                    for step in task.step_list
                ]

            return self.pipeline_job.json(exclude_none=True)
        finally:
            # Undo the temporary substitution even if serialisation failed.
            for task, step_list in original_step_lists:
                task.step_list = step_list

convert()

Converts a Pipeline Job to a json string

Source code in src/sdk/python/rtdip_sdk/pipelines/converters/pipeline_job_json.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def convert(self):
    """
    Converts a Pipeline Job to a json string

    Each step is serialised on its own and temporarily substituted into its
    task as a plain dict before the whole job is serialised. The original
    step objects are put back afterwards, so ``self.pipeline_job`` is left
    unchanged and ``convert`` can be called more than once.

    Returns:
        The json string produced by ``self.pipeline_job.json(exclude_none=True)``.
    """
    original_step_lists = []
    try:
        # required because pydantic does not use encoders in subclasses
        for task in self.pipeline_job.task_list:
            # Remember the real steps before swapping in their dict form.
            original_step_lists.append((task, task.step_list))
            task.step_list = [
                json.loads(step.json(models_as_dict=False, exclude_none=True))
                for step in task.step_list
            ]

        return self.pipeline_job.json(exclude_none=True)
    finally:
        # Undo the temporary substitution even if serialisation failed.
        for task, step_list in original_step_lists:
            task.step_list = step_list