219 lines
4.9 KiB
Go
219 lines
4.9 KiB
Go
package mdbc
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"fmt"
|
||
"time"
|
||
|
||
jsoniter "github.com/json-iterator/go"
|
||
|
||
"go.mongodb.org/mongo-driver/bson"
|
||
"go.mongodb.org/mongo-driver/mongo"
|
||
"go.mongodb.org/mongo-driver/mongo/options"
|
||
)
|
||
|
||
type BulkWriteScope struct {
|
||
scope *Scope
|
||
cw *ctxWrap
|
||
err error
|
||
chunkSize uint32
|
||
opts *options.BulkWriteOptions
|
||
models []mongo.WriteModel
|
||
result *mongo.BulkWriteResult
|
||
chunkFunc func([]mongo.WriteModel) error
|
||
}
|
||
|
||
// doString debug string
|
||
func (bws *BulkWriteScope) doString() string {
|
||
var data []interface{}
|
||
builder := RegisterTimestampCodec(nil).Build()
|
||
for _, v := range bws.models {
|
||
var body interface{}
|
||
rawv := Struct2MapOmitEmpty(v)
|
||
b, _ := bson.MarshalExtJSONWithRegistry(builder, rawv, true, true)
|
||
_ = jsoniter.Unmarshal(b, &body)
|
||
data = append(data, body)
|
||
}
|
||
b, _ := jsoniter.Marshal(data)
|
||
return fmt.Sprintf("bulkWrite(%s)", string(b))
|
||
}
|
||
|
||
// debug debug
|
||
func (bws *BulkWriteScope) debug() {
|
||
if !bws.scope.debug {
|
||
return
|
||
}
|
||
|
||
debugger := &Debugger{
|
||
collection: bws.scope.tableName,
|
||
execT: bws.scope.execT,
|
||
action: bws,
|
||
}
|
||
debugger.String()
|
||
}
|
||
|
||
// SetContext 设置上下文
|
||
func (bws *BulkWriteScope) SetContext(ctx context.Context) *BulkWriteScope {
|
||
if bws.cw == nil {
|
||
bws.cw = &ctxWrap{}
|
||
}
|
||
bws.cw.ctx = ctx
|
||
return bws
|
||
}
|
||
|
||
func (bws BulkWriteScope) getContext() context.Context {
|
||
return bws.cw.ctx
|
||
}
|
||
|
||
// SetBulkWriteOption 设置BulkWriteOption
|
||
func (bws *BulkWriteScope) SetBulkWriteOption(opts options.BulkWriteOptions) *BulkWriteScope {
|
||
bws.opts = &opts
|
||
return bws
|
||
}
|
||
|
||
// SetOrdered 设置BulkWriteOptions中的Ordered
|
||
func (bws *BulkWriteScope) SetOrdered(ordered bool) *BulkWriteScope {
|
||
if bws.opts == nil {
|
||
bws.opts = new(options.BulkWriteOptions)
|
||
}
|
||
bws.opts.Ordered = &ordered
|
||
return bws
|
||
}
|
||
|
||
// SetChunkSize 指定分块操作大小 默认不分块 当数据足够大时 可能导致deadlock问题?不确定这个问题
|
||
func (bws *BulkWriteScope) SetChunkSize(size uint32) *BulkWriteScope {
|
||
bws.chunkSize = size
|
||
return bws
|
||
}
|
||
|
||
// SetChunkFunc 可以在进行批量插入之前做一些事情 若错误将终止这一批数据的写入而执行下一批
|
||
func (bws *BulkWriteScope) SetChunkFunc(f func(models []mongo.WriteModel) error) *BulkWriteScope {
|
||
bws.chunkFunc = f
|
||
return bws
|
||
}
|
||
|
||
// SetWriteModel 设置需要操作的数据
|
||
func (bws *BulkWriteScope) SetWriteModel(models []mongo.WriteModel) *BulkWriteScope {
|
||
bws.models = models
|
||
return bws
|
||
}
|
||
|
||
// SetWriteModelFunc 可以定义函数来返回需要操作的数据
|
||
func (bws *BulkWriteScope) SetWriteModelFunc(f func() []mongo.WriteModel) *BulkWriteScope {
|
||
bws.models = f()
|
||
return bws
|
||
}
|
||
|
||
// preCheck 预检查
|
||
func (bws *BulkWriteScope) preCheck() {
|
||
var breakerTTL time.Duration
|
||
if bws.scope.breaker == nil {
|
||
breakerTTL = defaultBreakerTime
|
||
} else if bws.scope.breaker.ttl == 0 {
|
||
breakerTTL = defaultBreakerTime
|
||
} else {
|
||
breakerTTL = bws.scope.breaker.ttl
|
||
}
|
||
if bws.cw == nil {
|
||
bws.cw = &ctxWrap{}
|
||
}
|
||
if bws.cw.ctx == nil {
|
||
bws.cw.ctx, bws.cw.cancel = context.WithTimeout(context.Background(), breakerTTL)
|
||
}
|
||
}
|
||
|
||
func (bws *BulkWriteScope) doClear() {
|
||
if bws.cw != nil && bws.cw.cancel != nil {
|
||
bws.cw.cancel()
|
||
}
|
||
bws.scope.debug = false
|
||
bws.scope.execT = 0
|
||
}
|
||
|
||
func (bws *BulkWriteScope) assertErr() {
|
||
if bws.err == nil {
|
||
return
|
||
}
|
||
if errors.Is(bws.err, context.DeadlineExceeded) {
|
||
bws.err = &ErrRequestBroken
|
||
return
|
||
}
|
||
err, ok := bws.err.(mongo.CommandError)
|
||
if !ok {
|
||
return
|
||
}
|
||
if err.HasErrorMessage(context.DeadlineExceeded.Error()) {
|
||
bws.err = &ErrRequestBroken
|
||
}
|
||
}
|
||
|
||
func (bws *BulkWriteScope) doBulkWrite() {
|
||
defer bws.assertErr()
|
||
var starTime time.Time
|
||
if bws.scope.debug {
|
||
starTime = time.Now()
|
||
}
|
||
bws.result, bws.err = db.Collection(bws.scope.tableName).BulkWrite(bws.getContext(), bws.models, bws.opts)
|
||
if bws.scope.debug {
|
||
bws.scope.execT = time.Since(starTime)
|
||
bws.debug()
|
||
}
|
||
}
|
||
|
||
func (bws *BulkWriteScope) splitBulkWrite(arr []mongo.WriteModel, chunkSize int) [][]mongo.WriteModel {
|
||
var newArr [][]mongo.WriteModel
|
||
for i := 0; i < len(arr); i += chunkSize {
|
||
end := i + chunkSize
|
||
|
||
if end > len(arr) {
|
||
end = len(arr)
|
||
}
|
||
|
||
newArr = append(newArr, arr[i:end])
|
||
}
|
||
|
||
return newArr
|
||
}
|
||
|
||
func (bws *BulkWriteScope) checkModel() {
|
||
if len(bws.models) == 0 {
|
||
bws.err = fmt.Errorf("models empty")
|
||
return
|
||
}
|
||
// 命令检测
|
||
}
|
||
|
||
// Do 执行批量操作 请确保 SetWriteModel 已被设置 否则报错
|
||
func (bws *BulkWriteScope) Do() (*mongo.BulkWriteResult, error) {
|
||
defer bws.doClear()
|
||
bws.checkModel()
|
||
bws.preCheck()
|
||
|
||
if bws.err != nil {
|
||
return nil, bws.err
|
||
}
|
||
|
||
// 如果设置了chunkSize就分片插入
|
||
if bws.chunkSize > 0 {
|
||
models := bws.splitBulkWrite(bws.models, int(bws.chunkSize))
|
||
for _, model := range models {
|
||
bws.models = model
|
||
if bws.chunkFunc != nil {
|
||
if bws.err = bws.chunkFunc(bws.models); bws.err != nil {
|
||
continue
|
||
}
|
||
}
|
||
bws.doBulkWrite()
|
||
}
|
||
} else {
|
||
bws.doBulkWrite()
|
||
}
|
||
|
||
if bws.err != nil {
|
||
return nil, bws.err
|
||
}
|
||
|
||
return bws.result, nil
|
||
}
|