MetaRx User Manual v0.1.7

Reactive data structures

MetaRx currently implements four reactive data structures: channels, buffers, dictionaries and sets.

Channels

A channel models continuous values as a stream. It serves as a multiplexer for typed messages that consist of immutable values. Messages sent to the channel are propagated to the observers attached to it, in the same order as the observers were added. It is possible to operate on channels with higher-order functions such as map(), filter() or take(). These methods may be chained, such that every produced value is propagated down the observer chain.

MetaRx differentiates between two top-level channel types:

There are four channel implementations:

Partial channels model optional values:

Note: Opt[T] is merely a convenience type and Var[Option[T]] could be used, too.

Operations

Here is a simple example for a channel that receives integers. We register an observer which prints all values on the console:

val ch = Channel[Int]() // initialise
ch.attach(println)      // attach observer
ch := 42                // produce value
Output:
42

Note: The := operator is a shortcut for the method produce.

The return values of operations are channels, therefore chaining is possible. Channels can be used to express data dependencies:

val ch = Channel[Int]()
ch.filter(_ > 3)
  .map(_ + 1)
  .attach(println)
ch := 42
ch := 1
Output:
43
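
To illustrate how observers and chained operations fit together, here is a minimal sketch in plain Scala. MiniChannel and all of its members are hypothetical stand-ins written for this illustration; they are not MetaRx's implementation and only mimic the behaviour described above.

```scala
import scala.collection.mutable.ArrayBuffer

object MiniChannelSketch {
  // Hypothetical stand-in for a channel; not MetaRx's implementation.
  final class MiniChannel[T] {
    private val observers = ArrayBuffer.empty[T => Unit]

    // Observers are kept in the order in which they were attached.
    def attach(f: T => Unit): Unit = { observers += f; () }

    // Forward a value to every observer, in attachment order.
    def produce(value: T): Unit = observers.foreach(_(value))

    // Shortcut for produce, mirroring the := operator.
    def :=(value: T): Unit = produce(value)

    // Each combinator returns a new channel fed by an observer on this one.
    def map[U](f: T => U): MiniChannel[U] = {
      val out = new MiniChannel[U]
      attach(v => out.produce(f(v)))
      out
    }

    def filter(p: T => Boolean): MiniChannel[T] = {
      val out = new MiniChannel[T]
      attach(v => if (p(v)) out.produce(v))
      out
    }
  }

  // Replays the chain from the example and records what reaches the end.
  def run(): List[Int] = {
    val seen = ArrayBuffer.empty[Int]
    val ch = new MiniChannel[Int]
    ch.filter(_ > 3).map(_ + 1).attach { v => seen += v; () }
    ch := 42
    ch := 1
    seen.toList
  }
}
```

In this sketch, chaining works because every combinator hands back another MiniChannel, so the value 1 is stopped by the filter and never reaches map.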

Use the method distinct to produce a value if it is the first or different from the previous one. A use case is to perform time-consuming operations such as performing HTTP requests only once for the same user input:

ch.distinct.attach { query =>
  // perform HTTP request
}
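
The rule that distinct applies (keep a value if it is the first one or differs from its predecessor) can be sketched on an ordinary sequence. The helper distinctConsecutive is a hypothetical name used only for this illustration and operates on a Seq rather than on a channel.

```scala
object DistinctSketch {
  // Keeps a value only if it is the first one or differs from its predecessor.
  // Hypothetical helper on plain sequences; not part of MetaRx.
  def distinctConsecutive[T](values: Seq[T]): Vector[T] =
    values.foldLeft(Vector.empty[T]) { (kept, v) =>
      if (kept.lastOption.contains(v)) kept else kept :+ v
    }
}
```

Note that under this rule a value may appear again later, as long as a different value was seen in between.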

If you want to observe multiple channels of the same type and merge the produced values, you can use merge():

val a = Channel[String]()
val b = Channel[String]()
val c = Channel[String]()

val merged: ReadChannel[String] = a.merge(b).merge(c)
merged.attach(println)

c := "test"
Output:
test

A related function is or(), which detects changes in any of the passed channels. The operator | was defined for it:

val a = Channel[String]()
val b = Channel[String]()
val c = Channel[String]()

val or: ReadChannel[Unit] = a | b | c
or.attach(println)

b := "test"
Output:
()

For Boolean channels, the logical operators are defined, and yield a new channel with the result:

val a = Channel[Boolean]()
val b = Channel[Boolean]()

// a.zip(b).map { case (aVal, bVal) => aVal && bVal }
val aAndB: ReadChannel[Boolean] = a && b

// a.zip(b).map { case (aVal, bVal) => aVal || bVal }
val aOrB: ReadChannel[Boolean] = a || b

// a.isFalse()
val notA = !a

Furthermore, onTrue() and onFalse() are defined. Each yields a ReadChannel[Unit] that triggers when true or false, respectively, is produced.

Arithmetic and comparison operators such as +, -, *, /, %, <, <=, >, >=, === and !== are also supported by channels whose underlying types implement Scala's Numeric or Ordering trait:

val a = Channel[Int]()
val b = Channel[Int]()

val c: ReadChannel[Int] = 5 - 2 * a + 3 / b
val d: ReadChannel[Boolean] = c >= 42

It must be noted that streaming operations have different semantics than their non-reactive counterparts. For brevity, only certain combinators are covered by the manual. For the rest, please refer to the ScalaDoc documentation.

State channels

For better performance, Channel does not cache the produced values. Some operations cannot be implemented without access to the current value, though. It is also often necessary to poll the current value. For these reasons, state channels such as Var or Opt were introduced. The following example visualises the different behaviours:

val ch = Var(42)
ch.attach(println)

val ch2 = Channel[Int]()
ch2 := 42  // Value is lost as ch2 does not have any observers
ch2.attach(println)
Output:
42

update() is an operation that requires that the produced values are persisted. update() takes a function which modifies the current value:

val ch = Var(2)
ch.attach(println)
ch.update(_ + 1)
Output:
2
3
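
The difference between a stateless channel and a state channel can be modelled in a few lines of plain Scala. Stateless and Stateful below are hypothetical classes written for this illustration under the assumption that a state channel simply remembers its latest value; they are not MetaRx's classes.

```scala
import scala.collection.mutable.ArrayBuffer

object StateSketch {
  // Stateless: values produced while nobody observes are lost.
  final class Stateless[T] {
    private val observers = ArrayBuffer.empty[T => Unit]
    def attach(f: T => Unit): Unit = { observers += f; () }
    def produce(v: T): Unit = observers.foreach(_(v))
  }

  // Stateful: remembers the current value and hands it to new observers.
  final class Stateful[T](initial: T) {
    private var current = initial
    private val observers = ArrayBuffer.empty[T => Unit]
    def get: T = current
    def attach(f: T => Unit): Unit = { observers += f; f(current) }
    def produce(v: T): Unit = { current = v; observers.foreach(_(v)) }
    // update needs the stored value, so it only makes sense on a stateful channel.
    def update(f: T => T): Unit = produce(f(current))
  }

  def run(): (List[Int], List[Int]) = {
    val fromStateful = ArrayBuffer.empty[Int]
    val v = new Stateful(2)
    v.attach { x => fromStateful += x; () }
    v.update(_ + 1)

    val fromStateless = ArrayBuffer.empty[Int]
    val ch = new Stateless[Int]
    ch.produce(42) // lost: no observer is attached yet
    ch.attach { x => fromStateless += x; () }

    (fromStateful.toList, fromStateless.toList)
  }
}
```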

A partially-defined channel (Opt) is constructed as follows:

val x = Opt[Int]()
x := 42

Alternatively, a default value may be passed:

val x = Opt(42)

A state channel provides all the methods a channel does. Var[T] and Opt[T] can be obtained from any existing ReadChannel[T] using the method state:

val ch = Channel[Int]()
val chOpt: Opt[Int] = ch.state
val chVar: Var[Int] = ch.state(42)

chOpt is undefined as long as no value was produced on ch. chVar will be initialised with 42 and the value is overridden with the first produced value on ch.

If writing capabilities are not required, cache is to be preferred:

val ch = Channel[Int]()
val chPart:  ReadPartialChannel[Int] = ch.cache
val chState: ReadStateChannel[Int]   = ch.cache(42)

biMap() allows you to implement a bidirectional map, i.e. a stream with back-propagation:

val map   = Map(1 -> "one", 2 -> "two", 3 -> "three")
val id    = Var(2)
val idMap = id.biMap(
  (id: Int)     => map(id),
  (str: String) => map.find(_._2 == str).get._1)
id   .attach(x => println("id   : " + x))
idMap.attach(x => println("idMap: " + x))
idMap := "three"
Output:
id   : 2
idMap: two
id   : 3
idMap: three

biMap() can be used to implement a lens as a channel. The following example defines a lens for the field b. It has a back channel that composes a new object with the changed field value.

case class Test(a: Int, b: Int)
val test = Var(Test(1, 2))
val lens = test.biMap(_.b, (x: Int) => test.get.copy(b = x))
test.attach(println)
lens := 42
Output:
Test(1,2)
Test(1,42)
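
For readers unfamiliar with lenses, the underlying idea can be sketched without any channels: a lens pairs a getter with a setter that rebuilds the enclosing object. The Lens type below is a hypothetical illustration in plain Scala and is not provided by MetaRx.

```scala
object LensSketch {
  final case class Test(a: Int, b: Int)

  // A lens pairs a getter with a setter that rebuilds the whole object.
  // Hypothetical type for illustration only.
  final case class Lens[S, A](get: S => A, set: (S, A) => S)

  // Lens focusing on the field b of Test.
  val bLens: Lens[Test, Int] =
    Lens[Test, Int](t => t.b, (t, x) => t.copy(b = x))
}
```

The forward function passed to biMap() plays the role of get, and the backward function plays the role of set applied to the current value.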

A LazyVar evaluates its argument lazily. In the following example, it points to a mutable variable:

var counter = 0
val ch = LazyVar(counter)
ch.attach(value => { counter += 1; println(value) })
ch.attach(value => { counter += 1; println(value) })
Output:
0
1

Call semantics

Functions passed to higher-order operations are evaluated on-demand:

val ch = Var(42).map(i => { println(i); i + 1 })
ch.attach(_ => ())
ch.attach(_ => ())
Output:
42
42

The value of a state channel gets propagated to a child when it requests the value (flush()). In the example, Var delays the propagation of the initial value 42 until the first attach() call. attach() goes up the channel chain and triggers the flush on each channel. In other words, map(f) merely registers an observer, but does not call f right away. f is called each time when any of its direct or indirect children uses attach().

This reduces the memory usage and complexity of the channel implementation as no caching needs to be performed. On the other hand, you may want to perform on-site caching of the results of f, especially if the function is side-effecting.
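
The on-demand behaviour can be sketched with a simplified model in plain Scala. Source and constant are hypothetical names, and the model only covers the flush of an initial value on attach; it is an assumption-laden illustration, not MetaRx's implementation.

```scala
object OnDemandSketch {
  // A read-only source that hands its current value to each new observer.
  trait Source[T] { self =>
    def attach(f: T => Unit): Unit

    // map computes nothing up front; g runs once per attach on the result.
    def map[U](g: T => U): Source[U] = new Source[U] {
      def attach(f: U => Unit): Unit = self.attach(v => f(g(v)))
    }
  }

  // Source with a fixed value that is flushed on every attach.
  def constant[T](value: T): Source[T] = new Source[T] {
    def attach(f: T => Unit): Unit = f(value)
  }

  // Counts how often the mapping function is evaluated for two observers.
  def run(): Int = {
    var calls = 0
    val ch = constant(42).map { i => calls += 1; i + 1 }
    ch.attach(_ => ())
    ch.attach(_ => ())
    calls
  }
}
```

Because nothing is cached in this model, the mapping function runs once per observer, which is why side effects inside it are repeated.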

The current value of a state channel may be read at any time using .get (if available) or flush().

There are operations that maintain state for all observers. For example, drop(n) counts the number of produced values[3]. As soon as n is exceeded, all subsequent values are passed on. The initial attach() calls ignore the first value (42), but deal with all values after that:

val ch = Var(42)
val dch = ch.drop(1)
dch.attach(println)
dch.attach(println)
ch := 23
Output:
23
23

If you do not want to receive the initial value, you may use silentAttach() instead of attach(). Another possibility would be to use tail.attach(). However, this will only work for state channels with a single initial value. Buffers, dictionaries and sets on the other hand have several initial values.

Cycles

Certain propagation flows may lead to cycles:

val todo = Channel[String]()
todo.attach { t =>
  println(t)
  todo := ""
}
todo := "42"

Setting todo will result in an infinite loop. Such flows are detected and will lead to a run-time exception. Otherwise, the application would block indefinitely, which would make debugging more difficult.

If a cycle as in the above example is expected, use the combinator filterCycles to make it explicit. This will ignore value propagations caused by a cycle.
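
How MetaRx detects cycles internally is not described here. One common way to detect this kind of re-entrant propagation is a guard flag, sketched below in plain Scala. The class Guarded and the exception type used are assumptions made for this illustration.

```scala
import scala.collection.mutable.ArrayBuffer

object CycleSketch {
  // Hypothetical channel that refuses re-entrant produce calls.
  final class Guarded[T] {
    private val observers = ArrayBuffer.empty[T => Unit]
    private var producing = false

    def attach(f: T => Unit): Unit = { observers += f; () }

    def produce(v: T): Unit = {
      // A nested produce during propagation means an observer wrote back.
      if (producing) throw new IllegalStateException("Cycle detected")
      producing = true
      try observers.foreach(_(v))
      finally producing = false
    }
  }

  // Returns true if the cyclic flow was reported as an exception.
  def run(): Boolean = {
    val todo = new Guarded[String]
    todo.attach(_ => todo.produce(""))
    try { todo.produce("42"); false }
    catch { case _: IllegalStateException => true }
  }
}
```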

Subscription channels

Sub is short for subscription and its purpose is to take values as well as channels.

val x = Var(42)

val sub = Sub(23)
sub.attach(println)

sub := x  // `sub` will subscribe to all values produced on `x`
x := 200  // Gets propagated to `sub`

sub := 10 // Cancel subscription and set value to 10
x := 404  // Doesn't get propagated to `sub`
Output:
23
42
200
10

Binary channels

A binary channel (Bin) can be used to communicate between two components. Values produced on Bin will be propagated to left and right, but not between those two. It can be used for two-way binding.

val bin = Bin(0)

def componentA(): Unit =
  bin.left.attach { x =>
    println(s"Component A received: $x")
    if (x == 3) bin.right := 42
  }

def componentB(): Unit = {
  bin.right.attach(x => println(s"Component B received: $x"))
  (1 to 3).foreach(bin.left := _)
}

componentA()  // Sends 42 to component B if current value is 3
componentB()  // Sends 1..3 to component A

// `bin` is a state channel and stores the current value
println(s"Current value: ${bin.get}")

bin := 23  // Broadcast to both components
Output:
Component A received: 0
Component B received: 0
Component A received: 1
Component A received: 2
Component A received: 3
Component B received: 42
Current value: 42
Component A received: 23
Component B received: 23

Dependency channels

Consider right = x + width. Obviously the following holds: x = right - width. A dependency channel (Dep) allows you to express such dependencies in code:

val right: Dep[Double] = x.dep[Double](_ + width, _ - width)

If you update right, it will re-compute x. This is not only limited to equations, but can be used for any invertible operation.
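
The invertible relationship itself can be sketched independently of channels. Invertible is a hypothetical type, and the fixed width of 100.0 is an assumed value chosen for this illustration.

```scala
object DepSketch {
  // An invertible mapping: forward derives the dependent value,
  // backward recovers the source value. Hypothetical type, not MetaRx's Dep.
  final case class Invertible[A, B](forward: A => B, backward: B => A)

  val width = 100.0 // assumed fixed width for the illustration

  // right = x + width, and therefore x = right - width
  val rightOf: Invertible[Double, Double] =
    Invertible[Double, Double](x => x + width, right => right - width)
}
```

Applying backward after forward yields the original value, which is the property that any pair of functions passed to dep should satisfy.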

Note that Dep is conceptually related to biMap. The difference is that biMap only deals with single values, whereas Dep is an extension of Sub which supports channel subscriptions.

Picklers

This feature relies on uPickle for serialising. You will need to add the following dependency:

libraryDependencies += "pl.metastack" %%  "metarx-upickle" % "0.1.7"  // Scala
libraryDependencies += "pl.metastack" %%% "metarx-upickle" % "0.1.7"  // Scala.js

Use it as follows:

import pl.metastack.metarx.Upickle._
import upickle.default._

val buffer = Buffer(1, 2, 3)

val json = write(buffer)
println(json)

val decoded = read[Buffer[Int]](json)
println(decoded)
Output:
[1,2,3]
ArrayBuffer(1, 2, 3)

Buffers

Buffers are reactive lists. State changes such as row additions, updates or removals are encoded as delta objects. This allows these changes to be reflected directly in the DOM, without having to re-render the entire list. Buffer[T] is therefore more efficient than Channel[Seq[T]] when dealing with list changes.

The following example creates a buffer with three initial rows, observes the size[4] and then adds another row:

val buf = Buffer(1, 2, 3)
buf.size.attach(println)
buf += 4
Output:
3
4

All polling methods have a dollar sign ($) as a suffix:

val buf = Buffer(1, 2, 2, 3)
println(buf.distinct$)
Output:
ArrayBuffer(1, 2, 3)

An example of using removeAll():

val buf  = Buffer(3, 4, 5)
val mod2 = buf.filter$(_ % 2 == 0)

buf.removeAll(mod2.get)

Note: Buffer will identify rows by their value if the row type is a case class. In this case, operations like insertAfter() or remove() will always refer to the first occurrence. This is often not desired. An alternative would be to define a class instead or to wrap the values in a Ref[] object.

case class Todo(value: String)
val todos = Buffer[Ref[Todo]]()
todos.map { case tr @ Ref(t) =>
  // ...
}

The value of a Ref[] can be obtained by calling get. However, it is more convenient to do pattern matching as in the example.
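
The difference between identifying rows by value and by reference can be shown in plain Scala. Cell below is a hypothetical wrapper standing in for the idea behind Ref[]; it is not MetaRx's class.

```scala
object IdentitySketch {
  final case class Todo(value: String)

  // Hypothetical wrapper: a plain class compares by reference, not by value.
  final class Cell[T](val get: T)

  // Looks up the second of two equal rows, first by value, then by reference.
  def run(): (Int, Int) = {
    val first  = Todo("buy milk")
    val second = Todo("buy milk")

    // Case classes are equal by value, so the first occurrence is found.
    val byValue = Vector(first, second).indexOf(second)

    // Wrapped rows are distinct objects, so the intended row is found.
    val c1 = new Cell(first)
    val c2 = new Cell(second)
    val byRef = Vector(c1, c2).indexOf(c2)

    (byValue, byRef)
  }
}
```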

You can observe the delta objects produced by a buffer:

val buf = Buffer(1, 2, 3)
buf.changes.attach(println)
buf += 4
buf.clear()
Output:
Insert(Last(),1)
Insert(Last(),2)
Insert(Last(),3)
Insert(Last(),4)
Clear()

All streaming operations that a buffer provides are implemented in terms of the changes channel.
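
To make the idea of delta objects concrete, here is a minimal sketch in plain Scala. The Delta type and its cases are hypothetical and deliberately simpler than the deltas shown in the output above, which also carry position information.

```scala
object DeltaSketch {
  // Hypothetical delta type for illustration; not MetaRx's delta classes.
  sealed trait Delta[T]
  final case class Append[T](element: T) extends Delta[T]
  final case class Remove[T](element: T) extends Delta[T]
  final case class Clear[T]() extends Delta[T]

  // Applies one delta to an immutable snapshot of the list.
  def applyDelta[T](rows: Vector[T], delta: Delta[T]): Vector[T] =
    delta match {
      case Append(e) => rows :+ e
      case Remove(e) => rows.diff(Vector(e)) // removes the first occurrence only
      case Clear()   => Vector.empty[T]
    }

  // Rebuilds the list by replaying all deltas in order.
  def replay[T](deltas: Seq[Delta[T]]): Vector[T] =
    deltas.foldLeft(Vector.empty[T])((rows, d) => applyDelta(rows, d))
}
```

A consumer that receives only the deltas can keep its own view in sync by applying each one, instead of rebuilding the view from the full list.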

Dictionaries

Dictionaries are unordered maps from A to B. MetaRx abbreviates the type as Dict.

Mapping buffers

If you want to map each element in a buffer to a value, you can use mapTo:

val buf = Buffer(1, 2, 3)
val map = buf.mapTo(_ * 2)

buf += 4
println(map.buffer.get)
Output:
Map(2 -> 4, 4 -> 8, 1 -> 2, 3 -> 6)

Sets

Reactive sets are implemented as BufSet[5].


  1. In Rx terms, Var would correspond to a cold observer as attaching to it will flush its current value. This is different from Channel which loses its messages when there are no subscribers.  ↩

  2. It can be used to create delta channels from DOM variables by binding to the corresponding events that are triggered by the value changes. For an example see Node.click.  ↩

  3. n must be greater than 0.  ↩

  4. size returns a ReadChannel[Int].  ↩

  5. This name was chosen as Set would have collided with Scala's implementation.  ↩

Generated with MetaDocs v0.1.1