Distributed Counter

I heard you wanted a practical distributed system, so here it is: writing the hello world of distributed systems, a distributed counter. For a little background, let’s just say you are writing a likes service which tracks likes for a particular post, and you want to horizontally scale this service by adding multiple instances of it, for whatever reason. So, let’s just talk about the ways to achieve this.

This looks like a question of delivery semantics-

  • At-least-once
  • Exactly-once
  • At-most-once

where a single like by a user should be reflected exactly once for the post.

Use a map of post to counter.

Let’s just say the posts are unique by their id and the id is represented as a string, so an in-memory database for this might look like map[string]int64. For each like you increment the counter for that post. Now all sorts of problems arise and you need some sort of deduplication of requests, which is not easy to achieve. For example, you might have sent a POST request to like a particular post; the request got through to the server but the response never made it back to the client, so you issue the request again, and now you have recorded two likes for a user who intended to like just once. So, you need to deduplicate the request here.
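To make that concrete, here is a minimal sketch of the naive single-node approach (the type and method names are illustrative, not taken from the actual service); the retry above is exactly how Like ends up double-counting.

package main

import (
    "fmt"
    "sync"
)

// likeCounter is the naive in-memory "database": a map of post id to like count.
type likeCounter struct {
    mu    sync.Mutex
    likes map[string]int64
}

func newLikeCounter() *likeCounter {
    return &likeCounter{likes: map[string]int64{}}
}

// Like increments the counter for a post. If a client retries a request whose
// response was lost, the same like is happily counted twice.
func (c *likeCounter) Like(postID string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.likes[postID]++
}

// Count returns the current like count for a post.
func (c *likeCounter) Count(postID string) int64 {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.likes[postID]
}

func main() {
    db := newLikeCounter()
    db.Like("post-42")               // the original request
    db.Like("post-42")               // the client's retry of the same like
    fmt.Println(db.Count("post-42")) // prints 2, even though the user liked once
}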

Now let’s say the request did get through to the server and the response got through as well, but while letting the other instances know that you have received a like, you can end up in the same scenario, and then all sorts of inconsistencies arise across your service instances. You still want consistency across the instances; that could be strong consistency guaranteeing linearizability, but eventual consistency may be all you really need.

So, what can be done?

Consistency Without Consensus

You want consistency in a distributed system; what are the options?

  • Consensus- Very expensive
    • 2PC
  • Writes via a leader- Very hard to scale; scaling it implicitly drags consensus back in.
  • CRDTs- Consistency (eventual consistency) without consensus.

Avoiding consensus as much as possible is the key to writing distributed systems, systems that can scale horizontally without biting you.

What’s a CRDT now?

CRDTs are conflict-free replicated data types, particularly used in distributed systems. They are sometimes referred to as ACID 2.0 types. Well, that’s not really a thing, but I like how CRDTs are referenced like this somewhere, so I stole it.

Acronym   Meaning
A         Associative
C         Commutative
I         Idempotent
D         Distributed

The D is not really part of it; call it whatever you want, we will call it distributed. The objective is to get consistency in distributed systems without consensus, since consensus is very costly, and that is why we look at CRDTs. The main idea is to make the operations associative, commutative and idempotent. Start any number of replicas from the same arbitrary state and apply the operations in any order: because the operations are associative, commutative and idempotent, you are sure to end up with the exact same state after applying all of them, even if the operations were not applied in the same order on every node. This means nodes that have seen the same set of operations, in any order, are consistent with each other.

CRDTs are a huge topic and covering them completely is out of scope for this article, but you can find plenty of resources on CRDTs online. You can find various CRDTs implemented here. I created that repo to play around with CRDTs for fun, and it is written in Go. The specific CRDT that we are going to use for this particular case is the G-set. A G-set, or grow-only set, is a CRDT set that only allows adding elements to the set, never removing them. So, as you might have guessed already, a user is allowed to like a post, but not to unlike or dislike it.
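To make the idea concrete, here is a minimal G-set sketch (this is not the implementation from the repo linked above, just the shape of it): the only mutation is Add, and Merge is plain set union, which is commutative, associative and idempotent.

package gsetsketch

// Gset is a grow-only set of strings: elements can be added but never removed.
type Gset struct {
    set map[string]struct{}
}

func New() *Gset {
    return &Gset{set: map[string]struct{}{}}
}

// Add inserts an element; adding the same element again is a no-op,
// which is what makes the operation idempotent.
func (g *Gset) Add(elem string) {
    g.set[elem] = struct{}{}
}

// Merge unions another G-set into this one. Because union is commutative,
// associative and idempotent, replicas can merge in any order, any number
// of times, and still converge to the same set.
func (g *Gset) Merge(other *Gset) {
    for elem := range other.set {
        g.set[elem] = struct{}{}
    }
}

// Len is the number of distinct elements; with user ids as elements,
// this is the like count for a post.
func (g *Gset) Len() int { return len(g.set) }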

Now, let’s get to actually writing the service. You can find the whole code here.

But let’s take it piece by piece: how do we approach writing this?

We need some sort of database to store the likes for a post. We are going to use an in-memory database for this, which is going to be a map[string]gset.Gset, i.e. a map from post id to a G-set. We are going to use the zerolog library for logging. We also need some way to form a cluster of nodes, for which we are going to use memberlist from HashiCorp, a gossip-based cluster membership library; the nodes will gossip likes to each other and periodically reduce the entropy among themselves.
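Roughly, the state a single node carries looks something like the sketch below (field names are illustrative and the post-to-set map stands in for map[string]gset.Gset; the real struct lives in the repo).

package likesserver

import (
    "sync"

    "github.com/hashicorp/memberlist"
    "github.com/rs/zerolog"
)

// server holds everything one node needs; in the repo the post-to-G-set map is
// wrapped in a small type that exposes an AddLike helper.
type server struct {
    mu        sync.Mutex
    postLikes map[string]map[string]struct{}   // post id -> set of user ids (stand-in for map[string]gset.Gset)
    chanMsg   chan []byte                      // gossip payloads handed off by NotifyMsg
    queue     *memberlist.TransmitLimitedQueue // outgoing like broadcasts
    list      *memberlist.Memberlist           // cluster membership
    logger    zerolog.Logger
}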

To subscribe to the gossip layer of memberlist you have to implement the memberlist.Delegate interface. The NotifyMsg(b []byte) method of that interface notifies you of any incoming gossip message. When this method is invoked we push the message onto a channel; a state machine loops over that channel indefinitely, until the channel is closed, and takes the appropriate action for each gossip message it receives. The relevant code is-

func (s *server) NotifyMsg(b []byte) {
    s.chanMsg <- b
}

and looping on this channel to listen for events-

func (s *server) loopSM() {
    for i := range s.chanMsg {
        s.logger.Debug().Msgf("Received message %s", string(i))
        var l likeDomain
        err := json.Unmarshal(i, &l)
        if err != nil {
            s.logger.Debug().Msgf("Error unmarshalling message %v", err)
            continue
        }
        // add the like to the local G-set; the error is deliberately ignored.
        _ = s.postLikes.AddLike(l.User, l.Post)
    }
}

Also, to periodically reduce the entropy, each node needs to expose its local state in some way, which we do in the delegate’s LocalState method-

func (s *server) LocalState(join bool) []byte {
    s.mu.Lock()
    defer s.mu.Unlock()
    b, _ := json.Marshal(s.postLikes)
    return b
}

and then merge the remote state received from another node into our own-

func (s *server) MergeRemoteState(buf []byte, join bool) {
    if len(buf) == 0 || !join {
        return
    }
    var temp = map[string]gset.Gset{}
    _ = json.Unmarshal(buf, &temp) // deliberately ignore error so linter doesn't complain.
    // converge this temp into s.postLikes
    s.mu.Lock()
    defer s.mu.Unlock()
    for key, value := range temp {
        for _, user := range value.GetSet() {
            _ = s.postLikes.AddLike(key, user)
        }
    }
}

Also, we want to be notified when a node joins, leaves, or is updated in the cluster, which we subscribe to by implementing the memberlist.EventDelegate interface. The implementation of the interface looks like-

func (s *server) NotifyJoin(n *memberlist.Node) {
    s.logger.Info().Msgf("node joined: %s  address: %s \n", n.String(), n.Addr.String())
}

func (s *server) NotifyLeave(n *memberlist.Node) {
    s.logger.Info().Msgf("node left: %s \n", n.String())
}

func (s *server) NotifyUpdate(n *memberlist.Node) {
    s.logger.Info().Msgf("node updated: %s \n", n.String())
}

Also, we need to broadcast the likes we get on a particular node to all the other nodes, for which we use the gossip layer of memberlist by implementing the memberlist.Broadcast interface. The gossip protocol works exactly like gossip works in general: you tell your friends something, they tell their friends, and so the chain goes on until the gossip has spread to a whole lot of people. Similarly, a node gets an event and gossips it to some nodes, which gossip it to some more nodes, and so on, until the event eventually reaches every node. So, on receiving the POST request for a like, we put an event on the broadcast queue to be gossiped to the cluster.
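As a sketch, implementing memberlist.Broadcast can look like the following (the actual implementation in the repo may differ); the payload is just the JSON-encoded like event, and queueing it hands it to the gossip layer.

package likesserver

import "github.com/hashicorp/memberlist"

// likeBroadcast wraps a JSON-encoded like event so it can be gossiped.
type likeBroadcast struct {
    msg []byte
}

// Invalidates reports whether this broadcast makes an older one obsolete;
// likes never supersede each other, so always false.
func (b *likeBroadcast) Invalidates(other memberlist.Broadcast) bool { return false }

// Message is the payload delivered to peers (it shows up in their NotifyMsg).
func (b *likeBroadcast) Message() []byte { return b.msg }

// Finished is called once the broadcast is done; nothing to clean up here.
func (b *likeBroadcast) Finished() {}

// queueLike hands a freshly received like to the gossip layer, assuming queue
// is the *memberlist.TransmitLimitedQueue owned by the server.
func queueLike(queue *memberlist.TransmitLimitedQueue, payload []byte) {
    queue.QueueBroadcast(&likeBroadcast{msg: payload})
}

The delegate’s GetBroadcasts method then typically just returns queue.GetBroadcasts(overhead, limit), which is how memberlist drains this queue into outgoing gossip.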

And the GET request reads from the node’s local in-memory database, so it is possible that a like recorded on some other node has not reached this one yet. That’s alright, we can be eventually consistent here: we can show the like a little later, the important thing is that we don’t lose it. If you want stronger consistency, for example linearizability, you might want something like writing to a quorum of nodes when recording a like and then reading from a quorum of nodes when fetching the likes for a post. That’s it, that’s the distributed service/counter. Now we need some sort of orchestration to deploy these services, and we are going to use Kubernetes for that. I’m not pasting the entire YAMLs, just the relevant parts.

Memberlist requires at least one known node in the cluster in order to join it, so we are going to create a master deployment with

labels:
  workertype: master
  application: likes-distributed

and a service for the master deployment, to be able to connect to the pods described by the master deployment.

spec:
  selector:
    workertype: master
    application: likes-distributed

This can be a ClusterIP service, because only the worker nodes that we are going to create right after this connect to the master through it. The workers are not actually workers in any special sense; a request can land on any of the nodes/pods, but memberlist requires at least one known member of the cluster to join the cluster.
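As a sketch of how a pod can use that one known member, assuming the master service address is passed to the worker pods via an environment variable (the variable name below is made up, not taken from the repo) and that the server implements the memberlist.Delegate and memberlist.EventDelegate interfaces shown earlier:

package likesserver

import (
    "os"

    "github.com/hashicorp/memberlist"
)

// joinCluster bootstraps membership for one pod.
func joinCluster(delegate memberlist.Delegate, events memberlist.EventDelegate) (*memberlist.Memberlist, error) {
    cfg := memberlist.DefaultLANConfig()
    cfg.Delegate = delegate // NotifyMsg, LocalState, MergeRemoteState, ...
    cfg.Events = events     // NotifyJoin, NotifyLeave, NotifyUpdate

    list, err := memberlist.Create(cfg)
    if err != nil {
        return nil, err
    }

    // Workers reach the master through the ClusterIP service in front of the
    // master deployment; the master itself has nobody to join and just waits.
    if master := os.Getenv("MASTER_SERVICE_ADDR"); master != "" {
        if _, err := list.Join([]string{master}); err != nil {
            return nil, err
        }
    }
    return list, nil
}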

metadata:
  labels:
    workertype: worker
    application: likes-distributed

We distinguish the workers from the master so that every pod doesn’t create its own cluster, which would be meaningless. Now we need a service that can connect to the n workers and 1 master.

spec:
  selector:
    application: likes-distributed
  type: LoadBalancer
  ports:
  - port: 80
    nodePort: 31001
    protocol: TCP
    targetPort: 8080
    name: http-server

You can check out the whole source code here; if you like the repo, don’t forget to give it a star. You can use make run to run the Kubernetes cluster locally if you have minikube installed. It uses ConfigMaps as well, so please apply the manifests in order if you are applying them manually. Then use-

minikube service distributed-likes-service-cluster

This will give you the service endpoint which you can use to register likes or get likes etc. A sample request looks something like-

[screenshot: sample curl requests and responses]

So, as you can see, the request landed on some worker pod but was quickly replicated to all the pods, which can be seen in the subsequent curl requests.

There’s a lot of scope for improvement in this service, which is going to be iterative, and more blog posts may come with the improvements. Thanks.
