Skip to content

Json

PipelineJobFromJsonConverter

Bases: ConverterInterface

Converts a json string into a Pipeline Job

Parameters:

Name Type Description Default
pipeline_json str

Json representing PipelineJob information, including tasks and related steps

required
Source code in src/sdk/python/rtdip_sdk/pipelines/converters/pipeline_job_json.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class PipelineJobFromJsonConverter(ConverterInterface):
    '''
    Converts a json string into a Pipeline Job

    Args:
        pipeline_json (str): Json representing PipelineJob information, including tasks and related steps 
    '''
    pipeline_json: str

    def __init__(self, pipeline_json: str):
        self.pipeline_json = pipeline_json

    def _try_convert_to_pipeline_secret(self, value):
        try:
            if "pipeline_secret" in value:
                value["pipeline_secret"]["type"] = getattr(sys.modules[__name__], value["pipeline_secret"]["type"])
            return PipelineSecret.parse_obj(value["pipeline_secret"])
        except: # NOSONAR
            return value

    def convert(self) -> PipelineJob:
        '''
        Converts a json string to a Pipeline Job
        '''
        pipeline_job_dict = json.loads(self.pipeline_json)

        # convert string component to class
        for task in pipeline_job_dict["task_list"]:
            for step in task["step_list"]:
                step["component"] = getattr(sys.modules[__name__], step["component"])
                for param_key, param_value in step["component_parameters"].items():
                    step["component_parameters"][param_key] = self._try_convert_to_pipeline_secret(param_value)
                    if not isinstance(step["component_parameters"][param_key], PipelineSecret) and isinstance(param_value, dict):
                        for key, value in param_value.items():
                            step["component_parameters"][param_key][key] = self._try_convert_to_pipeline_secret(value) 

        return PipelineJob(**pipeline_job_dict)

convert()

Converts a json string to a Pipeline Job

Source code in src/sdk/python/rtdip_sdk/pipelines/converters/pipeline_job_json.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def convert(self) -> PipelineJob:
    '''
    Converts a json string to a Pipeline Job
    '''
    pipeline_job_dict = json.loads(self.pipeline_json)

    # convert string component to class
    for task in pipeline_job_dict["task_list"]:
        for step in task["step_list"]:
            step["component"] = getattr(sys.modules[__name__], step["component"])
            for param_key, param_value in step["component_parameters"].items():
                step["component_parameters"][param_key] = self._try_convert_to_pipeline_secret(param_value)
                if not isinstance(step["component_parameters"][param_key], PipelineSecret) and isinstance(param_value, dict):
                    for key, value in param_value.items():
                        step["component_parameters"][param_key][key] = self._try_convert_to_pipeline_secret(value) 

    return PipelineJob(**pipeline_job_dict)

PipelineJobToJsonConverter

Bases: ConverterInterface

Converts a Pipeline Job into a json string

Parameters:

Name Type Description Default
pipeline_job PipelineJob

A Pipeline Job consisting of tasks and steps

required
Source code in src/sdk/python/rtdip_sdk/pipelines/converters/pipeline_job_json.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
class PipelineJobToJsonConverter(ConverterInterface):
    '''
    Converts a Pipeline Job into a json string

    Args:
        pipeline_job (PipelineJob): A Pipeline Job consisting of tasks and steps
    '''
    pipeline_job: PipelineJob

    def __init__(self, pipeline_job: PipelineJob):
        self.pipeline_job = pipeline_job

    def convert(self):
        '''
        Converts a Pipeline Job to a json string
        '''
        # required because pydantic does not use encoders in subclasses
        for task in self.pipeline_job.task_list:
            step_dict_list = []
            for step in task.step_list:
                step_dict_list.append(json.loads(step.json(models_as_dict=False, exclude_none=True)))
            task.step_list = step_dict_list

        pipeline_job_json = self.pipeline_job.json(exclude_none=True)
        return pipeline_job_json

convert()

Converts a Pipeline Job to a json string

Source code in src/sdk/python/rtdip_sdk/pipelines/converters/pipeline_job_json.py
77
78
79
80
81
82
83
84
85
86
87
88
89
def convert(self):
    '''
    Converts a Pipeline Job to a json string
    '''
    # required because pydantic does not use encoders in subclasses
    for task in self.pipeline_job.task_list:
        step_dict_list = []
        for step in task.step_list:
            step_dict_list.append(json.loads(step.json(models_as_dict=False, exclude_none=True)))
        task.step_list = step_dict_list

    pipeline_job_json = self.pipeline_job.json(exclude_none=True)
    return pipeline_job_json