From 35ae6651be58da355a2ed3cd37b051d82fa81a5d Mon Sep 17 00:00:00 2001 From: Will Beasley Date: Wed, 17 Oct 2018 11:06:27 -0500 Subject: [PATCH] better metadata upload ref #3 & #4 --- dal/import-79-metadata.R | 175 ++++++++++++++++----------------------- dal/import-97-metadata.R | 52 +++--------- 2 files changed, 85 insertions(+), 142 deletions(-) diff --git a/dal/import-79-metadata.R b/dal/import-79-metadata.R index b84993b..433eed0 100644 --- a/dal/import-79-metadata.R +++ b/dal/import-79-metadata.R @@ -126,33 +126,22 @@ col_types_mapping <- readr::cols_only( ) - - - - - - - - - - -# ---- load-data --------------------------------------------------------------- -start_time <- Sys.time() - -ds_mapping <- readr::read_csv(file.path(directory_in, "_mapping.csv"), col_types=col_types_mapping) -ds_mapping - - ds_file <- lst_col_types %>% tibble::enframe(value = "col_types") %>% dplyr::mutate( - path = file.path(directory_in, paste0(name, ".csv")), + path = file.path(directory_in, paste0(name, ".csv")), + exists = purrr::map_lgl(path, file.exists) # col_types = purrr::map(name, function(x) lst_col_types[[x]]), - exists = purrr::map_lgl(path, file.exists) ) %>% dplyr::select(name, path, dplyr::everything()) ds_file +# ---- load-data --------------------------------------------------------------- +start_time <- Sys.time() + +ds_mapping <- readr::read_csv(file.path(directory_in, "_mapping.csv"), col_types=col_types_mapping) +ds_mapping + testit::assert("All metadata files must exist.", all(ds_file$exists)) ds_entries <- ds_file %>% @@ -174,14 +163,17 @@ rm(directory_in) # rm(col_types_tulsa) # ---- tweak-data -------------------------------------------------------------- # OuhscMunge::column_rename_headstart(ds_county) #Spit out columns to help write call to `dplyr::rename()`. +if( shallow_only ) { + ds_mapping <- ds_mapping %>% + dplyr::filter(.data$shallow) +} +ds_mapping ds_file <- ds_file %>% - dplyr::left_join( ds_mapping, by=c("name"="table_name")) %>% + dplyr::inner_join(ds_mapping, by=c("name"="table_name")) %>% dplyr::mutate( table_name = paste0("tbl", name), sql_delete = glue::glue("DELETE FROM {schema_name}.{table_name};") - # table_name = paste0(schema_name, ".tbl", name), - # sql_delete = paste0("DELETE FROM ", table_name) ) %>% dplyr::left_join( ds_entries %>% @@ -243,19 +235,56 @@ ds_enum %>% dplyr::pull(enum_cs) %>% cat() -# ---- verify-values ----------------------------------------------------------- +# ---- verify-values-deep ----------------------------------------------------------- # Sniff out problems -# testit::assert("The month value must be nonmissing & since 2000", all(!is.na(ds$month) & (ds$month>="2012-01-01"))) -# testit::assert("The county_id value must be nonmissing & positive.", all(!is.na(ds$county_id) & (ds$county_id>0))) -# testit::assert("The county_id value must be in [1, 77].", all(ds$county_id %in% seq_len(77L))) -# testit::assert("The region_id value must be nonmissing & positive.", all(!is.na(ds$region_id) & (ds$region_id>0))) -# testit::assert("The region_id value must be in [1, 20].", all(ds$region_id %in% seq_len(20L))) -# testit::assert("The `fte` value must be nonmissing & positive.", all(!is.na(ds$fte) & (ds$fte>=0))) -# # testit::assert("The `fmla_hours` value must be nonmissing & nonnegative", all(is.na(ds$fmla_hours) | (ds$fmla_hours>=0))) -# -# testit::assert("The County-month combination should be unique.", all(!duplicated(paste(ds$county_id, ds$month)))) -# testit::assert("The Region-County-month combination should be unique.", all(!duplicated(paste(ds$region_id, ds$county_id, ds$month)))) -# table(paste(ds$county_id, ds$month))[table(paste(ds$county_id, ds$month))>1] +if( !shallow_only ) { + d_extract_source <- ds_file %>% + dplyr::filter(name=="LUExtractSource") %>% + dplyr::pull(entries) %>% + purrr::flatten_df() + + d_item <- ds_file %>% + dplyr::filter(name=="item") %>% + dplyr::pull(entries) %>% + purrr::flatten_df() + + checkmate::assert_integer( d_item$ID , lower=1, upper=2^15 , any.missing=F, unique=T) + checkmate::assert_character(d_item$Label , pattern="^\\w+" , any.missing=F, unique=T) + + + d_variable <- ds_file %>% + dplyr::filter(name=="variable") %>% + dplyr::pull(entries) %>% + purrr::flatten_df() %>% + dplyr::mutate( + item_found = (ExtractSource %in% d_extract_source$ID), + extract_found = (Item %in% d_item$ID), + unique_index = paste(Item, Generation, SurveySource, SurveyYear, LoopIndex) + ) %>% + dplyr::group_by(unique_index) %>% + dplyr::mutate( + unique_index_violation = (1L < n()), + variables_codes = paste(VariableCode, collapse = "; ") + ) %>% + dplyr::ungroup() + + + pattern_unique_index <- "^\\d{1,5} \\d \\d (\\d|\\d{4}) \\d{1,3}$" + checkmate::assert_character(d_variable$VariableCode , pattern="^[A-Z]\\d{7}$" , any.missing=F, unique=T) + checkmate::assert_integer( d_variable$Item , lower=0 , any.missing=F) + checkmate::assert_logical( d_variable$item_found , any.missing=F) + testit::assert("All items referenced from the variables should be in the item table.", all(d_variable$item_found)) + testit::assert("All extract sources referenced from the variables should be in the item table.", all(d_variable$extract_found)) + checkmate::assert_character(d_variable$unique_index , pattern=pattern_unique_index , any.missing=F, unique=T) + + # d_variable %>% + # dplyr::filter(unique_index_violation) + # + # d_variable %>% + # dplyr::filter(!grepl(pattern_unique_index, unique_index)) + + rm(d_item, d_variable) +} # ---- specify-columns-to-upload ----------------------------------------------- @@ -278,12 +307,14 @@ DBI::dbGetInfo(channel) channel_rodbc <- open_dsn_channel_rodbc(study) RODBC::odbcGetInfo(channel_rodbc) -# Clear process tables -delete_results_process <- ds_table_process$sql_truncate %>% - purrr::set_names(ds_table_process$table_name) %>% - rev() %>% - purrr::map(DBI::dbGetQuery, conn=channel) -delete_results_process +if( !shallow_only ) { + # Clear process tables + delete_results_process <- ds_table_process$sql_truncate %>% + purrr::set_names(ds_table_process$table_name) %>% + rev() %>% + purrr::map(DBI::dbGetQuery, conn=channel) + delete_results_process +} # Delete metadata tables # delete_result <- RODBC::sqlQuery(channel, "DELETE FROM [NlsLinks].[Metadata].[tblVariable]", errors=FALSE) @@ -312,6 +343,7 @@ purrr::pmap_int( ds_file$schema_name ), function( d, table_name, schema_name ) { + message("Writing to table ", table_name) # browser() # DBI::dbWriteTable( # conn = channel, @@ -337,68 +369,7 @@ purrr::pmap_int( append = TRUE ) } -) #%>% -# purrr::set_names(ds_file$table_name) -# a <- ds_file$entries[[15]] -# table(a$ID) - -# RODBC::sqlSave( -# channel = channel_rodbc, -# dat = ds_file$entries[[16]][, ], -# tablename = "Metadata.tblVariable", -# safer = TRUE, # Don't keep the existing table. -# rownames = FALSE, -# append = TRUE -# ) - -# DBI::dbWriteTable( -# conn = channel, -# name = DBI::Id(catalog="NlsyLinks79", schema="Metadata", table="tblv"), -# value = ds_file$entries[[15]][1:10, 2], -# overwrite = FALSE, -# append = F -# ) - -# for( i in seq_len(nrow(ds_file)) ) { -# message(glue::glue("Uploading from `{ basename(ds_file$path)[i]}` to `{ds_file$table_name[i]}`.")) -# -# d <- ds_file$entries[[i]] -# print(d) -# -# # RODBC::sqlQuery(channel, ds_extract$sql_truncate[i], errors=FALSE) -# -# # d_peek <- RODBC::sqlQuery(channel, ds_extract$sql_select[i], errors=FALSE) -# # -# # missing_in_extract <- setdiff(colnames(d_peek), colnames(d)) -# # missing_in_database <- setdiff(colnames(d), colnames(d_peek)) -# # -# # d_column <- tibble::tibble( -# # db = colnames(d), -# # extract = colnames(d_peek) -# # ) %>% -# # dplyr::filter(db != extract) -# # -# # RODBC::sqlSave( -# # channel = channel, -# # dat = d, -# # tablename = ds_extract$table_name[i], -# # safer = TRUE, # Don't keep the existing table. -# # rownames = FALSE, -# # append = TRUE -# # ) %>% -# # print() -# -# OuhscMunge::upload_sqls_rodbc( -# d = d, -# table_name = ds_file$table_name[i] , -# dsn_name = "local-nlsy-links", -# clear_table = T, -# create_table = F -# ) -# -# -# message(glue::glue("{format(object.size(d), units='MB')}")) -# } +) # Close channel DBI::dbDisconnect(channel); rm(channel) diff --git a/dal/import-97-metadata.R b/dal/import-97-metadata.R index aa48e7c..98effd8 100644 --- a/dal/import-97-metadata.R +++ b/dal/import-97-metadata.R @@ -140,8 +140,7 @@ ds_file start_time <- Sys.time() ds_mapping <- readr::read_csv(file.path(directory_in, "_mapping.csv"), col_types=col_types_mapping) - - +ds_mapping testit::assert("All metadata files must exist.", all(ds_file$exists)) @@ -164,7 +163,6 @@ rm(directory_in) # rm(col_types_tulsa) # ---- tweak-data -------------------------------------------------------------- # OuhscMunge::column_rename_headstart(ds_county) #Spit out columns to help write call to `dplyr::rename()`. - if( shallow_only ) { ds_mapping <- ds_mapping %>% dplyr::filter(.data$shallow) @@ -265,7 +263,8 @@ if( !shallow_only ) { ) %>% dplyr::group_by(unique_index) %>% dplyr::mutate( - unique_index_violation = (1L < n()) + unique_index_violation = (1L < n()), + variables_codes = paste(VariableCode, collapse = "; ") ) %>% dplyr::ungroup() @@ -278,6 +277,12 @@ if( !shallow_only ) { testit::assert("All extract sources referenced from the variables should be in the item table.", all(d_variable$extract_found)) checkmate::assert_character(d_variable$unique_index , pattern=pattern_unique_index , any.missing=F, unique=T) + # d_variable %>% + # dplyr::filter(unique_index_violation) + # + # d_variable %>% + # dplyr::filter(!grepl(pattern_unique_index, unique_index)) + rm(d_item, d_variable) } @@ -302,14 +307,15 @@ DBI::dbGetInfo(channel) channel_rodbc <- open_dsn_channel_rodbc(study) RODBC::odbcGetInfo(channel_rodbc) -if( !shallow_only ){ -# Clear process tables +if( !shallow_only ) { + # Clear process tables delete_results_process <- ds_table_process$sql_truncate %>% purrr::set_names(ds_table_process$table_name) %>% rev() %>% purrr::map(DBI::dbGetQuery, conn=channel) delete_results_process } + # Delete metadata tables # delete_result <- RODBC::sqlQuery(channel, "DELETE FROM [NlsLinks].[Metadata].[tblVariable]", errors=FALSE) delete_results_metadata <- ds_file$sql_delete %>% @@ -329,49 +335,15 @@ delete_results_metadata # d2 <- d[, 1:16] # RODBC::sqlSave(channel, dat=d, tablename="Enum.tblLURosterGen1", safer=TRUE, rownames=FALSE, append=TRUE) -# ds_file <- ds_file %>% -# dplyr::slice(1) # Upload metadata tables - -# i <- 2L -# OuhscMunge::upload_sqls_odbc( -# d = ds_file$entries[[i]] %>% -# dplyr::mutate_if(is.logical, as.character), -# schema_name = ds_file$schema_name[[i]], -# table_name = ds_file$table_name[[i]], -# dsn_name = dsn_name(study), -# clear_table = F, -# create_table = FALSE, -# convert_logical_to_integer = F -# ) - purrr::pmap_int( list( ds_file$entries, ds_file$table_name, ds_file$schema_name - # seq_len(nrow(ds_file)) ), function( d, table_name, schema_name ) { message("Writing to table ", table_name) - # OuhscMunge::upload_sqls_odbc( - # d = d, - # schema_name = schema_name, - # table_name = table_name, - # dsn_name = dsn_name(study), - # clear_table = TRUE, - # create_table = FALSE, - # convert_logical_to_integer = TRUE - # ) - # browser() - # DBI::dbWriteTable( - # conn = channel, - # name = table_name, - # schema = schema_name, - # value = d, - # - # append = F - # ) RODBC::sqlSave( channel = channel_rodbc, dat = d,