Files

876 lines
18 KiB
Go

/*
* Copyright (c) 2000-2018, 达梦数据库有限公司.
* All rights reserved.
*/
package dm
import (
"bytes"
"context"
"database/sql"
"database/sql/driver"
"fmt"
"sync"
"sync/atomic"
"gitee.com/chunanyong/dm/parser"
"gitee.com/chunanyong/dm/util"
"golang.org/x/text/encoding"
)
type DmConnection struct {
filterable
mu sync.Mutex
dmConnector *DmConnector
Access *dm_build_1345
stmtMap map[int32]*DmStatement
lastExecInfo *execRetInfo
lexer *parser.Lexer
encode encoding.Encoding
encodeBuffer *bytes.Buffer
transformReaderDst []byte
transformReaderSrc []byte
serverEncoding string
GlobalServerSeries int
ServerVersion string
Malini2 bool
Execute2 bool
LobEmptyCompOrcl bool
IsoLevel int32
ReadOnly bool
NewLobFlag bool
sslEncrypt int
MaxRowSize int32
DDLAutoCommit bool
BackslashEscape bool
SvrStat int32
SvrMode int32
ConstParaOpt bool
DbTimezone int16
LifeTimeRemainder int16
InstanceName string
Schema string
LastLoginIP string
LastLoginTime string
FailedAttempts int32
LoginWarningID int32
GraceTimeRemainder int32
Guid string
DbName string
StandbyHost string
StandbyPort int32
StandbyCount int32
SessionID int64
OracleDateLanguage byte
FormatDate string
FormatTimestamp string
FormatTimestampTZ string
FormatTime string
FormatTimeTZ string
Local bool
MsgVersion int32
TrxStatus int32
dscControl bool
trxFinish bool
autoCommit bool
isBatch bool
watching bool
watcher chan<- context.Context
closech chan struct{}
finished chan<- struct{}
canceled atomicError
closed atomicBool
}
func (conn *DmConnection) setTrxFinish(status int32) {
switch status & Dm_build_132 {
case Dm_build_129, Dm_build_130, Dm_build_131:
conn.trxFinish = true
default:
conn.trxFinish = false
}
}
func (dmConn *DmConnection) init() {
dmConn.stmtMap = make(map[int32]*DmStatement)
dmConn.DbTimezone = 0
dmConn.GlobalServerSeries = 0
dmConn.MaxRowSize = 0
dmConn.LobEmptyCompOrcl = false
dmConn.ReadOnly = false
dmConn.DDLAutoCommit = false
dmConn.ConstParaOpt = false
dmConn.IsoLevel = -1
dmConn.Malini2 = true
dmConn.NewLobFlag = true
dmConn.Execute2 = true
dmConn.serverEncoding = ENCODING_GB18030
dmConn.TrxStatus = Dm_build_80
dmConn.setTrxFinish(dmConn.TrxStatus)
dmConn.OracleDateLanguage = byte(Locale)
dmConn.lastExecInfo = NewExceInfo()
dmConn.MsgVersion = Dm_build_13
dmConn.idGenerator = dmConnIDGenerator
}
func (dmConn *DmConnection) reset() {
dmConn.DbTimezone = 0
dmConn.GlobalServerSeries = 0
dmConn.MaxRowSize = 0
dmConn.LobEmptyCompOrcl = false
dmConn.ReadOnly = false
dmConn.DDLAutoCommit = false
dmConn.ConstParaOpt = false
dmConn.IsoLevel = -1
dmConn.Malini2 = true
dmConn.NewLobFlag = true
dmConn.Execute2 = true
dmConn.serverEncoding = ENCODING_GB18030
dmConn.TrxStatus = Dm_build_80
dmConn.setTrxFinish(dmConn.TrxStatus)
}
func (dc *DmConnection) checkClosed() error {
if dc.closed.IsSet() {
return driver.ErrBadConn
}
return nil
}
func (dc *DmConnection) executeInner(query string, execType int16) (interface{}, error) {
stmt, err := NewDmStmt(dc, query)
if err != nil {
return nil, err
}
if execType == Dm_build_97 {
defer stmt.close()
}
stmt.innerUsed = true
if stmt.dmConn.dmConnector.escapeProcess {
stmt.nativeSql, err = stmt.dmConn.escape(stmt.nativeSql, stmt.dmConn.dmConnector.keyWords)
if err != nil {
stmt.close()
return nil, err
}
}
var optParamList []OptParameter
if stmt.dmConn.ConstParaOpt {
optParamList = make([]OptParameter, 0)
stmt.nativeSql, optParamList, err = stmt.dmConn.execOpt(stmt.nativeSql, optParamList, stmt.dmConn.getServerEncoding())
if err != nil {
stmt.close()
optParamList = nil
}
}
if execType == Dm_build_96 && dc.dmConnector.enRsCache {
rpv, err := rp.get(stmt, query)
if err != nil {
return nil, err
}
if rpv != nil {
stmt.execInfo = rpv.execInfo
dc.lastExecInfo = rpv.execInfo
return newDmRows(rpv.getResultSet(stmt)), nil
}
}
var info *execRetInfo
if optParamList != nil && len(optParamList) > 0 {
info, err = dc.Access.Dm_build_1428(stmt, optParamList)
if err != nil {
stmt.nativeSql = query
info, err = dc.Access.Dm_build_1434(stmt, execType)
}
} else {
info, err = dc.Access.Dm_build_1434(stmt, execType)
}
if err != nil {
stmt.close()
return nil, err
}
dc.lastExecInfo = info
if execType == Dm_build_96 && info.hasResultSet {
return newDmRows(newInnerRows(0, stmt, info)), nil
} else {
return newDmResult(stmt, info), nil
}
}
func g2dbIsoLevel(isoLevel int32) int32 {
switch isoLevel {
case 1:
return Dm_build_84
case 2:
return Dm_build_85
case 4:
return Dm_build_86
case 6:
return Dm_build_87
default:
return -1
}
}
func (dc *DmConnection) Begin() (driver.Tx, error) {
if len(dc.filterChain.filters) == 0 {
return dc.begin()
} else {
return dc.filterChain.reset().DmConnectionBegin(dc)
}
}
func (dc *DmConnection) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
if len(dc.filterChain.filters) == 0 {
return dc.beginTx(ctx, opts)
}
return dc.filterChain.reset().DmConnectionBeginTx(dc, ctx, opts)
}
func (dc *DmConnection) Commit() error {
if len(dc.filterChain.filters) == 0 {
return dc.commit()
} else {
return dc.filterChain.reset().DmConnectionCommit(dc)
}
}
func (dc *DmConnection) Rollback() error {
if len(dc.filterChain.filters) == 0 {
return dc.rollback()
} else {
return dc.filterChain.reset().DmConnectionRollback(dc)
}
}
func (dc *DmConnection) Close() error {
if len(dc.filterChain.filters) == 0 {
return dc.close()
} else {
return dc.filterChain.reset().DmConnectionClose(dc)
}
}
func (dc *DmConnection) Ping(ctx context.Context) error {
if len(dc.filterChain.filters) == 0 {
return dc.ping(ctx)
} else {
return dc.filterChain.reset().DmConnectionPing(dc, ctx)
}
}
func (dc *DmConnection) Exec(query string, args []driver.Value) (driver.Result, error) {
if len(dc.filterChain.filters) == 0 {
return dc.exec(query, args)
}
return dc.filterChain.reset().DmConnectionExec(dc, query, args)
}
func (dc *DmConnection) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
if len(dc.filterChain.filters) == 0 {
return dc.execContext(ctx, query, args)
}
return dc.filterChain.reset().DmConnectionExecContext(dc, ctx, query, args)
}
func (dc *DmConnection) Query(query string, args []driver.Value) (driver.Rows, error) {
if len(dc.filterChain.filters) == 0 {
return dc.query(query, args)
}
return dc.filterChain.reset().DmConnectionQuery(dc, query, args)
}
func (dc *DmConnection) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
if len(dc.filterChain.filters) == 0 {
return dc.queryContext(ctx, query, args)
}
return dc.filterChain.reset().DmConnectionQueryContext(dc, ctx, query, args)
}
func (dc *DmConnection) Prepare(query string) (driver.Stmt, error) {
if len(dc.filterChain.filters) == 0 {
return dc.prepare(query)
}
return dc.filterChain.reset().DmConnectionPrepare(dc, query)
}
func (dc *DmConnection) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
if len(dc.filterChain.filters) == 0 {
return dc.prepareContext(ctx, query)
}
return dc.filterChain.reset().DmConnectionPrepareContext(dc, ctx, query)
}
func (dc *DmConnection) ResetSession(ctx context.Context) error {
if len(dc.filterChain.filters) == 0 {
return dc.resetSession(ctx)
}
if err := dc.filterChain.reset().DmConnectionResetSession(dc, ctx); err != nil {
return driver.ErrBadConn
} else {
return nil
}
}
func (dc *DmConnection) CheckNamedValue(nv *driver.NamedValue) error {
if len(dc.filterChain.filters) == 0 {
return dc.checkNamedValue(nv)
}
return dc.filterChain.reset().DmConnectionCheckNamedValue(dc, nv)
}
func (dc *DmConnection) begin() (*DmConnection, error) {
return dc.beginTx(context.Background(), driver.TxOptions{driver.IsolationLevel(sql.LevelDefault), false})
}
func (dc *DmConnection) beginTx(ctx context.Context, opts driver.TxOptions) (*DmConnection, error) {
if err := dc.watchCancel(ctx); err != nil {
return nil, err
}
defer dc.finish()
err := dc.checkClosed()
if err != nil {
return nil, err
}
dc.autoCommit = false
if dc.ReadOnly != opts.ReadOnly {
dc.ReadOnly = opts.ReadOnly
var readonly = 0
if opts.ReadOnly {
readonly = 1
}
dc.exec(fmt.Sprintf("SP_SET_SESSION_READONLY(%d)", readonly), nil)
}
if dc.IsoLevel != int32(opts.Isolation) {
switch sql.IsolationLevel(opts.Isolation) {
case sql.LevelDefault:
dc.IsoLevel = int32(sql.LevelReadCommitted)
case sql.LevelReadUncommitted, sql.LevelReadCommitted, sql.LevelSerializable:
dc.IsoLevel = int32(opts.Isolation)
case sql.LevelRepeatableRead:
if dc.CompatibleMysql() {
dc.IsoLevel = int32(sql.LevelReadCommitted)
} else {
return nil, ECGO_INVALID_TRAN_ISOLATION.throw()
}
default:
return nil, ECGO_INVALID_TRAN_ISOLATION.throw()
}
err = dc.Access.Dm_build_1488(dc)
if err != nil {
return nil, err
}
}
return dc, nil
}
func (dc *DmConnection) commit() error {
err := dc.checkClosed()
if err != nil {
return err
}
defer func() {
dc.autoCommit = dc.dmConnector.autoCommit
if dc.ReadOnly {
dc.exec("SP_SET_SESSION_READONLY(0)", nil)
}
}()
if !dc.autoCommit {
err = dc.Access.Commit()
if err != nil {
return err
}
dc.trxFinish = true
return nil
} else if !dc.dmConnector.alwayseAllowCommit {
return ECGO_COMMIT_IN_AUTOCOMMIT_MODE.throw()
}
return nil
}
func (dc *DmConnection) rollback() error {
err := dc.checkClosed()
if err != nil {
return err
}
defer func() {
dc.autoCommit = dc.dmConnector.autoCommit
if dc.ReadOnly {
dc.exec("SP_SET_SESSION_READONLY(0)", nil)
}
}()
if !dc.autoCommit {
err = dc.Access.Rollback()
if err != nil {
return err
}
dc.trxFinish = true
return nil
} else if !dc.dmConnector.alwayseAllowCommit {
return ECGO_ROLLBACK_IN_AUTOCOMMIT_MODE.throw()
}
return nil
}
func (dc *DmConnection) reconnect() error {
err := dc.Access.Close()
if err != nil {
return err
}
for _, stmt := range dc.stmtMap {
for id, rs := range stmt.rsMap {
rs.Close()
delete(stmt.rsMap, id)
}
}
var newConn *DmConnection
if dc.dmConnector.group != nil {
if newConn, err = dc.dmConnector.group.connect(dc.dmConnector); err != nil {
return err
}
} else {
newConn, err = dc.dmConnector.connect(context.Background())
}
oldMap := dc.stmtMap
newConn.mu = dc.mu
newConn.filterable = dc.filterable
*dc = *newConn
for _, stmt := range oldMap {
if stmt.closed {
continue
}
err = dc.Access.Dm_build_1406(stmt)
if err != nil {
stmt.free()
continue
}
if stmt.prepared || stmt.paramCount > 0 {
if err = stmt.prepare(); err != nil {
continue
}
}
dc.stmtMap[stmt.id] = stmt
}
return nil
}
func (dc *DmConnection) cleanup() {
dc.close()
}
func (dc *DmConnection) close() error {
if !dc.closed.TrySet(true) {
return nil
}
util.AbsorbPanic(func() {
close(dc.closech)
})
if dc.Access == nil {
return nil
}
dc.rollback()
for _, stmt := range dc.stmtMap {
stmt.free()
}
dc.Access.Close()
return nil
}
func (dc *DmConnection) ping(ctx context.Context) error {
if err := dc.watchCancel(ctx); err != nil {
return err
}
defer dc.finish()
rows, err := dc.query("select 1", nil)
if err != nil {
return err
}
return rows.close()
}
func (dc *DmConnection) exec(query string, args []driver.Value) (*DmResult, error) {
err := dc.checkClosed()
if err != nil {
return nil, err
}
if args != nil && len(args) > 0 {
stmt, err := dc.prepare(query)
if err != nil {
return nil, err
}
defer stmt.close()
dc.lastExecInfo = stmt.execInfo
return stmt.exec(args)
} else {
r1, err := dc.executeInner(query, Dm_build_97)
if err != nil {
return nil, err
}
if r2, ok := r1.(*DmResult); ok {
return r2, nil
} else {
return nil, ECGO_NOT_EXEC_SQL.throw()
}
}
}
func (dc *DmConnection) execContext(ctx context.Context, query string, args []driver.NamedValue) (*DmResult, error) {
if err := dc.watchCancel(ctx); err != nil {
return nil, err
}
defer dc.finish()
err := dc.checkClosed()
if err != nil {
return nil, err
}
if args != nil && len(args) > 0 {
stmt, err := dc.prepare(query)
if err != nil {
return nil, err
}
defer stmt.close()
dc.lastExecInfo = stmt.execInfo
dargs, err := namedValueToValue(stmt, args)
if err != nil {
return nil, err
}
return stmt.exec(dargs)
} else {
r1, err := dc.executeInner(query, Dm_build_97)
if err != nil {
return nil, err
}
if r2, ok := r1.(*DmResult); ok {
return r2, nil
} else {
return nil, ECGO_NOT_EXEC_SQL.throw()
}
}
}
func (dc *DmConnection) query(query string, args []driver.Value) (*DmRows, error) {
err := dc.checkClosed()
if err != nil {
return nil, err
}
if args != nil && len(args) > 0 {
stmt, err := dc.prepare(query)
if err != nil {
return nil, err
}
dc.lastExecInfo = stmt.execInfo
stmt.innerUsed = true
return stmt.query(args)
} else {
r1, err := dc.executeInner(query, Dm_build_96)
if err != nil {
return nil, err
}
if r2, ok := r1.(*DmRows); ok {
return r2, nil
} else {
return nil, ECGO_NOT_QUERY_SQL.throw()
}
}
}
func (dc *DmConnection) queryContext(ctx context.Context, query string, args []driver.NamedValue) (*DmRows, error) {
if err := dc.watchCancel(ctx); err != nil {
return nil, err
}
defer dc.finish()
err := dc.checkClosed()
if err != nil {
return nil, err
}
if args != nil && len(args) > 0 {
stmt, err := dc.prepare(query)
if err != nil {
return nil, err
}
dc.lastExecInfo = stmt.execInfo
stmt.innerUsed = true
dargs, err := namedValueToValue(stmt, args)
if err != nil {
return nil, err
}
return stmt.query(dargs)
} else {
r1, err := dc.executeInner(query, Dm_build_96)
if err != nil {
return nil, err
}
if r2, ok := r1.(*DmRows); ok {
return r2, nil
} else {
return nil, ECGO_NOT_QUERY_SQL.throw()
}
}
}
func (dc *DmConnection) prepare(query string) (stmt *DmStatement, err error) {
if err = dc.checkClosed(); err != nil {
return
}
if stmt, err = NewDmStmt(dc, query); err != nil {
return
}
if err = stmt.prepare(); err != nil {
stmt.close()
stmt = nil
return
}
return
}
func (dc *DmConnection) prepareContext(ctx context.Context, query string) (*DmStatement, error) {
if err := dc.watchCancel(ctx); err != nil {
return nil, err
}
defer dc.finish()
return dc.prepare(query)
}
func (dc *DmConnection) resetSession(ctx context.Context) error {
if err := dc.watchCancel(ctx); err != nil {
return err
}
defer dc.finish()
err := dc.checkClosed()
if err != nil {
return err
}
return nil
}
func (dc *DmConnection) checkNamedValue(nv *driver.NamedValue) error {
var err error
var cvt = converter{dc, false}
nv.Value, err = cvt.ConvertValue(nv.Value)
dc.isBatch = cvt.isBatch
return err
}
func (dc *DmConnection) driverQuery(query string) (*DmStatement, *DmRows, error) {
stmt, err := NewDmStmt(dc, query)
if err != nil {
return nil, nil, err
}
stmt.innerUsed = true
stmt.innerExec = true
info, err := dc.Access.Dm_build_1434(stmt, Dm_build_96)
if err != nil {
return nil, nil, err
}
dc.lastExecInfo = info
stmt.innerExec = false
return stmt, newDmRows(newInnerRows(0, stmt, info)), nil
}
func (dc *DmConnection) getIndexOnEPGroup() int32 {
if dc.dmConnector.group == nil || dc.dmConnector.group.epList == nil {
return -1
}
for i := 0; i < len(dc.dmConnector.group.epList); i++ {
ep := dc.dmConnector.group.epList[i]
if dc.dmConnector.host == ep.host && dc.dmConnector.port == ep.port {
return int32(i)
}
}
return -1
}
func (dc *DmConnection) getServerEncoding() string {
if dc.dmConnector.charCode != "" {
return dc.dmConnector.charCode
}
return dc.serverEncoding
}
func (dc *DmConnection) lobFetchAll() bool {
return dc.dmConnector.lobMode == 2
}
func (conn *DmConnection) CompatibleOracle() bool {
return conn.dmConnector.compatibleMode == COMPATIBLE_MODE_ORACLE
}
func (conn *DmConnection) CompatibleMysql() bool {
return conn.dmConnector.compatibleMode == COMPATIBLE_MODE_MYSQL
}
func (conn *DmConnection) cancel(err error) {
conn.canceled.Set(err)
conn.close()
}
func (conn *DmConnection) finish() {
if !conn.watching || conn.finished == nil {
return
}
select {
case conn.finished <- struct{}{}:
conn.watching = false
case <-conn.closech:
}
}
func (conn *DmConnection) startWatcher() {
watcher := make(chan context.Context, 1)
conn.watcher = watcher
finished := make(chan struct{})
conn.finished = finished
go func() {
for {
var ctx context.Context
select {
case ctx = <-watcher:
case <-conn.closech:
return
}
select {
case <-ctx.Done():
conn.cancel(ctx.Err())
case <-finished:
case <-conn.closech:
return
}
}
}()
}
func (conn *DmConnection) watchCancel(ctx context.Context) error {
if conn.watching {
conn.cleanup()
return nil
}
if err := ctx.Err(); err != nil {
return err
}
if ctx.Done() == nil {
return nil
}
if conn.watcher == nil {
return nil
}
conn.watching = true
conn.watcher <- ctx
return nil
}
type noCopy struct{}
func (*noCopy) Lock() {}
type atomicBool struct {
_noCopy noCopy
value uint32
}
func (ab *atomicBool) IsSet() bool {
return atomic.LoadUint32(&ab.value) > 0
}
func (ab *atomicBool) Set(value bool) {
if value {
atomic.StoreUint32(&ab.value, 1)
} else {
atomic.StoreUint32(&ab.value, 0)
}
}
func (ab *atomicBool) TrySet(value bool) bool {
if value {
return atomic.SwapUint32(&ab.value, 1) == 0
}
return atomic.SwapUint32(&ab.value, 0) > 0
}
type atomicError struct {
_noCopy noCopy
value atomic.Value
}
func (ae *atomicError) Set(value error) {
ae.value.Store(value)
}
func (ae *atomicError) Value() error {
if v := ae.value.Load(); v != nil {
return v.(error)
}
return nil
}