Lightweight microservices with F# and ZeroMQ

Build fast and safe services

Foreword

This post is a part of the F# Advent Calendar 2021. The tradition, established by Sergey Tihon in 2014 and made possible through his efforts, has become very popular in the F# community and was later adopted by the C# community as the C# Advent Calendar. Many thanks to Sergey for organizing this event.

Introduction

In a microservices architecture we often split complex functionality into pieces, isolating each piece in a box known in DDD as a bounded context. From the implementation perspective we would like contexts to be fully autonomous and independent of each other, so that each workflow stays within the boundaries of a particular context. In the real world, however, workflows span multiple contexts, and services communicate changes between contexts by sending domain events. This usually implies setting up a fairly complex infrastructure with a message broker in the middle, architecting your services in a publisher-subscriber fashion and integrating with that broker. But often you don’t need such a complex setup, especially for fire-and-forget scenarios.

We are going to write fully functional CLI services in .NET 6.0 using F# and NetMQ (a .NET port of the ZeroMQ messaging library). I chose ZeroMQ because it is lightweight, very fast and easy to start working with. It lets you connect services in different network topologies using TCP sockets on steroids, and you can build everything from request/reply and pub/sub to parallel pipelines across your services. Because it is broker-less, it does not require a complex setup like RabbitMQ or Kafka. On the other hand, it does not give you reliable messaging or logging: out of the box there are no tools for logging your messages or re-queuing failed ones.

You can find the source code for this article on my GitHub account.

Plan

We will containerize the solution following the guidelines of the 12-factor app. The sample solution consists of two projects: the publisher fires domain events, and the subscriber listens and reacts to them. I deliberately skip the details of the domain-driven aspects of the solution, such as error handling in the domain, converting between DTOs and domain models, and command-line parsing, and focus on communication, configuration, project structure and the main flows of sending and receiving domain events. You will find an implementation of the DDD concepts in the source code, although slightly simplified.

For details on DDD I highly recommend Scott Wlaschin's book Domain Modeling Made Functional and his Railway Oriented Programming series.

Pre-requisites

  • .NET 6.0/F#

  • Docker

  • IDE/Code Editor of your choice

Sample solution

The sample solution has publisher and subscriber services plus shared code with domain models and helpers. There is only one event defined, ContactCreatedDomainEvent, but of course you can create more.

The solution uses the FsConfig library to read the server address, port and pub/sub topics from environment variables.

Domain

For the sake of an example there are just a few domain types defined in Domain.fs:

type EmailAddress = | EmailAddress of string

type Contact =
    {
        FirstName: string
        MiddleName: string option
        LastName: string
        EmailAddress: EmailAddress
    }

The domain is a module shared between the publisher and the subscriber. Inside business workflows we work only with these domain types, so the compiler stops us from passing a plain string where, say, an EmailAddress is expected.

Domain Events

Next come the events, which are defined in Events.fs. Events are shared types that define how services communicate and share data. An event is one of two types: business or infrastructure. Business events are anything related to business flows, like ContactCreated or OrderPlaced, whilst infrastructure events are anything not directly related to business flows and their side effects. In this example we work with a business event:

type DomainEventType =
    | Business
    | Infrastructure

type ContactCreatedDomainEvent = 
    {
        Contact: Contact
    }

type DomainEvent = 
    | ContactCreated of Result<ContactCreatedDomainEvent, DtoError>

Note the DomainEvent type. It is a discriminated union that represents all events. We need a common type that wraps the concrete events so that we can use it later in a workflow. Each case carries the result of converting a DTO to the domain event type, which is why it holds a Result.

Data Transfer Objects

DTOs are simple types used for deserialization from the JSON we receive over the wire. Anything we get on the input side of the domain is a DTO. The same applies to the output: when a workflow finishes and we want to produce another event, we convert it to a new DTO and send it to another service over the wire.

type DtoError = 
    | ValidationError of string
    | DeserializationException of exn

type ContactCreatedDto = 
    {
        FirstName: string
        MiddleName: string
        LastName: string
        EmailAddress: string
    }

Upon receiving a DTO we validate it and try to convert it into a domain event. The DtoError type describes what can go wrong during that conversion.
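As a rough illustration of that validate-then-convert step, here is a minimal sketch in plain F#. The types are repeated so that it compiles on its own; the function names (validateRequired, validateEmail, toDomain) are hypothetical and the email check is deliberately naive, so the helpers in the repository may well look different. JSON deserialization is left out on purpose.

```fsharp
open System

type EmailAddress = EmailAddress of string

type Contact =
    { FirstName: string
      MiddleName: string option
      LastName: string
      EmailAddress: EmailAddress }

type ContactCreatedDomainEvent = { Contact: Contact }

type DtoError =
    | ValidationError of string
    | DeserializationException of exn

type ContactCreatedDto =
    { FirstName: string
      MiddleName: string
      LastName: string
      EmailAddress: string }

// Hypothetical helper: a required field must not be null or blank.
let validateRequired (name: string) (value: string) =
    if String.IsNullOrWhiteSpace value then
        Error(ValidationError $"{name} is required")
    else
        Ok(value.Trim())

// Hypothetical helper: a naive check, only good enough for a sketch.
let validateEmail (value: string) =
    if not (String.IsNullOrWhiteSpace value) && value.Contains("@") then
        Ok(EmailAddress value)
    else
        Error(ValidationError "EmailAddress is missing or malformed")

// Converts a DTO into a domain event, or returns the first validation error.
let toDomain (dto: ContactCreatedDto) : Result<ContactCreatedDomainEvent, DtoError> =
    let first = validateRequired "FirstName" dto.FirstName
    let last = validateRequired "LastName" dto.LastName
    let email = validateEmail dto.EmailAddress

    match first, last, email with
    | Ok f, Ok l, Ok e ->
        let contact: Contact =
            { FirstName = f
              MiddleName = Option.ofObj dto.MiddleName // null becomes None
              LastName = l
              EmailAddress = e }

        Ok { Contact = contact }
    | Error err, _, _
    | _, Error err, _
    | _, _, Error err -> Error err

let dto: ContactCreatedDto =
    { FirstName = "John"
      MiddleName = null
      LastName = "Dou"
      EmailAddress = "johndou@gmail.com" }

printfn "%A" (toDomain dto)
```

A DTO built from arbitrary JSON, where every field ends up null, takes the Error branch and yields a ValidationError instead of a half-filled Contact.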

Helpers and utilities

The solution contains various helper and utility functions for converting between DTOs and domain events, serialization and deserialization, validation logic, etc. Check the source code for details. Now let’s jump straight to the publisher and subscriber.

Publisher

In the real world the publisher could be any workflow within a bounded context, defined as a service that produces events when some action completes. In this example it is just a CLI console application which randomly sends one of two things: a well-formed ContactCreatedDomainEvent business event, or a malformed, non-existing event which is just some JSON. The publisher opens a pub socket, binds to the port and keeps sending events until you cancel it with Ctrl + C. Simple enough. The complete code looks like this:

[<EntryPoint>]
let main argv =

    let env = Environment.GetEnvironmentVariable "DOTNET_ENVIRONMENT"
    let config = getConfig ()

    printfn "Pub server. Press Ctrl+C to exit."
    let mutable stopRequested = false
    let rand = Random(50)

    Console.CancelKeyPress.Add(fun arg -> 
        printfn "Exiting from Pub server..."
        stopRequested <- true
        arg.Cancel <- true)

    try
        use publisher = new PublisherSocket()
        publisher.Options.Linger <- TimeSpan.FromMilliseconds(1000.0)
        publisher.Bind($"tcp://*:{config.Port}")

        while not stopRequested do
            let randomizedTopic = rand.NextDouble();

            if randomizedTopic > 0.5 then
                let event : ContactCreatedDomainEvent = {
                    Contact = {
                        FirstName = "John"
                        MiddleName = None
                        LastName = "Dou"
                        EmailAddress = EmailAddress "johndou@gmail.com"
                    }
                } 
                printfn $"Sending event : {event}"

                let json = event |> ContactCreated.jsonFromDomain
                let eventType = Business |> toString

                config.Topics |> List.iter (
                    fun topic -> publisher.SendMoreFrame(topic).SendMoreFrame(eventType).SendFrame(json))
            else
                let json = """{ "NonExisting":"Message" }"""
                printfn $"Sending some non-defined event..."
                config.Topics |> List.iter (
                    fun topic -> publisher.SendMoreFrame(topic).SendMoreFrame(Business |> toString).SendFrame(json))

            Threading.Thread.Sleep(1000);
    finally
        NetMQConfig.Cleanup()
    0

You can see an endless loop that sends a randomly chosen event to the topic every second. We get configuration values such as the server address, port number and topic name from environment variables. This will help us later when we containerize the publisher and subscriber. Let me highlight the most important parts.

This code creates the publisher socket, configures it and binds it to the port. Please refer to the Pub-Sub documentation of NetMQ for details.

use publisher = new PublisherSocket()
publisher.Options.Linger <- TimeSpan.FromMilliseconds(1000.0)
publisher.Bind($"tcp://*:{config.Port}")

I set the linger period for the socket. The linger period determines how long pending messages which have yet to be sent to a peer linger in memory after the socket is closed. For this example it is irrelevant, but if you have a high volume of events you need to consider this option.

Next you can see the creation of the event and sending it over the wire:

let event : ContactCreatedDomainEvent = {
    Contact = {
        FirstName = "John"
        MiddleName = None
        LastName = "Dou"
        EmailAddress = EmailAddress "johndou@gmail.com"
    }
} 
printfn $"Sending event : {event}"

let json = event |> ContactCreated.jsonFromDomain
let eventType = Business |> toString

config.Topics |> List.iter (
    fun topic -> publisher.SendMoreFrame(topic).SendMoreFrame(eventType).SendFrame(json))

It is important to notice here that we create a strongly typed domain event from the shared module and convert it to its JSON representation before sending. Because we could have multiple topics in the real world, topics are defined as a list of strings.

From the ZeroMQ documentation: frames (also called “message parts” in the ZeroMQ reference manual pages) are the basic wire format for ZeroMQ messages. Some useful info about frames:

  • A message can be one or more parts.

  • These parts are also called “frames”.

  • Each part is a zmq_msg_t object.

  • You send and receive each part separately, in the low-level API.

  • Higher-level APIs provide wrappers to send entire multipart messages

Basically it is a layered approach: on the sender side you wrap data in frames, and on the receiver side you unwrap it. In our example the message has a three-frame structure:

(Diagram: a three-frame message consisting of the topic, the event type and the domain event as JSON.)

The SendMoreFrame/SendFrame methods are exactly that kind of higher-level API, provided by the NetMQ library. So we wrap the topic (test-topic), the event type (Business) and the domain event (ContactCreatedDomainEvent) into a series of frames and send them over the wire.
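To make the frame structure more tangible, here is a small sketch that builds the same three-frame message in memory and reads it back, without opening any socket. It assumes you run it as an F# script (dotnet fsi), so that the NetMQ package can be referenced with #r; the topic name and the JSON payload are illustrative values only.

```fsharp
#r "nuget: NetMQ"

open NetMQ

// Build a message frame by frame, in the same order the publisher sends it.
let message = NetMQMessage()
message.Append("test-topic")                 // frame 0: topic used for subscription matching
message.Append("Business")                   // frame 1: event type
message.Append("""{ "FirstName": "John" }""") // frame 2: JSON payload (illustrative)

// Read the frames back, the way a receiver would unwrap them.
let topic = message.First.ConvertToString()
let eventType = message.[1].ConvertToString()
let payload = message.Last.ConvertToString()

printfn "frames: %d" message.FrameCount
printfn "topic: %s" topic
printfn "event type: %s" eventType
printfn "payload: %s" payload
```

The subscriber matches its subscriptions against the first frame, which is why the topic has to come first and the payload last.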

The last thing to mention is the else branch, where we send a malformed event. It does not even have a type of its own, so it is just arbitrary JSON:

let json = """{ "NonExisting":"Message" }"""
printfn $"Sending some non-defined event..."
config.Topics |> List.iter (
    fun topic -> publisher.SendMoreFrame(topic).SendMoreFrame(Business |> toString).SendFrame(json))

On the receiver side this will produce a validation error.

Subscriber

Now let’s move on to the subscriber. Here is its complete code, which we will then break into parts and explain:

[<EntryPoint>]
let main argv =

    let env = Environment.GetEnvironmentVariable "DOTNET_ENVIRONMENT"
    let config = getConfig ()

    printfn "Starting Sub client..."

    let runContinuously sub = async {
        let rec fetch () = async {
            (* Define workflows here based on event type *)
            match NetMQMessage.TryGetDomainEvent(sub) with
            | Some event -> printfn $"%A{event}"
            | None -> ()

            do! Async.Sleep config.DelayInternalMs
            // return! keeps the recursive call in tail position
            return! fetch ()
        }
        return! fetch ()
    }

    let listen () = async {
        use subscriber = new SubscriberSocket()
        subscriber.Options.ReceiveHighWatermark <- 1000
        subscriber.Options.HeartbeatInterval <- TimeSpan(0, 0, 0, 5)  
        subscriber.Options.HeartbeatTimeout <- TimeSpan(0, 0, 0, 30)
        subscriber.Options.HeartbeatTtl <- TimeSpan(0, 0, 0, 30)
        subscriber.Connect($"tcp://{config.Server}:{config.Port}")
        printfn $"Running environment: {env}"
        printfn $"Connected to {config.Server} on port {config.Port}"
        config.Topics |> List.iter (fun topic -> subscriber.Subscribe(topic))
        printfn $"""Start listening for events on the following topics: [{config.Topics |> String.concat ", "}]..."""
        printfn "(Press Ctrl+C to exit)"

        do! runContinuously subscriber
    }

    try
        try
            let cts = new Threading.CancellationTokenSource()
            Console.CancelKeyPress.Add(fun arg -> arg.Cancel <- true; cts.Cancel())
            Async.RunSynchronously(listen(), 1000, cts.Token)
            0
        with
            | Failure msg -> eprintfn $"General Failure: {msg}"; -1
            | :? OperationCanceledException -> eprintfn "Exiting from Sub client..."; 0
    finally
        NetMQConfig.Cleanup()

Let’s look at the main block, where we create a cancellation token and run the asynchronous listening function:

let cts = new Threading.CancellationTokenSource()
Console.CancelKeyPress.Add(fun arg -> arg.Cancel <- true; cts.Cancel())
Async.RunSynchronously(listen(), 1000, cts.Token)

The whole block sits in a try/with section which prints to stderr any unexpected error, or the cancellation request when you press Ctrl + C. There is also an outer try/finally block which guarantees that the sockets are cleaned up before the program exits.

Next comes the listen function, where we create the subscriber socket, connect to the publisher at the specified address and port, and subscribe to the topics:

let listen () = async {
    use subscriber = new SubscriberSocket()
    subscriber.Options.ReceiveHighWatermark <- 1000
    subscriber.Options.HeartbeatInterval <- TimeSpan(0, 0, 0, 5)  
    subscriber.Options.HeartbeatTimeout <- TimeSpan(0, 0, 0, 30)
    subscriber.Options.HeartbeatTtl <- TimeSpan(0, 0, 0, 30)
    subscriber.Connect($"tcp://{config.Server}:{config.Port}")
    printfn $"Running environment: {env}"
    printfn $"Connected to {config.Server} on port {config.Port}"
    config.Topics |> List.iter (fun topic -> subscriber.Subscribe(topic))
    printfn $"""Start listening for events on the following topics: [{config.Topics |> String.concat ", "}]..."""
    printfn "(Press Ctrl+C to exit)"

    do! runContinuously subscriber
}

As you can see, I set some options such as heartbeats. I found this extremely useful when running in a Kubernetes cluster, where the subscriber periodically lost its connection to the publisher. With heartbeats in place, a dead connection is detected instead of going unnoticed.

The last line starts a continuous loop with the runContinuously function:

let runContinuously sub = async {
    let rec fetch () = async {
        (* Define workflows here based on event type *)
        match NetMQMessage.TryGetDomainEvent(sub) with
        | Some event -> printfn $"%A{event}"
        | None -> ()

        do! Async.Sleep config.DelayInternalMs
        // return! keeps the recursive call in tail position
        return! fetch ()
    }
    return! fetch ()
}

This is the core of the flow. The function accepts the subscriber socket as a parameter, defines a nested recursive function fetch, and starts the endless loop. The recursive call uses return!, which puts it in tail position inside the async workflow, so the loop neither blows up your stack nor accumulates continuations over time. We use pattern matching on each incoming event from the topic and print it out. In a real-world scenario you would run some kind of workflow here. To separate concerns, I suggest creating a workflow module, defining all workflows there and forwarding all incoming events to it from the fetch function. It’s up to you.
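The workflow module suggested above might look roughly like the following sketch. The domain types are repeated so that it compiles on its own; Workflows, handle and sendWelcomeEmail are hypothetical names that do not come from the repository, and the "workflow" only builds a log line.

```fsharp
type EmailAddress = EmailAddress of string

type Contact =
    { FirstName: string
      MiddleName: string option
      LastName: string
      EmailAddress: EmailAddress }

type ContactCreatedDomainEvent = { Contact: Contact }

type DtoError =
    | ValidationError of string
    | DeserializationException of exn

type DomainEvent =
    | ContactCreated of Result<ContactCreatedDomainEvent, DtoError>

module Workflows =

    // Hypothetical workflow: a real one would call a mail service here.
    let sendWelcomeEmail (evt: ContactCreatedDomainEvent) =
        let (EmailAddress address) = evt.Contact.EmailAddress
        sprintf "Welcome email queued for %s %s <%s>" evt.Contact.FirstName evt.Contact.LastName address

    // Single entry point: routes every domain event to its workflow
    // and turns conversion errors into readable messages.
    let handle (evt: DomainEvent) =
        match evt with
        | ContactCreated (Ok created) -> sendWelcomeEmail created
        | ContactCreated (Error (ValidationError reason)) -> sprintf "Rejected event: %s" reason
        | ContactCreated (Error (DeserializationException ex)) -> sprintf "Could not deserialize event: %s" ex.Message

let contact =
    { FirstName = "John"
      MiddleName = None
      LastName = "Dou"
      EmailAddress = EmailAddress "johndou@gmail.com" }

printfn "%s" (Workflows.handle (ContactCreated(Ok { Contact = contact })))
printfn "%s" (Workflows.handle (ContactCreated(Error(ValidationError "FirstName is required"))))
```

With such a module in place, the Some branch in fetch would simply forward the event to Workflows.handle. Adding a new case to DomainEvent then makes the compiler warn about the incomplete match in handle.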

One helper function worth mentioning is TryGetDomainEvent. It hides the complexity of unwrapping the frames, deserializing the DTO, converting it to a domain event, validation and logging, keeping your main function clean. Here it is:

type NetMQMessage with
        static member TryGetDomainEvent(sub: SubscriberSocket) =
            let mutable message = NetMQMessage()
            let success = sub.TryReceiveMultipartMessage(&message, 3)
            if success && message.First.ConvertToString() = TopicName then
                let eventType = message.Item(1).ConvertToString() |> fromString<DomainEventType>
                let msg = message.Last.ToByteArray() |> Encoding.UTF8.GetString
                match (eventType, msg) with
                | (Some t, _) when t = Business -> 
                    msg 
                    |> ContactCreated.jsonToDomain
                    |> ContactCreated
                    |> (logEvent >> Some)
                | _, _ -> None
            else None

As you can see from the signature, the function takes a SubscriberSocket and returns Option<DomainEvent>, where DomainEvent is the choice type for all domain events and each case wraps a result such as Result<ContactCreatedDomainEvent, DtoError>. If the conversion from DTO to domain type succeeds and there are no validation errors, the result holds the domain event, otherwise a DtoError. The function returns None when no message was received, or when the message is not a business event on the expected topic.
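The toString and fromString helpers used for the event type frame are not shown in this article. A common way to write such helpers is with F# reflection, and the sketch below takes that route as an assumption; the versions in the repository may be implemented differently.

```fsharp
open Microsoft.FSharp.Reflection

type DomainEventType =
    | Business
    | Infrastructure

/// Returns the case name of a union value.
let toString (x: 'a) =
    let case, _ = FSharpValue.GetUnionFields(x, typeof<'a>)
    case.Name

/// Finds the field-less union case whose name matches the string, if any.
let fromString<'a> (s: string) : 'a option =
    FSharpType.GetUnionCases typeof<'a>
    |> Array.tryFind (fun case -> case.Name = s && case.GetFields().Length = 0)
    |> Option.map (fun case -> FSharpValue.MakeUnion(case, [||]) :?> 'a)

printfn "%s" (toString Business)
printfn "%A" (fromString<DomainEventType> "Infrastructure")
printfn "%A" (fromString<DomainEventType> "SomethingElse")
```

Returning an option rather than throwing fits the way TryGetDomainEvent pattern matches on the parsed event type: an unknown string in the second frame simply falls through to the None branch.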

And that’s it: very light and fast DDD microservices. The solution lacks proper logging and tracing, but overall it should give you a feel for the approach.

Containerization

To make the solution better suited to the modern reality of running services as containers, you will find Dockerfiles for both projects and a compose.yaml to run them with docker compose. Here is the Dockerfile for the publisher:

ARG SDK_VERSION=6.0-alpine
ARG RUNTIME_VERSION=6.0-alpine

# Stage 1 - Build SDK image
FROM mcr.microsoft.com/dotnet/sdk:$SDK_VERSION AS build
WORKDIR /src
COPY ./src/pub ./pub
COPY ./src/shared ./shared
COPY ./src/utils ./utils

RUN dotnet restore ./pub -r alpine-x64
RUN dotnet publish ./pub/pub.fsproj -c Release -r alpine-x64 --self-contained true /p:PublishTrimmed=true --no-restore -o "../out"

# Stage 2 - Build runtime image
FROM mcr.microsoft.com/dotnet/runtime-deps:$RUNTIME_VERSION AS base
WORKDIR /micro-light-pub

ENV DOTNET_ENVIRONMENT Development
ENV MICROLIGHT_SERVER 127.0.0.1
ENV MICROLIGHT_PORT 5556

COPY --from=build /out ./
ENTRYPOINT ["./pub"]

The Dockerfile for the subscriber looks identical apart from the paths, so let’s skip it. Below is the content of the docker compose file:

version: '3.9'
services:
  pub:
    build:
      context: .
      dockerfile: pub.Dockerfile
    environment:
      - DOTNET_ENVIRONMENT=Development
    deploy:
      replicas: 1
      resources:
        limits:
          memory: 64M
    networks:
      - net
    ports:
      - 5556:5556/tcp
  sub:
    build:
      context: .
      dockerfile: sub.Dockerfile
    depends_on:
      - pub
    environment:
      - DOTNET_ENVIRONMENT=Development
      - MICROLIGHT_SERVER=pub
      - MICROLIGHT_PORT=5556
    deploy:
      replicas: 1
      resources:
        limits:
          memory: 64M
    networks:
      - net
    restart: always

networks:
  net:
    driver: bridge

According to the best practices of the 12-Factor app we should store configuration in the environment. The sample solution reads it from environment variables, and here we override them, so there is no need for config files in the projects.
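The sample solution delegates this to FsConfig. To show what reading configuration from the environment boils down to, here is a hand-rolled sketch in plain F# that does not use FsConfig at all. MICROLIGHT_SERVER and MICROLIGHT_PORT are the variables from the Dockerfile above; the MICROLIGHT_TOPICS variable, the comma separator, the default values and the Config record shape are assumptions made for illustration.

```fsharp
open System

type Config =
    { Server: string
      Port: int
      Topics: string list }

/// Reads a variable, falling back to a default when it is missing or empty.
let envOr (name: string) (fallback: string) =
    match Environment.GetEnvironmentVariable name with
    | null
    | "" -> fallback
    | value -> value

/// Builds the configuration from the environment, or explains what is wrong.
let getConfig () : Result<Config, string> =
    let portText = envOr "MICROLIGHT_PORT" "5556"

    match Int32.TryParse(portText) with
    | true, port ->
        // Hypothetical variable: a comma-separated list of topics.
        let topics =
            (envOr "MICROLIGHT_TOPICS" "test-topic").Split(',', StringSplitOptions.RemoveEmptyEntries)
            |> Array.map (fun t -> t.Trim())
            |> List.ofArray

        Ok
            { Server = envOr "MICROLIGHT_SERVER" "127.0.0.1"
              Port = port
              Topics = topics }
    | false, _ -> Error $"MICROLIGHT_PORT is not a valid port number: '{portText}'"

match getConfig () with
| Ok config -> printfn "%A" config
| Error message -> eprintfn "%s" message
```

Returning a Result keeps a broken environment from surfacing later as an obscure socket error: the service can report the offending variable and exit straight away.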

Wrapping up

You can run the pub and sub projects one by one in Visual Studio Code if you prefer; there are launch.json and tasks.json files in the repository. But with docker compose in place it is much easier to run them like this:

cd ./micro-light
docker compose up -d

To check that everything is up and running, you can follow the stdout of pub and sub with the following commands (run them in different shells for a better effect):

docker compose logs -f pub
docker compose logs -f sub

To stop and free resources run:

docker compose down

Using DDD and a broker-less messaging library like ZeroMQ allows you to start writing lightweight and fast services very quickly, with less wrapping code. You don’t need complex infrastructure configuration. You can focus on your main business workflows in a nice functional way, using all the benefits of pipelining. When your services evolve into something bigger and more complex, you can shift to a full-fledged message broker. That should not affect your workflow codebase; the only thing you need to change is the messaging logic.