Skip to content

Commit

Permalink
dev & fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
buckelieg committed Jul 17, 2024
1 parent c80534f commit 58c5f19
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 40 deletions.
57 changes: 35 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ or use named parameters:
```java
// in java9+
import static java.util.Map.of;
Collection<String> names = db.select("SELECT name FROM TEST WHERE 1=1 AND ID IN (:ID) OR NAME=:name", of(":ID", new Object[]{1, 2}, ":name", "name_5")).execute(rs -> rs.getString("name"))
Collection<String> names = db.select(
"SELECT name FROM TEST WHERE 1=1 AND ID IN (:ID) OR NAME=:name",
of(":ID", new Object[]{1, 2}, ":name", "name_5")
).execute(rs -> rs.getString("name"))
.reduce(
new LinkedList<>(),
(list, name) -> {
Expand All @@ -60,9 +63,8 @@ Collection<String> names = db.select("SELECT name FROM TEST WHERE 1=1 AND ID IN
Parameter names are CASE SENSITIVE! 'Name' and 'name' are considered different parameter names.
<br/> Parameters may be provided with or without leading colon.

###### The N+1 problem resolution
##### The N+1 problem resolution
For the cases when it is needed to process (say - enrich) each mapped row with an additional data the `Select.ForBatch` can be used

```java
Stream<Entity> entities = db.select("SELECT * FROM HUGE_TABLE")
.forBatch(/* map resultSet here to needed type*/)
Expand All @@ -73,19 +75,18 @@ Stream<Entity> entities = db.select("SELECT * FROM HUGE_TABLE")
});
```
For cases where it is needed to issue any additional queries to database use:

```java
Stream<User> users = db.select("SELECT * FROM HUGE_TABLE")
.forBatch(/* map resultSet here to needed type*/)
.size(1000)
.execute((batch, session) -> {
// list of mapped rows (to resulting type) with size not more than 1000
// session maps to currently used connection
Map<Long, UserAttr> attrs = session.select("SELECT * FROM USER_ATTR WHERE id IN (:ids)", entry("ids", batch.stream().map(User::getId).collect(Collectors.toList())))
.execute(/* map to collection of domain objects that represents a user attribute */)
.groupingBy(UserAttr::userId, Function::identity);
batch.forEach(user -> user.addAttrs(attrs.getOrDefault(user.getId, Collections.emptyList())));
});
// suppose the USERS table contains thousands of records
Stream<User> users = db.select("SELECT * FROM USERS")
.forBatch(rs -> new User(rs.getLong("id"), rs.getString("name")))
.size(1000)
.execute((batchOfUsers, session) -> {
Map<Long, UserAttr> attrs = session.select(
"SELECT * FROM USER_ATTR WHERE id IN (:ids)",
entry("ids", batchOfUsers.stream().map(User::getId).collect(Collectors.toList()))
).execute().groupingBy(UserAttr::userId, Function::identity);
batchOfUsers.forEach(user -> user.addAttrs(attrs.getOrDefault(user.getId, Collections.emptyList())));
});
// stream of users objects will consist of updated (enriched) objects
```
Using this to process batches you must keep some things in mind:
Expand All @@ -95,10 +96,24 @@ Using this to process batches you must keep some things in mind:
<li><code>Select.fetchSize</code> and <code>Select.ForBatch.size</code> are not the same but connected</li>
</ul>

##### Metadata processing
For the special cases when only a metadata of the query is needed `Select.forMeta` can be used:
```java
// suppose we want to collect information of which column of the provided query is a primary key
import java.util.HashMap;

Map<String, Boolean> processedMeta = db.select("SELECT * FROM TEST").forMeta(metadata -> {
Map<String, Boolean> map = new HashMap<>();
metadata.forEachColumn(columnIndex -> map.put(metadata.getName(columnIndex), metadata.isPrimaryKey(columnIndex)));
return map;
});
```

### Insert
with question marks:
```java
long res = db.update("INSERT INTO TEST(name) VALUES(?)", "New_Name").execute(); // res is an affected rows count
// res is an affected rows count
long res = db.update("INSERT INTO TEST(name) VALUES(?)", "New_Name").execute();
```
Or with named parameters:
```java
Expand All @@ -107,8 +122,7 @@ long res = db.update("INSERT INTO TEST(name) VALUES(:name)", new SimpleImmutable
import static java.util.Map.entry;
long res = db.update("INSERT INTO TEST(name) VALUES(:name)", entry("name","New_Name")).execute();
```
###### Getting generated keys

##### Getting generated keys
To retrieve possible generated keys provide a mapping function to `execute` method:
```java
Collection<Long> generatedIds = db.update("INSERT INTO TEST(name) VALUES(?)", "New_Name").execute(rs -> rs.getLong(1));
Expand All @@ -128,7 +142,7 @@ long res = db.update("UPDATE TEST SET NAME=:name WHERE NAME=:new_name",
import static java.util.Map.entry;
long res = db.update("UPDATE TEST SET NAME=:name WHERE NAME=:new_name", entry(":name", "new_name_2"), entry(":new_name", "name_2")).execute();
```
###### Batch mode
##### Batch mode
For batch operation use:
```java
long res = db.update("INSERT INTO TEST(name) VALUES(?)", new Object[][]{ {"name1"}, {"name2"} }).batch(2).execute();
Expand Down Expand Up @@ -192,8 +206,7 @@ User latestUser = db.transaction()
.orElse(null)
);
```
###### Nested transactions and deadlocks

##### Nested transactions and deadlocks
Providing connection supplier function with plain connection
<br/>like this: <code>DB db = DB.create(() -> connection));</code>
<br/>or this: &nbsp;&nbsp;<code>DB db = DB.builder().withMaxConnections(1).build(() -> DriverManager.getConnection("vendor-specific-string"));</code>
Expand All @@ -218,7 +231,7 @@ The above will print a current query to provided logger with debug method.
<br/><code>SELECT * FROM TEST WHERE id=7</code>
<br/>Calling <code>print()</code> without arguments will do the same with standard output.

###### Scripts logging
##### Scripts logging
For <code>Script</code> query <code>verbose()</code> method can be used to track current script step execution.
```java
db.script("SELECT * FROM TEST WHERE id=:id;DROP TABLE TEST", new SimpleImmutableEntry<>("id", 5)).verbose().execute();
Expand Down
37 changes: 30 additions & 7 deletions src/main/java/buckelieg/jdbc/DB.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import javax.annotation.concurrent.ThreadSafe;
import java.sql.Connection;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
Expand Down Expand Up @@ -54,17 +55,21 @@ public final class DB extends Session {

private final boolean terminateConveyorOnClose;

private final boolean terminateConnectionPoolOnClose;

private DB(
Map<String, RSMeta.Column> metaCache,
Supplier<String> txIdProvider,
ConnectionManager connectionManager,
Supplier<ExecutorService> executorServiceSupplier,
boolean terminateConveyorOnClose) {
boolean terminateConveyorOnClose,
boolean terminateConnectionPoolOnClose) {
super(metaCache, connectionManager::getConnection, connectionManager::close, executorServiceSupplier);
this.txIdProvider = txIdProvider;
this.connectionManager = connectionManager;
this.executorServiceProvider = () -> getExecutorService(executorServiceSupplier);
this.terminateConveyorOnClose = terminateConveyorOnClose;
this.terminateConnectionPoolOnClose = terminateConnectionPoolOnClose;
}

private ExecutorService getExecutorService(Supplier<ExecutorService> executorServiceSupplier) {
Expand Down Expand Up @@ -122,10 +127,12 @@ public void close() {
if (null != conveyor && terminateConveyorOnClose) {
Optional.ofNullable(conveyor.get()).ifPresent(ExecutorService::shutdownNow);
}
try {
connectionManager.close();
} catch (SQLException e) {
throw Utils.newSQLRuntimeException(e);
if (terminateConnectionPoolOnClose) {
try {
connectionManager.close();
} catch (SQLException e) {
throw Utils.newSQLRuntimeException(e);
}
}
}

Expand All @@ -143,6 +150,8 @@ public static final class Builder {

private boolean terminateExecutorServiceOnClose = false;

private boolean terminateConnectionPoolOnClose = true;

private ConnectionManager connectionManager;

private Builder() {
Expand Down Expand Up @@ -189,6 +198,19 @@ public Builder withTerminateExecutorServiceOnClose(boolean terminateExecutorServ
return this;
}

/**
* Configures a {@linkplain DB} instance with connection pool termination on close value provided<br/>
* Default is {@code true}
*
* @param terminateConnectionPoolOnClose if {@code true} - then underlying connection pool will be attempted to shut down on {@linkplain DB#close()} method invocation
* @return a {@linkplain Builder} instance
*/
@Nonnull
public Builder withTerminateConnectionPoolOnClose(boolean terminateConnectionPoolOnClose) {
this.terminateConnectionPoolOnClose = terminateConnectionPoolOnClose;
return this;
}

/**
* Configures a {@linkplain DB} instance with connection manager instance provided<br/>
*
Expand Down Expand Up @@ -235,9 +257,10 @@ public DB build(TrySupplier<Connection, SQLException> connectionProvider) {
return new DB(
new ConcurrentHashMap<>(),
txIdProvider,
null == connectionManager ? new DefaultConnectionManager(connectionProvider, maxConnections) : connectionManager,
null == connectionManager ? new DefaultConnectionManager(connectionProvider, maxConnections, Duration.ofSeconds(5)) : connectionManager,
executorServiceSupplier,
terminateExecutorServiceOnClose
terminateExecutorServiceOnClose,
terminateConnectionPoolOnClose
);
}
}
Expand Down
20 changes: 19 additions & 1 deletion src/main/java/buckelieg/jdbc/DefaultConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import javax.annotation.Nullable;
import java.sql.Connection;
import java.sql.SQLException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

final class DefaultConnectionManager implements ConnectionManager {

Expand All @@ -42,11 +44,16 @@ final class DefaultConnectionManager implements ConnectionManager {

private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);

DefaultConnectionManager(TrySupplier<Connection, SQLException> connectionSupplier, int maxConnections) {
private final AtomicLong activeConnections = new AtomicLong(0);

private final Duration waitOnClose;

DefaultConnectionManager(TrySupplier<Connection, SQLException> connectionSupplier, int maxConnections, Duration waitOnClose) {
this.connectionSupplier = connectionSupplier;
this.maxConnections = maxConnections;
this.pool = new ArrayBlockingQueue<>(maxConnections);
this.obtainedConnections = new CopyOnWriteArrayList<>();
this.waitOnClose = waitOnClose;
}

@Nonnull
Expand All @@ -67,19 +74,29 @@ public Connection getConnection() throws SQLException {
throw new SQLException(e);
}
connection.setAutoCommit(false);
activeConnections.incrementAndGet();
return connection;
}

@Override
public void close(@Nullable Connection connection) throws SQLException {
if (null == connection) return;
connection.setAutoCommit(true);
activeConnections.decrementAndGet();
if (!pool.offer(connection)) throw new SQLException("Connection pool is full");
}

@Override
public void close() throws SQLException {
isShuttingDown.set(true);
if(activeConnections.get() > 0) {
// gracefully closing pool waiting for configured time for existing transactions to complete
try {
Thread.sleep(waitOnClose.toMillis());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
pool.clear();
SQLException exception = null;
for (Connection connection : obtainedConnections) {
Expand All @@ -93,4 +110,5 @@ public void close() throws SQLException {
obtainedConnections.clear();
if (null != exception) throw exception;
}

}
24 changes: 20 additions & 4 deletions src/main/java/buckelieg/jdbc/JDBCDefaults.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ static TryFunction<ValueReader, Map<String, Object>, SQLException> defaultMapper

static Map<String, Object> defaultMapper(ValueReader reader) throws SQLException {
Metadata meta = reader.meta();
Map<String, Object> result = new HashMap<>(meta.count());
reader.meta().forEachColumn(index -> result.put(meta.getName(index), reader(meta.getSQLType(index)).apply(reader, index)));
Map<String, Object> result = new HashMap<>(meta.columnCount());
for(int index = 1; index <= meta.columnCount(); index++) {
result.put(meta.getName(index), reader(meta.getSQLType(index)).apply(reader, index));
}
return result;
}

Expand All @@ -68,7 +70,7 @@ public Map<String, Object> apply(ValueReader valueReader) throws SQLException {
return mapper.updateAndGet(instance -> {
if (null == instance) {
Metadata meta = valueReader.meta();
int columnCount = meta.count();
int columnCount = meta.columnCount();
colReaders = new ArrayList<>(columnCount);
for (int col = 1; col <= columnCount; col++)
colReaders.add(entry(entry(meta.getLabel(col), col), reader(meta.getSQLType(col))));
Expand Down Expand Up @@ -183,7 +185,7 @@ private static TryBiFunction<ValueReader, Integer, String, SQLException> longVar
}

private static void setObject(ValueWriter writer, int index, Object value) throws SQLException {
if (value == null) writer.setObject(index, null);
if (null == value) writer.setObject(index, null);
else {
Class<?> cls = value.getClass();
if (Clob.class.isAssignableFrom(cls)) writer.setClob(index, (Clob) value);
Expand All @@ -195,6 +197,20 @@ private static void setObject(ValueWriter writer, int index, Object value) throw
else if (Calendar.class.isAssignableFrom(cls)) writer.setTimestamp(index, new Timestamp(((Calendar) value).getTimeInMillis()));
else if (Instant.class.isAssignableFrom(cls)) writer.setTimestamp(index, new Timestamp(((Instant) value).toEpochMilli()), Calendar.getInstance());
else if (ZonedDateTime.class.isAssignableFrom(cls)) writer.setTimestamp(index, new Timestamp(((ZonedDateTime) value).toInstant().toEpochMilli()), Calendar.getInstance());
else if (byte.class.isAssignableFrom(cls)) writer.setShort(index, (byte) value);
else if (Byte.class.isAssignableFrom(cls)) writer.setShort(index, (Byte) value);
else if (short.class.isAssignableFrom(cls)) writer.setShort(index, (short) value);
else if (Short.class.isAssignableFrom(cls)) writer.setShort(index, (Short) value);
else if (int.class.isAssignableFrom(cls)) writer.setInt(index, (int) value);
else if (Integer.class.isAssignableFrom(cls)) writer.setInt(index, (Integer) value);
else if (long.class.isAssignableFrom(cls)) writer.setLong(index, (long) value);
else if (Long.class.isAssignableFrom(cls)) writer.setLong(index, (Long) value);
else if (float.class.isAssignableFrom(cls)) writer.setFloat(index, (float) value);
else if (Float.class.isAssignableFrom(cls)) writer.setFloat(index, (Float) value);
else if (double.class.isAssignableFrom(cls)) writer.setDouble(index, (double) value);
else if (Double.class.isAssignableFrom(cls)) writer.setDouble(index, (Double) value);
else if (boolean.class.isAssignableFrom(cls)) writer.setBoolean(index, (boolean) value);
else if (BigDecimal.class.isAssignableFrom(cls)) writer.setBigDecimal(index, (BigDecimal) value);
else writer.setObject(index, value);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/buckelieg/jdbc/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ default List<String> foreignKeys() {
*
* @return a column count
*/
default int count() {
default int columnCount() {
return names().size();
}

Expand Down
11 changes: 6 additions & 5 deletions src/main/java/buckelieg/jdbc/RSMeta.java
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,7 @@ public Optional<String> getReferencedTable(@Nullable String columnName) {
public void forEachColumn(TryConsumer<Integer, SQLException> action) {
if (null == action) throw new NullPointerException("Action must be provided");
try {
for (Column column : getColumns()) {
action.accept(column.index);
}
for (Column column : getColumns()) action.accept(column.index);
} catch (SQLException e) {
throw newSQLRuntimeException(e);
}
Expand All @@ -276,7 +274,10 @@ public void forEachColumn(TryConsumer<Integer, SQLException> action) {
private boolean isPrimaryKey(Table table, String column) {
return getColumn(table.catalog, table.schema, table.name, column, c -> {
if (c.pk == null)
c.pk = listResultSet(dbMeta.get().getPrimaryKeys(table.catalog, table.schema, table.name), rs -> rs.getString(COLUMN_NAME)).stream().anyMatch(name -> name.equalsIgnoreCase(column));
c.pk = listResultSet(
dbMeta.get().getPrimaryKeys(table.catalog, table.schema, table.name),
rs -> rs.getString(COLUMN_NAME)
).stream().anyMatch(name -> name.equalsIgnoreCase(column));
}).pk;
}

Expand Down Expand Up @@ -413,7 +414,7 @@ private Column getColumn(String catalog, String schema, String table, String col

private Consumer<Column> getEnricher(@Nullable TryConsumer<Column, Exception> enricher) {
return c -> {
if (enricher != null) {
if (null != enricher) {
try {
enricher.accept(c);
} catch (Exception e) {
Expand Down
8 changes: 8 additions & 0 deletions src/test/java/buckelieg/jdbc/DBTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,14 @@ public void testSelectForBatch() throws Exception {
@Test
public void testSelectForMeta() {
assertEquals(2, db.select("SELECT * FROM TEST").forMeta(Metadata::getColumnFullNames).size());
Map<String, Boolean> meta = db.select("SELECT * FROM TEST").forMeta(metadata -> {
Map<String, Boolean> map = new HashMap<>();
metadata.forEachColumn(columnIndex -> map.put(metadata.getName(columnIndex), metadata.isPrimaryKey(columnIndex)));
return map;
});
assertEquals(2, meta.size());
assertTrue(meta.get("ID"));
assertFalse(meta.get("NAME"));
}

}

0 comments on commit 58c5f19

Please sign in to comment.