Skip to content

Commit

Permalink
[SNAP-2818] trim the JOB_DESCRIPTION property in Spark jobs (#1227)
Browse files Browse the repository at this point in the history
- previous changes set the SparkContext.SPARK_JOB_DESCRIPTION property to the query string
  for SnappySession.sql executions, but this can exceed 32K and the property will then fail
  in serialization, so trim it to a length of 100 with a "..." continuation, as done in the
  SQL tab display
- added large view test to ViewTest and enhanced it to accept a generic "String => DataFrame"
  closure so that the same can be used for scala tests for SnappySession as well as dunits
  for JDBC Statement.execute; added the same tests to DDLRoutingDUnitTest using this

Note: the above test is unable to reproduce the original issue with CREATE VIEW, but it does
reproduce it for a large query string

- disallow CREATE INDEX on column tables without the experimental-features property
- clear catalog cache in shutdown to avoid its accidental use by subsequent tests
  • Loading branch information
Sumedh Wale authored Dec 28, 2018
1 parent 8e08267 commit 9299a80
Show file tree
Hide file tree
Showing 18 changed files with 387 additions and 241 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ package io.snappydata.cluster
import java.sql.{Connection, DriverManager, SQLException}

import com.pivotal.gemfirexd.internal.engine.{GfxdConstants, Misc}
import io.snappydata.SnappyFunSuite.resultSetToDataset
import io.snappydata.test.dunit.{AvailablePortHelper, SerializableRunnable}

import org.apache.spark.sql.SnappyContext
import org.apache.spark.sql.collection.Utils
import org.apache.spark.sql.store.ViewTest
import org.apache.spark.sql.{Dataset, Row, SnappyContext, SnappySession}

class DDLRoutingDUnitTest(val s: String) extends ClusterManagerTestBase(s) {

Expand Down Expand Up @@ -72,12 +74,12 @@ class DDLRoutingDUnitTest(val s: String) extends ClusterManagerTestBase(s) {
val conn = getANetConnection(netPort1)

// first fail a statement
failCreateTableXD(conn, tableName, true, " row ")
failCreateTableXD(conn, tableName, doFail = true, " row ")

createTableXD(conn, tableName, " row ")
tableMetadataAssertRowTable("APP", tableName)
// Test create table - error for recreate
failCreateTableXD(conn, tableName, false, " row ")
failCreateTableXD(conn, tableName, doFail = false, " row ")

// Drop Table and Recreate
dropTableXD(conn, tableName)
Expand Down Expand Up @@ -167,7 +169,7 @@ class DDLRoutingDUnitTest(val s: String) extends ClusterManagerTestBase(s) {

vm2.invoke(classOf[ClusterManagerTestBase], "stopAny")
val props = bootProps.clone().asInstanceOf[java.util.Properties]
props.put("distributed-system-id" , "1")
props.put("distributed-system-id", "1")
props.put("server-groups", "sg1")

val restartServer = new SerializableRunnable() {
Expand All @@ -185,7 +187,7 @@ class DDLRoutingDUnitTest(val s: String) extends ClusterManagerTestBase(s) {
var s = conn.createStatement()
s.execute(s"CREATE TABLE $tableName (Col1 INT, Col2 INT, Col3 STRING)")
insertDataXD(conn, tableName)
var snc = org.apache.spark.sql.SnappyContext(sc)
val snc = org.apache.spark.sql.SnappyContext(sc)
verifyResultAndSchema(snc, tableName, 3)

s.execute(s"ALTER TABLE $tableName ADD Col4 INT")
Expand All @@ -207,21 +209,21 @@ class DDLRoutingDUnitTest(val s: String) extends ClusterManagerTestBase(s) {
s.execute(s"insert into $tableName values (1,1)")
s.execute(s"ALTER TABLE $tableName add constraint emp_uk unique (Col1)")
try {
s.execute(s"insert into $tableName values (1,1)")
s.execute(s"insert into $tableName values (1,1)")
} catch {
case sqle: SQLException =>
if (sqle.getSQLState != "23505" ||
!sqle.getMessage.contains("duplicate key value in a unique or" +
" primary key constraint or unique index")) {
!sqle.getMessage.contains("duplicate key value in a unique or" +
" primary key constraint or unique index")) {
throw sqle
}
}

// asynceventlistener
s.execute("CREATE ASYNCEVENTLISTENER myListener (" +
" listenerclass 'com.pivotal.gemfirexd.callbacks.DBSynchronizer'" +
" initparams 'org.apache.derby.jdbc.EmbeddedDriver,jdbc:derby:newDB;create=true')" +
" server groups(sg1)")
" listenerclass 'com.pivotal.gemfirexd.callbacks.DBSynchronizer'" +
" initparams 'org.apache.derby.jdbc.EmbeddedDriver,jdbc:derby:newDB;create=true')" +
" server groups(sg1)")

s.execute(s"ALTER TABLE $tableName SET ASYNCEVENTLISTENER (myListener) ")
var rs = s.executeQuery(s"select * from SYS.SYSTABLES where tablename='$tableName'")
Expand Down Expand Up @@ -287,7 +289,8 @@ class DDLRoutingDUnitTest(val s: String) extends ClusterManagerTestBase(s) {
var cnt = 0
while (rs.next()) {
cnt += 1
rs.getInt(1); rs.getInt(2);
rs.getInt(1)
rs.getInt(2)
}
assert(cnt == 5, cnt)

Expand All @@ -296,7 +299,9 @@ class DDLRoutingDUnitTest(val s: String) extends ClusterManagerTestBase(s) {
cnt = 0
while (rs2.next()) {
cnt += 1
rs2.getInt(1); rs2.getInt(2); rs2.getInt(3);
rs2.getInt(1)
rs2.getInt(2)
rs2.getInt(3)
}
assert(cnt == 5, cnt)

Expand Down Expand Up @@ -324,6 +329,36 @@ class DDLRoutingDUnitTest(val s: String) extends ClusterManagerTestBase(s) {
dropTableXD(conn, tableName)
}

/**
 * Runs the shared ViewTest view checks through JDBC: a net server is started on vm2 and
 * every SQL string is executed on a JDBC statement, with the result converted to a
 * Dataset by resultSetToDataset.
 *
 * All JDBC connections opened here (the primary one and one per `newExecution()` call)
 * are tracked and closed when the test finishes, whether it passes or fails.
 */
def testViews(): Unit = {
  val netPort1 = AvailablePortHelper.getRandomAvailableTCPPort
  vm2.invoke(classOf[ClusterManagerTestBase], "startNetServer", netPort1)

  val session = new SnappySession(sc)
  ViewTest.createTables(session)

  // every connection handed out below is recorded so that it can be closed at the end
  val openConnections = new scala.collection.mutable.ArrayBuffer[Connection]()

  def openConnection(): Connection = {
    val c = getANetConnection(netPort1)
    openConnections += c
    c
  }

  // each call yields an executor bound to a fresh SnappySession and a fresh JDBC connection
  def newExecution(): String => Dataset[Row] = {
    val freshSession = new SnappySession(sc)
    val freshStmt = openConnection().createStatement()
    resultSetToDataset(freshSession, freshStmt)
  }

  try {
    val stmt = openConnection().createStatement()
    ViewTest.testTemporaryView(resultSetToDataset(session, stmt), newExecution)
    ViewTest.testGlobalTemporaryView(resultSetToDataset(session, stmt), newExecution)
    ViewTest.testTemporaryViewUsing(resultSetToDataset(session, stmt), newExecution)
    ViewTest.testGlobalTemporaryViewUsing(resultSetToDataset(session, stmt), newExecution)
    ViewTest.testPersistentView(resultSetToDataset(session, stmt), checkPlans = false,
      newExecution, restartSpark)
    ViewTest.dropTables(new SnappySession(sc))
  } finally {
    // best-effort cleanup: a failure while closing must not mask the test outcome;
    // closing a connection also releases the statements created from it
    openConnections.foreach { c =>
      try c.close() catch {
        case _: SQLException =>
      }
    }
  }
}

/**
 * Restart callback passed to ViewTest.testPersistentView: calls
 * ClusterManagerTestBase.stopAny() and then ClusterManagerTestBase.startSnappyLead
 * with the test base's locator port and this test's bootProps.
 */
private def restartSpark(): Unit = {
  ClusterManagerTestBase.stopAny()
  // the locator port is read only after the stop call, as in the original argument evaluation
  val port = ClusterManagerTestBase.locatorPort
  ClusterManagerTestBase.startSnappyLead(port, bootProps)
}

def createTableXD(conn: Connection, tableName: String,
usingStr: String): Unit = {
val s = conn.createStatement()
Expand Down Expand Up @@ -421,7 +456,7 @@ class DDLRoutingDUnitTest(val s: String) extends ClusterManagerTestBase(s) {
s.execute("CREATE EXTERNAL TABLE airlineRef_temp(Code VARCHAR(25), " +
"Description VARCHAR(25)) USING parquet OPTIONS()")
} catch {
case e: java.sql.SQLException =>
case _: java.sql.SQLException =>
// println("Exception stack. create. ex=" + e.getMessage +
// " ,stack=" + ExceptionUtils.getFullStackTrace(e))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,7 @@ class QueryRoutingDUnitTest(val s: String)

TPCHUtils.createAndLoadTables(snc, true)

snc.setConf(Property.EnableExperimentalFeatures.name, "true")
snc.sql(
s"""CREATE INDEX idx_orders_cust ON orders(o_custkey)
options (COLOCATE_WITH 'customer')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import io.snappydata.{SnappyFunSuite, SnappyTableStatsProviderService}
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.sql.SnappySession
import org.apache.spark.sql.store.{ColumnTableBatchInsertTest, MetadataTest}
import org.apache.spark.sql.store.ColumnTableBatchInsertTest

class QueryRoutingSingleNodeSuite extends SnappyFunSuite with BeforeAndAfterAll {

Expand Down Expand Up @@ -707,7 +707,7 @@ class QueryRoutingSingleNodeSuite extends SnappyFunSuite with BeforeAndAfterAll
val connSession = allSessions.head
// skip the "isCached" checks with JDBC since session is different for JDBC connection
ColumnTableBatchInsertTest.testSparkCachingUsingSQL(sc,
MetadataTest.resultSetToDataset(connSession, stmt), connSession.catalog.isCached,
SnappyFunSuite.resultSetToDataset(connSession, stmt), connSession.catalog.isCached,
df => connSession.sharedState.cacheManager.lookupCachedData(df).isDefined)
stmt.close()
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class SQLMetadataTest extends SnappyFunSuite {
val conn = DriverManager.getConnection(s"jdbc:snappydata://localhost:$netPort")
try {
val stmt = conn.createStatement()
MetadataTest.testSYSTablesAndVTIs(MetadataTest.resultSetToDataset(session, stmt),
MetadataTest.testSYSTablesAndVTIs(SnappyFunSuite.resultSetToDataset(session, stmt),
netServers = Seq(s"localhost/127.0.0.1[$netPort]"))
stmt.close()
} finally {
Expand All @@ -58,7 +58,7 @@ class SQLMetadataTest extends SnappyFunSuite {
val conn = DriverManager.getConnection(s"jdbc:snappydata://localhost:$netPort")
try {
val stmt = conn.createStatement()
MetadataTest.testDescribeShowAndExplain(MetadataTest.resultSetToDataset(session, stmt),
MetadataTest.testDescribeShowAndExplain(SnappyFunSuite.resultSetToDataset(session, stmt),
usingJDBC = true)
stmt.close()
} finally {
Expand All @@ -71,7 +71,7 @@ class SQLMetadataTest extends SnappyFunSuite {
val conn = DriverManager.getConnection(s"jdbc:snappydata://localhost:$netPort")
try {
val stmt = conn.createStatement()
MetadataTest.testDSIDWithSYSTables(MetadataTest.resultSetToDataset(session, stmt),
MetadataTest.testDSIDWithSYSTables(SnappyFunSuite.resultSetToDataset(session, stmt),
Seq(s"localhost/127.0.0.1[$netPort]"))
stmt.close()
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -681,8 +681,7 @@ class SplitClusterDUnitSecurityTest(s: String)
s"CREATE TEMPORARY TABLE ${t1}temp AS SELECT id, name FROM $schema.$t1",
s"CREATE GLOBAL TEMPORARY TABLE ${t1}tempg AS SELECT id, name FROM $schema.$t1",
s"CREATE EXTERNAL TABLE $schema.${t1}ext USING csv OPTIONS(path " +
s"'../../quickstart/src/main/resources/customer.csv')",
s"CREATE INDEX $schema.idx ON $schema.$t1 (id, name)")
s"'../../quickstart/src/main/resources/customer.csv')")
.foreach(executeSQL(user1Stmt, _))

// user gemfire2 of same group gemGroup1
Expand All @@ -698,7 +697,6 @@ class SplitClusterDUnitSecurityTest(s: String)
s"select * from $schema.$t2",
s"delete from $schema.$t1 where name like 'two'",
s"drop table $schema.$t1r",
s"drop index $schema.idx",
s"select * from $schema.$t2").foreach(executeSQL(user2Stmt, _))

// user gemfire1
Expand All @@ -724,7 +722,7 @@ class SplitClusterDUnitSecurityTest(s: String)
s"CREATE INDEX $schema.idx4 ON $schema.$t1 (id, name)")
.foreach(sql => assertFailures(() => {
executeSQL(user4Stmt, sql)
}, sql, Seq("42500", "42502", "42506", "42507")))
}, sql, Seq("42500", "42502", "42506", "42507", "38000")))

// Grant DML permissions to gemfire4 and ensure it works.
executeSQL(user1Stmt, s"grant select on $schema.$t1 to ldapgroup:$group2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import com.pivotal.gemfirexd.internal.engine.Misc
import io.snappydata.test.dunit.{SerializableRunnable, VM}
import io.snappydata.test.util.TestException
import io.snappydata.util.TestUtils
import io.snappydata.{ColumnUpdateDeleteTests, Constant}
import io.snappydata.{ColumnUpdateDeleteTests, Constant, SnappyFunSuite}
import org.junit.Assert

import org.apache.spark.sql.catalyst.InternalRow
Expand Down Expand Up @@ -296,11 +296,11 @@ trait SplitClusterDUnitTestObject extends Logging {
netServers, locatorId, locatorNetServer, servers, leadId)
// next test metadata using JDBC connection
stmt = jdbcConn.createStatement()
MetadataTest.testSYSTablesAndVTIs(MetadataTest.resultSetToDataset(session, stmt),
MetadataTest.testSYSTablesAndVTIs(SnappyFunSuite.resultSetToDataset(session, stmt),
hostName = "localhost", netServers, locatorId, locatorNetServer, servers, leadId)
MetadataTest.testDescribeShowAndExplain(MetadataTest.resultSetToDataset(session, stmt),
MetadataTest.testDescribeShowAndExplain(SnappyFunSuite.resultSetToDataset(session, stmt),
usingJDBC = true)
MetadataTest.testDSIDWithSYSTables(MetadataTest.resultSetToDataset(session, stmt),
MetadataTest.testDSIDWithSYSTables(SnappyFunSuite.resultSetToDataset(session, stmt),
netServers, locatorId, locatorNetServer, servers, leadId)

stmt.close()
Expand Down Expand Up @@ -417,8 +417,10 @@ trait SplitClusterDUnitTestObject extends Logging {
SnappyContext.getClusterMode(snc.sparkContext) match {
case ThinClientConnectorMode(_, _) =>
// test index create op
snc.createIndex("tableName" + "_index", tableName, Map("COL1" -> None),
Map.empty[String, String])
if ("row".equalsIgnoreCase(tableType)) {
snc.createIndex("tableName" + "_index", tableName, Map("COL1" -> None),
Map.empty[String, String])
}
case _ =>
}

Expand All @@ -427,7 +429,9 @@ trait SplitClusterDUnitTestObject extends Logging {
SnappyContext.getClusterMode(snc.sparkContext) match {
case ThinClientConnectorMode(_, _) =>
// test index drop op
snc.dropIndex("tableName" + "_index", ifExists = false)
if ("row".equalsIgnoreCase(tableType)) {
snc.dropIndex("tableName" + "_index", ifExists = false)
}
case _ =>
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ class CachedDataFrame(snappySession: SnappySession, queryExecution: QueryExecuti
@transient
private[sql] var currentLiterals: Array[ParamLiteral] = _

@transient
private[sql] var queryShortString: String = _

@transient
private[sql] var queryString: String = _

Expand Down Expand Up @@ -288,7 +291,7 @@ class CachedDataFrame(snappySession: SnappySession, queryExecution: QueryExecuti
try {
didPrepare = prepareForCollect()
val (result, elapsedMillis) = CachedDataFrame.withNewExecutionId(snappySession,
queryString, queryString, currentQueryExecutionString, currentQueryPlanInfo,
queryShortString, queryString, currentQueryExecutionString, currentQueryPlanInfo,
currentExecutionId, planStartTime, planEndTime)(body)
(result, elapsedMillis * 1000000L)
} finally {
Expand Down Expand Up @@ -613,7 +616,7 @@ object CachedDataFrame
else Utils.nextExecutionIdMethod.invoke(SQLExecution).asInstanceOf[Long]
val executionIdStr = java.lang.Long.toString(executionId)
localProperties.setProperty(SQLExecution.EXECUTION_ID_KEY, executionIdStr)
localProperties.setProperty(SparkContext.SPARK_JOB_DESCRIPTION, queryLongForm)
localProperties.setProperty(SparkContext.SPARK_JOB_DESCRIPTION, queryShortForm)
localProperties.setProperty(SparkContext.SPARK_JOB_GROUP_ID, executionIdStr)

val startTime = System.currentTimeMillis()
Expand Down
29 changes: 16 additions & 13 deletions core/src/main/scala/org/apache/spark/sql/SnappySession.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1835,8 +1835,8 @@ object SnappySession extends Logging {
* data to the active executions. SparkListenerSQLPlanExecutionEnd is
* then sent with the accumulated time of both the phases.
*/
private def planExecution(qe: QueryExecution, session: SnappySession, sqlText: String,
executedPlan: SparkPlan, paramLiterals: Array[ParamLiteral], paramsId: Int)
private def planExecution(qe: QueryExecution, session: SnappySession, sqlShortText: String,
sqlText: String, executedPlan: SparkPlan, paramLiterals: Array[ParamLiteral], paramsId: Int)
(f: => RDD[InternalRow]): (RDD[InternalRow], String, SparkPlanInfo,
String, SparkPlanInfo, Long, Long, Long) = {
// Right now the CachedDataFrame is not getting used across SnappySessions
Expand All @@ -1845,7 +1845,7 @@ object SnappySession extends Logging {
val context = session.sparkContext
val localProperties = context.getLocalProperties
localProperties.setProperty(SQLExecution.EXECUTION_ID_KEY, executionIdStr)
localProperties.setProperty(SparkContext.SPARK_JOB_DESCRIPTION, sqlText)
localProperties.setProperty(SparkContext.SPARK_JOB_DESCRIPTION, sqlShortText)
localProperties.setProperty(SparkContext.SPARK_JOB_GROUP_ID, executionIdStr)
val start = System.currentTimeMillis()
try {
Expand All @@ -1870,8 +1870,8 @@ object SnappySession extends Logging {
}
}

private def evaluatePlan(qe: QueryExecution, session: SnappySession, sqlText: String,
paramLiterals: Array[ParamLiteral], paramsId: Int): CachedDataFrame = {
private def evaluatePlan(qe: QueryExecution, session: SnappySession, sqlShortText: String,
sqlText: String, paramLiterals: Array[ParamLiteral], paramsId: Int): CachedDataFrame = {
val (executedPlan, withFallback) = getExecutedPlan(qe.executedPlan)
var planCaching = session.planCaching

Expand Down Expand Up @@ -1906,7 +1906,7 @@ object SnappySession extends Logging {
case _ => true
} else true
// post final execution immediately (collect for these plans will post nothing)
CachedDataFrame.withNewExecutionId(session, sqlText, sqlText, executionStr, planInfo,
CachedDataFrame.withNewExecutionId(session, sqlShortText, sqlText, executionStr, planInfo,
postGUIPlans = postGUIPlans) {
// create new LogicalRDD plan so that plan does not get re-executed
// (e.g. just toRdd is not enough since further operators like show will pass
Expand All @@ -1922,14 +1922,15 @@ object SnappySession extends Logging {

case plan: CollectAggregateExec =>
val (childRDD, origExecutionStr, origPlanInfo, executionStr, planInfo, executionId,
planStartTime, planEndTime) = planExecution(qe, session, sqlText, plan, paramLiterals,
paramsId)(if (withFallback ne null) withFallback.execute(plan.child) else plan.childRDD)
planStartTime, planEndTime) = planExecution(qe, session, sqlShortText, sqlText, plan,
paramLiterals, paramsId)(
if (withFallback ne null) withFallback.execute(plan.child) else plan.childRDD)
(childRDD, qe, origExecutionStr, origPlanInfo, executionStr, planInfo,
childRDD.id, true, executionId, planStartTime, planEndTime)

case plan =>
val (rdd, origExecutionStr, origPlanInfo, executionStr, planInfo, executionId,
planStartTime, planEndTime) = planExecution(qe, session, sqlText, plan,
planStartTime, planEndTime) = planExecution(qe, session, sqlShortText, sqlText, plan,
paramLiterals, paramsId) {
plan match {
case p: CollectLimitExec =>
Expand Down Expand Up @@ -1993,6 +1994,7 @@ object SnappySession extends Logging {

def sqlPlan(session: SnappySession, sqlText: String): CachedDataFrame = {
val parser = session.sessionState.sqlParser
val sqlShortText = CachedDataFrame.queryStringShortForm(sqlText)
val plan = parser.parsePlan(sqlText, clearExecutionData = true)
val planCaching = session.planCaching
val paramLiterals = parser.sqlParser.getAllLiterals
Expand All @@ -2007,7 +2009,7 @@ object SnappySession extends Logging {
session.currentKey = key
try {
val execution = session.executePlan(plan)
cachedDF = evaluatePlan(execution, session, sqlText, paramLiterals, paramsId)
cachedDF = evaluatePlan(execution, session, sqlShortText, sqlText, paramLiterals, paramsId)
// put in cache if the DF has to be cached
if (planCaching && cachedDF.isCached) {
if (isTraceEnabled) {
Expand All @@ -2026,12 +2028,13 @@ object SnappySession extends Logging {
logDebug(s"Using cached plan for: $sqlText (existing: ${cachedDF.queryString})")
cachedDF = cachedDF.duplicate()
}
handleCachedDataFrame(cachedDF, plan, session, sqlText, paramLiterals, paramsId)
handleCachedDataFrame(cachedDF, plan, session, sqlShortText, sqlText, paramLiterals, paramsId)
}

private def handleCachedDataFrame(cachedDF: CachedDataFrame, plan: LogicalPlan,
session: SnappySession, sqlText: String, paramLiterals: Array[ParamLiteral],
paramsId: Int): CachedDataFrame = {
session: SnappySession, sqlShortText: String, sqlText: String,
paramLiterals: Array[ParamLiteral], paramsId: Int): CachedDataFrame = {
cachedDF.queryShortString = sqlShortText
cachedDF.queryString = sqlText
if (cachedDF.isCached && (cachedDF.paramLiterals eq null)) {
cachedDF.paramLiterals = paramLiterals
Expand Down
Loading

0 comments on commit 9299a80

Please sign in to comment.