forked from tsenart/nap
-
Notifications
You must be signed in to change notification settings - Fork 2
/
db.go
224 lines (190 loc) · 7.26 KB
/
db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
package nap
import (
"context"
"database/sql"
"database/sql/driver"
"errors"
"strings"
"sync/atomic"
"time"
)
// DB is a logical database with multiple underlying physical databases
// forming a single master multiple slaves topology.
// Reads and writes are automatically directed to the correct physical db.
type DB struct {
pdbs []*sql.DB // Physical databases
count uint64 // Monotonically incrementing counter on each query
}
// Wrap wrapping origin *sql.DB connects
// For example, a mocked connect for tests.
func Wrap(db ...*sql.DB) (*DB, error) {
if len(db) == 0 {
return nil, errors.New("nap: no *sql.DB for wrapping")
}
return &DB{pdbs: db}, nil
}
// Open concurrently opens each underlying physical db.
// dataSourceNames must be a semi-comma separated list of DSNs with the first
// one being used as the master and the rest as slaves.
func Open(driverName, dataSourceNames string) (*DB, error) {
conns := strings.Split(dataSourceNames, ";")
db := &DB{pdbs: make([]*sql.DB, len(conns))}
err := scatter(len(db.pdbs), func(i int) (err error) {
db.pdbs[i], err = sql.Open(driverName, conns[i])
return err
})
if err != nil {
return nil, err
}
return db, nil
}
// Close closes all physical databases concurrently, releasing any open resources.
func (db *DB) Close() error {
return scatter(len(db.pdbs), func(i int) error {
return db.pdbs[i].Close()
})
}
// Driver returns the physical database's underlying driver.
func (db *DB) Driver() driver.Driver {
return db.Master().Driver()
}
// Begin starts a transaction on the master. The isolation level is dependent on the driver.
func (db *DB) Begin() (*sql.Tx, error) {
return db.Master().Begin()
}
// BeginTx starts a transaction.
// The provided context is used until the transaction is committed or rolled back.
// If the context is canceled, the sql package will roll back the transaction.
// Tx.Commit will return an error if the context provided to BeginTx is canceled.
// The provided TxOptions is optional and may be nil if defaults should be used.
// If a non-default isolation level is used that the driver doesn't support, an error will be returned.
func (db *DB) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) {
return db.Master().BeginTx(ctx, opts)
}
// Exec executes a query without returning any rows.
// The args are for any placeholder parameters in the query.
// Exec uses the master as the underlying physical db.
func (db *DB) Exec(query string, args ...interface{}) (sql.Result, error) {
return db.Master().Exec(query, args...)
}
// ExecContext executes a query without returning any rows.
// The args are for any placeholder parameters in the query.
// Exec uses the master as the underlying physical db.
func (db *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
return db.Master().ExecContext(ctx, query, args...)
}
// Ping verifies if a connection to each physical database is still alive,
// establishing a connection if necessary.
func (db *DB) Ping() error {
return scatter(len(db.pdbs), func(i int) error {
return db.pdbs[i].Ping()
})
}
// PingContext verifies if a connection to each physical database is still
// alive, establishing a connection if necessary.
func (db *DB) PingContext(ctx context.Context) error {
return scatter(len(db.pdbs), func(i int) error {
return db.pdbs[i].PingContext(ctx)
})
}
// Prepare creates a prepared statement for later queries or executions
// on each physical database, concurrently.
func (db *DB) Prepare(query string) (*Stmt, error) {
stmts := make([]*sql.Stmt, len(db.pdbs))
err := scatter(len(db.pdbs), func(i int) (err error) {
stmts[i], err = db.pdbs[i].Prepare(query)
return err
})
if err != nil {
return nil, err
}
return &Stmt{db: db, stmts: stmts}, nil
}
// PrepareContext creates a prepared statement for later queries or executions
// on each physical database, concurrently.
// The provided context is used for the preparation of the statement, not for
// the execution of the statement.
func (db *DB) PrepareContext(ctx context.Context, query string) (*Stmt, error) {
stmts := make([]*sql.Stmt, len(db.pdbs))
err := scatter(len(db.pdbs), func(i int) (err error) {
stmts[i], err = db.pdbs[i].PrepareContext(ctx, query)
return err
})
if err != nil {
return nil, err
}
return &Stmt{db: db, stmts: stmts}, nil
}
// Query executes a query that returns rows, typically a SELECT.
// The args are for any placeholder parameters in the query.
// Query uses a slave as the physical db.
func (db *DB) Query(query string, args ...interface{}) (*sql.Rows, error) {
return db.Slave().Query(query, args...)
}
// QueryContext executes a query that returns rows, typically a SELECT.
// The args are for any placeholder parameters in the query.
// QueryContext uses a slave as the physical db.
func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
return db.Slave().QueryContext(ctx, query, args...)
}
// QueryRow executes a query that is expected to return at most one row.
// QueryRow always return a non-nil value.
// Errors are deferred until Row's Scan method is called.
// QueryRow uses a slave as the physical db.
func (db *DB) QueryRow(query string, args ...interface{}) *sql.Row {
return db.Slave().QueryRow(query, args...)
}
// QueryRowContext executes a query that is expected to return at most one row.
// QueryRowContext always return a non-nil value.
// Errors are deferred until Row's Scan method is called.
// QueryRowContext uses a slave as the physical db.
func (db *DB) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row {
return db.Slave().QueryRowContext(ctx, query, args...)
}
// SetMaxIdleConns sets the maximum number of connections in the idle
// connection pool for each underlying physical db.
// If MaxOpenConns is greater than 0 but less than the new MaxIdleConns then the
// new MaxIdleConns will be reduced to match the MaxOpenConns limit
// If n <= 0, no idle connections are retained.
func (db *DB) SetMaxIdleConns(n int) {
for i := range db.pdbs {
db.pdbs[i].SetMaxIdleConns(n)
}
}
// SetMaxOpenConns sets the maximum number of open connections
// to each physical database.
// If MaxIdleConns is greater than 0 and the new MaxOpenConns
// is less than MaxIdleConns, then MaxIdleConns will be reduced to match
// the new MaxOpenConns limit. If n <= 0, then there is no limit on the number
// of open connections. The default is 0 (unlimited).
func (db *DB) SetMaxOpenConns(n int) {
for i := range db.pdbs {
db.pdbs[i].SetMaxOpenConns(n)
}
}
// SetConnMaxLifetime sets the maximum amount of time a connection may be reused.
// Expired connections may be closed lazily before reuse.
// If d <= 0, connections are reused forever.
func (db *DB) SetConnMaxLifetime(d time.Duration) {
for i := range db.pdbs {
db.pdbs[i].SetConnMaxLifetime(d)
}
}
// Master returns the master physical database
func (db *DB) Master() *sql.DB {
return db.pdbs[0]
}
// Slave returns one of the physical databases which is a slave
func (db *DB) Slave() *sql.DB {
return db.pdbs[db.slave(len(db.pdbs))]
}
func (db *DB) slave(n int) int {
if n <= 1 {
return 0
}
return int(1 + (atomic.AddUint64(&db.count, 1) % uint64(n-1)))
}
// Databases returns return all physical databases
func (db *DB) Databases() []*sql.DB {
return db.pdbs
}