Data Synchronizing made easy with Watermarks

December 18 2013

4 comments 0

We’ve all been there. Sooner or later, someone asks you to periodically synchronize information from one system into another. No shame to admit it, it happens in the best families. Such an integration should start with getting the objects that have been modified since the last sync (or all of them in the case of the very first sync). Sure, this first part sounds like the easiest of all the sync process (and in some cases it actually is), but it’s not without its complexity. Step by step you need to:

  • Go into a persistent store and grab the timestamp for the last update
  • If this is the first update, then you need to use a default value
  • Go into the source system and get all the objects that have been updated since that timestamp
  • Update the persistent store with the timestamp that should be used on the next sync. Notice that because the server time on your machine probably differs from the clock on the source system, you can’t simply use a value like “server.dateTime”. Instead, you need to select the greatest timestamp in the dataset you just got
  • Finally, you need to have logic not to update the timestamp if the sync failed

Not so simple after all isn’t it? Let’s take a look at a sample flow that does this in :

This is how it would look like in XML:

<flow name="syncFlow">
  <poll doc:name="Poll">
    <fixed-frequency-scheduler frequency="1" timeUnit="HOURS"/>
    <logger message="Starting poll at #[dateTime()]" doc:name="Logger"/>
  <enricher target="#[flowVars['hasTimestamp']]">
    <objectstore:contains config-ref="ObjectStore" key="timestamp"/>
  <choice doc:name="Choice">
  	<when expression="#[flowVars['hasTimestamp']]">
    		<enricher target="#[flowVars['timestamp']]">
    			<objectstore:retrieve config-ref="ObjectStore" key="timestamp" />
          <set-variable variableName="timestamp" value="#[server.dateTime.format(&quot;yyyy-MM-dd'T'HH:mm:ss.SSS'Z'&quot;)]" />
    <sfdc:query config-ref="" query="select Id from Contact where LastModifiedDate &amp;gt; #[flowVars['timestamp']]" />
    <flow-ref name="selectNewTimestamp" />
    <flow-ref name="doYourSyncMagic" />
    <objectstore:store value-ref="#[flowVars['updatedTimestamp']]" key="timestamp" config-ref="ObjectStore" />
At Mulesoft we believe that this use case is generic enough to deserve a simpler solution. The flow above is way too complex and we haven’t even done the sync logic yet! So, we came out with the concept of Watermark.


The concept of Watermark refers to a flood after-match in which you look at the water stains in a wall to figure how high the water got, which is pretty much what we want to do in this use case: figure out which was the last item we updated and move from there on. What does watermark do for you?

  • It will automatically handle the ObjectStore, you don’t need to worry about that anymore
  • It will take care of checking if a watermark value already exists
  • It will help you get the next value
  • It will update the value when the flow is completed and will leave it untouched if it fails.
  • It will work with any type of List, Collection, Iterable or Iterator (including auto-paging ones)

So, let’s take a look at how the same flow looks using watermark: Way simpler isn’t it? What where’s the magic? Where did the behaviour go? The answer is the poll element. Let’s take a look:

As you can see the poll element now has a watermark element that implements the same behaviour we saw in the first example but in a declarative way:

  • First we provided the name of the flowVar that will hold the value
  • Then we provide the default value expression, in case the component cannot find a value for the watermark. This is the first big gain: we don’t have to worry about the ObjectStore’s state
  • Then we choose a selector, which is the criteria we want to use to pick the next value. There’re four available selectors: MIN, MAX, FIRST and LAST. In our case, we want the greatest LastModifiedDate so we’ll choose MAX
  • Finally, we entered a selector expression. This expression works in tandem with the selector by being executed on each object the salesforce query returned. The selector collects the return values and selects the right value
  • Optionally you can specify which object store you want to use but you don’t have to. Mule will select the user object store automatically.

If the flow ends without errors, watermark will automatically update the object store with the new value. Yeah! Go away complexity! Look how much more compact the XML looks like:

Advanced use cases

In our experience, the four available selectors pretty much fit most of the use cases. If your use case requires custom logic to determine the new value, you can also provide your own update expression which will be evaluated once at the end of the flow.


Watermark is a tool to simplify querying for updated objects, which is a very common use case when synchronizing data. This feature is available in the latest Studio. What do you think of it? Thanks for reading!

We'd love to hear your opinion on this post

4 Responses to “Data Synchronizing made easy with Watermarks”

  1. Great information. Just a quick FYI, for use against SFDC, set the default time to something else other than server time otherwise the changes may not be picked up.

    i.e. instead of:


    Can do #[‘2014-01-01T00:00:00Z’] for example (or some other values depending on first run use case requirement.)

  2. This is a perfect fit for a few of the projects I am implementing. Unfortunately, I am deploying to an on-premise mule application server running on the 3.4.0 runtime. When will this feature be available in an on-premise release?

    • Hello Matt,

      Thank you or the great feedback! Mule 3.5.0 will be available for on-premise users in April 2014. However, there’s a beta program starting between february and march.


  3. Thanks Mariano for the quick feedback. Good to hear 3.5.0 is close.