The steps for this recipe are as follows:
- In Databricks, create a new notebook and enter the following code, which holds the settings needed to connect to IoT Hub:
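import datetime as dt
import json

# Connection settings copied from IoT Hub
ehConf = {}
ehConf['eventhubs.connectionString'] = "[The connection string you copied]"
ehConf['eventhubs.consumerGroup'] = "[The consumer group you created]"

# End at the current time, formatted as the timestamp string the positions use
endTime = dt.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")

startingEventPosition = {
    "offset": "-1",          # "-1" starts from the beginning of the stream
    "seqNo": -1,             # not in use
    "enqueuedTime": None,    # not in use
    "isInclusive": True
}
endingEventPosition = {
    "offset": None,          # not in use
    "seqNo": -1,             # not in use
    "enqueuedTime": endTime,
    "isInclusive": True
}

# Positions are passed to the connector as JSON strings
ehConf["eventhubs.startingPosition"] = json.dumps(startingEventPosition)
ehConf["eventhubs.endingPosition"] = json.dumps(endingEventPosition)
# ISO-8601 duration string; assumes the intended timeout is 100 seconds
ehConf["eventhubs.receiverTimeout"] = "PT100S"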
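A position dictionary is handed to the connector as a JSON string in the configuration. As a minimal sketch, assuming the connector accepts a starting position keyed on enqueued time in the same timestamp format as `endTime`, a small helper could build such a position, for example to begin reading 30 minutes back rather than from the start of the stream. The name `position_from_time` is hypothetical and not part of the connector:

```python
import datetime as dt
import json


def position_from_time(moment: dt.datetime, inclusive: bool = True) -> str:
    """Serialize an event position keyed on enqueued time (hypothetical helper)."""
    position = {
        "offset": None,  # not in use when positioning by time
        "seqNo": -1,     # not in use
        "enqueuedTime": moment.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
        "isInclusive": inclusive,
    }
    return json.dumps(position)


# Start reading from 30 minutes before the current UTC time.
thirty_minutes_ago = dt.datetime.now(dt.timezone.utc) - dt.timedelta(minutes=30)
start_position = position_from_time(thirty_minutes_ago)

# In the notebook, this string would then go into the configuration:
# ehConf["eventhubs.startingPosition"] = start_position
```

Keeping the serialization in one function means the timestamp format is defined in a single place.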
- Put the data into a Spark DataFrame:
df = spark \
    .readStream \
    .format("eventhubs") \
    .options(**ehConf) \
    .load()
- The next step is to define a schema for the message payload so that you can work with it in Structured Streaming:
from pyspark.sql.types import StructType, StructField, StringType, LongType

Schema = StructType([
    StructField("deviceEndSessionTime", StringType()),
    StructField("sensor1", StringType()),
    StructField("sensor2", StringType()),
    StructField("deviceId", LongType()),
])
- The final step is to apply the schema to a DataFrame. This allows you to work with the data as if it were a table:
from pyspark.sql.functions import from_json

# Cast the message body to a string, parse it with the schema, and flatten the fields
rawData = df \
    .selectExpr("cast(body as string) as json") \
    .select(from_json("json", Schema).alias("data")) \
    .select("data.*")
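Before sending test messages from a device, it can help to confirm that a payload has the fields the schema expects. The following is a rough sketch in plain Python, not part of the recipe and not a substitute for Spark's own parsing. The function name `missing_or_mistyped` and the sample values are hypothetical; only the field names and types come from the schema above:

```python
import json

# Field names and Python types mirroring the schema defined in the recipe.
EXPECTED_FIELDS = {
    "deviceEndSessionTime": str,
    "sensor1": str,
    "sensor2": str,
    "deviceId": int,
}


def missing_or_mistyped(body: str) -> list:
    """Return a list of problems found in a JSON message body (hypothetical helper)."""
    record = json.loads(body)
    problems = []
    for name, expected in EXPECTED_FIELDS.items():
        if name not in record:
            problems.append(f"{name}: missing")
        elif not isinstance(record[name], expected):
            problems.append(f"{name}: expected {expected.__name__}")
    return problems


# A made-up payload that matches the schema.
sample = json.dumps({
    "deviceEndSessionTime": "2024-01-01T12:00:00Z",
    "sensor1": "21.5",
    "sensor2": "0.93",
    "deviceId": 42,
})
print(missing_or_mistyped(sample))
```

A payload that lacks `sensor2` or sends `deviceId` as a string would be reported here, which is easier to diagnose than unexpected null values in the streaming DataFrame.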