Saturday, November 28, 2015

Find median of a random stream

In statistics and probability theory, a median is the number separating the higher half of a data sample, a population, or a probability distribution, from the lower half. The median of a finite list of numbers can be found by arranging all the observations from lowest value to highest value and picking the middle one (e.g., the median of {3, 3, 5, 9, 11} is 5). If there is an even number of observations, then there is no single middle value; the median is then usually defined to be the mean of the two middle values (the median of {3, 5, 7, 9} is (5 + 7) / 2 = 6), which corresponds to interpreting the median as the fully trimmed mid-range. [Wikipedia]

HackerRank Question: The median of a finite list of numbers can be found by arranging all the integers from lowest to highest value and picking the middle one. For example, the median of {3,3,5,9,11} is 5. If there is an even number of integers, then there is no single middle value, and the median is then usually defined to be the mean of the two middle values. For example, the median of {3,5,7,9} is (5+7)/2 = 6. [Hackerrank]
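The sort-and-pick definition translates directly into code. The following is a minimal sketch in C, assuming the whole list is already in memory; the names medianBySorting and cmpInt are illustrative and do not come from the HackerRank problem.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* qsort comparator for ints; written this way to avoid the overflow of (x - y) */
static int cmpInt(const void *a, const void *b) {
    int x = *(const int *)a, y = *(const int *)b;
    return (x > y) - (x < y);
}

/* Returns the median of data[0..n-1] by sorting a copy; requires n > 0. */
double medianBySorting(const int *data, size_t n) {
    assert(data != NULL && n > 0);
    int *copy = malloc(n * sizeof *copy);
    assert(copy != NULL);
    memcpy(copy, data, n * sizeof *copy);
    qsort(copy, n, sizeof *copy, cmpInt);

    double m;
    if (n % 2 == 1)
        m = copy[n / 2];                                   /* single middle value */
    else
        m = (copy[n / 2 - 1] + (double)copy[n / 2]) / 2.0; /* mean of the two middle values */

    free(copy);
    return m;
}
```

For {3, 3, 5, 9, 11} this returns 5, and for {3, 5, 7, 9} it returns 6, matching the examples above. Sorting costs O(n log n) per query, which is why a stream, where the median is needed after every new value, calls for a cheaper approach.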

The different approaches and the optimal algorithm are compared in detail in references [1] and [2]. The steps for an implementation using heaps are as follows.

  1. Define two heaps: a min heap that holds the upper half of the data and a max heap that holds the lower half.
  2. Insert each value from the stream into the min heap if it is greater than or equal to the current median; otherwise insert it into the max heap.
  3. If the sizes of the two heaps differ by more than 1, move one element from the larger heap to the smaller one.
    1. If the min heap is larger, move its minimum (the root of the heap) to the max heap.
    2. If the max heap is larger, move its maximum (the root of the heap) to the min heap.
  4. Calculate the new median.
    1. If the min heap holds one more element than the max heap, the median is the root of the min heap.
    2. If the max heap holds one more element than the min heap, the median is the root of the max heap.
    3. If both heaps hold the same number of elements, the median is the average of the roots of the min and max heaps.
Following is the C implementation of the above steps. However, for a stream of 100000 values this version takes 18.70 seconds.
#include <stdio.h>
#include <string.h>
#include <math.h>
#include <stdlib.h>

//pointers to array which hold maxHeap and minHeap
int* maxHeap;
int* minHeap;
//length of the arrays
int maxHeapLen = 0, minHeapLen = 0;
//median
float median;

int getLeftChildIndex(int cIndx) {
    cIndx += 1;
    return (2 * cIndx - 1);
}

int getRightChildIndex(int cIndx) {
    cIndx += 1;
    return (2 * cIndx);
}

void maxHeapify(void) {
    if(maxHeap == NULL || maxHeapLen == 0)
        return;
    
    int lChildIndx, rChildIndx;
    
    for(int indx = maxHeapLen/2 + 1; indx >= 0; indx--) {
        lChildIndx = getLeftChildIndex(indx);
        rChildIndx = getRightChildIndex(indx);
        int tmp;
        
        if(lChildIndx < maxHeapLen && maxHeap[indx] <= maxHeap[lChildIndx]) {
            tmp = maxHeap[indx];
            maxHeap[indx] = maxHeap[lChildIndx];
            maxHeap[lChildIndx] = tmp;
        } 
        
        if(rChildIndx < maxHeapLen && maxHeap[indx] <= maxHeap[rChildIndx]) {
            tmp = maxHeap[indx];
            maxHeap[indx] = maxHeap[rChildIndx];
            maxHeap[rChildIndx] = tmp;
        }
    }
}

void minHeapify(void) {
    if(minHeap == NULL || minHeapLen == 0)
        return;
    
    int lChildIndx, rChildIndx;
    
    for(int indx = minHeapLen/2 + 1; indx >= 0; indx--) {
        lChildIndx = getLeftChildIndex(indx);
        rChildIndx = getRightChildIndex(indx);
        int tmp;
        
        if(lChildIndx < minHeapLen && minHeap[indx] >= minHeap[lChildIndx]) {
            tmp = minHeap[indx];
            minHeap[indx] = minHeap[lChildIndx];
            minHeap[lChildIndx] = tmp;
        } 
        
        if(rChildIndx < minHeapLen && minHeap[indx] >= minHeap[rChildIndx]) {
            tmp = minHeap[indx];
            minHeap[indx] = minHeap[rChildIndx];
            minHeap[rChildIndx] = tmp;
        }
    }
}

//extracts the root from the heap and runs heapify
int extractMax(void) {
    if(maxHeap == NULL || maxHeapLen == 0)
        return -1;
    
    int max = maxHeap[0];
    maxHeap[0] = maxHeap[maxHeapLen - 1];
    maxHeapLen -= 1;
    maxHeapify();
    
    return max;
}

int extractMin(void) {
    if(minHeap == NULL || minHeapLen == 0)
        return -1;
    
    int min = minHeap[0];
    minHeap[0] = minHeap[minHeapLen - 1];
    minHeapLen -= 1;
    minHeapify();
    
    return min;
}

//inserts new data element to the heaps and runs heapify
void insertMaxHeap(int data) {
    maxHeap[maxHeapLen] = data;
    maxHeapLen += 1;
    maxHeapify();
}

void insertMinHeap(int data) {
    minHeap[minHeapLen] = data;
    minHeapLen += 1;
    minHeapify();
}

//generates median after receiving data element from stream
void genMedian(int num) {
    if(num >= median) {
        insertMinHeap(num);
    } else {
        insertMaxHeap(num);
    }

    //rebalance if one heap has at least 2 more elements than the other
    if(abs(maxHeapLen - minHeapLen) >= 2) {
        if(maxHeapLen > minHeapLen)
            insertMinHeap(extractMax());
        else
            insertMaxHeap(extractMin());
    }

    if(minHeapLen > maxHeapLen) {
        median = minHeap[0];
    } else if(minHeapLen < maxHeapLen) {
        median = maxHeap[0];
    } else {
        median = (maxHeap[0] + minHeap[0])/2.0;
    }
}

The gprof results for this version show that getLeftChildIndex() and getRightChildIndex() are the most frequently called functions. These calls can be optimized by inlining as follows.
int getLeftChildIndex(int) __attribute__((always_inline));
int getRightChildIndex(int) __attribute__((always_inline));

//pointers to array which hold maxHeap and minHeap
int* maxHeap;
int* minHeap;
//length of the arrays
int maxHeapLen = 0, minHeapLen = 0;
//median
float median;

int inline getLeftChildIndex(int cIndx) {
    cIndx += 1;
    return (2 * cIndx - 1);
}

int inline getRightChildIndex(int cIndx) {
    cIndx += 1;
    return (2 * cIndx);
}

Inlining reduces the execution time by almost 4.5 seconds.

The maxHeapify() and minHeapify() functions still take most of the program's execution time; they are called more than 150k times.

Median calculation depends on
  1. The maximum of the left half and the minimum of the right half of the sorted series, and
  2. The left and right halves being balanced (their lengths must not differ by more than 1); they only need rebalancing when one half exceeds the other by more than 1.
So the heapify operations need not be called every time a new element arrives from the stream, as long as the roots of the two heaps stay correct and the halves stay balanced.
//generates median after receiving data element from stream
void genMedian(int num) {
    if(num >= median) {
        if(minHeapLen > 0 && num < minHeap[0]) {
            //num is the new minimum: place it at the root and
            //append the old root, so no heapify call is needed
            int tmp = minHeap[0];
            minHeap[0] = num;
            minHeap[minHeapLen] = tmp;
            minHeapLen += 1;
        } else {
            //root is still the minimum: just append
            minHeap[minHeapLen] = num;
            minHeapLen += 1;
        }
    } else {
        if(maxHeapLen > 0 && num > maxHeap[0]) {
            //num is the new maximum: place it at the root and
            //append the old root, so no heapify call is needed
            int tmp = maxHeap[0];
            maxHeap[0] = num;
            maxHeap[maxHeapLen] = tmp;
            maxHeapLen += 1;
        } else {
            //root is still the maximum: just append
            maxHeap[maxHeapLen] = num;
            maxHeapLen += 1;
        }
    }

    //heapify runs only here, when the halves need rebalancing
    if(abs(maxHeapLen - minHeapLen) >= 2) {
        if(maxHeapLen > minHeapLen)
            insertMinHeap(extractMax());
        else
            insertMaxHeap(extractMin());
    }

    if(minHeapLen > maxHeapLen) {
        median = minHeap[0];
    } else if(minHeapLen < maxHeapLen) {
        median = maxHeap[0];
    } else {
        median = (maxHeap[0] + minHeap[0])/2.0;
    }
}


The above optimization halved the number of calls to the heapify functions and reduced the execution time to almost 4.5 seconds.
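The maxHeapify() and minHeapify() routines above rescan the whole array on every call, so each call costs time proportional to the heap size. A possible further step, not measured in this post, is the textbook sift-up and sift-down approach, which touches only one root-to-leaf path per operation. The sketch below is an assumption about how that could look; the Heap struct and the names heapPush, heapPop and addAndGetMedian are hypothetical and not part of the program above.

```c
#include <assert.h>

/* Hypothetical helper type: sign = +1 gives a min heap, sign = -1 a max heap. */
typedef struct {
    int *a;    /* caller-provided storage */
    int len;   /* number of elements in use */
    int cap;   /* capacity of a */
    int sign;
} Heap;

/* True if x should sit closer to the root than y. */
static int before(const Heap *h, int x, int y) {
    return h->sign > 0 ? x < y : x > y;
}

static void swapInt(int *x, int *y) { int t = *x; *x = *y; *y = t; }

/* Append at the end, then sift up along the path to the root: O(log n). */
static void heapPush(Heap *h, int v) {
    assert(h->len < h->cap);
    int i = h->len++;
    h->a[i] = v;
    while (i > 0 && before(h, h->a[i], h->a[(i - 1) / 2])) {
        swapInt(&h->a[i], &h->a[(i - 1) / 2]);
        i = (i - 1) / 2;
    }
}

/* Remove the root, move the last element there, then sift down: O(log n). */
static int heapPop(Heap *h) {
    assert(h->len > 0);
    int top = h->a[0];
    h->a[0] = h->a[--h->len];
    for (int i = 0;;) {
        int l = 2 * i + 1, r = l + 1, best = i;
        if (l < h->len && before(h, h->a[l], h->a[best])) best = l;
        if (r < h->len && before(h, h->a[r], h->a[best])) best = r;
        if (best == i) break;
        swapInt(&h->a[i], &h->a[best]);
        i = best;
    }
    return top;
}

/* Feeds one value into the two heaps and returns the running median.
 * lower must be a max heap (sign -1), upper a min heap (sign +1). */
double addAndGetMedian(Heap *lower, Heap *upper, int num) {
    if (lower->len == 0 || num <= lower->a[0]) heapPush(lower, num);
    else heapPush(upper, num);

    /* keep the sizes within 1 of each other */
    if (lower->len > upper->len + 1) heapPush(upper, heapPop(lower));
    else if (upper->len > lower->len + 1) heapPush(lower, heapPop(upper));

    if (lower->len > upper->len) return lower->a[0];
    if (upper->len > lower->len) return upper->a[0];
    return (lower->a[0] + (double)upper->a[0]) / 2.0;
}
```

To use it, give each heap its own array, for example Heap lower = {lo, 0, N, -1} and Heap upper = {hi, 0, N, +1} with two int arrays lo and hi of size N, and call addAndGetMedian() once per incoming value. Unlike the version above, this keeps the full heap property after every operation, so no whole-array pass is ever needed.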


References
  1. http://www.geeksforgeeks.org/median-of-stream-of-integers-running-integers/
  2. http://algorithmsandme.in/2014/08/heaps-find-median-of-stream-of-integers/
  3. http://www.ardendertat.com/2011/11/03/programming-interview-questions-13-median-of-integer-stream/
