Skip to content

Commit

Permalink
268 fix connection pool (#269)
Browse files Browse the repository at this point in the history
* fix postgres connection pool #268

* add postgres exec

* fix postgres v1

* fix

* mariadb connection pool v2

* mariadb connection pool v1

* musql connection pool v1

* mysql connection pool v2

* sqlite connection pool v2

* sqlite connection pool v1

* surreal connection pool v1

* surreal connection pool v2

* fix warning

* fix benchmark example

* fix warning

* fix mysql test

* fix test
  • Loading branch information
itsumura-h authored Dec 20, 2023
1 parent bf44336 commit 4a16040
Show file tree
Hide file tree
Showing 95 changed files with 7,589 additions and 7,471 deletions.
39 changes: 21 additions & 18 deletions example/benchmark.nim
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import ../src/allographer/query_builder

randomize()
let rdb = dbOpen(PostgreSQL, "database", "user", "pass", "postgres", 5432, 95, 30, shouldDisplayLog=false)
# let rdb = dbOpen(MariaDB, "database", "user", "pass", "mariadb", 3306, 95, 30, shouldDisplayLog=false)
# let rdb = dbOpen(SQLite3, "db.sqlite3", 95, 30, shouldDisplayLog=false)
# let rdb = dbOpen(SurrealDB, "test", "test", "user", "pass", "http://surreal", 8000, 500, 30, shouldDisplayLog=false).waitFor()
let stdRdb = open("postgres:5432", "user", "pass", "database")
const range1_10000 = 1..10000

Expand All @@ -27,23 +30,25 @@ proc migrate() {.async.} =
rdb.create(
table("World", [
Column.increments("id"),
# Column.increments("index"), # for surreal
Column.integer("randomNumber").default(0)
]),
table("Fortune", [
Column.increments("id"),
# Column.increments("index"), # for surreal
Column.string("message")
])
)

seeder rdb, "World":
seeder(rdb, "World"):
var data = newSeq[JsonNode]()
for i in 1..10000:
for i in range1_10000:
data.add(
%*{"randomNumber": rand(1..10000)}
%*{"randomNumber": rand(range1_10000)}
)
await rdb.table("World").insert(data)

seeder rdb, "Fortune":
seeder(rdb, "Fortune"):
data = @[
%*{"id": 1, "message": "fortune: No such file or directory"},
%*{"id": 2, "message": "A computer scientist is someone who fixes things that aren''t broken."},
Expand All @@ -65,12 +70,13 @@ proc migrate() {.async.} =
let getFirstPrepare = stdRdb.prepare("getFirst", sql""" SELECT * FROM "World" WHERE id = $1 LIMIT 1 """, 1)
let updatePrepare = stdRdb.prepare("updatePrepare", sql""" UPDATE "World" SET "randomNumber" = $1 WHERE id = $2 """, 2)

const countNum = 500

proc query():Future[seq[JsonNode]] {.async.} =
const countNum = 500
var futures = newSeq[Future[seq[string]]](countNum)
for i in 1..countNum:
let n = rand(range1_10000)
futures[i-1] = rdb.table("World").findPlain(n)
futures[i-1] = rdb.select().table("World").findPlain(n)
let resp = all(futures).await
let response = resp.map(
proc(x:seq[string]):JsonNode =
Expand All @@ -80,7 +86,6 @@ proc query():Future[seq[JsonNode]] {.async.} =
return response

proc queryRaw():Future[seq[JsonNode]] {.async.} =
const countNum = 500
var futures = newSeq[Future[seq[string]]](countNum)
for i in 1..countNum:
let n = rand(range1_10000)
Expand All @@ -95,7 +100,6 @@ proc queryRaw():Future[seq[JsonNode]] {.async.} =


proc queryStd():Future[seq[JsonNode]] {.async.} =
const countNum = 500
var resp:seq[Row]
for i in 1..countNum:
resp.add(stdRdb.getRow(getFirstPrepare, i))
Expand All @@ -107,23 +111,25 @@ proc queryStd():Future[seq[JsonNode]] {.async.} =


proc update():Future[seq[JsonNode]] {.async.} =
const countNum = 500
var response = newSeq[JsonNode](countNum)
var futures = newSeq[Future[void]](countNum)
for i in 1..countNum:
let index = rand(range1_10000)
let number = rand(range1_10000)
futures[i-1] = (proc():Future[void] =
discard rdb.select("id", "randomNumber").table("World").findPlain(index)
rdb.table("World").where("id", "=", index).update(%*{"randomNumber": number})
futures[i-1] = (proc():Future[void] {.async.} =
discard rdb.select("id", "randomNumber").table("World").findPlain(index).await
rdb.table("World").where("id", "=", index).update(%*{"randomNumber": number}).await

# for surreal
# discard rdb.select("id", "randomNumber").table("World").where("index", "=", index).first().await
# rdb.table("World").where("index", "=", index).update(%*{"randomNumber": number}).await
)()
response[i-1] = %*{"id":index, "randomNumber": number}
await all(futures)
return response


proc updateRaw():Future[seq[JsonNode]] {.async.} =
const countNum = 500
var response = newSeq[JsonNode](countNum)
var futures = newSeq[Future[void]](countNum)
for i in 1..countNum:
Expand All @@ -139,7 +145,6 @@ proc updateRaw():Future[seq[JsonNode]] {.async.} =


proc updateRawStd():Future[seq[JsonNode]] {.async.} =
const countNum = 500
var response = newSeq[JsonNode](countNum)
var futures = newSeq[Future[void]](countNum)
for i in 1..countNum:
Expand All @@ -162,10 +167,9 @@ proc timeProcess[T](name:string, cb:proc():Future[T]) {.async.}=
var resultStr = ""

for i in 1..times:
sleep(500)
sleep(100)
start = cpuTime()
for _ in 1..20:
discard cb().await
discard cb().await
eachTime = cpuTime() - start
sumTime += eachTime
if i > 1: resultStr.add("\n")
Expand All @@ -182,7 +186,6 @@ proc main() =
migrate().waitFor

timeProcess("query", query).waitFor
# timeProcess("queryTime", queryTime).waitFor
timeProcess("queryRaw", queryRaw).waitFor
timeProcess("queryStd", queryStd).waitFor
timeProcess("update", update).waitFor
Expand Down
20 changes: 10 additions & 10 deletions src/allographer/query_builder.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,61 +6,61 @@ when NimMajor == 2:

when isExistsSqlite:
import ./v2/query_builder/models/sqlite/sqlite_types; export sqlite_types
import ./v2/query_builder/models/sqlite/sqlite_connections; export sqlite_connections
import ./v2/query_builder/models/sqlite/sqlite_query; export sqlite_query
import ./v2/query_builder/models/sqlite/sqlite_exec; export sqlite_exec
import ./v2/query_builder/models/sqlite/sqlite_transaction; export sqlite_transaction

when isExistsPostgres:
import ./v2/query_builder/models/postgres/postgres_types; export postgres_types
import ./v2/query_builder/models/postgres/postgres_connections; export postgres_connections
import ./v2/query_builder/models/postgres/postgres_query; export postgres_query
import ./v2/query_builder/models/postgres/postgres_exec; export postgres_exec
import ./v2/query_builder/models/postgres/poatgres_transaction; export poatgres_transaction

when isExistsMariadb:
import ./v2/query_builder/models/mariadb/mariadb_types; export mariadb_types
import ./v2/query_builder/models/mariadb/mariadb_connections; export mariadb_connections
import ./v2/query_builder/models/mariadb/mariadb_query; export mariadb_query
import ./v2/query_builder/models/mariadb/mariadb_exec; export mariadb_exec
import ./v2/query_builder/models/mariadb/mariadb_transaction; export mariadb_transaction

when isExistsMysql:
import ./v2/query_builder/models/mysql/mysql_types; export mysql_types
import ./v2/query_builder/models/mysql/mysql_connections; export mysql_connections
import ./v2/query_builder/models/mysql/mysql_query; export mysql_query
import ./v2/query_builder/models/mysql/mysql_exec; export mysql_exec
import ./v2/query_builder/models/mysql/mysql_transaction; export mysql_transaction

when isExistsSurrealdb:
import ./v2/query_builder/models/surreal/surreal_types; export surreal_types
import ./v2/query_builder/models/surreal/surreal_connections; export surreal_connections
import ./v2/query_builder/models/surreal/surreal_query; export surreal_query
import ./v2/query_builder/models/surreal/surreal_exec; export surreal_exec
elif NimMajor == 1:
import ./v1/query_builder/enums; export enums
import ./v1/query_builder/error; export error

when isExistsSqlite:
import ./v1/query_builder/models/sqlite/sqlite_types; export sqlite_types
import ./v1/query_builder/models/sqlite/sqlite_connections; export sqlite_connections
import ./v1/query_builder/models/sqlite/sqlite_query; export sqlite_query
import ./v1/query_builder/models/sqlite/sqlite_exec; export sqlite_exec
import ./v1/query_builder/models/sqlite/sqlite_transaction; export sqlite_transaction

when isExistsPostgres:
import ./v1/query_builder/models/postgres/postgres_types; export postgres_types
import ./v1/query_builder/models/postgres/postgres_connections; export postgres_connections
import ./v1/query_builder/models/postgres/postgres_query; export postgres_query
import ./v1/query_builder/models/postgres/postgres_exec; export postgres_exec
import ./v1/query_builder/models/postgres/poatgres_transaction; export poatgres_transaction

when isExistsMariadb:
import ./v1/query_builder/models/mariadb/mariadb_types; export mariadb_types
import ./v1/query_builder/models/mariadb/mariadb_connections; export mariadb_connections
import ./v1/query_builder/models/mariadb/mariadb_query; export mariadb_query
import ./v1/query_builder/models/mariadb/mariadb_exec; export mariadb_exec
import ./v1/query_builder/models/mariadb/mariadb_transaction; export mariadb_transaction

when isExistsMysql:
import ./v1/query_builder/models/mysql/mysql_types; export mysql_types
import ./v1/query_builder/models/mysql/mysql_connections; export mysql_connections
import ./v1/query_builder/models/mysql/mysql_query; export mysql_query
import ./v1/query_builder/models/mysql/mysql_exec; export mysql_exec
import ./v1/query_builder/models/mysql/mysql_transaction; export mysql_transaction

when isExistsSurrealdb:
import ./v1/query_builder/models/surreal/surreal_types; export surreal_types
import ./v1/query_builder/models/surreal/surreal_connections; export surreal_connections
import ./v1/query_builder/models/surreal/surreal_query; export surreal_query
import ./v1/query_builder/models/surreal/surreal_exec; export surreal_exec
9 changes: 9 additions & 0 deletions src/allographer/v1/query_builder/libs/sqlite/sqlite_impl.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import ./sqlite_lib

proc query*(db:PSqlite3, query:string, args:seq[string], timeout:int):Future[(seq[Row], DbRows)] {.async.} =
assert(not db.isNil, "Database not connected.")
sleepAsync(0).await
var dbRows: DbRows
var rows = newSeq[seq[string]]()
for row in db.instantRows(dbRows, query, args):
Expand All @@ -20,6 +21,7 @@ proc query*(db:PSqlite3, query:string, args:seq[string], timeout:int):Future[(se

proc queryPlain*(db:PSqlite3, query:string, args:seq[string], timeout:int):Future[seq[Row]] {.async.} =
assert(not db.isNil, "Database not connected.")
sleepAsync(0).await
var rows = newSeq[seq[string]]()
for row in db.instantRowsPlain(query, args):
var columns = newSeq[string](row.len)
Expand All @@ -30,6 +32,7 @@ proc queryPlain*(db:PSqlite3, query:string, args:seq[string], timeout:int):Futur


proc getColumnTypes*(db:PSqlite3, query: string):Future[seq[(string, string)]] {.async.} =
sleepAsync(0).await
var dbRows: DbRows
var columns = newSeq[(string, string)]()
for row in db.instantRows(dbRows, query, newSeq[string]()):
Expand All @@ -40,6 +43,7 @@ proc getColumnTypes*(db:PSqlite3, query: string):Future[seq[(string, string)]] {
proc exec*(db:PSqlite3, query: string, args: JsonNode, columns:seq[(string, string)], timeout:int) {.async.} =
## args is `JArray`
assert(not db.isNil, "Database not connected.")
sleepAsync(0).await
# var q = dbFormat(query, strArges)
var stmt: PStmt
var res:bool
Expand Down Expand Up @@ -93,6 +97,7 @@ proc exec*(db:PSqlite3, query: string, args: JsonNode, timeout:int) {.async.} =
## used for rdb.raw().exec()
## args are `JArray`
assert(not db.isNil, "Database not connected.")
sleepAsync(0).await
var stmt: PStmt
var res:bool
if prepare_v2(db, query.cstring, query.len.cint, stmt, nil) == SQLITE_OK:
Expand Down Expand Up @@ -132,6 +137,7 @@ proc exec*(db:PSqlite3, query: string, args: JsonNode, timeout:int) {.async.} =
proc exec*(db:PSqlite3, query: string, args: seq[string], timeout:int) {.async.} =
## Not used anymore
assert(not db.isNil, "Database not connected.")
sleepAsync(0).await
var q = dbFormat(query, args)
var stmt: PStmt
var res:bool
Expand All @@ -148,11 +154,13 @@ proc exec*(db:PSqlite3, query: string, args: seq[string], timeout:int) {.async.}

proc getColumns*(db:PSqlite3, query:string, args:seq[string], timeout:int):Future[seq[string]] {.async.} =
assert(not db.isNil, "Database not connected.")
sleepAsync(0).await
var dbRows: DbRows
return db.getColumns(dbRows, query, args)


proc prepare*(db:PSqlite3, query:string, timeout:int):Future[PStmt] {.async.} =
sleepAsync(0).await
if prepare_v2(db, query, query.len.cint, result, nil) != SQLITE_OK:
discard finalize(result)
dbError(db)
Expand All @@ -164,6 +172,7 @@ proc preparedQuery*(db:PSqlite3, args:seq[string] = @[], sqliteStmt:PStmt):Futur
sqliteStmt.bindParam(i+1, row)
# run query
assert(not db.isNil, "Database not connected.")
sleepAsync(0).await
var dbRows: DbRows
var rows = newSeq[seq[string]]()
for row in db.instantRows(dbRows, sqliteStmt):
Expand Down
36 changes: 0 additions & 36 deletions src/allographer/v1/query_builder/libs/surreal/surreal_impl.nim
Original file line number Diff line number Diff line change
Expand Up @@ -7,42 +7,6 @@ import ../../error
import ./surreal_rdb
import ./surreal_lib

type SurrealImpl* = ref object

# proc open*(_:type SurrealImpl, namespace="", database="",user="", password="",
# host="", port:int32 = 0, maxConnections=1, timeout=30):Future[SurrealConnections] {.async.} =
# var pools = newSeq[SurrealConn](maxConnections)
# for i in 0..<maxConnections:
# let client = newAsyncHttpClient()
# var headers = newHttpHeaders(true)
# headers["NS"] = namespace
# headers["DB"] = database
# headers["Accept"] = "application/json"
# headers["Authorization"] = "Basic " & base64.encode(user & ":" & password)
# client.headers = headers

# var url = &"{host}:{port}/status"
# var resp = client.get(url).await
# if(resp.status != $Http200):
# dbError(&"Cannot connect to SurrealDb {host}:{port}")

# url = &"{host}:{port}/sql"
# resp = client.post(url, &"DEFINE NAMESPACE `{namespace}`; USE NS `{namespace}`; DEFINE DATABASE `{database}`").await
# if(resp.status != $Http200):
# dbError(&"Cannot connect to SurrealDb {host}:{port}")

# pools[i] = SurrealConn(
# conn: client,
# host:host,
# port:port,
# isBusy: false,
# createdAt: getTime().toUnix(),
# )
# return SurrealConnections(
# pools: pools,
# timeout: timeout
# )


proc query*(db:SurrealConn, query: string, args: seq[string], timeout:int):Future[JsonNode] {.async.} =
## return JArray
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,33 @@ proc select*(self:MariadbConnections, columnsArg:varargs[string]):MariadbQuery =
else:
query["select"] = %columnsArg

let MariadbQuery = MariadbQuery(
let mariadbQuery = MariadbQuery(
log: self.log,
pools: self.pools,
timeout: self.timeout,
info: self.info,
query: query,
queryString: "",
placeHolder: newJArray(),
isInTransaction: self.isInTransaction,
transactionConn: self.transactionConn,
)
return MariadbQuery
return mariadbQuery


proc table*(self:MariadbConnections, tableArg: string): MariadbQuery =
let query = newJObject()
query["table"] = %tableArg

let MariadbQuery = MariadbQuery(
let mariadbQuery = MariadbQuery(
log: self.log,
pools: self.pools,
timeout: self.timeout,
query: query,
queryString: "",
placeHolder: newJArray(),
isInTransaction: self.isInTransaction,
transactionConn: self.transactionConn,
)
return MariadbQuery
return mariadbQuery


proc raw*(self:MariadbConnections, sql:string, arges=newJArray()): RawMariadbQuery =
Expand All @@ -53,7 +51,6 @@ proc raw*(self:MariadbConnections, sql:string, arges=newJArray()): RawMariadbQue
let rawQueryRdb = RawMariadbQuery(
log: self.log,
pools: self.pools,
timeout: self.timeout,
info: self.info,
query: newJObject(),
queryString: sql.strip(),
Expand Down
Loading

0 comments on commit 4a16040

Please sign in to comment.