Spring XD / XD-2232

Add support for estimating the median value of a data stream.


    Details

    • Type: Story
    • Status: To Do
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: Waiting for Triage
    • Component/s: Analytics
    • Labels: None
    • Story Points: 5
    • Rank (Obsolete): 46134

      Description

      It is often useful to have access to the median value of the fields of a data stream, because the median is more robust to outliers than the mean. For example, for the values 1, 2, 3, 4 and 100 the mean is 22, while the median is 3.

      The median is the value that splits a sorted dataset in half: 50% of the data is smaller than the value and 50% of the data is larger than the value. Computing it exactly on a stream is difficult, because it requires collecting and sorting all observed values, so the memory needed grows with the length of the stream.
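      For contrast, an exact median has to keep every value it has seen. The following Java sketch (the class name ExactMedian is hypothetical) buffers all observations and sorts a copy on each query; for an even number of values it uses the common convention of averaging the two middle elements.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical baseline: an exact median that must keep every observed value. */
final class ExactMedian {
    // Memory grows linearly with the number of observations.
    private final List<Double> values = new ArrayList<>();

    void add(double value) {
        values.add(value);
    }

    /** Sorts a copy of all values (O(n log n)) and picks the middle element(s). */
    double median() {
        if (values.isEmpty()) {
            throw new IllegalStateException("no values observed yet");
        }
        List<Double> sorted = new ArrayList<>(values);
        Collections.sort(sorted);
        int n = sorted.size();
        int mid = n / 2;
        if (n % 2 == 1) {
            return sorted.get(mid);
        }
        // Even count: average the two middle values (a common convention).
        return (sorted.get(mid - 1) + sorted.get(mid)) / 2.0;
    }
}
```

      Each query costs a full sort of everything seen so far, which is exactly what the streaming approach avoids.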

      The median of a data stream can be approximated with a technique called stochastic averaging. To approximate the median value of a data stream, one could use the following approach:
      Let M be the current estimate of the median. If the next observed value in the stream is larger than M, increase M by r (the learning rate); if it is smaller, decrease M by r. When M is close to the median, it increases as often as it decreases, and therefore it stabilizes.
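      The update rule can be sketched in Java as follows. The class StochasticMedian is hypothetical and not part of Spring XD; like the Groovy script below, it seeds the estimate with the first observed value and leaves it unchanged when a value equals the current estimate. Unlike an exact median, it keeps a single double of state no matter how many values arrive.

```java
/**
 * Hypothetical sketch of the stochastic median update rule; not part of Spring XD.
 * Only one double of state is kept, however many values have been observed.
 */
final class StochasticMedian {
    private final double learningRate; // r in the description
    private double estimate;           // M in the description
    private boolean initialized;

    StochasticMedian(double learningRate) {
        if (!(learningRate > 0)) {
            throw new IllegalArgumentException("learningRate must be positive");
        }
        this.learningRate = learningRate;
    }

    /** Feeds one observation and returns the updated estimate M. */
    double update(double value) {
        if (!initialized) {
            // Seed M with the first observation, as the Groovy script does.
            estimate = value;
            initialized = true;
        } else if (value > estimate) {
            estimate += learningRate; // value above M: move M up by r
        } else if (value < estimate) {
            estimate -= learningRate; // value below M: move M down by r
        }
        // value == estimate: M stays unchanged
        return estimate;
    }

    double current() {
        return estimate;
    }
}
```

      Feeding the cycle 1, 2, ..., 9 repeatedly with r = 0.1, for instance, moves the estimate from 1 up towards the median 5, after which it keeps oscillating close to 5 (within about 0.5) instead of settling on a single value. A smaller r reduces this jitter but makes the estimate adapt more slowly.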

      The following example shows a primitive implementation of the algorithm above in Groovy. Save it as median-est.groovy under ${XD_HOME}/modules/processor/scripts, the file name used by the stream definition below.

      import org.springframework.xd.tuple.TupleBuilder;
      import org.springframework.xd.tuple.Tuple;
      
      /**
       * Stochastic averaging to estimate the median of a data stream.
       * The state is the current estimate M of the median. If the next observed
       * value of the stream is larger than M, M is increased by r; if it is smaller,
       * M is decreased by r. When M is close to the median, it increases as often
       * as it decreases and therefore stabilizes.
       * r denotes the learningRate.
       */
      enum MedianEstimator{
        
        INSTANCE
      
        //TODO Add support for estimating multiple medians
        /*
         * The current estimate of the median (+Infinity until the first value arrives).
         */
        double current = Double.POSITIVE_INFINITY
        
        // synchronized: the shared INSTANCE state may be updated by concurrent threads
        public synchronized double update(double value, double learningRate){
          
          //Initialize current with given value
          if(current == Double.POSITIVE_INFINITY){
             current = value
          }
          
          if(current == value){
            return current
          }
      
          //Move current value towards the median
          current = current + (current < value ? learningRate : -learningRate)
      
          return current
        }
      }
      
      //TODO Make learning rate configurable
      def learningRate = 0.7
      //TODO Add support for dynamic field selection.
      double median = MedianEstimator.INSTANCE.update(payload.getDouble('value'), learningRate)
      
      def fieldNames = new ArrayList<String>(payload.getFieldNames())
      def fieldValues = new ArrayList<Object>(payload.getValues())
      
      //TODO Make the median output field name configurable
      fieldNames.add("value_median")
      fieldValues.add(median)
      
      //Return the original tuple values extended with the computed median.
      TupleBuilder.tuple().ofNamesAndValues(fieldNames, fieldValues)
      

      Stream definition:

      xd:>stream create median --definition "http --outputType=application/x-xd-tuple | script --location=median-est.groovy | log" --deploy
      Created and deployed new stream 'median'
      

      Post some JSON data...

      xd:>http post --contentType application/json --data '{"value":2}'
      > POST (application/json;charset=UTF-8) http://localhost:9000 {"value":2}
      > 200 OK
      

      Output:

      20:44:37,829  INFO pool-35-thread-4 sink.median - {"id":"cd9719b3-eeff-59c9-fdf1-fdb628c7fbb8","timestamp":1413139477829,"value":"2","value_median":2.0}
      

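      ... After about 15 posted values the median estimate should stabilize; how quickly depends on the learning rate and on how far the first value is from the median.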
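      What "stabilize" means can be illustrated by replaying the script's update logic outside Spring XD. The following Java sketch (ScriptReplay is a hypothetical name) mirrors the Groovy MedianEstimator, including the Double.POSITIVE_INFINITY start value and the learning rate of 0.7, for a hypothetical sequence of posts: one value of 2 followed by eight values of 5.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Hypothetical replay of the Groovy MedianEstimator logic, outside Spring XD. */
final class ScriptReplay {
    // Mirrors the script: +Infinity marks "no value seen yet".
    private double current = Double.POSITIVE_INFINITY;

    double update(double value, double learningRate) {
        if (current == Double.POSITIVE_INFINITY) {
            current = value; // seed with the first observed value
        }
        if (current == value) {
            return current;
        }
        // Move one step of size learningRate towards the observed value.
        current = current + (current < value ? learningRate : -learningRate);
        return current;
    }

    /** Returns the value_median each posted value would produce, in order. */
    static List<Double> replay(double[] posted, double learningRate) {
        ScriptReplay estimator = new ScriptReplay();
        List<Double> medians = new ArrayList<>();
        for (double v : posted) {
            medians.add(estimator.update(v, learningRate));
        }
        return medians;
    }

    public static void main(String[] args) {
        double[] posted = {2, 5, 5, 5, 5, 5, 5, 5, 5};
        for (double m : replay(posted, 0.7)) {
            System.out.println(String.format(Locale.ROOT, "%.1f", m));
        }
    }
}
```

      It prints 2.0, 2.7, 3.4, 4.1, 4.8, 5.5, 4.8, 5.5, 4.8, one per line: the estimate climbs in steps of r and then alternates between two values on either side of the median 5 without reaching it exactly. In other words, it settles into a band of width r around the median rather than converging to it, so a large learning rate such as 0.7 produces a visibly jittery value_median.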

      This approach is taken from the book "Real-Time Analytics: Techniques to Analyze and Visualize Streaming Data" by Byron Ellis (Wiley), p. 296.

      Open points:

      • Support for resetting the median value
      • Better state management (Redis?)
      • Support median estimation for multiple fields
      • Make the learning rate configurable from outside the script
      • Maybe add this as an aggregation strategy for the aggregate counter?
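      Three of the open points (resetting, multiple fields, a configurable learning rate) could be addressed together by keeping one estimate per field name in a map, taking the learning rate as a constructor parameter and exposing reset operations. The following Java sketch is hypothetical: MultiFieldMedianEstimator and its methods are assumptions, not Spring XD API, and the state lives in memory only, so sharing estimates across module instances would still need an external store such as Redis.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch for the open points: one estimate per field,
 * a configurable learning rate and reset operations.
 * State is kept in memory only; it is not shared between module instances.
 */
final class MultiFieldMedianEstimator {
    private final double learningRate;
    private final Map<String, Double> estimates = new HashMap<>();

    MultiFieldMedianEstimator(double learningRate) {
        if (!(learningRate > 0)) {
            throw new IllegalArgumentException("learningRate must be positive");
        }
        this.learningRate = learningRate;
    }

    /** Updates the estimate for one field and returns it. */
    synchronized double update(String field, double value) {
        Double current = estimates.get(field);
        double next;
        if (current == null) {
            next = value; // first value seen for this field seeds its estimate
        } else if (value > current) {
            next = current + learningRate;
        } else if (value < current) {
            next = current - learningRate;
        } else {
            next = current;
        }
        estimates.put(field, next);
        return next;
    }

    /** Forgets the estimate of one field; its next value re-seeds it. */
    synchronized void reset(String field) {
        estimates.remove(field);
    }

    /** Forgets the estimates of all fields. */
    synchronized void resetAll() {
        estimates.clear();
    }
}
```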


            People

            • Assignee: Unassigned
            • Reporter: Thomas Darimont (thomasd)
            • Votes: 0
            • Watchers: 1
