Most complex systems are made of multiple processes: often the tool you need is not easily usable as a library within your program, but can be easily started as a subprocess to accomplish the task you need it to do. This tutorial will walk through how to easily work with such subprocesses from the Scala programming language, to allow you to interact with the rich ecosystem of third-party tools and utilities that subprocesses make available.
About the Author: Haoyi is a software engineer, and the author of many open-source Scala tools such as the Ammonite REPL and the Mill Build Tool. If you enjoyed the contents on this blog, you may also enjoy Haoyi's book Hands-on Scala Programming
The easiest way to work with subprocesses in Scala is through the OS-Lib library. OS-Lib is available on Maven Central for you to use with any version of Scala:
// SBT
"com.lihaoyi" %% "os-lib" % "0.2.7"
// Mill
ivy"com.lihaoyi::os-lib:0.2.7"
OS-Lib also comes bundled with Ammonite, and can be used within the REPL and *.sc script files.
All functionality within this library comes from the os package, e.g. os.Path, os.read, os.list, and so on. To begin with, I will install Ammonite:
$ sudo sh -c '(echo "#!/usr/bin/env sh" && curl -L https://github.com/lihaoyi/Ammonite/releases/download/1.6.7/2.12-1.6.7) > /usr/local/bin/amm && chmod +x /usr/local/bin/amm'
And open the Ammonite REPL, using os.<tab> to see the list of available operations:
$ amm
Loading...
Welcome to the Ammonite Repl 1.6.7
(Scala 2.12.8 Java 11.0.2)
@ os.<tab>
/ RelPath list
BasePath ResourceNotFoundException makeDir
BasePathImpl ResourcePath move
BasicStatInfo ResourceRoot mtime
Bytes SeekableSource owner
CommandResult SegmentedPath perms
FilePath Shellable proc
...
Most of the functionality we want will be in the os.proc function. From here, we can begin our tutorial.
The most common thing to do with subprocesses is to spawn one and wait for it to complete. This is done through the os.proc.call function:
os.proc(command: os.Shellable*)
.call(cwd: Path = null,
env: Map[String, String] = null,
stdin: ProcessInput = Pipe,
stdout: ProcessOutput = Pipe,
stderr: ProcessOutput = Pipe,
mergeErrIntoOut: Boolean = false,
timeout: Long = Long.MaxValue,
check: Boolean = true,
propagateEnv: Boolean = true): os.CommandResult
os.proc.call takes a lot of optional parameters, but at its simplest you just pass in the command you want to execute:
@ val gitStatus = os.proc("git", "status").call()
gitStatus: os.CommandResult = CommandResult(
...
This gives you an os.CommandResult object, which contains the exit code, stdout, and stderr of the completed process:
@ gitStatus.exitCode
res3: Int = 0
@ gitStatus.out.string
res4: String = """On branch master
Your branch is up to date with 'origin/master'.
Changes to be committed:
(use "git reset HEAD <file>..." to unstage)
new file: post/37 - How to work with Subprocesses in Scala.md
...
@ gitStatus.err.string
res5: String = ""
Common things to customize include:
cwd: where you want the subprocess's working directory to be; defaults to the os.pwd of the host process
env: used to customize the environment variables passed to the subprocess; defaults to inheriting those of the host process
stdin/stdout/stderr: allow you to pass data into the subprocess's standard input stream, and redirect where its standard output and error streams go; defaults to not taking standard input and collecting the output/error in the os.CommandResult
check: throws an exception if the subprocess has a non-zero exit code; override it with check = false to avoid throwing
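For example, here is a sketch combining several of these options; the subdirectory name "my-checkout" is illustrative:

```scala
// Sketch: run `git status` in a subdirectory, passing an extra environment
// variable, and without throwing if git exits with a non-zero code.
val result = os.proc("git", "status").call(
  cwd = os.pwd / "my-checkout",     // hypothetical working directory
  env = Map("GIT_PAGER" -> "cat"),  // environment variables for the subprocess
  check = false                     // don't throw on non-zero exit codes
)
if (result.exitCode != 0) println(result.err.string)
```

With check = false, a failing subprocess surfaces through result.exitCode rather than an exception, which is convenient when a non-zero exit is an expected outcome.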
While here we are passing Strings into os.proc(), you can also pass in Seq[String]s, Option[String]s, and os.Paths.
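As a sketch of how these argument types flatten into a single command line (the flag values here are illustrative):

```scala
// Sketch: os.proc accepts a mix of Shellable argument types in one call.
val verbose   = false
val flags     = Seq("--oneline", "--decorate")         // Seq[String]: spliced in order
val maybeStat = if (verbose) Some("--stat") else None  // Option[String]: omitted when None
val repo      = os.pwd                                 // os.Path: rendered as its absolute path
val log       = os.proc("git", "-C", repo, "log", maybeStat, flags).call()
```

This makes it easy to build up command lines conditionally, without manually concatenating argument lists.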
These can be used in a variety of ways. For example, let us find the distinct contributors to a Git repository. We can do this in two steps: first, get the git log of the current working directory as a string:
@ val gitLog = os.proc("git", "log").call().out.string
gitLog: String = """commit 160bf840ec70756b1dc0ea06036ada5a9993cd7f
Author: Li Haoyi
Date: Mon Jun 3 09:32:05 2019 +0800
tweaks
commit 32ccf9fe457f625243abf6903fd6095ff8d825c3
Author: Li Haoyi
Date: Mon Jun 3 09:27:14 2019 +0800
...
Next, pass the string into a separate grep "Author: " subprocess, as that process's standard input:
@ val authorLines = os.proc("grep", "Author: ").call(stdin = gitLog).out.lines
authorLines: Vector[String] = Vector(
"Author: Li Haoyi",
"Author: Li Haoyi",
"Author: Li Haoyi",
"Author: Li Haoyi",
...
Lastly, call .distinct on the lines to get the unique contributors:
@ authorLines.distinct
res15: Vector[String] = Vector(
"Author: Li Haoyi",
"Author: jlncrnt",
"Author: chasets",
"Author: n4to4",
"Author: George Shakhnazaryan",
"Author: Oleg Skovpen",
"Author: Prasanna Swaminathan",
"Author: nafg",
...
This retrieves the distinct contributors using two separate subprocesses, one after the other, buffering the intermediate output in memory. Later, we will see how to do so in a streaming fashion, with both subprocesses running in parallel and without storing the intermediate data in memory.
Often, someone working with git ends up creating one branch for every task they are working on. After merging that branch into master, the branch names still hang around, and it can be a bit tedious to run git branch -D over and over to remove them. Let's use Git subprocesses to help remove all branches except the current branch from the local Git repo.
To do this, first we run git branch to see the current branches, and get the output as a series of lines:
@ val gitBranches = os.proc("git", "branch").call().out.lines
gitBranches: Vector[String] = Vector(
" 561",
" 571",
" 595",
" 599",
" 600",
" 609",
"* master"
)
Next, we find all the branches whose lines start with " ", and remove the whitespace:
@ val otherBranches = gitBranches.filter(_.startsWith(" ")).map(_.drop(2))
otherBranches: Vector[String] = Vector("561", "571", "595", "599", "600", "609")
Lastly, we run git branch -D on each such branch, to remove them:
@ for(branch <- otherBranches) os.proc("git", "branch", "-D", branch).call()
Now, we can see the other branches have been removed, leaving only the current * master branch:
@ val gitBranches = os.proc("git", "branch").call().out.lines
gitBranches: Vector[String] = Vector("* master")
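Put together, the whole cleanup fits in a few lines; this sketch assumes it is run from inside a Git repository:

```scala
// Sketch: delete every local branch except the currently checked-out one.
// `git branch` prints non-current branches with a leading space, and the
// current branch with a "* " prefix, so filtering on the space is enough.
val branches = os.proc("git", "branch").call().out.lines
val others   = branches.filter(_.startsWith(" ")).map(_.drop(2))
for (branch <- others) os.proc("git", "branch", "-D", branch).call()
```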
We have already seen how to configure the standard input we pass into os.proc.call. You can also redirect the output: here we use it to spawn a curl subprocess that saves its output to a local file:
@ val url = "https://api.github.com/repos/lihaoyi/mill/releases"
url: String = "https://api.github.com/repos/lihaoyi/mill/releases"
@ os.proc("curl", url).call(stdout = os.pwd / "github.json")
res7: os.CommandResult = CommandResult(
0,
...
We can now spawn an ls -lh subprocess to get the metadata of the file we just downloaded:
@ os.proc("ls", "-lh", "github.json").call().out.string
res10: String = """-rw-r--r-- 1 lihaoyi staff 607K Jun 3 13:16 github.json
"""
os.proc.call allows you to set both the stdin and the stdout, using the subprocess to process data from one file to another in a streaming fashion:
@ os.proc("gzip").call(stdin = os.pwd / "github.json", stdout = os.pwd / "github.json.gz")
res11: os.CommandResult = CommandResult(0, ArrayBuffer())
@ os.proc("ls", "-lh", "github.json.gz").call().out.string
res12: String = """-rw-r--r-- 1 lihaoyi staff 23K Jun 3 13:30 github.json.gz
"""
This lets you use subprocesses to handle large files and large amounts of data without having to load either the input or the output into the host process's memory, which is useful when the files are large and memory is limited.
While os.proc.call allows you to pass concrete input data and receive concrete output data from a subprocess, and allows some degree of streaming input and output, it has one core limitation: the spawned subprocess must terminate before os.proc.call returns. This means you cannot use it to set up pipelines where two or more processes are running in parallel and feeding data into each other, or to start a subprocess that runs in the background for you to interact with. For these use cases, you need os.proc.spawn:
os.proc(command: os.Shellable*)
.spawn(cwd: Path = null,
env: Map[String, String] = null,
stdin: os.ProcessInput = os.Pipe,
stdout: os.ProcessOutput = os.Pipe,
stderr: os.ProcessOutput = os.Pipe,
mergeErrIntoOut: Boolean = false,
propagateEnv: Boolean = true): os.SubProcess
os.proc.spawn takes a similar set of arguments to os.proc.call, but instead of returning a completed os.CommandResult, it returns an os.SubProcess object. This represents a subprocess that may or may not have completed, and that you can interact with.
The first use case we will visit is to find the distinct contributors to a Git repository, using git log and grep. The earlier solution does so in a sequential fashion: running one process after the other, and accumulating the intermediate data in the host process's memory in between. This can cause problems if the intermediate data is large.
To run these steps in parallel, in a streaming pipeline, you can use os.proc.spawn:
@ {
val gitLog = os.proc("git", "log").spawn()
val grepAuthor = os.proc("grep", "Author: ").spawn(stdin = gitLog.stdout)
val output = grepAuthor.stdout.lines.distinct
}
gitLog: os.SubProcess = os.SubProcess@604c7e9b
grepAuthor: os.SubProcess = os.SubProcess@70485aa
output: Vector[String] = Vector(
"Author: Li Haoyi",
"Author: Guillaume Galy",
"Author: Nik Vanderhoof",
...
Here, we spawn one subprocess, and pass the stdout of gitLog into the stdin of grepAuthor. At that point, both os.SubProcesses are running in the background, one feeding into the other. The grepAuthor subprocess exposes a grepAuthor.stdout attribute that you can use to read the output, which (similar to os.CommandResult) exposes helper methods like .string or .lines that wait for the subprocess to complete and aggregate the output.
Here we are aggregating the filtered output as one big list, and then calling .distinct on it. We can also avoid accumulating the filtered output by iterating over the lines of stdout and directly adding them to a Set for de-duplication (here we use a LinkedHashSet to preserve the ordering of the input):
@ {
val gitLog = os.proc("git", "log").spawn()
val grepAuthor = os.proc("grep", "Author: ").spawn(stdin = gitLog.stdout)
val distinct = collection.mutable.LinkedHashSet.empty[String]
while(grepAuthor.stdout.available() > 0 || grepAuthor.isAlive()){
distinct.add(grepAuthor.stdout.readLine())
}
}
gitLog: os.SubProcess = os.SubProcess@6693818c
grepAuthor: os.SubProcess = os.SubProcess@a1e578f
distinct: collection.mutable.LinkedHashSet[String] = Set(
"Author: Li Haoyi",
"Author: Guillaume Galy",
"Author: Nik Vanderhoof",
...
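One caveat with the loop above: readLine returns null once the subprocess's output is exhausted, and the available()/isAlive() check can race with the process exiting. A more defensive sketch reads until end-of-stream instead:

```scala
// Sketch: iterate over the subprocess's output until end-of-stream,
// guarding against the final null that readLine returns at EOF.
val gitLog     = os.proc("git", "log").spawn()
val grepAuthor = os.proc("grep", "Author: ").spawn(stdin = gitLog.stdout)
val distinct   = collection.mutable.LinkedHashSet.empty[String]
var line       = grepAuthor.stdout.readLine()
while (line != null) {
  distinct.add(line)                       // LinkedHashSet preserves first-seen order
  line = grepAuthor.stdout.readLine()
}
```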
Subprocess pipelines do not need to start or end in memory, or even on your local computer. Here is an example of downloading some data from api.github.com, and re-uploading it to httpbin.org, in a streaming fashion using curl on both ends:
@ {
val download = os.proc(
"curl",
"https://api.github.com/repos/lihaoyi/mill/releases"
).spawn()
val upload = os.proc(
"curl", "-X", "PUT",
"-H", "Content-Type:application/octet-stream",
"-d", "@-",
"https://httpbin.org/anything"
).spawn(stdin = download.stdout)
val contentLength = upload.stdout.lines.filter(_.contains("Content-Length"))
}
download: os.SubProcess = os.SubProcess@19370af1
upload: os.SubProcess = os.SubProcess@37e967df
contentLength: Vector[String] = Vector(" \"Content-Length\": \"609216\", ")
We look at the JSON output of the final upload response to see the "Content-Length" of the output, which at 609216 bytes matches the 607K we saw earlier.
We can add even more stages to the pipeline if we wish, e.g. compressing the data using gzip between downloading and re-uploading it:
@ {
val download = os.proc(
"curl",
"https://api.github.com/repos/lihaoyi/mill/releases"
).spawn()
val gzip = os.proc("gzip").spawn(stdin = download.stdout)
val upload = os.proc(
"curl", "-X", "PUT",
"-H", "Content-Type:application/octet-stream",
"-d", "@-",
"https://httpbin.org/anything"
).spawn(stdin = gzip.stdout)
val contentLength = upload.stdout.lines.filter(_.contains("Content-Length"))
}
download: os.SubProcess = os.SubProcess@4f45290f
gzip: os.SubProcess = os.SubProcess@56511eda
upload: os.SubProcess = os.SubProcess@3b9d85c2
contentLength: Vector[String] = Vector(" \"Content-Length\": \"17191\", ")
With the added gzip step in the pipeline, the uploaded data has been compressed from 609216 bytes to 17191 bytes. Again, all three stages of the pipeline (download, gzip, upload) are running in parallel, and at no point are we buffering the entire data set in the memory of the host process.
os.proc.spawn thus allows you to put together quite sophisticated subprocess pipelines with minimal effort, letting you perform streaming data processing with pipelined parallelism without needing to accumulate large amounts of data in memory.
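The spawn-and-chain pattern generalizes nicely. As a sketch, a small helper (the pipeline name is illustrative, not part of OS-Lib) can wire up an arbitrary sequence of commands like a shell |:

```scala
// Sketch: chain commands stdout-to-stdin, returning the last subprocess
// so the caller can consume its output.
def pipeline(commands: Seq[String]*): os.SubProcess = {
  val first = os.proc(commands.head).spawn()
  commands.tail.foldLeft(first) { (prev, cmd) =>
    os.proc(cmd).spawn(stdin = prev.stdout)  // feed previous stdout into next stdin
  }
}

// The distinct-contributors pipeline from earlier, re-expressed:
// pipeline(Seq("git", "log"), Seq("grep", "Author: ")).stdout.lines.distinct
```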
The second big use case for os.proc.spawn is to spawn a long-lived worker process which you want to keep running concurrently with the host process. This background process may not be streaming large amounts of data, but simply has its own in-memory state you want to interact with.
A simple example of this is to keep a Python process running in the background that you want to interact with:
@ val sub = os.proc("python", "-u", "-c", "while True: print(eval(raw_input()))").spawn()
sub: os.SubProcess = os.SubProcess@22a7d4a2
Rather than passing in data through the stdin = argument and accumulating the output via .stdout.lines, we can instead write to the subprocess using sub.stdin.write/sub.stdin.writeLine and read from it using sub.stdout.readLine():
@ sub.stdin.write("1 + 2")
@ sub.stdin.writeLine("+ 4")
@ sub.stdin.flush()
@ sub.stdout.readLine()
res42: String = "7"
@ sub.stdin.write("'1' + '2'")
@ sub.stdin.writeLine("+ '4'")
@ sub.stdin.flush()
@ sub.stdout.readLine()
res46: String = "124"
You can also exchange binary data with the subprocess via .write/.read:
@ sub.stdin.write("1 * 2".getBytes)
@ sub.stdin.write("* 4\n".getBytes)
@ sub.stdin.flush()
@ sub.stdout.read()
res50: Int = 56
@ res50.toChar
res51: Char = '8'
When we are done with the subprocess, we can destroy it:
@ sub.isAlive()
res1: Boolean = true
@ sub.destroy()
@ sub.isAlive()
res3: Boolean = false
This usage pattern is handy in a few cases:
The process you are delegating work to is slow to initialize, so you do not want to spawn a new one every time. e.g. a Python process can take 10s to 100s of milliseconds to start, depending on how many modules it uses
The process you are delegating work to has its own in-memory state that needs to be preserved, and throwing away the data by spawning a new subprocess each time simply doesn't work
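One way to package this pattern is a small wrapper that pairs each line of input with a line of output and guarantees cleanup; PythonWorker is an illustrative name, not an OS-Lib class, and the Python 2 one-liner is the same one used above:

```scala
// Sketch: a long-lived worker subprocess behind a line-oriented protocol.
class PythonWorker {
  private val sub = os.proc(
    "python", "-u", "-c", "while True: print(eval(raw_input()))"
  ).spawn()

  // Send one expression, read back one line of output.
  def eval(expr: String): String = {
    sub.stdin.writeLine(expr)
    sub.stdin.flush()
    sub.stdout.readLine()
  }

  def close(): Unit = sub.destroy()
}

// val worker = new PythonWorker
// try println(worker.eval("1 + 2")) finally worker.close()
```

Wrapping the subprocess this way keeps the protocol details (flushing, one-line-per-request) in one place, and the try/finally ensures the worker is destroyed even if an evaluation throws.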
Working with subprocesses in Scala can be easy and convenient; with OS-Lib's os package, you can quickly call subprocesses with the input you have and extract the output you need, and can set up non-trivial pipelines that process data in a parallel streaming fashion. Compared to using the native java.lang.Process APIs, you can usually achieve what you want in a tiny amount of code, making the code both easier to write and maintain without getting lost in boilerplate.
Effectively working with subprocesses exposes a whole world of possibilities you do not have access to within a single monolithic JVM, and is something that all programmers should have in their toolbox.
This tutorial just gives a quick tour of how to work with subprocesses in Scala. For a more thorough reference, check out the documentation page:
If you're interested in learning about the filesystem side of OS-Lib, check out this other blog post: