Authors:  Daniel Charles Mwangila and Prémices Kamasuwa

Introduction

In this article, we'll explore how to build a robust and flexible scheduling system by combining the strengths of Redis and Node.js. This approach not only replicates the functionality of traditional cron jobs but also enhances it by leveraging Redis to manage task scheduling through expiration events. Redis, known for its high-speed in-memory data storage, becomes an optimal solution for real-time applications that demand precise and efficient task execution.

Throughout this guide, we'll walk through the development of an event-driven scheduling system that utilizes Redis' key expiration feature to trigger tasks at predefined intervals. By the end, you will have a deeper understanding of how Redis can be harnessed for more complex real-time scenarios, enabling you to build systems that go beyond basic task scheduling.

Prerequisites

To follow along with this tutorial, ensure you have the following:

Step 1 — Setting Up Your Node.js Application

Let's start by setting up the Node.js environment for our project.

1.1. Create a New Project Directory

First, create a directory for your project and navigate into it:

mkdir redis-scheduler
cd redis-scheduler

1.2. Initialize a New Node.js Project

Initialize a new Node.js project using the following command:

npm init -y

This command creates a package.json file with default settings.

1.3. Install Necessary Node.js Packages

Install the required packages for this tutorial:

npm install redis dotenv date-fns

These packages include:

  • redis: For interacting with Redis.
  • dotenv: To manage environment variables.
  • date-fns: For date manipulation.

Next, install the development dependencies:

npm install @types/node ts-node typescript

These packages will help you work with TypeScript and Node.js more effectively.

1.4. Initialize a TypeScript Project

Initialize a TypeScript configuration file:

tsc --init

This command creates a tsconfig.json file in your project root, enabling TypeScript support.

1.5. Project Structure

To organize your code effectively, set up the following directory structure:

redis-scheduler/
│
├── src/
│   ├── interfaces/
│   │   └── Task.ts
│   ├── utils/
│   │   └── redisClient.ts
│   ├── handlers/
│   │   ├── classHandler.ts
│   │   └── functionHandler.ts
│   ├── scheduler.ts
│   └── listener.ts
│
├── .env
├── package.json
├── tsconfig.json
└── README.md

This structure helps keep your project organized, making it easier to maintain and extend in the future.

Step 2 — Implementing the Task Interface

Create an interface to define the structure of a task. This interface ensures that every task follows a consistent format, which is crucial for the scheduler to function correctly.

Create a new file src/interfaces/Task.ts:

export interface ExecutionPath {
    file_path: string;   // The path to the file containing the task's function
    class_name: string;  // The class name where the function resides
    function_name: string; // The specific function to execute
}

export interface Task {
    task_id: string;     // Unique identifier for the task
    title: string;       // Human-readable name for the task
    interval: string;    // The interval at which the task should run (e.g., '5m', '1h')
    last_run?: Date;     // The last time the task was executed
    next_run: Date;      // The next scheduled run time
    execution_path: ExecutionPath; // Details on where and what to execute
}

Explanation of Task Interface Fields:

  • task_id: A unique identifier for the task, ensuring that each task can be tracked separately.
  • title: A descriptive name for the task. Useful for logging and debugging.
  • interval: Specifies how frequently the task should run. This could be in seconds (s), minutes (m), hours (h), days (d), or weeks (w).
  • last_run: An optional field that records the last execution time, useful for monitoring.
  • next_run: The next time the task is scheduled to run.
  • execution_path: Specifies where to find the function to execute, including file path, class, and function names.

Step 3 — Implementing the Redis Client

To interact with Redis, we need to set up a client. This client will be used to set and get tasks and listen for expiration events.

Create a new file src/utils/redisClient.ts:

import { createClient } from "redis";
import dotenv from "dotenv";
dotenv.config();

// Retrieve Redis connection URI from the environment variables
console.log(process.env.REDIS_URI);
export const redisClient = async () => {
    const client = createClient({ url: process.env.REDIS_URI })
        .on('error', (err) => console.error(`Redis Client Error: ${err}`))
        .on('connect', () => console.info('Connected to Redis'))
        .on('ready', () => console.info('Redis is ready'));

    await client.connect();
    return client;
};

// Function to retrieve data from Redis
export const getFromRedis = async (key: string): Promise<{ [key: string]: any } | null> => {
    const client = await redisClient();
    try {
        const value = await client.get(key);
        if (value) return JSON.parse(value);
        return null;
    } catch (error) {
        console.error(`Error getting key: ${key} from Redis: ${error}`);
        return null;
    } finally {
        await client.disconnect();
    }
}

// Function to store data in Redis with an optional expiration time
export const setToRedis = async (key: string, value: string, expireIn?: number): Promise<void> => {
    const client = await redisClient();
    try {
        await client.set(key, value);
        if (expireIn) {
            await client.expire(key, expireIn);
        }
    } catch (error) {
        console.error(`Error setting key: ${key} to Redis: ${error}`);
    } finally {
        await client.disconnect();
    }
}

Explanation:

  • redisClient: Initializes a connection to Redis and manages connection events.
  • getFromRedis: Fetches a value from Redis by key and parses it from JSON.
  • setToRedis: Sets a value in Redis with an optional expiration time, which is crucial for scheduling tasks to run at specific intervals.

Step 4 — Implementing the Scheduler Class

The scheduler class will handle scheduling logic, including calculating the next run time and executing tasks.

Create a new file src/scheduler.ts:

import { addSeconds, addMinutes, addHours, addDays, addWeeks } from 'date-fns';
import path from 'path';
import { setToRedis, getFromRedis } from './utils/redisClient';
import { Task } from './interfaces/Task';

export class Scheduler {

    // Calculate the next run time based on the interval string
    private calculateNextRun(interval: string): { next_run: Date; interval: number } {
        const now = new Date();
        const match = interval.match(/^(\\d+)(s|m|h|d|w)$/);

        if (!match) {
            throw new Error('Invalid interval format');
        }

        const value = parseInt(match[1], 10);
        const unit = match[2];
        let next_run;
        const seconds_per_unit = { s: 1, m: 60, h: 3600, d: 86400, w: 604800 };

        switch (unit) {
            case 's': next_run = addSeconds(now, value); break;
            case 'm': next_run = addMinutes(now, value); break;
            case 'h': next_run = addHours(now, value); break;
            case 'd': next_run = addDays(now, value); break;
            case 'w': next_run = addWeeks(now, value); break;
            default: throw new Error('Unsupported time unit');
        }

        return {
            next_run: next_run,
            interval: value * seconds_per_unit[unit],
        };
    }

    // Update the next run time in Redis
    private async updateNextRun(task: Task) {
        const { next_run, interval } = this.calculateNextRun(task.interval);
        task.next_run = next_run;
        await setToRedis(`SCHEDS:${task.task_id}`, JSON.stringify(task), interval);
    }

    // Execute a function dynamically based on its path
    private async executeFunctionFromPath(task: Task) {
        const { file_path, class_name, function_name } = task.execution_path;
        try {
            const modulePath = path.resolve('./src/handlers', `${file_path}`);
            console.info(`Importing module from path: ${modulePath}`);
            const module = await import(modulePath);

            // Get the class and create an instance
            const instance = new module[class_name]();
            if (typeof instance[function_name] !== 'function') {
                throw new Error(`Function ${function_name} not found in class ${class_name}`);
            }

            // Execute the function
            console.info('Executing scheduled task function...');
            await instance[function_name]();
            console.info('Function executed successfully');
        } catch (error) {
            console.error('Error executing function from path:', error);
        }
    }

    // Main function to run the scheduler
    async runScheduler(scheds_key: string) {
        const task_id = scheds_key.split(':')[1];
        const task = await getFromRedis(`SCHEDS:${task_id}`) as Task;

        if (!task) {
            console.error(`Task with id: ${task_id} not found`);
        } else {
            await this.executeFunctionFromPath(task);
            await this.updateNextRun(task);
        }
    }
}

export const scheduler = new Scheduler();

Explanation:

  • calculateNextRun: Calculates the next run time based on the given interval (e.g., "5m" for 5 minutes). It also calculates the interval in seconds for Redis’ expire function.
  • updateNextRun: Updates the task's next run time in Redis to ensure it triggers again at the correct interval.
  • executeFunctionFromPath: Dynamically loads and executes a function from a specified file and class path, providing flexibility in what tasks can be scheduled.
  • runScheduler: The main function that orchestrates the retrieval and execution of scheduled tasks, then updates their next run time.

Step 5 — Implementing a Sample Task Handler

For demonstration purposes, let's create a simple handler that outputs "Hello from the handler" to the console after a 5-second delay.

Create a new file src/handlers/classHandler.ts:

Explanation:

  • HelloHandler: A class with an execute method that simulates a delayed task by waiting 5 seconds before logging a message to the console.

Step 6 — Implementing the Redis Listener

The Redis listener monitors key expiration events and triggers the scheduler when a key expires.

Create a new file src/listener.ts:

import { redisClient } from './utils/redisClient';
import { scheduler } from './scheduler';

(async () => {
    const client = await redisClient();

    // Enable keyspace notifications for expiration events
    client.configSet('notify-keyspace-events', 'Ex');

    const sub = client.duplicate();
    await sub.connect();
    const expired_subKey = '__keyevent@0__:expired';

    // Listen for expired events on all keys
    sub.pSubscribe(expired_subKey, async (key) => {
        console.info(`[i] Key expired: ${key}`);
        await scheduler.runScheduler(key);
    });

    console.info('Redis listener set up and waiting for events...');
})();

Explanation:

  • Redis Keyspace Notifications: Configures Redis to notify when keys expire. This is essential for our scheduling logic as Redis uses the EXPIRE command to manage key expiration.
  • Listener Setup: Subscribes to expiration events and triggers the runScheduler function, which handles executing the scheduled task.

Step 7 — Running and Testing the Scheduler

7.1. Configure Environment Variables

Ensure you have your Redis connection properly configured. Create a .env file in the project root if you haven't done so already, and add your Redis URI:

REDIS_URI=redis://localhost:6379

Make sure to replace localhost:6379 with the appropriate host and port if your Redis instance is hosted elsewhere.

7.2. Schedule a Task Using TypeScript Code

Instead of using the Redis CLI, we will use a TypeScript script to add a task to Redis. This approach provides more flexibility and demonstrates how to automate task scheduling programmatically.

Step-by-Step Guide to Scheduling a Task via TypeScript:

  1. Create a TypeScript Script to Add a Task:

Create a new file named src/scheduleTask.ts to automate the process of scheduling tasks in Redis:

import { v4 as uuidv4 } from 'uuid';
import { setToRedis } from './utils/redisClient';
import { Task } from './interfaces/Task';

// Function to schedule a new task
const scheduleTask = async () => {
    // Generate a unique UUID for the task
    const taskId = uuidv4();

    // Create a new task object
    const newTask: Task = {
        task_id: taskId,
        title: 'Hello World Task',
        interval: '5s',  // Adjust as needed for your testing
        next_run: new Date(),
        execution_path: {
            file_path: 'classHandler',  // This corresponds to src/handlers/classHandler.ts
            class_name: 'HelloHandler',
            function_name: 'execute',
        }
    };

    // Convert the task object to a JSON string
    const taskJSON = JSON.stringify(newTask);

    try {
        // Store the task in Redis without expiration
        await setToRedis(taskId, taskJSON);

        // Set a shadow key with expiration
        const intervalInSeconds = 5; // Example interval in seconds; modify as needed
        await setToRedis(`SCHEDS:${taskId}`, taskId, intervalInSeconds);

        console.info(`Task scheduled successfully with ID: ${taskId}`);
    } catch (error) {
        console.error('Error scheduling task:', error);
    }
};

// Execute the function to schedule the task
scheduleTask();

Explanation of the Script:

  • UUID Generation: We use the uuid package to generate a unique identifier (task_id) for each task, ensuring no two tasks have the same ID.
  • Task Object Creation: A Task object is created with all the necessary details, including the interval and execution path.
  • Storing the Task in Redis:
  • The task is stored under a key TASK:<uuid> without an expiration time.
  • A shadow key, SCHEDS:<uuid>, is created with an expiration time corresponding to the task's interval. This shadow key will expire and trigger the listener.
  • Expiration Handling: The intervalInSeconds specifies when the shadow key expires, causing Redis to emit an event that our listener can capture.
  1. Install the uuid Package:

If you haven't already installed the uuid package, you can add it to your project by running:

npm install uuid

Also, install the TypeScript types for uuid:

npm install @types/uuid --save-dev

  1. Run the TypeScript Script:

Execute the script to add the task to Redis:

ts-node src/scheduleTask.ts

7.3. Start the Redis Listener

With the task now scheduled, start the Redis listener in a separate terminal window to monitor for expiration events:

ts-node src/listener.ts

7.4. Observe the Output

After the interval specified in the shadow key (SCHEDS:<uuid>) expires, you should see the task execution output:

[i] Key expired: SCHEDS:<uuid>
Importing module from path: <path-to-classHandler>
Hello from the handler

This output confirms that the Redis listener detected the key expiration and successfully executed the task associated with that key.

Conclusion

Using a TypeScript script to add tasks to Redis programmatically gives you greater flexibility and control over task scheduling. This method demonstrates how to integrate Redis-based scheduling into your applications, leveraging UUIDs for unique task identification and shadow keys for triggering task execution. You can now expand on this foundation to build more complex scheduling systems or integrate it into a broader application framework.