Merge branch 'master' into SNAP-2158
rmishra committed Feb 7, 2018
2 parents 8310890 + 28b43fe commit ad8ff3a
Showing 157 changed files with 4,090 additions and 646 deletions.
19 changes: 10 additions & 9 deletions cluster/sbin/snappy-nodes.sh
@@ -223,14 +223,6 @@ function execute() {
2>&1 | sed "s/^/$host: /") &
LAST_PID="$!"
fi
if [ -z "$RUN_IN_BACKGROUND" ]; then
wait $LAST_PID
else
sleep 1
if [ -e "/proc/$LAST_PID/status" ]; then
sleep 1
fi
fi
else
if [ "$dirfolder" != "" ]; then
# Create the directory for the snappy component if the folder is a default folder
@@ -239,7 +231,16 @@ function execute() {
fi
fi
launchcommand="${@// /\\ } ${args} < /dev/null 2>&1"
eval $launchcommand
eval $launchcommand &
LAST_PID="$!"
fi
if [ -z "$RUN_IN_BACKGROUND" ]; then
wait $LAST_PID
else
sleep 1
if [ -e "/proc/$LAST_PID/status" ]; then
sleep 1
fi
fi
df=${dirfolder}
@@ -75,7 +75,7 @@ abstract class ClusterManagerTestBase(s: String)
sysProps.setProperty("p2p.minJoinTries", "1")

// spark memory fill to detect any uninitialized memory accesses
// sysProps.setProperty("spark.memory.debugFill", "true")
sysProps.setProperty("spark.memory.debugFill", "true")

var host: Host = _
var vm0: VM = _
@@ -21,11 +21,14 @@ import java.io.{FileOutputStream, PrintWriter}

import com.pivotal.gemfirexd.Attribute
import com.typesafe.config.{Config, ConfigException}
import io.snappydata.Constant
import io.snappydata.{Constant, ServiceManager}
import io.snappydata.impl.LeadImpl
import org.apache.spark.SparkCallbacks
import org.apache.spark.sql.types.{DecimalType, IntegerType, StructField, StructType}
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.SnappyStreamingJob
import org.apache.spark.streaming.SnappyStreamingContext
import org.apache.spark.ui.SnappyBasicAuthenticator

// scalastyle:off println
class SnappySecureJob extends SnappySQLJob {
@@ -56,6 +59,12 @@ class SnappySecureJob extends SnappySQLJob {
} else {
accessAndModifyTablesOwnedByOthers(snSession, jobConfig)
}
// Confirm that our zeppelin interpreter is not initialized.
assert(ServiceManager.getLeadInstance.asInstanceOf[LeadImpl].getInterpreterServerClass() ==
null, "Zeppelin interpreter must not be initialized in secure cluster")
// Check SnappyData Pulse UI is secured by our custom authenticator.
assert(SparkCallbacks.getAuthenticatorForJettyServer().get
.isInstanceOf[SnappyBasicAuthenticator], "SnappyData Pulse UI not secured")
pw.println(msg)
} finally {
pw.close()
@@ -266,6 +266,7 @@ class SplitSnappyClusterDUnitTest(s: String)
ColumnUpdateDeleteTests.testSNAP1925(session)
ColumnUpdateDeleteTests.testSNAP1926(session)
ColumnUpdateDeleteTests.testConcurrentOps(session)
ColumnUpdateDeleteTests.testSNAP2124(session, checkPruning = true)
} finally {
StoreUtils.TEST_RANDOM_BUCKETID_ASSIGNMENT = false
}
25 changes: 16 additions & 9 deletions cluster/src/main/scala/io/snappydata/impl/LeadImpl.scala
@@ -81,6 +81,7 @@ class LeadImpl extends ServerImpl with Lead

isTestSetup = bootProperties.getProperty("isTest", "false").toBoolean
bootProperties.remove("isTest")
val authSpecified = Misc.checkAuthProvider(bootProperties)

// prefix all store properties with "snappydata.store" for SparkConf

@@ -162,7 +163,7 @@ class LeadImpl extends ServerImpl with Lead

val zeppelinEnabled = bootProperties.getProperty(
Constant.ENABLE_ZEPPELIN_INTERPRETER, "false").equalsIgnoreCase("true")
if (zeppelinEnabled) {
if (zeppelinEnabled && !authSpecified) {
try {

val zeppelinIntpUtilClass = Utils.classForName(
@@ -190,7 +191,7 @@ class LeadImpl extends ServerImpl with Lead

// The auth service is not yet initialized at this point.
// So simply check the auth-provider property value.
if (Misc.checkAuthProvider(bootProperties)) {
if (authSpecified) {
logInfo("Enabling user authentication for SnappyData Pulse")
SparkCallbacks.setAuthenticatorForJettyServer()
}
@@ -234,7 +235,7 @@ class LeadImpl extends ServerImpl with Lead
}

// wait for a while until servers get registered
val endWait = System.currentTimeMillis() + 10000
val endWait = System.currentTimeMillis() + 120000
while (!SnappyContext.hasServerBlockIds && System.currentTimeMillis() <= endWait) {
Thread.sleep(100)
}
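
The hunk above raises the wait for server registration from 10 seconds to 2 minutes (10000 ms to 120000 ms) while keeping the same 100 ms polling loop. A minimal sketch of that poll-with-timeout pattern; the helper name waitUntil is illustrative, not a SnappyData API:

// Poll a condition at a fixed interval until it holds or the deadline passes.
def waitUntil(timeoutMs: Long, pollMs: Long = 100L)(condition: => Boolean): Boolean = {
  val deadline = System.currentTimeMillis() + timeoutMs
  while (!condition && System.currentTimeMillis() <= deadline) {
    Thread.sleep(pollMs)
  }
  condition
}

// Example (mirrors the loop above): give servers up to two minutes to register.
// val registered = waitUntil(120000L)(SnappyContext.hasServerBlockIds)
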
@@ -258,8 +259,10 @@
// start other add-on services (job server)
startAddOnServices(conf, confFile, jobServerConfig)

// finally start embedded zeppelin interpreter if configured
checkAndStartZeppelinInterpreter(zeppelinEnabled, bootProperties)
// finally start embedded zeppelin interpreter if configured and security is not enabled.
if (!authSpecified) {
checkAndStartZeppelinInterpreter(zeppelinEnabled, bootProperties)
}

if (jobServerWait) {
// mark RUNNING after job server and zeppelin initialization if so configured
@@ -620,13 +623,13 @@ class LeadImpl extends ServerImpl with Lead

/**
* This method is used to start the zeppelin interpreter thread.
* As discussed by default zeppelin interpreter will be enabled.User can disable it by
* setting "zeppelin.interpreter.enable" to false in leads conf file.User can also specify
* the port on which intrepreter should listen using property zeppelin.interpreter.port
* By default, zeppelin interpreter will be disabled. User can enable it by
* setting "zeppelin.interpreter.enable" to true in leads conf file. User can also specify
* the port on which interpreter should listen using property "zeppelin.interpreter.port"
*/
private def checkAndStartZeppelinInterpreter(enabled: Boolean,
bootProperties: Properties): Unit = {
// As discussed ZeppelinRemoteInterpreter Server will be enabled by default.
// As discussed ZeppelinRemoteInterpreter Server will be disabled by default.
// [sumedh] Our startup times are already very high and we are looking to
// cut that down and not increase further with these external utilities.
if (enabled) {
@@ -654,6 +657,10 @@ class LeadImpl extends ServerImpl with Lead

}
}

def getInterpreterServerClass(): Class[_] = {
remoteInterpreterServerClass
}
}

object LeadImpl {
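
Taken together, the LeadImpl changes gate the Zeppelin interpreter on security: authSpecified is computed once from the boot properties, the interpreter is skipped whenever an auth-provider is configured, and the updated doc comment makes it disabled by default. A condensed sketch of that gating, assuming the property name from the doc comment ("zeppelin.interpreter.enable") and an illustrative start method:

import java.util.Properties

def maybeStartZeppelin(bootProperties: Properties, authSpecified: Boolean): Unit = {
  val zeppelinEnabled = bootProperties
    .getProperty("zeppelin.interpreter.enable", "false").equalsIgnoreCase("true")
  // Off by default; never started when authentication is configured.
  if (zeppelinEnabled && !authSpecified) {
    // startRemoteInterpreterServer(bootProperties)  // placeholder for the real startup
  }
}
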
10 changes: 8 additions & 2 deletions cluster/src/main/scala/org/apache/spark/SparkCallbacks.scala
@@ -17,12 +17,12 @@
package org.apache.spark

import org.apache.spark

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.memory.StoreUnifiedManager
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig, SparkAppConfig}
import org.apache.spark.ui.{JettyUtils, SnappyBasicAuthenticator}
import org.eclipse.jetty.security.authentication.BasicAuthenticator

/**
* Calls that are needed to be sent to snappy-cluster classes because
@@ -39,8 +39,10 @@ object SparkCallbacks {
ioEncryptionKey: Option[Array[Byte]],
isLocal: Boolean): SparkEnv = {

SparkEnv.createExecutorEnv(driverConf, executorId, hostname,
val env = SparkEnv.createExecutorEnv(driverConf, executorId, hostname,
port, numCores, ioEncryptionKey, isLocal)
env.memoryManager.asInstanceOf[StoreUnifiedManager].init()
env
}

def getRpcEnv(sparkEnv: SparkEnv): RpcEnv = {
@@ -92,6 +94,10 @@
}
}

def getAuthenticatorForJettyServer(): Option[BasicAuthenticator] = {
JettyUtils.customAuthenticator
}

def setSparkConf(sc: SparkContext, key: String, value: String): Unit = {
if (value ne null) sc.conf.set(key, value) else sc.conf.remove(key)
}
@@ -785,6 +785,10 @@ class SnappyUnifiedMemoryManager private[memory](
wrapperStats.setMemoryManagerStats(stats)
}

/**
* Initializes the memoryManager
*/
override def init(): Unit = memoryForObject
}

object SnappyUnifiedMemoryManager extends Logging {
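
The new init() above just references memoryForObject, which presumably forces a lazily built structure to materialize at a well-defined point (SparkCallbacks.createExecutorEnv now calls it right after the executor environment is created) rather than on the first memory request. A generic sketch of that force-the-lazy-field pattern, with illustrative names rather than the actual SnappyData classes:

class ExampleManager {
  // Expensive bookkeeping structure that should exist before first use.
  private lazy val memoryForObject =
    new java.util.concurrent.ConcurrentHashMap[String, java.lang.Long]()

  // Referencing the lazy val is enough to materialize it eagerly.
  def init(): Unit = memoryForObject
}

// Caller-side shape, as in the SparkCallbacks hunk above:
// env.memoryManager.asInstanceOf[StoreUnifiedManager].init()
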
@@ -50,7 +50,6 @@ class SnappyEmbeddedModeClusterManager extends ExternalClusterManager {
(split(0).trim, split(1).trim)
}
else if (locator.isEmpty ||
locator == "" ||
locator == "null" ||
!ServiceUtils.LOCATOR_URL_PATTERN.matcher(locator).matches()
) {
14 changes: 7 additions & 7 deletions cluster/src/test/scala/io/snappydata/QueryTest.scala
@@ -19,7 +19,6 @@ package io.snappydata

import scala.collection.JavaConverters._

import org.apache.spark.sql.QueryTest.checkAnswer
import org.apache.spark.sql.execution.benchmark.ColumnCacheBenchmark
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.{AnalysisException, Row, SnappyContext, SnappySession, SparkSession}
@@ -271,10 +270,11 @@ class QueryTest extends SnappyFunSuite {
"select count(*), city from $t group by city",
"select count(*), city from $t where country like 'country_1%' group by city",
"select count(*), city, collect_list(airport_id), collect_list(name), " +
"collect_list(country) from (select * from $t order by airport_id) as t group by city",
"collect_list(country) from (select * from $t order by airport_id, name, country) " +
"as t group by city order by city",
"select count(*), city, collect_list(airport_id), collect_list(name), " +
"collect_list(country) from (select * from $t where country like 'country_1%' " +
" order by airport_id) as t group by city"
" order by airport_id, name, country) as t group by city order by city"
)

// To validate the results against queries directly on data disabling snappy aggregation.
@@ -287,8 +287,8 @@
}

for (((r1, r2), e) <- results.zip(expectedResults)) {
org.apache.spark.sql.QueryTest.checkAnswer(r1, e)
org.apache.spark.sql.QueryTest.checkAnswer(r2, e)
checkAnswer(r1, e)
checkAnswer(r2, e)
}

// fire updates and check again
Expand All @@ -302,8 +302,8 @@ class QueryTest extends SnappyFunSuite {
}

for (((r1, r2), e) <- results.zip(expectedResults)) {
org.apache.spark.sql.QueryTest.checkAnswer(r1, e)
org.apache.spark.sql.QueryTest.checkAnswer(r2, e)
checkAnswer(r1, e)
checkAnswer(r2, e)
}
}
}
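
The rewritten collect_list queries above add a deterministic ORDER BY inside the subquery (and order the final output by city) so results can be compared row-for-row against stock Spark. A small standalone sketch of why the ordering matters, using illustrative data rather than the test's airport tables:

import org.apache.spark.sql.SparkSession

object CollectListOrderingDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]").appName("collect-list-order").getOrCreate()
    spark.range(0, 6).selectExpr("id", "id % 2 as grp").createOrReplaceTempView("t")
    // collect_list preserves the order rows arrive in, so without a sort the
    // lists can differ across engines or runs; sorting the input pins it down.
    spark.sql("select grp, collect_list(id) from (select * from t order by id) s " +
      "group by grp order by grp").show(false)
    spark.stop()
  }
}
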
127 changes: 127 additions & 0 deletions cluster/src/test/scala/io/snappydata/benchmark/snappy/TPCDSSuite.scala
@@ -0,0 +1,127 @@
/*
* Copyright (c) 2017 SnappyData, Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/

package io.snappydata.benchmark.snappy

import java.io.{File, FileOutputStream, PrintStream}

import io.snappydata.SnappyFunSuite
import org.apache.spark.sql.execution.benchmark.TPCDSQuerySnappyBenchmark
import org.apache.spark.sql.{SnappySession, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.BeforeAndAfterAll


class TPCDSSuite extends SnappyFunSuite
with BeforeAndAfterAll {

var tpcdsQueries = Seq[String]()


val conf =
new SparkConf()
.setMaster("local[*]")
.setAppName("test-sql-context")
.set("spark.driver.allowMultipleContexts", "true")
.set("spark.sql.shuffle.partitions", "4")
.set("spark.driver.memory", "1g")
.set("spark.executor.memory", "1g")
.set("spark.sql.autoBroadcastJoinThreshold", (20 * 1024 * 1024).toString)

override def beforeAll(): Unit = {
super.beforeAll()
tpcdsQueries = Seq(
"q1", "q2", "q3", "q4", "q5", "q6", "q7", "q8", "q9", "q10", "q11",
"q12", "q13", "q14a", "q14b", "q15", "q16", "q17", "q18", "q19", "q20",
"q21", "q22", "q23a", "q23b", "q24a", "q24b", "q25", "q26", "q27", "q28", "q29", "q30",
"q31", "q32", "q33", "q34", "q35", "q36", "q37", "q38", "q39a", "q39b", "q40",
"q41", "q42", "q43", "q44", "q45", "q46", "q47", "q48", "q49", "q50",
"q51", "q52", "q53", "q54", "q55", "q56", "q57", "q58", "q59", "q60",
"q61", "q62", "q63", "q64", "q65", "q66", "q67", "q68", "q69", "q70",
"q71", "q72", "q73", "q74", "q75", "q76", "q77", "q78", "q79", "q80",
"q81", "q82", "q83", "q84", "q85", "q86", "q87", "q88", "q89", "q90",
"q91", "q92", "q93", "q94", "q95", "q96", "q97", "q98", "q99")
}

// Disabling the test run from precheckin as it takes around an hour.
// TODO : Add TPCDS tests to be run as a part of smokePerf bt which will run on a dedicated
// machine.

ignore("Test with Snappy") {
val sc = new SparkContext(conf)
TPCDSQuerySnappyBenchmark.snappy = new SnappySession(sc)
val dataLocation = "/export/shared/QA_DATA/TPCDS/data"
val snappyHome = System.getenv("SNAPPY_HOME")
val snappyRepo = s"$snappyHome/../../.."

TPCDSQuerySnappyBenchmark.execute(dataLocation,
queries = tpcdsQueries, true, s"$snappyRepo/spark/sql/core/src/test/resources/tpcds")
}

// Disabling the test run from precheckin as it takes around an hour.
// TODO : Add TPCDS tests to be run as a part of smokePerf bt which will run on a dedicated
// machine.

ignore("Test with Spark") {
TPCDSQuerySnappyBenchmark.spark = SparkSession.builder.config(conf).getOrCreate()
val dataLocation = "/export/shared/QA_DATA/TPCDS/data"
val snappyHome = System.getenv("SNAPPY_HOME")
val snappyRepo = s"$snappyHome/../../..";

TPCDSQuerySnappyBenchmark.execute(dataLocation,
queries = tpcdsQueries, false, s"$snappyRepo/spark/sql/core/src/test/resources/tpcds")

}

// Disabling the validation for now as this requires the expected result files to be created
// using stock spark before hand.

ignore("Validate Results") {

for (query <- tpcdsQueries) {

val actualResultsAvailableAt = "path for actual result"
val expectedResultsAvailableAt = "path for expected result"

val resultFileStream: FileOutputStream = new FileOutputStream(new File("Comparison.out"))
val resultOutputStream: PrintStream = new PrintStream(resultFileStream)

val expectedFile = sc.textFile(s"file://$expectedResultsAvailableAt/Spark_$query.out")
val actualFile = sc.textFile(s"file://$actualResultsAvailableAt/Snappy_$query.out")

val expectedLineSet = expectedFile.collect().toList.sorted
val actualLineSet = actualFile.collect().toList.sorted

if (!actualLineSet.equals(expectedLineSet)) {
if (!(expectedLineSet.size == actualLineSet.size)) {
resultOutputStream.println(s"For $query " +
s"result count mismatched observed with " +
s"expected ${expectedLineSet.size} and actual ${actualLineSet.size}")
} else {
for ((expectedLine, actualLine) <- expectedLineSet zip actualLineSet) {
if (!expectedLine.equals(actualLine)) {
resultOutputStream.println(s"For $query result mismatched observed")
resultOutputStream.println(s"Excpected : $expectedLine")
resultOutputStream.println(s"Found : $actualLine")
resultOutputStream.println(s"-------------------------------------")
}
}
}
}
}
}
}
@@ -52,10 +52,10 @@ public static void main(String[] args) {
//treat first value are base and divide subsequent values with this base value and plot values

Path p = Paths.get(args[0]);
final int maxDepth = 4;
final int maxDepth = 5;
List<String> errorList = new ArrayList<String>();
try {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss") ;
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH-mm-ss") ;
FileOutputStream reportOutputStream = new FileOutputStream(new File(p.toString(), "ComparisonReport_"+dateFormat.format(new Date())+".txt"));
PrintStream reportPrintStream = new PrintStream(reportOutputStream);
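
The timestamp pattern above changes from "yyyy-MM-dd HH-mm-ss" to "yyyy-MM-dd'T'HH-mm-ss"; the quoted T is emitted literally, so the generated report file name no longer contains a space. A quick Scala sketch of the same formatting:

import java.text.SimpleDateFormat
import java.util.Date

val fmt = new SimpleDateFormat("yyyy-MM-dd'T'HH-mm-ss")
// e.g. ComparisonReport_2018-02-07T14-05-33.txt (no space in the name)
println("ComparisonReport_" + fmt.format(new Date()) + ".txt")
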

