Circuit Breaker pattern in Scala
Circuit breaker pattern is a common microservice resiliency pattern to make system responsive after series of failures and have a fallback mechanism.
[NOTE] Spoiler — This would be a simple implementation and the Scala code would be stateful and have side-effects.
The Need
Doing a remote call or executing a task which is outside the domain boundary of the system is very common in modern applications. Especially in microservice world, we are bound to make calls to other microservices.
In such scenarios, we need to consider eventual failures. If some failures are too continuous and consistent in nature, they can hog too much of the system resources resulting into failures. We can avoid such cases using the circuit-breaker pattern.
The Concept
courtesy: https://martinfowler.com/bliki/CircuitBreaker.html
The concept is simple, the circuit represents the connection between the caller and the callee. It can only have 3 states: closed, open and half-open. The system starts with the “closed” circuit initially.
We monitor the consecutive (or within a time window) failures from the callee and when the failures exceed a threshold we trip the circuit to “open” and all the further calls from the caller won’t reach the callee and rather caller is returned with a default fallback result.
— can be implemented using a failure result counter.
We also keep monitoring the time since opening the circuit and when a certain timeout is reached, we put the circuit to “half-open” meaning, only one ‘probe’ call will pass through and its result will determine if the circuit will be closed or opened based on its success or failure.
— can be implemented using a background timer.
Implementation
CircuitState — a trait extended by possible states
sealed trait CircuitState | |
object CircuitState { | |
case object Open extends CircuitState | |
case object Closed extends CircuitState | |
case object HalfOpen extends CircuitState | |
} |
CircuitResult[T] — a trait extended by Success and Failure results of the Circuit with success return type T.
trait CircuitResult[T] extends IterableOnce[T] with Product with Serializable { | |
def isFailed: Boolean = this.toOption.isEmpty | |
def isSuccess: Boolean = this.toOption.isDefined | |
def toOption: Option[T] = this match { | |
case CircuitSuccess(value) => Some(value) | |
case _ => None | |
} | |
} |
Circuit[R] — a class denoting a circuit where the call returns the result of type Future[R], future denoting an async call.
class Circuit[R](name: String, | |
// max consecutive failures on closed circuit | |
private final val threshold: Int, | |
// timeout from opened circuit to make it half-open | |
private final val timeout: Duration, | |
// max allowed consecutive half-open failures | |
private final val maxAllowedHalfOpen: Int, | |
// block of code whose result to be yielded on open circuit | |
defaultAction: => R, | |
private val logger: String => Unit, | |
// function which will make circuit fail on certain success results | |
failedWhen: R => Boolean = { _: R => | |
false | |
}, | |
// circuit failure exception to ingore and return CircuitFailure response anyway | |
ignoreException: Throwable => Boolean = { _: Throwable => | |
false | |
})(implicit ec: ExecutionContext) { | |
??? | |
} |
States of the Circuit
// initial state | |
private val state = new AtomicReference[CircuitState](CircuitState.Closed) | |
// consecutive failures while the circuit is closed (will be set to 0 after any single success) | |
private val closedConsecutiveFailureCount = new AtomicInteger(0) | |
// consecutive failures while the circuit is half-open | |
private val halfOpenConsecutiveFailuresCount = new AtomicInteger(0) | |
// half-opener timer will check this for timeout for half-opening the circuit. | |
private val lastOpenTime = new AtomicLong(Long.MaxValue) | |
def isTimeOutOver: Boolean = | |
System.currentTimeMillis() - lastOpenTime.get() > timeout.toMillis | |
def getState: CircuitState = state.get() |
Transition of states
private def openCircuit(): Unit = { | |
circuitLogger("Opening circuit...") | |
state.set(Open) | |
lastOpenTime.set(System.currentTimeMillis()) // registering open time for half-opener timer | |
circuitLogger("Circuit is open.") | |
} | |
private def closeCircuit(): Unit = { | |
circuitLogger("Closing circuit...") | |
state.set(Closed) | |
lastOpenTime.set(Long.MaxValue) // considering the circuit was never open | |
closedConsecutiveFailureCount.set(0) // fresh failure counting after closing circuit | |
circuitLogger("Circuit is closed.") | |
} | |
private def halfOpenCircuit(): Unit = { | |
circuitLogger("Half-opening circuit...") | |
state.set(HalfOpen) | |
halfOpenConsecutiveFailuresCount.set(0) // fresh counting half-open failures | |
circuitLogger("Circuit is half-open.") | |
} |
Main block execution handler — It acts as a router and invokes other handlers based on the Circuit’s state. After state transition, the thread request may come back here to be routed again based on the updated state.
def executeAsync(block: => Future[R]): Future[CircuitResult[R]] = { | |
state.get() match { | |
case Closed => handleClosedAsync(block) | |
case HalfOpen => handleHalfOpenAsync(block) | |
case Open => Future.successful(handleOpen) | |
} | |
} |
Handling failure response from the Callee —
private def handleFailureAsync(block: => Future[R], | |
maybeException: Option[Throwable], | |
atomicCounter: AtomicInteger, | |
maxFailures: Int): Future[CircuitResult[R]] = { | |
val currentFailureCount = atomicCounter.incrementAndGet() | |
circuitLogger(s"[${state.get()}-error-count = $atomicCounter] $maybeException") | |
if (currentFailureCount > maxFailures) { | |
// on consecutive max-failure limit reached | |
openCircuit() | |
handleOpenAsync // returns Future.successful(handleOpen) | |
} else | |
maybeException match { | |
case Some(exception) if ignoreException(exception) => | |
// treat it as a success | |
atomicCounter.decrementAndGet() | |
closeCircuit() | |
executeAsync(block) | |
case Some(exception) => Future.successful(CircuitFailure(exception)) | |
case None => Future.successful(new Exception("Invalid result excpetion")) | |
} | |
} |
Handling closed circuit— Executing the provided block of code and yield a success or failure as the result asynchronously, while maintaining/updating the count of consecutive failures.
private def handleClosedAsync( | |
block: => Future[R] | |
): Future[CircuitResult[R]] = { | |
block | |
.flatMap { value => | |
// on successful result from the Callee | |
if (failedWhen(value)) { | |
// if successful result is to be considered a failure | |
handleFailureAsync( | |
Future.successful(value), // evaluated block value | |
None, // no exception | |
closedConsecutiveFailureCount, | |
threshold | |
) | |
} else { | |
// true success | |
closedConsecutiveFailureCount.set(0) | |
Future.successful(CircuitSuccess(value)) | |
} | |
}(ec) | |
.recoverWith { | |
// exception from the Callee | |
case exception => | |
handleFailureAsync( | |
block, | |
Some(exception), | |
closedConsecutiveFailureCount, | |
threshold | |
) | |
} | |
} |
Handling Half-Open Circuit —
private def handleHalfOpenAsync( | |
block: => Future[R] | |
): Future[CircuitResult[R]] = { | |
block | |
.flatMap { value => | |
if (failedWhen(value)) { | |
// if success result is to be considered a failure | |
handleFailureAsync( | |
Future.successful(value), // evaluated block's value | |
None, // no exception | |
halfOpenConsecutiveFailuresCount, | |
maxAllowedHalfOpen | |
) | |
} else { | |
// on success | |
closeCircuit() | |
Future.successful(CircuitSuccess(value)) | |
} | |
}(ec) | |
.recoverWith { | |
case exception => | |
// on failure | |
handleFailureAsync( | |
block, | |
Some(exception), | |
halfOpenConsecutiveFailuresCount, | |
maxAllowedHalfOpen | |
) | |
} | |
} |
Handling open circuit— return the successful future with CircuitSuccess wrapping the default value.
private def handleOpenAsync: Future[CircuitResult[R]] = { | |
Future.successful(CircuitSuccess(defaultAction)) | |
} |
Half-Opener Background Timer — Will trip the circuit to half-open after timeout reached since opening the circuit.
new Timer(s"$name-circuit-opener").scheduleAtFixedRate(new TimerTask { | |
override def run(): Unit = state.get() match { | |
case Open if isTimeOutOver => | |
circuitLogger("Max open timeout reached.") | |
halfOpenCircuit() | |
case _ => () | |
} | |
}, 0L, 10L) |
CircuitImplicits — Implicits making the use of our Circuit class a breeze!
object CircuitImplicits { | |
implicit class AsyncBlockExtensions[R](block: => Future[R])( | |
implicit c: Circuit[R] | |
) { | |
def executeAsync(implicit ex: ExecutionContext): Future[CircuitResult[R]] = | |
c.executeAsync(block) | |
} | |
} |
We can now do something like:
implicit val sampleCircuit: Circuit[Int] = new Circuit[Int](
"sample-circuit",
5, // max allowed consecutive closed failures
5.seconds, // half-opener timeout
1, // max allowed consecutive half-open failures
-1, // invalid success result
println
)
//this will use the above created circuit to execute
Future.successful(2 + 2).executeAsync
Testing it out — You can find the test cases here.