das42

Insights

Take a deeper dive into our thoughs about data analytics, technology, strategy, and developments.

Cloud-based Data Warehousing

Preprocessing in Snowflake with Restages

April 22, 2020
Brian Darst

A method to effectively remove stubborn file-formatting errors using only SQL in Snowflake.

Anyone who works with data long enough becomes familiar with oddities and edge cases. Strange type mismatches or unexpected nulls are commonplace. An errant backslash at the end of a string field right before the column delimiter has thrown many burgeoning data engineers for a loop. For these simple cases, most data warehouses have tools in place which work seamlessly, like CSVSerde in Hive or file formats in Snowflake. But inconsistent formatting errors can become more complicated.

Recently I had to ingest some AdTech log files into Snowflake. Some process upstream was writing duplicate sets of header rows into the occasional file. If they occurred somewhere mid-file, only those rows failed and a simple ‘on_error = skip_file_n+1’ would suffice. This works because it allows up to n header rows to fail loading while everything else continues loading, although it might result in false positive records. But if the duplicate header rows occurred immediately after the real header rows, they would be incorrectly written into the table and all subsequent rows would fail. Additionally, there was no apparent pattern to how many times the header rows could be repeated. 

No amount of finagling the file format or initial transformation query in the copy statement fixed the issue, and an upstream fix wasn’t a viable option. So it was up to my team to solve the issue.

The Missing Piece of Snowflake’s Data Pipeline

In Snowflake, copy statements make it simple to stage and ingest data. Additionally, their scalable compute warehouses mean volume/velocity rarely become an issue.

As powerful as Snowflake is, however, there are times when its file-format object or copy transformations are insufficient to clean incoming files. This results in a bad/failed record or file. These types of issues require preprocessing, which would normally mean introducing another element in your architecture. A popular option is to write a custom Spark job using the Snowflake connector. But you may have to requisition a new resource like emr to host the job, or you may have to overburden a temperamental airflow worker node. Why go through the trouble when you can do it all in Snowflake—and do it relatively quickly and cleanly? Our solution is the ‘restage’ technique.

The idea behind it is simple: 

1) Ingest each bad record as a single varchar field, 

2) Use snowSQL to clean and unload the records into an internal Snowflake stage, and then:

3) Re-run the copy statement to ingest/parse the recently cleaned files.

Methodology for the Restage Technique

There are two ways to perform the first two steps of the restage technique. 

Steps 1 & 2: Upload, Clean and Unload

Option A: Validation Mode Rejected Record

First, you want to create a raw data field that you can query, manipulate and clean. The first method does this by capturing the bad rows with Snowflake’s data validation mode. To do this, you need to ingest the data with the original copy into statement and add ‘on_error = continue’. All of the rows with errors will be saved to a validation meta-table, which can be queried with the syntax

The next step queries, cleans, and unloads the bad records into your internal stage. The logic would look something like:

create or replace stage my_db.my_schema.tmp_restage;

Here, the rejected_record column is a value that gets saved to the validation table.

This option works best when only a small proportion of the rows or files fail.

Option B: Single Column Format

You can also create a file format with field_delimiter = none and copy data into a table with one varchar column (I usually alias this column src, short for source). Then use the same logic as above, but instead of rejected_record use src. Then query from <single_column_table> rather than ‘table(validate(<target_table>, job_id => ‘_last’))’

This option is more suitable for when most of the rows or files are failing.

Step 3: Reupload the cleaned data

Once you have the files in the restage, you can now re-run your copy statement. The same transformation logic should now work if you have done your unload query logic correctly. You will just have to change the source to your restage. 

You may be asking yourself: If I already have the bad data in snowflake, why not just write a new query to parse the bad rows into the columns I want and run it inside an insert statement? This method is doable but less than ideal because manually parsing a big long string into a bunch of columns will probably take a lot longer to write than just reusing the copy statement and using a file format to parse things automatically. Processing time will probably be much longer for a complicated parse and an insert statement rather than a simple parse and two copy statements.

Considerations when Restaging Data

So far this method has proven to be flexible and robust in many situations, but I have encountered a handful of gotchas when working it out at first. To help you save time, here is a list of those issues.

First, this method was built to cope with issues in delimited text files (.csv, .tsv, .txt, .log etc.). This technique has not been tested or adjusted to work in other file types. 

Second, if your initial copy statement’s file format removed headers, your new files in your restage won't have them. Thus you may need a new file format to deal with this.

Also, the unload statement makes new file names in the restage. If you want to carry through metadata$filename, you can append it to the single column along with the appropriate delimiter. However I have seen this add a hidden ‘\n’ to the rejected_record string. If this happens, you need to remove or else you will get two rows when you append anything.

Finally, I originally started by trying to log the files with errors using ‘on_error=skip_file’ so I could specifically target those files with the single column process and leave the rest be. But getting this working on my particular workflow orchestration platform (Airflow) required writing a custom Snowflake operator, so I switched to the validation mode option. This did result in some ‘bad’ rows getting written to the target table. I then just had to write one additional query into my jobs to remove these rows. 

And that’s it: the restage technique. It is a functional way to effectively preprocess data using only SQL in Snowflake. When the regular copy function falls short for your data ingestion needs and you can’t or don’t want to resort to external methods, it can be a powerful tool in your data engineering arsenal.

Get Our Newsletter

Ready to talk about your data needs?

Icon-Contact

Contact us to start building a data culture.