This blog post is an excerpt from Chapter 16 of the book Hands-on Scala Programming
class SimpleUploadActor()(implicit cc: castor.Context) extends castor.SimpleActor[String]{
  def run(msg: String) = {
    val res = requests.post("https://httpbin.org/post", data = msg)
    println("response " + res.statusCode)
  }
}
Snippet 16.1: a simple actor implemented in Scala using the Castor library
Message-based parallelism is a technique that involves splitting your application logic into multiple "actors", each of which can run concurrently, and only interacts with other actors by exchanging asynchronous messages. This style of programming was popularized by the Erlang programming language and the Akka Scala actor library, but the approach is broadly useful and not limited to any particular language or library.
This chapter will introduce the fundamental concepts of message-based parallelism with actors, and how to use them to achieve parallelism in scenarios where the techniques we covered in Chapter 13: Fork-Join Parallelism with Futures cannot be applied. We will first discuss the basic actor APIs, see how they can be used in a standalone use case, and then see how they can be used in more involved multi-actor pipelines. The techniques in this chapter will come in useful later in Chapter 18: Building a Real-time File Synchronizer.
For this chapter, we will be using the Castor library, which provides lightweight, typed actors for Scala:
import $ivy.`com.lihaoyi::castor:0.1.7`
We will be writing most of our code in Scala Scripts, which we will either load into the Ammonite Scala REPL for manual testing or test using a separate testing script. First, let us go into the core APIs that the Castor actor library exposes to users.
At their core, actors are objects which receive messages via a send method, and which asynchronously process those messages one after the other:
trait Actor[T]{
  def send(t: T): Unit
  def sendAsync(f: scala.concurrent.Future[T]): Unit
}
This processing happens in the background, and can take place without blocking. After a message is sent, the thread or actor that called .send() can immediately go on to do other things, even if the message hasn't been processed yet. Messages sent to an actor that is already busy will be queued up until the actor is free.
Note that Actor is parameterized on the type T, which specifies what messages a particular Actor is expected to receive. This is checked at compile time to make sure any message you send to an Actor is of the correct type.
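For example, given a hypothetical stringActor (not part of the book's examples) that receives Strings, the compiler rejects messages of any other type:

implicit val cc = new castor.Context.Test()
val stringActor: castor.Actor[String] = new castor.SimpleActor[String]{
  def run(msg: String) = println(msg)
}
stringActor.send("hello") // compiles: String matches the message type
// stringActor.send(123)  // would not compile: Int is not a String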
Castor provides three primary classes you can inherit from to define actors:
abstract class SimpleActor[T]()(implicit cc: Context) extends Actor[T]{
  def run(msg: T): Unit
}
SimpleActor works by providing a run function that will be run on each message.
abstract class BatchActor[T]()(implicit cc: Context) extends Actor[T]{
  def runBatch(msgs: Seq[T]): Unit
}
BatchActor allows you to provide a runBatch function that works on groups of messages at a time: this is useful when message processing can be batched together for better efficiency, e.g. making batched database queries or batched filesystem operations instead of many individual actions.
abstract class StateMachineActor[T]()(implicit cc: Context) extends Actor[T] {
  class State(val run: T => State)
  protected[this] def initialState: State
}
StateMachineActor allows you to define actors via a set of distinct states, each of which has a separate run callback that performs actions and returns the next state that we want the actor to transition to.
While all actors can maintain state in private fields and variables that are read and modified in the run or runBatch methods, StateMachineActor makes the state and state transitions explicit. This can make it easier to specify exactly which states are valid and how the actor transitions between them.
All Castor actors require an implicit castor.Context parameter, which is an extended ExecutionContext used to schedule and manage the execution of your actors. Having an implicit castor.Context in scope therefore also allows you to perform Future operations that require an implicit ExecutionContext.
Any uncaught exception thrown while an actor is processing a message (or batch of messages, in the case of BatchActor) is reported to the castor.Context's reportFailure method: the default implementation just prints to the console using .printStackTrace(), but you can hook in to pass the exceptions elsewhere, e.g. if you have a remote error aggregation service.
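A sketch of such a hook might look like the following, where sendToErrorService is a hypothetical stand-in for your error-aggregation client:

implicit val cc = new castor.Context.Test() {
  override def reportFailure(t: Throwable): Unit = {
    sendToErrorService(t)  // hypothetical: forward to a remote aggregator
    super.reportFailure(t) // keep the default console logging too
  }
}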
After an exception is thrown, the actor continues processing messages as before. The internal state of the actor is unchanged from the point where the exception was thrown. In the case of StateMachineActor, state transitions only happen after the run method completes successfully, and so messages that result in exceptions do not end up changing the state.
Castor actors are meant to manage mutable state internal to the actor. Note that it is up to you to mark the state private to avoid accidental external access. Each actor may run on a different thread, and the same actor may run on different threads at different times, so you should ensure you do not share mutable variables between actors. Otherwise, you risk race conditions.
We will now look at three simple examples that exercise the three Actor base classes in a standalone fashion.
As a use case, imagine that we want to upload data to a server in the background, and we are using an actor because we do not want the upload to block the main program's execution. Furthermore, we may want batch uploads for performance, or to limit the frequency at which this actor performs uploads to avoid overloading the server.
A simple actor that receives messages and uploads them could be written as follows:
import $ivy.`com.lihaoyi::castor:0.1.7`

class SimpleUploadActor()(implicit cc: castor.Context)
    extends castor.SimpleActor[String]{
  var count = 0
  def run(msg: String) = {
    println(s"Uploading $msg")
    val res = requests.post("https://httpbin.org/post", data = msg)
    count += 1
    println(s"response $count ${res.statusCode} " + ujson.read(res)("data"))
  }
}

implicit val cc = new castor.Context.Test()

val uploader = new SimpleUploadActor()
This snippet defines a SimpleUploadActor class that uploads all the messages it receives to the https://httpbin.org/post endpoint. Note that we need the SimpleUploadActor class to take an implicit parameter cc: castor.Context. We instantiate it as uploader, and external code can send messages to uploader via the .send method. send returns immediately, while the actor processes the incoming messages one after the other in the background.
We can test this script by loading it into the Ammonite Scala REPL for interactive use via amm --predef Simple.sc, and using a {} block to send three messages to the actor in quick succession:
@ {
    println("sending hello")
    uploader.send("hello")
    println("sending world")
    uploader.send("world")
    println("sending !")
    uploader.send("!")
  }
sending hello
sending world
sending !
Uploading hello
response 1 200 "hello"
Uploading world
response 2 200 "world"
Uploading !
response 3 200 "!"
Note how all three sending messages got printed before any of the HTTP requests were performed: calls to .send are asynchronous, and queue up a message for the actor to process later. Only later do the three Uploading and response messages get printed, indicating that the three requests to the httpbin.org server were performed.
While actors and futures are both concurrency constructs, they have very different use cases.
Using actors is ideal for pipeline parallelism scenarios where the dataflow is one way. Taking logging as an example, an application writes logs but does not need to wait for them to be processed. In contrast, the futures we learned about in Chapter 13: Fork-Join Parallelism with Futures support a more request-response usage pattern, where an asynchronous computation takes a set of input parameters and produces a single result that the application needs to wait for before it can perform further work.
For most use cases, the choice of either streaming or request-response styles is made for you. Log processing tends to be a streaming process, while HTTP handling tends to be request-response. Metrics and monitoring systems tend to fit streaming, whereas database queries tend to fit request-response. Actors and futures are complementary techniques, and which one you use to parallelize your code depends on whether a scenario fits better into a streaming or request-response style. Hybrid approaches that use both actors and futures together are also possible.
Messages sent to an actor are always processed in the order in which the messages are sent. In contrast, computations running on futures may be scheduled and executed in arbitrary order, which may be different from the order in which the futures were created.
When processing application logs, the order of log messages needs to be preserved. In contrast, when hashing files, the order in which you hash the files probably does not matter. If the order of processing is important, using actors is the way to go.
Even in a concurrent environment with many messages being sent from different threads, each actor only processes messages in a single-threaded fashion one after the other. This means an actor can freely make use of private mutable fields without worrying about locks or thread-safety. In contrast, futures always have the possibility of running in parallel, and cannot safely access shared mutable variables without risking race conditions.
For example, the SimpleUploadActor earlier keeps track of a count of how many uploads have occurred. The actor's single-threaded execution means that count will always be incremented correctly, without race conditions or lost updates. If we wanted to use futures to perform our background uploads, we would need to make sure our count variable and any other mutable state can be safely accessed from multiple futures running at the same time: not impossible, but definitely tricky and easy to get wrong for anything more complicated than a single counter.
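For illustration, here is a rough sketch of what the future-based version would need, using java.util.concurrent.atomic.AtomicInteger as one possible thread-safe counter (the names here are assumptions, not the book's code):

import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, Future}

val count = new AtomicInteger(0)
def uploadInBackground(msg: String)(implicit ec: ExecutionContext) = Future {
  requests.post("https://httpbin.org/post", data = msg)
  count.incrementAndGet() // must be atomic: many futures may run at once
}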
Note that the castor.Context.Test has extra instrumentation to support a .waitForInactivity() method, useful for waiting for the actors to complete their work in testing scenarios. This instrumentation has overhead, and you can use a castor.Context.Simple in your production code if you wish to avoid that overhead.
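For instance, a production script might construct the non-instrumented context roughly as follows; this is a sketch, so check the Castor documentation for the exact constructor signature:

implicit val cc = new castor.Context.Simple(
  scala.concurrent.ExecutionContext.global, // where the actors run
  _.printStackTrace()                       // handler for uncaught exceptions
)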
The fact that SimpleActor uploads messages one at a time can be inefficient: we may instead want to upload as many messages as we can in each HTTP request, to minimize the per-request overhead. To do so, we can use a BatchActor:
import $ivy.`com.lihaoyi::castor:0.1.7`

class BatchUploadActor()(implicit cc: castor.Context)
    extends castor.BatchActor[String]{
  var responseCount = 0
  def runBatch(msgs: Seq[String]) = {
    val res = requests.post("https://httpbin.org/post", data = msgs.mkString)
    responseCount += 1
    println(s"response ${res.statusCode} " + ujson.read(res)("data"))
  }
}

implicit val cc = new castor.Context.Test()

val batchUploader = new BatchUploadActor()
Now, if we send multiple messages in quick succession, the BatchUploadActor calls .mkString to concatenate them together and only performs one HTTP POST:
@ {
    println("sending hello")
    batchUploader.send("hello")
    println("sending world")
    batchUploader.send("world")
    println("sending !")
    batchUploader.send("!")
  }
sending hello
sending world
sending !
response 200 "helloworld!"
If further messages get sent to the BatchActor while the initial batch upload is taking place, they too are batched together, ready for the next batch upload. Essentially, every message that is received while a previous runBatch invocation is executing is batched together for the next invocation: this can be non-deterministic, and depends on thread scheduling, CPU load, networking, and many other factors.
Note that when extending BatchActor, it is up to the implementer to ensure that the BatchActor's runBatch method has the same visible effect as if they had run a single run method on each message individually. Violating that assumption may lead to confusing bugs where the actor behaves non-deterministically depending on how the messages are batched.
Let us consider one more requirement: rather than sending batches of HTTP requests back to back, we would instead like to send a request at most once every N seconds. This is often called throttling, and is a common requirement to avoid overloading the remote server.
The easiest way to implement this is to define a state machine as follows:
The uploader starts off Idle, and when a message is received it uploads the message and transitions into Buffering
If messages are received while Buffering, it buffers the messages without sending them
After N seconds in Buffering, it checks if there are buffered messages. If there are, it uploads them in one batch and remains Buffering for another N seconds. If there are not, it transitions back to Idle
The following snippet defines a StateMachineUploadActor that implements this protocol:
import $ivy.`com.lihaoyi::castor:0.1.7`

sealed trait Msg
case class Text(s: String) extends Msg
case class Flush() extends Msg

class StateMachineUploadActor(n: Int)(implicit cc: castor.Context)
    extends castor.StateMachineActor[Msg]{
  var responseCount = 0
  def initialState = Idle()
  case class Idle() extends State({
    case Text(msg) => upload(msg)
  })
  case class Buffering(msgs: Vector[String]) extends State({
    case Text(s) => Buffering(msgs :+ s)
    case Flush() =>
      if (msgs.isEmpty) Idle()
      else upload(msgs.mkString)
  })
  def upload(data: String) = {
    println("Uploading " + data)
    val res = requests.post("https://httpbin.org/post", data = data)
    responseCount += 1
    println(s"response ${res.statusCode} " + ujson.read(res)("data"))
    cc.scheduleMsg(this, Flush(), java.time.Duration.ofSeconds(n))
    Buffering(Vector.empty)
  }
}

implicit val cc = new castor.Context.Test()

val stateMachineUploader = new StateMachineUploadActor(n = 5)
This code snippet is somewhat more complex than what we saw earlier: rather than just receiving raw Strings, StateMachineUploadActor instead receives Msgs, which are either Text objects or Flushes. The actor also has two states, Idle and Buffering, each of which pattern matches on incoming messages to decide what action to perform as well as what next state to transition into.
The implementation of this actor matches almost exactly the state machine we described above. The only subtlety is that the "after N seconds of buffering" logic is implemented via cc.scheduleMsg: this means the actor will receive the Flush() message N seconds after uploading a batch of messages and transitioning to Buffering, giving it a chance to either upload any buffered messages or transition back to Idle.
We can test this logic by sending messages to stateMachineUploader in the REPL. The first message we send gets uploaded immediately, while subsequent messages are buffered according to our N second rule:
@ stateMachineUploader.send(Text("I am Cow"))
Uploading I am Cow
response 200 "I am Cow"
@ stateMachineUploader.send(Text("Hear me moo"))
@ stateMachineUploader.send(Text("I weigh twice as much as you"))
Uploading Hear me mooI weigh twice as much as you
response 200 "Hear me mooI weigh twice as much as you"
@ stateMachineUploader.send(Text("And I look good on the barbecue"))
Uploading And I look good on the barbecue
response 200 "And I look good on the barbecue"
In general, StateMachineActor is very useful in cases where there are multiple distinct states which an actor can be in, as it forces you to explicitly define the states, the data each state carries, and the transitions that occur when each state receives each message. When the number of distinct states and messages is large, StateMachineActor can be easier to manage than a SimpleActor with many mutable vars inside of it.
Note that while multiple threads can send messages to stateMachineUploader at once, and the Flush message can also be sent at an arbitrary time in the future, the actor will only ever process one message at a time. This ensures that it will transition between the two states Idle and Buffering in a straightforward manner, without needing to worry about race conditions when updating the internal state of the actor.
We will now work through a slightly more advanced example: using actors to build a concurrent logging pipeline. This logging pipeline will receive logs from an application and process them in the background without needing the application to stop and wait for it.
We will start with a single actor logging to disk, and extend it to form a multi-stage concurrent logging pipeline logging its messages to multiple destinations.
Unlike the simple HTTP upload actor we saw earlier, our logging actors will have to deal with concerns such as serializing writes to a log file, log rotation, and pipelining to run different parts of the logging logic in parallel. We will also see how to test your actors programmatically in a simple and deterministic way, using castor.Context.Test. We will be using Ammonite Scala Scripts to implement and test the rest of the examples in this chapter.
Here is a small demonstration of using a castor.SimpleActor to perform asynchronous logging to disk:
// Classes.sc
import $ivy.`com.lihaoyi::castor:0.1.7`

class DiskActor(logPath: os.Path, rotateSize: Int = 50)
               (implicit cc: castor.Context) extends castor.SimpleActor[String]{
  private var logSize = 0
  val oldPath = logPath / os.up / (logPath.last + "-old")
  def run(s: String) = {
    val newLogSize = logSize + s.length + 1
    if (newLogSize <= rotateSize) logSize = newLogSize
    else { // rotate the log file by moving it to the old path and starting from empty
      logSize = s.length + 1
      os.move(logPath, oldPath, replaceExisting = true)
    }
    os.write.append(logPath, s + "\n", createFolders = true)
  }
}
// LoggingPipeline.sc
import $file.Classes, Classes._

implicit val cc = new castor.Context.Test()

val diskActor = new DiskActor(os.pwd / "log.txt")
val logger = diskActor
We alias diskActor under the name logger for use by application code; this will simplify subsequent examples. To test this DiskActor, we will use a separate TestLoggingPipeline.sc script that imports the earlier LoggingPipeline.sc to interact with and assert on.
DiskActor doesn't just write to a log file: the actor also monitors the size of the file, and when it crosses a threshold it archives the file and starts again from a new empty log file. This is called "log rotation", and is a common requirement when handling logs, to avoid log files growing indefinitely and filling up your disk.
We can test this using the following script, which we can run via amm TestLoggingPipeline.sc:
// TestLoggingPipeline.sc
import $file.LoggingPipeline, LoggingPipeline.{logger, cc}
logger.send("I am cow")
logger.send("hear me moo")
logger.send("I weight twice as much as you")
logger.send("And I look good on the barbecue")
logger.send("Yoghurt curds cream cheese and butter")
logger.send("Comes from liquids from my udder")
logger.send("I am cow, I am cow")
logger.send("Hear me moo, moooo")
// Logger hasn't finished yet, running in the background
cc.waitForInactivity()
// Now logger has finished
assert(os.read.lines(os.pwd / "log.txt-old") == Seq("Comes from liquids from my udder"))
assert(
os.read.lines(os.pwd / "log.txt") ==
Seq("I am cow, I am cow", "Hear me moo, moooo")
)
Note that logger.send is thread-safe: multiple threads can be sending messages to the logger at once, and the messages will be queued up and executed one at a time. Even if logger is in the middle of writing to disk, or is currently performing a log rotation, the fact that it's a separate actor means the processing happens in the background, without slowing down the main logic of your program.
Actors give you pipelined parallelism when processing data: the ability to feed your messages through multiple stages of processing, with each stage's processing occurring in parallel. In the following example, we add a base64Actor to form a two-stage pipeline: diskActor handles the same writing-strings-to-disk-and-rotating-log-files logic we saw earlier, while base64Actor adds another step that encodes the data before it gets written to disk:
// Classes.sc
import $ivy.`com.lihaoyi::castor:0.1.7`
class DiskActor...
+class Base64Actor(dest: castor.Actor[String])
+ (implicit cc: castor.Context) extends castor.SimpleActor[String]{
+ def run(msg: String) = {
+ dest.send(java.util.Base64.getEncoder.encodeToString(msg.getBytes))
+ }
+}
// LoggingPipeline.sc
implicit val cc = new castor.Context.Test()
val diskActor = new DiskActor(os.pwd / "log.txt", rotateSize = 50)
+val base64Actor = new Base64Actor(diskActor)
-val logger = diskActor
+val logger = base64Actor
Although we have added another Base64 encoding step to the logging process, this new step lives in a separate actor from the original write-to-disk step, and both of these can run in parallel with each other as well as in parallel with the main application code.
We can modify TestLoggingPipeline.sc to verify that the pipeline writes lines to the log file base64-encoded, and that when decoded the contents are what we expect:
// TestLoggingPipeline.sc
cc.waitForInactivity()
-assert(os.read.lines(os.pwd / "log.txt-old") == Seq("Comes from liquids from my udder"))
-assert(
- os.read.lines(os.pwd / "log.txt") ==
- Seq("I am cow, I am cow", "Hear me moo, moooo")
-)
+def decodeFile(p: os.Path) = {
+ os.read.lines(p).map(s => new String(java.util.Base64.getDecoder.decode(s)))
+}
+assert(decodeFile(os.pwd / "log.txt-old") == Seq("Comes from liquids from my udder"))
+assert(decodeFile(os.pwd / "log.txt") == Seq("I am cow, I am cow", "Hear me moo, moooo"))
Actor pipelines are not limited to two stages, nor are they limited to a single linear sequence. For the last example in this chapter, let us now consider the following 4 actors arranged in a T-shaped pipeline:
diskActor, which writes to disk with log rotation
uploadActor, which uploads the log messages to an HTTP endpoint
base64Actor, which does the base64 encoding of the log messages
sanitizeActor, which masks credit card numbers with a regex
To implement this pipeline, we can modify LoggingPipeline.sc as follows:
// Classes.sc
class DiskActor...
class Base64Actor...
+class UploadActor(url: String)
+ (implicit cc: castor.Context) extends castor.SimpleActor[String]{
+ def run(msg: String) = {
+ val res = requests.post(url, data = msg)
+ println(s"response ${res.statusCode} " + ujson.read(res)("data"))
+ }
+}
+class SanitizeActor(dest: castor.Actor[String])
+ (implicit cc: castor.Context) extends castor.SimpleActor[String]{
+ def run(msg: String) = {
+ dest.send(msg.replaceAll("([0-9]{4})[0-9]{8}([0-9]{4})", "<redacted>"))
+ }
+}
// LoggingPipeline.sc
implicit val cc = new castor.Context.Test()
val diskActor = new DiskActor(os.pwd / "log.txt")
+val uploadActor = new UploadActor("https://httpbin.org/post")
-val base64Actor = new Base64Actor(diskActor)
+val base64Actor = new Base64Actor(new castor.SplitActor(diskActor, uploadActor))
+val sanitizeActor = new SanitizeActor(base64Actor)
-val logger = base64Actor
+val logger = sanitizeActor
Apart from the new additions of uploadActor and sanitizeActor, we also use a castor.SplitActor to take the output of base64Actor and send it to two downstream destinations. SplitActor can be used to dispatch messages to any number of downstream actors.
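For intuition, SplitActor behaves roughly like the following sketch (a hypothetical re-implementation, not Castor's actual source):

class MySplitActor[T](downstream: castor.Actor[T]*)
                     (implicit cc: castor.Context) extends castor.SimpleActor[T]{
  // forward every incoming message to each downstream actor, in order
  def run(msg: T) = downstream.foreach(_.send(msg))
}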
Now, if we modify our TestLoggingPipeline.sc script to also send a 16-digit credit-card-like number as part of the logging message, we can see that it gets replaced by <redacted> in the base64 logged output:
// TestLoggingPipeline.sc
logger.send("Comes from liquids from my udder")
-logger.send("I am cow, I am cow")
+logger.send("I am cow1234567887654321")
logger.send("Hear me moo, moooo")
cc.waitForInactivity()
def decodeFile(p: os.Path) = {
os.read.lines(p).map(s => new String(java.util.Base64.getDecoder.decode(s)))
}
assert(decodeFile(os.pwd / "log.txt-old") == Seq("Comes from liquids from my udder"))
-assert(decodeFile(os.pwd / "log.txt") == Seq("I am cow, I am cow", "Hear me moo, moooo"))
+assert(decodeFile(os.pwd / "log.txt") == Seq("I am cow<redacted>", "Hear me moo, moooo"))
You will also see it print out the response 200 ... messages as the log messages are uploaded to the https://httpbin.org/post HTTP endpoint.
The messages we send to logger are processed with pipeline parallelism on the four actors: we can have one message being sanitized, another being base64 encoded, a third being uploaded, and a fourth being written to disk, all happening simultaneously. We gain this parallelism while preserving the order in which messages are processed, ensuring that our HTTP endpoint and log files receive the messages in the exact same order that they were originally sent in.
Any of the SimpleActors in this pipeline could also be replaced by BatchActors or StateMachineActors to improve performance or to implement additional functionality: e.g. batching writes to disk, batching HTTP uploads, or adding throttling. Doing so is left as an exercise to the reader.
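To give a flavor of what one such variant might look like (BatchDiskActor is a hypothetical name, with log rotation omitted for brevity), batching the disk writes could be sketched as:

class BatchDiskActor(logPath: os.Path)(implicit cc: castor.Context)
    extends castor.BatchActor[String]{
  def runBatch(msgs: Seq[String]) = {
    // one filesystem append per batch instead of one per message
    os.write.append(logPath, msgs.map(_ + "\n").mkString, createFolders = true)
  }
}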
The four actors in our last pipeline are defined as classes, with each class constructor taking a castor.Actor[...] reference. Defining our actors in this way gives us flexibility in how we want to arrange our pipeline: each actor doesn't need to know about the details of the other actors it is interacting with. It only needs to know what message type it expects to receive and the message type of the downstream actors it sends messages to.
For example, if we wanted to re-configure our 4-node pipeline to run without sanitizing credit card numbers, it is easy to remove sanitizeActor from the pipeline:
import $file.Classes, Classes._
implicit val cc = new castor.Context.Test()
val diskActor = new DiskActor(os.pwd / "log.txt")
val uploadActor = new UploadActor("https://httpbin.org/post")
val base64Actor = new Base64Actor(new castor.SplitActor(diskActor, uploadActor))
val logger = base64Actor
What if we wanted only the file logging to be base64 encoded, and only the HTTP logging to be sanitized? Again, it is straightforward to re-configure our actor pipeline to do this:
import $file.Classes, Classes._
implicit val cc = new castor.Context.Test()
val diskActor = new DiskActor(os.pwd / "log.txt")
val uploadActor = new UploadActor("https://httpbin.org/post")
val base64Actor = new Base64Actor(diskActor)
val sanitizeActor = new SanitizeActor(uploadActor)
val logger = new castor.SplitActor(base64Actor, sanitizeActor)
As you can see, using actors to model your data processing pipelines allows a great deal of flexibility in how your pipelines will be laid out. Without any change to the implementation of individual actors, we have reconfigured our concurrent logging pipeline to support 4 very different use cases. It only took a tiny change in how the actors were instantiated to completely re-architect how the data flows through our system.
This flexibility to arrange and re-arrange your actor pipelines also makes it easy to test parts of the pipeline in isolation, or to re-use parts of the pipeline in different scenarios with different requirements.
Lastly, let us look at a few techniques for debugging what an actor is doing. These will come in handy when your actor-based code inevitably misbehaves!
When using StateMachineActor, all your actor's internal state should live in the single state variable. You can thus easily override def run to print the state before and after each message is processed:
override def run(msg: Msg): Unit = {
  println(s"$state + $msg -> ")
  super.run(msg)
  println(state)
}
If your StateMachineActor is misbehaving, this should make it easier to trace what it is doing in response to each message, so you can figure out exactly why it is misbehaving. Here is the StateMachineUploadActor from earlier with this logging in place, showing how the actor handles messages and transitions between states:
stateMachineUploader.send(Text("I am cow"))
// Idle() + Text(I am cow) ->
// Uploading I am cow
// response 200 "I am cow"
// Buffering(Vector())
stateMachineUploader.send(Text("hear me moo"))
// Buffering(Vector()) + Text(hear me moo) ->
// Buffering(Vector(hear me moo))
Thread.sleep(6000) // wait past the 5 second flush interval
// Buffering(Vector(hear me moo)) + Flush() ->
// Uploading hear me moo
// response 200 "hear me moo"
// Buffering(Vector())
// ...and 5 seconds later, with nothing buffered:
// Buffering(Vector()) + Flush() ->
// Idle()
Logging every message received and processed by one or more actors may get very verbose in a large system. You can use a conditional if (...) in your override def run to specify exactly which state transitions on which actors you care about (e.g. only actors handling a certain user ID), to cut down on the noise:
override def run(msg: Msg): Unit = {
  if (...) println(s"$state + $msg -> ")
  super.run(msg)
  if (...) println(state)
}
Note that if you have multiple actors sending messages to each other, by default they run on a thread pool, and so the println messages above may become interleaved and hard to read. To resolve that, you can try running your actors single-threaded.
Another debugging strategy is to replace the castor.Context's executor with a single-threaded executor. This can help our actor pipeline behave more deterministically:
implicit val cc = new castor.Context.TestThreadPool(1)
Any actor pipeline should be able to run on a single-threaded executor. This makes it easier to track down logical bugs without multithreaded parallelism getting in the way.
Apart from logging individual actors, you can also insert logging into the castor.Context to log messages and actions across every actor. For example, you can log every time a message is run on an actor by overriding the reportRun callback:
implicit val cc = new castor.Context.Test() {
  override def reportRun(a: castor.Actor[_],
                         msg: Any,
                         token: castor.Context.Token): Unit = {
    println(s"$a <- $msg")
    super.reportRun(a, msg, token)
  }
}
Running this on the four-actor pipeline example from earlier, we can see the logging messages get interleaved as the different actors all run in parallel.
SanitizeActor@5ad26966 <- I am cow
SanitizeActor@5ad26966 <- hear me moo
Base64Actor@5578b956 <- I am cow
SanitizeActor@5ad26966 <- I weigh twice as much as you
SanitizeActor@5ad26966 <- And I look good on the barbecue
Base64Actor@5578b956 <- hear me moo
SanitizeActor@5ad26966 <- Yoghurt curds cream cheese and butter
castor.SplitActor@7cdcd738 <- SSBhbSBjb3c=
DiskActor@7aada8fd <- SSBhbSBjb3c=
SanitizeActor@5ad26966 <- Comes from liquids from my udder
UploadActor@775713fd <- SSBhbSBjb3c=
By instrumenting the castor.Context, we can see the messages that are being sent to and processed by all the actors within our program. That can help greatly when you are not sure exactly which actor is the one that is misbehaving, and helps us visualize what our group of actors is doing. We can simplify the logging even further by also running the actors single-threaded, as described above.
In this chapter, we have seen how to structure our code using actors. They allow us to process data concurrently, similar to what we did in Chapter 13: Fork-Join Parallelism with Futures, but with the following tradeoffs:
Actors are a better fit for streaming computations, while futures are a better fit for request-response computations
Actors always process data in the same order, whereas futures may run in an arbitrary order
Actors ensure single-threaded access to mutable state, e.g. a log file on disk or an in-memory state machine, whereas futures work best without any mutable state
Actors provide pipelined parallelism between dependent computations, whereas futures provide fork-join parallelism between independent computations
We have seen how we can easily construct actor pipelines of varying shape, where each stage of the pipeline processes the incoming messages in parallel, all without needing to deal with threads and locks ourselves.
Actors are a fundamental model of parallel computation that, together with the fork-join style provided by futures, makes a valuable addition to your toolbox. Both models have their strengths and weaknesses, and Scala makes it very easy to pick the one that best fits the problem at hand.
This chapter makes use of the Castor actor library, whose documentation is available online.
In the wild you may encounter other projects using the Akka actor framework. Akka is a much larger and more complex framework than Castor, with much more to learn, but all the same concepts still apply.
We will be making heavy use of actors and actor pipelines later in Chapter 18: Building a Real-time File Synchronizer.
Use a single actor to implement an asynchronous web crawler using the same fetchLinksAsync method we saw in Chapter 13: Fork-Join Parallelism with Futures, but without the batch-by-batch limitation. The result of each HTTP request should be processed immediately once that request completes, without waiting for all other requests in the same "batch", so that a single long-running request does not hold up the entire crawler. You will likely need to use the asynchronous Future operations together with the sendAsync method to integrate your actors with fetchLinksAsync's futures.
Exercise 10 - WebCrawlerPipeline
Combine the actor-based web crawler you wrote in the above exercise with the DiskActor we saw earlier in the chapter, to stream the crawled results to a file on disk in a simple pipeline.
Exercise 11 - WebCrawlerThrottled
Add throttling to the actor-based web crawler above, to ensure it does not make more than a configurable maxConcurrent: Int open HTTP requests at a time.
About the Author: Haoyi is a software engineer, and the author of many open-source Scala tools such as the Ammonite REPL and the Mill Build Tool. If you enjoyed the contents on this blog, you may also enjoy Haoyi's book Hands-on Scala Programming