Developing Streaming Pipeline Components - Part 5

In Part 4 I covered the XmlTranslatorStream in detail, explaining how it couples an XmlReader to an XmlWriter as the stream is read.  This revealed how BizTalk manages to execute a pipeline of components efficiently - it is actually reading a series of wrapped streams, one added by each component in the pipeline.  However, what the last post did not disclose was how we perform any useful processing now that we know how to move data through in a streaming mode.

In this post I will cover how we can achieve this streaming-based processing of data - and give some examples of how and where BizTalk does this with its out of the box components.

...

As I alluded to towards the end of Part 4, processing data through the XmlTranslatorStream is pointless if you are simply providing an XML inbound stream, and reading from it an identical XML outbound stream, without performing some other processing function.  All this achieves is to slow down the movement of data through the BizTalk streams by adding an extra layer of overhead.  However, because both the XmlReader and XmlTranslatorStream classes are very extensible, we can provide our own custom implementations of these to perform our processing.

In general, you have two main extensibility options:

  1. Subclass XmlReader and perform your custom processing in there
    • Then pass your custom XmlReader to the XmlTranslatorStream as its source of data
    • Then set the XmlTranslatorStream as the message part's data
  2. Subclass XmlTranslatorStream and perform your custom processing in there
    • Then set your subclass of XmlTranslatorStream as the message part's data

Note that you can also perform a combination of both, but in most cases I think you would only really need to do one.

BizTalk ships with out of the box components that perform functionality such as:

  • XML Data validation
  • Data conversion (XML/Flat File)
  • Property promotion/demotion
  • Data manipulation (e.g. property demotion modifies the outbound data)

All of these pieces of functionality are performed efficiently using a streaming approach - using a mixture of the extensibility options above to perform this processing logic.  Let's look at implementing some custom functionality, and placing the same logic in both a custom XmlReader and custom XmlTranslatorStream, to compare how each implementation would work.

Before we look at the code, I will go over some more details of how the XmlTranslatorStream works.  When XmlTranslatorStream's ProcessXmlNodes method is called (see Part 4 for a detailed look at when this is called), it then advances its internal XmlReader and calls the TranslateXmlNode method.  This method essentially performs a check on the current NodeType under the reader, and delegates to the appropriate TranslateXxx virtual method accordingly.  It provides a TranslateXxx method for every type of XML node, which is called when the corresponding node type is being copied from the XmlReader to the XmlWriter (see Part 3 for sample code on using the XmlReader.NodeType property to allow XmlReader to XmlWriter data duplication).

By providing all of the TranslateXxx methods as virtual, the XmlTranslatorStream allows us to easily intercept the nodes & values that are written from the XmlReader to the XmlWriter.  By allowing this, we can easily perform modification of the data as it is being written.  We can easily choose to write our own values to the XmlWriter instead of the ones that were provided by the XmlReader, or we can choose not to write any data at all - essentially removing the data from the stream.  This extensible design of the XmlTranslatorStream gives us very granular control over the output data.
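To make the dispatch idea concrete, here is a minimal sketch that uses only System.Xml.  It is a simplified stand-in of my own and not BizTalk's XmlTranslatorStream: the names MiniTranslator, ShoutingTranslator and Run are hypothetical, the hook names merely echo the TranslateXxx naming, and only three node types are handled.  It shows a loop that checks the NodeType under the reader, delegates to a virtual method, and lets a subclass change what reaches the writer:

```csharp
using System;
using System.IO;
using System.Xml;

// Simplified stand-in for the dispatch idea; NOT BizTalk's XmlTranslatorStream.
public class MiniTranslator
{
    protected XmlReader Reader;
    protected XmlWriter Writer;

    public void Translate(XmlReader reader, XmlWriter writer)
    {
        Reader = reader;
        Writer = writer;
        while (Reader.Read())
            TranslateNode();
        Writer.Flush();
    }

    // Checks the node type under the reader and delegates to the matching virtual hook.
    protected virtual void TranslateNode()
    {
        switch (Reader.NodeType)
        {
            case XmlNodeType.Element:
            {
                bool isEmpty = Reader.IsEmptyElement;
                TranslateStartElement(Reader.Prefix, Reader.LocalName, Reader.NamespaceURI);
                Writer.WriteAttributes(Reader, false);
                if (isEmpty) TranslateEndElement();
                break;
            }
            case XmlNodeType.Text:
                TranslateText(Reader.Value);
                break;
            case XmlNodeType.EndElement:
                TranslateEndElement();
                break;
            // Other node types (comments, CDATA, whitespace...) are omitted to keep the sketch short.
        }
    }

    protected virtual void TranslateStartElement(string prefix, string localName, string nsURI)
    {
        Writer.WriteStartElement(prefix, localName, nsURI);
    }

    protected virtual void TranslateText(string value)
    {
        Writer.WriteString(value);
    }

    protected virtual void TranslateEndElement()
    {
        Writer.WriteFullEndElement();
    }

    // Helper for trying a translator out against an XML string.
    public static string Run(MiniTranslator translator, string xml)
    {
        StringWriter output = new StringWriter();
        XmlWriterSettings settings = new XmlWriterSettings();
        settings.OmitXmlDeclaration = true;
        using (XmlReader reader = XmlReader.Create(new StringReader(xml)))
        using (XmlWriter writer = XmlWriter.Create(output, settings))
        {
            translator.Translate(reader, writer);
        }
        return output.ToString();
    }
}

// Overrides one hook to write a different value than the reader supplied.
public class ShoutingTranslator : MiniTranslator
{
    protected override void TranslateText(string value)
    {
        base.TranslateText(value.ToUpperInvariant());
    }
}
```

Calling MiniTranslator.Run(new ShoutingTranslator(), "<a><b>hi</b></a>") copies the elements unchanged and upper-cases only the text.  An override that simply does not call the base method would drop that piece of data from the output instead.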

Now, back to an example!  BizTalk ships with the NamespaceTranslatorStream class, which performs a streaming replacement of a specified XML namespace, given the old namespace to find and the new namespace to replace it with.  However, this stream is limited to updating a single namespace per instance of the stream.  If you need to replace more than one namespace, you need to wrap each NamespaceTranslatorStream with another NamespaceTranslatorStream and give each one its own namespace to replace.  Each additional layer of stream adds more overhead (note that NamespaceTranslatorStream derives from XmlTranslatorStream), and also makes the code that sets it up more verbose.  It would be nicer to have a single class that performs namespace replacements for any number of namespaces.  This can be easily achieved by creating our own subclass of XmlTranslatorStream.

Take a look at the following custom implementation of XmlTranslatorStream:

/// <summary>
/// Allows the streaming replacement of multiple namespaces.
/// Note: This class could be easily enhanced to also update the prefix being written (if the ns0 prefixes that BizTalk generates are not wanted!).
/// </summary>
public class MultiNamespaceTranslatorStream : XmlTranslatorStream
{
    private Dictionary<string, string> _namespaceReplacements;

    public MultiNamespaceTranslatorStream(XmlReader reader) : base(reader) 
    {
        Initialise();
    }

    public MultiNamespaceTranslatorStream(XmlReader reader, Encoding encoding) : base(reader, encoding) 
    {
        Initialise();
    }

    public MultiNamespaceTranslatorStream(XmlReader reader, Encoding encoding, MemoryStream outputStream) : base(reader, encoding, outputStream) 
    {
        Initialise();
    }

    public Dictionary<string, string> NamespaceReplacements
    {
        get { return _namespaceReplacements; }
        set { _namespaceReplacements = value; }
    }

    private void Initialise()
    {
        _namespaceReplacements = new Dictionary<string, string>();
    }

    protected override int ProcessXmlNodes(int count)
    {
        // This method is called when the buffer in the base stream cannot fulfill the amount of data being requested
        // This in turn causes the reader to advance, and the data from the reader to be written to the writer & underlying buffer
        return base.ProcessXmlNodes(count);
    }

    /// <summary>
    /// Override to replace the namespaces of attributes being translated.
    /// </summary>
    protected override void TranslateAttributeValue(string prefix, string localName, string nsURI, string val)
    {
        // Handle the default namespace declaration (has no prefix). Eg. xmlns="http://mynamespace"
        if (((localName == "xmlns") && (nsURI == "http://www.w3.org/2000/xmlns/")) && (NamespaceReplacements.ContainsKey(val)))
        {
            // Replace the ns with the one configured
            base.TranslateAttributeValue(prefix, localName, nsURI, NamespaceReplacements[val]);
        }
        // Handle non-default namespace declarations. Eg. xmlns:ns0="http://mynamespace"
        else if (((prefix == "xmlns") && (nsURI == "http://www.w3.org/2000/xmlns/")) && (NamespaceReplacements.ContainsKey(val)))
        {
            // Replace the ns with the one configured
            base.TranslateAttributeValue(prefix, localName, nsURI, NamespaceReplacements[val]);
        }
        else
        {
            // Does not match a namespace configured to be replaced
            base.TranslateAttributeValue(prefix, localName, nsURI, val);
        }
    }

    /// <summary>
    /// Override to replace the namespaces of elements being translated.
    /// </summary>
    protected override void TranslateStartElement(string prefix, string localName, string nsURI)
    {
        if (NamespaceReplacements.ContainsKey(nsURI))
        {
            // Replace the ns with the one configured
            base.TranslateStartElement(prefix, localName, NamespaceReplacements[nsURI]);
        }
        else
        {
            // Does not match a namespace configured to be replaced
            base.TranslateStartElement(prefix, localName, nsURI);
        }
    }
}

Note that the key methods to override for performing this particular task are the TranslateAttributeValue, which is called by the XmlTranslatorStream when an Attribute is being copied from its internal XmlReader over to the XmlWriter; and TranslateStartElement - called when the start of an Element is being copied.  Also note that this class could be easily enhanced to also replace the prefix being written - for example if the ns0 style prefixes that BizTalk generates are not the prefixes that are needed.

This can be easily tested with the following code.  Ensure that your sample file contains multiple namespaces to replace, and they match the ones configured:

public void TranslateData_CustomTranslatorStream()
{
    using(Stream output = new FileStream("MyOutputXmlFile.xml", FileMode.Create, FileAccess.Write, FileShare.Read))
    {
        Dictionary<string, string> nsReplacements = new Dictionary<string, string>();
        // Set the replacement namespaces
        nsReplacements.Add("http://originalNS1", "http://newwackynamespace.com");
        nsReplacements.Add("http://originalNS2", "http://anotherwackynamespace.com");
        nsReplacements.Add("http://originalNS3", "http://runoutofwackynames.com");
    
        using (XmlReader reader = XmlReader.Create("MyInputXmlFile.xml"))
        {
            using (MultiNamespaceTranslatorStream stream = new MultiNamespaceTranslatorStream(reader))
            {
                stream.NamespaceReplacements = nsReplacements;
                int value;
                while ((value = stream.ReadByte()) != -1)
                    output.WriteByte((byte)value);
            }
        }
    }
}

Now compare this to what you would have had to do to use BizTalk's built in NamespaceTranslatorStream:

/// <summary>
/// Translate a document with multiple namespaces using the NamespaceTranslatorStream. Wrap a NamespaceTranslatorStream per namespace to translate.
/// </summary>
public void Translate_MultipleNamespaces()
{
    using (Stream output = new FileStream("MyOutputXmlFile.xml", FileMode.Create))
    {
        using (Stream fileStream = new FileStream("MyInputXmlFile.xml", FileMode.Open, FileAccess.Read))
        {
            // Provide the old and replacement namespace for the first namespace.
            using (Stream stream1 = new NamespaceTranslatorStream(new PipelineContext(), fileStream, "http://originalNS1", "http://newwackynamespace.com"))
            {
                // Wrap the first stream with another NamespaceTranslatorStream for the second namespace.
                using (Stream stream2 = new NamespaceTranslatorStream(new PipelineContext(), stream1, "http://originalNS2", "http://anotherwackynamespace.com"))
                {
                    using (Stream stream3 = new NamespaceTranslatorStream(new PipelineContext(), stream2, "http://originalNS3", "http://runoutofwackynames.com"))
                    {
                        int value;
                        // Make sure the outer stream is the one being read!
                        while ((value = stream3.ReadByte()) != -1)
                            output.WriteByte((byte)value);
                    }
                }
            }
        }
    }
}

So, this simple custom class, at fewer than 100 lines of code, should help you start to see how real world problems can be solved using a streaming approach.  This small class allows you to avoid loading the entire XML message into the DOM and updating it via the inefficient in-memory approach.

Now let's take a look at how we could have achieved the same result by utilising a custom XmlReader.  The concept is similar in that we are changing the view of data at specific points - in this example, when we see data that is using a namespace we need to change.  However, because the XmlReader exposes its view of the data via properties, rather than specific virtual methods augmented by parameters, this time we want to create an XmlReader that changes the view of data exposed by its properties.

This is performed by the custom XmlReader in the following code.  Note that if this were to be used within BizTalk, this XmlReader would then be passed to the constructor of the XmlTranslatorStream:

public class MultiNamespaceTranslatorReader : XmlReader
{
    private bool _inAttribute;
    private XmlReader _wrappedReader;
    private Dictionary<string, string> _namespaceReplacements;

    public MultiNamespaceTranslatorReader(XmlReader wrappedReader) : this(wrappedReader, null)
    { }

    public MultiNamespaceTranslatorReader(XmlReader wrappedReader, 
        Dictionary<string, string> namespaceReplacements)
    {
        Initialise(wrappedReader, namespaceReplacements);
    }

    public Dictionary<string, string> NamespaceReplacements
    {
        get { return _namespaceReplacements; }
        set { _namespaceReplacements = value; }
    }

    protected XmlReader WrappedReader
    {
        get { return _wrappedReader; }
        set { _wrappedReader = value; }
    }

    protected bool InAttribute
    {
        get { return _inAttribute; }
        set { _inAttribute = value; }
    }

    public override int AttributeCount
    {
        get { return WrappedReader.AttributeCount; }
    }

    public override string BaseURI
    {
        get { return WrappedReader.BaseURI; }
    }

    public override void Close()
    {
        WrappedReader.Close();
    }

    public override int Depth
    {
        get { return WrappedReader.Depth; }
    }

    public override bool EOF
    {
        get { return WrappedReader.EOF; }
    }

    public override string GetAttribute(int i)
    {
        return WrappedReader.GetAttribute(i);
    }

    public override string GetAttribute(string name, string namespaceURI)
    {
        return WrappedReader.GetAttribute(name, namespaceURI);
    }

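    public override string GetAttribute(string name)
    {
        // Delegate to the wrapped reader (calling GetAttribute(name) on ourselves would recurse forever)
        return WrappedReader.GetAttribute(name);
    }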

    public override bool HasValue
    {
        get { return WrappedReader.HasValue; }
    }

    public override bool IsEmptyElement
    {
        get { return WrappedReader.IsEmptyElement; }
    }

    public override string LocalName
    {
        get { return WrappedReader.LocalName; }
    }

    public override string LookupNamespace(string prefix)
    {
        // Check if the namespace for this prefix has been replaced
        string ns = WrappedReader.LookupNamespace(prefix);
        // An unresolved prefix yields null, and a null key would make ContainsKey throw
        if (ns != null && NamespaceReplacements.ContainsKey(ns))
            return NamespaceReplacements[ns];
        else
            return ns;
    }

    public override bool MoveToAttribute(string name, string ns)
    {
        InAttribute = WrappedReader.MoveToAttribute(name, ns);
        return InAttribute;
    }

    public override bool MoveToAttribute(string name)
    {
        InAttribute = WrappedReader.MoveToAttribute(name);
        return InAttribute;
    }

    public override bool MoveToElement()
    {
        InAttribute = !WrappedReader.MoveToElement();
        return !InAttribute;
    }

    public override bool MoveToFirstAttribute()
    {
        InAttribute = WrappedReader.MoveToFirstAttribute();
        return InAttribute;
    }

    public override bool MoveToNextAttribute()
    {
        InAttribute = WrappedReader.MoveToNextAttribute();
        return InAttribute;
    }

    public override XmlNameTable NameTable
    {
        get { return WrappedReader.NameTable; }
    }

    public override string NamespaceURI
    {
        get 
        { 
            if(NamespaceReplacements.ContainsKey(WrappedReader.NamespaceURI))
                return NamespaceReplacements[WrappedReader.NamespaceURI];
            else
                return WrappedReader.NamespaceURI;
        }
    }

    public override XmlNodeType NodeType
    {
        get { return WrappedReader.NodeType; }
    }

    public override string Prefix
    {
        get { return WrappedReader.Prefix; }
    }

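    public override bool Read()
    {
        // Read always moves to a new node, so we are no longer positioned on an attribute
        InAttribute = false;
        return WrappedReader.Read();
    }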

    public override bool ReadAttributeValue()
    {
        return WrappedReader.ReadAttributeValue();
    }

    public override ReadState ReadState
    {
        get { return WrappedReader.ReadState; }
    }

    public override void ResolveEntity()
    {
        WrappedReader.ResolveEntity();
    }

    public override string Value
    {
        get
        {
            // When reading an attribute (i.e. InAttribute is true), the Value property will
            // contain the actual namespace of the attribute.
            // E.g. For attribute: xmlns:ns0="http://mynamespace", Value = "http://mynamespace"
            if (InAttribute && NamespaceReplacements.ContainsKey(WrappedReader.Value))
                return NamespaceReplacements[WrappedReader.Value];
            else
                return WrappedReader.Value;
        }
    }

    private void Initialise(XmlReader wrappedReader, Dictionary<string, string> namespaceReplacements)
    {
        if (wrappedReader == null)
            throw new ArgumentNullException("wrappedReader");

        _wrappedReader = wrappedReader;

        if (namespaceReplacements == null)
            _namespaceReplacements = new Dictionary<string, string>();
        else
            _namespaceReplacements = namespaceReplacements;
    }
}

Note that our custom XmlReader just becomes a special type of "filter" over an existing XmlReader, which we wrap.  We can then delegate any calls directly down to the wrapped XmlReader if it is dealing with data that we are not concerned with.  Otherwise, we can intercept the call to the wrapped implementation, and return our own data instead.

This can be tested with the following code:

public void TranslateData_CustomTranslatorReader()
{
    Dictionary<string, string> nsReplacements = new Dictionary<string, string>();

    nsReplacements.Add("http://CBR.CBRInputSchema", "http://newwackynamespace.com");
    nsReplacements.Add("http://secondnamespace", "http://anotherwackynamespace.com");
    nsReplacements.Add("http://thirdnamespace", "http://runoutofwackynames.com");

    XmlWriterSettings settings = new XmlWriterSettings();
    settings.OmitXmlDeclaration = true;
    settings.Indent = true;
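    // _output and Constants.SampleFilePath_MultiNamespace are not shown in this post; they are assumed to be
    // the output target and the sample file path defined elsewhere in the test class
    using(XmlWriter writer = XmlWriter.Create(_output, settings))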
    {
        using (XmlReader reader = XmlReader.Create(Constants.SampleFilePath_MultiNamespace))
        {
            using (XmlReader nsTransReader = new MultiNamespaceTranslatorReader(reader, nsReplacements))
            {
                // WriteNode writes out EVERY node from the reader to the writer
                // However, there is no way to get it to write just a SINGLE node
                writer.WriteNode(nsTransReader, true);
            }
        }
    }
}

After seeing the two different approaches, hopefully you can now see how each allows BizTalk to perform custom processing of data by intercepting the inbound data as it is written back out.  I personally believe that subclassing the XmlReader is the better approach, because the XmlReader was designed as a "view" of XML data, and it allows clients to see your modified view in an XML sense, as it is read.  By subclassing the XmlTranslatorStream, you are changing the output stream of data for the next person who reads that stream.  They then have to create their own XmlReader to read over that stream anyway, which you could have done for them already just by implementing your logic as an XmlReader.  The main issue with the XmlReader approach, I believe, is that you need to understand the operations it provides, as your changes may affect how they need to function.  Take my custom XmlReader as an example - I needed to override the LookupNamespace method, otherwise it would have behaved incorrectly.  However, this may not be immediately obvious.

Well, that pretty much completes this series of 5 posts detailing how to develop streaming pipeline components.  I hope that from Part 1 through to this fifth one I have managed to fill in a number of gaps in the "art" of developing streaming pipeline components, and save you the many hours that I spent investigating this technique.  From what I have found, there is very little documented, either officially or in blogs, about the how of developing proper streaming components, as opposed to the why, and also very little detail on what already comes with BizTalk that you can easily take advantage of.  There are only a handful of posts that I could find indicating that others are aware of what you get out of the box in this area, and that they understand how to use it - see here, here and here.  There are many more classes that ship with BizTalk which aid in developing your streaming components, which I have not yet covered.  These include the classes that implement the features I pointed out at the top, such as validation and property promotion & demotion.  I plan to cover many of these in detail in future posts.  However, they follow the pattern which I have described in the last 5 posts (most are implemented as custom XmlReaders or custom XmlTranslatorStreams), and as a result I will not add them as posts under the banner of this series.  They are more or less a follow-on to this series.  Over and out!

Developing Streaming Pipeline Components - Part 4

In this post I will focus on BizTalk's stream design, and how it allows for the XmlReader and XmlWriter coupling detailed in Part 3.  Once this topic has been covered, we are finally at a point where all of this background can be applied to achieving actual real world tasks.  Yes I know, about time...

This post covers the key BizTalk pipeline stream - the XmlTranslatorStream - and will therefore be a long one.  It contains what are probably the most crucial details for understanding the whole streaming design.

...

Where I left off in my prior post, I had discussed the issues with needing to be able to chain the XmlReader and XmlWriter together at the right time - to move data through from the inbound stream to the outbound stream on demand.  In other words, when someone further down the pipeline from us needs some data to read - this is when we want this movement of data to occur.  Also, BizTalk requires that a message part's data is actually a System.IO.Stream type.

Introducing the XmlTranslatorStream

Instead of trying to re-format what I had already written up for the Toronto BTUG presentation I did on the same topic as this series, I thought I would steal my own slide.

This slide shows the key BizTalk stream when it comes to writing streaming pipeline components - the XmlTranslatorStream, and its relation to the base System.IO.Stream.

[Slide: BizTalk Stream Design]

As you can see, the XmlTranslatorStream has a number of parent classes in the inheritance hierarchy, and at each layer a piece of functionality is added.  I will go into the specifics of what occurs in each level later in this post, and how it is implemented internally.  The end result is that the XmlTranslatorStream can be provided with an XmlReader as its source of data, and when the XmlTranslatorStream is read it internally chains the reader's data through to an XmlWriter - resulting in an output from the XmlTranslatorStream that is identical to the inbound stream that the XmlReader is reading.

To put this in relation to BizTalk pipelines, this means that in a pipeline component we can now:

  • Attach an XmlReader to the inbound message's data stream
  • Create a new XmlTranslatorStream, and provide it with the XmlReader we created
  • Set the XmlTranslatorStream to the outbound message's data stream

Note: This does not explain how our custom logic is going to be performed.  I will go into this in a future post, but it usually involves passing a custom XmlReader to the XmlTranslatorStream, or deriving from XmlTranslatorStream and overriding the functionality it provides.

Once the next component in the pipeline gets a reference to its inbound message's data, it is going to be a reference to the XmlTranslatorStream that we set in the prior component.  This component, and all other components in the pipeline perform the steps in the bullet points.  Note that this is for general cases - I'm sure there are many cases where other steps will need to be performed in the pipeline component's main method.

Therefore, at the end of the pipeline when all pipeline components have been called by BizTalk (but no data has been read yet), we have a stream structure like the one depicted in the following slide:

[Slide: Wrapped Streams]

This slide attempts to show that we end up with a series of layered streams.  This is for a receive pipeline, and it shows (from inner circle to outer):

  • A FileStream representing the message received by the file adapter (this isn't actually the type of stream the adapter gives us, but it's one that everyone is familiar with)
  • An XmlDasmStreamWrapper which is returned by the BizTalk XML Disassembler component
  • An XmlValidatingStream which is returned by the BizTalk XML Validator component
  • An XmlTranslatorStream returned by a custom pipeline component
  • Also, note that each inner layer of stream is "attached" to the next outer layer by the technique I discussed earlier.  That is, the outer stream contains an XmlReader which is reading over the inner stream as its source of data
  • Note that XmlDasmStreamWrapper and XmlValidatingStream both derive from XmlTranslatorStream, and therefore follow the same pattern as it

Now if you refer back to the "Streaming - Processing Execution Order" slide from Part 2, and the description I gave for how it executes, things should hopefully be starting to fall into place about how all of this operates.  Now when BizTalk starts reading the stream provided by the last component in the pipeline (represented by the blue circle in the above slide), this stream has no data to provide to the read request.  As a result, it reads from its internal XmlReader.  When the read method returns, it will push the data into its internal XmlWriter, and the data that the XmlWriter generates will then be given to the stream that requested the data.  But calling read on the outermost XmlReader in the chain will cause a number of calls before it returns...

You will notice that the only stream that has a physical backing store is the innermost FileStream.  All other streams are simply a wrapper around another stream.  Therefore, as each stream is read, it has no data to provide, so it spins its XmlReader, pulling data from the stream it is wrapping.  This in turn does the same thing until the read request makes it all the way into the innermost FileStream, which fulfils the read request as it actually has data to provide.  The call then propagates back down the stack, providing the data to the wrapping streams until it makes it all the way back to the outer stream and data is returned to BizTalk.
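The way a read request travels inwards and the data travels back out can be shown with a small sketch that uses plain .NET streams.  PassThroughStream and LayerDemo are hypothetical names of my own, and the wrapper here does no XML work at all (there is no XmlReader inside it) - it only demonstrates that a wrapping stream with no data of its own must pull from the stream it wraps:

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical pass-through wrapper: holds no data itself, only pulls from the stream it wraps.
public class PassThroughStream : Stream
{
    private readonly string _name;
    private readonly Stream _inner;
    private readonly List<string> _log;

    public PassThroughStream(string name, Stream inner, List<string> log)
    {
        _name = name;
        _inner = inner;
        _log = log;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        _log.Add(_name + " asked for data");
        // The request is passed inwards; only the innermost stream has a backing store.
        int read = _inner.Read(buffer, offset, count);
        _log.Add(_name + " returned " + read + " bytes");
        return read;
    }

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return false; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
}

public static class LayerDemo
{
    // Builds source <- middle <- outer and performs a single read on the outer stream.
    public static List<string> ReadThroughLayers()
    {
        List<string> log = new List<string>();
        Stream source = new MemoryStream(new byte[] { 1, 2, 3, 4 });
        Stream middle = new PassThroughStream("middle", source, log);
        Stream outer = new PassThroughStream("outer", middle, log);
        outer.Read(new byte[4], 0, 4);
        return log;
    }
}
```

The log produced by LayerDemo.ReadThroughLayers() records the outer stream asking first and returning last, with the middle layer's entries nested in between - the same shape as the call stack described above.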

So far, to make it easier to understand where the XmlTranslatorStream fits into the big picture, and to apply it to something developers can relate to earlier rather than later, I have skipped its inner workings.  I have focused on how it can be used within the context of a BizTalk pipeline, and how data is passed from layer to layer of streams.  Now I will crack open the XmlTranslatorStream to reveal how it works in more detail.

Inside the XmlTranslatorStream

This section will show you how a call to the stream's Read method results in the internal XmlReader being read, and how that data is returned to the caller of Read.

I will now call upon another one of my presentation's slides.  This slide also shows the class hierarchy as did the one earlier, but also explains exactly what happens at each stage.  It details the method calls that will go on the stack as the request for data is fulfilled.  This slide also had shock and bore type animations, but unfortunately you won't be able to see them in this picture.  The main items to note are the red "no smoking" sign, which represents the XmlReader; the white one which represents the XmlWriter; and the drum which represents the internal buffer (more on this after the slide):

[Slide: Data Through XmlTranslatorStream]

The main thing to note which I haven't really covered for the chaining of the XmlReader and XmlWriter inside the XmlTranslatorStream is that there is also an internal buffer (represented by a MemoryStream), which is what the internal XmlWriter is actually outputting to.  This buffer is provided in the base class of XmlTranslatorStream - the XmlBufferedReaderStream.

So, to talk through what the slide is showing - the steps that occur in a single Read call to the XmlTranslatorStream:

  • Someone calls Stream.Read on the XmlTranslatorStream, passing in how many bytes it would like
  • The abstract Read method is implemented up at the EventingReadStream class.  (I will detail more about this class and how its features can be useful in a future post.)  It raises an event and then calls its abstract ReadInternal method, passing in all the parameters provided to the Read call
  • The abstract ReadInternal method is implemented up at the XmlBufferedReaderStream class.  The buffer is checked for data
  • If the buffer has enough data to fulfil the request, it is taken from the buffer.  Otherwise, the buffer needs more data, so XmlBufferedReaderStream calls its abstract ProcessXmlNodes method, providing it with the number of bytes it still requires after taking what was left from the buffer (the buffer may have fulfilled some of the data request)
  • The abstract ProcessXmlNodes method is implemented up at the XmlTranslatorStream.  When it is called by the XmlBufferedReaderStream, the XmlTranslatorStream needs to push more data into the buffer via the XmlWriter.  But the XmlWriter is given its data to write from the XmlReader.  Therefore, the XmlReader is read, and the new node under the reader is copied over to the XmlWriter, hence filling the buffer.  Each time the XmlReader's node is copied to the XmlWriter, the number of bytes in the buffer is compared against the number requested in the ProcessXmlNodes call.  If the buffer is still not full enough, more read/write calls are made.  When there is either enough data in the buffer, or the reader becomes end of file (EOF) before the number of bytes can be fulfilled, the call returns, passing the number of bytes that were pumped into the buffer during the ProcessXmlNodes call.  The XmlBufferedReaderStream then returns the data to the EventingReadStream as a result of the ReadInternal call, which then returns the data to the caller of the Read method, and the call completes.

That's it!  Those steps occur every time the stream is read.  The key point to take out of this is that we are pulling data through an XmlReader every time you read the stream (well, technically, every time you read the stream and the buffer cannot fulfil the request).  This is our main extensibility point - without utilising this extensibility, all we are doing is slowing the process down by consuming and replicating the data for no reason!
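The steps of a single Read call described above can be sketched in one small class.  This is a simplified stand-in under my own assumptions, not BizTalk's code: MiniXmlPullStream and ReadAll are hypothetical names, the single class plays the roles of all three layers (there is no eventing), it pumps one node at a time, and it only copies elements, attributes and text:

```csharp
using System;
using System.IO;
using System.Text;
using System.Xml;

// Simplified stand-in: one class plays the roles of all three BizTalk layers.
public class MiniXmlPullStream : Stream
{
    private readonly XmlReader _reader;
    private readonly XmlWriter _writer;
    private readonly MemoryStream _buffer = new MemoryStream();
    private long _readPos;  // where the next caller read starts in the buffer
    private bool _done;     // true once the reader has reached EOF

    public MiniXmlPullStream(XmlReader reader)
    {
        _reader = reader;
        XmlWriterSettings settings = new XmlWriterSettings();
        settings.OmitXmlDeclaration = true;
        settings.Encoding = new UTF8Encoding(false); // no byte order mark
        _writer = XmlWriter.Create(_buffer, settings);
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        // Only pump the reader when the buffer cannot satisfy the request.
        while (_buffer.Length - _readPos < count && !_done)
            ProcessXmlNode();

        _buffer.Position = _readPos;
        int read = _buffer.Read(buffer, offset, count);
        _readPos += read;
        if (_readPos == _buffer.Length)
        {
            // Everything buffered has been handed out, so reuse the buffer.
            _buffer.SetLength(0);
            _readPos = 0;
        }
        return read;
    }

    // Advances the reader by one node and copies that node to the writer.
    private void ProcessXmlNode()
    {
        if (!_reader.Read()) { _done = true; return; }
        _buffer.Position = _buffer.Length; // append after any unread bytes
        switch (_reader.NodeType)
        {
            case XmlNodeType.Element:
            {
                bool isEmpty = _reader.IsEmptyElement;
                _writer.WriteStartElement(_reader.Prefix, _reader.LocalName, _reader.NamespaceURI);
                _writer.WriteAttributes(_reader, false);
                if (isEmpty) _writer.WriteEndElement();
                break;
            }
            case XmlNodeType.Text:
                _writer.WriteString(_reader.Value);
                break;
            case XmlNodeType.EndElement:
                _writer.WriteFullEndElement();
                break;
            // Other node types are skipped to keep the sketch short.
        }
        _writer.Flush(); // push the writer's output into the buffer
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing) { _writer.Close(); _reader.Close(); _buffer.Dispose(); }
        base.Dispose(disposing);
    }

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return false; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }

    // Reads the whole stream in chunkSize-byte requests and returns the result as text.
    public static string ReadAll(string xml, int chunkSize)
    {
        MemoryStream result = new MemoryStream();
        using (MiniXmlPullStream stream = new MiniXmlPullStream(XmlReader.Create(new StringReader(xml))))
        {
            byte[] chunk = new byte[chunkSize];
            int read;
            while ((read = stream.Read(chunk, 0, chunk.Length)) > 0)
                result.Write(chunk, 0, read);
        }
        return Encoding.UTF8.GetString(result.ToArray());
    }
}
```

Reading a small document such as "<a><b>hi</b><c>there</c></a>" through ReadAll with a 5 byte chunk size exercises the interesting path: several Read calls are answered partly from bytes already in the buffer and partly by pumping further nodes through the reader and writer.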

In the next post I will cover how we can take advantage of this extensibility and get the XmlTranslatorStream and the XmlReader we provide it to perform our processing logic.  By doing this we are moving away from an in memory approach and towards a streaming approach, which is exactly what we are trying to achieve.  I will demonstrate this by showing how BizTalk pipeline components use this technique for their processing.

[Update 22nd May 2008 - updated incorrect spelling of XmlBufferedReadStream to XmlBufferedReaderStream. The slide deck screen shots are still incorrect.]

BizTalk Server 2006 R3 Announced

Just in case you haven't heard from every other BizTalk blogger, BizTalk Server 2006 R3 has been announced.  This will support the latest release of technologies - Windows Server 2008, .Net Framework 3.5, VS2008 & SQL Server 2008.  It will also include some new bits too.  See here for the initial announcement.  See here for something completely different.

Developing Streaming Pipeline Components - Part 3

After covering the "why" of streaming design and some comparisons with in memory designs in Part 1, and then some technical differences in regards to moving processing logic from the pipeline components to the readers and streams they use in Part 2, this post will finally begin to dig into how the streaming processing is achieved in BizTalk.

...

By now from the last post (and the intro!) you will be aware that in streaming components the processing of the message is performed in the stream given to BizTalk.  I also mentioned that "Custom streams wrap other streams.  The inner stream becomes the source of data for the outer stream".  However, in order to perform some sort of processing of the data within the stream, we need to actually pay attention to (or "watch") the data that is moving through it so we know when we want to modify the data, or promote the data, or do whatever it is we want to do based on the data.

Introducing the XmlReader

This is where the XmlReader comes into play.  The XmlReader is a cursor based object that provides a forward only means of accessing XML data streams.  Due to the design of the cursor moving 1 node at a time through XML data, it is the perfect choice of class for basing streaming designs upon - as this essentially implies that only that node's details are loaded into memory at any given time.  The other benefit of the XmlReader is that it is very expressive about the node currently under the reader - you advance the reader using its methods, and its properties reflect the data that it is positioned above.  Because of its expressiveness of the data it is currently reading, the XmlReader is also excellent for being able to watch the data passing through it, and make clear decisions on when you are reading a piece of data that is particularly interesting for the task you are trying to achieve.  When this occurs, you can implement logic to perform your task - whether it be to modify the data, or to promote a value to the message context, or whatever it happens to be.  (I will go into the details of making streaming modifications to data, and other processing that you may need to do in a pipeline component in a future post.)
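To make the idea of "watching" the data concrete, here is a minimal sketch that advances a reader node by node and stops when it finds an element of interest.  NodeWatcher and FindFirstValue are hypothetical names for this example, and the sketch assumes the element being looked for contains only text.

```csharp
using System;
using System.IO;
using System.Xml;

// Hypothetical helper showing how reader properties expose the current node.
public static class NodeWatcher
{
    // Returns the text content of the first element with the given local name,
    // or null if the reader reaches the end without finding one.
    // Assumption: the matching element has simple (text only) content.
    public static string FindFirstValue(XmlReader reader, string localName)
    {
        while (reader.Read())
        {
            // NodeType and LocalName describe the node currently under the reader
            if (reader.NodeType == XmlNodeType.Element && reader.LocalName == localName)
            {
                return reader.ReadElementContentAsString();
            }
        }
        return null;
    }
}
```

Note that everything the reader passed over on the way to the match has been consumed and is gone, since nothing here keeps a copy of it.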

Due to the advantages the XmlReader offers us, it is pretty much always the class that is used in BizTalk pipelines to read incoming data that is in XML format.  However, the XmlReader only allows consuming XML data.  If our pipeline component hooks up some custom XmlReader to the inbound stream, the only thing we can do with that data is consume it.  What happens to the next pipeline component in the pipeline?  What data is going to be provided to it if the XmlReader our component used just consumed the entire stream?  Well, with what I have covered so far, it is basically going to get a reference to a stream that is already positioned at "end of stream".  Therefore, even if we managed to promote an XML node we were looking for in the data to the message context by implementing some custom logic with the XmlReader, we have essentially lost the message body!  We need to somehow be able to read the inbound stream with an XmlReader, but also maintain the data we are reading so the next component in the pipeline is also passed the XML message.

Introducing the XmlWriter, and coupling it with XmlReader

This issue is overcome by "coupling" or "chaining" the XmlReader with the XmlWriter.  The XmlWriter is for producing XML data.  (I will not go into great detail about the API of the XmlReader and XmlWriter as I assume most developers have some experience with them, and they are well documented on MSDN.)  Like the XmlReader, it is also very expressive about the data that you wish to produce with it.  It has a number of methods for writing very specific parts of XML data.  Therefore, due to the expressiveness of both the XmlReader and the XmlWriter, the node you are currently positioned over with the XmlReader has a known method which can be called on the XmlWriter to "reproduce" the XML that was consumed by the reader.  E.g. if the XmlReader is currently on an element (reader.NodeType property equals XmlNodeType.Element), you can call XmlWriter.WriteStartElement, passing the element's details from the reader - reader.Prefix, reader.LocalName, reader.NamespaceURI.

The following code snippet can be used to completely consume an XmlReader, but it will re-generate each node from the reader via the XmlWriter as it is read.  You can try this with any XML file - just make sure your XmlReader variable is called reader and your XmlWriter variable is called writer.  Try it and see.  This concept is absolutely key to how BizTalk processes messages in a streaming manner.  Also note that because it is re-creating the data as it is read, the data is essentially moving from the inbound stream to the outbound stream with very minimal memory consumption.

    while (reader.Read())
    {
        switch (reader.NodeType)
        {
            case XmlNodeType.Element:
                // Write out the element declaration
                writer.WriteStartElement(reader.Prefix, reader.LocalName, reader.NamespaceURI);
                // Copy the attributes of the reader
                writer.WriteAttributes(reader, true);
                // If the element is empty (eg. "<Test/>") then write the end immediately
                if (reader.IsEmptyElement)
                    writer.WriteEndElement();
                break;

            case XmlNodeType.Text:
                writer.WriteString(reader.Value);
                break;

            case XmlNodeType.CDATA:
                writer.WriteCData(reader.Value);
                break;

            case XmlNodeType.EntityReference:
                writer.WriteEntityRef(reader.Name);
                break;

            case XmlNodeType.ProcessingInstruction:
            case XmlNodeType.XmlDeclaration:
                writer.WriteProcessingInstruction(reader.Name, reader.Value);
                break;

            case XmlNodeType.Comment:
                writer.WriteComment(reader.Value);
                break;

            case XmlNodeType.DocumentType:
                // Get the public and system IDs to pass to the WriteDocType method
                writer.WriteDocType(reader.Name, reader.GetAttribute("PUBLIC"), reader.GetAttribute("SYSTEM"), reader.Value);
                break;

            case XmlNodeType.Whitespace:
            case XmlNodeType.SignificantWhitespace:
                writer.WriteWhitespace(reader.Value);
                break;

            case XmlNodeType.EndElement:
                // Force a closing element (eg. "</Test>") by calling WriteFullEndElement rather than WriteEndElement
                writer.WriteFullEndElement();
                break;
        }

        // Flush after every node so the output is pushed through as it is read
        writer.Flush();
    }

 

This explanation and code snippet detail how we can consume data and "keep" the data that we consumed.  But this still does not tie in with what BizTalk message parts require for their data - Streams.  It has also not explained how we can get this "chunking" of XML to happen within BizTalk at the right time.  I.e. how do we rig up the XmlReader and XmlWriter so we are only moving data from the inbound stream to the outbound stream when the data is required in the outbound stream?  As discussed in Part 2, data is only required in our outbound stream when something else reads it.  (For our component, the stream that is returned by our component is regarded as our outbound stream and the next component's inbound stream.)  If we push the entire inbound stream (or any data for that matter) into the outbound stream before anything needs to read it, we are most likely pushing that data into memory (assuming the XmlWriter is writing to a MemoryStream).  We could use the VirtualStream which ships with BizTalk, but this also writes to memory and then to disk once a size threshold has been passed.  The real issue is that we don't need to write data to the outbound stream until it really is needed (even if that stream does not consume memory) - otherwise we are not being efficient in that we are going to "double handle" the data as we go via some form of storage (memory, disk etc).  What would be better is if we could simply hand the next stream its data directly when it wants it.

This leads into the next post topic - the design of the BizTalk stream stack.  This will cover how the issues in the last paragraph are overcome.  I think it should be the last topic that needs to be covered before looking into real world examples of how to do something useful with the data!

 

NOTE: VirtualStream ships in both the SDK as open source and as part of the runtime.  Most developers are not aware that you can simply use the out of the box version - there is no need to compile the SDK one into your own library!

Developing Streaming Pipeline Components - Part 2

In Part 1 I provided a few comparisons of using streaming versus in memory approaches to pipeline component development.  In this post I will move into detailing the concepts behind developing streaming pipeline components (as in how is it technically different from the point of view of a pipeline component), and how a different mindset is required to understand how it works.  I won't yet delve into the really technical details as it's probably best to understand these concepts before looking into how this is achieved at a lower level.

...

One of the hardest concepts to grasp when moving from developing with an in-memory mindset to a streaming mindset is that in general your pipeline component itself does not modify or process the message data.  With a streaming approach, the message is modified as it passes through a stream (or a series of chained streams).  Developers are used to looking at the pipeline designer surface in Visual Studio and thinking that "component 1 will process it, then BizTalk will pass the resulting message to component 2 ... etc until the last component in the pipeline processes the message, and then BizTalk will put the resulting message in the message box database (or deliver it to the send adapter in the case of a send pipeline)".  This analysis is actually fairly correct if all the components in the pipeline do NOT process the message in a streaming manner.  However, with a pipeline utilising streaming components it is much different.

When taking a streaming approach, we want to be able to process the message in small chunks so as not to load much data into memory.  The way this is generally achieved is by:

  • Using a custom implementation of System.IO.Stream
    • NOTE: by custom implementation, this also includes the custom implementations that BizTalk provides out of the box.  I will go into deep detail for some of these in future posts, as well as writing your own custom ones 
    • Custom streams wrap other streams.  The inner stream becomes the source of data for the outer stream
  • Using a custom implementation of System.Xml.XmlReader (same rules apply to custom as above), coupled with a custom implementation of Stream.  I.e. you will always need a custom implemented stream, but not always need the custom XmlReader - it can depend on how/where you implement your logic
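As a minimal sketch of a custom stream that wraps another stream, the class below does its "processing" inside Read, so the work only happens as data is pulled through it.  UpperCaseStream is a hypothetical example class, and upper-casing ASCII letters simply stands in for real processing logic; it is not one of the BizTalk streams.

```csharp
using System;
using System.IO;

// Hypothetical read-only wrapper: the inner stream is the source of data, and
// the bytes are transformed as they are read through this outer stream.
public class UpperCaseStream : Stream
{
    private readonly Stream inner;

    public UpperCaseStream(Stream inner)
    {
        this.inner = inner;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        // Pull the next chunk from the inner stream, then process just that chunk
        int read = inner.Read(buffer, offset, count);
        for (int i = offset; i < offset + read; i++)
        {
            if (buffer[i] >= (byte)'a' && buffer[i] <= (byte)'z')
                buffer[i] = (byte)(buffer[i] - 32);
        }
        return read;
    }

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return false; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
}
```

Only one chunk is ever held at a time, which is the property that matters here: memory use depends on the size of the caller's buffer, not on the size of the message.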

The processing that you intended your pipeline component to do in the Execute method (or whichever method applies to the type of component you are writing) is actually placed in logic that is contained in either the XmlReader or Stream.

  • NOTE: Execute is called for standard pipeline components
  • For dis-assembler components, BizTalk calls Disassemble AND GetNext, but the equivalent of the Execute method in terms of where to set the message part Data property is GetNext
  • For assembler components, BizTalk calls AddDocument AND Assemble, but the equivalent of the Execute method in terms of where to set the message part Data property is Assemble
  • Any reference to Execute is referring to the applicable method for the component type as stated above

Moving this logic into the XmlReader or Stream allows the logic to execute as the message is read by BizTalk.  And no - we don't just simply move the location of the logic from the pipeline component to one of these XmlReader or Stream classes - we need to change the way we actually achieve it too, otherwise we are just moving the issue.  As stated earlier, designing and writing these classes will be covered in future posts.

To tie our custom streams in with BizTalk, the BizTalk message part allows setting the Data property, which is actually of type Stream.  Therefore, we can set this property to our custom stream implementation and as a result when the stream is read our processing will execute.

Therefore, when a pipeline that contains components which have been designed in a streaming manner is executed by BizTalk, the behaviour is this (NOTE: this ignores per-instance configuration calls to Load which are a result of overriding properties in the send/receive location.  I have posted here about how per instance affects calls to Load):

  1. The Load method is called by BizTalk for each component in order of pipeline location
  2. The Execute method is called on component 1 by BizTalk
  3. Component 1 creates a new instance of a custom Stream (and optionally a custom XmlReader) and sets this as the return message part's Data property.  The source of data for the reader/stream is then set to the inbound stream.  The custom reader/stream will perform the actual processing (Eg. modification to data or promotions to the message context based on the data etc).  However, this will not occur until the stream that was passed back starts to be read by the next component in the pipeline, or BizTalk (in the case where it was the last component in the pipeline)
  4. BizTalk calls component 2's Execute method, passing to it the message returned from component 1.  Therefore, when component 2 accesses the message's Data property, it is actually retrieving a reference to the stream attached by component 1
  5. Component 2 does the same as component 1.  Note that no data has been read yet, but each component is wrapping another layer of stream around the stream it received
  6. Steps 4 and 5 are repeated for each component in the pipeline, passing in the message returned by the prior component
  7. Depending on whether the pipeline is a receive or a send pipeline - the BizTalk messaging engine (for receive pipelines) or the BizTalk send adapter (for send pipelines) retrieves a reference to the message, which was returned by the last component in the pipeline
  8. The message's Data stream is read by BizTalk or the send adapter.  This is the outermost stream in the layers of wrapped streams.  Therefore the data output by this stream is the net effect of all modifications made by inner streams (i.e. the modifications made to the data by each inner stream are present in the data that is read here)
  9. For receive pipelines, the resulting data is pushed into the message box database.  For send pipelines, the data is sent by the adapter to the external system
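The ordering in the steps above can be sketched without BizTalk at all.  In the hypothetical code below, PipelineSketch.Execute stands in for a component's Execute method and only wraps the inbound stream, while TracingStream records when data is actually read.  Both names are made up for this illustration and are not BizTalk classes.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical wrapper that logs each Read before delegating to the inner stream.
public class TracingStream : Stream
{
    private readonly Stream inner;
    private readonly string name;
    private readonly List<string> log;

    public TracingStream(Stream inner, string name, List<string> log)
    {
        this.inner = inner;
        this.name = name;
        this.log = log;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        log.Add(name + " read");
        return inner.Read(buffer, offset, count);
    }

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return false; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
}

public static class PipelineSketch
{
    // Stand-in for a streaming component's Execute: it wires up a wrapper and
    // returns immediately without reading any data.
    public static Stream Execute(string componentName, Stream inbound, List<string> log)
    {
        log.Add(componentName + " Execute");
        return new TracingStream(inbound, componentName, log);
    }
}
```

Running two of these "components" over a stream logs both Execute calls first.  The read entries only appear once the outermost stream is read, starting with the last component's wrapper and working inwards.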

In an attempt to visualise the difference, here are a couple of screen shots from the slide deck of my presentation at the Toronto BTUG.  This first one shows a pipeline (lying horizontally to show execution will occur L-R) with in memory style components.  The red cogs represent both a call to the component's Execute method, and that the processing logic is going to occur at the same time as the Execute call (the slide was animated to show the left most component executes, then each one individually through to the right most one):

[Image: In Memory Execution]

Next is the corresponding slide for the streaming pipeline.  This time, the yellow lightning bolts indicate the call to the component's Execute method (again, the slide was animated to show the Execute call to the left most component through to the right most one).  However, this time the red cogs (which represent the processing each component is doing) are separate, and don't actually appear until after the right most component's Execute method has completed.  The database shape represents the BizTalk message box database, and the red "no smoking" sign represents fresh air for all, and also the BizTalk messaging engine which is reading the stream at the end of the pipeline.  In the slide version, which bamboozled viewers with a display of amazing PowerPoint animations never seen before at a local user group, once the last Execute method had run, the no smoking sign spun (indicating it was reading the outermost stream, which would have been set by the last component in the pipeline), pulling data through all wrapped streams that had been wired up by the pipeline components.  This resulted in the cogs spinning as well (indicating the processing was taking place in the streams as the data was read through them).  The small arrows indicate the data moving from the left side (from the receive adapter) to the right side as it is read, and then up to the message box as it is incrementally pushed into it.  I hope all that saw this wacky display of animations appreciated it, even if not for the technical knowledge it was attempting to convey, but for the fact it took me ages to figure out how to use PowerPoint animations.

[Image: Streaming Execution]

Well I think that about covers enough of the high level details before being able to dig down into the specifics of how the custom implementations of Stream and XmlReader work.  The next post will get into these details, but may also seem a bit of a detour at first.  However it is also key to understanding the overall picture.  It will look into "chaining" the XmlReader and XmlWriter together.

Pipeline Components: Per-Instance Configuration and Load calls

As most people that use BizTalk are probably aware, BizTalk 2004 and up allow the properties of a pipeline to be overridden in each send/receive location.  In BizTalk 2006 there is a nice UI via the admin console for doing this.

However, what is NOT immediately clear to developers of pipeline components is how this affects when BizTalk calls your Load method at runtime.  Due to the way this has been implemented by BizTalk, you need to be aware of this as your pipeline components need to explicitly handle this situation in the Load method.

In my opinion BizTalk handles per-instance configuration in a pretty ugly way.  When your component's Load method is first called, it is passed a property bag containing all properties that were set in the pipeline designer (and saved via your Save method).  This property bag does NOT contain any per-instance properties.  Per-instance properties are provided to your component via a second Load call.  In my opinion BizTalk should merge the 2 property bags for you and simply call your Load method once, with the resulting property bag.

What makes matters worse is that when per-instance Load calls are made, they only contain the properties which were overridden.  This potentially introduces bugs into your Load method.  Consider the following code for the Load method:

    public void Load(Microsoft.BizTalk.Component.Interop.IPropertyBag propertyBag, int errorLog)
    {
        object temp;
        propertyBag.Read("StringToReplaceProperty", out temp, 0);
        StringToReplace = Convert.ToString(temp);
    }

This code will work fine for the first Load call which contains all saved properties (assuming all properties were saved when the pipeline was created - they can be missing if you add an extra property to the component and do not re-drag your component onto the designer).

However, when a per-instance Load call is made, if the actual "StringToReplace" property had not been overridden in the port, but instead some other property in the component had been, then the result after the per-instance Load call is that you will have set your property back to null!

The way to avoid this is to write a helper method which allows specifying a default value.  Then you can pass the property itself in as the default value.  If you do this, the per-instance read of the property bag returns null, so you simply return the default value, which is the value that the property was already set to by the first Load call.

I recommend writing a specific helper class and placing it in an assembly that you can use from all pipeline components to avoid having to deal with the extremely ugly IPropertyBag interface BizTalk presents to you.  Then you can add type specific read methods such as:

  • ReadStringProperty(propertyBag, propertyName, defaultValue)
  • ReadBoolProperty(propertyBag, propertyName, defaultValue)
  • ReadEnumProperty(propertyBag, propertyName, defaultValue)

This will make your code much more readable than creating variables to be used for out parameters.
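A minimal sketch of such a helper is shown below.  So that it compiles without the BizTalk assemblies, it uses a stand-in interface (IPropertyBagLike) with the same Read signature as the Load example above.  The stand-in interface, the PropertyBagHelper class and the DictionaryBag test double are all hypothetical names for this example; in a real component you would accept Microsoft.BizTalk.Component.Interop.IPropertyBag instead.  Catching ArgumentException for a missing property is an assumption about how the bag behaves, so check it against your own environment.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for the BizTalk property bag, limited to the Read call used above.
public interface IPropertyBagLike
{
    void Read(string propName, out object value, int errorLog);
}

// Hypothetical helper: reads a property, falling back to a supplied default.
public static class PropertyBagHelper
{
    public static string ReadStringProperty(IPropertyBagLike bag, string name, string defaultValue)
    {
        object value = ReadRaw(bag, name);
        return value == null ? defaultValue : Convert.ToString(value);
    }

    public static bool ReadBoolProperty(IPropertyBagLike bag, string name, bool defaultValue)
    {
        object value = ReadRaw(bag, name);
        return value == null ? defaultValue : Convert.ToBoolean(value);
    }

    private static object ReadRaw(IPropertyBagLike bag, string name)
    {
        object value = null;
        try
        {
            bag.Read(name, out value, 0);
        }
        catch (ArgumentException)
        {
            // Assumption: a missing property may surface as an exception
            // rather than a null value, so treat both cases as "not present".
            value = null;
        }
        return value;
    }
}

// Simple in-memory bag for trying the helper out; returns null for missing names.
public class DictionaryBag : IPropertyBagLike
{
    private readonly Dictionary<string, object> values = new Dictionary<string, object>();

    public void Set(string name, object value) { values[name] = value; }

    public void Read(string propName, out object value, int errorLog)
    {
        values.TryGetValue(propName, out value);
    }
}
```

In Load, the current property value is passed as the default, e.g. StringToReplace = PropertyBagHelper.ReadStringProperty(bag, "StringToReplaceProperty", StringToReplace).  A per-instance call that does not contain the property then leaves the value from the first Load call untouched.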

Once you override properties using per-instance properties, the sequence of Load calls changes from a single call for each component.  Summarised below is the behaviour you will now see based on the pipeline component type.  NOTE: The easiest way of seeing what is going on here is to break in the Load method and view the call stack.  Right click and select "Show External Code".  This way you can see the BizTalk and System methods in the stack.

The Load behaviour you should now experience is:

  1. Design time properties are Loaded when the pipeline is created.  As a result, the Load method of each component is called from the first component in the pipeline to the last to load the design time properties (I.e. the VS designer configured properties)
  2. For standard pipeline components, per-instance properties cause a Load call immediately before the Execute method is called
  3. For dis-assembler components
    • Per-instance properties cause a second Load call which is triggered by the possible need of BizTalk to call IProbeMessage.Probe (this interface can be optionally implemented by dis-assemblers as the dis-assemble stage is "first match").  NOTE: This call is made even if the component does NOT implement the IProbeMessage interface!
    • A Load call is made before the Disassemble method
    • A Load call is made before every call to GetNext
  4. For assembler components, per-instance properties cause a second Load call to be made prior to the call to AddDocument.  Therefore there are 2 consecutive Load calls, then a call to AddDocument, then a call to Assemble
  5. If per-instance configuration has been specified for any properties in the pipeline, per-instance Load calls are made to every component in the pipeline, even if that specific component has no overridden properties

Developing Streaming Pipeline Components - Part 1

Hello thrillseekers.  Well here it is - Part 1 of ??? on developing streaming pipeline components for BizTalk Server.  My apologies for the delay of this post after causing mass hysteria with the earlier announcement of a series of posts on Developing Streaming Pipeline Components.  This photo was taken outside my apartment shortly after the announcement was made.

[Image: Mass Blogging Hysteria]

In this post I will cover the basics of what a "streaming approach" actually means, and why we ideally need to take a streaming approach to pipeline component development.  I will perform some comparisons of an in memory approach to a streaming approach from a number of perspectives.  Then in the follow-up posts I will dive into the detail of how this is achieved.

...

So, when documentation etc. refers to taking a streaming approach to pipeline component development, it basically means that the pipeline components processing the messages do not load the entire message into memory - they work on the message "chunk by chunk" as it moves through the pipeline.  This has the obvious benefit of consuming less memory, which in a BizTalk system that is processing either large messages or a high volume of messages becomes critical.
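As a small illustration of the difference, the sketch below counts the elements in a message in two ways: by loading the whole document into an XmlDocument, and by walking it with an XmlReader one node at a time.  ApproachComparison and its method names are hypothetical and exist only for this example.

```csharp
using System;
using System.IO;
using System.Xml;

// Hypothetical comparison of the two approaches using framework classes only.
public static class ApproachComparison
{
    // In memory: the entire message is parsed into a DOM before any work is done.
    public static int CountElementsInMemory(Stream xml)
    {
        XmlDocument doc = new XmlDocument();
        doc.Load(xml);
        return doc.SelectNodes("//*").Count;
    }

    // Streaming: only the node currently under the reader is held at any time.
    public static int CountElementsStreaming(Stream xml)
    {
        int count = 0;
        using (XmlReader reader = XmlReader.Create(xml))
        {
            while (reader.Read())
            {
                if (reader.NodeType == XmlNodeType.Element)
                    count++;
            }
        }
        return count;
    }
}
```

Both methods give the same answer.  What differs is how much of the message has to be in memory to get it.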

Below is a table which compares an "in memory" approach to a streaming approach - from various perspectives.  I will get into the details in subsequent posts which will explain more about these areas.  There may be areas I have missed but these are the ones that come to mind:

Comparison of Streaming and In Memory:

Memory Usage per Message
  • Streaming: Low, regardless of message size
  • In Memory: High (varies depending on message size)

Common Classes Used to Process XML Data
  • Streaming: Built in and custom derivations of XmlTranslatorStream, XmlReader, XmlWriter
  • In Memory: XmlDocument, XPathDocument, MemoryStream, VirtualStream

Documentation
  • Streaming: Poor – many un-supported and un-documented BizTalk classes
  • In Memory: Very good - framework classes

Location of “Processing Logic” Code
  • Streaming: “Wire up” readers & streams via Execute method.  Actual execution occurs in the readers & streams as the data is read through them
  • In Memory: Directly from the Execute method of the pipeline component

Data
  • Streaming: Re-created at each wrapping layer as data is read through it
  • In Memory: Read in/modified/written out at each component prior to next component being called

 

Now let's look at the advantages of both streaming and in-memory approaches:

Streaming
  • Low memory use
  • By utilising the built in BizTalk classes, some functionality exposed in the out of box pipeline components can be embedded in your custom components
  • Easy to add/re-use functionality by utilising the decorator pattern with Stream and XmlReader

In Memory
  • Fast when small message size (i.e. When server memory consumption is not stretched)
  • Developers generally have experience using these classes
  • Classes are generally fully featured (E.g. XPath, XSLT fully support the standards)
  • Often quicker/easier to code
  • Developers generally more familiar with this practice

 

And now for the limitations of each approach:

Streaming
  • Scenarios that require caching large amounts of data are generally not supported or defeat the purpose (i.e. The cache takes up memory anyway).  E.g. Certain XPath expressions
  • Poor documentation/unfamiliar development patterns to many developers
  • Built in BizTalk pipeline components not designed with extensibility in mind.  Often a re-write is required to add a small amount of functionality

In Memory
  • High memory use.  This can cripple a server’s throughput when processing large messages or a large number of messages.  Due to this, a streaming approach is recommended

  

As stated earlier, the details in the table are what comes to mind - I may have omitted some.  But basically, when you look at the advantages and limitations of each approach (note that I have included development factors in here as well as runtime performance), it looks like an in-memory approach takes the points!?

However, you will notice that a number of these relate to documentation and developer experience etc.  So although for runtime performance we want to use a streaming approach; for development factors, it appears more advantageous (as in development time and complexity) to go the in-memory approach.  I would say that this is generally true, and is the main reason that this is the route most often taken by developers.

That said, once you become experienced at developing with a streaming "mindset", and also once you build up an internal library of re-useable classes to assist you, most of the extra development time is removed and you have the runtime benefits that the streaming approach buys you.

 

So, I have outlined the pros and cons of each approach and also provided a comparison of each approach from both a technical workings perspective and a development perspective.  Stay tuned for the next post where I will start looking into how to technically achieve a streaming approach when developing your own custom pipeline components.

Developing Streaming BizTalk Pipeline Components

I recently presented at the Toronto BizTalk User Group on the topic of "Streaming Pipeline Component Development in BizTalk".  I found this to be a hard topic to present, as the content is possibly easier to absorb when reading it at your own speed and being able to take the time to soak it all in.  It is also a reasonably in-depth topic, and as a result assumes a prior knowledge of BizTalk pipeline design.

Anyway, I plan to blog the content as a series of posts.  I hope these posts will serve to fill in the gaps of an area of BizTalk that is not well known or documented.

These posts will cover:

  • An overview of streaming design & development – benefits and design differences
  • A detailed look into BizTalk pipeline's streaming design, and the underlying classes that BizTalk exposes to provide this
  • A look into BizTalk’s publicly available classes which are useful for pipeline components

My knowledge in this area is a result of developing a set of EDI pipeline components for BizTalk 2004 & 2006 to replace the Base EDI Adapter.  These components were developed prior to BizTalk 2006 R2, and support the original BizTalk 2000 - 2006 EDI schemas.

 

I will update this page with links to the articles once they have been posted.

[Update 28th April 2008 - links below]

Part 1 - Streaming versus In memory comparisons & pros/cons

Part 2 - Execution of custom processing logic differences

Part 3 - Coupling the XmlReader and XmlWriter

Part 4 - Deep dive into the XmlTranslatorStream

Part 5 - Implementing custom logic by sub-classing the XmlReader and XmlTranslatorStream

Robust Error Handling for BizTalk Solutions Presentation

I presented at the Microsoft SOA and Business Process Conference last week in Redmond. The title of the presentation was:

Robust Error Handling for BizTalk Solutions.

I did the presentation once on Thurs Nov 1 and again on Friday Nov 2. A number of people who attended the presentation were asking for the demo code and PowerPoint. The zipped code is here and the PowerPoint from the presentation is here. Hopefully when I have more time, I will try to write a more formal blog entry describing some of the techniques for error handling in BizTalk. Also for anybody who attended the conference and missed my presentation, you can catch the recorded presentation on a post conference DVD, that should be mailed out to all attendees in the next month or so.

Bridging SOA & BPM

Slides from "Bridging SOA & BPM" SMART Event.