Skip to content

Resample Function

get(connection, parameters_dict)

An RTDIP Resampling function in spark to resample data by querying databricks SQL warehouses using a connection and authentication method specified by the user. This spark resample function will return a resampled dataframe.

The available connectors by RTDIP are Databricks SQL Connect, PYODBC SQL Connect, TURBODBC SQL Connect.

The available authentication methods are Certificate Authentication, Client Secret Authentication or Default Authentication. See documentation.

This function requires the user to input a dictionary of parameters. (See Attributes table below)

Parameters:

Name Type Description Default
connection object

Connection chosen by the user (Databricks SQL Connect, PYODBC SQL Connect, TURBODBC SQL Connect)

required
parameters_dict dict

A dictionary of parameters (see Attributes table below)

required

Attributes:

Name Type Description
business_unit str

Business unit of the data

region str

Region

asset str

Asset

data_security_level str

Level of data security

data_type str

Type of the data (float, integer, double, string)

tag_names list

List of tagname or tagnames ["tag_1", "tag_2"]

start_date str

Start date (Either a date in the format YY-MM-DD or a datetime in the format YYY-MM-DDTHH:MM:SS or specify the timezone offset in the format YYYY-MM-DDTHH:MM:SS+zz:zz)

end_date str

End date (Either a date in the format YY-MM-DD or a datetime in the format YYY-MM-DDTHH:MM:SS or specify the timezone offset in the format YYYY-MM-DDTHH:MM:SS+zz:zz)

sample_rate int

(deprecated) Please use time_interval_rate instead. See below.

sample_unit str

(deprecated) Please use time_interval_unit instead. See below.

time_interval_rate str

The time interval rate (numeric input)

time_interval_unit str

The time interval unit (second, minute, day, hour)

agg_method str

Aggregation Method (first, last, avg, min, max)

include_bad_data bool

Include "Bad" data points with True or remove "Bad" data points with False

pivot bool

Pivot the data on timestamp column with True or do not pivot the data with False

limit optional int

The number of rows to be returned

offset optional int

The number of rows to skip before returning rows

case_insensitivity_tag_search optional bool

Search for tags using case insensitivity with True or case sensitivity with False

Returns:

Name Type Description
DataFrame DataFrame

A resampled dataframe.

Warning

Setting case_insensitivity_tag_search to True will result in a longer query time.

Source code in src/sdk/python/rtdip_sdk/queries/time_series/resample.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def get(connection: object, parameters_dict: dict) -> pd.DataFrame:
    """
    An RTDIP Resampling function in spark to resample data by querying databricks SQL warehouses using a connection and authentication method specified by the user. This spark resample function will return a resampled dataframe.

    The available connectors by RTDIP are Databricks SQL Connect, PYODBC SQL Connect, TURBODBC SQL Connect.

    The available authentication methods are Certificate Authentication, Client Secret Authentication or Default Authentication. See documentation.

    This function requires the user to input a dictionary of parameters. (See Attributes table below)

    Args:
        connection: Connection chosen by the user (Databricks SQL Connect, PYODBC SQL Connect, TURBODBC SQL Connect)
        parameters_dict: A dictionary of parameters (see Attributes table below)

    Attributes:
        business_unit (str): Business unit of the data
        region (str): Region
        asset (str):  Asset
        data_security_level (str): Level of data security
        data_type (str): Type of the data (float, integer, double, string)
        tag_names (list): List of tagname or tagnames ["tag_1", "tag_2"]
        start_date (str): Start date (Either a date in the format YY-MM-DD or a datetime in the format YYY-MM-DDTHH:MM:SS or specify the timezone offset in the format YYYY-MM-DDTHH:MM:SS+zz:zz)
        end_date (str): End date (Either a date in the format YY-MM-DD or a datetime in the format YYY-MM-DDTHH:MM:SS or specify the timezone offset in the format YYYY-MM-DDTHH:MM:SS+zz:zz)
        sample_rate (int): (deprecated) Please use time_interval_rate instead. See below.
        sample_unit (str): (deprecated) Please use time_interval_unit instead. See below.
        time_interval_rate (str): The time interval rate (numeric input)
        time_interval_unit (str): The time interval unit (second, minute, day, hour)
        agg_method (str): Aggregation Method (first, last, avg, min, max)
        include_bad_data (bool): Include "Bad" data points with True or remove "Bad" data points with False
        pivot (bool): Pivot the data on timestamp column with True or do not pivot the data with False
        limit (optional int): The number of rows to be returned
        offset (optional int): The number of rows to skip before returning rows
        case_insensitivity_tag_search (optional bool): Search for tags using case insensitivity with True or case sensitivity with False

    Returns:
        DataFrame: A resampled dataframe.

    !!! warning
        Setting `case_insensitivity_tag_search` to True will result in a longer query time.
    """
    if isinstance(parameters_dict["tag_names"], list) is False:
        raise ValueError("tag_names must be a list")

    if "sample_rate" in parameters_dict:
        logging.warning(
            "Parameter sample_rate is deprecated and will be removed in v1.0.0. Please use time_interval_rate instead."
        )
        parameters_dict["time_interval_rate"] = parameters_dict["sample_rate"]

    if "sample_unit" in parameters_dict:
        logging.warning(
            "Parameter sample_unit is deprecated and will be removed in v1.0.0. Please use time_interval_unit instead."
        )
        parameters_dict["time_interval_unit"] = parameters_dict["sample_unit"]

    try:
        query = _query_builder(parameters_dict, "resample")

        try:
            cursor = connection.cursor()
            cursor.execute(query)
            df = cursor.fetch_all()
            cursor.close()
            connection.close()
            return df
        except Exception as e:
            logging.exception("error returning dataframe")
            raise e

    except Exception as e:
        logging.exception("error with resampling function")
        raise e

Example

from rtdip_sdk.authentication.azure import DefaultAuth
from rtdip_sdk.connectors import DatabricksSQLConnection
from rtdip_sdk.queries import resample

auth = DefaultAuth().authenticate()
token = auth.get_token("2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default").token
connection = DatabricksSQLConnection("{server_hostname}", "{http_path}", token)

parameters = {
    "business_unit": "{business_unit}",
    "region": "{region}", 
    "asset": "{asset_name}", 
    "data_security_level": "{security_level}", 
    "data_type": "float",
    "tag_names": ["{tag_name_1}", "{tag_name_2}"],
    "start_date": "2023-01-01",
    "end_date": "2023-01-31",
    "time_interval_rate": "15",
    "time_interval_unit": "minute",
    "agg_method": "first",
    "include_bad_data": True,
}
x = resample.get(connection, parameters)
print(x)

This example is using DefaultAuth() and DatabricksSQLConnection() to authenticate and connect. You can find other ways to authenticate here. The alternative built in connection methods are either by PYODBCSQLConnection(), TURBODBCSQLConnection() or SparkConnection().

Note

See Samples Repository for full list of examples.

Note

server_hostname and http_path can be found on the SQL Warehouses Page.