Specifications

160 CHAPTER 8 Complex Event Processing with StreamInsight
Assume you want to apply the Sum and Avg aggregations to eld x in an input stream. The
following example shows you how to use these aggregations as well as the Count aggrega-
tion for each snapshot window:
var outputStream = from eventWindow in inputStream.Snapshot()
select new { sum = eventWindow.Sum(e => e.x),
avg = eventWindow.Avg(e => e.x),
count = eventWindow.Count() };
TopK
A special type of aggregation is the TopK operation, which you use to rank and lter events in
an ordered window stream. To order a window stream, you use the orderby clause. Then you
use the Take method to specify the number of events that you want to send to the output
stream, discarding all other events. The following code shows how to produce a stream of the
top three events:
var outputStream = (from eventWindow in inputStream.Snapshot()
from e in eventWindow
orderby e.x ascending, e.y descending
select e).Take(3);
When you need to include the rank in the output stream, you use projection to add the
rank to each event’s payload. This is accessible through the Payload property, as shown in the
following code:
var outputStream = (from eventWindow in inputStream.Snapshot()
from e in eventWindow
orderby e.x ascending, e.y descending
select e).Take(3, e=> new { x = e.Payload.x, y = e.Payload.y, rank = e.Rank });
Grouping
When you want to compute operations on event groups separately, you add a group by
clause. For example, you might want to produce an output stream that aggregates the input
stream by location and compute the average for eld x for each location. In the following
example, the code illustrates how to create the grouping by location and how to aggregate
events over a specied column:
var outputStream = from e in inputStream
group e by e.locationID into eachLocation
from eventWindow in eachLocation.Snapshot()
select new { avgValue = eventWindow.Avg(e => e.x), locationId = eachGroup.Key };