From 9299a80259e2a7c00bef704044fd1bd6358259f3 Mon Sep 17 00:00:00 2001 From: Sumedh Wale Date: Sat, 29 Dec 2018 02:47:28 +0530 Subject: [PATCH] [SNAP-2818] trim the JOB_DESCRIPTION property in Spark jobs (#1227) - previous changes set the SparkContext.SPARK_JOB_DESCRIPTION property to the query string for SnappySession.sql executions, but this can exceed 32K and the property will then fail serialization, so trim it to 100 characters with a "..." continuation, as is done in the SQL tab display (an illustrative sketch of this trimming follows the patch) - added a large view test to ViewTest and enhanced it to accept a generic "String => DataFrame" closure so that the same tests can be used both as Scala tests for SnappySession and as dunits for JDBC Statement.execute; added the same tests to DDLRoutingDUnitTest using this. Note: the above test is unable to reproduce the original issue with CREATE VIEW, but it does reproduce it for a large query string - disallow CREATE INDEX on column tables unless the experimental-features property is enabled - clear the catalog cache on shutdown to avoid its accidental use by subsequent tests --- .../cluster/DDLRoutingDUnitTest.scala | 63 ++- .../cluster/QueryRoutingDUnitTest.scala | 1 + .../cluster/QueryRoutingSingleNodeSuite.scala | 4 +- .../spark/sql/store/SQLMetadataTest.scala | 6 +- .../SplitClusterDUnitSecurityTest.scala | 6 +- .../cluster/SplitClusterDUnitTestBase.scala | 18 +- .../apache/spark/sql/CachedDataFrame.scala | 7 +- .../org/apache/spark/sql/SnappySession.scala | 29 +- .../apache/spark/sql/collection/Utils.scala | 10 +- .../columnar/impl/ColumnFormatRelation.scala | 9 +- .../org/apache/spark/sql/execution/ddl.scala | 5 +- .../sql/hive/SnappyHiveExternalCatalog.scala | 15 +- .../api/JavaCreateIndexTestSuite.java | 2 + .../scala/io/snappydata/SnappyFunSuite.scala | 24 +- .../apache/spark/sql/store/MetadataTest.scala | 23 +- .../org/apache/spark/sql/store/ViewTest.scala | 397 +++++++++++------- .../test/java/io/snappydata/hydra/smoke.sh | 2 - gradle.properties | 7 + 18 files changed, 387 insertions(+), 241 deletions(-) diff --git a/cluster/src/dunit/scala/io/snappydata/cluster/DDLRoutingDUnitTest.scala b/cluster/src/dunit/scala/io/snappydata/cluster/DDLRoutingDUnitTest.scala index 2d1bef88a2..8d789bfec7 100644 --- a/cluster/src/dunit/scala/io/snappydata/cluster/DDLRoutingDUnitTest.scala +++ b/cluster/src/dunit/scala/io/snappydata/cluster/DDLRoutingDUnitTest.scala @@ -19,10 +19,12 @@ package io.snappydata.cluster import java.sql.{Connection, DriverManager, SQLException} import com.pivotal.gemfirexd.internal.engine.{GfxdConstants, Misc} +import io.snappydata.SnappyFunSuite.resultSetToDataset import io.snappydata.test.dunit.{AvailablePortHelper, SerializableRunnable} -import org.apache.spark.sql.SnappyContext import org.apache.spark.sql.collection.Utils +import org.apache.spark.sql.store.ViewTest +import org.apache.spark.sql.{Dataset, Row, SnappyContext, SnappySession} class DDLRoutingDUnitTest(val s: String) extends ClusterManagerTestBase(s) { @@ -72,12 +74,12 @@ class DDLRoutingDUnitTest(val s: String) extends ClusterManagerTestBase(s) { val conn = getANetConnection(netPort1) // first fail a statement - failCreateTableXD(conn, tableName, true, " row ") + failCreateTableXD(conn, tableName, doFail = true, " row ") createTableXD(conn, tableName, " row ") tableMetadataAssertRowTable("APP", tableName) // Test create table - error for recreate - failCreateTableXD(conn, tableName, false, " row ") + failCreateTableXD(conn, tableName, doFail = false, " row ") // Drop Table and Recreate dropTableXD(conn, tableName) @@ -167,7 +169,7 @@ class 
DDLRoutingDUnitTest(val s: String) extends ClusterManagerTestBase(s) { vm2.invoke(classOf[ClusterManagerTestBase], "stopAny") val props = bootProps.clone().asInstanceOf[java.util.Properties] - props.put("distributed-system-id" , "1") + props.put("distributed-system-id", "1") props.put("server-groups", "sg1") val restartServer = new SerializableRunnable() { @@ -185,7 +187,7 @@ class DDLRoutingDUnitTest(val s: String) extends ClusterManagerTestBase(s) { var s = conn.createStatement() s.execute(s"CREATE TABLE $tableName (Col1 INT, Col2 INT, Col3 STRING)") insertDataXD(conn, tableName) - var snc = org.apache.spark.sql.SnappyContext(sc) + val snc = org.apache.spark.sql.SnappyContext(sc) verifyResultAndSchema(snc, tableName, 3) s.execute(s"ALTER TABLE $tableName ADD Col4 INT") @@ -207,21 +209,21 @@ class DDLRoutingDUnitTest(val s: String) extends ClusterManagerTestBase(s) { s.execute(s"insert into $tableName values (1,1)") s.execute(s"ALTER TABLE $tableName add constraint emp_uk unique (Col1)") try { - s.execute(s"insert into $tableName values (1,1)") + s.execute(s"insert into $tableName values (1,1)") } catch { case sqle: SQLException => if (sqle.getSQLState != "23505" || - !sqle.getMessage.contains("duplicate key value in a unique or" + - " primary key constraint or unique index")) { + !sqle.getMessage.contains("duplicate key value in a unique or" + + " primary key constraint or unique index")) { throw sqle } } // asynceventlistener s.execute("CREATE ASYNCEVENTLISTENER myListener (" + - " listenerclass 'com.pivotal.gemfirexd.callbacks.DBSynchronizer'" + - " initparams 'org.apache.derby.jdbc.EmbeddedDriver,jdbc:derby:newDB;create=true')" + - " server groups(sg1)") + " listenerclass 'com.pivotal.gemfirexd.callbacks.DBSynchronizer'" + + " initparams 'org.apache.derby.jdbc.EmbeddedDriver,jdbc:derby:newDB;create=true')" + + " server groups(sg1)") s.execute(s"ALTER TABLE $tableName SET ASYNCEVENTLISTENER (myListener) ") var rs = s.executeQuery(s"select * from SYS.SYSTABLES where tablename='$tableName'") @@ -287,7 +289,8 @@ class DDLRoutingDUnitTest(val s: String) extends ClusterManagerTestBase(s) { var cnt = 0 while (rs.next()) { cnt += 1 - rs.getInt(1); rs.getInt(2); + rs.getInt(1) + rs.getInt(2) } assert(cnt == 5, cnt) @@ -296,7 +299,9 @@ class DDLRoutingDUnitTest(val s: String) extends ClusterManagerTestBase(s) { cnt = 0 while (rs2.next()) { cnt += 1 - rs2.getInt(1); rs2.getInt(2); rs2.getInt(3); + rs2.getInt(1) + rs2.getInt(2) + rs2.getInt(3) } assert(cnt == 5, cnt) @@ -324,6 +329,36 @@ class DDLRoutingDUnitTest(val s: String) extends ClusterManagerTestBase(s) { dropTableXD(conn, tableName) } + def testViews(): Unit = { + val netPort1 = AvailablePortHelper.getRandomAvailableTCPPort + vm2.invoke(classOf[ClusterManagerTestBase], "startNetServer", netPort1) + + val session = new SnappySession(sc) + ViewTest.createTables(session) + + def newExecution(): String => Dataset[Row] = { + val session = new SnappySession(sc) + val conn = getANetConnection(netPort1) + val stmt = conn.createStatement() + resultSetToDataset(session, stmt) + } + + val conn = getANetConnection(netPort1) + val stmt = conn.createStatement() + ViewTest.testTemporaryView(resultSetToDataset(session, stmt), newExecution) + ViewTest.testGlobalTemporaryView(resultSetToDataset(session, stmt), newExecution) + ViewTest.testTemporaryViewUsing(resultSetToDataset(session, stmt), newExecution) + ViewTest.testGlobalTemporaryViewUsing(resultSetToDataset(session, stmt), newExecution) + ViewTest.testPersistentView(resultSetToDataset(session, 
stmt), checkPlans = false, + newExecution, restartSpark) + ViewTest.dropTables(new SnappySession(sc)) + } + + private def restartSpark(): Unit = { + ClusterManagerTestBase.stopAny() + ClusterManagerTestBase.startSnappyLead(ClusterManagerTestBase.locatorPort, bootProps) + } + def createTableXD(conn: Connection, tableName: String, usingStr: String): Unit = { val s = conn.createStatement() @@ -421,7 +456,7 @@ class DDLRoutingDUnitTest(val s: String) extends ClusterManagerTestBase(s) { s.execute("CREATE EXTERNAL TABLE airlineRef_temp(Code VARCHAR(25), " + "Description VARCHAR(25)) USING parquet OPTIONS()") } catch { - case e: java.sql.SQLException => + case _: java.sql.SQLException => // println("Exception stack. create. ex=" + e.getMessage + // " ,stack=" + ExceptionUtils.getFullStackTrace(e)) } diff --git a/cluster/src/dunit/scala/io/snappydata/cluster/QueryRoutingDUnitTest.scala b/cluster/src/dunit/scala/io/snappydata/cluster/QueryRoutingDUnitTest.scala index fa4ec20a2d..4e507e39be 100644 --- a/cluster/src/dunit/scala/io/snappydata/cluster/QueryRoutingDUnitTest.scala +++ b/cluster/src/dunit/scala/io/snappydata/cluster/QueryRoutingDUnitTest.scala @@ -761,6 +761,7 @@ class QueryRoutingDUnitTest(val s: String) TPCHUtils.createAndLoadTables(snc, true) + snc.setConf(Property.EnableExperimentalFeatures.name, "true") snc.sql( s"""CREATE INDEX idx_orders_cust ON orders(o_custkey) options (COLOCATE_WITH 'customer') diff --git a/cluster/src/test/scala/io/snappydata/cluster/QueryRoutingSingleNodeSuite.scala b/cluster/src/test/scala/io/snappydata/cluster/QueryRoutingSingleNodeSuite.scala index 39194fa11a..4600a926fa 100644 --- a/cluster/src/test/scala/io/snappydata/cluster/QueryRoutingSingleNodeSuite.scala +++ b/cluster/src/test/scala/io/snappydata/cluster/QueryRoutingSingleNodeSuite.scala @@ -25,7 +25,7 @@ import io.snappydata.{SnappyFunSuite, SnappyTableStatsProviderService} import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql.SnappySession -import org.apache.spark.sql.store.{ColumnTableBatchInsertTest, MetadataTest} +import org.apache.spark.sql.store.ColumnTableBatchInsertTest class QueryRoutingSingleNodeSuite extends SnappyFunSuite with BeforeAndAfterAll { @@ -707,7 +707,7 @@ class QueryRoutingSingleNodeSuite extends SnappyFunSuite with BeforeAndAfterAll val connSession = allSessions.head // skip the "isCached" checks with JDBC since session is different for JDBC connection ColumnTableBatchInsertTest.testSparkCachingUsingSQL(sc, - MetadataTest.resultSetToDataset(connSession, stmt), connSession.catalog.isCached, + SnappyFunSuite.resultSetToDataset(connSession, stmt), connSession.catalog.isCached, df => connSession.sharedState.cacheManager.lookupCachedData(df).isDefined) stmt.close() } finally { diff --git a/cluster/src/test/scala/org/apache/spark/sql/store/SQLMetadataTest.scala b/cluster/src/test/scala/org/apache/spark/sql/store/SQLMetadataTest.scala index 37c4a2e4af..68d9812f98 100644 --- a/cluster/src/test/scala/org/apache/spark/sql/store/SQLMetadataTest.scala +++ b/cluster/src/test/scala/org/apache/spark/sql/store/SQLMetadataTest.scala @@ -45,7 +45,7 @@ class SQLMetadataTest extends SnappyFunSuite { val conn = DriverManager.getConnection(s"jdbc:snappydata://localhost:$netPort") try { val stmt = conn.createStatement() - MetadataTest.testSYSTablesAndVTIs(MetadataTest.resultSetToDataset(session, stmt), + MetadataTest.testSYSTablesAndVTIs(SnappyFunSuite.resultSetToDataset(session, stmt), netServers = Seq(s"localhost/127.0.0.1[$netPort]")) stmt.close() } finally { @@ -58,7 +58,7 @@ class 
SQLMetadataTest extends SnappyFunSuite { val conn = DriverManager.getConnection(s"jdbc:snappydata://localhost:$netPort") try { val stmt = conn.createStatement() - MetadataTest.testDescribeShowAndExplain(MetadataTest.resultSetToDataset(session, stmt), + MetadataTest.testDescribeShowAndExplain(SnappyFunSuite.resultSetToDataset(session, stmt), usingJDBC = true) stmt.close() } finally { @@ -71,7 +71,7 @@ class SQLMetadataTest extends SnappyFunSuite { val conn = DriverManager.getConnection(s"jdbc:snappydata://localhost:$netPort") try { val stmt = conn.createStatement() - MetadataTest.testDSIDWithSYSTables(MetadataTest.resultSetToDataset(session, stmt), + MetadataTest.testDSIDWithSYSTables(SnappyFunSuite.resultSetToDataset(session, stmt), Seq(s"localhost/127.0.0.1[$netPort]")) stmt.close() } finally { diff --git a/core/src/dunit/scala/io/snappydata/cluster/SplitClusterDUnitSecurityTest.scala b/core/src/dunit/scala/io/snappydata/cluster/SplitClusterDUnitSecurityTest.scala index 946179e55e..e7ec9299a5 100644 --- a/core/src/dunit/scala/io/snappydata/cluster/SplitClusterDUnitSecurityTest.scala +++ b/core/src/dunit/scala/io/snappydata/cluster/SplitClusterDUnitSecurityTest.scala @@ -681,8 +681,7 @@ class SplitClusterDUnitSecurityTest(s: String) s"CREATE TEMPORARY TABLE ${t1}temp AS SELECT id, name FROM $schema.$t1", s"CREATE GLOBAL TEMPORARY TABLE ${t1}tempg AS SELECT id, name FROM $schema.$t1", s"CREATE EXTERNAL TABLE $schema.${t1}ext USING csv OPTIONS(path " + - s"'../../quickstart/src/main/resources/customer.csv')", - s"CREATE INDEX $schema.idx ON $schema.$t1 (id, name)") + s"'../../quickstart/src/main/resources/customer.csv')") .foreach(executeSQL(user1Stmt, _)) // user gemfire2 of same group gemGroup1 @@ -698,7 +697,6 @@ class SplitClusterDUnitSecurityTest(s: String) s"select * from $schema.$t2", s"delete from $schema.$t1 where name like 'two'", s"drop table $schema.$t1r", - s"drop index $schema.idx", s"select * from $schema.$t2").foreach(executeSQL(user2Stmt, _)) // user gemfire1 @@ -724,7 +722,7 @@ class SplitClusterDUnitSecurityTest(s: String) s"CREATE INDEX $schema.idx4 ON $schema.$t1 (id, name)") .foreach(sql => assertFailures(() => { executeSQL(user4Stmt, sql) - }, sql, Seq("42500", "42502", "42506", "42507"))) + }, sql, Seq("42500", "42502", "42506", "42507", "38000"))) // Grant DML permissions to gemfire4 and ensure it works. 
executeSQL(user1Stmt, s"grant select on $schema.$t1 to ldapgroup:$group2") diff --git a/core/src/dunit/scala/io/snappydata/cluster/SplitClusterDUnitTestBase.scala b/core/src/dunit/scala/io/snappydata/cluster/SplitClusterDUnitTestBase.scala index d951199642..cf435524e9 100644 --- a/core/src/dunit/scala/io/snappydata/cluster/SplitClusterDUnitTestBase.scala +++ b/core/src/dunit/scala/io/snappydata/cluster/SplitClusterDUnitTestBase.scala @@ -32,7 +32,7 @@ import com.pivotal.gemfirexd.internal.engine.Misc import io.snappydata.test.dunit.{SerializableRunnable, VM} import io.snappydata.test.util.TestException import io.snappydata.util.TestUtils -import io.snappydata.{ColumnUpdateDeleteTests, Constant} +import io.snappydata.{ColumnUpdateDeleteTests, Constant, SnappyFunSuite} import org.junit.Assert import org.apache.spark.sql.catalyst.InternalRow @@ -296,11 +296,11 @@ trait SplitClusterDUnitTestObject extends Logging { netServers, locatorId, locatorNetServer, servers, leadId) // next test metadata using JDBC connection stmt = jdbcConn.createStatement() - MetadataTest.testSYSTablesAndVTIs(MetadataTest.resultSetToDataset(session, stmt), + MetadataTest.testSYSTablesAndVTIs(SnappyFunSuite.resultSetToDataset(session, stmt), hostName = "localhost", netServers, locatorId, locatorNetServer, servers, leadId) - MetadataTest.testDescribeShowAndExplain(MetadataTest.resultSetToDataset(session, stmt), + MetadataTest.testDescribeShowAndExplain(SnappyFunSuite.resultSetToDataset(session, stmt), usingJDBC = true) - MetadataTest.testDSIDWithSYSTables(MetadataTest.resultSetToDataset(session, stmt), + MetadataTest.testDSIDWithSYSTables(SnappyFunSuite.resultSetToDataset(session, stmt), netServers, locatorId, locatorNetServer, servers, leadId) stmt.close() @@ -417,8 +417,10 @@ trait SplitClusterDUnitTestObject extends Logging { SnappyContext.getClusterMode(snc.sparkContext) match { case ThinClientConnectorMode(_, _) => // test index create op - snc.createIndex("tableName" + "_index", tableName, Map("COL1" -> None), - Map.empty[String, String]) + if ("row".equalsIgnoreCase(tableType)) { + snc.createIndex("tableName" + "_index", tableName, Map("COL1" -> None), + Map.empty[String, String]) + } case _ => } @@ -427,7 +429,9 @@ trait SplitClusterDUnitTestObject extends Logging { SnappyContext.getClusterMode(snc.sparkContext) match { case ThinClientConnectorMode(_, _) => // test index drop op - snc.dropIndex("tableName" + "_index", ifExists = false) + if ("row".equalsIgnoreCase(tableType)) { + snc.dropIndex("tableName" + "_index", ifExists = false) + } case _ => } } diff --git a/core/src/main/scala/org/apache/spark/sql/CachedDataFrame.scala b/core/src/main/scala/org/apache/spark/sql/CachedDataFrame.scala index 61cac1d3e9..f70d898e80 100644 --- a/core/src/main/scala/org/apache/spark/sql/CachedDataFrame.scala +++ b/core/src/main/scala/org/apache/spark/sql/CachedDataFrame.scala @@ -106,6 +106,9 @@ class CachedDataFrame(snappySession: SnappySession, queryExecution: QueryExecuti @transient private[sql] var currentLiterals: Array[ParamLiteral] = _ + @transient + private[sql] var queryShortString: String = _ + @transient private[sql] var queryString: String = _ @@ -288,7 +291,7 @@ class CachedDataFrame(snappySession: SnappySession, queryExecution: QueryExecuti try { didPrepare = prepareForCollect() val (result, elapsedMillis) = CachedDataFrame.withNewExecutionId(snappySession, - queryString, queryString, currentQueryExecutionString, currentQueryPlanInfo, + queryShortString, queryString, currentQueryExecutionString, 
currentQueryPlanInfo, currentExecutionId, planStartTime, planEndTime)(body) (result, elapsedMillis * 1000000L) } finally { @@ -613,7 +616,7 @@ object CachedDataFrame else Utils.nextExecutionIdMethod.invoke(SQLExecution).asInstanceOf[Long] val executionIdStr = java.lang.Long.toString(executionId) localProperties.setProperty(SQLExecution.EXECUTION_ID_KEY, executionIdStr) - localProperties.setProperty(SparkContext.SPARK_JOB_DESCRIPTION, queryLongForm) + localProperties.setProperty(SparkContext.SPARK_JOB_DESCRIPTION, queryShortForm) localProperties.setProperty(SparkContext.SPARK_JOB_GROUP_ID, executionIdStr) val startTime = System.currentTimeMillis() diff --git a/core/src/main/scala/org/apache/spark/sql/SnappySession.scala b/core/src/main/scala/org/apache/spark/sql/SnappySession.scala index b49f812901..d7c8fa3516 100644 --- a/core/src/main/scala/org/apache/spark/sql/SnappySession.scala +++ b/core/src/main/scala/org/apache/spark/sql/SnappySession.scala @@ -1835,8 +1835,8 @@ object SnappySession extends Logging { * data to the active executions. SparkListenerSQLPlanExecutionEnd is * then sent with the accumulated time of both the phases. */ - private def planExecution(qe: QueryExecution, session: SnappySession, sqlText: String, - executedPlan: SparkPlan, paramLiterals: Array[ParamLiteral], paramsId: Int) + private def planExecution(qe: QueryExecution, session: SnappySession, sqlShortText: String, + sqlText: String, executedPlan: SparkPlan, paramLiterals: Array[ParamLiteral], paramsId: Int) (f: => RDD[InternalRow]): (RDD[InternalRow], String, SparkPlanInfo, String, SparkPlanInfo, Long, Long, Long) = { // Right now the CachedDataFrame is not getting used across SnappySessions @@ -1845,7 +1845,7 @@ object SnappySession extends Logging { val context = session.sparkContext val localProperties = context.getLocalProperties localProperties.setProperty(SQLExecution.EXECUTION_ID_KEY, executionIdStr) - localProperties.setProperty(SparkContext.SPARK_JOB_DESCRIPTION, sqlText) + localProperties.setProperty(SparkContext.SPARK_JOB_DESCRIPTION, sqlShortText) localProperties.setProperty(SparkContext.SPARK_JOB_GROUP_ID, executionIdStr) val start = System.currentTimeMillis() try { @@ -1870,8 +1870,8 @@ object SnappySession extends Logging { } } - private def evaluatePlan(qe: QueryExecution, session: SnappySession, sqlText: String, - paramLiterals: Array[ParamLiteral], paramsId: Int): CachedDataFrame = { + private def evaluatePlan(qe: QueryExecution, session: SnappySession, sqlShortText: String, + sqlText: String, paramLiterals: Array[ParamLiteral], paramsId: Int): CachedDataFrame = { val (executedPlan, withFallback) = getExecutedPlan(qe.executedPlan) var planCaching = session.planCaching @@ -1906,7 +1906,7 @@ object SnappySession extends Logging { case _ => true } else true // post final execution immediately (collect for these plans will post nothing) - CachedDataFrame.withNewExecutionId(session, sqlText, sqlText, executionStr, planInfo, + CachedDataFrame.withNewExecutionId(session, sqlShortText, sqlText, executionStr, planInfo, postGUIPlans = postGUIPlans) { // create new LogicalRDD plan so that plan does not get re-executed // (e.g. 
just toRdd is not enough since further operators like show will pass @@ -1922,14 +1922,15 @@ object SnappySession extends Logging { case plan: CollectAggregateExec => val (childRDD, origExecutionStr, origPlanInfo, executionStr, planInfo, executionId, - planStartTime, planEndTime) = planExecution(qe, session, sqlText, plan, paramLiterals, - paramsId)(if (withFallback ne null) withFallback.execute(plan.child) else plan.childRDD) + planStartTime, planEndTime) = planExecution(qe, session, sqlShortText, sqlText, plan, + paramLiterals, paramsId)( + if (withFallback ne null) withFallback.execute(plan.child) else plan.childRDD) (childRDD, qe, origExecutionStr, origPlanInfo, executionStr, planInfo, childRDD.id, true, executionId, planStartTime, planEndTime) case plan => val (rdd, origExecutionStr, origPlanInfo, executionStr, planInfo, executionId, - planStartTime, planEndTime) = planExecution(qe, session, sqlText, plan, + planStartTime, planEndTime) = planExecution(qe, session, sqlShortText, sqlText, plan, paramLiterals, paramsId) { plan match { case p: CollectLimitExec => @@ -1993,6 +1994,7 @@ object SnappySession extends Logging { def sqlPlan(session: SnappySession, sqlText: String): CachedDataFrame = { val parser = session.sessionState.sqlParser + val sqlShortText = CachedDataFrame.queryStringShortForm(sqlText) val plan = parser.parsePlan(sqlText, clearExecutionData = true) val planCaching = session.planCaching val paramLiterals = parser.sqlParser.getAllLiterals @@ -2007,7 +2009,7 @@ object SnappySession extends Logging { session.currentKey = key try { val execution = session.executePlan(plan) - cachedDF = evaluatePlan(execution, session, sqlText, paramLiterals, paramsId) + cachedDF = evaluatePlan(execution, session, sqlShortText, sqlText, paramLiterals, paramsId) // put in cache if the DF has to be cached if (planCaching && cachedDF.isCached) { if (isTraceEnabled) { @@ -2026,12 +2028,13 @@ object SnappySession extends Logging { logDebug(s"Using cached plan for: $sqlText (existing: ${cachedDF.queryString})") cachedDF = cachedDF.duplicate() } - handleCachedDataFrame(cachedDF, plan, session, sqlText, paramLiterals, paramsId) + handleCachedDataFrame(cachedDF, plan, session, sqlShortText, sqlText, paramLiterals, paramsId) } private def handleCachedDataFrame(cachedDF: CachedDataFrame, plan: LogicalPlan, - session: SnappySession, sqlText: String, paramLiterals: Array[ParamLiteral], - paramsId: Int): CachedDataFrame = { + session: SnappySession, sqlShortText: String, sqlText: String, + paramLiterals: Array[ParamLiteral], paramsId: Int): CachedDataFrame = { + cachedDF.queryShortString = sqlShortText cachedDF.queryString = sqlText if (cachedDF.isCached && (cachedDF.paramLiterals eq null)) { cachedDF.paramLiterals = paramLiterals diff --git a/core/src/main/scala/org/apache/spark/sql/collection/Utils.scala b/core/src/main/scala/org/apache/spark/sql/collection/Utils.scala index baab7912c9..4ec0b01665 100644 --- a/core/src/main/scala/org/apache/spark/sql/collection/Utils.scala +++ b/core/src/main/scala/org/apache/spark/sql/collection/Utils.scala @@ -20,7 +20,7 @@ import java.io.ObjectOutputStream import java.lang.reflect.Method import java.net.{URL, URLClassLoader} import java.nio.ByteBuffer -import java.sql.DriverManager +import java.sql.{DriverManager, ResultSet} import java.util.TimeZone import scala.annotation.tailrec @@ -41,6 +41,7 @@ import org.apache.commons.math3.distribution.NormalDistribution import org.eclipse.collections.impl.map.mutable.UnifiedMap import org.apache.spark._ +import 
org.apache.spark.executor.InputMetrics import org.apache.spark.io.CompressionCodec import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.rdd.RDD @@ -56,7 +57,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, analysis} import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.columnar.ExternalStoreUtils.CaseInsensitiveMutableHashMap -import org.apache.spark.sql.execution.datasources.jdbc.{DriverRegistry, DriverWrapper} +import org.apache.spark.sql.execution.datasources.jdbc.{DriverRegistry, DriverWrapper, JdbcUtils} import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.internal.SnappySessionCatalog import org.apache.spark.sql.sources.{CastLongTime, JdbcExtendedUtils} @@ -678,6 +679,11 @@ object Utils { def createCatalystConverter(dataType: DataType): Any => Any = CatalystTypeConverters.createToCatalystConverter(dataType) + def resultSetToSparkInternalRows(resultSet: ResultSet, schema: StructType, + inputMetrics: InputMetrics = new InputMetrics): Iterator[InternalRow] = { + JdbcUtils.resultSetToSparkInternalRows(resultSet, schema, inputMetrics) + } + // we should use the exact day as Int, for example, (year, month, day) -> day def millisToDays(millisUtc: Long, tz: TimeZone): Int = { // SPARK-6785: use Math.floor so negative number of days (dates before 1970) diff --git a/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatRelation.scala b/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatRelation.scala index 60157eb4ff..b730edec6d 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatRelation.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatRelation.scala @@ -23,7 +23,7 @@ import scala.util.control.NonFatal import com.gemstone.gemfire.internal.cache.{ExternalTableMetaData, LocalRegion} import com.pivotal.gemfirexd.internal.engine.Misc import com.pivotal.gemfirexd.internal.engine.store.GemFireContainer -import io.snappydata.Constant +import io.snappydata.{Constant, Property} import io.snappydata.sql.catalog.{RelationInfo, SnappyExternalCatalog} import org.apache.spark.rdd.RDD @@ -518,8 +518,13 @@ class ColumnFormatRelation( indexColumns: Map[String, Option[SortDirection]], options: Map[String, String]): DataFrame = { - val parameters = new CaseInsensitiveMutableHashMap(options) val session = sqlContext.sparkSession.asInstanceOf[SnappySession] + // only allow if experimental-features are enabled + if (!Property.EnableExperimentalFeatures.get(session.sessionState.conf)) { + throw new UnsupportedOperationException( + "CREATE INDEX on column tables is an experimental unsupported feature") + } + val parameters = new CaseInsensitiveMutableHashMap(options) val parser = session.snappyParser val indexCols = indexColumns.keys.map(parser.parseSQLOnly(_, parser.parseIdentifier.run())) val catalog = session.sessionCatalog diff --git a/core/src/main/scala/org/apache/spark/sql/execution/ddl.scala b/core/src/main/scala/org/apache/spark/sql/execution/ddl.scala index 591bdde9c7..b6e73eeafc 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/ddl.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/ddl.scala @@ -367,9 +367,10 @@ case class SnappyCacheTableCommand(tableIdent: TableIdentifier, queryString: Str if (isOffHeap) df.persist(StorageLevel.OFF_HEAP) else df.persist() Nil } else { + val 
queryShortString = CachedDataFrame.queryStringShortForm(queryString) val localProperties = session.sparkContext.getLocalProperties val previousJobDescription = localProperties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION) - localProperties.setProperty(SparkContext.SPARK_JOB_DESCRIPTION, queryString) + localProperties.setProperty(SparkContext.SPARK_JOB_DESCRIPTION, queryShortString) try { session.sessionState.enableExecutionCache = true // Get the actual QueryExecution used by InMemoryRelation so that @@ -386,7 +387,7 @@ case class SnappyCacheTableCommand(tableIdent: TableIdentifier, queryString: Str }.get val planInfo = PartitionedPhysicalScan.getSparkPlanInfo(cachedExecution.executedPlan) Row(CachedDataFrame.withCallback(session, df = null, cachedExecution, "cache")(_ => - CachedDataFrame.withNewExecutionId(session, queryString, queryString, + CachedDataFrame.withNewExecutionId(session, queryShortString, queryString, cachedExecution.toString(), planInfo)({ val start = System.nanoTime() // Dummy op to materialize the cache. This does the minimal job of count on diff --git a/core/src/main/scala/org/apache/spark/sql/hive/SnappyHiveExternalCatalog.scala b/core/src/main/scala/org/apache/spark/sql/hive/SnappyHiveExternalCatalog.scala index c4bb5fcd3f..6662dfe07a 100644 --- a/core/src/main/scala/org/apache/spark/sql/hive/SnappyHiveExternalCatalog.scala +++ b/core/src/main/scala/org/apache/spark/sql/hive/SnappyHiveExternalCatalog.scala @@ -140,7 +140,7 @@ class SnappyHiveExternalCatalog private[hive](val conf: SparkConf, } catch { case he: Exception if isDisconnectException(he) => // stale JDBC connection - closeHive() + closeHive(clearCache = false) suspendActiveSession { hiveClient = hiveClient.newSession() } @@ -236,12 +236,13 @@ class SnappyHiveExternalCatalog private[hive](val conf: SparkConf, } override def listDatabases(): Seq[String] = { - withHiveExceptionHandling(super.listDatabases().map(toUpperCase)) :+ SYS_SCHEMA + (withHiveExceptionHandling(super.listDatabases().map(toUpperCase).toSet) + SYS_SCHEMA) + .toSeq.sorted } override def listDatabases(pattern: String): Seq[String] = { - withHiveExceptionHandling(super.listDatabases(pattern).map(toUpperCase)) ++ - StringUtils.filterPattern(Seq(SYS_SCHEMA), pattern) + (withHiveExceptionHandling(super.listDatabases(pattern).map(toUpperCase).toSet) ++ + StringUtils.filterPattern(Seq(SYS_SCHEMA), pattern)).toSeq.sorted } override def setCurrentDatabase(schema: String): Unit = { @@ -730,7 +731,8 @@ class SnappyHiveExternalCatalog private[hive](val conf: SparkConf, override def close(): Unit = {} - private[hive] def closeHive(): Unit = synchronized { + private[hive] def closeHive(clearCache: Boolean): Unit = synchronized { + if (clearCache) invalidateAll() // Non-isolated client can be closed here directly which is only present in cluster mode // using the new property HiveUtils.HIVE_METASTORE_ISOLATION not present in upstream. 
// Isolated loader would require reflection but that case is only in snappy-core @@ -800,7 +802,8 @@ object SnappyHiveExternalCatalog { def close(): Unit = synchronized { if (instance ne null) { - instance.withHiveExceptionHandling(instance.closeHive(), handleDisconnects = false) + instance.withHiveExceptionHandling(instance.closeHive(clearCache = true), + handleDisconnects = false) instance = null } } diff --git a/core/src/test/java/io/snappydata/api/JavaCreateIndexTestSuite.java b/core/src/test/java/io/snappydata/api/JavaCreateIndexTestSuite.java index a0d3fcb1e4..ac7d06afe9 100644 --- a/core/src/test/java/io/snappydata/api/JavaCreateIndexTestSuite.java +++ b/core/src/test/java/io/snappydata/api/JavaCreateIndexTestSuite.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; +import io.snappydata.Property; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -44,6 +45,7 @@ public class JavaCreateIndexTestSuite implements Serializable { @Before public void setUp() { + snc.setConf(Property.EnableExperimentalFeatures().name(), "true"); List dummyList = new ArrayList(); for (int i = 0; i < 2; i++) { DummyBeanClass object = new DummyBeanClass(); diff --git a/core/src/test/scala/io/snappydata/SnappyFunSuite.scala b/core/src/test/scala/io/snappydata/SnappyFunSuite.scala index b57b0fede1..9361e16db7 100644 --- a/core/src/test/scala/io/snappydata/SnappyFunSuite.scala +++ b/core/src/test/scala/io/snappydata/SnappyFunSuite.scala @@ -17,6 +17,7 @@ package io.snappydata import java.io.File +import java.sql.Statement import scala.collection.mutable.ArrayBuffer @@ -27,10 +28,15 @@ import io.snappydata.test.dunit.DistributedTestBase.{InitializeRun, WaitCriterio import io.snappydata.util.TestUtils import org.scalatest.Assertions +import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} import org.apache.spark.sql.catalyst.expressions.{Alias, And, AttributeReference, EqualNullSafe, EqualTo, Exists, ExprId, Expression, ListQuery, PredicateHelper, PredicateSubquery, ScalarSubquery} import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, OneRowRelation, Sample} import org.apache.spark.sql.catalyst.util.{sideBySide, stackTraceToString} -import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, QueryTest, Row} +import org.apache.spark.sql.collection.Utils +import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils +import org.apache.spark.sql.row.SnappyStoreDialect +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, QueryTest, Row, SnappySession} // scalastyle:off import org.scalatest.{BeforeAndAfterAll, FunSuite, Outcome, Retries} // scalastyle:on @@ -234,6 +240,22 @@ object SnappyFunSuite extends Assertions { assert(query.queryExecution.executedPlan.missingInput.isEmpty, s"The physical plan has missing inputs:\n${query.queryExecution.executedPlan}") } + + /** + * Converts a JDBC ResultSet to a DataFrame. 
+ */ + def resultSetToDataset(session: SnappySession, stmt: Statement) + (sql: String): Dataset[Row] = { + if (stmt.execute(sql)) { + val rs = stmt.getResultSet + val schema = JdbcUtils.getSchema(rs, SnappyStoreDialect) + val rows = Utils.resultSetToSparkInternalRows(rs, schema).map(_.copy()).toSeq + session.internalCreateDataFrame(session.sparkContext.makeRDD(rows), schema) + } else { + implicit val encoder: ExpressionEncoder[Row] = RowEncoder(StructType(Nil)) + session.createDataset[Row](Nil) + } + } } /** diff --git a/core/src/test/scala/org/apache/spark/sql/store/MetadataTest.scala b/core/src/test/scala/org/apache/spark/sql/store/MetadataTest.scala index e79042c7b0..c922e40b95 100644 --- a/core/src/test/scala/org/apache/spark/sql/store/MetadataTest.scala +++ b/core/src/test/scala/org/apache/spark/sql/store/MetadataTest.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.sql.store -import java.sql.{SQLException, Statement} +import java.sql.SQLException import java.util.regex.Pattern import com.gemstone.gemfire.internal.shared.ClientSharedUtils @@ -25,14 +25,10 @@ import com.pivotal.gemfirexd.internal.engine.diag.SysVTIs import io.snappydata.SnappyFunSuite import org.scalatest.Assertions -import org.apache.spark.executor.InputMetrics import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException} -import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} import org.apache.spark.sql.execution.columnar.impl.ColumnPartitionResolver -import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils -import org.apache.spark.sql.row.SnappyStoreDialect import org.apache.spark.sql.types._ -import org.apache.spark.sql.{AnalysisException, Dataset, Row, SnappySession} +import org.apache.spark.sql.{AnalysisException, Dataset, Row} /** * Tests for meta-data queries using Spark SQL. 
@@ -118,21 +114,6 @@ object MetadataTest extends Assertions { getLongVarcharTuple("GATEWAYSENDERS"), ("OFFHEAPENABLED", 0, "BOOLEAN", false), ("ROWLEVELSECURITYENABLED", 0, "BOOLEAN", false)) - def resultSetToDataset(session: SnappySession, stmt: Statement) - (sql: String): Dataset[Row] = { - if (stmt.execute(sql)) { - val rs = stmt.getResultSet - val schema = JdbcUtils.getSchema(rs, SnappyStoreDialect) - val dummyMetrics = new InputMetrics - val rows = JdbcUtils.resultSetToSparkInternalRows(rs, schema, dummyMetrics) - .map(_.copy()).toSeq - session.internalCreateDataFrame(session.sparkContext.makeRDD(rows), schema) - } else { - implicit val encoder: ExpressionEncoder[Row] = RowEncoder(StructType(Nil)) - session.createDataset[Row](Nil) - } - } - def testSYSTablesAndVTIs(executeSQL: String => Dataset[Row], hostName: String = ClientSharedUtils.getLocalHost.getCanonicalHostName, netServers: Seq[String] = Seq(""), locator: String = "", locatorNetServer: String = "", diff --git a/core/src/test/scala/org/apache/spark/sql/store/ViewTest.scala b/core/src/test/scala/org/apache/spark/sql/store/ViewTest.scala index 2e4b3f7e07..3dce18c0b2 100644 --- a/core/src/test/scala/org/apache/spark/sql/store/ViewTest.scala +++ b/core/src/test/scala/org/apache/spark/sql/store/ViewTest.scala @@ -17,17 +17,63 @@ package org.apache.spark.sql.store +import java.sql.SQLException + import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem +import io.snappydata.SnappyFunSuite.checkAnswer import io.snappydata.{Property, SnappyFunSuite} +import org.scalatest.Assertions import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashJoinExec} -import org.apache.spark.sql.{AnalysisException, Row, SnappySession} +import org.apache.spark.sql.{AnalysisException, Dataset, Row, SnappySession} /** * Tests for temporary, global and persistent views. 
*/ class ViewTest extends SnappyFunSuite { + override def beforeAll(): Unit = { + super.beforeAll() + ViewTest.createTables(this.snc.snappySession) + } + + override def afterAll(): Unit = { + ViewTest.dropTables(this.snc.snappySession) + super.afterAll() + } + + test("temporary view") { + ViewTest.testTemporaryView(snc.snappySession.sql, () => new SnappySession(sc).sql) + } + + test("global temporary view") { + ViewTest.testGlobalTemporaryView(snc.snappySession.sql, () => new SnappySession(sc).sql) + } + + test("temporary view using") { + ViewTest.testTemporaryViewUsing(snc.snappySession.sql, () => new SnappySession(sc).sql) + } + + test("global temporary view using") { + ViewTest.testGlobalTemporaryViewUsing(snc.snappySession.sql, () => new SnappySession(sc).sql) + } + + test("persistent view") { + ViewTest.testPersistentView(snc.snappySession.sql, checkPlans = true, + () => new SnappySession(sc).sql, restartSpark) + } + + private def restartSpark(): Unit = { + stopAll() + val sys = InternalDistributedSystem.getConnectedInstance + if (sys ne null) { + sys.disconnect() + } + } +} + +object ViewTest extends Assertions { + private val columnTable = "viewColTable" private val rowTable = "viewRowTable" private val numRows = 10 @@ -35,275 +81,306 @@ class ViewTest extends SnappyFunSuite { private val viewTempMeta = Seq(Row("ID", "int", null), Row("ADDR", "string", null), Row("RANK", "int", null)) - override def beforeAll(): Unit = { - super.beforeAll() - val session = this.snc.snappySession + private def getExpectedResult: Seq[Row] = { + (0 until numRows).map(i => Row(i, "address_" + (i + 1), i + 1)) + } + + private def tableExists(executeSQL: String => Dataset[Row], name: String): Boolean = { + try { + executeSQL(s"select 1 from $name where 1 = 0") + true + } catch { + case _: Exception => false + } + } + + def createTables(session: SnappySession): Unit = { session.sql(s"create table $columnTable (id int, addr varchar(20)) using column " + "options (partition_by 'id')") session.sql(s"create table $rowTable (id int, addr varchar(20)) using row " + s"options (partition_by 'id', colocate_with '$columnTable')") val rows = (0 until numRows).map(i => Row(i, "address_" + (i + 1))) - snc.insert(columnTable, rows: _*) - snc.insert(rowTable, rows: _*) - } - - private def getExpectedResult: Seq[Row] = { - (0 until numRows).map(i => Row(i, "address_" + (i + 1), i + 1)) + session.insert(columnTable, rows: _*) + session.insert(rowTable, rows: _*) } - private def tableExists(session: SnappySession, name: String): Boolean = { - val identifier = session.tableIdentifier(name) - session.sessionCatalog.isTemporaryTable(identifier) || - session.sessionCatalog.tableExists(identifier) + def dropTables(session: SnappySession): Unit = { + session.sql(s"drop table $rowTable") + session.sql(s"drop table $columnTable") } - test("temporary view") { - val session = this.snc.snappySession - + def testTemporaryView(executeSQL: String => Dataset[Row], + newExecution: () => String => Dataset[Row]): Unit = { val tableMeta = Seq(Row("ID", "int", null), Row("ADDR", "varchar(20)", null)) - checkAnswer(session.sql(s"describe $columnTable"), tableMeta) - checkAnswer(session.sql(s"describe $rowTable"), tableMeta) + checkAnswer(executeSQL(s"describe $columnTable"), tableMeta) + checkAnswer(executeSQL(s"describe $rowTable"), tableMeta) val expected = getExpectedResult val showResult = Seq(Row("", "VIEWONTABLE", true, false)) // check temporary view and its meta-data for column table - session.sql(s"create temporary view viewOnTable as 
$viewQuery from $columnTable") + executeSQL(s"create temporary view viewOnTable as $viewQuery from $columnTable") - assert(tableExists(session, "viewOnTable") === true) - checkAnswer(session.sql("describe viewOnTable"), viewTempMeta) - checkAnswer(session.sql("select * from viewOnTable"), expected) - checkAnswer(session.sql("show views"), showResult) - checkAnswer(session.sql("show views in app"), showResult) - checkAnswer(session.sql("show views from app"), showResult) + assert(tableExists(executeSQL, "viewOnTable") === true) + checkAnswer(executeSQL("describe viewOnTable"), viewTempMeta) + checkAnswer(executeSQL("select * from viewOnTable"), expected) + checkAnswer(executeSQL("show views"), showResult) + checkAnswer(executeSQL("show views in app"), showResult) + checkAnswer(executeSQL("show views from app"), showResult) // should not be visible from another session - val session2 = session.newSession() - assert(tableExists(session2, "viewOnTable") === false) + val executeSQL2 = newExecution() + assert(tableExists(executeSQL2, "viewOnTable") === false) // drop and check unavailability - session.sql("drop view viewOnTable") - assert(tableExists(session, "viewOnTable") === false) - assert(tableExists(session2, "viewOnTable") === false) + executeSQL("drop view viewOnTable") + assert(tableExists(executeSQL, "viewOnTable") === false) + assert(tableExists(executeSQL2, "viewOnTable") === false) // check the same for view on row table - session.sql(s"create temporary view viewOnTable as $viewQuery from $rowTable") + executeSQL(s"create temporary view viewOnTable as $viewQuery from $rowTable") - assert(tableExists(session, "viewOnTable") === true) - checkAnswer(session.sql("describe viewOnTable"), viewTempMeta) - checkAnswer(session.sql("select * from viewOnTable"), expected) + assert(tableExists(executeSQL, "viewOnTable") === true) + checkAnswer(executeSQL("describe viewOnTable"), viewTempMeta) + checkAnswer(executeSQL("select * from viewOnTable"), expected) - assert(tableExists(session2, "viewOnTable") === false) - session.sql("drop view viewOnTable") - assert(tableExists(session, "viewOnTable") === false) - assert(tableExists(session2, "viewOnTable") === false) - - session2.close() + assert(tableExists(executeSQL2, "viewOnTable") === false) + executeSQL("drop view viewOnTable") + assert(tableExists(executeSQL, "viewOnTable") === false) + assert(tableExists(executeSQL2, "viewOnTable") === false) } - test("global temporary view") { - val session = this.snc.snappySession - + def testGlobalTemporaryView(executeSQL: String => Dataset[Row], + newExecution: () => String => Dataset[Row]): Unit = { val expected = getExpectedResult val showResult = Seq(Row("GLOBAL_TEMP", "VIEWONTABLE", true, true)) // check temporary view and its meta-data for column table - session.sql(s"create global temporary view viewOnTable as $viewQuery from $columnTable") + executeSQL(s"create global temporary view viewOnTable as $viewQuery from $columnTable") - assert(session.sessionCatalog.getGlobalTempView("viewOnTable").isDefined) - checkAnswer(session.sql("describe global_temp.viewOnTable"), viewTempMeta) - checkAnswer(session.sql("select * from viewOnTable"), expected) - checkAnswer(session.sql("show views"), Nil) - checkAnswer(session.sql("show views in global_temp"), showResult) - checkAnswer(session.sql("show views from global_temp"), showResult) + assert(executeSQL("show views in global_temp").collect() === + Array(Row("GLOBAL_TEMP", "VIEWONTABLE", true, true))) + checkAnswer(executeSQL("describe 
global_temp.viewOnTable"), viewTempMeta) + checkAnswer(executeSQL("select * from viewOnTable"), expected) + checkAnswer(executeSQL("show views"), Nil) + checkAnswer(executeSQL("show views in global_temp"), showResult) + checkAnswer(executeSQL("show views from global_temp"), showResult) // should be visible from another session - val session2 = session.newSession() - assert(session2.sessionCatalog.getGlobalTempView("viewOnTable").isDefined) - checkAnswer(session2.sql("describe global_temp.viewOnTable"), viewTempMeta) - checkAnswer(session2.sql("select * from viewOnTable"), expected) + val executeSQL2 = newExecution() + assert(executeSQL2("show views in global_temp").collect() === + Array(Row("GLOBAL_TEMP", "VIEWONTABLE", true, true))) + checkAnswer(executeSQL2("describe global_temp.viewOnTable"), viewTempMeta) + checkAnswer(executeSQL2("select * from viewOnTable"), expected) // drop and check unavailability - session.sql("drop view viewOnTable") - assert(session.sessionCatalog.getGlobalTempView("viewOnTable").isEmpty) - assert(session2.sessionCatalog.getGlobalTempView("viewOnTable").isEmpty) + executeSQL("drop view viewOnTable") + assert(executeSQL("show views in global_temp").collect().isEmpty) + assert(executeSQL2("show views in global_temp").collect().isEmpty) // check the same for view on row table - session.sql(s"create global temporary view viewOnTable as $viewQuery from $columnTable") - - assert(session.sessionCatalog.getGlobalTempView("viewOnTable").isDefined) - checkAnswer(session.sql("describe global_temp.viewOnTable"), viewTempMeta) - checkAnswer(session.sql("select * from viewOnTable"), expected) + executeSQL(s"create global temporary view viewOnTable as $viewQuery from $columnTable") - assert(session2.sessionCatalog.getGlobalTempView("viewOnTable").isDefined) - checkAnswer(session2.sql("describe global_temp.viewOnTable"), viewTempMeta) - checkAnswer(session2.sql("select * from viewOnTable"), expected) + assert(executeSQL("show views in global_temp").collect() === + Array(Row("GLOBAL_TEMP", "VIEWONTABLE", true, true))) + checkAnswer(executeSQL("describe global_temp.viewOnTable"), viewTempMeta) + checkAnswer(executeSQL("select * from viewOnTable"), expected) - session.sql("drop view viewOnTable") - assert(session.sessionCatalog.getGlobalTempView("viewOnTable").isEmpty) - assert(session2.sessionCatalog.getGlobalTempView("viewOnTable").isEmpty) + assert(executeSQL2("show views in global_temp").collect() === + Array(Row("GLOBAL_TEMP", "VIEWONTABLE", true, true))) + checkAnswer(executeSQL2("describe global_temp.viewOnTable"), viewTempMeta) + checkAnswer(executeSQL2("select * from viewOnTable"), expected) - session2.close() + executeSQL("drop view viewOnTable") + assert(executeSQL("show views in global_temp").collect().isEmpty) + assert(executeSQL2("show views in global_temp").collect().isEmpty) } - test("temporary view using") { - val session = this.snc.snappySession - + def testTemporaryViewUsing(executeSQL: String => Dataset[Row], + newExecution: () => String => Dataset[Row]): Unit = { // check temporary view with USING and its meta-data val hfile: String = getClass.getResource("/2015.parquet").getPath - val airline = session.read.parquet(hfile) - session.sql(s"create temporary view airlineView using parquet options(path '$hfile')") - val airlineView = session.table("airlineView") + executeSQL(s"create external table airlineTemp using parquet options (path '$hfile')") + val airline = executeSQL("select * from airlineTemp limit 1") + executeSQL(s"create temporary view airlineView 
using parquet options(path '$hfile')") + val airlineView = executeSQL("select * from airlineView limit 1") - assert(tableExists(session, "airlineView") === true) + assert(tableExists(executeSQL, "airlineView") === true) assert(airlineView.schema === airline.schema) - checkAnswer(session.sql("select count(*) from airlineView"), Seq(Row(airline.count()))) - assert(airlineView.count() == airline.count()) + checkAnswer(executeSQL("select count(*) from airlineView"), + executeSQL("select count(*) from airlineTemp").collect()) // should not be visible from another session - val session2 = session.newSession() - assert(tableExists(session2, "airlineView") === false) + val executeSQL2 = newExecution() + assert(tableExists(executeSQL2, "airlineView") === false) // drop and check unavailability - session.sql("drop table airlineView") - assert(tableExists(session, "airlineView") === false) - assert(tableExists(session2, "airlineView") === false) - - session2.close() + executeSQL("drop table airlineTemp") + executeSQL("drop table airlineView") + assert(tableExists(executeSQL, "airlineTemp") === false) + assert(tableExists(executeSQL2, "airlineTemp") === false) + assert(tableExists(executeSQL, "airlineView") === false) + assert(tableExists(executeSQL2, "airlineView") === false) } - test("global temporary view using") { - val session = this.snc.snappySession - + def testGlobalTemporaryViewUsing(executeSQL: String => Dataset[Row], + newExecution: () => String => Dataset[Row]): Unit = { // check global temporary view with USING and its meta-data val hfile: String = getClass.getResource("/2015.parquet").getPath - val airline = session.read.parquet(hfile) - session.sql(s"create global temporary view airlineView using parquet options(path '$hfile')") - val airlineView = session.table("airlineView") + executeSQL(s"create external table airlineTemp using parquet options (path '$hfile')") + val airline = executeSQL("select * from airlineTemp limit 1") + executeSQL(s"create global temporary view airlineView using parquet options(path '$hfile')") + val airlineView = executeSQL("select * from airlineView limit 1") - assert(session.sessionCatalog.getGlobalTempView("airlineView").isDefined) + assert(executeSQL("show views in global_temp").collect() === + Array(Row("GLOBAL_TEMP", "AIRLINEVIEW", true, true))) assert(airlineView.schema === airline.schema) - checkAnswer(session.sql("select count(*) from airlineView"), Seq(Row(airline.count()))) - assert(airlineView.count() == airline.count()) + checkAnswer(executeSQL("select count(*) from airlineView"), + executeSQL("select count(*) from airlineTemp").collect()) // should be visible from another session - val session2 = session.newSession() - assert(session2.sessionCatalog.getGlobalTempView("airlineView").isDefined) - checkAnswer(session2.sql("select count(*) from airlineView"), Seq(Row(airline.count()))) + val executeSQL2 = newExecution() + assert(executeSQL2("show views in global_temp").collect() === + Array(Row("GLOBAL_TEMP", "AIRLINEVIEW", true, true))) + checkAnswer(executeSQL2("select count(*) from airlineView"), + executeSQL("select count(*) from airlineTemp").collect()) // drop and check unavailability - session.sql("drop table airlineView") - assert(session.sessionCatalog.getGlobalTempView("airlineView").isEmpty) - assert(session2.sessionCatalog.getGlobalTempView("airlineView").isEmpty) - - session2.close() + executeSQL("drop table airlineTemp") + executeSQL("drop table airlineView") + assert(tableExists(executeSQL, "airlineTemp") === false) + 
assert(tableExists(executeSQL2, "airlineTemp") === false) + assert(executeSQL("show views in global_temp").collect().isEmpty) + assert(executeSQL2("show views in global_temp").collect().isEmpty) } - test("persistent view") { + def testPersistentView(executeSQL: String => Dataset[Row], checkPlans: Boolean, + newExecution: () => String => Dataset[Row], restartSpark: () => Unit): Unit = { val expected = getExpectedResult // check temporary view and its meta-data for column table - checkPersistentView(columnTable, rowTable, snc.snappySession, expected) - // check the same for view on row table - checkPersistentView(rowTable, columnTable, snc.snappySession, expected) + checkPersistentView(columnTable, rowTable, checkPlans, executeSQL, newExecution, + expected, restartSpark) + // check the same for view on row table with new session since old one would not be valid + val newExecuteSQL = newExecution() + checkPersistentView(rowTable, columnTable, checkPlans, newExecuteSQL, newExecution, + expected, restartSpark) } - private def checkPersistentView(table: String, otherTable: String, session: SnappySession, - expectedResult: Seq[Row]): Unit = { - session.sql(s"create view viewOnTable as $viewQuery from $table") + private def checkPersistentView(table: String, otherTable: String, checkPlans: Boolean, + executeSQL: String => Dataset[Row], newExecution: () => String => Dataset[Row], + expectedResult: Seq[Row], restartSpark: () => Unit): Unit = { + executeSQL(s"create view viewOnTable as $viewQuery from $table") val viewMeta = Seq(Row("ID", "int", null), Row("ADDR", "varchar(20)", null), Row("RANK", "int", null)) val showResult = Seq(Row("APP", "VIEWONTABLE", false, false)) - assert(tableExists(session, "viewOnTable") === true) - checkAnswer(session.sql("describe viewOnTable"), viewMeta) - checkAnswer(session.sql("select * from viewOnTable"), expectedResult) - checkAnswer(session.sql("show views"), showResult) - checkAnswer(session.sql("show views in app"), showResult) - checkAnswer(session.sql("show views from app"), showResult) + assert(tableExists(executeSQL, "viewOnTable") === true) + checkAnswer(executeSQL("describe viewOnTable"), viewMeta) + checkAnswer(executeSQL("select * from viewOnTable"), expectedResult) + checkAnswer(executeSQL("show views"), showResult) + checkAnswer(executeSQL("show views in app"), showResult) + checkAnswer(executeSQL("show views from app"), showResult) // should be visible from another session - var session2 = session.newSession() - assert(tableExists(session2, "viewOnTable") === true) - checkAnswer(session2.sql("describe viewOnTable"), viewMeta) - checkAnswer(session2.sql("select * from viewOnTable"), expectedResult) + var executeSQL2 = newExecution() + assert(tableExists(executeSQL2, "viewOnTable") === true) + checkAnswer(executeSQL2("describe viewOnTable"), viewMeta) + checkAnswer(executeSQL2("select * from viewOnTable"), expectedResult) // test for SNAP-2205: see CompressionCodecId.isCompressed for a description of the problem - session.conf.set(Property.ColumnBatchSize.name, "10k") + executeSQL(s"set ${Property.ColumnBatchSize.name}=10k") // 21 columns mean 63 for ColumnStatsSchema so total of 64 fields including the COUNT // in the stats row which will fit in exactly one long for the nulls bitset val cols = (1 to 21).map(i => s"col$i string").mkString(", ") - session.sql(s"CREATE TABLE test2205 ($cols) using column options (buckets '4')") + executeSQL(s"CREATE TABLE test2205 ($cols) using column options (buckets '4')") val numElements = 10000 val projection = (1 to 
21).map(i => s"null as col$i") - session.range(numElements).selectExpr(projection: _*).write.insertInto("test2205") + executeSQL( + s"insert into test2205 select ${projection.mkString(", ")} from range($numElements)") - checkAnswer(session.sql("select count(*), count(col10) from test2205"), + checkAnswer(executeSQL("select count(*), count(col10) from test2205"), Seq(Row(numElements, 0))) - // should be available after a restart - session.close() - session2.close() - stopAll() - val sys = InternalDistributedSystem.getConnectedInstance - if (sys ne null) { - sys.disconnect() - } + // test large view + val longStr = (1 to 1000).mkString("test data ", "", "") + val largeViewStr = (1 to 100).map(i => + s"case when $i % 3 == 0 then cast(null as string) else '$longStr[$i]' end as c$i").mkString( + "select ", ", ", "") + assert(largeViewStr.length > 100000) + var rs = executeSQL2(largeViewStr).collect() + assert(rs.length == 1) + executeSQL2(s"create view largeView as $largeViewStr").collect() + rs = executeSQL("select * from largeView").collect() + assert(rs.length == 1) - session2 = new SnappySession(sc) - assert(tableExists(session2, "viewOnTable") === true) - checkAnswer(session2.sql("describe viewOnTable"), viewMeta) - checkAnswer(session2.sql("select * from viewOnTable"), expectedResult) + // should be available after a restart + restartSpark() + executeSQL2 = newExecution() + assert(tableExists(executeSQL2, "viewOnTable") === true) + checkAnswer(executeSQL2("describe viewOnTable"), viewMeta) + checkAnswer(executeSQL2("select * from viewOnTable"), expectedResult) - checkAnswer(session2.sql("select count(*), count(col10) from test2205"), + checkAnswer(executeSQL2("select count(*), count(col10) from test2205"), Seq(Row(numElements, 0))) try { - session2.sql("drop table viewOnTable") + executeSQL2("drop table viewOnTable") fail("expected drop table to fail for view") } catch { - case _: AnalysisException => // expected + case _: AnalysisException | _: SQLException => // expected } // drop and check unavailability - session2.sql("drop view viewOnTable") - assert(tableExists(session2, "viewOnTable") === false) - session2.sql("drop table test2205") + executeSQL2("drop view viewOnTable") + assert(tableExists(executeSQL2, "viewOnTable") === false) + executeSQL2("drop table test2205") + + // test large view after restart + rs = executeSQL2("select * from largeView").collect() + assert(rs.length == 1) + executeSQL2("drop view largeView") // check colocated joins with VIEWs (SNAP-2204) val query = s"select c.id, r.addr from $columnTable c inner join $rowTable r on (c.id = r.id)" // first check with normal query - var ds = session2.sql(query) + var ds = executeSQL2(query) checkAnswer(ds, expectedResult.map(r => Row(r.get(0), r.get(1)))) - var plan = ds.queryExecution.executedPlan - assert(plan.find(_.isInstanceOf[HashJoinExec]).isDefined) - assert(plan.find(_.isInstanceOf[BroadcastHashJoinExec]).isEmpty) + if (checkPlans) { + val plan = ds.queryExecution.executedPlan + assert(plan.find(_.isInstanceOf[HashJoinExec]).isDefined) + assert(plan.find(_.isInstanceOf[BroadcastHashJoinExec]).isEmpty) + } val expectedResult2 = expectedResult.map(r => Row(r.get(0), r.get(1))) // check for normal view join with table - session2.sql(s"create view viewOnTable as select id, addr, id + 1 from $table") - ds = session2.sql("select t.id, v.addr from viewOnTable v " + + executeSQL2(s"create view viewOnTable as select id, addr, id + 1 from $table") + ds = executeSQL2("select t.id, v.addr from viewOnTable v " + s"inner join 
$otherTable t on (v.id = t.id)") checkAnswer(ds, expectedResult2) - plan = ds.queryExecution.executedPlan - assert(plan.find(_.isInstanceOf[HashJoinExec]).isDefined) - assert(plan.find(_.isInstanceOf[BroadcastHashJoinExec]).isEmpty) + if (checkPlans) { + val plan = ds.queryExecution.executedPlan + assert(plan.find(_.isInstanceOf[HashJoinExec]).isDefined) + assert(plan.find(_.isInstanceOf[BroadcastHashJoinExec]).isEmpty) + } - session2.sql("drop view viewOnTable") - assert(tableExists(session2, "viewOnTable") === false) + executeSQL2("drop view viewOnTable") + assert(tableExists(executeSQL2, "viewOnTable") === false) // next query on a join view - session2.sql(s"create view viewOnJoin as $query") - ds = session2.sql("select * from viewOnJoin") + executeSQL2(s"create view viewOnJoin as $query") + ds = executeSQL2("select * from viewOnJoin") checkAnswer(ds, expectedResult2) - plan = ds.queryExecution.executedPlan - assert(plan.find(_.isInstanceOf[HashJoinExec]).isDefined) - assert(plan.find(_.isInstanceOf[BroadcastHashJoinExec]).isEmpty) + if (checkPlans) { + val plan = ds.queryExecution.executedPlan + assert(plan.find(_.isInstanceOf[HashJoinExec]).isDefined) + assert(plan.find(_.isInstanceOf[BroadcastHashJoinExec]).isEmpty) + } - session2.sql("drop view viewOnJoin") - assert(tableExists(session2, "viewOnJoin") === false) + executeSQL2("drop view viewOnJoin") + assert(tableExists(executeSQL2, "viewOnJoin") === false) } } diff --git a/dtests/src/test/java/io/snappydata/hydra/smoke.sh b/dtests/src/test/java/io/snappydata/hydra/smoke.sh index c7d502100e..5847d56c52 100755 --- a/dtests/src/test/java/io/snappydata/hydra/smoke.sh +++ b/dtests/src/test/java/io/snappydata/hydra/smoke.sh @@ -38,9 +38,7 @@ mkdir -p $resultDir shift $SNAPPYDATA_SOURCE_DIR/store/tests/core/src/main/java/bin/sample-runbt.sh $resultDir $SNAPPYDATA_SOURCE_DIR -r 1 -d false io/snappydata/hydra/cluster/startDualModeCluster_smoke.bt -sleep 30; $SNAPPYDATA_SOURCE_DIR/store/tests/core/src/main/java/bin/sample-runbt.sh $resultDir $SNAPPYDATA_SOURCE_DIR -r 1 -d false io/snappydata/hydra/smoke.bt -sleep 30; $SNAPPYDATA_SOURCE_DIR/store/tests/core/src/main/java/bin/sample-runbt.sh $resultDir $SNAPPYDATA_SOURCE_DIR -r 1 -d false io/snappydata/hydra/cluster/stopDualModeCluster.bt diff --git a/gradle.properties b/gradle.properties index 905f38f193..e097267349 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,10 @@ +# Gradle daemon has been disabled due to two reasons: +# 1) It frequently fails after a few runs due to OOME. +# 2) Messes up buildOutput.log by writing to it multiple +# times, increasing by one in every run i.e. first run +# will be good, then second run will write each line twice, +# third run thrice and so on. Clearing the loggerService +# explicitly makes no difference. org.gradle.daemon=false org.gradle.warning.mode=none #org.gradle.parallel=true