MapReduceSample
Tutorial for Lokad.Cloud illustrating Map/Reduce with the Image Histogram Sample.
Introduction
This tutorial explains how to build a map/reduce application on top of Lokad.Cloud. The application computes the histogram of an image using several Azure workers, and handles its data through blob storage and queues. We strongly suggest checking out or updating Lokad.Cloud to get the latest version, and reading the GettingStarted tutorial first. The relevant C# projects in the Lokad.Cloud Visual Studio solution are
Map/Reduce Mini Framework
We've built a mini-framework that can be used to write any map/reduce application against Lokad.Cloud. The architecture is simple, as shown in the picture below.
MapReduceJob is the only class that the (non-cloud) client app sees and uses. It lets the client push a batch of items to the map/reduce logic, wait for completion, and finally retrieve the results. The class also lets the client define the map/reduce functions as .NET delegates.
MapReduceBlobSet is the class that assigns work items to workers and executes the map/reduce logic. MapReduceJob and MapReduceService both use this class to manipulate data (blobs and queues are never accessed directly).
MapReduceService is the actual service that waits for messages in the job queue and processes data; the service is executed and managed by Lokad.Cloud.
The actual workflow is as follows:
This architecture works with many different jobs running concurrently, because each one has a unique name and MapReduceService processes work items as they arrive, without any knowledge of the actual work being done or other map/reduce functions being used. MapReduceBlobSet is also completely job-agnostic.
Implementation
As said, the MapReduceBlobSet class plays a central role in the map/reduce mini-framework. The class interface follows.
class MapReduceBlobSet
{
    // Generates initial blobsets, storing the configuration and queuing work items in the job queue
    // (IMapReduceFunctions contains the map and reduce functions, i.e. Func<TMapIn, TMapOut> and Func<TMapOut, TMapOut, TMapOut>)
    // mapIn and mapOut are the types corresponding to TMapIn and TMapOut, necessary for job configuration
    void GenerateBlobSets(string jobName, IList<object> items, IMapReduceFunctions functions, int workerCount, Type mapIn, Type mapOut);

    // Performs map/reduce operations for all blobs in a blobset of a specific job
    // When all blobsets are processed, a message is put in the job queue
    void PerformMapReduce(string jobName, int blobSetId);

    // Aggregates reduced results in a single result, using the reduce function
    void PerformAggregate(string jobName);

    // Gets the number of completed and total blobsets for a job (checking if a job is complete)
    Tuple<int, int> GetCompletedBlobSets(string jobName);

    // Returns the result of a job (only if the job is complete)
    T GetAggregatedResult<T>(string jobName);

    // Deletes all the data of a job, regardless of its state
    void DeleteJobData(string jobName);
}
The MapReduceBlobSet class implements most of the map/reduce infrastructure. A few aspects worth noting:
The MapReduceService class inherits CloudService<JobMessage> and implements the Start method. It invokes MapReduceBlobSet.PerformMapReduce and MapReduceBlobSet.PerformAggregate, based on the type of the message. The JobMessage class is just a data container:
[Serializable]
public sealed class JobMessage
{
    // Either BlobSetToProcess or ReducedDataToAggregate
    public MessageType Type { get; set; }

    public string JobName { get; set; }

    // Only set if MessageType is BlobSetToProcess
    public int? BlobSetId { get; set; }
}
As previously said, a JobMessage is put in the job queue for each blobset. When all blobsets of a job have been processed, another JobMessage is put in the queue with MessageType set to ReducedDataToAggregate. Note that the JobMessage class is marked Serializable.
The client of the map/reduce framework interacts with it entirely through the MapReduceJob class, whose interface follows:
class MapReduceJob<TMapIn, TMapOut>
{
    // Pushes items to process with map/reduce functions
    string PushItems(IMapReduceFunctions functions, IList<TMapIn> items, int workerCount);

    // Verifies whether the job is completed
    bool IsCompleted();

    // Gets the job result
    TMapOut GetResult();

    // Deletes all the job data (usually called when the job is complete)
    void DeleteJobData();
}
IMapReduceFunctions contains the map and reduce functions (Func<TMapIn, TMapOut> and Func<TMapOut, TMapOut, TMapOut>), held as plain object references. This weak typing is necessary because MapReduceBlobSet and MapReduceService work without knowing the actual functions being used (they are invoked as weakly-typed delegates behind the scenes). The infrastructure stores the full type name of the class that implements IMapReduceFunctions so that, once the client has configured the job, the service is able to create an instance of that class and invoke the map/reduce functions. For this reason, the class must be deployed to the cloud too, and must have a parameterless constructor.
The MapReduceJob class interfaces with the MapReduceBlobSet class and invokes its members. The client should invoke methods in this sequence:
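Judging only from the comments in the MapReduceJob interface above, a plausible calling order is: push the items, poll until the job is complete, read the result, then delete the job data. The sketch below illustrates that order under stated assumptions. InMemoryJob is a hypothetical stand-in that runs everything locally and takes the delegates directly instead of an IMapReduceFunctions instance; it is not the sample's MapReduceJob, and the polling interval is arbitrary.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// Hypothetical stand-in for MapReduceJob<TMapIn, TMapOut>: it computes the
// result in memory so the calling sequence can be tried without Azure.
public sealed class InMemoryJob<TMapIn, TMapOut>
{
    private TMapOut _result;
    private bool _completed;

    // Mirrors PushItems: accepts the functions and the items, returns a job name.
    // workerCount is accepted for symmetry with the real method but ignored here.
    public string PushItems(Func<TMapIn, TMapOut> mapper,
        Func<TMapOut, TMapOut, TMapOut> reducer, IList<TMapIn> items, int workerCount)
    {
        if (items == null || items.Count == 0)
            throw new ArgumentException("At least one item is required.", nameof(items));

        _result = items.Select(mapper).Aggregate(reducer);
        _completed = true;
        return Guid.NewGuid().ToString("N");
    }

    public bool IsCompleted() { return _completed; }

    public TMapOut GetResult()
    {
        if (!_completed) throw new InvalidOperationException("Job is not complete.");
        return _result;
    }

    public void DeleteJobData()
    {
        _result = default(TMapOut);
        _completed = false;
    }
}

public static class ClientSequence
{
    // 1. push, 2. poll until complete, 3. read the result, 4. clean up.
    public static int SumOfSquares(IList<int> items)
    {
        var job = new InMemoryJob<int, int>();
        job.PushItems(x => x * x, (a, b) => a + b, items, 4);

        while (!job.IsCompleted())
        {
            Thread.Sleep(100); // arbitrary polling interval, for illustration only
        }

        int result = job.GetResult();
        job.DeleteJobData();
        return result;
    }
}
```

With the real framework the polling loop matters, since the workers complete the job asynchronously; in this local stand-in the job is already complete when PushItems returns.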
Strongly-Typed Blob Naming
In this map/reduce sample, blob naming is done entirely using the StrongTypedBlobName pattern. As an example, the MapReduceConfiguration class contains configuration data for a map/reduce job (as previously mentioned, the total number of blobsets and the map/reduce functions). For storing the (serialized) configuration object, we created a MapReduceConfigurationName class:
public class MapReduceConfigurationName : BaseTypedBlobName<MapReduceConfiguration>
{
    public override string ContainerName
    {
        get { return MapReduceBlobSet.ContainerName; }
    }

    [Rank(0)] public string Prefix;
    [Rank(1)] public string JobName;

    public MapReduceConfigurationName(string prefix, string jobName)
    {
        Prefix = prefix;
        JobName = jobName;
    }

    public static MapReduceConfigurationName Create(string jobName)
    {
        return new MapReduceConfigurationName(ConfigPrefix, jobName);
    }

    public static BlobNamePrefix<MapReduceConfigurationName> GetPrefix()
    {
        return GetPrefix(new MapReduceConfigurationName(ConfigPrefix, null), 1);
    }
}
The key part of the blob name for the job configuration is the name of the job, so we have a JobName field. In order to store multiple blob types for each job (configuration, input data blobs, the counter, etc.), we also added a Prefix field, which is fixed for each name class (as you can see in the static helper methods). The resulting blob name is
<container name>/config/<job name>
Using this approach, listing blobs with a specific name prefix is very easy. For example, to list all blobs in a blobset:
// Snippet from MapReduceBlobSet.PerformMapReduce method
var blobsetPrefix = InputBlobName.GetPrefix(jobName, blobSetId);
foreach(var blobName in _blobStorage.List(blobsetPrefix))
{
    object inputBlob = _blobStorage.GetBlob(...);
    object mapResult = InvokeAsDelegate(config.MapReduceFunctions.Mapper, inputBlob);
    mapResults.Add(mapResult);
}
All operations performed on blobs by MapReduceBlobSet work with objects: although the blobs contain strongly-typed data (for example a System.Int32 or a System.Drawing.Bitmap), they are all treated as objects, so no changes need to be made to the class to make it work with any type of data item.
Known Caveats
The time needed to upload the input items to the cloud is, in this example, an order of magnitude greater than the time it takes to process them. For this reason, this map/reduce solution is better suited to processing data that is already in the cloud, or to computations that take much longer than uploading the data.
The output of the map function is not stored in blob storage. This improves performance and keeps the solution simple, but if an interruption or crash occurs during the reduce phase, processing must restart from the map phase, thus wasting computing resources.
Image Histogram Client Application
The application computes the grayscale histogram of an image (the distribution of each gray level, 0 to 255, in the whole image).
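To make the notion of a grayscale histogram concrete, here is a minimal sketch. It assumes each pixel has already been reduced to a single gray byte, and it works on raw counts rather than on the sample's own types. GrayHistogram and its methods are hypothetical names used only for illustration; they are not the Helpers implementation from MapReduceClientLib.dll.

```csharp
using System;

public static class GrayHistogram
{
    // Counts how many pixels have each gray level (0..255).
    // Assumption: one byte per pixel, already converted to grayscale.
    public static int[] Count(byte[] grayPixels)
    {
        var counts = new int[256];
        foreach (byte level in grayPixels)
        {
            counts[level]++;
        }
        return counts;
    }

    // Merges the counts of two slices by element-wise addition. Addition is
    // associative and commutative, so slices can be processed independently
    // and combined in any order.
    public static int[] Merge(int[] left, int[] right)
    {
        var merged = new int[256];
        for (int i = 0; i < 256; i++)
        {
            merged[i] = left[i] + right[i];
        }
        return merged;
    }

    // Converts counts to relative frequencies in [0, 1].
    public static double[] ToFrequencies(int[] counts)
    {
        long total = 0;
        foreach (int c in counts) total += c;

        var frequencies = new double[256];
        if (total == 0) return frequencies; // empty image: all frequencies stay 0

        for (int i = 0; i < 256; i++)
        {
            frequencies[i] = (double)counts[i] / total;
        }
        return frequencies;
    }
}
```

Because merging the histograms of two slices gives the same counts as computing the histogram of the whole image, the problem splits naturally into a map step (one histogram per slice) and a reduce step (merge two histograms).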
The application works like this:
Note: an initial version of the application transferred image slices using a serialized System.Drawing.Bitmap, but that turned out to waste a lot of bandwidth and thus time. Using the PNG format, which applies a lossless compression algorithm, is a better choice.
In our case, the client application instantiates and uses the job controller as follows (Setup is an IoC setup class):
_mapReduceJob = new MapReduceJob<byte[], Histogram>(
    Setup.Container.Resolve<Lokad.Cloud.IBlobStorageProvider>(),
    Setup.Container.Resolve<Lokad.Cloud.IQueueStorageProvider>());
// ...
_mapReduceJob.PushItems(new MapReduceFunctions()
{
    Mapper = (Func<byte[], Histogram>)Helpers.ComputeHistogram,
    Reducer = (Func<Histogram, Histogram, Histogram>)Helpers.MergeHistograms
}, slices, 4);
The Histogram class is defined as follows. Note that it is marked as DataContract and its members as DataMember: Lokad.Cloud uses DataContractSerializer behind the scenes, so either DataContract or Serializable attributes are needed.
[DataContract]
public class Histogram
{
    [DataMember]
    public double[] Frequencies;

    [DataMember]
    public int TotalPixels;

    // Constructor needed for deserialization
    protected Histogram() { }

    public Histogram(int totalPixels)
    {
        Frequencies = new double[256];
        TotalPixels = totalPixels;
    }

    public double GetMaxFrequency()
    {
        // Max() is the LINQ extension method (requires "using System.Linq;")
        return Frequencies.Max();
    }
}
Given the architecture of Lokad.Cloud, all the CLR types the queue service uses must be known. For this reason, the Histogram class is defined in a separate assembly (MapReduceClientLib.dll), which must be uploaded alongside the main service assembly (MapReduceService.dll), very much like the class implementing IMapReduceFunctions, as mentioned above.
The map/reduce functions are defined as follows (implementation omitted for brevity, see the Helpers and HistogramMapReduceFunctions classes in the MapReduceClientLib.dll assembly):
// Func<byte[], Histogram>
Histogram ComputeHistogram(byte[] inputStream);
// Func<Histogram, Histogram, Histogram>
Histogram MergeHistograms(Histogram hist1, Histogram hist2);
Client Application Implementation
When an image is selected from the hard disk and the Start button is pushed, the following operations are performed:
Notes: the number of workers passed to PushItems can be greater than the actual number of Azure Worker Roles, but this introduces some unneeded overhead. Also, if the number of pushed items is smaller than the number of workers, the latter is reduced to the number of items so that each worker processes one item (again, this situation introduces some overhead for the reduce operation).
The map/reduce mini-framework applies the map function to all picture slices contained in the N blobsets, obtaining a histogram for each slice. Then, it reduces the histograms of each blobset into a single histogram, using the reduce function. When all N blobsets are completed (map and reduce), the service aggregates the N histograms into a single result, using the same reduce function.
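The three stages described above (map every item, reduce within each blobset, aggregate across blobsets) can be tried locally with the following sketch. It is an in-memory illustration only: LocalMapReduce, Partition and Run are hypothetical names, and the round-robin assignment of items to blobsets is an assumption, since the text does not say how the framework distributes items. The cap of the group count at the item count follows the note above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class LocalMapReduce
{
    // Splits items into at most workerCount groups. The group count is capped
    // at the item count, so no group is ever empty.
    // Assumption: round-robin assignment, chosen here for simplicity.
    public static List<List<T>> Partition<T>(IList<T> items, int workerCount)
    {
        if (items == null || items.Count == 0)
            throw new ArgumentException("At least one item is required.", nameof(items));
        if (workerCount < 1)
            throw new ArgumentOutOfRangeException(nameof(workerCount));

        int groupCount = Math.Min(workerCount, items.Count);
        var groups = new List<List<T>>();
        for (int g = 0; g < groupCount; g++)
        {
            groups.Add(new List<T>());
        }
        for (int i = 0; i < items.Count; i++)
        {
            groups[i % groupCount].Add(items[i]);
        }
        return groups;
    }

    public static TOut Run<TIn, TOut>(IList<TIn> items, int workerCount,
        Func<TIn, TOut> map, Func<TOut, TOut, TOut> reduce)
    {
        var partials = new List<TOut>();
        foreach (var blobSet in Partition(items, workerCount))
        {
            // Map every item of the group, then reduce the group to one partial result.
            partials.Add(blobSet.Select(map).Aggregate(reduce));
        }

        // Aggregate the partial results with the same reduce function.
        return partials.Aggregate(reduce);
    }
}
```

Reusing the reduce function for the final aggregation gives the same answer as a single sequential reduce only when the function is associative, and with a round-robin split like this one it must also be commutative, because items are regrouped out of their original order. Element-wise merging of histograms satisfies both conditions.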