Nov 24, 2018

[Golang] protoactor-go 201: How middleware works to intercept incoming and outgoing messages

As described in a previous article, Protoactor-go 101: How actors communicate with each other, the core of actor system is message passing. Fine-grained actors work on their own tasks, communicate with each other by passing messages and achieve a bigger task as a whole. To intercept the incoming and outgoing messages to execute tasks before and after the message handling, protoactor supports middleware mechanism.
Protoactor’s plugin mechanism is built on top of this middleware mechanism so knowing middleware is vital to building highly customized actor.

Types of middleware

To intercept incoming and outgoing messages, two kinds of middleware are provided: actor.InboundMiddleware and actor.OutboundMiddleware. Inbound middleware is invoked when a message reaches an actor; Outbound middleware is invoked when a message is sent to another actor. Multiple middlewares can be registered for a given actor so it is possible to divide middleware’s tasks into small pieces and create a middleware for each of them in favor of maintainability.

Under the hood

To register a middleware to an actor, use Props.WithMiddleware() or Props.WithOutboundMiddleware(). Passed middleware implementations are appended to an internal slice so they can be referenced on actor construction.

// Assign one or more middlewares to the props
func (props *Props) WithMiddleware(middleware ...InboundMiddleware) *Props {
   props.inboundMiddleware = append(props.inboundMiddleware, middleware...)
   return props
}
  
func (props *Props) WithOutboundMiddleware(middleware ...OutboundMiddleware) *Props {
   props.outboundMiddleware = append(props.outboundMiddleware, middleware...)
   return props
}

On actor construction, stashed middlewares are transformed into a middleware chain. At this point a group of one or more middlewares are combined together and shape one actor.ActorFunc(). Middlewares are processed in reversed order in this process so they are executed in the registered order on message reception.

func makeInboundMiddlewareChain(middleware []InboundMiddleware, lastReceiver ActorFunc) ActorFunc {  
   if len(middleware) == 0 {  
      return nil  
   }  
  
   h := middleware[len(middleware)-1](lastReceiver)  
   for i := len(middleware) - 2; i >= 0; i-- {  
      h = middleware[i](h)  
   }  
   return h  
}  
  
func makeOutboundMiddlewareChain(outboundMiddleware []OutboundMiddleware, lastSender SenderFunc) SenderFunc {  
   if len(outboundMiddleware) == 0 {  
      return nil  
   }  
  
   h := outboundMiddleware[len(outboundMiddleware)-1](lastSender)  
   for i := len(outboundMiddleware) - 2; i >= 0; i-- {  
      h = outboundMiddleware[i](h)  
   }  
   return h  
}

When actor.Context handles an incoming message, the actor.Context that holds all the contextual information including message itself is passed to that middleware chain. One important thing to notice at this point is that the original message reception method, actor.Receive(), is wrapped in an anonymous function to match actor.ActorFunc() signature and is registered to the very end of the middleware chain. So when the context information is passed to the middleware chain, middlewares are executed in the registered order and actor.Receive() is called at last.

func (ctx *localContext) processMessage(m interface{}) {  
   ctx.message = m  
  
   if ctx.inboundMiddleware != nil {  
      ctx.inboundMiddleware(ctx)  
   } else {  
      if _, ok := m.(*PoisonPill); ok {  
         ctx.self.Stop()  
      } else {  
         ctx.receive(ctx)  
      }  
   }  
  
   ctx.message = nil  
}

Likewise, when a message is being sent to another actor, the registered outbound middlewares are executed in the registered order.

Example

Below is an example that leaves log messages around Actor.Receive() invocation and Context.Request().

Below is an example that leaves log messages when message a comes and goes. As the comment suggests, inbound middleware can run its task before and/or after actor.Receive() execution. Similarly, outbound middleware can run a task before and/or after the original message sending logic. The event occurrence order is described in the comment section of the example.

Conclusion

Middleware mechanism can be used to run a certain logic before and after the original method invocation in an AOP-ish manner.

Nov 23, 2018

[Golang] protoactor-go 101: How actor.Future works to synchronize concurrent task execution

Fine-grained actors concentrate on their own tasks and communicate with others to accomplish a bigger task as a whole. That is how a well-designed actor system works. Because each actor handles a smaller part of an incoming task, pass it to another and then proceed to work on the next incoming task, actors can efficiently work in a concurrent manner to accomplish more tasks in the same amount of time. To pass a result of one actor’s job execution to another or to execute a task when another actor’s execution is done, actor.Future mechanism comes in handy.

Introducing Future

The basic idea of “future” in this context is quite similar to that of future and promise implemented by many modern programming languages to coordinate concurrent executions.

For example, Future’s Javadoc reads as below:

AFuturerepresents the result of an asynchronous computation. Methods are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation.

In protoactor-go, actor.Future provides methods to wait for destination actor’s response in a blocking manner, to pipe one actor’s response to another in a non-blocking manner and to execute a callback function when destination actor’s response arrives.

How this works under the hood

The implementation of protoactor-go’s Future mechanism is composed of actor.Future and actor.futureProcess, where actor.Future provides common Future methods while actor.futureProcess wraps actor.Future and works as a actor.Process. A developer may call Context.RequestFuture() or PID.RequestFuture() instead of commonly used Context.Request() or PID.Request() to acquire actor.Future that represents a non-determined result.

In those method calls, actor.NewFuture() is called with preferred timeout duration as an argument. In actor.NewFuture(), actor.Future and its wrapper – actor.futureProcess – are both constructed, actor.futureProcess is registered to protoactor-go’s internal process registry for a later reference and actor.Future is returned.

As depicted in the below code fragment, actor.Future’s actor.PID is set as a Sender of the requesting message. So when the receiving actor responds to the sender, the response is actually sent to the actor.Future’s actor.PID. When actor.Future receives the response, the result of the Future is set and becomes available to the subscriber.

func (ctx *localContext) RequestFuture(pid *PID, message interface{}, timeout time.Duration) *Future {  
   future := NewFuture(timeout)  
   env := &MessageEnvelope{  
      Header:  nil,  
  Message: message,  
  Sender:  future.PID(),  
  }  
   ctx.sendUserMessage(pid, env)  
  
   return future  
}

The usage of the actor.Future returned by those methods are covered in later sections with detailed example codes. All example codes are available at github.com/oklahomer/protoactor-go-future-example.

Future.Wait() / Future.Result()

To wait until the execution times out or the destination actor responds, use Future.Wait() or Future.Result(). They both internally call a blocking private method, Future.wait(), to block till the preconfigured timeout duration passes or the execution completes. The only difference is whether to return the result of the computation; Future.Wait() simply waits for completion just like WaitGroup.Wait() while Future.Result() waits for completion and additionally returns the result ot the computation as its name suggests.

These methods are useful to retrieve the destination actors response and proceed own logic but are not usually preferred because the idea of such synchronous execution conflicts with the nature of actor model: concurrent computation.

Future.PipeTo()

While Future.Wait() and Future.Result() block until timeout or task completion, Future.PipeTo() asynchronously sends the result of computation to another actor. This can be a powerful tool when only the origination actor knows which actor should receive the result of a worker actor’s task; Actor A delegates a task to worker actor B but B does not know to what actor to pass the result message to. One important thing is that the message is transfered to the destination actor when and only when the comptation completes before it times out. Otherwise the response is sent to the dead letter mailbox.
Becausee this works in an asynchronous manner, origination actor can handle incoming messages right after dispatching tasks to worker actors no matter how long the worker actors take to respond.

Context.AwaitFuture()

The task execution is done in an asynchronous manner like Future.PipeTo but, because this can refer to contextual information, a callback function is still called even when the computation times out. Context.Message() contains the same message as when the origination actor’s Actor.Receive() is called so a developer does not have to add a workaround to copy the message to refer from the callback function.

Conclusion

As described in above sections, Future provides various methods to synchronize concurrent execution. While concurrent execution is the core of actor model, these come in handy to synchronize concurrent execution with minimal cost.