streamin,【原创】StreamInsight查询系列(十九)——查询模式之检测异常

上篇文章介绍了查询模式中如何发现趋势,这篇博文将介绍StreamInsight中如何检测异常。

测试数据准备

为了方便测试查询,我们首先准备一个静态的测试数据源:
var now = DateTime.Parse("09/12/2011 8:57:00 PM"); var input = new[] { new { Time = now + TimeSpan.FromSeconds(1), Value = 20}, new { Time = now + TimeSpan.FromSeconds(2), Value = 30}, new { Time = now + TimeSpan.FromSeconds(3), Value = 120}, new { Time = now + TimeSpan.FromSeconds(4), Value = 200}, new { Time = now + TimeSpan.FromSeconds(5), Value = 20}, new { Time = now + TimeSpan.FromSeconds(6), Value = 110}, new { Time = now + TimeSpan.FromSeconds(7), Value = 110}, new { Time = now + TimeSpan.FromSeconds(8), Value = 210}, new { Time = now + TimeSpan.FromSeconds(9), Value = 120}, new { Time = now + TimeSpan.FromSeconds(10), Value = 130}, new { Time = now + TimeSpan.FromSeconds(11), Value = 20}, new { Time = now + TimeSpan.FromSeconds(12), Value = 30}, };
接下去将上述数据源转变为点类型复杂事件流:
var inputStream = input.ToPointStream(Application, t => PointEvent.CreateInsert(t.Time.ToLocalTime(), new { Value = t.Value }), AdvanceTimeSettings.IncreasingStartTime);

异常检测

问题:怎样每秒1次的计算过去5秒内Value字段值超过阈值100的事件数超过事件总数目80%的异常事件?
首先我们定义一下结果事件流中的负载类型SpikeEvent如下:
struct SpikeEvent { public double Ratio { get; set; } }
我们最终希望调用查询的方式如下:
int threshold = 100; double ratio = 0.8; var resultStream = DetectSpikes( inputStream, threshold, // 指定阈值(超过该阈值的事件被认为是“特殊事件”) ratio, // “特殊事件”占事件总数的百分比 TimeSpan.FromSeconds(5), // 窗口大小 TimeSpan.FromSeconds(1), // 跳跃大小 e => e.Value); // 指定的比较字段
因此最关键的部分就是如何实现DetectSpikes。阅读过
《StreamInsight查询系列(十五)——查询模式之窗口比率》文章的读者应该对此类查询并不陌生。
这里不加过多描述地给出DetectSpikes的实现:
/// /// 在输入流中检测异常 /// /// 输入流事件类型 /// 输入流
/// 异常定义阈值
/// 异常事件占事件总数的百分比
/// 衡量事件数目的窗口大小
/// 跳跃大小
/// 选择输入事件中的某个字段来检测事件类型
/// query that detects the spikes private static CepStream DetectSpikes( CepStream inputStream, int threshold, double ratio, TimeSpan windowSize, TimeSpan hopSize, Expression> fieldSelector) { // 统计跳跃窗口内所有事件的数目 var totalValues = from w in inputStream.HoppingWindow( windowSize, hopSize, HoppingWindowOutputPolicy.ClipToWindowEnd) select new { Count = w.Count(), }; // 构造包含过滤条件的LINQ语句 var parameter = fieldSelector.Parameters.First(); var field = fieldSelector.Body; Expression> filterExpression = (Expression>)Expression.Lambda( Expression.GreaterThan(field, Expression.Constant(threshold)), parameter); // 统计跳跃窗口内异常事件的数目 var bigValues = from w in inputStream.Where(filterExpression).HoppingWindow( windowSize, hopSize, HoppingWindowOutputPolicy.ClipToWindowEnd) select new { Count = w.Count(), }; // 选择异常事件数目超过事件总数一定百分比的事件 var output = from total in totalValues.ToPointEventStream() join big in bigValues.ToPointEventStream() on true equals true where big.Count * 1.0 / total.Count >= ratio select new SpikeEvent { Ratio = big.Count * 1.0 / (total.Count) }; return output; }
输出结果如下:
【原创】StreamInsight查询系列(十九)——查询模式之检测异常streamin
下一篇将介绍StreamInsight查询模式中如何检测空隙事件。
Tags: 

延伸阅读

最新评论

发表评论