es的C# api 的封装和使用说明

栏目: ASP.NET · 发布时间: 6年前

内容简介:说明:

说明:

  1. C#的elastic client通过ConnectionSettings来指定es server的地址

  2. C#的Request有多种类型封装,比如Search相关的封装SearchRequest, GetAlias相关的GetAliasRequest, Count相关的CountRequest, ping相关的PingRequest等。  即,es默认支持的关键字都有对应的request

  3. C#的Query有matchQuery、DateRangeQuery等, 相互之间用条件运算符可以组合查询, 最终生成的restful 请求头转换为bool query了

  4. C#的Aggregation也有TermsAggregation、DateHistogramAggregation等, 用来普通分类和时间分类等, 不像js,需要自己用字符串指定。 返回值也要在Aggregations里查看。

public async Task<IReadOnlyCollection<DateHistogramBucket>> GetScanPatientStatistics( string systemID, string startDate, string endDate, DateInterval dateInterval, string indices = "*scanstatistics*" )

{

var client = GetClient();

var filterPath = new string [] { "aggregations.*.buckets" };

var request = new SearchRequest(indices)

{

Query = new MatchQuery() { Field = "systemID" , Query = systemID }

&& new DateRangeQuery() { Field = "timeStamp" , GreaterThanOrEqualTo = startDate, LessThanOrEqualTo = endDate, Format = "yyyyMMdd" },

Aggregations = new DateHistogramAggregation( "agg_date_histogram_timeStamp" )

{

Field = "timeStamp" ,

Interval = dateInterval,

Format = "yyyy-MM-dd HH:mm:ss" ,

Aggregations = new ValueCountAggregation( "agg_count_systemID" , "systemID.keyword" )

},

FilterPath = filterPath

};

var response = await client.SearchAsync<CTDMS<CTDMSItem>>(request);

var dateHistogram = response.Aggregations.DateHistogram( "agg_date_histogram_timeStamp" );

return dateHistogram.Buckets;

}

  1. Buckets可以转为Dictionary

public async Task<Dictionary<DateTime, double ?>> GetScanPatientStatistics( string systemID, DateRangeEnum dateRange, DateInterval dateInterval, string indices = "*scanstatistics*" )

{

var buckets = await GetScanPatientStatistics(systemID, DateRangeEnumExtensions.ToString(dateRange), "now" , dateInterval, indices);

return buckets.ToDictionary(k => k.Date, v => v.Values.OfType<ValueAggregate>().First().Value);

}

  1. C#对于各种查询都有数据类型校验, 这点和js api不相同, 因此必须指定类型

代码如下:

namespace My.ElasticSearch

{

/// <summary>

/// sample code 如下:

///  var search = new SearchDomainService();

//获取当月的每天温度

/*

var result = search.GetDetectorTemp("987654321098",

DetectorEnum.DetectorTempL,

DateRangeEnum.CurrentMonth,

DateInterval.Day).GetAwaiter().GetResult();

*/

//获取当前es中的所有index

/*

var indices = search.GetIndices().GetAwaiter().GetResult();

*/

//当前服务是否ok

/*

var isAvailable = search.IsAvailable().GetAwaiter().GetResult();

*/

//获取扫描量

/*

var count = search.GetScanCount("987654321098", "now/y").GetAwaiter().GetResult();

*/

//获取每一类部位的扫描量

/*

var stat = search.GetScanBodypartStatistics("987654321098", DateRangeEnum.CurrentYear).GetAwaiter().GetResult();

*/

//获取最后一次上传日志的时间

/*

var date = search.GetLatestUploadTime("987654321098").GetAwaiter().GetResult();

*/

/*

var date = search.GetLatestUploadTime<CTScanStatistics<CTScanStatisticsItem>, CTScanStatisticsItem>("987654321098").GetAwaiter().GetResult();

*/

/// </summary>

public class ElasticSearchDomainService

{

private ConnectionSettings _connectionSettings;

private ElasticClient _client = null;

public ElasticSearchDomainService()

{

_connectionSettings = new ConnectionSettings(new Uri(" http://localhost:9200 ")).DisableDirectStreaming();

}

public void SetConnectionSettingUrl(string url)

{

_connectionSettings = new ConnectionSettings(new Uri(url)).DisableDirectStreaming();

}

public ElasticClient GetClient()

{

if (_client == null)

{

_client = new ElasticClient(_connectionSettings);

}

return _client;

}

/// <summary>

/// -> POST http://localhost:9200/*dms*/_search?filter_path=aggregations.agg_date_histogram_timeStamp.buckets

/// {

///   "query": {

///     "bool": {

///       "must": [

///         {

///           "match": {

///             "systemID": "987654321098"

///           }

///         },

///         {

///           "match": {

///             "items.name.keyword": "DetectorTempL"

///           }

///         },

///         {

///           "range": {

///             "timeStamp": {

///               "gte": "now/M"

///             }

///           }

///         }

///       ]

///     }

///   },

///   "aggs": {

///     "agg_date_histogram_timeStamp": {

///       "date_histogram": {

///         "field": "timeStamp",

///         "interval": "1d",

///         "format": "yyyy-MM-dd HH:mm:ss"

///       },

///       "aggs": {

///         "agg_avg_items.value": {

///           "avg": {

///             "field": "items.value"

///           }

///         }

///       }

///     }

///   }

/// }

/// <- 200

/// {

///   "aggregations": {

///     "agg_date_histogram_timeStamp": {

///       "buckets": [

///         {

///           "key_as_string": "2018-05-30",

///           "key": 1527638400000,

///           "doc_count": 23,

///           "agg_avg_items.value": {

///             "value": 38.84304356229478

///           }

///         }

///       ]

///     }

///   }

/// }

/// 此方法提供特定设备的特定探测器在某个时间范围的每个interval的温度平均值

/// </summary>

/// <param name="systemID">查询的设备唯一标识,index中的systemID字段</param>

/// <param name="detector">查询的设备的探测器,有左、中、右。Index的items.name字段</param>

/// <param name="startDate">查询的时间范围的起始时间, 格式yyyyMMdd,参考es官方的Date Math</param>

/// <param name="endDate">查询的时间范围的终止时间, 格式yyyyMMdd,参考es官方的Date Math</param>

/// <param name="dateInterval">统计的时间间隔</param>

/// <param name="indices">查询的index name</param>

/// <returns>返回一个es自带的DateHistogramBucket的数组。DateHistogramBucket对象中包括分类的时间点及对应的值</returns>

public async Task<IReadOnlyCollection<DateHistogramBucket>> GetDetectorTemp(string systemID, DetectorEnum detector, string startDate, string endDate, DateInterval dateInterval, string indices = "*dms*")

{

var client = GetClient();

var filterPath = new string[] { "aggregations.*.buckets" };

var request = new SearchRequest(indices)

{

Query = new MatchQuery() { Field = "systemID", Query = systemID }

&& new MatchQuery() { Field = "items.name.keyword", Query = detector.ToString() }

&& new DateRangeQuery() { Field = "timeStamp", GreaterThanOrEqualTo = startDate, LessThanOrEqualTo = endDate, Format = "yyyyMMdd" },

Aggregations = new DateHistogramAggregation("agg_date_histogram_timeStamp")

{

Field = "timeStamp",

Interval = dateInterval,

Format = "yyyy-MM-dd HH:mm:ss",

Aggregations = new AverageAggregation("agg_avg_items.value", "items.value")

},

FilterPath = filterPath,

};

var response = await client.SearchAsync<CTDMS<CTDMSItem>>(request);

var dateHistogram = response.Aggregations.DateHistogram("agg_date_histogram_timeStamp");

return dateHistogram.Buckets;

}

/// <summary>

/// 此方法提供特定设备的特定探测器在某个时间范围的每个interval的温度平均值

/// </summary>

/// <param name="systemID">查询的设备唯一标识,index中的systemID字段</param>

/// <param name="detector">查询的设备的探测器,有左、中、右。Index的items.name字段</param>

/// <param name="dateRange">查询的时间范围,自定义的enum有当天、当月、当年、昨天等预设范围。如果需要一个自由的时间范围,请用另一个dateRange为string类型的同名函数</param>

/// <param name="dateInterval">统计的时间间隔</param>

/// <param name="indices">查询的index name</param>

/// <returns>返回一个字典,key为日期,value为该日期内的温度平均值。key主要根据interval来阶段递增变化</returns>

public async Task<Dictionary<DateTime, double?>> GetDetectorTemp(string systemID, DetectorEnum detector, DateRangeEnum dateRange, DateInterval dateInterval, string indices = "*dms*")

{

var buckets = await GetDetectorTemp(systemID, detector, DateRangeEnumExtensions.ToString(dateRange), "now", dateInterval, indices);

return buckets.ToDictionary(k => k.Date, v => v.Values.OfType<ValueAggregate>().First().Value);

}

/// <summary>

///-> GET http://localhost:9200/_alias?filter_path=-*kibana *,-*es*,-*monitoring*,metric*

///<- 200

///{

/// "metric_2018_05_30_ctdms`1": {

///   "aliases": {}

/// },

/// "metric_2018_05_30_ctaws`1": {

///   "aliases": {}

/// },

/// "metric_2018_05_30_ctscanstatistics`1": {

///   "aliases": {}

/// },

/// "metric_2018_05_30_ctdailystatistics`1": {

///   "aliases": {}

/// }

///}

///此方法用于获取当前es中的所有index name

/// </summary>

/// <param name="filterPath">指定需要过滤的或者显示的index,可以排除系统自带的一些index</param>

/// <returns>当前和业务相关的所有设备名称的数组</returns>

public async Task<List<string>> GetIndices(string[] filterPath = null)

{

var client = GetClient();

if (filterPath == null)

{

filterPath = new string[] { "-*kibana*", "-*es*", "-*monitoring*", "metric*" };

}

var request = new GetAliasRequest()

{

FilterPath = filterPath

};

var response = await client.GetAliasAsync(request);

return response.Indices.Keys.Select(e => e.Name).ToList();

}

/// <summary>

///  -> POST http://localhost:9200/*scanstatistics*/_search?filter_path=hits.hits._source.timeStamp

/// {

///   "size": 1,

///   "sort": [

///     {

///       "timeStamp": {

///         "order": "asc"

///       }

/// }

///   ],

///   "query": {

///     "match": {

///       "systemID": "987654321098"

///     }

///   }

/// }

/// <- 200

/// {

///   "hits": {

///     "hits": [

///       {

///         "_source": {

///           "timeStamp": "2018-05-30T09:48:26.705659+08:00"

///         }

///       }

///     ]

///   }

/// }

/// 返回特定数据类型的某个设备的最后一条日志的上传时间。用于显示设备的最后一次更新状态

/// </summary>

/// <param name="systemID">查询的设备唯一标识,index中的systemID字段</param>

/// <param name="indices">查询的index name</param>

/// <returns>最后一条日志的上传时间</returns>

public async Task<DateTime> GetLatestUploadTime<T1, T2>(string systemID, string indices = "*scanstatistics*")

where T1 : BaseMetrics<T2>

where T2 : BaseItem

{

var client = GetClient();

var filterPath = new string[] { "hits.hits._source.timeStamp" };

var request = new SearchRequest(indices)

{

Query = new MatchQuery() { Field = "systemID", Query = systemID },

Size = 1,

Sort = new List<ISort>

{

new SortField {Field = "timeStamp", Order = SortOrder.Descending}

},

FilterPath = filterPath

};

var response = await client.SearchAsync<T1>(request);

return response.Hits.First().Source.TimeStamp;

}

/// <summary>

/// 获取特定设备在某个时间范围内的扫描部位量的统计。

/// </summary>

/// <param name="systemID">查询的设备唯一标识,index中的systemID字段</param>

/// <param name="dateRange">查询的时间范围,自定义的enum有当天、当月、当年、昨天等预设范围。如果需要一个自由的时间范围,请用另一个dateRange为string类型的同名函数</param>

/// <param name="indices">查询的index name</param>

/// <returns>一个字典,key为扫描部位的名称, value为该扫描部位在查询的时间范围内的总数量之和。</returns>

public async Task<Dictionary<string, double?>> GetScanBodypartStatistics(string systemID, DateRangeEnum dateRange, string indices = "*scanstatistics*")

{

var buckets = await GetScanBodypartStatistics(systemID, DateRangeEnumExtensions.ToString(dateRange), "now", indices);

return buckets.ToDictionary(k => k.Key, v => v.Values.OfType<ValueAggregate>().First().Value);

}

/// <summary>

/// -> POST http://localhost:9200/*scanstatistics*/_search?filter_path=aggregations.agg_terms_items \.name\.keyword.buckets

/// {

///   "query": {

///     "bool": {

///       "must": [

///         {

///           "match": {

///             "systemID": "987654321098"

///           }

///         },

///         {

///           "range": {

///             "timeStamp": {

///               "gte": "now/M"

///             }

///           }

///         }

///       ]

///     }

///   },

///   "aggs": {

///     "agg_terms_items.name.keyword": {

///       "terms": {

///         "field": "items.name.keyword"

///       },

///       "aggs": {

///         "agg_sum_items.value": {

///           "sum": {

///             "field": "items.value"

///           }

///         }

///       }

///     }

///   }

/// }

/// <- 200

/// {

///   "aggregations": {

///     "agg_terms_items.name.keyword": {

///       "buckets": [

///         {

///           "key": "Upper Extremity",

///           "doc_count": 15,

///           "agg_sum_items.value": {

///             "value": 1247

///           }

///         },

///         {

///           "key": "Pelvis",

///           "doc_count": 13,

///           "agg_sum_items.value": {

///             "value": 888

///           }

///         },

///         {

///           "key": "Neck",

///           "doc_count": 11,

///           "agg_sum_items.value": {

///             "value": 939

///           }

///         },

///         {

///           "key": "Spine",

///           "doc_count": 11,

///           "agg_sum_items.value": {

///             "value": 797

///           }

///         },

///         {

///           "key": "Thorax",

///           "doc_count": 11,

///           "agg_sum_items.value": {

///             "value": 1016

///           }

///         },

///         {

///           "key": "Abdomen",

///           "doc_count": 9,

///           "agg_sum_items.value": {

///             "value": 732

///           }

///         },

///         {

///           "key": "Head",

///           "doc_count": 8,

///           "agg_sum_items.value": {

///             "value": 531

///           }

///         }

///       ]

///     }

///   }

/// }

/// 获取特定设备在某个时间范围内的扫描部位量的统计。

/// </summary>

/// <param name="systemID">查询的设备唯一标识,index中的systemID字段</param>

/// <param name="startDate">查询的时间范围的起始时间, 格式yyyyMMdd,参考es官方的Date Math</param>

/// <param name="endDate">查询的时间范围的终止时间, 格式yyyyMMdd,参考es官方的Date Math</param>

/// <param name="indices">查询的index name</param>

/// <returns>一个es自带的KeyedBucket的数组。KeyedBucket对象包括分类的name和value</returns>

public async Task<IReadOnlyCollection<KeyedBucket<string>>> GetScanBodypartStatistics(string systemID, string startDate, string endDate, string indices = "*scanstatistics*")

{

var client = GetClient();

var filterPath = new string[] { "aggregations.*.buckets" };

var request = new SearchRequest(indices)

{

Query = new MatchQuery() { Field = "systemID", Query = systemID }

&& new DateRangeQuery() { Field = "timeStamp", GreaterThanOrEqualTo = startDate, LessThanOrEqualTo = endDate, Format = "yyyyMMdd" },

Aggregations = new TermsAggregation("terms")

{

Field = "items.name.keyword",

Aggregations = new SumAggregation("sum", "items.value")

},

FilterPath = filterPath

};

var response = await client.SearchAsync<CTScanStatistics<CTScanStatisticsItem>>(request);

var agg = response.Aggregations.Terms("terms");

return agg.Buckets;

}

/// <summary>

/// 查询某个时间范围内的每个interval的患者扫描量

/// </summary>

/// <param name="systemID">查询的设备唯一标识,index中的systemID字段</param>

/// <param name="dateRange">查询的时间范围,自定义的enum有当天、当月、当年、昨天等预设范围。如果需要一个自由的时间范围,请用另一个dateRange为string类型的同名函数</param>

/// <param name="dateInterval">统计的时间间隔</param>

/// <param name="indices">查询的index name</param>

/// <returns>返回一个字典,key为日期,value为该日期内的患者扫描量总和。key主要根据interval来阶段递增变化</returns>

public async Task<Dictionary<DateTime, double?>> GetScanPatientStatistics(string systemID, DateRangeEnum dateRange, DateInterval dateInterval, string indices = "*scanstatistics*")

{

var buckets = await GetScanPatientStatistics(systemID, DateRangeEnumExtensions.ToString(dateRange), "now", dateInterval, indices);

return buckets.ToDictionary(k => k.Date, v => v.Values.OfType<ValueAggregate>().First().Value);

}

/// <summary>

/// -> POST /*scanstatistics*/_search?typed_keys=true&filter_path=aggregations.*.buckets

///{

/// "query": {

///   "bool": {

///     "must": [

///       {

///         "match": {

///           "systemID": {

///             "query": "987654321098"

///           }

///         }

///       },

///       {

///         "range": {

///           "timeStamp": {

///             "gte": "now/y"

///           }

///         }

///       }

///     ]

///   }

/// },

/// "aggs": {

///   "agg_date_histogram_timeStamp": {

///     "date_histogram": {

///       "field": "timeStamp",

///       "interval": "month",

///       "format": "yyyy-MM-dd HH:mm:ss"

///     },

///     "aggs": {

///       "agg_count_systemID": {

///         "value_count": {

///           "field": "systemID.keyword"

///         }

///       }

///     }

///   }

/// }

///}

/// <- 200

/// {

///  "aggregations": {

///   "date_histogram#agg_date_histogram_timeStamp": {

///     "buckets": [

///       {

///         "key_as_string": "2018-05-01 00:00:00",

///         "key": 1525132800000,

///         "doc_count": 30,

///         "value_count#agg_count_systemID": {

///           "value": 30

///         }

///      },

///       {

///         "key_as_string": "2018-06-01 00:00:00",

///         "key": 1527811200000,

///         "doc_count": 30,

///         "value_count#agg_count_systemID": {

///           "value": 30

///         }

///       }

///     ]

///   }

/// }

///}

/// 查询某个时间范围内的每个interval的患者扫描量

/// </summary>

/// <param name="systemID">查询的设备唯一标识,index中的systemID字段</param>

/// <param name="startDate">查询的时间范围的起始时间, 格式yyyyMMdd,参考es官方的Date Math</param>

/// <param name="endDate">查询的时间范围的终止时间, 格式yyyyMMdd,参考es官方的Date Math</param>

/// <param name="dateInterval">统计的时间间隔</param>

/// <param name="indices">查询的index name</param>

/// <returns>返回一个es自带的DateHistogramBucket的数组。DateHistogramBucket对象中包括分类的时间点及对应的值</returns>

public async Task<IReadOnlyCollection<DateHistogramBucket>> GetScanPatientStatistics(string systemID, string startDate, string endDate, DateInterval dateInterval, string indices = "*scanstatistics*")

{

var client = GetClient();

var filterPath = new string[] { "aggregations.*.buckets" };

var request = new SearchRequest(indices)

{

Query = new MatchQuery() { Field = "systemID", Query = systemID }

&& new DateRangeQuery() { Field = "timeStamp", GreaterThanOrEqualTo = startDate, LessThanOrEqualTo = endDate, Format = "yyyyMMdd" },

Aggregations = new DateHistogramAggregation("agg_date_histogram_timeStamp")

{

Field = "timeStamp",

Interval = dateInterval,

Format = "yyyy-MM-dd HH:mm:ss",

Aggregations = new ValueCountAggregation("agg_count_systemID", "systemID.keyword")

},

FilterPath = filterPath

};

var response = await client.SearchAsync<CTDMS<CTDMSItem>>(request);

var dateHistogram = response.Aggregations.DateHistogram("agg_date_histogram_timeStamp");

return dateHistogram.Buckets;

}

/// <summary>

/// 获取某个设备在特定时间范围内的患者扫描量。

/// 每条日志就是该患者在当时扫描时记录的log,因此统计扫描log的数量即可得到扫描患者的数量

/// </summary>

/// <param name="systemID">查询的设备唯一标识,index中的systemID</param>

/// <param name="dateRange">查询的时间范围,自定义的enum有当天、当月、当年、昨天等预设范围。如果需要一个自由的时间范围,请用另一个dateRange为string类型的同名函数</param>

/// <param name="indices">查询的index name</param>

/// <returns>患者扫描的总和</returns>

public async Task<long> GetTotalScanPatientCount(string systemID, DateRangeEnum dateRange, string indices = "*scanstatistics*")

{

return await GetTotalScanPatientCount(systemID, DateRangeEnumExtensions.ToString(dateRange), "now", indices);

}

/// <summary>

///  -> POST http://localhost:9200/*scanstatistics*/_count

///  {

///   "query": {

///     "bool": {

///       "must": [

///         {

///           "match": {

///             "systemID": "987654321098"

///           }

///         },

///         {

///           "range": {

///             "timeStamp": {

///               "gte": "now/d"

///             }

///           }

///         }

///       ]

///     }

///   }

/// }

/// <- 200

/// {

///   "count": 30,

///   "_shards": {

///     "total": 15,

///     "successful": 15,

///     "skipped": 0,

///     "failed": 0

///   }

/// }

/// 获取某个设备在特定时间范围内的患者扫描量。

/// 每条日志就是该患者在当时扫描时记录的log,因此统计扫描log的数量即可得到扫描患者的数量

/// </summary>

/// <param name="systemID">查询的设备唯一标识,index中的systemID</param>

/// <param name="startDate">查询的时间范围的起始时间, 格式yyyyMMdd,参考es官方的Date Math</param>

/// <param name="endDate">查询的时间范围的终止时间, 格式yyyyMMdd,参考es官方的Date Math</param>

/// <param name="indices">查询的index name</param>

/// <returns>患者扫描的总和</returns>

public async Task<long> GetTotalScanPatientCount(string systemID, string startDate, string endDate, string indices = "*scanstatistics*")

{

var client = GetClient();

var request = new CountRequest(indices)

{

Query = new MatchQuery() { Field = "systemID", Query = systemID }

&& new DateRangeQuery() { Field = "timeStamp", GreaterThanOrEqualTo = startDate, LessThanOrEqualTo = endDate, Format = "yyyyMMdd" }

};

var response = await client.CountAsync<CTScanStatistics<CTScanStatisticsItem>>(request);

return response.Count;

}

/// <summary>

/// 判断当前ES的服务是否有效。建议在执行每一个ES的操作前都应该先调用此方法。

/// </summary>

/// <returns>true,服务运行正常; false,服务运行异常</returns>

public async Task<bool> IsAvailable()

{

var client = GetClient();

var request = new PingRequest()

{

ErrorTrace = true

};

var response = await client.PingAsync(request);

return response.IsValid;

}

}

}


以上所述就是小编给大家介绍的《es的C# api 的封装和使用说明》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Charlotte's Web

Charlotte's Web

E. B. White / Puffin Classics / 2010-6-3 / GBP 6.99

This is the story of a little girl named Fern who loved a little pig named Wilbur and of Wilbur's dear friend, Charlotte A. Cavatica, a beautiful large grey spider. With the unlikely help of Templeton......一起来看看 《Charlotte's Web》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

SHA 加密
SHA 加密

SHA 加密工具

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具