How to read from Azure Storage and write to Azure SQL using Databricks

Apache Spark began as a research project at UC Berkeley, and Databricks was later founded by Spark's original creators to offer an analytics and big data processing platform built on it. On Azure, it is available as Azure Databricks, a first-party service that Microsoft offers in partnership with Databricks.

We use Databricks on Azure as a single platform for big data processing and machine learning. It gives data science and data engineering teams a fast, easy, and collaborative Spark-based platform on Azure.

In this post, I'll walk through how to connect to Azure Storage, read your files, and write them to Azure SQL. These are the basic data engineering steps. Implementing machine learning intelligence or drill-down data aggregation is out of scope.

Let's start by creating a Databricks workspace. Search for Azure Databricks in the Azure portal and click it to create one.


Once created, the resource appears in your Azure portal. Click it to open the resource home page.

On the home page, click the Launch Workspace button.


The Databricks workspace now opens. The left navigation bar takes you where you need to go. The first thing to do is create a cluster: click the Clusters menu, then click the 'Create Cluster' button. On the new cluster page, enter a cluster name and select Standard as the mode. Then select a runtime version; I selected 7.3 LTS (Scala 2.12, Spark 3.0.1). Uncheck Enable autoscaling and enter 30 in the Terminate after box, so the cluster shuts down after 30 minutes of inactivity. Click the 'Create Cluster' button. Your cluster will be ready in a couple of minutes.

Once your cluster is created, go to the Workspace menu in the left navigation, select Users, and click the dropdown next to your email. Then select Create and choose Notebook.

Enter the notebook name, select Python as the default language, and then select the cluster your code will run on.



Next, upload your files to Azure Storage. Here I uploaded a couple of data files in CSV format.

Now start coding. Go back to your notebook and write the configuration details first. Collect the required information from the Azure portal: your storage account name, container name, and key1 access key, along with your SQL connection details. You need an existing database to store the data from Databricks. In this post, I used Azure SQL Database.

#source configuration
storage_account_name = "sjvstorage"
storage_account_access_key = "<key 1>"
#database config details
jdbcHostname = "sjvsql.database.windows.net"
jdbcDatabase = "<database name>"
username = '<username>'
password = '<password>'
jdbcPort = 1433
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
#SQL connection properties
connectionProperties = {
  "user" : username,
  "password" : password,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

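The JDBC URL above carries only the host, port, and database. As a rough sketch, a small helper can also append the encrypt, trustServerCertificate, and loginTimeout connection properties that the Microsoft JDBC driver accepts. The helper name build_jdbc_url and its default values are my own assumptions for illustration, not part of any library.

```python
def build_jdbc_url(hostname, port, database, encrypt=True, login_timeout=30):
    """Build a SQL Server JDBC URL (illustrative helper, defaults are assumptions)."""
    parts = [
        "jdbc:sqlserver://{0}:{1}".format(hostname, port),
        "database={0}".format(database),
        # Ask the driver to encrypt the connection and validate the server certificate.
        "encrypt={0}".format("true" if encrypt else "false"),
        "trustServerCertificate=false",
        # Seconds to wait before a login attempt times out.
        "loginTimeout={0}".format(login_timeout),
    ]
    return ";".join(parts)
```

With the variables from the configuration above, this could be called as jdbcUrl = build_jdbc_url(jdbcHostname, jdbcPort, jdbcDatabase).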

Next, let Spark connect to Azure Storage by running the following command.

#connect to Azure
spark.conf.set(
  "fs.azure.account.key."+storage_account_name+".blob.core.windows.net",
  storage_account_access_key)
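
Pasting the access key straight into a notebook makes it easy to leak. A possible alternative, sketched below, is to read it from a Databricks secret scope with dbutils.secrets.get. This assumes you have already created a secret scope and stored the key in it; the function names here are hypothetical, and the scope and key names are whatever you chose when creating the secret.

```python
def account_key_conf_name(storage_account_name):
    """Name of the Spark config entry that holds the Blob Storage account key."""
    return "fs.azure.account.key.{0}.blob.core.windows.net".format(storage_account_name)


def load_storage_key(dbutils, scope, key_name):
    """Fetch the access key from a Databricks secret scope (scope must already exist)."""
    return dbutils.secrets.get(scope=scope, key=key_name)


def connect_storage(spark, dbutils, storage_account_name, scope, key_name):
    """Set the account key on the Spark session without hard-coding it."""
    spark.conf.set(
        account_key_conf_name(storage_account_name),
        load_storage_key(dbutils, scope, key_name),
    )
```

In a notebook, spark and dbutils are already defined, so the call would look like connect_storage(spark, dbutils, storage_account_name, "<scope name>", "<secret name>").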

Now read the data. We need the Azure Blob container name and the file path. The code below defines the file path for each input file and loads the data into a DataFrame. Once the data is loaded, you can check the types of the fields loaded into your DataFrame.

blob_container = 'input'
wellheaderFilePath = "wasbs://" + blob_container + "@" + storage_account_name + ".blob.core.windows.net/WellHeaderData.csv"
wellHeaderDF = spark.read.format("csv").load(wellheaderFilePath, inferSchema = True, header = True)
wellfluidFilePath = "wasbs://" + blob_container + "@" + storage_account_name + ".blob.core.windows.net/WellFluidProductionData.csv"
wellfluidDF = spark.read.format("csv").load(wellfluidFilePath, inferSchema = True, header = True)
outputFilePath = "wasbs://" + blob_container + "@" + storage_account_name + ".blob.core.windows.net/WellOutput"
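
The three paths above repeat the same string concatenation. As a small sketch, the pattern can be pulled into helpers. The names wasbs_uri and read_csv are my own, and read_csv expects the notebook's spark session to be passed in.

```python
def wasbs_uri(container, storage_account_name, blob_path=""):
    """Build a wasbs:// URI for a blob (or folder) in an Azure Blob container."""
    return "wasbs://{0}@{1}.blob.core.windows.net/{2}".format(
        container, storage_account_name, blob_path.lstrip("/")
    )


def read_csv(spark, uri):
    """Load a CSV file with a header row, letting Spark infer the column types."""
    return (
        spark.read.format("csv")
        .option("header", True)
        .option("inferSchema", True)
        .load(uri)
    )
```

For example, wellHeaderDF = read_csv(spark, wasbs_uri(blob_container, storage_account_name, "WellHeaderData.csv")) would load the same file as above.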


Now, you can start playing with your data. You can view selected columns from your record set.

wellHeaderDF.select("FID", "WellName", "APINo", "Label", "County", "State").show()

 

wellfluidDF.select("FID", "WellName", "APINo", "Permit", "FluidType", "FluidVolume", "FluidVolumeUnits", "Temperature_F", "Pressure_psi").show()
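
To check which types Spark inferred for the loaded fields, you can call printSchema() on a DataFrame or look at its dtypes property. The sketch below wraps dtypes in two hypothetical helpers that flag columns you expected to be numeric but that came back as something else, which can happen when a column contains text such as 'null'.

```python
# Prefixes of Spark type strings that count as numeric in this sketch.
NUMERIC_PREFIXES = ("tinyint", "smallint", "int", "bigint", "float", "double", "decimal")


def column_types(df):
    """Map each column name to its Spark type string, using DataFrame.dtypes."""
    return dict(df.dtypes)


def non_numeric_columns(df, expected_numeric):
    """Return the expected-numeric columns that are missing or not inferred as numeric."""
    types = column_types(df)
    return [
        name for name in expected_numeric
        if not types.get(name, "").startswith(NUMERIC_PREFIXES)
    ]
```

For example, non_numeric_columns(wellfluidDF, ["FluidVolume", "Temperature_F", "Pressure_psi"]) lists any of those columns that may need cleaning or casting before loading into a numeric SQL column.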


Next, create a temporary view on each DataFrame. A view lets you work on the data with SQL: wrangling, filtering, running analytics, and so on.

wellHeaderDF.createOrReplaceTempView("vWellHeader")
wellfluidDF.createOrReplaceTempView("vWellfluid")


Now work your wrangling magic using SQL. This is one of the finest parts of Databricks: it lets you run the SQL queries you already know.
%sql
SELECT A.APINo as API14, A.WellName as WellName, A.State as State, A.County as County , B.FluidVolume as FluidVolume, B.IntervalStartDateTime as IntervalStartDateTime , A.EndedDrillingDate as DrillingEndDate FROM vWellHeader A 
inner join vWellfluid B  on A.FID=B.FID
WHERE ltrim(rtrim(B.FluidVolume))<>'null'

The code below loads the filtered, wrangled data into a DataFrame.

dfSQLData=spark.sql("""SELECT A.APINo as API14, A.WellName as WellName, A.State as State, A.County as County , B.FluidVolume as FluidVolume, B.IntervalStartDateTime as IntervalStartDateTime , A.EndedDrillingDate as DrillingEndDate FROM vWellHeader A 
inner join vWellfluid B  on A.FID=B.FID
WHERE ltrim(rtrim(B.FluidVolume))<>'null'""")
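
The same query is written twice, once in the %sql cell and once in spark.sql, so the two copies and the view names can drift apart. One way to reduce that risk, sketched here with names of my own choosing, is to keep the view names in constants and build the query text in one place.

```python
# View names used both when registering the temp views and in the query.
HEADER_VIEW = "vWellHeader"
FLUID_VIEW = "vWellfluid"


def build_well_query(header_view=HEADER_VIEW, fluid_view=FLUID_VIEW):
    """Return the join query as text, with the view names filled in."""
    return (
        "SELECT A.APINo AS API14, A.WellName AS WellName, A.State AS State, "
        "A.County AS County, B.FluidVolume AS FluidVolume, "
        "B.IntervalStartDateTime AS IntervalStartDateTime, "
        "A.EndedDrillingDate AS DrillingEndDate "
        "FROM {0} A INNER JOIN {1} B ON A.FID = B.FID "
        "WHERE ltrim(rtrim(B.FluidVolume)) <> 'null'"
    ).format(header_view, fluid_view)
```

In the notebook, the views would then be registered with wellHeaderDF.createOrReplaceTempView(HEADER_VIEW) and wellfluidDF.createOrReplaceTempView(FLUID_VIEW), and the data loaded with dfSQLData = spark.sql(build_well_query()).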

Before writing your dimension data (which is produced by your SQL query against the DataFrames), make sure you have created the SQL table. Here, I created the table from the Azure portal using the query editor (preview). However, you can use SQL Server Management Studio as well.

CREATE TABLE WellDimension(
    WellID INT NOT NULL IDENTITY PRIMARY KEY,
    APINo varchar(255),
    WellName varchar(255),
    State_ varchar(255),
    County varchar(255),
    FluidVolume int,
    IntervalStartDateTime varchar(255),
    DrillingEndDate varchar(255)
);

Just to make sure, I checked that no data currently exists in my dimension table.


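To save the data to Azure SQL, call the DataFrame's JDBC writer with the JDBC URL, the target table name, and the connection properties. pandas is not needed for this step. Note that with mode "overwrite", Spark by default drops the existing table and recreates it from the DataFrame's schema, so the columns in Azure SQL will follow the aliases in the query.

#write the DataFrame to Azure SQL over JDBC
dfSQLData.write.jdbc(url=jdbcUrl, table="WellDimension", mode="overwrite", properties=connectionProperties)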
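
If you would rather keep the table you created by hand, mode "append" inserts into the existing table instead of recreating it. That only works when the DataFrame's column names match the table's, so the sketch below renames the query aliases first. The mapping and helper names are my own assumptions based on the CREATE TABLE statement above. The column types must also be compatible, so a cast may be needed for a column like FluidVolume.

```python
# Hypothetical mapping from the query's aliases to the table's column names.
COLUMN_RENAMES = {"API14": "APINo", "State": "State_"}


def rename_columns(df, renames):
    """Return the DataFrame with each column in `renames` renamed."""
    for old_name, new_name in renames.items():
        df = df.withColumnRenamed(old_name, new_name)
    return df


def append_to_sql(df, url, table, properties):
    """Append rows to an existing table rather than dropping and recreating it."""
    df.write.jdbc(url=url, table=table, mode="append", properties=properties)
```

A call would look like append_to_sql(rename_columns(dfSQLData, COLUMN_RENAMES), jdbcUrl, "WellDimension", connectionProperties).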

Once the Spark job finishes, you can click the view link to see how the job executed.

Time to validate. Run COUNT(*) and SELECT * against <DimensionTable> in your SQL database to check whether the records were inserted, how many there are, and what they contain.
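
The same check can be done from the notebook by reading the table back over JDBC and comparing row counts. This is only a sketch with helper names of my own, and it assumes the table held no other rows before the write (or was overwritten), so the two counts are expected to match.

```python
def count_rows_in_sql(spark, url, table, properties):
    """Read the table back over JDBC and return its row count."""
    return spark.read.jdbc(url=url, table=table, properties=properties).count()


def counts_match(spark, df, url, table, properties):
    """True when the DataFrame and the SQL table hold the same number of rows."""
    return df.count() == count_rows_in_sql(spark, url, table, properties)
```

For example, counts_match(spark, dfSQLData, jdbcUrl, "WellDimension", connectionProperties) should return True right after a successful overwrite.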





I hope this post will help you. Let me know if you have any questions and I'd be more than happy to help here. Happy coding!
