记录一下:
数据结构如下:
/// <summary>
/// Document type that the snippets in this file index into and query from Elasticsearch.
/// Carries an account id, a variety name, several organization ids, a list of orders
/// and two timestamps.
/// </summary>
public class ESUserTransaction
{
    /// <summary>Account identifier; used as a term filter and as the terms-aggregation key in the query snippets.</summary>
    public long AccountId { get; set; }

    /// <summary>Variety name; used as an optional term filter in the multi-condition query.</summary>
    public string VarietyName { get; set; }

    // Organization identifiers. NOTE(review): their business meaning is not defined in this file.
    public int OrganizationAgentId { get; set; }
    public int OrganizationMemberId { get; set; }
    public int OrganizationChannelId { get; set; }
    public int OrganizationMinorMakerId { get; set; }
    public int OrganizationMasterMakerId { get; set; }

    /// <summary>
    /// Orders belonging to this document. Starts as an empty list, never null by default.
    /// The index-creation snippet maps this property as a nested field.
    /// </summary>
    public List<EsOrder> Order { get; set; } = new List<EsOrder>();

    /// <summary>Creation timestamp; the query snippets filter on it by term and by range.</summary>
    public DateTime CreateTime { get; set; }

    /// <summary>NOTE(review): presumably the last-modified timestamp — confirm; nothing in this file reads it.</summary>
    public DateTime LastTime { get; set; }
}
/// <summary>
/// A single order entry stored inside <c>ESUserTransaction.Order</c>.
/// </summary>
public class EsOrder
{
    /// <summary>Order identifier; the delete snippet locates the entry to remove by this value.</summary>
    public long Id { get; set; }

    /// <summary>Order code.</summary>
    public string OrderCode { get; set; }

    /// <summary>NOTE(review): presumably the traded quantity in lots/hands — confirm.</summary>
    public int TradeHand { get; set; }

    /// <summary>Order amount; summed by the aggregation snippets.</summary>
    public decimal Amount { get; set; }

    /// <summary>Integer order-type code; the code values are not defined in this file.</summary>
    public int OrderType { get; set; }

    /// <summary>Integer order-status code; the code values are not defined in this file.</summary>
    public int OrderStatus { get; set; }

    /// <summary>Optional creation timestamp; null when not set.</summary>
    public DateTime? CreateTime { get; set; }
}
然后开始操作:
建索引 :
/// <summary>
/// Creates the index with <c>ESUserTransaction.Order</c> mapped as a nested field,
/// then bulk-indexes the prepared documents.
/// </summary>
/// <returns>
/// The result of <c>BulkAll</c>; <c>false</c> when any exception is thrown (the exception is logged).
/// </returns>
/// <remarks>
/// NOTE(review): the method name, the <c>Task&lt;bool&gt;</c> return type and the generic
/// argument of the list were lost in extraction. The name is restored from the log message
/// below; confirm against the real source. <c>indexKey</c> and <c>_esClientProvider</c> are
/// defined outside this excerpt.
/// </remarks>
public async Task<bool> DoAllSummary()
{
    try
    {
        var userList = new List<ESUserTransaction>();
        var esClient = _esClientProvider.GetClient();
        // TODO: populate userList with the documents to index.

        // Awaited so the method is genuinely asynchronous. The response is not inspected here:
        // NOTE(review): when the index already exists the call presumably returns an invalid
        // response rather than throwing (default client settings) — confirm.
        await esClient.CreateIndexAsync(indexKey, p => p.Mappings(ms =>
            ms.Map<ESUserTransaction>(m => m
                .AutoMap()
                .Properties(ps => ps.Nested<EsOrder>(n => n.Name(c => c.Order))))));

        return _esClientProvider.BulkAll<ESUserTransaction>(esClient, indexKey, userList);
    }
    catch (Exception ex)
    {
        Logger.Error("---DigitalCurrencyExchange.ElasticsearchService.CronJob.DoAllSummary---DoAllSummary---出错", ex);
        return false;
    }
}
/// <summary>
/// Hands out a lazily created <see cref="ElasticClient"/> and offers a blocking
/// bulk-index helper built on NEST's <c>BulkAll</c> observable.
/// </summary>
/// <remarks>
/// NOTE(review): the declarations of <c>_esclient</c> and <c>_configuration</c> (and whatever
/// assigns <c>_configuration</c>) are not visible in this excerpt; they must exist in the real class.
/// </remarks>
public class EsClientProvider : IEsClientProvider
{
    /// <summary>Returns the cached client, creating it on first use.</summary>
    public ElasticClient GetClient()
    {
        if (_esclient == null)
        {
            InitClient();
        }

        return _esclient;
    }

    /// <summary>Builds the client from the "EsUrl" configuration entry.</summary>
    private void InitClient()
    {
        var node = new Uri(_configuration["EsUrl"]);
        _esclient = new ElasticClient(new ConnectionSettings(node));
    }

    /// <summary>
    /// Indexes <paramref name="list"/> into <paramref name="indexName"/> in batches and blocks
    /// until the bulk operation completes or fails.
    /// </summary>
    /// <typeparam name="T">Document type.</typeparam>
    /// <param name="elasticClient">Client used to run the bulk requests.</param>
    /// <param name="indexName">Target index.</param>
    /// <param name="list">Documents to index.</param>
    /// <returns><c>true</c> when the observable completes without error; otherwise <c>false</c>.</returns>
    public bool BulkAll<T>(IElasticClient elasticClient, IndexName indexName, IEnumerable<T> list) where T : class
    {
        // NOTE(review): the numeric literals (batch size, parallelism, back-off seconds, retries)
        // were lost in extraction. The values below follow the commonly published NEST BulkAll
        // sample — confirm or tune for the real workload.
        const int size = 1000;

        using (var tokenSource = new CancellationTokenSource())
        // Exactly one signal is expected: either completion or error.
        using (var countdownEvent = new CountdownEvent(1))
        {
            var observableBulk = elasticClient.BulkAll(list, f => f
                .MaxDegreeOfParallelism(8)
                .BackOffTime(TimeSpan.FromSeconds(10))
                .BackOffRetries(2)
                .Size(size)
                .RefreshOnCompleted()
                .Index(indexName)
                .BufferToBulk((r, buffer) => r.IndexMany(buffer))
                , tokenSource.Token);

            Exception exception = null;

            void OnCompleted()
            {
                Logger.Error("BulkAll Finished");
                countdownEvent.Signal();
            }

            var bulkAllObserver = new BulkAllObserver(
                onNext: response =>
                {
                    Logger.Error($"Indexed {response.Page * size} with {response.Retries} retries");
                },
                onError: ex =>
                {
                    Logger.Error("BulkAll Error : {0}", ex);
                    exception = ex;
                    countdownEvent.Signal();
                },
                // Without this callback a successful run never signals and Wait() blocks forever.
                onCompleted: OnCompleted);

            observableBulk.Subscribe(bulkAllObserver);
            countdownEvent.Wait(tokenSource.Token);

            if (exception != null)
            {
                Logger.Error("BulkHotelGeo Error : {0}", exception);
                return false;
            }

            return true;
        }
    }
}
/// <summary>
/// Contract for obtaining the Elasticsearch client and bulk-indexing documents.
/// </summary>
public interface IEsClientProvider
{
    /// <summary>Returns the Elasticsearch client.</summary>
    ElasticClient GetClient();

    /// <summary>
    /// Bulk-indexes <paramref name="list"/> into <paramref name="indexName"/>.
    /// Signature restored to match the implementation in <c>EsClientProvider</c>.
    /// </summary>
    /// <returns><c>true</c> on success; otherwise <c>false</c>.</returns>
    bool BulkAll<T>(IElasticClient elasticClient, IndexName indexName, IEnumerable<T> list) where T : class;
}
查询数据:
// Single-field query: term match on AccountId.
var result = await esClient.SearchAsync<ESUserTransaction>(search => search
    .Index(indexKey)
    .Type(typeof(ESUserTransaction))
    .Query(query => query.Term(doc => doc.AccountId, dto.AccountId)));
// Multi-field query: a bool query whose two term clauses must both match.
var result = esClient.Search<ESUserTransaction>(search => search
    .Index(indexKey)
    .Query(query => query.Bool(boolean => boolean.Must(
        clause => clause.Term(doc => doc.AccountId, dto.AccountId),
        clause => clause.Term(doc => doc.CreateTime, dto.endDate)))));
// Multi-condition query: optional term filters plus a createTime range, built with the
// object-initializer API and combined as the "must" clauses of a bool query.
var esClient = this._esClientProvider.GetClient();

// Uses ESUserTransaction, the only document type defined in this file.
// NOTE(review): the original text said "ESUserTransactions" — confirm it was a typo.
var sr = new SearchRequest(indexKey, typeof(ESUserTransaction));
var list = new List<QueryContainer>();

// Account filter (only when supplied).
if (dto.AccountId.HasValue)
{
    list.Add(new TermQuery
    {
        Field = "accountId",
        Value = dto.AccountId
    });
}

// Variety-name filter (only when supplied).
if (!string.IsNullOrWhiteSpace(dto.VarietyName))
{
    list.Add(new TermQuery
    {
        Field = "varietyName",
        Value = dto.VarietyName
    });
}

// Date window: [startDate, endDate). Missing bounds are written back to the dto, as before.
// DateTime.Today replaces three separate DateTime.Now reads, which could disagree around midnight.
if (!dto.startDate.HasValue)
{
    dto.startDate = DateTime.Today;
}

if (!dto.endDate.HasValue)
{
    // NOTE(review): the AddMonths argument was lost in extraction; 1 is an assumption — confirm.
    dto.endDate = DateTime.Today.AddMonths(1);
}

// One range clause carrying both bounds is equivalent to two "must" range clauses on the same field.
list.Add(new DateRangeQuery
{
    Field = "createTime",
    GreaterThanOrEqualTo = dto.startDate,
    LessThan = dto.endDate
});

// NOTE(review): filtering by order status is not implemented. Because Order is mapped as a
// nested field at index creation, it would presumably need a nested query — confirm.

// NOTE(review): the page-size literal was lost in extraction; 100 is a placeholder — confirm.
const int pageSize = 100;

sr.Query = new BoolQuery { Must = list.ToArray() };
sr.Size = pageSize;

var result = await esClient.SearchAsync<ESUserTransaction>(sr);
聚合:
// Aggregation: bucket documents by AccountId, then inside the nested "order" path compute the
// sum of order amounts and the count of order ids.
var esClient = this._esClientProvider.GetClient();
var result = await esClient.SearchAsync<ESUserTransaction>(d => d
    .Index(indexKey)
    .Type(typeof(ESUserTransaction))
    // NOTE(review): the Size argument was lost in extraction; 0 (aggregations only, no hits)
    // is an assumption — confirm.
    .Size(0)
    .Aggregations(a => a.Terms("getorderlist", st => st
        .Field(f => f.AccountId)
        .Aggregations(na => na.Nested("order", nat => nat
            .Path(aa => aa.Order)
            .Aggregations(agg => agg
                // Order.First().X is NEST's documented way to name a field of a collection element.
                .Sum("sum_order_amount", m => m.Field(o => o.Order.First().Amount))
                .ValueCount("count_statu_amountid", m => m.Field(md => md.Order.First().Id))))))));
聚合2:
// Assigns two filter aggregations, "order" and "position", to the search request `sr`.
// Each wraps a terms aggregation named "groupby_amountid" with three sub-aggregations:
// "es_count" (value count), "es_amount" (sum) and "es_tradehand" (sum).
// NOTE(review): `sr` is presumably the SearchRequest built in the multi-condition query — confirm.
// NOTE(review): every `Nest.Infer.Field` expression below is cut off — its generic argument,
// field lambda, the term filter's Value and the closing brackets were lost in extraction.
// The snippet does not compile as shown and must be restored from the original source.
sr.Aggregations = new AggregationDictionary()
{
{
// First bucket: documents matching the "order" term filter (field and value lost).
"order",new FilterAggregation("order")
{
Filter = new TermQuery { Field = Nest.Infer.Field
Aggregations =
new TermsAggregation("groupby_amountid")
{
// Grouping field lost in extraction.
Field=Nest.Infer.Field
Aggregations = new AggregationDictionary
{
// Target fields of the three metrics lost in extraction.
{"es_count",new ValueCountAggregation("es_count", Nest.Infer.Field
{"es_amount", new SumAggregation("es_amount", Nest.Infer.Field
{"es_tradehand", new SumAggregation("es_tradehand", Nest.Infer.Field
},
}
}
},
{
// Second bucket: same structure under the name "position" (field and value lost).
"position",new FilterAggregation("position")
{
Filter = new TermQuery { Field = Nest.Infer.Field
Aggregations =
new TermsAggregation("groupby_amountid")
{
Field=Nest.Infer.Field
Aggregations = new AggregationDictionary
{
{"es_count",new ValueCountAggregation("es_count", Nest.Infer.Field
{"es_amount", new SumAggregation("es_amount", Nest.Infer.Field
{"es_tradehand", new SumAggregation("es_tradehand", Nest.Infer.Field
},
}
}
}
};
更新 多层数据结构
// Adds an order to an existing document: find the document, append to its Order list,
// then send the modified document back with an update call.
// NOTE(review): this snippet is damaged by extraction and does not compile as shown; the
// gaps are marked below and must be restored from the original source.
var esClient = this._esClientProvider.GetClient();
// NOTE(review): the text between `SearchAsync` and `.Must(` was lost. By analogy with the
// multi-field query earlier in this file it was presumably
// `<ESUserTransaction>(s => s.Index(indexKey).Query(q => q.Bool(b => b` — confirm.
var resultData = await esClient.SearchAsync
// The AccountId term value was lost; `t` (the CreateTime value) is defined outside this excerpt.
.Must(bm => bm.Term(tm => tm.AccountId, ),
bm => bm.Term(tm => tm.CreateTime, t)))));
// First() throws when the search returns no hits, so an empty result fails here.
var id = resultData.Hits.First().Id;
// Because First() above already threw on an empty result, FirstOrDefault() cannot return null here.
var nmodel = resultData.Hits.FirstOrDefault().Source;
// The property values of the new order were lost in extraction.
nmodel.Order.Add(new EsOrder() { Id = , Amount = , OrderType = , OrderStatus = });
// NOTE(review): the mapping lambda after `ms.Map` was lost; presumably the same nested mapping
// as in the index-creation snippet. Re-creating the index before an update looks unnecessary — confirm.
esClient.CreateIndex(indexKey, p => p.Mappings(ms =>
ms.Map
// NOTE(review): the UpdateAsync arguments were lost; presumably they target document `id`
// in `indexKey` with `nmodel` as the new content — confirm.
var response =await esClient.UpdateAsync
删除多层数据结构
// Removes an order from an existing document: find the document, drop the matching entry
// from its Order list, then send the modified document back with an update call.
// NOTE(review): this snippet is damaged by extraction and does not compile as shown; the
// gaps are marked below and must be restored from the original source.
var esClient = this._esClientProvider.GetClient();
// NOTE(review): the text between `SearchAsync` and `.Must(` was lost. By analogy with the
// multi-field query earlier in this file it was presumably
// `<ESUserTransaction>(s => s.Index(indexKey).Query(q => q.Bool(b => b` — confirm.
var resultData = await esClient.SearchAsync
// The AccountId term value was lost; `t` (the CreateTime value) is defined outside this excerpt.
.Must(bm => bm.Term(tm => tm.AccountId, ),
bm => bm.Term(tm => tm.CreateTime, t)))));
// First() throws when the search returns no hits, so an empty result fails here.
var id = resultData.Hits.First().Id;
// Because First() above already threw on an empty result, FirstOrDefault() cannot return null here.
var nmodel = resultData.Hits.FirstOrDefault().Source;
// The order id to match was lost in extraction. removemodel is null when no order matches.
var removemodel = nmodel.Order.Where(d => d.Id == ).FirstOrDefault();
// List<T>.Remove returns false without throwing when the item is not in the list (including null).
nmodel.Order.Remove(removemodel);
// NOTE(review): the mapping lambda after `ms.Map` was lost; presumably the same nested mapping
// as in the index-creation snippet. Re-creating the index before an update looks unnecessary — confirm.
esClient.CreateIndex(indexKey, p => p.Mappings(ms =>
ms.Map
// NOTE(review): the UpdateAsync arguments were lost; presumably they target document `id`
// in `indexKey` with `nmodel` as the new content — confirm.
var response = await esClient.UpdateAsync
删除索引
// Delete the whole index identified by indexKey.
var client = this._esClientProvider.GetClient();
var response = await client.DeleteIndexAsync(
    indexKey,
    descriptor => descriptor.Index(indexKey));
手机扫一扫
移动阅读更方便
你可能感兴趣的文章