March 23, 2014

A practical comparison of Map-Reduce in MongoDB and RavenDB

Over the past week I’ve been learning about MongoDB. In my adventures with Node.js I am now at the point where I need some kind of data store.

I’ve found MongoDB fairly easy to grasp, thanks to its excellent documentation and my previous experience with RavenDB, which means many of the concepts surrounding NoSQL/document databases are already familiar to me.

I thought it would be interesting to do a practical comparison of Map-Reduce in both MongoDB and RavenDB.

To begin with I created a bunch of “blog post” documents in both databases with the following structure:

"title" : "ASP.NET Identity Stripped Bare - MVC",
"tags" : [
	"ASP.NET",
	"ASP.NET MVC",
	"Security"
]

The goal is to use Map-Reduce to generate a list of tags with a count of their usage e.g.

{
	"tag" : "DDD",
	"count" : 1,
},

{
	"tag" : "OWIN",
	"count" : 2,
},

{
	"tag" : "Security",
	"count" : 1,
}

We could use this data to generate something fancy like a tag cloud on a web site (like it’s 2005 baby).

MongoDB

One of my favourite things about MongoDB so far is the JavaScript API and hacking away in the mongo shell. Using a dynamic language for unstructured/dynamic data is quite advantageous.

Map-Reduce (JavaScript API)

To begin with I define two functions: one for map and one for reduce.

The map function projects (or emits) each tag:

var map = function() {
  this.tags.forEach(function(tag) {
    emit(tag, {
      count: 1
    });
  });
}

emit takes a key (used to reduce the data) and a value.

It’s important to note that MongoDB only calls the reduce function for those keys that have multiple values. Therefore the value you emit should mirror the return value from the reduce function.

This threw me at first, as I did not understand why I was not getting the expected data back for tags that had only been used once. It is the reason why I emit count: 1 above.
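To make that behaviour concrete, here is a rough sketch in plain JavaScript (no database involved). The `runMapReduce` helper is a hypothetical stand-in I wrote for illustration, not MongoDB's implementation, and unlike the shell it passes `emit` to the map function as an argument. It only imitates the rule that reduce is skipped for keys with a single value:

```javascript
// Hypothetical in-memory imitation of map-reduce, for illustration only.
function runMapReduce(docs, map, reduce) {
  const groups = new Map();

  // Collect emitted values per key.
  const emit = (key, value) => {
    if (!groups.has(key)) groups.set(key, []);
    groups.get(key).push(value);
  };

  docs.forEach((doc) => map.call(doc, emit));

  const results = [];
  groups.forEach((values, key) => {
    // A key with a single value bypasses reduce entirely.
    const value = values.length === 1 ? values[0] : reduce(key, values);
    results.push({ _id: key, value: value });
  });
  return results;
}

const posts = [
  { title: 'Post A', tags: ['OWIN', 'Security'] },
  { title: 'Post B', tags: ['OWIN'] },
];

// Emits a bare number while reduce returns an object: the shapes differ.
const mapBareNumber = function (emit) {
  this.tags.forEach((tag) => emit(tag, 1));
};
const reduceBareNumber = (key, values) => ({ count: values.length });

// Emits the same shape that reduce returns.
const mapMatching = function (emit) {
  this.tags.forEach((tag) => emit(tag, { count: 1 }));
};
const reduceMatching = (key, values) => ({
  count: values.reduce((total, value) => total + value.count, 0),
});

const mismatched = runMapReduce(posts, mapBareNumber, reduceBareNumber);
const matching = runMapReduce(posts, mapMatching, reduceMatching);

console.log(JSON.stringify(mismatched));
console.log(JSON.stringify(matching));
```

In the mismatched run, "Security" comes back as the bare number 1 with no count property, whereas the matching run gives every tag a value of the same shape.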

The reduce function aggregates the emitted values for a key. As noted above, it is only called for keys that have more than one value:

var reduce = function(key, values) {
  var totalCount = 0;

  values.forEach(function(value) {
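    // Sum the counts rather than counting the values, because reduce
    // may be passed the output of an earlier reduce call for this key.
    totalCount += value.count;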
  });

  return {
    count: totalCount
  }
};
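The MongoDB documentation notes that reduce may be invoked more than once for the same key, with the output of an earlier call fed back in as one of the input values. The sketch below (plain JavaScript, no database; `countValues`, `sumCounts` and `reduceInTwoSteps` are names I made up for the comparison) shows why a reduce that simply counts its inputs gives the wrong answer in that situation, while one that sums the count property does not:

```javascript
// Counts how many values it was given (breaks when re-reduced).
const countValues = (key, values) => ({ count: values.length });

// Sums the count carried by each value (safe to re-reduce).
const sumCounts = (key, values) => ({
  count: values.reduce((total, value) => total + value.count, 0),
});

// Three emits for the same tag.
const emitted = [{ count: 1 }, { count: 1 }, { count: 1 }];

// Reduce the first two values, then feed that partial result back in
// together with the third value, imitating a second reduce pass.
function reduceInTwoSteps(reduce, values) {
  const partial = reduce('OWIN', values.slice(0, 2));
  return reduce('OWIN', [partial, values[2]]);
}

const wrong = reduceInTwoSteps(countValues, emitted);
const right = reduceInTwoSteps(sumCounts, emitted);

console.log(wrong.count); // 2, because the partial result is counted as one value
console.log(right.count); // 3
```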

To execute the Map-Reduce function and output to the shell we call:

db.posts.mapReduce(map, reduce, { out: { inline: 1 }})

This produces the following results:

{
	"results" : [
		{
			"_id" : "OWIN",
			"value" : {
				"count" : 3
			}
		},
		{
			"_id" : "Security",
			"value" : {
				"count" : 1
			}
		},
		{
			"_id" : "foo",
			"value" : {
				"count" : 1
			}
		},
		{
			"_id" : "jQuery",
			"value" : {
				"count" : 2
			}
		}
	],
	"timeMillis" : 35,
	"counts" : {
		"input" : 8,
		"emit" : 13,
		"reduce" : 2,
		"output" : 10
	},
	"ok" : 1,
}

Notice that we also get some stats about the Map-Reduce operation, such as the number of input documents, how many values were emitted, how many times reduce was called and how many results were output.

To store the results in a collection we execute:

db.posts.mapReduce(map, reduce, { out: 'post_tags' })

Now I can query the results, for example sorting by tag name:

db.post_tags.find().sort({ _id: 1 })

Something that caught me out (again because I’m used to RavenDB) is that MongoDB performs case-sensitive sorting, with uppercase letters sorting before lowercase ones, so I actually end up with:

Apples
Oranges
bananas
cherries

Instead of:

Apples
bananas
cherries
Oranges
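The difference is easy to reproduce outside the database. This is only an analogy: JavaScript's default array sort compares strings by code unit, which for plain ASCII strings like these orders all uppercase letters before lowercase ones:

```javascript
const tags = ['cherries', 'Oranges', 'bananas', 'Apples'];

// Default comparison by code unit: uppercase sorts before lowercase.
const caseSensitive = [...tags].sort();

// Compare lowercased copies to get the ordering a reader would expect.
const caseInsensitive = [...tags].sort((a, b) => {
  const x = a.toLowerCase();
  const y = b.toLowerCase();
  return x < y ? -1 : x > y ? 1 : 0;
});

console.log(caseSensitive);   // [ 'Apples', 'Oranges', 'bananas', 'cherries' ]
console.log(caseInsensitive); // [ 'Apples', 'bananas', 'cherries', 'Oranges' ]
```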

The solution (it seems) is to store a lowercase version of each tag and use that as a sort key. Fortunately we can use a finalize function (a bit like TransformResults in RavenDB) to add this to the final results:

var finalize = function(key, reducedValue) {
  reducedValue.sort_key = key.toLowerCase();
  return reducedValue;
};

Unlike reduce, finalize is called for every key (not just those keys that have multiple values). To execute the Map-Reduce and use the finalize function we call:

db.posts.mapReduce(map, reduce, { out: 'post_tags', finalize: finalize })

Now we can sort on sort_key and get the expected results:

db.post_tags.find().sort({ 'value.sort_key': 1 })
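For anyone who wants to see the effect without a database, here is a small in-memory sketch. The `reduced` array is sample data I made up in the `{ _id, value }` shape that Map-Reduce outputs, and the `sort` call is only a rough equivalent of sorting on `value.sort_key`:

```javascript
// Made-up reduced output in the { _id, value } shape.
const reduced = [
  { _id: 'jQuery', value: { count: 2 } },
  { _id: 'OWIN', value: { count: 3 } },
  { _id: 'foo', value: { count: 1 } },
  { _id: 'Security', value: { count: 1 } },
];

// Same finalize function as above.
const finalize = function (key, reducedValue) {
  reducedValue.sort_key = key.toLowerCase();
  return reducedValue;
};

// finalize runs once for every key, including single-value ones.
const finalized = reduced.map((r) => ({
  _id: r._id,
  value: finalize(r._id, { ...r.value }),
}));

// Rough in-memory equivalent of sort({ 'value.sort_key': 1 }).
finalized.sort((a, b) =>
  a.value.sort_key < b.value.sort_key ? -1 :
  a.value.sort_key > b.value.sort_key ? 1 : 0
);

const order = finalized.map((r) => r._id);
console.log(order); // [ 'foo', 'jQuery', 'OWIN', 'Security' ]
```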

Another way of achieving the same thing without finalize is to make the sort key part of the key:

var map = function() {
  this.tags.forEach(function(tag) {
    emit({ tag: tag, sort_key: tag.toLowerCase() }, {
      count: 1
    });
  });
};

Then:

db.post_tags.find().sort({ '_id.sort_key': 1 })

Using the Aggregation Pipeline

The MongoDB docs recommend using the Aggregation Pipeline for most aggregation operations. Map-Reduce offers slightly more flexibility since you can do whatever you like within the map and reduce functions whereas aggregate is limited to the provided operators. That said it is a powerful feature.

The input documents are passed through each stage in the pipeline. There are built in operators for projection, grouping and sorting to name a few.

To achieve our tag summary we execute the following pipeline:

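db.posts.aggregate([
  // Keep only the tags and add a constant count of 1 to each post
  {
    $project: {
      tags: 1,
      count: { $add: 1 }
    }
  },

  // Output one document per tag
  {
    $unwind: '$tags'
  },

  // Group by tag (plus a lowercase sort key) and total the counts
  {
    $group: {
      _id: { tag: '$tags', sort_key: { $toLower: '$tags' } },
      count: {
        $sum: '$count'
      }
    }
  }
]);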

The $project operator projects a result containing just the tags of each post. If we were to execute just this stage of the pipeline we would get something like:

	{
		"_id" : ObjectId("532c320bb07ab5aace243c8d"),
		"tags" : [
			"ASP.NET",
			"ASP.NET MVC",
			"Security"
		]
	},
	{
		"_id" : ObjectId("532c323bb07ab5aace243c8e"),
		"tags" : [
			"jQuery",
			"foo"
		]
	},
	{
		"_id" : ObjectId("532c3285b07ab5aace243c8f"),
		"tags" : [
			"DDD"
		]
	},

We also use the $add operator to project a count field with a value of 1 onto each document, so that it can be summed per tag later in the pipeline.

Next, $unwind effectively unwinds the tags array, so a post with an array of three tags produces three separate documents. For example, adding this stage to the pipeline produces:

{
	"_id" : ObjectId("532c320bb07ab5aace243c8d"),
	"tags" : "ASP.NET"
},
{
	"_id" : ObjectId("532c320bb07ab5aace243c8d"),
	"tags" : "ASP.NET MVC"
},
{
	"_id" : ObjectId("532c320bb07ab5aace243c8d"),
	"tags" : "Security"
},
{
	"_id" : ObjectId("532c323bb07ab5aace243c8e"),
	"tags" : "jQuery"
},
{
	"_id" : ObjectId("532c323bb07ab5aace243c8e"),
	"tags" : "foo"
},

Finally, we use $group to group the results by the tag name. We also store the lowercase sort key in the group’s _id:

{
	"_id" : {
		"tag" : "ASP.NET MVC",
		"sort_key" : "asp.net mvc"
	},
	"count" : 1
},
{
	"_id" : {
		"tag" : "jQuery",
		"sort_key" : "jquery"
	},
	"count" : 2
},
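If it helps to think of the stages as ordinary collection operations, the sketch below mimics $unwind and $group in plain JavaScript. It is only an analogy over made-up sample posts, not how MongoDB executes the pipeline:

```javascript
// Made-up sample posts.
const posts = [
  { title: 'Post A', tags: ['ASP.NET', 'ASP.NET MVC', 'Security'] },
  { title: 'Post B', tags: ['jQuery', 'foo'] },
  { title: 'Post C', tags: ['jQuery'] },
];

// $project + $unwind: one record per (post, tag) pair, each with count 1.
const unwound = posts.flatMap((post) =>
  post.tags.map((tag) => ({ title: post.title, tags: tag, count: 1 }))
);

// $group: key on the tag, keep a lowercase sort key, and sum the counts.
const groups = new Map();
for (const doc of unwound) {
  const current = groups.get(doc.tags) || {
    _id: { tag: doc.tags, sort_key: doc.tags.toLowerCase() },
    count: 0,
  };
  current.count += doc.count;
  groups.set(doc.tags, current);
}

const summary = [...groups.values()];
console.log(JSON.stringify(summary, null, 2));
```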

Currently there is no way to save the results from within the aggregation pipeline (MongoDB 2.6.x will provide the $out operator), so you would need to store the results in a variable and then manually add them to a collection.

Using the C# Driver

After using the JavaScript API, the C# driver felt like a step backwards. This isn’t to say that it is bad; it is just 1) not as flexible as the JavaScript API (largely because C# is a statically typed language) and 2) not as mature as RavenDB’s .NET API.

Point 2 is understandable since RavenDB’s primary market is .NET developers and so that would have been a big factor in the design of their API.

After installing the mongocsharpdriver package using NuGet I’ve created the following class to encapsulate my connection logic and expose the posts collection:

public class MongoContext
{
    private readonly MongoDatabase db;
    public MongoContext()
    {
        RegisterConventions();

        var client = new MongoClient("mongodb://10.211.55.12");
        MongoServer server = client.GetServer();

        db = server.GetDatabase("mongodemo");
    }

    private void RegisterConventions()
    {
        var pack = new ConventionPack();
        pack.Add(new CamelCaseElementNameConvention());
        ConventionRegistry.Register("camel case",
                                    pack,
                                    t => t.FullName.StartsWith("MongoDBDemo"));
    }

    public MongoCollection<Post> Posts
    {
        get
        {
            return db.GetCollection<Post>("posts");
        }
    }
}

The conventions are necessary so that we can map the camel-cased property names in the database to our pascal-cased C# properties. I’ve defined the following class to represent a blog post:

public class Post
{
    public ObjectId Id { get; set; }
    public string Title { get; set; }
    public string[] Tags { get; set; }
}

Querying is similar to the JavaScript API:

foreach (var post in ctx.Posts.FindAll().SetSortOrder("title")) 
{
    Console.WriteLine(post.Title);
}

Things fall down a bit when it comes to Map-Reduce as we have to provide JavaScript Map/Reduce functions as string variables:

var map = new BsonJavaScript(
    @"
    function() {
      this.tags.forEach(function(tag) {
        emit({ tag: tag, sort_key: tag.toLowerCase() }, {
          count: 1
        });
      });
    };
");

var reduce = new BsonJavaScript(
    @"
    function(key, values) {
      var totalCount = 0;

      values.forEach(function(value) {
        totalCount += value.count;
      });

      return {
        count: totalCount
      }
    };
");

var results = ctx.Posts.MapReduce(map, reduce);
foreach (var result in results.GetResults())
{
    Console.WriteLine(result);
}

Things are marginally better with aggregate but really the static nature of C# just makes what works so well with JavaScript a bit verbose:

var project = new BsonDocument 
{
    { 
        "$project", new BsonDocument {
            { "tags", 1},
            { "count", new BsonDocument { { "$add", 1} }}
        }
    }
};

var unwind = new BsonDocument
{
    { "$unwind", "$tags" }
};

var pipeline = new[] { project, unwind };

var aggregate = ctx.Posts.Aggregate(pipeline);

foreach (var result in aggregate.ResultDocuments)
{
    Console.WriteLine(result);
}

RavenDB

In RavenDB you create indexes to perform Map-Reduce. Map and reduce functions are specified in the form of LINQ expressions. Indexes can be created in your code and initialised when your application starts or directly within RavenDB studio:

 public class Post_Tags : AbstractIndexCreationTask<Post, Post_Tags.ReduceResult>
 {
     public Post_Tags()
     {
         Map = posts => from post in posts
                        from tag in post.Tags
                        select new
                        {
                            Tag = tag,
                            Count = 1
                        };

         Reduce = results => from result in results
                             group result by result.Tag into g
                             select new
                             {
                                 Tag = g.Key,
                                 Count = g.Sum(t => t.Count)
                             };
     }


     public class ReduceResult
     {
         public string Tag { get; set; }
         public int Count { get; set; }
     }
 }

In the Map function we loop through each post and project each tag with a count of 1. The Map function returns a collection of ReduceResult. The Reduce function then groups these results by Tag and produces the total count.

The code below demonstrates how to initialise the store, initialise the index and then query it:

var store = new DocumentStore
{
    Url = "http://localhost:8080",
    DefaultDatabase = "ravendemo"
};

store.Initialize();
store.DatabaseCommands.EnsureDatabaseExists("ravendemo");
IndexCreation.CreateIndexes(Assembly.GetExecutingAssembly(), store);

using (var session = store.OpenSession())
{
    foreach (var result in session.Query<Post_Tags.ReduceResult, Post_Tags>()
        .OrderBy(t => t.Tag))
    {
        Console.WriteLine(result.Tag + " " + result.Count);
    }
}

For me this API is quite a bit easier to work with and test. Again this comes down to the fact that RavenDB was largely designed for .NET, whereas MongoDB was not.

Map-Reduce updates

When you perform Map-Reduce or aggregation in MongoDB the input is effectively a snapshot of the data at that time.

If your data changes it is down to you to perform the Map-Reduce/aggregation again (ideally in a background process).

Rather than having to re-process all of your data, MongoDB enables you to perform incremental Map-Reduce using a query to process “new” data only:

db.sessions.mapReduce(
  mapFunction,
  reduceFunction,
  {
    query: { ts: { $gt: ISODate('2011-11-05 00:00:00') } },
    out: { reduce: "session_stat" },
    finalize: finalizeFunction
  }
);
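As I understand it, the reduce output action merges the new results into the existing collection: where a key already exists, the reduce function is applied to the existing value and the new value, and the result overwrites the old one. The sketch below imitates that merge in memory, using tag counts rather than sessions. `mergeWithReduce` and the sample data are hypothetical, and finalize is left out for brevity:

```javascript
const reduceFunction = (key, values) => ({
  count: values.reduce((total, value) => total + value.count, 0),
});

// Hypothetical stand-in for the existing output collection, keyed by _id.
const existing = new Map([
  ['OWIN', { count: 2 }],
  ['Security', { count: 1 }],
]);

// Results of running Map-Reduce over the "new" documents only.
const fresh = [
  { _id: 'OWIN', value: { count: 1 } },
  { _id: 'DDD', value: { count: 1 } },
];

// Existing keys are re-reduced with the new value; new keys are inserted.
function mergeWithReduce(target, newResults, reduce) {
  for (const { _id, value } of newResults) {
    const merged = target.has(_id)
      ? reduce(_id, [target.get(_id), value])
      : value;
    target.set(_id, merged);
  }
  return target;
}

mergeWithReduce(existing, fresh, reduceFunction);
console.log([...existing.entries()]);
```

This only works if the reduce function can safely consume its own output, which is another reason to sum counts rather than count values.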

Unfortunately, performing the update isn’t the tricky part; knowing when to do it is.

Since MongoDB doesn’t provide triggers of any kind (e.g. for when a collection changes) you’d need to handle this in your application (re-execute the Map-Reduce when a post is added/updated).

It seems it is possible to use MongoDB’s Oplog as a notification mechanism but it doesn’t appear to be a trivial task and you would still need to perform this processing in your own background task.

RavenDB on the other hand performs index updates automatically. So if I add/update a post, the index will be updated. This I believe is a massive plus point, especially when you have data that changes sporadically or a large number of indexes to maintain.

© 2022 Ben Foster