In this tutorial I’ll show you how to process billions of records with minimal effort and code, using Elastic MapReduce and Hadoop Streaming. Our wish list for Santa:
- I want to process an unknown amount of data in a scalable, custom way
- The same code can be run locally and remotely, so I can debug or process any amount of data without changing anything
- I should be able to use any language I like, and it does not have to be Java. In this example I’ll be using Ruby because it’s awesome. Simply translate this tutorial to Python, Perl, PHP or anything you want, it’ll still work.
1. Let’s play a game
Let’s suppose we are running a huge gaming platform. As the brand new data scientist, we’re asked to perform some simple stats on our users.
For the sake of the example, let’s compute each player’s average rank, per type of game played.
From the platform logs we have two kinds of files:
GameLogfile describes the type of game, the date, and a game id
PlayerLogfile describes how a player scored in a given game. It contains the game id, the player id, and the player’s score.
Our files are in tab-separated (TSV) format and stored in an Amazon S3 bucket. They look roughly like the sketch below.
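As a rough illustration (every value below is made up for this tutorial; only the field names come from the descriptions above), the two logs could look like this, with tab-separated columns:

```
# GameLogfile (game_type, date, game_id):
chess   2013-06-01   1
go      2013-06-01   2

# PlayerLogfile (game_id, player_id, score):
1   1   300
1   2   150
2   1   27
2   2   54
```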
2. Do this with style!
We are going to solve this problem with MapReduce. Map/reduce is a programming paradigm that allows you to horizontally scale the program’s execution. This means the more parallel servers you get, the more efficient you will be. Obviously within a reasonable range: spinning up 300 servers to process 10 lines of data doesn’t look like a good idea.
In addition to being scalable, I find that map/reduce reduces the amount of code, increases clarity, and should thus be used even for moderate amounts of data.
Bonus: We’ll be able to run our code both locally for quick tests, and remotely for heavy processing \o/
Before entering in the details, here is what we are going to do:
- map the raw data and extract useful information (map)
- group the data by game_id (sort)
- compute each player rank for each game (reduce1)
- group the data by (player, game_type) pairs (sort)
- for each (player, game_type) pair, compute the average rank (reduce2)
Our pipeline hence consists of a map -> reduce -> reduce procedure. If we think of a second mapper which is the identity, then we have two chained map/reduce passes.
As we plan to use Hadoop Streaming, the only things we need are three script files that will represent our mapper and reducers. Each file will consist of a simple script that “eats” data via STDIN, and outputs something to STDOUT.
Again, I’m using Ruby as an example here. If you’re more comfortable with any other language, then please use it, as long as it knows how to read STDIN and write to STDOUT.
Thanks to Hadoop, we won’t have to take care of the sort steps, the data redundancy management, the possible server crashes, and plenty of other boring stuff. How nice is that?
2.1. First mapper
The first mapper’s role will be to “eat” raw data with absolutely no context, nor any knowledge of what’s happening elsewhere (i.e. on other potential workers). It is very important to note that there is absolutely no relation between two consecutive lines that a mapper receives. For instance, a mapper could receive the first line of the first log, then the 10th line of another log file, then the 2nd line of the first log, etc.
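Here is a minimal sketch of what map1.rb could look like. I’m assuming the column layouts sketched above, and that a PlayerLogfile row can be told apart from a GameLogfile row because it starts with a numeric game_id; adapt the parsing to whatever your real logs contain.

```ruby
#!/usr/bin/env ruby
# map1.rb - first mapper (sketch). Assumed layouts:
#   GameLogfile   : game_type <tab> date <tab> game_id
#   PlayerLogfile : game_id <tab> player_id <tab> score
# Assumption: a PlayerLogfile row starts with a numeric game_id,
# while a GameLogfile row starts with a non-numeric game type.

STDIN.each_line do |line|
  fields = line.chomp.split("\t")
  next if fields.size < 3

  if fields[0] =~ /\A\d+\z/
    # PlayerLogfile row: emit game_id as the primary key, then the player data
    game_id, player_id, score = fields
    puts [game_id, "PLAYER", player_id, score].join("\t")
  else
    # GameLogfile row: emit game_id as the primary key, then the game type
    game_type, _date, game_id = fields
    puts [game_id, "GAME", game_type].join("\t")
  end
end
```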
As you can see, this mapper will always output the game_id as the first key. Then, depending on the log type, it will output information about either the player or the game.
You can run your mapper locally by simply running cat datain/* | ./map1.rb, which outputs something like the lines sketched below.
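With the toy data from section 1 and the output format of the mapper sketch above (both are assumptions, not necessarily the original files):

```
1   GAME    chess
2   GAME    go
1   PLAYER  1   300
1   PLAYER  2   150
2   PLAYER  1   27
2   PLAYER  2   54
```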
2.2. First sort
I feel like this step should be explained even though it does not require any work from us. What happens here is that Hadoop takes care of our first mapper’s output. By default, it will split each line on the tab character, use the first field as the key, and assign a single reducer instance to each key. Furthermore, it guarantees that a reducer will see ‘sorted’ results.
This step is very important to understand. It means two things for the reducer:
- For each primary key (game_id in our example), all the corresponding lines will be sent to the same reducer instance. This allows the data to be processed without any communication between the reducers.
- The data is sorted. This implies that if a reducer sees a game_id=1 key, then all the following lines will also be game_id=1, until there are no game_id=1 keys left. Ever. As soon as the reducer receives a different primary key, we can assume all the game_id=1 lines have been processed.
When running with bash:
As I said, I should be able to run my code both locally and remotely. Fortunately, we can perform the sort in bash with the sort command. This trick consists of performing a plain sort on the data. When running locally, we don’t have to distribute the data between different instances (which Hadoop does), so sorted data will guarantee the two properties that our reducer requires.
Running this in bash, cat datain/* | ./map1.rb | sort would yield something like the sketch below.
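Assuming the toy data and the mapper output format from the sketch above:

```
1   GAME    chess
1   PLAYER  1   300
1   PLAYER  2   150
2   GAME    go
2   PLAYER  1   27
2   PLAYER  2   54
```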
As you can see, the data is now grouped by the game_id key. How delightful.
When running with hadoop:
Simply perform some cool dance moves while Hadoop takes care of everything.
2.3. First reduce
The first reducer will accumulate the player scores, in order to determine the players’ ranks in each played game:
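Here is a minimal sketch of reduce1.rb, consistent with the mapper output format assumed above (not necessarily the original script). Note that it only ever keeps one game in memory: it buffers the game type and player scores for the current game_id, and flushes the ranks as soon as the key changes. I’m assuming a higher score means a better (lower) rank.

```ruby
#!/usr/bin/env ruby
# reduce1.rb - first reducer (sketch). Assumes map1.rb's output format:
#   game_id <tab> GAME   <tab> game_type
#   game_id <tab> PLAYER <tab> player_id <tab> score

current_game = nil
game_type    = nil
scores       = {}   # player_id => score, for the current game only

# rank the players of the finished game and emit one line per player
def flush(game_type, scores)
  return if scores.empty?
  ranked = scores.sort_by { |_player, score| -score }
  ranked.each_with_index do |(player, _score), index|
    puts [player, game_type, index + 1].join("\t")
  end
end

STDIN.each_line do |line|
  game_id, tag, *rest = line.chomp.split("\t")

  if game_id != current_game
    flush(game_type, scores)        # the previous game is complete
    current_game, game_type, scores = game_id, nil, {}
  end

  case tag
  when "GAME"   then game_type = rest[0]
  when "PLAYER" then scores[rest[0]] = rest[1].to_f
  end
end

flush(game_type, scores)            # don't forget the last game
```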
Now our pipeline cat datain.dat | ./map1.rb | sort | ./reduce1.rb yields:
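With the toy data above, and the player_id / game_type / rank output format chosen in the sketch, this prints something like:

```
1   chess   1
2   chess   2
2   go      1
1   go      2
```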
This could be read as
- player 1 scored one chess game with rank 1
- player 2 scored one chess game with rank 2
- player 1 scored one go game with rank 2
- player 2 scored one go game with rank 1
Please note something very important here: the reducer stores almost nothing in memory! As you can see in the script, as soon as a game has finished processing, we can simply output the result and drop the accumulated state. Nothing has to stay in memory for long, so you barely need any RAM on your workers, even to process billions of games!
2.4. Coffee break!
If you’re still reading this then I’m sure you deserve it.
2.5. Second mapper
Nothing has to be done here; the data is already formatted for the next reduce step.
Conceptually, we can view this step as a map step where the mapper is the identity.
As a reminder that something is still happening here, I’ll pipe the Unix cat command into our workflow. Of course, it has no practical purpose.
When running our code with Hadoop Streaming, we’ll declare a map step with an identity mapper (or we’ll simply declare cat to be our mapper script, which is pretty much the same thing).
2.6. Last step: averaging the scores
For the sake of the argument, let’s say I wasn’t this lazy and generated much more data, which led to a much longer reduce1 output (in the same player / game_type / rank format, omitted here).
We now have three players and three different games. I also shuffled the results, to emphasize that the reduce step does not necessarily produce sorted results.
Actually it does here, when running our bash workflow, since we’re using sort and a single process. Generally speaking, it does not.
Once we run the identity mapper, followed by the sort step, it will again be sorted so we can write our final reducer as follows:
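Here is a minimal sketch of reduce2.rb, matching the reduce1 output format assumed earlier. Rather than relying on the exact secondary ordering of the lines, it buffers the ranks of the current player per game type (a handful of entries at most) and flushes the averages when the player changes.

```ruby
#!/usr/bin/env ruby
# reduce2.rb - final reducer (sketch). Assumes reduce1.rb's output format:
#   player_id <tab> game_type <tab> rank

current_player = nil
ranks          = Hash.new { |h, k| h[k] = [] }   # game_type => list of ranks

# emit the average rank of the finished player, one line per game type
def flush(player, ranks)
  return if player.nil?
  ranks.each do |game_type, list|
    average = list.inject(:+) / list.size.to_f
    puts [player, game_type, average.round(2)].join("\t")
  end
end

STDIN.each_line do |line|
  player, game_type, rank = line.chomp.split("\t")

  if player != current_player
    flush(current_player, ranks)     # the previous player is complete
    current_player = player
    ranks = Hash.new { |h, k| h[k] = [] }
  end

  ranks[game_type] << rank.to_f
end

flush(current_player, ranks)         # don't forget the last player
```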
If we run our full bash pipeline, cat datain/* | ./map1.rb | sort | ./reduce1.rb | cat | sort | ./reduce2.rb (assuming we have our new dataset), we get:
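In the player_id / game_type / average_rank format assumed in the sketches, the two results highlighted below would read something like:

```
1   chess           1.2
1   hide and seek   8.0
```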
And that’s it! We now know that player 1 achieved an average rank of 1.2 at chess, and an average rank of 8.0 at hide and seek!
3. Going heavy
Like I told you, our scripts are Hadoop-ready. Provided you have an Amazon AWS account, running our code is very easy:
- install the Amazon elastic-mapreduce client and configure it (basically, give it your credentials)
- run the two map-reduce stages (a sketch of the launch commands is shown below)
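Here is a rough sketch of what the two launch commands could look like with the elastic-mapreduce command-line client. The bucket names and paths are placeholders, and the exact flags may differ depending on your client version, so double-check against the client’s documentation:

```bash
# first stage: map1.rb -> reduce1.rb
elastic-mapreduce --create --stream \
  --name    "ranking stage 1" \
  --input   s3n://my-bucket/datain/ \
  --mapper  s3n://my-bucket/scripts/map1.rb \
  --reducer s3n://my-bucket/scripts/reduce1.rb \
  --output  s3n://my-bucket/stage1-out/

# second stage: identity mapper (cat) -> reduce2.rb
elastic-mapreduce --create --stream \
  --name    "ranking stage 2" \
  --input   s3n://my-bucket/stage1-out/ \
  --mapper  cat \
  --reducer s3n://my-bucket/scripts/reduce2.rb \
  --output  s3n://my-bucket/final-out/
```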
Note that I’m using two different launchers here. You can also tell the launcher to perform multiple steps by passing them as JSON; see the elastic-mapreduce documentation for that.
Wait… Is it this simple?
Yes. This simple.
Thanks for reading, you’re awesome
If this helped you in any way, feel free to drop a comment, correct something, ask a question, or simply let me know this was interesting, thanks !
Please note that this approach can be, and has been, used for heavy industrial purposes. You can literally process billions of rows in no time, with a few lines of code. This is, in my opinion, a tremendous way to prototype and scale your data analysis!