Monday, March 28, 2011

A One-Package, Generic SSIS Staging Process


I recently worked on a project that was flat file and staging intensive. By intensive I mean we had essentially 1.5 ETL developers and over 50 flat files to stage. We also had a very aggressive timeline: two data marts, with source data coming from five totally disparate and disconnected (hence the flat files) subsidiaries/source systems all over the world (think currency conversion, translation, master data management, etc.), and only three and a half months to get it done.

Was I going to create a package to stage each flat file? Ummm . . . no! So what was I to do? I was going to create a single, metadata-driven package that would loop through a data set and populate my staging tables one by one. Below is a screenshot of all the components necessary to make this work. There is no Data Flow.


Before going into the details there are a few things to keep in mind.

  1. This solution is based on delimited text files. It can easily be tailored to read from relational sources and, with a little more tweaking, from other non-relational sources as well.
  2. What makes this design possible is a Script Task that alleviates the need for custom data flows for each extract file/table to be staged.
  3. The solution assumes that the staging tables can hold more than one day's/load's worth of data and therefore adds an ExtractFileID column to each staging table and to the source data as it is being loaded.
  4. For the Script Task to work as-is, it is necessary to create a staging table for each data source that will be staged. Each staging table must meet the following criteria:
    • The staging table must contain column names that match the data source column names exactly (an optional pre-flight check is sketched after this list).
    • The staging table columns must have data types that are appropriate for the incoming data. If they are character data types they must be large enough that they will not truncate any incoming data.  
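
Because the bulk copy maps columns strictly by name, a header column that has no match in the staging table only fails at load time. If you want a clearer error earlier, a pre-flight check along these lines could be added to the script. This is a sketch of an optional addition, not part of the original or downloadable script; it assumes the connection is already open and that you pass it the header array (strArray in the method below).

Imports System.Collections.Generic
Imports System.Data
Imports System.Data.SqlClient

    'Optional sanity check (not part of the original script): verify every column in
    'the extract file's header exists in the staging table before bulk copying.
    Private Sub CheckStagingColumns(ByVal dbConn As SqlConnection, _
                                    ByVal stageTableName As String, _
                                    ByVal fileColumns As String())

        'Read just the staging table's column metadata (no rows)
        Using cmd As New SqlCommand("SELECT TOP 0 * FROM " & stageTableName, dbConn)
            Using reader As SqlDataReader = cmd.ExecuteReader(CommandBehavior.SchemaOnly)

                'Collect the staging table's column names
                Dim tableColumns As New List(Of String)
                For i As Integer = 0 To reader.FieldCount - 1
                    tableColumns.Add(reader.GetName(i).ToUpperInvariant())
                Next

                'Fail fast with a descriptive message if a header column is missing
                For Each columnName As String In fileColumns
                    If Not tableColumns.Contains(columnName.ToUpperInvariant()) Then
                        Throw New ApplicationException( _
                            "Column '" & columnName & "' from the extract file does not exist in " & stageTableName)
                    End If
                Next
            End Using
        End Using
    End Sub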

The metadata that drives the solution will likely be specific to your project, so I won't go into it in this post. What you'll need to do is create the metadata tables necessary to produce a data set that can be used by a Foreach Loop Container to loop through each file or table that needs to be staged; a rough sketch of what that data set might look like follows.
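
Your metadata model will differ, but for illustration, the sketch below builds the same shape of result set that an Execute SQL Task (Full Result Set) would hand to the Foreach Loop's ADO enumerator: one row per file, carrying the values the Script Task needs. The dbo.ExtractFileMetadata table and its column names are hypothetical, not part of the downloadable script.

Imports System.Data
Imports System.Data.SqlClient

    'A rough sketch only: dbo.ExtractFileMetadata and its columns are hypothetical names.
    'In the package itself this query would simply live in an Execute SQL Task whose
    'Full Result Set feeds the Foreach Loop Container's ADO enumerator.
    Public Function GetExtractFileMetadata(ByVal stageConnectionString As String) As DataTable

        Dim metadata As New DataTable()

        'Fill() opens and closes the connection on our behalf
        Using adapter As New SqlDataAdapter( _
                "SELECT ExtractFileID, ExtractFileFullPath, StageTableName, " & _
                "ColumnDelimiter, BatchSize " & _
                "FROM dbo.ExtractFileMetadata " & _
                "WHERE IsActive = 1 ORDER BY ExtractFileID", _
                stageConnectionString)
            adapter.Fill(metadata)
        End Using

        Return metadata
    End Function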

However, you can test the Script Task by hard-coding a single file's metadata within the Script Task. That is exactly what I did to create the script. Once the script was working as expected, I created the necessary metadata tables and data and passed the values into the script to make it more dynamic (a sketch of that wiring appears after the method). Below is the method that does the bulk of the work (no pun intended). The entire script can be downloaded here.



Private Sub BulkCopyDelimitedFile(ByVal batchSize As Integer, _
            ByVal columnDelimiter As String, _
            ByVal extractFileID As Int32, _
            ByVal extractFileFullPath As String, _
            ByVal stageTableName As String)

        Dim SqlConnectionManager As Microsoft.SqlServer.Dts.Runtime.ConnectionManager

        'Set a connection manager object equal to the connection manager named "Stage"
        SqlConnectionManager = Dts.Connections("Stage")

        'Since the "Stage" connection manager is an OLE DB connection, strip the provider attribute from its connection string so it can be used by an ADO.NET SqlConnection
        Dim dbConn As SqlConnection = New SqlConnection(SqlConnectionManager.ConnectionString.Replace("Provider=SQLNCLI10.1;", ""))

        'Create a new StreamReader object and pass in the path of the file to be read in
        Dim sr As StreamReader = New StreamReader(extractFileFullPath)

        'Read in the first line (the header row) of the file
        Dim line As String = sr.ReadLine()

        'Create an array of strings and fill it with each column name from the header row by using the Split function
        Dim strArray As String() = line.Split(columnDelimiter)

        'Create a DataTable object
        Dim dt As DataTable = New DataTable()

        'Create a DataRow object
        Dim row As DataRow

        'Open the database connection
        dbConn.Open()

        'Create a SQLTransaction object in which to execute the Bulk Copy process so that if it fails, all writes are rolled back
        Dim bulkCopyTransaction As SqlTransaction = dbConn.BeginTransaction()

        'Instantiate our SqlBulkCopy object and set appropriate properties
        Dim bc As SqlBulkCopy = New SqlBulkCopy(dbConn, SqlBulkCopyOptions.Default, bulkCopyTransaction)

        'Set the BulkCopy destination table name equal to the staging table name provided by the Foreach Loop Container
        bc.DestinationTableName = stageTableName

        'Set the BulkCopy batch size equal to the size contained in the metadata provided by the Foreach Loop Container
        bc.BatchSize = batchSize

        'For each column that was found in the extract file
        For Each columnName As String In strArray
            'Add a column in the data table
            dt.Columns.Add(New DataColumn(columnName))

            'Add a column mapping in the SqlBulkCopy object
            bc.ColumnMappings.Add(columnName, columnName)
        Next

        'Add the ExtractFileID column to the data table since it doesn't exist in the extract file and wasn't added in the previous For Each loop
        dt.Columns.Add(New DataColumn("ExtractFileID", System.Type.GetType("System.String")))

        'Add the ExtractFileID column mapping since it doesn't exist in the extract file and wasn't added in the previous For Each loop
        bc.ColumnMappings.Add("ExtractFileID", "ExtractFileID")

        'Move to the first row in the extract file after the header
        line = sr.ReadLine()

        Dim rowCount As Integer

        'Loop through the extract file until the end of the file (or an empty line) is reached
        While Not String.IsNullOrEmpty(line)
            rowCount += 1

            'Create a new data table row to store the extract file's data in
            row = dt.NewRow()

            'Add all column values to the row
            row.ItemArray = line.Split(columnDelimiter)

            'Add the ExtractFileID for this load
            row("ExtractFileID") = extractFileID

            'Add the newly created row to the data table
            dt.Rows.Add(row)

            'Move to the next row in the extract file
            line = sr.ReadLine()
        End While
        

        Try
            'Write the data table to the staging table
            bc.WriteToServer(dt)

            'If successful, commit the transaction
            bulkCopyTransaction.Commit()
        Catch ex As Exception
            'If an exception occurs, roll back the transaction
            bulkCopyTransaction.Rollback()

            'Set the Task result to Failure
            Dts.TaskResult = ScriptResults.Failure

            'Re-throw so the original error is reported to SSIS without losing the stack trace
            Throw
        Finally
            'Close the BulkCopy object and the database connection whether or not the load succeeded
            bc.Close()
            dbConn.Close()
        End Try

        'Set the SSIS metadata variable equal to the number of rows loaded
        Dts.Variables("ExtractActualRowCount").Value = rowCount
    End Sub
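
The method above expects its five parameters to come from package variables mapped in the Foreach Loop Container's Variable Mappings (or hard-coded while testing, as described earlier), and it writes the row count back to the ExtractActualRowCount variable, so those variables must be listed in the Script Task's ReadOnlyVariables/ReadWriteVariables. Below is a minimal sketch of a Main() that wires this up; the User::* variable names are assumptions, and the full script also needs Imports System.Data.SqlClient and System.IO for the types used in the method above.

    'A minimal sketch, not the downloadable script verbatim. The variable names are
    'assumptions and must match the variables mapped in the Foreach Loop Container.
    Public Sub Main()
        Try
            BulkCopyDelimitedFile( _
                CInt(Dts.Variables("User::ExtractBatchSize").Value), _
                CStr(Dts.Variables("User::ExtractColumnDelimiter").Value), _
                CInt(Dts.Variables("User::ExtractFileID").Value), _
                CStr(Dts.Variables("User::ExtractFileFullPath").Value), _
                CStr(Dts.Variables("User::ExtractStageTableName").Value))

            Dts.TaskResult = ScriptResults.Success
        Catch ex As Exception
            'Surface the error to SSIS and fail the task
            Dts.Events.FireError(0, "Generic staging Script Task", ex.Message, String.Empty, 0)
            Dts.TaskResult = ScriptResults.Failure
        End Try
    End Sub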


Download script

5 comments:

  1. I added a few updates.
    1) The script would fail if the text file didn't have any data rows. This is now resolved by a change in the loop structure.
    2) The Catch block did not have a Throw in it, resulting in unreported errors. A Throw has been added to resolve this.

    The changes have been made in the blog text as well as in the downloadable source code file.

  2. Really nice solution, I have literally this exact problem to solve, but on an even larger scale. Thank you.

  3. Nice, but have you seen: http://www.cozyroc.com/ssis/dynamic-data-flow

    1. Yes, but it appears to be ~$2400 per server...

    2. Or $399 per year for two licences. Not a bad price considering the amount of extra functions you can also include, such as parallel processing, lookups, connections to everything... :)
