Time Weighted Average

get(connection, parameters_dict)

A function that performs a time weighted average on raw tag data and returns the results.

The raw tag data is retrieved internally via the raw query function; the user supplies a connection and a dictionary of parameters. (See Attributes table below)

PI data points will either have step enabled (True) or step disabled (False). The step setting can be fetched from the metadata table by passing "metadata", or set explicitly to "true"/"false" in the dictionary below.
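
To build intuition for the two step modes, here is a minimal sketch (toy data, not the library's implementation) of a time weighted average over a single one-hour window:

import pandas as pd

# Three toy samples across one hour (hypothetical values).
times = pd.to_datetime(["2023-01-01 00:00", "2023-01-01 00:30", "2023-01-01 01:00"])
values = pd.Series([10.0, 20.0, 40.0], index=times)

# Seconds covered by each interval between consecutive samples.
durations = times.to_series().diff().dt.total_seconds().iloc[1:]

# Step enabled: each value is held constant until the next sample.
step_twa = (values.shift(1).iloc[1:] * durations.values).sum() / durations.sum()

# Step disabled: values change linearly between samples (trapezoidal weighting).
linear_twa = (values.rolling(2).mean().iloc[1:] * durations.values).sum() / durations.sum()

print(step_twa)   # 15.0 = (10*1800 + 20*1800) / 3600
print(linear_twa) # 22.5 = (15*1800 + 30*1800) / 3600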

Parameters:

    connection (object): Connection chosen by the user (Databricks SQL Connect, PYODBC SQL Connect, TURBODBC SQL Connect). Required.
    parameters_dict (dict): A dictionary of parameters (see Attributes table below). Required.

Attributes:

    business_unit (str): Business unit
    region (str): Region
    asset (str): Asset
    data_security_level (str): Level of data security
    data_type (str): Type of the data (float, integer, double, string)
    tag_names (list): List of one or more tagnames
    start_date (str): Start date (either a UTC date in the format YYYY-MM-DD, a UTC datetime in the format YYYY-MM-DDTHH:MM:SS, or a datetime with a timezone offset in the format YYYY-MM-DDTHH:MM:SS+zz:zz)
    end_date (str): End date (same accepted formats as start_date)
    window_size_mins (int): Window size in minutes
    window_length (int): (Optional) Extends the window at the start and end of the specified dates to cater for edge cases
    include_bad_data (bool): Include "Bad" data points with True or remove "Bad" data points with False
    step (str): Whether data points have step enabled or disabled. The options are "metadata", "true" or "false"; "metadata" fetches the step setting from the metadata table if applicable.
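
For example, all of the following are accepted values for start_date and end_date (illustrative dates):

"2023-01-01"                  # UTC date (start defaults to 00:00:00, end to 23:59:59)
"2023-01-01T15:00:00"         # UTC datetime
"2023-01-01T15:00:00+05:00"   # datetime with an explicit timezone offset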

Returns:

    DataFrame (pd.DataFrame): A dataframe containing the time weighted averages.

Source code in src/sdk/python/rtdip_sdk/queries/time_series/time_weighted_average.py
def get(connection: object, parameters_dict: dict) -> pd.DataFrame:
    '''
    A function that performs a time weighted average on raw tag data and returns the results.

    The raw tag data is retrieved internally via the raw query function; the user supplies a connection and a dictionary of parameters. (See Attributes table below)

    PI data points will either have step enabled (True) or step disabled (False). The step setting can be fetched from the metadata table by passing "metadata", or set explicitly to "true"/"false" in the dictionary below.

    Args:
        connection: Connection chosen by the user (Databricks SQL Connect, PYODBC SQL Connect, TURBODBC SQL Connect)
        parameters_dict (dict): A dictionary of parameters (see Attributes table below)
    Attributes:
        business_unit (str): Business unit 
        region (str): Region
        asset (str): Asset 
        data_security_level (str): Level of data security 
        data_type (str): Type of the data (float, integer, double, string)
        tag_names (list): List of one or more tagnames
        start_date (str): Start date (either a UTC date in the format YYYY-MM-DD, a UTC datetime in the format YYYY-MM-DDTHH:MM:SS, or a datetime with a timezone offset in the format YYYY-MM-DDTHH:MM:SS+zz:zz)
        end_date (str): End date (same accepted formats as start_date)
        window_size_mins (int): Window size in minutes
        window_length (int): (Optional) Extends the window at the start and end of the specified dates to cater for edge cases
        include_bad_data (bool): Include "Bad" data points with True or remove "Bad" data points with False
        step (str): Whether data points have step enabled or disabled. The options are "metadata", "true" or "false"; "metadata" fetches the step setting from the metadata table if applicable.
    Returns:
        DataFrame: A dataframe containing the time weighted averages.
    '''
    try:  
        dtz_format = "%Y-%m-%dT%H:%M:%S%z"
        utc = "Etc/UTC"

        def is_date_format(dt, format):
            try:
                return datetime.strptime(dt , format)
            except Exception:
                return False

        def set_dtz(dt, is_end_date = False):
            if is_date_format(dt, "%Y-%m-%d"):
                _time = "T23:59:59" if is_end_date else "T00:00:00"
                return dt + _time + "+00:00"
            elif is_date_format(dt, "%Y-%m-%dT%H:%M:%S"):
                return dt + "+00:00"
            elif is_date_format(dt, "%Y-%m-%dT%H:%M:%S%z"):
                return dt
            else: 
                raise ValueError(f"Inputted datetime: '{dt}', is not in the correct format")

        parameters_dict["start_date"] = set_dtz(parameters_dict["start_date"])
        parameters_dict["end_date"] = set_dtz(parameters_dict["end_date"], True)

        #used to only return data between start and end date (at the end of function)
        original_start_date = datetime.strptime(parameters_dict["start_date"], dtz_format).replace(tzinfo=pytz.timezone(utc))
        original_end_date = datetime.strptime(parameters_dict["end_date"], dtz_format).replace(tzinfo=pytz.timezone(utc))

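        # widen the queried range (by window_length if provided, otherwise window_size_mins)
        # so the first and last windows have neighbouring points to interpolate from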
        if "window_length" in parameters_dict:       
            parameters_dict["start_date"] = (datetime.strptime(parameters_dict["start_date"], dtz_format) - timedelta(minutes = int(parameters_dict["window_length"]))).strftime(dtz_format)
            parameters_dict["end_date"] = (datetime.strptime(parameters_dict["end_date"], dtz_format) + timedelta(minutes = int(parameters_dict["window_length"]))).strftime(dtz_format) 
        else:
            parameters_dict["start_date"] = (datetime.strptime(parameters_dict["start_date"], dtz_format) - timedelta(minutes = int(parameters_dict["window_size_mins"]))).strftime(dtz_format)
            parameters_dict["end_date"] = (datetime.strptime(parameters_dict["end_date"], dtz_format) + timedelta(minutes = int(parameters_dict["window_size_mins"]))).strftime(dtz_format)

        pandas_df = raw_get(connection, parameters_dict)

        pandas_df["EventDate"] = pd.to_datetime(pandas_df["EventTime"]).dt.date  

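        # add explicit start and end boundary rows for every requested tag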
        boundaries_df = pd.DataFrame(columns=["EventTime", "TagName"])
        for tag in parameters_dict["tag_names"]:
            start_date_new_row = pd.DataFrame([[pd.to_datetime(parameters_dict["start_date"], utc=True).replace(tzinfo=pytz.timezone(utc)), tag]], columns=["EventTime", "TagName"])
            end_date_new_row = pd.DataFrame([[pd.to_datetime(parameters_dict["end_date"], utc=True).replace(tzinfo=pytz.timezone(utc)), tag]], columns=["EventTime", "TagName"])
            boundaries_df = pd.concat([boundaries_df, start_date_new_row, end_date_new_row], ignore_index=True)
        boundaries_df.set_index(pd.DatetimeIndex(boundaries_df["EventTime"]), inplace=True)
        boundaries_df.drop(columns="EventTime", inplace=True)
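        # resample the boundaries to one timestamp per window edge per tag ("T" = minutes)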
        boundaries_df = boundaries_df.groupby(["TagName"]).resample("{}T".format(str(parameters_dict["window_size_mins"]))).ffill().drop(columns='TagName')

        #preprocess - add boundaries and time interpolate missing boundary values
        preprocess_df = pandas_df.copy()
        preprocess_df["EventTime"] = preprocess_df["EventTime"].round("S")
        preprocess_df.set_index(["EventTime", "TagName", "EventDate"], inplace=True)
        preprocess_df = preprocess_df.join(boundaries_df, how="outer", rsuffix="right")
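        # resolve the Step flag per tag: from the metadata table, or forced via "true"/"false"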
        if isinstance(parameters_dict["step"], str) and parameters_dict["step"].lower() == "metadata":
            metadata_df = metadata_get(connection, parameters_dict)
            metadata_df.set_index("TagName", inplace=True)
            metadata_df = metadata_df.loc[:, "Step"]
            preprocess_df = preprocess_df.merge(metadata_df, left_index=True, right_index=True)
        elif parameters_dict["step"].lower() == "true":
            preprocess_df["Step"] =  True
        elif parameters_dict["step"].lower() == "false":
            preprocess_df["Step"] = False
        else:
            raise Exception('Unexpected step value', parameters_dict["step"]) # NOSONAR

        def process_time_weighted_averages_step(pandas_df):
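            # Step disabled: interpolate linearly in time and weight with the trapezoidal rule.
            # Step enabled: hold the previous value (pad-fill) and weight it by its duration.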
            if pandas_df["Step"].any() == False:
                pandas_df = pandas_df.reset_index(level=["TagName", "EventDate"]).sort_index().interpolate(method='time')
                shift_raw_df = pandas_df.copy()
                shift_raw_df["CalcValue"] = (shift_raw_df.index.to_series().diff().dt.seconds/86400) * shift_raw_df.Value.rolling(2).sum()
                time_weighted_averages = shift_raw_df.resample("{}T".format(str(parameters_dict["window_size_mins"])), closed="right", label="right").CalcValue.sum() * 0.5 / parameters_dict["window_size_mins"] * 24 * 60
                return time_weighted_averages
            else:
                pandas_df = pandas_df.reset_index(level=["TagName", "EventDate"]).sort_index().interpolate(method='pad', limit_direction='forward')
                shift_raw_df = pandas_df.copy()
                shift_raw_df["CalcValue"] = (shift_raw_df.index.to_series().diff().dt.seconds/86400) * shift_raw_df.Value.shift(1)
                time_weighted_averages = shift_raw_df.resample("{}T".format(str(parameters_dict["window_size_mins"])), closed="right", label="right").CalcValue.sum() / parameters_dict["window_size_mins"] * 24 * 60
                return time_weighted_averages

        #calculate time weighted averages
        time_weighted_averages = preprocess_df.groupby(["TagName"]).apply(process_time_weighted_averages_step).reset_index()

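        # groupby().apply() may return wide or long output depending on the shape of each
        # group's result; normalise both cases to TagName / EventTime / Value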
        if "CalcValue" not in time_weighted_averages.columns:
            time_weighted_averages = time_weighted_averages.melt(id_vars="TagName", var_name="EventTime", value_name="Value")
        else: 
            time_weighted_averages = time_weighted_averages.rename(columns={"CalcValue": "Value"})

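        # trim the widened range so only averages between the original start and end dates remain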
        time_weighted_averages = time_weighted_averages.set_index("EventTime").sort_values(by=["TagName", "EventTime"])
        time_weighted_averages_datetime = time_weighted_averages.index.to_pydatetime()
        weighted_averages_timezones = np.array([z for z in time_weighted_averages_datetime])
        time_weighted_averages = time_weighted_averages[(original_start_date < weighted_averages_timezones) & (weighted_averages_timezones <= original_end_date + timedelta(seconds = 1))]
        return time_weighted_averages

    except Exception as e:
        logging.exception("error with time weighted average function: %s", str(e))
        raise e

Example

from rtdip_sdk.authentication.azure import DefaultAuth
from rtdip_sdk.connectors import DatabricksSQLConnection
from rtdip_sdk.queries import time_weighted_average

auth = DefaultAuth().authenticate()
token = auth.get_token("2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default").token
connection = DatabricksSQLConnection("{server_hostname}", "{http_path}", token)

parameters = {
    "business_unit": "Business Unit",
    "region": "Region", 
    "asset": "Asset Name", 
    "data_security_level": "Security Level", 
    "data_type": "float", #options:["float", "double", "integer", "string"]
    "tag_names": ["tag_1", "tag_2"], #list of tags
    "start_date": "2023-01-01", #start_date can be a date in the format "YYYY-MM-DD" or a datetime in the format "YYYY-MM-DDTHH:MM:SS" or specify the timezone offset in the format "YYYY-MM-DDTHH:MM:SS+zz:zz"
    "end_date": "2023-01-31", #end_date can be a date in the format "YYYY-MM-DD" or a datetime in the format "YYYY-MM-DDTHH:MM:SS" or specify the timezone offset in the format "YYYY-MM-DDTHH:MM:SS+zz:zz"
    "window_size_mins": 15, #numeric input
    "window_length": 20, #numeric input
    "include_bad_data": True, #options: [True, False]
    "step": True
}
x = time_weighted_average.get(connection, parameters)
print(x)
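
The returned dataframe is in long format, indexed by EventTime with TagName and Value columns. As a minimal post-processing sketch (assuming the frame x returned above), a pivot gives one column per tag:

pivoted = x.reset_index().pivot(index="EventTime", columns="TagName", values="Value")
print(pivoted.head())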

This example uses DefaultAuth() and DatabricksSQLConnection() to authenticate and connect. You can find other ways to authenticate here. The alternative built-in connection methods are PYODBCSQLConnection(), TURBODBCSQLConnection() and SparkConnection().

Note

server_hostname and http_path can be found on the SQL Warehouses Page.