.net core Elasticsearch 查询更新
阅读原文时间:2023年07月12日阅读:1

记录一下:

数据结构如下:

public class ESUserTransaction
{

    public long AccountId { get; set; }

    public string VarietyName { get; set; }

    public int OrganizationAgentId { get; set; }

    public int OrganizationMemberId { get; set; }

    public int OrganizationChannelId { get; set; }

    public int OrganizationMinorMakerId { get; set; }

    public int OrganizationMasterMakerId { get; set; }

    public List<EsOrder> Order { get; set; } = new List<EsOrder>();

    public DateTime CreateTime { get; set; }

    public DateTime LastTime { get; set; }  

}

public class EsOrder
{

    public long Id { get; set; }

    public string OrderCode { get; set; }

    public int TradeHand { get; set; }

    public decimal Amount { get; set; }

    public int OrderType { get; set; }

    public int OrderStatus { get; set; }

    public DateTime? CreateTime { get; set; }  

}

然后开始操作:

建索引 :

public async Task DoAllSummary()
{
try
{
List userList = new List();
var esClient = _esClientProvider.GetClient();

            //todo: 处理数据                

            esClient.CreateIndex(indexKey, p => p.Mappings(ms =>  
               ms.Map<ESUserTransaction>(m => m.AutoMap().Properties(ps => ps.Nested<EsOrder>(n => n.Name(c => c.Order))))));  
            return \_esClientProvider.BulkAll<ESUserTransaction>(esClient, indexKey, userList);  
        }  
        catch (Exception ex)  
        {  
            Logger.Error("---DigitalCurrencyExchange.ElasticsearchService.CronJob.DoAllSummary---DoAllSummary---出错", ex);  
            return false;  
        }  
    }

public class EsClientProvider : IEsClientProvider
{public ElasticClient GetClient()
{
if (_esclient != null)
return _esclient;
InitClient();
return _esclient;
}

    private void InitClient()  
    {  
        var node = new Uri(\_configuration\["EsUrl"\]);  
        \_esclient = new ElasticClient(new ConnectionSettings(node));  
    }

    public bool BulkAll<T>(IElasticClient elasticClient, IndexName indexName, IEnumerable<T> list) where T : class  
    {  
        const int size = ;  
        var tokenSource = new CancellationTokenSource();

        var observableBulk = elasticClient.BulkAll(list, f => f  
                .MaxDegreeOfParallelism()  
                .BackOffTime(TimeSpan.FromSeconds())  
                .BackOffRetries()  
                .Size(size)  
                .RefreshOnCompleted()  
                .Index(indexName)  
                .BufferToBulk((r, buffer) => r.IndexMany(buffer))  
            , tokenSource.Token);

        var countdownEvent = new CountdownEvent();

        Exception exception = null;

        void OnCompleted()  
        {  
            Logger.Error("BulkAll Finished");  
            countdownEvent.Signal();  
        }

        var bulkAllObserver = new BulkAllObserver(  
            onNext: response =>  
            {  
                Logger.Error($"Indexed {response.Page \* size} with {response.Retries} retries");  
            },  
            onError: ex =>  
            {  
                Logger.Error("BulkAll Error : {0}", ex);  
                exception = ex;  
                countdownEvent.Signal();  
            });

        observableBulk.Subscribe(bulkAllObserver);

        countdownEvent.Wait(tokenSource.Token);

        if (exception != null)  
        {  
            Logger.Error("BulkHotelGeo Error : {0}", exception);  
            return false;  
        }  
        else  
        {  
            return true;  
        }  
    }

}

public interface IEsClientProvider
{
ElasticClient GetClient();

bool BulkAll(IElasticClient elasticClient, IndexName indexName, IEnumerable list) where T : class;

}

查询数据:

    //单个字段查询  
        var result = await esClient.SearchAsync<ESUserTransaction>(d => d.Index(indexKey).Type(typeof(ESUserTransaction)).Query(q => q.Term(t => t.AccountId, dto.AccountId)));  
        //多个字段查询  
        var result = esClient.Search<ESUserTransaction>(s => s.Index(indexKey).Query(q => q.Bool(b => b  
                                                                                              .Must(bm => bm.Term(tm => tm.AccountId, dto.AccountId),  
                                                                                                    bm => bm.Term(tm => tm.CreateTime, dto.endDate))  
                                                                                                    )));

//多条件查询

        var esClient = this.\_esClientProvider.GetClient();  
        SearchRequest sr = new SearchRequest(indexKey, typeof(ESUserTransactions));  
        BoolQuery bq = new BoolQuery();  
        List<QueryContainer> list = new List<QueryContainer>();

        #region accountId  
        if (dto.AccountId.HasValue)  
        {  
            list.Add(new TermQuery()  
            {  
                Field = "accountId",  
                Value = dto.AccountId  
            });  
        }  
        #endregion

        #region VarietyName  
        if (!string.IsNullOrWhiteSpace(dto.VarietyName))  
        {  
            list.Add(new TermQuery()  
            {  
                Field = "varietyName",  
                Value = dto.VarietyName  
            });  
        }  
        #endregion

        #region DateTime  
        if (!dto.startDate.HasValue)  
            dto.startDate = new DateTime(DateTime.Now.Year, DateTime.Now.Month, DateTime.Now.Day);  
        list.Add(new DateRangeQuery  
        {  
            Field = "createTime",  
            GreaterThanOrEqualTo = dto.startDate,  
        });

        if (!dto.endDate.HasValue)  
            dto.endDate = new DateTime(DateTime.Now.Year, DateTime.Now.Month, DateTime.Now.Day).AddMonths();  
        list.Add(new DateRangeQuery  
        {  
            Field = "createTime",  
            LessThan = dto.endDate  
        });  
        #endregion

        #region OrderStatus  
        //list.Add(new TermQuery()  
        //{  
        //    Field = "orderStatus",  
        //    Value = dto.VarietyName  
        //});  
        #endregion

        bq.Must = list.ToArray();  
        sr.Query = bq;  
        sr.Size = ;

        var result = await esClient.SearchAsync<ESUserTransactions>(sr);

聚合:
var esClient = this._esClientProvider.GetClient();

        var result = await esClient.SearchAsync<ESUserTransaction>(d => d.Index(indexKey).Type(typeof(ESUserTransaction)).Size().  
                   Aggregations(a => a.Terms("getorderlist", st => st.Field(f => f.AccountId).  
                      Aggregations(na => na.Nested("order", nat => nat.Path(aa => aa.Order).Aggregations(agg => agg  
                                                .Sum("sum\_order\_amount", m => m.Field(o => o.Order.Sum(oc => oc.Amount)))  
                                                .ValueCount("count\_statu\_amountid", m => m.Field(md => md.Order.FirstOrDefault().Id))  
                                      ))))));

聚合2:
sr.Aggregations = new AggregationDictionary()
{
{
"order",new FilterAggregation("order")
{
Filter = new TermQuery { Field = Nest.Infer.Field(p => p.OrderType), Value = },
Aggregations =
new TermsAggregation("groupby_amountid")
{
Field=Nest.Infer.Field(p => p.AccountId),
Aggregations = new AggregationDictionary
{
{"es_count",new ValueCountAggregation("es_count", Nest.Infer.Field(p => p.OrderId))},
{"es_amount", new SumAggregation("es_amount", Nest.Infer.Field(p =>p.Amount)) },
{"es_tradehand", new SumAggregation("es_tradehand", Nest.Infer.Field(p =>p.TradeHand)) },
},
}
}
},
{
"position",new FilterAggregation("position")
{
Filter = new TermQuery { Field = Nest.Infer.Field(p => p.OrderType),Value = },
Aggregations =
new TermsAggregation("groupby_amountid")
{
Field=Nest.Infer.Field(p => p.AccountId),
Aggregations = new AggregationDictionary
{
{"es_count",new ValueCountAggregation("es_count", Nest.Infer.Field(p => p.OrderId))},
{"es_amount", new SumAggregation("es_amount", Nest.Infer.Field(p =>p.Amount)) },
{"es_tradehand", new SumAggregation("es_tradehand", Nest.Infer.Field(p =>p.TradeHand)) },
},
}
}
}
};

更新 多层数据结构

var esClient = this._esClientProvider.GetClient();
var resultData = await esClient.SearchAsync(s => s.Index(indexKey).Query(q => q.Bool(b => b
.Must(bm => bm.Term(tm => tm.AccountId, ),
bm => bm.Term(tm => tm.CreateTime, t)))));
var id = resultData.Hits.First().Id;
var nmodel = resultData.Hits.FirstOrDefault().Source;
nmodel.Order.Add(new EsOrder() { Id = , Amount = , OrderType = , OrderStatus = });
esClient.CreateIndex(indexKey, p => p.Mappings(ms =>
ms.Map(m => m.AutoMap().Properties(ps => ps.Nested(n => n.Name(c => c.Order))))));
var response =await esClient.UpdateAsync(id, i => i.Index(indexKey).Type(typeof(ESUserTransaction)).Doc(nmodel));

删除多层数据结构

var esClient = this._esClientProvider.GetClient();
var resultData = await esClient.SearchAsync(s => s.Index(indexKey).Query(q => q.Bool(b => b
.Must(bm => bm.Term(tm => tm.AccountId, ),
bm => bm.Term(tm => tm.CreateTime, t)))));
var id = resultData.Hits.First().Id;
var nmodel = resultData.Hits.FirstOrDefault().Source;
var removemodel = nmodel.Order.Where(d => d.Id == ).FirstOrDefault();
nmodel.Order.Remove(removemodel);
esClient.CreateIndex(indexKey, p => p.Mappings(ms =>
ms.Map(m => m.AutoMap().Properties(ps => ps.Nested(n => n.Name(c => c.Order))))));
var response = await esClient.UpdateAsync(id, i => i.Index(indexKey).Type(typeof(ESUserTransaction)).Doc(nmodel));

删除索引

var esClient = this._esClientProvider.GetClient();
var response = await esClient.DeleteIndexAsync(indexKey, d => d.Index(indexKey));