Processing a Large Flat File Message with BizTalk and the SqlBulkInsert Adapter

This entry discusses the second demo from a presentation that I gave at the Business Process Integration & Workflow Conference in Redmond during the week of Oct 4.

The presentation was titled:
Handling Large Messages in BizTalk

The first demo that I did for the presentation can be found here:
Processing a large message and a faster, less CPU intensive splitter pattern.

This entry compares two different methods for processing large flat files in BizTalk.
The flat files tested ranged in size from 8 MB to 201 MB.

At the end of this entry is the download for the SqlBulkInsert Adapter code and the BizTalk Test Project to
perform the tests.

A section of the flat file message being processed is as below. About half of the columns are cut off:

The above is the flat file representation of the XML production order messages that were used in the first demo.
Each line of the flat file message contains a production order. The production order data (columns) are separated by commas and each production order line (record) is separated by a Carriage Return/Line Feed.

To process the flat file message, two different implementations were used. In both implementations, the flat file is pre-processed into a format suitable for further processing, for example mapping or splitting.


Implementation One: Processing with a Flat File Schema and Custom Receive Pipeline

This implementation uses the standard method of using an XSD schema with flat file extensions to represent the above flat file message. A custom receive pipeline utilizing a flat file disassembler transforms the flat file into an internal XML message. This XML message is then published into the MessageBox database. If you have not processed a flat file in BizTalk before, try this example in the SDK.

This is just a short recap of how this implementation works, for the benefit of those who are not familiar with BizTalk or who have never processed a flat file in BizTalk.

a) Flat File is picked up by a Receive Location.
b) Receive Location is configured to use a Custom Receive Pipeline
c) Custom Receive Pipeline will convert (disassemble) the flat file, into an internal XML message as below:


d) This XML Message is then published into the MessageBox database
e) Once the XML Message is published into the MessageBox database, any number of Orchestrations or send Ports can subscribe to this message and process this message. This message could then be mapped into another message or split into separate messages as in the first demo.


Implementation Two: Processing with the SqlBulkInsert Adapter

From a performance perspective, there are three areas where the above implementation can be improved. This is especially true if the incoming flat file messages are large (greater than 20 MB).
The areas for improvement are listed below:

1) The pipeline processing time.
2) CPU utilization is at 100% for pipeline processing.
3) Flat files, when converted into XML files, can triple in size or more. For example, when a 201 MB flat file production order message is converted into an internal XML message, the internal XML message expands to 767 MB, almost four times the original size. This large 767 MB message is then published into the MessageBox database. The XML message is larger because of the added tags, elements and attributes.

Therefore one approach would be to skip the pipeline processing for the large flat file message. Pipeline processing is a necessary and great feature in BizTalk, but will be skipped in this implementation to increase performance.

A Custom Adapter (SqlBulkInsert Adapter) will be used to pre-process the flat file.
This adapter can only be used with a One-Way Receive Port. The property pages of the Receive Location are as below:


How this implementation works:

a) This adapter behaves similarly to the out-of-the-box File Adapter used in a Receive Location. It polls a directory on a hard drive for files to be picked up and processed.
 
b) The FileReceiveFolder Property is set to a directory on a hard drive.
For example -> C:\BTSBulkLoad\Files\ReceiveForSqlBulkLoad
Flat Files dropped into this folder will be picked up to be processed.

c) The FileMask Property is set so that the adapter picks up only files that match the mask.
For example -> *.txt

d) The FileErrorFolder Property will be populated with error files.
For example -> C:\BTSBulkLoad\Files\FileErrorFolder.
If some of the rows (data) in the flat file message are malformed, they will not be processed, but will be placed in a file for viewing. This is analogous to a recoverable interchange. Rows in the flat file that are correct in structure will be processed.

For example, if two of the rows in the flat file cannot be processed (Missing Columns, missing delimiters etc) a file is produced with the row(s) that could not be processed.

11506,10020,TST GF 01 GRD 01,7045936818561,20.25,20.25,58.25,58.250359010995972,3 Inch Core,,,
11665,10020,TST GF 01 GRD 01,36,23584,23584,1432.7045936818561,1432.7045936818561,20.25,20.25,5

Additionally another file is produced with a more detailed explanation of why the row(s) could not be processed as below:

Row 16 File Offset 3822 ErrorFile Offset 0 - HRESULT 0x80004005
Row 25 File Offset 6422 ErrorFile Offset 493 - HRESULT 0x80004005

The Max Number of rows with errors can be configured (explained below). If this number is exceeded, the operation will fail as a whole.

Once a File has been picked up, the SqlBulkInsert adapter will use the below properties to delegate the processing of the flat file to a Sql server stored procedure. The large File will not be submitted to the BizTalk engine to be processed. Therefore the BizTalk pipeline processing will be skipped for the large file.


e) The SqlConnectionString Property is set with the connection string to the sql database that will process the flat file.
For example -> packet size=4096;integrated security=SSPI;data source="(local)";persist security info=False;initial catalog=BTSSqlBulkLoad 

f) The SqlStoredProcedureName Property is set to the name of the stored procedure that will process the flat file.
For example -> LoadInsertProductionOrders

The declaration of the stored procedure is as below:

CREATE Procedure [dbo].[LoadInsertProductionOrders]

@pathAndFileName  varchar(500),
@pathAndErrorFileName varchar(500)


g) The SqlStoredProcFileNameParameter property is set to the name of the parameter that accepts the file to be processed.
For example -> @pathAndFileName

At run time when the adapter picks up the file, it will generate the following to populate the @pathAndFileName stored procedure parameter:
For example -> C:\BTSBulkLoad\Files\ReceiveForSqlBulkLoad\FlatFileToProcess.txt

Note: Only a pointer to the flat file to be processed is passed to the stored procedure.
Note: UNC paths should also work, but I have not actually tried them.


h) The SqlStoredProcFileErrorParameter property is set to the name of the parameter in the stored procedure
to indicate the location to where the error files should be placed. See part d)
For example -> @pathAndErrorFileName

At run time when the adapter picks up the file, it will set the @pathAndErrorFileName stored procedure parameter to a Value.
For example -> C:\BTSBulkLoad\Files\FileErrorFolder


At run time when the File Receive portion of the adapter has picked up a file to process, the following code is then called in the adapter to invoke the stored procedure:


internal static string callBulkInsertStoredProcedure
   (string connectionString,
   int commandTimeOut,
   string storedProcedureName,
   string fileName,
   string fileErrorName,
   string fileNameParameterName,
   string fileErrorNameParameterName)
{
 System.Text.StringBuilder sbForReturnedXML = new System.Text.StringBuilder();

 // The using blocks close the connection and dispose the command, even if the call fails.
 using (SqlConnection sqlConnect = new SqlConnection(connectionString))
 using (SqlCommand sqlCommand = new SqlCommand(storedProcedureName, sqlConnect))
 {
  sqlCommand.CommandType = System.Data.CommandType.StoredProcedure;
  sqlCommand.CommandTimeout = commandTimeOut;

  // Dynamically set the name of the parameters that should be called in the custom stored procedure.
  // This is because each stored procedure may have parameters with different names.
  sqlCommand.Parameters.Add(new System.Data.SqlClient.SqlParameter(fileNameParameterName, System.Data.SqlDbType.VarChar, 500));
  sqlCommand.Parameters.Add(new System.Data.SqlClient.SqlParameter(fileErrorNameParameterName, System.Data.SqlDbType.VarChar, 500));
  sqlCommand.Parameters[fileNameParameterName].Value = fileName;
  sqlCommand.Parameters[fileErrorNameParameterName].Value = fileErrorName;

  try
  {
   sqlConnect.Open();

   // The stored procedure returns its results as XML (FOR XML), so read them with an XmlReader.
   using (System.Xml.XmlReader xmlReader = sqlCommand.ExecuteXmlReader())
   {
    xmlReader.MoveToContent();

    // Append each top-level node returned by the stored procedure.
    while (!xmlReader.EOF)
    {
     sbForReturnedXML.Append(xmlReader.ReadOuterXml());
    }
   }
  }
  catch (System.Exception ex)
  {
   System.Diagnostics.Debug.WriteLine("Error Occurred in Adapter " + ex.Message);
   // Rethrow without resetting the stack trace.
   throw;
  }
 }

 return sbForReturnedXML.ToString();
}

The above is just basic ADO.NET code to call the stored procedure that will return back a small
XML message generated by the stored procedure. This XML message can contain any information. This small XML message will
be published into the MessageBox database. (More about this below).

The stored procedure configured in the receive location is executed. This is where the real processing of the flat file takes place.
Note: This stored procedure could be altered in any way to process the flat file. The below is just an example.

CREATE Procedure [dbo].[LoadInsertProductionOrders]

@pathAndFileName  varchar(500),
@pathAndErrorFileName varchar(500)

-- This stored procedure will take the passed path of the flat file (produced by the adapter) to process.
-- For example c:\directory\FlatFile.txt.
-- The code in this procedure will then Bulk Insert this flat file into a Sql Server table.
-- This stored procedure was originally created in Sql 2000, but was moved to Sql 2005
-- for the following reasons:

-- a) Sql 2005 has introduced a new argument for Bulk Insert Clause [ [ , ] ERRORFILE = 'file_name' ] 
-- This will place malformed rows into an error file.

-- b) Sql 2005 has introduced Try Catch Blocks in T-Sql. In Sql 2000, if there were any malformed
-- rows in the flat file, an error would be raised to the .Net Helper Component and the operation would cease.
-- With the Try Catch Block in Sql 2005, no error is raised to the .Net Helper Component and processing continues.


As

-- Flat File rows will be inserted into this temp table.
-- Note: Could have used a Format File, but this temp table matches the structure of the flat file.
-- Eventually, the rows in this temp table will be inserted into a permanent table.

 

CREATE TABLE #tempInsertNewProductionOrders (
 [trk_unit_id] [int] NOT NULL ,
 [pro_product_id] [int] NOT NULL ,
 [actual_grade] [varchar] (80) COLLATE SQL_Latin1_General_CP1_CI_AS NULL ,
 [actual_basis_weight] [float] NULL ,
 [actual_length_at_turnup] [float] NULL ,
 [actual_length] [float] NULL ,
 [actual_weight_at_turnup] [float] NULL ,
 [actual_weight] [float] NULL ,
 [required_width] [float] NULL ,
 [actual_width] [float] NULL ,
 [required_diameter] [float] NULL ,
 [actual_diameter] [float] NULL ,
 [actual_core] [varchar] (80) COLLATE SQL_Latin1_General_CP1_CI_AS NULL ,
 [actual_property_1] [varchar] (255) COLLATE SQL_Latin1_General_CP1_CI_AS NULL ,
 [actual_property_2] [varchar] (255) COLLATE SQL_Latin1_General_CP1_CI_AS NULL ,
 [actual_property_3] [varchar] (255) COLLATE SQL_Latin1_General_CP1_CI_AS NULL ,
 [update_timechain] [smallint] NULL ,
 [update_time] [varchar] (30) COLLATE SQL_Latin1_General_CP1_CI_AS NULL ,
 [update_user_id] [int] NULL ,
 [position_index] [int] NOT NULL ,
 [comment] [varchar] (255) COLLATE SQL_Latin1_General_CP1_CI_AS NULL ,
 [required_length] [float] NULL ,
 [required_weight] [float] NULL ,
 [actual_mfg_grade] [varchar] (80) COLLATE SQL_Latin1_General_CP1_CI_AS NULL ,
 [actual_moisture] [float] NULL ,
 [actual_caliper] [float] NULL ,
 [actual_colour] [varchar] (80) COLLATE SQL_Latin1_General_CP1_CI_AS NULL ,
 [actual_finish] [varchar] (80) COLLATE SQL_Latin1_General_CP1_CI_AS NULL ,
 [set_number] [int] NULL ,
 [position_percent] [int] NOT NULL ,
 [tare_weight] [float] NULL ,
 [user_scale_weight] [float] NULL ,
 [wire_side] [char] (3) COLLATE SQL_Latin1_General_CP1_CI_AS NOT NULL ,
 [trkc_length_adjust_type_id] [int] NULL ,
 [actual_compression] [float] NULL ,
 [actual_hardness] [float] NULL ,
 [sch_prod_order_id] [int] NULL ,
 [trk_set_item_id] [int] NULL ,
 [trk_unit_id_package] [int] NULL )


declare @return int
declare @rowcount int, @error int

declare @sqlStatement varchar(8000)


-- Create the dynamic sql that contains the Bulk Insert Statement to Bulk Load the flat file into the Temp Table
-- An Argument could have been used to control the max number of error rows. As below:
-- MAXERRORS [ = max_errors ]
-- MAXERRORS, Specifies the maximum number of errors that can occur before the bulk copy operation is canceled.
-- Each row that cannot be imported by the bulk copy operation is ignored and counted as one error. If max_errors is not specified, the default is 10.


set @sqlStatement = 'BULK INSERT #tempInsertNewProductionOrders'
set @sqlStatement = @sqlStatement +    ' FROM ' +  ''''+ ltrim(rtrim(@pathAndFileName))  +''''
set @sqlStatement = @sqlStatement +  ' WITH '
set @sqlStatement = @sqlStatement +     '  ( '
set @sqlStatement = @sqlStatement +  ' FIELDTERMINATOR = '  +   '''' + ',' + '''' + ','
set @sqlStatement = @sqlStatement  + ' ROWTERMINATOR = ' +  ''''  +  '\n'   + '''' + ','
set @sqlStatement = @sqlStatement  + ' ERRORFILE = ' +  ''''  +  ltrim(rtrim(@pathAndErrorFileName ))   + ''''
set @sqlStatement = @sqlStatement  +   ')'


-- Temp Table for Sending back results

create table #results (rowsprocessed int,
           ErrorCode int,
           BatchIdentifier varchar(50))

-- Create a unique GUID, that will be stored in each row.
-- This is to differentiate the rows from other batches.
-- This GUID, will be returned back to the Adapter in a small XML message:

declare @newid uniqueidentifier
set @newid = newid()
declare @UniqueIdentifier varchar(50)
set @UniqueIdentifier = replace(convert(varchar(50), @newid),'-','')

 

Begin Try
 -- Execute the Bulk Insert Statement
 exec (@sqlStatement)

 -- Insert the rows from the temp table into the Permanent Table.
 -- For each row, also set the Batch Guid.
 Insert into InsertNewProductionOrders
 Select  #tempInsertNewProductionOrders.*,@UniqueIdentifier from #tempInsertNewProductionOrders

 Select @rowcount = @@rowcount,@error = @@error 
 Insert into #results values(@rowcount,@error,@UniqueIdentifier)
  
End Try
Begin Catch

    -- Catch any errors and re-raise 

    DECLARE @ErrorMessage NVARCHAR(400);
    DECLARE @ErrorSeverity INT;
    DECLARE @ErrorState INT;


    SELECT @ErrorMessage = ERROR_MESSAGE();
    SELECT @ErrorSeverity = ERROR_SEVERITY();
    SELECT @ErrorState = ERROR_STATE();

    RAISERROR (@ErrorMessage, -- Message text.
               @ErrorSeverity, -- Severity.
               @ErrorState -- State.
               );
End Catch;

-- Send back a small informational XML message to the Adapter.
-- This XML message will be published in the Messagebox Database:

WITH XMLNAMESPACES ( DEFAULT 'http://SqlBulkInsert')
Select rowsprocessed,  ErrorCode,BatchIdentifier,
  (Select Distinct sch_prod_order_id  ProductionOrderId
   from #tempInsertNewProductionOrders  ProductionOrder
   For XML Auto ,Type )
from #results  Results
For xml auto, ROOT('BulkInsertResults')

-- Note with Sql 2005 can:
-- a) Place a Root node around the returned XML
-- b) Specify a Target Namespace.
-- c) Do nesting with an XML Auto Statement.

GO
SET ANSI_NULLS OFF
GO
SET QUOTED_IDENTIFIER OFF

A sample of the XML returned from this stored procedure is as below:

The above XML message is then passed back to the Adapter. This Small message is then published into the MessageBox Database.
Note: When authoring this stored procedure, any information can be returned in this XML message.
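As a rough illustration of how a subscriber (for example, a helper class called from an orchestration) might read this small message, here is a minimal C# sketch. It assumes the attribute-centric shape that For XML Auto would normally produce for the query above: a BulkInsertResults root in the http://SqlBulkInsert namespace, a Results element carrying rowsprocessed, ErrorCode and BatchIdentifier attributes, and one nested ProductionOrder element per distinct ProductionOrderId. The BulkInsertResult and BulkInsertResultReader names are invented for this example and are not part of the download.

```csharp
using System;
using System.Collections.Generic;
using System.Xml.Linq;

// Hypothetical container for the values returned by the stored procedure.
public sealed class BulkInsertResult
{
    public int RowsProcessed;
    public int ErrorCode;
    public string BatchIdentifier;
    public List<int> ProductionOrderIds = new List<int>();
}

// Hypothetical reader; assumes the attribute-centric For XML Auto shape.
public static class BulkInsertResultReader
{
    private static readonly XNamespace Ns = "http://SqlBulkInsert";

    public static BulkInsertResult Parse(string xml)
    {
        XElement results = XDocument.Parse(xml).Root.Element(Ns + "Results");
        if (results == null)
        {
            throw new FormatException("No Results element found.");
        }

        BulkInsertResult result = new BulkInsertResult();
        result.RowsProcessed = (int)results.Attribute("rowsprocessed");
        result.ErrorCode = (int)results.Attribute("ErrorCode");
        result.BatchIdentifier = (string)results.Attribute("BatchIdentifier");

        foreach (XElement order in results.Elements(Ns + "ProductionOrder"))
        {
            // For XML Auto omits the attribute when the column is NULL, so skip those.
            int? id = (int?)order.Attribute("ProductionOrderId");
            if (id.HasValue)
            {
                result.ProductionOrderIds.Add(id.Value);
            }
        }

        return result;
    }
}
```

The BatchIdentifier and the list of production order ids are the two pieces a subscriber would need in order to go back to the Sql Server table and work with the rows of one batch.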


Once this small XML message is published into the MessageBox database, any number of orchestrations can subscribe to it. In this particular case, the main message (now residing in a table in a Sql Server database) is to be split into separate messages by using the distinct production orders returned in the above XML message. A discussion of this pattern can be found here. The production order records in the Sql Server table could also be transformed (mapped) by using Select statements with For XML Auto and For XML Explicit clauses.

Results of Tests

The following hardware was used: Laptop with 2.0 GHz processor.
Windows XP operating system hosting a Windows 2003 VPC image with 1.3 GB of memory allocated.
This VPC image hosted the BizTalk 2006 Server and a Sql Server 2005 instance hosting the BizTalk databases.
The local Sql Server instance also hosted the database where Bulk Insert operation occurred.

Test for processing one Message

Only one flat file message was processed at a time, and only one implementation was tested at a time:
either the Pipeline (Implementation One) or the SqlBulkInsert Adapter (Implementation Two).


Results for Pipeline Processing

Size of Flat File   Time for Pipeline Processing   XML File Size (Published into the MessageBox)   Number of Rows (Production Orders) in Message
201 MB              25 Minutes                     767 MB                                          807428
108 MB              13 Minutes                     413 MB                                          435076
56 MB               7 Minutes                      213 MB                                          224130
30 MB               4 Minutes                      114 MB                                          120234
15 MB               3 Minutes                      56 MB                                           62234
8 MB                2 Minutes                      31 MB                                           29123


Results for SqlBulkInsert Adapter

Size of Flat File   Time For SqlBulkInsert Adapter Load   Number of Rows (Production Orders) in Message
201 MB              3 Minutes                             807428
108 MB              1 Minute 15 Seconds                   435076
56 MB               50 Seconds                            224130
30 MB               15 Seconds                            120234
15 MB               8 Seconds                             62234
8 MB                5 Seconds                             29123
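To make the two sets of results easier to compare, the ratios can be worked out with a few lines of C#. This is just arithmetic on the numbers reported above. The class and method names are made up for this example, and because the pipeline times are rounded to whole minutes, the speedups are approximate.

```csharp
using System;

// Hypothetical helper for comparing the two sets of test results.
public static class TestResultRatios
{
    // How many times faster the SqlBulkInsert load was than the pipeline.
    public static double Speedup(double pipelineSeconds, double bulkInsertSeconds)
    {
        return pipelineSeconds / bulkInsertSeconds;
    }

    // How many times larger the XML message was than the flat file.
    public static double Expansion(double flatFileMb, double xmlMb)
    {
        return xmlMb / flatFileMb;
    }

    // Prints one summary line per test file.
    public static void PrintSummary()
    {
        // Columns: flat file MB, XML MB, pipeline seconds, SqlBulkInsert seconds.
        double[,] rows =
        {
            { 201, 767, 25 * 60, 3 * 60 },
            { 108, 413, 13 * 60, 75 },
            { 56, 213, 7 * 60, 50 },
            { 30, 114, 4 * 60, 15 },
            { 15, 56, 3 * 60, 8 },
            { 8, 31, 2 * 60, 5 }
        };

        for (int i = 0; i < rows.GetLength(0); i++)
        {
            Console.WriteLine(
                "{0} MB: speedup {1:F1}x, XML expansion {2:F1}x",
                rows[i, 0],
                Speedup(rows[i, 2], rows[i, 3]),
                Expansion(rows[i, 0], rows[i, 1]));
        }
    }
}
```

For the 201 MB file this works out to a speedup of roughly 8.3 times and an XML expansion of roughly 3.8 times. Across all six files the speedup ranges from about 8 times to 24 times.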


Discussion of Results

Implementation One: Processing with a Flat File Schema and Custom Receive Pipeline

This is the usual method to process a flat file in BizTalk. Besides disassembling (converting) the flat file into an XML format, receive pipelines perform many other important operations such as:
a) Decoding
b) Disassembling
c) Validation
d) Custom operations using Custom pipeline Components. One example would be Unzipping a file.
etc.

The Good

a) It works!
b) From a design point of view, this is the preferred method as the pipeline can perform a number
of operations on the original message before the final message is published into the MessageBox database.
c) Much more complicated flat files can be disassembled in a pipeline. The flat file used in the demo is simple in structure.
An example of a more complicated delimited and positional flat file is as below, with Header, Detail and Footer Records:

PO1999-10-20
US        Alice Smith         123 Maple Street    Mill Valley    CA 90952
US        Robert Smith        8 Oak Avenue        Old Town       PA 95819
Hurry, my lawn is going wild!
ITEMS,ITEM872-AA|Lawnmower|1|148.95


The Bad

For large Flat File Messages:
a) the operation is somewhat slow (see times above),
b) CPU is pinned the entire time the flat file is being processed in the pipeline.
c) The original flat file message, when converted to XML, more than triples in size, for example:
201 MB (flat file) transforms to 767 MB (XML). Depending on the operation being performed, a 767 MB message may be unmanageable (for example as a source message in a map).


Implementation Two : Processing with the SqlBulkInsert Adapter

This implementation was discussed in detail above.

The Good

a) Much faster in pre-processing the large messages (see times above).
b) Will not pin the CPU on the BizTalk machine (pipeline processing is skipped)
c) Large XML messages are not published into the messagebox database.
d) There really is not much to this: a custom adapter and a stored procedure.


The Bad

a) A more complex flat file (see complex flat file message above) cannot be processed with this implementation. This is a limitation of the Bulk Insert Statement.
Other techniques could be investigated to Load structurally more complex flat files into Sql Server including:
i) DTS (Data Transformation Services). Sql 2000 and Sql 2005
ii) Sql Server Integration Services. Sql 2005
b) For the initial message, the receive pipeline processing is skipped. If the incoming flat file is encoded or zipped, it could first be processed by another receive location/pipeline to decode or unzip it, and then routed to a folder where the receive location is configured to use the SqlBulkInsert adapter.
c) The process (orchestration) that subscribes to the small XML message published into the MessageBox has to be configured to connect to the database where the main message is stored in a relational Sql table. This might mean just configuring Solicit-Response Send Ports using the Sql Adapter.
d) If using a Bulk Insert Statement to load in the flat file, Sql 2005 might have to be used. (See comments in the Stored Procedure) 


Final Thoughts

I have only spent about two hours on the adapter and it is not ready for production (It is a prototype).
I have also tested by processing two large messages simultaneously. Therefore if you are interested in 
using it, download the code and modify or just start from scratch. The code in the adapter is relatively straightforward.

The adapter was originally created with the BizTalk Server 2004 Adapter Wizard and uses the common adapter base classes and interfaces.

This technique (Adapter) could also be used to Bulk load an XML message into Sql Server.
Download the Code HERE. Read the ReadMe.Txt before installing and running.