  1. Spring XD
  2. XD-2232

Add support for estimating the median value of a data stream.



    • Type: Story
    • Status: To Do
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: Waiting for Triage
    • Component/s: Analytics
    • Labels:
    • Story Points:
    • Rank (Obsolete):


      It is often useful to have access to the median value for fields of a data stream, since the median is more robust to outliers than the mean.

      The median is defined as the value of a dataset such that, when the data is sorted, 50% of the data is smaller than the value and 50% of the data is larger than the value. Ordinarily this is difficult to calculate on a stream because it requires collecting and sorting all of the data.
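
      To make the cost concrete, here is a minimal Groovy sketch of the exact computation. The helper name exactMedian and the sample values are assumptions made for illustration only, and the even-count case uses the common convention of averaging the two middle elements.

```groovy
// Exact median: every value seen so far must be kept and sorted.
// exactMedian is a hypothetical helper name used only for this sketch.
// It assumes a non-empty list.
def exactMedian(List<Double> values) {
    def sorted = values.sort(false)      // sorted copy, the input list is left untouched
    int n = sorted.size()
    int mid = n.intdiv(2)
    // Odd count: middle element. Even count: mean of the two middle elements.
    n % 2 == 1 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2
}

def seen = []                            // on a real stream this buffer grows without bound
[5.0d, 1.0d, 100.0d, 3.0d, 2.0d].each { seen << it }

// The outlier 100 pulls the mean up to 22.2, while the median stays at 3.
assert exactMedian(seen) == 3.0d
```

      The buffer seen is the problem: its size grows with the length of the stream, and each query re-sorts it.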

      The median of a data stream can be approximated with a technique called stochastic averaging, which works as follows:
      Let M be the current estimate of the median. If the next observed value in the stream is larger than M, increase the estimate by r (the learning rate). If it is smaller, decrease the estimate by r. When M is close to the median, it increases as often as it decreases, and therefore it stabilizes.

      The following example shows a primitive implementation of this algorithm in Groovy (to be placed under ${XD_HOME}/modules/processor/scripts).

      import org.springframework.xd.tuple.TupleBuilder
      import org.springframework.xd.tuple.Tuple

      /**
       * Stochastic averaging to estimate the median of a data stream.
       * The value of interest is the current estimate of the median M.
       * If the next observed value of the stream is larger than M, increase
       * the estimate by r. If it is smaller, decrease the estimate by r.
       * When M is close to the median, it increases as often as it decreases,
       * and therefore stabilizes.
       * r denotes the learningRate.
       */
      enum MedianEstimator {
        INSTANCE;

        //TODO Add support for estimating multiple medians

        /** The current median estimate. */
        double current = Double.POSITIVE_INFINITY

        public double update(double value, double learningRate) {
          //Initialize current with the first observed value
          if (current == Double.POSITIVE_INFINITY) {
            current = value
          }
          if (current == value) {
            return current
          }
          //Move the current estimate towards the median
          current = current + (current < value ? learningRate : -learningRate)
          return current
        }
      }

      //TODO Make learning rate configurable
      def learningRate = 0.7

      //TODO Add support for dynamic field selection.
      double median = MedianEstimator.INSTANCE.update(payload.getDouble('value'), learningRate)

      def fieldNames = new ArrayList<String>(payload.getFieldNames())
      def fieldValues = new ArrayList<Object>(payload.getValues())

      //TODO Make the median output field configurable
      //Append the estimate as 'value_median', the field name seen in the log output below.
      fieldNames.add('value_median')
      fieldValues.add(median)

      //Return the original tuple values extended with the computed median.
      TupleBuilder.tuple().ofNamesAndValues(fieldNames, fieldValues)

      Stream definition:

      xd:>stream create median --definition "http --outputType=application/x-xd-tuple | script --location=median-est.groovy | log" --deploy
      Created and deployed new stream 'median'

      Post some JSON data...

      xd:>http post --contentType application/json --data '{"value":2}'
      > POST (application/json;charset=UTF-8) http://localhost:9000 {"value":2}
      > 200 OK


      20:44:37,829  INFO pool-35-thread-4 sink.median - {"id":"cd9719b3-eeff-59c9-fdf1-fdb628c7fbb8","timestamp":1413139477829,"value":"2","value_median":2.0}

      ... After about 15 posts the median estimate should stabilize.
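
      The stabilizing behaviour can also be checked outside of Spring XD. The following is a minimal standalone sketch, not the script from this ticket: the class name StreamingMedian, the hand-picked input cycle, and the learning rate of 0.5 are all assumptions. The rate 0.5 is used instead of 0.7 only because it is exactly representable in binary floating point, which keeps the assertions exact.

```groovy
// Standalone sketch of the update rule, runnable without Spring XD.
// StreamingMedian is a hypothetical name, not part of Spring XD.
class StreamingMedian {
    final double learningRate
    Double estimate = null               // null until the first value arrives

    StreamingMedian(double learningRate) { this.learningRate = learningRate }

    double update(double value) {
        if (estimate == null) {
            estimate = value             // seed with the first observation
        } else if (value > estimate) {
            estimate += learningRate     // observation above the estimate: step up
        } else if (value < estimate) {
            estimate -= learningRate     // observation below the estimate: step down
        }
        return estimate
    }
}

def estimator = new StreamingMedian(0.5d)
def cycle = [4.0d, 6.0d, 5.0d, 1000.0d, 3.0d]   // median 5, mean 203.6
def trace = []
4.times { cycle.each { trace << estimator.update(it) } }

// The outlier 1000 moves the estimate by a single step only.
assert trace.max() == 5.5d
// After each full cycle the estimate is back at the true median.
assert estimator.estimate == 5.0d
```

      On this input the estimate never leaves the range 4.0 to 5.5. The order of the values matters, though: a long run of increasing values pushes the estimate upwards one step at a time, so a smaller learning rate trades slower convergence for less drift.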

      This approach is taken from the book "Real-Time Analytics: Techniques to Analyze and Visualize Streaming Data" by Byron Ellis (Wiley), p. 296.

      Open points:

      • Support for resetting the median value
      • Better state management (Redis?)
      • Support median estimation for multiple fields
      • Make learning rate configurable from outside
      • Maybe add this as an aggregate-counter aggregation strategy?
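
      As a starting point for discussion, three of these points (reset, multiple fields, learning rate set from outside) could be covered by keeping one estimate per field name. The sketch below is only one possible shape: the class name FieldMedians and the field names 'temperature' and 'pressure' are hypothetical, state is kept in memory only (no Redis), and the methods are synchronized merely as a precaution in case updates arrive from several threads.

```groovy
// Hypothetical sketch: one median estimate per field name, kept in memory only.
class FieldMedians {
    final double learningRate
    private final Map<String, Double> estimates = [:]

    // The learning rate is passed in by the caller instead of being hard-coded.
    FieldMedians(double learningRate) { this.learningRate = learningRate }

    // Applies the stochastic averaging update rule to the estimate of one field.
    synchronized double update(String field, double value) {
        Double current = estimates[field]
        if (current == null) {
            current = value                                  // first value seeds the estimate
        } else if (value != current) {
            current += (value > current ? learningRate : -learningRate)
        }
        estimates[field] = current
        return current
    }

    // Forgets the estimate so that the next value re-seeds it.
    synchronized void reset(String field) { estimates.remove(field) }
}

def medians = new FieldMedians(0.5d)
medians.update('temperature', 20.0d)                         // seeds at 20.0
medians.update('temperature', 25.0d)                         // steps up to 20.5
assert medians.update('temperature', 25.0d) == 21.0d
assert medians.update('pressure', 1000.0d) == 1000.0d        // fields do not interfere
medians.reset('temperature')
assert medians.update('temperature', 30.0d) == 30.0d         // re-seeded after reset
```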




            Assignee: Unassigned
            Reporter: Thomas Darimont (thomasd)