The latest versions of my open source libraries have standardized on two new interfaces - `Writable` and `Readable` - that allow efficient streaming data exchange between libraries. This blog post will explore the origin of these two interfaces, what purpose they serve, and how they can enable more efficient interoperability between a wide range of different libraries and frameworks.
About the Author: Haoyi is a software engineer, and the author of many open-source Scala tools such as the Ammonite REPL and the Mill Build Tool. If you enjoyed the contents on this blog, you may also enjoy Haoyi's book Hands-on Scala Programming
The basic issue that `Writable` and `Readable` solve is that many libraries and frameworks need to exchange data with one another, but the ways they have of doing so are limited and often inefficient. For example:
- Scalatags renders HTML templates, which are often written to files using OS-Lib, or served to a browser via a Cask server
- uPickle generates JSON and MessagePack blobs, which are written to files using OS-Lib, served to a browser via a Cask HTTP server, or uploaded to a server via Requests-Scala
- Requests-Scala, apart from allowing streaming uploads, can also provide streaming downloads that can be streamed to files via OS-Lib, into subsequent Requests-Scala HTTP requests, or processed in-memory
- uPickle and FastParse both need incoming data to parse, which is often read from a file via OS-Lib, received from an HTTP request via Requests-Scala, or uploaded to a Cask HTTP endpoint via the `cask.Request` API.
These libraries are not aware of one another, and thus you end up with only a few options for exchanging data between them:
- `java.lang.String`s: this requires materializing all the data in memory, which is often wasteful and unnecessary. Why should I build up a huge `String` in memory only to write it to a file and then throw it away immediately?
- `Array[Byte]`s: this has the same problem as `java.lang.String`
- `java.io.InputStream`s: this allows some amount of streaming, but it can be difficult to create an `InputStream` from an arbitrary data source. For example, turning a Scalatags template into an `InputStream` to allow streaming data access would require a rewrite of most of Scalatags' library internals
Because `InputStream` is often too difficult to implement, a developer using these libraries will typically end up materializing many short-lived `String`s and `Array[Byte]`s in memory when sending data between these libraries. While not the end of the world - the garbage collector cleans them up once they are no longer needed - this adds overhead, doing work and allocating memory that should not be needed at all.
One point to note is that many libraries are both data sources and data sinks: uPickle generates JSON data, but it also parses it. Requests-Scala both uploads and downloads data, and Cask HTTP endpoints both receive and return data to browsers. In all these cases, it was common to materialize short-lived `String`s or `Array[Byte]`s to do the data exchange between the libraries.
Because materializing large short-lived `String`s and `Array[Byte]`s in memory is wasteful and inefficient, most of these libraries had already grown a menagerie of ad-hoc ways to stream the data. For example:
- uPickle had a `ChannelParser`, which allowed streaming data input over a Java NIO Channel, and had `.writeTo(out: java.io.Writer): Unit` and `.writeBinaryTo(out: java.io.OutputStream)` methods in addition to `.write(): String` and `.writeBinary(): Array[Byte]`, to allow writing the data directly to a file or elsewhere:

  ```scala
  def writeTo[T: Writer](t: T,
                         out: java.io.Writer,
                         indent: Int = -1,
                         escapeUnicode: Boolean = false): Unit

  def writeBinaryTo[T: Writer](t: T, out: java.io.OutputStream): Unit
  ```

- Scalatags had a `.writeTo` method, to allow streaming the generated HTML directly to files:

  ```scala
  def writeTo(strb: java.io.Writer): Unit
  ```

- Requests-Scala had a `requests.get.stream` method that exposed both upload and download streams for you to send and receive data in a streaming fashion:

  ```scala
  requests.get.stream("https://api.github.com/events")(
    onUpload = outputStream => {...},
    onDownload = inputStream => {...}
  )
  ```

- FastParse could parse `Iterator[String]`s, not just raw `String`s, to allow parsing of data too large to fit in memory:

  ```scala
  fastparse.parse(Iterator("i", "am", "cow"), parser(_))
  ```

- OS-Lib had `os.read.chunks` and `os.read.lines.stream` methods, in addition to the batch-oriented `os.read.bytes` and `os.read`, that returned `Generator`s to allow incremental processing of the read bytes or text lines:

  ```scala
  os.read.chunks(p: ReadablePath, chunkSize: Int): os.Generator[(Array[Byte], Int)]
  os.read.chunks(p: ReadablePath, buffer: Array[Byte]): os.Generator[(Array[Byte], Int)]
  os.read.lines.stream(arg: os.ReadablePath): os.Generator[String]
  os.read.lines.stream(arg: os.ReadablePath, charSet: Codec): os.Generator[String]
  ```
All these APIs work, and serve their purpose of allowing a developer to perform streaming reads and writes in scenarios where efficient data transfer is important. However, the ad-hoc nature of the APIs meant that trying to connect these libraries together was clunky.
For example, let us imagine I want to upload a file directly as part of an HTTP request, download the response into my JSON parser, and then modify the JSON and write the modified JSON to a file. It would look something like this:
```scala
val response = requests.put(
  "http://httpbin.org/put",
  data = os.read(os.pwd / "input.txt")
)

val json = ujson.read(response.text())

val interestingJson = json("data")

os.write(os.pwd / "output.json", interestingJson.render())
```
Looks easy enough. But this code does the unnecessary work of reading `input.txt` into an in-memory string, aggregating the HTTP response into an in-memory string for parsing into JSON, and then marshalling the JSON structure into an in-memory string before writing it to `output.json`. That's 3 places where we are aggregating large in-memory strings unnecessarily.
What if we wanted to do this in a streaming fashion, to avoid creating these throw-away in-memory strings? It would look like this:
```scala
requests.put.stream("http://httpbin.org/put")(
  onUpload = outputStream => {
    for ((buf, n) <- os.read.chunks(os.pwd / "input.txt")) {
      outputStream.write(buf, 0, n)
    }
  },
  onDownload = inputStream => {
    val json = ujson.read(java.nio.channels.Channels.newChannel(inputStream))
    val interestingJson = json("data")
    val out = os.write.outputStream(os.pwd / "output.json")
    val writer = new java.io.OutputStreamWriter(out)
    try ujson.writeTo(interestingJson, writer)
    finally {
      writer.close()
      out.close()
    }
  }
)
```
Doable, but clunky and verbose: even though we are conceptually doing the same thing as earlier, we end up with tons of boilerplate to deal with chunks, channels, OutputStreams and OutputStreamWriters, and so on.
This is because even though the libraries conceptually all expose the same functionality - "receive streaming data" and "return streaming data" - the ad-hoc nature of these APIs means that a good amount of glue code needs to be written to e.g. write a `Generator[(Array[Byte], Int)]` into a `java.io.OutputStream` for upload, turn the downloaded `java.io.InputStream` into a `java.nio.channels.Channel` so it can be parsed into JSON, and wrap the `OutputStream` of a file in a `java.io.Writer` so the modified JSON can be streamed into it.
With the standard `Readable` and `Writable` interfaces introduced in the latest versions of all these libraries, this entire flow is as simple as:
```scala
val response = requests.put.stream(
  "http://httpbin.org/put",
  data = os.read.stream(os.pwd / "input.txt")
)

val json = ujson.read(response)

val interestingJson = json("data")

os.write(os.pwd / "output.json", interestingJson)
```
Performing these streaming workflows becomes just as easy as the previous batch equivalents: we simply need to add some `.stream`s, but otherwise the code is almost identical. You can try this out yourself in version 2.0.0 of the Ammonite Scala REPL:
```
Welcome to the Ammonite Repl 2.0.0
(Scala 2.13.1 Java 11.0.2)
If you like Ammonite, please support our development at www.patreon.com/lihaoyi

@ val response = requests.put.stream(
    "http://httpbin.org/put",
    data = os.read.stream(os.pwd / "input.txt")
  )
response: geny.Readable = requests.Requester$$anon$1@4b770e40

@ val json = ujson.read(response)
json: ujson.Value.Value = Obj(
  LinkedHashMap(
    "args" -> Obj(LinkedHashMap()),
    "data" -> Str("hello"),
...

@ val interestingJson = json("data")
interestingJson: ujson.Value = Str("hello")

@ os.write(os.pwd / "output.json", interestingJson)

@ os.read(os.pwd / "output.json")
res4: String = "\"hello\""
```
The way this works is that rather than every library implementing its own ad-hoc way of receiving or returning streaming data, they all standardize on the following two interfaces:
```scala
trait Writable{
  def writeBytesTo(out: OutputStream): Unit
}
trait Readable extends Writable{
  def readBytesThrough[T](f: InputStream => T): T
  def writeBytesTo(out: OutputStream): Unit = readBytesThrough(Internal.transfer(_, out))
}
```
These two minimal interfaces encapsulate the existing ad-hoc conventions that the various library data types had already grown for managing streaming data, and categorize them into two groups:
- `Writable` data types that can push streaming data to an `OutputStream`: this includes things like Scalatags HTML fragments, uPickle JSON values, Requests responses, OS-Lib files, etc. (see the sketch just after this list)
- `Readable` data types that can expose an `InputStream` for a consumer to pull from: this includes Requests responses and OS-Lib files, but not Scalatags HTML fragments or uPickle JSON values
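To make the first group concrete, here is a minimal sketch of what implementing `Writable` might look like for a hypothetical push-based data type (the `Greeting` type and its serialization are invented for illustration):

```scala
import java.io.OutputStream

// A hypothetical push-based data type: it already knows how to serialize
// itself, so Writable just means pushing those bytes to an OutputStream
case class Greeting(name: String) extends geny.Writable {
  def writeBytesTo(out: OutputStream): Unit = {
    out.write(s"Hello, $name!".getBytes("UTF-8"))
  }
}

// Any Writable-accepting API can now consume it directly, e.g.
// os.write(os.pwd / "greeting.txt", Greeting("World"))
```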
Note that `Readable` doesn't provide an `InputStream` directly, but only within the callback to `readBytesThrough`. This allows the data source to perform any necessary cleanup actions after the reading is complete: closing files, terminating HTTP connections, and so on.
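As a rough illustration of that cleanup pattern, here is what a minimal file-backed `Readable` might look like (the `FileSource` class is invented for illustration):

```scala
import java.io.{FileInputStream, InputStream}

// A hypothetical pull-based data source: the InputStream is only exposed
// inside the readBytesThrough callback, so the underlying file can always
// be closed once the consumer is done, even if the callback throws
class FileSource(path: String) extends geny.Readable {
  def readBytesThrough[T](f: InputStream => T): T = {
    val is = new FileInputStream(path)
    try f(is)
    finally is.close()
  }
}
```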
`Writable` and `Readable` essentially categorize data sources as push-based or pull-based. This idea of "push" vs. "pull" is core to streaming data exchange:
- It is easier to implement the push-based `Writable` than the pull-based `Readable`, as any data type that had some sort of `def writeTo(out: OutputStream)`, `def writeTo(out: Writer)`, or `def writeTo(out: StringBuilder)` method could trivially be adapted to support the `writeBytesTo` interface (see the sketch just after this list). On the other hand, many of these would need invasive refactoring in order to support a pull-based interface
- Conversely, as a function receiving data, it is easier to process data from a pull-based `Readable` than from a push-based `Writable`. Hence uPickle, FastParse, etc. only support parsing data from pull-based `Readable`s. Again, trying to make them support push-based data sources would require invasive refactoring of their JSON and combinator parsers
- Any pull-based `Readable` can be trivially used as a push-based `Writable`, and any method that can receive a push-based `Writable` can also receive a pull-based `Readable`. This is reflected in the type inheritance hierarchy, with `trait Readable extends Writable`.
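For instance, here is a rough sketch of how a data type with an existing push-style `writeTo` method might be adapted to `Writable` (the `Report` type is invented for illustration):

```scala
import java.io.{OutputStream, OutputStreamWriter, Writer}

// A hypothetical data type with a pre-existing push-style writeTo method
class Report(lines: Seq[String]) {
  def writeTo(out: Writer): Unit = lines.foreach(line => out.write(line + "\n"))
}

// Adapting it is mechanical: wrap the OutputStream in a Writer and delegate
class WritableReport(report: Report) extends geny.Writable {
  def writeBytesTo(out: OutputStream): Unit = {
    val writer = new OutputStreamWriter(out, "UTF-8")
    report.writeTo(writer)
    writer.flush() // flush but do not close: the caller owns the stream
  }
}
```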
Libraries can both produce and receive instances of `Readable` and `Writable`. For example, the old `requests.stream` API, which took callbacks for both streaming upload and download:

```scala
def stream(...)(onUpload: OutputStream => Unit,
                onDownload: InputStream => Unit): Unit
```
has been replaced by a method that receives a `Writable` and returns a `Readable`:

```scala
def stream(data: Writable): Readable
```
This makes it clear that streaming uploads can use any data source - push or pull - and in return provides a streaming download that is a pull-based source, suitable for use in APIs that require pull-based streams like `ujson.read` or `fastparse.parse`.
In general, `Writable` and `Readable` do not attempt to modify how the various libraries work internally: they simply apply a standard interface to libraries with similar properties so they can be used interchangeably. Trying to refactor Scalatags or uPickle to become pull-based data sources that expose `InputStream`s, or trying to refactor FastParse to allow "push parsing" or "async parsing", is beyond the scope of this standardization.
Now that we have seen how `Readable` and `Writable` work, we can revisit the cleaned-up streaming code snippet from earlier and understand how these interfaces allow seamless interoperability between libraries:
```scala
val response = requests.put.stream(
  "http://httpbin.org/put",
  data = os.read.stream(os.pwd / "input.txt")
)

val json = ujson.read(response)

val interestingJson = json("data")

os.write(os.pwd / "output.json", interestingJson)
```
Essentially, the (simplified) signatures we are looking at are:
```scala
os.read.stream(p: os.Path): Readable

requests.put.stream(url: String, data: Writable): Readable

ujson.read(data: Readable): ujson.Value // ujson.Value extends Writable

os.write(p: os.Path, data: Writable): Unit
```
Thus, because all our library methods accept and return `Readable`/`Writable` data types, we can seamlessly pass data from one library to another in a streaming fashion, without needing any boilerplate to convert data back and forth.
`Writable` and `Readable` are provided by the tiny Geny library, and are now broadly supported by all the libraries I maintain:
- `scalatags.Text.Tag` implements `Writable`
- `fastparse.parse` can now take a `Readable`
- `ujson.read`/`upack.read`/`upickle.default.read` now all accept `Readable`
- `ujson.Value`/`upack.Msg`/`upickle.default.stream`/`upickle.default.streamBinary` now implement `Writable`
- `requests.{put,post}`'s `data` attribute now accepts a `Writable`
- `requests.{get,put,post}.stream` now returns a `Readable`
- `cask.Request` now implements `Readable`
- `@cask.get`, `@cask.post`, etc. endpoints can return any `Writable`
- `os.write` and `os.proc`'s `stdin` argument now accept a `Writable`
- `os.read.stream` returns a `Readable`
The above listing of which methods accept or return `Writable` and `Readable` tells you exactly which streaming workflows are possible: streaming JSON to disk, streaming files on disk to Requests-Scala HTTP requests or Cask HTTP responses, using FastParse directly on a file without loading the whole file into memory, and so on.
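For example, here is a rough sketch of that last workflow, running FastParse directly on a file via `os.read.stream` (the trivial `helloWorld` parser and the `input.txt` path are invented for illustration):

```scala
import fastparse._, NoWhitespace._

// A trivial parser, just to demonstrate streaming input
def helloWorld[_: P]: P[Unit] = P( "hello" ~ " ".rep(1) ~ "world" )

// fastparse.parse accepts a Readable, so the file is consumed incrementally
// rather than being loaded into memory as one big String
val result = fastparse.parse(os.read.stream(os.pwd / "input.txt"), helloWorld(_))
```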
You can easily put together pretty elaborate streaming data processing workflows due to the standardization. For example, here is a snippet that streams data from one HTTP service to another, streams the data into a JSON parser, selects a portion of the parsed JSON, and streams the JSON structure to a `wc` subprocess that performs a word count:
```scala
val events = requests.get.stream("https://api.github.com/events")

val httpBinResponse = requests.post.stream("https://httpbin.org/post", data = events)

val mangledJson = ujson.read(httpBinResponse).apply("form")

val wordCount = os.proc("wc").call(stdin = mangledJson).out.text()
```
These streaming workflows would previously have involved a lot of error-prone boilerplate: we are orchestrating multiple web services, a subprocess, and some in-memory processing, all in a streaming fashion. But thanks to the standard `Writable`/`Readable` interfaces, creating them is now just as easy as the previous batch-oriented workflows that allocated temporary `String`s or `Array[Byte]`s: you simply plug your method calls together, and the compiler makes sure the push/pull-based nature of your streams lines up and everything streams smoothly at run time.
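As a small sketch of what that compile-time checking buys you, based on the API listing above (the `payload` value is invented for illustration):

```scala
// upickle.default.stream gives us a push-based Writable...
val payload: geny.Writable = upickle.default.stream(Map("a" -> 1))

// ...which any Writable-accepting sink, like os.write, can consume:
os.write(os.pwd / "out.json", payload)

// But a pull-based consumer rejects it at compile time:
// ujson.read(payload) // does not compile: ujson.read needs a Readable
```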
In this blog post, we have seen how two tiny interfaces, `Writable` and `Readable`, allow our wide range of libraries to seamlessly exchange data in a streaming fashion. We have also seen how these two interfaces simply standardize the existing conventions that the various libraries already provide for streaming IO, allowing them to interoperate using the functionality they already have built in.
`Writable` and `Readable`, despite being tiny interfaces, reflect a deep structure in how streaming data works:
- By implementing `Writable` and `Readable`, we are categorizing our streaming data sources into "push" and "pull" based sources
- By receiving `Writable`s and `Readable`s as input parameters, we are declaring whether our streaming data processor requires a "push" or "pull" based data source
- Pull-based data sources are harder to write than push-based data sources, while push-based data processors are harder to write than pull-based data processors. The `Writable` and `Readable` interfaces do not attempt to force you one way or the other, and instead allow you to declare what kinds of data sources and processors you have, so the compiler can check that your data sources and data processors line up
The Geny library is tiny and stable, and can be depended upon without any risk of breakage. If you happen to maintain libraries that you would like to have seamless streaming interop with the broader Scala ecosystem, I encourage you to accept and implement the `Writable` and `Readable` interfaces in your own code!