mdbc/bulk_write_scope.go

219 lines
4.9 KiB
Go
Raw Permalink Normal View History

2022-02-23 08:59:45 +00:00
package mdbc
import (
"context"
"errors"
"fmt"
"time"
jsoniter "github.com/json-iterator/go"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// BulkWriteScope collects the settings for one MongoDB bulk-write operation
// and carries its outcome. Configure it through the Set* methods and run it
// with Do.
type BulkWriteScope struct {
// scope is the owning Scope; this file reads its tableName, debug, execT
// and breaker fields.
scope *Scope
// cw wraps the context (and its cancel function, if any) used for the write.
cw *ctxWrap
// err is the most recently recorded error; Do returns it to the caller.
err error
// chunkSize is the number of models per BulkWrite call; 0 disables chunking.
chunkSize uint32
// opts are the driver options passed to BulkWrite; may be nil.
opts *options.BulkWriteOptions
// models are the write models to execute.
models []mongo.WriteModel
// result is the result of the most recent BulkWrite call.
result *mongo.BulkWriteResult
// chunkFunc is an optional hook invoked with each chunk before it is written.
chunkFunc func([]mongo.WriteModel) error
}
// doString renders the pending write models as a debug string of the form
// "bulkWrite(<json>)". Each model is converted to a map with
// Struct2MapOmitEmpty, encoded as extended JSON through a registry built by
// RegisterTimestampCodec, and decoded again so the models can be marshalled
// together as one JSON array. Encoding errors are deliberately ignored: the
// output is best-effort diagnostics only.
func (bws *BulkWriteScope) doString() string {
	registry := RegisterTimestampCodec(nil).Build()

	// Kept as a nil slice on purpose: with no models the output stays "null".
	var rendered []interface{}
	for i := range bws.models {
		asMap := Struct2MapOmitEmpty(bws.models[i])
		extJSON, _ := bson.MarshalExtJSONWithRegistry(registry, asMap, true, true)

		var decoded interface{}
		_ = jsoniter.Unmarshal(extJSON, &decoded)
		rendered = append(rendered, decoded)
	}

	payload, _ := jsoniter.Marshal(rendered)
	return fmt.Sprintf("bulkWrite(%s)", string(payload))
}
// debug hands this operation to a Debugger when the owning scope has
// debugging enabled; otherwise it does nothing. The Debugger receives the
// collection name, the measured execution time and this scope as the action.
// The value returned by Debugger.String is discarded here.
func (bws *BulkWriteScope) debug() {
	if bws.scope.debug {
		dbg := &Debugger{
			collection: bws.scope.tableName,
			execT:      bws.scope.execT,
			action:     bws,
		}
		dbg.String()
	}
}
// SetContext sets the context used for the bulk write, creating the context
// wrapper on first use. It returns the scope to allow call chaining.
func (bws *BulkWriteScope) SetContext(ctx context.Context) *BulkWriteScope {
	wrap := bws.cw
	if wrap == nil {
		wrap = &ctxWrap{}
		bws.cw = wrap
	}
	wrap.ctx = ctx
	return bws
}
// getContext returns the context the bulk write should run under.
//
// It uses a pointer receiver like every other method of BulkWriteScope, so
// the struct is not copied on each call. If no context wrapper has been set
// up yet it falls back to context.Background instead of dereferencing nil.
func (bws *BulkWriteScope) getContext() context.Context {
	if bws.cw == nil {
		return context.Background()
	}
	return bws.cw.ctx
}
// SetBulkWriteOption replaces the driver options used for the bulk write.
// The options are taken by value, so later changes to the caller's variable
// do not affect this scope. It returns the scope to allow call chaining.
func (bws *BulkWriteScope) SetBulkWriteOption(opts options.BulkWriteOptions) *BulkWriteScope {
	stored := opts
	bws.opts = &stored
	return bws
}
// SetOrdered sets the Ordered field of the bulk-write options, allocating the
// options first when none have been set. It returns the scope to allow call
// chaining.
func (bws *BulkWriteScope) SetOrdered(ordered bool) *BulkWriteScope {
	if bws.opts == nil {
		bws.opts = &options.BulkWriteOptions{}
	}
	flag := ordered
	bws.opts.Ordered = &flag
	return bws
}
// SetChunkSize sets how many models are sent per BulkWrite call. The default
// of 0 means no chunking: all models are written in a single call.
//
// NOTE(review): the original author suspected that very large, unchunked
// writes might lead to a deadlock; that suspicion was never confirmed.
func (bws *BulkWriteScope) SetChunkSize(size uint32) *BulkWriteScope {
	bws.chunkSize = size
	return bws
}
// SetChunkFunc registers a hook that Do calls with each chunk before that
// chunk is written. If the hook returns an error, the chunk is not written
// and processing moves on to the next chunk. The hook is only used when a
// chunk size has been set.
func (bws *BulkWriteScope) SetChunkFunc(f func(models []mongo.WriteModel) error) *BulkWriteScope {
	bws.chunkFunc = f
	return bws
}
// SetWriteModel sets the write models to execute. The slice is stored as-is,
// not copied. It returns the scope to allow call chaining.
func (bws *BulkWriteScope) SetWriteModel(models []mongo.WriteModel) *BulkWriteScope {
	bws.models = models
	return bws
}
// SetWriteModelFunc sets the write models to whatever f returns. f is called
// immediately, once, and must not be nil. It returns the scope to allow call
// chaining.
func (bws *BulkWriteScope) SetWriteModelFunc(f func() []mongo.WriteModel) *BulkWriteScope {
	return bws.SetWriteModel(f())
}
// preCheck makes sure a context is available before the write runs. When the
// caller supplied none, it creates one from context.Background with a
// timeout: the scope breaker's ttl when a breaker with a non-zero ttl is
// configured, defaultBreakerTime otherwise. The matching cancel function is
// stored on the context wrapper.
func (bws *BulkWriteScope) preCheck() {
	var timeout time.Duration
	if brk := bws.scope.breaker; brk != nil && brk.ttl != 0 {
		timeout = brk.ttl
	} else {
		timeout = defaultBreakerTime
	}

	if bws.cw == nil {
		bws.cw = &ctxWrap{}
	}
	if bws.cw.ctx != nil {
		// A caller-provided context is used unchanged.
		return
	}
	bws.cw.ctx, bws.cw.cancel = context.WithTimeout(context.Background(), timeout)
}
// doClear releases per-call state: it invokes the stored cancel function, if
// there is one, and resets the owning scope's debug flag and execution time.
func (bws *BulkWriteScope) doClear() {
	if wrap := bws.cw; wrap != nil {
		if wrap.cancel != nil {
			wrap.cancel()
		}
	}
	bws.scope.debug, bws.scope.execT = false, 0
}
// assertErr normalises timeout failures: when the recorded error is a context
// deadline error, or a MongoDB command error carrying the deadline-exceeded
// message, it is replaced with the package's ErrRequestBroken. Any other
// error, and a nil error, is left untouched.
func (bws *BulkWriteScope) assertErr() {
	if bws.err == nil {
		return
	}
	if errors.Is(bws.err, context.DeadlineExceeded) {
		bws.err = &ErrRequestBroken
		return
	}
	// errors.As walks the wrap chain, so a command error that has been wrapped
	// is recognised too; a plain type assertion would miss it.
	var cmdErr mongo.CommandError
	if errors.As(bws.err, &cmdErr) && cmdErr.HasErrorMessage(context.DeadlineExceeded.Error()) {
		bws.err = &ErrRequestBroken
	}
}
// doBulkWrite sends the current models to the scope's collection in a single
// BulkWrite call and records the result and error on the scope. When
// debugging is enabled it also measures the call's duration and emits the
// debug output. assertErr runs last to normalise timeout errors.
func (bws *BulkWriteScope) doBulkWrite() {
	defer bws.assertErr()

	coll := db.Collection(bws.scope.tableName)
	if !bws.scope.debug {
		bws.result, bws.err = coll.BulkWrite(bws.getContext(), bws.models, bws.opts)
		return
	}

	begin := time.Now()
	bws.result, bws.err = coll.BulkWrite(bws.getContext(), bws.models, bws.opts)
	bws.scope.execT = time.Since(begin)
	bws.debug()
}
// splitBulkWrite partitions arr into consecutive chunks of at most chunkSize
// models; the last chunk may be shorter. The chunks are views into arr, not
// copies.
//
// An empty arr yields nil. A non-positive chunkSize yields a single chunk
// holding all of arr, rather than looping forever (0) or panicking while
// slicing (negative).
func (bws *BulkWriteScope) splitBulkWrite(arr []mongo.WriteModel, chunkSize int) [][]mongo.WriteModel {
	total := len(arr)
	if total == 0 {
		return nil
	}
	if chunkSize <= 0 {
		return [][]mongo.WriteModel{arr[:total:total]}
	}

	count := total / chunkSize
	if total%chunkSize != 0 {
		count++
	}
	chunks := make([][]mongo.WriteModel, 0, count)
	for start := 0; start < total; start += chunkSize {
		// Compare against the remaining length instead of computing
		// start+chunkSize first, so a huge chunkSize cannot overflow.
		end := total
		if chunkSize < total-start {
			end = start + chunkSize
		}
		// Cap each chunk's capacity at its length so that an append on one
		// chunk reallocates instead of overwriting the next chunk's models.
		chunks = append(chunks, arr[start:end:end])
	}
	return chunks
}
// checkModel records an error on the scope when no write models have been
// set. No per-command validation is performed at present.
func (bws *BulkWriteScope) checkModel() {
	if len(bws.models) > 0 {
		return
	}
	bws.err = fmt.Errorf("models empty")
}
// Do executes the bulk operation and returns its result. Write models must
// have been supplied through SetWriteModel or SetWriteModelFunc, otherwise an
// error is returned.
//
// Without a chunk size, all models are sent in one BulkWrite call.
//
// With a chunk size, the models are sent chunk by chunk. A chunk whose
// chunkFunc hook or write fails is skipped and the remaining chunks are still
// attempted. The counters and upserted IDs of every successful chunk are
// merged into a single result; upserted-ID keys are shifted by the chunk's
// starting position so they index into the full model list. If any chunk
// failed, the first error is returned with a nil result instead of being
// overwritten by a later chunk's outcome.
func (bws *BulkWriteScope) Do() (*mongo.BulkWriteResult, error) {
	defer bws.doClear()
	bws.checkModel()
	bws.preCheck()
	if bws.err != nil {
		return nil, bws.err
	}

	if bws.chunkSize == 0 {
		bws.doBulkWrite()
		if bws.err != nil {
			return nil, bws.err
		}
		return bws.result, nil
	}

	// doBulkWrite and the debug output read bws.models, so each chunk is
	// swapped in while it is processed; the full list is restored afterwards.
	all := bws.models
	defer func() { bws.models = all }()

	merged := &mongo.BulkWriteResult{UpsertedIDs: make(map[int64]interface{})}
	var (
		firstErr error
		offset   int64
	)
	for _, chunk := range bws.splitBulkWrite(all, int(bws.chunkSize)) {
		bws.models = chunk
		bws.result, bws.err = nil, nil

		if bws.chunkFunc != nil {
			bws.err = bws.chunkFunc(chunk)
		}
		if bws.err == nil {
			bws.doBulkWrite()
		}

		switch {
		case bws.err != nil:
			if firstErr == nil {
				firstErr = bws.err
			}
		case bws.result != nil:
			merged.InsertedCount += bws.result.InsertedCount
			merged.MatchedCount += bws.result.MatchedCount
			merged.ModifiedCount += bws.result.ModifiedCount
			merged.DeletedCount += bws.result.DeletedCount
			merged.UpsertedCount += bws.result.UpsertedCount
			for idx, id := range bws.result.UpsertedIDs {
				merged.UpsertedIDs[offset+idx] = id
			}
		}
		offset += int64(len(chunk))
	}

	bws.result, bws.err = merged, firstErr
	if firstErr != nil {
		return nil, firstErr
	}
	return merged, nil
}