Persisting Data – part 6: TVPs are for SSISies

Note (Nov 29, 2015): I finally got a copy of the test scripts on GitHub. See https://github.com/bwunder/data-load-methods for a working version of all T-SQL shown in this post.

By now we may agree that Table-valued Parameters (TVPs) have an undeniable throughput advantage over row-at-a-time approaches for moving data from application buffers to safe storage. We may even be on the same page with regard to the appeal of TVPs over BulkCopy methods for persistence operations when the sent row count is between 100 and 100,000. However, most of the time data accumulates so slowly that waiting to build a set would add too much latency between when the data originated (or came to be, as far as the application is concerned anyway) and when it is safely stored. That latency precludes any strategy that would reap the increased throughput of a TVP based approach. In most applications there are but a few important activities that originate data at rates high enough to warrant sending the data for storage in TVPs. Some applications may not even have a good candidate for a TVP based persistence implementation.

When rapid data creation results in a secondary storage IO bottleneck on the log device, failure to consider set based (TVP) persistence operations is a conscious decision to use the wrong tool for the job. Even SSDs might only forestall the need to refactor to sets. It depends on the application. Better to build a robust solution if it can be done in about the same amount of time.

Previous charts showed T-SQL only testing. The chart below affords an opportunity to compare row-at-a-time persistence operations and TVP based operations from the client application perspective. The RowUpsert and TVPUpsert series are values shown in previous posts. They are included to provide a point of reference.

[Chart: ForSISSies]

The TVPAdapter method above 500 rows looks mighty impressive! It even outshines the native T-SQL TVPUpsert.

At the end of part 2 I asked rhetorically, “Which would you rather be supporting? A process that can load its rows in 10ms each or a process that can load its rows in 4/100ths of a millisecond (aka 40µs or 40 microseconds)?” when comparing a non-transactional row-at-a-time upsert’s throughput to that of a transactional table-valued parameter based upsert. The results shown here suggest the desired number is now below 20µs. They also show that the earliest reported level of 40µs for the TVPUpsert has mellowed to over 70µs as the row count in the child table of the target hierarchy has grown beyond 50MM rows and the number of measured samples collected has increased during testing.

At 500 rows and below it appears that there is a significant overhead added consistently and regardless of the load method. This suggests that all the test is doing in this range is measuring the throughput of SSIS at low row counts.
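
To put those per-row figures in perspective, here is a minimal C# sketch that multiplies each one out over a 100,000 row load. It assumes a constant per-row cost, which is a simplification since the charts show the cost varies with batch size. The class and method names are made up for illustration. At 10ms per row the load takes about 1,000 seconds; at 40 microseconds per row it takes about 4 seconds.

```csharp
using System;

static class PerRowCost
{
    // Total elapsed time for a load, ASSUMING a constant per-row cost
    // expressed in microseconds (hypothetical helper, not from the test scripts).
    public static TimeSpan TotalTime(long rows, double microsecondsPerRow)
    {
        return TimeSpan.FromMilliseconds(rows * microsecondsPerRow / 1000.0);
    }

    static void Main()
    {
        long rows = 100000;

        // Per-row costs quoted in the post, in microseconds:
        // 10 ms (row-at-a-time), then 70, 40 and 20 microseconds (TVP based).
        double[] costs = { 10000, 70, 40, 20 };

        foreach (double cost in costs)
        {
            Console.WriteLine("{0} microseconds/row -> {1} seconds total",
                              cost, TotalTime(rows, cost).TotalSeconds);
        }
    }
}
```

The gap only matters when that many rows really are available to send at once, which is the point made at the top of this post.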

This chart is most interesting if the application has a requirement to insert 1,000 to 100,000 rows and the application is already working with a DataSet. Below 1,000 rows it is difficult to see a reason to change anything as long as the usage is effective. In part this is because the results for the TVPAdapter are so outstanding that they compress the other results toward the center of the chart. With TVPAdapter and TVPUpsert removed, it is easier to see that there is some important differentiation among the other SSIS methods tested.

[Chart: ForSISSies2]

The TVPCmdProc does indeed have a much nicer profile than any of the Row methods. That is not obvious in the first chart. The RowAdapter method appears to be better than the other row methods. Almost as good as the native upsert and not all that far behind the TVPCmdProc at 5,000 rows and above.

I admit I had initially hoped to use simple UI based package tasks when practical in this comparison. It just didn’t work out that way. In every case tested I was able to realize superior throughput characteristics using the Script Task. Even then, what you have to do to squeeze the bits through the package is rarely straightforward. For example, in order to achieve the charted throughput it is essential to add a reference to System.Transactions and to wrap each batch or set in a transaction. This need for custom code and external transactional libraries would suggest something of an evolving ‘impedance mismatch’ between SSIS and SQL Server’s database engine! As is true with EF and LINQ, the root of the mismatch appears to be a fundamental lack of technology interoperability. It is a battle of getters and setters -vs- intersections and unions. I won’t say whether I believe it intentional. I have to wonder not only why the fastest way to move bits is not the default but why it is necessary to go completely out of model to achieve…
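
The wrap-each-batch pattern can be sketched in isolation, without any SSIS or database objects. This is only an illustration: the BatchTransaction class and its Run method are hypothetical names, and the batch delegate stands in for whatever code pushes one set of rows. The 10 minute timeout mirrors the value used in the Script Tasks in this post.

```csharp
using System;
using System.Transactions;

static class BatchTransaction
{
    // Runs one batch inside its own transaction (hypothetical helper).
    // 'batch' is a stand-in for the code that sends one set of rows.
    public static void Run(Action batch)
    {
        using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew,
                                                TimeSpan.FromMinutes(10)))
        {
            // Connections opened in here can enlist in the ambient transaction.
            batch();

            // Only reached when batch() did not throw. If Complete() is never
            // called, disposing the scope rolls the transaction back.
            scope.Complete();
        }
    }

    static void Main()
    {
        Run(() => Console.WriteLine("ambient transaction present: {0}",
                                    Transaction.Current != null));
    }
}
```

Because Complete() is skipped when the delegate throws, a failed batch is rolled back as a unit rather than leaving a partial set behind.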

Each of the SSIS based load methods is described below:

RowAdapter – In this package, a DataAdapter is created in a Script Task that sets the T-SQL row-at-a-time upsert as the adapter’s InsertCommand. Prior to each persistence operation, rows are somehow [be creative] accumulated in an SSIS variable of type Object. This variable could be a design surrogate for any ADO.NET DataSet. As far as I can tell, anything done with the SSIS object variable can be done in .NET code against any DataSet. During the save operation the source data is transferred from the SSIS variable (cast to a DataSet, then taking Tables[0]) to the database using the familiar DataAdapter.Update. The DataAdapter.UpdateBatchSize is exposed as an ON/OFF option in the code. When ON, the collection of row-at-a-time upserts is presented to SQL Server as a single batch. When OFF, each upsert is its own batch. The performance difference is remarkable. In the current tests, only the option to batch is used.

public void Main()
{
    TransactionScope scope =
        new TransactionScope(TransactionScopeOption.RequiresNew
                            , new TimeSpan(0, 10, 0));
    using (scope)
    {
        using (SqlConnection conn =
        (SqlConnection)Dts.Connections["target"].AcquireConnection(Dts.Transaction))
        {
            if (conn.State != ConnectionState.Open)
            { conn.Open(); }
            SqlDataAdapter a = CreateTargetAdapter(conn);
            if ((Boolean)Dts.Variables["User::BatchUpdates"].Value)
            { a.UpdateBatchSize = (int)Dts.Variables["User::RowsToPush"].Value; }
            else
            { a.UpdateBatchSize = 1; }

            using (a)
            {
                DataTable t =((DataSet)Dts.Variables["User::TableVariable"].Value).Tables[0];

                foreach (DataRow dr in t.Rows) { dr.SetAdded(); }
                a.Update(t);
                if (conn.State == ConnectionState.Open) { conn.Close(); }
            }
        }
        scope.Complete();
    }
}

private SqlDataAdapter CreateTargetAdapter (SqlConnection conn)
{
    SqlDataAdapter a = new SqlDataAdapter();

    SqlCommand selcmd = new SqlCommand();
    selcmd.CommandText = "SELECT RowNumber"
                            + ", Host"
                            + ", GMTDateTime"
                            + ", Name"
                            + ", Value"
                            + ", FloatingPoint"
                            + ", Boolean"
                            + ", Bytes"
                            + ", LOB"
                            + ", LOBCheckSum "
                            + "FROM dbo.fGetTableVariable(1,1);";
    selcmd.CommandType = CommandType.Text;
    selcmd.Connection = conn;
    a.SelectCommand = selcmd;

    //DataAdapter.InsertCommand
    SqlCommand inscmd = new SqlCommand();
    inscmd.CommandText = "dbo.pRowUpsert";
    inscmd.CommandType = CommandType.StoredProcedure;
    inscmd.Connection = conn;

    SqlParameter RowNumber = new SqlParameter();
    RowNumber.ParameterName = "@RowNumber";
    RowNumber.SqlDbType = SqlDbType.Int;
    RowNumber.Direction = ParameterDirection.Input;
    RowNumber.SourceColumn = "RowNumber";
    inscmd.Parameters.Add(RowNumber);

...<adding rest of columns as params is snipped to save some space>

    a.InsertCommand = inscmd;
    a.InsertCommand.UpdatedRowSource = UpdateRowSource.None;

    return a;
}

RowCmdProc – I had planned to use a transaction around a Foreach Loop Container built on a freshly populated SSIS result set variable as shown below. As the container enumerates rows, each row is loaded to the database by calling the same stored procedure used by the RowUpsert, RowLINQ, and RowAdapter methods.

[Image: RowCmdProc]

Unfortunately, I was not able to overcome the pesky 0xC00291EC SSIS garbage can exception (failed to acquire connection “target”) for row counts above 100 using this approach. The exception never occurred at 100 rows or less. With the transaction removed the load finished eventually, but the throughput was very poor and flat (i.e. as bad at 5 rows as at 50,000 rows). Changing the timeout on the connection and the timeout of the Execute SQL Task only made it take longer to time out and raise the error; it did nothing to eliminate the exception. Searching the Internet while trying to understand this exception led me to believe that 0xC00291EC masks more than one pretty much unrelated problem. It seems to be a pretty common exception. Timeouts maybe in my case, but not always. My guess is that the problem is in the SSIS transaction. However, setting the TransactionOption of the Foreach Loop Container to Required, to assure that the batch will either enlist in an existing transaction or start a new one if none exists, did not eliminate the exception.

Instead I replaced the Execute SQL Task that populates the SSIS variable and the Foreach Loop Container with a Script Task that uses a DataReader built directly from the source data rather than staging in an SSIS variable. This is an interesting scenario from the streaming data perspective, although it packages the time needed to fetch the data into the persistence operation. Perhaps that diminishes the validity of a direct comparison between RowCmdProc and other load methods? TVPCmdProc also uses the DataReader so will raise similar questions.

public void Main()
{
    TransactionScope SourceScope =
         new TransactionScope(TransactionScopeOption.RequiresNew,new TimeSpan(0,10,0));
    using (SourceScope)
    {
        SqlCommand SourceCmd = new SqlCommand();

        SqlConnection SourceConn =
               (SqlConnection)Dts.Connections["initiator"].AcquireConnection(Dts.Transaction);
        SourceCmd.Connection = SourceConn;
        SourceCmd.CommandText = "SELECT [RowNumber]"
                                   + ", [Host]"
                                   + ", [GMTDateTime]"
                                   + ", [Name]"
                                   + ", [Value]"
                                   + ", [FloatingPoint]"
                                   + ", [Boolean]"
                                   + ", [Bytes]"
                                   + ", [LOB]"
                                   + ", [LOBCheckSum] "
                            + "FROM dbo.fGetTableVariable (1, @RowsToPush);";
        SourceCmd.CommandType = CommandType.Text;

        SqlParameter RowsToPush = new SqlParameter();
        RowsToPush.ParameterName = "@RowsToPush";
        RowsToPush.SqlDbType = SqlDbType.Int;
        RowsToPush.Value = (Int32)Dts.Variables["User::RowsToPush"].Value;
        RowsToPush.Direction = ParameterDirection.Input;
        SourceCmd.Parameters.Add(RowsToPush);

        if (SourceConn.State == ConnectionState.Closed){SourceConn.Open();}
        SqlDataReader rdr = SourceCmd.ExecuteReader();

        TransactionScope TargetScope =
            new TransactionScope(TransactionScopeOption.RequiresNew
                                 , new TimeSpan(0, 10, 0));

        DateTime t = DateTime.Now;

        using (TargetScope)
        {
            if (rdr.HasRows)
            {
                while (rdr.Read())
                {
                    SqlCommand TargetCmd = new SqlCommand();

                    SqlConnection TargetConn =
                          (SqlConnection)Dts.Connections["target"].AcquireConnection(Dts.Transaction);
                    TargetCmd.Connection = TargetConn;
                    TargetCmd.CommandText = "dbo.pRowUpsert";
                    TargetCmd.CommandType = CommandType.StoredProcedure;

                    SqlParameter RowNumber = new SqlParameter();
                    RowNumber.ParameterName = "@RowNumber";
                    RowNumber.SqlDbType = SqlDbType.Int;
                    RowNumber.Direction = ParameterDirection.Input;
                    RowNumber.Value = (int)rdr["RowNumber"];
                    TargetCmd.Parameters.Add(RowNumber);

...<adding rest of columns as params is snipped to save some space>

                    if (TargetConn.State == ConnectionState.Closed) { TargetConn.Open(); }
                    TargetCmd.ExecuteNonQuery();
                    if (TargetConn.State == ConnectionState.Open) { TargetConn.Close(); }
                }
                TargetScope.Complete();
            }
            rdr.Close();
        }
        if (SourceConn.State == ConnectionState.Open) { SourceConn.Close(); }
        SourceScope.Complete();

        Dts.TaskResult = (int)ScriptResults.Success;
    }
}

RowLINQ – Using the same already populated SSIS variable of data rows, a LINQ T-SQL pass-through query calling the row-at-a-time stored procedure is executed. True enough, this is not really a LINQ query, but it is about as close to a set based operation as can be done using the LINQ libraries. It is not clear how data is supposed to move between databases in ORMs. There is no way to stuff a table-valued parameter through a LINQ library. LINQ does not know what a User Defined Table Type is so cannot use TVPs. When high volume persistence is needed, a RowLINQ scenario is about as friendly as LINQ can get. When the application is successful and throughput becomes constrained by the volume of row-at-a-time data changes, alternatives must be considered. It is important to recognize that using ExecuteCommand() this way turns LINQ into a brute force marshaling API. Use judiciously.

public void Main()
{
    SqlConnection conn = (SqlConnection)Dts.Connections["target"].AcquireConnection(Dts.Transaction);
    try
    {
        TransactionScope scope =
            new TransactionScope(TransactionScopeOption.RequiresNew
                                , new TimeSpan(0,10,0));
        using (scope)
        {
            using (conn)
            {
                if (conn.State != ConnectionState.Open) { conn.Open(); }
                DataContext target = new DataContext(conn);
                foreach (DataRow r in ((DataSet)Dts.Variables["User::TableVariable"].Value).Tables[0].Rows)
                {
                    target.ExecuteCommand("dbo.pRowUpsert {0},{1},<...>,{15}"
                             , r["RowNumber"]
                             , r["Host"]
                             , r["GMTDateTime"]
                             , r["Name"]
                             , r["Value"]
                             , r["FloatingPoint"]
                             , r["Boolean"]
                             , r["Bytes"]
                             , r["LOB"]
                             , r["LOBCheckSum"]
                             , Dts.Variables["User::LoggingLevel"].Value
                             , Dts.Variables["User::LoadMethod"].Value
                             , Dts.Variables["User::MergeParentRows"].Value
                             , Dts.Variables["User::MergeParent_ms"].Value
                             , Dts.Variables["User::InsertChildRows"].Value
                             , Dts.Variables["User::InsertChild_ms"].Value );
                }
            }
            scope.Complete();
        }
    }
    catch (TransactionAbortedException ex)
    {
        throw new Exception("Oops", ex);
    }
}

RowUpsert – This is the direct native T-SQL row-at-a-time upsert stored procedure described in some detail in parts 3, 4 & 5. RowAdapter, RowCmdProc, and RowLINQ all call this stored procedure at upsert.

TVPAdapter – Similar to the RowAdapter method, a DataAdapter is created in a Script Task. Instead of using the RowUpsert stored procedure, the TVPUpsert stored procedure is bound to the adapter’s InsertCommand.

public void Main()
{
    using (SqlConnection conn =
                      (SqlConnection)Dts.Connections["target"].AcquireConnection(Dts.Transaction))
    {
        if (conn.State != ConnectionState.Open) { conn.Open(); }
        SqlDataAdapter a = CreateTargetAdapter(conn);
        using (a)
        {
            DataTable t =
                ((DataSet)Dts.Variables["User::TableVariable"].Value).Tables[0];
            a.Update(t);
            if (conn.State == ConnectionState.Open) { conn.Close(); }
        }
    }
}

private SqlDataAdapter CreateTargetAdapter (SqlConnection conn)
{
    SqlDataAdapter a = new SqlDataAdapter();

    SqlCommand selcmd = new SqlCommand();
    selcmd.CommandText = "SELECT "
                       + "RowNumber,Host,GMTDateTime,Name,Value,"
                       + "FloatingPoint,Boolean,Bytes,LOB,LOBCheckSum "
                       + "FROM dbo.fGetTableVariable(1,1);";
    selcmd.CommandType = CommandType.Text;
    selcmd.Connection = conn;
    a.SelectCommand = selcmd;

    //DataAdapter.InsertCommand
    SqlCommand inscmd = new SqlCommand();
    inscmd.CommandText = "dbo.pTVPUpsert";
    inscmd.CommandType = CommandType.StoredProcedure;
    inscmd.Connection = conn;

    SqlParameter tvp = new SqlParameter();
    tvp.ParameterName = "@TVP";
    tvp.SqlDbType = SqlDbType.Structured;
    tvp.TypeName = "dbo.TVPTableType";
    tvp.Value = ((DataSet)Dts.Variables["User::TableVariable"].Value).Tables[0];
    tvp.Direction = ParameterDirection.Input;
    inscmd.Parameters.Add(tvp);

    SqlParameter loglvl = new SqlParameter();
    loglvl.ParameterName = "@LoggingLevel";
    loglvl.SqlDbType = SqlDbType.TinyInt;
    loglvl.Value = (byte)Dts.Variables["User::LoggingLevel"].Value;
    loglvl.Direction = ParameterDirection.Input;
    inscmd.Parameters.Add(loglvl);

    SqlParameter method = new SqlParameter();
    method.ParameterName = "@LoadMethod";
    method.SqlDbType = SqlDbType.NVarChar;
    method.Size = 128;
    method.Value = Dts.Variables["User::LoadMethod"].Value;
    method.Direction = ParameterDirection.Input;
    inscmd.Parameters.Add(method);

    a.InsertCommand = inscmd;
    a.InsertCommand.UpdatedRowSource = UpdateRowSource.None;

    return a;
}

TVPCmdProc – Similar to RowCmdProc, a DataReader is used. Instead of enumerating the rows and adding them one at a time, the DataReader is passed as a table-valued parameter.

public void Main()
{
    TransactionScope SourceScope =
        new TransactionScope(TransactionScopeOption.RequiresNew
                            , new TimeSpan(0, 10, 0));
    using (SourceScope)
    {
        SqlCommand SourceCmd = new SqlCommand();

        SqlConnection SourceConn =
             (SqlConnection)Dts.Connections["initiator"].AcquireConnection(Dts.Transaction);
        SourceCmd.Connection = SourceConn;
        SourceCmd.CommandText = "SELECT [RowNumber]"
                                   + ", [Host]"
                                   + ", [GMTDateTime]"
                                   + ", [Name]"
                                   + ", [Value]"
                                   + ", [FloatingPoint]"
                                   + ", [Boolean]"
                                   + ", [Bytes]"
                                   + ", [LOB]"
                                   + ", [LOBCheckSum] "
                             + "FROM dbo.fGetTableVariable (1, @RowsToPush);";
        SourceCmd.CommandType = CommandType.Text;

        SqlParameter RowsToPush = new SqlParameter();
        RowsToPush.ParameterName = "@RowsToPush";
        RowsToPush.SqlDbType = SqlDbType.Int;
        RowsToPush.Value = (Int32)Dts.Variables["User::RowsToPush"].Value;
        RowsToPush.Direction = ParameterDirection.Input;
        SourceCmd.Parameters.Add(RowsToPush);

        if (SourceConn.State == ConnectionState.Closed) { SourceConn.Open(); }
        IAsyncResult rdr = SourceCmd.BeginExecuteReader();

        TransactionScope TargetScope =
                     new TransactionScope(TransactionScopeOption.RequiresNew, new TimeSpan(0, 10, 0));
        using (TargetScope)
        {
            SqlCommand TargetCmd = new SqlCommand();

            SqlConnection TargetConn =
                          (SqlConnection)Dts.Connections["target"].AcquireConnection(Dts.Transaction);
            TargetCmd.Connection = TargetConn;
            TargetCmd.CommandText = "dbo.pTVPUpsert";
            TargetCmd.CommandType = CommandType.StoredProcedure;

            SqlParameter tvp = new SqlParameter();
            tvp.ParameterName = "@TVP";
            tvp.SqlDbType = SqlDbType.Structured;
            tvp.TypeName = "dbo.TVPTableType";
            tvp.Value = SourceCmd.EndExecuteReader(rdr);
            tvp.Direction = ParameterDirection.Input;
            TargetCmd.Parameters.Add(tvp);

            SqlParameter LoggingLevel = new SqlParameter();
            LoggingLevel.ParameterName = "@LoggingLevel";
            LoggingLevel.SqlDbType = SqlDbType.TinyInt;
            LoggingLevel.Value = (byte)Dts.Variables["User::LoggingLevel"].Value;
            LoggingLevel.Direction = ParameterDirection.Input;
            TargetCmd.Parameters.Add(LoggingLevel);

            SqlParameter LoadMethod = new SqlParameter();
            LoadMethod.ParameterName = "@LoadMethod";
            LoadMethod.SqlDbType = SqlDbType.NVarChar;
            LoadMethod.Size = 128;
            LoadMethod.Value = Dts.Variables["User::LoadMethod"].Value;
            LoadMethod.Direction = ParameterDirection.Input;
            TargetCmd.Parameters.Add(LoadMethod);

            if (TargetConn.State == ConnectionState.Closed) { TargetConn.Open(); }
            IAsyncResult upd = TargetCmd.BeginExecuteNonQuery();
            //times out? on 100000 rows if sync
            TargetCmd.EndExecuteNonQuery(upd);
            if (TargetConn.State == ConnectionState.Open) { TargetConn.Close(); }
            TargetScope.Complete();
        }

        if (SourceConn.State == ConnectionState.Open) { SourceConn.Close(); }
        SourceScope.Complete();
    }
    Dts.TaskResult = (int)ScriptResults.Success;
}

TVPUpsert – This is the direct native T-SQL table-valued parameter based upsert stored procedure described in some detail in parts 3, 4 & 5. TVPAdapter and TVPCmdProc call this stored procedure at upsert.
