Commit
better metadata upload
ref #3 & #4
wibeasley committed Oct 17, 2018
1 parent 80da602 commit 35ae665
Showing 2 changed files with 85 additions and 142 deletions.
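For orientation, both scripts follow the same load-validate-upload flow. Below is a minimal sketch of that flow, assembled only from calls visible in this diff; the input folder, DSN, and table prefix are illustrative assumptions, not values confirmed by the commit.

library(magrittr)                                   # enables the %>% pipe used throughout

directory_in <- "metadata-csvs"                     # hypothetical input folder
ds_mapping   <- readr::read_csv(file.path(directory_in, "_mapping.csv"))

# One row per metadata csv; confirm every file exists before touching the database.
ds_file <- tibble::tibble(name = ds_mapping$table_name) %>%
  dplyr::mutate(
    path   = file.path(directory_in, paste0(name, ".csv")),
    exists = purrr::map_lgl(path, file.exists)
  )
testit::assert("All metadata files must exist.", all(ds_file$exists))

# Append each csv into its matching SQL Server table over an RODBC channel.
channel_rodbc <- RODBC::odbcConnect("local-nlsy-links")  # DSN borrowed from a commented-out example below
for (i in seq_len(nrow(ds_file))) {
  d <- readr::read_csv(ds_file$path[i])
  RODBC::sqlSave(
    channel   = channel_rodbc,
    dat       = as.data.frame(d),
    tablename = paste0("Metadata.tbl", ds_file$name[i]),
    safer     = TRUE,                               # refuse to drop & recreate an existing table
    rownames  = FALSE,
    append    = TRUE
  )
}
RODBC::odbcClose(channel_rodbc)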
175 changes: 73 additions & 102 deletions dal/import-79-metadata.R
@@ -126,33 +126,22 @@ col_types_mapping <- readr::cols_only(

)

-# ---- load-data ---------------------------------------------------------------
-start_time <- Sys.time()
-
-ds_mapping <- readr::read_csv(file.path(directory_in, "_mapping.csv"), col_types=col_types_mapping)
-ds_mapping


ds_file <- lst_col_types %>%
tibble::enframe(value = "col_types") %>%
dplyr::mutate(
-path = file.path(directory_in, paste0(name, ".csv")),
-# col_types = purrr::map(name, function(x) lst_col_types[[x]]),
-exists = purrr::map_lgl(path, file.exists)
+path = file.path(directory_in, paste0(name, ".csv")),
+exists = purrr::map_lgl(path, file.exists)
) %>%
dplyr::select(name, path, dplyr::everything())
ds_file

+# ---- load-data ---------------------------------------------------------------
+start_time <- Sys.time()
+
+ds_mapping <- readr::read_csv(file.path(directory_in, "_mapping.csv"), col_types=col_types_mapping)
+ds_mapping

testit::assert("All metadata files must exist.", all(ds_file$exists))

ds_entries <- ds_file %>%
@@ -174,14 +163,17 @@ rm(directory_in) # rm(col_types_tulsa)

# ---- tweak-data --------------------------------------------------------------
# OuhscMunge::column_rename_headstart(ds_county) #Spit out columns to help write call to `dplyr::rename()`.
if( shallow_only ) {
ds_mapping <- ds_mapping %>%
dplyr::filter(.data$shallow)
}
ds_mapping

ds_file <- ds_file %>%
-dplyr::left_join( ds_mapping, by=c("name"="table_name")) %>%
+dplyr::inner_join(ds_mapping, by=c("name"="table_name")) %>%
dplyr::mutate(
table_name = paste0("tbl", name),
sql_delete = glue::glue("DELETE FROM {schema_name}.{table_name};")
# table_name = paste0(schema_name, ".tbl", name),
# sql_delete = paste0("DELETE FROM ", table_name)
) %>%
dplyr::left_join(
ds_entries %>%
@@ -243,19 +235,56 @@ ds_enum %>%
dplyr::pull(enum_cs) %>%
cat()

-# ---- verify-values -----------------------------------------------------------
+# ---- verify-values-deep -----------------------------------------------------------
# Sniff out problems
# testit::assert("The month value must be nonmissing & since 2000", all(!is.na(ds$month) & (ds$month>="2012-01-01")))
# testit::assert("The county_id value must be nonmissing & positive.", all(!is.na(ds$county_id) & (ds$county_id>0)))
# testit::assert("The county_id value must be in [1, 77].", all(ds$county_id %in% seq_len(77L)))
# testit::assert("The region_id value must be nonmissing & positive.", all(!is.na(ds$region_id) & (ds$region_id>0)))
# testit::assert("The region_id value must be in [1, 20].", all(ds$region_id %in% seq_len(20L)))
# testit::assert("The `fte` value must be nonmissing & positive.", all(!is.na(ds$fte) & (ds$fte>=0)))
# # testit::assert("The `fmla_hours` value must be nonmissing & nonnegative", all(is.na(ds$fmla_hours) | (ds$fmla_hours>=0)))
#
# testit::assert("The County-month combination should be unique.", all(!duplicated(paste(ds$county_id, ds$month))))
# testit::assert("The Region-County-month combination should be unique.", all(!duplicated(paste(ds$region_id, ds$county_id, ds$month))))
# table(paste(ds$county_id, ds$month))[table(paste(ds$county_id, ds$month))>1]
if( !shallow_only ) {
d_extract_source <- ds_file %>%
dplyr::filter(name=="LUExtractSource") %>%
dplyr::pull(entries) %>%
purrr::flatten_df()

d_item <- ds_file %>%
dplyr::filter(name=="item") %>%
dplyr::pull(entries) %>%
purrr::flatten_df()

checkmate::assert_integer( d_item$ID , lower=1, upper=2^15 , any.missing=F, unique=T)
checkmate::assert_character(d_item$Label , pattern="^\\w+" , any.missing=F, unique=T)


d_variable <- ds_file %>%
dplyr::filter(name=="variable") %>%
dplyr::pull(entries) %>%
purrr::flatten_df() %>%
dplyr::mutate(
item_found = (Item %in% d_item$ID),
extract_found = (ExtractSource %in% d_extract_source$ID),
unique_index = paste(Item, Generation, SurveySource, SurveyYear, LoopIndex)
) %>%
dplyr::group_by(unique_index) %>%
dplyr::mutate(
unique_index_violation = (1L < n()),
variables_codes = paste(VariableCode, collapse = "; ")
) %>%
dplyr::ungroup()


pattern_unique_index <- "^\\d{1,5} \\d \\d (\\d|\\d{4}) \\d{1,3}$"
checkmate::assert_character(d_variable$VariableCode , pattern="^[A-Z]\\d{7}$" , any.missing=F, unique=T)
checkmate::assert_integer( d_variable$Item , lower=0 , any.missing=F)
checkmate::assert_logical( d_variable$item_found , any.missing=F)
testit::assert("All items referenced from the variables should be in the item table.", all(d_variable$item_found))
testit::assert("All extract sources referenced from the variables should be in the item table.", all(d_variable$extract_found))
checkmate::assert_character(d_variable$unique_index , pattern=pattern_unique_index , any.missing=F, unique=T)

# d_variable %>%
# dplyr::filter(unique_index_violation)
#
# d_variable %>%
# dplyr::filter(!grepl(pattern_unique_index, unique_index))

rm(d_item, d_variable)
}

# ---- specify-columns-to-upload -----------------------------------------------

@@ -278,12 +307,14 @@ DBI::dbGetInfo(channel)
channel_rodbc <- open_dsn_channel_rodbc(study)
RODBC::odbcGetInfo(channel_rodbc)

-# Clear process tables
-delete_results_process <- ds_table_process$sql_truncate %>%
-purrr::set_names(ds_table_process$table_name) %>%
-rev() %>%
-purrr::map(DBI::dbGetQuery, conn=channel)
-delete_results_process
+if( !shallow_only ) {
+# Clear process tables
+delete_results_process <- ds_table_process$sql_truncate %>%
+purrr::set_names(ds_table_process$table_name) %>%
+rev() %>%
+purrr::map(DBI::dbGetQuery, conn=channel)
+delete_results_process
+}

# Delete metadata tables
# delete_result <- RODBC::sqlQuery(channel, "DELETE FROM [NlsLinks].[Metadata].[tblVariable]", errors=FALSE)
@@ -312,6 +343,7 @@ purrr::pmap_int(
ds_file$schema_name
),
function( d, table_name, schema_name ) {
message("Writing to table ", table_name)
# browser()
# DBI::dbWriteTable(
# conn = channel,
@@ -337,68 +369,7 @@ purrr::pmap_int(
append = TRUE
)
}
-) #%>%
-# purrr::set_names(ds_file$table_name)
# a <- ds_file$entries[[15]]
# table(a$ID)

# RODBC::sqlSave(
# channel = channel_rodbc,
# dat = ds_file$entries[[16]][, ],
# tablename = "Metadata.tblVariable",
# safer = TRUE, # Don't keep the existing table.
# rownames = FALSE,
# append = TRUE
# )

# DBI::dbWriteTable(
# conn = channel,
# name = DBI::Id(catalog="NlsyLinks79", schema="Metadata", table="tblv"),
# value = ds_file$entries[[15]][1:10, 2],
# overwrite = FALSE,
# append = F
# )

# for( i in seq_len(nrow(ds_file)) ) {
# message(glue::glue("Uploading from `{ basename(ds_file$path)[i]}` to `{ds_file$table_name[i]}`."))
#
# d <- ds_file$entries[[i]]
# print(d)
#
# # RODBC::sqlQuery(channel, ds_extract$sql_truncate[i], errors=FALSE)
#
# # d_peek <- RODBC::sqlQuery(channel, ds_extract$sql_select[i], errors=FALSE)
# #
# # missing_in_extract <- setdiff(colnames(d_peek), colnames(d))
# # missing_in_database <- setdiff(colnames(d), colnames(d_peek))
# #
# # d_column <- tibble::tibble(
# # db = colnames(d),
# # extract = colnames(d_peek)
# # ) %>%
# # dplyr::filter(db != extract)
# #
# # RODBC::sqlSave(
# # channel = channel,
# # dat = d,
# # tablename = ds_extract$table_name[i],
# # safer = TRUE, # Don't keep the existing table.
# # rownames = FALSE,
# # append = TRUE
# # ) %>%
# # print()
#
# OuhscMunge::upload_sqls_rodbc(
# d = d,
# table_name = ds_file$table_name[i] ,
# dsn_name = "local-nlsy-links",
# clear_table = T,
# create_table = F
# )
#
#
# message(glue::glue("{format(object.size(d), units='MB')}"))
# }
+)

# Close channel
DBI::dbDisconnect(channel); rm(channel)
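The new verify-values-deep section guards the upload with three kinds of checks: per-column structure (checkmate), referential integrity between tables (testit), and uniqueness of a composite index. A condensed, self-contained illustration of the pattern follows; the toy rows are invented, whereas the real scripts pull these tables from the metadata csvs.

d_item <- tibble::tibble(ID = 1:2, Label = c("height", "weight"))

d_variable <- tibble::tibble(
  VariableCode = c("R0000100", "R0000200"),
  Item         = c(1L, 2L),
  Generation   = c(1L, 1L),
  SurveySource = c(3L, 3L),
  SurveyYear   = c(1979L, 1980L),
  LoopIndex    = c(1L, 1L)
)
d_variable$item_found   <- d_variable$Item %in% d_item$ID
d_variable$unique_index <- paste(
  d_variable$Item, d_variable$Generation, d_variable$SurveySource,
  d_variable$SurveyYear, d_variable$LoopIndex
)

# Per-column structure: types, ranges, patterns, and within-table uniqueness.
checkmate::assert_integer(  d_item$ID              , lower = 1, upper = 2^15  , any.missing = FALSE, unique = TRUE)
checkmate::assert_character(d_variable$VariableCode, pattern = "^[A-Z]\\d{7}$", any.missing = FALSE, unique = TRUE)

# Referential integrity: every variable must point at a real item.
testit::assert("All items referenced from the variables should be in the item table.", all(d_variable$item_found))

# Composite uniqueness: no two variables may share the same index tuple.
checkmate::assert_character(d_variable$unique_index, any.missing = FALSE, unique = TRUE)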
52 changes: 12 additions & 40 deletions dal/import-97-metadata.R
@@ -140,8 +140,7 @@ ds_file
start_time <- Sys.time()

ds_mapping <- readr::read_csv(file.path(directory_in, "_mapping.csv"), col_types=col_types_mapping)
ds_mapping

testit::assert("All metadata files must exist.", all(ds_file$exists))

@@ -164,7 +163,6 @@ rm(directory_in) # rm(col_types_tulsa)

# ---- tweak-data --------------------------------------------------------------
# OuhscMunge::column_rename_headstart(ds_county) #Spit out columns to help write call to `dplyr::rename()`.

if( shallow_only ) {
ds_mapping <- ds_mapping %>%
dplyr::filter(.data$shallow)
@@ -265,7 +263,8 @@ if( !shallow_only ) {
) %>%
dplyr::group_by(unique_index) %>%
dplyr::mutate(
unique_index_violation = (1L < n())
unique_index_violation = (1L < n()),
variables_codes = paste(VariableCode, collapse = "; ")
) %>%
dplyr::ungroup()

@@ -278,6 +277,12 @@ if( !shallow_only ) {
testit::assert("All extract sources referenced from the variables should be in the item table.", all(d_variable$extract_found))
checkmate::assert_character(d_variable$unique_index , pattern=pattern_unique_index , any.missing=F, unique=T)

# d_variable %>%
# dplyr::filter(unique_index_violation)
#
# d_variable %>%
# dplyr::filter(!grepl(pattern_unique_index, unique_index))

rm(d_item, d_variable)
}

Expand All @@ -302,14 +307,15 @@ DBI::dbGetInfo(channel)
channel_rodbc <- open_dsn_channel_rodbc(study)
RODBC::odbcGetInfo(channel_rodbc)

-if( !shallow_only ){
-# Clear process tables
+if( !shallow_only ) {
+  # Clear process tables
delete_results_process <- ds_table_process$sql_truncate %>%
purrr::set_names(ds_table_process$table_name) %>%
rev() %>%
purrr::map(DBI::dbGetQuery, conn=channel)
delete_results_process
}

# Delete metadata tables
# delete_result <- RODBC::sqlQuery(channel, "DELETE FROM [NlsLinks].[Metadata].[tblVariable]", errors=FALSE)
delete_results_metadata <- ds_file$sql_delete %>%
@@ -329,49 +335,15 @@ delete_results_metadata
# d2 <- d[, 1:16]
# RODBC::sqlSave(channel, dat=d, tablename="Enum.tblLURosterGen1", safer=TRUE, rownames=FALSE, append=TRUE)

# ds_file <- ds_file %>%
# dplyr::slice(1)
# Upload metadata tables

# i <- 2L
# OuhscMunge::upload_sqls_odbc(
# d = ds_file$entries[[i]] %>%
# dplyr::mutate_if(is.logical, as.character),
# schema_name = ds_file$schema_name[[i]],
# table_name = ds_file$table_name[[i]],
# dsn_name = dsn_name(study),
# clear_table = F,
# create_table = FALSE,
# convert_logical_to_integer = F
# )

purrr::pmap_int(
list(
ds_file$entries,
ds_file$table_name,
ds_file$schema_name
# seq_len(nrow(ds_file))
),
function( d, table_name, schema_name ) {
message("Writing to table ", table_name)
# OuhscMunge::upload_sqls_odbc(
# d = d,
# schema_name = schema_name,
# table_name = table_name,
# dsn_name = dsn_name(study),
# clear_table = TRUE,
# create_table = FALSE,
# convert_logical_to_integer = TRUE
# )
# browser()
# DBI::dbWriteTable(
# conn = channel,
# name = table_name,
# schema = schema_name,
# value = d,
#
# append = F
# )
RODBC::sqlSave(
channel = channel_rodbc,
dat = d,

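A detail that is easy to miss in both files: the process tables are cleared by mapping plain DELETE statements over the table list, and rev() runs them in reverse declaration order so child tables are emptied before their parents and foreign-key constraints are never violated. A small sketch of the idiom, with hypothetical table names and a hypothetical DSN standing in for the channel the scripts open earlier:

library(magrittr)

channel <- DBI::dbConnect(odbc::odbc(), dsn = "local-nlsy-links")  # hypothetical DSN

ds_table_process <- tibble::tibble(
  table_name   = c("Process.tblParent", "Process.tblChild"),       # parent declared first
  sql_truncate = c("DELETE FROM Process.tblParent;", "DELETE FROM Process.tblChild;")
)

# rev() empties the child before the parent, so the parent's rows are no longer referenced.
delete_results_process <- ds_table_process$sql_truncate %>%
  purrr::set_names(ds_table_process$table_name) %>%
  rev() %>%
  purrr::map(DBI::dbGetQuery, conn = channel)

DBI::dbDisconnect(channel)

DBI::dbExecute() would be the more conventional call for statements that return no rows, but the sketch keeps dbGetQuery() to match the scripts.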