Skip to content

Commit

Permalink
Add new parameters for Spark jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
JulienPeloton committed May 24, 2024
1 parent c6421ee commit db330b2
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 15 deletions.
8 changes: 8 additions & 0 deletions _e2e/finkctl.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,17 @@ stream2raw:

# {{.Night}} is optional and will be replaced by the value of run.night
kafka_topic: ztf-stream-{{.Night}}

max_offsets_per_trigger: 5000
#
# Parameters used to run the raw2science task
#
raw2science:
# Override the default value of run.instances
instances: 4
memory: 3Gi

mmconfigpath: no-config
#
# Parameters used to run the distribution task
#
Expand All @@ -49,6 +53,10 @@ distribution:
distribution_servers: "kafka-cluster-kafka-external-bootstrap.kafka:9094"
distribution_schema: "/home/fink/fink-alert-schemas/ztf/distribution_schema_0p2.avsc"
substream_prefix: "fink_"
kafka_buffer_memory: 134217728
kafka_delivery_timeout_ms: 240000

mmconfigpath: no-config
#
# Parameters used to access the S3 bucket
#
Expand Down
20 changes: 13 additions & 7 deletions cmd/run_distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ type KafkaCreds struct {
}

type DistributionConfig struct {
Cpu string `mapstructure:"cpu"`
DistributionServers string `mapstructure:"distribution_servers"`
Memory string `mapstructure:"memory"`
SubstreamPrefix string `mapstructure:"substream_prefix"`
DistributionSchema string `mapstructure:"distribution_schema"`
Night string
KafkaCreds KafkaCreds `mapstructure:"kafka"`
Cpu string `mapstructure:"cpu"`
DistributionServers string `mapstructure:"distribution_servers"`
Memory string `mapstructure:"memory"`
SubstreamPrefix string `mapstructure:"substream_prefix"`
DistributionSchema string `mapstructure:"distribution_schema"`
KafkaBufferMemory string `mapstructure:"kafka_buffer_memory"`
KafkaDeliveryTimeoutMs string `mapstructure:"kafka_delivery_timeout_ms"`
MmConfigPath string `mapstructure:"mmconfigpath"`
Night string
KafkaCreds KafkaCreds `mapstructure:"kafka"`
}

// distributionCmd represents the distribution command
Expand All @@ -51,6 +54,9 @@ var distributionCmd = &cobra.Command{
cmdTpl := sparkCmd + `-distribution_servers "{{ .DistributionServers }}" \
-distribution_schema "{{ .DistributionSchema }}" \
-substream_prefix "{{ .SubstreamPrefix }}" \
-kafka_buffer_memory "{{ .KafkaBufferMemory }}" \
-kafka_delivery_timeout_ms "{{ .KafkaDeliveryTimeoutMs }}" \
-mmconfigpath "{{ .MmConfigPath }}" \
-night "{{ .Night }}"`

createExecutorPodTemplate(rc.PodTemplateFile)
Expand Down
5 changes: 3 additions & 2 deletions cmd/run_raw2science.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
const RAW2SCIENCE string = "raw2science"

type Raw2ScienceConfig struct {
Night string
MmConfigPath string `mapstructure:"mmconfigpath"`
Night string
}

// raw2scienceCmd represents the raw2science command
Expand All @@ -33,7 +34,7 @@ shared file system and send it to Kafka streams.`,

sparkCmd, rc := generateSparkCmd(RAW2SCIENCE)

cmdTpl := sparkCmd + `-night "{{ .Night }}"`
cmdTpl := sparkCmd + `-night "{{ .Night }}" -mmconfigpath "{{ .MmConfigPath }}"`
c := getRaw2ScienceConfig(rc.Night)
sparkCmd = format(cmdTpl, &c)
ExecCmd(sparkCmd)
Expand Down
14 changes: 8 additions & 6 deletions cmd/run_stream2raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ import (
const STREAM2RAW string = "stream2raw"

type Stream2RawConfig struct {
KafkaSocket string `mapstructure:"kafka_socket"`
KafkaTopic string `mapstructure:"kafka_topic"`
FinkAlertSchema string `mapstructure:"fink_alert_schema"`
KafkaStartingOffset string `mapstructure:"kafka_starting_offset"`
Night string
KafkaSocket string `mapstructure:"kafka_socket"`
KafkaTopic string `mapstructure:"kafka_topic"`
FinkAlertSchema string `mapstructure:"fink_alert_schema"`
KafkaStartingOffset string `mapstructure:"kafka_starting_offset"`
MaxOffsetsPerTrigger string `mapstructure:"max_offsets_per_trigger"`
Night string
}

// stream2rawCmd represents the stream2raw command
Expand All @@ -42,7 +43,8 @@ and writes it to a shared file system for further processing and analysis.`,
cmdTpl := sparkCmd + `-servers "{{ .KafkaSocket }}" \
-schema "{{ .FinkAlertSchema }}" \
-startingoffsets_stream "{{ .KafkaStartingOffset }}" \
-topic "{{ .KafkaTopic }}"`
-topic "{{ .KafkaTopic }}" \
-max_offsets_per_trigger "{{ .MaxOffsetsPerTrigger }}"`
c := getStream2RawConfig(rc.Night)
sparkCmd = format(cmdTpl, &c)

Expand Down

0 comments on commit db330b2

Please sign in to comment.