5 Star 36 Fork 19

springrain / dm

Create your Gitee Account
Explore and code with more than 12 million developers,Free private repositories !:)
Sign up
This repository doesn't specify license. Please pay attention to the specific project description and its upstream code dependency when using it.
Clone or Download
zc.go 11.03 KB
Copy Edit Web IDE Raw Blame History
springrain authored 2024-01-30 17:18 . v1.8.14 来自 达梦8.1.3.100
/*
* Copyright (c) 2000-2018, 达梦数据库有限公司.
* All rights reserved.
*/
package dm
import (
"context"
"database/sql/driver"
"reflect"
"sync"
"sync/atomic"
"time"
)
type filter interface {
DmDriverOpen(filterChain *filterChain, d *DmDriver, dsn string) (*DmConnection, error)
DmDriverOpenConnector(filterChain *filterChain, d *DmDriver, dsn string) (*DmConnector, error)
DmConnectorConnect(filterChain *filterChain, c *DmConnector, ctx context.Context) (*DmConnection, error)
DmConnectorDriver(filterChain *filterChain, c *DmConnector) *DmDriver
DmConnectionBegin(filterChain *filterChain, c *DmConnection) (*DmConnection, error)
DmConnectionBeginTx(filterChain *filterChain, c *DmConnection, ctx context.Context, opts driver.TxOptions) (*DmConnection, error)
DmConnectionCommit(filterChain *filterChain, c *DmConnection) error
DmConnectionRollback(filterChain *filterChain, c *DmConnection) error
DmConnectionClose(filterChain *filterChain, c *DmConnection) error
DmConnectionPing(filterChain *filterChain, c *DmConnection, ctx context.Context) error
DmConnectionExec(filterChain *filterChain, c *DmConnection, query string, args []driver.Value) (*DmResult, error)
DmConnectionExecContext(filterChain *filterChain, c *DmConnection, ctx context.Context, query string, args []driver.NamedValue) (*DmResult, error)
DmConnectionQuery(filterChain *filterChain, c *DmConnection, query string, args []driver.Value) (*DmRows, error)
DmConnectionQueryContext(filterChain *filterChain, c *DmConnection, ctx context.Context, query string, args []driver.NamedValue) (*DmRows, error)
DmConnectionPrepare(filterChain *filterChain, c *DmConnection, query string) (*DmStatement, error)
DmConnectionPrepareContext(filterChain *filterChain, c *DmConnection, ctx context.Context, query string) (*DmStatement, error)
DmConnectionResetSession(filterChain *filterChain, c *DmConnection, ctx context.Context) error
DmConnectionCheckNamedValue(filterChain *filterChain, c *DmConnection, nv *driver.NamedValue) error
DmStatementClose(filterChain *filterChain, s *DmStatement) error
DmStatementNumInput(filterChain *filterChain, s *DmStatement) int
DmStatementExec(filterChain *filterChain, s *DmStatement, args []driver.Value) (*DmResult, error)
DmStatementExecContext(filterChain *filterChain, s *DmStatement, ctx context.Context, args []driver.NamedValue) (*DmResult, error)
DmStatementQuery(filterChain *filterChain, s *DmStatement, args []driver.Value) (*DmRows, error)
DmStatementQueryContext(filterChain *filterChain, s *DmStatement, ctx context.Context, args []driver.NamedValue) (*DmRows, error)
DmStatementCheckNamedValue(filterChain *filterChain, s *DmStatement, nv *driver.NamedValue) error
DmResultLastInsertId(filterChain *filterChain, r *DmResult) (int64, error)
DmResultRowsAffected(filterChain *filterChain, r *DmResult) (int64, error)
DmRowsColumns(filterChain *filterChain, r *DmRows) []string
DmRowsClose(filterChain *filterChain, r *DmRows) error
DmRowsNext(filterChain *filterChain, r *DmRows, dest []driver.Value) error
DmRowsHasNextResultSet(filterChain *filterChain, r *DmRows) bool
DmRowsNextResultSet(filterChain *filterChain, r *DmRows) error
DmRowsColumnTypeScanType(filterChain *filterChain, r *DmRows, index int) reflect.Type
DmRowsColumnTypeDatabaseTypeName(filterChain *filterChain, r *DmRows, index int) string
DmRowsColumnTypeLength(filterChain *filterChain, r *DmRows, index int) (length int64, ok bool)
DmRowsColumnTypeNullable(filterChain *filterChain, r *DmRows, index int) (nullable, ok bool)
DmRowsColumnTypePrecisionScale(filterChain *filterChain, r *DmRows, index int) (precision, scale int64, ok bool)
}
type IDGenerator int64
var dmDriverIDGenerator = new(IDGenerator)
var dmConntorIDGenerator = new(IDGenerator)
var dmConnIDGenerator = new(IDGenerator)
var dmStmtIDGenerator = new(IDGenerator)
var dmResultIDGenerator = new(IDGenerator)
var dmRowsIDGenerator = new(IDGenerator)
func (g *IDGenerator) incrementAndGet() int64 {
return atomic.AddInt64((*int64)(g), 1)
}
type RWSiteEnum int
const (
PRIMARY RWSiteEnum = iota
STANDBY
ANYSITE
)
var (
goMapMu sync.RWMutex
goMap = make(map[string]goRun, 2)
)
type filterable struct {
filterChain *filterChain
rwInfo *rwInfo
logInfo *logInfo
recoverInfo *recoverInfo
statInfo *statInfo
objId int64
idGenerator *IDGenerator
}
func runLog() {
goMapMu.Lock()
_, ok := goMap["log"]
if !ok {
goMap["log"] = &logWriter{
flushQueue: make(chan []byte, LogFlushQueueSize),
date: time.Now().Format("2006-01-02"),
logFile: nil,
flushFreq: LogFlushFreq,
filePath: LogDir,
filePrefix: "dm_go",
buffer: Dm_build_935(),
}
go goMap["log"].doRun()
}
goMapMu.Unlock()
}
func runStat() {
goMapMu.Lock()
_, ok := goMap["stat"]
if !ok {
goMap["stat"] = newStatFlusher()
go goMap["stat"].doRun()
}
goMapMu.Unlock()
}
func (f *filterable) createFilterChain(bc *DmConnector, props *Properties) {
var filters = make([]filter, 0, 5)
if bc != nil {
if LogLevel != LOG_OFF {
filters = append(filters, &logFilter{})
f.logInfo = &logInfo{logRecord: new(LogRecord)}
runLog()
}
if StatEnable {
filters = append(filters, &statFilter{})
f.statInfo = newStatInfo()
goStatMu.Lock()
if goStat == nil {
goStat = newGoStat(1000)
}
goStatMu.Unlock()
runStat()
}
if bc.doSwitch != DO_SWITCH_OFF {
filters = append(filters, &reconnectFilter{})
f.recoverInfo = newRecoverInfo()
}
if bc.rwSeparate {
filters = append(filters, &rwFilter{})
f.rwInfo = newRwInfo()
}
} else if props != nil {
if ParseLogLevel(props) != LOG_OFF {
filters = append(filters, &logFilter{})
f.logInfo = &logInfo{logRecord: new(LogRecord)}
runLog()
}
if props.GetBool("statEnable", StatEnable) {
filters = append(filters, &statFilter{})
f.statInfo = newStatInfo()
goStatMu.Lock()
if goStat == nil {
goStat = newGoStat(1000)
}
goStatMu.Unlock()
runStat()
}
if props.GetInt(DoSwitchKey, int(DO_SWITCH_OFF), 0, 2) != int(DO_SWITCH_OFF) {
filters = append(filters, &reconnectFilter{})
f.recoverInfo = newRecoverInfo()
}
if props.GetBool("rwSeparate", false) {
filters = append(filters, &rwFilter{})
f.rwInfo = newRwInfo()
}
}
f.filterChain = newFilterChain(filters)
}
func (f *filterable) resetFilterable(src *filterable) {
f.filterChain = src.filterChain
f.logInfo = src.logInfo
f.rwInfo = src.rwInfo
f.statInfo = src.statInfo
}
func (f *filterable) getID() int64 {
if f.objId < 0 {
f.objId = f.idGenerator.incrementAndGet()
}
return f.objId
}
type logInfo struct {
logRecord *LogRecord
lastExecuteStartNano time.Time
}
type rwInfo struct {
distribute RWSiteEnum
rwCounter *rwCounter
connStandby *DmConnection
connCurrent *DmConnection
tryRecoverTs int64
stmtStandby *DmStatement
stmtCurrent *DmStatement
readOnly bool
}
func newRwInfo() *rwInfo {
rwInfo := new(rwInfo)
rwInfo.distribute = PRIMARY
rwInfo.readOnly = true
return rwInfo
}
func (rwi *rwInfo) cleanup() {
rwi.distribute = PRIMARY
rwi.rwCounter = nil
rwi.connStandby = nil
rwi.connCurrent = nil
rwi.stmtStandby = nil
rwi.stmtCurrent = nil
}
func (rwi *rwInfo) toPrimary() RWSiteEnum {
if rwi.distribute != PRIMARY {
rwi.rwCounter.countPrimary()
}
rwi.distribute = PRIMARY
return rwi.distribute
}
func (rwi *rwInfo) toAny() RWSiteEnum {
rwi.distribute = rwi.rwCounter.count(ANYSITE, rwi.connStandby)
return rwi.distribute
}
type recoverInfo struct {
checkEpRecoverTs int64
}
func newRecoverInfo() *recoverInfo {
recoverInfo := new(recoverInfo)
recoverInfo.checkEpRecoverTs = 0
return recoverInfo
}
type statInfo struct {
constructNano int64
connStat *connectionStat
lastExecuteStartNano int64
lastExecuteTimeNano int64
lastExecuteType ExecuteTypeEnum
firstResultSet bool
lastExecuteSql string
sqlStat *sqlStat
sql string
cursorIndex int
closeCount int
readStringLength int64
readBytesLength int64
openInputStreamCount int
openReaderCount int
}
var (
goStatMu sync.RWMutex
goStat *GoStat
)
func newStatInfo() *statInfo {
si := new(statInfo)
return si
}
func (si *statInfo) init(conn *DmConnection) {
si.connStat = goStat.createConnStat(conn)
}
func (si *statInfo) setConstructNano() {
si.constructNano = time.Now().UnixNano()
}
func (si *statInfo) getConstructNano() int64 {
return si.constructNano
}
func (si *statInfo) getConnStat() *connectionStat {
return si.connStat
}
func (si *statInfo) getLastExecuteStartNano() int64 {
return si.lastExecuteStartNano
}
func (si *statInfo) setLastExecuteStartNano(lastExecuteStartNano int64) {
si.lastExecuteStartNano = lastExecuteStartNano
}
func (si *statInfo) getLastExecuteTimeNano() int64 {
return si.lastExecuteTimeNano
}
func (si *statInfo) setLastExecuteTimeNano(lastExecuteTimeNano int64) {
si.lastExecuteTimeNano = lastExecuteTimeNano
}
func (si *statInfo) getLastExecuteType() ExecuteTypeEnum {
return si.lastExecuteType
}
func (si *statInfo) setLastExecuteType(lastExecuteType ExecuteTypeEnum) {
si.lastExecuteType = lastExecuteType
}
func (si *statInfo) isFirstResultSet() bool {
return si.firstResultSet
}
func (si *statInfo) setFirstResultSet(firstResultSet bool) {
si.firstResultSet = firstResultSet
}
func (si *statInfo) getLastExecuteSql() string {
return si.lastExecuteSql
}
func (si *statInfo) setLastExecuteSql(lastExecuteSql string) {
si.lastExecuteSql = lastExecuteSql
}
func (si *statInfo) getSqlStat() *sqlStat {
return si.sqlStat
}
func (si *statInfo) setSqlStat(sqlStat *sqlStat) {
si.sqlStat = sqlStat
}
func (si *statInfo) setConnStat(connStat *connectionStat) {
si.connStat = connStat
}
func (si *statInfo) setConstructNanoWithConstructNano(constructNano int64) {
si.constructNano = constructNano
}
func (si *statInfo) afterExecute(nanoSpan int64) {
si.lastExecuteTimeNano = nanoSpan
}
func (si *statInfo) beforeExecute() {
si.lastExecuteStartNano = time.Now().UnixNano()
}
func (si *statInfo) getSql() string {
return si.sql
}
func (si *statInfo) setSql(sql string) {
si.sql = sql
}
func (si *statInfo) getCursorIndex() int {
return si.cursorIndex
}
func (si *statInfo) setCursorIndex(cursorIndex int) {
si.cursorIndex = cursorIndex
}
func (si *statInfo) getCloseCount() int {
return si.closeCount
}
func (si *statInfo) setCloseCount(closeCount int) {
si.closeCount = closeCount
}
func (si *statInfo) getReadStringLength() int64 {
return si.readStringLength
}
func (si *statInfo) setReadStringLength(readStringLength int64) {
si.readStringLength = readStringLength
}
func (si *statInfo) getReadBytesLength() int64 {
return si.readBytesLength
}
func (si *statInfo) setReadBytesLength(readBytesLength int64) {
si.readBytesLength = readBytesLength
}
func (si *statInfo) getOpenInputStreamCount() int {
return si.openInputStreamCount
}
func (si *statInfo) setOpenInputStreamCount(openInputStreamCount int) {
si.openInputStreamCount = openInputStreamCount
}
func (si *statInfo) getOpenReaderCount() int {
return si.openReaderCount
}
func (si *statInfo) setOpenReaderCount(openReaderCount int) {
si.openReaderCount = openReaderCount
}
func (si *statInfo) incrementCloseCount() {
si.closeCount++
}
Go
1
https://gitee.com/chunanyong/dm.git
git@gitee.com:chunanyong/dm.git
chunanyong
dm
dm
master

Search