Reading/Writing IPC formats — Apache Arrow v20.0.0

Arrow defines two types of binary formats for serializing record batches:

- Streaming format: for sending an arbitrary number of record batches. The format must be processed from start to end, and does not support random access.
- File or random access format: for serializing a fixed number of record batches. It supports random access, and is therefore very useful when used with memory maps.

Writing and Reading Streaming Format

First, let’s populate a VectorSchemaRoot with a small batch of records

```java
BitVector bitVector = new BitVector("boolean", allocator);
VarCharVector varCharVector = new VarCharVector("varchar", allocator);
for (int i = 0; i < 10; i++) {
  bitVector.setSafe(i, i % 2 == 0 ? 0 : 1);
  varCharVector.setSafe(i, ("test" + i).getBytes(StandardCharsets.UTF_8));
}
bitVector.setValueCount(10);
varCharVector.setValueCount(10);

List<Field> fields = Arrays.asList(bitVector.getField(), varCharVector.getField());
List<FieldVector> vectors = Arrays.asList(bitVector, varCharVector);
VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors);
```

Now we can begin writing a stream containing some number of these batches. For this we use ArrowStreamWriter (the DictionaryProvider, used for any vectors that are dictionary-encoded, is optional and may be null):

```java
try (ByteArrayOutputStream out = new ByteArrayOutputStream();
     ArrowStreamWriter writer =
         new ArrowStreamWriter(root, /*DictionaryProvider=*/null, Channels.newChannel(out))) {
  // ... do write into the ArrowStreamWriter
}
```

Here we used an in-memory stream, but this could have been a socket or some other IO stream. Then we can do

```java
writer.start();
// write the first batch
writer.writeBatch();

// write another four batches
for (int i = 0; i < 4; i++) {
  // populate VectorSchemaRoot data and write another batch
  BitVector childVector1 = (BitVector) root.getVector(0);
  VarCharVector childVector2 = (VarCharVector) root.getVector(1);
  childVector1.reset();
  childVector2.reset();
  // ... do some populate work here, could be different for each batch
  writer.writeBatch();
}

writer.end();
```

Note that the VectorSchemaRoot in the writer is a container that holds one batch at a time: batches flow through the VectorSchemaRoot as part of a pipeline, with each new batch overwriting the previous one, so data must be populated before every call to writeBatch.

Now the ByteArrayOutputStream holds the complete stream of 5 record batches. We can read such a stream with ArrowStreamReader. Note that the VectorSchemaRoot within the reader will be loaded with new values on every call to loadNextBatch():

```java
try (ArrowStreamReader reader =
         new ArrowStreamReader(new ByteArrayInputStream(out.toByteArray()), allocator)) {
  // This will be loaded with new values on every call to loadNextBatch
  VectorSchemaRoot readRoot = reader.getVectorSchemaRoot();
  Schema schema = readRoot.getSchema();
  for (int i = 0; i < 5; i++) {
    reader.loadNextBatch();
    // ... do something with readRoot
  }
}
```
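Above, the loop runs exactly 5 times because we know how many batches were written. When the batch count is unknown, loadNextBatch() can drive the loop directly, since it returns false once the end of the stream is reached. Here is a minimal self-contained round-trip sketch, assuming the Arrow Java libraries (arrow-vector plus an allocator implementation such as arrow-memory-netty) are on the classpath; the vector name "ints" and the three-batch count are arbitrary choices for illustration:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.channels.Channels;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;

public class StreamRoundTrip {
  public static void main(String[] args) throws Exception {
    try (BufferAllocator allocator = new RootAllocator()) {
      // write three small batches into an in-memory stream
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      try (IntVector vector = new IntVector("ints", allocator);
           VectorSchemaRoot root = VectorSchemaRoot.of(vector)) {
        try (ArrowStreamWriter writer =
                 new ArrowStreamWriter(root, /*DictionaryProvider=*/null, Channels.newChannel(out))) {
          writer.start();
          for (int batch = 0; batch < 3; batch++) {
            vector.allocateNew(2);
            vector.set(0, batch);
            vector.set(1, batch + 10);
            vector.setValueCount(2);
            root.setRowCount(2);
            writer.writeBatch();  // overwrites the previous batch in the root
          }
          writer.end();
        }
      }

      // read batches back until loadNextBatch() reports the end of the stream
      int batches = 0;
      try (ArrowStreamReader reader = new ArrowStreamReader(
               new ByteArrayInputStream(out.toByteArray()), allocator)) {
        while (reader.loadNextBatch()) {  // false once the stream is exhausted
          batches++;
        }
      }
      System.out.println("read " + batches + " batches");
    }
  }
}
```

This pattern is the natural fit for sockets or pipes, where the producer decides how many batches to send and the consumer simply loops until the stream ends.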

Here we also give a simple example with dictionary-encoded vectors:

```java
// create provider
DictionaryProvider.MapDictionaryProvider provider = new DictionaryProvider.MapDictionaryProvider();

try (final VarCharVector dictVector = new VarCharVector("dict", allocator);
     final VarCharVector vector = new VarCharVector("vector", allocator)) {
  // create dictionary vector
  dictVector.allocateNewSafe();
  dictVector.setSafe(0, "aa".getBytes());
  dictVector.setSafe(1, "bb".getBytes());
  dictVector.setSafe(2, "cc".getBytes());
  dictVector.setValueCount(3);

  // create dictionary
  Dictionary dictionary =
      new Dictionary(dictVector, new DictionaryEncoding(1L, false, /*indexType=*/null));
  provider.put(dictionary);

  // create original data vector
  vector.allocateNewSafe();
  vector.setSafe(0, "bb".getBytes());
  vector.setSafe(1, "bb".getBytes());
  vector.setSafe(2, "cc".getBytes());
  vector.setSafe(3, "aa".getBytes());
  vector.setValueCount(4);

  // get the encoded vector
  IntVector encodedVector = (IntVector) DictionaryEncoder.encode(vector, dictionary);

  ByteArrayOutputStream out = new ByteArrayOutputStream();

  // create VectorSchemaRoot
  List<Field> fields = Arrays.asList(encodedVector.getField());
  List<FieldVector> vectors = Arrays.asList(encodedVector);
  try (VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors)) {
    // write data
    ArrowStreamWriter writer = new ArrowStreamWriter(root, provider, Channels.newChannel(out));
    writer.start();
    writer.writeBatch();
    writer.end();
  }

  // read data
  try (ArrowStreamReader reader =
           new ArrowStreamReader(new ByteArrayInputStream(out.toByteArray()), allocator)) {
    reader.loadNextBatch();
    VectorSchemaRoot readRoot = reader.getVectorSchemaRoot();
    // get the encoded vector
    IntVector intVector = (IntVector) readRoot.getVector(0);

    // get dictionaries and decode the vector
    Map<Long, Dictionary> dictionaryMap = reader.getDictionaryVectors();
    long dictionaryId = intVector.getField().getDictionary().getId();
    try (VarCharVector varCharVector =
        (VarCharVector) DictionaryEncoder.decode(intVector, dictionaryMap.get(dictionaryId))) {
      // ... use decoded vector
    }
  }
}
```
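The encode/decode pair can also be exercised on its own, without IPC. A minimal sketch, assuming the Arrow Java libraries are on the classpath (the vector names and dictionary id 1L are arbitrary): encoding replaces each value with its index in the dictionary, and decoding reverses it.

```java
import java.nio.charset.StandardCharsets;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryEncoder;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;

public class DictRoundTrip {
  public static void main(String[] args) throws Exception {
    try (BufferAllocator allocator = new RootAllocator();
         VarCharVector dictVector = new VarCharVector("dict", allocator);
         VarCharVector vector = new VarCharVector("vector", allocator)) {
      // the dictionary holds each distinct value once
      dictVector.allocateNewSafe();
      dictVector.setSafe(0, "aa".getBytes(StandardCharsets.UTF_8));
      dictVector.setSafe(1, "bb".getBytes(StandardCharsets.UTF_8));
      dictVector.setValueCount(2);
      Dictionary dictionary =
          new Dictionary(dictVector, new DictionaryEncoding(1L, false, /*indexType=*/null));

      // the data vector repeats values from the dictionary
      vector.allocateNewSafe();
      vector.setSafe(0, "bb".getBytes(StandardCharsets.UTF_8));
      vector.setSafe(1, "aa".getBytes(StandardCharsets.UTF_8));
      vector.setValueCount(2);

      // encode produces indices into the dictionary; decode reverses it
      try (IntVector encoded = (IntVector) DictionaryEncoder.encode(vector, dictionary);
           VarCharVector decoded = (VarCharVector) DictionaryEncoder.decode(encoded, dictionary)) {
        System.out.println(encoded.get(0) + "," + encoded.get(1));
        System.out.println(new String(decoded.get(0), StandardCharsets.UTF_8));
      }
    }
  }
}
```

Note that when going through IPC, as in the example above, the provider must be handed to the writer so the dictionary batch is written before the record batches that reference it.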

Writing and Reading Random Access Files

The ArrowFileWriter has the same API as ArrowStreamWriter

```java
try (ByteArrayOutputStream out = new ByteArrayOutputStream();
     ArrowFileWriter writer =
         new ArrowFileWriter(root, /*DictionaryProvider=*/null, Channels.newChannel(out))) {
  writer.start();
  // write the first batch
  writer.writeBatch();
  // write another four batches
  for (int i = 0; i < 4; i++) {
    // ... do populate work
    writer.writeBatch();
  }
  writer.end();
}
```

The difference between ArrowFileReader and ArrowStreamReader is that the file reader's input source must have a seek method for random access. Because we have access to the entire payload, we know the number of record batches in the file and can read any of them at random:

```java
try (ArrowFileReader reader = new ArrowFileReader(
         new ByteArrayReadableSeekableByteChannel(out.toByteArray()), allocator)) {
  // read the 4th batch
  ArrowBlock block = reader.getRecordBlocks().get(3);
  reader.loadRecordBatch(block);
  VectorSchemaRoot readBatch = reader.getVectorSchemaRoot();
}
```
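Random access also means batches need not be read in file order. The following self-contained sketch, assuming the Arrow Java libraries on the classpath (the vector name "ints" and the three-batch count are arbitrary), writes three one-row batches and then reads them back in reverse, which a stream reader could not do:

```java
import java.io.ByteArrayOutputStream;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.ipc.message.ArrowBlock;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;

public class RandomAccessDemo {
  public static void main(String[] args) throws Exception {
    try (BufferAllocator allocator = new RootAllocator()) {
      // write three one-row batches; batch i holds the value i
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      try (IntVector vector = new IntVector("ints", allocator);
           VectorSchemaRoot root = VectorSchemaRoot.of(vector);
           ArrowFileWriter writer =
               new ArrowFileWriter(root, /*DictionaryProvider=*/null, Channels.newChannel(out))) {
        writer.start();
        for (int batch = 0; batch < 3; batch++) {
          vector.allocateNew(1);
          vector.set(0, batch);
          vector.setValueCount(1);
          root.setRowCount(1);
          writer.writeBatch();
        }
        writer.end();
      }

      // load the blocks in reverse order
      List<Integer> seen = new ArrayList<>();
      try (ArrowFileReader reader = new ArrowFileReader(
               new ByteArrayReadableSeekableByteChannel(out.toByteArray()), allocator)) {
        List<ArrowBlock> blocks = reader.getRecordBlocks();
        for (int i = blocks.size() - 1; i >= 0; i--) {
          reader.loadRecordBatch(blocks.get(i));
          IntVector read = (IntVector) reader.getVectorSchemaRoot().getVector(0);
          seen.add(read.get(0));
        }
      }
      System.out.println(seen);
    }
  }
}
```

Because the file footer records the location of every record batch, each loadRecordBatch call seeks directly to the requested block; no batch needs to be skipped or buffered.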