13 minutes
Building a microservice framework in Golang
Introduction
Exactly one year ago I started my job at CSP Inc. to work on the ARIA product line. At the time we were a team of four engineers, all hailing from an embedded background. Having worked extensively in C/C++ in the past, we had little to no experience in Golang (heck I hadn’t even heard of Golang at that point). We were faced with a daunting task — to build the next biggest Software Defined Security product! The project had multiple components ranging from the web-service front-end to developing an agent software which would run on our Secure Intelligent Adapters (SIA).
I was tasked with designing and developing the Controlplane (which we refer to as the Software Defined Security Orchestrator — SDSo). The SDSo was to be developed as a collection of multiple microservices mostly written in Golang. Over the past year I have worked with multiple open-source libraries and implemented services using Kafka, gRPC, Redis, MySQL, HTTP, Celery, among several others. In this time, I have created over twenty microservices written exclusively in Golang. With the team growing from four to fifteen engineers working on the product, I have seen microservices being developed in a style as unique as the developer who worked on it. However, in principle, each one of these services are the same. Each one composed of one or more running servers, storage options and one or more upstream clients.
Due to the fast paced nature of our work, it was absolutely essential for me and my team of four engineers to focus on the business logic of our services. Rather than rehashing boilerplate code to setup our microservices such as initializing our server, database, clients etc., I came up with a simplified framework which I will be describing in this post.
Microservice interface
Each microservice should satisfy the Microservice interface. This is not imposed but only serves as a guideline.
type Microservice interface {
// RegisterServer registers a server with a ServerType identifier. The server must implement the ServerInterface
RegisterServer(serverType server.ServerType, server server.ServerInterface) (err error)
// Start is used to start the microservice. All servers registered with the microservice are started when this
// method is invoked.
Start() (err error)
// Stop is used to gracefully stop the microservice. All servers registered with the microservice are gracefully stopped
// when this method is invoked.
Stop() (err error)
}
An example of how we implement this interface would be :
type ServiceMgr struct {
// A map of all the servers registered with the service
servers map[server.ServerType]server.ServerInterface
}
func (ss *ServiceMgr) RegisterServer(serverType server.ServerType, server server.ServerInterface) (err error) {
// Store the server in a map
ss.servers[serverType] = server
return
}
func (ss *ServiceMgr) Start() (err error) {
// Do all your service initialization here.
err = ss.init()
if err != nil {
return
}
// Invoke the start method on each of the servers
for t, s := range ss.servers {
err = s.Start()
if err != nil {
return err
}
}
return
}
func (ss *ServiceMgr) Stop() (err error) {
// Issue a graceful shutdown call to each of the servers
for t, s := range ss.servers {
err = s.Stop()
if err != nil {
return err
}
}
return
}
We will get to the init()
seen in line 12 above in a few moment. (*)
Server interface
Each server would need to implement the server interface which is composed of the following :
A Start()
method which would start the would start the server as a go routine.
A Stop()
method which would gracefully stop the server by sending a message on its quit channel returned at the time of starting
A RegisterNamespace()
and RegisterService()
to register a namespace which allows for using the same channel/topic for sending messages destined for different endpoints identified by their service type string.
// ServerType is a typedef for server identifier
type ServerType string
// Service is a typedef for service identifier
type Service string
// ServiceEndpointMap is a typedef for a map of Service endpoints identified by their service type
type ServiceEndpointMap map[Service]endpoint.Endpoint
// Handler is a func signature implemented by the Endpoint Handle() method
type Handler func(in []byte, serviceEndpointMap ServiceEndpointMap) (err error)
// StartStopInterface is composed of the Start() and Stop() methods to be implemented by the server
type StartStopInterface interface {
// Start is used to start the server
Start() (err error)
// Stop is used to gracefully stop the server
Stop() (err error)
}
// ServerInterface defines the methods that all servers registed with the microservice must implement
type ServerInterface interface {
StartStopInterface
// RegisterNamesapce is used to register a namespace (like a kafka channel/topic or grpc namespace)
// with the server.
RegisterNamespace(namespace string)
// RegisterService is used to register a service and its endpoints with the server.
RegisterService(namespace string, service Service, endpoint endpoint.Endpoint)
}
An implementation of the server would look something like follows -
Dont get confused by the nomenclature [KafkaConsumer
] below. At CSPi we use kafka
not just as a pubsub
framework but also as a server for our microservices. Typically used when the service being targeted needs to asychronous. (You may note that the response in the Handle()
is not returned to the caller. Sending a response is an implementation detail when using kafka as a server).
// KafkaConsumer encapsulates all members used for the kafka server.
type KafkaConsumer struct {
cfg *configuration.KafkaConfig
config *sarama.Config
master sarama.Consumer
topics map[string]Topic
quit chan bool
}
// Topic encapsulates a kafka channel name and a map of endpoints used on that channel
type Topic struct {
Name string
ServiceEndpointMap ServiceEndpointMap
}
// MyTopic returns a new Topic instance
func MyTopic(name string) (topic Topic) {
return Topic{
Name: name,
ServiceEndpointMap: make(ServiceEndpointMap),
}
}
// MyKafkaConsumer takes in configuration object and returns a new KafkaConsumer instance.
func MyKafkaConsumer(cfg *configuration.KafkaConfig) (kc *KafkaConsumer, err error) {
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
master, err := sarama.NewConsumer(cfg.Brokers, config)
if err != nil {
return
}
kc = &KafkaConsumer{
cfg,
config,
master,
make(map[string]Topic),
make(chan bool),
}
return
}
// Start is an implementation of the Server Start() method.
func (kc *KafkaConsumer) Start() (err error) {
for _, topic := range kc.topics {
go func(topic Topic) {
consumer(kc.master, kc.quit, topic, ByteHandler, kc.cfg)
select {
case <-kc.quit:
return
}
}(topic)
}
return
}
// Stop is an implementation of the Server Stop() method
func (kc *KafkaConsumer) Stop() (err error) {
close(kc.quit)
kc.master.Close()
return
}
// RegisterNamespace is an implementation of the Server RegisterNamespace method.
// In kafka a namespace is the same as the channel name.
func (kc *KafkaConsumer) RegisterNamespace(name string) {
kc.topics[name] = MyTopic(name)
return
}
// RegisterService is an implementation of the server RegisterService method.
// In kafka this is used to demux the messages coming in on a single channel/topic.
func (kc *KafkaConsumer) RegisterService(namespace string, service Service, ep endpoint.Endpoint) {
kc.topics[namespace].ServiceEndpointMap[service] = ep
return
}
Endpoint interface
The business logic of our service is encapsulated in the Endpoint interface within the Handle()
method.
package endpoint
import "context"
// Endpoint provides the main interface to be implemented by each endpoint.
// Endpoints are where the user provides all the business logic of the microservice.
type Endpoint interface {
Handle(ctx context.Context, request []byte) (response []byte, err error)
}
An implementation of which would look something like this :
package endpoint
import (
"context"
log "github.com/sirupsen/logrus"
)
// HelloEndpoint is the endpoint specific object
type HelloEndpoint struct {
}
// MyHelloEndpoint returns an instance of the HelloEndpoint object
func MyHelloEndpoint() *HelloEndpoint {
return &HelloEndpoint{}
}
// Handle implements the Endpoint Handle method.
// This is where the business logic of the endpoint is implemented.
func (ep *HelloEndpoint) Handle(ctx context.Context, request []byte) (response []byte, err error) {
log.Infof("\n=======\nHello %s\n=======\n", request)
return
}
The users job is to register this endpoint as a handler for the service identified by a string "hello"
registered under the "greeter"
namespace. In kafka
the namespace is synonymous with the topic
name.
// Do all other initialization work, specific to the microservice here.
func (ss *ServiceMgr) init() (err error) {
sv := ss.servers[server.ServerType("kafka")]
// Register a namespace
sv.RegisterNamespace("greeter")
// Register your endpoints against the namespace with the server
sv.RegisterService("greeter", server.Service("hello"), endpoint.MyHelloEndpoint())
return
}
That wraps up my post about microservices.
Keep watching this space for more edits, completed with snippets and gifs to come.
Doing this with an RPC Server instead of Kafka
The Server interface implementation would like below :
package server
import (
"encoding/json"
"go-micro-framework/microservice/configuration"
"go-micro-framework/microservice/endpoint"
"log"
"net"
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
type RPCNamespace struct {
Name string
ServiceEndpointMap ServiceEndpointMap
}
func MyRPCNamespace(name string) (namespace RPCNamespace) {
return RPCNamespace{
Name: name,
ServiceEndpointMap: make(ServiceEndpointMap),
}
}
type RPCServer struct {
addr string
namespaces map[string]RPCNamespace
server *grpc.Server
}
func MyRPCServer(cfg *configuration.RPCConfig) (rpc *RPCServer, err error) {
rpc = &RPCServer{
addr: cfg.Address,
namespaces: make(map[string]RPCNamespace),
}
return
}
func (rpc *RPCServer) Start() (err error) {
lis, err := net.Listen("tcp", rpc.addr)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
rpc.server = grpc.NewServer()
RegisterMessageServiceServer(rpc.server, rpc)
go rpc.server.Serve(lis)
return
}
func (rpc *RPCServer) Stop() (err error) {
rpc.server.GracefulStop()
return
}
func (rpc *RPCServer) RegisterNamespace(name string) {
rpc.namespaces[name] = MyRPCNamespace(name)
return
}
func (rpc *RPCServer) RegisterService(namespace string, service Service, ep endpoint.Endpoint) {
rpc.namespaces[namespace].ServiceEndpointMap[service] = ep
return
}
func (rpc *RPCServer) Command(c context.Context, msg *RPCMessage) (resp *RPCResponse, err error) {
out, err := ByteHandler(msg.Data, rpc.namespaces[msg.Namespace].ServiceEndpointMap)
if err != nil {
resp = &RPCResponse{
Error: err.Error(),
}
return
}
resp = &RPCResponse{
Data: out,
}
return
}
type RPCClient struct {
addr string
client MessageServiceClient
}
func MyRPCClient(cfg *configuration.RPCConfig) (rpc *RPCClient, err error) {
conn, err := grpc.Dial(cfg.Address, grpc.WithInsecure())
if err != nil {
return
}
rpc = &RPCClient{
addr: cfg.Address,
client: NewMessageServiceClient(conn),
}
return
}
func (rpc *RPCClient) Send(namespace string, msg interface{}) (resp string, err error) {
b, err := json.Marshal(msg)
if err != nil {
return
}
respObj, err := rpc.client.Command(
context.Background(),
&RPCMessage{
Namespace: namespace,
Data: b,
},
)
if err != nil {
return
}
resp = string(respObj.Data)
return
}
I generated the RPC server using the protoc
tool using the following protobuf
specification file.
syntax = "proto3";
package rpc;
service MessageService {
rpc Command(RPCMessage) returns (RPCResponse) {};
}
message RPCMessage {
string Namespace = 1;
bytes Data = 2;
}
message RPCResponse {
bytes Data = 1;
string Error = 2;
}
, which can be used to generate the proto server using the protoc tool provided by gRPC
.
package server
// This is created using the rpc.proto file.
import (
fmt "fmt"
"math"
"github.com/golang/protobuf/proto"
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type RPCMessage struct {
Namespace string `protobuf:"bytes,1,opt,name=Namespace" json:"Namespace,omitempty"`
Data []byte `protobuf:"bytes,2,opt,name=Data,proto3" json:"Data,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *RPCMessage) Reset() { *m = RPCMessage{} }
func (m *RPCMessage) String() string { return proto.CompactTextString(m) }
func (*RPCMessage) ProtoMessage() {}
func (*RPCMessage) Descriptor() ([]byte, []int) {
return fileDescriptor_rpc_848df06cb7d89181, []int{0}
}
func (m *RPCMessage) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_RPCMessage.Unmarshal(m, b)
}
func (m *RPCMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_RPCMessage.Marshal(b, m, deterministic)
}
func (dst *RPCMessage) XXX_Merge(src proto.Message) {
xxx_messageInfo_RPCMessage.Merge(dst, src)
}
func (m *RPCMessage) XXX_Size() int {
return xxx_messageInfo_RPCMessage.Size(m)
}
func (m *RPCMessage) XXX_DiscardUnknown() {
xxx_messageInfo_RPCMessage.DiscardUnknown(m)
}
var xxx_messageInfo_RPCMessage proto.InternalMessageInfo
func (m *RPCMessage) GetNamespace() string {
if m != nil {
return m.Namespace
}
return ""
}
func (m *RPCMessage) GetData() []byte {
if m != nil {
return m.Data
}
return nil
}
type RPCResponse struct {
Data []byte `protobuf:"bytes,1,opt,name=Data,proto3" json:"Data,omitempty"`
Error string `protobuf:"bytes,2,opt,name=Error" json:"Error,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *RPCResponse) Reset() { *m = RPCResponse{} }
func (m *RPCResponse) String() string { return proto.CompactTextString(m) }
func (*RPCResponse) ProtoMessage() {}
func (*RPCResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_rpc_848df06cb7d89181, []int{1}
}
func (m *RPCResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_RPCResponse.Unmarshal(m, b)
}
func (m *RPCResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_RPCResponse.Marshal(b, m, deterministic)
}
func (dst *RPCResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_RPCResponse.Merge(dst, src)
}
func (m *RPCResponse) XXX_Size() int {
return xxx_messageInfo_RPCResponse.Size(m)
}
func (m *RPCResponse) XXX_DiscardUnknown() {
xxx_messageInfo_RPCResponse.DiscardUnknown(m)
}
var xxx_messageInfo_RPCResponse proto.InternalMessageInfo
func (m *RPCResponse) GetData() []byte {
if m != nil {
return m.Data
}
return nil
}
func (m *RPCResponse) GetError() string {
if m != nil {
return m.Error
}
return ""
}
func init() {
proto.RegisterType((*RPCMessage)(nil), "rpc.RPCMessage")
proto.RegisterType((*RPCResponse)(nil), "rpc.RPCResponse")
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// Client API for MessageService service
type MessageServiceClient interface {
Command(ctx context.Context, in *RPCMessage, opts ...grpc.CallOption) (*RPCResponse, error)
}
type messageServiceClient struct {
cc *grpc.ClientConn
}
func NewMessageServiceClient(cc *grpc.ClientConn) MessageServiceClient {
return &messageServiceClient{cc}
}
func (c *messageServiceClient) Command(ctx context.Context, in *RPCMessage, opts ...grpc.CallOption) (*RPCResponse, error) {
out := new(RPCResponse)
err := grpc.Invoke(ctx, "/rpc.MessageService/Command", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for MessageService service
type MessageServiceServer interface {
Command(context.Context, *RPCMessage) (*RPCResponse, error)
}
func RegisterMessageServiceServer(s *grpc.Server, srv MessageServiceServer) {
s.RegisterService(&_MessageService_serviceDesc, srv)
}
func _MessageService_Command_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RPCMessage)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(MessageServiceServer).Command(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/rpc.MessageService/Command",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(MessageServiceServer).Command(ctx, req.(*RPCMessage))
}
return interceptor(ctx, in, info, handler)
}
var _MessageService_serviceDesc = grpc.ServiceDesc{
ServiceName: "rpc.MessageService",
HandlerType: (*MessageServiceServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Command",
Handler: _MessageService_Command_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "rpc.proto",
}
func init() { proto.RegisterFile("rpc.proto", fileDescriptor_rpc_848df06cb7d89181) }
var fileDescriptor_rpc_848df06cb7d89181 = []byte{
// 171 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2c, 0x2a, 0x48, 0xd6,
0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2e, 0x2a, 0x48, 0x56, 0xb2, 0xe3, 0xe2, 0x0a, 0x0a,
0x70, 0xf6, 0x4d, 0x2d, 0x2e, 0x4e, 0x4c, 0x4f, 0x15, 0x92, 0xe1, 0xe2, 0xf4, 0x4b, 0xcc, 0x4d,
0x2d, 0x2e, 0x48, 0x4c, 0x4e, 0x95, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0c, 0x42, 0x08, 0x08, 0x09,
0x71, 0xb1, 0xb8, 0x24, 0x96, 0x24, 0x4a, 0x30, 0x29, 0x30, 0x6a, 0xf0, 0x04, 0x81, 0xd9, 0x4a,
0xe6, 0x5c, 0xdc, 0x41, 0x01, 0xce, 0x41, 0xa9, 0xc5, 0x05, 0xf9, 0x79, 0xc5, 0x08, 0x25, 0x8c,
0x08, 0x25, 0x42, 0x22, 0x5c, 0xac, 0xae, 0x45, 0x45, 0xf9, 0x45, 0x60, 0x7d, 0x9c, 0x41, 0x10,
0x8e, 0x91, 0x03, 0x17, 0x1f, 0xd4, 0xd6, 0xe0, 0xd4, 0xa2, 0xb2, 0xcc, 0xe4, 0x54, 0x21, 0x3d,
0x2e, 0x76, 0xe7, 0xfc, 0xdc, 0xdc, 0xc4, 0xbc, 0x14, 0x21, 0x7e, 0x3d, 0x90, 0x33, 0x11, 0x0e,
0x93, 0x12, 0x80, 0x09, 0xc0, 0x6c, 0x52, 0x62, 0x48, 0x62, 0x03, 0x7b, 0xc3, 0x18, 0x10, 0x00,
0x00, 0xff, 0xff, 0x62, 0x4f, 0xa7, 0xc1, 0xd3, 0x00, 0x00, 0x00,
}
You can find all the code hosted on github.
To test this framework out I have provided an example directory completed with a simple microservice running kafka
as a server which can found in the /examples/
folder. Simple start the server by running the main.go in the /examples/kafka-microservice/
directory.
To send a message over the listening kafka topic, use the producer.go
program provided under the /examples
directory.
[A docker-compose
file is in the works for using the example microservice. I will update the page as soon as I have that ready. If you are curious you can always manually startup your kafka server using the Dockerfile
provided at https://hub.docker.com/r/spotify/kafka/].
If you looking for a more mature and independent framework, take a look at gizmo, go-kit and go-micro. The reason we didn’t use any of these framework at our workplace was because we found these frameworks to be more web-dev-centric and not as flexible for our diverse needs.
Originally published on medium
{{ template “_internal/disqus.html” . }}