Accumulo – In Depth Look at Filters, Combiners, and Iterators with Complex Values

Apache Accumulo is an implementation of Google's BigTable design that originated at the NSA. It is similar to HBase, but was earlier to adopt cell-level security features and a server-side programming framework called iterators.

One of my clients has a use case for this kind of security, as well as for the scalability Accumulo has a proven track record of, so we decided to prototype a project with it.

In this blog post, I want to explore the Accumulo data model as well as how to program against its server-side interfaces: filters, combiners, and iterators. Accumulo doesn’t have any high-level data types (Strings, Longs, etc.); it serializes everything as a ‘Value’ backed by a byte[] array. Each of my examples simulates a complex data type as a Value using a Java POJO, MyPojo.java:


public class MyPojo implements Serializable {

  public String name;
  public Integer count;

  public MyPojo(String name, Integer count) {
    this.name = name;
    this.count = count;
  }
}

All of my code examples are available on GitHub, with deployment instructions in the README.
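Since a Value is just a wrapper around a byte[], getting MyPojo in and out of Accumulo hinges on serialization. My examples lean on commons-lang’s SerializationUtils; for the curious, here is a rough sketch of what that round trip does under the hood, using plain java.io (PojoBytes is just an illustrative name, not part of the example code):

```java
import java.io.*;

public class PojoBytes {

  // Mimics SerializationUtils.serialize: Serializable object -> byte[] via Java serialization
  public static byte[] toBytes(Serializable obj) {
    try {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
        oos.writeObject(obj);
      }
      return bos.toByteArray();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  // Mimics SerializationUtils.deserialize: byte[] -> object
  public static Object fromBytes(byte[] bytes) {
    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return ois.readObject();
    } catch (IOException | ClassNotFoundException e) {
      throw new RuntimeException(e);
    }
  }
}
```

The byte[] that toBytes() produces is exactly what gets wrapped in an Accumulo Value.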

Filters

We’ll start with the Filter interface as it’s the simplest. The idea behind a filter is simple: Rather than having a scan return all records, you’d like only those that match your filter criteria.

To implement a filter, you create your own class that extends Accumulo’s Filter class and override its accept() method.


public class MyPojoFilter extends Filter {

  @Override
  public boolean accept(Key k, Value v) {
    MyPojo pojo = (MyPojo) SerializationUtils.deserialize(v.get());
    return "foo".equals(pojo.name);
  }
}

Pretty simple. For each Value MyPojoFilter scans, I deserialize it into a MyPojo and include only those whose ‘name’ field equals ‘foo’.
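To actually apply the filter to a scan, you attach it client-side with an IteratorSetting. A rough sketch, assuming a table named ‘pojos’ and an existing Connector (the priority 15 and the name ‘pojoFilter’ are arbitrary choices of mine):

```java
// Attach MyPojoFilter to scans at priority 15 (name and priority are arbitrary)
IteratorSetting setting = new IteratorSetting(15, "pojoFilter", MyPojoFilter.class);

Scanner scanner = connector.createScanner("pojos", new Authorizations());
scanner.addScanIterator(setting);

for (Map.Entry<Key, Value> entry : scanner) {
  // Only entries whose deserialized 'name' field equals "foo" make it here
  MyPojo pojo = (MyPojo) SerializationUtils.deserialize(entry.getValue().get());
  System.out.println(pojo.name + " -> " + pojo.count);
}
```

The filtering itself happens server-side on the tablet servers; the IteratorSetting just tells Accumulo which iterator class to load and at what priority in the iterator stack.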

Combiners

Combiners are implemented by extending the Combiner class or one of its descendants. Combiners ‘combine’ or ‘aggregate’ homogeneous Values for a row or rows, similar to the reduce step in MapReduce. I implemented my combiner by extending the TypedValueCombiner class.


public class MyPojoCombiner extends TypedValueCombiner<MyPojo> {

  public static final MyPojoEncoder POJO_ENCODER = new MyPojoEncoder();

  @Override
  public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
    super.init(source, options, env);
    setEncoder(POJO_ENCODER);
  }

  @Override
  public MyPojo typedReduce(Key key, Iterator<MyPojo> iter) {
    int sum = 0;
    while (iter.hasNext()) {
      MyPojo next = iter.next();
      sum += next.count;
    }
    return new MyPojo("", sum);
  }

  public static class MyPojoEncoder implements Encoder<MyPojo> {
    @Override
    public byte[] encode(MyPojo v) {
      return SerializationUtils.serialize(v);
    }

    @Override
    public MyPojo decode(byte[] b) {
      return (MyPojo) SerializationUtils.deserialize(b);
    }
  }
}

The purpose of MyPojoCombiner is simple: to sum the ‘count’ field across the MyPojo Values for each scanned row. Because Accumulo only understands byte[] Values, I need to provide an implementation of TypedValueCombiner.Encoder for MyPojo to encode and decode each serialized Value during the scan.

You’ll notice that I’ve declared MyPojo as the type parameter of my TypedValueCombiner implementation, so my overridden typedReduce method both iterates over AND returns instances of MyPojo. We store very rich, complex objects in Accumulo using Avro, so aggregating instances of those objects and returning the same type can be very awkward – if you are aggregating a few numerical fields out of, say, hundreds of fields, what should you set the rest to? In cases like this, it is more natural to return an object of a different type than the one you are iterating over, and that’s one way iterators can be used.

Iterators

This is where things start to get confusing, and where the official documentation gets sparse. My examples, albeit contrived, are the culmination of trial and error and feedback from the project leads in the forums.

In writing an iterator, I had two goals: first, to aggregate over one type of Value and return another (MyAggregationPojo); and second, to be able to aggregate either within row boundaries or without them (across ALL Values scanned).

As the code samples are a bit long, I’ll only post the relevant snippets. Both examples extend the WrappingIterator class, which itself implements the SortedKeyValueIterator interface.

To help us understand these interfaces, let’s first look at the lifecycle of an iterator (via user@accumulo.apache.org):

  1. A new instance is created via Class.newInstance (so a no-args constructor is needed).
  2. init() is called. This allows users to configure the iterator, set its source, and possibly check the environment. We can also call `deepCopy` on the source if we want to have multiple sources (we’d do this if we wanted to do a merge read out of multiple column families within a row).
  3. seek() is called. This gets our readers to the correct positions in the data within the scan range the user requested, as well as turning column families on or off. The name should be reminiscent of seeking to some key on disk.
  4. hasTop() is called. If true, that means we have data, and the iterator has a key/value pair that can be retrieved by calling getTopKey() and getTopValue(). If false, we’re done because there’s no data to return.
  5. next() is called. This will attempt to find a new top key and value. We go back to (4) to see if next was successful in finding a new top key/value and will repeat until the client is satisfied or hasTop() returns false.

You can think of those steps as a state machine where we loop between (4) and (5) until there’s no data. There are more advanced workflows where next() reads from multiple sources and seeks them to different positions in the tablet.
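The loop between steps (4) and (5) can be sketched in plain Java. This is only a simulation – SimpleSource below is my own stand-in for a SortedKeyValueIterator over an in-memory map, not an Accumulo class – but it shows how a reader drives hasTop()/getTopKey()/getTopValue()/next():

```java
import java.util.*;

// A stand-in for a SortedKeyValueIterator backed by an in-memory sorted map (illustrative only)
class SimpleSource {
  private final Iterator<Map.Entry<String, Integer>> entries;
  private Map.Entry<String, Integer> top;

  SimpleSource(SortedMap<String, Integer> data) {
    this.entries = data.entrySet().iterator();
    next(); // position on the first entry, the way seek() would
  }

  boolean hasTop() { return top != null; }
  String getTopKey() { return top.getKey(); }
  Integer getTopValue() { return top.getValue(); }

  void next() { top = entries.hasNext() ? entries.next() : null; }
}

public class ReadLoop {
  // Drive the source the way a scan does: check hasTop, read the top pair, advance
  public static List<String> drain(SimpleSource source) {
    List<String> results = new ArrayList<>();
    while (source.hasTop()) {                                  // step (4)
      results.add(source.getTopKey() + "=" + source.getTopValue());
      source.next();                                           // step (5)
    }
    return results;
  }
}
```

Real iterators sit in the middle of this loop: their next() consumes one or more entries from their own source and decides what the new top key/value should be.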

There are two implementations, MyPojoCountHistogramAll and MyPojoCountHistogram; the first ignores row (key) boundaries and aggregates across all scanned rows. In both, the bulk of the logic lives in the overridden next() method.

MyPojoCountHistogramAll:


@Override
public void next() throws IOException {
  top_key = null;
  while (this.getSource().hasTop()) {
    top_key = this.getSource().getTopKey();
    Value v = this.getSource().getTopValue();

    MyPojo pojo = (MyPojo) SerializationUtils.deserialize(v.get());
    Integer count = pojo.count;
    Integer countCount = countHistogram.get(count);

    if (countCount == null) {
      countCount = 1;
    } else {
      countCount = countCount + 1;
    }

    countHistogram.put(count, countCount);

    this.getSource().next();
  }

  this.top_value = new Value(SerializationUtils.serialize(new MyAggregationPojo(countHistogram)));
}

Not too bad. We deserialize each MyPojo, grab its ‘count’ field, add it to countHistogram, and return an instance of MyAggregationPojo when we are done. You’ll notice that, unlike with Combiners, we are able to return Values that differ from those we iterate over, and that the interfaces are not typed, which could be improved.
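The null-check-then-put in the middle of next() is the classic histogram update. On Java 8+ it collapses to a single Map.merge call; a small sketch of the equivalent logic (Histogram is an illustrative helper of mine, not part of the example code):

```java
import java.util.*;

public class Histogram {
  // Count occurrences of each value: the same logic as the countHistogram update in next()
  public static Map<Integer, Integer> of(int... counts) {
    Map<Integer, Integer> histogram = new HashMap<>();
    for (int count : counts) {
      // Equivalent to: get the old total, then put 1 if absent or old + 1 if present
      histogram.merge(count, 1, Integer::sum);
    }
    return histogram;
  }
}
```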

The next implementation, MyPojoCountHistogram, was adapted from RowEncodingIterator. It’s very similar to my previous example, except that in the next() method there is logic to determine whether the current row differs from the previous row, and therefore whether to end the current aggregation and begin a new one. There is also a more elaborate seek() method to help us seek to the relevant rows.
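That row-boundary test boils down to comparing the row portion of consecutive Keys. A string-based sketch of the decision, assuming keys are ‘row:qualifier’ strings rather than real Accumulo Keys (RowBoundary and the key format are illustrative only):

```java
public class RowBoundary {
  // True when the current entry starts a new row, i.e. its row portion differs from the previous key's
  public static boolean isNewRow(String previousKey, String currentKey) {
    String previousRow = previousKey.split(":", 2)[0];
    String currentRow = currentKey.split(":", 2)[0];
    return !previousRow.equals(currentRow);
  }
}
```

In the real iterator the comparison is done on Key objects, but the shape of the decision is the same: when the row changes, emit the finished aggregate and start a fresh one.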

Summary

HBase has since caught up, with cell-level access security and its own take on iterators: coprocessors. Hopefully this blog post has demonstrated how to get started with this powerful server-side programming framework, which should be a bit reminiscent of SQL stored procedures. Please reach out in the comments if you have any questions, or follow me on Twitter (@michaelmoss) where I like to discuss Accumulo and similar topics.