之前发现pg的主库压力越来越大,虽然主从同步已经有了,但是一直没有引入读写分类,于是决定加入读写分离来分担主库的压力。接下来就分享下利用gorm实现读写分离。
配置文件
"postgresql": {
"test": "host=10.0.0.11 dbname=xxx port=5432 user=pguser password=xxx sslmode=disable",
"test_slave":"host=10.0.0.11 dbname=xxx port=5432 user=pguser password=xxx sslmode=disable"
},
主要代码
funcdbResolverDemo() {
db, _ := gorm.Open(postgres.Open("test"), &gorm.Config{Logger: sqlLog})
dbResolverCfg := dbresolver.Config{
Sources: []gorm.Dialector{postgres.Open("test")},
Replicas: []gorm.Dialector{postgres.Open("test"), postgres.Open("test_slave")},
Policy: dbresolver.RandomPolicy{},
}
readWritePlugin := dbresolver.Register(dbResolverCfg)
db.Use(readWritePlugin)
}
以上步骤已经实现了简单的读写分离。写入时只会写入到主库(test)。读取时会随机从主(test),从(test_slave)中读取数据。
DBResolver
gorm支持外挂插件,插件需要实现一个重要的方法Initialize;当执行db.Use(readWritePlugin)时会执行Initialize方法。DBResolver在Initialize中执行了dr.registerCallbacks(db)、dr.compile()。registerCallbacks的目的是在gorm注册sql路由器(包括switchSource,switchReplica,switchGuess)。这些路由器会根据读写策略(policy)将sql路由到pg实列。compile目的是根据配置初始化连接。
registerCallbacks
func (dr *DBResolver) registerCallbacks(db *gorm.DB) {
dr.Callback().Create().Before("*").Register("gorm:db_resolver", dr.switchSource)
dr.Callback().Query().Before("*").Register("gorm:db_resolver", dr.switchReplica)
dr.Callback().Update().Before("*").Register("gorm:db_resolver", dr.switchSource)
dr.Callback().Delete().Before("*").Register("gorm:db_resolver", dr.switchSource)
dr.Callback().Row().Before("*").Register("gorm:db_resolver", dr.switchReplica)
dr.Callback().Raw().Before("*").Register("gorm:db_resolver", dr.switchGuess)
}
func (dr *DBResolver) switchSource(db *gorm.DB) {
if !isTransaction(db.Statement.ConnPool) {
db.Statement.ConnPool = dr.resolve(db.Statement, Write)
}
}
func (dr *DBResolver) switchReplica(db *gorm.DB) {
if !isTransaction(db.Statement.ConnPool) {
if rawSQL := db.Statement.SQL.String(); len(rawSQL) > 0 {
dr.switchGuess(db)
} else {
_, locking := db.Statement.Clauses["FOR"]
if _, ok := db.Statement.Settings.Load(writeName); ok || locking {
db.Statement.ConnPool = dr.resolve(db.Statement, Write)
} else {
db.Statement.ConnPool = dr.resolve(db.Statement, Read)
}
}
}
}
func (dr *DBResolver) switchGuess(db *gorm.DB) {
if !isTransaction(db.Statement.ConnPool) {
if _, ok := db.Statement.Settings.Load(writeName); ok {
db.Statement.ConnPool = dr.resolve(db.Statement, Write)
} else if _, ok := db.Statement.Settings.Load(readName); ok {
db.Statement.ConnPool = dr.resolve(db.Statement, Read)
} else if rawSQL := strings.TrimSpace(db.Statement.SQL.String()); len(rawSQL) > 10 && strings.EqualFold(rawSQL[:6], "select") && !strings.EqualFold(rawSQL[len(rawSQL)-10:], "for update") {
db.Statement.ConnPool = dr.resolve(db.Statement, Read)
} else {
db.Statement.ConnPool = dr.resolve(db.Statement, Write)
}
}
}
func isTransaction(connPool gorm.ConnPool) bool {
_, ok := connPool.(gorm.TxCommitter)
return ok
}
registerCallbacks方法针对Create、Update、Delete方法注册了dr.switchSource;针对Query、Row注册了dr.switchReplica。swith* 方法实现了读写时的路由规则。比如写入时会从resource中选择连接,写入时会从repliaces中选择连接。当有多个repliaces实列时,根据policy来轮询。
compile
func (dr *DBResolver) compile() error {
for _, config := range dr.configs {
if err := dr.compileConfig(config); err != nil {
return err
}
}
return nil
}
func (dr *DBResolver) compileConfig(config Config) (err error) {
var (
connPool = dr.DB.Config.ConnPool
r = resolver{
dbResolver: dr,
policy: config.Policy,
traceResolverMode: config.TraceResolverMode,
}
)
if preparedStmtDB, ok := connPool.(*gorm.PreparedStmtDB); ok {
connPool = preparedStmtDB.ConnPool
}
if len(config.Sources) == 0 {
r.sources = []gorm.ConnPool{connPool}
} else if r.sources, err = dr.convertToConnPool(config.Sources); err != nil {
return err
}
if len(config.Replicas) == 0 {
r.replicas = r.sources
} else if r.replicas, err = dr.convertToConnPool(config.Replicas); err != nil {
return err
}
}
compile方法遍历dr.configs,挨个执行dr.compileConfig(config),它会使用config.Policy创建resolver,resolver中的sources,repliaces则包含了连接池(gorm.ConnPool)。
Resolve
func (r *resolver) resolve(stmt *gorm.Statement, op Operation) (connPool gorm.ConnPool) {
if op == Read {
if len(r.replicas) == 1 {
connPool = r.replicas[0]
} else {
connPool = r.policy.Resolve(r.replicas)
}
if r.traceResolverMode {
markStmtResolverMode(stmt, ResolverModeReplica)
}
} else if len(r.sources) == 1 {
connPool = r.sources[0]
if r.traceResolverMode {
markStmtResolverMode(stmt, ResolverModeSource)
}
} else {
connPool = r.policy.Resolve(r.sources)
if r.traceResolverMode {
markStmtResolverMode(stmt, ResolverModeSource)
}
}
if stmt.DB.PrepareStmt {
if preparedStmt, ok := r.dbResolver.prepareStmtStore[connPool]; ok {
return &gorm.PreparedStmtDB{
ConnPool: connPool,
Mux: preparedStmt.Mux,
Stmts: preparedStmt.Stmts,
}
}
}
return
}
resolver的resolve在Operation为Read的时候,会使用r.replicas,若只有1个replica则直接返回,若有多个则使用r.policy.Resolve(r.replicas)选一个;若Operation为write时,判断sources,若只有一个sources,则直接返回,若有多个source则通过r.policy.Resolve(r.sources)选择。reslover方法会被路由方法调用(switch*)。
Policy
typePolicy interface{
Resolve([]gorm.ConnPool) gorm.ConnPool
}
typeRandomPolicy struct{
}
func(RandomPolicy) Resolve(connPools []gorm.ConnPool) gorm.ConnPool {
returnconnPools[rand.Intn(len(connPools))]
}
policy定义了reslove方法来选取数据源,默认是RandomPolicy。
执行过程
当我们调用Grom的方法时,首先会根据操作类型找到相应的callbacks,callbacks中的processor中注册了处理函数(文中注册的时switch*方法),然后执行switch*找到具体的数据源,最后执行sql。
延展阅读:
如何利用KtConnect简化Kubernetes开发调试?
如何有效利用PostgreSQL的PITR技术保护客户数据完整性?
免费试用 更多热门智能应用