mdbc/distinct_scope.go

306 lines
6.2 KiB
Go
Raw Permalink Normal View History

2022-02-23 08:59:45 +00:00
package mdbc
import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// DistinctScope carries the state of one distinct query: which field to
// collect, the filter and driver options to apply, the raw values returned by
// the driver, and the optional cache configuration.
type DistinctScope struct {
	// scope is the parent scope; the methods below read its table name,
	// debug flags, breaker settings and cache handle.
	scope *Scope
	// cw wraps the context used for the query together with its cancel func.
	cw *ctxWrap
	// err is the last error recorded by the query or by the cache write.
	err error

	// fieldName is the document field whose distinct values are requested.
	fieldName string
	// filter selects the documents considered by the query.
	filter interface{}
	// opts are the driver options passed to Collection.Distinct.
	opts *options.DistinctOptions

	// result holds the raw values returned by Collection.Distinct.
	result []interface{}

	// enableCache is switched on by SetCacheFunc.
	enableCache bool
	// cacheFunc turns the query result into a cache entry.
	cacheFunc DistinctCacheFunc
	// cacheKey is handed to cacheFunc as the key of the cache entry.
	cacheKey string
}
// DistinctCacheFunc builds the cache entry for the result of a distinct query.
//
// key is the cache key registered through SetCacheFunc and obj is the list
// argument that was passed to Get. It returns the object to cache, or an
// error when obj cannot be turned into a cache entry.
type DistinctCacheFunc func(key string, obj interface{}) (*CacheObject, error)
// DefaultDistinctCacheFunc returns the default cache function.
//
// The returned function receives:
//   - key: the key of the cache entry
//   - obj: the object to cache, which must be a non-nil pointer to a slice
//
// It stores the JSON encoding of obj as the entry value. An empty slice is
// not worth caching and yields (nil, nil). Anything that is not a pointer to
// a slice, or that cannot be encoded as JSON, yields an error.
//
// Custom implementations are advised to type-assert obj first and to return
// a nil object when the assertion fails; reflection is used here only to stay
// compatible with any slice type.
var DefaultDistinctCacheFunc = func() DistinctCacheFunc {
	return func(key string, obj interface{}) (*CacheObject, error) {
		// Value.Kind is safe on the zero Value (nil obj), unlike Value.Type,
		// so a nil argument is reported as an error instead of panicking.
		v := reflect.ValueOf(obj)
		if v.Kind() != reflect.Ptr {
			return nil, fmt.Errorf("invalid list type, not ptr")
		}
		// Elem of a nil pointer is the zero Value, whose Kind is Invalid, so
		// nil pointers are rejected by the same check.
		v = v.Elem()
		if v.Kind() != reflect.Slice {
			return nil, fmt.Errorf("invalid list type, not ptr to slice")
		}
		// An empty slice does not need to be cached.
		if v.Len() == 0 {
			return nil, nil
		}
		encoded, err := json.Marshal(obj)
		if err != nil {
			// Never cache a value that failed to encode.
			return nil, fmt.Errorf("marshal cache value: %w", err)
		}
		return &CacheObject{
			Key:   key,
			Value: string(encoded),
		}, nil
	}
}
// SetContext stores ctx as the context used by this scope and returns the
// scope for chaining. The context wrapper is created on first use; an
// existing wrapper is kept and only its context is replaced.
func (ds *DistinctScope) SetContext(ctx context.Context) *DistinctScope {
	if ds.cw != nil {
		ds.cw.ctx = ctx
		return ds
	}
	ds.cw = &ctxWrap{ctx: ctx}
	return ds
}
// getContext returns the context held by the scope's context wrapper.
// The wrapper must already exist (SetContext and preCheck both create it).
func (ds *DistinctScope) getContext() context.Context {
	wrap := ds.cw
	return wrap.ctx
}
// doClear releases the context, if a cancel func was recorded, and resets the
// per-call debug state (debug flag and execution time) on the parent scope.
func (ds *DistinctScope) doClear() {
	if wrap := ds.cw; wrap != nil {
		if cancel := wrap.cancel; cancel != nil {
			cancel()
		}
	}
	ds.scope.debug = false
	ds.scope.execT = 0
}
// SetUpdateOption sets the driver options used for the distinct query and
// returns the scope for chaining. Despite its name it takes distinct options.
// opts is received by value, so the scope keeps its own copy.
func (ds *DistinctScope) SetUpdateOption(opts options.DistinctOptions) *DistinctScope {
	stored := opts
	ds.opts = &stored
	return ds
}
// SetFilter sets the query filter and returns the scope for chaining.
//
// A nil filter, as well as a nil pointer, nil map or nil slice wrapped in the
// interface, is replaced by an empty bson.M so that the query matches every
// document.
func (ds *DistinctScope) SetFilter(filter interface{}) *DistinctScope {
	ds.filter = filter
	if filter == nil {
		ds.filter = bson.M{}
		return ds
	}
	// A typed nil (e.g. bson.M(nil)) is not equal to nil once stored in an
	// interface, so it has to be detected through reflection.
	switch v := reflect.ValueOf(filter); v.Kind() {
	case reflect.Ptr, reflect.Map, reflect.Slice:
		if v.IsNil() {
			ds.filter = bson.M{}
		}
	}
	return ds
}
// SetFieldName records the name of the field whose distinct values are
// requested and returns the scope for chaining.
func (ds *DistinctScope) SetFieldName(name string) *DistinctScope {
	ds.fieldName = name

	return ds
}
// optionAssembled makes sure the scope has driver options to pass along.
// Options supplied explicitly by the caller are left untouched; otherwise a
// zero-valued options struct is installed.
func (ds *DistinctScope) optionAssembled() {
	if ds.opts == nil {
		ds.opts = &options.DistinctOptions{}
	}
}
// SetCacheFunc enables caching of the query result and returns the scope for
// chaining. key is the cache key handed to cb, and cb is the function that
// turns the query result into a cache entry.
func (ds *DistinctScope) SetCacheFunc(key string, cb DistinctCacheFunc) *DistinctScope {
	ds.cacheKey = key
	ds.cacheFunc = cb
	ds.enableCache = true
	return ds
}
// preCheck fills in the defaults needed before the query runs: an empty
// filter when none was set, and a context with a timeout when the caller did
// not provide one. The timeout is the scope breaker's ttl, or
// defaultBreakerTime when there is no breaker or its ttl is zero. A context
// supplied through SetContext is used unchanged.
func (ds *DistinctScope) preCheck() {
	if ds.filter == nil {
		ds.filter = bson.M{}
	}

	var timeout time.Duration
	if brk := ds.scope.breaker; brk != nil && brk.ttl != 0 {
		timeout = brk.ttl
	} else {
		timeout = defaultBreakerTime
	}

	if ds.cw == nil {
		ds.cw = &ctxWrap{}
	}
	if ds.cw.ctx != nil {
		return
	}
	// The cancel func is kept so that doClear can release the context.
	ds.cw.ctx, ds.cw.cancel = context.WithTimeout(context.Background(), timeout)
}
// assertErr normalizes timeout errors: when the recorded error is a context
// deadline error, or a mongo.CommandError whose message mentions one, it is
// replaced by ErrRequestBroken. Any other error is left as it is.
func (ds *DistinctScope) assertErr() {
	if ds.err == nil {
		return
	}
	if errors.Is(ds.err, context.DeadlineExceeded) {
		ds.err = &ErrRequestBroken
		return
	}
	// errors.As also finds a CommandError that has been wrapped by another
	// error, which a plain type assertion would miss.
	var cmdErr mongo.CommandError
	if errors.As(ds.err, &cmdErr) && cmdErr.HasErrorMessage(context.DeadlineExceeded.Error()) {
		ds.err = &ErrRequestBroken
	}
}
// debug emits debug output for the query when debugging is enabled on the
// scope. With debugWhenError set, only a failed query is reported (through
// ErrorString); otherwise every query is reported (through String).
func (ds *DistinctScope) debug() {
	sc := ds.scope
	if !sc.debug {
		return
	}

	dbg := &Debugger{
		collection: sc.tableName,
		execT:      sc.execT,
		action:     ds,
	}

	// Report everything unless the scope asked for errors only.
	if !sc.debugWhenError {
		dbg.String()
		return
	}

	// Errors-only mode: stay silent on success.
	if ds.err == nil {
		return
	}
	dbg.errMsg = ds.err.Error()
	dbg.ErrorString()
}
// doString renders the query as a shell-like string of the form
// distinct("<field>",<filter>), with the filter encoded as extended JSON
// (canonical mode, HTML escaping on) using the timestamp-aware registry.
func (ds *DistinctScope) doString() string {
	registry := RegisterTimestampCodec(nil).Build()
	// The encoding error is discarded; the string is built from whatever
	// bytes the encoder returned.
	encoded, _ := bson.MarshalExtJSONWithRegistry(registry, ds.filter, true, true)
	return fmt.Sprintf(`distinct("%s",%s)`, ds.fieldName, encoded)
}
// doGet runs the distinct query against the scope's collection and stores the
// returned values and error on the scope. When debugging is enabled it also
// records the execution time on the parent scope and emits the debug output.
// The error is normalized by assertErr on the way out, after any debug
// output has been produced.
func (ds *DistinctScope) doGet() {
	defer ds.assertErr()

	debugging := ds.scope.debug
	var began time.Time
	if debugging {
		began = time.Now()
	}

	coll := db.Collection(ds.scope.tableName)
	ds.result, ds.err = coll.Distinct(ds.getContext(), ds.fieldName, ds.filter, ds.opts)

	if !debugging {
		return
	}
	ds.scope.execT = time.Since(began)
	ds.debug()
}
// doCache writes the query result to the cache.
//
// Steps:
//  1. do nothing when the scope has no cache handle;
//  2. ask cacheFunc for the cache object built from obj;
//  3. when there is an object and no error, store it with the scope's ttl.
//
// A cache ttl of 0 means "use one hour" and a ttl of -1 is passed to the
// client as 0. A nil cache object returned without an error means "nothing to
// cache" and is skipped silently. Failures are recorded in ds.err. The scope
// itself is always returned so the call can be chained safely.
func (ds *DistinctScope) doCache(obj interface{}) *DistinctScope {
	// No cache handle configured on the scope.
	if ds.scope.cache == nil {
		return ds
	}
	// Caching was enabled without a callback; report it instead of panicking.
	if ds.cacheFunc == nil {
		ds.err = fmt.Errorf("cache func nil")
		return ds
	}

	cacheObj, err := ds.cacheFunc(ds.cacheKey, obj)
	if err != nil {
		ds.err = err
		return ds
	}
	// The cache func decided there is nothing to cache (e.g. empty result).
	if cacheObj == nil {
		return ds
	}

	ttl := ds.scope.cache.ttl
	switch ttl {
	case 0:
		ttl = time.Hour
	case -1:
		ttl = 0
	}

	// Do not attempt the write on a context that is already done.
	if ctxErr := ds.getContext().Err(); ctxErr != nil {
		ds.err = ctxErr
		ds.assertErr()
		return ds
	}

	ds.err = ds.scope.cache.client.Set(ds.getContext(), cacheObj.Key, cacheObj.Value, ttl).Err()
	return ds
}
// Get runs the distinct query and appends the returned values to *list.
//
// list must be a pointer to a slice whose element kind is string or struct
// (*[]string or *[]SomeStruct). The destination is validated before the
// query is sent, so an invalid argument never costs a database round trip.
//
// It returns an error when the field name is empty, when list has the wrong
// shape, when the query fails, or when a returned value cannot be stored in
// the slice element type. On error *list is left unchanged. When caching is
// enabled the result is written to the cache afterwards; a cache failure is
// recorded on the scope by doCache but does not fail the call.
func (ds *DistinctScope) Get(list interface{}) error {
	defer ds.doClear()

	if ds.fieldName == "" {
		return fmt.Errorf("field name empty")
	}

	dst := reflect.ValueOf(list)
	if dst.Kind() != reflect.Ptr {
		return fmt.Errorf("arg not ptr")
	}
	// Elem of a nil pointer is the zero Value (Kind Invalid), so a nil
	// pointer is rejected by the slice check below.
	dst = dst.Elem()
	if dst.Kind() != reflect.Slice {
		return fmt.Errorf("arg not ptr to slice")
	}
	elemType := dst.Type().Elem()
	if kind := elemType.Kind(); kind != reflect.String && kind != reflect.Struct {
		return fmt.Errorf("slice subtype must string or struct but get %+v", kind)
	}

	ds.optionAssembled()
	ds.preCheck()
	ds.doGet()
	if ds.err != nil {
		return ds.err
	}

	// Build the new slice on the side and publish it only once every value
	// has been converted.
	out := dst
	for i, item := range ds.result {
		elem, err := distinctValueAs(item, elemType)
		if err != nil {
			return fmt.Errorf("distinct value at index %d: %w", i, err)
		}
		out = reflect.Append(out, elem)
	}
	dst.Set(out)

	if ds.enableCache {
		ds.doCache(list)
	}
	return nil
}

// distinctValueAs converts one raw distinct value into a reflect.Value that
// can be appended to a slice whose element type is target.
//
// Values already assignable to target are used as they are, and string values
// are converted to named string types. For a struct target any other value is
// re-encoded through BSON and decoded into a new target value.
// NOTE(review): the round trip uses the default bson registry — confirm that
// no custom codec is needed for the stored documents.
func distinctValueAs(item interface{}, target reflect.Type) (reflect.Value, error) {
	// reflect.ValueOf(nil) is the zero Value, which reflect.Append rejects.
	if item == nil {
		return reflect.Value{}, fmt.Errorf("nil value cannot be stored as %s", target)
	}

	val := reflect.ValueOf(item)
	switch {
	case val.Type().AssignableTo(target):
		return val, nil
	case val.Kind() == reflect.String && target.Kind() == reflect.String:
		return val.Convert(target), nil
	case target.Kind() != reflect.Struct:
		return reflect.Value{}, fmt.Errorf("cannot store %s as %s", val.Type(), target)
	}

	raw, err := bson.Marshal(item)
	if err != nil {
		return reflect.Value{}, fmt.Errorf("encode %s: %w", val.Type(), err)
	}
	decoded := reflect.New(target)
	if err := bson.Unmarshal(raw, decoded.Interface()); err != nil {
		return reflect.Value{}, fmt.Errorf("decode into %s: %w", target, err)
	}
	return decoded.Elem(), nil
}