mdbc/distinct_scope.go
2022-02-23 16:59:45 +08:00

306 lines
6.2 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package mdbc
import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// DistinctScope holds the state of one distinct query. It is configured
// through the chainable Set* methods and executed by Get.
type DistinctScope struct {
// scope is the parent Scope; it supplies the collection name, debug flags,
// execution time, breaker and cache settings read by the methods below.
scope *Scope
// cw wraps the context used for the query together with its cancel func.
cw *ctxWrap
// err is the last error recorded by the query or the cache step.
err error
// fieldName is the field passed to Distinct.
fieldName string
// filter is the query filter passed to Distinct; preCheck replaces a nil
// filter with an empty bson.M.
filter interface{}
// opts are the driver options passed to Distinct.
opts *options.DistinctOptions
// result holds the raw values returned by Distinct.
result []interface{}
// enableCache is set by SetCacheFunc and makes Get call doCache.
enableCache bool
// cacheFunc builds the CacheObject that doCache stores.
cacheFunc DistinctCacheFunc
// cacheKey is the key handed to cacheFunc.
cacheKey string
}
// DistinctCacheFunc builds the cache entry for a distinct result.
// doCache invokes it with the cache key given to SetCacheFunc as the first
// argument (named field here) and the list populated by Get as obj.
type DistinctCacheFunc func(field string, obj interface{}) (*CacheObject, error)
// DefaultDistinctCacheFunc returns the default cache-entry builder.
//
// The returned function takes:
//   - key: the key of the cache entry
//   - obj: the value to cache; it must be a non-nil pointer to a slice
//
// It returns an error when obj is not a pointer to a slice or cannot be
// JSON-encoded, and (nil, nil) when the slice is empty because an empty
// result is not worth caching.
var DefaultDistinctCacheFunc = func() DistinctCacheFunc {
	return func(key string, obj interface{}) (*CacheObject, error) {
		// Custom builders should type-assert obj directly and return nil when
		// the assertion fails. Reflection is used here to stay compatible
		// with every slice type.
		v := reflect.ValueOf(obj)
		// Kind is safe on the zero Value (nil obj); Type would panic.
		if v.Kind() != reflect.Ptr {
			return nil, fmt.Errorf("invalid list type, not ptr")
		}
		// Elem of a nil pointer is the zero Value, whose Kind is Invalid, so
		// a nil pointer is rejected here instead of panicking.
		v = v.Elem()
		if v.Kind() != reflect.Slice {
			return nil, fmt.Errorf("invalid list type, not ptr to slice")
		}
		// An empty slice does not need to be cached.
		if v.Len() == 0 {
			return nil, nil
		}
		b, err := json.Marshal(obj)
		if err != nil {
			return nil, fmt.Errorf("marshal cache value: %w", err)
		}
		return &CacheObject{Key: key, Value: string(b)}, nil
	}
}
// SetContext stores ctx as the context for this distinct operation,
// allocating the context wrapper first when it does not exist yet.
func (ds *DistinctScope) SetContext(ctx context.Context) *DistinctScope {
	wrap := ds.cw
	if wrap == nil {
		wrap = &ctxWrap{}
		ds.cw = wrap
	}
	wrap.ctx = ctx
	return ds
}
// getContext returns the context stored on the context wrapper.
// It dereferences ds.cw without a nil check, so the wrapper must already
// have been allocated (SetContext and preCheck both do so).
func (ds *DistinctScope) getContext() context.Context {
return ds.cw.ctx
}
// doClear releases the operation's context by calling its cancel func, if
// one was recorded, and resets the parent scope's debug flag and execution
// time.
func (ds *DistinctScope) doClear() {
	if wrap := ds.cw; wrap != nil {
		if stop := wrap.cancel; stop != nil {
			stop()
		}
	}
	ds.scope.debug = false
	ds.scope.execT = 0
}
// SetUpdateOption sets the driver options used by the distinct query.
// Despite its name it takes DistinctOptions; the value is copied, so later
// changes to the caller's variable do not affect the scope.
func (ds *DistinctScope) SetUpdateOption(opts options.DistinctOptions) *DistinctScope {
	ds.opts = new(options.DistinctOptions)
	*ds.opts = opts
	return ds
}
// SetFilter sets the query filter.
// A nil filter, or a typed nil pointer, map or slice, is replaced by an empty
// bson.M so that the query matches every document.
func (ds *DistinctScope) SetFilter(filter interface{}) *DistinctScope {
	ds.filter = filter
	if filter == nil {
		ds.filter = bson.M{}
		return ds
	}
	// A typed nil is non-nil as an interface value, so inspect it through
	// reflection. The original fell through and overwrote the default here.
	switch v := reflect.ValueOf(filter); v.Kind() {
	case reflect.Ptr, reflect.Map, reflect.Slice:
		if v.IsNil() {
			ds.filter = bson.M{}
		}
	}
	return ds
}
// SetFieldName sets the name of the field whose distinct values are queried.
// Get returns an error when the name is left empty.
func (ds *DistinctScope) SetFieldName(name string) *DistinctScope {
ds.fieldName = name
return ds
}
// optionAssembled makes sure ds.opts is non-nil. Options that were already
// supplied by the caller are left untouched.
func (ds *DistinctScope) optionAssembled() {
	if ds.opts == nil {
		ds.opts = &options.DistinctOptions{}
	}
}
// SetCacheFunc enables caching of the query result. cb turns the result into
// a cache object and receives key as its first argument.
func (ds *DistinctScope) SetCacheFunc(key string, cb DistinctCacheFunc) *DistinctScope {
	ds.cacheKey = key
	ds.cacheFunc = cb
	ds.enableCache = true
	return ds
}
// preCheck fills in defaults before the query runs: an empty filter when none
// was set, and a timeout-bound background context when the caller supplied no
// context. The timeout is the scope breaker's ttl, or defaultBreakerTime when
// there is no breaker or its ttl is zero.
func (ds *DistinctScope) preCheck() {
	if ds.filter == nil {
		ds.filter = bson.M{}
	}

	var timeout time.Duration
	if br := ds.scope.breaker; br != nil && br.ttl != 0 {
		timeout = br.ttl
	} else {
		timeout = defaultBreakerTime
	}

	if ds.cw == nil {
		ds.cw = &ctxWrap{}
	}
	// A caller-provided context is used as-is, without the breaker timeout.
	if ds.cw.ctx != nil {
		return
	}
	ds.cw.ctx, ds.cw.cancel = context.WithTimeout(context.Background(), timeout)
}
// assertErr normalizes ds.err: a context deadline error, or a mongo command
// error whose message mentions the deadline, is replaced by
// &ErrRequestBroken. Any other error is left untouched.
func (ds *DistinctScope) assertErr() {
	if ds.err == nil {
		return
	}
	if errors.Is(ds.err, context.DeadlineExceeded) {
		ds.err = &ErrRequestBroken
		return
	}
	// errors.As also finds a CommandError that has been wrapped by another
	// error, which a plain type assertion would miss.
	var cmdErr mongo.CommandError
	if errors.As(ds.err, &cmdErr) && cmdErr.HasErrorMessage(context.DeadlineExceeded.Error()) {
		ds.err = &ErrRequestBroken
	}
}
// debug emits a debug record for this operation when debugging is enabled on
// the scope. With debugWhenError set, output happens only if the operation
// failed; otherwise every operation is reported.
func (ds *DistinctScope) debug() {
	sc := ds.scope
	if !sc.debug {
		return
	}

	dbg := &Debugger{
		collection: sc.tableName,
		execT:      sc.execT,
		action:     ds,
	}

	// Report everything.
	if !sc.debugWhenError {
		dbg.String()
		return
	}

	// Error-only mode: stay silent on success.
	if ds.err == nil {
		return
	}
	dbg.errMsg = ds.err.Error()
	dbg.ErrorString()
}
// doString renders the operation as a shell-like distinct("field",filter)
// string, with the filter encoded as extended JSON.
func (ds *DistinctScope) doString() string {
	registry := RegisterTimestampCodec(nil).Build()
	// The marshal error is deliberately discarded; whatever bytes came back
	// are printed as-is.
	encoded, _ := bson.MarshalExtJSONWithRegistry(registry, ds.filter, true, true)
	return fmt.Sprintf(`distinct("%s",%s)`, ds.fieldName, encoded)
}
// doGet runs the distinct query and stores its values and error on the scope.
// With debugging enabled it also records the execution time and emits the
// debug record. The error is normalized by assertErr on return.
func (ds *DistinctScope) doGet() {
	defer ds.assertErr()

	var begin time.Time
	if ds.scope.debug {
		begin = time.Now()
	}

	coll := db.Collection(ds.scope.tableName)
	ds.result, ds.err = coll.Distinct(ds.getContext(), ds.fieldName, ds.filter, ds.opts)

	if !ds.scope.debug {
		return
	}
	ds.scope.execT = time.Since(begin)
	ds.debug()
}
// doCache stores obj in the scope's cache.
//
// Steps:
//  1. skip when no cache handle or no cache function is configured;
//  2. build the cache object with cacheFunc;
//  3. skip when the function returns a nil object, which means "nothing to
//     cache" (for example an empty result);
//  4. write the entry unless the context is already done.
//
// Any failure is recorded on ds.err. The scope is always returned so the call
// can be chained safely.
func (ds *DistinctScope) doCache(obj interface{}) *DistinctScope {
	// Returning ds rather than nil keeps the chainable contract, and the
	// cacheFunc guard avoids a nil-func call.
	if ds.scope.cache == nil || ds.cacheFunc == nil {
		return ds
	}

	cacheObj, err := ds.cacheFunc(ds.cacheKey, obj)
	if err != nil {
		ds.err = err
		return ds
	}
	// A nil object with a nil error is the builder's way of declining to
	// cache, so it is not an error.
	if cacheObj == nil {
		return ds
	}

	// A ttl of 0 falls back to one hour; -1 is passed to Set as 0
	// (presumably "no expiry" for the cache client - confirm).
	ttl := ds.scope.cache.ttl
	switch ttl {
	case 0:
		ttl = time.Hour
	case -1:
		ttl = 0
	}

	ctx := ds.getContext()
	if ctxErr := ctx.Err(); ctxErr != nil {
		ds.err = ctxErr
		ds.assertErr()
		return ds
	}

	ds.err = ds.scope.cache.client.Set(ctx, cacheObj.Key, cacheObj.Value, ttl).Err()
	return ds
}
// Get runs the distinct query and appends the values to list.
//
// list must be a non-nil pointer to a slice whose element type is a string or
// a struct (*[]string or *[]struct). Values are appended to whatever list
// already contains.
//
// It returns an error when the field name is empty, when list has the wrong
// shape, when the query fails, or when a returned value (including null)
// cannot be assigned to the slice element type. On error list is left
// unmodified. A caching failure does not fail the call.
func (ds *DistinctScope) Get(list interface{}) error {
	defer ds.doClear()
	ds.optionAssembled()
	ds.preCheck()
	if ds.fieldName == "" {
		return fmt.Errorf("field name empty")
	}

	// Validate the destination before querying so that a bad argument does
	// not cost a database round trip.
	dst := reflect.ValueOf(list)
	if dst.Kind() != reflect.Ptr {
		return fmt.Errorf("arg not ptr")
	}
	// Elem of a nil pointer has Kind Invalid and is rejected here as well.
	out := dst.Elem()
	if out.Kind() != reflect.Slice {
		return fmt.Errorf("arg not ptr to slice")
	}
	elemType := out.Type().Elem()
	if k := elemType.Kind(); k != reflect.String && k != reflect.Struct {
		return fmt.Errorf("slice subtype must string or struct but get %+v", k)
	}

	ds.doGet()
	if ds.err != nil {
		return ds.err
	}

	for i, obj := range ds.result {
		val := reflect.ValueOf(obj)
		// reflect.Append panics on an invalid (nil) or non-assignable value,
		// so report it as an error instead.
		if !val.IsValid() || !val.Type().AssignableTo(elemType) {
			return fmt.Errorf("distinct value at index %d has type %T, not assignable to %s", i, obj, elemType)
		}
		out = reflect.Append(out, val)
	}
	dst.Elem().Set(out)

	if ds.enableCache {
		// Caching is best-effort: doCache records a failure on ds.err, but a
		// successful query is still reported as a success.
		ds.doCache(list)
	}
	return nil
}