diff --git a/packages/cdc/README.md b/packages/cdc/README.md
new file mode 100644
index 0000000..19baa03
--- /dev/null
+++ b/packages/cdc/README.md
@@ -0,0 +1,221 @@
+# @diachronic/cdc
+
+> Parse Temporal database streams with Apache Spark
+
+PySpark notebooks are available in `src`.
+
+## Steps
+
+### Generate protobuf descriptor file
+
+We use a descriptor generated from temporal-api@135691242e9b4ed6214a7b5e1231c1c9930ff6c8.
+It should correspond to the version of Temporal you are running.
+
+The descriptor file is committed in this package. It works with Temporal v1.22 and was generated with
+libprotoc 24.3 as follows:
+
+```shell
+git clone https://github.com/temporalio/api.git
+
+protoc -I . \
+    temporal/api/history/v1/message.proto \
+    -o descriptors.binpb \
+    --include_imports \
+    --include_source_info
+```
+
+### Use the descriptor to decode the protobuf data in the history_node table
+
+With the Debezium CDC connector for Postgres, the CDC records have the following schema:
+```sparksql
+CREATE TABLE spark_catalog.temporal.history_node_cdc (
+  key STRUCT<shard_id: INT, tree_id: BINARY, branch_id: BINARY, node_id: BIGINT, txn_id: BIGINT>,
+  value STRUCT<
+    before: STRUCT<shard_id: INT, tree_id: BINARY, branch_id: BINARY, node_id: BIGINT, txn_id: BIGINT,
+                   data: BINARY, data_encoding: STRING, prev_txn_id: BIGINT>,
+    after: STRUCT<shard_id: INT, tree_id: BINARY, branch_id: BINARY, node_id: BIGINT, txn_id: BIGINT,
+                  data: BINARY, data_encoding: STRING, prev_txn_id: BIGINT>,
+    source: STRUCT<version: STRING, connector: STRING, name: STRING, ts_ms: BIGINT, snapshot: STRING,
+                   db: STRING, sequence: STRING, schema: STRING, table: STRING, txId: BIGINT, lsn: BIGINT, xmin: BIGINT>,
+    op: STRING,
+    ts_ms: BIGINT,
+    transaction: STRUCT<id: STRING, total_order: BIGINT, data_collection_order: BIGINT>
+  >,
+  offset BIGINT,
+  timestamp BIGINT,
+  _rescued_data STRING
+) USING delta TBLPROPERTIES (
+  'delta.minReaderVersion' = '1',
+  'delta.minWriterVersion' = '2'
+)
+```
+
+The `data` column (under `value.before`/`value.after`) contains protobuf-encoded `History` messages that can be decoded using the descriptor file:
+
+```python
+from pyspark.sql.functions import *
+from pyspark.sql.protobuf.functions import from_protobuf
+
+df = df.withColumn(
+    "proto",
+    from_protobuf(
+        df.data,
+        "History",
+        descFilePath='/path/to/descriptor/file',
+        options={"recursive.fields.max.depth": "2"},
+    ),
+).select(
+    # Primary key columns (in this order)
+    "shard_id",
+    "tree_id",
+    "branch_id",
+    "node_id",
+    "txn_id",
+    # Adds a row per item in the History events array. Each item lands in the entry column and is star-expanded in the next step
+    explode("proto.events").alias("entry"),
+    "prev_txn_id",
+).select(
+    # Repeat all fields from above
+    "shard_id",
+    "tree_id",
+    "branch_id",
+    "node_id",
+    "txn_id",
+    "prev_txn_id",
+    # Star-expand the history entry, effectively adding a column per history event type to the table
+    "entry.*",
+)
+```
+
+For batch processing we can use window functions; streaming workloads can replace the windows with self-joins (a sketch of the
+self-join variant follows, and the window-based batch version comes after it). In either case it takes some work to assemble a
+coherent story from the data comparable to what the Temporal UI shows.
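+
+Where window functions are not available (e.g. in a streaming job), the same enrichment can be approximated with a self-join.
+The sketch below is illustrative only: it assumes the exploded `df` from the previous step (column names as above) and that each
+`shard_id`/`tree_id` pair has a single execution-started event; names like `started` are placeholders, not part of this package.
+
+```python
+from pyspark.sql.functions import col
+
+# One row per execution tree: the started event carries the core identifying info
+started = (
+    df.where(col("workflow_execution_started_event_attributes").isNotNull())
+    .select(
+        "shard_id",
+        "tree_id",
+        col("workflow_execution_started_event_attributes").alias("workflow_info"),
+    )
+    # Assumption: one started event per (shard_id, tree_id); keep a single row if that ever varies
+    .dropDuplicates(["shard_id", "tree_id"])
+)
+
+# Attach workflow_info to every history row of the same execution tree
+with_wf_info = (
+    df.join(started, ["shard_id", "tree_id"], "left")
+    .withColumn("workflow_id", col("workflow_info.workflow_id"))
+    .withColumn("workflow_type", col("workflow_info.workflow_type.name"))
+)
+```
+
+The window-based batch version: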
+ +```python +from pyspark.sql.window import Window + +# Adds a column workflow_info to each row, where workflow_info is the execution start event of each workflow +with_wf_info = ( + df.withColumn( + "workflow_info", + first( + df.workflow_execution_started_event_attributes, + ignorenulls=True, + ).over( + Window.partitionBy("shard_id", "tree_id").orderBy( + -col("txn_id") + ) + ), + ) + .withColumn( + "run_id", + coalesce( + first( + col("workflow_task_failed_event_attributes.new_run_id"), + ignorenulls=True, + ).over( + Window.partitionBy("shard_id", "tree_id", "branch_id").orderBy( + -col("txn_id") + ) + ), + col("workflow_info.original_execution_run_id"), + ), + ) + .withColumn("workflow_id", col("workflow_info.workflow_id")) + .withColumn("workflow_type", col("workflow_info.workflow_type.name")) + .withColumn( "parent_workflow_id", col("workflow_info.parent_workflow_execution.workflow_id") ) + .withColumn( "parent_workflow_run_id", col("workflow_info.parent_workflow_execution.run_id") ) + # .withColumn("run_id", col("workflow_info.original_execution_run_id")) + .withColumn("first_execution_run_id", col("workflow_info.first_execution_run_id")) + .withColumn( + "prev_execution_run_id", + coalesce( + first( + col("workflow_task_failed_event_attributes.base_run_id"), + ignorenulls=True, + ).over( + Window.partitionBy("shard_id", "tree_id", "branch_id").orderBy( + -col("txn_id") + ) + ), + col("workflow_info.continued_execution_run_id"), + ), + ) + .withColumn( + "task_queue", + coalesce( + col("workflow_info.task_queue.normal_name"), + col("workflow_info.task_queue.name"), + ), + ) + # Select all columns in the order we want to view them in + .select( + "workflow_id", + "run_id", + "workflow_type", + "event_time", + "event_type", + "parent_workflow_id", + "parent_workflow_run_id", + "first_execution_run_id", + "prev_execution_run_id", + "task_queue", + "event_id", + "workflow_info", + "workflow", + "workflow_execution_started_event_attributes", + "workflow_execution_completed_event_attributes", + "workflow_execution_failed_event_attributes", + "workflow_execution_timed_out_event_attributes", + "workflow_task_scheduled_event_attributes", + "workflow_task_started_event_attributes", + "workflow_task_completed_event_attributes", + "workflow_task_timed_out_event_attributes", + "workflow_task_failed_event_attributes", + "activity_task_scheduled_event_attributes", + "activity_task_started_event_attributes", + "activity_task_completed_event_attributes", + "activity_task_failed_event_attributes", + "activity_task_timed_out_event_attributes", + "timer_started_event_attributes", + "timer_fired_event_attributes", + "activity_task_cancel_requested_event_attributes", + "activity_task_canceled_event_attributes", + "timer_canceled_event_attributes", + "marker_recorded_event_attributes", + "workflow_execution_signaled_event_attributes", + "workflow_execution_terminated_event_attributes", + "workflow_execution_cancel_requested_event_attributes", + "workflow_execution_canceled_event_attributes", + "request_cancel_external_workflow_execution_initiated_event_attributes", + "request_cancel_external_workflow_execution_failed_event_attributes", + "external_workflow_execution_cancel_requested_event_attributes", + "workflow_execution_continued_as_new_event_attributes", + "start_child_workflow_execution_initiated_event_attributes", + "start_child_workflow_execution_failed_event_attributes", + "child_workflow_execution_started_event_attributes", + "child_workflow_execution_completed_event_attributes", + 
"child_workflow_execution_failed_event_attributes", + "child_workflow_execution_canceled_event_attributes", + "child_workflow_execution_timed_out_event_attributes", + "child_workflow_execution_terminated_event_attributes", + "signal_external_workflow_execution_initiated_event_attributes", + "signal_external_workflow_execution_failed_event_attributes", + "external_workflow_execution_signaled_event_attributes", + "upsert_workflow_search_attributes_event_attributes", + "workflow_execution_update_accepted_event_attributes", + "workflow_execution_update_rejected_event_attributes", + "workflow_execution_update_completed_event_attributes", + "workflow_properties_modified_externally_event_attributes", + "activity_properties_modified_externally_event_attributes", + "workflow_properties_modified_event_attributes", + "shard_id", + "tree_id", + "branch_id", + "node_id", + "txn_id", + # "prev_txn_id", + "task_id", + "version", + "worker_may_ignore", + ) +) +``` diff --git a/packages/cdc/src/descriptors.binpb b/packages/cdc/src/descriptors.binpb new file mode 100644 index 0000000..d1609c3 Binary files /dev/null and b/packages/cdc/src/descriptors.binpb differ diff --git a/packages/cdc/src/temporal-process-history_node_cdc.ipynb b/packages/cdc/src/temporal-process-history_node_cdc.ipynb new file mode 100644 index 0000000..a5004be --- /dev/null +++ b/packages/cdc/src/temporal-process-history_node_cdc.ipynb @@ -0,0 +1,1787 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "7372ff3f-b186-4c21-88c6-819f44ce3203", + "showTitle": false, + "title": "" + }, + "jp-MarkdownHeadingCollapsed": true + }, + "source": [ + "## Overview\n", + "Temporal provides an append-only table `history_node` that contains all data.\n", + "It deletes rows from the table but only to save space.\n", + "\n", + "## Generating a descriptor file for `temporal.api.history.v1.History` (array of `HistoryEvent`)\n", + "```shell\n", + "git clone https://github.com/temporalio/api.git\n", + "cd api\n", + "protoc -I . \\ \n", + " temporal/api/history/v1/message.proto \\\n", + " -o descriptors.binpb \\\n", + " --include_imports \\\n", + " --include_source_info\n", + "```\n", + "\n", + "The output file `descriptors.binpb` should be placed somewhere Spark can read it. In Databricks, a convenient place is DBFS. This can be done via the UI by clicking \"upload\" in a folder in the file explorer and selecting the file, or programmatically via the Databricks API.\n", + "\n", + "## References\n", + "- https://github.com/temporalio/api/tree/v1.24.0\n", + "- https://spark.apache.org/docs/latest/sql-data-sources-protobuf.html\n", + "- https://docs.gcp.databricks.com/structured-streaming/protocol-buffers.html\n", + "- https://buf.build/docs/reference/descriptors#generating-and-exchanging-descriptors\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "d3855226-c161-4df5-a3a7-de513bff3d74", + "showTitle": false, + "title": "" + } + }, + "source": [ + "## `temporal.history_node_cdc`\n", + "- We save this in Databricks as a delta table for performance only\n", + "- The CDC records contain deletes. We do not care about this at all because the table is append only. Deletes are an implementation detail of Temporal that are part of garbage collection. 
They are useful only if we want to analyze how it cleans up the database, or something" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "2667c0ee-2577-4195-85ec-110accc4a4be", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "# raw = spark.read.format(\"parquet\").load(\"gs://path/to/history-cdc/\").dropDuplicates()\n", + "# spark.sql(\"CREATE DATABASE IF NOT EXISTS temporal\")\n", + "\n", + "# raw.write.format(\"delta\").mode(\"overwrite\").saveAsTable(\"temporal.history_node_cdc\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "source": [ + "The history node table from Debezium / Kafka CDC has the following schema:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "67323d44-a4be-478f-83e8-5058067a3f94", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "# %sql\n", + "# CREATE TABLE spark_catalog.temporal.history_node_cdc (\n", + "# key STRUCT < shard_id: INT,\n", + "# tree_id: BINARY,\n", + "# branch_id: BINARY,\n", + "# node_id: BIGINT,\n", + "# txn_id: BIGINT >,\n", + "# value STRUCT < before: STRUCT < shard_id: INT,\n", + "# tree_id: BINARY,\n", + "# branch_id: BINARY,\n", + "# node_id: BIGINT,\n", + "# txn_id: BIGINT,\n", + "# data: BINARY,\n", + "# data_encoding: STRING,\n", + "# prev_txn_id: BIGINT >,\n", + "# after: STRUCT < shard_id: INT,\n", + "# tree_id: BINARY,\n", + "# branch_id: BINARY,\n", + "# node_id: BIGINT,\n", + "# txn_id: BIGINT,\n", + "# data: BINARY,\n", + "# data_encoding: STRING,\n", + "# prev_txn_id: BIGINT >,\n", + "# source: STRUCT < version: STRING,\n", + "# connector: STRING,\n", + "# name: STRING,\n", + "# ts_ms: BIGINT,\n", + "# snapshot: STRING,\n", + "# db: STRING,\n", + "# sequence: STRING,\n", + "# schema: STRING,\n", + "# table: STRING,\n", + "# txId: BIGINT,\n", + "# lsn: BIGINT,\n", + "# xmin: BIGINT >,\n", + "# op: STRING,\n", + "# ts_ms: BIGINT,\n", + "# transaction: STRUCT < id: STRING,\n", + "# total_order: BIGINT,\n", + "# data_collection_order: BIGINT > >,\n", + "# offset BIGINT,\n", + "# timestamp BIGINT,\n", + "# _rescued_data STRING\n", + "# ) USING delta TBLPROPERTIES (\n", + "# 'delta.minReaderVersion' = '1',\n", + "# 'delta.minWriterVersion' = '2'\n", + "# )" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "source": [ + "This is a fast way to copy in batch in Databricks and uses checkpointing." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "dda4ea02-c0ca-4b57-8dca-973112497c9e", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "%sql\n", + "CREATE TABLE IF NOT EXISTS `temporal`.`history_node_cdc`;\n", + "COPY INTO temporal.history_node_cdc \n", + "FROM 'gs://path/to/history-node-cdc/'\n", + "FILEFORMAT = parquet\n", + "COPY_OPTIONS ('mergeSchema' = 'true')" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + } + }, + "source": [ + "Print out information about the CDC source data from Debezium/Kafka (optional)." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "ExecuteTime": { + "end_time": "2024-03-30T14:44:57.665973Z", + "start_time": "2024-03-30T14:44:57.522020Z" + }, + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "bac16329-4728-4c92-909f-1944c5afc60c", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "from pyspark.sql.functions import col\n", + "\n", + "n_deletes = spark.table(\"temporal.history_node_cdc\").where(col(\"value.after\").isNull()).count()\n", + "n_cdc_records = spark.table(\"temporal.history_node_cdc\").count()\n", + "n_table_rows = spark.table(\"temporal.history_node_cdc\").where(col(\"value.after\").isNotNull()).count()\n", + "n_modifications = spark.table(\"temporal.history_node_cdc\").where(col('value.before').isNotNull()).where(col(\"value.after\").isNotNull()).count()\n", + "\n", + "print({\n", + " 'n_deletes': n_deletes,\n", + " 'n_modifications': n_modifications,\n", + " 'n_cdc_records': n_cdc_records,\n", + " 'n_table_rows': n_table_rows\n", + "})" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "f27736f2-33e9-4e8f-97a6-2a688d3ccc4b", + "showTitle": false, + "title": "" + } + }, + "source": [ + "# Temporal History\n", + "\n", + "Provides the `\"temporal.history\"` table from Temporal's raw `history_node` table with the following properties:\n", + "- All event log / history fields at the top level of the table (decoded from their protobuf representation)\n", + "- All rows have `workflow_info` which contains workflow id and other core identifying information about the workflow that the row belongs to\n", + "- This is optimized for batch workloads. For streaming we cannot use Window. Instead, we can swap with self joins." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "ebbfce4a-32c9-4cf3-ba19-67da09ee34da", + "showTitle": false, + "title": "" + }, + "is_executing": true + }, + "outputs": [], + "source": [ + "from pyspark.sql.functions import *\n", + "from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf\n", + "from pyspark.sql.window import Window\n", + "\n", + "\n", + "# Get Temporal history_node table\n", + "input_df = (\n", + " spark.read.table(\"temporal.history_node_cdc\")\n", + " .where(col(\"value.after\").isNotNull())\n", + " .select(col(\"value.after.*\"))\n", + " .dropDuplicates()\n", + ")\n", + "\n", + "# Order history_node table such that workflow executions are grouped and within each one their events are in ascending order\n", + "history_node_df = input_df#.orderBy(\"tree_id\", \"branch_id\", \"node_id\", \"txn_id\")\n", + "\n", + "\n", + "# Decode the data column and spread all columns from it on the same level as the history_node table columns\n", + "temporal_history_node_proto_descriptor_filepath = (\n", + " \"./descriptors.binpb\"\n", + ")\n", + "\n", + "history_node_exploded_proto = (\n", + " input_df.withColumn(\n", + " \"proto\",\n", + " from_protobuf(\n", + " input_df.data,\n", + " \"History\",\n", + " descFilePath=temporal_history_node_proto_descriptor_filepath,\n", + " options={\"recursive.fields.max.depth\": \"2\"},\n", + " ),\n", + " )\n", + " .select(\n", + " # Primary key columns (in this order)\n", + " \"shard_id\",\n", + " \"tree_id\",\n", + " \"branch_id\",\n", + " \"node_id\",\n", + " \"txn_id\",\n", + " # Adds a row per item in the history array entry. The array item is stored in the entry column and star-expended in the next step\n", + " explode(\"proto.events\").alias(\"entry\"),\n", + " \"prev_txn_id\",\n", + " )\n", + " .select(\n", + " # Repeat all fields from above\n", + " \"shard_id\",\n", + " \"tree_id\",\n", + " \"branch_id\",\n", + " \"node_id\",\n", + " \"txn_id\",\n", + " # Star expand the history entry, effectively adding a column per history event type to the table\n", + " \"entry.*\",\n", + " \"prev_txn_id\",\n", + " )\n", + ")\n", + "\n", + "# Adds a column workflow_info to each row, where workflow_info is the execution start event of each workflow\n", + "with_wf_info = (\n", + " history_node_exploded_proto.withColumn(\n", + " \"workflow_info\",\n", + " first(\n", + " history_node_exploded_proto.workflow_execution_started_event_attributes,\n", + " ignorenulls=True,\n", + " ).over(\n", + " Window.partitionBy(\"shard_id\", \"tree_id\").orderBy(\n", + " -col(\"txn_id\")\n", + " )\n", + " ),\n", + " )\n", + " .withColumn(\n", + " \"run_id\",\n", + " coalesce(\n", + " first(\n", + " col(\"workflow_task_failed_event_attributes.new_run_id\"),\n", + " ignorenulls=True,\n", + " ).over(\n", + " Window.partitionBy(\"shard_id\", \"tree_id\", \"branch_id\").orderBy(\n", + " -col(\"txn_id\")\n", + " )\n", + " ),\n", + " col(\"workflow_info.original_execution_run_id\"),\n", + " ),\n", + " )\n", + " .withColumn(\"workflow_id\", col(\"workflow_info.workflow_id\"))\n", + " .withColumn(\"workflow_type\", col(\"workflow_info.workflow_type.name\"))\n", + " .withColumn(\n", + " \"parent_workflow_id\", col(\"workflow_info.parent_workflow_execution.workflow_id\")\n", + " )\n", + " .withColumn(\n", + " \"parent_workflow_run_id\", col(\"workflow_info.parent_workflow_execution.run_id\")\n", + " )\n", + " # .withColumn(\"run_id\", 
col(\"workflow_info.original_execution_run_id\"))\n", + " .withColumn(\"first_execution_run_id\", col(\"workflow_info.first_execution_run_id\"))\n", + " .withColumn(\n", + " \"prev_execution_run_id\",\n", + " coalesce(\n", + " first(\n", + " col(\"workflow_task_failed_event_attributes.base_run_id\"),\n", + " ignorenulls=True,\n", + " ).over(\n", + " Window.partitionBy(\"shard_id\", \"tree_id\", \"branch_id\").orderBy(\n", + " -col(\"txn_id\")\n", + " )\n", + " ),\n", + " col(\"workflow_info.continued_execution_run_id\"),\n", + " ),\n", + " )\n", + " .withColumn(\n", + " \"task_queue\",\n", + " coalesce(\n", + " col(\"workflow_info.task_queue.normal_name\"),\n", + " col(\"workflow_info.task_queue.name\"),\n", + " ),\n", + " )\n", + " # Select all columns in the order we want to view them in\n", + " .select(\n", + " \"workflow_id\",\n", + " \"run_id\",\n", + " \"workflow_type\",\n", + " \"event_time\",\n", + " \"event_type\",\n", + " \"parent_workflow_id\",\n", + " \"parent_workflow_run_id\",\n", + " \"first_execution_run_id\",\n", + " \"prev_execution_run_id\",\n", + " \"temporal_ui_link\",\n", + " \"task_queue\",\n", + " \"event_id\",\n", + " \"workflow_info\",\n", + " \"workflow\",\n", + " \"workflow_execution_started_event_attributes\",\n", + " \"workflow_execution_completed_event_attributes\",\n", + " \"workflow_execution_failed_event_attributes\",\n", + " \"workflow_execution_timed_out_event_attributes\",\n", + " \"workflow_task_scheduled_event_attributes\",\n", + " \"workflow_task_started_event_attributes\",\n", + " \"workflow_task_completed_event_attributes\",\n", + " \"workflow_task_timed_out_event_attributes\",\n", + " \"workflow_task_failed_event_attributes\",\n", + " \"activity_task_scheduled_event_attributes\",\n", + " \"activity_task_started_event_attributes\",\n", + " \"activity_task_completed_event_attributes\",\n", + " \"activity_task_failed_event_attributes\",\n", + " \"activity_task_timed_out_event_attributes\",\n", + " \"timer_started_event_attributes\",\n", + " \"timer_fired_event_attributes\",\n", + " \"activity_task_cancel_requested_event_attributes\",\n", + " \"activity_task_canceled_event_attributes\",\n", + " \"timer_canceled_event_attributes\",\n", + " \"marker_recorded_event_attributes\",\n", + " \"workflow_execution_signaled_event_attributes\",\n", + " \"workflow_execution_terminated_event_attributes\",\n", + " \"workflow_execution_cancel_requested_event_attributes\",\n", + " \"workflow_execution_canceled_event_attributes\",\n", + " \"request_cancel_external_workflow_execution_initiated_event_attributes\",\n", + " \"request_cancel_external_workflow_execution_failed_event_attributes\",\n", + " \"external_workflow_execution_cancel_requested_event_attributes\",\n", + " \"workflow_execution_continued_as_new_event_attributes\",\n", + " \"start_child_workflow_execution_initiated_event_attributes\",\n", + " \"start_child_workflow_execution_failed_event_attributes\",\n", + " \"child_workflow_execution_started_event_attributes\",\n", + " \"child_workflow_execution_completed_event_attributes\",\n", + " \"child_workflow_execution_failed_event_attributes\",\n", + " \"child_workflow_execution_canceled_event_attributes\",\n", + " \"child_workflow_execution_timed_out_event_attributes\",\n", + " \"child_workflow_execution_terminated_event_attributes\",\n", + " \"signal_external_workflow_execution_initiated_event_attributes\",\n", + " \"signal_external_workflow_execution_failed_event_attributes\",\n", + " \"external_workflow_execution_signaled_event_attributes\",\n", + " 
\"upsert_workflow_search_attributes_event_attributes\",\n", + " \"workflow_execution_update_accepted_event_attributes\",\n", + " \"workflow_execution_update_rejected_event_attributes\",\n", + " \"workflow_execution_update_completed_event_attributes\",\n", + " \"workflow_properties_modified_externally_event_attributes\",\n", + " \"activity_properties_modified_externally_event_attributes\",\n", + " \"workflow_properties_modified_event_attributes\",\n", + " \"shard_id\",\n", + " \"tree_id\",\n", + " \"branch_id\",\n", + " \"node_id\",\n", + " \"txn_id\",\n", + " # \"prev_txn_id\",\n", + " \"task_id\",\n", + " \"version\",\n", + " \"worker_may_ignore\",\n", + " )\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "b2ea4960-89ee-40cb-a5d1-86a9b8223d7b", + "showTitle": false, + "title": "" + } + }, + "source": [ + "## Invariants\n", + "\n", + "Derivations require the following invariants about the base tables to hold:\n", + "\n", + "For the base history node table in Databricks we assume 1 row per the primary key Temporal uses in Postgres: `\"shard_id\"` + `\"tree_id\"` + `\"branch_id\"` + `\"node_id\"` + `\"txn_id\"`. \n", + "\n", + "For the decoded protobuf tables, we assume 1 row per `\"shard_id\"` + `\"tree_id\"` + `\"branch_id\"` + `\"node_id\"` + `\"event_id\"`, where `\"event_id\"` a value from the decoded protobuf array items for History. This is a monotonically increasing number that Temporal uses to identify log entries in their API.\n", + "\n", + "The code in the following cell expresses these invariants. It throws errors if our assumptions about the base data are ever violated, stopping updates (or creation) of the tables that we use for further computations." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "2f2fa985-ee00-4e37-b60f-287da0cc2bf5", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "history_node_primary_key = [\"shard_id\", \"tree_id\", \"branch_id\", \"node_id\", \"txn_id\"]\n", + "\n", + "# Source data has one row per primary key\n", + "assert (\n", + " input_df.orderBy(col(\"timestamp\").desc())\n", + " .groupBy(*history_node_primary_key)\n", + " .agg(count(\"*\").alias('ct'))\n", + " .where(col(\"ct\") > 1)\n", + " .count()\n", + " == 0\n", + ")\n", + "\n", + "# Exploded proto has one one row per shard_id + tree_id + branch_id + node_id (from the source table) + event_id (from the decoded and exploded protobuf)\n", + "# (expected if CDC table rows are edited/updated and/or temporal appends to the table only)\n", + "assert(\n", + " history_node_exploded_proto.groupBy(\n", + " \"shard_id\", \"tree_id\", \"branch_id\", \"node_id\", \"event_id\"\n", + " )\n", + " .agg(count(\"event_id\").alias(\"ct_event\"))\n", + " .where(col(\"ct_event\") > 1)\n", + " .count()\n", + " == 0\n", + ")\n", + "\n", + "# The table with workflow info should have no repeats for the same criteria\n", + "assert(\n", + " with_wf_info.groupBy(\"shard_id\", \"tree_id\", \"branch_id\", \"node_id\", \"event_id\")\n", + " .agg(count(\"event_id\").alias(\"ct_event\"))\n", + " .where(col(\"ct_event\") > 1)\n", + " .count()\n", + " == 0\n", + ")\n", + "\n", + "# All workflow started event rows have event_type workflow_execution_started,\n", + "# non-null column `workflow_execution_started_event_attributes`\n", + "# and event_id.= 1\n", + "assert (\n", + " with_wf_info.where(\n", + " (col(\"event_type\") == \"EVENT_TYPE_WORKFLOW_EXECUTION_STARTED\")\n", + " & (col(\"workflow_execution_started_event_attributes\").isNotNull())\n", + " & (col(\"event_id\") != 1)\n", + " ).count()\n", + " == 0\n", + ")\n", + "\n", + "# All rows have workflow_id\n", + "assert(with_wf_info.where(col(\"workflow_id\").isNull()).count() == 0)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "a4bacbd3-5d57-4040-bcde-e7e395045974", + "showTitle": false, + "title": "" + } + }, + "source": [ + "## Drop Columns\n", + "- We can always add these back\n", + "- We expect they are not needed by anything from here on\n", + "- Anything we leave will be needed for something or other" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "764cf51f-3e15-4061-b145-a6ca8a89f341", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "with_wf_info = with_wf_info.drop(\n", + " # 100% missing\n", + " \"version\",\n", + " \"worker_may_ignore\",\n", + "\n", + " # Primary key attributes of the Postgres table\n", + " # Used to derive ordering and uniqueness, don't expect to need them past this point. 
If we do we can add them back\n", + " \"shard_id\",\n", + " \"tree_id\",\n", + " \"branch_id\",\n", + " \"node_id\",\n", + " \"txn_id\",\n", + " \"task_id\",\n", + " # \"event_id\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "b19d0752-ce02-4514-bb12-59ef421c505a", + "showTitle": false, + "title": "" + } + }, + "source": [ + "## Drop Rows\n", + "- Ignore history of temporal system workflows" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "8f8812b4-ff13-4f5c-a9a9-4220be3f8501", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "is_ignored_workflow_type = col(\"workflow_type\").isin(\n", + " [\n", + " \"temporal-sys-batch-workflow\",\n", + " \"ExecuteBlobStoreCleanupCron\",\n", + " \"RunScheduleSyncCron\",\n", + " \"UpdateTimedOutWorkflowRuns\",\n", + " \"temporalCloudAuthRotationWorkflow\"\n", + " \"temporal-sys-history-scanner-workflow\"\n", + " \"temporal-sys-tq-scanner-workflow\",\n", + " ]\n", + ")\n", + "with_wf_info = with_wf_info.where(~is_ignored_workflow_type)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "5c799821-ba8d-41dd-9bf3-7e51ab0a3bf5", + "showTitle": false, + "title": "" + } + }, + "source": [ + "## Save table" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "42bb63b6-a4d2-45de-8b3b-3ecbc5955c6b", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "with_wf_info.write.format(\"delta\").mode(\"overwrite\").saveAsTable(\"temporal.history\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "02da8177-8c9b-45c8-b808-a930e21f0707", + "showTitle": false, + "title": "" + } + }, + "source": [ + "# Temporal Workflows\n", + "\n", + "Produces a table `temporal.workflows` with the following properties:\n", + "\n", + "- The set of all workflows, independent of executions. This simply means unique by `\"workflow_id\"`.\n", + "- Excludes child workflows.\n", + "- The status of the workflow, one of \"RUNNING\", \"TERMINATED\", \"COMPLETED\". This is derived from the latest execution that corresponds to the workflow_id.\n", + "- The time the workflow started.\n", + "- The time the workflow completed, if any.\n", + "\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "2f2e327b-22f4-4ebe-bb71-cc408fad74f6", + "showTitle": false, + "title": "" + } + }, + "source": [ + "## Running Workflows\n", + "Derives from `temporal.history` the workflows that are root workflows, meaning\n", + "1. they have started events\n", + "2. they have no parent\n", + "3. 
there is (at least) 1 run for which there is no corresponding event that terminates the run (terminated, failed, completed, cancelled, continued as new)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "109434f2-1516-41ca-a84f-cd06b7e48aa9", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "is_terminal_event_type = col(\"event_type\").isin(\n", + " [\n", + " \"EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED\",\n", + " \"EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED\",\n", + " \"EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED\",\n", + " \"EVENT_TYPE_WORKFLOW_EXECUTION_FAILED\",\n", + " \"EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW\",\n", + " \"EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT\",\n", + " ]\n", + ")\n", + "\n", + "wind = Window.partitionBy(\"workflow_id\").orderBy(col(\"event_time\").desc())\n", + "# a better name in this context might be execution or workflow execution to disambiguate from other uses of workflow\n", + "running_workflows = (\n", + " # Inconsistent with the Temporal UI, which shows children and root workflows together\n", + " with_wf_info.where(col(\"parent_workflow_id\").isNull()) \n", + " .withColumn(\"latest_event_time\", max(col(\"event_time\")).over(wind))\n", + " .where(col(\"event_time\") == col(\"latest_event_time\"))\n", + " .where(~is_terminal_event_type)\n", + ")\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "8fa4a841-6a61-4473-b190-2c580c076983", + "showTitle": false, + "title": "" + } + }, + "source": [ + "### Invariants\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "a7adfcfd-bbfe-452e-9596-1a1c553d6bc9", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "# No duplicate workflow ids for running workflows\n", + "assert (\n", + " running_workflows.dropDuplicates([\"workflow_id\"]).count()\n", + " == running_workflows.count()\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "5c3b5992-fa32-49ad-8aba-ae6664911d1e", + "showTitle": false, + "title": "" + } + }, + "source": [ + "## Completed Workflows\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "ef60b0cb-fdd8-4ca9-8acf-2c3a1f156bb6", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "completed_workflows = (\n", + " with_wf_info\n", + " .where(col(\"event_type\") == \"EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED\")\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "42688f2b-8146-4fec-8fe8-134f12776457", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "# TODO. move. 
maybe this is reusable\n", + "def get_histories_for_runs(wfs: DataFrame, history: DataFrame):\n", + " wfs.select(\"workflow_id\", \"run_id\").alias(\"runs\").join(\n", + " history.alias(\"history\"),\n", + " [\"workflow_id\", \"run_id\"],\n", + " \"inner\",\n", + " ).orderBy(\"workflow_id\", \"run_id\", \"event_id\")\n", + "\n", + "\n", + "completed_of_example_type = (\n", + " completed_workflows.where(col(\"workflow_type\") == \"example\")\n", + " .select(\"workflow_id\", \"run_id\")\n", + " .alias(\"runs\")\n", + " .join(\n", + " with_wf_info.alias(\"history\"),\n", + " [\"workflow_id\", \"run_id\"],\n", + " \"inner\",\n", + " )\n", + " .orderBy(\"workflow_id\", \"run_id\", \"event_id\")\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "9c481879-08c8-458b-b8fa-c5bbe8c3c777", + "showTitle": false, + "title": "" + } + }, + "source": [ + "## Summary\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "0343eeed-f30c-4754-8c64-72990a0126b6", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "summary = {\n", + " \"running\": running_workflows.count(),\n", + " \"completed\": completed_workflows.count(),\n", + "}\n", + "\n", + "print(summary)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "b7bbd417-70c7-42fa-a936-8e66db01d9f5", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "event_type = [\n", + " \"EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED\",\n", + " \"EVENT_TYPE_ACTIVITY_TASK_COMPLETED\",\n", + " \"EVENT_TYPE_ACTIVITY_TASK_FAILED\",\n", + " \"EVENT_TYPE_ACTIVITY_TASK_SCHEDULED\",\n", + " \"EVENT_TYPE_ACTIVITY_TASK_STARTED\",\n", + " \"EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT\",\n", + "\n", + " \"EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED\",\n", + " \"EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED\",\n", + "\n", + " \"EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_SIGNALED\",\n", + "\n", + " \"EVENT_TYPE_MARKER_RECORDED\",\n", + " \n", + " \"EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED\",\n", + " \"EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED\",\n", + " \n", + " \"EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED\",\n", + " \"EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED\",\n", + " \n", + " \"EVENT_TYPE_TIMER_CANCELED\",\n", + " \"EVENT_TYPE_TIMER_FIRED\",\n", + " \"EVENT_TYPE_TIMER_STARTED\",\n", + " \n", + " \"EVENT_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES\",\n", + " \n", + " \"EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED\",\n", + " \"EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED\",\n", + " \"EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED\",\n", + " \"EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW\",\n", + " \"EVENT_TYPE_WORKFLOW_EXECUTION_FAILED\",\n", + " \"EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED\",\n", + " \"EVENT_TYPE_WORKFLOW_EXECUTION_STARTED\",\n", + " \"EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED\",\n", + " \"EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT\",\n", + "\n", + " \n", + " \"EVENT_TYPE_WORKFLOW_PROPERTIES_MODIFIED\",\n", + " \n", + " \"EVENT_TYPE_WORKFLOW_TASK_COMPLETED\",\n", + " \"EVENT_TYPE_WORKFLOW_TASK_FAILED\",\n", + " \"EVENT_TYPE_WORKFLOW_TASK_SCHEDULED\",\n", + " \"EVENT_TYPE_WORKFLOW_TASK_STARTED\",\n", + " \"EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT\",\n", 
+ "]\n", + "\n", + "workflow_execution = [\n", + " \"EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED\",\n", + " \"EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED\",\n", + " \"EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED\",\n", + " \"EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW\",\n", + " \"EVENT_TYPE_WORKFLOW_EXECUTION_FAILED\",\n", + " \"EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED\",\n", + " \"EVENT_TYPE_WORKFLOW_EXECUTION_STARTED\",\n", + " \"EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED\",\n", + " \"EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT\"\n", + "]\n", + "\n", + "timer_event_type = [\n", + " \"EVENT_TYPE_TIMER_CANCELED\",\n", + " \"EVENT_TYPE_TIMER_FIRED\",\n", + " \"EVENT_TYPE_TIMER_STARTED\",\n", + "]\n", + "\n", + "activity_event_type = [\n", + " \"EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED\",\n", + " \"EVENT_TYPE_ACTIVITY_TASK_COMPLETED\",\n", + " \"EVENT_TYPE_ACTIVITY_TASK_FAILED\",\n", + " \"EVENT_TYPE_ACTIVITY_TASK_SCHEDULED\",\n", + " \"EVENT_TYPE_ACTIVITY_TASK_STARTED\",\n", + " \"EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT\",\n", + "]\n", + "\n", + "child_workflow_event_type = [\n", + " \"EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED\",\n", + " \"EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED\",\n", + " \"EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED\",\n", + " \"EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED\",\n", + "]" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "a7f515c8-92d7-4199-bd53-e86c2f22d8f6", + "showTitle": false, + "title": "" + } + }, + "source": [ + "# Temporal Activities\n", + "\n", + "Derives a table `temporal.activities` from the `temporal.history` table with the following properties:\n", + "- Matches activity input to activity success or failure\n", + "- Decodes activity input / output / error columns from base64 string to JSON encoded string (`input`, `output`, `error` columns)\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "a78e316c-b4fc-4de3-bb99-1ef3511ecfbe", + "showTitle": false, + "title": "" + } + }, + "source": [ + "## Activity History" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "054925b4-27d7-4515-bee4-1ce9a60becc9", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "activity_history = (\n", + " spark.table(\"temporal.history\")\n", + " .where(col(\"event_type\").isin(activity_event_type))\n", + " .withColumn(\n", + " \"input\",\n", + " col(\"activity_task_scheduled_event_attributes.input.payloads\")[0][\"data\"].cast(\n", + " \"string\"\n", + " ),\n", + " )\n", + " .withColumn(\n", + " \"output\",\n", + " col(\"activity_task_completed_event_attributes.result.payloads\")[0][\"data\"].cast(\n", + " \"string\"\n", + " ),\n", + " )\n", + " .withColumn(\n", + " \"error\",\n", + " # Provides any JSON data that was returned from an activity. 
There is often richer data like error messages and stacktraces in the raw column\n", + " col(\n", + " \"activity_task_failed_event_attributes.failure.application_failure_info.details.payloads\"\n", + " )[0][\"data\"].cast(\"string\"),\n", + " )\n", + " .select(\n", + " \"workflow_id\",\n", + " \"run_id\",\n", + " \"workflow_type\",\n", + " \"event_time\",\n", + " \"event_type\",\n", + " \"parent_workflow_id\",\n", + " \"parent_workflow_run_id\",\n", + " \"first_execution_run_id\",\n", + " \"prev_execution_run_id\",\n", + " \"temporal_ui_link\",\n", + " \"task_queue\",\n", + " \"event_id\",\n", + " \"workflow_info\",\n", + " \"workflow\",\n", + " \"input\",\n", + " \"output\",\n", + " \"error\",\n", + " \"activity_task_scheduled_event_attributes\",\n", + " \"activity_task_started_event_attributes\",\n", + " \"activity_task_completed_event_attributes\",\n", + " \"activity_task_failed_event_attributes\",\n", + " \"activity_task_timed_out_event_attributes\",\n", + " \"activity_task_cancel_requested_event_attributes\",\n", + " \"activity_task_canceled_event_attributes\",\n", + " )\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "687a6824-2619-4289-8498-63d3babf741b", + "showTitle": false, + "title": "" + } + }, + "source": [ + "## Activity Calls" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "d9e48115-2797-4abc-9ddc-75dabf8e9197", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "# Temporal only puts the activity's name on the scheduled event \n", + "# Most of this is just linking all other activity events with the activity name\n", + "# As a side effect we end up establishing the identity of an activity invocation, spreading an id\n", + "# globally unique by workflow_id+run_id+activity_scheduled_event_id across one or more rows\n", + "#\n", + "# In the second part we use this to form the activity_calls table. 
\n", + "# This collapses the different activity events that make up an invocation into one row.\n", + "\n", + "activity_name_event = (\n", + " activity_history\n", + " .withColumn(\n", + " \"activity_type\",\n", + " col(\"activity_task_scheduled_event_attributes.activity_type.name\"),\n", + " )\n", + " .withColumn(\n", + " \"activity_task_queue\",\n", + " coalesce(\n", + " col(\"activity_task_scheduled_event_attributes.task_queue.normal_name\"),\n", + " col(\"activity_task_scheduled_event_attributes.task_queue.name\"),\n", + " ),\n", + " )\n", + " .withColumnRenamed(\"task_queue\",'workflow_task_queue')\n", + " .where(col(\"activity_type\").isNotNull())\n", + " .select(\n", + " \"workflow_id\",\n", + " \"run_id\",\n", + " col(\"event_id\").alias(\"activity_event_id\"),\n", + " \"activity_type\",\n", + " \"activity_task_queue\",\n", + " \"workflow_task_queue\"\n", + " )\n", + ")\n", + "\n", + "with_activity_evt_id = activity_history.withColumn(\n", + " \"activity_event_id\",\n", + " coalesce(\n", + " when(\n", + " col(\"activity_task_scheduled_event_attributes\").isNotNull(),\n", + " col(\"event_id\"),\n", + " ),\n", + " when(\n", + " col(\"activity_task_started_event_attributes\").isNotNull(),\n", + " col(\"activity_task_started_event_attributes.scheduled_event_id\"),\n", + " ),\n", + " when(\n", + " col(\"activity_task_completed_event_attributes\").isNotNull(),\n", + " col(\"activity_task_completed_event_attributes.scheduled_event_id\"),\n", + " ),\n", + " when(\n", + " col(\"activity_task_failed_event_attributes\").isNotNull(),\n", + " col(\"activity_task_failed_event_attributes.scheduled_event_id\"),\n", + " ),\n", + " when(\n", + " col(\"activity_task_timed_out_event_attributes\").isNotNull(),\n", + " col(\"activity_task_timed_out_event_attributes.scheduled_event_id\"),\n", + " ),\n", + " ),\n", + ")\n", + "\n", + "activity_invocation_window = Window.partitionBy(\n", + " \"workflow_id\", \"run_id\", \"activity_event_id\"\n", + ")\n", + "\n", + "activity_calls = (\n", + " with_activity_evt_id.join(\n", + " activity_name_event, [\"workflow_id\", \"run_id\", \"activity_event_id\"]\n", + " )\n", + " .withColumn(\n", + " \"time_scheduled\",\n", + " first(\n", + " when(\n", + " col(\"event_type\") == \"EVENT_TYPE_ACTIVITY_TASK_SCHEDULED\",\n", + " col(\"event_time\"),\n", + " ),\n", + " ignorenulls=True,\n", + " ).over(activity_invocation_window),\n", + " )\n", + " .withColumn(\n", + " \"time_started\",\n", + " first(\n", + " when(\n", + " col(\"event_type\") == \"EVENT_TYPE_ACTIVITY_TASK_STARTED\",\n", + " col(\"event_time\"),\n", + " ),\n", + " ignorenulls=True,\n", + " ).over(activity_invocation_window),\n", + " )\n", + " .withColumn(\n", + " \"time_completed\",\n", + " first(\n", + " when(\n", + " col(\"event_type\").isin(\n", + " [\n", + " \"EVENT_TYPE_ACTIVITY_TASK_COMPLETED\",\n", + " \"EVENT_TYPE_ACTIVITY_TASK_FAILED\",\n", + " \"EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT\",\n", + " ]\n", + " ),\n", + " col(\"event_time\"),\n", + " ),\n", + " ignorenulls=True,\n", + " ).over(activity_invocation_window),\n", + " )\n", + " .withColumn(\n", + " \"activity_task_started_event_attributes\",\n", + " first(col(\"activity_task_started_event_attributes\"), ignorenulls=True).over(\n", + " activity_invocation_window\n", + " ),\n", + " )\n", + " .withColumn(\n", + " \"activity_task_completed_event_attributes\",\n", + " first(col(\"activity_task_completed_event_attributes\"), ignorenulls=True).over(\n", + " activity_invocation_window\n", + " ),\n", + " )\n", + " .withColumn(\n", + " 
\"activity_task_failed_event_attributes\",\n", + " first(col(\"activity_task_failed_event_attributes\"), ignorenulls=True).over(\n", + " activity_invocation_window\n", + " ),\n", + " )\n", + " .withColumn(\n", + " \"activity_task_timed_out_event_attributes\",\n", + " first(col(\"activity_task_timed_out_event_attributes\"), ignorenulls=True).over(\n", + " activity_invocation_window\n", + " ),\n", + " )\n", + " .withColumn(\n", + " \"activity_task_cancel_requested_event_attributes\",\n", + " first(\n", + " col(\"activity_task_cancel_requested_event_attributes\"), ignorenulls=True\n", + " ).over(activity_invocation_window),\n", + " )\n", + " .withColumn(\n", + " \"activity_task_canceled_event_attributes\",\n", + " first(col(\"activity_task_canceled_event_attributes\"), ignorenulls=True).over(\n", + " activity_invocation_window\n", + " ),\n", + " )\n", + " .withColumn(\n", + " \"input\",\n", + " first(col(\"input\"), ignorenulls=True).over(activity_invocation_window),\n", + " )\n", + " .withColumn(\n", + " \"output\",\n", + " first(col(\"output\"), ignorenulls=True).over(activity_invocation_window),\n", + " )\n", + " .withColumn(\n", + " \"error\",\n", + " first(col(\"error\"), ignorenulls=True).over(activity_invocation_window),\n", + " )\n", + " .dropDuplicates([\"workflow_id\", \"run_id\", \"activity_event_id\"])\n", + " .select(\n", + " \"workflow_id\",\n", + " \"run_id\",\n", + " \"workflow_type\",\n", + " \"activity_type\",\n", + " \"time_scheduled\",\n", + " \"time_started\",\n", + " \"time_completed\",\n", + " \"input\",\n", + " \"output\",\n", + " \"error\",\n", + " \"parent_workflow_id\",\n", + " \"parent_workflow_run_id\",\n", + " \"first_execution_run_id\",\n", + " \"prev_execution_run_id\",\n", + " \"temporal_ui_link\",\n", + " \"task_queue\",\n", + " \"event_id\",\n", + " \"workflow_info\",\n", + " \"workflow\",\n", + " \"activity_task_scheduled_event_attributes\",\n", + " \"activity_task_started_event_attributes\",\n", + " \"activity_task_completed_event_attributes\",\n", + " \"activity_task_failed_event_attributes\",\n", + " \"activity_task_timed_out_event_attributes\",\n", + " \"activity_task_cancel_requested_event_attributes\",\n", + " \"activity_task_canceled_event_attributes\",\n", + " )\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "3762e053-0bad-4589-b613-77120f5a00be", + "showTitle": false, + "title": "" + } + }, + "source": [ + "### Save Table" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "41829520-cc99-4610-9b18-759738eb2e85", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "activity_calls.write.format(\"delta\").mode(\"overwrite\").option(\"overwriteSchema\",True).saveAsTable(\"temporal.activity_calls\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "a07e0a6b-cd70-41f4-9f2f-4b60e80ecd3c", + "showTitle": false, + "title": "" + } + }, + "source": [ + "## Activities Tables" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "b442f5d9-b9d8-4f93-ae5c-884614961178", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "# TODO. 
can we use temporal.activity_calls if we partition by activity_type?\n", + "(\n", + " activity_calls.write.format(\"delta\")\n", + " .partitionBy(\"activity_type\")\n", + " .mode(\"overwrite\")\n", + " .saveAsTable(\"tmp_activity_calls_by_activity_type\")\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "93a09d44-19fe-46ee-ae9a-967c5c06f97d", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "# for now we save to activites.{activity_type}\n", + "spark.sql(\"CREATE DATABASE IF NOT EXISTS activities\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "466fa66a-e8de-4580-b994-d3ffd5fd1a90", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "# Import generated Diachronic types Python Package\n", + "from diachronic.types.activities import schemas as activity_schemas\n", + "import multiprocessing\n", + "from threading import Thread\n", + "from queue import Queue\n", + "from pyspark.sql.functions import *\n", + "\n", + "by_activity_type = spark.table(\"tmp_activity_calls_by_activity_type\")\n", + "\n", + "\n", + "def run(f, q: Queue):\n", + " while not q.empty():\n", + " activity_type, schema = q.get()\n", + " f(activity_type, schema)\n", + " q.task_done()\n", + "\n", + "\n", + "def fn(activity_type, schema):\n", + " try:\n", + " has_input = True if schema.get(\"input\", None) is not None else False\n", + " has_output = True if schema.get(\"output\", None) is not None else False\n", + " has_error = True if schema.get(\"error\", None) is not None else False\n", + "\n", + " table = (\n", + " by_activity_type.where(col(\"activity_type\") == activity_type)\n", + " .withColumn(\n", + " \"input\", from_json(\"input\", schema[\"input\"]) if has_input else col(\"input\")\n", + " )\n", + " .withColumn(\n", + " \"output\",\n", + " from_json(\"output\", schema[\"output\"]) if has_output else col(\"output\"),\n", + " )\n", + " .withColumn(\n", + " \"error\", from_json(\"error\", schema[\"error\"]) if has_error else col(\"error\")\n", + " )\n", + " )\n", + " \n", + " table.write.format(\"delta\").mode(\"overwrite\").saveAsTable(\n", + " f\"activities.{activity_type}\"\n", + " )\n", + " except Exception as e:\n", + " print(f\"this failed {activity_type}\", e)\n", + "\n", + "\n", + "num_cores = multiprocessing.cpu_count()\n", + "\n", + "q = Queue()\n", + "\n", + "for (activity_type, schema) in activity_schemas.items():\n", + " q.put((activity_type, schema))\n", + "\n", + "for i in range(num_cores):\n", + " # print(\"core\", i)\n", + " t = Thread(target=run, args=(fn, q), name=f\"activity_tables-{i}\")\n", + " # t.daemon = True\n", + " t.start()\n", + "\n", + "q.join()" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "d2a48fa4-f249-440c-835b-2bdaf9cc04b3", + "showTitle": false, + "title": "" + } + }, + "source": [ + "# Temporal Signals\n", + "\n", + "Derives table `temporal.signals` from `temporal.history` with the following properties:\n", + "\n", + "- Table contains only \"workflow execution signaled\" // TODO. 
decide if we want to include child workflow signals or any others temporal encodes\n", + "- Each row has a column with the JSON encoded `signal_payload`\n", + "- Each row has columns for each Embedded Insurance signal type with JSON decoded `signal_payload`. Only one of these is populated per row since it corresponds to the signal type definition. The name of the column for the decoded data is derived from `signal_type` via the function `signal_column_name`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "6ab0f4da-0ef7-457c-9bf2-97499791a692", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "from pyspark.sql.functions import col\n", + "\n", + "with_wf_info = spark.table(\"temporal.history\")\n", + "\n", + "signals = (\n", + " spark.table(\"temporal.history\").where(\n", + " # TODO. we may wish to include other signal types in this batch (child wfs may have special event_type for this, others)\n", + " (col(\"event_type\") == \"EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED\")\n", + " )\n", + " .withColumn(\n", + " \"signal_type\",\n", + " col(\"workflow_execution_signaled_event_attributes.signal_name\"),\n", + " )\n", + " .withColumn(\n", + " \"signal_payload\",\n", + " col(\"workflow_execution_signaled_event_attributes.input.payloads\")[0][\n", + " \"data\"\n", + " ].cast(\"string\"),\n", + " )\n", + " .select(\n", + " \"event_time\",\n", + " \"signal_type\",\n", + " \"signal_payload\",\n", + " \"workflow_info\",\n", + " \"workflow\",\n", + " \"workflow_id\",\n", + " \"workflow_type\",\n", + " \"parent_workflow_id\",\n", + " \"parent_workflow_run_id\",\n", + " \"run_id\",\n", + " \"first_execution_run_id\",\n", + " \"prev_execution_run_id\",\n", + " \"temporal_ui_link\",\n", + " \"task_queue\",\n", + " \"event_id\",\n", + " # \"event_type\",\n", + " # \"workflow_execution_signaled_event_attributes\",\n", + " )\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "828df5d9-6919-46a5-bd2e-4b697e9b87b5", + "showTitle": false, + "title": "" + } + }, + "source": [ + "## Save table" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "fe86b299-2855-48f1-b242-ea20762c4ef5", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "signals.write.format(\"delta\").mode(\"overwrite\").option(\"mergeSchema\",\"true\").saveAsTable(\"temporal.signals\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "35e737e9-d640-4fbd-8a3b-d432352ab0cd", + "showTitle": false, + "title": "" + } + }, + "source": [ + "## Signals Tables\n", + "\n", + "Create 1 table per signal type.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "d78ce128-6e41-471b-b210-f858d4f014db", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "def omit(m: dict, ks: list[str]):\n", + " return {k: v for k, v in m.items() if k not in ks}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + 
"inputWidgets": {}, + "nuid": "706adeab-e6b1-4b26-bc9c-1e0bc1c3fa72", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "from diachronic.types.signals import schemas\n", + "\n", + "# Helpers: signals\n", + "def signal_sql_name(s: str) -> str:\n", + " \"\"\"Normalizes signal names for usage as Spark SQL tables or columns.\"\"\"\n", + " s2 = s.replace(\".\", \"_\").replace(\"-\",\"_\")\n", + " return s2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "e0bbae66-d322-4596-aab5-dd5bc552e49f", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "generated_tables_map_fully_qualified = {k: f\"signals.{signal_sql_name(k)}\" for k in schemas.keys() }\n", + "\n", + "generated_tables_map_fully_qualified" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "44d817c8-3aa8-4237-8333-afeb22ace94f", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "# above should be new temporal.signals (everything is a string) or some other name\n", + "# from which we now derive the following individuated signals tables (schematized, exploded):\n", + "\n", + "from diachronic.types.signals import schemas\n", + "from pyspark.sql import DataFrame\n", + "from pyspark.sql.functions import from_json\n", + "\n", + "\n", + "def create_signals_tables(\n", + " schemas: dict[str, str], table_map: dict[str, str], base_df: DataFrame\n", + ") -> dict[str, DataFrame]:\n", + " return {\n", + " table_name: base_df.where(col(\"signal_type\") == signal_type)\n", + " .withColumn(\n", + " \"payload\", from_json(base_df[\"signal_payload\"], schema=schemas[signal_type])\n", + " )\n", + " .drop(\"signal_type\", \"signal_payload\")\n", + " .select(\"payload.*\", \"*\")\n", + " for signal_type, table_name in table_map.items()\n", + " }\n", + "\n", + "\n", + "signal_tables = create_signals_tables(\n", + " schemas, generated_tables_map_fully_qualified, signals\n", + ")\n", + "\n", + "\n", + "# for name, df in signal_tables.items():\n", + "# display(df)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "6bcd543e-8ef7-46a0-bc52-bee0e370a8b4", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "%sql\n", + "create database if not exists signals" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "a8448bea-e474-4dc6-aa9b-168376b278db", + "showTitle": false, + "title": "" + } + }, + "source": [ + "### Save tables" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "3afdb47e-51b9-48c6-809b-c1d226092ff6", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "for table_name, df in signal_tables.items():\n", + " df.write.format(\"delta\").mode(\"overwrite\").option(\"overwriteSchema\", True).saveAsTable(table_name)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "15646256-c7c9-4f7d-869a-4131a4c6e5e0", + 
"showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "# import threading\n", + "# for thread in threading.enumerate():\n", + "# print(thread.name)" + ] + } + ], + "metadata": { + "application/vnd.databricks.v1+notebook": { + "dashboards": [], + "language": "python", + "notebookMetadata": {}, + "notebookName": "temporal-process-history_node_cdc", + "widgets": {} + }, + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.6" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +}