Developing Streaming Pipeline Components - Part 3

After covering the "why" of streaming design and some comparisons with in-memory designs in Part 1, and then the technical shift of moving processing logic from the pipeline components to the readers and streams they use in Part 2, this post will finally begin to dig into how streaming processing is achieved in BizTalk.

...

By now, from the last post (and the intro!), you will be aware that in streaming components the processing of the message is performed in the stream handed to BizTalk.  I also mentioned that "Custom streams wrap other streams.  The inner stream becomes the source of data for the outer stream".  However, in order to perform any processing of the data within the stream, we need to actually pay attention to (or "watch") the data moving through it, so we know when to modify it, promote it to the context, or do whatever else we need to do based on the data.

Introducing the XmlReader

This is where the XmlReader comes into play.  The XmlReader is a cursor-based object that provides forward-only access to XML data streams.  Because the cursor moves through the XML one node at a time, it is the perfect class to base streaming designs upon - essentially, only the current node's details are loaded into memory at any given time.  The other benefit of the XmlReader is that it is very expressive about the node currently under the reader - you advance the reader using its methods, and its properties reflect the data it is positioned on.  Because it is so expressive about the data it is currently reading, the XmlReader is also excellent for watching the data passing through it and making clear decisions about when you are reading a piece of data that is particularly interesting for the task you are trying to achieve.  When this occurs, you can implement logic to perform your task - whether that is to modify the data, promote a value to the message context, or whatever it happens to be.  (I will go into the details of making streaming modifications to data, and other processing that you may need to do in a pipeline component, in a future post.)
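
As a taste of what this looks like, here is a minimal sketch (outside of BizTalk) of "watching" an XmlReader for a node of interest.  The OrderNumber element and the FindOrderNumber helper are purely illustrative names, not anything BizTalk provides:

    using System.IO;
    using System.Xml;

    // Scan the inbound XML for a (hypothetical) OrderNumber element and return its value
    static string FindOrderNumber(Stream inboundStream)
    {
        using (XmlReader reader = XmlReader.Create(inboundStream))
        {
            while (reader.Read())
            {
                // The reader's properties describe the node it is currently positioned on
                if (reader.NodeType == XmlNodeType.Element &&
                    reader.LocalName == "OrderNumber" &&
                    !reader.IsEmptyElement)
                {
                    reader.Read(); // advance onto the element's text content

                    if (reader.NodeType == XmlNodeType.Text)
                    {
                        // In a pipeline component, this is the point at which the value
                        // could be promoted to the message context
                        return reader.Value;
                    }
                }
            }
        }

        return null;
    }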

Due to the advantages the XmlReader offers us, it is pretty much always the class that is used in BizTalk pipelines to read incoming data that is in XML format.  However, the XmlReader only allows consuming XML data.  If our pipeline component hooks up some custom XmlReader to the inbound stream, the only thing we can do with that data is consume it.  What happens to the next component in the pipeline?  What data is going to be provided to it if the XmlReader our component used just consumed the entire stream?  Well, based on what I have covered so far, it will basically get a reference to a stream that is already positioned at "end of stream".  Therefore, even if we managed to promote an XML node we were looking for in the data to the message context by implementing some custom logic with the XmlReader, we have essentially lost the message body!  We need to somehow be able to read the inbound stream with an XmlReader, but also preserve the data we are reading so the next component in the pipeline is also passed the XML message.
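
To make the problem concrete, here is a deliberately broken sketch of a pipeline component's Execute method (assuming the standard IComponent signature, written inside a component class with the usual using directives, and with all error handling omitted).  The component reads the body to completion and returns the message, so the next component receives a stream that is already at its end:

    public IBaseMessage Execute(IPipelineContext pContext, IBaseMessage pInMsg)
    {
        Stream inbound = pInMsg.BodyPart.GetOriginalDataStream();

        using (XmlReader reader = XmlReader.Create(inbound))
        {
            while (reader.Read())
            {
                // Inspect nodes, promote values to the context, etc.
            }
        }

        // The body stream is now positioned at its end, so the next component
        // in the pipeline has effectively lost the message body
        return pInMsg;
    }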

Introducing the XmlWriter, and coupling it with the XmlReader

This issue is overcome by "coupling" or "chaining" the XmlReader with the XmlWriter.  The XmlWriter is for producing XML data.  (I will not go into great detail about the API of the XmlReader and XmlWriter as I assume most developers have some experience with them, and they are well documented on MSDN.)  Like the XmlReader, it is also very expressive about the data that you wish to produce with it.  It has a number of methods for writing very specific parts of XML data.  Therefore, due to the expressiveness of both classes, for whatever node the XmlReader is currently positioned on there is a corresponding method that can be called on the XmlWriter to "reproduce" the XML that was just consumed by the reader.  For example, if the XmlReader is currently on an element (the reader.NodeType property equals XmlNodeType.Element), you can call XmlWriter.WriteStartElement, passing the element's details from the reader - reader.Prefix, reader.LocalName, reader.NamespaceURI.

The following code snippet can be used to completely consume an XmlReader, but it will re-generate each node from the reader via the XmlWriter as it is read.  You can try this with any XML file - just make sure your XmlReader variable is called reader and your XmlWriter variable is called writer (a small harness for doing so follows the snippet).  Try it and see.  This concept is absolutely key to how BizTalk processes messages in a streaming manner.  Also note that because the data is re-created as it is read, it is essentially moving from the inbound stream to the outbound stream with very minimal memory consumption.

    while (reader.Read())
    {
        switch (reader.NodeType)
        {
            case XmlNodeType.Element:
                // Write out the element declaration
                writer.WriteStartElement(reader.Prefix, reader.LocalName, reader.NamespaceURI);
                // Copy the attributes of the reader
                writer.WriteAttributes(reader, true);
                // If the element is empty (e.g. "<Test/>") then write the end immediately
                if (reader.IsEmptyElement)
                    writer.WriteEndElement();
                break;

            case XmlNodeType.Text:
                writer.WriteString(reader.Value);
                break;

            case XmlNodeType.CDATA:
                writer.WriteCData(reader.Value);
                break;

            case XmlNodeType.EntityReference:
                writer.WriteEntityRef(reader.Name);
                break;

            case XmlNodeType.ProcessingInstruction:
            case XmlNodeType.XmlDeclaration:
                writer.WriteProcessingInstruction(reader.Name, reader.Value);
                break;

            case XmlNodeType.Comment:
                writer.WriteComment(reader.Value);
                break;

            case XmlNodeType.DocumentType:
                // Get the public and system IDs to pass to the WriteDocType method
                writer.WriteDocType(reader.Name, reader.GetAttribute("PUBLIC"), reader.GetAttribute("SYSTEM"), reader.Value);
                break;

            case XmlNodeType.Whitespace:
            case XmlNodeType.SignificantWhitespace:
                writer.WriteWhitespace(reader.Value);
                break;

            case XmlNodeType.EndElement:
                // Force a closing element (e.g. "</Test>") by calling WriteFullEndElement rather than WriteEndElement
                writer.WriteFullEndElement();
                break;
        }

        writer.Flush();
    }

This explanation and code snippet detail how we can consume data and "keep" the data that we consumed.  But this still does not tie in with what BizTalk message parts require for their data - Streams.  It also has not explained how we can get this "chunking" of XML to happen within BizTalk at the right time.  I.e. how do we rig up the XmlReader and XmlWriter so we are only moving data from the inbound stream to the outbound stream when the data is required in the outbound stream?  As discussed in Part 2, data is only required in our outbound stream when something else reads it.  (For our component, the stream that is returned by our component is regarded as our outbound stream and the next component's inbound stream.)  If we push the entire inbound stream (or any data for that matter) into the outbound stream before anything needs to read it, we are most likely pushing that data into memory (assuming the XmlWriter is writing to a MemoryStream).  We could use the VirtualStream which ships with BizTalk, but this also writes to memory, and then to disk once a size threshold has been passed.  The real issue is that we do not need to write data to the outbound stream until it really is needed (even if that stream does not consume memory) - otherwise we are not being efficient, in that we are going to "double handle" the data as we go via some form of storage (memory, disk, etc.).  What would be better is if we could simply hand the next stream its data directly when it wants it.
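
For contrast, this is roughly what the eager approach described above looks like - a sketch only, assuming the XmlWriter targets a MemoryStream.  The whole message is buffered before anything downstream has asked for a single byte:

    using System.IO;
    using System.Xml;

    static Stream EagerCopy(Stream inbound)
    {
        var buffered = new MemoryStream();

        using (XmlReader reader = XmlReader.Create(inbound))
        using (XmlWriter writer = XmlWriter.Create(buffered))
        {
            // Equivalent to running the copy loop above to completion in one go -
            // the entire document is pulled from the inbound stream and pushed
            // into memory immediately
            writer.WriteNode(reader, true);
        }

        buffered.Position = 0;
        return buffered; // the next component reads from this in-memory copy
    }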

This leads into the next post's topic - the design of the BizTalk stream stack.  This will cover how the issues in the last paragraph are overcome.  I think it should be the last topic that needs to be covered before looking into real-world examples of how to do something useful with the data!

NOTE: VirtualStream ships both as source code in the SDK and as part of the runtime.  Most developers are not aware that you can simply use the out-of-the-box version - there is no need to compile the SDK one into your own library!
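
For example, using the runtime version is as simple as referencing the assembly and newing one up - to the best of my recollection it lives in the Microsoft.BizTalk.Streaming namespace in Microsoft.BizTalk.Streaming.dll, but verify this against your BizTalk installation:

    using Microsoft.BizTalk.Streaming;
    using System.Xml;

    // VirtualStream behaves like any other Stream: it buffers in memory and
    // spills to disk once its size threshold is exceeded
    var outbound = new VirtualStream();

    using (XmlWriter writer = XmlWriter.Create(outbound))
    {
        // ... write the outbound message here ...
    }

    outbound.Position = 0;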