It's been a busy week this week, so further development on the league manager application has been slow.

That said, it felt like time to add the groundings for inter-service communication.

It's event bus time.

Inter-Service Communication

In a microservice-based architecture, there is always going to be some element of communication between your separate services.

What was once a simple method call within the same library, now crosses processes, networks and sometimes even geographical locations.

Microservices are, in my opinion, the best way to write software in 2020. That said, getting these services to communicate is a pain in the arse.

There are two types of communication to consider:

1. Synchronous (request/response)

A synchronous communication is used when the calling service needs a response right away.

In our football league management application, let's consider the transfer service is going to move Player A from one team to another. For that to happen, index data about the two teams need to be pulled from the team service.

The transfer service could make two separate HTTP calls to https://team-service/team/{id} to return the data and then parse the response.

That's all well and good, but that's extremely chatty and also relies on https://team-service resolving to the correct location.

An alternative option (and the one I prefer, albeit slightly more complicated) is for the transfer service to hold a cache of the required team data it needs.

On startup, it runs one HTTP request to return ALL of the current HTTP data. Whenever a new team is created thereafter, the transfer service can simply listen out for an 'info.newteamcreated' event.

Whenever team data is required, the transfer service can first query its local cache (in mem, Redis) and failing that then make the outbound request to the team-service.

This is a concept called eventual consistency.

Eventual consistency can be described as each replica of the data receiving the entire dataset eventually!

Dropbox and similar storage technologies are a good example of eventual consistency. You edit a file locally on your PC, eventually, both the dropbox servers and your other connected smartphone will get that same update. However, if you hit save and instantly check your phone, the update may not be there right away.

This keeps the requirement for an instant response in the transfer service intact, whilst also further decoupling our services. Win!

It also leads quite nicely to our second type of communication.

2. Asyncronous (publish/subscribe)

Asynchronous communication is my favourite type of communication.

It's the act of a service raising a new event to some kind of centralised location and not caring what happens next.

The team-service in its current form already has a couple of examples of this.

When a new team is persisted to the databases a notification is sent to an event handler containing the details of the newly created team. Similar things happen when players are added or removed.

// CreateTeam creates a new team in the database.
func (interactor *TeamInteractor) CreateTeam(team *CreateTeamRequest) (*CreateTeamResponse, error) {
    if len(team.Name) == 0 {
        interactor.Logger.Log("Team name cannot be empty")
        var response = &CreateTeamResponse{
            ID:     "",
            Name:   team.Name,
            Errors: make([]string, 1),
        }

        response.Errors[0] = "Team name cannot be empty"

        return response, errors.New("Team name cannot be empty")
    }

    newTeam := &domain.Team{
        Name: team.Name,
    }

    createdTeamID := interactor.TeamRepository.Store(newTeam)

    interactor.EventHandler.Publish("leaguemanager-newteam", TeamCreatedEvent{
        TeamID:   createdTeamID,
        TeamName: team.Name,
    })

    return &CreateTeamResponse{
        ID:   createdTeamID,
        Name: newTeam.Name,
    }, nil
}

The team service handles the storage of the team data, and then just tells the world about it. There's no need to wait for anything else to happen, no worrying about anything else. Just a simple publish and that is that.

I prefer this kind of communication, mainly for the ease of inserting new functionality.

Sticking with the team/transfer interaction. Let's say we have 4 instances of the transfer service running to handle a spike in load. When a new team is created ALL 4 instances can listen for the same event. No complications, no additional code. Just four services all listening for the same event.

Now, the fixture management team have started working on their scheduling service. They also care about when a new team is created so it can be used by their scheduling service.

In the world of microservices, this kind of functionality is simple to add.

The fixture service just needs to start listening out for the same 'info.newteamcreated' event. It then receives the same event at the same time as the 4 transfer instances.

Alt Text

AWS Implementation

There is no single AWS service that gives the complete set of functionality we need for the above implementation. To get it working exactly as we need to, a combination of Amazon Simple Queue and Simple Notification services are required.

Conceptually, this is how the implementation will work.

Alt Text

As with all of the AWS SDKs, the details for making this work with Go Lang is incredibly easy.

Our implementation of the SNS publish looks a little something like this.

package infrastructure

import (
    "errors"
    "fmt"
    "strings"
    "team-service/domain"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/credentials"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/sns"
)

// ErrTopicNotFound is returned when the requested topic is not found.
var ErrTopicNotFound = errors.New("Specified topic not found")

// AmazonSnsEventBus is an event bus implementation using Amaazon SQS.
type AmazonSnsEventBus struct {
    svc       *sns.SNS
    availableTopics []string
}

// NewAmazonSnsEventBus creates a instance of the AmazonSnsEventBus.
func NewAmazonSnsEventBus(requiredTopics []string) *AmazonSnsEventBus {
    // Initialize a session that the SDK will use to load
    // credentials from the shared credentials file ~/.aws/credentials
    // and region from the shared configuration file ~/.aws/config.
    sess := session.Must(session.NewSession(&aws.Config{
        Region:      aws.String("eu-west-1"),
        Credentials: credentials.NewSharedCredentials("", "league-manager-sqs"),
    }))

    svc := sns.New(sess)

    availableTopics, _ := svc.ListTopics(nil)

    availableTopicArns := make([]string, len(availableTopics.Topics))

    for i, t := range availableTopics.Topics {
        availableTopicArns[i] = *t.TopicArn
    }

    return &AmazonSnsEventBus{
        svc:       svc,
        availableTopics: availableTopicArns,
    }
}

// Publish sends a new message to the event bus.
func (ev AmazonSnsEventBus) Publish(publishTo string, evt domain.Event) error {
    requiredTopicArn := ""

    for _, t := range ev.availableTopics {
        if strings.Contains(t, publishTo) {
            requiredTopicArn = t
        }
    }

    if len(requiredTopicArn) > 0 {

        result, err := ev.svc.Publish(&sns.PublishInput{
            Message: aws.String(string(evt.AsEvent())),
            TopicArn:    aws.String(requiredTopicArn),
        })

        if err != nil {
            fmt.Println("Error", err)
        }

        fmt.Println("Event published: ", *result.MessageId)

        return err
    }

    return ErrTopicNotFound
}

And then the usage in main.go

teamInteractor := new(usecases.TeamInteractor)
// teamInteractor.TeamRepository = infrastructure.NewInMemTeamRepo()
teamInteractor.TeamRepository = infrastructure.NewDynamoDbRepo()
teamInteractor.Logger = new(infrastructure.Logger)
// teamInteractor.EventHandler = new(infrastructure.MockEventBus)
teamInteractor.EventHandler = infrastructure.NewAmazonSnsEventBus()

One of the important things I've learned about both SNS and SQS is that you are charged per request.

For that reason, on the creation of the AmazonSqsEventBus, I load into memory all of the available topics.

availableTopics, _ := svc.ListTopics(nil)

availableTopicArns := make([]string, len(availableTopics.Topics))

for i, t := range availableTopics.Topics {
    availableTopicArns[i] = *t.TopicArn
}

return &AmazonSnsEventBus{
    svc:       svc,
    availableTopics: availableTopicArns,
}

When the publish request is actually made, I loop through the available topic ARN's and pick out the one that contains the topic name I'm trying to publish to.

requiredTopicArn := ""

for _, t := range ev.availableTopics {
    if strings.Contains(t, publishTo) {
        requiredTopicArn = t
    }
}

if len(requiredTopicArn) > 0 {
    ...
}

With that, I can run the application and make a POST request to create a new team. Low and behold...

Alt Text

Part one of the solution complete, albeit the much much simpler side.

In my next post, I'm going to dive deeper into the subscribe side of this model. This is a much more complicated side.

For each instance of a service running, it will need to create its own queue and then subscribe that queue to the correct SNS topic. Ideally, it also needs to sever that connection on shut down.

That could all be done manually within the UI, but in the world of auto-scaling and load balancing that would just be no fun.

As always, any comments questions or feedback is greatly appreciated. This is a learning journey for me as well.

This post is also available on DEV.