Merge branch 'master' into spark_2.3_merge
Conflicts:
	core/src/main/scala/org/apache/spark/sql/SnappyDDLParser.scala
ymahajan committed May 20, 2018
2 parents d198abe + e27c177 commit 3590c61
Showing 10 changed files with 326 additions and 21 deletions.
7 changes: 6 additions & 1 deletion cluster/src/main/scala/io/snappydata/ToolsCallbackImpl.scala
@@ -17,6 +17,7 @@
package io.snappydata

import java.io.File
import java.lang.reflect.InvocationTargetException
import java.net.URLClassLoader
import java.util.UUID

@@ -68,7 +69,11 @@ object ToolsCallbackImpl extends ToolsCallback {
    })
    // Close and reopen interpreter
    if (alias != null) {
      lead.closeAndReopenInterpreterServer();
      try {
        lead.closeAndReopenInterpreterServer();
      } catch {
        case ite: InvocationTargetException => assert(ite.getCause.isInstanceOf[SecurityException])
      }
    }
  }

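Note on the change above (illustration, not part of this commit): an InvocationTargetException is how the JDK reflection layer reports a failure of the method it invoked, so the real error is only reachable through getCause, which is why the catch asserts on the cause rather than on the exception itself. A minimal, self-contained sketch of that wrapping behavior, with made-up names (ReflectionCauseDemo, Target, reopen):

    import java.lang.reflect.InvocationTargetException

    object ReflectionCauseDemo {
      // Hypothetical target: the invoked method itself throws a SecurityException.
      class Target {
        def reopen(): Unit = throw new SecurityException("interpreter reopen not allowed")
      }

      def main(args: Array[String]): Unit = {
        val method = classOf[Target].getMethod("reopen")
        try {
          method.invoke(new Target())
        } catch {
          // Reflection wraps the real failure; it is only visible via getCause.
          case ite: InvocationTargetException =>
            assert(ite.getCause.isInstanceOf[SecurityException])
        }
      }
    }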
17 changes: 10 additions & 7 deletions cluster/src/test/scala/org/apache/spark/sql/store/BugTest.scala
@@ -20,18 +20,20 @@ import java.util.Properties

import com.pivotal.gemfirexd.Attribute
import com.pivotal.gemfirexd.security.{LdapTestServer, SecurityTestUtils}
import io.snappydata.util.TestUtils
import io.snappydata.{Constant, PlanTest, SnappyFunSuite}
import org.scalatest.BeforeAndAfterAll



import org.junit.Assert.{assertEquals, assertFalse, assertTrue}

import org.apache.spark.SparkConf

class BugTest extends SnappyFunSuite with BeforeAndAfterAll {
  private val sysUser = "gemfire10"

  override def beforeAll(): Unit = {
    this.stopAll()
  }

  protected override def newSparkConf(addOn: (SparkConf) => SparkConf): SparkConf = {
    val ldapProperties = SecurityTestUtils.startLdapServerAndGetBootProperties(0, 0, sysUser,
      getClass.getResource("/auth.ldif").getPath)
@@ -56,28 +58,30 @@ class BugTest extends SnappyFunSuite with BeforeAndAfterAll {
  }

  override def afterAll(): Unit = {
    this.stopAll()
    val ldapServer = LdapTestServer.getInstance()
    if (ldapServer.isServerStarted) {
      ldapServer.stopService()
    }
    import com.pivotal.gemfirexd.Property.{AUTH_LDAP_SERVER, AUTH_LDAP_SEARCH_BASE}
    for (k <- List(Attribute.AUTH_PROVIDER, AUTH_LDAP_SERVER, AUTH_LDAP_SEARCH_BASE)) {
      System.clearProperty(k)
      System.clearProperty("gemfirexd." + k)
      System.clearProperty(Constant.STORE_PROPERTY_PREFIX + k)
    }
    System.clearProperty(Constant.STORE_PROPERTY_PREFIX + Attribute.USERNAME_ATTR)
    System.clearProperty(Constant.STORE_PROPERTY_PREFIX + Attribute.PASSWORD_ATTR)
    super.afterAll()
    System.setProperty("gemfirexd.authentication.required", "false")
  }

  test("Bug SNAP-2255 connection pool exhaustion ") {
  test("Bug SNAP-2255 connection pool exhaustion") {
    val user1 = "gemfire1"
    val user2 = "gemfire2"

    val snc1 = snc.newSession()
    snc1.snappySession.conf.set(Attribute.USERNAME_ATTR, user1)
    snc1.snappySession.conf.set(Attribute.PASSWORD_ATTR, user1)


    snc1.sql(s"create table test (id integer," +
      s" name STRING) using column")
    snc1.sql("insert into test values (1, 'name1')")
@@ -95,6 +99,5 @@
    val rs = snc2.sql(s"select * from $user1.test").collect()
    assertEquals(1, rs.length)
  }

}
}
4 changes: 2 additions & 2 deletions core/src/main/scala/io/snappydata/util/ServiceUtils.scala
@@ -109,8 +109,8 @@ object ServiceUtils {
    }
  }

  def invokeStopFabricServer(sc: SparkContext): Unit = {
    ServerManager.getServerInstance.stop(null)
  def invokeStopFabricServer(sc: SparkContext, shutDownCreds: Properties = null): Unit = {
    ServerManager.getServerInstance.stop(shutDownCreds)
  }

  def getAllLocators(sc: SparkContext): scala.collection.Map[DistributedMember, String] = {
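The new shutDownCreds parameter lets a caller on a secured cluster pass store credentials to the fabric server stop call while keeping the old no-argument behavior as the default. A hedged usage sketch (illustration only; sc is an existing SparkContext and the credential values are placeholders), matching how the SnappyContext change further below builds the Properties:

    // Hypothetical caller passing credentials for a secured shutdown.
    val shutDownCreds = new java.util.Properties()
    shutDownCreds.put(com.pivotal.gemfirexd.Attribute.USERNAME_ATTR, "clusterAdmin")     // placeholder user
    shutDownCreds.put(com.pivotal.gemfirexd.Attribute.PASSWORD_ATTR, "clusterAdminPwd")  // placeholder password
    ServiceUtils.invokeStopFabricServer(sc, shutDownCreds)

    // Existing callers keep working unchanged thanks to the default value:
    ServiceUtils.invokeStopFabricServer(sc)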
@@ -299,7 +299,7 @@ object SnappyParserConsts {
  final val exponent: CharPredicate = CharPredicate('e', 'E')
  final val numeric: CharPredicate = CharPredicate.Digit ++
      CharPredicate('.')
  final val numericSuffix: CharPredicate = CharPredicate('D', 'd', 'F', 'f', 'L', 'l')
  final val numericSuffix: CharPredicate = CharPredicate('D', 'd', 'F', 'f', 'L', 'l', 'B', 'b')
  final val plural: CharPredicate = CharPredicate('s', 'S')

  final val reservedKeywords: OpenHashSet[String] = new OpenHashSet[String]
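Adding 'B' and 'b' to numericSuffix lets the grammar accept the combined BD/bd (BigDecimal) suffix alongside the existing D/d (double), F/f (float) and L/l (long) suffixes; the mapping of each suffix to a Spark SQL type happens in SnappyParser.toNumericLiteral in a later hunk. A few illustrative literals (assumption: a SnappyContext named snc, as in the tests further below):

    snc.sql("select 1.5d")     // DoubleType
    snc.sql("select 2.0f")     // FloatType
    snc.sql("select 10L")      // LongType
    snc.sql("select 0.001BD")  // decimal literal, enabled by the new 'B'/'b' entries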
12 changes: 11 additions & 1 deletion core/src/main/scala/org/apache/spark/sql/SnappyContext.scala
@@ -26,6 +26,7 @@ import scala.language.implicitConversions
import scala.reflect.runtime.universe.TypeTag
import com.gemstone.gemfire.distributed.internal.MembershipListener
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember
import com.pivotal.gemfirexd.Attribute
import com.pivotal.gemfirexd.internal.engine.Misc
import com.pivotal.gemfirexd.internal.shared.common.SharedUtils
import io.snappydata.util.ServiceUtils
@@ -1148,7 +1149,16 @@ object SnappyContext extends Logging {
      // clear current hive catalog connection
      Hive.closeCurrent()
      if (ExternalStoreUtils.isLocalMode(sc)) {
        ServiceUtils.invokeStopFabricServer(sc)
        val props = sc.conf.getOption(Constant.STORE_PROPERTY_PREFIX +
            Attribute.USERNAME_ATTR) match {
          case Some(user) => val prps = new java.util.Properties();
            val pass = sc.conf.get(Constant.STORE_PROPERTY_PREFIX + Attribute.PASSWORD_ATTR, "")
            prps.put(com.pivotal.gemfirexd.Attribute.USERNAME_ATTR, user)
            prps.put(com.pivotal.gemfirexd.Attribute.PASSWORD_ATTR, pass)
            prps
          case None => null
        }
        ServiceUtils.invokeStopFabricServer(sc, props)
      }

      // clear static objects on the driver
@@ -764,7 +764,6 @@ case class DeployCommand(
  override def run(sparkSession: SparkSession): Seq[Row] = {
    val jarsstr = SparkSubmitUtils.resolveMavenCoordinates(coordinates,
      SparkSubmitUtils.buildIvySettings(repos, jarCache))
    logInfo(s"KN: jarstr ${jarsstr}")
    if (jarsstr.nonEmpty) {
      val jars = jarsstr.split(",")
      val sc = sparkSession.sparkContext
30 changes: 24 additions & 6 deletions core/src/main/scala/org/apache/spark/sql/SnappyParser.scala
@@ -104,16 +104,33 @@ class SnappyParser(session: SnappySession)
    var index = 0
    val len = s.length
    // use double if ending with D/d, float for F/f and long for L/l

    s.charAt(len - 1) match {
      case 'D' | 'd' =>
        return newTokenizedLiteral(
          java.lang.Double.parseDouble(s.substring(0, len - 1)), DoubleType)
      case 'F' | 'f' =>
        if (s.length > 2) {
          s.charAt(len - 2) match {
            case 'B' | 'b' => return toDecimalLiteral(s.substring(0, len - 2),
              checkExactNumeric = false)
            case c if (Character.isDigit(c)) => return newTokenizedLiteral(
              java.lang.Double.parseDouble(s.substring(0, len - 1)), DoubleType)
            case _ => throw new ParseException(s"Found non numeric token $s")
          }
        } else {
          return newTokenizedLiteral(
            java.lang.Double.parseDouble(s.substring(0, len - 1)), DoubleType)
        }
      case 'F' | 'f' => if (Character.isDigit(s.charAt(len - 2))) {
        return newTokenizedLiteral(
          java.lang.Float.parseFloat(s.substring(0, len - 1)), FloatType)
      case 'L' | 'l' =>
      } else {
        throw new ParseException(s"Found non numeric token $s")
      }
      case 'L' | 'l' => if (Character.isDigit(s.charAt(len - 2))) {
        return newTokenizedLiteral(
          java.lang.Long.parseLong(s.substring(0, len - 1)), LongType)
      } else {
        throw new ParseException(s"Found non numeric token $s")
      }
      case _ =>
    }
    while (index < len) {
@@ -144,6 +161,7 @@
    } else {
      toDecimalLiteral(s, checkExactNumeric = false)
    }

  }

  private final def updatePerTableQueryHint(tableIdent: TableIdentifier,
@@ -176,8 +194,8 @@

  protected final def numericLiteral: Rule1[Expression] = rule {
    capture(plusOrMinus.? ~ Consts.numeric. + ~ (Consts.exponent ~
        plusOrMinus.? ~ CharPredicate.Digit. +).? ~ Consts.numericSuffix.?) ~
        delimiter ~> ((s: String) => toNumericLiteral(s))
        plusOrMinus.? ~ CharPredicate.Digit. +).? ~ Consts.numericSuffix.? ~
        Consts.numericSuffix.?) ~ delimiter ~> ((s: String) => toNumericLiteral(s))
  }

  protected final def literal: Rule1[Expression] = rule {
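In plain terms, the rewritten toNumericLiteral inspects the last one or two characters of the captured number: D/d yields a double unless it is preceded by B/b, which yields a decimal literal; F/f yields a float and L/l a long; and the character before the suffix must be a digit, otherwise a ParseException is raised (so tokens like .004ld or 4bl are rejected). A simplified stand-alone sketch of that decision, not the parser's actual code; the SNAP-2308 test below exercises the real paths end to end:

    object SuffixSketch {
      // Simplified suffix classification mirroring the hunk above (sketch only).
      def classifySuffix(s: String): String = s.takeRight(2).toUpperCase match {
        case t if t.endsWith("BD") => "decimal"
        case t if t.last == 'D' && t.head.isDigit => "double"
        case t if t.last == 'F' && t.head.isDigit => "float"
        case t if t.last == 'L' && t.head.isDigit => "long"
        case t if t.last.isDigit => "plain number, no suffix"
        case _ => "parse error: non-numeric token"
      }

      def main(args: Array[String]): Unit = {
        assert(classifySuffix("0.001BD") == "decimal")
        assert(classifySuffix("1.5d") == "double")
        assert(classifySuffix("10L") == "long")
        assert(classifySuffix(".004ld") == "parse error: non-numeric token")
      }
    }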
@@ -138,6 +138,51 @@ class ColumnTableTest
    count
  }

  test("Test Bug SNAP-2308 view creation fails if query contains decimal numbers not suffixed") {
    val tableName = "TEST_COLUMN"
    val viewName = "TEST_VIEW"
    snc.sql(s"CREATE TABLE $tableName (Col1 String, Col2 String) " +
        s" USING column " + options)

    val data = Seq(("1.1", "2.2"), ("1", "2"), ("3.57", "3"), ("4.3", "4"), ("5.341", "5"))
    val rdd = sc.parallelize(data)
    val dataDF = snc.createDataFrame(rdd)
    dataDF.write.insertInto(tableName)

    var query = s"SELECT sum(Col1) as summ FROM $tableName where col1 > .0001 having summ > .001"
    snc.sql(query).collect

    snc.sql(s"create or replace view $viewName as ($query)")

    query = s"SELECT sum(Col1) as summ FROM $tableName where col1 > .0001BD having summ > .001bD"
    snc.sql(query).collect

snc.sql(s"create or replace view $viewName as ($query)")

query = s"SELECT sum(Col1) as summ FROM $tableName having summ > .001f"
snc.sql(query).collect

snc.sql(s"create or replace view $viewName as ($query)")

query = s"SELECT sum(Col1) as summ FROM $tableName having summ > .001d"
snc.sql(query).collect

snc.sql(s"create or replace view $viewName as ($query)")

query = s"SELECT sum(Col1) as summ FROM $tableName having summ > .004ld"
var expectedException = intercept[Exception]{ snc.sql(query).collect }
assert(expectedException.isInstanceOf[ParseException])

query = s"SELECT sum(Col1) as summ FROM $tableName having summ > 4bl"
expectedException = intercept[Exception]{ snc.sql(query).collect }
assert(expectedException.isInstanceOf[ParseException])

snc.sql(s"drop view $viewName")
snc.sql(s"drop table $tableName")

logInfo("Successful")
}

test("More columns -- SNAP-1345") {
snc.sql(s"Create Table coltab (a INT) " +
"using column options()")
Expand All @@ -146,7 +191,6 @@ class ColumnTableTest
} catch {
case ex: SQLException => assert("42802".equals(ex.getSQLState))
}

snc.sql("drop table coltab")
}

