May 5, 2021

[Golang] protoactor-go 301: How proto.actor's clustering works to achieve higher availability

NOTE: This article is now officially hosted at https://proto.actor/docs/clusterintro/ with better diagrams and additional notes for other languages’ implementations.

The previous articles described the basics of proto.actor including its concepts, terms, messaging mechanism, middleware mechanism, and plugin mechanism. This article describes the benefit of clustering architecture, introduces its terms and concepts, and demonstrates how to work with this. For those who are not familiar with proto.actor, starting from [Golang] protoactor-go 101: Introduction to golang’s actor model implementation is recommended.

The Basic Ideas

To better understand why and when clustering architecture should be adopted, the below subsections show the benefits of actor model in general, its remoting architecture, and its clustering architecture.

Actor Model in General

With the power of actor model, a developer has easier access to concurrent programming. A mutable state is encapsulated in an actor, and its messaging queue – a mailbox – guarantees messages are passed to the actor one at a time. The actor processes a corresponding task against the receiving message, updates its mutable state, and then receives the next message. Therefore only one job is run by an actor at any moment. This solves the difficulty of concurrency and mutable states, and lets developers concentrate on business logic.

Remoting

While actor model eases the concurrent programming with mutatable states, the system is hard to scale as long as the actor system is hosted by a single machine. Remoting is a good solution to scale out the actor system among multiple machines.

When one sends a message to a specific actor, the sender is not directly referring to the receiving actor itself. Instead, a reference to the actor is exposed to developers for messaging. This reference is called PID, which is short for “process id.” In proto.actor, the concept of “process” or “process id” is quite similar to that of Erlang. For those who are familiar with Akka’s actor model, PID can be equivalent to ActorRef.

This PID knows where the actual actor instance is located and how to communicate with it. The location may be within the same host machine; Maybe not. The important thing is that a sender does not have to pay extra attention to the data serialization/deserialization and data transport. In short, one can communicate with an actor hosted by another machine just like the way communicating with a locally hosted actor.

With such location transparency, multiple machines can collaborate with each other and work as a single actor system. This is remoting – a key to scaling out the actor system.

Clustering

While remoting is an important feature to build a scalable actor system, there still is room to improve the actor system’s availability as a whole. A sender enqueues a message to the remotely hosted actor’s mailbox, but the destination host’s availability is not always guaranteed. A hardware outage or power outage on a specific host may occur at any moment. As a matter of fact, even a daily operation such as application deployment may lower the service availability instantaneously. In such a case, messaging a remotely hosted actor results in a dead letter. To work as a distributed actor system, all machines must always be available and ready to interact with each other, or otherwise a messaging fails. Keeping a hundred percent availability for all time is not realistic or pragmatic.

Clustering is built with a service discovery mechanism on top of the remoting feature to give extra robustness to work with the aforementioned availability issue. Multiple server instances work as a single cluster to provide specific types of actors. When one or more server instances go down, such an event is detected by the service discovery mechanism, and messages are always rooted to active actors on active instances.

The following sections introduce some concepts and terms, how a specific actor is located at a specific server instance before and after a topology change, and some executable code to work with clustering.

Concepts and Terms

Node

When an application process joins cluster membership, the application process is explicitly called a “node.” This may be effectively equal to a server instance especially when one server instance hosts one application process. However, multiple application processes can technically run on a single server instance at the same time, so there still is a difference.

Cluster Provider

The core of clustering is cluster provider; this provides a consistent view of active nodes. Once the application process starts, the node constantly interacts with cluster provider to update its own availability and gets other nodes’ membership status. With the up-to-date topology view, proto.actor automatically distributes actors across cluster nodes based on partitioning by consistent hash.

Proto.actor supports several cluster provider implementations:

  • Consul … This implementation uses HashiCorp’s Consul for service discovery. This was the first implementation of cluster provider.
  • etcd … This is an etcd version of cluster provider implementation. If one has experience with Kubernetes, this implementation may be easier to start with.
  • Automanaged … This does not use any centralized service discovery system, but instead each member ping each other to manage membership.

Virtual Actor

Proto.actor’s clustering mechanism borrows the idea of “virtual actor” from Microsoft Orleans, where developers are not obligated to handle an actor’s lifecycle. If the destination actor is not yet spawned when the first message is sent, proto.actor spawns one and lets this newborn actor handle the message; if the actor is already present, the existing actor simply receives the incoming message. From message sender’s point of view, the destination actor is always guaranteed to “exist.” This is highly practical and works well with the clustering mechanism. An actor’s hosting node may crash at any moment, and the messages to that actor may be redirected to a new hosting node. If a developer must be aware of the actor’s lifecycle, a developer is obligated to be aware of such topology change to re-spawn the failing actor. The concept of virtual actor hides such complexity and eases the interaction.

Activation

As described in the above “virtual actor” section, an actor always exists. Instead of explicitly spawn a new actor, one may “activate” the destination actor by getting the PID of the destination actor. Proto.actor internally checks the existence of the destination actor and spawns one if one is not present.

An actor may disappear when a hosting node crashes, or an actor may stop itself when an idle interval with no message reception exceeds a certain period of time. Activation works well to re-spawn such actors with no extra care.

Passivation

Once the grain is initialized by activation, the grain always exists because of the nature of the virtual actor. This, however, is not ideal in terms of limited server resources. Proto.actor lets a developer specify a timeout interval, where the grain terminates itself when this interval passes after the last message reception time.

Grain

With virtual actor model, an actor is specifically called a “grain.” However, the implementation of the grain is quite the same as any other actor. A notable difference is that proto.actor automatically spawns the grain on the initial message reception.

Kind

To explicitly state which node is capable of providing what types of grains, a developer needs to register the “kind” on cluster membership initiation. By registering the mapping of a kind and a corresponding grain construction function, the cluster provider knows the node is capable of hosting a set of grains, and the client can compute to which node it must send an activation request.

Ownership

Other than a grain itself, an “ownership” is an important concept to understand how grains are located in one specific node. The cluster’s topology view changes when a node goes down or a new node is added to the cluster membership. One may assume that grain must be relocated to another node because grains are distributed by using consistent hashing. That, however, is a relatively complicated task. A grain may have its own state and behavior, so serializing them and transferring that information to another node is difficult.

Instead of transferring a grain itself, proto.actor only transfers the “ownership” of the grain. An owner knows where the grain is currently located. When sending a message to a specific grain, proto.actor calculates the location of the “owner” instead of the grain with consistent hashing, and then gets the grain’s address from the “owner.” Therefore, an owner and its subordinating grains do not necessarily exist on the same node. The later section covers how the ownership is transferred.

Communication Protocol

Because the topology view may change at any moment and the ownership can be transferred at any moment as well, the fire-and-forget model of messaging may fail from time to time. For example, one may send a message to a specific grain at the same time as the topology change. The ownership could be transferred when the message is received by the previous owner node. To make sure a message is delivered to the destination grain, a gRPC-based communication is available.

Once an IDL file for gRPC is given, a messaging method with a retrial logic is generated by proto.actor. This method computes the location of the ownership and sends an activation request again when the initial messaging fails due to the aforementioned ownership transfer. This gRPC-based communication gives more robustness, but the nature of the request/response communication model may affect performance. In such a case, a developer may simply send a message with the pre-defined messaging methods such as Context.Send(), Context.Request() and Context.RequestFuture().

Locating a Grain

If a developer has experience working on storage sharding, one might be familiar with the idea of consistent hashing. This is a powerful mechanism to decide in a reproducible manner which node on a virtual ring topology has the ownership of a given “key,” and also requires a fewer re-location on topology change. Proto.actor employs this algorithm to decide where the grain – more precisely the owner – must be located.

Initial State

The below image describes how a grain is located. With the latest membership shared by cluster provider, a message sender computes the hash value of the destination grain and elicits where the recipient grain’s owner exists based on the partitioning by consistent hash. Once the owner’s location is known, a sender sends an activation request to the owner. The owner receives the message and sees if the grain instance already exists. If exist, then return the PID of the grain; if not, then spawn one and returns its PID. This is the simplest form of identity lookup.

Topology Update

When the cluster membership is updated and the topology changes due to the Node B’s outage, all cluster members acquire such an event from the cluster provider. Each server instance then re-computes the hash value of its owning grains and checks if it still owns them. If a grain needs to be owned by another server instance, the ownership is transferred to the new owner. This guarantees that owners are always placed on each ideal node that is determined by consistent hashing while grain instances stay where they are currentlylocated.

Grain Re-activation

After the topology refresh, a sender re-computes where the owner of Actor 2 exists. This sends an activation request to the new owner – node A –, and node A returns the PID of actor 2 on node D. The sender now can send a message to actor 2 on node D. In this way, the existing grain and its internal state is not re-located on topology change; only the ownership does.

For better performance, proto.actor internally caches the location of known grains and refresh this when topology view changes.

Messaging with a Grain

With the basics introduced in the previous sections, this section works on a project where a pinger actor sends a “ping” message and a ponger grain sends back a “pong” message. In addition to simply sending an empty signal, a ping message contains a cumulative count of the ping messages being sent; pong message contains the count a ping message contained.

Below is the detailed spec.

  • Use automanaged cluster provider to minimize the implementation
  • One application process hosts a pinger actor and sends a ping message every second
  • Another application process hosts a ponger grain
    • This grain is capable of handling gRPC-based Ping() request and a plain message
    • This grain is initialized with a passivation interval setting of ten seconds

The complete code is located at github.com/oklahomer/protoactor-go-sender-example.

Message Definition

Because messages are sent from one node to another over wire, they must be serializable. Proto.actor employs pre-existing, well-known Protocol Buffers for data serialization instead of inventing a new serialization protocol. Before getting started, be sure to install protoc and gogoprotobuf’s protoc-gen-gogoslick to generate Golang code. In addition to those tools, one proto.actor-specific tool is required. Run the below command to install the binary. A developer needs to specify dev branch by adding @dev at the end since this is not yet merged to master branch as of 2021-05-03.

$ go get github.com/AsynkronIT/protoactor-go/protobuf/protoc-gen-gograinv2@dev

Below is an example of two messages: PingMessage and PongMessage. These two message definitions are sufficient to send ping and pong messages to each other. However, a service definition is required to utilize the gRPC-based messaging. That is Ponger. Ponger lets the caller send a Ping message with SendPing() method and the receiver sends back Pong message.

syntax = "proto3";
package messages;

message PingMessage {
    uint64 cnt = 1;
}

message PongMessage {
    uint64 cnt = 1;
}

service Ponger {
    rpc Ping(PingMessage) returns (PongMessage) {}
}

Name this file “protos.proto” and locate under cluster/messages. When the IDL file is ready, run the below command to generate required Go code. Two files other than the IDL – protos.pb.go and protos_protoactor.go – are generated.

$ protoc --gogoslick_out=. ./cluster/messages/protos.proto
$ protoc --gograinv2_out=. ./cluster/messages/protos.proto 
$ tree ./cluster 
cluster
└── messages
    ├── protos.pb.go
    ├── protos.proto
    └── protos_protoactor.go

1 directory, 3 files

protos_protoactor.go defines Proto.actor-specific interface, struct, and function. They are covered in the below sections:

Grain Implementation

PongerActor

protos_protoactor.go contains a PongerActor struct in it, which receives the incoming message and makes a gRPC-based method call or simply proxies the message to a defaut message reception method. A developer only has to provide such methods by providing Ponger implementation.

Ponger Interface

Ponger interface is defined in protos_protoactor.go, of which a developer must provide an implementation to set up a ponger grain.

// Ponger interfaces the services available to the Ponger
type Ponger interface {
	Init(id string)
	Terminate()
	ReceiveDefault(ctx actor.Context)
	Ping(*PingMessage, cluster.GrainContext) (*PongMessage, error)
	
}

A common method for initialization – Init() – is already implemented by cluster.Grain so a Ponger implementation can re-use this by embedding cluster.Grain as below:

type ponger struct {
	cluster.Grain
}

However, Terminate(), ReceiveDefault() and Ping() still need to be implemented by a developer. Terminate() is called on passivation right before PongerActor stops and hence the subordinating ponger instance also must stop. ReceiveDefault() is a method to receive any message that are not expected to be handled in gRPC manner; Ping() is a method to recieve PingMessage and return PongMessage in gRPC manner.

type ponger struct {
	cluster.Grain
}

var _ messages.Ponger = (*ponger)(nil) // This guarantees ponger implements messages.Ponger.

func (*ponger) Terminate() { 
	// A virtual actor always exists, so this usually does not terminate once the actor is initialized.
	// However, a timeout can be set so a virtual actor terminates itself when no message comes for a certain period of time.
	// Do the finalization if required.
}

func (*ponger) ReceiveDefault(ctx actor.Context) {
	// Do something with a received message. Not necessarily in request-response manner.
}

func (*ponger) Ping(ping *messages.PingMessage, ctx cluster.GrainContext) (*messages.PongMessage, error) {
	// Receive ping and return pong in gRPC-based protocol
	return nil, nil
}

Method implementations could be somewhat like below. Because the actor struct is already generated and exported to protos_protoactor.go by protoc-ge-gograinv2, the implementations are pretty simple.

// Terminate takes care of the finalization.
func (p *ponger) Terminate() {
	// Do finalization if required. e.g. Store the current state to storage and switch its behavior to reject further messages.
	// This method is called when a pre-configured idle interval passes from the last message reception.
	// The actor will be re-initialized when a message comes for the next time.
	// Terminating the idle actor is effective to free unused server resource.
	//
	// A poison pill message is enqueued right after this method execution and the actor eventually stops.
	log.Printf("Terminating ponger: %s", p.ID())
}

// ReceiveDefault is a default method to receive and handle incoming messages.
func (*ponger) ReceiveDefault(ctx actor.Context) {
	switch msg := ctx.Message().(type) {
	case *messages.PingMessage:
		pong := &messages.PongMessage{Cnt: msg.Cnt}
		log.Print("Received ping message")
		ctx.Respond(pong)

	default:
		// Do nothing

	}
}

// Ping is a method to support gRPC Ponger service.
func (*ponger) Ping(ping *messages.PingMessage, ctx cluster.GrainContext) (*messages.PongMessage, error) {
	// The sender process is not a sending actor, but a future process
	log.Printf("Sender: %+v", ctx.Sender())

	pong := &messages.PongMessage{
		Cnt: ping.Cnt,
	}
	return pong, nil
}

Overall ponger process

To activate the ponger grain, a process must be defined as below code. Comments are added to each steps.

package main
import (
"github.com/AsynkronIT/protoactor-go/actor"
"github.com/AsynkronIT/protoactor-go/cluster"
"github.com/AsynkronIT/protoactor-go/cluster/automanaged"
"github.com/AsynkronIT/protoactor-go/remote"
"log"
"os"
"os/signal"
"protoactor-go-sender-example/cluster/messages"
"time"
)
// ponger handles the incoming messages.
// This supports gRPC Ponger service and plain message handling.
type ponger struct {
cluster.Grain
}
var _ messages.Ponger = (*ponger)(nil)
// Terminate takes care of the finalization.
func (p *ponger) Terminate() {
// Do finalization if required. e.g. Store the current state to storage and switch its behavior to reject further messages.
// This method is called when a pre-configured idle interval passes from the last message reception.
// The actor will be re-initialized when a message comes for the next time.
// Terminating the idle actor is effective to free unused server resource.
//
// A poison pill message is enqueued right after this method execution and the actor eventually stops.
log.Printf("Terminating ponger: %s", p.ID())
}
// ReceiveDefault is a default method to receive and handle incoming messages.
func (p *ponger) ReceiveDefault(ctx actor.Context) {
log.Printf("A plain message is sent from sender: %+v", ctx.Sender())
switch msg := ctx.Message().(type) {
case *messages.PingMessage:
log.Print("Received ping message")
pong := &messages.PongMessage{Cnt: msg.Cnt}
ctx.Respond(pong)
default:
}
}
// Ping is called when gRPC-based request is sent against Ponger service.
func (p *ponger) Ping(ping *messages.PingMessage, ctx cluster.GrainContext) (*messages.PongMessage, error) {
// The sender process is not a sending actor, but a future process
log.Printf("Received Ping call from sender: %+v", ctx.Sender())
pong := &messages.PongMessage{
Cnt: ping.Cnt,
}
return pong, nil
}
func main() {
// Setup actor system
system := actor.NewActorSystem()
// Register ponger constructor.
// This is called when the wrapping PongerActor is initialized.
// PongerActor proxies messages to ponger's corresponding methods.
messages.PongerFactory(func() messages.Ponger {
return &ponger{}
})
// Prepare remote env that listens to 8080
// Messages are sent to this port.
remoteConfig := remote.Configure("127.0.0.1", 8080)
// Configure cluster provider to work as a cluster member.
// This node uses port 6331 for cluster provider, and register itself -- localhost:6331" -- as cluster member.
cp := automanaged.NewWithConfig(1*time.Second, 6331, "localhost:6331")
// Register an actor constructor for the Ponger kind.
// With this registration, the message sender and other cluster nodes know this node is capable of providing Ponger.
// PongerActor will implicitly be initialized when the first message comes.
clusterKind := cluster.NewKind(
"Ponger",
actor.PropsFromProducer(func() actor.Actor {
return &messages.PongerActor{
// The actor stops when 10 seconds passed since the last message reception.
// When the next
Timeout: 10 * time.Second,
}
}))
clusterConfig := cluster.Configure("cluster-example", cp, remoteConfig, clusterKind)
c := cluster.New(system, clusterConfig)
// Start as a cluster member.
// Use StartClient() when this process is not a member of cluster nodes but required to send messages to cluster grains.
c.Start()
// Run till signal comes
finish := make(chan os.Signal, 1)
signal.Notify(finish, os.Interrupt, os.Kill)
<-finish
}
view raw main.go hosted with ❤ by GitHub

Sender Implementation

messages.GetPongerGrainClient()

For a message sender, protos_protoactor.go provides GetPongerGrainClient() function. By calling this function, one can acquire PongerGrainClient instance to initiate gRPC request with PongerGrainClient.Ping(). Making a request in gRPC manner is preferable while the fire-and-forget messaging method such as Context.Send() also works to send message to the destination grain. The gRPC request method calls Cluster.Call to get a hold of ponger grain’s PID, where it retries up to pre-defined threshold count to get the destination PID. As introduced in “Communication Protocol” section, the ownership of the grain may transfer at the same time as one sends a message to it. Retrial logic is vital to make sure the message is actually received by the destination grain. One can pass the retry setting
The implementation can be somewhat like below:

// Setup cluster
c := cluster.Configure(...)
// Get PID of ponger grain
grain := messages.GetPongerGrainClient(clustr, "ponger-1")

// Build a PingMessage payload and make a gRPC request.
ping := &messages.PingMessage{
	Cnt: 1,
}

// Explicitly define the retrial count
option := cluster.NewGrainCallOptions(c).WithRetry(3)

// Make a request and receive a response
pong, err := grain.Ping(ping, option)

Overall pinger process

Below is the example code to run pinger actor.

package main
import (
"github.com/AsynkronIT/protoactor-go/actor"
"github.com/AsynkronIT/protoactor-go/cluster"
"github.com/AsynkronIT/protoactor-go/cluster/automanaged"
"github.com/AsynkronIT/protoactor-go/remote"
"log"
"os"
"os/signal"
"protoactor-go-sender-example/cluster/messages"
"time"
)
var cnt uint64 = 0
type pingActor struct {
cluster *cluster.Cluster
cnt uint
}
func (p *pingActor) Receive(ctx actor.Context) {
switch ctx.Message().(type) {
case struct{}:
cnt += 1
ping := &messages.PingMessage{
Cnt: cnt,
}
client := messages.GetPongerGrainClient(p.cluster, "ponger-1")
option := cluster.NewGrainCallOptions(p.cluster).WithRetry(3)
pong, err := client.Ping(ping, option)
if err != nil {
log.Print(err.Error())
return
}
log.Printf("Received %v", pong)
case *messages.PongMessage:
// Never comes here.
// When the pong grain responds to the sender's gRPC call,
// the sender is not a ping actor but a future process.
log.Print("Received pong message")
}
}
func main() {
// Setup actor system
system := actor.NewActorSystem()
// Prepare remote env that listens to 8081
remoteConfig := remote.Configure("127.0.0.1", 8081)
// Configure cluster on top of the above remote env
// This node uses port 6330 for cluster provider, and add ponger node -- localhost:6331 -- as member.
// With automanaged implementation, one must list up all known nodes at first place to ping each other.
// Note that this node itself is not registered as a member node because this only works as a client.
cp := automanaged.NewWithConfig(1*time.Second, 6330, "localhost:6331")
clusterConfig := cluster.Configure("cluster-example", cp, remoteConfig)
c := cluster.New(system, clusterConfig)
// Start as a client, not as a cluster member.
c.StartClient()
// Start ping actor that periodically send "ping" payload to "Ponger" cluster grain
pingProps := actor.PropsFromProducer(func() actor.Actor {
return &pingActor{
cluster: c,
}
})
pingPid := system.Root.Spawn(pingProps)
// Subscribe to signal to finish interaction
finish := make(chan os.Signal, 1)
signal.Notify(finish, os.Interrupt, os.Kill)
// Periodically send ping payload till signal comes
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
system.Root.Send(pingPid, struct{}{})
case <-finish:
log.Print("Finish")
return
}
}
}
view raw main.go hosted with ❤ by GitHub

Conclusion

As illustrated in this article, clustering is a good way to scale the actor system and have higher availability. A developer can interact with actors in the same way as interacting with a local one because proto.actor takes care of locating the destination grain, grain activation, and data transport. Thanks to such architecture, a developer may concentrate on the business logic instead of designing an architecture from scratch.

Further Readings

Dec 6, 2018

[Golang] protoactor-go 201: Use plugins to add behaviors to an actor

The previous article, “How middleware works to intercept incoming and outgoing messages”, introduced how middlewares can be used to add behaviors without modifying an actor’s implementation. This is a good AOP-ish approach for multiple types of actors to execute a common procedure such as logging on message receiving/sending. A plugin mechanism is implemented on top of this middleware mechanism to run a specific task on actor initialization and message reception to enhance an actor’s ability. This article covers the implementation of this plugin feature and how this can be used.

Under the Hood

Implementing plugin is as easy as fulfilling the below plugin.plugin interface.

type plugin interface {
  OnStart(actor.Context)
  OnOtherMessage(actor.Context,  interface{})
}

When a plugin.plugin implementation is passed to plugin.Use(), this wraps the given plugin and return in a form of actor.InboundMiddleware so this can be set to actor.Props as a middleware.

func  Use(plugin  plugin)  func(next  actor.ActorFunc)  actor.ActorFunc  {
  return  func(next  actor.ActorFunc)  actor.ActorFunc  {
    fn  :=  func(context  actor.Context)  {
      switch  msg  :=  context.Message().(type)  {
      case  *actor.Started:
        plugin.OnStart(context)
      default:
        plugin.OnOtherMessage(context,  msg)
      }

      next(context)
    }
    return  fn
  }
}

As shown in the above code fragment, plugin.OnStart is called on actor initialization; plugin.OnOtherMessage is called on subsequent message receptions. A developer may initialize plugin on plugin.OnStart so its logic can run on other message receptions. Remember that next(context) is called at the end of its execution so the actor’s actor.Receive() is called after the plugin logic runs. A minimal implementation can be somewhat like below:

package main
import (
"github.com/AsynkronIT/protoactor-go/actor"
"github.com/AsynkronIT/protoactor-go/plugin"
)
type myPlugin struct {
}
func (_ *myPlugin) OnStart(actor.Context) {
// Do nothing
}
func (_ *myPlugin) OnOtherMessage(actor.Context, interface{}) {
// Do nothing
}
func main() {
// Construct plugin implementation
myPlugin := &myPlugin{}
// Wrap plugin implementation in a form of InboundMiddleware
middleware := plugin.Use(myPlugin)
props := actor.
FromFunc(func(ctx actor.Context) {
// Some logic comes here
}).
WithMiddleware(middleware) // Set as a middleware
pid := actor.Spawn(props)
pid.Tell("dummy message")
}
view raw main.go hosted with ❤ by GitHub

Example

A good example should be a passivation plugin provided by protoactor-go itself. This is a plugin that enables an idle actor to stop when no message comes in for a certain amount of time. Such plugin comes in handy when a developer employs cluster grain architecture because a grain actor is automatically initialized on first message reception and this lives forever without such self destraction mechanism. When a message is received after the destraction, another grain actor is automatically created. This initializes a timer on actor initialization, resets a timer on every message reception and stops the actor when timer ticks.

One important thing to mention here is that this plugin makes an extra effort to let an actor implement plugin.PassivationAware by embedding plugin.PassivationHolder in actor struct so a developer does not have to implement plugin.PassivationAware by oneself.

package plugin
import (
"log"
"time"
"github.com/AsynkronIT/protoactor-go/actor"
)
type PassivationAware interface {
Init(*actor.PID, time.Duration)
Reset(time.Duration)
Cancel()
}
type PassivationHolder struct {
timer *time.Timer
done bool
}
func (state *PassivationHolder) Reset(duration time.Duration) {
if state.timer == nil {
log.Fatalf("Cannot reset passivation of a non-started actor")
}
if !state.done {
state.timer.Reset(duration)
}
}
func (state *PassivationHolder) Init(pid *actor.PID, duration time.Duration) {
state.timer = time.NewTimer(duration)
state.done = false
go func() {
select {
case <-state.timer.C:
pid.Stop()
state.done = true
break
}
}()
}
func (state *PassivationHolder) Cancel() {
if state.timer != nil {
state.timer.Stop()
}
}
type PassivationPlugin struct {
Duration time.Duration
}
func (pp *PassivationPlugin) OnStart(ctx actor.Context) {
if a, ok := ctx.Actor().(PassivationAware); ok {
a.Init(ctx.Self(), pp.Duration)
}
}
func (pp *PassivationPlugin) OnOtherMessage(ctx actor.Context, msg interface{}) {
if p, ok := ctx.Actor().(PassivationAware); ok {
switch msg.(type) {
case *actor.Stopped:
p.Cancel()
default:
p.Reset(pp.Duration)
}
}
}
view raw passivation.go hosted with ❤ by GitHub

Thanks for that effort, an actor implementation can be as simple as below. This is obvious that, because the passivation implementation itself is implemented by embedded plugin.PassivationHolder, MyActor developer can separate the passivation procedure and concentrate on her own business.

type MyActor struct {  
  // Implement plugin.PassivationAware by embedding its default implementation: plugin.PassivationHolder
  PassivationHolder  
}  
  
func (state *MyActor) Receive(context actor.Context) {  
  switch context.Message().(type) {  
    // Do its own business
  }  
}

Conclusion

To add a pluggable behavior to an actor, a developer can provde a plugin by implementing plugin.plugin interface. By defining core interface and its embeddable default implementation of the plugin, it is quite easier to separate the areas of responsibility of a plugin and an actor.

Nov 24, 2018

[Golang] protoactor-go 201: How middleware works to intercept incoming and outgoing messages

As described in a previous article, Protoactor-go 101: How actors communicate with each other, the core of actor system is message passing. Fine-grained actors work on their own tasks, communicate with each other by passing messages and achieve a bigger task as a whole. To intercept the incoming and outgoing messages to execute tasks before and after the message handling, protoactor supports middleware mechanism.
Protoactor’s plugin mechanism is built on top of this middleware mechanism so knowing middleware is vital to building highly customized actor.

Types of middleware

To intercept incoming and outgoing messages, two kinds of middleware are provided: actor.InboundMiddleware and actor.OutboundMiddleware. Inbound middleware is invoked when a message reaches an actor; Outbound middleware is invoked when a message is sent to another actor. Multiple middlewares can be registered for a given actor so it is possible to divide middleware’s tasks into small pieces and create a middleware for each of them in favor of maintainability.

Under the hood

To register a middleware to an actor, use Props.WithMiddleware() or Props.WithOutboundMiddleware(). Passed middleware implementations are appended to an internal slice so they can be referenced on actor construction.

// Assign one or more middlewares to the props
func (props *Props) WithMiddleware(middleware ...InboundMiddleware) *Props {
   props.inboundMiddleware = append(props.inboundMiddleware, middleware...)
   return props
}
  
func (props *Props) WithOutboundMiddleware(middleware ...OutboundMiddleware) *Props {
   props.outboundMiddleware = append(props.outboundMiddleware, middleware...)
   return props
}

On actor construction, stashed middlewares are transformed into a middleware chain. At this point a group of one or more middlewares are combined together and shape one actor.ActorFunc(). Middlewares are processed in reversed order in this process so they are executed in the registered order on message reception.

func makeInboundMiddlewareChain(middleware []InboundMiddleware, lastReceiver ActorFunc) ActorFunc {  
   if len(middleware) == 0 {  
      return nil  
   }  
  
   h := middleware[len(middleware)-1](lastReceiver)  
   for i := len(middleware) - 2; i >= 0; i-- {  
      h = middleware[i](h)  
   }  
   return h  
}  
  
func makeOutboundMiddlewareChain(outboundMiddleware []OutboundMiddleware, lastSender SenderFunc) SenderFunc {  
   if len(outboundMiddleware) == 0 {  
      return nil  
   }  
  
   h := outboundMiddleware[len(outboundMiddleware)-1](lastSender)  
   for i := len(outboundMiddleware) - 2; i >= 0; i-- {  
      h = outboundMiddleware[i](h)  
   }  
   return h  
}

When actor.Context handles an incoming message, the actor.Context that holds all the contextual information including message itself is passed to that middleware chain. One important thing to notice at this point is that the original message reception method, actor.Receive(), is wrapped in an anonymous function to match actor.ActorFunc() signature and is registered to the very end of the middleware chain. So when the context information is passed to the middleware chain, middlewares are executed in the registered order and actor.Receive() is called at last.

func (ctx *localContext) processMessage(m interface{}) {  
   ctx.message = m  
  
   if ctx.inboundMiddleware != nil {  
      ctx.inboundMiddleware(ctx)  
   } else {  
      if _, ok := m.(*PoisonPill); ok {  
         ctx.self.Stop()  
      } else {  
         ctx.receive(ctx)  
      }  
   }  
  
   ctx.message = nil  
}

Likewise, when a message is being sent to another actor, the registered outbound middlewares are executed in the registered order.

Example

Below is an example that leaves log messages around Actor.Receive() invocation and Context.Request().

Below is an example that leaves log messages when message a comes and goes. As the comment suggests, inbound middleware can run its task before and/or after actor.Receive() execution. Similarly, outbound middleware can run a task before and/or after the original message sending logic. The event occurrence order is described in the comment section of the example.

package main
import (
"github.com/AsynkronIT/protoactor-go/actor"
"log"
"os"
"os/signal"
"syscall"
"time"
)
func newInboundMiddleware() actor.InboundMiddleware {
cnt := 0
return func(next actor.ActorFunc) actor.ActorFunc {
return func(ctx actor.Context) {
cnt++
log.Printf("Start handling incoming message #%d: %#v", cnt, ctx.Message())
next(ctx)
log.Printf("End handling incoming message #%d: %#v", cnt, ctx.Message())
}
}
}
func newOutboundMiddleware() actor.OutboundMiddleware {
cnt := 0
return func(next actor.SenderFunc) actor.SenderFunc {
return func(ctx actor.Context, target *actor.PID, envelope *actor.MessageEnvelope) {
cnt++
log.Printf("Start sending message #%d to %s", cnt, target.Id)
next(ctx, target, envelope)
log.Printf("End sending message #%d to %s", cnt, target.Id)
}
}
}
type ping struct{}
type pong struct{}
func main() {
pongProps := actor.
FromFunc(func(ctx actor.Context) {
switch ctx.Message().(type) {
case *ping:
ctx.Respond(&pong{})
}
})
pongPid, _ := actor.SpawnNamed(pongProps, "pong")
// Output should be somewhat like below.
// Because ping actor receives both signal of struct{}{} and a pong message of &pong{},
// the printed number of execution is doubled comparing to that of pong actor.
//
// 2018/11/24 12:59:25 Start handling incoming message #1: &actor.Started{}
// 2018/11/24 12:59:25 End handling incoming message #1: &actor.Started{}
// 2018/11/24 12:59:26 Start handling incoming message #2: struct {}{}
// 2018/11/24 12:59:26 Received signal
// 2018/11/24 12:59:26 Start sending message #1 to pong
// 2018/11/24 12:59:26 End sending message #1 to pong
// 2018/11/24 12:59:26 End handling incoming message #2: struct {}{}
// 2018/11/24 12:59:26 Start handling incoming message #3: &main.pong{}
// 2018/11/24 12:59:26 Received pong
// 2018/11/24 12:59:26 End handling incoming message #3: &main.pong{}
// 2018/11/24 12:59:27 Start handling incoming message #4: struct {}{}
// 2018/11/24 12:59:27 Received signal
// 2018/11/24 12:59:27 Start sending message #2 to pong
// 2018/11/24 12:59:27 End sending message #2 to pong
// 2018/11/24 12:59:27 End handling incoming message #4: struct {}{}
// 2018/11/24 12:59:27 Start handling incoming message #5: &main.pong{}
// 2018/11/24 12:59:27 Received pong
// 2018/11/24 12:59:27 End handling incoming message #5: &main.pong{}
// 2018/11/24 12:59:28 Start handling incoming message #6: struct {}{}
// 2018/11/24 12:59:28 Received signal
// 2018/11/24 12:59:28 Start sending message #3 to pong
// 2018/11/24 12:59:28 End sending message #3 to pong
// 2018/11/24 12:59:28 End handling incoming message #6: struct {}{}
// 2018/11/24 12:59:28 Start handling incoming message #7: &main.pong{}
// 2018/11/24 12:59:28 Received pong
// 2018/11/24 12:59:28 End handling incoming message #7: &main.pong{}
// ^C2018/11/24 12:59:29 Finish
pingProps := actor.
FromFunc(func(ctx actor.Context) {
switch ctx.Message().(type) {
case struct{}:
log.Print("Received signal")
ctx.Request(pongPid, &ping{})
case *pong:
log.Print("Received pong")
}
}).
WithMiddleware(newInboundMiddleware()).
WithOutboundMiddleware(newOutboundMiddleware())
pingPid, _ := actor.SpawnNamed(pingProps, "ping")
finish := make(chan os.Signal, 1)
signal.Notify(finish, os.Interrupt)
signal.Notify(finish, syscall.SIGTERM)
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
pingPid.Tell(struct{}{})
case <-finish:
pingPid.Stop()
pongPid.Stop()
log.Print("Finish")
return
}
}
}
view raw middleware.go hosted with ❤ by GitHub

Conclusion

Middleware mechanism can be used to run a certain logic before and after the original method invocation in an AOP-ish manner.

Nov 23, 2018

[Golang] protoactor-go 101: How actor.Future works to synchronize concurrent task execution

Fine-grained actors concentrate on their own tasks and communicate with others to accomplish a bigger task as a whole. That is how a well-designed actor system works. Because each actor handles a smaller part of an incoming task, pass it to another and then proceed to work on the next incoming task, actors can efficiently work in a concurrent manner to accomplish more tasks in the same amount of time. To pass a result of one actor’s job execution to another or to execute a task when another actor’s execution is done, actor.Future mechanism comes in handy.

Introducing Future

The basic idea of “future” in this context is quite similar to that of future and promise implemented by many modern programming languages to coordinate concurrent executions.

For example, Future’s Javadoc reads as below:

AFuturerepresents the result of an asynchronous computation. Methods are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation.

In protoactor-go, actor.Future provides methods to wait for destination actor’s response in a blocking manner, to pipe one actor’s response to another in a non-blocking manner and to execute a callback function when destination actor’s response arrives.

How this works under the hood

The implementation of protoactor-go’s Future mechanism is composed of actor.Future and actor.futureProcess, where actor.Future provides common Future methods while actor.futureProcess wraps actor.Future and works as a actor.Process. A developer may call Context.RequestFuture() or PID.RequestFuture() instead of commonly used Context.Request() or PID.Request() to acquire actor.Future that represents a non-determined result.

In those method calls, actor.NewFuture() is called with preferred timeout duration as an argument. In actor.NewFuture(), actor.Future and its wrapper – actor.futureProcess – are both constructed, actor.futureProcess is registered to protoactor-go’s internal process registry for a later reference and actor.Future is returned.

As depicted in the below code fragment, actor.Future’s actor.PID is set as a Sender of the requesting message. So when the receiving actor responds to the sender, the response is actually sent to the actor.Future’s actor.PID. When actor.Future receives the response, the result of the Future is set and becomes available to the subscriber.

func (ctx *localContext) RequestFuture(pid *PID, message interface{}, timeout time.Duration) *Future {  
   future := NewFuture(timeout)  
   env := &MessageEnvelope{  
      Header:  nil,  
  Message: message,  
  Sender:  future.PID(),  
  }  
   ctx.sendUserMessage(pid, env)  
  
   return future  
}

The usage of the actor.Future returned by those methods are covered in later sections with detailed example codes. All example codes are available at github.com/oklahomer/protoactor-go-future-example.

Future.Wait() / Future.Result()

To wait until the execution times out or the destination actor responds, use Future.Wait() or Future.Result(). They both internally call a blocking private method, Future.wait(), to block till the preconfigured timeout duration passes or the execution completes. The only difference is whether to return the result of the computation; Future.Wait() simply waits for completion just like WaitGroup.Wait() while Future.Result() waits for completion and additionally returns the result ot the computation as its name suggests.

package main
import (
"github.com/AsynkronIT/protoactor-go/actor"
"log"
"os"
"os/signal"
"syscall"
"time"
)
type pong struct {
}
type ping struct {
}
type pongActor struct {
timeOut bool
}
func (p *pongActor) Receive(ctx actor.Context) {
// Dead letter occurs because the PID of Future process ends and goes away when Future times out
// so the pongActor fails to send message.
switch ctx.Message().(type) {
case *ping:
var sleep time.Duration
if p.timeOut {
sleep = 2500 * time.Millisecond
p.timeOut = false
} else {
sleep = 300 * time.Millisecond
p.timeOut = true
}
time.Sleep(sleep)
ctx.Sender().Tell(&pong{})
}
}
type pingActor struct {
pongPid *actor.PID
}
func (p *pingActor) Receive(ctx actor.Context) {
switch ctx.Message().(type) {
case struct{}:
// Output becomes somewhat like below.
// See a diagram at https://raw.githubusercontent.com/oklahomer/protoactor-go-future-example/master/docs/wait/timeline.png
//
// 2018/10/13 17:03:22 Received pong message &main.pong{}
// 2018/10/13 17:03:24 Timed out
// 2018/10/13 08:03:26 [ACTOR] [DeadLetter] pid="nonhost/future$4" message=&{} sender="nil"
// 2018/10/13 17:03:26 Received pong message &main.pong{}
// 2018/10/13 17:03:28 Timed out
// 2018/10/13 08:03:30 [ACTOR] [DeadLetter] pid="nonhost/future$6" message=&{} sender="nil"
// 2018/10/13 17:03:30 Received pong message &main.pong{}
future := p.pongPid.RequestFuture(&ping{}, 1*time.Second)
// Future.Result internally waits until response comes or times out
result, err := future.Result()
if err != nil {
log.Print("Timed out")
return
}
log.Printf("Received pong message %#v", result)
}
}
func main() {
pongProps := actor.FromProducer(func() actor.Actor {
return &pongActor{}
})
pongPid := actor.Spawn(pongProps)
pingProps := actor.FromProducer(func() actor.Actor {
return &pingActor{
pongPid: pongPid,
}
})
pingPid := actor.Spawn(pingProps)
finish := make(chan os.Signal, 1)
signal.Notify(finish, os.Interrupt)
signal.Notify(finish, syscall.SIGTERM)
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
pingPid.Tell(struct{}{})
case <-finish:
return
log.Print("Finish")
}
}
}
view raw wait.go hosted with ❤ by GitHub

These methods are useful to retrieve the destination actors response and proceed own logic but are not usually preferred because the idea of such synchronous execution conflicts with the nature of actor model: concurrent computation.

Future.PipeTo()

While Future.Wait() and Future.Result() block until timeout or task completion, Future.PipeTo() asynchronously sends the result of computation to another actor. This can be a powerful tool when only the origination actor knows which actor should receive the result of a worker actor’s task; Actor A delegates a task to worker actor B but B does not know to what actor to pass the result message to. One important thing is that the message is transfered to the destination actor when and only when the comptation completes before it times out. Otherwise the response is sent to the dead letter mailbox.
Becausee this works in an asynchronous manner, origination actor can handle incoming messages right after dispatching tasks to worker actors no matter how long the worker actors take to respond.

package main
import (
"github.com/AsynkronIT/protoactor-go/actor"
"github.com/AsynkronIT/protoactor-go/router"
"log"
"os"
"os/signal"
"syscall"
"time"
)
type pong struct {
count uint
}
type ping struct {
count uint
}
type pingActor struct {
count uint
pongPid *actor.PID
}
func (p *pingActor) Receive(ctx actor.Context) {
switch msg := ctx.Message().(type) {
case struct{}:
p.count++
// Output becomes somewhat like below.
// See a diagram at https://raw.githubusercontent.com/oklahomer/protoactor-go-future-example/master/docs/pipe/timeline.png
// 2018/10/14 14:20:36 Received pong message &main.pong{count:1}
// 2018/10/14 14:20:39 Received pong message &main.pong{count:4}
// 2018/10/14 14:20:39 Received pong message &main.pong{count:3}
// 2018/10/14 05:20:39 [ACTOR] [DeadLetter] pid="nonhost/future$e" message=&{'\x02'} sender="nil"
// 2018/10/14 14:20:42 Received pong message &main.pong{count:7}
// 2018/10/14 14:20:42 Received pong message &main.pong{count:6}
// 2018/10/14 05:20:42 [ACTOR] [DeadLetter] pid="nonhost/future$h" message=&{'\x05'} sender="nil"
// 2018/10/14 14:20:45 Received pong message &main.pong{count:10}
// 2018/10/14 14:20:45 Received pong message &main.pong{count:9}
// 2018/10/14 05:20:45 [ACTOR] [DeadLetter] pid="nonhost/future$k" message=&{'\b'} sender="nil"
message := &ping{
count: p.count,
}
p.
pongPid.
RequestFuture(message, 2500*time.Millisecond).
PipeTo(ctx.Self())
case *pong:
log.Printf("Received pong message %#v", msg)
}
}
func main() {
pongProps := router.NewRoundRobinPool(10).
WithFunc(func(ctx actor.Context) {
switch msg := ctx.Message().(type) {
case *ping:
var sleep time.Duration
remainder := msg.count % 3
if remainder == 0 {
sleep = 1700 * time.Millisecond
} else if remainder == 1 {
sleep = 300 * time.Millisecond
} else {
sleep = 2900 * time.Millisecond
}
time.Sleep(sleep)
message := &pong{
count: msg.count,
}
ctx.Sender().Tell(message)
}
})
pongPid := actor.Spawn(pongProps)
pingProps := actor.FromProducer(func() actor.Actor {
return &pingActor{
count: 0,
pongPid: pongPid,
}
})
pingPid := actor.Spawn(pingProps)
finish := make(chan os.Signal, 1)
signal.Notify(finish, os.Interrupt)
signal.Notify(finish, syscall.SIGTERM)
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
pingPid.Tell(struct{}{})
case <-finish:
return
log.Print("Finish")
}
}
}
view raw pipeto.go hosted with ❤ by GitHub

Context.AwaitFuture()

The task execution is done in an asynchronous manner like Future.PipeTo but, because this can refer to contextual information, a callback function is still called even when the computation times out. Context.Message() contains the same message as when the origination actor’s Actor.Receive() is called so a developer does not have to add a workaround to copy the message to refer from the callback function.

package main
import (
"github.com/AsynkronIT/protoactor-go/actor"
"github.com/AsynkronIT/protoactor-go/router"
"log"
"os"
"os/signal"
"syscall"
"time"
)
type tick struct {
count int
}
type pong struct {
count int
}
type ping struct {
count int
}
type pingActor struct {
pongPid *actor.PID
}
func (p *pingActor) Receive(ctx actor.Context) {
switch msg := ctx.Message().(type) {
case *tick:
// Output becomes somewhat like below.
// See a diagram at https://raw.githubusercontent.com/oklahomer/protoactor-go-future-example/master/docs/await_future/timeline.png
//
// 2018/10/14 16:10:49 Received pong response: &main.pong{count:1}
// 2018/10/14 16:10:52 Received pong response: &main.pong{count:4}
// 2018/10/14 16:10:52 Failed to handle: 2. message: &main.tick{count:2}.
// 2018/10/14 16:10:53 Received pong response: &main.pong{count:3}
// 2018/10/14 07:10:53 [ACTOR] [DeadLetter] pid="nonhost/future$e" message=&{'\x02'} sender="nil"
// 2018/10/14 16:10:55 Received pong response: &main.pong{count:7}
// 2018/10/14 16:10:55 Failed to handle: 5. message: &main.tick{count:5}.
// 2018/10/14 16:10:56 Received pong response: &main.pong{count:6}
// 2018/10/14 07:10:56 [ACTOR] [DeadLetter] pid="nonhost/future$h" message=&{'\x05'} sender="nil"
// 2018/10/14 16:10:58 Received pong response: &main.pong{count:10}
// 2018/10/14 16:10:58 Failed to handle: 8. message: &main.tick{count:8}.
// 2018/10/14 16:10:59 Received pong response: &main.pong{count:9}
// 2018/10/14 07:10:59 [ACTOR] [DeadLetter] pid="nonhost/future$k" message=&{'\b'} sender="nil"
message := &ping{
count: msg.count,
}
future := p.pongPid.RequestFuture(message, 2500*time.Millisecond)
cnt := msg.count
ctx.AwaitFuture(future, func(res interface{}, err error) {
if err != nil {
// Context.Message() returns the exact message that was present on Context.AwaitFuture call.
// ref. https://github.com/AsynkronIT/protoactor-go/blob/3992780c0af683deb5ec3746f4ec5845139c6e42/actor/local_context.go#L289
log.Printf("Failed to handle: %d. message: %#v.", cnt, ctx.Message())
return
}
switch res.(type) {
case *pong:
log.Printf("Received pong response: %#v", res)
default:
log.Printf("Received unexpected response: %#v", res)
}
})
}
}
func main() {
pongProps := router.NewRoundRobinPool(10).
WithFunc(func(ctx actor.Context) {
switch msg := ctx.Message().(type) {
case *ping:
var sleep time.Duration
remainder := msg.count % 3
if remainder == 0 {
sleep = 1700 * time.Millisecond
} else if remainder == 1 {
sleep = 300 * time.Millisecond
} else {
sleep = 2900 * time.Millisecond
}
time.Sleep(sleep)
message := &pong{
count: msg.count,
}
ctx.Sender().Tell(message)
}
})
pongPid := actor.Spawn(pongProps)
pingProps := actor.FromProducer(func() actor.Actor {
return &pingActor{
pongPid: pongPid,
}
})
pingPid := actor.Spawn(pingProps)
finish := make(chan os.Signal, 1)
signal.Notify(finish, os.Interrupt)
signal.Notify(finish, syscall.SIGTERM)
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
count := 0
for {
select {
case <-ticker.C:
count++
pingPid.Tell(&tick{count: count})
case <-finish:
return
log.Print("Finish")
}
}
}
view raw await_future.go hosted with ❤ by GitHub

Conclusion

As described in above sections, Future provides various methods to synchronize concurrent execution. While concurrent execution is the core of actor model, these come in handy to synchronize concurrent execution with minimal cost.

Sep 24, 2018

[Golang] protoactor-go 101: How actors communicate with each other

Designing actor-based program is all about dividing tasks into smaller pieces. Fine-grained actors concentrate on their tasks, collaborate with other actors and accomplish a big task as a whole. Hence mastering actors’ communication mechanism and modeling well-defined messages are always the keys to designing an actor system. This article describes protoactor-go’s actor categories, their messaging methods and how those methods differ on referencing sender actors.

See my previous article, [Golang] protoactor-go 101: Introduction to golang’s actor model implementation, for protoactor-go’s basic concepts and terms.

TL;DR

While there are several kinds of actors, those actors share a unified interface to communicate with each other. Various methods are provided for their communication, but always use Request() to acknowledge the recipient actor who the sender actor is. When that is not an option, include the sender actor’s actor.PID in the sending message.

Example codes

Example codes that cover all communication means for all actor implementations are located at github.com/oklahomer/protoactor-go-sender-example. Minimal examples are introduced in this article for description, but visit this repository for comprehensive examples.

Premise: Three major kinds of actors

protoactor-go comes with three kinds of actors: local, remote and cluster grain.

  • Local … Those actors located in the same process.
  • Remote … Actors located in different processes or servers. An actor is considered to be “local” when addressed from within the same process; while this is “remote” when addressed across a network. Because a message is sent over a network, message serialization is required. Protocol Buffers is used for this task in protoactor-go.
  • Cluster grain … A kind of remote actor but the lifecycle and other complexity are taken care of by protoactor-go library. Cluster topology is managed by consul and a grain can be addressed over a network. Consul manages the cluster membership and the availability of each node.

Thanks to the location transparency, an actor can communicate with other actors in the same way without worrying about where the recipient actors are located at. In addition to those basic communication means, a cluster grain has an extra mechanism to provide RPC based interface.

Each actor is encapsulated in an actor.PID instance so developers communicate with actors via methods provided by this actor.PID. (actor.Context also provides equivalent methods, but these can be considered as wrappers for actor.PID’s corresponding methods.) One important thing to remember is that above actors are not the only entities encapsulated in actor.PIDs. As a matter of fact, any actor.Process implementation including mailbox, Future mechanism and others are also encapsulated in actor.PIDs. This may be familiar to those with Erlang background. Understanding this becomes vital when one tries referring to message sender actor. The rest of this article is going to describe each messaging method and how a recipient actor can refer to the sending actor.

Communication methods

Below are the common communication methods – Tell(), Request() and RequestFuture() – and RPC based method for cluster grain. Examples in this article all demonstrate local actor messaging because local and remote actors share a common messaging interface. Visit my example repository to cover all messaging implementations of local, remote and cluster grain.

Tell() tells nothing about the sender 

To send a message to an actor, one may call actor.PID’s Tell() method. When a message is sent from outside of an actor system by calling PID.Tell(), the recipient actor fails to refer to the sending actor with Context.Sender(). This is pretty obvious. Because the message is sent from outside, there is no such thing as sending actor. Below is an example:

package main
import (
"github.com/AsynkronIT/protoactor-go/actor"
"time"
)
type ping struct{}
type pong struct{}
func main() {
props := actor.FromFunc(func(ctx actor.Context) {
switch ctx.Message().(type) {
case *ping:
// This fails to get sender
// because the message came
// from outside of actor system
//
// Below execution leads to dead letter
// 2018/09/14 22:40:02 [ACTOR] [DeadLetter] pid="nil" message=&{} sender="nil"
ctx.Respond(&pong{})
// Below execution causes a panic since Sender() returns nil.
// Actor crashes and that causes supervisor to restart this failing actor.
// 2018/09/14 22:40:02 [MAILBOX] [ACTOR] Recovering actor="nonhost/$1" reason="runtime error: invalid memory address or nil pointer dereference" stacktrace="github.com/AsynkronIT/protoactor-go/actor.(*PID).ref:26"
// 2018/09/14 22:40:02 [ACTOR] [SUPERVISION] actor="nonhost/$1" directive="RestartDirective" reason="runtime error: invalid memory address or nil pointer dereference"
ctx.Sender().Tell(&pong{})
}
})
pid := actor.Spawn(props)
pid.Tell(&ping{})
time.Sleep(1 * time.Second) // Just to make sure system ends after actor execution
}
view raw no-sender.go hosted with ❤ by GitHub

In the above example, a message is directly sent to an actor from outside of an actor system. Therefore the recipient actor fails to refer to the sending actor. With Akka, this behavior is similar to set ActorRef#noSender as the second argument of ActorRef#tell – when the recipient tries to respond, the message goes to the dead letter mailbox.

When a message is sent from one actor to another, there indeed is a sender-recipient relationship. Recipient actor’s contextual information, actor.Context, appears to provide such information for us. Below is an example code that tries to refer to the sender actor with actor.Context:

package main
import (
"github.com/AsynkronIT/protoactor-go/actor"
"log"
"time"
)
type pong struct {
}
type ping struct {
}
type pingActor struct {
pongPid *actor.PID
}
func (p *pingActor) Receive(ctx actor.Context) {
switch ctx.Message().(type) {
case struct{}:
// Below does not set ctx.Self() as sender,
// and hence the recipient has no knowledge of the sender
// even though the message is sent from another actor via actor.Context.
//
ctx.Tell(p.pongPid, &ping{})
case *pong:
log.Print("Received pong message")
}
}
func main() {
pongProps := actor.FromFunc(func(ctx actor.Context) {
switch ctx.Message().(type) {
case *ping:
log.Print("Received ping message")
// 2018/09/15 02:01:27 [ACTOR] [DeadLetter] pid="nil" message=&{} sender="nil"
ctx.Respond(&pong{})
// 2018/09/15 02:01:27 [MAILBOX] [ACTOR] Recovering actor="nonhost/$1" reason="runtime error: invalid memory address or nil pointer dereference" stacktrace="github.com/AsynkronIT/protoactor-go/actor.(*PID).ref:26"
// 2018/09/15 02:01:27 [ACTOR] [SUPERVISION] actor="nonhost/$1" directive="RestartDirective" reason="runtime error: invalid memory address or nil pointer dereference"
ctx.Sender().Tell(&pong{})
default:
}
})
pongPid := actor.Spawn(pongProps)
pingProps := actor.FromProducer(func() actor.Actor {
return &pingActor{
pongPid: pongPid,
}
})
pingPid := actor.Spawn(pingProps)
pingPid.Tell(struct{}{})
time.Sleep(1 * time.Second) // Just to make sure system ends after actor execution
}

However, the recipient fails to refer to the sender actor in the same way it failed in the previous example. This may seem odd, but let us take a look at actor.Context’s implementation. A call to Context.Tell() is proxied to Context.sendUserMessage(), where the message is stuffed into actor.MessageEnvelope with nil Sender field as below:

func (ctx *localContext) Tell(pid *PID, message interface{}) {
 ctx.sendUserMessage(pid, message)
}

func (ctx *localContext) sendUserMessage(pid *PID, message interface{}) {
 if ctx.outboundMiddleware != nil {
  if env, ok := message.(*MessageEnvelope); ok {
   ctx.outboundMiddleware(ctx, pid, env)
  } else {
   ctx.outboundMiddleware(ctx, pid, &MessageEnvelope{
    Header:  nil,
    Message: message,
    Sender:  nil,
   })
  }
 } else {
  pid.ref().SendUserMessage(pid, message)
 }
}

That is why a recipient cannot refer to the sender even though the messaging occurs between two actors and such contextual information seems to be available. The above code fragment suggests that passing actor.MessageEnvelope with pre-filled Sender field should tell the sending actor to the recipient. This actually works because all actor.MessageEnvelope’s fields are public and accessible, but this is a cumbersome job. There should be a way to do that.

Request() lets a recipient request for the sender reference

A second messaging method is Request(). This lets developers set who the sender actor is, and the recipient actor can reply to the sender actor by calling Context.Respond() or by calling Context.Sender().Tell(). Below is the method signature.

// Request sends a messages asynchronously to the PID. The actor may send a response back via respondTo, which is
// available to the receiving actor via Context.Sender
func (pid *PID) Request(message interface{}, respondTo *PID) {
 env := &MessageEnvelope{
  Message: message,
  Header:  nil,
  Sender:  respondTo,
 }
 pid.ref().SendUserMessage(pid, env)
}

Above signature may look more like Akka’s ActorRef#tell than Tell() in a way that a developer can set a sender actor, more precisely a sending actor.PID in this case, as a second argument. An actor.PID and an actor.Context both have Request() method and they behave equivalently as described in the below example:

sender-respond.go · GitHub

This not only works for request-response model, but also works to propagate the sending actor identity to subsequent actor calls.

RequestFuture() only has its future

The last method is ReqeustFuture(). This can be used as an extension of Request() where an actor.Future is returned to the requester. However, its behavior differs slightly but significantly when the recipient actor tries referring to the sender with Context.Sender() and treating this as a reference to the sender actor. Below is a simple example that demonstrates a regular request-response model:

future.go · GitHub

Now the below example demonstrates how Request() and RequestFuture() behave differently when Context.Sender() or Context.Respond() is called to refer to the sender actor’s actor.PID. The code structure is almost the same as the previous example besides that below tries to send back multiple messages to the sender actor.

package main
import (
"github.com/AsynkronIT/protoactor-go/actor"
"log"
"time"
)
type pong struct {
}
type ping struct {
}
type pingActor struct {
pongPid *actor.PID
}
func (p *pingActor) Receive(ctx actor.Context) {
switch ctx.Message().(type) {
case struct{}:
// Below both work.
//
//future := p.pongPid.RequestFuture(&ping{}, time.Second)
future := ctx.RequestFuture(p.pongPid, &ping{}, time.Second)
result, err := future.Result()
if err != nil {
log.Print(err.Error())
return
}
log.Printf("Received %#v", result)
case *pong:
// Never comes here.
// When the pong actor responds to the sender,
// the sender is not a ping actor but a future process.
log.Print("Received pong message")
}
}
func main() {
pongProps := actor.FromFunc(func(ctx actor.Context) {
switch ctx.Message().(type) {
case *ping:
log.Print("Received ping message")
// Below both work in this example, but their behavior slightly differ.
// ctx.Sender().Tell() panics and recovers if the sender is nil;
// while ctx.Respond() checks the presence of sender and redirects the message to dead letter process
// when sender is absent.
//
//ctx.Sender().Tell(&pong{})
ctx.Respond(&pong{})
// Take a look at the id field.
// 2018/09/23 10:58:53 &actor.PID{Address:"nonhost", Id:"future$3", p:(*actor.Process)(0xc4200ea010)}
log.Printf("%#v", ctx.Sender())
// Below all fail because the sender PID does not represents the sender actor,
// but the sending Future process and the Future process ends when the first payload is returned.
ctx.Sender().Tell(&pong{})
ctx.Respond(&pong{})
ctx.Sender().Tell(&pong{})
ctx.Respond(&pong{})
ctx.Sender().Tell(&pong{})
ctx.Respond(&pong{})
ctx.Sender().Tell(&pong{})
ctx.Respond(&pong{})
default:
}
})
pongPid := actor.Spawn(pongProps)
pingProps := actor.FromProducer(func() actor.Actor {
return &pingActor{
pongPid: pongPid,
}
})
pingPid := actor.Spawn(pingProps)
pingPid.Tell(struct{}{})
time.Sleep(1 * time.Second) // Just to make sure system ends after actor execution
}

Remember, as briefly introduced in the “Premise” section, an actor.PID not only encapsulates an actor.Actor instance but also encapsulates any actor.Process implementation. The concept of “process” and its representation, PID, are quite similar to those of Erlang in this way. With that said, let us take a closer look at how the above example behaves under the hood. First, two processes for actor PIDs are explicitly created by the developer: pingPid and pongPid. When pingPid sends a message to pongPid, another process is implicitly created by protoactor-go: that of actor.Future. And this actor.Future process is set as the sender PID when communication takes place.

func (ctx *localContext) RequestFuture(pid *PID, message interface{}, timeout time.Duration) *Future {
 future := NewFuture(timeout)
 env := &MessageEnvelope{
  Header:  nil,
  Message: message,
  Sender:  future.PID(),
 }
 ctx.sendUserMessage(pid, env)

 return future
}

When the recipient actor’s process, pongPid, receives the message and respond to the sender, the “sender” is not actually pingPid but the actor.Future’s process. After one message is sent back to pingPid, the actor.Future process ends and therefore the subsequent calls to Context.Respond() or Context.Sender() from pongPid fail to refer to the sender. So when the passing of sender actor’s PID is vital for the recipient’s task execution, use Request() or include the sender actor’s actor.PID in the sending message so the recipient can refer to the sender actor for sure.

Cluster grain’s unique RPC based messaging

Actors can communicate with Cluster grains just like communicating with remote actors. In fact, protoactor-go’s cluster mechanism is implemented on top of actor.remote implementation. However, this cluster mechanism adopts the idea of Microsoft Orleans where the actor lifecycle and other major tasks are managed by the actor framework to ease the developer’s work. This effort includes the introduction of handy RPC based communication protocol. Communication with cluster grains still use Protocol Buffers for serialization and deserialization, but this goes a bit further by providing a wrapper for gRPC service calls.

By using gograin protoc plugin, a code is generated for gRPC services. This code provides an actor.Actor implementation where Receive() receives a message from another actor, deserializes it and calls a corresponding method depending on the incoming message type. Developers only have to implement a method for each gRPC service. The returning value of the implemented method is returned to the sender actor.  One thing to notice is that this remote gRPC call is implemented with RequestFuture() under the hood. So when the method tries referring to the sender by Context.Sender(), the returned actor.PID is not a representation of the sender actor but an actor.Future. The example contains a relatively large amount of code so visit my example repository for details. Directory layout is as below:

Conclusion

While there are several kinds of actors, those actors have unified ways to communicate with other actors no matter where they are located at. However, because an actor.PID is not only a representation of an actor process but also a representation of any actor.Process implementation, extra work may be required for a recipient actor to refer to the sender actor since the returning actor.PID of Context.Sender() is not necessarily a sender actor’s representation. To ensure that the recipient actor can refer to the sender actor, include the sender actor’s PID in the sending message or use Request(). Visit github.com/oklahomer/protoactor-go-sender-example for more comprehensive examples.

Jul 22, 2018

[Golang] protoactor-go 101: Introduction to golang's actor model implementation

A year has passed since I officially launched go-sarah. While this bot framework had been a great help with my ChatOps, I found myself becoming more and more interested in designing a chat system as a whole. Not just a text-based communication tool or its varied extension; but as a customizable event aggregation system that provides and consumes any conceivable event varied from virtual to real-life. In the course of its server-side design, Golang’s actor model implementation, protoactor-go, seemed like a good option. However, protoactor-go is still in its Beta phase and has less documentation at this point in time. This article describes what I have learned about this product. The basic of actor model is not going to be covered, but for those who are interested, my previous post “Yet another Akka introduction for dummies“ might be a help.

Unless otherwise noted, this introduction is based on the latest version as of 2018-07-21.

Terms, Concepts, and Common Types

Message

With the nature of the actor model, a message plays an important part to let actors interact with others. Messages internally fall into two categories:

  • User message … Messages defined by developers for actor interaction.
  • System message … Messages defined by protoactor-go for internal use that mainly handles the actor lifecycle.

PID

actor.PID is a container that combines a unique identifier, the address and a reference to actor.Process altogether. Since this provides interfaces for others to interact with the underlying actor, this can be seen as an actor reference if one is familiar with Akka. Or simply a Pid if familiar with Erlang. However, this is very important to remember that an actor process is not the only entity that a PID encapsulates.

Process

actor.Process defines a common interface that all interacting “process” must implement. In this project, the concepts of process and PID are quite similar to those of Erlang. Understanding that PID is not necessarily a representation of an actor process is vital when referring to actor messaging context. This distinction and its importance are described in the follow-up article, [Golang] protoactor-go 101: How actors communicate with each other. Its implementation varies depending on each role as below:

Router

router.process receives a message and broadcasts it to all subordinating actors: “routees.”

Local process

actor.localProcess has a reference to a mailbox. On message reception,  this enqueues the message to its mailbox so the actor can receive this for further procedure.

Remote process

On contrary to a local process, this represents an actor that exists in a remote environment. On message reception, this serializes the message and sends it to the destination host.

Guardian process

When a developer passes a “guardian”’s supervisor strategy for actor constructor, a parent actor is created with this supervisor strategy along with the actor itself. This parent “guardian” actor will take care of the child actor’s uncontrollable state. This should be effective when the constructing actor is the “root actor” – an actor without a parent actor – but customized supervision is still required. When multiple actor constructions contain the same settings for guardian supervision, only one guardian actor is created and this becomes the parent of all actors with the same settings.

Future process

actor.futureProcess provides some dedicated features for Future related tasks.

Dead letter process

actor.deadLetterProcess provides features to handle “dead letters.” A dead letter is a message that failed to reach target because, for example, the target actor did not exist or was already stopped. This dead letter process publishes actor.DeadLetterEvent to the event stream, so a developer can detect the dead letter by subscribing to the event via eventstream.Subscribe().

Mailbox

This works as a queue to receive incoming messages, store them temporarily and pass them to its coupled actor when the actor is ready for message execution. The actor is to receive the message one at a time, execute its task and alter its state if necessary. Mailbox implements mailbox.Inbound interface.

  • Default mailbox … mailbox.defaultMailbox not only receives incoming messages as a mailbox.Inbound implementation, but also coordinates the actor invocation schedule with its mailbox.Dispatcher implementation entity. This mailbox also contains mailbox.MessageInvoker implementation as its entity and its methods are called by mailbox.Dispatcher for actor invocation purpose. actor.localContext implements mailbox.MessageInvoker.

Context

This is equivalent to Akka’s ActorCoontext. This contains contextual information and contextual methods for the underlying actor such as below:

  • References to watching actors and methods to watch/unwatch other actors
  • A reference to the actor who sent the currently processing message and a method to access to this
  • Methods to pass a message to another actor
  • etc…

Middleware

Zero or more pre-registered procedures can be executed around actor invocation, which enables an AOP-like approach to modify behavior.

  • Inbound middleware … actor.InboundMiddleware is a middleware that is executed on message reception. A developer may register one or more middleware via Props.WithMiddleware().
  • Outbound middleware … actor.OutboundMiddleware is a middleware that is executed on message sending. A developer may register one or more middleware via Props.WithOutboundMiddleware().

Router

A sub-package, router, provides a series of mechanism that routes a given message to one or more of its routees.

  • Broadcast router … Broadcast given message to all of its routee actors.
  • Round robin router … Send given message to one of its routee actors chosen by round-robin manner
  • Random router … Send given message to a randomly chosen routee actor.

Event Stream

eventstream.EventStream is a mechanism to publish and subscribe given event where the event is an empty interface, interface{}. So the developer can technically publish and subscribe to any desired event. By default an instance of eventstream.EventStream is cached in package local manner and is used to publish and subscribe events such as dead letter messages.

Actor Construction

To construct a new actor and acquire a reference to this, a developer can feed an actor.Props to actor.Spawn or actor.SpawnNamed. The struct called actor.Props is a set of configuration for actor construction. actor.Props can be initialized with helper functions listed below:

  • actor.FromProducer() … Pass a function that returns an actor.Actor implementation. This returns a pointer to actor.Props, which contains a set of configurations for actor construction.
  • actor.FromFunc() … Pass a function that satisfies actor.ActorFunc type, which receives exactly the same arguments as Actor.Recieve(). This is a handy wrapper of actor.FromProducer.
  • actor.FromSpawnFunc() … Pass a function that satisfies actor.SpawnFunc type. on actor construction, this function is called with a series of arguments containing id, actor.Props and parent PID to construct a new actor. When this function is not set, actor.DefaultSpawner is used.
  • actor.FromInstance() … Deprecated.

Additional configuration can be added via its setter methods with “With” prefix. See example code.

Spawner – Construct actor and initiate its lifecycle

A developer feeds a prepared actor.Props to actor.Spawn() or actor.SpawnNamed() depending on the requirement to initialize an actor, its context, and its mailbox. In any construction flow, Props.spawn() is called. To alter this spawning behavior, an alternative function can be set with actor.FromSpawnFunc() or Props.WithSpawnFunc() to override the default behavior. When none is set, actor.DefaultSpawner is used by default. Its behavior is as below:

  • The default spawner creates an instance of actor.localProcess, which is an actor.Process implementation.
  • Add the instance to actor.ProcessRegistry.
    • The registry returns an error if given id is already registered.
  • Create new actor.localContext which is an actor.Context implementation. This stores all contextual data.
  • Mailbox is created for the context. To modify the behavior of mailbox, use Props.WithDispatcher() and Props.WithMailbox().
  • Created mailbox is stored in the actor.localProcess instance.
  • The pointer to the process is set to actor.PID’s field.
  • actor.localContext also has a reference to the actor.PID as “self.”
  • Start mailbox
  • Enqueue mailbox a startedMessage as a system message which is an instance of actor.Started.

When construction is done and the actor lifecycle is successfully started, actor.PID for the new actor is returned.

Child Actor construction

With the introduced actor construction procedure, a developer can create any “root actor,” an actor with no parent. To achieve a hierarchized actor system, use actor.Context’s Spawn() or SpawnNamed() method. Those methods work similarly to actor.Spawn() and actor.SpawnNamed(), but the single and biggest difference is that they create a parent-child relationship between the spawning actor and the newly created actor. They work as below:

  1. Check if Props.guardianStrategy is set
    • If set, it panics. Because the calling actor is going to be the parent and be obligated to be a supervisor, there is no need to set one. This strategy is to create a parent actor for customized supervision as introduced in the first section.
  2. Call Props.spawn()
    • The ID has a form of {parent-id}/{child-id}
    • Own PID is set as a parent for the new actor
  3. Add created actors actor.PID to its children
  4. Start watching the created actor.PID to subscribe its lifecycle event

See example code.

Supervisor Strategy

This is a parent actor’s responsibility to take care of its child actor’s exceptional state. When a child actor can no longer control its state, based on the “let-it-crash” philosophy, child actor notifies such situation to parent actor by panic(). The parent actor receives such notification with recover() and decides how to treat such failing actor. This decision is made by a customizable actor.SupervisorStrategy. When no strategy is explicitly set by a developer, actor.defaultSupervisorStrategy is set on actor construction.

The supervision flow is as follows:

  1. A mailbox passes a message to Actor.Recieve() via target actor context’s localContext.InvokeUserMessage().
  2. In Actor.Receive(), the actor calls panic().
  3. Caller mailbox catches such uncontrollable state with recover().
  4. The mailbox calls localContext.EscalateFailure(), where localContext is that of the failing actor.
    1. In localContext.EscalateFailure(), this tells itself to suspend any incoming message till recovery is done.
    2. Create actor.Failure instance that holds failing reason and other statistical information, where “reason” is the argument passed to panic().
    3. Judges if the failing actor has any parent
      • If none is found, the failing actor is the “root actor” so the actor.Failure is passed to actor.handleRootFactor().
      • If found, this passes actor.Failure to parent’s PID.sendSystemMessage() to notify failing state
        1. The message is enqueued to parent actor’s mailbox
        2. Parent’s mailbox calls its localContext.InvokeSystemMessage.
        3. actor.Failure is passed to localContext.handleFailure
        4. If its actor.Actor entity itself implements actor.SupervisorStrategy, its HandleFailure() is called.
        5. If not, its supervisor entity’s handleFailure() is called.
        6. In HandleFailure(), decide recovery policy and call localContext.(ResumeChildren|RestartChildren|StopChildren|EscalateFailure).

See example code.

Upcoming Interface Change

A huge interface change is expected according to the issue “Design / API Changes upcoming.”

Further Readings

See below articles for more information:

Aug 19, 2017

[Golang] Introducing go-sarah: simple yet highly customizable bot framework

As mentioned in the latest blog post, I created a new bot framework: go-sarah. This article introduces its notable features and overall architecture along with some sample codes. Upcoming articles should focus on details about each specific aspect.

Notable features

User's Conversational Context

In this project, user's conversational context is referred to as "user context," which stores previous user states and defines what function should be executed on following input. While typical bot implementation is somewhat "stateless" and hence user-and-bot interaction does not consider previous state, Sarah natively supports the idea of this conversational context. Its aim is to let user provide information as they send messages, and finally build up complex arguments to be passed.

For example, instead of obligating user to input long confusing text such as ".todo Fix Sarah's issue #123 by 2017-04-15 12:00:00" at once, let user build up arguments in a conversational manner as below image:


Live Configuration Update

When configuration file for a command is updated, Sarah automatically detects the event and re-builds the command or scheduled task in thread-safe manner so the next execution of that command/task appropriately reflects the new configuration values.

See the usage of CommandPropsBuilder and ScheduledTaskPropsBuilder for detail.

Concurrent Execution by Default

Developers may implement their own bot by a) implementing sarah.Bot interface or b) implementing sarah.Adapter and pass it to sarah.NewBot() to get instance of default Bot implementation.

Either way, a component called sarah.Runner takes care of Commmand execution against given user input. This sarah.Runner dispatches tasks to its internal workers, which means developers do not have to make extra effort to handle flooding incoming messages.

Alerting Mechanism

When a bot confronts critical situation and can not continue its operation or recover, Sarah's alerting mechanism sends alert to administrator. Zero or more sarah.Alerter implementations can be registered to send alert to desired destinations.

Higher Customizability

To have higher customizability, Sarah is composed of fine grained components that each has one domain to serve; sarah.Alerter is responsible for sending bot's critical state to administrator, workers.Worker is responsible for executing given job in a panic-proof manner, etc... Each component comes with an interface and default implementation, so developers may change Sarah's behavior by implementing corresponding component's interface and replacing default implementation.

Overall Architecture

Below illustrates some major components.


Runner

Runner is the core of Sarah; It manages other components' lifecycles, handles concurrent job execution with internal workers, watches configuration file changes, re-configures commands/tasks on file changes, executes scheduled tasks, and most importantly makes Sarah comes alive.

Runner may take multiple Bot implementations to run multiple Bots in single process, so resources such as workers and memory space can be shared.

Bot / Adapter

Bot interface is responsible for actual interaction with chat services such as Slack, LINE, gitter, etc...

Bot receives messages from chat services, sees if the sending user is in the middle of user context, searches for corresponding Command, executes Command, and sends response back to chat service.

Important thing to be aware of is that, once Bot receives message from chat service, it sends the input to Runner via a designated channel. Runner then dispatches a job to internal worker, which calls Bot.Respond and sends response via Bot.SendMessage. In other words, after sending input via the channel, things are done in concurrent manner without any additional work. Change worker configuration to throttle the number of concurrent execution -- this may also impact the number of concurrent HTTP requests against chat service provider.

DefaultBot

Technically Bot is just an interface. So, if desired, developers can create their own Bot implementations to interact with preferred chat services. However most Bots have similar functionalities, and it is truly cumbersome to implement one for every chat service of choice.

Therefore defaultBot is already predefined. This can be initialized via sarah.NewBot.

Adapter

sarah.NewBot takes multiple arguments: Adapter implementation and arbitrary number ofsarah.DefaultBotOptions as functional options. This Adapter thing becomes a bridge between defaultBot and chat service. DefaultBot takes care of finding corresponding Command against given input, handling stored user context, and other miscellaneous tasks; Adapter takes care of connecting/requesting to and messaging with chat service.

package main

import (
        "github.com/oklahomer/go-sarah"
        "github.com/oklahomer/go-sarah/slack"
        "gopkg.in/yaml.v2"
        "io/ioutil"
)

func main() {
        // Setup slack bot.
        // Any Bot implementation can be fed to Runner.RegisterBot(), but for convenience slack and gitter adapters are predefined.
        // sarah.NewBot takes adapter and returns defaultBot instance, which satisfies Bot interface.
        configBuf, _ := ioutil.ReadFile("/path/to/adapter/config.yaml")
        slackConfig := slack.NewConfig() // config struct is returned with default settings.
        yaml.Unmarshal(configBuf, slackConfig)
        slackAdapter, _ := slack.NewAdapter(slackConfig)
        sarah.NewBot(slackAdapter)
}

Command

Command interface represents a plugin that receives user input and return response. Command.Match is called against user input in Bot.Respond. If it returns true, then the command is considered "corresponds to user input," and hence its Execute method is called.

Any struct that satisfies Command interface can be fed to Bot.AppendCommand as a command. CommandPropsBuilder is provided to easily implement Command interface on the fly:

Simple Command

There are several ways to setup Commands:
  • Define a struct that implements Command interface. Pass its instance to Bot.ApendCommand.
  • Use CommandPropsBuilder to construct a non-contradicting set of arguments, and pass this to Runner.Runner internally builds a command, and re-built it when configuration struct is present and corresponding configuration file is updated.
Below are several ways to setup CommandProps with CommandPropsBuilder for different customization.
// In separate plugin file such as echo/command.go
// Export some pre-build command props
package echo

import (
 "github.com/oklahomer/go-sarah"
 "github.com/oklahomer/go-sarah/slack"
 "golang.org/x/net/context"
 "regexp"
)

// CommandProps is a set of configuration options that can be and should be treated as one in logical perspective.
// This can be fed to Runner to build Command on the fly.
// CommandProps is re-used when command is re-built due to configuration file update.
var matchPattern = regexp.MustCompile(`^\.echo`)
var SlackProps = sarah.NewCommandPropsBuilder().
        BotType(slack.SLACK).
        Identifier("echo").
        MatchPattern(matchPattern).
        Func(func(_ context.Context, input sarah.Input) (*sarah.CommandResponse, error) {
                // ".echo foo" to "foo"
                return slack.NewStringResponse(sarah.StripMessage(matchPattern, input.Message())), nil
        }).
        InputExample(".echo knock knock").
        MustBuild()

// To have complex checking logic, MatchFunc can be used instead of MatchPattern.
var CustomizedProps = sarah.NewCommandPropsBuilder().
        MatchFunc(func(input sarah.Input) bool {
                // Check against input.Message(), input.SenderKey(), and input.SentAt()
                // to see if particular user is sending particular message in particular time range
                return false
        }).
        // Call some other setter methods to do the rest.
        MustBuild()

// Configurable is a helper function that returns CommandProps built with given CommandConfig.
// CommandConfig can be first configured manually or from YAML/JSON file, and then fed to this function.
// Returned CommandProps can be fed to Runner and when configuration file is updated,
// Runner detects the change and re-build the Command with updated configuration struct.
func Configurable(config sarah.CommandConfig) *sarah.CommandProps {
        return sarah.NewCommandPropsBuilder().
                ConfigurableFunc(config, func(_ context.Context, input sarah.Input, conf sarah.CommandConfig) (*sarah.CommandResponse, error) {
                        return nil, nil
                }).
                // Call some other setter methods to do the rest.
                MustBuild()
}

Reconfigurable Command

With CommandPropsBuilder.ConfigurableFunc, a desired configuration struct may be added. This configuration struct is passed on command execution as 3rd argument. Runner is watching the changes on configuration files' directory and if configuration file is updated, then the corresponding command is built, again.

To let Runner supervise file change event, set sarah.Config.PluginConfigRoot. Internal directory watcher supervises sarah.Config.PluginConfigRoot + "/" + BotType + "/" as Bot's configuration directory. When any file under that directory is updated, Runner searches for corresponding CommandProps based on the assumption that the file name is equivalent to CommandProps.identifier + ".(yaml|yml|json)". If a corresponding CommandProps exists, Runner rebuild Command with latest configuration values and replaces with the old one.

Scheduled Task

While commands are set of functions that respond to user input, scheduled tasks are those that run in scheduled manner. e.g. Say "Good morning, sir!" every 7:00 a.m., search on database and send "today's chores list" to each specific room, etc...

ScheduledTask implementation can be fed to Runner.RegisterScheduledTask. When Runner.Run is called, clock starts to tick and scheduled task becomes active; Tasks will be executed as scheduled, and results are sent to chat service via Bot.SendMessage.

Simple Scheduled Task

Technically any struct that satisfies ScheduledTask interface can be treated as scheduled task, but a builder is provided to construct a ScheduledTask on the fly.
package foo

import (
 "github.com/oklahomer/go-sarah"
 "github.com/oklahomer/go-sarah/slack"
 "github.com/oklahomer/golack/slackobject"
 "golang.org/x/net/context"
)

// TaskProps is a set of configuration options that can be and should be treated as one in logical perspective.
// This can be fed to Runner to build ScheduledTask on the fly.
// ScheduledTaskProps is re-used when command is re-built due to configuration file update.
var TaskProps = sarah.NewScheduledTaskPropsBuilder().
        BotType(slack.SLACK).
        Identifier("greeting").
        Func(func(_ context.Context) ([]*sarah.ScheduledTaskResult, error) {
                return []*sarah.ScheduledTaskResult{
                        {
                                Content:     "Howdy!!",
                                Destination: slackobject.ChannelID("XXX"),
                        },
                }, nil
        }).
        Schedule("@everyday").
        MustBuild()

Reconfigurable Scheduled Task

With ScheduledTaskPropsBuilder.ConfigurableFunc, a desired configuration struct may be added. This configuration struct is passed on task execution as 2nd argument. Runner is watching the changes on configuration files' directory and if configuration file is updated, then the corresponding task is built/scheduled, again.

To let Runner supervise file change event, set sarah.Config.PluginConfigRoot. Internal directory watcher supervises sarah.Config.PluginConfigRoot + "/" + BotType + "/" as Bot's configuration directory. When any file under that directory is updated, Runner searches for corresponding ScheduledTaskProps based on the assumption that the file name is equivalent to ScheduledTaskProps.identifier + ".(yaml|yml|json)". If a corresponding ScheduledTaskProps exists, Runner rebuild ScheduledTask with latest configuration values and replaces with the old one.

UserContextStorage

As described in "Notable Features," Sarah stores user's current state when Command's response expects user to send series of messages with extra supplemental information. UserContextStorage is where the state is stored. Developers may store state into desired storage by implementing UserContextStorage interface. Two implementations are currently provided by author:

Store in Process Memory Space

defaultUserContextStorage is a UserContextStorage implementation that stores ContextualFunc, a function to be executed on next user input, in the exact same memory space that process is currently running. Under the hood this storage is simply a map where key is user identifier and value is ContextualFunc. This ContextFunc can be any function including instance method and anonymous function that satisfies ContextFunc type. However it is recommended to use anonymous function since some variable declared on last method call can be casually referenced in this scope.

Store in External KVS

go-sarah-rediscontext stores combination of function identifier and serializable arguments in Redis. This is extremely effective when multiple Bot processes run and user context must be shared among them.
e.g. Chat platform such as LINE sends HTTP requests to Bot on every user input, where Bot may consist of multiple servers/processes to balance those requests.

Alerter

When registered Bot encounters critical situation and requires administrator's direct attention, Runner sends alert message as configured with Alerter. LINE alerter is provided by default, but anything that satisfies Alerter interface can be registered as Alerter. Developer may add multiple Alerter implementations via Runner.RegisterAlerter so it is recommended to register multiple Alerters to avoid Alerting channel's malfunction and make sure administrator notices critical state.

Bot/Adapter may send BotNonContinurableError via error channel to notify critical state to Runner. e.g. Adapter can not connect to chat service provider after reasonable number of retrials.

Getting Started

That is pretty much everything developers should know before getting started. To see working example code, visit https://github.com/oklahomer/go-sarah/tree/master/examples. Fore more details, make sure to follow upcoming blog posts.

P.S. Stars on go-sarah project are always welcom :)