Bases: SourceInterface
The Spark Kinesis Source is used to read data from Kinesis in a Databricks environment.
Structured streaming from Kinesis is not supported in open source Spark.
Args:
spark (SparkSession): Spark Session required to read data from Kinesis
options (dict): Options that can be specified for a Kinesis read operation (see the Attributes table below). Further information on the options is available [here](https://docs.databricks.com/structured-streaming/kinesis.html#configuration).
Attributes:
awsAccessKey (str): AWS access key.
awsSecretKey (str): AWS secret access key corresponding to the access key.
streamName (List[str]): The stream names to subscribe to.
region (str): The region the streams are defined in.
endpoint (str): The regional endpoint for Kinesis Data Streams.
initialPosition (str): The point to start reading from; earliest, latest, or at_timestamp.
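The following is a minimal usage sketch, not taken from the library's own examples: the import path is inferred from the source location shown below, the option values are placeholders, and `spark` is assumed to be an existing Databricks SparkSession.

```python
# Hypothetical usage sketch; option values are placeholders, not working credentials.
from rtdip_sdk.pipelines.sources.spark.kinesis import SparkKinesisSource

kinesis_source = SparkKinesisSource(
    spark=spark,  # an existing Databricks SparkSession is assumed
    options={
        "awsAccessKey": "{AWS_ACCESS_KEY}",
        "awsSecretKey": "{AWS_SECRET_KEY}",
        "streamName": "{STREAM_NAME}",
        "region": "{REGION}",
        "endpoint": "https://kinesis.{REGION}.amazonaws.com",
        "initialPosition": "latest",
    },
)

df = kinesis_source.read_stream()
```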
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kinesis.py
class SparkKinesisSource(SourceInterface):
"""
The Spark Kinesis Source is used to read data from Kinesis in a Databricks environment.
Structured streaming from Kinesis is **not** supported in open source Spark.
Args:
spark (SparkSession): Spark Session required to read data from Kinesis
options (dict): Options that can be specified for a Kinesis read operation (See Attributes table below). Further information on the options is available [here](https://docs.databricks.com/structured-streaming/kinesis.html#configuration){ target="_blank" }
Attributes:
awsAccessKey (str): AWS access key.
awsSecretKey (str): AWS secret access key corresponding to the access key.
streamName (List[str]): The stream names to subscribe to.
region (str): The region the streams are defined in.
endpoint (str): The regional endpoint for Kinesis Data Streams.
initialPosition (str): The point to start reading from; earliest, latest, or at_timestamp.
"""
spark: SparkSession
options: dict
def __init__(self, spark: SparkSession, options: dict) -> None:
self.spark = spark
self.options = options
self.schema = KINESIS_SCHEMA
@staticmethod
def system_type():
"""
Attributes:
SystemType (Environment): Requires PYSPARK_DATABRICKS
"""
return SystemType.PYSPARK_DATABRICKS
@staticmethod
def libraries():
libraries = Libraries()
return libraries
@staticmethod
def settings() -> dict:
return {}
def pre_read_validation(self):
return True
def post_read_validation(self, df: DataFrame) -> bool:
assert df.schema == self.schema
return True
def read_batch(self):
"""
Raises:
NotImplementedError: Kinesis only supports streaming reads. To perform a batch read, use the read_stream method of this component and specify the Trigger on the write_stream to be `availableNow=True` to perform batch-like reads of cloud storage files.
"""
raise NotImplementedError(
"Kinesis only supports streaming reads. To perform a batch read, use the read_stream method and specify Trigger on the write_stream as `availableNow=True`"
)
def read_stream(self) -> DataFrame:
"""
Reads streaming data from Kinesis. All of the data in the stream is processed as well as any new data that arrives after the stream has started.
"""
try:
return (
self.spark.readStream.format("kinesis").options(**self.options).load()
)
except Py4JJavaError as e:
logging.exception(e.errmsg)
raise e
except Exception as e:
logging.exception(str(e))
raise e
read_batch()
Raises:

| Type | Description |
| --- | --- |
| `NotImplementedError` | Kinesis only supports streaming reads. To perform a batch read, use the read_stream method of this component and specify the Trigger on the write_stream to be `availableNow=True` to perform batch-like reads of cloud storage files. |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kinesis.py
def read_batch(self):
"""
Raises:
NotImplementedError: Kinesis only supports streaming reads. To perform a batch read, use the read_stream method of this component and specify the Trigger on the write_stream to be `availableNow=True` to perform batch-like reads of cloud storage files.
"""
raise NotImplementedError(
"Kinesis only supports streaming reads. To perform a batch read, use the read_stream method and specify Trigger on the write_stream as `availableNow=True`"
)
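A sketch of the batch-like pattern the exception message describes is shown below, assuming a `kinesis_source` instance constructed as in the earlier sketch; the sink format, output path and checkpoint location are placeholders.

```python
# Hedged sketch: perform a batch-like read by streaming with an availableNow trigger.
df = kinesis_source.read_stream()

(
    df.writeStream.trigger(availableNow=True)  # process available data, then stop
    .format("delta")
    .option("checkpointLocation", "/tmp/checkpoints/kinesis")  # placeholder path
    .start("/tmp/output/kinesis")  # placeholder path
)
```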
read_stream()
Reads streaming data from Kinesis. All of the data in the stream is processed as well as any new data that arrives after the stream has started.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kinesis.py
def read_stream(self) -> DataFrame:
"""
Reads streaming data from Kinesis. All of the data in the stream is processed as well as any new data that arrives after the stream has started.
"""
try:
return (
self.spark.readStream.format("kinesis").options(**self.options).load()
)
except Py4JJavaError as e:
logging.exception(e.errmsg)
raise e
except Exception as e:
logging.exception(str(e))
raise e
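A short sketch of starting the stream, again assuming the `kinesis_source` instance from the earlier sketch; the in-memory sink and query name are placeholders used for illustration only.

```python
# Sketch only: start a continuous stream and check the schema against KINESIS_SCHEMA.
df = kinesis_source.read_stream()
kinesis_source.post_read_validation(df)  # asserts df.schema == KINESIS_SCHEMA

query = (
    df.writeStream.format("memory")  # in-memory sink for illustration only
    .queryName("kinesis_raw")
    .start()
)
```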
system_type()
staticmethod
Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `SystemType` | `Environment` | Requires PYSPARK_DATABRICKS |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kinesis.py
@staticmethod
def system_type():
"""
Attributes:
SystemType (Environment): Requires PYSPARK_DATABRICKS
"""
return SystemType.PYSPARK_DATABRICKS
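Since this component requires PYSPARK_DATABRICKS, a deployment script could guard on `system_type()` before constructing the source. A minimal sketch (comparing the enum member's name to avoid assuming the SystemType import path):

```python
# Sketch: verify the component targets Databricks before adding it to a pipeline.
assert SparkKinesisSource.system_type().name == "PYSPARK_DATABRICKS"
```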