From f59dd533b8cfa37c453d3f204bc2293c18bd2e13 Mon Sep 17 00:00:00 2001
From: Mahesh Nayak
Date: Tue, 18 Aug 2020 09:58:02 +0530
Subject: [PATCH 1/4] new:usr:SDK-415: Adding a Spark command example

---
 example/qds_spark_example.py | 115 +++++++++++++++++++++++++++++++++++
 1 file changed, 115 insertions(+)
 create mode 100644 example/qds_spark_example.py

diff --git a/example/qds_spark_example.py b/example/qds_spark_example.py
new file mode 100644
index 00000000..30ad13de
--- /dev/null
+++ b/example/qds_spark_example.py
@@ -0,0 +1,115 @@
+"""
+This is sample code for submitting a Spark script (SparkCommand) and getting the result.
+"""
+
+from qds_sdk.qubole import Qubole
+from qds_sdk.commands import SparkCommand
+import time
+
+
+def get_results_filename(command_id):
+    """
+    A helper method to generate a file name to write the downloaded result
+    :param command_id:
+    :return:
+    """
+    return "/tmp/result_{}.tsv".format(command_id)
+
+
+def get_content(script_file_path):
+    """
+    Helper method to read a script file at the given location.
+    :param script_file_path:
+    :return:
+    """
+    with open(script_file_path, 'r') as content_file:
+        content = content_file.read()
+    return content
+
+
+def execute_query(cluster_label, cmd_to_run, language, user_program_arguments=None, arguments=None):
+    """
+    Helper method to execute a script
+    :param cluster_label:
+    :param cmd_to_run:
+    :param language:
+    :param user_program_arguments:
+    :param arguments:
+    :return:
+    """
+    if cmd_to_run is None or cmd_to_run == "":
+        print("script cannot be None or empty")
+        return None
+
+    if not language:
+        print("language cannot be None or empty")
+        return None
+
+    if language == "command_line":
+        # A Shell command needs to be invoked in this fashion
+        cmd = SparkCommand.create(label=cluster_label, cmdline=cmd_to_run, arguments=arguments,
+                                  user_program_arguments=user_program_arguments)
+    elif language == "sql":
+        # A SQL command needs to be invoked in this fashion
+        cmd = SparkCommand.create(label=cluster_label, sql=cmd_to_run, arguments=arguments)
+    else:
+        # A python, R or scala command needs to be invoked in this fashion.
+        cmd = SparkCommand.create(label=cluster_label, program=cmd_to_run, language=language,
+                                  arguments=arguments, user_program_arguments=user_program_arguments)
+
+    while not SparkCommand.is_done(cmd.status):
+        print("Waiting for completion of command : {}".format(cmd.id))
+        cmd = SparkCommand.find(cmd.id)
+        time.sleep(5)
+
+    if SparkCommand.is_success(cmd.status):
+        print("\nCommand Executed: Completed successfully")
+    else:
+        print("\nCommand Executed: Failed! The status returned is: {}".format(cmd.status))
+        print(cmd.get_log())
+    return cmd
+
+
+def get_results(command):
+    """
+    A helper method to get the results
+    :param command:
+    :return:
+    """
+    if command is None:
+        return None
+
+    results_file_name = get_results_filename(command.id)
+    fp = open(results_file_name, 'w')
+
+    command.get_results(fp, delim="\n")
+    print("results are written to {}".format(results_file_name))
+
+
+if __name__ == '__main__':
+    # Set the API token. If you are using an environment other than api.qubole.com, set api_url to that URL
+    # as <env>/api
+    Qubole.configure(api_token='')
+
+    filename = ""
+    user_program_arguments = None  # arguments for your script
+    arguments = None  # Spark configuration for your program, e.g. "--conf spark.executor.memory=1024M"
+    cluster_label = ""  # the cluster on which the command will run
+
+    # Running a python command. In case of scala or R, get the script content and then set the language field to scala
+    # or R as required
+    script = get_content(filename)
+    command = execute_query(cluster_label, script, "python", user_program_arguments=user_program_arguments,
+                            arguments=arguments)
+    get_results(command)
+
+    # Running a SQL command
+    script = "show tables"
+    command = execute_query(cluster_label, script, "sql", arguments=arguments)
+    get_results(command)
+
+    # Running a shell command
+    script = "/usr/lib/spark/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn " \
+             "--deploy-mode client /usr/lib/spark/spark-examples* 1"
+    command = execute_query(cluster_label, script, "command_line", arguments=arguments)
+    get_results(command)

From fb84a9d62bcf3e535f3063bd7462aefed6239abf Mon Sep 17 00:00:00 2001
From: Mahesh Nayak
Date: Tue, 18 Aug 2020 16:08:12 +0530
Subject: [PATCH 2/4] Splitting the example into separate files per command type

---
 ...example.py => qds_spark_python_example.py} | 42 +++-----
 example/qds_spark_scala_inline_example.py     | 98 +++++++++++++++++++
 example/qds_spark_shell_command_example.py    | 74 ++++++++++++++
 example/qds_spark_sql_example.py              | 74 ++++++++++++++
 4 files changed, 258 insertions(+), 30 deletions(-)
 rename example/{qds_spark_example.py => qds_spark_python_example.py} (53%)
 create mode 100644 example/qds_spark_scala_inline_example.py
 create mode 100644 example/qds_spark_shell_command_example.py
 create mode 100644 example/qds_spark_sql_example.py

diff --git a/example/qds_spark_example.py b/example/qds_spark_python_example.py
similarity index 53%
rename from example/qds_spark_example.py
rename to example/qds_spark_python_example.py
index 30ad13de..773f85eb 100644
--- a/example/qds_spark_example.py
+++ b/example/qds_spark_python_example.py
@@ -1,5 +1,5 @@
 """
-This is sample code for submitting a Spark script (SparkCommand) and getting the result.
+This is sample code for submitting a Spark Python/R/Scala command and getting the result.
 """
 
 from qds_sdk.qubole import Qubole
@@ -27,7 +27,7 @@ def get_content(script_file_path):
     return content
 
 
-def execute_query(cluster_label, cmd_to_run, language, user_program_arguments=None, arguments=None):
+def execute_script(cluster_label, cmd_to_run, language, user_program_arguments=None, arguments=None):
     """
     Helper method to execute a script
     :param cluster_label:
@@ -45,17 +45,9 @@ def execute_query(cluster_label, cmd_to_run, language, user_program_arguments=None, arguments=None):
         print("language cannot be None or empty")
         return None
 
-    if language == "command_line":
-        # A Shell command needs to be invoked in this fashion
-        cmd = SparkCommand.create(label=cluster_label, cmdline=cmd_to_run, arguments=arguments,
-                                  user_program_arguments=user_program_arguments)
-    elif language == "sql":
-        # A SQL command needs to be invoked in this fashion
-        cmd = SparkCommand.create(label=cluster_label, sql=cmd_to_run, arguments=arguments)
-    else:
-        # A python, R or scala command needs to be invoked in this fashion.
-        cmd = SparkCommand.create(label=cluster_label, program=cmd_to_run, language=language,
-                                  arguments=arguments, user_program_arguments=user_program_arguments)
+    # A python, R or scala command needs to be invoked in this fashion.
+    cmd = SparkCommand.create(label=cluster_label, program=cmd_to_run, language=language, arguments=arguments,
+                              user_program_arguments=user_program_arguments)
 
     while not SparkCommand.is_done(cmd.status):
         print("Waiting for completion of command : {}".format(cmd.id))
@@ -89,27 +81,17 @@ def get_results(command):
 if __name__ == '__main__':
     # Set the API token. If you are using an environment other than api.qubole.com, set api_url to that URL
     # as <env>/api
     Qubole.configure(api_token='')
 
     filename = ""
-    user_program_arguments = None  # arguments for your script
-    arguments = None  # Spark configuration for your program, e.g. "--conf spark.executor.memory=1024M"
-    cluster_label = ""  # the cluster on which the command will run
+    script_language = None  # Script language: Python, R or scala
+    user_program_arguments = None  # arguments for your script
+    arguments = None  # Spark configuration for your program, e.g. "--conf spark.executor.memory=1024M"
+    cluster_label = ""  # the cluster on which the command will run
 
     # Running a python command. In case of scala or R, get the script content and then set the language field to scala
     # or R as required
     script = get_content(filename)
-    command = execute_query(cluster_label, script, "python", user_program_arguments=user_program_arguments,
-                            arguments=arguments)
+    command = execute_script(cluster_label, script, script_language, user_program_arguments=user_program_arguments,
+                             arguments=arguments)
     get_results(command)
-
-    # Running a SQL command
-    script = "show tables"
-    command = execute_query(cluster_label, script, "sql", arguments=arguments)
-    get_results(command)
-
-    # Running a shell command
-    script = "/usr/lib/spark/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn " \
-             "--deploy-mode client /usr/lib/spark/spark-examples* 1"
-    command = execute_query(cluster_label, script, "command_line", arguments=arguments)
-    get_results(command)
diff --git a/example/qds_spark_scala_inline_example.py b/example/qds_spark_scala_inline_example.py
new file mode 100644
index 00000000..7b0cc8ab
--- /dev/null
+++ b/example/qds_spark_scala_inline_example.py
@@ -0,0 +1,98 @@
+"""
+This is sample code for submitting a Spark Python/R/Scala command and getting the result.
+"""
+
+from qds_sdk.qubole import Qubole
+from qds_sdk.commands import SparkCommand
+import time
+
+
+def get_results_filename(command_id):
+    """
+    A helper method to generate a file name to write the downloaded result
+    :param command_id:
+    :return:
+    """
+    return "/tmp/result_{}.tsv".format(command_id)
+
+
+def execute_script(cluster_label, cmd_to_run, language, user_program_arguments=None, arguments=None):
+    """
+    Helper method to execute a script
+    :param cluster_label:
+    :param cmd_to_run:
+    :param language:
+    :param user_program_arguments:
+    :param arguments:
+    :return:
+    """
+    if cmd_to_run is None or cmd_to_run == "":
+        print("script cannot be None or empty")
+        return None
+
+    if not language:
+        print("language cannot be None or empty")
+        return None
+
+    # A python, R or scala command needs to be invoked in this fashion.
+    cmd = SparkCommand.create(label=cluster_label, program=cmd_to_run, language=language, arguments=arguments,
+                              user_program_arguments=user_program_arguments)
+
+    while not SparkCommand.is_done(cmd.status):
+        print("Waiting for completion of command : {}".format(cmd.id))
+        cmd = SparkCommand.find(cmd.id)
+        time.sleep(5)
+
+    if SparkCommand.is_success(cmd.status):
+        print("\nCommand Executed: Completed successfully")
+    else:
+        print("\nCommand Executed: Failed! The status returned is: {}".format(cmd.status))
+        print(cmd.get_log())
+    return cmd
+
+
+def get_results(command):
+    """
+    A helper method to get the results
+    :param command:
+    :return:
+    """
+    if command is None:
+        return None
+
+    results_file_name = get_results_filename(command.id)
+    fp = open(results_file_name, 'w')
+
+    command.get_results(fp, delim="\n")
+    print("results are written to {}".format(results_file_name))
+
+
+if __name__ == '__main__':
+    # Set the API token. If you are using an environment other than api.qubole.com, set api_url to that URL
+    # as <env>/api
+    Qubole.configure(api_token='')
+
+    script_language = "scala"  # Script language: Python, R or scala
+    user_program_arguments = None  # arguments for your script
+    arguments = None  # Spark configuration for your program, e.g. "--conf spark.executor.memory=1024M"
+    cluster_label = ""  # the cluster on which the command will run
+
+    # Running an inline scala command. The program is supplied inline as a string;
+    # set the language field to scala.
+    script = """
+    import org.apache.spark.sql.SparkSession
+
+    object SparkExample {
+      def main(args: Array[String]): Unit = {
+        val spark = SparkSession.builder().appName("Spark Example").getOrCreate()
+        val sampleData = Seq(("John", 19), ("Smith", 29), ("Adam", 35), ("Henry", 50))
+
+        import spark.implicits._
+        val dataFrame = sampleData.toDF("name", "age")
+        val output = dataFrame.select("name").where("age < 30").collect
+        output.foreach(println)
+      }
+    }"""
+    command = execute_script(cluster_label, script, script_language, user_program_arguments=user_program_arguments,
+                             arguments=arguments)
+    get_results(command)
diff --git a/example/qds_spark_shell_command_example.py b/example/qds_spark_shell_command_example.py
new file mode 100644
index 00000000..9a2983c9
--- /dev/null
+++ b/example/qds_spark_shell_command_example.py
@@ -0,0 +1,74 @@
+"""
+This is sample code for submitting a shell script as a SparkCommand on a Spark cluster and getting the result.
+"""
+
+from qds_sdk.qubole import Qubole
+from qds_sdk.commands import SparkCommand
+import time
+
+
+def get_results_filename(command_id):
+    """
+    A helper method to generate a file name to write the downloaded result
+    :param command_id:
+    :return:
+    """
+    return "/tmp/result_{}.tsv".format(command_id)
+
+
+def execute_spark_shell_command(cluster_label, cmd_to_run):
+    """
+    Helper method to execute a shell command
+    :param cluster_label:
+    :param cmd_to_run:
+    :return:
+    """
+    if cmd_to_run is None or cmd_to_run == "":
+        print("command to run cannot be None or empty")
+        return None
+
+    # A Shell command needs to be invoked in this fashion
+    cmd = SparkCommand.create(label=cluster_label, cmdline=cmd_to_run)
+
+    while not SparkCommand.is_done(cmd.status):
+        print("Waiting for completion of command : {}".format(cmd.id))
+        cmd = SparkCommand.find(cmd.id)
+        time.sleep(5)
+
+    if SparkCommand.is_success(cmd.status):
+        print("\nCommand Executed: Completed successfully")
+    else:
+        print("\nCommand Executed: Failed! The status returned is: {}".format(cmd.status))
+        print(cmd.get_log())
+    return cmd
+
+
+def get_results(command):
+    """
+    A helper method to get the results
+    :param command:
+    :return:
+    """
+    if command is None:
+        return None
+
+    results_file_name = get_results_filename(command.id)
+    fp = open(results_file_name, 'w')
+
+    command.get_results(fp, delim="\n")
+    print("results are written to {}".format(results_file_name))
+
+
+if __name__ == '__main__':
+    # Set the API token. If you are using an environment other than api.qubole.com, set api_url to that URL
+    # as <env>/api
+    Qubole.configure(api_token='')
+
+    arguments = None  # Spark configuration for your program, e.g. "--conf spark.executor.memory=1024M"
+    cluster_label = ""  # the cluster on which the command will run
+
+    # Running a shell command
+    script = "/usr/lib/spark/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn " \
+             "--deploy-mode client /usr/lib/spark/spark-examples* 1"
+    command = execute_spark_shell_command(cluster_label, script)
+    get_results(command)
diff --git a/example/qds_spark_sql_example.py b/example/qds_spark_sql_example.py
new file mode 100644
index 00000000..e912bcc4
--- /dev/null
+++ b/example/qds_spark_sql_example.py
@@ -0,0 +1,74 @@
+"""
+This is sample code for submitting a SQL query as a SparkCommand and getting the result.
+"""
+
+from qds_sdk.qubole import Qubole
+from qds_sdk.commands import SparkCommand
+import time
+
+
+def get_results_filename(command_id):
+    """
+    A helper method to generate a file name to write the downloaded result
+    :param command_id:
+    :return:
+    """
+    return "/tmp/result_{}.tsv".format(command_id)
+
+
+def execute_sql_query(cluster_label, query, arguments=None):
+    """
+    Helper method to execute a SQL query
+    :param cluster_label:
+    :param query:
+    :param arguments:
+    :return:
+    """
+    if query is None or query == "":
+        print("query cannot be None or empty")
+        return None
+
+    # A SQL command needs to be invoked in this fashion
+    cmd = SparkCommand.create(label=cluster_label, sql=query, arguments=arguments)
+
+    while not SparkCommand.is_done(cmd.status):
+        print("Waiting for completion of command : {}".format(cmd.id))
+        cmd = SparkCommand.find(cmd.id)
+        time.sleep(5)
+
+    if SparkCommand.is_success(cmd.status):
+        print("\nCommand Executed: Completed successfully")
+    else:
+        print("\nCommand Executed: Failed! The status returned is: {}".format(cmd.status))
+        print(cmd.get_log())
+    return cmd
+
+
+def get_results(command):
+    """
+    A helper method to get the results
+    :param command:
+    :return:
+    """
+    if command is None:
+        return None
+
+    results_file_name = get_results_filename(command.id)
+    fp = open(results_file_name, 'w')
+
+    command.get_results(fp, delim="\n")
+    print("results are written to {}".format(results_file_name))
+
+
+if __name__ == '__main__':
+    # Set the API token. If you are using an environment other than api.qubole.com, set api_url to that URL
+    # as <env>/api
+    Qubole.configure(api_token='')
+
+    arguments = None  # Spark configuration for your program, e.g. "--conf spark.executor.memory=1024M"
+    cluster_label = ""  # the cluster on which the command will run
+
+    # Running a SQL command
+    script = "show tables"
+    command = execute_sql_query(cluster_label, script, arguments=arguments)
+    get_results(command)

From c179c7d6cc1625f328fba9eb90dd64a93c5dce57 Mon Sep 17 00:00:00 2001
From: Mahesh Nayak
Date: Sun, 23 Aug 2020 16:16:48 +0530
Subject: [PATCH 3/4] Addressing review comments

---
 example/qds_spark_python_example.py        | 36 ++++++++++++----------
 example/qds_spark_scala_inline_example.py  | 29 +++++++++--------
 example/qds_spark_shell_command_example.py | 22 +++++++------
 example/qds_spark_sql_example.py           | 20 ++++++------
 4 files changed, 58 insertions(+), 49 deletions(-)

diff --git a/example/qds_spark_python_example.py b/example/qds_spark_python_example.py
index 773f85eb..32734c92 100644
--- a/example/qds_spark_python_example.py
+++ b/example/qds_spark_python_example.py
@@ -1,5 +1,8 @@
 """
-This is sample code for submitting a Spark Python/R/Scala command and getting the result.
+The following code example submits a Spark program written in Scala, Python or R using the SparkCommand
+command type.
+The input program/script is made available using a local file path and the results are written to a temporary file.
 """
+import logging
 
 from qds_sdk.qubole import Qubole
 from qds_sdk.commands import SparkCommand
@@ -38,27 +41,24 @@ def execute_script(cluster_label, cmd_to_run, language, user_program_arguments=None, arguments=None):
     :param user_program_arguments:
     :param arguments:
     :return:
     """
     if cmd_to_run is None or cmd_to_run == "":
-        print("script cannot be None or empty")
-        return None
+        raise RuntimeError("script to be executed cannot be None or empty")
 
     if not language:
-        print("language cannot be None or empty")
-        return None
+        raise RuntimeError("Language is a mandatory parameter. Please set the correct language for your script.")
 
     # A python, R or scala command needs to be invoked in this fashion.
     cmd = SparkCommand.create(label=cluster_label, program=cmd_to_run, language=language, arguments=arguments,
                               user_program_arguments=user_program_arguments)
 
     while not SparkCommand.is_done(cmd.status):
-        print("Waiting for completion of command : {}".format(cmd.id))
+        logging.info("Waiting for completion of command : {}".format(cmd.id))
        cmd = SparkCommand.find(cmd.id)
         time.sleep(5)
 
     if SparkCommand.is_success(cmd.status):
-        print("\nCommand Executed: Completed successfully")
+        logging.info("\nCommand Executed: Completed successfully")
     else:
-        print("\nCommand Executed: Failed! The status returned is: {}".format(cmd.status))
-        print(cmd.get_log())
+        raise RuntimeError("Command {} has failed. The following are the command logs: {}".format(cmd.id, cmd.get_log()))
     return cmd
 
 
@@ -68,14 +68,14 @@ def get_results(command):
     :param command:
     :return:
     """
-    if command is None:
-        return None
+    if not command:
+        raise RuntimeError("Command cannot be None. Please provide a valid SparkCommand object")
 
     results_file_name = get_results_filename(command.id)
     fp = open(results_file_name, 'w')
 
     command.get_results(fp, delim="\n")
-    print("results are written to {}".format(results_file_name))
+    logging.info("results are written to {}".format(results_file_name))
 
 
 if __name__ == '__main__':
@@ -80,18 +80,20 @@ def get_results(command):
 
 if __name__ == '__main__':
     # Set the API token. If you are using an environment other than api.qubole.com, set api_url to that URL
     # as <env>/api
     Qubole.configure(api_token='')
 
-    filename = ""
-    script_language = None  # Script language: Python, R or scala
-    user_program_arguments = None  # arguments for your script
-    arguments = None  # Spark configuration for your program, e.g. "--conf spark.executor.memory=1024M"
-    cluster_label = ""  # the cluster on which the command will run
+    # the following are mandatory parameters when submitting a SparkCommand
+    cluster_label = ""  # the label of the cluster on which the command will run
+    filename = ""
+    script_language = ""  # Script language: Python, R or scala
+
+    # the following are optional parameters that can be supplied to a SparkCommand
+    user_program_arguments = None  # arguments for your script
+    arguments = None  # Spark configuration for your program, e.g. "--conf spark.executor.memory=1024M"
 
-    # Running a python command. In case of scala or R, get the script content and then set the language field to scala
-    # or R as required
+    # read the content of the script file into a variable
     script = get_content(filename)
     command = execute_script(cluster_label, script, script_language, user_program_arguments=user_program_arguments,
                              arguments=arguments)
     get_results(command)
diff --git a/example/qds_spark_scala_inline_example.py b/example/qds_spark_scala_inline_example.py
index 7b0cc8ab..a895946e 100644
--- a/example/qds_spark_scala_inline_example.py
+++ b/example/qds_spark_scala_inline_example.py
@@ -1,5 +1,8 @@
 """
-This is sample code for submitting a Spark Python/R/Scala command and getting the result.
+The following code example submits a Spark program written in Scala, Python or R using the SparkCommand
+command type.
+The input program/script is made available inline and the results are written to a temporary file.
 """
+import logging
 
 from qds_sdk.qubole import Qubole
 from qds_sdk.commands import SparkCommand
@@ -27,26 +30,23 @@ def execute_script(cluster_label, cmd_to_run, language, user_program_arguments=None, arguments=None):
     :param arguments:
     :return:
     """
     if cmd_to_run is None or cmd_to_run == "":
-        print("script cannot be None or empty")
-        return None
+        raise RuntimeError("script cannot be None or empty")
 
     if not language:
-        print("language cannot be None or empty")
-        return None
+        raise RuntimeError("Language is a mandatory parameter. Please set the correct language for your script.")
 
     # A python, R or scala command needs to be invoked in this fashion.
     cmd = SparkCommand.create(label=cluster_label, program=cmd_to_run, language=language, arguments=arguments,
                               user_program_arguments=user_program_arguments)
 
     while not SparkCommand.is_done(cmd.status):
-        print("Waiting for completion of command : {}".format(cmd.id))
+        logging.info("Waiting for completion of command : {}".format(cmd.id))
         cmd = SparkCommand.find(cmd.id)
         time.sleep(5)
 
     if SparkCommand.is_success(cmd.status):
-        print("\nCommand Executed: Completed successfully")
+        logging.info("\nCommand Executed: Completed successfully")
     else:
-        print("\nCommand Executed: Failed! The status returned is: {}".format(cmd.status))
-        print(cmd.get_log())
+        raise RuntimeError("Command {} has failed. The following are the command logs: {}".format(cmd.id, cmd.get_log()))
     return cmd
 
 
@@ -57,14 +57,14 @@ def get_results(command):
     :param command:
     :return:
     """
-    if command is None:
-        return None
+    if not command:
+        raise RuntimeError("command cannot be None. Please provide a valid SparkCommand object")
 
     results_file_name = get_results_filename(command.id)
     fp = open(results_file_name, 'w')
 
     command.get_results(fp, delim="\n")
-    print("results are written to {}".format(results_file_name))
+    logging.info("results are written to {}".format(results_file_name))
 
 
 if __name__ == '__main__':
@@ -69,13 +69,16 @@ def get_results(command):
 
 if __name__ == '__main__':
     # Set the API token. If you are using an environment other than api.qubole.com, set api_url to that URL
     # as <env>/api
     Qubole.configure(api_token='')
 
+    # the following are mandatory parameters when submitting a SparkCommand
+    cluster_label = ""  # the label of the cluster on which the command will run
     script_language = "scala"  # Script language: Python, R or scala
+
+    # the following are optional parameters that can be supplied to a SparkCommand
     user_program_arguments = None  # arguments for your script
     arguments = None  # Spark configuration for your program, e.g. "--conf spark.executor.memory=1024M"
-    cluster_label = ""  # the cluster on which the command will run
 
     # Running an inline scala command. The program is supplied inline as a string;
     # set the language field to scala.
diff --git a/example/qds_spark_shell_command_example.py b/example/qds_spark_shell_command_example.py
index 9a2983c9..5999d9ec 100644
--- a/example/qds_spark_shell_command_example.py
+++ b/example/qds_spark_shell_command_example.py
@@ -1,6 +1,7 @@
 """
 This is sample code for submitting a shell script as a SparkCommand on a Spark cluster and getting the result.
 """
+import logging
 
 from qds_sdk.qubole import Qubole
 from qds_sdk.commands import SparkCommand
@@ -24,20 +25,18 @@ def execute_spark_shell_command(cluster_label, cmd_to_run):
     :return:
     """
     if cmd_to_run is None or cmd_to_run == "":
-        print("command to run cannot be None or empty")
-        return None
+        raise RuntimeError("command to be executed cannot be None or empty")
 
     # A Shell command needs to be invoked in this fashion
     cmd = SparkCommand.create(label=cluster_label, cmdline=cmd_to_run)
 
     while not SparkCommand.is_done(cmd.status):
-        print("Waiting for completion of command : {}".format(cmd.id))
+        logging.info("Waiting for completion of command : {}".format(cmd.id))
         cmd = SparkCommand.find(cmd.id)
         time.sleep(5)
 
     if SparkCommand.is_success(cmd.status):
-        print("\nCommand Executed: Completed successfully")
+        logging.info("\nCommand Executed: Completed successfully")
     else:
-        print("\nCommand Executed: Failed! The status returned is: {}".format(cmd.status))
-        print(cmd.get_log())
+        raise RuntimeError("Command {} has failed. The following are the command logs: {}".format(cmd.id, cmd.get_log()))
     return cmd
 
 
@@ -49,14 +48,14 @@ def get_results(command):
     :param command:
     :return:
     """
-    if command is None:
-        return None
+    if not command:
+        raise RuntimeError("command cannot be None. Please provide a valid SparkCommand object")
 
     results_file_name = get_results_filename(command.id)
     fp = open(results_file_name, 'w')
 
     command.get_results(fp, delim="\n")
-    print("results are written to {}".format(results_file_name))
+    logging.info("results are written to {}".format(results_file_name))
 
 
 if __name__ == '__main__':
@@ -61,14 +60,17 @@ def get_results(command):
 
 if __name__ == '__main__':
     # Set the API token. If you are using an environment other than api.qubole.com, set api_url to that URL
     # as <env>/api
     Qubole.configure(api_token='')
 
+    # the following are mandatory parameters when submitting a SparkCommand
+    cluster_label = ""  # the label of the cluster on which the command will run
+
+    # the following are optional parameters that can be supplied to a SparkCommand
     arguments = None  # Spark configuration for your program, e.g. "--conf spark.executor.memory=1024M"
-    cluster_label = ""  # the cluster on which the command will run
 
     # Running a shell command
     script = "/usr/lib/spark/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn " \
              "--deploy-mode client /usr/lib/spark/spark-examples* 1"
     command = execute_spark_shell_command(cluster_label, script)
     get_results(command)
diff --git a/example/qds_spark_sql_example.py b/example/qds_spark_sql_example.py
index e912bcc4..947c4718 100644
--- a/example/qds_spark_sql_example.py
+++ b/example/qds_spark_sql_example.py
@@ -1,6 +1,7 @@
 """
 This is sample code for submitting a SQL query as a SparkCommand and getting the result.
 """
+import logging
 
 from qds_sdk.qubole import Qubole
 from qds_sdk.commands import SparkCommand
@@ -25,20 +26,18 @@ def execute_sql_query(cluster_label, query, arguments=None):
     :return:
     """
     if query is None or query == "":
-        print("query cannot be None or empty")
-        return None
+        raise RuntimeError("Query to be executed cannot be None or empty")
 
     # A SQL command needs to be invoked in this fashion
     cmd = SparkCommand.create(label=cluster_label, sql=query, arguments=arguments)
 
     while not SparkCommand.is_done(cmd.status):
-        print("Waiting for completion of command : {}".format(cmd.id))
+        logging.info("Waiting for completion of command : {}".format(cmd.id))
         cmd = SparkCommand.find(cmd.id)
         time.sleep(5)
 
     if SparkCommand.is_success(cmd.status):
-        print("\nCommand Executed: Completed successfully")
+        logging.info("\nCommand Executed: Completed successfully")
     else:
-        print("\nCommand Executed: Failed! The status returned is: {}".format(cmd.status))
-        print(cmd.get_log())
+        raise RuntimeError("Command {} has failed. The following are the command logs: {}".format(cmd.id, cmd.get_log()))
     return cmd
 
 
@@ -50,14 +49,14 @@ def get_results(command):
     :param command:
     :return:
     """
-    if command is None:
-        return None
+    if not command:
+        raise RuntimeError("command cannot be None. Please provide a valid SparkCommand object")
 
     results_file_name = get_results_filename(command.id)
     fp = open(results_file_name, 'w')
 
     command.get_results(fp, delim="\n")
-    print("results are written to {}".format(results_file_name))
+    logging.info("results are written to {}".format(results_file_name))
 
 
 if __name__ == '__main__':
@@ -62,13 +61,16 @@ def get_results(command):
 
 if __name__ == '__main__':
     # Set the API token. If you are using an environment other than api.qubole.com, set api_url to that URL
     # as <env>/api
     Qubole.configure(api_token='')
 
-    arguments = None  # Spark configuration for your program, e.g. "--conf spark.executor.memory=1024M"
+    # the following are mandatory parameters when submitting a SparkCommand
     cluster_label = ""  # the cluster on which the command will run
 
+    # the following are optional parameters that can be supplied to a SparkCommand
+    arguments = None  # Spark configuration for your program, e.g. "--conf spark.executor.memory=1024M"
+
     # Running a SQL command
     script = "show tables"
     command = execute_sql_query(cluster_label, script, arguments=arguments)
     get_results(command)

From b279a1c40a293da29b3d5d7941ff95a0a74b876f Mon Sep 17 00:00:00 2001
From: Mahesh Nayak
Date: Tue, 25 Aug 2020 12:46:28 +0530
Subject: [PATCH 4/4] Addressing more review comments

---
 example/qds_spark_python_example.py        | 6 +++---
 example/qds_spark_scala_inline_example.py  | 6 +++---
 example/qds_spark_shell_command_example.py | 4 ++--
 example/qds_spark_sql_example.py           | 4 ++--
 4 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/example/qds_spark_python_example.py b/example/qds_spark_python_example.py
index 32734c92..e605efad 100644
--- a/example/qds_spark_python_example.py
+++ b/example/qds_spark_python_example.py
@@ -80,14 +80,14 @@ def get_results(command):
 
 if __name__ == '__main__':
     # Set the API token. If you are using an environment other than api.qubole.com, set api_url to that URL
-    # as <env>/api
-    Qubole.configure(api_token='')
+    # as http://<env>/api
+    Qubole.configure(api_token='', api_url="")
 
     # the following are mandatory parameters when submitting a SparkCommand
     cluster_label = ""  # the label of the cluster on which the command will run
     filename = ""
-    script_language = ""  # Script language: Python, R or scala
+    script_language = ""  # Script language: python, R or scala
 
     # the following are optional parameters that can be supplied to a SparkCommand
     user_program_arguments = None  # arguments for your script
     arguments = None  # Spark configuration for your program, e.g. "--conf spark.executor.memory=1024M"
diff --git a/example/qds_spark_scala_inline_example.py b/example/qds_spark_scala_inline_example.py
index a895946e..a3651549 100644
--- a/example/qds_spark_scala_inline_example.py
+++ b/example/qds_spark_scala_inline_example.py
@@ -69,13 +69,13 @@ def get_results(command):
 
 if __name__ == '__main__':
     # Set the API token. If you are using an environment other than api.qubole.com, set api_url to that URL
-    # as <env>/api
-    Qubole.configure(api_token='')
+    # as http://<env>/api
+    Qubole.configure(api_token='', api_url="")
 
     # the following are mandatory parameters when submitting a SparkCommand
     cluster_label = ""  # the label of the cluster on which the command will run
-    script_language = "scala"  # Script language: Python, R or scala
+    script_language = "scala"  # Script language: python, R or scala
 
     # the following are optional parameters that can be supplied to a SparkCommand
     user_program_arguments = None  # arguments for your script
     arguments = None  # Spark configuration for your program, e.g. "--conf spark.executor.memory=1024M"
diff --git a/example/qds_spark_shell_command_example.py b/example/qds_spark_shell_command_example.py
index 5999d9ec..5691bfcb 100644
--- a/example/qds_spark_shell_command_example.py
+++ b/example/qds_spark_shell_command_example.py
@@ -60,8 +60,8 @@ def get_results(command):
 
 if __name__ == '__main__':
     # Set the API token. If you are using an environment other than api.qubole.com, set api_url to that URL
-    # as <env>/api
-    Qubole.configure(api_token='')
+    # as http://<env>/api
+    Qubole.configure(api_token='', api_url="")
 
     # the following are mandatory parameters when submitting a SparkCommand
     cluster_label = ""  # the label of the cluster on which the command will run
diff --git a/example/qds_spark_sql_example.py b/example/qds_spark_sql_example.py
index 947c4718..8653daab 100644
--- a/example/qds_spark_sql_example.py
+++ b/example/qds_spark_sql_example.py
@@ -61,8 +61,8 @@ def get_results(command):
 
 if __name__ == '__main__':
     # Set the API token. If you are using an environment other than api.qubole.com, set api_url to that URL
-    # as <env>/api
-    Qubole.configure(api_token='')
+    # as http://<env>/api
+    Qubole.configure(api_token='', api_url="")
 
     # the following are mandatory parameters when submitting a SparkCommand
     cluster_label = ""  # the cluster on which the command will run
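
Usage note (editor's addition, not part of the patches above): after patch 3 the examples
report progress through the logging module, but none of them installs a logging handler, so
running them as-is prints nothing below WARNING severity. The sketch below shows one way a
caller might wire an example up; the QDS_API_TOKEN environment variable and the
"spark-cluster" label are hypothetical placeholders, not values defined by this series.

    import logging
    import os

    from qds_sdk.qubole import Qubole

    # Surface the INFO-level progress messages the examples emit.
    logging.basicConfig(level=logging.INFO)

    # Hypothetical: read the token from the environment instead of hard-coding it.
    Qubole.configure(api_token=os.environ["QDS_API_TOKEN"])

    # Reuse the helpers from the renamed python example (run from the example/ directory).
    from qds_spark_python_example import execute_script, get_results

    # "spark-cluster" is a hypothetical cluster label; any trivial program works as a smoke test.
    command = execute_script("spark-cluster", "print(1 + 1)", "python")
    get_results(command)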