This final post in my Mule 4 high scalability blog series is on the Mule 4 SDK.

Mule 4 is fully extensible. The SDK allows you to add modules that are treated as first-class citizens alongside the out-of-the-box modules. Modules built with the SDK use the same thread management and streaming configurations described in my previous blog.

SDK landscape

A module is a collection of independently configurable components: sources, functions, operations, routers and scopes. Components can optionally accept parameter values specific to their use in a Mule app. These values are applied either at deployment time or at runtime through DataWeave expressions. A module can have multiple configurations or none. A module configuration may require connections to external systems. This is often the case for operations.

Mule 4 SDK

The SDK comes with a Maven archetype to get you started. The figure above shows the classes of the Basic module that the archetype generates. The class names themselves are not important. The associations between the classes are established through annotations.

  1. The BasicExtension class defines the basic namespace through the use of annotations. It uses the @Configurations annotation to refer to its optional set of configurations. You can have zero or multiple configurations on the one module.
  2. The BasicConfiguration class has an @Parameter annotated configId field, which uniquely identifies the configuration within the Mule app to which the module is deployed. External connectivity is optional in modules. The BasicConfiguration does require connectivity and has an @ConnectionProviders annotation naming the BasicConnectionProvider class, which acts as a factory for BasicConnection instances. The class also has an @Operations annotation pointing to the BasicOperations class.
  3. The BasicConnectionProvider class has @Parameter annotated fields that appear in the configuration. You can also use annotations that affect optionality and how Studio is configured.
  4. The BasicOperations class has two public methods. Every public method is a separate operation. The signatures of these methods are important because they determine which thread pool scheduler is assigned to each operation and how streaming will work.
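
To make the point about annotations concrete, here is a minimal, self-contained sketch of how a class graph can be discovered purely from annotations. The three annotation types below are hypothetical stand-ins declared locally so that the example runs without the Mule SDK on the classpath. The real SDK annotations carry more attributes, and the runtime's actual discovery logic is not shown in this post, so treat this as an illustration of the idea only.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins that mimic the shape of the SDK annotations.
// They are NOT the Mule SDK types; they exist only to make this sketch runnable.
class AnnotationWiringSketch {

    @Retention(RetentionPolicy.RUNTIME) @interface Configurations { Class<?>[] value(); }
    @Retention(RetentionPolicy.RUNTIME) @interface Operations { Class<?>[] value(); }
    @Retention(RetentionPolicy.RUNTIME) @interface ConnectionProviders { Class<?>[] value(); }

    @Configurations(BasicConfiguration.class)
    static class BasicExtension {}

    @Operations(BasicOperations.class)
    @ConnectionProviders(BasicConnectionProvider.class)
    static class BasicConfiguration {}

    static class BasicConnectionProvider {}

    static class BasicOperations {
        public String sayHi(String person) { return "Hello " + person; }
    }

    // Walk from the extension class to everything it references through annotations.
    static List<String> describe(Class<?> extension) {
        List<String> out = new ArrayList<>();
        out.add("extension: " + extension.getSimpleName());
        Configurations configs = extension.getAnnotation(Configurations.class);
        if (configs == null) return out; // a module may have no configurations
        for (Class<?> config : configs.value()) {
            out.add("configuration: " + config.getSimpleName());
            ConnectionProviders providers = config.getAnnotation(ConnectionProviders.class);
            if (providers != null) {
                for (Class<?> p : providers.value()) out.add("connection provider: " + p.getSimpleName());
            }
            Operations ops = config.getAnnotation(Operations.class);
            if (ops != null) {
                for (Class<?> o : ops.value()) out.add("operations: " + o.getSimpleName());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        describe(BasicExtension.class).forEach(System.out::println);
    }
}
```

Renaming any of the four classes changes only the printed names. The wiring itself is carried entirely by the annotation values, which is why the class names are not important.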

Thread pool scheduler assignment criteria for operations

The SDK automatically infers the execution type of each operation, classifying it as either blocking or non-blocking. Accordingly, either the CPU_LITE scheduler or the BLOCKING_IO scheduler is assigned to the operation upon deployment.

  1. If there is no @Connection annotated parameter in the method then the operation is considered non-blocking and the CPU_LITE scheduler will be assigned.
  2. If there is an @Connection annotated parameter in the method then:
    1. If the signature has a void return type and has a CompletionCallback parameter then the operation is considered non-blocking and the CPU_LITE scheduler will be assigned.
    2. If the signature has a non-void return type then the operation is considered blocking and the BLOCKING_IO scheduler will be assigned.

There is also a method-level annotation, e.g. @Execution(CPU_INTENSIVE), which you can use to specify the scheduler manually. This overrides the automatic inference described above.
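
The rules above can be modeled in a few lines of reflection code. The following sketch is an illustration of the decision logic only, not the SDK's implementation: the Connection and Execution annotations, the CompletionCallback interface and the Scheduler enum are all hypothetical stand-ins declared locally. The text does not say what happens to a connected operation that returns void without a callback, so the sketch assumes it is treated as blocking.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.Arrays;

// Illustrative model of the inference rules. Every type in here is a
// hypothetical stand-in, NOT part of the Mule SDK.
class SchedulerInferenceSketch {

    enum Scheduler { CPU_LITE, BLOCKING_IO, CPU_INTENSIVE }

    @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.PARAMETER)
    @interface Connection {}

    @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD)
    @interface Execution { Scheduler value(); }

    interface CompletionCallback<T> {
        void success(T result);
        void error(Throwable e);
    }

    static Scheduler infer(Method m) {
        // A manual @Execution annotation overrides the automatic inference.
        Execution override = m.getAnnotation(Execution.class);
        if (override != null) return override.value();

        Parameter[] params = m.getParameters();
        boolean usesConnection = Arrays.stream(params)
                .anyMatch(p -> p.isAnnotationPresent(Connection.class));
        // Rule 1: no connection parameter means non-blocking.
        if (!usesConnection) return Scheduler.CPU_LITE;

        boolean hasCallback = Arrays.stream(params)
                .anyMatch(p -> CompletionCallback.class.isAssignableFrom(p.getType()));
        // Rule 2.1: void return type plus a callback means non-blocking.
        if (m.getReturnType() == void.class && hasCallback) return Scheduler.CPU_LITE;
        // Rule 2.2 (and, by assumption, any other connected signature): blocking.
        return Scheduler.BLOCKING_IO;
    }

    // Sample operations used only to exercise the rules.
    static class SampleOperations {
        public String upperCase(String s) { return s.toUpperCase(); }
        public String fetch(@Connection Object conn) { return "data"; }
        public void fetchAsync(@Connection Object conn, CompletionCallback<String> cb) { cb.success("data"); }
        @Execution(Scheduler.CPU_INTENSIVE)
        public String crunch(@Connection Object conn) { return "done"; }
    }

    static Scheduler inferByName(String name) {
        return Arrays.stream(SampleOperations.class.getDeclaredMethods())
                .filter(m -> m.getName().equals(name))
                .findFirst()
                .map(SchedulerInferenceSketch::infer)
                .orElseThrow(() -> new IllegalArgumentException(name));
    }

    public static void main(String[] args) {
        for (String op : new String[] {"upperCase", "fetch", "fetchAsync", "crunch"}) {
            System.out.println(op + " -> " + inferByName(op));
        }
    }
}
```

In this model, upperCase and fetchAsync resolve to CPU_LITE, fetch resolves to BLOCKING_IO, and crunch resolves to CPU_INTENSIVE because of its explicit annotation.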

Example Twitter module

Let’s say we want to build a module that consumes the Twitter streaming API to receive a stream of tweets that match a particular set of phrases but returns only a list of hashtags to the next event processor on the flow.

We can make the hashtags-in-tweets operation non-blocking by setting its return type to void and including a CompletionCallback in its parameter list. We’ll use this to send the hashtags back.

Twitter streaming API

The operation implementation is worth a glance because it uses the Reactor library (Project Reactor) described at the beginning of this series. It subscribes to tweets matching the phrases parameter, builds a new flux of the hashtags found in those tweets over a configured number of seconds (10 in this example), removes duplicates, sorts them and returns them as a List.

public void hashtagsInTweets(@Config TwitterConfiguration configuration,
                             @Connection TwitterConnection connection,
                             String phrases,
                             CompletionCallback<List<String>, Void> callback) {
    connection.track(phrases) // flux of tweets
        .map(tweet -> tweet.getText()) // to flux of their texts
        .flatMap(text -> { // to flux of hashtags in texts
            List<String> allMatches = new ArrayList<>();
            Matcher m = HASHTAG_PATTERN.matcher(text);
            while (m.find()) allMatches.add(m.group());
            return Flux.fromIterable(allMatches);
        })
        .take(Duration.ofSeconds(configuration.getDuration())) // limit subscription to x seconds
        .distinct() // to flux of only unique hashtags
        .sort() // to flux of sorted hashtags
        .collectList() // to list
        .subscribe(
            list -> callback.success(Result.<List<String>, Void>builder().output(list).build()),
            callback::error); // report a failure from any stage of the pipeline
}

The operation is assigned the CPU_LITE scheduler. Note how the callback builds a response using Result.builder() inside the function you pass to the .subscribe() method. This will be executed asynchronously when the response comes back from Twitter. The thread it executes on will be determined by the library you choose to make the HTTPS call to Twitter.
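
The extraction logic in the operation can be tried out without Twitter or Reactor. The sketch below swaps the Flux for a plain java.util.stream pipeline over a finite list of texts, so it shows only the flatMap, distinct and sort steps, not the time-limited subscription. The HASHTAG_PATTERN used by the operation is not shown in this post, so the regular expression here is an assumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class HashtagExtractionSketch {

    // Assumed pattern: '#' followed by one or more word characters.
    // The pattern used by the original module is not shown, so this is a guess.
    static final Pattern HASHTAG_PATTERN = Pattern.compile("#\\w+");

    // Mirrors the flatMap step: one text in, zero or more hashtags out.
    static Stream<String> hashtagsIn(String text) {
        Matcher m = HASHTAG_PATTERN.matcher(text);
        Stream.Builder<String> found = Stream.builder();
        while (m.find()) found.add(m.group());
        return found.build();
    }

    // Mirrors flatMap -> distinct -> sort -> collectList on a finite input.
    static List<String> uniqueSortedHashtags(List<String> texts) {
        return texts.stream()
                .flatMap(HashtagExtractionSketch::hashtagsIn)
                .distinct()
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> texts = Arrays.asList(
                "Loving #mule4 and #java",
                "No tags here",
                "#java streams meet #reactor");
        System.out.println(uniqueSortedHashtags(texts)); // prints [#java, #mule4, #reactor]
    }
}
```

The stream version blocks until the whole list is processed, which is exactly what the Flux version avoids: there the same steps run as tweets arrive and the result is handed to the callback later.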

HTTPS call to Twitter

Binary and object streaming

All operations that have InputStream parameters or return an InputStream can exploit the binary streaming configurations described in my previous post. To exploit object streaming in your operation, you need to return an implementation of the PagingProvider<Connection, String> interface so that the next event processor on the flow can call its getPage() method. The following module consumes the CKan API. The important detail here is that it keeps track of the pageIndex across the getPage() calls. Once getPage() returns an empty List, it is not invoked again.

public class CKanOperations {

    @MediaType(value = ANY, strict = false)
    public PagingProvider<CKanConnection, String> packageList() {
        return new PagingProvider<CKanConnection, String>() {

            // Offset of the next item to fetch, kept across getPage() calls.
            private int pageIndex = 0;

            @Override
            public List<String> getPage(@Connection CKanConnection connection) {
                List<String> packages = connection.packageList(this.pageIndex);
                this.pageIndex += packages.size();
                return packages; // an empty list ends the paging
            }

            @Override
            public void close(CKanConnection connection) throws MuleException {}

            @Override
            public Optional<Integer> getTotalResults(CKanConnection connection) {
                return Optional.empty(); // total is not known up front
            }

            @Override
            public boolean useStickyConnections() {
                return false;
            }
        };
    }
}
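
To see the paging contract from the consumer's side, here is a small self-contained sketch. PageSource is a hypothetical one-method stand-in for the SDK's PagingProvider, and FakeConnection is an invented in-memory source that serves pages of two items. Neither is part of the Mule SDK or the CKan module. The drain method models what a downstream consumer does: it keeps asking for pages until an empty one comes back.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class PagingContractSketch {

    // Simplified, hypothetical stand-in for the SDK's PagingProvider.
    interface PageSource<C, T> {
        List<T> getPage(C connection);
    }

    // Hypothetical connection that serves a fixed list in pages of two.
    static class FakeConnection {
        private final List<String> all = Arrays.asList("a", "b", "c", "d", "e");

        List<String> packageList(int offset) {
            if (offset >= all.size()) return Collections.emptyList();
            return all.subList(offset, Math.min(offset + 2, all.size()));
        }
    }

    // Same offset-tracking idea as the operation above.
    static PageSource<FakeConnection, String> packageList() {
        return new PageSource<FakeConnection, String>() {
            private int pageIndex = 0;

            @Override
            public List<String> getPage(FakeConnection connection) {
                List<String> page = connection.packageList(pageIndex);
                pageIndex += page.size();
                return page;
            }
        };
    }

    // What a consumer does: pull pages until an empty one arrives.
    static <C, T> List<T> drain(PageSource<C, T> source, C connection) {
        List<T> items = new ArrayList<>();
        List<T> page;
        while (!(page = source.getPage(connection)).isEmpty()) items.addAll(page);
        return items;
    }

    public static void main(String[] args) {
        System.out.println(drain(packageList(), new FakeConnection())); // prints [a, b, c, d, e]
    }
}
```

Because the offset lives in the provider instance rather than in the connection, each invocation of the operation starts paging from the beginning with its own state.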

Learn more

Mule is highly scalable both horizontally and vertically. Automatic tuning of deployed Mule applications allows them to execute in a non-blocking fashion that exploits all resources available to the Mule runtime. Threads are assigned to the application in a way that maximizes concurrency. Memory and disk are utilized in a way that facilitates stream processing of larger-than-memory payloads. All of these efficiencies are available in the Mule SDK if you wish to author your own modules. Try Mule 4 today to see for yourself how to address vertical scalability in an innovative and effective way.