Logging With Multiprocessing Pool

Mar 13, 2025

7 mins read

While debugging image caching in mikula I ran into the problem of adding logging to code that is executed as a multiprocessing pool task. The standard logging package in Python handles multithreaded applications out of the box but requires special handlers to log across multiple processes or applications. Popular mechanisms for passing log messages between processes use sockets, HTTP or multiprocessing queues. Examples of these techniques can be found in the Logging Cookbook.

For my debugging I needed a simple and lightweight solution, so I adapted the multiprocessing queue approach described in the corresponding section of the Cookbook. This post discusses a few tricks required to enable logging from a task run by a pool.

Multiprocessing Pool and Logging

The multiprocessing package contains functionality for creating and controlling Python processes at a low level, as well as high-level primitives like Pool for running parallel tasks. The Pool spawns a specified number of worker processes and distributes the tasks among them, hiding the implementation details from the user.

The main difference between using individual processes and the pool is the amount of control the programmer has over spawning and accessing the worker processes. Luckily, the Pool class can initialise each worker before it runs any tasks, and we are going to use this feature to create loggers and handlers.
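
For readers who have not used this feature before, here is a minimal sketch of the initializer/initargs parameters of Pool, unrelated to logging; the report and square functions are made up for illustration:

from multiprocessing import Pool, current_process

def report(prefix):
    # Runs once in every worker process, before any task is executed.
    print(f"{prefix}: initialising {current_process().name}")

def square(x):
    return x * x

if __name__ == "__main__":
    with Pool(processes=2, initializer=report, initargs=("demo",)) as pool:
        print(pool.map(square, range(5)))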

The basic idea is the same as described in the Cookbook: each worker process creates a QueueHandler and adds it to the root logger. A separate sink process receives the log records from the workers through a multiprocessing queue, creates a RotatingFileHandler for aggregating the logs, then pulls records from the queue and passes them to that handler. A special sentinel is put in the queue to signal the sink process that it should exit.

Log Sink

It is convenient to put all functionality of the log sink into one class. An instance of multiprocessing.Queue is created in the constructor. The two main methods of the Sink class are start and stop: the former creates and starts a process that pulls messages from the queue, while the latter puts a sentinel in the queue, causing the process to exit, and then joins it.

import logging
import logging.handlers
import random
from multiprocessing import Pool, Process, Queue, current_process


class Sink:
    class Sentinel:
        pass

    def __init__(self, **kwargs):
        self.kwargs = kwargs
        self.queue = Queue()
        self.process = None

    def _do_logging(self):
        # This method runs in the sink process: it owns the file handler and
        # forwards every record received from the queue to it.
        root_logger = logging.getLogger()
        file_handler = logging.handlers.RotatingFileHandler(filename=self.kwargs["filename"],
                                                            mode=self.kwargs.get("mode", "a"),
                                                            maxBytes=self.kwargs.get("maxBytes", 0),
                                                            backupCount=self.kwargs.get("backupCount", 0))
        formatter = logging.Formatter(fmt=self.kwargs.get("format"))
        file_handler.setFormatter(formatter)
        root_logger.addHandler(file_handler)

        while True:
            record = self.queue.get()
            if isinstance(record, self.Sentinel):
                break
            sink_logger = logging.getLogger(record.name)
            sink_logger.handle(record)

    def start(self):
        self.process = Process(target=self._do_logging)
        self.process.start()

    def stop(self):
        if self.process is None:
            return
        # The sentinel tells the sink process to stop reading the queue.
        self.queue.put_nowait(self.Sentinel())
        self.process.join()

The __init__ method of the Sink class captures keyword arguments, which the internal _do_logging method later uses to create the RotatingFileHandler and add it to the root logger; options that are not supplied fall back to the handler defaults. It is important that these operations are done in this method, as it runs in a separate process.

The nested empty class Sentinel is used instead of None as it is more explicit.

Loggers for Worker Processes

The initialisation code run by each worker is fairly straightforward:

def init_logger(log_queue, level=logging.DEBUG):
    handler = logging.handlers.QueueHandler(log_queue)
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    root_logger.setLevel(level)

This function is passed to the Pool constructor together with a single initialisation argument: the queue created by the Sink instance. Note that the arguments must be passed as a tuple even when there is only one of them.

    pool = Pool(processes=2, initializer=init_logger, initargs=(sink.queue,))

Logging

To log messages from the worker process, a logger is first obtained (or created) with a name corresponding to the current process name:

    process = current_process()
    worker_logger = logging.getLogger(name=process.name)

This logger is then used as usual, for example:

    worker_logger.info("starting task")

Example

To illustrate the use of the logger we create a function that calculates the mean of a given number of random integer values.

def worker_task(args):
    seed, count = args

    process = current_process()
    worker_logger = logging.getLogger(name=process.name)
    worker_logger.debug(f"Worker task started as process \"{process.name}\"; seed={seed}; count={count}")

    random.seed(seed)
    result = random.randint(0, 1000) / count
    for k in range(1, count):
        result += random.randint(0, 1000) / count

    worker_logger.info(f"Worker process \"{process.name}\" finished calculations, result={result:.3f}")
    return result

This function takes two parameters: (1) the seed for the random number generator, and (2) the number of values to average. The arguments are packed into a single tuple to simplify the calling code.
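
Because the arguments are packed into a single tuple, the function can also be called directly, outside the pool, for a quick check; the seed and count below are arbitrary, and without a configured handler the log calls simply go nowhere:

    result = worker_task((42, 100))   # seed=42, average 100 random values
    print(f"{result:.3f}")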

To run this function using the pool of workers we need to create a log sink and a pool:

    sink = Sink(filename="multiprocessing_pool.log",
                mode="w",
                maxBytes=20000,
                backupCount=3,
                format="%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s")
    pool = Pool(processes=2, initializer=init_logger, initargs=(sink.queue,))

The sink process is then started:

    sink.start()

The tasks are defined as a tuple, each element of which is a tuple containing two elements: the seed and the count.

    task_size = 10000
    tasks = ((3264328, task_size), (87529, task_size), (64209, task_size), (87529, task_size))

The Pool.imap method is then used to run the tasks:

    for r in pool.imap(worker_task, tasks):
        print(f"{r:.3f}")

The clean-up consists of stopping the sink process and terminating the pool:

    sink.stop()
    pool.close()
    pool.terminate()
    pool.join()

After running the code, the multiprocessing_pool.log file is created in the working folder; it contains something similar to this:

2025-03-13 20:10:04,969 SpawnPoolWorker-1 SpawnPoolWorker-1 DEBUG    Worker task started as process "SpawnPoolWorker-1"; seed=3264328; count=10000
2025-03-13 20:10:04,989 SpawnPoolWorker-1 SpawnPoolWorker-1 INFO     Worker process "SpawnPoolWorker-1" finished calculations, result=499.338
2025-03-13 20:10:04,978 SpawnPoolWorker-2 SpawnPoolWorker-2 DEBUG    Worker task started as process "SpawnPoolWorker-2"; seed=87529; count=10000
2025-03-13 20:10:04,989 SpawnPoolWorker-1 SpawnPoolWorker-1 DEBUG    Worker task started as process "SpawnPoolWorker-1"; seed=64209; count=10000
2025-03-13 20:10:04,997 SpawnPoolWorker-2 SpawnPoolWorker-2 INFO     Worker process "SpawnPoolWorker-2" finished calculations, result=500.582
2025-03-13 20:10:05,003 SpawnPoolWorker-1 SpawnPoolWorker-1 INFO     Worker process "SpawnPoolWorker-1" finished calculations, result=494.367
2025-03-13 20:10:04,998 SpawnPoolWorker-2 SpawnPoolWorker-2 DEBUG    Worker task started as process "SpawnPoolWorker-2"; seed=87529; count=10000
2025-03-13 20:10:05,007 SpawnPoolWorker-2 SpawnPoolWorker-2 INFO     Worker process "SpawnPoolWorker-2" finished calculations, result=500.582

The result of each task is also printed to the console:

499.338
500.582
494.367
500.582

The second and the last values are the same since the seed values in the tasks are equal.
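
This is just the determinism of the pseudo-random generator: reseeding it with the same value reproduces the same sequence, as this small sketch illustrates (the seed is the one reused in the tasks above):

    random.seed(87529)
    first = [random.randint(0, 1000) for _ in range(3)]
    random.seed(87529)
    second = [random.randint(0, 1000) for _ in range(3)]
    assert first == second  # identical seeds produce identical sequences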

Improved Implementation

There are quite a lot of moving parts in this solution, with some implementation details exposed that could be hidden from the user. A class that adds the logging functionality to the standard Pool would be a much better solution. It can be implemented easily if we require that the pool is only used as a context manager, which is recommended anyway.

Here is a possible implementation:

class LoggingPool:
    def __init__(self,
                 processes=None,
                 initializer=None,
                 initargs=None,
                 maxtasksperchild=None,
                 level=logging.DEBUG,
                 *handler_args,
                 **handler_kwargs):
        self.level = level
        self.initializer = initializer
        self.sink = Sink(*handler_args, **handler_kwargs)
        pool_init_args = tuple() if initargs is None else initargs
        self.pool = Pool(processes=processes,
                         initializer=self._initialize,
                         initargs=pool_init_args,
                         maxtasksperchild=maxtasksperchild)

    def _initialize(self, *args):
        # Runs in every worker: install the queue handler first, then call the
        # user-supplied initializer (if any) with its original arguments.
        handler = logging.handlers.QueueHandler(self.sink.queue)
        root_logger = logging.getLogger()
        root_logger.addHandler(handler)
        root_logger.setLevel(self.level)
        if self.initializer is not None:
            self.initializer(*args)

    def __enter__(self):
        self.sink.start()
        return self.pool.__enter__()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.sink.stop()
        self.pool.__exit__(exc_type, exc_val, exc_tb)

An instance of multiprocessing.Pool is created in the constructor. The initialisation code is moved into the internal method _initialize, which also supports calling the user's own worker initialisation code with the user-supplied arguments (an example is shown at the end of this section).

The sink process is started when the context is entered and stopped when it is exited. The __enter__ method returns the result of calling the corresponding method of the Pool class.

The LoggingPool is thus a drop-in replacement for the standard Pool class:

    with LoggingPool(processes=2,
                     filename="multiprocessing_pool.log") as pool:
        for r in pool.imap(worker_task, tasks):
            print(f"{r:.3f}")
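
If the workers also need their own initialisation, _initialize forwards the user initializer and its arguments after the logging handler has been installed, so the initializer can already log through the sink. As a sketch, with a hypothetical init_worker function and tag argument:

def init_worker(tag):
    # Hypothetical user initialisation; the queue handler is already installed,
    # so this message ends up in the log file.
    logging.getLogger(current_process().name).debug(f"worker initialised, tag={tag}")

which can then be passed to the pool together with the handler options:

    with LoggingPool(processes=2,
                     initializer=init_worker,
                     initargs=("example",),
                     filename="multiprocessing_pool.log") as pool:
        for r in pool.imap(worker_task, tasks):
            print(f"{r:.3f}")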

Conclusion

The Python logging package is amazingly flexible and, as we saw, it can be adapted to work with a multiprocessing pool with minimal changes to the user code.

The code is available on GitHub. If you find a bug, or have questions or suggestions, please use the GitHub issue tracker or contact me directly.