mdbc/bulk_write_scope.go
2022-02-23 16:59:45 +08:00

219 lines
4.9 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package mdbc
import (
"context"
"errors"
"fmt"
"time"
jsoniter "github.com/json-iterator/go"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
type BulkWriteScope struct {
scope *Scope
cw *ctxWrap
err error
chunkSize uint32
opts *options.BulkWriteOptions
models []mongo.WriteModel
result *mongo.BulkWriteResult
chunkFunc func([]mongo.WriteModel) error
}
// doString debug string
func (bws *BulkWriteScope) doString() string {
var data []interface{}
builder := RegisterTimestampCodec(nil).Build()
for _, v := range bws.models {
var body interface{}
rawv := Struct2MapOmitEmpty(v)
b, _ := bson.MarshalExtJSONWithRegistry(builder, rawv, true, true)
_ = jsoniter.Unmarshal(b, &body)
data = append(data, body)
}
b, _ := jsoniter.Marshal(data)
return fmt.Sprintf("bulkWrite(%s)", string(b))
}
// debug debug
func (bws *BulkWriteScope) debug() {
if !bws.scope.debug {
return
}
debugger := &Debugger{
collection: bws.scope.tableName,
execT: bws.scope.execT,
action: bws,
}
debugger.String()
}
// SetContext 设置上下文
func (bws *BulkWriteScope) SetContext(ctx context.Context) *BulkWriteScope {
if bws.cw == nil {
bws.cw = &ctxWrap{}
}
bws.cw.ctx = ctx
return bws
}
func (bws BulkWriteScope) getContext() context.Context {
return bws.cw.ctx
}
// SetBulkWriteOption 设置BulkWriteOption
func (bws *BulkWriteScope) SetBulkWriteOption(opts options.BulkWriteOptions) *BulkWriteScope {
bws.opts = &opts
return bws
}
// SetOrdered 设置BulkWriteOptions中的Ordered
func (bws *BulkWriteScope) SetOrdered(ordered bool) *BulkWriteScope {
if bws.opts == nil {
bws.opts = new(options.BulkWriteOptions)
}
bws.opts.Ordered = &ordered
return bws
}
// SetChunkSize 指定分块操作大小 默认不分块 当数据足够大时 可能导致deadlock问题不确定这个问题
func (bws *BulkWriteScope) SetChunkSize(size uint32) *BulkWriteScope {
bws.chunkSize = size
return bws
}
// SetChunkFunc 可以在进行批量插入之前做一些事情 若错误将终止这一批数据的写入而执行下一批
func (bws *BulkWriteScope) SetChunkFunc(f func(models []mongo.WriteModel) error) *BulkWriteScope {
bws.chunkFunc = f
return bws
}
// SetWriteModel 设置需要操作的数据
func (bws *BulkWriteScope) SetWriteModel(models []mongo.WriteModel) *BulkWriteScope {
bws.models = models
return bws
}
// SetWriteModelFunc 可以定义函数来返回需要操作的数据
func (bws *BulkWriteScope) SetWriteModelFunc(f func() []mongo.WriteModel) *BulkWriteScope {
bws.models = f()
return bws
}
// preCheck 预检查
func (bws *BulkWriteScope) preCheck() {
var breakerTTL time.Duration
if bws.scope.breaker == nil {
breakerTTL = defaultBreakerTime
} else if bws.scope.breaker.ttl == 0 {
breakerTTL = defaultBreakerTime
} else {
breakerTTL = bws.scope.breaker.ttl
}
if bws.cw == nil {
bws.cw = &ctxWrap{}
}
if bws.cw.ctx == nil {
bws.cw.ctx, bws.cw.cancel = context.WithTimeout(context.Background(), breakerTTL)
}
}
func (bws *BulkWriteScope) doClear() {
if bws.cw != nil && bws.cw.cancel != nil {
bws.cw.cancel()
}
bws.scope.debug = false
bws.scope.execT = 0
}
func (bws *BulkWriteScope) assertErr() {
if bws.err == nil {
return
}
if errors.Is(bws.err, context.DeadlineExceeded) {
bws.err = &ErrRequestBroken
return
}
err, ok := bws.err.(mongo.CommandError)
if !ok {
return
}
if err.HasErrorMessage(context.DeadlineExceeded.Error()) {
bws.err = &ErrRequestBroken
}
}
func (bws *BulkWriteScope) doBulkWrite() {
defer bws.assertErr()
var starTime time.Time
if bws.scope.debug {
starTime = time.Now()
}
bws.result, bws.err = db.Collection(bws.scope.tableName).BulkWrite(bws.getContext(), bws.models, bws.opts)
if bws.scope.debug {
bws.scope.execT = time.Since(starTime)
bws.debug()
}
}
func (bws *BulkWriteScope) splitBulkWrite(arr []mongo.WriteModel, chunkSize int) [][]mongo.WriteModel {
var newArr [][]mongo.WriteModel
for i := 0; i < len(arr); i += chunkSize {
end := i + chunkSize
if end > len(arr) {
end = len(arr)
}
newArr = append(newArr, arr[i:end])
}
return newArr
}
func (bws *BulkWriteScope) checkModel() {
if len(bws.models) == 0 {
bws.err = fmt.Errorf("models empty")
return
}
// 命令检测
}
// Do 执行批量操作 请确保 SetWriteModel 已被设置 否则报错
func (bws *BulkWriteScope) Do() (*mongo.BulkWriteResult, error) {
defer bws.doClear()
bws.checkModel()
bws.preCheck()
if bws.err != nil {
return nil, bws.err
}
// 如果设置了chunkSize就分片插入
if bws.chunkSize > 0 {
models := bws.splitBulkWrite(bws.models, int(bws.chunkSize))
for _, model := range models {
bws.models = model
if bws.chunkFunc != nil {
if bws.err = bws.chunkFunc(bws.models); bws.err != nil {
continue
}
}
bws.doBulkWrite()
}
} else {
bws.doBulkWrite()
}
if bws.err != nil {
return nil, bws.err
}
return bws.result, nil
}