MongoDB Tutorial 5 - Aggregation Framework
Category: tutorial Tags: [mongodb] [NoSQL] [database] [python] [java]
Table of Contents
The aggregation framework has its roots in SQL’s world of groupby
clause.
Introduction
Example used: imagine a SQL table of products.
name | category | manufacture | price |
---|---|---|---|
ipad | tablet | Apple | 499 |
nexus s | cellphone | Samsung | 350 |
To get number of products from each manufacture with SQL,
select manufacture, count(*) from products
group by manufacture;
with mongodb,
> use agg
> db.products.aggregate([ // array
{$group:
{
_id:"$manufacturer", // creating new collection
num_products:{$sum:1}
} // a series of upserts
}
])
{"_id" : "Amazon", "num_products" : 2}
{"_id" : "Sony", "num_products" : 1}
To do compound grouping with SQL,
select manufacturer, category, count(*) from
products group by manufacturer, category
with mongodb,
> db.products.aggregate([
{$group:
{
// _id can be a complex document, just have to be unique
_id:{"manufacturer":"$manufacturer","category":"category"},
num_products:{$sum:1}
}
}
])
{"_id" : {"manufacturer":"Amazon", "category":"Tablets"}, "num_products" : 2}
SQL to Aggregation Mapping Chart .
One can group on _id:null
to aggregate every single document such as counting or summing.
Aggregation Pipeline
pipeline stages | job | documents handling |
---|---|---|
$project |
reshape documents, select out fields potentially deep in hierarchy | 1:1 |
$match |
filter out | n:1 |
$group |
aggregate | n:1 |
$sort |
sorting | 1:1 |
$skip |
skips | n:1 |
$limit |
limit | n:1 |
$unwind |
normalize, flatten data before grouping | 1:n |
$out |
redirect output | 1:1 |
$redact |
security related | - |
$geonear |
location based searching | n:1 |
$unwind
example: tags:[red, blue]
unwinds to tag:red
and tag:blue
, expanding the number of documents.
More Operators with Examples
The $sum
operator:
> db.products.aggregate([ // array
{$group:
{
_id:{"maker":"$manufacturer"}, // creating new collection
sum_prices:{$sum:"$price"}
} // a series of upserts
}
])
{"_id" : {"maker":"Amazon"}, "sum_prices" : 328}
The $avg
operator:
> db.products.aggregate([ // array
{$group:
{
_id:{"category":"$category"}, // creating new collection
avg_prices:{$avg:"$price"}
} // a series of upserts
}
])
{"_id" : {"category":"Tablets"}, "avg_prices" : 396.42714}
The $addToSet
operator without counterpart in SQL:
> db.products.aggregate([ // array
{$group:
{
_id:{"maker":"$manufacturer"}, // creating new collection
categories:{$addToSet:"$category"}
} // a series of upserts
}
])
{"_id" : {"maker":"Apple"}, "categories" : ["Laptops", "Tablets"]}
The $push
operator:
> db.products.aggregate([ // array
{$group:
{
_id:{"maker":"$manufacturer"}, // creating new collection
categories:{$push:"$category"}
} // a series of upserts
}
])
{"_id" : {"maker":"Apple"}, "categories" :
["Tablets", "Tablets", "Tablets", "Laptops"]}
The $max
and $min
operators:
> db.products.aggregate([ // array
{$group:
{
_id:{"maker":"$manufacturer"}, // creating new collection
maxprice:{$max:"$price"}
} // a series of upserts
}
])
{"_id" : {"maker":"Apple"}, "maxprice" : 699 }
The $project
phase/stage: you can remove, add, reshape keys, use simple functions on keys such as $toUpper
, $toLower
, $add
, $multiply
.
db.products.aggregate([
{$project:
{
_id:0,
'maker': {$toLower:"$manufacturer"},
'details': {'category': "$category",
'price' : {"$multiply":["$price",10]}
},
'item':'$name'
}
}
])
{"maker":"amazon", "details":{"category":"Tablets", "price":1990},
"item":"Kindle Fire"}
The $match
phase. One thing to note about $match (and $sort) is that
they can use indexes
, but only if done at the beginning of the aggregation pipeline.
Example zips collection.
db.zips.aggregate([
{$match:
{
state:"NY"
}
},
{$group:
{
_id: "$city",
population: {$sum:"$pop"},
zip_codes: {$addToSet: "$_id"}
}
},
{$project:
{
_id: 0,
city: "$_id",
population: 1,
zip_codes:1
}
},
{$sort:
{population:-1}
},
{$skip:10},
{$limit:5}
])
{"population":9743, "zip_codes":[96162, 96161], "city":"TRUCKEE"}
// order of fields not retained
$sort
operator. The aggregation framework supports both memory (default, 100 MB limit for each pipeline stage unless allowing disk) and disk based sorting. Sorting can be done before or after the grouping stage.
$skip
and $limit
works similarly in find()
.
$unwind
operator: it’s not easy to group on the array elements (prejoined data) so we need to flatten (unjoin, data explosion) the array.
To count how many posts were attached to each tag in the blog,
use blog;
db.posts.aggregate([
/* unwind by tags */
{"$unwind":"$tags"},
/* now group by tags, counting each tag */
{"$group":
{"_id":"$tags",
"count":{$sum:1}
}
},
/* sort by popularity */
{"$sort":{"count":-1}},
/* show me the top 10 */
{"$limit": 10},
/* change the name of _id to be tag */
{"$project":
{_id:0,
'tag':'$_id',
'count' : 1
}
}
])
To reverse $unwind
You can use $push to reverse the effects of an $unwind. If the array elements were unique, $addToSet will also do the job.
More Advanced Aggregation Examples
Double Grouping
For a collection of student grades like below:
{ "_id" : { "$oid" : "50b59cd75bed76f46522c34e" },
"student_id" : 0, "class_id" : 2,
"scores" : [ { "type" : "exam", "score" : 57.92947112575566 },
{ "type" : "quiz", "score" : 21.24542588206755 },
{ "type" : "homework", "score" : 68.19567810587429 },
{ "type" : "homework", "score" : 67.95019716560351 },
{ "type" : "homework", "score" : 18.81037253352722 }]
}
We want to figure out the average class grade for each class,
db.grades.aggregate([
{'$group':{_id:{class_id:"$class_id", student_id:"$student_id"},
'average':{"$avg":"$score"}}
}, // pipe to a secondary grouping stage
{'$group':{_id:"$_id.class_id", 'average':{"$avg":"$average"}}}
])
$first
and $last
, example: find city with largest population in each state.
b.zips.aggregate([
/* get the population of every city in every state */
{$group:
{
_id: {state:"$state", city:"$city"},
population: {$sum:"$pop"},
}
},
/* sort by state, population */
{$sort:
{"_id.state":1, "population":-1}
},
/* group by state, get the first item in each group */
{$group:
{
_id:"$_id.state",
city: {$first: "$_id.city"},
population: {$first:"$population"}
}
},
/* now sort by state again */
{$sort:
{"_id":1}
}
])
$out
redirects the output to a new collection, write over, no appending, considering a games collection with documents structured like,
{
"_id":ObjectId("53684890"),
first_name: "Jerzy",
last_name: "Fischer",
points: 3,
moves: [1,2,5]
}
To summarize each person’s points,
db.games.aggregate([
{$group:
{_id:{first_name:"$first_name", last_name:"$last_name"},
points:{$sum:"$points"}
}
},
{$out:"summary_results"}
])
Note that the _id
field of the redirected output must be unique, the operations below will not succeed and leave the summary_results
collection untouched.
db.games.aggregate([
{$unwind:"moves"},
{$out:"summary_results"}
])
Double $unwind
Create a Cartesian product of the two or more arrays.
db.inventory.aggregate([
{$unwind: "$sizes"},
{$unwind: "$colors"},
{$group:
{
'_id': {'size':'$sizes', 'color':'$colors'},
'count' : {'$sum':1}
}
}
])
To reverse with $addToSet
in one stage since array elements were unique,
db.inventory.aggregate([
{$unwind: "$sizes"},
{$unwind: "$colors"},
{$group:
{
'_id': "$name",
'sizes': {$addToSet: "$sizes"},
'colors': {$addToSet: "$colors"},
}
}
])
To reverse with $push
,
db.inventory.aggregate([
{$unwind: "$sizes"},
{$unwind: "$colors"},
/* create the color array */
{$group:
{
'_id': {name:"$name",size:"$sizes"},
'colors': {$push: "$colors"},
}
},
/* create the size array */
{$group:
{
'_id': {'name':"$_id.name",
'colors' : "$colors"},
'sizes': {$push: "$_id.size"}
}
},
/* reshape for beauty */
{$project:
{
_id:0,
"name":"$_id.name",
"sizes":1,
"colors": "$_id.colors"
}
}
])
Full Text Search and Aggregation
“Two great taste that go great together.” - Andrew Erlichson.
db.sentences.aggregate([
{$match:{$text:{$search:"tree rat"}}},// must appear first
// one full text search per collection, no need to specify
{$sort:{score:{$meta:"textScore"}}},
{$project:{words:1, _id:0}}
])
{"words":"rat shrub granite."}
$text
is only allowed in the $match stage of the aggregation pipeline and must be the first stage of the aggregation pipeline.
Aggregation with Java driver
public class ZipCodeAggregationTest {
public static void main (Strng[] args) {
MongoClient client = new MongoClient();
MongoDatabase database = client.getDatabase("course");
MongoCollection<Document> collection = database.getCollection("zipcodes");
// verbose
// List<Document> pipeline = Arrays.asList(new Document("$group",
// new Document("_id", "$state").append("totalPop", new Document(
// "$sum", "$pop"))), new Document("$match", new Document("totalPop",
// new Document("$gte", 10000000))));
List<Bson> pipeline = Arrays.asList(Aggregates.group("$state", Accumulators.sum("totalPop",
"$pop")), Aggregates.match(gte("totalPop", 10000000)));
List<Document> pipeline2 = Arrays.asList(Document.parse(
"{ $group: { _id: \"$state\", totalPop: { $sum: \"$pop\" } } }"),
Document.parse("{ $match: { totalPop: { $gte: 1010001000 } } }"));
List<Document> results = collection.aggregate(pipeline).
into(new ArrayList<Document>());
// List<Document> results = collection.find().into(new ArrayList<Document>());
for (Document cur : results) {
System.out.println(cur.toJson());
}
}
}
Aggregation Options
explain
gets the query plan if we ran it, useful in optimization.allowDiskUse
allows use of hard drive for intermediate stages. Any stage is limited to 100 MB of memory use and will fail if exceeded. Certain stages like projection run the documents through and don’t use a lot of memory.cursor
allows cursor use and specify cursor size.
use agg
db.zips.aggregate(
[{$group:{_id:"$state", population:{$sum:"$pop"}}}],
{explain:true},
{allowDiskUse:true}
)
Two forms of aggregation:
aggregate([stage1, stage2, ...])
aggregate(stage1, stage2, ...)
cannot add options
Prior to the release of the 3.0 pymongo driver, you would get a document for aggregation queries by default (the aggregation result is limited by the 16 Mb size), though you had the option of getting back a cursor if you were working with MongoDB 2.6.0+, and your pymongo version was 2.6.0+. Starting with the release of the 3.0 pymongo driver, however the aggregation pipeline queries using the driver will now return a cursor by default.
The mongo shell returns a cursor by default starting 2.6.0.
import pymongo
connection = pymongo.MongoClient()
db = connection.agg
result = db.zips.aggregate([{'$group':{'_id':'state',
'population':{'$sum':'$pop'}}}])
print result
# <pymongo.command_cursor.CommandCursor object at 0x7f62829f5210> in 3.2.6
# piror to 3.0
result = db.zips.aggregate([{'$group':{'_id':'state',
'population':{'$sum':'$pop'}}}], cursor={}, allowDiskUse=True)
for doc in result:
print doc
# prior to 3.0
{u'ok':1.0, u'result' : [array of resulted documents]}
Limitations of Aggregation framework
- 100 MB limit for pipeline stages,
allowDiskUse
to get around. - 16 MB limit if you decide to return the result as a single document, set
cursor
. - In a sharded system, stages like
$group
,$sort
will bring back the results to the first shard. Stages like$match
and$project
can go in parallel. Aggregation in mongodb is an interface to map/reduce jobs. Alternatively, get the data out of mongodb using the hadoop connector and use Hadoop map/reduce. There is a map/reduce functionality built into mongodb that is not recommended.
Resources
Link to the MongoDB tutorial series.
comments powered by Disqus