A chunk is…

  • an immutable, strict, finite sequence of values

  • that supports efficient index-based random access of elements

A Vector is…

  • an immutable, strict, finite sequence of values

  • that supports efficient index-based random access of elements

Why not use Vector?

trait Socket[F[_]]:
  def reads: Stream[F, Byte]

trait Files[F[_]]:
  def readAll(path: Path): Stream[F, Byte]
  • Streams move chunks

  • If streams passed around vectors, we’d need to copy the underlying buffers.
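The copying point can be made concrete with a minimal Scala 3 sketch. `ArrayChunk` and `demo` are hypothetical names, and `ArrayChunk` is a simplified stand-in, not fs2's `Chunk`. Wrapping a byte array shares it, while converting the array to a `Vector` copies every element into the vector's own (boxed) storage.

```scala
// Simplified, hypothetical stand-in for a chunk: wraps an array without copying it.
final class ArrayChunk[A](arr: Array[A]):
  def size: Int = arr.length
  def apply(idx: Int): A = arr(idx)

// Mutates the source buffer after wrapping/converting it and reports what each view sees.
def demo(): (Byte, Byte) =
  val buffer = Array[Byte](1, 2, 3)
  val chunk = ArrayChunk(buffer)   // O(1): shares `buffer`
  val vector = buffer.toVector     // O(n): copies every element
  buffer(0) = 42
  (chunk(0), vector(0))            // the chunk sees 42, the vector still sees 1
```

`demo()` returns `(42, 1)`. The write is visible through the chunk, so no copy was made, but it is not visible through the vector.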

Why not use Vector?


[Figure: object size comparison charts, with panels "2.12 Vector" and "2.13 Vector"]

  • Vector[Byte], Array[Byte], and Chunk[Byte]

  • Object size reported by Spark’s SizeEstimator.estimate

A chunk is…

  • an immutable, strict, finite sequence of values

  • that supports efficient index-based random access of elements

  • that’s memory efficient for all sizes

  • that avoids unnecessary copying

A chunk is finite

trait Chunk[+A]:
  def size: Int

A chunk has efficient random access

trait Chunk[+A]:
  def size: Int
  def apply(idx: Int): A

A chunk is memory efficient

object Chunk:
  val empty: Chunk[Nothing] = new Chunk[Nothing]:
    def size = 0
    def apply(idx: Int) = throw new IndexOutOfBoundsException

  def singleton[A](a: A): Chunk[A] = new Chunk[A]:
    def size = 1
    def apply(idx: Int) = idx match
      case 0 => a
      case _ => throw new IndexOutOfBoundsException

  def array[A](arr: Array[A]): Chunk[A] = new Chunk[A]:   (1)
    def size = arr.length
    def apply(idx: Int) = arr(idx)
(1) Not making a defensive copy, for performance reasons

A chunk avoids copying

object Chunk:
  def indexedSeq[A](as: IndexedSeq[A]): Chunk[A] = new Chunk[A]:
    def size = as.size
    def apply(idx: Int) = as(idx)

  import java.nio.ByteBuffer

  def byteBuffer(buffer: ByteBuffer): Chunk[Byte] = new Chunk[Byte]:  (1)
    private val b = buffer.duplicate().asReadOnlyBuffer
    def size = b.remaining
    def apply(idx: Int) = b.get(b.position + idx)
(1) Not making a defensive copy, for performance reasons
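The `duplicate()`/`asReadOnlyBuffer` combination lets the chunk share bytes with the caller's buffer while staying unaffected by later position changes on that buffer. Below is a minimal sketch using a hypothetical `ByteBufferChunk` class (a simplified mirror of `byteBuffer` above) and a hypothetical `demo` function.

```scala
import java.nio.ByteBuffer

// Hypothetical, simplified mirror of `byteBuffer`: a zero-copy, read-only view.
final class ByteBufferChunk(buffer: ByteBuffer):
  // duplicate() gets its own position/limit but shares the content;
  // asReadOnlyBuffer() prevents writes through this view.
  private val b = buffer.duplicate().asReadOnlyBuffer()
  def size: Int = b.remaining()
  // Absolute get: reads relative to the view's starting position without moving it.
  def apply(idx: Int): Byte = b.get(b.position() + idx)

def demo(): (Int, Byte, Byte) =
  val buf = ByteBuffer.wrap(Array[Byte](10, 20, 30, 40))
  buf.position(1)                 // the view will cover bytes 20, 30, 40
  val chunk = ByteBufferChunk(buf)
  buf.position(3)                 // moving the original buffer does not affect the view
  buf.put(1, 99.toByte)           // but the content is shared, so this write is visible
  (chunk.size, chunk(0), chunk(1))
```

`demo()` returns `(3, 99, 30)`. The chunk kept its original window, but it observed the write to the shared bytes. This is the trade-off the callout refers to.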

A chunk avoids copying

val huge: Chunk[Byte] = ???
val pfx: Chunk[Byte] = huge.take(10)
val sfx: Chunk[Byte] = huge.drop(10)

A chunk avoids copying

val huge: Chunk[Byte] = ???
val (prefix, suffix) = huge.splitAt(10)
process(prefix) >> saveForLater(suffix)

A chunk avoids copying

case class ArraySlice[A](values: Array[A],
                         offset: Int,
                         size: Int) extends Chunk[A]:
  require(offset >= 0 && ...)

  def apply(idx: Int) =
    if idx < 0 || idx >= size then throw new IndexOutOfBoundsException()
    else values(offset + idx)

  def splitAt(idx: Int) =                              (1)
    if idx <= 0 then Chunk.empty -> this
    else if idx >= size then this -> Chunk.empty
    else ArraySlice(values, offset, idx) ->
           ArraySlice(values, offset + idx, size - idx)
(1) Zero-copy splitAt, at the cost of non-strictness and potentially increased memory usage (each half keeps the whole underlying array alive)
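Here is a self-contained sketch of the zero-copy split. It uses a simplified, hypothetical `ArraySlice` that is not tied to any `Chunk` trait. Unlike the slide, which returns `Chunk.empty` for out-of-range indices, this version clamps the index so that both halves stay `ArraySlice`s.

```scala
// Hypothetical, simplified ArraySlice: a window [offset, offset + size) over a shared array.
final case class ArraySlice[A](values: Array[A], offset: Int, size: Int):
  require(offset >= 0 && size >= 0 && offset + size <= values.length)

  def apply(idx: Int): A =
    if idx < 0 || idx >= size then throw new IndexOutOfBoundsException(idx.toString)
    else values(offset + idx)

  // Zero-copy split: both halves point at the same underlying array.
  def splitAt(idx: Int): (ArraySlice[A], ArraySlice[A]) =
    val k = idx.max(0).min(size)
    (ArraySlice(values, offset, k), ArraySlice(values, offset + k, size - k))

  def toList: List[A] = List.tabulate(size)(apply)
```

Both halves reference the same array. For the same reason, a ten-byte slice of a huge array keeps the entire array reachable.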

Combinators: foreach

trait Chunk[+A]:
  def size: Int
  def apply(idx: Int): A

  def foreach(f: A => Unit): Unit =
    var i = 0
    while (i < size)
      f(apply(i))
      i += 1

Combinators: foreachWithIndex

trait Chunk[+A]:
  def size: Int
  def apply(idx: Int): A

  def foreachWithIndex(f: (A, Int) => Unit): Unit =
    var i = 0
    while (i < size)
      f(apply(i), i)
      i += 1

Combinators: map

trait Chunk[+A]:
  def map[B](f: A => B): Chunk[B] =
    val arr = new Array[B](size)                (1)
    foreachWithIndex((a, i) => arr(i) = f(a))
    Chunk.array(arr)
(1) Compile error: cannot find class tag for element type B

Combinators: mapCompact

trait Chunk[+A]:
  def mapCompact[B: ClassTag](f: A => B): Chunk[B] =   (1)
    val arr = new Array[B](size)
    foreachWithIndex((a, i) => arr(i) = f(a))
    Chunk.array(arr)
(1) Add a ClassTag constraint (scala.reflect.ClassTag)

Combinators: mapCompact

Why doesn’t mapCompact exist on Chunk?

  • Function1 is not specialized for all primitives (e.g. Byte, Short, and Char)

    trait Function1[
      @specialized(Int, Long, Double) -T1,
      @specialized(Int, Long, Float, Double, Boolean, Unit) +R]
  • ClassTag constraints propagate virally: every caller needs one too

  • Forces folks to choose between map and mapCompact

  • Doesn’t scale to other operations

Combinators: map

trait Chunk[+A]:
  def map[B](f: A => B): Chunk[B] =
    val arr = new Array[Any](size)               (1)
    foreachWithIndex((a, i) => arr(i) = f(a))
    Chunk.array(arr).asInstanceOf[Chunk[B]]      (2)
(1) Create an Array[Any] instead
(2) Unsound! Must ensure the underlying array is never accessed as an Array[B]
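The unsoundness noted in (2) is easy to reproduce with plain arrays. In this sketch, the helpers `mapUntagged` and `castFails` are hypothetical. The mapped results live in an `Object[]` at runtime, so viewing that array as an `Array[Int]` fails with a `ClassCastException`.

```scala
// Hypothetical sketch of the untagged map strategy on a plain array:
// results go into an Array[Any] (an Object[] at runtime), so primitives are boxed.
def mapUntagged[A, B](as: Array[A])(f: A => B): Array[Any] =
  val out = new Array[Any](as.length)
  var i = 0
  while i < as.length do
    out(i) = f(as(i))
    i += 1
  out

// True if treating the Array[Any] as an Array[Int] fails at runtime.
def castFails(arr: Array[Any]): Boolean =
  try
    val ints: Array[Int] = arr.asInstanceOf[Array[Int]]
    ints.length < 0 // never true; uses the cast value
  catch case _: ClassCastException => true
```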

Combinators: compact

trait Chunk[+A]:
  def toArray[A2 >: A: ClassTag]: Array[A2] =
    val arr = new Array[A2](size)
    foreachWithIndex((a, i) => arr(i) = a)
    arr

  def compact[A2 >: A: ClassTag]: Chunk[A2] =
    Chunk.array(toArray[A2])
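The sketch below shows what `compact` buys. Given elements stored in an `Array[Any]`, a `ClassTag` lets us allocate a primitive array of the right element type. Here, `compact` is a hypothetical free function over arrays, not the `Chunk` method above.

```scala
import scala.reflect.ClassTag

// Hypothetical sketch: copy boxed elements into an array whose runtime element
// type comes from the ClassTag (e.g. a primitive int[] for A = Int).
def compact[A: ClassTag](elems: Array[Any]): Array[A] =
  val out = new Array[A](elems.length)
  var i = 0
  while i < elems.length do
    out(i) = elems(i).asInstanceOf[A] // unboxed on store when out is primitive
    i += 1
  out
```

The caller pays for the `ClassTag` only when it explicitly asks for a compact representation. This is why the constraint lives on `compact` rather than on `map`.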

Combinators: filter

trait Chunk[+A]:
  def filter(p: A => Boolean): Chunk[A] =
    val b = collection.mutable.ArrayBuilder.make[Any]  (1) (2)
    foreach(a => if p(a) then b += a)
    Chunk.array(b.result()).asInstanceOf[Chunk[A]]
(1) Use ArrayBuilder instead of Array, since we don’t know the final size
(2) Use Any like in map, resulting in boxing of primitives
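Below is a standalone sketch of the same untagged filter strategy over an `IndexedSeq`. The name `filterUntagged` is hypothetical.

```scala
import scala.collection.mutable.ArrayBuilder

// Hypothetical sketch: the result size is unknown up front, so elements go into a
// growable ArrayBuilder[Any]; primitives are boxed, just as in the untagged map.
def filterUntagged[A](as: IndexedSeq[A])(p: A => Boolean): Array[Any] =
  val b = ArrayBuilder.make[Any]
  as.foreach(a => if p(a) then b += a)
  b.result()
```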

Avoiding Copying

val huge: Chunk[Byte] = ???
val crlf: Chunk[Byte] = Chunk.array("\r\n".getBytes)

val discouraged = Stream.chunk(huge ++ crlf)
val encouraged = Stream.chunk(huge) ++ Stream.chunk(crlf)

How can we discourage copying?

Avoiding Copying

val huge: Chunk[Byte] = ???
val crlf: Chunk[Byte] = Chunk.array("\r\n".getBytes)

val discouraged = Stream.chunk(Chunk.concat(List(huge, crlf)))
val encouraged = Stream.chunk(huge) ++ Stream.chunk(crlf)

Make it inconvenient!


object Chunk:
  def concat[A: ClassTag](chunks: Seq[Chunk[A]]): Chunk[A] =
    val totalSize = chunks.map(_.size).sum
    val arr = new Array[A](totalSize)
    var offset = 0
    chunks.foreach { c =>
      if !c.isEmpty then
        c.copyToArray(arr, offset)
        offset += c.size
    }
    Chunk.array(arr)


import scala.collection.immutable.Queue

def unconsN[F[_], O](
  s: Stream[F, O],
  n: Int
): Pull[F, Nothing, (Chunk[O], Stream[F, O])] =
  def go(
    acc: Queue[Chunk[O]],
    s: Stream[F, O],
    n: Int
  ): Pull[F, Nothing, (Chunk[O], Stream[F, O])] =
    s.pull.uncons.flatMap {
      case None => Pull.pure(Chunk.concat(acc) -> Stream.empty)
      case Some((hd, tl)) =>
        if hd.size < n then
          go(acc :+ hd, tl, n - hd.size)
        else
          val (pfx, sfx) = hd.splitAt(n)
          val out = Chunk.concat(acc :+ pfx)
          Pull.pure(out -> tl.cons(sfx))
    }
  go(Queue.empty, s, n)


  • Problem: concat requires a ClassTag[O]

  • Option 1: add ClassTag constraint

    def unconsN[F[_], O: ClassTag](...)
  • Option 2: change return type to Queue[Chunk[O]]

    def unconsN[F[_], O](...): Pull[F, Nothing, (Queue[Chunk[O]], Stream[F, O])]
  • Option 3: remove ClassTag constraint from concat

concat tagless

object Chunk:
  def concat[A](chunks: Seq[Chunk[A]]): Chunk[A] =
    val totalSize = chunks.map(_.size).sum
    val arr = new Array[Any](totalSize)
    var offset = 0
    chunks.foreach { c =>
      if !c.isEmpty then
        c.copyToArray(arr, offset)
        offset += c.size
    }
    Chunk.array(arr).asInstanceOf[Chunk[A]]
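The following is a runnable approximation of the tagless `concat`. It assumes plain `IndexedSeq`s as stand-ins for chunks, and `concatUntagged` is a hypothetical name. The sketch shows where the boxing comes from: every element is stored in an `Array[Any]`.

```scala
// Hypothetical stand-in: each "chunk" is an IndexedSeq; the result is an Array[Any].
def concatUntagged[A](chunks: Seq[IndexedSeq[A]]): Array[Any] =
  val totalSize = chunks.map(_.size).sum
  val arr = new Array[Any](totalSize)
  var offset = 0
  chunks.foreach { c =>
    if c.nonEmpty then
      c.copyToArray(arr, offset) // copy c into arr starting at offset
      offset += c.size
  }
  arr
```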

Downside: primitives get boxed. Can we fix?

concat too clever

object Chunk:
  def concat[A](chunks: Seq[Chunk[A]]): Chunk[A] =
    if chunks.forall(containsOnly[Byte])
    then concatTagged(chunks.asInstanceOf[Seq[Chunk[Byte]]]).asInstanceOf[Chunk[A]]
    else if ...
    else concatUntagged(chunks)

  def containsOnly[A](c: Chunk[?])(using ct: ClassTag[A]): Boolean =
    c.knownElementType == ct || c.forall(_.isInstanceOf[A])

  def concatUntagged[A](chunks: Seq[Chunk[A]]): Chunk[A] =
    ??? /* store elements in an Array[Any] */

  def concatTagged[A: ClassTag](chunks: Seq[Chunk[A]]): Chunk[A] =
    ??? /* store elements in an Array[A] */

concat too clever

scalajs> 1.isInstanceOf[Byte]
res0: Boolean = true

Reflection is never safe.[1]

1. I made up this Scala.js REPL, but the result is real!


  • Problem: concat requires a ClassTag[A]

  • Option 4: make Queue[Chunk[A]] a subtype of Chunk[A]


import scala.collection.immutable.Queue as SQueue

object Chunk:
  class Queue[+A] private (
    val chunks: SQueue[Chunk[A]],
    val size: Int
  ) extends Chunk[A]:

    def +:[A2 >: A](c: Chunk[A2]): Queue[A2] =
      if c.isEmpty then this
      else new Queue(c +: chunks, c.size + size)

    def :+[A2 >: A](c: Chunk[A2]): Queue[A2] =
      if c.isEmpty then this
      else new Queue(chunks :+ c, size + c.size)

    def apply(i: Int): A =
      if i < 0 || i >= size
      then throw new IndexOutOfBoundsException()
      def go(chunks: SQueue[Chunk[A]], offset: Int): A =
        val head = chunks.head
        if offset < head.size then head(offset)
        else go(chunks.tail, offset - head.size)
      go(chunks, i)

  object Queue:
    // Sketch of a factory (the constructor is private): drops empty chunks
    def apply[A](chunks: Chunk[A]*): Queue[A] =
      val nonEmpty = chunks.filterNot(_.isEmpty)
      new Queue(SQueue.from(nonEmpty), nonEmpty.map(_.size).sum)


trait Chunk[+A]:
  def ++[A2 >: A](that: Chunk[A2]): Chunk[A2] =
    if isEmpty then that
    else
      that match
        case that if that.isEmpty  => this
        case that: Chunk.Queue[A2] => this +: that
        case that                  => Chunk.Queue(this, that)
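Below is a simplified model of `Chunk.Queue` that can be run on its own. `ChunkQueue` is a hypothetical name, chunks are modeled as `IndexedSeq`, and only `:+` and the linear `apply` are included.

```scala
import scala.collection.immutable.Queue as SQueue

// Hypothetical, simplified model of Chunk.Queue: non-empty IndexedSeq "chunks"
// plus a cached total size. Appending is O(1) amortized and copies no elements.
final class ChunkQueue[A] private (val chunks: SQueue[IndexedSeq[A]], val size: Int):
  def :+(c: IndexedSeq[A]): ChunkQueue[A] =
    if c.isEmpty then this else new ChunkQueue(chunks :+ c, size + c.size)

  // Linear walk over the constituent chunks, as in Chunk.Queue#apply.
  def apply(i: Int): A =
    if i < 0 || i >= size then throw new IndexOutOfBoundsException(i.toString)
    @annotation.tailrec
    def go(cs: SQueue[IndexedSeq[A]], offset: Int): A =
      val head = cs.head
      if offset < head.size then head(offset) else go(cs.tail, offset - head.size)
    go(chunks, i)

object ChunkQueue:
  def empty[A]: ChunkQueue[A] = new ChunkQueue(SQueue.empty, 0)
```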

unconsN Redux

def unconsN[F[_], O](
  s: Stream[F, O],
  n: Int
): Pull[F, Nothing, (Chunk[O], Stream[F, O])] =
  def go(
    acc: Chunk[O],
    s: Stream[F, O],
    n: Int
  ): Pull[F, Nothing, (Chunk[O], Stream[F, O])] =
    s.pull.uncons.flatMap {
      case None => Pull.pure(acc -> Stream.empty)
      case Some((hd, tl)) =>
        if hd.size < n then
          go(acc ++ hd, tl, n - hd.size)
        else
          val (pfx, sfx) = hd.splitAt(n)
          val out = acc ++ pfx
          Pull.pure(out -> tl.cons(sfx))
    }
  go(Chunk.empty, s, n)   // ++ builds a Chunk.Queue once two non-empty chunks meet

Chunk.Queue Runtime

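What’s the asymptotic runtime of Chunk.Queue#apply?
  • apply walks the constituent chunks one at a time, so its cost grows with the number of chunks

  • If the number of constituent chunks is much smaller than the total size, it is effectively O(1)

  • As the number of constituent chunks approaches the total size, runtime approaches O(n)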


What’s the asymptotic runtime of foreach?
trait Chunk[+A]:
  def foreach(f: A => Unit): Unit =
    var i = 0
    while (i < size)
      f(apply(i))
      i += 1
  • O(n) when the number of constituent chunks is much smaller than the total size

  • O(n²) when the number of constituent chunks approaches the total size, since each of the n calls to apply may walk up to n chunks
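The quadratic case can be checked by counting chunk visits. This sketch uses a hypothetical helper, `hopsIndexBased`, and models an index-based `foreach` in which every `apply(i)` walks from the first chunk, as `Chunk.Queue#apply` does.

```scala
// Hypothetical instrumentation: count chunk visits ("hops") made by an index-based
// foreach over chunks of the given sizes, where each apply(i) starts at the first chunk.
def hopsIndexBased(chunkSizes: IndexedSeq[Int]): Long =
  val n = chunkSizes.sum
  var hops = 0L
  for i <- 0 until n do
    var offset = i
    var k = 0
    hops += 1                 // visit the first chunk
    while offset >= chunkSizes(k) do
      offset -= chunkSizes(k) // skip chunk k entirely
      k += 1
      hops += 1               // visit the next chunk
  hops
```

With n single-element chunks, the count is 1 + 2 + … + n = n(n + 1)/2, which is 5050 for n = 100. A chunk-wise traversal would visit each of the 100 chunks once.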

Restoring foreach linearity

object Chunk:
  class Queue[+A] private (
    val chunks: SQueue[Chunk[A]],
    val size: Int
  ) extends Chunk[A]:

    override def foreach(f: A => Unit): Unit =
      chunks.foreach(_.foreach(f))

    override def foreachWithIndex(f: (A, Int) => Unit): Unit =
      var i = 0
      chunks.foreach { chunk =>
        chunk.foreach { a =>
          f(a, i)
          i += 1
        }
      }

What about lookup by index?

  • Compute an array of accumulated sizes

    • sizes(0) = size of first chunk

    • sizes(1) = sum of sizes of first and second chunk

  • To lookup element at index i:

    • Binary search sizes to find the smallest entry greater than i

    • The chunk at the resulting index contains the desired element

  • (Suggested by fs2 maintainer Diego E. Alonso Blas)

What about lookup by index?

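Example: looking up index 20 (the diagram of chunks and their accumulated sizes is not reproduced)

  • Find the smallest accumulated size greater than 20

  • This returns idx = 3, acc = 34

  • The accumulated size before chunk 3 is 14, so return chunks(idx)(20 - 14)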
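The lookup can be sketched with `java.util.Arrays.binarySearch`. The chunk sizes from the diagram are not recoverable, so the sizes below (5, 4, 5, 20) are hypothetical and chosen to match the example (14 before chunk 3, 34 through chunk 3). `accumulate` and `locate` are hypothetical helper names. The sketch assumes there are no empty chunks, which `Chunk.Queue` guarantees, so the accumulated sizes are strictly increasing.

```scala
import java.util.Arrays

// Running totals: acc(k) = total size of chunks 0..k.
def accumulate(chunkSizes: Array[Int]): Array[Int] =
  chunkSizes.scanLeft(0)(_ + _).tail

// Returns (chunk index, offset within that chunk) for global index i.
def locate(acc: Array[Int], i: Int): (Int, Int) =
  val j = Arrays.binarySearch(acc, i)
  // Exact hit (acc(j) == i): element i is the first element of chunk j + 1.
  // Miss: binarySearch returns -(insertionPoint) - 1, and the insertion point
  // is the index of the smallest accumulated size greater than i.
  val k = if j >= 0 then j + 1 else -(j + 1)
  val before = if k == 0 then 0 else acc(k - 1)
  (k, i - before)
```

`locate(accumulate(Array(5, 4, 5, 20)), 20)` returns `(3, 6)`, which corresponds to `chunks(3)(20 - 14)`.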

What about lookup by index?

class Queue[+A](...):
  private[this] lazy val accumulatedSizes: (Array[Int], Array[Chunk[A]]) =
    val sizes = new Array[Int](chunks.size)
    val arr = new Array[Chunk[A]](chunks.size)
    var accSize = 0
    var i = 0
    chunks.foreach { c =>
      accSize += c.size
      sizes(i) = accSize
      arr(i) = c
      i += 1
    }
    (sizes, arr)

  def apply(i: Int): A =
    if i < 0 || i >= size
    then throw new IndexOutOfBoundsException()
    if i == 0 then chunks.head(0)
    else if i == size - 1 then chunks.last.last.get
    else
      val (sizes, chunkArr) = accumulatedSizes
      val j = java.util.Arrays.binarySearch(sizes, i)
      if j >= 0 then
        chunkArr(j + 1)(0)        // sizes(j) == i: first element of chunk j + 1
      else
        val k = -(j + 1)          // index of the smallest accumulated size > i
        val accSizeBefore = if k == 0 then 0
                            else sizes(k - 1)
        chunkArr(k)(i - accSizeBefore)


  • For a Chunk.Queue with n elements and m constituent chunks:

    • first call to apply is O(m)

    • subsequent calls are O(log m)

    • In the worst case, m == n


A chunk is…

  • a mostly immutable, mostly strict, finite sequence of values

  • that supports mostly efficient index-based random access of elements

  • that’s memory efficient for all sizes

  • that avoids unnecessary copying

Design is a series of logical decisions.

Design is an iterative process of failures and successes, informed by experiments and analysis.


  • Identify important properties

  • Constraints provide tension

  • Be open to relaxing constraints

  • Benchmark common usage patterns

  • Iterate

