Skip to content

Convert Forecast Raw JSON data to the Weather Data Model

RawForecastToWeatherDataModel

Bases: TransformerInterface

Converts a raw forecast into the Weather Data Model.

Parameters:

Name Type Description Default
spark SparkSession

Spark Session instance.

required
data DataFrame

DataFrame to be transformed.

required
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/the_weather_company/raw_forecast_to_weather_data_model.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
class RawForecastToWeatherDataModel(TransformerInterface):
    """
    Converts a raw forecast into weather data model.

    Parameters:
        spark (SparkSession): Spark Session instance.
        data (DataFrame): Dataframe to be transformed
    """

    spark: SparkSession
    data: DataFrame

    def __init__(
        self,
        spark: SparkSession,
        data: DataFrame,
    ) -> None:
        self.spark = spark
        self.data = data
        # Schema the transformed DataFrame is projected onto and validated against.
        self.target_schema = WEATHER_DATA_MODEL

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        """
        Returns:
            Libraries: A newly constructed Libraries instance with nothing added to it.
        """
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        """
        Returns:
            dict: An empty dictionary; this transformer declares no settings.
        """
        return {}

    def pre_transform_validation(self):
        """
        Performs no checks on the input.

        Returns:
            bool: Always True.
        """
        return True

    def post_transform_validation(self) -> bool:
        """
        Checks that the schema of the transformed data equals the target schema.

        The comparison is made on the string representations of both schemas.

        Returns:
            bool: True when the schemas match.

        Raises:
            AssertionError: If the schemas differ.
        """
        actual_schema = str(self.data.schema)
        expected_schema = str(self.target_schema)
        # Raised explicitly rather than via `assert`, which is removed when
        # Python runs with -O and would let a mismatch pass unnoticed.
        if actual_schema != expected_schema:
            raise AssertionError(
                "Transformed schema does not match the target schema: "
                f"expected {expected_schema}, got {actual_schema}"
            )
        return True

    def _convert_into_target_schema(self) -> None:
        """
        Converts a Spark DataFrame structure into new structure based on the Target Schema.

        Only the target schema's columns are kept, in the target schema's
        order, each cast to the data type declared for it. The result is
        stored back on `self.data`.

        Returns: Nothing.

        """

        # One projection that selects and casts every target column, instead
        # of a select followed by one withColumn call per field (chained
        # withColumn calls grow the query plan).
        df: DataFrame = self.data.select(
            [
                col(field.name).cast(field.dataType).alias(field.name)
                for field in self.target_schema.fields
            ]
        )

        # Rebuild the DataFrame from its rows with the target schema applied.
        self.data = self.spark.createDataFrame(df.rdd, self.target_schema)

    def transform(self) -> DataFrame:
        """
        Derives the weather day, hour and timezone offset columns from
        `FcstValidLocal`, renames the raw forecast columns, replaces blank
        strings with nulls and conforms the result to the target schema.

        Returns:
            DataFrame: A Forecast dataframe converted into Weather Data Model

        Raises:
            AssertionError: If the resulting schema differs from the target schema.
        """

        self.pre_transform_validation()

        # UTC timestamp of this run, written to every row as ProcessedDate.
        processed_date = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

        # NOTE(review): the substring offsets below presumably assume
        # FcstValidLocal looks like "YYYY-MM-DDTHH:MM:SS+ZZZZ" - confirm
        # against the raw forecast feed.
        df = (
            self.data.withColumn("WeatherDay", substring("FcstValidLocal", 0, 10))
            .withColumn(
                "WeatherHour",
                # Two characters starting at position 12, as an integer, plus one.
                (substring("FcstValidLocal", 12, 2).cast(IntegerType()) + 1),
            )
            .withColumn("WeatherTimezoneOffset", substring("FcstValidLocal", 20, 5))
            .withColumn("WeatherType", lit("F"))
            .withColumn("ProcessedDate", lit(processed_date))
            .withColumnRenamed("Temp", "Temperature")
            .withColumnRenamed("Dewpt", "DewPoint")
            .withColumnRenamed("Rh", "Humidity")
            .withColumnRenamed("Hi", "HeatIndex")
            .withColumnRenamed("Wc", "WindChill")
            .withColumnRenamed("Wdir", "WindDirection")
            .withColumnRenamed("Wspd", "WindSpeed")
            .withColumnRenamed("Clds", "CloudCover")
            # Added as blank strings; the blank-to-null step below nulls them.
            .withColumn("WetBulbTemp", lit(""))
            .withColumn("SolarIrradiance", lit(""))
            .withColumnRenamed("Qpf", "Precipitation")
            .withColumnRenamed("DayInd", "DayOrNight")
            .withColumnRenamed("Dow", "DayOfWeek")
            .withColumnRenamed("Gust", "WindGust")
            .withColumnRenamed("Mslp", "MslPressure")
            .withColumnRenamed("Num", "ForecastDayNum")
            .withColumnRenamed("Pop", "PropOfPrecip")
            .withColumnRenamed("PrecipType", "PrecipType")
            .withColumnRenamed("SnowQpf", "SnowAccumulation")
            .withColumnRenamed("UvIndex", "UvIndex")
            .withColumnRenamed("Vis", "Visibility")
        )

        # Replace blank strings with nulls in every column. Done as a single
        # projection rather than one withColumn call per column, because
        # chained withColumn calls grow the query plan.
        df = df.select(
            [
                when(col(name) == "", lit(None)).otherwise(col(name)).alias(name)
                for name in df.columns
            ]
        )

        self.data = df
        self._convert_into_target_schema()
        self.post_transform_validation()

        return self.data

system_type() staticmethod

Attributes:

Name Type Description
SystemType Environment

Requires PYSPARK

Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/the_weather_company/raw_forecast_to_weather_data_model.py
46
47
48
49
50
51
52
@staticmethod
def system_type():
    """
    Reports the system type this transformer runs on.

    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    required_system = SystemType.PYSPARK
    return required_system

transform()

Returns:

Name Type Description
DataFrame DataFrame

A Forecast dataframe converted into Weather Data Model

Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/the_weather_company/raw_forecast_to_weather_data_model.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def transform(self) -> DataFrame:
    """
    Derives the weather day, hour and timezone offset columns from
    `FcstValidLocal`, renames the raw forecast columns, replaces blank
    strings with nulls and conforms the result to the target schema.

    Returns:
        DataFrame: A Forecast dataframe converted into Weather Data Model
    """

    self.pre_transform_validation()

    # UTC timestamp of this run, written to every row as ProcessedDate.
    processed_date = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

    # NOTE(review): the substring offsets below presumably assume
    # FcstValidLocal looks like "YYYY-MM-DDTHH:MM:SS+ZZZZ" - confirm
    # against the raw forecast feed.
    df = (
        self.data.withColumn("WeatherDay", substring("FcstValidLocal", 0, 10))
        .withColumn(
            "WeatherHour",
            # Two characters starting at position 12, as an integer, plus one.
            (substring("FcstValidLocal", 12, 2).cast(IntegerType()) + 1),
        )
        .withColumn("WeatherTimezoneOffset", substring("FcstValidLocal", 20, 5))
        .withColumn("WeatherType", lit("F"))
        .withColumn("ProcessedDate", lit(processed_date))
        .withColumnRenamed("Temp", "Temperature")
        .withColumnRenamed("Dewpt", "DewPoint")
        .withColumnRenamed("Rh", "Humidity")
        .withColumnRenamed("Hi", "HeatIndex")
        .withColumnRenamed("Wc", "WindChill")
        .withColumnRenamed("Wdir", "WindDirection")
        .withColumnRenamed("Wspd", "WindSpeed")
        .withColumnRenamed("Clds", "CloudCover")
        # Added as blank strings; the blank-to-null step below nulls them.
        .withColumn("WetBulbTemp", lit(""))
        .withColumn("SolarIrradiance", lit(""))
        .withColumnRenamed("Qpf", "Precipitation")
        .withColumnRenamed("DayInd", "DayOrNight")
        .withColumnRenamed("Dow", "DayOfWeek")
        .withColumnRenamed("Gust", "WindGust")
        .withColumnRenamed("Mslp", "MslPressure")
        .withColumnRenamed("Num", "ForecastDayNum")
        .withColumnRenamed("Pop", "PropOfPrecip")
        .withColumnRenamed("PrecipType", "PrecipType")
        .withColumnRenamed("SnowQpf", "SnowAccumulation")
        .withColumnRenamed("UvIndex", "UvIndex")
        .withColumnRenamed("Vis", "Visibility")
    )

    # Replace blank strings with nulls in every column. Done as a single
    # projection rather than one withColumn call per column, because
    # chained withColumn calls grow the query plan.
    df = df.select(
        [
            when(col(name) == "", lit(None)).otherwise(col(name)).alias(name)
            for name in df.columns
        ]
    )

    self.data = df
    self._convert_into_target_schema()
    self.post_transform_validation()

    return self.data