306 lines
6.2 KiB
Go
306 lines
6.2 KiB
Go
|
package mdbc
|
|||
|
|
|||
|
import (
|
|||
|
"context"
|
|||
|
"encoding/json"
|
|||
|
"errors"
|
|||
|
"fmt"
|
|||
|
"reflect"
|
|||
|
"time"
|
|||
|
|
|||
|
"go.mongodb.org/mongo-driver/bson"
|
|||
|
"go.mongodb.org/mongo-driver/mongo"
|
|||
|
"go.mongodb.org/mongo-driver/mongo/options"
|
|||
|
)
|
|||
|
|
|||
|
type DistinctScope struct {
|
|||
|
scope *Scope
|
|||
|
cw *ctxWrap
|
|||
|
err error
|
|||
|
fieldName string
|
|||
|
filter interface{}
|
|||
|
opts *options.DistinctOptions
|
|||
|
result []interface{}
|
|||
|
enableCache bool
|
|||
|
cacheFunc DistinctCacheFunc
|
|||
|
cacheKey string
|
|||
|
}
|
|||
|
|
|||
|
type DistinctCacheFunc func(field string, obj interface{}) (*CacheObject, error)
|
|||
|
|
|||
|
// DefaultDistinctCacheFunc 默认的缓存方法
|
|||
|
// key: 缓存对象的key
|
|||
|
// obj: 缓存对象
|
|||
|
var DefaultDistinctCacheFunc = func() DistinctCacheFunc {
|
|||
|
return func(key string, obj interface{}) (co *CacheObject, err error) {
|
|||
|
// 建议先断言obj 再操作 若obj断言失败 请返回nil 这样缓存将不会执行
|
|||
|
// 下面使用反射进行操作 保证兼容
|
|||
|
v := reflect.ValueOf(obj)
|
|||
|
if v.Type().Kind() != reflect.Ptr {
|
|||
|
return nil, fmt.Errorf("invalid list type, not ptr")
|
|||
|
}
|
|||
|
|
|||
|
v = v.Elem()
|
|||
|
if v.Type().Kind() != reflect.Slice {
|
|||
|
return nil, fmt.Errorf("invalid list type, not ptr to slice")
|
|||
|
}
|
|||
|
|
|||
|
// 空slice 无需缓存
|
|||
|
if v.Len() == 0 {
|
|||
|
return nil, nil
|
|||
|
}
|
|||
|
|
|||
|
b, _ := json.Marshal(obj)
|
|||
|
co = &CacheObject{
|
|||
|
Key: key,
|
|||
|
Value: string(b),
|
|||
|
}
|
|||
|
|
|||
|
return co, nil
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
// SetContext 设置上下文
|
|||
|
func (ds *DistinctScope) SetContext(ctx context.Context) *DistinctScope {
|
|||
|
if ds.cw == nil {
|
|||
|
ds.cw = &ctxWrap{}
|
|||
|
}
|
|||
|
ds.cw.ctx = ctx
|
|||
|
return ds
|
|||
|
}
|
|||
|
|
|||
|
func (ds *DistinctScope) getContext() context.Context {
|
|||
|
return ds.cw.ctx
|
|||
|
}
|
|||
|
|
|||
|
func (ds *DistinctScope) doClear() {
|
|||
|
if ds.cw != nil && ds.cw.cancel != nil {
|
|||
|
ds.cw.cancel()
|
|||
|
}
|
|||
|
ds.scope.execT = 0
|
|||
|
ds.scope.debug = false
|
|||
|
}
|
|||
|
|
|||
|
// SetUpdateOption 设置更新选项
|
|||
|
func (ds *DistinctScope) SetUpdateOption(opts options.DistinctOptions) *DistinctScope {
|
|||
|
ds.opts = &opts
|
|||
|
return ds
|
|||
|
}
|
|||
|
|
|||
|
// SetFilter 设置过滤条件
|
|||
|
func (ds *DistinctScope) SetFilter(filter interface{}) *DistinctScope {
|
|||
|
if filter == nil {
|
|||
|
ds.filter = bson.M{}
|
|||
|
return ds
|
|||
|
}
|
|||
|
|
|||
|
v := reflect.ValueOf(filter)
|
|||
|
if v.Kind() == reflect.Ptr || v.Kind() == reflect.Map || v.Kind() == reflect.Slice {
|
|||
|
if v.IsNil() {
|
|||
|
ds.filter = bson.M{}
|
|||
|
}
|
|||
|
}
|
|||
|
ds.filter = filter
|
|||
|
return ds
|
|||
|
}
|
|||
|
|
|||
|
// SetFieldName 设置字段名
|
|||
|
func (ds *DistinctScope) SetFieldName(name string) *DistinctScope {
|
|||
|
ds.fieldName = name
|
|||
|
return ds
|
|||
|
}
|
|||
|
|
|||
|
func (ds *DistinctScope) optionAssembled() {
|
|||
|
// 配置项被直接调用重写过
|
|||
|
if ds.opts != nil {
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
ds.opts = new(options.DistinctOptions)
|
|||
|
}
|
|||
|
|
|||
|
// SetCacheFunc 传递一个函数 处理查询操作的结果进行缓存
|
|||
|
func (ds *DistinctScope) SetCacheFunc(key string, cb DistinctCacheFunc) *DistinctScope {
|
|||
|
ds.enableCache = true
|
|||
|
ds.cacheFunc = cb
|
|||
|
ds.cacheKey = key
|
|||
|
return ds
|
|||
|
}
|
|||
|
|
|||
|
func (ds *DistinctScope) preCheck() {
|
|||
|
if ds.filter == nil {
|
|||
|
ds.filter = bson.M{}
|
|||
|
}
|
|||
|
var breakerTTL time.Duration
|
|||
|
if ds.scope.breaker == nil {
|
|||
|
breakerTTL = defaultBreakerTime
|
|||
|
} else if ds.scope.breaker.ttl == 0 {
|
|||
|
breakerTTL = defaultBreakerTime
|
|||
|
} else {
|
|||
|
breakerTTL = ds.scope.breaker.ttl
|
|||
|
}
|
|||
|
if ds.cw == nil {
|
|||
|
ds.cw = &ctxWrap{}
|
|||
|
}
|
|||
|
if ds.cw.ctx == nil {
|
|||
|
ds.cw.ctx, ds.cw.cancel = context.WithTimeout(context.Background(), breakerTTL)
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
func (ds *DistinctScope) assertErr() {
|
|||
|
if ds.err == nil {
|
|||
|
return
|
|||
|
}
|
|||
|
if errors.Is(ds.err, context.DeadlineExceeded) {
|
|||
|
ds.err = &ErrRequestBroken
|
|||
|
return
|
|||
|
}
|
|||
|
err, ok := ds.err.(mongo.CommandError)
|
|||
|
if !ok {
|
|||
|
return
|
|||
|
}
|
|||
|
if err.HasErrorMessage(context.DeadlineExceeded.Error()) {
|
|||
|
ds.err = &ErrRequestBroken
|
|||
|
return
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
// debug 判断是否开启debug,开启的话就打印
|
|||
|
func (ds *DistinctScope) debug() {
|
|||
|
if !ds.scope.debug {
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
debugger := &Debugger{
|
|||
|
collection: ds.scope.tableName,
|
|||
|
execT: ds.scope.execT,
|
|||
|
action: ds,
|
|||
|
}
|
|||
|
|
|||
|
// 当错误时优先输出
|
|||
|
if ds.scope.debugWhenError {
|
|||
|
if ds.err != nil {
|
|||
|
debugger.errMsg = ds.err.Error()
|
|||
|
debugger.ErrorString()
|
|||
|
}
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
// 所有bug输出
|
|||
|
if ds.scope.debug {
|
|||
|
debugger.String()
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
func (ds *DistinctScope) doString() string {
|
|||
|
builder := RegisterTimestampCodec(nil).Build()
|
|||
|
filter, _ := bson.MarshalExtJSONWithRegistry(builder, ds.filter, true, true)
|
|||
|
return fmt.Sprintf(`distinct("%s",%s)`, ds.fieldName, string(filter))
|
|||
|
}
|
|||
|
|
|||
|
func (ds *DistinctScope) doGet() {
|
|||
|
defer ds.assertErr()
|
|||
|
var starTime time.Time
|
|||
|
if ds.scope.debug {
|
|||
|
starTime = time.Now()
|
|||
|
}
|
|||
|
ds.result, ds.err = db.Collection(ds.scope.tableName).Distinct(ds.getContext(), ds.fieldName, ds.filter, ds.opts)
|
|||
|
if ds.scope.debug {
|
|||
|
ds.scope.execT = time.Since(starTime)
|
|||
|
ds.debug()
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
// doCache 执行缓存
|
|||
|
// 检测句柄存不存在
|
|||
|
// 从cacheFunc中获取cacheObj
|
|||
|
// 判断下数据没问题以及没有错误就进行缓存
|
|||
|
func (ds *DistinctScope) doCache(obj interface{}) *DistinctScope {
|
|||
|
// redis句柄不存在
|
|||
|
if ds.scope.cache == nil {
|
|||
|
return nil
|
|||
|
}
|
|||
|
|
|||
|
cacheObj, err := ds.cacheFunc(ds.cacheKey, obj)
|
|||
|
if err != nil {
|
|||
|
ds.err = err
|
|||
|
return ds
|
|||
|
}
|
|||
|
|
|||
|
if cacheObj == nil {
|
|||
|
ds.err = fmt.Errorf("cache object nil")
|
|||
|
return ds
|
|||
|
}
|
|||
|
|
|||
|
ttl := ds.scope.cache.ttl
|
|||
|
if ttl == 0 {
|
|||
|
ttl = time.Hour
|
|||
|
} else if ttl == -1 {
|
|||
|
ttl = 0
|
|||
|
}
|
|||
|
|
|||
|
if ds.getContext().Err() != nil {
|
|||
|
ds.err = ds.getContext().Err()
|
|||
|
ds.assertErr()
|
|||
|
return ds
|
|||
|
}
|
|||
|
|
|||
|
ds.err = ds.scope.cache.client.Set(ds.getContext(), cacheObj.Key, cacheObj.Value, ttl).Err()
|
|||
|
return ds
|
|||
|
}
|
|||
|
|
|||
|
// Get 获取结果
|
|||
|
// list: 必须 *[]string 或者 *[]struct
|
|||
|
func (ds *DistinctScope) Get(list interface{}) error {
|
|||
|
defer ds.doClear()
|
|||
|
ds.optionAssembled()
|
|||
|
ds.preCheck()
|
|||
|
|
|||
|
if ds.fieldName == "" {
|
|||
|
return fmt.Errorf("field name empty")
|
|||
|
}
|
|||
|
|
|||
|
ds.doGet()
|
|||
|
|
|||
|
if ds.err != nil {
|
|||
|
return ds.err
|
|||
|
}
|
|||
|
|
|||
|
vo := reflect.ValueOf(list)
|
|||
|
|
|||
|
if vo.Kind() != reflect.Ptr {
|
|||
|
return fmt.Errorf("arg not ptr")
|
|||
|
}
|
|||
|
|
|||
|
vo = vo.Elem()
|
|||
|
|
|||
|
if vo.Kind() != reflect.Slice {
|
|||
|
return fmt.Errorf("arg not ptr to slice")
|
|||
|
}
|
|||
|
|
|||
|
vot := vo.Type()
|
|||
|
|
|||
|
if vot.Kind() != reflect.Slice {
|
|||
|
return fmt.Errorf("arg not ptr to slice")
|
|||
|
}
|
|||
|
|
|||
|
vot = vot.Elem()
|
|||
|
|
|||
|
if vot.Kind() != reflect.String && vot.Kind() != reflect.Struct {
|
|||
|
return fmt.Errorf("slice subtype must string or struct but get %+v", vot.Kind())
|
|||
|
}
|
|||
|
|
|||
|
for _, obj := range ds.result {
|
|||
|
vo = reflect.Append(vo, reflect.ValueOf(obj))
|
|||
|
}
|
|||
|
|
|||
|
rtv := reflect.ValueOf(list)
|
|||
|
rtv.Elem().Set(vo)
|
|||
|
|
|||
|
if ds.enableCache {
|
|||
|
ds.doCache(list)
|
|||
|
}
|
|||
|
|
|||
|
return nil
|
|||
|
}
|