The steps for this recipe are as follows:
- Stream data from Kafka into Databricks. In a Databricks notebook, enter the following code:
from pyspark.sql.types import StringType
from pyspark.sql.functions import col
import json
import pandas as pd
from sklearn.linear_model import LogisticRegression

df = (spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "...azure.confluent.cloud:9092")
      .option("subscribe", "TurboFan")
      .option("startingOffsets", "latest")
      .option("kafka.security.protocol", "SASL_SSL")
      .option("kafka.sasl.mechanism", "PLAIN")
      .option("kafka.sasl.jaas.config",
              "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
              "required username=\"Kafka UserName\" password=\"Kafka Password\";")
      .load()
      .select(col("value").cast(StringType()).alias("value")))
- Specify the fields from the JSON message so that they can be deserialized...
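The deserialization step can be sketched in plain Python. This is a minimal illustration, not the recipe's actual code: the field names (`device_id`, `cycle`, `sensor_1`) are hypothetical placeholders for whatever the TurboFan producer actually sends, and the sample message is invented for demonstration.

```python
import json

# Hypothetical TurboFan telemetry payload as it would appear in the Kafka
# "value" column after the cast to string; the field names are illustrative.
sample_value = '{"device_id": "turbofan-01", "cycle": 12, "sensor_1": 518.67}'

# Deserialize the JSON string into a dict and pick out the fields of interest.
record = json.loads(sample_value)
fields = {k: record[k] for k in ("device_id", "cycle", "sensor_1")}
print(fields["device_id"], fields["cycle"])
```

In the notebook itself, the equivalent step is to declare these fields in a Spark `StructType` schema and apply `from_json` to the string-cast `value` column, so each field becomes its own column in the streaming DataFrame.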