Anomaly Detection in Time Series Data using Azure Cognitive Services

An anomaly is a data point that deviates from the pattern followed by the majority of the data and therefore stands out from the rest. Such anomalous events can be costly for any organisation that relies on time-series data for its long-term strategy, so they must be monitored to mitigate the risk.

Training data scientists and data analysts to perform anomaly detection effectively adds to an organisation’s training costs. Azure Cognitive Services can help here: its Anomaly Detector API is a cloud-based service that detects anomalies in large time-series datasets through a simple REST API call.

Process Flow

As shown in the figure below, we will create a timer-triggered Function that runs at a scheduled time. The Function reads the content of a .csv file from an Azure Blob Storage container and calls the Azure Cognitive Services Anomaly Detector API. The API detects anomalies in the time-series data and returns them as output. This output is then uploaded to a Blob Storage container.

Prerequisites

  1. An Azure Subscription
  2. Time series data in Azure Blob Storage

Provision Azure Cognitive Service – Anomaly Detector

Provision the Anomaly Detector service so that its client API can be invoked to detect anomalies in the time-series data held in the .csv file in Azure Blob Storage.

  • Log in to the Azure Portal
  • Search for “Anomaly Detector” and click Create
  • Provide a Name, Region and Pricing Tier, then click Review + Create
  • Go to Resource > Keys and Endpoint and copy the Key and Endpoint values. These values are needed later to call the API

Create Azure Function

Create a timer-triggered Azure Function. The trigger fires at a scheduled time so that data can be read from blob storage for anomaly detection.

  • Open Visual Studio
  • Click on Create a new project
  • Search for Azure Functions Template
  • Select the template and click on Next
  • Provide Project Name and click on Create
  • Select “Timer Trigger”. Configure the schedule by providing a CRON expression (Azure Functions uses the six-field NCRONTAB format, so 0 0 2 * * * runs every day at 02:00). Click on Create.

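Add Cognitive Service NuGet Package

  • Right-click the Project > Manage NuGet Packages
  • Search for “Newtonsoft.Json”. Select it and click Install
  • Search for “AnomalyDetector”. Select the package that provides the Microsoft.Azure.CognitiveServices.AnomalyDetector namespace used in the code below and click Install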

Create Azure Cognitive Services – Anomaly Detection Client

  • Right-click the Project > Add > New Item…
  • Select “Class”, provide the name “DetectAnomalyClient.cs” and click Add
  • Replace the contents of DetectAnomalyClient.cs with the code below.

In this code, the static DetectAnomalyClient class will be used to create a client object, get time-series data from the file and get anomalies from the dataset.

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Microsoft.Azure.CognitiveServices.AnomalyDetector;
using Microsoft.Azure.CognitiveServices.AnomalyDetector.Models;
using System.Text;
using System.Linq;
 
namespace DemoFuntionApp
{
    public static class DetectAnomalyClient
    {

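        // Creates an Anomaly Detector client for the given endpoint, authenticated with the resource key.
        public static IAnomalyDetectorClient createClient(string endpoint, string key)
        {
            // ApiKeyServiceClientCredentials is resolved through the using directives above;
            // the partially qualified "Azure.CognitiveServices..." name does not compile.
            IAnomalyDetectorClient client = new AnomalyDetectorClient(new ApiKeyServiceClientCredentials(key))
            {
                Endpoint = endpoint
            };
            return client;
        }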


        public static List<Point> GetSeriesFromFile(string str)
        {
            List<Point> seriesData = new List<Point>();

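            // Split on both CR and LF so Windows line endings do not leave stray characters.
            string[] groups = str.Split(new char[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries);
            foreach (string group in groups)
            {
                string[] coords = group.Split(',');
                // Parse with the invariant culture so the result does not depend on the server locale.
                seriesData.Add(new Point(
                    DateTime.Parse(coords[0], System.Globalization.CultureInfo.InvariantCulture),
                    Double.Parse(coords[1], System.Globalization.CultureInfo.InvariantCulture)));
            }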

            return seriesData;
        }

        public static string GetAnomaliesData(List<Point> series, string anomalyindices)
        {
            string returnstring = "";
            if (anomalyindices != "")
            {
                String[] indices = anomalyindices.Split(',');
                foreach (string i in indices)
                {
                    int index = int.Parse(i);
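                    // Write the fields explicitly rather than relying on Point.ToString().
                    returnstring = returnstring + series[index].Timestamp + "," + series[index].Value;
                    returnstring = returnstring + "\n";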
                }
            }

            return returnstring;
        }

    }
}
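
GetSeriesFromFile expects each line of the file to hold a timestamp and a numeric value separated by a comma. The sketch below illustrates that layout with a slightly more defensive parser. It is only an illustration: the CsvSeriesDemo and Sample names are hypothetical stand-ins (Sample replaces the SDK's Point type so the snippet runs without the NuGet package), the sample rows are made up, and skipping a header row is an added assumption rather than something the code above does.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

// Hypothetical stand-in for the SDK's Point type (timestamp + value).
public sealed class Sample
{
    public Sample(DateTime timestamp, double value)
    {
        Timestamp = timestamp;
        Value = value;
    }

    public DateTime Timestamp { get; }
    public double Value { get; }
}

public static class CsvSeriesDemo
{
    // Parses "timestamp,value" lines; lines that do not parse (such as a header) are skipped.
    public static List<Sample> Parse(string csv)
    {
        var samples = new List<Sample>();
        string[] lines = csv.Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries);

        foreach (string line in lines)
        {
            string[] fields = line.Split(',');
            if (fields.Length < 2)
            {
                continue; // not a timestamp,value pair
            }

            // Invariant culture keeps parsing independent of the machine's locale.
            bool timeOk = DateTime.TryParse(
                fields[0],
                CultureInfo.InvariantCulture,
                DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal,
                out DateTime timestamp);
            bool valueOk = double.TryParse(
                fields[1],
                NumberStyles.Float,
                CultureInfo.InvariantCulture,
                out double value);

            if (timeOk && valueOk)
            {
                samples.Add(new Sample(timestamp, value));
            }
        }

        return samples;
    }
}
```

Calling CsvSeriesDemo.Parse on a file containing a header line followed by rows such as 2018-03-01T00:00:00Z,120.5 yields one Sample per data row, with timestamps normalised to UTC.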

Create Anomaly Detection Class

  • Right-click the Project > Add > New Item…
  • Select “Class”, provide the name “DetectAnomaly.cs” and click Add
  • Replace the contents of DetectAnomaly.cs with the code below.

This code defines a static RunAsync() method that the trigger Function invokes to detect anomalies in the time-series dataset; the method returns the output to the trigger.

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Microsoft.Azure.CognitiveServices.AnomalyDetector;
using Microsoft.Azure.CognitiveServices.AnomalyDetector.Models;
using System.Text;
using System.Linq;

namespace DemoFuntionApp
{
    public static class DetectAnomaly
    {
        public static async Task<string> RunAsync(string endpoint, string subscriptionKey, string subscriptionRegion, string data)
        {
            try
            {
                IAnomalyDetectorClient client = DetectAnomalyClient.createClient(endpoint, subscriptionKey); //Anomaly Detector client

                List<Point> series = DetectAnomalyClient.GetSeriesFromFile(data);
                Request request = new Request(series, Granularity.Daily);

                // Await the batch detection call instead of blocking on .Result inside an async method.
                EntireDetectResponse response = await EntireDetectSampleAsync(client, request);
                if (!response.IsAnomaly.Contains(true))
                {
                    return "No anomalies detected in the series.";
                }

                string resultdataset = await ReturnAnomalyDataset(request, response);
                Console.WriteLine(resultdataset);
                return resultdataset;
            }
            catch(Exception e)
            {
                return e.ToString();
            }
         

        }

        static async Task<EntireDetectResponse> EntireDetectSampleAsync(IAnomalyDetectorClient client, Request request)
        {
            Console.WriteLine("Detecting anomalies in the entire time series.");

            EntireDetectResponse result = await client.EntireDetectAsync(request).ConfigureAwait(false);

            return result;

        }

        // Builds a tab-separated report with one row per anomalous point.
        // Returns an empty string when the service flagged no anomalies.
        static Task<string> ReturnAnomalyDataset(Request request, EntireDetectResponse result)
        {
            if (!result.IsAnomaly.Contains(true))
            {
                return Task.FromResult(string.Empty);
            }

            StringBuilder report = new StringBuilder();
            report.Append("Timestamp,\tInputValue,\tPeriod,\tExpectedValues,\tUpperMargins,\tLowerMargins,\tIsNegativeAnomaly,\tIsPositiveAnomaly\n");

            for (int i = 0; i < request.Series.Count; ++i)
            {
                if (result.IsAnomaly[i])
                {
                    Console.WriteLine($"An anomaly was detected at index: {i}");
                    report.Append($"{request.Series[i].Timestamp},\t{request.Series[i].Value},\t{result.Period},\t{result.ExpectedValues[i]},\t{result.UpperMargins[i]},\t{result.LowerMargins[i]},\t{result.IsNegativeAnomaly[i]},\t{result.IsPositiveAnomaly[i]}\n");
                }
            }

            return Task.FromResult(report.ToString());
        }

    }
}

Update the Azure Function

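  • Right-click the Function and rename it.
  • Replace the Function’s code with the code below.
  • Set the blob connection string (blobconnstr), the Cognitive Services endpoint (ENDPOINT), the subscription key (SUBSCRIPTION_KEY) and the subscription region (SUBSCRIPTION_REGION) in local.settings.json, and replace the <input container> and <output container> placeholders with your container names.

As written, the code below uses a blob trigger, so it runs whenever a file is added to the input container; to run on a schedule as described earlier, the BlobTrigger binding would need to be replaced with a TimerTrigger binding and the input file read explicitly. The Function reads the content of the .csv file from blob storage and calls the RunAsync method of the DetectAnomaly class. The output of the method is written back to blob storage.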

using System;
using System.IO;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;

namespace DemoFuntionApp
{
    public static class DetectAnomalyFn
    {
        [FunctionName("DetectAnomalyFn")]
        public static void Run([BlobTrigger("<input container>/{name}", Connection = "blobconnstr")] Stream myBlob,
            [Blob("<output container>/{name}.txt", FileAccess.Write)] Stream validationOutput, string name, ILogger log,
             ExecutionContext context)
        {

            var config = new ConfigurationBuilder()
               .SetBasePath(context.FunctionAppDirectory)
               .AddJsonFile("local.settings.json", optional: true, reloadOnChange: true)
               .AddEnvironmentVariables()
               .Build();
            var Endpoint = config["ENDPOINT"];
            var SubscriptionKey = config["SUBSCRIPTION_KEY"];
            var SubscriptionRegion = config["SUBSCRIPTION_REGION"];

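            // Read the whole CSV; the using block disposes the reader once the content is in memory.
            string fileContent;
            using (StreamReader reader = new StreamReader(myBlob))
            {
                fileContent = reader.ReadToEnd();
            }

            // Run is synchronous, so block on the detection task here.
            var output =
                DetectAnomaly.RunAsync(Endpoint, SubscriptionKey, SubscriptionRegion, fileContent).Result;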
            using (var sw = new StreamWriter(validationOutput))
            {
                try
                {
                    sw.Write(output);
                    sw.Flush();
                }
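                catch (Exception ex)
                {
                    // Log failures at error level so they are not lost among informational messages.
                    log.LogError(ex, "Failed to write the anomaly detection output.");
                }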
            }
        }

        public static CloudBlobContainer GetContainerObj(string containerName)
        {
            CloudStorageAccount storageAccount = CloudStorageAccount.Parse("<Storage Connection String>");

            CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();
            CloudBlobContainer blobContainer = blobClient.GetContainerReference(containerName);
            return blobContainer;
        }

        private static string GetBlobSasUri(CloudBlobContainer container, string blobName, string policyName = null)
        {
            string sasBlobToken;

            CloudBlockBlob blob = container.GetBlockBlobReference(blobName);
            if (policyName == null)
            {

                SharedAccessBlobPolicy adHocSAS = new SharedAccessBlobPolicy()
                {
                    SharedAccessExpiryTime = DateTime.UtcNow.AddHours(24),
                    Permissions = SharedAccessBlobPermissions.Read | SharedAccessBlobPermissions.Write | SharedAccessBlobPermissions.Create
                };


                sasBlobToken = blob.GetSharedAccessSignature(adHocSAS);

                Console.WriteLine("SAS for blob (ad hoc): {0}", sasBlobToken);
                Console.WriteLine();
            }
            else
            {
                sasBlobToken = blob.GetSharedAccessSignature(null, policyName);

                Console.WriteLine("SAS for blob (stored access policy): {0}", sasBlobToken);
                Console.WriteLine();
            }
            return blob.Uri + sasBlobToken;
        }
    }
}
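
The Function above reads ENDPOINT, SUBSCRIPTION_KEY and SUBSCRIPTION_REGION from configuration, and a missing entry simply comes back as null and fails later with a less obvious error. One possible way to fail fast is sketched below. The RequiredSettings class is a hypothetical helper, not part of the Function or of any SDK, and it assumes the settings are visible as environment variables (which is how application settings are exposed to a running Function App).

```csharp
using System;

// Hypothetical helper: looks up a required setting and fails with a clear message if it is absent.
public static class RequiredSettings
{
    public static string Get(string name)
    {
        string value = Environment.GetEnvironmentVariable(name);
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new InvalidOperationException(
                $"Required setting '{name}' is not configured.");
        }

        return value;
    }
}
```

With such a helper, a call like RequiredSettings.Get("ENDPOINT") at the top of the Function would surface a misconfigured deployment immediately instead of writing an exception trace to the output blob.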

The Anomaly Detector API can detect anomalies not only across an entire dataset but also on just the latest data. You can adapt the code to check only the most recent data, which saves considerable processing time when the datasets are large.
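
When only the most recent data is checked, the request usually carries a trailing window of the series rather than the whole file. The sketch below shows one way to cut such a window. The LatestWindow helper and its windowSize parameter are hypothetical. The comment about passing the window to the SDK's last-point detection call (LastDetectAsync in the SDK version used above) is an assumption to verify against the package you install, and the service limits the number of points per request, so check its documentation before choosing a window size.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: keeps only the most recent points of a series.
public static class LatestWindow
{
    // Returns the last windowSize items, or the whole series if it is shorter than that.
    public static List<T> Take<T>(IReadOnlyList<T> series, int windowSize)
    {
        if (series == null)
        {
            throw new ArgumentNullException(nameof(series));
        }
        if (windowSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(windowSize));
        }

        int skip = Math.Max(0, series.Count - windowSize);
        return series.Skip(skip).ToList();

        // Assumption: the resulting list would then be wrapped in a Request and sent to
        // the last-point detection call instead of EntireDetectAsync.
    }
}
```

For example, LatestWindow.Take(series, 90) on a daily series would keep roughly the last three months of points, so each scheduled run sends a small, fixed-size request regardless of how large the source file has grown.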



