g2.8xlarge does not support NVIDIA GPUDirect peer to peer transfers

Go to


run make and then execute


[./simpleP2P] - Starting...
Checking for multiple GPUs...
CUDA-capable device count: 4
> GPU0 = " GRID K520" IS capable of Peer-to-Peer (P2P)
> GPU1 = " GRID K520" IS capable of Peer-to-Peer (P2P)
> GPU2 = " GRID K520" IS capable of Peer-to-Peer (P2P)
> GPU3 = " GRID K520" IS capable of Peer-to-Peer (P2P)

Checking GPU(s) for support of peer to peer memory access...
> Peer access from GRID K520 (GPU0) -> GRID K520 (GPU1) : No
> Peer access from GRID K520 (GPU0) -> GRID K520 (GPU2) : No
> Peer access from GRID K520 (GPU0) -> GRID K520 (GPU3) : No
> Peer access from GRID K520 (GPU1) -> GRID K520 (GPU0) : No
> Peer access from GRID K520 (GPU1) -> GRID K520 (GPU2) : No
> Peer access from GRID K520 (GPU1) -> GRID K520 (GPU3) : No
> Peer access from GRID K520 (GPU2) -> GRID K520 (GPU0) : No
> Peer access from GRID K520 (GPU2) -> GRID K520 (GPU1) : No
> Peer access from GRID K520 (GPU2) -> GRID K520 (GPU3) : No
> Peer access from GRID K520 (GPU3) -> GRID K520 (GPU0) : No
> Peer access from GRID K520 (GPU3) -> GRID K520 (GPU1) : No
> Peer access from GRID K520 (GPU3) -> GRID K520 (GPU2) : No
Two or more GPUs with SM 2.0 or higher capability are required for ./simpleP2P.
Peer to Peer access is not available amongst GPUs in the system, waiving test.

As the output shows, every peer access check between the four GRID K520 GPUs returns No, so g2.8xlarge does not support NVIDIA GPUDirect peer to peer transfers. Only P2 instances support it.

OLAP in Big Data

Almost every big data pipeline has an aggregation step for analyzing multidimensional data. For example, in a sales dataset we have the following record layout:

<year, month, day, hour, minute, second, city, state, country, sales>

where <year, month, day> are attributes of the date dimension and <hour, minute, second> are attributes of the time dimension. These are the so-called temporal dimensions.

<city, state, country> are attributes of the location dimension.

In OLAP data cube terminology, a cube region is defined by a particular set of dimension attributes, and a cube group is an actual group of values belonging to that region. For example, taking a different dataset, one could have the cube region <search_topic, device_type, *, *> with the corresponding cube groups <technology, iPhone, *, *>, <technology, android, *, *>, <news, iPhone, *, *>, etc.

Note that * represents the "all" category, which includes every attribute value in that dimension.
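To make the cube group idea concrete, here is a minimal sketch that lists every cube group a single record contributes to. The class and method names (CubeGroups, groupsFor) are hypothetical, and the sketch assumes each attribute can be kept or replaced by * independently, which ignores dimension hierarchies. Under that assumption a record with d attributes belongs to 2^d cube groups.

```java
import java.util.ArrayList;
import java.util.List;

public class CubeGroups {
    // Returns every cube group that one record belongs to.
    // Bit i of mask decides whether attribute i is kept or replaced by "*".
    public static List<String> groupsFor(String[] record) {
        int d = record.length;
        List<String> groups = new ArrayList<>();
        for (int mask = 0; mask < (1 << d); mask++) {
            String[] key = new String[d];
            for (int i = 0; i < d; i++) {
                key[i] = ((mask >> i) & 1) == 1 ? record[i] : "*";
            }
            groups.add("<" + String.join(", ", key) + ">");
        }
        return groups;
    }

    public static void main(String[] args) {
        // Hypothetical record using only the location dimension.
        String[] record = {"San Francisco", "CA", "USA"};
        for (String group : groupsFor(record)) {
            System.out.println(group);
        }
    }
}
```

With three attributes this prints 8 groups, from <*, *, *> up to <San Francisco, CA, USA>. A real cube would typically drop combinations that make no sense for a hierarchy, such as keeping the city while aggregating over the country.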

Many pipelines need to compute aggregate measures such as SUM, REACH and TOP K over all possible aggregate/cube groups defined by the above dimensions. Examples are the total sales for San Francisco, CA, USA in February 2015, or the number of unique buyers in San Francisco, CA, USA for the last quarter.

Nowadays, terabytes of accumulated data per day is not uncommon in many companies. There are two types of aggregate measures, so called algebraic and holistic measures.

For an algebraic measure, e.g. SUM or COUNT, we can compute the result distributively, part by part, from the subgroups.


SUM( sales day 1, sales day 2, sales day 3) = SUM(sales day 1, sales day 2) + SUM(sales day 3)

A holistic measure, e.g. DISTINCT_COUNT or TOP_K, cannot be computed in a distributive fashion, because the partial results of the subgroups do not simply add up. This limits how we can parallelize the aggregation job.


DISTINCT_COUNT(USERS in day 1 and day 2) is NOT equal to DISTINCT_COUNT(USERS in day 1) + DISTINCT_COUNT(USERS in day 2)
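The difference between the two kinds of measures can be checked with a few lines of plain Java. This is only a sketch with made-up sample data; the class name MeasureKinds and its helper methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

public class MeasureKinds {
    public static long sum(List<Long> values) {
        long total = 0;
        for (long v : values) {
            total += v;
        }
        return total;
    }

    public static int distinctCount(List<String> users) {
        return new HashSet<>(users).size();
    }

    public static <T> List<T> concat(List<T> a, List<T> b) {
        List<T> all = new ArrayList<>(a);
        all.addAll(b);
        return all;
    }

    public static void main(String[] args) {
        // Made-up sample data: sales amounts and buyers for two days.
        List<Long> day1Sales = Arrays.asList(10L, 20L);
        List<Long> day2Sales = Arrays.asList(5L);
        List<String> day1Users = Arrays.asList("alice", "bob");
        List<String> day2Users = Arrays.asList("bob", "carol");

        // SUM: the partial sums add up to the overall sum (35 in both cases).
        System.out.println(sum(concat(day1Sales, day2Sales)));
        System.out.println(sum(day1Sales) + sum(day2Sales));

        // DISTINCT_COUNT: bob buys on both days, so adding the per-day
        // counts (2 + 2 = 4) overcounts the true value (3).
        System.out.println(distinctCount(concat(day1Users, day2Users)));
        System.out.println(distinctCount(day1Users) + distinctCount(day2Users));
    }
}
```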

Besides the holistic measure challenge above, other issues in big data aggregation are:
  • Size of intermediate data

The size of the intermediate data emitted during the map phase depends on the number of cube regions and the size of the input data. As we increase the number and depth of the dimensions in the aggregation, a naive approach emits exponentially more data during the map phase. This strains disk space and I/O and incurs additional shuffling cost.

  • Size of large groups
It is very likely that some cube groups will be large during the reduce phase, e.g. <*, *> or <*, *, *, USA>. With a naive approach, the reducers assigned to these groups take much longer to finish than the others.

Most of the time, the cube regions at the bottom of the cube lattice, e.g. <*, *>, are the ones that contain these large groups; we can call them reducer-unfriendly groups. One could also sample the input to identify the potentially large groups.
We can convert some holistic measures, e.g. DISTINCT_COUNT, into a partially algebraic form so that we can compute them from subgroups in parallel. If we don't need the exact value, we could also use approximate distinct counting.
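One common way to get such a partially algebraic form for DISTINCT_COUNT, sketched below, is to partition the data by a hash of the user id. This is an assumption about the technique, not necessarily the approach this series has in mind. Because every occurrence of a user lands in the same partition, the partitions hold disjoint sets of users and their distinct counts can simply be added. The class name PartitionedDistinctCount and the sample data are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PartitionedDistinctCount {
    // Splits users into numPartitions disjoint buckets by hash, counts the
    // distinct users per bucket, and sums the per-bucket counts.
    public static int distinctCount(List<String> users, int numPartitions) {
        List<Set<String>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new HashSet<>());
        }
        for (String user : users) {
            // floorMod keeps the index non-negative for negative hash codes.
            int p = Math.floorMod(user.hashCode(), numPartitions);
            partitions.get(p).add(user);
        }
        int total = 0;
        for (Set<String> partition : partitions) {
            // Each bucket could be counted by a separate worker.
            total += partition.size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> users = Arrays.asList("alice", "bob", "bob", "carol", "alice");
        // The result is the same regardless of the number of partitions.
        System.out.println(distinctCount(users, 1));
        System.out.println(distinctCount(users, 4));
    }
}
```

In a MapReduce job the same effect would come from including the user id in the partitioning key, so that the per-partition counts can be summed like any algebraic measure.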
To be continued…

Mahout Distance Measure

In Mahout, several distance measure classes are defined in the org.apache.mahout.common.distance package. Among them are SquaredEuclideanDistanceMeasure, EuclideanDistanceMeasure, CosineDistanceMeasure, MahalanobisDistanceMeasure and ManhattanDistanceMeasure. These classes implement the DistanceMeasure interface.

There is also support for weighted versions of DistanceMeasure, e.g. WeightedEuclideanDistanceMeasure and WeightedManhattanDistanceMeasure. Both extend the abstract class WeightedDistanceMeasure.
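For reference, the formulas behind some of these measures can be sketched in plain Java. This sketch works on double arrays and does not use Mahout's Vector or DistanceMeasure types; the class name DistanceSketch is hypothetical, and cosine distance is taken here as 1 minus the cosine similarity.

```java
public class DistanceSketch {
    public static double squaredEuclidean(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double diff = a[i] - b[i];
            sum += diff * diff;
        }
        return sum;
    }

    public static double euclidean(double[] a, double[] b) {
        return Math.sqrt(squaredEuclidean(a, b));
    }

    public static double manhattan(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            sum += Math.abs(a[i] - b[i]);
        }
        return sum;
    }

    // Assumes neither vector is all zeros (the norm would be 0).
    public static double cosine(double[] a, double[] b) {
        double dot = 0.0;
        double normA = 0.0;
        double normB = 0.0;
        for (int i = 0; i < a.length; i++) {
            dot += a[i] * b[i];
            normA += a[i] * a[i];
            normB += b[i] * b[i];
        }
        return 1.0 - dot / (Math.sqrt(normA) * Math.sqrt(normB));
    }

    public static void main(String[] args) {
        double[] origin = {0.0, 0.0};
        double[] point = {3.0, 4.0};
        System.out.println(squaredEuclidean(origin, point)); // 25.0
        System.out.println(euclidean(origin, point));        // 5.0
        System.out.println(manhattan(origin, point));        // 7.0
        // Orthogonal unit vectors have cosine distance 1.0.
        System.out.println(cosine(new double[] {1.0, 0.0}, new double[] {0.0, 1.0}));
    }
}
```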


DistanceMeasure Mahout



YARN log files

Viewing MapReduce job log files used to be a pain. With YARN, you can enable log aggregation, which collects all the individual logs belonging to a MapReduce job so that you can view them together.

You can view the aggregated log files of your MapReduce job using the following command

yarn logs -applicationId  <YOUR_APPLICATION_ID>  |less


yarn logs -applicationId  application_1399561129519_4422 |less

You can enable log aggregation by setting the yarn.log-aggregation-enable property to true in yarn-site.xml. To inspect the current configuration:

cat /etc/hadoop/conf/yarn-site.xml


To see the list of running MapReduce jobs

mapred job -list

To check the status of a MapReduce job

mapred job -status <YOUR_JOB_ID>

Hadoop ToolRunner and Tool

When you create a MapReduce job, you can implement the Tool interface's method

int run(String[] args) throws Exception;

This is the only method declared in the Tool interface itself (Tool also extends Configurable, so getConf and setConf are inherited). Implementing it allows ToolRunner to invoke the run(String[]) method of our MapReduce job. In the main method, call ToolRunner.run(new Configuration(), new MapReduceJob(), args) as follows:

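public static void main(String[] args) throws Exception {

    // ToolRunner parses the generic Hadoop options, then calls MapReduceJob.run()
    int status = ToolRunner.run(new Configuration(), new MapReduceJob(), args);

    System.exit(status);
}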

If you take a look at the ToolRunner class, you will find the following method. Since our MapReduceJob implements the Tool interface, ToolRunner in turn invokes our implementation of run(String[] args).

public static int run(Configuration conf, Tool tool, String[] args)
    throws Exception {

    if (conf == null) {
        conf = new Configuration();
    }

    GenericOptionsParser parser = new GenericOptionsParser(conf, args);

    //set the configuration back, so that Tool can configure itself
    tool.setConf(conf);

    //get the args w/o generic hadoop args
    String[] toolArgs = parser.getRemainingArgs();

    return tool.run(toolArgs);
}

Avro Schema

Here are some examples:


{
  "namespace": "test.avro",
  "name": "FacebookUser",
  "type": "record",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "num_likes", "type": "int"},
    {"name": "num_photos", "type": "int"},
    {"name": "num_groups", "type": "int"},
    {"name": "friends", "type": {"type": "array", "items": "FacebookUser"}}
  ]
}



To deserialize Avro binary data, first create a ByteArrayInputStream with the Avro data as the input

Then, get the DecoderFactory and obtain a direct binary decoder from it

Next, instantiate the GenericDatumReader with the Avro schema

Now we can use the reader to deserialize the Avro data from the decoder


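import java.io.ByteArrayInputStream;
import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;

// bytes holds the Avro-encoded binary data
Schema schema = new Schema.Parser().parse(new File("PositionList.avsc"));

ByteArrayInputStream bai = new ByteArrayInputStream(bytes);

Decoder decoder = DecoderFactory.get().directBinaryDecoder(bai, null);

GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

GenericRecord rec = reader.read(null, decoder);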