Circuit Breaker pattern in Scala

Posted on Mar 28, 2020


The circuit breaker pattern is a common microservice resiliency pattern that keeps a system responsive after a series of failures and provides a fallback mechanism.

[NOTE] Spoiler: this is a simple implementation, and the Scala code is stateful and has side effects.

The Need

Making a remote call, or executing a task that lies outside the domain boundary of the system, is very common in modern applications. In the microservice world especially, we are bound to make calls to other microservices.

In such scenarios, we have to expect that failures will eventually happen. If the failures are continuous and consistent, the failing calls can hog system resources and cause further failures in the caller. We can avoid such cases using the circuit breaker pattern.

The Concept

image courtesy: https://martinfowler.com/bliki/CircuitBreaker.html

The concept is simple: the circuit represents the connection between the caller and the callee. It can be in only one of three states: closed, open, or half-open. The circuit starts in the “closed” state.

We monitor the failures from the callee, either consecutive or within a time window. When the failures exceed a threshold, we trip the circuit to “open”: further calls from the caller no longer reach the callee, and the caller receives a default fallback result instead.

This can be implemented using a failure counter.

We also keep track of the time since the circuit was opened. When a certain timeout is reached, we put the circuit into the “half-open” state, in which only one ‘probe’ call passes through: if it succeeds, the circuit is closed, and if it fails, the circuit is opened again.

This can be implemented using a background timer.
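
Before looking at the real implementation, the transitions described above can be sketched as a small pure state machine. This is only an illustration of the concept, not the code used later in this post: the names CircuitSketch, Breaker, Event and on are made up for this sketch, and it assumes a single probe call in the half-open state.

```scala
object CircuitSketch {
  // Hypothetical, simplified model of the three circuit states
  sealed trait State
  case object Closed extends State
  case object Open extends State
  case object HalfOpen extends State

  // Hypothetical events that can move the circuit between states
  sealed trait Event
  case object CallSucceeded extends Event
  case object CallFailed extends Event
  case object TimeoutElapsed extends Event

  final case class Breaker(state: State, failures: Int, threshold: Int) {
    def on(event: Event): Breaker = (state, event) match {
      // any success on a closed circuit resets the consecutive failure count
      case (Closed, CallSucceeded) => copy(failures = 0)
      // trip to open once the consecutive failures exceed the threshold
      case (Closed, CallFailed) =>
        val n = failures + 1
        if (n > threshold) copy(state = Open, failures = 0) else copy(failures = n)
      // after the timeout, let one probe call through
      case (Open, TimeoutElapsed) => copy(state = HalfOpen)
      // the probe decides: success closes the circuit, failure re-opens it
      case (HalfOpen, CallSucceeded) => copy(state = Closed, failures = 0)
      case (HalfOpen, CallFailed)    => copy(state = Open)
      // every other combination leaves the circuit unchanged
      case _ => this
    }
  }
}
```

With a threshold of 2, three consecutive CallFailed events move the sketch from Closed to Open, and a TimeoutElapsed followed by CallSucceeded brings it back to Closed.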

Implementation

CircuitState — a trait extended by possible states

sealed trait CircuitState
object CircuitState {
  case object Open extends CircuitState
  case object Closed extends CircuitState
  case object HalfOpen extends CircuitState
}

CircuitResult[T] — a trait extended by Success and Failure results of the Circuit with success return type T.

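sealed trait CircuitResult[T] extends IterableOnce[T] with Product with Serializable {
  def isFailed: Boolean = this.toOption.isEmpty
  def isSuccess: Boolean = this.toOption.isDefined
  def toOption: Option[T] = this match {
    case CircuitSuccess(value) => Some(value)
    case _                     => None
  }
  // IterableOnce requires an iterator; a failure yields an empty one
  def iterator: Iterator[T] = toOption.iterator
}
// The two result cases used by the handlers below. Their definitions are not
// shown in the original snippet, so these are assumed minimal versions.
final case class CircuitSuccess[T](value: T) extends CircuitResult[T]
final case class CircuitFailure[T](exception: Throwable) extends CircuitResult[T]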

Circuit[R] — a class denoting a circuit where the call returns the result of type Future[R], future denoting an async call.

class Circuit[R](name: String,
                 // max consecutive failures on closed circuit
                 private final val threshold: Int,
                 // timeout from opened circuit to make it half-open
                 private final val timeout: Duration,
                 // max allowed consecutive half-open failures
                 private final val maxAllowedHalfOpen: Int,
                 // block of code whose result to be yielded on open circuit
                 defaultAction: => R,
                 private val logger: String => Unit,
                 // function which will make circuit fail on certain success results
                 failedWhen: R => Boolean = { _: R =>
                   false
                 },
                 // circuit failure exception to ignore and return CircuitFailure response anyway
                 ignoreException: Throwable => Boolean = { _: Throwable =>
                   false
                 })(implicit ec: ExecutionContext) {
  ???
}

States of the Circuit

// initial state
private val state = new AtomicReference[CircuitState](CircuitState.Closed)
// consecutive failures while the circuit is closed (will be set to 0 after any single success)
private val closedConsecutiveFailureCount = new AtomicInteger(0)
// consecutive failures while the circuit is half-open
private val halfOpenConsecutiveFailuresCount = new AtomicInteger(0)
// half-opener timer will check this for timeout for half-opening the circuit.
private val lastOpenTime = new AtomicLong(Long.MaxValue)
def isTimeOutOver: Boolean =
  System.currentTimeMillis() - lastOpenTime.get() > timeout.toMillis
def getState: CircuitState = state.get()
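
It may not be obvious why Long.MaxValue works as the "never opened" marker for lastOpenTime. The sketch below repeats the same comparison as isTimeOutOver, but takes the clock and the other values as parameters so it can be checked by hand. The object name TimeoutSketch and the parameter names are assumptions made for this illustration.

```scala
object TimeoutSketch {
  // Same comparison as isTimeOutOver, with all inputs passed in explicitly
  def isTimeOutOver(nowMillis: Long, lastOpenMillis: Long, timeoutMillis: Long): Boolean =
    nowMillis - lastOpenMillis > timeoutMillis
}
```

For any non-negative nowMillis, subtracting Long.MaxValue gives a negative number, so a circuit that was never opened never looks timed out. Once openCircuit() has stored a real timestamp, the check turns true only after strictly more than the timeout has passed.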

Transition of states

private def openCircuit(): Unit = {
  circuitLogger("Opening circuit...")
  state.set(Open)
  lastOpenTime.set(System.currentTimeMillis()) // registering open time for half-opener timer
  circuitLogger("Circuit is open.")
}
private def closeCircuit(): Unit = {
  circuitLogger("Closing circuit...")
  state.set(Closed)
  lastOpenTime.set(Long.MaxValue) // considering the circuit was never open
  closedConsecutiveFailureCount.set(0) // fresh failure counting after closing circuit
  circuitLogger("Circuit is closed.")
}
private def halfOpenCircuit(): Unit = {
  circuitLogger("Half-opening circuit...")
  state.set(HalfOpen)
  halfOpenConsecutiveFailuresCount.set(0) // fresh counting half-open failures
  circuitLogger("Circuit is half-open.")
}

Main block execution handler: it acts as a router and invokes the handler that matches the circuit's current state. After a state transition, a request may be sent back here to be routed again based on the updated state.

def executeAsync(block: => Future[R]): Future[CircuitResult[R]] = {
  state.get() match {
    case Closed   => handleClosedAsync(block)
    case HalfOpen => handleHalfOpenAsync(block)
    // open circuit: return the default fallback without calling the callee
    case Open     => handleOpenAsync
  }
}

Handling failure response from the Callee

private def handleFailureAsync(block: => Future[R],
                               maybeException: Option[Throwable],
                               atomicCounter: AtomicInteger,
                               maxFailures: Int): Future[CircuitResult[R]] = {
  val currentFailureCount = atomicCounter.incrementAndGet()
  circuitLogger(s"[${state.get()}-error-count = $currentFailureCount] $maybeException")
  if (currentFailureCount > maxFailures) {
    // consecutive failure limit exceeded: trip the circuit and return the fallback
    openCircuit()
    handleOpenAsync // returns the default fallback result
  } else
    maybeException match {
      case Some(exception) if ignoreException(exception) =>
        // treat it as a success
        atomicCounter.decrementAndGet()
        closeCircuit()
        executeAsync(block)
      case Some(exception) => Future.successful(CircuitFailure(exception))
      // a result rejected by failedWhen carries no exception, so wrap a generic one
      case None => Future.successful(CircuitFailure(new Exception("Invalid result exception")))
    }
}
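
Because the counter is incremented before it is compared with `>`, a limit of N tolerates N consecutive failures and trips on failure N + 1. A minimal sketch of just that counting rule follows; the object name FailureCountSketch and the helper recordFailure are hypothetical and not part of the Circuit class.

```scala
import java.util.concurrent.atomic.AtomicInteger

object FailureCountSketch {
  // Registers one failure and reports whether the circuit should now be opened.
  // Mirrors the `incrementAndGet() > maxFailures` check in handleFailureAsync.
  def recordFailure(counter: AtomicInteger, maxFailures: Int): Boolean =
    counter.incrementAndGet() > maxFailures
}
```

With maxFailures = 5, the first five calls to recordFailure return false and the sixth returns true, which is the point at which openCircuit() would be called.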

Handling a closed circuit: execute the provided block of code and yield a success or failure as the result asynchronously, while updating the count of consecutive failures.

private def handleClosedAsync(
  block: => Future[R]
): Future[CircuitResult[R]] = {
  block
    .flatMap { value =>
      // on successful result from the Callee
      if (failedWhen(value)) {
        // if successful result is to be considered a failure
        handleFailureAsync(
          Future.successful(value), // evaluated block value
          None, // no exception
          closedConsecutiveFailureCount,
          threshold
        )
      } else {
        // true success
        closedConsecutiveFailureCount.set(0)
        Future.successful(CircuitSuccess(value))
      }
    }(ec)
    .recoverWith {
      // exception from the Callee
      case exception =>
        handleFailureAsync(
          block,
          Some(exception),
          closedConsecutiveFailureCount,
          threshold
        )
    }
}

Handling Half-Open Circuit

private def handleHalfOpenAsync(
  block: => Future[R]
): Future[CircuitResult[R]] = {
  block
    .flatMap { value =>
      if (failedWhen(value)) {
        // if success result is to be considered a failure
        handleFailureAsync(
          Future.successful(value), // evaluated block's value
          None, // no exception
          halfOpenConsecutiveFailuresCount,
          maxAllowedHalfOpen
        )
      } else {
        // on success
        closeCircuit()
        Future.successful(CircuitSuccess(value))
      }
    }(ec)
    .recoverWith {
      case exception =>
        // on failure
        handleFailureAsync(
          block,
          Some(exception),
          halfOpenConsecutiveFailuresCount,
          maxAllowedHalfOpen
        )
    }
}

Handling open circuit— return the successful future with CircuitSuccess wrapping the default value.

private def handleOpenAsync: Future[CircuitResult[R]] = {
  Future.successful(CircuitSuccess(defaultAction))
}

Half-Opener Background Timer: trips the circuit to half-open once the timeout has elapsed since the circuit was opened.

new Timer(s"$name-circuit-opener").scheduleAtFixedRate(new TimerTask {
  override def run(): Unit = state.get() match {
    case Open if isTimeOutOver =>
      circuitLogger("Max open timeout reached.")
      halfOpenCircuit()
    case _ => ()
  }
}, 0L, 10L)
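
Two details of this timer are worth a second look: the timer thread is not a daemon, so it can keep the JVM alive, and the state is read first and changed afterwards. The following is a sketch of one possible variation, not the code from this post. It assumes a daemon Timer and uses compareAndSet so that only an open circuit is moved to half-open; HalfOpenerSketch, start, and its parameters are names invented for this example.

```scala
import java.util.{Timer, TimerTask}
import java.util.concurrent.atomic.AtomicReference

object HalfOpenerSketch {
  // Hypothetical stand-in for CircuitState so the sketch is self-contained
  sealed trait State
  case object Open extends State
  case object HalfOpen extends State
  case object Closed extends State

  // Starts a background timer that periodically tries to half-open the circuit.
  // `isTimeOutOver` is supplied by the caller, as in the Circuit class above.
  def start(state: AtomicReference[State],
            isTimeOutOver: () => Boolean,
            periodMillis: Long): Timer = {
    // second argument `true` makes the timer thread a daemon thread
    val timer = new Timer("half-opener-sketch", true)
    timer.scheduleAtFixedRate(new TimerTask {
      override def run(): Unit =
        if (isTimeOutOver()) {
          // atomically move Open -> HalfOpen; any other state is left untouched
          state.compareAndSet(Open, HalfOpen)
          ()
        }
    }, 0L, periodMillis)
    timer
  }
}
```

The returned Timer can be cancelled with cancel() when the circuit is no longer needed.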

CircuitImplicits — Implicits making the use of our Circuit class a breeze!

object CircuitImplicits {
  implicit class AsyncBlockExtensions[R](block: => Future[R])(
    implicit c: Circuit[R]
  ) {
    def executeAsync(implicit ex: ExecutionContext): Future[CircuitResult[R]] =
      c.executeAsync(block)
  }
}

We can now do something like:

implicit val sampleCircuit: Circuit[Int] = new Circuit[Int](
    "sample-circuit",
    5,                // max allowed consecutive closed failures
    5.seconds,        // half-opener timeout
    1,                // max allowed consecutive half-open failures
    -1,               // default (fallback) result returned while the circuit is open
    println
)

// this uses the circuit created above to execute the block
Future.successful(2 + 2).executeAsync

Testing it out — You can find the test cases here.
