Skip to content

Commit

Permalink
Allow specifying time range for import
Browse files Browse the repository at this point in the history
  • Loading branch information
Lun4m committed Dec 13, 2024
1 parent 252b168 commit 93038b6
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 15 deletions.
37 changes: 33 additions & 4 deletions migrations/kvalobs/db/csv_parsers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package db
import (
"bufio"
"migrate/lard"
"migrate/utils"
"slices"
"strconv"
"strings"
"time"
)

func parseDataCSV(tsid int32, rowCount int, scanner *bufio.Scanner) ([][]any, [][]any, error) {
func parseDataCSV(tsid int32, rowCount int, timespan *utils.TimeSpan, scanner *bufio.Scanner) ([][]any, [][]any, error) {
data := make([][]any, 0, rowCount)
flags := make([][]any, 0, rowCount)
var originalPtr, correctedPtr *float32
Expand All @@ -23,6 +24,13 @@ func parseDataCSV(tsid int32, rowCount int, scanner *bufio.Scanner) ([][]any, []
return nil, nil, err
}

if timespan.From != nil && obstime.Sub(*timespan.From) < 0 {
continue
}
if timespan.To != nil && obstime.Sub(*timespan.To) > 0 {
break
}

obsvalue64, err := strconv.ParseFloat(fields[1], 32)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -74,7 +82,7 @@ func parseDataCSV(tsid int32, rowCount int, scanner *bufio.Scanner) ([][]any, []
}

// Text obs are not flagged
func parseTextCSV(tsid int32, rowCount int, scanner *bufio.Scanner) ([][]any, error) {
func parseTextCSV(tsid int32, rowCount int, timespan *utils.TimeSpan, scanner *bufio.Scanner) ([][]any, error) {
data := make([][]any, 0, rowCount)
for scanner.Scan() {
// obstime, original, tbtime
Expand All @@ -85,6 +93,13 @@ func parseTextCSV(tsid int32, rowCount int, scanner *bufio.Scanner) ([][]any, er
return nil, err
}

if timespan.From != nil && obstime.Sub(*timespan.From) < 0 {
continue
}
if timespan.To != nil && obstime.Sub(*timespan.To) > 0 {
break
}

lardObs := lard.TextObs{
Id: tsid,
Obstime: obstime,
Expand All @@ -101,7 +116,7 @@ func parseTextCSV(tsid int32, rowCount int, scanner *bufio.Scanner) ([][]any, er
// but should instead be treated as scalars
// TODO: I'm not sure these params should be scalars given that the other cloud types are not.
// Should all cloud types be integers or text?
func parseMetarCloudType(tsid int32, rowCount int, scanner *bufio.Scanner) ([][]any, error) {
func parseMetarCloudType(tsid int32, rowCount int, timespan *utils.TimeSpan, scanner *bufio.Scanner) ([][]any, error) {
data := make([][]any, 0, rowCount)
for scanner.Scan() {
// obstime, original, tbtime
Expand All @@ -112,6 +127,13 @@ func parseMetarCloudType(tsid int32, rowCount int, scanner *bufio.Scanner) ([][]
return nil, err
}

if timespan.From != nil && obstime.Sub(*timespan.From) < 0 {
continue
}
if timespan.To != nil && obstime.Sub(*timespan.To) > 0 {
break
}

val, err := strconv.ParseFloat(fields[1], 32)
if err != nil {
return nil, err
Expand All @@ -134,7 +156,7 @@ func parseMetarCloudType(tsid int32, rowCount int, scanner *bufio.Scanner) ([][]

// Function for paramids 305, 306, 307, 308 that were stored as scalar data
// but should be treated as text
func parseSpecialCloudType(tsid int32, rowCount int, scanner *bufio.Scanner) ([][]any, error) {
func parseSpecialCloudType(tsid int32, rowCount int, timespan *utils.TimeSpan, scanner *bufio.Scanner) ([][]any, error) {
data := make([][]any, 0, rowCount)
for scanner.Scan() {
// obstime, original, tbtime, corrected, controlinfo, useinfo, cfailed
Expand All @@ -146,6 +168,13 @@ func parseSpecialCloudType(tsid int32, rowCount int, scanner *bufio.Scanner) ([]
return nil, err
}

if timespan.From != nil && obstime.Sub(*timespan.From) < 0 {
continue
}
if timespan.To != nil && obstime.Sub(*timespan.To) > 0 {
break
}

lardObs := lard.TextObs{
Id: tsid,
Obstime: obstime,
Expand Down
13 changes: 7 additions & 6 deletions migrations/kvalobs/db/import_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"log/slog"
"migrate/lard"
"migrate/utils"
"os"
"strconv"

Expand All @@ -18,7 +19,7 @@ import (
// - only for histkvalobs
// - 2751, 2752, 2753, 2754 are in `text_data` but should be treated as `data`?

func importData(tsid int32, label *Label, filename, logStr string, pool *pgxpool.Pool) (int64, error) {
func importData(tsid int32, label *Label, filename, logStr string, timespan *utils.TimeSpan, pool *pgxpool.Pool) (int64, error) {
file, err := os.Open(filename)
if err != nil {
slog.Error(logStr + err.Error())
Expand All @@ -36,7 +37,7 @@ func importData(tsid int32, label *Label, filename, logStr string, pool *pgxpool
scanner.Scan()

if label.IsSpecialCloudType() {
text, err := parseSpecialCloudType(tsid, rowCount, scanner)
text, err := parseSpecialCloudType(tsid, rowCount, timespan, scanner)
if err != nil {
slog.Error(logStr + err.Error())
return 0, err
Expand All @@ -51,7 +52,7 @@ func importData(tsid int32, label *Label, filename, logStr string, pool *pgxpool
return count, nil
}

data, flags, err := parseDataCSV(tsid, rowCount, scanner)
data, flags, err := parseDataCSV(tsid, rowCount, timespan, scanner)
count, err := lard.InsertData(data, pool, logStr)
if err != nil {
slog.Error(logStr + err.Error())
Expand All @@ -66,7 +67,7 @@ func importData(tsid int32, label *Label, filename, logStr string, pool *pgxpool
return count, nil
}

func importText(tsid int32, label *Label, filename, logStr string, pool *pgxpool.Pool) (int64, error) {
func importText(tsid int32, label *Label, filename, logStr string, timespan *utils.TimeSpan, pool *pgxpool.Pool) (int64, error) {
file, err := os.Open(filename)
if err != nil {
slog.Error(logStr + err.Error())
Expand All @@ -84,7 +85,7 @@ func importText(tsid int32, label *Label, filename, logStr string, pool *pgxpool
scanner.Scan()

if label.IsMetarCloudType() {
data, err := parseMetarCloudType(tsid, rowCount, scanner)
data, err := parseMetarCloudType(tsid, rowCount, timespan, scanner)
if err != nil {
slog.Error(logStr + err.Error())
return 0, err
Expand All @@ -98,7 +99,7 @@ func importText(tsid int32, label *Label, filename, logStr string, pool *pgxpool
return count, nil
}

text, err := parseTextCSV(tsid, rowCount, scanner)
text, err := parseTextCSV(tsid, rowCount, timespan, scanner)
if err != nil {
slog.Error(logStr + err.Error())
return 0, err
Expand Down
2 changes: 1 addition & 1 deletion migrations/kvalobs/db/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ type LabelDumpFunc func(timespan *utils.TimeSpan, pool *pgxpool.Pool, maxConn in
type ObsDumpFunc func(label *Label, timespan *utils.TimeSpan, path string, pool *pgxpool.Pool) error

// Lard Import function
type ImportFunc func(tsid int32, label *Label, filename, logStr string, pool *pgxpool.Pool) (int64, error)
type ImportFunc func(tsid int32, label *Label, filename, logStr string, timespan *utils.TimeSpan, pool *pgxpool.Pool) (int64, error)
11 changes: 7 additions & 4 deletions migrations/kvalobs/import/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ func ImportTable(table *kvalobs.Table, cache *cache.Cache, pool *pgxpool.Pool, c
return 0, err
}

fmt.Printf("Number of stations to dump: %d...\n", len(stations))
importTimespan := config.TimeSpan()
fmt.Printf("Number of stations to import: %d...\n", len(stations))
var rowsInserted int64
for _, station := range stations {
stnr, err := strconv.ParseInt(station.Name(), 10, 32)
Expand Down Expand Up @@ -72,21 +73,23 @@ func ImportTable(table *kvalobs.Table, cache *cache.Cache, pool *pgxpool.Pool, c
return
}

timespan, err := cache.GetSeriesTimespan(label)
tsTimespan, err := cache.GetSeriesTimespan(label)
if err != nil {
slog.Error(logStr + err.Error())
return
}

// TODO: figure out where to get fromtime, kvalobs directly? Stinfosys?
tsid, err := lard.GetTimeseriesID(label.ToLard(), timespan, pool)
tsid, err := lard.GetTimeseriesID(label.ToLard(), tsTimespan, pool)
if err != nil {
slog.Error(logStr + err.Error())
return
}

filename := filepath.Join(stationDir, file.Name())
count, err := table.Import(tsid, label, filename, logStr, pool)
// TODO: it's probably better to dump in different directories
// instead of introducing runtime checks
count, err := table.Import(tsid, label, filename, logStr, importTimespan, pool)
if err != nil {
// Logged inside table.Import
return
Expand Down

0 comments on commit 93038b6

Please sign in to comment.