package mdbc

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"reflect"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// DistinctScope carries the state of one distinct query: it is configured
// through the chained Set* methods and executed by Get.
type DistinctScope struct {
	scope       *Scope                   // owning scope (collection name, debug flags, cache, breaker)
	cw          *ctxWrap                 // context and optional cancel func used for the query
	err         error                    // last error recorded by the query or the cache step
	fieldName   string                   // field whose distinct values are requested
	filter      interface{}              // query filter; an empty bson.M is used when unset
	opts        *options.DistinctOptions // driver options passed to Distinct
	result      []interface{}            // raw values returned by the driver
	enableCache bool                     // set by SetCacheFunc; makes Get cache its result
	cacheFunc   DistinctCacheFunc        // builds the cache entry from the result
	cacheKey    string                   // key handed to cacheFunc
}

// DistinctCacheFunc turns a query result into a cache entry. It is called
// with the cache key given to SetCacheFunc and the list populated by Get.
type DistinctCacheFunc func(field string, obj interface{}) (*CacheObject, error)

// DefaultDistinctCacheFunc returns the default cache function.
//
// The returned function takes:
//   - key: the key of the cache entry
//   - obj: the object to cache; it must be a non-nil pointer to a slice
//
// The value is stored as the JSON encoding of obj. An empty slice yields
// (nil, nil) because there is nothing worth caching.
var DefaultDistinctCacheFunc = func() DistinctCacheFunc {
	return func(key string, obj interface{}) (*CacheObject, error) {
		// Custom implementations should type-assert obj directly; reflection is
		// used here only to stay compatible with any slice element type.
		// Kind (rather than Type().Kind()) is used so that a nil obj or a nil
		// pointer produces an error instead of a panic.
		v := reflect.ValueOf(obj)
		if v.Kind() != reflect.Ptr {
			return nil, fmt.Errorf("invalid list type, not ptr")
		}
		v = v.Elem()
		if v.Kind() != reflect.Slice {
			return nil, fmt.Errorf("invalid list type, not ptr to slice")
		}
		// An empty slice does not need to be cached.
		if v.Len() == 0 {
			return nil, nil
		}
		b, err := json.Marshal(obj)
		if err != nil {
			return nil, fmt.Errorf("marshal cache value: %w", err)
		}
		return &CacheObject{
			Key:   key,
			Value: string(b),
		}, nil
	}
}

// SetContext sets the context used for the query.
func (ds *DistinctScope) SetContext(ctx context.Context) *DistinctScope {
	if ds.cw == nil {
		ds.cw = &ctxWrap{}
	}
	ds.cw.ctx = ctx
	return ds
}

// getContext returns the context stored on the scope. It must only be called
// after preCheck (or SetContext) has initialised ds.cw.
func (ds *DistinctScope) getContext() context.Context {
	return ds.cw.ctx
}

// doClear cancels the internally created context, if any, and resets the
// per-call debug state on the owning scope.
func (ds *DistinctScope) doClear() {
	if ds.cw != nil && ds.cw.cancel != nil {
		ds.cw.cancel()
	}
	ds.scope.execT = 0
	ds.scope.debug = false
}

// SetUpdateOption sets the driver options used for the distinct command.
// Despite its name it configures distinct options, not update options.
func (ds *DistinctScope) SetUpdateOption(opts options.DistinctOptions) *DistinctScope {
	ds.opts = &opts
	return ds
}

// SetFilter sets the query filter. A nil filter, including a typed nil
// pointer, map or slice, is replaced by an empty bson.M so that all documents
// match.
func (ds *DistinctScope) SetFilter(filter interface{}) *DistinctScope {
	if filter == nil {
		ds.filter = bson.M{}
		return ds
	}
	switch v := reflect.ValueOf(filter); v.Kind() {
	case reflect.Ptr, reflect.Map, reflect.Slice:
		if v.IsNil() {
			// Return here: falling through would store the typed nil again.
			ds.filter = bson.M{}
			return ds
		}
	}
	ds.filter = filter
	return ds
}

// SetFieldName sets the name of the field whose distinct values are queried.
func (ds *DistinctScope) SetFieldName(name string) *DistinctScope {
	ds.fieldName = name
	return ds
}

// optionAssembled makes sure ds.opts is non-nil, keeping any options that were
// set explicitly through SetUpdateOption.
func (ds *DistinctScope) optionAssembled() {
	// Options were already supplied directly by the caller.
	if ds.opts != nil {
		return
	}
	ds.opts = new(options.DistinctOptions)
}

// SetCacheFunc enables caching of the query result. cb builds the cache entry
// and key is passed to it as the cache key.
func (ds *DistinctScope) SetCacheFunc(key string, cb DistinctCacheFunc) *DistinctScope {
	ds.enableCache = true
	ds.cacheFunc = cb
	ds.cacheKey = key
	return ds
}

// preCheck fills in defaults before execution: an empty filter when none was
// set, and a timeout context when the caller did not provide one. The timeout
// is the scope breaker's ttl, or defaultBreakerTime when no breaker or no ttl
// is configured.
func (ds *DistinctScope) preCheck() {
	if ds.filter == nil {
		ds.filter = bson.M{}
	}

	var breakerTTL time.Duration = defaultBreakerTime
	if ds.scope.breaker != nil && ds.scope.breaker.ttl != 0 {
		breakerTTL = ds.scope.breaker.ttl
	}

	if ds.cw == nil {
		ds.cw = &ctxWrap{}
	}
	if ds.cw.ctx == nil {
		ds.cw.ctx, ds.cw.cancel = context.WithTimeout(context.Background(), breakerTTL)
	}
}

// assertErr rewrites deadline-exceeded errors, whether reported by the context
// or inside a mongo.CommandError message, to ErrRequestBroken. Other errors
// are left untouched.
func (ds *DistinctScope) assertErr() {
	if ds.err == nil {
		return
	}
	if errors.Is(ds.err, context.DeadlineExceeded) {
		ds.err = &ErrRequestBroken
		return
	}
	// errors.As also matches a CommandError that has been wrapped.
	var cmdErr mongo.CommandError
	if errors.As(ds.err, &cmdErr) && cmdErr.HasErrorMessage(context.DeadlineExceeded.Error()) {
		ds.err = &ErrRequestBroken
	}
}

// debug prints debug output when debugging is enabled on the scope. With
// debugWhenError set, output is produced only if the query failed.
func (ds *DistinctScope) debug() {
	if !ds.scope.debug {
		return
	}

	debugger := &Debugger{
		collection: ds.scope.tableName,
		execT:      ds.scope.execT,
		action:     ds,
	}

	// Error-only mode takes precedence over plain debug output.
	if ds.scope.debugWhenError {
		if ds.err != nil {
			debugger.errMsg = ds.err.Error()
			debugger.ErrorString()
		}
		return
	}

	// Debugging is known to be enabled here, so always print.
	debugger.String()
}

// doString renders the query in a shell-like form for debug output.
func (ds *DistinctScope) doString() string {
	builder := RegisterTimestampCodec(nil).Build()
	// The marshal error is deliberately ignored: this string is only used for
	// debug output, and an unencodable filter simply prints as empty.
	filter, _ := bson.MarshalExtJSONWithRegistry(builder, ds.filter, true, true)
	return fmt.Sprintf(`distinct("%s",%s)`, ds.fieldName, string(filter))
}

// doGet runs the distinct command and stores its outcome in ds.result and
// ds.err. When debugging is enabled it also records the execution time and
// prints the debug output.
func (ds *DistinctScope) doGet() {
	// Runs last, so debug output below still sees the driver's original error.
	defer ds.assertErr()

	var startTime time.Time
	if ds.scope.debug {
		startTime = time.Now()
	}
	ds.result, ds.err = db.Collection(ds.scope.tableName).Distinct(ds.getContext(), ds.fieldName, ds.filter, ds.opts)
	if ds.scope.debug {
		ds.scope.execT = time.Since(startTime)
		ds.debug()
	}
}

// doCache stores obj in the cache.
//
// It does nothing when the scope has no cache handle or no cache function was
// supplied. Otherwise it builds the cache entry with cacheFunc and writes it
// with the scope's ttl, where a ttl of 0 means one hour and -1 means no
// expiry. Failures are recorded in ds.err. ds is always returned so calls can
// be chained.
func (ds *DistinctScope) doCache(obj interface{}) *DistinctScope {
	// No cache handle or no cache function: nothing to do.
	if ds.scope.cache == nil || ds.cacheFunc == nil {
		return ds
	}

	cacheObj, err := ds.cacheFunc(ds.cacheKey, obj)
	if err != nil {
		ds.err = err
		return ds
	}
	if cacheObj == nil {
		ds.err = fmt.Errorf("cache object nil")
		return ds
	}

	ttl := ds.scope.cache.ttl
	switch ttl {
	case 0:
		ttl = time.Hour
	case -1:
		ttl = 0
	}

	// Do not attempt the write if the context is already done.
	if err := ds.getContext().Err(); err != nil {
		ds.err = err
		ds.assertErr()
		return ds
	}

	ds.err = ds.scope.cache.client.Set(ds.getContext(), cacheObj.Key, cacheObj.Value, ttl).Err()
	return ds
}

// Get runs the distinct query and appends the values to list.
//
// list must be a non-nil pointer to a slice whose element type is a string or
// a struct (e.g. *[]string or *[]MyStruct). Values are appended to whatever
// the slice already holds. For each returned value:
//   - a nil value is appended as the element type's zero value;
//   - a value assignable to the element type is appended as is;
//   - a string is converted when the element type is a named string type;
//   - for struct element types, the value is decoded through a BSON round trip;
//   - anything else yields an error.
//
// On any error list is left unmodified. When caching is enabled the populated
// list is cached on a best-effort basis: a cache failure does not fail Get.
func (ds *DistinctScope) Get(list interface{}) error {
	defer ds.doClear()
	ds.optionAssembled()
	ds.preCheck()

	if ds.fieldName == "" {
		return fmt.Errorf("field name empty")
	}

	// Validate the destination before querying so an unusable argument does
	// not cost a round trip to the database.
	ptr := reflect.ValueOf(list)
	if ptr.Kind() != reflect.Ptr {
		return fmt.Errorf("arg not ptr")
	}
	dst := ptr.Elem()
	if dst.Kind() != reflect.Slice {
		return fmt.Errorf("arg not ptr to slice")
	}
	elemType := dst.Type().Elem()
	if kind := elemType.Kind(); kind != reflect.String && kind != reflect.Struct {
		return fmt.Errorf("slice subtype must string or struct but get %+v", kind)
	}

	ds.doGet()
	if ds.err != nil {
		return ds.err
	}

	// toElem converts one raw driver value into a value of elemType, returning
	// an error instead of letting reflect.Append panic on a type mismatch.
	toElem := func(obj interface{}) (reflect.Value, error) {
		if obj == nil {
			return reflect.Zero(elemType), nil
		}
		val := reflect.ValueOf(obj)
		if val.Type().AssignableTo(elemType) {
			return val, nil
		}
		if elemType.Kind() == reflect.String && val.Kind() == reflect.String {
			return val.Convert(elemType), nil
		}
		if elemType.Kind() == reflect.Struct {
			raw, err := bson.Marshal(obj)
			if err != nil {
				return reflect.Value{}, fmt.Errorf("encoding distinct value of type %T: %w", obj, err)
			}
			elem := reflect.New(elemType)
			if err := bson.Unmarshal(raw, elem.Interface()); err != nil {
				return reflect.Value{}, fmt.Errorf("decoding distinct value into %s: %w", elemType, err)
			}
			return elem.Elem(), nil
		}
		return reflect.Value{}, fmt.Errorf("distinct value of type %T cannot be stored in %s", obj, elemType)
	}

	out := dst
	for _, obj := range ds.result {
		elem, err := toElem(obj)
		if err != nil {
			return err
		}
		out = reflect.Append(out, elem)
	}
	dst.Set(out)

	if ds.enableCache {
		// Best effort: the query itself succeeded, so a cache failure is only
		// recorded in ds.err and not returned.
		ds.doCache(list)
	}

	return nil
}