Testing :: Red Planet Labs Documentation

With InProcessCluster, you can launch, update, and destroy modules just like on a real cluster. There are no differences in module capabilities or execution semantics between an InProcessCluster and a real cluster.

Here’s an example of using InProcessCluster along with JUnit to test a simple module:

public class SimpleInProcessClusterExample {
  public static class SimpleModule implements RamaModule {
    @Override
    public void define(Setup setup, Topologies topologies) {
      setup.declareDepot("*depot", Depot.hashBy(Ops.IDENTITY));

      StreamTopology s = topologies.stream("counter");
      s.pstate("$$counts", PState.mapSchema(String.class, Long.class));
      s.source("*depot").out("*k")
       .compoundAgg("$$counts", CompoundAgg.map("*k", Agg.count()));
    }
  }

  @Test
  public void simpleTest() throws Exception {
    try(InProcessCluster cluster = InProcessCluster.create()) {
      cluster.launchModule(new SimpleModule(), new LaunchConfig(4, 2));

      String moduleName = SimpleModule.class.getName();
      Depot depot = cluster.clusterDepot(moduleName, "*depot");
      PState counts = cluster.clusterPState(moduleName, "$$counts");

      depot.append("cagney");
      depot.append("davis");
      depot.append("cagney");

      assertEquals(2, (Long) counts.selectOne(Path.key("cagney")));
      assertEquals(1, (Long) counts.selectOne(Path.key("davis")));
      assertNull(counts.selectOne(Path.key("garbo")));
    }
  }
}

An InProcessCluster launches many threads to run a Conductor, a Supervisor, the Metastore, and any workers launched by modules, so it's important to call close on it when the test finishes to release those resources. The easiest way to do this is usually a try-with-resources block, as in this example, which calls close for you.
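To see why closing matters, here is a plain-Java sketch with no Rama dependency. FakeCluster is a hypothetical stand-in that, like an InProcessCluster, owns background (non-daemon) threads, and its close method shuts them down. The sketch shows that try-with-resources still calls close when the test body throws, for example on a failed assertion.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a resource like InProcessCluster: it owns
// background threads that would outlive the test if never shut down.
class FakeCluster implements AutoCloseable {
  private final ExecutorService workers = Executors.newFixedThreadPool(2);

  void submit(Runnable task) {
    workers.submit(task);
  }

  boolean isShutdown() {
    return workers.isShutdown();
  }

  @Override
  public void close() {
    // Stop accepting work and wait briefly for running tasks to finish.
    workers.shutdown();
    try {
      workers.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

class TryWithResourcesDemo {
  // Simulates a test whose body fails; returns the cluster so a caller can
  // check that close() ran anyway.
  static FakeCluster runFailingTest() {
    FakeCluster used = null;
    try (FakeCluster cluster = new FakeCluster()) {
      used = cluster;
      cluster.submit(() -> {});
      throw new IllegalStateException("simulated assertion failure");
    } catch (IllegalStateException expected) {
      // By the time this catch block runs, close() has already been called.
    }
    return used;
  }
}
```

Without the try block, the thrown exception would skip any close call placed at the end of the test, leaving the threads running.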

An InProcessCluster is created using InProcessCluster.create(). Another version of that method lets you register custom serializations; it's explained in the documentation on custom serialization.

A module is launched on an InProcessCluster using the method launchModule. It takes a module instance and a LaunchConfig specifying the parallelism; this example specifies four tasks and two task threads. Because replication doesn't change a module's semantics, the replication factor can't be configured on an InProcessCluster, and all modules on an InProcessCluster run with a replication factor of one.

You can also specify multiple workers using the numWorkers method on LaunchConfig, like so:

cluster.launchModule(new SimpleModule(), new LaunchConfig(4, 2).numWorkers(2));

This would run two workers, each with one task thread and two tasks. Running more workers doesn't change the semantics of a module, but it does let the module exercise any custom serializations you have registered. That's because within a module Rama only serializes data sent between workers; data going to a task thread located in the same worker isn't serialized.
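The parallelism arithmetic can be checked with a small helper. ParallelismLayout is hypothetical and not part of Rama's API; it assumes tasks and task threads divide evenly, as in the examples on this page, and reproduces the layout described for new LaunchConfig(4, 2).numWorkers(2).

```java
// Hypothetical helper (not Rama API) that spreads tasks and task threads
// evenly across workers, reproducing the arithmetic in the text.
class ParallelismLayout {
  final int threadsPerWorker;
  final int tasksPerWorker;
  final int tasksPerThread;

  ParallelismLayout(int tasks, int threads, int workers) {
    // This sketch only handles layouts that divide evenly.
    if (threads % workers != 0 || tasks % threads != 0) {
      throw new IllegalArgumentException("parallelism must divide evenly");
    }
    this.threadsPerWorker = threads / workers;
    this.tasksPerWorker = tasks / workers;
    this.tasksPerThread = tasks / threads;
  }
}
```

For (4, 2, 2) this gives one task thread and two tasks per worker; with a single worker, that one worker hosts both task threads and all four tasks.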

The general flow of testing with InProcessCluster is to perform depot appends and then assert on the expected changes to downstream PStates. With a stream topology like the one in this example, depot appends using AckLevel.ACK (the default for append) don't return until all downstream processing has finished. So you can make assertions on the PStates immediately after the depot appends complete.

Testing microbatch topologies

With a microbatch topology, processing is asynchronous to depot appends even with AckLevel.ACK. So InProcessCluster provides the helper method waitForMicrobatchProcessedCount to ease testing. Here’s an example:

public class MicrobatchTestingExample {
  public static class MicrobatchModule implements RamaModule {
    @Override
    public void define(Setup setup, Topologies topologies) {
      setup.declareDepot("*depot", Depot.hashBy(Ops.IDENTITY));

      MicrobatchTopology mb = topologies.microbatch("counter");
      mb.pstate("$$counts", PState.mapSchema(String.class, Long.class));
      mb.source("*depot").out("*mb")
        .explodeMicrobatch("*mb").out("*k")
        .compoundAgg("$$counts", CompoundAgg.map("*k", Agg.count()));
    }
  }

  @Test
  public void microbatchTest() throws Exception {
    try(InProcessCluster cluster = InProcessCluster.create()) {
      cluster.launchModule(new MicrobatchModule(), new LaunchConfig(4, 2));

      String moduleName = MicrobatchModule.class.getName();
      Depot depot = cluster.clusterDepot(moduleName, "*depot");
      PState counts = cluster.clusterPState(moduleName, "$$counts");

      depot.append("cagney");
      depot.append("davis");
      depot.append("cagney");

      cluster.waitForMicrobatchProcessedCount(moduleName, "counter", 3);
      assertEquals(2, (Long) counts.selectOne(Path.key("cagney")));
      assertEquals(1, (Long) counts.selectOne(Path.key("davis")));
      assertNull(counts.selectOne(Path.key("garbo")));

      depot.append("cagney");

      cluster.waitForMicrobatchProcessedCount(moduleName, "counter", 4);
      assertEquals(3, (Long) counts.selectOne(Path.key("cagney")));
    }
  }
}

This example uses waitForMicrobatchProcessedCount to ensure the PStates reflect the processing of the appended depot records. waitForMicrobatchProcessedCount takes as input a module name, a topology name, and a total number of depot records. Critically, that number is the total number of records ever processed by the topology, not the number of depot records since the last call. This is why the example first waits for three records and then, after appending one more record, waits for four.
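The cumulative semantics can be illustrated with a Rama-free simulation. AsyncCounter is hypothetical: it processes appends on a background thread, and its waitForProcessedCount method, a stand-in for waitForMicrobatchProcessedCount, blocks until the total number of records ever processed reaches the requested value.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical simulation of an asynchronous topology; not Rama code.
// append() returns immediately and a background thread updates the counts.
class AsyncCounter implements AutoCloseable {
  private final ExecutorService processor = Executors.newSingleThreadExecutor();
  private final Map<String, Long> counts = new ConcurrentHashMap<>();
  // Total records ever processed. Like the count passed to
  // waitForMicrobatchProcessedCount, it is cumulative and never resets.
  private final AtomicLong processedTotal = new AtomicLong();

  void append(String value) {
    processor.submit(() -> {
      counts.merge(value, 1L, Long::sum);
      processedTotal.incrementAndGet();
    });
  }

  // Blocks until the cumulative count reaches `total` or the timeout expires.
  void waitForProcessedCount(long total, long timeoutMillis) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (processedTotal.get() < total) {
      if (System.nanoTime() > deadline) {
        throw new IllegalStateException(
            "timed out: processed " + processedTotal.get() + " < " + total);
      }
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while waiting", e);
      }
    }
  }

  Long count(String key) {
    return counts.get(key);
  }

  @Override
  public void close() {
    processor.shutdownNow();
  }
}
```

Mirroring the test above: after three appends you wait for 3, and after one more append you wait for 4. Waiting for 1 at that point would return immediately without guaranteeing the fourth record had been processed.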

InProcessCluster also exposes facilities to pause and resume microbatch topologies. Since microbatching always runs asynchronously, you can't control which depot records make up an individual microbatch attempt. If you perform three depot appends, those could be processed in three separate microbatches, in two, or in one. If the composition of a microbatch matters for your test, such as when testing the behavior of batch blocks, you can use pauseMicrobatchTopology and resumeMicrobatchTopology. For example:

cluster.pauseMicrobatchTopology(moduleName, "counter");
depot.append("a");
depot.append("b");
depot.append("c");
depot.append("a");
cluster.resumeMicrobatchTopology(moduleName, "counter");

pauseMicrobatchTopology waits for the currently in-flight microbatch to complete before returning, so no microbatch picks up the four records while the topology is paused. Since a microbatch processes up to 1000 unprocessed records from each depot partition by default, this code guarantees that the next microbatch after resuming will contain all four records.
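The following hypothetical model of a single depot partition (plain Java, not Rama code) illustrates that guarantee. Batches are taken explicitly by calling nextBatch, whereas a real microbatch topology runs continuously in the background, and the 1000-record limit from the text is passed in as a constructor argument.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, single-threaded model of one depot partition feeding a
// microbatch topology. It only shows why pausing before appending
// controls what the next microbatch contains.
class MicrobatchModel {
  private final int maxRecordsPerBatch;
  private final Deque<String> pending = new ArrayDeque<>();
  private boolean paused = false;

  MicrobatchModel(int maxRecordsPerBatch) {
    this.maxRecordsPerBatch = maxRecordsPerBatch;
  }

  void pause() { paused = true; }

  void resume() { paused = false; }

  void append(String value) { pending.addLast(value); }

  // One microbatch attempt: takes up to maxRecordsPerBatch unprocessed
  // records in append order, or nothing while paused.
  List<String> nextBatch() {
    List<String> batch = new ArrayList<>();
    while (!paused && !pending.isEmpty() && batch.size() < maxRecordsPerBatch) {
      batch.add(pending.removeFirst());
    }
    return batch;
  }
}
```

While paused, nextBatch returns nothing and the appends accumulate; after resume, the first batch contains all four records because four is well under the per-partition limit.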

Testing stream topologies that are using mirror depots

Depot appends using AckLevel.ACK don't wait for stream topologies in other modules to finish processing them. So while you can assert on PStates immediately after a depot append finishes with colocated stream topologies, a stream topology consuming a mirror depot may still be processing the depot record after the append returns.

The technique to use in this case is to repeatedly poll the desired PState condition up to a timeout. Here’s an example:

public class StreamTopologyWithMirrorTestingExample {
  public static class DepotModule implements RamaModule {
    @Override
    public void define(Setup setup, Topologies topologies) {
      setup.declareDepot("*depot", Depot.hashBy(Ops.IDENTITY));
    }
  }

  public static class CounterModule implements RamaModule {
    @Override
    public void define(Setup setup, Topologies topologies) {
      setup.clusterDepot("*mirror", DepotModule.class.getName(), "*depot");

      StreamTopology s = topologies.stream("counter");
      s.pstate("$$counts", PState.mapSchema(String.class, Long.class));
      s.source("*mirror").out("*k")
       .hashPartition("*k")
       .compoundAgg("$$counts", CompoundAgg.map("*k", Agg.count()));
    }
  }

  public void assertValueAttained(Object expected, RamaFunction0 f) throws Exception {
    long nanos = System.nanoTime();
    while(true) {
      Object val = f.invoke();
      if(expected.equals(val)) break;
      else if(System.nanoTime() - nanos > 1000000000L * 30) {
        throw new RuntimeException("Condition failed to attain " + expected + " != " + val);
      }
      Thread.sleep(50);
    }
  }

  @Test
  public void streamTopologyWithMirrorTest() throws Exception {
    try(InProcessCluster cluster = InProcessCluster.create()) {
      cluster.launchModule(new DepotModule(), new LaunchConfig(8, 2));
      cluster.launchModule(new CounterModule(), new LaunchConfig(4, 2));

      Depot depot = cluster.clusterDepot(DepotModule.class.getName(), "*depot");
      PState counts = cluster.clusterPState(CounterModule.class.getName(), "$$counts");

      depot.append("cagney");
      depot.append("davis");
      depot.append("cagney");

      assertValueAttained(2L, () -> counts.selectOne(Path.key("cagney")));
      assertValueAttained(1L, () -> counts.selectOne(Path.key("davis")));
    }
  }
}

Like the other examples, this code creates a PState that counts the depot records. The difference is that the depot lives in a separate module from the stream topology computing the counts.

To perform the assertions, this class defines a helper assertValueAttained that polls the provided function for up to 30 seconds until it returns the expected value. It's important to use a generous timeout, since a pause such as a garbage collection can easily make a short timeout expire before the condition is reached, failing the test spuriously.
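If you want the timeout and poll interval to be adjustable per assertion, the helper can be generalized. This sketch is a hypothetical variant that takes a java.util.function.Supplier instead of RamaFunction0 so it runs without Rama; the class and parameter names are illustrative.

```java
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical generalization of assertValueAttained: polls `f` every
// `pollMillis` until it returns `expected`, failing after `timeoutMillis`.
class Polling {
  static void assertValueAttained(Object expected, Supplier<?> f,
                                  long timeoutMillis, long pollMillis) {
    long start = System.nanoTime();
    long timeoutNanos = timeoutMillis * 1_000_000L;
    while (true) {
      Object val = f.get();
      // Objects.equals also tolerates a null expected value.
      if (Objects.equals(expected, val)) return;
      if (System.nanoTime() - start > timeoutNanos) {
        throw new AssertionError("Condition failed to attain " + expected + " != " + val);
      }
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new AssertionError("interrupted while polling", e);
      }
    }
  }
}
```

Throwing AssertionError rather than RuntimeException makes a timeout show up as an ordinary test failure. Keep the timeout generous, in line with the 30 seconds used above.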

Module update

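You can also perform module updates with InProcessCluster. To update a module, the new module instance must have the same module name as the running one. A module's name defaults to its class name (which is why the earlier examples look modules up with SimpleModule.class.getName()), so when the two versions are different classes you need to override the getModuleName method of RamaModule. Here's an example: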

public class UpdateExample {
  public static class CounterModule_v1 implements RamaModule {
    @Override
    public void define(Setup setup, Topologies topologies) {
      setup.declareDepot("*depot", Depot.hashBy(Ops.IDENTITY));

      StreamTopology s = topologies.stream("counter");
      s.pstate("$$counts", PState.mapSchema(String.class, Long.class));
      s.source("*depot").out("*k")
       .compoundAgg("$$counts", CompoundAgg.map("*k", Agg.sum(2)));
    }

    @Override
    public String getModuleName() {
      return "CounterModule";
    }
  }

  public static class CounterModule_v2 implements RamaModule {
    @Override
    public void define(Setup setup, Topologies topologies) {
      setup.declareDepot("*depot", Depot.hashBy(Ops.IDENTITY));

      StreamTopology s = topologies.stream("counter");
      s.pstate("$$counts", PState.mapSchema(String.class, Long.class));
      s.source("*depot").out("*k")
       .compoundAgg("$$counts", CompoundAgg.map("*k", Agg.count()));
    }

    @Override
    public String getModuleName() {
      return "CounterModule";
    }
  }

  @Test
  public void updateTest() throws Exception {
    try(InProcessCluster cluster = InProcessCluster.create()) {
      cluster.launchModule(new CounterModule_v1(), new LaunchConfig(4, 2));
      Depot depot = cluster.clusterDepot("CounterModule", "*depot");
      PState counts = cluster.clusterPState("CounterModule", "$$counts");

      depot.append("cagney");
      assertEquals(2, (Long) counts.selectOne(Path.key("cagney")));

      cluster.updateModule(new CounterModule_v2());

      depot.append("cagney");
      assertEquals(3, (Long) counts.selectOne(Path.key("cagney")));
    }
  }
}

This example simulates fixing a bug in a running module. The first version of "CounterModule" increments by two instead of by one, and the second version fixes that. In the test code, you can see that depot and counts can be used after the module update, just like with clients for real clusters. Internally those clients automatically redirect their appends/queries to the updated module instance.
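The requirement that both versions share a name can be modeled in plain Java. NamedModule below is a hypothetical interface, not Rama's RamaModule; its default getModuleName returns the class name, matching how the earlier examples refer to modules by class name.

```java
// Hypothetical interface mirroring the naming rule: by default a module's
// name comes from its class, so two different classes get different names.
interface NamedModule {
  default String getModuleName() {
    return getClass().getName();
  }
}

// Two versions that override the name, like CounterModule_v1 and _v2.
class CounterV1 implements NamedModule {
  @Override
  public String getModuleName() { return "CounterModule"; }
}

class CounterV2 implements NamedModule {
  @Override
  public String getModuleName() { return "CounterModule"; }
}

// Two versions that rely on the default, class-derived name.
class UnnamedV1 implements NamedModule {}

class UnnamedV2 implements NamedModule {}

class ModuleNames {
  // In this model, an update targets the running module only when names match.
  static boolean canUpdate(NamedModule running, NamedModule next) {
    return running.getModuleName().equals(next.getModuleName());
  }
}
```

Without the overrides, the second version would have a different name and would be treated as a separate module rather than an update.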

The updateModule call shown in this example can only be used when no PStates or depots are being removed from the existing module instance. Just like with real clusters, since removing PStates and depots is destructive, Rama requires you to be explicit about their removal. With real clusters this is specified with an additional flag on the Rama CLI, but with InProcessCluster it's specified with an additional argument to updateModule. For example, if the new version of your module removes "*depot" and "$$p", the update code would be:

cluster.updateModule(new MyModule_v2(), UpdateOptions.objectsToDelete("*depot", "$$p"));

The objects to delete must be specified exactly; otherwise the update won't go through and you'll get an exception.
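The exact-match rule amounts to a set comparison. This hypothetical validator is not Rama's implementation: it computes the objects present in the old version but missing from the new one and, under this model, rejects the update unless they equal the objects passed for deletion, so listing too few or too many both fail.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the "specify deletions exactly" rule; no Rama calls.
class UpdateValidator {
  static void validate(Set<String> oldObjects, Set<String> newObjects,
                       Set<String> objectsToDelete) {
    // Objects declared by the old version but not by the new one.
    Set<String> removed = new HashSet<>(oldObjects);
    removed.removeAll(newObjects);
    if (!removed.equals(objectsToDelete)) {
      throw new IllegalArgumentException(
          "objectsToDelete " + objectsToDelete + " must equal removed objects " + removed);
    }
  }
}
```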

Module destroy

You can also destroy modules using InProcessCluster. Here’s an example:

cluster.destroyModule("com.mycompany.MyModule");

Just like on a real cluster, a module can only be destroyed if no other modules depend on it.
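A hypothetical registry sketches that dependency check in plain Java. Here a dependency means one module consumes another module's objects, for instance through a mirror depot as in the earlier mirror example; how Rama actually tracks dependencies is not modeled.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical registry (not Rama API): destroy() refuses to remove a module
// while any other launched module still depends on it.
class ModuleRegistry {
  private final Map<String, Set<String>> dependencies = new HashMap<>();

  void launch(String module, Set<String> dependsOn) {
    dependencies.put(module, new HashSet<>(dependsOn));
  }

  void destroy(String module) {
    for (Map.Entry<String, Set<String>> e : dependencies.entrySet()) {
      if (!e.getKey().equals(module) && e.getValue().contains(module)) {
        throw new IllegalStateException(e.getKey() + " depends on " + module);
      }
    }
    dependencies.remove(module);
  }

  boolean isLaunched(String module) {
    return dependencies.containsKey(module);
  }
}
```

With the modules from the mirror example, the depot module can only be destroyed after the counter module that mirrors its depot has been destroyed.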