
Considerations About Scaling Out Redis Streams with Producers and Consumers While Designing UptimeNinja.io


Intro #

As a hobbyist coder, one of the toughest questions is, “What should I work on next?” Recently, my interest has been piqued by event-driven architecture and decoupling service dependencies using message queues.

I had heard about event-driven architecture before, but it has stayed on my mind since I watched a video on event sourcing in DEVSISTERS’ Cookie Run Kingdom server architecture. I had jotted down some ideas and forgotten about them, but recent discussions around Kafka at work and among peers reignited my interest.

Usually, when creating a product, you develop an idea based on user needs and market fit. I approach it differently: I design services based on technology stacks and architectures that intrigue me. This is just because I find more joy in programming itself than in making money or starting a business. This led me to start developing UptimeNinja.

🥷 UptimeNinja.io #

I’ve already purchased the domain uptimeninja.io. The concept for UptimeNinja is straightforward: think of it as a typical uptime monitoring tool. It checks the status of websites or servers at regular intervals and sends alerts.

The need arose from my home server operations; I needed a way to monitor its status and receive alerts. Although I have a monitoring stack including Prometheus, I needed an external alerting solution independent of the home server environment. After finding existing uptime monitoring tools pricey, I decided to build one myself.

What started as a simple idea has revealed numerous complexities as I consider scaling out. This blog post is less about solutions and more about the challenges and considerations that I’m facing.

  1. Which MQ to choose? Kafka vs. Redis
  2. How to execute actions at specific intervals? 1) Cronjob (robfig/cron), time.Ticker, 2) pull from DB (tick_table)
  3. Considerations for scaling out producers and consumers in an event-driven architecture (with Redis Stream)
  4. Why TSDBs are more appealing than RDBs

1) Kafka vs Redis #

When discussing event-driven architecture or message queues, Apache Kafka often comes up (it used to be RabbitMQ). My initial interest in this project was also sparked by Kafka. One of Kafka’s few drawbacks is its operational complexity, especially since I’m considering self-hosting on a home server rather than using managed services like MSK or Confluent. Still, my home server specs can handle Kafka 😎.

I’ve been particularly interested in Strimzi Operator, which allows managing Kafka clusters on Kubernetes using Kubernetes CRDs (Custom Resource Definitions). With KRaft mode, which Strimzi supports, there’s no need to provision ZooKeeper for Kafka.

Despite Kafka’s appeal, I ultimately chose Redis Stream due to my limited understanding of Kafka. I felt that without sufficient knowledge, debugging Kafka issues would be challenging. Additionally, I anticipate Redis will be more cost-effective than Kafka, even if I migrate to the cloud later. So, what is Redis Stream?

Redis is well-known as an in-memory database, but Redis Stream might be less familiar. Redis is typically used in event-driven architecture through 1) Redis Pub/Sub or 2) Redis List type.

However, Redis Stream offers unique features compared to these two. Here’s a brief comparison:

| | Delivery | Persistence | Complexity |
|---|---|---|---|
| Redis Pub/Sub | At-most-once | None | Low |
| Redis List | At-most-once | AOF, RDB | Low |
| Redis Stream | At-least-once | AOF, RDB | High |

Redis Pub/Sub isn’t suitable for UptimeNinja since it broadcasts events to all subscribers, while we need one request per monitor. Redis List works by pushing elements to a queue and having consumers pop them off, ensuring each event triggers only once. However, it only guarantees At-most-once delivery: once a consumer pops an event, the event is gone from the list, so a failure during processing is hard to recover from. A Redis List queue works like this:

flowchart LR
    Producer -->|LPUSH| Redis[(Redis)]
    Redis -.->|BRPOP| a[Consumer A]
    Redis -.->|BRPOP| b[Consumer B]

Redis Stream addresses these issues.

[Figure: redis-stream, from Redis Docs]

Redis supports the stream data type. Events are added with XADD and read with XREAD. Notably, XREADGROUP allows for consumer groups, facilitating parallel consumer setups. Redis Stream events have the following states:

stateDiagram
    state "Added to Stream" as Added
    state "Pending (PEL) of A" as PendingA
    state "Pending (PEL) of B" as PendingB
    state "Acknowledged" as Acknowledged
    [*] --> Added: XADD
    Added --> PendingA: XREADGROUP from 'A'
    Added --> PendingB: XREADGROUP from 'B'
    PendingA --> Acknowledged: XACK
    PendingB --> Acknowledged: XACK
    PendingB --> PendingA: XCLAIM
    PendingA --> PendingB: pending event

In Redis Stream, when a consumer reads an event, it gets added to the consumer’s Pending Entries List (PEL). After processing, the consumer explicitly acknowledges the event with XACK. If an event remains pending for too long, another consumer can claim it using XCLAIM, ensuring At-least-once delivery.

2) Stateful vs Stateless Producer #

The key event for UptimeNinja is the health check request, HealthCheckReq, structured as follows:

// HealthCheckReq is the event a producer publishes for each scheduled check.
type HealthCheckReq struct {
	ID        string `json:"id"`
	MonitorID string `json:"monitor_id"`
	URL       string `json:"url"` // Go convention: initialisms are all caps
	Method    string `json:"method"`
	// Headers map[string]string `json:"headers"`
	Timeout int64 `json:"timeout"`
	// Body    string            `json:"body"`
}

The producer must generate HealthCheckReq events at specific intervals per monitor. There are two primary methods:

  1. Using Cronjob (robfig/cron) or golang’s time.Ticker internally
  2. Maintaining ticks state in a DB, periodically querying it when generating events

This is essentially about whether the producer is 1) stateful or 2) stateless.

Cronjob and *time.Ticker #

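type Producer struct {
	redisClient *redis.Client
	dbClient    *sql.DB
	config      *config.Config

	// One ticker and one cancel function per monitor, keyed by monitor ID.
	monitorTickers  map[string]*time.Ticker
	monitorContexts map[string]context.CancelFunc
	monitorMutex    sync.RWMutex // guards both maps
}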

The producer can maintain a map of monitors and their associated tickers. Here’s a pseudocode example:

func (p *Producer) updateMonitorTickers(monitors models.MonitorSlice) {
	p.monitorMutex.Lock()
	defer p.monitorMutex.Unlock()

	// ...

	// Stop and remove tickers for monitors that no longer exist
	for id, ticker := range p.monitorTickers {
		exist := lo.ContainsBy(monitors, func(m *models.Monitor) bool {
			return m.ID == id
		})
		if !exist {
			ticker.Stop()
			// Cancel the goroutine that was reading from this ticker
			if cancel, ok := p.monitorContexts[id]; ok {
				cancel()
				delete(p.monitorContexts, id)
			}
			delete(p.monitorTickers, id)
			log.Println("Removing monitor ticker for", id)
		}
	}

	// Update or add tickers for monitors
	for _, monitor := range monitors {
		interval := time.Duration(monitor.Interval) * time.Second
		if ticker, exists := p.monitorTickers[monitor.ID]; exists {
			// Update the interval in place. The goroutine started for this
			// monitor keeps reading from the same ticker, so starting another
			// one here would produce duplicate events.
			// Note: Reset restarts the current period, so ideally this is
			// called only when the interval actually changed.
			ticker.Reset(interval)
		} else {
			// Create new ticker
			ctx, cancel := context.WithCancel(context.Background())
			p.monitorContexts[monitor.ID] = cancel

			ticker := time.NewTicker(interval)
			p.monitorTickers[monitor.ID] = ticker

			log.Println("Starting monitor ticker for", monitor.URL)
			go p.startMonitorTicker(ctx, monitor, ticker)
		}
	}
}

This method reduces DB requests but increases code complexity for managing goroutines. Handling changes in monitors (e.g., additions, deletions, interval updates) requires careful synchronization. Moreover, running multiple parallel producers can result in duplicate events, necessitating state synchronization across servers.

Pull from DB #

CREATE TABLE ticks (
    id VARCHAR(255) PRIMARY KEY,
    monitor_id VARCHAR(255) NOT NULL,
    last_tick TIMESTAMP WITH TIME ZONE DEFAULT NULL,
    next_tick TIMESTAMP WITH TIME ZONE DEFAULT NULL,
    counter INT NOT NULL DEFAULT 0,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,

    CONSTRAINT fk_monitor_id FOREIGN KEY (monitor_id) REFERENCES monitors (id)
        ON DELETE CASCADE ON UPDATE CASCADE
);

With a tick table like this, the consumer records last_tick after processing an event and sets next_tick to last_tick plus the interval. The server can then issue events by periodically selecting records where now() > next_tick. This approach makes state management much simpler, because the state lives in the DB rather than in each producer’s memory.

However, this method has a significant drawback: the number of DB requests increases dramatically. Additionally, a HealthCheck monitor’s effective interval cannot be shorter than the period at which the producer polls next_tick from the DB. This makes it challenging to support very short intervals, such as 1 second.

3) How to Scale Out Producers and Consumers #

Scaling out consumers is not a major issue since Redis Stream’s consumer groups handle this effectively. However, the case is different for producers.

Producers execute cron jobs with different intervals for each HealthCheck monitor. We need to ensure that 1) the same cron job is not executed by different producers simultaneously. Running the same cron job multiple times isn’t a critical issue for UptimeNinja, but it can lead to unintended resource wastage if accumulated. Additionally, 2) when the interval is changed or a new HealthCheck monitor is created, this should be reflected on the producer server.

flowchart LR
    Manager -->|Manages States| A[Producer A]
    Manager -->|Assigns Cronjob| A
    Manager -->|Add/Delete/Edit Cronjob| B[Producer B]
    Manager --> C[Producer C]
    A & B & C --> Redis[(Redis stream)]
    Redis --> Consumers[Consumer Groups]
    Consumers <-->|HTTP Request| Internet
    Consumers -->|Persist| RDB[(RDB)]

One straightforward method is to have the producer provide endpoints to export and edit its state, managed by an external manager.

flowchart LR
    A[Producer A] -->|Acquire Lock| Lock[(Cronjob RDB)]
    B[Producer B] --> Lock
    C[Producer C] --> Lock
    A & B & C -->|event| Redis[(Redis Stream)]
    Redis --> Consumers(Consumer Groups)
    Consumers <-.->|HTTP Request| Internet((Internet))
    Consumers -->|Persist| RDB[(Log RDB)]

If creating a separate manager server is burdensome, a table in the RDB can be used to specify ownership of each cron job by producers. However, operating multiple parallel producers requires more careful consideration.

Additionally, graceful shutdown and restart of each producer are essential. An in-memory ticker starts counting from zero again after every restart. That is not a big issue for short intervals like 30 seconds or 1 minute, but frequent restarts of the producer server could keep delaying HealthChecks with longer intervals, such as 22 hours.

4) Why Not Use a TSDB Instead of an RDB? #

CREATE TABLE reports (
    id VARCHAR(255) PRIMARY KEY,
    monitor_id VARCHAR(255) NOT NULL,
    status_code INT NOT NULL,
    response_time INT NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    status VARCHAR(255) NOT NULL, -- "UP" or "DOWN"

    CONSTRAINT fk_monitor_id FOREIGN KEY (monitor_id) REFERENCES monitors (id)
        ON DELETE CASCADE ON UPDATE CASCADE
);

Assuming HealthCheck results are stored in a table like this, the number of records grows quickly: one row per monitor per check, so a single monitor with a 30-second interval adds 2,880 rows a day. Modern RDBs can handle heavy write workloads, but insert and query performance tends to degrade as indexes grow. Time Series Databases (TSDBs) like InfluxDB were developed to address this. They are optimized for writing and querying timestamped, append-only data such as weather readings or stock prices, which matches the data UptimeNinja produces.

However, for the same reason I didn’t choose Kafka, I need more time before committing to a TSDB: I don’t know them well enough yet. Even if I don’t use a TSDB, I believe blindly storing all past events in an RDB is not the answer.

Conclusion #

These architectural considerations have been incredibly fascinating and continue to inspire the development of UptimeNinja.io. The process of seeking out technical challenges and solutions has been immensely enjoyable, and I kindly ask for your support and interest. Please keep watching UptimeNinja.io grow and evolve in the future.
