如何在Gorm中实现PostgreSQL的读写分离以优化数据库性能? | 客服服务营销数智化洞察_晓观点
       

如何在Gorm中实现PostgreSQL的读写分离以优化数据库性能?

之前发现pg的主库压力越来越大,虽然主从同步已经有了,但是一直没有引入读写分类,于是决定加入读写分离来分担主库的压力。接下来就分享下利用gorm实现读写分离。

配置文件

 "postgresql": {
        "test": "host=10.0.0.11 dbname=xxx port=5432 user=pguser password=xxx sslmode=disable",
        "test_slave":"host=10.0.0.11 dbname=xxx port=5432 user=pguser password=xxx sslmode=disable"
      },

主要代码

funcdbResolverDemo() {
    db, _ := gorm.Open(postgres.Open("test"), &gorm.Config{Logger: sqlLog})
    dbResolverCfg := dbresolver.Config{
       Sources:  []gorm.Dialector{postgres.Open("test")},
       Replicas: []gorm.Dialector{postgres.Open("test"), postgres.Open("test_slave")},
       Policy:   dbresolver.RandomPolicy{},
    }
    readWritePlugin := dbresolver.Register(dbResolverCfg)
    db.Use(readWritePlugin)
}

以上步骤已经实现了简单的读写分离。写入时只会写入到主库(test)。读取时会随机从主(test),从(test_slave)中读取数据。

DBResolver

gorm支持外挂插件,插件需要实现一个重要的方法Initialize;当执行db.Use(readWritePlugin)时会执行Initialize方法。DBResolver在Initialize中执行了dr.registerCallbacks(db)、dr.compile()。registerCallbacks的目的是在gorm注册sql路由器(包括switchSource,switchReplica,switchGuess)。这些路由器会根据读写策略(policy)将sql路由到pg实列。compile目的是根据配置初始化连接。

registerCallbacks

func (dr *DBResolver) registerCallbacks(db *gorm.DB) {
    dr.Callback().Create().Before("*").Register("gorm:db_resolver", dr.switchSource)
    dr.Callback().Query().Before("*").Register("gorm:db_resolver", dr.switchReplica)
    dr.Callback().Update().Before("*").Register("gorm:db_resolver", dr.switchSource)
    dr.Callback().Delete().Before("*").Register("gorm:db_resolver", dr.switchSource)
    dr.Callback().Row().Before("*").Register("gorm:db_resolver", dr.switchReplica)
    dr.Callback().Raw().Before("*").Register("gorm:db_resolver", dr.switchGuess)
}

func (dr *DBResolver) switchSource(db *gorm.DB) {
    if !isTransaction(db.Statement.ConnPool) {
       db.Statement.ConnPool = dr.resolve(db.Statement, Write)
    }
}

func (dr *DBResolver) switchReplica(db *gorm.DB) {
    if !isTransaction(db.Statement.ConnPool) {
       if rawSQL := db.Statement.SQL.String(); len(rawSQL) > 0 {
          dr.switchGuess(db)
       } else {
          _, locking := db.Statement.Clauses["FOR"]
          if _, ok := db.Statement.Settings.Load(writeName); ok || locking {
             db.Statement.ConnPool = dr.resolve(db.Statement, Write)
          } else {
             db.Statement.ConnPool = dr.resolve(db.Statement, Read)
          }
       }
    }
}

func (dr *DBResolver) switchGuess(db *gorm.DB) {
    if !isTransaction(db.Statement.ConnPool) {
       if _, ok := db.Statement.Settings.Load(writeName); ok {
          db.Statement.ConnPool = dr.resolve(db.Statement, Write)
       } else if _, ok := db.Statement.Settings.Load(readName); ok {
          db.Statement.ConnPool = dr.resolve(db.Statement, Read)
       } else if rawSQL := strings.TrimSpace(db.Statement.SQL.String()); len(rawSQL) > 10 && strings.EqualFold(rawSQL[:6], "select") && !strings.EqualFold(rawSQL[len(rawSQL)-10:], "for update") {
          db.Statement.ConnPool = dr.resolve(db.Statement, Read)
       } else {
          db.Statement.ConnPool = dr.resolve(db.Statement, Write)
       }
    }
}

func isTransaction(connPool gorm.ConnPool) bool {
    _, ok := connPool.(gorm.TxCommitter)
    return ok
}

registerCallbacks方法针对Create、Update、Delete方法注册了dr.switchSource;针对Query、Row注册了dr.switchReplica。swith* 方法实现了读写时的路由规则。比如写入时会从resource中选择连接,写入时会从repliaces中选择连接。当有多个repliaces实列时,根据policy来轮询。

compile

func (dr *DBResolver) compile() error {
    for _, config := range dr.configs {
       if err := dr.compileConfig(config); err != nil {
          return err
       }
    }
    return nil
}

func (dr *DBResolver) compileConfig(config Config) (err error) {
    var (
       connPool = dr.DB.Config.ConnPool
       r        = resolver{
          dbResolver:        dr,
          policy:            config.Policy,
          traceResolverMode: config.TraceResolverMode,
       }
    )

    if preparedStmtDB, ok := connPool.(*gorm.PreparedStmtDB); ok {
       connPool = preparedStmtDB.ConnPool
    }

    if len(config.Sources) == 0 {
       r.sources = []gorm.ConnPool{connPool}
    } else if r.sources, err = dr.convertToConnPool(config.Sources); err != nil {
       return err
    }

    if len(config.Replicas) == 0 {
       r.replicas = r.sources
    } else if r.replicas, err = dr.convertToConnPool(config.Replicas); err != nil {
       return err
    }
    }

compile方法遍历dr.configs,挨个执行dr.compileConfig(config),它会使用config.Policy创建resolver,resolver中的sources,repliaces则包含了连接池(gorm.ConnPool)。

Resolve

func (r *resolver) resolve(stmt *gorm.Statement, op Operation) (connPool gorm.ConnPool) {
    if op == Read {
       if len(r.replicas) == 1 {
          connPool = r.replicas[0]
       } else {
          connPool = r.policy.Resolve(r.replicas)
       }
       if r.traceResolverMode {
          markStmtResolverMode(stmt, ResolverModeReplica)
       }
    } else if len(r.sources) == 1 {
       connPool = r.sources[0]
       if r.traceResolverMode {
          markStmtResolverMode(stmt, ResolverModeSource)
       }
    } else {
       connPool = r.policy.Resolve(r.sources)
       if r.traceResolverMode {
          markStmtResolverMode(stmt, ResolverModeSource)
       }
    }

    if stmt.DB.PrepareStmt {
       if preparedStmt, ok := r.dbResolver.prepareStmtStore[connPool]; ok {
          return &gorm.PreparedStmtDB{
             ConnPool: connPool,
             Mux:      preparedStmt.Mux,
             Stmts:    preparedStmt.Stmts,
          }
       }
    }

    return
}

resolver的resolve在Operation为Read的时候,会使用r.replicas,若只有1个replica则直接返回,若有多个则使用r.policy.Resolve(r.replicas)选一个;若Operation为write时,判断sources,若只有一个sources,则直接返回,若有多个source则通过r.policy.Resolve(r.sources)选择。reslover方法会被路由方法调用(switch*)。

Policy

typePolicy interface{
    Resolve([]gorm.ConnPool) gorm.ConnPool
}

typeRandomPolicy struct{
}

func(RandomPolicy) Resolve(connPools []gorm.ConnPool) gorm.ConnPool {
    returnconnPools[rand.Intn(len(connPools))]
}

policy定义了reslove方法来选取数据源,默认是RandomPolicy。

执行过程

当我们调用Grom的方法时,首先会根据操作类型找到相应的callbacks,callbacks中的processor中注册了处理函数(文中注册的时switch*方法),然后执行switch*找到具体的数据源,最后执行sql。

延展阅读:

如何解决Qdrant性能瓶颈以提升向量数据库的QPS?

为什么要用Doris替换掉ClickHouse?

如何利用KtConnect简化Kubernetes开发调试?

如何利用Goroutine提升Go语言并发编程效率?

如何有效利用PostgreSQL的PITR技术保护客户数据完整性?

咨询方案 获取更多方案详情                        
(0)
研发专家-wu研发专家-wu
上一篇 2024年7月8日 下午3:02
下一篇 2024年7月22日 下午3:43

相关推荐