diff --git a/README.md b/README.md
index 74dde6c..8128703 100644
--- a/README.md
+++ b/README.md
@@ -44,7 +44,10 @@ or use named parameters:
 ```java
 // in java9+
 import static java.util.Map.of;
-Collection<String> names = db.select("SELECT name FROM TEST WHERE 1=1 AND ID IN (:ID) OR NAME=:name", of(":ID", new Object[]{1, 2}, ":name", "name_5")).execute(rs -> rs.getString("name"))
+Collection<String> names = db.select(
+        "SELECT name FROM TEST WHERE 1=1 AND ID IN (:ID) OR NAME=:name",
+        of(":ID", new Object[]{1, 2}, ":name", "name_5")
+).execute(rs -> rs.getString("name"))
 .reduce(
     new LinkedList<>(),
     (list, name) -> {
@@ -60,9 +63,8 @@ Collection names = db.select("SELECT name FROM TEST WHERE 1=1 AND ID IN
 Parameter names are CASE SENSITIVE! 'Name' and 'name' are considered different parameter names.
 Parameters may be provided with or without a leading colon.
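A quick sketch of those two notes taken together: the same parameter can be addressed with or without the leading colon, while a differently cased name would be treated as a different parameter. The `TEST` table and the `entry` helper are the ones used throughout this README:

```java
import static java.util.Map.entry;

// ":name" and "name" address the same SQL parameter...
long first  = db.update("INSERT INTO TEST(name) VALUES(:name)", entry(":name", "New_Name")).execute();
long second = db.update("INSERT INTO TEST(name) VALUES(:name)", entry("name", "New_Name")).execute();
// ...while "NAME" would not: parameter names are case sensitive
```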
-###### The N+1 problem resolution
+##### The N+1 problem resolution
 For cases when each mapped row has to be processed (say, enriched) with additional data, `Select.ForBatch` can be used
-
 ```java
 Stream entities = db.select("SELECT * FROM HUGE_TABLE")
     .forBatch(/* map resultSet here to needed type*/)
     .size(1000)
@@ -73,19 +75,18 @@
     .execute(batchOfObjects -> {
       ...
     });
 ```
 For cases where additional queries have to be issued to the database, use:
-
 ```java
-Stream users = db.select("SELECT * FROM HUGE_TABLE")
-    .forBatch(/* map resultSet here to needed type*/)
-    .size(1000)
-    .execute((batch, session) -> {
-        // list of mapped rows (to resulting type) with size not more than 1000
-        // session maps to currently used connection
-        Map attrs = session.select("SELECT * FROM USER_ATTR WHERE id IN (:ids)", entry("ids", batch.stream().map(User::getId).collect(Collectors.toList())))
-            .execute(/* map to collection of domain objects that represents a user attribute */)
-            .groupingBy(UserAttr::userId, Function::identity);
-        batch.forEach(user -> user.addAttrs(attrs.getOrDefault(user.getId, Collections.emptyList())));
-    });
+// suppose the USERS table contains thousands of records
+Stream<User> users = db.select("SELECT * FROM USERS")
+    .forBatch(rs -> new User(rs.getLong("id"), rs.getString("name")))
+    .size(1000)
+    .execute((batchOfUsers, session) -> {
+        Map<Long, List<UserAttr>> attrs = session.select(
+                "SELECT * FROM USER_ATTR WHERE id IN (:ids)",
+                entry("ids", batchOfUsers.stream().map(User::getId).collect(Collectors.toList()))
+        ).execute().groupingBy(UserAttr::userId, Function::identity);
+        batchOfUsers.forEach(user -> user.addAttrs(attrs.getOrDefault(user.getId(), Collections.emptyList())));
+    });
+// the resulting stream consists of updated (enriched) User objects
 ```
@@ -95,10 +96,24 @@ Using this to process batches you must keep some things in mind:
   • Select.fetchSize and Select.ForBatch.size are not the same, but they are related (see the sketch below)
   •
 
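A sketch of that distinction: `fetchSize` is the driver-level hint for how many rows are pulled from the database per round trip, while `ForBatch.size` caps how many mapped rows are handed to the handler at once. Every call here appears in the examples above except `fetchSize(int)`, whose exact signature is an assumption:

```java
Stream<String> names = db.select("SELECT name FROM HUGE_TABLE")
    .fetchSize(500)                       // driver fetches up to 500 rows per round trip (assumed signature)
    .forBatch(rs -> rs.getString("name")) // row mapper, as in the examples above
    .size(1000)                           // the handler receives at most 1000 mapped rows per call
    .execute(batch -> {
      // two driver fetches may be needed to fill one 1000-element batch
    });
```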
+##### Metadata processing
+For the special cases when only the metadata of a query is needed, `Select.forMeta` can be used:
+```java
+// suppose we want to collect information about which columns of the provided query are primary keys
+import java.util.HashMap;
+
+Map<String, Boolean> processedMeta = db.select("SELECT * FROM TEST").forMeta(metadata -> {
+  Map<String, Boolean> map = new HashMap<>();
+  metadata.forEachColumn(columnIndex -> map.put(metadata.getName(columnIndex), metadata.isPrimaryKey(columnIndex)));
+  return map;
+});
+```
+
 ### Insert with question marks:
 ```java
-long res = db.update("INSERT INTO TEST(name) VALUES(?)", "New_Name").execute(); // res is an affected rows count
+// res is an affected rows count
+long res = db.update("INSERT INTO TEST(name) VALUES(?)", "New_Name").execute();
 ```
 Or with named parameters:
 ```java
@@ -107,8 +122,7 @@ long res = db.update("INSERT INTO TEST(name) VALUES(:name)", new SimpleImmutable
 import static java.util.Map.entry;
 long res = db.update("INSERT INTO TEST(name) VALUES(:name)", entry("name","New_Name")).execute();
 ```
-###### Getting generated keys
-
+##### Getting generated keys
 To retrieve possible generated keys, provide a mapping function to the `execute` method:
 ```java
 Collection<Long> generatedIds = db.update("INSERT INTO TEST(name) VALUES(?)", "New_Name").execute(rs -> rs.getLong(1));
@@ -128,7 +142,7 @@ long res = db.update("UPDATE TEST SET NAME=:name WHERE NAME=:new_name",
 import static java.util.Map.entry;
 long res = db.update("UPDATE TEST SET NAME=:name WHERE NAME=:new_name", entry(":name", "new_name_2"), entry(":new_name", "name_2")).execute();
 ```
-###### Batch mode
+##### Batch mode
 For batch operations use:
 ```java
 long res = db.update("INSERT INTO TEST(name) VALUES(?)", new Object[][]{ {"name1"}, {"name2"} }).batch(2).execute();
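Expanding that one-liner into a commented sketch; the `TEST` table and the positional-parameter `update` are the ones from the surrounding examples:

```java
// each inner array is one set of positional arguments for the statement
long res = db.update("INSERT INTO TEST(name) VALUES(?)",
        new Object[][]{ {"name1"}, {"name2"}, {"name3"}, {"name4"} })
    .batch(2)   // parameter sets are sent to the driver in chunks of 2
    .execute(); // res is the total affected rows count
```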
@@ -192,8 +206,7 @@ User latestUser = db.transaction()
     .orElse(null)
 );
 ```
-###### Nested transactions and deadlocks
-
+##### Nested transactions and deadlocks
 Providing a connection supplier function with a plain connection
     like this: DB db = DB.create(() -> connection);
     or this:   DB db = DB.builder().withMaxConnections(1).build(() -> DriverManager.getConnection("vendor-specific-string"));
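The danger can be sketched as follows; the exact shape of the `transaction()` call is only partially visible in this diff, so the lambda form below is an assumption rather than the library's confirmed API:

```java
DB db = DB.create(() -> connection); // a single plain connection, as above
db.transaction().execute(outer ->
    // hypothetical nested transaction: it needs a second connection while the
    // only one is still held by the outer transaction, so the call never returns
    db.transaction().execute(inner -> null)
);
```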
@@ -218,7 +231,7 @@ The above will print the current query to the provided logger with the debug method.
     SELECT * FROM TEST WHERE id=7
 Calling print() without arguments will do the same with standard output.
-###### Scripts logging
+##### Scripts logging
 For a Script query, the verbose() method can be used to track current script step execution.
 ```java
 db.script("SELECT * FROM TEST WHERE id=:id;DROP TABLE TEST", new SimpleImmutableEntry<>("id", 5)).verbose().execute();
diff --git a/src/main/java/buckelieg/jdbc/DB.java b/src/main/java/buckelieg/jdbc/DB.java
index fc0c9e1..fb397d9 100644
--- a/src/main/java/buckelieg/jdbc/DB.java
+++ b/src/main/java/buckelieg/jdbc/DB.java
@@ -22,6 +22,7 @@ import javax.annotation.concurrent.ThreadSafe;
 import java.sql.Connection;
 import java.sql.SQLException;
+import java.time.Duration;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
@@ -54,17 +55,21 @@ public final class DB extends Session {
 
   private final boolean terminateConveyorOnClose;
 
+  private final boolean terminateConnectionPoolOnClose;
+
   private DB(
       Map metaCache,
       Supplier txIdProvider,
       ConnectionManager connectionManager,
       Supplier executorServiceSupplier,
-      boolean terminateConveyorOnClose) {
+      boolean terminateConveyorOnClose,
+      boolean terminateConnectionPoolOnClose) {
     super(metaCache, connectionManager::getConnection, connectionManager::close, executorServiceSupplier);
     this.txIdProvider = txIdProvider;
     this.connectionManager = connectionManager;
     this.executorServiceProvider = () -> getExecutorService(executorServiceSupplier);
     this.terminateConveyorOnClose = terminateConveyorOnClose;
+    this.terminateConnectionPoolOnClose = terminateConnectionPoolOnClose;
   }
 
   private ExecutorService getExecutorService(Supplier executorServiceSupplier) {
@@ -122,10 +127,12 @@ public void close() {
     if (null != conveyor && terminateConveyorOnClose) {
       Optional.ofNullable(conveyor.get()).ifPresent(ExecutorService::shutdownNow);
     }
-    try {
-      connectionManager.close();
-    } catch (SQLException e) {
-      throw Utils.newSQLRuntimeException(e);
+    if (terminateConnectionPoolOnClose) {
+      try {
+        connectionManager.close();
+      } catch (SQLException e) {
+        throw Utils.newSQLRuntimeException(e);
+      }
     }
   }
 
@@ -143,6 +150,8 @@ public static final class Builder {
 
     private boolean terminateExecutorServiceOnClose = false;
 
+    private boolean terminateConnectionPoolOnClose = true;
+
     private ConnectionManager connectionManager;
 
     private Builder() {
@@ -189,6 +198,19 @@ public Builder withTerminateExecutorServiceOnClose(boolean terminateExecutorServ
       return this;
     }
 
+    /**
+     * Configures a {@linkplain DB} instance with the provided connection pool termination on close value
+     * Default is {@code true}
+     *
+     * @param terminateConnectionPoolOnClose if {@code true}, then an attempt is made to shut down the underlying connection pool on {@linkplain DB#close()} method invocation
+     * @return a {@linkplain Builder} instance
+     */
+    @Nonnull
+    public Builder withTerminateConnectionPoolOnClose(boolean terminateConnectionPoolOnClose) {
+      this.terminateConnectionPoolOnClose = terminateConnectionPoolOnClose;
+      return this;
+    }
+
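A quick sketch of the new flag in use; the builder calls are the ones shown in this diff and earlier in the README, while the JDBC URL is just illustrative:

```java
// keep an externally managed pool alive across DB instances
DB db = DB.builder()
    .withMaxConnections(10)
    .withTerminateConnectionPoolOnClose(false) // default is true
    .build(() -> DriverManager.getConnection("jdbc:vendor://host/db"));
// ... use db ...
db.close(); // the connection pool is left untouched
```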
    /**
     * Configures a {@linkplain DB} instance with connection manager instance provided
     *
@@ -235,9 +257,10 @@ public DB build(TrySupplier connectionProvider) {
       return new DB(
           new ConcurrentHashMap<>(),
           txIdProvider,
-          null == connectionManager ? new DefaultConnectionManager(connectionProvider, maxConnections) : connectionManager,
+          null == connectionManager ? new DefaultConnectionManager(connectionProvider, maxConnections, Duration.ofSeconds(5)) : connectionManager,
           executorServiceSupplier,
-          terminateExecutorServiceOnClose
+          terminateExecutorServiceOnClose,
+          terminateConnectionPoolOnClose
       );
     }
   }
diff --git a/src/main/java/buckelieg/jdbc/DefaultConnectionManager.java b/src/main/java/buckelieg/jdbc/DefaultConnectionManager.java
index ca1272b..42de846 100644
--- a/src/main/java/buckelieg/jdbc/DefaultConnectionManager.java
+++ b/src/main/java/buckelieg/jdbc/DefaultConnectionManager.java
@@ -21,12 +21,14 @@ import javax.annotation.Nullable;
 import java.sql.Connection;
 import java.sql.SQLException;
+import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 final class DefaultConnectionManager implements ConnectionManager {
 
@@ -42,11 +44,16 @@ final class DefaultConnectionManager implements ConnectionManager {
 
   private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
 
-  DefaultConnectionManager(TrySupplier connectionSupplier, int maxConnections) {
+  private final AtomicLong activeConnections = new AtomicLong(0);
+
+  private final Duration waitOnClose;
+
+  DefaultConnectionManager(TrySupplier connectionSupplier, int maxConnections, Duration waitOnClose) {
     this.connectionSupplier = connectionSupplier;
     this.maxConnections = maxConnections;
     this.pool = new ArrayBlockingQueue<>(maxConnections);
     this.obtainedConnections = new CopyOnWriteArrayList<>();
+    this.waitOnClose = waitOnClose;
   }
 
   @Nonnull
@@ -67,6 +74,7 @@ public Connection getConnection() throws SQLException {
       throw new SQLException(e);
     }
     connection.setAutoCommit(false);
+    activeConnections.incrementAndGet();
     return connection;
   }
 
@@ -74,12 +82,21 @@ public Connection getConnection() throws SQLException {
   public void close(@Nullable Connection connection) throws SQLException {
     if (null == connection) return;
     connection.setAutoCommit(true);
+    activeConnections.decrementAndGet();
     if (!pool.offer(connection)) throw new SQLException("Connection pool is full");
   }
 
   @Override
   public void close() throws SQLException {
     isShuttingDown.set(true);
+    if (activeConnections.get() > 0) {
+      // gracefully close the pool: wait for the configured time so that in-flight transactions can complete
+      try {
+        Thread.sleep(waitOnClose.toMillis());
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
     pool.clear();
     SQLException exception = null;
     for (Connection connection : obtainedConnections) {
@@ -93,4 +110,5 @@ public void close() throws SQLException {
     obtainedConnections.clear();
     if (null != exception) throw exception;
   }
+
 }
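What the graceful close above means for callers, as a sketch; `build()` wires `DefaultConnectionManager` with a fixed 5-second `waitOnClose`, and the JDBC URL is illustrative:

```java
DB db = DB.builder()
    .withMaxConnections(2)
    .build(() -> DriverManager.getConnection("jdbc:vendor://host/db"));
// ... suppose another thread is still inside a transaction here ...
db.close(); // close() sees activeConnections > 0, sleeps up to 5s,
            // then clears the pool and closes all obtained connections
```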
diff --git a/src/main/java/buckelieg/jdbc/JDBCDefaults.java b/src/main/java/buckelieg/jdbc/JDBCDefaults.java
index 09a290e..9fa4e60 100644
--- a/src/main/java/buckelieg/jdbc/JDBCDefaults.java
+++ b/src/main/java/buckelieg/jdbc/JDBCDefaults.java
@@ -52,8 +52,10 @@ static TryFunction<ValueReader, Map<String, Object>, SQLException> defaultMapper
   static Map<String, Object> defaultMapper(ValueReader reader) throws SQLException {
     Metadata meta = reader.meta();
-    Map<String, Object> result = new HashMap<>(meta.count());
-    reader.meta().forEachColumn(index -> result.put(meta.getName(index), reader(meta.getSQLType(index)).apply(reader, index)));
+    Map<String, Object> result = new HashMap<>(meta.columnCount());
+    for (int index = 1; index <= meta.columnCount(); index++) {
+      result.put(meta.getName(index), reader(meta.getSQLType(index)).apply(reader, index));
+    }
     return result;
   }
 
@@ -68,7 +70,7 @@ public Map<String, Object> apply(ValueReader valueReader) throws SQLException {
     return mapper.updateAndGet(instance -> {
       if (null == instance) {
         Metadata meta = valueReader.meta();
-        int columnCount = meta.count();
+        int columnCount = meta.columnCount();
         colReaders = new ArrayList<>(columnCount);
         for (int col = 1; col <= columnCount; col++)
           colReaders.add(entry(entry(meta.getLabel(col), col), reader(meta.getSQLType(col))));
@@ -183,7 +185,7 @@ private static TryBiFunction longVar
   }
 
   private static void setObject(ValueWriter writer, int index, Object value) throws SQLException {
-    if (value == null) writer.setObject(index, null);
+    if (null == value) writer.setObject(index, null);
     else {
       Class<?> cls = value.getClass();
       if (Clob.class.isAssignableFrom(cls)) writer.setClob(index, (Clob) value);
@@ -195,6 +197,20 @@ private static void setObject(ValueWriter writer, int index, Object value) throw
       else if (Calendar.class.isAssignableFrom(cls)) writer.setTimestamp(index, new Timestamp(((Calendar) value).getTimeInMillis()));
       else if (Instant.class.isAssignableFrom(cls)) writer.setTimestamp(index, new Timestamp(((Instant) value).toEpochMilli()), Calendar.getInstance());
       else if (ZonedDateTime.class.isAssignableFrom(cls)) writer.setTimestamp(index, new Timestamp(((ZonedDateTime) value).toInstant().toEpochMilli()), Calendar.getInstance());
+      else if (byte.class.isAssignableFrom(cls)) writer.setShort(index, (byte) value);
+      else if (Byte.class.isAssignableFrom(cls)) writer.setShort(index, (Byte) value);
+      else if (short.class.isAssignableFrom(cls)) writer.setShort(index, (short) value);
+      else if (Short.class.isAssignableFrom(cls)) writer.setShort(index, (Short) value);
+      else if (int.class.isAssignableFrom(cls)) writer.setInt(index, (int) value);
+      else if (Integer.class.isAssignableFrom(cls)) writer.setInt(index, (Integer) value);
+      else if (long.class.isAssignableFrom(cls)) writer.setLong(index, (long) value);
+      else if (Long.class.isAssignableFrom(cls)) writer.setLong(index, (Long) value);
+      else if (float.class.isAssignableFrom(cls)) writer.setFloat(index, (float) value);
+      else if (Float.class.isAssignableFrom(cls)) writer.setFloat(index, (Float) value);
+      else if (double.class.isAssignableFrom(cls)) writer.setDouble(index, (double) value);
+      else if (Double.class.isAssignableFrom(cls)) writer.setDouble(index, (Double) value);
+      else if (boolean.class.isAssignableFrom(cls)) writer.setBoolean(index, (boolean) value);
+      else if (BigDecimal.class.isAssignableFrom(cls)) writer.setBigDecimal(index, (BigDecimal) value);
       else writer.setObject(index, value);
     }
   }
diff --git a/src/main/java/buckelieg/jdbc/Metadata.java b/src/main/java/buckelieg/jdbc/Metadata.java
index bd369ba..a6c6897 100644
--- a/src/main/java/buckelieg/jdbc/Metadata.java
+++ b/src/main/java/buckelieg/jdbc/Metadata.java
@@ -81,7 +81,7 @@ default List foreignKeys() {
    *
    * @return a column count
    */
-  default int count() {
+  default int columnCount() {
     return names().size();
   }
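A sketch of the renamed accessor in use via `forMeta`, which is shown in the README portion of this diff; the `TEST` table is the two-column table used throughout the examples:

```java
// count() is now columnCount(); for SELECT * FROM TEST this yields 2
int columns = db.select("SELECT * FROM TEST").forMeta(Metadata::columnCount);
```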
diff --git a/src/main/java/buckelieg/jdbc/RSMeta.java b/src/main/java/buckelieg/jdbc/RSMeta.java
index 7455907..bb852f0 100644
--- a/src/main/java/buckelieg/jdbc/RSMeta.java
+++ b/src/main/java/buckelieg/jdbc/RSMeta.java
@@ -265,9 +265,7 @@ public Optional getReferencedTable(@Nullable String columnName) {
   public void forEachColumn(TryConsumer action) {
     if (null == action) throw new NullPointerException("Action must be provided");
     try {
-      for (Column column : getColumns()) {
-        action.accept(column.index);
-      }
+      for (Column column : getColumns()) action.accept(column.index);
     } catch (SQLException e) {
       throw newSQLRuntimeException(e);
     }
@@ -276,7 +274,10 @@ public void forEachColumn(TryConsumer action) {
   private boolean isPrimaryKey(Table table, String column) {
     return getColumn(table.catalog, table.schema, table.name, column, c -> {
       if (c.pk == null)
-        c.pk = listResultSet(dbMeta.get().getPrimaryKeys(table.catalog, table.schema, table.name), rs -> rs.getString(COLUMN_NAME)).stream().anyMatch(name -> name.equalsIgnoreCase(column));
+        c.pk = listResultSet(
+            dbMeta.get().getPrimaryKeys(table.catalog, table.schema, table.name),
+            rs -> rs.getString(COLUMN_NAME)
+        ).stream().anyMatch(name -> name.equalsIgnoreCase(column));
     }).pk;
   }
@@ -413,7 +414,7 @@ private Column getColumn(String catalog, String schema, String table, String col
   private Consumer getEnricher(@Nullable TryConsumer enricher) {
     return c -> {
-      if (enricher != null) {
+      if (null != enricher) {
        try {
          enricher.accept(c);
        } catch (Exception e) {
diff --git a/src/test/java/buckelieg/jdbc/DBTests.java b/src/test/java/buckelieg/jdbc/DBTests.java
index 699118e..72935ac 100644
--- a/src/test/java/buckelieg/jdbc/DBTests.java
+++ b/src/test/java/buckelieg/jdbc/DBTests.java
@@ -853,6 +853,14 @@ public void testSelectForBatch() throws Exception {
 
   @Test
   public void testSelectForMeta() {
     assertEquals(2, db.select("SELECT * FROM TEST").forMeta(Metadata::getColumnFullNames).size());
+    Map<String, Boolean> meta = db.select("SELECT * FROM TEST").forMeta(metadata -> {
+      Map<String, Boolean> map = new HashMap<>();
+      metadata.forEachColumn(columnIndex -> map.put(metadata.getName(columnIndex), metadata.isPrimaryKey(columnIndex)));
+      return map;
+    });
+    assertEquals(2, meta.size());
+    assertTrue(meta.get("ID"));
+    assertFalse(meta.get("NAME"));
   }
 }