package mdbc

import (
	"context"
	"errors"
	"fmt"
	"time"

	jsoniter "github.com/json-iterator/go"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

type BulkWriteScope struct {
	scope     *Scope
	cw        *ctxWrap
	err       error
	chunkSize uint32
	opts      *options.BulkWriteOptions
	models    []mongo.WriteModel
	result    *mongo.BulkWriteResult
	chunkFunc func([]mongo.WriteModel) error
}

// doString builds the debug string for the pending write models.
func (bws *BulkWriteScope) doString() string {
	var data []interface{}
	builder := RegisterTimestampCodec(nil).Build()
	for _, v := range bws.models {
		var body interface{}
		rawv := Struct2MapOmitEmpty(v)
		b, _ := bson.MarshalExtJSONWithRegistry(builder, rawv, true, true)
		_ = jsoniter.Unmarshal(b, &body)
		data = append(data, body)
	}
	b, _ := jsoniter.Marshal(data)
	return fmt.Sprintf("bulkWrite(%s)", string(b))
}

// debug emits debug output when debugging is enabled on the scope.
func (bws *BulkWriteScope) debug() {
	if !bws.scope.debug {
		return
	}
	debugger := &Debugger{
		collection: bws.scope.tableName,
		execT:      bws.scope.execT,
		action:     bws,
	}
	debugger.String()
}

// SetContext sets the context.
func (bws *BulkWriteScope) SetContext(ctx context.Context) *BulkWriteScope {
	if bws.cw == nil {
		bws.cw = &ctxWrap{}
	}
	bws.cw.ctx = ctx
	return bws
}

func (bws BulkWriteScope) getContext() context.Context {
	return bws.cw.ctx
}

// SetBulkWriteOption sets the BulkWriteOptions.
func (bws *BulkWriteScope) SetBulkWriteOption(opts options.BulkWriteOptions) *BulkWriteScope {
	bws.opts = &opts
	return bws
}

// SetOrdered sets Ordered on the BulkWriteOptions.
func (bws *BulkWriteScope) SetOrdered(ordered bool) *BulkWriteScope {
	if bws.opts == nil {
		bws.opts = new(options.BulkWriteOptions)
	}
	bws.opts.Ordered = &ordered
	return bws
}

// SetChunkSize sets the chunk size for the bulk operation. By default no
// chunking is done; when the data set is large enough this may lead to a
// deadlock issue (unconfirmed).
func (bws *BulkWriteScope) SetChunkSize(size uint32) *BulkWriteScope {
	bws.chunkSize = size
	return bws
}

// SetChunkFunc registers a hook that runs before each chunk is written. If it
// returns an error, that chunk is skipped and the next chunk is processed.
func (bws *BulkWriteScope) SetChunkFunc(f func(models []mongo.WriteModel) error) *BulkWriteScope {
	bws.chunkFunc = f
	return bws
}

// SetWriteModel sets the write models to operate on.
func (bws *BulkWriteScope) SetWriteModel(models []mongo.WriteModel) *BulkWriteScope {
	bws.models = models
	return bws
}

// SetWriteModelFunc sets the write models via a user-defined function.
func (bws *BulkWriteScope) SetWriteModelFunc(f func() []mongo.WriteModel) *BulkWriteScope {
	bws.models = f()
	return bws
}

// preCheck ensures a context with the breaker timeout exists before executing.
func (bws *BulkWriteScope) preCheck() {
	var breakerTTL time.Duration
	if bws.scope.breaker == nil {
		breakerTTL = defaultBreakerTime
	} else if bws.scope.breaker.ttl == 0 {
		breakerTTL = defaultBreakerTime
	} else {
		breakerTTL = bws.scope.breaker.ttl
	}
	if bws.cw == nil {
		bws.cw = &ctxWrap{}
	}
	if bws.cw.ctx == nil {
		bws.cw.ctx, bws.cw.cancel = context.WithTimeout(context.Background(), breakerTTL)
	}
}

// doClear releases the context and resets the scope's debug state.
func (bws *BulkWriteScope) doClear() {
	if bws.cw != nil && bws.cw.cancel != nil {
		bws.cw.cancel()
	}
	bws.scope.debug = false
	bws.scope.execT = 0
}

// assertErr maps deadline-exceeded errors to ErrRequestBroken.
func (bws *BulkWriteScope) assertErr() {
	if bws.err == nil {
		return
	}
	if errors.Is(bws.err, context.DeadlineExceeded) {
		bws.err = &ErrRequestBroken
		return
	}
	err, ok := bws.err.(mongo.CommandError)
	if !ok {
		return
	}
	if err.HasErrorMessage(context.DeadlineExceeded.Error()) {
		bws.err = &ErrRequestBroken
	}
}

// doBulkWrite executes a single BulkWrite call with the current models.
func (bws *BulkWriteScope) doBulkWrite() {
	defer bws.assertErr()
	var startTime time.Time
	if bws.scope.debug {
		startTime = time.Now()
	}
	bws.result, bws.err = db.Collection(bws.scope.tableName).BulkWrite(bws.getContext(), bws.models, bws.opts)
	if bws.scope.debug {
		bws.scope.execT = time.Since(startTime)
		bws.debug()
	}
}

// splitBulkWrite splits the write models into chunks of at most chunkSize.
func (bws *BulkWriteScope) splitBulkWrite(arr []mongo.WriteModel, chunkSize int) [][]mongo.WriteModel {
	var newArr [][]mongo.WriteModel
	for i := 0; i < len(arr); i += chunkSize {
		end := i + chunkSize
		if end > len(arr) {
			end = len(arr)
		}
		newArr = append(newArr, arr[i:end])
	}
	return newArr
}

// checkModel validates that write models have been provided.
func (bws *BulkWriteScope) checkModel() {
	if len(bws.models) == 0 {
		bws.err = fmt.Errorf("models empty")
		return
	}
	// command validation could go here
}

// Do executes the bulk operation. Make sure SetWriteModel (or
// SetWriteModelFunc) has been called, otherwise an error is returned.
func (bws *BulkWriteScope) Do() (*mongo.BulkWriteResult, error) {
	defer bws.doClear()
	bws.checkModel()
	bws.preCheck()
	if bws.err != nil {
		return nil, bws.err
	}
	// if chunkSize is set, write in chunks; note that only the result and
	// error of the last processed chunk are retained
	if bws.chunkSize > 0 {
		models := bws.splitBulkWrite(bws.models, int(bws.chunkSize))
		for _, model := range models {
			bws.models = model
			if bws.chunkFunc != nil {
				if bws.err = bws.chunkFunc(bws.models); bws.err != nil {
					continue
				}
			}
			bws.doBulkWrite()
		}
	} else {
		bws.doBulkWrite()
	}
	if bws.err != nil {
		return nil, bws.err
	}
	return bws.result, nil
}
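
// The function below is a minimal usage sketch, not part of the package API:
// it assumes the caller has already obtained a *BulkWriteScope from the
// surrounding package (how that scope is constructed is outside this file),
// and the document shapes and field names are purely illustrative.
func exampleBulkWriteUsage(ctx context.Context, bws *BulkWriteScope) error {
	// Build a mix of insert and upsert models using the official driver helpers.
	models := []mongo.WriteModel{
		mongo.NewInsertOneModel().SetDocument(bson.M{"name": "alice", "age": 30}),
		mongo.NewUpdateOneModel().
			SetFilter(bson.M{"name": "bob"}).
			SetUpdate(bson.M{"$set": bson.M{"active": true}}).
			SetUpsert(true),
	}
	result, err := bws.
		SetContext(ctx).       // caller-supplied context instead of the breaker default
		SetOrdered(false).     // unordered: keep writing past per-document errors
		SetChunkSize(500).     // split the batch into chunks of at most 500 models
		SetWriteModel(models).
		Do()
	if err != nil {
		return err
	}
	// With chunking enabled, result reflects the last chunk written.
	fmt.Printf("inserted=%d upserted=%d\n", result.InsertedCount, result.UpsertedCount)
	return nil
}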