diff --git a/components/azure-event-hubs/report-throughput.sh b/components/azure-event-hubs/report-throughput.sh index a92a7c94..2c90c46d 100755 --- a/components/azure-event-hubs/report-throughput.sh +++ b/components/azure-event-hubs/report-throughput.sh @@ -6,20 +6,34 @@ set -euo pipefail REPORT_THROUGHPUT_MINUTES=${REPORT_THROUGHPUT_MINUTES:-30} ofs=2 -eh_resource=$(az eventhubs namespace show -g $RESOURCE_GROUP -n "$EVENTHUB_NAMESPACE" --query id -o tsv) +eh_resources=$(az eventhubs namespace show -g $RESOURCE_GROUP -n "$EVENTHUB_NAMESPACE" --query id -o tsv) +if [ -n "${EVENTHUB_NAMESPACE_OUT:-}" ]; then + eh_resources="$eh_resources $(az eventhubs namespace show -g $RESOURCE_GROUP -n "$EVENTHUB_NAMESPACE_OUT" --query id -o tsv)" +fi eh_capacity=$(az eventhubs namespace show -g $RESOURCE_GROUP -n "$EVENTHUB_NAMESPACE" --query sku.capacity -o tsv) metric_names="IncomingMessages IncomingBytes OutgoingMessages OutgoingBytes ThrottledRequests" -fmt="%28s%20s%20s%20s%20s%20s\n" +fmt="%28s%12s%20s%20s%20s%20s%20s\n" echo "Event Hub capacity: $eh_capacity throughput units (this determines MAX VALUE below)." echo "Reporting aggregate metrics per minute, offset by $ofs minutes, for $REPORT_THROUGHPUT_MINUTES minutes." -printf "$fmt" "" $metric_names +printf "$fmt" "" "Event Hub #" $metric_names PER_MIN=60 MB=1000000 -printf "$fmt" "" $(tr -C " " "-" <<<$metric_names) -printf "$fmt" "MAX VALUE" "$((eh_capacity*1000*PER_MIN))" "$((eh_capacity*1*MB*PER_MIN))" "$((eh_capacity*4096*PER_MIN))" "$((eh_capacity*2*MB*PER_MIN))" "-" -printf "$fmt" "" $(tr -C " " "-" <<<$metric_names) +printf "$fmt" "" "-----------" $(tr -C " " "-" <<<$metric_names) +printf "$fmt" "MAX VALUE" "" "$((eh_capacity*1000*PER_MIN))" "$((eh_capacity*1*MB*PER_MIN))" "$((eh_capacity*4096*PER_MIN))" "$((eh_capacity*2*MB*PER_MIN))" "-" +printf "$fmt" "" "-----------" $(tr -C " " "-" <<<$metric_names) + for i in $(seq 1 $REPORT_THROUGHPUT_MINUTES) ; do - printf "$fmt" "$(date +%Y-%m-%dT%H:%M:%S%z)" $(az monitor metrics list --resource "$eh_resource" --interval PT1M --metrics $(tr " " "," <<< $metric_names) --offset ${ofs}M --query 'value[].timeseries[0].data[0].floor(total)' -o tsv) + eh_number=0 + date=$(date +%Y-%m-%dT%H:%M:%S%z) + for eh in $eh_resources; do + eh_number=$((eh_number+1)) + metrics=$( + az monitor metrics list --resource "$eh" --interval PT1M \ + --metrics $(tr " " "," <<< $metric_names) --offset ${ofs}M \ + --query 'value[].timeseries[0].data[0].floor(total)' -o tsv + ) + printf "$fmt" "$date" "$eh_number" $metrics + done # sleep until next full minute. "10#" is to force base 10 if string is e.g. "09" sleep "$((60 - 10#$(date +%S) ))" diff --git a/eventhubs-streamanalytics-eventhubs/README.md b/eventhubs-streamanalytics-eventhubs/README.md index 6020d072..18599f0a 100644 --- a/eventhubs-streamanalytics-eventhubs/README.md +++ b/eventhubs-streamanalytics-eventhubs/README.md @@ -130,6 +130,37 @@ The above settings has been chosen to sustain a 1000 msg/sec stream. Please use Metrics pane in Stream Analytics, see "Input/Output Events" for throughput and "Watermark Delay" metric to see if the job is keeping up with the input rate. You can also use Event Hub "Metrics" pane to see if there are any "Throttled Requests" and adjust the Threshold Units accordingly. "Watermark Delay" is one of the key metric that will help you to understand if Stream Analytics is keeping up with the incoming data. If delay is constantly increasing, you need to take a look at the destination to see if it can keep up with the speed or check if you need to increase SU: https://azure.microsoft.com/en-us/blog/new-metric-in-azure-stream-analytics-tracks-latency-of-your-streaming-pipeline/. + +The deployment script will also report performance, by default every minute for 30 minutes: + +``` +***** [M] Starting METRICS reporting +Event Hub capacity: 2 throughput units (this determines MAX VALUE below). +Reporting aggregate metrics per minute, offset by 2 minutes, for 30 minutes. + Event Hub # IncomingMessages IncomingBytes OutgoingMessages OutgoingBytes ThrottledRequests + ----------- ---------------- ------------- ---------------- ------------- ----------------- + MAX VALUE 120000 120000000 491520 240000000 - + ----------- ---------------- ------------- ---------------- ------------- ----------------- + 2019-10-03T07:57:00 1 0 0 0 0 0 + 2019-10-03T07:57:00 2 0 0 0 0 0 + 2019-10-03T07:58:00 1 24050 22809797 24050 22809797 0 + 2019-10-03T07:58:00 2 0 0 0 0 0 + 2019-10-03T07:59:01 1 60037 56940526 60037 56940526 0 + 2019-10-03T07:59:01 2 341 62393762 0 0 0 + 2019-10-03T08:00:00 1 60090 56989878 60090 56989878 0 + 2019-10-03T08:00:00 2 375 65683281 0 0 0 + 2019-10-03T08:01:00 1 60036 56940643 60036 56940643 0 + 2019-10-03T08:01:00 2 376 65708824 0 0 0 +``` + +In column "Event Hub #", 1 refers to the Event Hub used as input to Stream +Analytics, and 2 to the Event Hub used as output. After a few minutes of +ramp-up, the metrics for Event Hub 1 will show around 60k events/min +(depending on selected event rate, here 1k events/s). As Stream Analytics +batches up messages when outputting to Event Hubs, the rate in events/minute +on Event Hub 2 will be much lower, but you can see from the Incoming Bytes +metric that the data rate on both event hubs is similar. + ## Stream Analytics Note that the solution configurations have been verified with compatibility level 1.2. The deployed Stream Analytics solution doesn't do any analytics or projection, but it just inject an additional field using a simple Javascript UDF: diff --git a/eventhubs-streamanalytics-eventhubs/stream-analytics-job-simple-arm-template.json b/eventhubs-streamanalytics-eventhubs/stream-analytics-job-simple-arm-template.json index c14684e8..c7f69025 100644 --- a/eventhubs-streamanalytics-eventhubs/stream-analytics-job-simple-arm-template.json +++ b/eventhubs-streamanalytics-eventhubs/stream-analytics-job-simple-arm-template.json @@ -114,7 +114,8 @@ "serialization": { "type": "JSON", "properties": { - "encoding": "UTF8" + "encoding": "UTF8", + "format": "Array" } }, "datasource": { diff --git a/eventhubs-streamanalytics-eventhubs/test_spec.json b/eventhubs-streamanalytics-eventhubs/test_spec.json new file mode 100644 index 00000000..ea61d557 --- /dev/null +++ b/eventhubs-streamanalytics-eventhubs/test_spec.json @@ -0,0 +1,13 @@ +[ + { + "stage": "2", + "short": "ese1", + "steps": "CIPTMV", + "minutes": "10", + "throughput": "1", + "extra_args": [ + "a", + "simple" + ] + } +] diff --git a/integration-tests/azure-pipelines.yml b/integration-tests/azure-pipelines.yml index 08f70fba..61b7c5a4 100644 --- a/integration-tests/azure-pipelines.yml +++ b/integration-tests/azure-pipelines.yml @@ -10,7 +10,7 @@ jobs: azureSubscription: ARMConnection scriptLocation: 'inlineScript' inlineScript: az vm start -g "$AGENT_VM_RESOURCE_GROUP" -n "$AGENT_VM_NAME" - displayName: 'start agent' + displayName: 'start agent VM' - job: run_tests dependsOn: start_agent @@ -57,3 +57,13 @@ jobs: # Provide service principal (for Azure Data Explorer RBAC setup) addSpnToEnvironment: true displayName: 'pytest stage 3' + +- job: stop_agent + dependsOn: run_tests + steps: + - task: AzureCLI@1 + inputs: + azureSubscription: ARMConnection + scriptLocation: 'inlineScript' + inlineScript: az vm deallocate -g "$AGENT_VM_RESOURCE_GROUP" -n "$AGENT_VM_NAME" --no-wait + displayName: 'stop agent VM' diff --git a/streaming/databricks/job/run-databricks-job.sh b/streaming/databricks/job/run-databricks-job.sh index 36ae6962..cffc77e6 100755 --- a/streaming/databricks/job/run-databricks-job.sh +++ b/streaming/databricks/job/run-databricks-job.sh @@ -22,6 +22,7 @@ wait_for_run () { sleep 10 fi done + echo result_state=$(jq -r ".state.result_state" <<< "$run_info") state_message=$(jq -r ".state.state_message" <<< "$run_info") diff --git a/streaming/databricks/notebooks/verify-eventhubs.scala b/streaming/databricks/notebooks/verify-eventhubs.scala index 144a4c97..68410af9 100644 --- a/streaming/databricks/notebooks/verify-eventhubs.scala +++ b/streaming/databricks/notebooks/verify-eventhubs.scala @@ -37,10 +37,12 @@ val schema = StructType( StructField("processedAt", TimestampType) :: Nil) +val arrayOfEventsSchema = ArrayType(schema) + val stagingTable = "tempresult_" + randomUUID().toString.replace("-","_") var query = streamingData - .select(from_json(decode($"body", "UTF-8"), schema).as("eventData"), $"*") + .select(explode(from_json(decode($"body", "UTF-8"), arrayOfEventsSchema)).as("eventData"), $"*") // When consuming from the output of eventhubs-streamanalytics-eventhubs pipeline, 'enqueuedAt' will haven been // set when reading from the first eventhub, and the enqueued timestamp of the second eventhub is then the 'storedAt' time .select($"eventData.*", $"offset", $"sequenceNumber", $"publisher", $"partitionKey", $"enqueuedTime".as("storedAt"))