diff --git a/cloud/tq/queuehandler.go b/cloud/tq/queuehandler.go
index ae5a5e8c..7072cd12 100644
--- a/cloud/tq/queuehandler.go
+++ b/cloud/tq/queuehandler.go
@@ -201,6 +201,9 @@ func (qh *ChannelQueueHandler) handleLoop(next api.TaskPipe, bucketOpts ...optio
 			}
 		} else {
 			log.Println("No task files")
+			task.Queue = ""
+			task.Update(state.Done)
+			task.Delete()
 		}
 	}
 	log.Println(qh.Queue, "waiting for deduper to close")
diff --git a/dispatch/deduphandler.go b/dispatch/deduphandler.go
index 6147952c..cb1d18e2 100644
--- a/dispatch/deduphandler.go
+++ b/dispatch/deduphandler.go
@@ -222,12 +222,13 @@ func (dh *DedupHandler) handleLoop(opts ...option.ClientOption) {
 // feeding channel is closed, and processing is complete.
 func NewDedupHandler(opts ...option.ClientOption) *DedupHandler {
 	project := os.Getenv("PROJECT")
+	dataset := os.Getenv("DATASET")
 	// When running in prod, the task files and queues are in mlab-oti, but the destination
 	// BigQuery tables are in measurement-lab.
-	if project == "mlab-oti" {
+	// However, for sidestream private tables, we leave them in mlab-oti.
+	if project == "mlab-oti" && dataset != "private" {
 		project = "measurement-lab" // destination for production tables.
 	}
-	dataset := os.Getenv("DATASET")
 	msg := make(chan state.Task)
 	rsp := make(chan error)
 	dh := DedupHandler{project, dataset, msg, rsp}
@@ -312,4 +313,3 @@ func Dedup(dsExt *bqext.Dataset, src string, destTable *bigquery.Table) (*bigque
 	}
 	return job, nil
 }
-
diff --git a/k8s/data-processing-cluster/deployments/etl-gardener.yml b/k8s/data-processing-cluster/deployments/etl-gardener.yml
index ecd605d7..49dd19a6 100644
--- a/k8s/data-processing-cluster/deployments/etl-gardener.yml
+++ b/k8s/data-processing-cluster/deployments/etl-gardener.yml
@@ -44,13 +44,13 @@ spec:
         - name: PROJECT
          value: {{GCLOUD_PROJECT}}
         - name: TASKFILE_BUCKET
-          value: "archive-{{GCLOUD_PROJECT}}"
+          value: "scraper-{{GCLOUD_PROJECT}}" # NOTE: if we start deleting unembargoed files from scraper, this will no longer work.
         - name: START_DATE
-          value: "20180301"
+          value: "20170601"
         - name: EXPERIMENTS
-          value: "sidestream,ndt" # For example "ndt,sidestream,switch"
+          value: "sidestream" # For example "ndt,sidestream,switch"
         - name: DATASET
-          value: batch
+          value: private
         - name: FINAL_DATASET
          value: "" # e.g. base_tables
         - name: QUEUE_BASE