306 lines
6.2 KiB
Go
306 lines
6.2 KiB
Go
package mdbc
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"reflect"
|
||
"time"
|
||
|
||
"go.mongodb.org/mongo-driver/bson"
|
||
"go.mongodb.org/mongo-driver/mongo"
|
||
"go.mongodb.org/mongo-driver/mongo/options"
|
||
)
|
||
|
||
type DistinctScope struct {
|
||
scope *Scope
|
||
cw *ctxWrap
|
||
err error
|
||
fieldName string
|
||
filter interface{}
|
||
opts *options.DistinctOptions
|
||
result []interface{}
|
||
enableCache bool
|
||
cacheFunc DistinctCacheFunc
|
||
cacheKey string
|
||
}
|
||
|
||
// DistinctCacheFunc builds the CacheObject that should be stored for a
// distinct result. Within this file it is invoked with the configured cache
// key as the first argument and the caller's result list as obj; it returns
// the object to cache, or an error.
type DistinctCacheFunc func(field string, obj interface{}) (*CacheObject, error)
|
||
|
||
// DefaultDistinctCacheFunc 默认的缓存方法
|
||
// key: 缓存对象的key
|
||
// obj: 缓存对象
|
||
var DefaultDistinctCacheFunc = func() DistinctCacheFunc {
|
||
return func(key string, obj interface{}) (co *CacheObject, err error) {
|
||
// 建议先断言obj 再操作 若obj断言失败 请返回nil 这样缓存将不会执行
|
||
// 下面使用反射进行操作 保证兼容
|
||
v := reflect.ValueOf(obj)
|
||
if v.Type().Kind() != reflect.Ptr {
|
||
return nil, fmt.Errorf("invalid list type, not ptr")
|
||
}
|
||
|
||
v = v.Elem()
|
||
if v.Type().Kind() != reflect.Slice {
|
||
return nil, fmt.Errorf("invalid list type, not ptr to slice")
|
||
}
|
||
|
||
// 空slice 无需缓存
|
||
if v.Len() == 0 {
|
||
return nil, nil
|
||
}
|
||
|
||
b, _ := json.Marshal(obj)
|
||
co = &CacheObject{
|
||
Key: key,
|
||
Value: string(b),
|
||
}
|
||
|
||
return co, nil
|
||
}
|
||
}
|
||
|
||
// SetContext 设置上下文
|
||
func (ds *DistinctScope) SetContext(ctx context.Context) *DistinctScope {
|
||
if ds.cw == nil {
|
||
ds.cw = &ctxWrap{}
|
||
}
|
||
ds.cw.ctx = ctx
|
||
return ds
|
||
}
|
||
|
||
func (ds *DistinctScope) getContext() context.Context {
|
||
return ds.cw.ctx
|
||
}
|
||
|
||
func (ds *DistinctScope) doClear() {
|
||
if ds.cw != nil && ds.cw.cancel != nil {
|
||
ds.cw.cancel()
|
||
}
|
||
ds.scope.execT = 0
|
||
ds.scope.debug = false
|
||
}
|
||
|
||
// SetUpdateOption 设置更新选项
|
||
func (ds *DistinctScope) SetUpdateOption(opts options.DistinctOptions) *DistinctScope {
|
||
ds.opts = &opts
|
||
return ds
|
||
}
|
||
|
||
// SetFilter 设置过滤条件
|
||
func (ds *DistinctScope) SetFilter(filter interface{}) *DistinctScope {
|
||
if filter == nil {
|
||
ds.filter = bson.M{}
|
||
return ds
|
||
}
|
||
|
||
v := reflect.ValueOf(filter)
|
||
if v.Kind() == reflect.Ptr || v.Kind() == reflect.Map || v.Kind() == reflect.Slice {
|
||
if v.IsNil() {
|
||
ds.filter = bson.M{}
|
||
}
|
||
}
|
||
ds.filter = filter
|
||
return ds
|
||
}
|
||
|
||
// SetFieldName 设置字段名
|
||
func (ds *DistinctScope) SetFieldName(name string) *DistinctScope {
|
||
ds.fieldName = name
|
||
return ds
|
||
}
|
||
|
||
func (ds *DistinctScope) optionAssembled() {
|
||
// 配置项被直接调用重写过
|
||
if ds.opts != nil {
|
||
return
|
||
}
|
||
|
||
ds.opts = new(options.DistinctOptions)
|
||
}
|
||
|
||
// SetCacheFunc 传递一个函数 处理查询操作的结果进行缓存
|
||
func (ds *DistinctScope) SetCacheFunc(key string, cb DistinctCacheFunc) *DistinctScope {
|
||
ds.enableCache = true
|
||
ds.cacheFunc = cb
|
||
ds.cacheKey = key
|
||
return ds
|
||
}
|
||
|
||
func (ds *DistinctScope) preCheck() {
|
||
if ds.filter == nil {
|
||
ds.filter = bson.M{}
|
||
}
|
||
var breakerTTL time.Duration
|
||
if ds.scope.breaker == nil {
|
||
breakerTTL = defaultBreakerTime
|
||
} else if ds.scope.breaker.ttl == 0 {
|
||
breakerTTL = defaultBreakerTime
|
||
} else {
|
||
breakerTTL = ds.scope.breaker.ttl
|
||
}
|
||
if ds.cw == nil {
|
||
ds.cw = &ctxWrap{}
|
||
}
|
||
if ds.cw.ctx == nil {
|
||
ds.cw.ctx, ds.cw.cancel = context.WithTimeout(context.Background(), breakerTTL)
|
||
}
|
||
}
|
||
|
||
func (ds *DistinctScope) assertErr() {
|
||
if ds.err == nil {
|
||
return
|
||
}
|
||
if errors.Is(ds.err, context.DeadlineExceeded) {
|
||
ds.err = &ErrRequestBroken
|
||
return
|
||
}
|
||
err, ok := ds.err.(mongo.CommandError)
|
||
if !ok {
|
||
return
|
||
}
|
||
if err.HasErrorMessage(context.DeadlineExceeded.Error()) {
|
||
ds.err = &ErrRequestBroken
|
||
return
|
||
}
|
||
}
|
||
|
||
// debug 判断是否开启debug,开启的话就打印
|
||
func (ds *DistinctScope) debug() {
|
||
if !ds.scope.debug {
|
||
return
|
||
}
|
||
|
||
debugger := &Debugger{
|
||
collection: ds.scope.tableName,
|
||
execT: ds.scope.execT,
|
||
action: ds,
|
||
}
|
||
|
||
// 当错误时优先输出
|
||
if ds.scope.debugWhenError {
|
||
if ds.err != nil {
|
||
debugger.errMsg = ds.err.Error()
|
||
debugger.ErrorString()
|
||
}
|
||
return
|
||
}
|
||
|
||
// 所有bug输出
|
||
if ds.scope.debug {
|
||
debugger.String()
|
||
}
|
||
}
|
||
|
||
func (ds *DistinctScope) doString() string {
|
||
builder := RegisterTimestampCodec(nil).Build()
|
||
filter, _ := bson.MarshalExtJSONWithRegistry(builder, ds.filter, true, true)
|
||
return fmt.Sprintf(`distinct("%s",%s)`, ds.fieldName, string(filter))
|
||
}
|
||
|
||
func (ds *DistinctScope) doGet() {
|
||
defer ds.assertErr()
|
||
var starTime time.Time
|
||
if ds.scope.debug {
|
||
starTime = time.Now()
|
||
}
|
||
ds.result, ds.err = db.Collection(ds.scope.tableName).Distinct(ds.getContext(), ds.fieldName, ds.filter, ds.opts)
|
||
if ds.scope.debug {
|
||
ds.scope.execT = time.Since(starTime)
|
||
ds.debug()
|
||
}
|
||
}
|
||
|
||
// doCache 执行缓存
|
||
// 检测句柄存不存在
|
||
// 从cacheFunc中获取cacheObj
|
||
// 判断下数据没问题以及没有错误就进行缓存
|
||
func (ds *DistinctScope) doCache(obj interface{}) *DistinctScope {
|
||
// redis句柄不存在
|
||
if ds.scope.cache == nil {
|
||
return nil
|
||
}
|
||
|
||
cacheObj, err := ds.cacheFunc(ds.cacheKey, obj)
|
||
if err != nil {
|
||
ds.err = err
|
||
return ds
|
||
}
|
||
|
||
if cacheObj == nil {
|
||
ds.err = fmt.Errorf("cache object nil")
|
||
return ds
|
||
}
|
||
|
||
ttl := ds.scope.cache.ttl
|
||
if ttl == 0 {
|
||
ttl = time.Hour
|
||
} else if ttl == -1 {
|
||
ttl = 0
|
||
}
|
||
|
||
if ds.getContext().Err() != nil {
|
||
ds.err = ds.getContext().Err()
|
||
ds.assertErr()
|
||
return ds
|
||
}
|
||
|
||
ds.err = ds.scope.cache.client.Set(ds.getContext(), cacheObj.Key, cacheObj.Value, ttl).Err()
|
||
return ds
|
||
}
|
||
|
||
// Get 获取结果
|
||
// list: 必须 *[]string 或者 *[]struct
|
||
func (ds *DistinctScope) Get(list interface{}) error {
|
||
defer ds.doClear()
|
||
ds.optionAssembled()
|
||
ds.preCheck()
|
||
|
||
if ds.fieldName == "" {
|
||
return fmt.Errorf("field name empty")
|
||
}
|
||
|
||
ds.doGet()
|
||
|
||
if ds.err != nil {
|
||
return ds.err
|
||
}
|
||
|
||
vo := reflect.ValueOf(list)
|
||
|
||
if vo.Kind() != reflect.Ptr {
|
||
return fmt.Errorf("arg not ptr")
|
||
}
|
||
|
||
vo = vo.Elem()
|
||
|
||
if vo.Kind() != reflect.Slice {
|
||
return fmt.Errorf("arg not ptr to slice")
|
||
}
|
||
|
||
vot := vo.Type()
|
||
|
||
if vot.Kind() != reflect.Slice {
|
||
return fmt.Errorf("arg not ptr to slice")
|
||
}
|
||
|
||
vot = vot.Elem()
|
||
|
||
if vot.Kind() != reflect.String && vot.Kind() != reflect.Struct {
|
||
return fmt.Errorf("slice subtype must string or struct but get %+v", vot.Kind())
|
||
}
|
||
|
||
for _, obj := range ds.result {
|
||
vo = reflect.Append(vo, reflect.ValueOf(obj))
|
||
}
|
||
|
||
rtv := reflect.ValueOf(list)
|
||
rtv.Elem().Set(vo)
|
||
|
||
if ds.enableCache {
|
||
ds.doCache(list)
|
||
}
|
||
|
||
return nil
|
||
}
|