Developing Streaming Pipeline Components - Part 5

In Part 4 I covered the XmlTranslatorStream in detail, explaining how it couples an XmlReader to an XmlWriter so that data is translated as the stream is read.  This revealed how BizTalk executes a pipeline of components efficiently: it is actually reading through a series of wrapped streams, one added by each component in the pipeline.  What that post did not cover was how to perform any useful processing, now that we know how to move data through in a streaming mode.

In this post I will cover how we can achieve this streaming-based processing of data, and give some examples of how and where BizTalk does this with its out-of-the-box components.


As I alluded to towards the end of Part 4, processing data through the XmlTranslatorStream is pointless if you simply read an inbound XML stream and write out an identical XML stream without performing any other processing.  All this achieves is slowing down the movement of data through the BizTalk streams by adding an extra layer of overhead.  However, because both the XmlReader and XmlTranslatorStream classes are very extensible, we can provide our own custom implementations of them to perform our processing.

In general, you have two main extensibility options:

  1. Subclass XmlReader and perform your custom processing in there
    • Then pass your custom XmlReader to the XmlTranslatorStream as its source of data
    • Then set the XmlTranslatorStream as the message part's data
  2. Subclass XmlTranslatorStream and perform your custom processing in there
    • Then set your subclass of XmlTranslatorStream as the message part's data

Note that you can also combine both, but in most cases I think you would only need one.

BizTalk ships with out of the box components that perform functionality such as:

  • XML Data validation
  • Data conversion (XML/Flat File)
  • Property promotion/demotion
  • Data manipulation (e.g. property demotion modifies the outbound data)

All of these are performed efficiently in a streaming manner, using a mixture of the extensibility options above.  Let's look at implementing some custom functionality, placing the same logic in both a custom XmlReader and a custom XmlTranslatorStream, to compare how each implementation works.

Before we look at the code, I will go over some more details of how the XmlTranslatorStream works.  When XmlTranslatorStream's ProcessXmlNodes method is called (see Part 4 for a detailed look at when this is called), it then advances its internal XmlReader and calls the TranslateXmlNode method.  This method essentially performs a check on the current NodeType under the reader, and delegates to the appropriate TranslateXxx virtual method accordingly.  It provides a TranslateXxx method for every type of XML node, which is called when the corresponding node type is being copied from the XmlReader to the XmlWriter (see Part 3 for sample code on using the XmlReader.NodeType property to allow XmlReader to XmlWriter data duplication).

By making all of the TranslateXxx methods virtual, the XmlTranslatorStream lets us intercept the nodes and values as they are written from the XmlReader to the XmlWriter, and so modify the data in flight.  We can write our own values to the XmlWriter instead of the ones provided by the XmlReader, or write nothing at all, which removes that data from the stream.  This extensible design gives us very granular control over the output data.
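To make the dispatch-and-intercept pattern concrete outside BizTalk, here is a minimal sketch that uses System.Xml only, so it can run without the BizTalk assemblies. MiniTranslator, its TranslateXxx methods, UpperCaseTranslator and MiniTranslatorDemo are hypothetical names invented for illustration; this is not the source of XmlTranslatorStream, which additionally buffers the writer's output and exposes it as a Stream. The subclass overrides one method to replace data (upper-casing text) and another to write nothing (dropping comments). It assumes C# 7 or later.

```csharp
using System;
using System.IO;
using System.Xml;

// Hypothetical base class: reads each node and hands it to a virtual
// TranslateXxx method whose default implementation copies it unchanged.
public class MiniTranslator
{
    protected readonly XmlReader Reader;
    protected readonly XmlWriter Writer;

    public MiniTranslator(XmlReader reader, XmlWriter writer)
    {
        Reader = reader;
        Writer = writer;
    }

    // Dispatch on the current node type, similar in spirit to TranslateXmlNode.
    public void TranslateAll()
    {
        while (Reader.Read())
        {
            switch (Reader.NodeType)
            {
                case XmlNodeType.Element:
                {
                    bool isEmpty = Reader.IsEmptyElement; // read before moving to attributes
                    TranslateStartElement(Reader.Prefix, Reader.LocalName, Reader.NamespaceURI);
                    while (Reader.MoveToNextAttribute())
                        TranslateAttribute(Reader.Prefix, Reader.LocalName, Reader.NamespaceURI, Reader.Value);
                    Reader.MoveToElement();
                    if (isEmpty)
                        TranslateEndElement();
                    break;
                }
                case XmlNodeType.EndElement:
                    TranslateEndElement();
                    break;
                case XmlNodeType.Text:
                    TranslateText(Reader.Value);
                    break;
                case XmlNodeType.Comment:
                    TranslateComment(Reader.Value);
                    break;
                case XmlNodeType.Whitespace:
                case XmlNodeType.SignificantWhitespace:
                    Writer.WriteWhitespace(Reader.Value);
                    break;
                // CDATA, processing instructions etc. are omitted to keep the sketch short.
            }
        }
        Writer.Flush();
    }

    protected virtual void TranslateStartElement(string prefix, string localName, string nsURI) =>
        Writer.WriteStartElement(prefix, localName, nsURI);

    protected virtual void TranslateAttribute(string prefix, string localName, string nsURI, string value) =>
        Writer.WriteAttributeString(prefix, localName, nsURI, value);

    protected virtual void TranslateEndElement() => Writer.WriteEndElement();

    protected virtual void TranslateText(string text) => Writer.WriteString(text);

    protected virtual void TranslateComment(string text) => Writer.WriteComment(text);
}

// Intercepts two node types: replaces text values and drops comments entirely.
public class UpperCaseTranslator : MiniTranslator
{
    public UpperCaseTranslator(XmlReader reader, XmlWriter writer) : base(reader, writer) { }

    protected override void TranslateText(string text) => base.TranslateText(text.ToUpperInvariant());

    protected override void TranslateComment(string text) { } // write nothing: the comment disappears
}

public static class MiniTranslatorDemo
{
    public static string Run(string xml)
    {
        var settings = new XmlWriterSettings { OmitXmlDeclaration = true };
        using (var output = new StringWriter())
        {
            using (XmlReader reader = XmlReader.Create(new StringReader(xml)))
            using (XmlWriter writer = XmlWriter.Create(output, settings))
            {
                new UpperCaseTranslator(reader, writer).TranslateAll();
            }
            return output.ToString();
        }
    }
}
```

For example, MiniTranslatorDemo.Run("<a><!--note--><b>hi</b></a>") returns <a><b>HI</b></a>: the comment was intercepted and never written, and the text was replaced on its way to the writer.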

Now, back to an example!  BizTalk ships with the NamespaceTranslatorStream class, which performs a streaming replacement of an XML namespace, given the old namespace to find and the new namespace to replace it with.  However, each instance of this stream only updates a single namespace.  If you need to replace more than one namespace, you have to wrap each NamespaceTranslatorStream in another NamespaceTranslatorStream, giving each one a different namespace to replace.  Every extra stream layer adds overhead (NamespaceTranslatorStream derives from XmlTranslatorStream, so each layer reads the XML with its own XmlReader and writes it again with its own XmlWriter), and it also makes the setup code more verbose.  It would be nicer to have a single class that performs namespace replacements for any number of namespaces, and this is easy to achieve by creating our own subclass of XmlTranslatorStream.

Take a look at the following custom implementation of XmlTranslatorStream:

/// <summary>
/// Allows the streaming replacement of multiple namespaces.
/// Note: This class could be easily enhanced to also update the prefix being written (if the ns0 prefixes that BizTalk generates are not wanted!).
/// </summary>
public class MultiNamespaceTranslatorStream : XmlTranslatorStream
{
    private Dictionary<string, string> _namespaceReplacements;

    public MultiNamespaceTranslatorStream(XmlReader reader) : base(reader) 
    {
        Initialise();
    }

    public MultiNamespaceTranslatorStream(XmlReader reader, Encoding encoding) : base(reader, encoding) 
    {
        Initialise();
    }

    public MultiNamespaceTranslatorStream(XmlReader reader, Encoding encoding, MemoryStream outputStream) : base(reader, encoding, outputStream) 
    {
        Initialise();
    }

    public Dictionary<string, string> NamespaceReplacements
    {
        get { return _namespaceReplacements; }
        set { _namespaceReplacements = value; }
    }

    private void Initialise()
    {
        _namespaceReplacements = new Dictionary<string, string>();
    }

    protected override int ProcessXmlNodes(int count)
    {
        // This method is called when the buffer in the base stream cannot fulfill the amount of data being requested
        // This in turn causes the reader to advance, and the data from the reader to be written to the writer & underlying buffer
        return base.ProcessXmlNodes(count);
    }

    /// <summary>
    /// Override to replace the namespaces of attributes being translated.
    /// </summary>
    protected override void TranslateAttributeValue(string prefix, string localName, string nsURI, string val)
    {
        // Handle the default namespace declaration (has no prefix). Eg. xmlns="http://mynamespace"
        if (((localName == "xmlns") && (nsURI == "http://www.w3.org/2000/xmlns/")) && (NamespaceReplacements.ContainsKey(val)))
        {
            // Replace the ns with the one configured
            base.TranslateAttributeValue(prefix, localName, nsURI, NamespaceReplacements[val]);
        }
        // Handle non-default namespace declarations. Eg. xmlns:ns0="http://mynamespace"
        else if (((prefix == "xmlns") && (nsURI == "http://www.w3.org/2000/xmlns/")) && (NamespaceReplacements.ContainsKey(val)))
        {
            // Replace the ns with the one configured
            base.TranslateAttributeValue(prefix, localName, nsURI, NamespaceReplacements[val]);
        }
        else
        {
            // Does not match a namespace configured to be replaced
            base.TranslateAttributeValue(prefix, localName, nsURI, val);
        }
    }

    /// <summary>
    /// Override to replace the namespaces of elements being translated.
    /// </summary>
    protected override void TranslateStartElement(string prefix, string localName, string nsURI)
    {
        if (NamespaceReplacements.ContainsKey(nsURI))
        {
            // Replace the ns with the one configured
            base.TranslateStartElement(prefix, localName, NamespaceReplacements[nsURI]);
        }
        else
        {
            // Does not match a namespace configured to be replaced
            base.TranslateStartElement(prefix, localName, nsURI);
        }
    }
}

Note that the key methods to override for this particular task are TranslateAttributeValue, which the XmlTranslatorStream calls when an attribute is being copied from its internal XmlReader to the XmlWriter, and TranslateStartElement, which is called when the start of an element is being copied.  Also note that this class could easily be enhanced to replace the prefix being written as well, for example if the ns0-style prefixes that BizTalk generates are not the ones you need.
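As a rough illustration of that prefix enhancement, here is a self-contained sketch that uses a plain XmlReader-to-XmlWriter loop rather than an XmlTranslatorStream subclass, so it runs without the BizTalk assemblies. The PrefixRewriter name, its parameters and the single old/new prefix pair are assumptions made for illustration. Inside a real XmlTranslatorStream subclass, the equivalent change would presumably go into the TranslateStartElement and TranslateAttributeValue overrides shown above (passing a different prefix, and for xmlns:ns0 declarations a different localName, to the base methods); that is an untested assumption about the BizTalk API.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Xml;

// Hypothetical helper: streams a document, replacing namespaces via a map
// and renaming one prefix (e.g. BizTalk's "ns0") to another.
public static class PrefixRewriter
{
    private const string XmlnsNs = "http://www.w3.org/2000/xmlns/";

    public static string Rewrite(string xml, IDictionary<string, string> namespaces,
        string oldPrefix, string newPrefix)
    {
        string MapNs(string ns) => namespaces.TryGetValue(ns, out string replacement) ? replacement : ns;
        string MapPrefix(string p) => p == oldPrefix ? newPrefix : p;

        var settings = new XmlWriterSettings { OmitXmlDeclaration = true };
        using (var output = new StringWriter())
        {
            using (XmlReader r = XmlReader.Create(new StringReader(xml)))
            using (XmlWriter w = XmlWriter.Create(output, settings))
            {
                while (r.Read())
                {
                    switch (r.NodeType)
                    {
                        case XmlNodeType.Element:
                        {
                            bool isEmpty = r.IsEmptyElement;
                            w.WriteStartElement(MapPrefix(r.Prefix), r.LocalName, MapNs(r.NamespaceURI));
                            while (r.MoveToNextAttribute())
                            {
                                if (r.NamespaceURI == XmlnsNs && r.Prefix == "xmlns")
                                    // xmlns:ns0="..." : rename the declared prefix and map its value
                                    w.WriteAttributeString("xmlns", MapPrefix(r.LocalName), XmlnsNs, MapNs(r.Value));
                                else if (r.NamespaceURI == XmlnsNs)
                                    // xmlns="..." : default namespace declaration
                                    w.WriteAttributeString("", "xmlns", XmlnsNs, MapNs(r.Value));
                                else
                                    w.WriteAttributeString(MapPrefix(r.Prefix), r.LocalName, MapNs(r.NamespaceURI), r.Value);
                            }
                            r.MoveToElement();
                            if (isEmpty)
                                w.WriteEndElement();
                            break;
                        }
                        case XmlNodeType.EndElement:
                            w.WriteEndElement();
                            break;
                        case XmlNodeType.Text:
                            w.WriteString(r.Value);
                            break;
                        case XmlNodeType.CDATA:
                            w.WriteCData(r.Value);
                            break;
                        case XmlNodeType.Whitespace:
                        case XmlNodeType.SignificantWhitespace:
                            w.WriteWhitespace(r.Value);
                            break;
                    }
                }
            }
            return output.ToString();
        }
    }
}
```

With the map { "urn:old" → "urn:new" } and prefixes ns0 → abc, an input such as <ns0:Root xmlns:ns0="urn:old"><ns0:Item>1</ns0:Item></ns0:Root> comes out with abc: prefixes bound to urn:new, and no trace of ns0 or urn:old.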

This can be easily tested with the following code.  Ensure that your sample file contains multiple namespaces to replace, and that they match the ones configured:

public void TranslateData_CustomTranslatorStream()
{
    using(Stream output = new FileStream("MyOutputXmlFile.xml", FileMode.Create, FileAccess.Write, FileShare.Read))
    {
        Dictionary<string, string> nsReplacements = new Dictionary<string, string>();
        // Set the replacement namespaces
        nsReplacements.Add("http://originalNS1", "http://newwackynamespace.com");
        nsReplacements.Add("http://originalNS2", "http://anotherwackynamespace.com");
        nsReplacements.Add("http://originalNS3", "http://runoutofwackynames.com");
    
        using (XmlReader reader = XmlReader.Create("MyInputXmlFile.xml"))
        {
            using (MultiNamespaceTranslatorStream stream = new MultiNamespaceTranslatorStream(reader))
            {
                stream.NamespaceReplacements = nsReplacements;
                int value;
                while ((value = stream.ReadByte()) != -1)
                    output.WriteByte((byte)value);
            }
        }
    }
}

Now compare this to what you would have had to do to use BizTalk's built in NamespaceTranslatorStream:

/// <summary>
/// Translate a document with multiple namespaces using the NamespaceTranslatorStream. Wrap a NamespaceTranslatorStream per namespace to translate.
/// </summary>
public void Translate_MultipleNamespaces()
{
    using (Stream output = new FileStream("MyOutputXmlFile.xml", FileMode.Create))
    {
        using (Stream fileStream = new FileStream("MyInputXmlFile.xml", FileMode.Open, FileAccess.Read))
        {
            // Provide the old and replacement namespace for the first namespace.
            using (Stream stream1 = new NamespaceTranslatorStream(new PipelineContext(), fileStream, "http://originalNS1", "http://newwackynamespace.com"))
            {
                // Wrap the first stream with another NamespaceTranslatorStream for the second namespace.
                using (Stream stream2 = new NamespaceTranslatorStream(new PipelineContext(), stream1, "http://originalNS2", "http://anotherwackynamespace.com"))
                {
                    using (Stream stream3 = new NamespaceTranslatorStream(new PipelineContext(), stream2, "http://originalNS3", "http://runoutofwackynames.com"))
                    {
                        int value;
                        // Make sure the outer stream is the one being read!
                        while ((value = stream3.ReadByte()) != -1)
                            output.WriteByte((byte)value);
                    }
                }
            }
        }
    }
}

So this simple custom class, under 100 lines of code, should help you start to see how real-world problems can be solved using a streaming approach.  It lets you avoid loading the entire XML message into the DOM (an XmlDocument) and updating it in memory, which is inefficient for large messages.

Now let's look at how we could achieve the same result with a custom XmlReader.  The concept is similar, in that we change the view of the data at specific points: in this example, whenever we see data that uses a namespace we need to change.  However, because the XmlReader exposes its view of the data through properties, rather than through virtual methods that receive the data as parameters, this time we create an XmlReader that changes the view of the data exposed by its properties.

This is performed by the custom XmlReader in the following code.  Note that if this were to be used within BizTalk, this XmlReader would then be passed to the constructor of the XmlTranslatorStream:

public class MultiNamespaceTranslatorReader : XmlReader
{
    private bool _inAttribute;
    private XmlReader _wrappedReader;
    private Dictionary<string, string> _namespaceReplacements;

    public MultiNamespaceTranslatorReader(XmlReader wrappedReader) : this(wrappedReader, null)
    { }

    public MultiNamespaceTranslatorReader(XmlReader wrappedReader, 
        Dictionary<string, string> namespaceReplacements)
    {
        Initialise(wrappedReader, namespaceReplacements);
    }

    public Dictionary<string, string> NamespaceReplacements
    {
        get { return _namespaceReplacements; }
        set { _namespaceReplacements = value; }
    }

    protected XmlReader WrappedReader
    {
        get { return _wrappedReader; }
        set { _wrappedReader = value; }
    }

    protected bool InAttribute
    {
        get { return _inAttribute; }
        set { _inAttribute = value; }
    }

    public override int AttributeCount
    {
        get { return WrappedReader.AttributeCount; }
    }

    public override string BaseURI
    {
        get { return WrappedReader.BaseURI; }
    }

    public override void Close()
    {
        WrappedReader.Close();
    }

    public override int Depth
    {
        get { return WrappedReader.Depth; }
    }

    public override bool EOF
    {
        get { return WrappedReader.EOF; }
    }

    public override string GetAttribute(int i)
    {
        return WrappedReader.GetAttribute(i);
    }

    public override string GetAttribute(string name, string namespaceURI)
    {
        return WrappedReader.GetAttribute(name, namespaceURI);
    }

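    public override string GetAttribute(string name)
    {
        // Delegate to the wrapped reader (calling GetAttribute(name) here would recurse forever)
        return WrappedReader.GetAttribute(name);
    }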

    public override bool HasValue
    {
        get { return WrappedReader.HasValue; }
    }

    public override bool IsEmptyElement
    {
        get { return WrappedReader.IsEmptyElement; }
    }

    public override string LocalName
    {
        get { return WrappedReader.LocalName; }
    }

    public override string LookupNamespace(string prefix)
    {
        // Check if the namespace for this prefix has been replaced.
        // LookupNamespace returns null for an unknown prefix, and Dictionary.ContainsKey throws on null.
        string ns = WrappedReader.LookupNamespace(prefix);
        if (ns != null && NamespaceReplacements.ContainsKey(ns))
            return NamespaceReplacements[ns];
        else
            return ns;
    }

    public override bool MoveToAttribute(string name, string ns)
    {
        InAttribute = WrappedReader.MoveToAttribute(name, ns);
        return InAttribute;
    }

    public override bool MoveToAttribute(string name)
    {
        InAttribute = WrappedReader.MoveToAttribute(name);
        return InAttribute;
    }

    public override bool MoveToElement()
    {
        // Whether or not the wrapped reader actually moved, it is no longer on an attribute
        InAttribute = false;
        return WrappedReader.MoveToElement();
    }

    public override bool MoveToFirstAttribute()
    {
        InAttribute = WrappedReader.MoveToFirstAttribute();
        return InAttribute;
    }

    public override bool MoveToNextAttribute()
    {
        InAttribute = WrappedReader.MoveToNextAttribute();
        return InAttribute;
    }

    public override XmlNameTable NameTable
    {
        get { return WrappedReader.NameTable; }
    }

    public override string NamespaceURI
    {
        get 
        { 
            if(NamespaceReplacements.ContainsKey(WrappedReader.NamespaceURI))
                return NamespaceReplacements[WrappedReader.NamespaceURI];
            else
                return WrappedReader.NamespaceURI;
        }
    }

    public override XmlNodeType NodeType
    {
        get { return WrappedReader.NodeType; }
    }

    public override string Prefix
    {
        get { return WrappedReader.Prefix; }
    }

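    public override bool Read()
    {
        // Advancing to the next node always leaves any attribute the caller was positioned on
        InAttribute = false;
        return WrappedReader.Read();
    }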

    public override bool ReadAttributeValue()
    {
        return WrappedReader.ReadAttributeValue();
    }

    public override ReadState ReadState
    {
        get { return WrappedReader.ReadState; }
    }

    public override void ResolveEntity()
    {
        WrappedReader.ResolveEntity();
    }

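    public override string Value
    {
        get
        {
            // While positioned on an attribute (or on its value text via ReadAttributeValue),
            // replace any value that matches a configured namespace. This covers namespace
            // declarations, e.g. for xmlns:ns0="http://mynamespace", Value = "http://mynamespace".
            // Caveat: it also rewrites an ordinary attribute whose value happens to equal a
            // configured namespace; a stricter version would track whether the current
            // attribute is actually an xmlns declaration.
            string value = WrappedReader.Value;
            if (InAttribute && NamespaceReplacements.ContainsKey(value))
                return NamespaceReplacements[value];
            else
                return value;
        }
    }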

    private void Initialise(XmlReader wrappedReader, Dictionary<string, string> namespaceReplacements)
    {
        if (wrappedReader == null)
            throw new ArgumentNullException("wrappedReader");

        _wrappedReader = wrappedReader;

        if (namespaceReplacements == null)
            _namespaceReplacements = new Dictionary<string, string>();
        else
            _namespaceReplacements = namespaceReplacements;
    }
}

Note that our custom XmlReader just becomes a special type of "filter" over an existing XmlReader, which we wrap.  We can then delegate any calls directly down to the wrapped XmlReader if it is dealing with data that we are not concerned with.  Otherwise, we can intercept the call to the wrapped implementation, and return our own data instead.
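To show the "filter" idea in its simplest form, here is a sketch of a wrapping reader whose only intercepted member is Read(), which silently skips comment nodes; every other member is a one-line pass-through. CommentSkippingReader and CommentSkippingDemo are hypothetical names, and the code assumes C# 7 or later. Where MultiNamespaceTranslatorReader changes the values a node exposes, this filter changes which nodes are exposed at all.

```csharp
using System;
using System.IO;
using System.Xml;

// Hypothetical filter reader: intercepts Read() to hide comment nodes and
// delegates every other member unchanged to the wrapped reader.
public class CommentSkippingReader : XmlReader
{
    private readonly XmlReader _inner;

    public CommentSkippingReader(XmlReader inner) =>
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));

    // The only intercepted member: advance past any comment nodes.
    public override bool Read()
    {
        while (_inner.Read())
        {
            if (_inner.NodeType != XmlNodeType.Comment)
                return true;
        }
        return false;
    }

    // Pass-through members.
    public override int AttributeCount => _inner.AttributeCount;
    public override string BaseURI => _inner.BaseURI;
    public override int Depth => _inner.Depth;
    public override bool EOF => _inner.EOF;
    public override bool HasValue => _inner.HasValue;
    public override bool IsEmptyElement => _inner.IsEmptyElement;
    public override string LocalName => _inner.LocalName;
    public override string NamespaceURI => _inner.NamespaceURI;
    public override XmlNameTable NameTable => _inner.NameTable;
    public override XmlNodeType NodeType => _inner.NodeType;
    public override string Prefix => _inner.Prefix;
    public override ReadState ReadState => _inner.ReadState;
    public override string Value => _inner.Value;
    public override string GetAttribute(int i) => _inner.GetAttribute(i);
    public override string GetAttribute(string name) => _inner.GetAttribute(name);
    public override string GetAttribute(string name, string ns) => _inner.GetAttribute(name, ns);
    public override string LookupNamespace(string prefix) => _inner.LookupNamespace(prefix);
    public override bool MoveToAttribute(string name) => _inner.MoveToAttribute(name);
    public override bool MoveToAttribute(string name, string ns) => _inner.MoveToAttribute(name, ns);
    public override bool MoveToElement() => _inner.MoveToElement();
    public override bool MoveToFirstAttribute() => _inner.MoveToFirstAttribute();
    public override bool MoveToNextAttribute() => _inner.MoveToNextAttribute();
    public override bool ReadAttributeValue() => _inner.ReadAttributeValue();
    public override void ResolveEntity() => _inner.ResolveEntity();
    public override void Close() => _inner.Close();
}

public static class CommentSkippingDemo
{
    public static string Copy(string xml)
    {
        var settings = new XmlWriterSettings { OmitXmlDeclaration = true };
        using (var output = new StringWriter())
        {
            using (XmlReader inner = XmlReader.Create(new StringReader(xml)))
            using (XmlReader filtered = new CommentSkippingReader(inner))
            using (XmlWriter writer = XmlWriter.Create(output, settings))
            {
                // WriteNode pulls every node through the filter's members.
                writer.WriteNode(filtered, true);
            }
            return output.ToString();
        }
    }
}
```

For example, CommentSkippingDemo.Copy("<a><!--note--><b>hi</b></a>") returns <a><b>hi</b></a>. The consumer (here XmlWriter.WriteNode) never knows a filter is in place, which is what makes this pattern composable.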

This can be tested with the following code:

public void TranslateData_CustomTranslatorReader()
{
    Dictionary<string, string> nsReplacements = new Dictionary<string, string>();

    nsReplacements.Add("http://CBR.CBRInputSchema", "http://newwackynamespace.com");
    nsReplacements.Add("http://secondnamespace", "http://anotherwackynamespace.com");
    nsReplacements.Add("http://thirdnamespace", "http://runoutofwackynames.com");

    XmlWriterSettings settings = new XmlWriterSettings();
    settings.OmitXmlDeclaration = true;
    settings.Indent = true;
    using (XmlWriter writer = XmlWriter.Create("MyOutputXmlFile.xml", settings))
    {
        using (XmlReader reader = XmlReader.Create("MyInputXmlFile.xml"))
        {
            using (XmlReader nsTransReader = new MultiNamespaceTranslatorReader(reader, nsReplacements))
            {
                // WriteNode copies the reader's current node together with all of its descendants;
                // it has no option to copy an element without its children. Because the reader
                // has not been read yet, this single call copies the entire document.
                writer.WriteNode(nsTransReader, true);
            }
        }
    }
}

After seeing the two approaches, you can hopefully see how each one lets BizTalk apply custom processing by intercepting the inbound data as it is written back out.  I personally believe that subclassing the XmlReader is the better approach, because the XmlReader was designed as a "view" over XML data, and it lets clients see your modified view in an XML sense, as it is read.  By subclassing the XmlTranslatorStream, you are changing the output stream of bytes for the next consumer of that stream, who then has to create their own XmlReader over it anyway; you could have done that for them by implementing your logic as an XmlReader in the first place.  The main drawback of the XmlReader approach, I believe, is that you need to understand the operations it provides, as your changes may affect how they need to behave.  Take my custom XmlReader as an example: I needed to override the LookupNamespace method, otherwise it would have behaved incorrectly, and that may not be immediately obvious.
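The LookupNamespace trap is easy to reproduce. This sketch (a hypothetical NamespaceOnlyReader, not part of BizTalk) overrides NamespaceURI but deliberately leaves LookupNamespace as a plain pass-through, so the reader reports the new namespace for an element while still resolving that element's prefix to the old one. Any consumer that relies on LookupNamespace would see an inconsistent view of the same node. It assumes C# 7 or later.

```csharp
using System;
using System.IO;
using System.Xml;

// Hypothetical, deliberately incomplete filter: it rewrites NamespaceURI
// but does not keep LookupNamespace consistent with it.
public class NamespaceOnlyReader : XmlReader
{
    private readonly XmlReader _inner;
    private readonly string _oldNs;
    private readonly string _newNs;

    public NamespaceOnlyReader(XmlReader inner, string oldNs, string newNs)
    {
        _inner = inner;
        _oldNs = oldNs;
        _newNs = newNs;
    }

    public override string NamespaceURI =>
        _inner.NamespaceURI == _oldNs ? _newNs : _inner.NamespaceURI;

    // The mistake being demonstrated: prefixes still resolve to the OLD namespace.
    public override string LookupNamespace(string prefix) => _inner.LookupNamespace(prefix);

    // Plain pass-through for the remaining members.
    public override int AttributeCount => _inner.AttributeCount;
    public override string BaseURI => _inner.BaseURI;
    public override int Depth => _inner.Depth;
    public override bool EOF => _inner.EOF;
    public override bool HasValue => _inner.HasValue;
    public override bool IsEmptyElement => _inner.IsEmptyElement;
    public override string LocalName => _inner.LocalName;
    public override XmlNameTable NameTable => _inner.NameTable;
    public override XmlNodeType NodeType => _inner.NodeType;
    public override string Prefix => _inner.Prefix;
    public override ReadState ReadState => _inner.ReadState;
    public override string Value => _inner.Value;
    public override string GetAttribute(int i) => _inner.GetAttribute(i);
    public override string GetAttribute(string name) => _inner.GetAttribute(name);
    public override string GetAttribute(string name, string ns) => _inner.GetAttribute(name, ns);
    public override bool MoveToAttribute(string name) => _inner.MoveToAttribute(name);
    public override bool MoveToAttribute(string name, string ns) => _inner.MoveToAttribute(name, ns);
    public override bool MoveToElement() => _inner.MoveToElement();
    public override bool MoveToFirstAttribute() => _inner.MoveToFirstAttribute();
    public override bool MoveToNextAttribute() => _inner.MoveToNextAttribute();
    public override bool Read() => _inner.Read();
    public override bool ReadAttributeValue() => _inner.ReadAttributeValue();
    public override void ResolveEntity() => _inner.ResolveEntity();
    public override void Close() => _inner.Close();
}
```

Reading <p:a xmlns:p="urn:old"/> through this reader with urn:old mapped to urn:new, NamespaceURI reports urn:new while LookupNamespace("p") still returns urn:old. MultiNamespaceTranslatorReader avoids this by mapping the result of LookupNamespace through the same replacement dictionary.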

Well, that pretty much completes this series of 5 posts detailing how to develop streaming pipeline components.  I hope that from Part 1 through to this fifth one I have managed to fill in a number of gaps in the "art" of developing streaming pipeline components, and to save you the many hours I spent investigating this technique.  From what I have found, there is very little documented, either officially or in blogs, about how to develop proper streaming components (as opposed to why you should), and very little detail on what already ships with BizTalk that you can easily take advantage of.  I could find only a handful of posts indicating that others are aware of what you get out of the box in this area and understand how to use it (see here, here and here).  There are many more classes that ship with BizTalk to help you develop streaming components which I have not yet covered, including the classes that implement the features I listed at the top, such as validation and property promotion and demotion.  I plan to cover many of these in detail in future posts.  However, they follow the pattern I have described over the last 5 posts (most are implemented as custom XmlReaders or custom XmlTranslatorStreams), so I will not add them under the banner of this series; they are more of a follow-on to it.  Over and out!