Spark with Scala

build.sbt

ThisBuild / version      := "0.1.0"
ThisBuild / scalaVersion := "2.12.13"
ThisBuild / organization := "bkr" 

lazy val KafkaStreamProcessing = (project in file("."))
  .settings(
    name := "Bkr.Spark",

    libraryDependencies += "org.apache.spark" %% "spark-core" % "3.1.1",
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.1.1",
    libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.1.1",
    libraryDependencies += "org.apache.spark" %% "spark-avro" % "3.1.1", 
    
    // the Hadoop version is critical here; a mismatched version breaks ADLS Gen2 access
    libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "3.3.1",
    libraryDependencies += "org.apache.hadoop" % "hadoop-azure" % "3.3.1",

    libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.8" % Test,

    // JSON (de)serialization with circe
    libraryDependencies ++= List(
      "io.circe" %% "circe-core"    % "0.14.1",
      "io.circe" %% "circe-generic" % "0.14.1",
      "io.circe" %% "circe-parser"  % "0.14.1"
    ),
  )
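
The circe dependencies handle JSON decoding of incoming payloads. A minimal sketch of decoding into a case class (the Event shape here is hypothetical; the real payload schema is not shown in this post):

import io.circe.generic.auto._
import io.circe.parser.decode

// hypothetical payload shape, for illustration only
case class Event(id: String, value: Double)

val parsed: Either[io.circe.Error, Event] =
    decode[Event]("""{"id":"abc","value":42.0}""")

// parsed is Right(Event("abc", 42.0)) on success, Left(error) on malformed input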

Configure the connection


import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

class SparkSessionBuilder {

    def build(appName: String, config: AppConfig): SparkSession = {

        val sparkConfig = config.sparkMasterCluster match {
            case "LOCAL" => new SparkConf()
                            .setAppName(appName)
                            .setMaster("local")

            case _ => new SparkConf()
                            .setAppName(appName)
        }

        val spark = SparkSession
                        .builder
                        .config(sparkConfig)
                        .getOrCreate()

        // blob storage
        spark
            .sparkContext
            .hadoopConfiguration
            .set(s"fs.azure.account.key.${config.azBlobStorageConfig.accountName}.blob.core.windows.net", config.azBlobStorageConfig.accountKey)

        // adls gen2
        spark
            .sparkContext
            .hadoopConfiguration
            .set(s"fs.azure.account.key.${config.azDataLakeStorageConfig.accountName}.dfs.core.windows.net", config.azDataLakeStorageConfig.accountKey)

        spark        
    }

}
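
The builder leans on an AppConfig that is not shown above. A minimal sketch consistent with the fields the code reads (class and field names inferred from usage):

case class AzBlobStorageConfig(accountName: String, accountKey: String)
case class AzDataLakeStorageConfig(accountName: String, accountKey: String)

case class AppConfig(
    sparkMasterCluster: String,
    azBlobStorageConfig: AzBlobStorageConfig,
    azDataLakeStorageConfig: AzDataLakeStorageConfig)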

Azure Blob Storage

import org.apache.spark.sql.{DataFrame, SparkSession}

class AzBlobStorageDataSource(config: AzBlobStorageConfig) {

    def read(spark: SparkSession, container: String, blob: String): DataFrame = {

        // schema is inferred automatically for JSON batch reads
        spark.read
                .json(s"wasbs://$container@${config.accountName}.blob.core.windows.net/$blob")
    }
}
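
Putting the session and the source together looks like this (a sketch; the app name, container, and blob path are placeholders):

val spark = new SparkSessionBuilder().build("bkr-spark", config)

val source = new AzBlobStorageDataSource(config.azBlobStorageConfig)
val df = source.read(spark, "raw-events", "2021/06/input.json")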

ADLS Gen2 sink

import org.apache.spark.sql.Dataset

class AzDataLakeStorageDataSink(config: AzDataLakeStorageConfig) extends DataSink {

    def save[T](ds: Dataset[T], container: String, blob: String): Unit = {
        // coalesce(1) funnels the write through a single task to produce one output file
        ds.coalesce(1)
            .write
            .json(s"abfss://$container@${config.accountName}.dfs.core.windows.net/$blob")
    }
}
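
DataSink is referenced but never shown; a trait consistent with this usage would be (a sketch):

import org.apache.spark.sql.Dataset

trait DataSink {
    def save[T](ds: Dataset[T], container: String, blob: String): Unit
}

Writing the DataFrame read earlier (container and path are again placeholders):

val sink = new AzDataLakeStorageDataSink(config.azDataLakeStorageConfig)
sink.save(df, "curated-events", "2021/06/output")

Note that coalesce(1) trades write parallelism for a single output file, which is convenient for small curated outputs but becomes a bottleneck on large datasets; drop it if parallel part files are acceptable.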