Using S3DistCP to Merge Many Small S3 Files

At some point, many companies face the reality that using S3 as input for their MapReduce or Spark jobs can be terribly slow. One of the primary reasons is having too many small input files, which may also be uncompressed. If this describes your predicament, you can use S3DistCP to merge and compress your files for better MapReduce/Spark performance and lower storage costs.

Unfortunately, the documentation around using S3DistCP is not great, so I wanted to document some tips here.

There are several ways to find and download s3distcp; I used s3cmd:

s3cmd get s3://elasticmapreduce/libs/s3distcp/1.latest/s3distcp.jar

s3distcp relies on many dependencies, which are included in the aws-java-sdk:

wget http://sdk-for-java.amazonwebservices.com/latest/aws-java-sdk.zip

The goal is to run a command that might look something like the following, which means we need to get our classpath and S3 credentials set up:

hadoop jar /tmp/s3distcp.jar -D mapred.child.java.opts=-Xmx1024m -D s3DistCp.copyfiles.mapper.numWorkers=1 -libjars ${LIBJARS} --src s3n://data/2015/* --dest hdfs:///data/compressed/ --groupBy '.*(1).*' --targetSize 256 --outputCodec snappy

To configure the classpath, you’ll want to export two environment variables, LIBJARS and HADOOP_CLASSPATH, which include jars from the unzipped aws-java-sdk. Let’s say you unzipped it in /tmp; setting your classpath would look something like this:

export LIBJARS=/tmp/aws-java-sdk-1.9.40/third-party/jackson-core-2.3.2/jackson-core-2.3.2.jar,/tmp/aws-java-sdk-1.9.40/third-party/aspectj-1.6/aspectjweaver.jar,/tmp/aws-java-sdk-1.9.40/third-party/aspectj-1.6/aspectjrt.jar,/tmp/aws-java-sdk-1.9.40/third-party/freemarker-2.3.18/freemarker-2.3.18.jar,/tmp/aws-java-sdk-1.9.40/third-party/joda-time-2.2/joda-time-2.2.jar,/tmp/aws-java-sdk-1.9.40/third-party/javax-mail-1.4.6/javax.mail-api-1.4.6.jar,/tmp/aws-java-sdk-1.9.40/third-party/jackson-databind-2.3.2/jackson-databind-2.3.2.jar,/tmp/aws-java-sdk-1.9.40/third-party/httpcomponents-client-4.3/httpclient-4.3.jar,/tmp/aws-java-sdk-1.9.40/third-party/httpcomponents-client-4.3/httpcore-4.3.jar,/tmp/aws-java-sdk-1.9.40/third-party/jackson-annotations-2.3.0/jackson-annotations-2.3.0.jar,/tmp/aws-java-sdk-1.9.40/third-party/commons-logging-1.1.3/commons-logging-1.1.3.jar,/tmp/aws-java-sdk-1.9.40/third-party/spring-3.0/spring-core-3.0.7.jar,/tmp/aws-java-sdk-1.9.40/third-party/spring-3.0/spring-context-3.0.7.jar,/tmp/aws-java-sdk-1.9.40/third-party/spring-3.0/spring-beans-3.0.7.jar,/tmp/aws-java-sdk-1.9.40/third-party/commons-codec-1.6/commons-codec-1.6.jar,/tmp/aws-java-sdk-1.9.40/lib/aws-java-sdk-1.9.40.jar

Do the same for HADOOP_CLASSPATH, keeping in mind that classpath entries are separated by colons rather than the commas that -libjars expects.
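
One convenient way to build it, assuming a POSIX shell, is to reuse LIBJARS and swap the commas for colons:

export HADOOP_CLASSPATH=$(echo ${LIBJARS} | tr ',' ':')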

Credentials for S3 go in your core-site.xml file, under the keys fs.s3.awsAccessKeyId and fs.s3.awsSecretAccessKey.
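
As a sketch, with placeholder values standing in for your real credentials, the relevant properties in core-site.xml look like this:

<property>
  <name>fs.s3.awsAccessKeyId</name>
  <value>YOUR_ACCESS_KEY_ID</value>
</property>
<property>
  <name>fs.s3.awsSecretAccessKey</name>
  <value>YOUR_SECRET_ACCESS_KEY</value>
</property>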

With the classpath and credentials in place, you should be all set!

For more information about available s3distcp options, see: http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/UsingEMR_s3distcp.html

For more tips on optimizing Hadoop and Spark jobs against S3 data, check out this great post from the Mortar blog: http://blog.mortardata.com/post/58920122308/s3-hadoop-performance

Single-node Accumulo development cluster in one command…

I’m really pleased to share accumulo-vagrant which uses Vagrant to create a single-node Accumulo cluster for development.

Nearly every distributed database seems to have a dependency on Zookeeper or HDFS these days, and while developing against them, I often use the convenient HDP 2.x sandboxes from Hortonworks. These VirtualBox images give me the latest Hadoop components playing nicely together, but running everything they bundle (HBase, Hive, Storm, and so on) is heavy when all you need is HDFS and Zookeeper, as is the case with Accumulo. If you want to go even lighter weight for development, and it fits your use case, you can check out MockAccumulo or the MiniAccumuloCluster.

I enjoyed putting together this Vagrant script, which uses Blueprints, an excellent and relatively new Ambari feature that opens up Ambari/Hadoop cluster configuration and deployment via a RESTful JSON API. Once all the bits are in place and Zookeeper and HDFS are running, the Vagrant script runs ‘accumulo init’ and your cluster is ready for development! When creating this script, I referenced ambari-vagrant by the generous folks at SequenceIQ, who publish a lot of great open source tooling around Hadoop deployment automation. I hope you find the repo helpful, and I look forward to your comments and suggestions.

Accumulo – In-Depth Look at Filters, Combiners, and Iterators with Complex Values

Apache Accumulo is a Google BigTable implementation that came out of the NSA. It is similar to HBase, but it was earlier to adopt cell-based security features and a server-side programming framework called iterators.

One of my clients has a use case for that kind of security, as well as for scalability, for which Accumulo has a proven track record, so we decided to prototype a project with it.

In this blog post, I wanted to explore the Accumulo data model as well as how to program against its server-side interfaces: filters, combiners, and iterators. Accumulo doesn’t have any high-level data types (Strings, Longs, etc.); it serializes everything as a ‘Value’ backed by a byte[] array. Each of my examples simulates a complex data type as a Value using a Java POJO, MyPojo.java:


import java.io.Serializable;

public class MyPojo implements Serializable {

  public String name;
  public Integer count;

  public MyPojo(String name, Integer count) {
    this.name = name;
    this.count = count;
  }
}

All of my code examples are available on github with deployment instructions in the README.

Filters

We’ll start with the Filter interface as it’s the simplest. The idea behind a filter is simple: Rather than having a scan return all records, you’d like only those that match your filter criteria.

To implement a filter, you create your own class that extends Accumulo’s Filter class and overrides its accept method.


import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.Filter;
import org.apache.commons.lang.SerializationUtils; // or org.apache.commons.lang3, depending on your classpath

public class MyPojoFilter extends Filter {

  @Override
  public boolean accept(Key k, Value v) {
    // Deserialize the stored byte[] back into a MyPojo and keep only entries named "foo".
    MyPojo pojo = (MyPojo) SerializationUtils.deserialize(v.get());
    return "foo".equals(pojo.name);
  }
}

Pretty simple. For each Value MyPojoFilter scans, I deserialize it into a MyPojo and accept only those whose ‘name’ field equals ‘foo’.
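
For reference, here’s a minimal sketch of how such a filter can be attached at scan time. The table name, iterator name, and priority are placeholders of my choosing, and ‘connector’ is whatever Connector you’ve already built for your instance:

import java.util.Map;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.commons.lang.SerializationUtils; // or org.apache.commons.lang3

public class FilterScanExample {
  public static void scanFoos(Connector connector) throws Exception {
    Scanner scanner = connector.createScanner("mytable", new Authorizations());
    // Priority 30 and the name "pojoFilter" are arbitrary; they just need to be unique for this scan.
    scanner.addScanIterator(new IteratorSetting(30, "pojoFilter", MyPojoFilter.class));
    for (Map.Entry<Key, Value> entry : scanner) {
      // Only entries whose MyPojo 'name' field equals "foo" make it back to the client.
      MyPojo pojo = (MyPojo) SerializationUtils.deserialize(entry.getValue().get());
      System.out.println(pojo.name + " -> " + pojo.count);
    }
  }
}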

Combiners

Combiners are implemented by extending the Combiner class or one of its descendants. Combiners are used to ‘combine’ or ‘aggregate’ homogeneous Values for a row or rows, similar to the reduce step in MapReduce. I implemented my Combiner by extending TypedValueCombiner.


import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.TypedValueCombiner;
import org.apache.commons.lang.SerializationUtils; // or org.apache.commons.lang3, depending on your classpath

public class MyPojoCombiner extends TypedValueCombiner<MyPojo> {

  public static final MyPojoEncoder POJO_ENCODER = new MyPojoEncoder();

  @Override
  public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
    super.init(source, options, env);
    // Tell TypedValueCombiner how to convert between byte[] and MyPojo.
    setEncoder(POJO_ENCODER);
  }

  @Override
  public MyPojo typedReduce(Key key, Iterator<MyPojo> iter) {
    // Sum the 'count' field across all of the Values being combined for this key.
    int sum = 0;
    while (iter.hasNext()) {
      MyPojo next = iter.next();
      sum += next.count;
    }
    return new MyPojo("", sum);
  }

  public static class MyPojoEncoder implements Encoder<MyPojo> {
    @Override
    public byte[] encode(MyPojo v) {
      return SerializationUtils.serialize(v);
    }

    @Override
    public MyPojo decode(byte[] b) {
      return (MyPojo) SerializationUtils.deserialize(b);
    }
  }
}

The purpose of MyPojoCombiner is simple: to sum the ‘count’ field across the MyPojo Values for each scanned row. Because Accumulo only understands byte[] as a Value, I need to provide an implementation of TypedValueCombiner.Encoder for MyPojo to encode/decode each serialized Value during the scan.
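
For completeness, here is a sketch of attaching the combiner to a table so it runs at scan, minor compaction, and major compaction time. Again, the table name, iterator name, and priority are placeholders, not values from my project:

import java.util.EnumSet;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.iterators.Combiner;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;

public class CombinerSetupExample {
  public static void attach(Connector connector) throws Exception {
    IteratorSetting setting = new IteratorSetting(15, "pojoCombiner", MyPojoCombiner.class);
    // Apply the combiner to every column; Combiner.setColumns can restrict it to specific columns instead.
    Combiner.setCombineAllColumns(setting, true);
    connector.tableOperations().attachIterator("mytable", setting, EnumSet.allOf(IteratorScope.class));
  }
}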

You’ll notice that I’ve declared MyPojo as the type in my TypedValueCombiner implementation, so my overridden typedReduce method both iterates over AND returns instances of MyPojo. We store very rich, complex objects in Accumulo using Avro, so aggregating instances of those objects and returning the same type can be very awkward: if you are aggregating a few numerical fields out of, say, hundreds of fields, what should you set the remaining fields to? In this case, it seems more natural to be able to return an object that differs from the one you are iterating over, and that’s one way iterators can be used.

Iterators

This is where things start to get confusing and where the official documentation gets sparse. My examples, albeit contrived, are the culmination of trial and error and feedback from the project leads in the forums.

In writing an iterator, I had two goals: first, to aggregate over one type of Value and return another (MyAggregationPojo); and second, to be able to aggregate with row boundaries or without them (that is, across ALL Values scanned).

As the code samples are a bit long, I’ll only post the relevant snippets. Both examples extend WrappingIterator, which itself implements the SortedKeyValueIterator interface.

To help us understand these interfaces, let’s first look at the lifecycle of an iterator (via user@accumulo.apache.org):

  1. A new instance is created via Class.newInstance (so a no-args constructor is needed).
  2. init() is called. This allows users to configure the iterator, set its source, and possibly check the environment. We can also call `deepCopy` on the source if we want to have multiple sources (we’d do this if we wanted to do a merge read out of multiple column families within a row).
  3. seek() is called. This gets our readers to the correct positions in the data within the scan range the user requested, as well as turning column families on or off. The name should be reminiscent of seeking to some key on disk.
  4. hasTop() is called. If true, we have data, and the iterator has a key/value pair that can be retrieved by calling getTopKey() and getTopValue(). If false, we’re done because there’s no data to return.
  5. next() is called. This will attempt to find a new top key and value. We go back to (4) to see whether next() was successful in finding a new top key/value, and we repeat until the client is satisfied or hasTop() returns false.

You can kind of make a state machine out of those steps where we loop between (4) and (5) until there’s no data. There are more advanced workflows where next() can be reading from multiple sources, as well as seeking them to different positions in the tablet.
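
Mapped onto a WrappingIterator subclass, those lifecycle steps correspond to the methods below. This is just an illustrative skeleton of my own (not code from the repo); the inherited defaults simply delegate to the wrapped source:

import java.io.IOException;
import java.util.Collection;
import java.util.Map;

import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.WrappingIterator;

public class LifecycleSkeletonIterator extends WrappingIterator {

  // (1) Accumulo instantiates the class reflectively, so keep a no-args constructor.

  @Override
  public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options,
      IteratorEnvironment env) throws IOException {
    super.init(source, options, env); // (2) read options and remember the source
  }

  @Override
  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
      throws IOException {
    super.seek(range, columnFamilies, inclusive); // (3) position the source within the scan range
  }

  // (4) hasTop()/getTopKey()/getTopValue() and (5) next() are inherited from WrappingIterator;
  // they delegate to the source until you override them with your own logic.
}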

There are two implementations, MyPojoCountHistogramAll and MyPojoCountHistogram; the first ignores row (key) boundaries and aggregates across all scanned rows, while the second respects them. In both, the bulk of the logic lives in the overridden next() method.

MyPojoCountHistogramAll:


@Override
public void next() throws IOException {
  top_key = null;
  while (this.getSource().hasTop()) {
    top_key = this.getSource().getTopKey();
    Value v = this.getSource().getTopValue();

    // Deserialize the stored MyPojo and tally its 'count' field into the histogram.
    MyPojo pojo = (MyPojo) SerializationUtils.deserialize(v.get());
    Integer count = pojo.count;
    Integer countCount = countHistogram.get(count);

    if (countCount == null) {
      countCount = 1;
    } else {
      countCount = countCount + 1;
    }

    countHistogram.put(count, countCount);

    this.getSource().next();
  }

  // Once the source is exhausted, expose the whole histogram as a single aggregated Value.
  this.top_value = new Value(SerializationUtils.serialize(new MyAggregationPojo(countHistogram)));
}

Not too bad. We deserialize each MyPojo, grab its ‘count’ field, add it to countHistogram, and return an instance of MyAggregationPojo when we are done. You’ll notice that, unlike with the Combiners, we are able to return Values that differ from those we iterate over, and that the interfaces are not typed, which could be improved.

The next implementation, MyPojoCountHistogram, was adapted from RowEncodingIterator. It’s very similar to the previous example, except that in the next() method there is logic to determine whether the current row differs from the previous one and, if so, to end the current aggregation and begin a new one. There is also a more elaborate seek() method to help us seek to the relevant rows.
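
As with the filter, these iterators can be attached at scan time. The sketch below, again with a placeholder table name, iterator name, and priority, attaches MyPojoCountHistogramAll and deserializes the aggregated MyAggregationPojo values it returns:

import java.util.Map;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.commons.lang.SerializationUtils; // or org.apache.commons.lang3

public class HistogramScanExample {
  public static void printHistogram(Connector connector) throws Exception {
    Scanner scanner = connector.createScanner("mytable", new Authorizations());
    scanner.addScanIterator(new IteratorSetting(40, "countHistogram", MyPojoCountHistogramAll.class));
    for (Map.Entry<Key, Value> entry : scanner) {
      // Each returned Value is a serialized MyAggregationPojo, not a MyPojo.
      MyAggregationPojo histogram =
          (MyAggregationPojo) SerializationUtils.deserialize(entry.getValue().get());
      System.out.println(histogram);
    }
  }
}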

Summary

HBase has now caught up with cell-based access security and its own implementation of iterators, coprocessors. Hopefully this blog post has demonstrated how to get started with this powerful server-side programming framework, which should be a bit reminiscent of SQL stored procedures. Please reach out in the comments if you have any questions, or follow me on Twitter (@michaelmoss), where I like to discuss Accumulo and similar topics.