 # Add support for estimating the median value of a data stream.


#### Details

• Type: Story
• Status: To Do
• Priority: Minor
• Resolution: Unresolved
• Affects Version/s: None
• Fix Version/s:
• Component/s:
• Labels: None
• Story Points: 5
• Rank (Obsolete): 46134

#### Description

Often it is useful to have access to the median value of the fields of a data stream, since the median is more robust to outliers than the mean.

The median is defined as the value of a dataset such that, when the data is sorted, 50% of the data is smaller than the value and 50% is larger than the value. Ordinarily this is difficult to calculate on a stream because it requires collecting and sorting all of the data.
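For comparison, the exact (batch) median can be sketched in Java as follows. It makes the cost on a stream visible: every value has to be kept in memory and sorted. The class name `ExactMedian` and the even-count convention (averaging the two middle values) are assumptions for illustration, not part of the proposal.

```java
import java.util.Arrays;

/** Exact median of a finite batch (illustrative sketch): copy, sort, take the middle. */
final class ExactMedian {
    private ExactMedian() {}

    static double of(double[] data) {
        if (data.length == 0) {
            throw new IllegalArgumentException("median of empty data is undefined");
        }
        double[] sorted = Arrays.copyOf(data, data.length); // leave the caller's array untouched
        Arrays.sort(sorted);                                 // needs all n values in memory, O(n log n)
        int mid = sorted.length / 2;
        return (sorted.length % 2 == 1)
                ? sorted[mid]                                // odd count: the middle value
                : (sorted[mid - 1] + sorted[mid]) / 2.0;     // even count: mean of the two middle values
    }

    public static void main(String[] args) {
        System.out.println(of(new double[] {5, 1, 4, 2, 3})); // prints 3.0
        System.out.println(of(new double[] {1, 2, 3, 100}));  // prints 2.5
        // The mean of the same data is pulled up by the outlier 100:
        System.out.println(Arrays.stream(new double[] {1, 2, 3, 100}).average().getAsDouble()); // prints 26.5
    }
}
```

The last line shows the robustness argument from the description: one outlier moves the mean from 2.5 to 26.5 while the median stays at 2.5.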

The median of a data stream can be approximated with a technique called stochastic averaging. Given the current estimate of the median M, the approach works as follows: if the next observed value in the stream is larger than M, increase the estimate by r (the learning rate); if it is smaller, decrease the estimate by r. When M is close to the median, it increases as often as it decreases, and therefore it stabilizes.
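The update rule above can be sketched in plain Java, independent of Spring XD. This is a minimal sketch assuming a fixed learning rate and seeding the estimate with the first observation, as the Groovy script in this issue does; the class name `StreamingMedian` is hypothetical.

```java
/** Stochastic-averaging median estimate (sketch): move M by a fixed step r towards each observation. */
final class StreamingMedian {
    private final double learningRate; // r: fixed step size
    private double estimate;           // M: current median estimate
    private boolean initialized;       // false until the first value arrives

    StreamingMedian(double learningRate) {
        if (!(learningRate > 0)) {
            throw new IllegalArgumentException("learning rate must be positive");
        }
        this.learningRate = learningRate;
    }

    /** Feeds one observation and returns the updated estimate. */
    double update(double value) {
        if (!initialized) {           // seed M with the first observation
            estimate = value;
            initialized = true;
            return estimate;
        }
        if (value > estimate) {
            estimate += learningRate; // observation above M: move up by r
        } else if (value < estimate) {
            estimate -= learningRate; // observation below M: move down by r
        }                             // observation equal to M: leave M unchanged
        return estimate;
    }

    double estimate() {
        return estimate;
    }

    public static void main(String[] args) {
        StreamingMedian m = new StreamingMedian(0.5);
        for (double v : new double[] {10, 12, 8, 11, 9}) {
            System.out.println(m.update(v)); // prints 10.0, 10.5, 10.0, 10.5, 10.0 (one per line)
        }
    }
}
```

A boolean flag is used here instead of the `Double.POSITIVE_INFINITY` sentinel so that a genuine infinite value in the stream cannot be mistaken for "not yet initialized". Only one number per estimate is stored, regardless of how many values have been seen.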

The following example shows a primitive implementation of the above algorithm in Groovy (save it as median-est.groovy under ${XD_HOME}/modules/processor/scripts).

```groovy
import org.springframework.xd.tuple.TupleBuilder;
import org.springframework.xd.tuple.Tuple;

/**
* Stochastic averaging to estimate the median of a data stream.
* The value of interest is the current estimate of the median M. If the next
* observed value of the stream is larger than M, increase M by r; if it is
* smaller, decrease M by r. When M is close to the median, it increases as
* often as it decreases, and therefore stabilizes.
* r denotes the learningRate.
*/
enum MedianEstimator{

INSTANCE

//TODO Add support for estimating multiple medians
/*
* The current median estimate; POSITIVE_INFINITY means "not yet initialized".
*/
double current = Double.POSITIVE_INFINITY

public double update(double value, double learningRate){

//Initialize current with the first observed value
if(current == Double.POSITIVE_INFINITY){
current = value
}

//Observed value equals the estimate: nothing to move
if(current == value){
return current
}

//Move the estimate one step of size learningRate towards the observed value
current = current + (current < value ? learningRate : -learningRate)

return current
}
}

//TODO Make learning rate configurable
double learningRate = 0.7

//TODO Add support for dynamic field selection.
//The script processor binds the message payload to 'payload'; it is a Tuple
//here because the http source uses --outputType=application/x-xd-tuple.
Tuple tuple = (Tuple) payload
double median = MedianEstimator.INSTANCE.update(tuple.getDouble("value"), learningRate)

//TODO Make the median output field name configurable
def fieldNames = new ArrayList<String>(tuple.getFieldNames())
def fieldValues = new ArrayList<Object>(tuple.getValues())
fieldNames << "value_median"
fieldValues << median

//Return the original tuple values extended with the computed median.
TupleBuilder.tuple().ofNamesAndValues(fieldNames, fieldValues)
```

Stream definition:

```
xd:>stream create median --definition "http --outputType=application/x-xd-tuple | script --location=median-est.groovy | log" --deploy
Created and deployed new stream 'median'
```

Post some JSON data...

```
xd:>http post --contentType application/json --data '{"value":2}'
> POST (application/json;charset=UTF-8) http://localhost:9000 {"value":2}
> 200 OK
```

Output:

```
20:44:37,829  INFO pool-35-thread-4 sink.median - {"id":"cd9719b3-eeff-59c9-fdf1-fdb628c7fbb8","timestamp":1413139477829,"value":"2","value_median":2.0}
```

... After roughly 15 posted values, the median estimate should stabilize.
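How quickly and how tightly the estimate settles depends on the learning rate relative to the spread of the data, so the figure of about 15 values is specific to the example above. With a fixed step size the estimate also never freezes: it keeps moving by r around the median. The following Java sketch compares the stochastic estimate with the exact median on a synthetic stream; the class name `MedianConvergence`, the uniform data on [0, 100) and the fixed seed are all assumptions chosen for illustration.

```java
import java.util.Arrays;
import java.util.Random;

/** Convergence check (sketch): fixed-step stochastic median vs. exact median on synthetic data. */
final class MedianConvergence {
    private MedianConvergence() {}

    /** Returns {stochastic estimate, exact median} after feeding n synthetic values. */
    static double[] run(long seed, int n, double learningRate) {
        Random random = new Random(seed);                // seeded so runs are reproducible
        double[] seen = new double[n];                   // kept only to compute the exact median
        double estimate = 0;
        for (int i = 0; i < n; i++) {
            double value = random.nextDouble() * 100.0;  // synthetic stream, uniform on [0, 100)
            seen[i] = value;
            if (i == 0) {
                estimate = value;                        // seed with the first observation
            } else if (value > estimate) {
                estimate += learningRate;                // move up by r
            } else if (value < estimate) {
                estimate -= learningRate;                // move down by r
            }
        }
        Arrays.sort(seen);
        double exact = (n % 2 == 1) ? seen[n / 2] : (seen[n / 2 - 1] + seen[n / 2]) / 2.0;
        return new double[] {estimate, exact};
    }

    public static void main(String[] args) {
        for (int n : new int[] {15, 100, 1000, 10000}) {
            double[] result = run(42L, n, 0.7);          // 0.7 matches the learning rate in the script
            System.out.printf("n=%5d  estimate=%7.2f  exact=%7.2f%n", n, result[0], result[1]);
        }
    }
}
```

Varying the learning rate shows the trade-off: a larger r reaches the neighbourhood of the median faster but oscillates more widely around it, a smaller r is steadier but slower to adapt.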

This approach was taken from the book "Real-time Analytics: Techniques to Analyze and Visualize Streaming Data" by Byron Ellis (Wiley), p. 296.

Open points:

• Support for resetting the median value
• Better state management (Redis?)
• Support median estimation for multiple fields
• Make the learning rate configurable from outside the script
• Maybe add this as an aggregate-counter aggregation strategy?
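Several of these open points (reset, multiple fields, externally configured learning rate) can be sketched together as one estimate per field name. The Java sketch below is hypothetical: the class `FieldMedianEstimators` and its methods are illustrative names, state lives in an in-memory map rather than Redis, and the methods are synchronized in case messages are processed on several threads.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: one stochastic median estimate per field, with reset support. */
final class FieldMedianEstimators {
    private final double learningRate;                               // supplied from outside
    private final Map<String, Double> estimates = new HashMap<>();   // field name -> current estimate

    FieldMedianEstimators(double learningRate) {
        if (!(learningRate > 0)) {
            throw new IllegalArgumentException("learning rate must be positive");
        }
        this.learningRate = learningRate;
    }

    /** Updates and returns the estimate for one field; the first value for a field seeds it. */
    synchronized double update(String field, double value) {
        Double current = estimates.get(field);
        double next;
        if (current == null) {
            next = value;                    // first observation for this field
        } else if (value > current) {
            next = current + learningRate;   // move up by r
        } else if (value < current) {
            next = current - learningRate;   // move down by r
        } else {
            next = current;                  // equal: unchanged
        }
        estimates.put(field, next);
        return next;
    }

    /** Forgets the estimate for one field; the next value re-seeds it. */
    synchronized void reset(String field) {
        estimates.remove(field);
    }

    /** Forgets the estimates for all fields. */
    synchronized void resetAll() {
        estimates.clear();
    }
}
```

Swapping the `HashMap` for an external store would address the state-management point, but that is not shown here.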

#### People

Assignee: Unassigned
Reporter: Thomas Darimont