201 lines
5.9 KiB
Go
201 lines
5.9 KiB
Go
|
package mdbc
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"time"
|
||
|
|
||
|
"go.mongodb.org/mongo-driver/mongo"
|
||
|
"go.mongodb.org/mongo-driver/mongo/options"
|
||
|
"go.mongodb.org/mongo-driver/mongo/readpref"
|
||
|
)
|
||
|
|
||
|
type ClientInit struct {
|
||
|
*mongo.Client
|
||
|
}
|
||
|
|
||
|
var (
|
||
|
ci *ClientInit
|
||
|
)
|
||
|
|
||
|
type Database struct {
|
||
|
*mongo.Database
|
||
|
dbname string
|
||
|
}
|
||
|
|
||
|
type Collection struct {
|
||
|
*mongo.Collection
|
||
|
dbname string
|
||
|
colname string
|
||
|
}
|
||
|
|
||
|
//ConnInit 初始化mongo
|
||
|
func ConnInit(config *Config) (*ClientInit, error) {
|
||
|
if config == nil {
|
||
|
return nil, fmt.Errorf("config nil")
|
||
|
}
|
||
|
if config.URI == "" {
|
||
|
return nil, fmt.Errorf("empty uri")
|
||
|
}
|
||
|
if config.MinPoolSize == 0 {
|
||
|
config.MinPoolSize = 1
|
||
|
}
|
||
|
if config.MaxPoolSize == 0 {
|
||
|
config.MaxPoolSize = 32
|
||
|
}
|
||
|
var timeout time.Duration
|
||
|
if config.ConnTimeout == 0 {
|
||
|
config.ConnTimeout = 10
|
||
|
}
|
||
|
timeout = time.Duration(config.ConnTimeout) * time.Second
|
||
|
if config.ReadPreference == nil {
|
||
|
config.ReadPreference = readpref.PrimaryPreferred()
|
||
|
}
|
||
|
|
||
|
op := options.Client().ApplyURI(config.URI).SetMinPoolSize(config.MinPoolSize).
|
||
|
SetMaxPoolSize(config.MaxPoolSize).SetConnectTimeout(timeout).
|
||
|
SetReadPreference(config.ReadPreference)
|
||
|
|
||
|
if config.RegistryBuilder != nil {
|
||
|
op.SetRegistry(config.RegistryBuilder.Build())
|
||
|
}
|
||
|
|
||
|
c, err := mongo.NewClient(op)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
var ctx = context.Background()
|
||
|
err = c.Connect(ctx)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
err = c.Ping(ctx, readpref.Primary())
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
ci = &ClientInit{c}
|
||
|
return ci, nil
|
||
|
}
|
||
|
|
||
|
func (c *ClientInit) Database(dbname string, opts ...*options.DatabaseOptions) *Database {
|
||
|
db := c.Client.Database(dbname, opts...)
|
||
|
return &Database{db, dbname}
|
||
|
}
|
||
|
|
||
|
func (db *Database) Collection(collection string, opts ...*options.CollectionOptions) *Collection {
|
||
|
col := db.Database.Collection(collection, opts...)
|
||
|
return &Collection{col, db.dbname, collection}
|
||
|
}
|
||
|
|
||
|
func (col *Collection) InsertOne(ctx context.Context, document interface{},
|
||
|
opts ...*options.InsertOneOptions) (*mongo.InsertOneResult, error) {
|
||
|
res, err := col.Collection.InsertOne(ctx, document, opts...)
|
||
|
return res, err
|
||
|
}
|
||
|
|
||
|
func (col *Collection) InsertMany(ctx context.Context, documents []interface{},
|
||
|
opts ...*options.InsertManyOptions) (*mongo.InsertManyResult, error) {
|
||
|
res, err := col.Collection.InsertMany(ctx, documents, opts...)
|
||
|
return res, err
|
||
|
}
|
||
|
|
||
|
func (col *Collection) DeleteOne(ctx context.Context, filter interface{},
|
||
|
opts ...*options.DeleteOptions) (*mongo.DeleteResult, error) {
|
||
|
res, err := col.Collection.DeleteOne(ctx, filter, opts...)
|
||
|
return res, err
|
||
|
}
|
||
|
|
||
|
func (col *Collection) DeleteMany(ctx context.Context, filter interface{},
|
||
|
opts ...*options.DeleteOptions) (*mongo.DeleteResult, error) {
|
||
|
res, err := col.Collection.DeleteMany(ctx, filter, opts...)
|
||
|
return res, err
|
||
|
}
|
||
|
|
||
|
func (col *Collection) UpdateOne(ctx context.Context, filter interface{}, update interface{},
|
||
|
opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) {
|
||
|
res, err := col.Collection.UpdateOne(ctx, filter, update, opts...)
|
||
|
return res, err
|
||
|
}
|
||
|
|
||
|
func (col *Collection) UpdateMany(ctx context.Context, filter interface{}, update interface{},
|
||
|
opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) {
|
||
|
res, err := col.Collection.UpdateMany(ctx, filter, update, opts...)
|
||
|
return res, err
|
||
|
}
|
||
|
|
||
|
func (col *Collection) ReplaceOne(ctx context.Context, filter interface{},
|
||
|
replacement interface{}, opts ...*options.ReplaceOptions) (*mongo.UpdateResult, error) {
|
||
|
res, err := col.Collection.ReplaceOne(ctx, filter, replacement, opts...)
|
||
|
return res, err
|
||
|
}
|
||
|
|
||
|
func (col *Collection) Aggregate(ctx context.Context, pipeline interface{},
|
||
|
opts ...*options.AggregateOptions) (*mongo.Cursor, error) {
|
||
|
res, err := col.Collection.Aggregate(ctx, pipeline, opts...)
|
||
|
return res, err
|
||
|
}
|
||
|
|
||
|
func (col *Collection) CountDocuments(ctx context.Context, filter interface{},
|
||
|
opts ...*options.CountOptions) (int64, error) {
|
||
|
res, err := col.Collection.CountDocuments(ctx, filter, opts...)
|
||
|
return res, err
|
||
|
}
|
||
|
|
||
|
func (col *Collection) Distinct(ctx context.Context, fieldName string, filter interface{},
|
||
|
opts ...*options.DistinctOptions) ([]interface{}, error) {
|
||
|
res, err := col.Collection.Distinct(ctx, fieldName, filter, opts...)
|
||
|
return res, err
|
||
|
}
|
||
|
|
||
|
func (col *Collection) Find(ctx context.Context, filter interface{},
|
||
|
opts ...*options.FindOptions) (*mongo.Cursor, error) {
|
||
|
res, err := col.Collection.Find(ctx, filter, opts...)
|
||
|
return res, err
|
||
|
}
|
||
|
|
||
|
func (col *Collection) FindOne(ctx context.Context, filter interface{},
|
||
|
opts ...*options.FindOneOptions) *mongo.SingleResult {
|
||
|
res := col.Collection.FindOne(ctx, filter, opts...)
|
||
|
return res
|
||
|
}
|
||
|
|
||
|
func (col *Collection) FindOneAndDelete(ctx context.Context, filter interface{},
|
||
|
opts ...*options.FindOneAndDeleteOptions) *mongo.SingleResult {
|
||
|
res := col.Collection.FindOneAndDelete(ctx, filter, opts...)
|
||
|
return res
|
||
|
}
|
||
|
|
||
|
func (col *Collection) FindOneAndReplace(ctx context.Context, filter interface{},
|
||
|
replacement interface{}, opts ...*options.FindOneAndReplaceOptions) *mongo.SingleResult {
|
||
|
res := col.Collection.FindOneAndReplace(ctx, filter, replacement, opts...)
|
||
|
return res
|
||
|
}
|
||
|
|
||
|
func (col *Collection) FindOneAndUpdate(ctx context.Context, filter interface{},
|
||
|
update interface{}, opts ...*options.FindOneAndUpdateOptions) *mongo.SingleResult {
|
||
|
res := col.Collection.FindOneAndUpdate(ctx, filter, update, opts...)
|
||
|
return res
|
||
|
}
|
||
|
|
||
|
func (col *Collection) Watch(ctx context.Context, pipeline interface{},
|
||
|
opts ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error) {
|
||
|
res, err := col.Collection.Watch(ctx, pipeline, opts...)
|
||
|
return res, err
|
||
|
}
|
||
|
|
||
|
func (col *Collection) Indexes(ctx context.Context) mongo.IndexView {
|
||
|
res := col.Collection.Indexes()
|
||
|
return res
|
||
|
}
|
||
|
|
||
|
func (col *Collection) Drop(ctx context.Context) error {
|
||
|
err := col.Collection.Drop(ctx)
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func (col *Collection) BulkWrite(ctx context.Context, models []mongo.WriteModel,
|
||
|
opts ...*options.BulkWriteOptions) (*mongo.BulkWriteResult, error) {
|
||
|
res, err := col.Collection.BulkWrite(ctx, models, opts...)
|
||
|
return res, err
|
||
|
}
|