# Paste metadata: 6 years ago · Nov 25, 2019, 09:36 AM
#' Load an R data frame / data table into a table in Netezza
#'
#' Writes `r_table_name` to a pipe-delimited temporary file, optionally drops
#' the target table, creates it with column types derived from the R column
#' classes, bulk-loads the file with the `nzload` command-line tool, refreshes
#' the table statistics and finally prints the dimensions of the R object and
#' of the resulting Netezza table.
#'
#' Column type mapping used for the `CREATE TABLE` statement:
#' integer/numeric -> `double`, logical -> `varchar(5)`, Date -> `date`,
#' everything else -> `varchar(varchar_length)`.
#'
#' @param db The database name (string)
#' @param uid Your database username (string); also the name of your staging
#'   folder under /hadoop/used_by_hadoop/hadoop-kn-p2
#' @param pwd Your database password (string)
#' @param r_table_name The data frame object you want copying
#' @param netezza_table_name The table name in Netezza you want to create
#'   (string); it is converted to upper case
#' @param odbc_connection The RODBC connection object
#' @param distribution_column The column that the table should be distributed
#'   on (string)
#' @param varchar_length For varchar fields in the Netezza table, the length of
#'   the field when creating the table - default 50
#' @param verbose Whether to show the create table query - default TRUE
#'   (boolean)
#' @param overwrite_temp_file Whether to overwrite the temporary file that is
#'   created when loading the data to Netezza - default TRUE (boolean)
#' @param delete_file Whether to delete the temporary file that is created
#'   when loading the data to Netezza - default TRUE (boolean)
#' @param drop_table Whether to drop the table you want to write to in Netezza
#'   before the create table query is run - default TRUE (boolean)
#' @return A `difftime` with the elapsed run time. As a side effect the
#'   dimensions of the R table and the Netezza table are printed.
#' @examples
#' \dontrun{
#' load_to_netezza("sp_marketing", "sbradshaw", "abc123", r_table,
#'                 "scott_table", netezza, "user_id", 50,
#'                 TRUE, TRUE, TRUE, TRUE)
#' }
load_to_netezza <- function(db = "sp_marketing", uid, pwd, r_table_name,
                            netezza_table_name, odbc_connection = netezza,
                            distribution_column, varchar_length = 50,
                            verbose = TRUE, overwrite_temp_file = TRUE,
                            delete_file = TRUE, drop_table = TRUE) {
  started_at <- Sys.time()

  # Checks ------------------------------------------------------------------
  # All validation happens up front, so that a bad call can never write the
  # temporary file or drop an existing table before failing.
  if (missing(r_table_name)) {
    stop("missing parameter")
  }
  if (!is.data.frame(r_table_name)) {
    stop("should be a data frame")
  }
  if (missing(distribution_column)) {
    stop("missing distribution column")
  }
  if (length(netezza_table_name) != 1L) {
    stop(sQuote(netezza_table_name), " should be a name")
  }
  if (!RODBC:::odbcValidChannel(odbc_connection)) {
    stop("Stage 0: first argument is not an open RODBC channel")
  }

  ## Check if the temporary file folder exists
  staging_root <- "/hadoop/used_by_hadoop/hadoop-kn-p2"
  user_dir <- file.path(staging_root, uid)
  if (!file_test("-d", user_dir)) {
    stop(paste0(
      "Stage 0 : user folder doesn't exist in ", staging_root,
      ". Run system('mkdir ", staging_root, "/", uid, "/')"
    ))
  }
  temp_file <- file.path(user_dir, "temp.csv")

  # Changes -----------------------------------------------------------------
  netezza_table_name <- toupper(netezza_table_name)
  qualified_table <- paste0(db, "..", netezza_table_name)

  # Stage 1: create temporary file -------------------------------------------
  if (overwrite_temp_file) {
    cat(paste0("Stage 1: writing table to text file in ", staging_root, "/",
               uid, "/\n"))
    # No header and no quoting: the file is written in the layout nzload is
    # asked to read below ("|" delimited, empty string for NA).
    write.table(r_table_name, temp_file, quote = FALSE, sep = "|",
                row.names = FALSE, col.names = FALSE, na = "")
  } else if (!file.exists(temp_file)) {
    # Re-using a previous file only works if it is actually there; fail before
    # the target table is dropped.
    stop("Stage 1: temporary file not found: ", temp_file)
  }

  # Stage 2: drop table if exists and specified in the argument -------------
  if (drop_table) {
    cat(paste0("Stage 2: dropping table: ", netezza_table_name, "\n"))
    RODBC::sqlQuery(odbc_connection,
                    paste0("drop table ", qualified_table, " if exists"))
  }

  # Stage 3: create table in Netezza ----------------------------------------
  # This is a modified version of sqlSave from RODBC.
  # Column classes are inspected one column at a time so that columns with
  # several classes (e.g. c("IDate", "Date") or POSIXct) are handled.
  has_class <- function(classes) {
    vapply(r_table_name, function(column) any(class(column) %in% classes),
           logical(1))
  }
  colspecs <- rep(paste0("varchar(", varchar_length, ")"),
                  length(r_table_name))
  colspecs[has_class(c("integer", "numeric"))] <- "double"
  colspecs[has_class("logical")] <- "varchar(5)"
  colspecs[has_class("Date")] <- "date"

  quoted_columns <- RODBC:::quoteColNames(
    odbc_connection, RODBC:::mangleColNames(names(r_table_name))
  )
  # NOTE(review): the table is created without the "db.." prefix, i.e. in the
  # database the ODBC connection is currently using -- confirm it matches `db`.
  query <- paste0(
    "CREATE TABLE ",
    RODBC:::quoteTabNames(odbc_connection, netezza_table_name),
    " (", paste(quoted_columns, colspecs, collapse = ", "), ")",
    " distribute on (", distribution_column, ")"
  )
  query <- toupper(query)

  cat("Stage 3: creating table\n")
  if (verbose) {
    cat("Query: ", query, "\n", sep = "")
  }
  res <- RODBC::sqlQuery(odbc_connection, query, errors = FALSE)
  if (is.numeric(res) && res == -1) {
    stop(paste(RODBC::odbcGetErrMsg(odbc_connection), collapse = "\n"))
  }

  # Stage 4: call nzload (Unix command) -------------------------------------
  cat("Stage 4: loading text file to Netezza table\n")
  # Every user-supplied value is shell-quoted so that special characters in
  # e.g. the password cannot break or alter the command.
  # NOTE(review): the password is still passed on the command line and may be
  # visible to other users of the machine while nzload runs.
  nzload_command <- paste(
    "/usr/local/nz/bin/nzload",
    "-u", shQuote(uid),
    "-pw", shQuote(pwd),
    "-host", "nz-mak-kn-p1",
    "-db", shQuote(db),
    "-t", shQuote(netezza_table_name),
    "-delim", shQuote("|"),
    "-df", shQuote(temp_file)
  )
  nzload_status <- system(nzload_command)
  if (!identical(nzload_status, 0L)) {
    warning("nzload exited with status ", nzload_status,
            "; the Netezza table may be incomplete", call. = FALSE)
  }

  if (delete_file) {
    file.remove(temp_file)
  }

  RODBC::sqlQuery(odbc_connection,
                  paste0("generate statistics on ", qualified_table, ";"))

  # Report ------------------------------------------------------------------
  dim_r_object <- dim(r_table_name)
  row_count <- RODBC::sqlQuery(
    odbc_connection,
    paste0("select count(*) as COUNT from ", qualified_table, ";")
  )$COUNT
  # Presumably the table lives in a schema named after the user; verify if
  # the reported column count looks wrong.
  column_count <- RODBC::sqlQuery(
    odbc_connection,
    paste0("select count(*) as COUNT from _v_odbc_columns1 where table_schem = '",
           toupper(uid), "' and table_name = '", netezza_table_name, "'")
  )$COUNT
  dim_sql_table <- c(row_count, column_count)

  cat(paste0("Dimensions of R object: ", dim_r_object[1], " rows, ",
             dim_r_object[2], " columns\n"))
  cat(paste0("Dimensions of Netezza table: ", dim_sql_table[1], " rows, ",
             dim_sql_table[2], " columns\n"))

  difftime(Sys.time(), started_at)
}