fredag 5 juli 2013

RESTful MongoDB and Mule ESB Community Edition pattern

There are some alternative ways to go when RESTifying your flows in Mule. However they are poorly described in manuals and forums and the components and tools available are not very active projects in the community. Struggling with Mule's REST router and other alternatives I settled for the built in Jersey way of doing it. However all examples from the Mule documentation and forums describe only how to enable REST components to your Http endpoint and return directly. The do not describe how to integrate the RESTful endpoint with your Mule ESB integration flows and hence take advantage of Mule ESB's other powerful integration abilities.

Here is how I do it:

I use a Http endpoint or Https if secure and couple it with a REST element like this:
 <jersey:resources doc:name="REST">  
      <component class="se.redpill.mulecomponents.mongo.RedpillRESTMongo"/>  

The actual REST component is just plain Jersey Java, like this:
      public Map<String, Object> getQueryByName(@PathParam("singlelement") String name )   
           Map<String, Object> query = new HashMap<String, Object>();  
           query.put("MyCoolDocument.Type", "Cool");  
           query.put("MyCoolDocument.Unit", name);  
           query.put("Year", new Integer(2012));  
           return query;  

All fine. Now we use a Choice element to check on the inbound property if the request is
'http.method' GET, POST, PUT or DELETE and route each choice to its corresponding component. In my case I have a mapping to my own MongoDB components which are in the same format as my last blogpost.

Now to the real HACK to get this working:

If you tried something like this and failed you have probably seen that the object passed from the REST element  to your component is actually
which is a pain to handle. But the payload can be casted to a
org.mule.api.transport.OutputHandler since its implementation comes from

To avoid having to serialize / deserialize your own return value (in my case a Map) from your REST service to your next element (in my case my MongoDB query component) with casting techniques describe above you can instead do it in a much sager manner using the following statement:

           ContainerResponse cr = (ContainerResponse) eventContext.getMessage().getInvocationProperty("jersey_response");  
           Map<String, Object> map = (Map<String, Object>)cr.getResponse().getEntity();  

VOILA! We now have our Map back that we sent from our REST component.
My MongoDB Query component and all the others for PUT, POST and DELETE are implemented in this fashion like this:

 package se.redpill.mulecomponents.mongo;  
 import java.util.Map;  
 import org.mule.api.MuleEventContext;  
 import org.mule.api.lifecycle.Callable;  
 import com.mongodb.BasicDBObject;  
 import com.mongodb.DBCursor;  
 import com.mongodb.DBObject;  
 import com.sun.jersey.spi.container.ContainerResponse;  
 public class QueryMongoComponent extends AbstractMongoComponent implements Callable  
      public Object onCall(MuleEventContext eventContext) throws Exception {  
           ContainerResponse cr = (ContainerResponse) eventContext.getMessage().getInvocationProperty("jersey_response");  
           Map<String, Object> map = (Map<String, Object>)cr.getResponse().getEntity();  
           DBObject queryFields = new BasicDBObject(map);  
        String json = "[";  
          DBCursor cursor = db.getCollection("mycollection").find(queryFields);  
             try {  
               while(cursor.hasNext()) {  
                 json += + ",";  
             } finally {  
               json = json.substring(0,json.length()-1) + "]";  
        return json;  

And that's it!

REST enabled MongoDB with the power of Mule ESB CE integration flows at hand!

MongoDB with Mule ESB Community Edition integration tips

Integrating MongoDB with Mule ESB Community Edition might sound trivial and in some ways it is with the built in cloud connector and transformer. However documentation is not flawless, the user interfaces not very intuitive and last but not least the functionality is very limited compared to what you get using the MongoDB Java libraries as an API.

To demonstrate this I'm going to use a simple use case.

Lets say you wanted to populate your MongoDB with the data  from a CSV but not by means of CSV row -> BSON document (as with the mongoimport tool) but rather a group of CSV rows -> BSON document to get more document like structures in your MongoDB collections. Remember that performance wise it is better to do operations inside of a MongoDB document than on many MongoDB operations. That said you should not use very big documents either.
After population is done you want to do some aggregations on specific fields in the documents.

Getting the data in

The problem here is that you do not want the data to be inserted row by row but grouped by and but based on the in data. The second issue is that not all fields in the CSV should be treated as Strings but might be numbers and that is very crucial when it comes to aggregating the values.
The mongoimport tool handles the second issue well , but Mule doesn't.

Two ways to go really since we cannot use the fancy Datamapper from Mule ESB Enterprise edition.

1) You could do this by using Mule ESB File endpoints listening for CSV's and then use Mule's flow controls like splitters, choice, different filters, Object to JSON.

2) The other way is to simply implement your own transformer extending the AbstractMessageTransformer and in the transformMessage method use some JSON library like Jackson or org.json and divide the CSV into JSON documents yourself.

To use an non-built in JSON library like org.json simply add it to your Maven pom as


If you use Jackson with the CSV extension you can also build a CSV schema based on a POJO that describes each fields value type and hence parse the CSV based on that. I found Jackson to be a bit overkil though for simple operations. If you use org.json and the built in CDL.toJSONArray method you will end up with the same issue with unknown types getting everyting treated as Strings as in the first Mule solution. 

You do not want to end up with documents containing:

 { "This is really a value of numbers" : "214245","This is a value of String" : "Hi there 123" }  

If you do there is a little hack you could use in both case 1) With Mule ESB's Regex filter or in case 2) With Javas String method replaceAll(). I will show you the Java version for clarity:

 String good_json_data = baad_json_data.replaceAll("(\"([0-9]+)(\\.[0-9]+)?\")+", "$2$3");  

This regular expression search out all fields in your JSON and replaces those that actually are numbers with their numeric representation (without quotes).

Now you can pass your JSON data as payload for the next Mule flow item either directly to a MongoDB connector element with insert or your own MongoDB Java component.

Your flow might look something like this depending on which way you choose to attack the issue.

Using the latest Mongo Java driver

If your plan is (like in our use case) to use aggregate functionality on your BSON documents and you are not very fond of writing tons of map / reduce function code into tiny text fields in Mule's MongoDB connectors user interface you will have to come up with something else than Mule's built in MongoDB support. Also be aware that MongoDB's excellent aggregation framework is not even included in the version of the Java driver that Mule uses.

Again you need to edit your Maven pom.


Also after this you need to remove any MongoDB references you might have in the namespaces declarations in your xml configuration our you will end up with very strange error messages because of conflicting libraries.

Now we have access to the sweet com.mongodb.MongoClient (by the way handles connection pooling for you) object which lets us write our own Java components to connect and query MongoDB.

For example the MongoDB insert component from the flow image above simply takes the JSON payload from the element before and stores it straight into MongoDB, sweet.

 package se.redpill.mulecomponents.mongo;  
 import org.bson.types.ObjectId;  
 import org.mule.api.MuleEventContext;  
 import org.mule.api.lifecycle.Callable;  
 import com.mongodb.DBObject;  
 import com.mongodb.WriteConcern;  
 import com.mongodb.util.JSON;  
 public class InsertMongoComponent extends AbstractMongoComponent implements Callable{  
      public Object onCall(MuleEventContext eventContext) throws Exception {  
           Object payload = eventContext.getMessage().getPayload();  
           DBObject thedata = (DBObject) JSON.parse((String) payload);  
           db.getCollection("mycollection").insert(thedata, WriteConcern.SAFE);  
        ObjectId id = (ObjectId) thedata.get("_id");  
     if (id == null) return null;  
     return id.toStringMongod();  

The AbstractMongoComponent extended simply holds the db connection details and are instantiated from the xml configuration like:

        <spring:bean id="mongoDb" class="com.mongodb.MongoClient" scope="singleton">  
          <spring:constructor-arg index="0" type="java.lang.String" value="localhost"/>       
          <spring:constructor-arg index="1" type="int" value="27017"/>  
        <spring:bean id="aggregateMongo" class="se.redpill.mulecomponents.mongo.AggregateMongoComponent" scope="singleton" init-method="init">  
             <spring:property name="mongoDb" ref="mongoDb"/>  
             <spring:property name="dbName" value="mydatabase"/>  
        <spring:bean id="insertMongo" class="se.redpill.mulecomponents.mongo.InsertMongoComponent" scope="singleton" init-method="init">  
             <spring:property name="mongoDb" ref="mongoDb"/>  
             <spring:property name="dbName" value="mydatabase"/>  

And when using the instance:

  <foreach doc:name="For Each">  
          <component doc:name="MongoDB insert">  
                  <spring-object bean="insertMongo"/>  


Getting aggregated data out

Now how to get data out? Well simply enough you just write a QueryMongoComponent in the same style as the InsertMongoComponent above and reference it from the xml in the same way.

But in our case we wanted to use MongoDB's aggregation framework. Same thing. Simply write a component that uses the latest Java drivers aggregation functionality. Like in this example:
 package se.redpill.mulecomponents.mongo;  
 import java.util.Map;  
 import org.mule.api.MuleEventContext;  
 import org.mule.api.annotations.param.OutboundHeaders;  
 import org.mule.api.annotations.param.Payload;  
 import org.mule.api.lifecycle.Callable;  
 import com.mongodb.AggregationOutput;  
 import com.mongodb.BasicDBObject;  
 import com.mongodb.CommandResult;  
 import com.mongodb.DBObject;  
 public class AggregateMongoComponent extends AbstractMongoComponent implements Callable{  
      public String aggregateObject() {  
        // create our pipeline operations,   
        // build the $projection operation  
        DBObject fields = new BasicDBObject("OurCoolDocument.SpecificValue", 1);  
        fields.put("OurCoolDocument.Unit", 1);  
        fields.put("_id", 0);  
        DBObject project = new BasicDBObject("$project", fields );  
        // now unwind  
        DBObject unwind = new BasicDBObject("$unwind", "$OurCoolDocument");  
        // Now the $group operation  
        DBObject groupFields = new BasicDBObject( "_id", "$OurCoolDocument.Unit");  
        groupFields.put("average", new BasicDBObject( "$avg", "$OurCoolDocument.SpecificValue"));  
        DBObject group = new BasicDBObject("$group", groupFields);  
        // Now the $match operation  
        DBObject match = new BasicDBObject("$match", new BasicDBObject("average", new BasicDBObject( "$gte", 93)) );  
        // run aggregation  
        AggregationOutput output = db.getCollection("mycollection").aggregate( project,unwind,group, match);  
        return output.toString();  
      public Object onCall(MuleEventContext eventContext) throws Exception {  
           return aggregateObject();  

You can of course  pass all the aggregation parameters on the Mule message to the component from the element before.

The results might look like this:

 serverUsed: "localhost/",  
 result: [  
 _id: "My special",  
 average: 93.55555555555556  
 _id: "Another special one",  
 average: 96.66666666666667  
 _id: "Whats so special?",  
 average: 93.77777777777777  
 _id: "Special for you my friend!",  
 average: 96.88888888888889  
 ok: 1  

Best of luck!