Real Time Analytics with Azure Event Hubs, Cloudera, and Azure SQL

Paige Liu

In this blog post, I will demonstrate how to ingest data from Azure Event Hubs into Spark Streaming running on Cloudera EDH, process the data in real time using Spark SQL, and write the results to an Azure SQL database.  Alternatively, the data processing can also be done using Impala.  This example uses the same data generator described in this article about Azure Stream Analytics.

Scenario: 

A telecommunications company has a large volume of data for incoming calls.  The company needs to:

  1. Obtain insights about customer usage aggregated across geographical regions in real time
  2. Detect SIM fraud (multiple calls coming from the same identity around the same time but in geographically different locations) in real time

Ingesting data:

Data ingestion is the same as described in the aforementioned article about Stream Analytics.  Follow these main steps:

Step 1: Create an Event Hubs Input 

  1. Go to the Azure portal, click New -> App Services -> Service Bus -> Event Hub -> Quick Create to create a new Event Hub
  2. Go to the newly created Event Hub, on the Consumer Groups tab, click Create at the bottom of the page to create a new consumer group
  3. On the Configure tab of the Event Hub, under Shared Access Policies, create a new policy with Manage permissions
  4. Save the changes, navigate to the Dashboard, click Connection Information at the bottom of the page, and copy down the connection information (see the sketch below for how it maps to the settings used later in the Spark code)
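
The connection information copied in step 4 is a Service Bus connection string.  Below is a sketch of its general shape with hypothetical placeholder values (not real settings), broken into the pieces that the Spark Streaming code later in this post expects as separate eventhubs.* parameters:

//the connection string from the portal looks roughly like:
//Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<policy name>;SharedAccessKey=<policy key>
//placeholder values below are hypothetical -- substitute your own
val ehNamespace = "mytelco-ns" //the Event Hubs (Service Bus) namespace
val ehName = "mytelco" //the Event Hub name
val ehPolicyName = "managepolicy" //the shared access policy created in step 3
val ehPolicyKey = "<policy key>" //the key shown with the connection information
val ehConsumerGroup = "sparkcg" //the consumer group created in step 2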

Step 2: Download and configure the sample app that can generate sample call records and push them to Event Hubs

  1. Download the event generator sample app from GitHub 
  2. Replace the Microsoft.ServiceBus.ConnectionString and EventHubName values in App.Config with your Event Hub connection string and name
  3. Run the solution

telcodatagen [#NumCDRsPerHour] [SIM Card Fraud Probability] [#DurationHours]

The following example will generate 1000 CDRs per hour with a 20% probability of fraud over the course of 2 hours:

telcodatagen 1000 .2 2
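
For reference, each call detail record (CDR) that the generator pushes to Event Hubs is a JSON document.  The sketch below shows, as a Scala case class, only the fields this post relies on (the names are taken from the Spark SQL and Impala queries later in the post; the generator may emit additional fields):

//sketch of the call record fields used in this post, not the complete schema
case class CallRecord(
  callrecTime: String, //call timestamp, e.g. "2015-08-10T12:34:56Z"
  SwitchNum: String, //switch identifier, used in this post as the geographical region
  CallingIMSI: String, //identity of the calling SIM card
  CallingNum: String, //calling phone number
  CalledNum: String //called phone number
)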

Step 3: Optionally, verify that you can receive data from Event Hubs using the Java client for Event Hubs.

There are two reasons you may want to do this:

  1. Become familiar with how the Java client for Event Hubs works, what packages are needed, how to build it, and so on, which will be helpful when we run Spark Streaming in Scala below
  2. Make sure the ingestion pipeline works

Processing data with Spark Streaming and Spark SQL on Cloudera:

If you don't already have a Cloudera cluster, you can install one from a simple UI wizard in the Azure Marketplace, which is what this example uses.  The cluster doesn't have to run in any particular environment, though, as long as Spark on YARN is installed on it.  Please note that different versions of Spark may require some changes to the code; here we are using Spark 1.3.0 installed with Cloudera 5.4.
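
If you're not sure which Spark version your cluster runs, a quick way to check from spark-shell is the version field on the SparkContext:

//in spark-shell the SparkContext is pre-built and available as sc
sc.version //e.g. "1.3.0"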

You can find the source code for this Spark job on GitHub.   Below are the key steps:

Running the Spark job in Spark-Shell

Before submitting a Spark job in cluster mode, it's a good idea to run Spark interactively with spark-shell for easy troubleshooting.  In YARN client mode, the Spark driver runs on the local machine, so any additional jars the driver needs must be available on the local machine, and any additional jars required by the Spark executors must be available on the executor nodes.  For simplicity, we copied all the necessary jars to the /libs folder on both the Spark driver machine and the executor nodes.

spark-shell --master yarn --deploy-mode client --executor-cores 4 -usejavacp --jars /libs/spark-streaming-eventhubs_2.10-0.1.0.jar,/libs/eventhubs-client-0.9.1.jar,/libs/qpid-amqp-1-0-client-0.32.jar,/libs/qpid-amqp-1-0-client-jms-0.32.jar,/libs/qpid-amqp-1-0-common-0.32.jar,/libs/qpid-client-0.32.jar,/libs/qpid-common-0.32.jar --driver-class-path /libs/sqljdbc4.jar --conf spark.executor.extraClassPath=/libs/sqljdbc4.jar

Step 1: Receive data in Spark Streaming from Event Hubs

In the interactive command prompt of spark-shell, you can paste the following Scala code to run the Spark Streaming job, replacing the parameters with your own.   

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.eventhubs.EventHubsUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

val streamBatchIntervalInSeconds = 60
val ehParams = Map[String, String](
"eventhubs.policyname" -> "your event hubs rule name",
"eventhubs.policykey" -> "your event hubs policy key",
"eventhubs.namespace" -> "your event hubs namespace",
"eventhubs.name" -> "your event hubs name",
"eventhubs.partition.count" -> "2", //executor core count must be twice that of partition count
"eventhubs.consumergroup" -> "your event hubs consumer group name",
"eventhubs.checkpoint.dir" -> "your spark checkpoint dir in hdfs", //for simplicity we are not using reliable receiver with checkpoint in this example
"eventhubs.checkpoint.interval" -> "600")
//create the streaming context, build a unioned DStream across all Event Hub partitions,
//and convert each message's byte payload to a string
val ssc = new StreamingContext(sc, Seconds(streamBatchIntervalInSeconds))
val stream = EventHubsUtils.createUnionStream(ssc, ehParams)
val lines = stream.map(msg => new String(msg))
lines.print()
ssc.start()

Start the telcodatagen app mentioned earlier to generate some input data.  If you can see each line received from Event Hubs printed out in spark-shell, congratulations, you have everything wired up correctly.
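
One practical note for iterating in spark-shell: once ssc.start() has been called, new output operations can't be added to the same StreamingContext.  Before pasting the modified code in the next step, stop the streaming context (keeping the SparkContext alive) and then re-run the code that recreates it, for example:

//stop only the StreamingContext; keep the underlying SparkContext so it can be reused
ssc.stop(stopSparkContext = false)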

Step 2: Run real time queries in Spark SQL  

Replace "lines.print()" in the above Scala code with the following code.

//define sqlContext and import the DataFrame functions (udf, lit, etc.) before they are used
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.sql.functions._

//convert the ISO 8601 datetime string to unix time in milliseconds (HH for 24-hour parsing)
val isoformat = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'")
def date2long: (String => Long) = (s: String) => try { isoformat.parse(s).getTime() } catch { case _: Throwable => 0 }
val sqlfunc = udf(date2long)

//process the data using Spark SQL
lines.foreachRDD { rdd => if (rdd.count() > 0) {
val rawdf = sqlContext.jsonRDD(rdd);
// convert the datetime string to unix time for easy calculation
val df = rawdf.withColumn("callTime", sqlfunc(rawdf("callrecTime")));
// use the first valid call time of this mini batch as the timestamp for the aggregated entries
val minTime = df.filter(df("callTime") > 0).select("callTime").first().getLong(0);
// real time aggregation by region
val callsByRegion = df.groupBy("SwitchNum").count().withColumnRenamed("count", "callCount").withColumn("callTimeStamp", lit(minTime));
callsByRegion.show();
}
}
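
Before relying on the UDF inside the stream, you can sanity-check the date parsing directly in spark-shell; a small sketch with made-up input values:

//should return the epoch time in milliseconds for a well-formed timestamp
date2long("2015-08-10T12:34:56Z")
//returns 0 for anything that fails to parse
date2long("not a timestamp")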

Now you should be able to see the aggregated results in the spark-shell console output.  Note that the aggregation is applied directly to the Spark SQL DataFrame without having to run a SQL query.  For more complex analysis, such as detecting fraud in this example, we can register the DataFrame as a temp table and run a query like the following:

df.registerTempTable("calls");
val fraudCalls = sqlContext.sql("SELECT CS1.CallingIMSI, CS1.callrecTime as calltime1, CS2.callrecTime as calltime2, CS1.CallingNum as CallingNum1, CS2.CallingNum as CallingNum2, CS1.SwitchNum as Switch1, CS2.SwitchNum as Switch2 FROM calls CS1 inner JOIN calls CS2 ON CS1.CallingIMSI = CS2.CallingIMSI AND (CS1.callTime - CS2.callTime) between 1000 AND 5000 WHERE CS1.SwitchNum != CS2.SwitchNum order by CS1.CallingIMSI, CS1.callrecTime");

Note that this query will fail in Spark 1.3.0, probably due to a bug with self joins in Spark SQL.  We will demonstrate how to run this query in Impala later in this article instead.

Step 3: Store results in Azure SQL Server

Create a table in Azure SQL Server:

CREATE TABLE callsByRegion (
SwitchNum NVARCHAR (20) NOT NULL,
callCount INT NOT NULL,
callTimeStamp BIGINT NOT NULL
);
CREATE CLUSTERED INDEX IX_callsByRegion ON callsByRegion(callTimeStamp ASC);

Add a jdbc connection string variable anywhere before "lines.foreachRDD..." in the above Scala code, replacing parameters with your own:

val jdbccxn = "jdbc:sqlserver://<your azure db server>.database.windows.net:1433;database=<your db name>;user=<your db username>;password=<your password>;encrypt=false;loginTimeout=30;"

Replace "callsByRegion.show()" in the above Scala code with the following code:

//save real time aggregation to sql azure
callsByRegion.insertIntoJDBC(jdbccxn, "callsByRegion", false);

Run the Spark job again and check your Azure SQL Server.  You should see aggregated data in the callsByRegion table in your database.
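
If you prefer to verify from spark-shell rather than a SQL client, you can read the table back over the same connection string; a minimal sketch using the SQLContext.jdbc method from Spark 1.3 (replaced by read.jdbc in later versions):

//read the callsByRegion table from Azure SQL into a DataFrame using the jdbccxn
//connection string defined earlier; sqljdbc4.jar must be on the driver classpath
val verify = sqlContext.jdbc(jdbccxn, "callsByRegion")
verify.show()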

Running the Spark job with Spark-Submit

Once everything runs fine in spark-shell, for a production workload you would want to submit the job to the Spark cluster.  Using spark-submit, you can run the job in either client mode or cluster mode.  You can build the Scala application into a jar and supply its dependencies at run time, or you can build an uber jar that has all the dependencies in it.  Check out this example on GitHub to see how to build the jars with Maven.

Running the job in YARN client mode

Note that the order of the parameters in the following commands is important: the --class option and the application jar must come at the end of each command.

Using a jar without dependencies:

spark-submit --master yarn --deploy-mode client --executor-cores 4 --jars /libs/spark-streaming-eventhubs_2.10-0.1.0.jar,/libs/eventhubs-client-0.9.1.jar,/libs/qpid-amqp-1-0-client-0.32.jar,/libs/qpid-amqp-1-0-client-jms-0.32.jar,/libs/qpid-amqp-1-0-common-0.32.jar,/libs/qpid-client-0.32.jar,/libs/qpid-common-0.32.jar --driver-class-path /libs/sqljdbc4.jar --conf spark.executor.extraClassPath=/libs/sqljdbc4.jar --class com.sparkeventhub.sample.SparkEventHubSample ./app.jar

Using an uber jar with dependencies:

spark-submit --master yarn --deploy-mode client --executor-cores 4 --driver-class-path /libs/sqljdbc4.jar --conf spark.executor.extraClassPath=/libs/sqljdbc4.jar --class com.sparkeventhub.sample.SparkEventHubSample ./app-jar-with-dependencies.jar

Running the job in YARN cluster mode

Using a jar without dependencies:

spark-submit --master yarn --deploy-mode cluster  --executor-cores 4 --driver-cores 2 --jars /libs/spark-streaming-eventhubs_2.10-0.1.0.jar,/libs/eventhubs-client-0.9.1.jar,/libs/qpid-amqp-1-0-client-0.32.jar,/libs/qpid-amqp-1-0-client-jms-0.32.jar,/libs/qpid-amqp-1-0-common-0.32.jar,/libs/qpid-client-0.32.jar,/libs/qpid-common-0.32.jar --driver-class-path /libs/sqljdbc4.jar --conf spark.executor.extraClassPath=/libs/sqljdbc4.jar --class com.sparkeventhub.sample.SparkEventHubSample ./app.jar

Using an uber jar with dependencies:

spark-submit --master yarn --deploy-mode cluster --executor-cores 4 --driver-cores 2 --driver-class-path /libs/sqljdbc4.jar --conf spark.executor.extraClassPath=/libs/sqljdbc4.jar --class com.sparkeventhub.sample.SparkEventHubSample ./app-jar-with-dependencies.jar

Processing data with Impala

Instead of running queries in Spark SQL, we can alternatively store the data in HDFS and run queries in real time with Impala.  To store data in HDFS, add a variable for the output path in HDFS anywhere before "lines.foreachRDD..." in the above Scala code, replacing the path with your own:

import org.apache.spark.sql.SaveMode
val outputDir = "sparkoutput/calls"

Store the data in HDFS by adding the following code at the end of the "lines.foreachRDD..." block:

df.registerTempTable("rawcalls");
val calls=sqlContext.sql("SELECT callrecTime, SwitchNum, CallingIMSI, CallingNum, CalledNum, callTime from rawcalls");
calls.save(outputDir, SaveMode.Append);
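
After re-running the job as described below, you can confirm the schema that Spark actually wrote, before defining the Impala table, by reading the Parquet output back in spark-shell; a minimal sketch, assuming the outputDir defined above:

//read back the Parquet files written by the streaming job and inspect them
val written = sqlContext.parquetFile(outputDir)
written.printSchema()
written.show()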

Run the job again, and you should be able to see Parquet files being written to the specified HDFS folder.  The folder may be relative to the current HDFS user's home directory, for example "/user/hdfs/sparkoutput/calls".  Create an external table in Impala:

create external table calls (callrecTime string, SwitchNum string, CallingIMSI string, CallingNum string, CalledNum string, callTime bigint) stored as parquet location '/user/hdfs/sparkoutput/calls'

Query the data in Impala (because the Spark job keeps appending new Parquet files to the folder, you may need to run "refresh calls" in Impala first so the new files are picked up):

SELECT CS1.CallingIMSI, CS1.callrecTime as calltime1, CS2.callrecTime as calltime2, CS1.CallingNum as CallingNum1, CS2.CallingNum as CallingNum2, CS1.SwitchNum as Switch1, CS2.SwitchNum as Switch2 FROM calls CS1 inner JOIN calls CS2 ON CS1.CallingIMSI = CS2.CallingIMSI AND (CS1.callTime - CS2.callTime) between 1000 AND 5000 WHERE CS1.SwitchNum != CS2.SwitchNum order by CS1.CallingIMSI, CS1.callrecTime

There you have it.  I'm new to Spark Streaming, Spark SQL, and Impala, so I'd love to hear if there are better ways to do this.