Mar 13, 2025
7 mins read
While debugging image caching in mikula I ran into the problem of adding logging to code that runs as a multiprocessing pool task. The standard logging package in Python handles multithreaded applications out of the box but requires special handlers to log across multiple processes or applications. Popular mechanisms for passing log messages between processes use sockets, HTTP or multiprocessing queues; examples of these techniques can be found in the Logging Cookbook.
For my debugging I needed a simple and lightweight solution, so I adapted the multiprocessing queue approach described in the corresponding section of the Cookbook. This post discusses a few tricks required to enable logging from tasks run by a pool.
The multiprocessing package contains functionality for creating and controlling Python processes at a low level, as well as high-level primitives like Pool for running parallel tasks. The Pool spawns a specified number of worker processes and distributes the tasks between them, hiding the implementation details from the user.
The main difference between using individual processes and the pool is the amount of control the programmer has over spawning and accessing the worker processes. Luckily, the Pool class can run an initialisation function in each worker before it starts executing tasks, which is what we are going to use to create the loggers and handlers.
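To see how worker initialisation works on its own, here is a minimal sketch of a Pool with an initializer, unrelated to logging; the worker_init function and its greeting argument are made up purely for illustration:
from multiprocessing import Pool

# Hypothetical example: the initializer runs once in every worker process
# before it starts executing tasks, so it is a convenient place to set up
# per-worker state such as loggers.
def worker_init(greeting):
    global worker_greeting
    worker_greeting = greeting

def square(x):
    return f"{worker_greeting}: {x * x}"

if __name__ == "__main__":
    # initargs must be a tuple, even for a single argument
    with Pool(processes=2, initializer=worker_init, initargs=("hello",)) as pool:
        print(list(pool.imap(square, range(3))))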
The basic idea is the same as described in the Cookbook: each worker process creates a QueueHandler and adds it to the root logger. A separate Sink process creates a multiprocessing queue used for passing log messages from the workers, creates a handler of type RotatingFileHandler for aggregating the logs, then queries the queue for messages and passes them to its own handler. A special sentinel is put in the queue to signal the sink process that it should exit.
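Before wiring this into a pool, the record flow itself can be sketched in a single process: a QueueHandler pushes LogRecord objects onto a queue, and a consumer pulls them off and hands them to a real handler. This is only an illustrative sketch using a plain queue.Queue, not the code used below:
import logging
import logging.handlers
import queue

# A plain queue.Queue is enough to show the record flow within one process;
# the actual solution below uses a multiprocessing.Queue instead.
log_queue = queue.Queue()

producer = logging.getLogger("producer")
producer.setLevel(logging.DEBUG)
producer.addHandler(logging.handlers.QueueHandler(log_queue))
producer.info("hello from the producer side")

# The consumer side pulls LogRecord objects and passes them to a real handler.
console = logging.StreamHandler()
console.setFormatter(logging.Formatter("%(name)s %(levelname)s %(message)s"))
record = log_queue.get()
console.handle(record)  # prints: producer INFO hello from the producer side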
It is convenient to put all functionality of the log sink into one class. An instance of multiprocessing.Queue is created in the constructor. The two main methods of the Sink class are start and stop. The former creates and starts a process for pulling messages from the queue, while the latter puts a sentinel in the queue, which causes the process to exit, and then joins it.
# Imports used throughout the examples in this post.
import logging
import logging.handlers
import random
from multiprocessing import Pool, Process, Queue, current_process


class Sink:
    class Sentinel:
        pass

    def __init__(self, **kwargs):
        self.kwargs = kwargs
        self.queue = Queue()
        self.process = None

    def _do_logging(self):
        # Runs in the sink process: the file handler must be created here,
        # not in the main process.
        root_logger = logging.getLogger()
        file_handler = logging.handlers.RotatingFileHandler(
            filename=self.kwargs["filename"],
            # Fall back to the RotatingFileHandler defaults when an option
            # is not supplied.
            mode=self.kwargs.get("mode", "a"),
            maxBytes=self.kwargs.get("maxBytes", 0),
            backupCount=self.kwargs.get("backupCount", 0))
        formatter = logging.Formatter(fmt=self.kwargs.get("format"))
        file_handler.setFormatter(formatter)
        root_logger.addHandler(file_handler)
        while True:
            record = self.queue.get()
            if isinstance(record, self.Sentinel):
                break
            sink_logger = logging.getLogger(record.name)
            sink_logger.handle(record)

    def start(self):
        self.process = Process(target=self._do_logging)
        self.process.start()

    def stop(self):
        if self.process is None:
            return
        self.queue.put_nowait(self.Sentinel())
        self.process.join()
The __init__ method of the Sink class captures keyword arguments which are later used by the internal _do_logging method to create the RotatingFileHandler and add it to the root logger. It is important that these operations are done in this method, as it will run in a separate process.
The nested empty class Sentinel is used instead of None as it is more explicit.
The initialisation code run by each worker is fairly straightforward:
def init_logger(log_queue, level=logging.DEBUG):
    handler = logging.handlers.QueueHandler(log_queue)
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    root_logger.setLevel(level)
This function is passed to the Pool constructor together with a single initialization argument: the queue created by the instance of the Sink class. Note that the arguments must be passed as a tuple even when a single argument is used.
pool = Pool(processes=2, initializer=init_logger, initargs=(sink.queue,))
To log messages from the worker process, a logger is first chosen (or created) with the name corresponding to the current process name:
process = current_process()
worker_logger = logging.getLogger(name=process.name)
This logger is then used as usual, for example:
worker_logger.info("starting task")
To illustrate the use of the logger, we create a function that calculates the mean of a given number of random integer values.
def worker_task(args):
    seed, count = args
    process = current_process()
    worker_logger = logging.getLogger(name=process.name)
    worker_logger.debug(f"Worker task started as process \"{process.name}\"; seed={seed}; count={count}")
    random.seed(seed)
    result = random.randint(0, 1000) / count
    for k in range(1, count):
        result += random.randint(0, 1000) / count
    worker_logger.info(f"Worker process \"{process.name}\" finished calculations, result={result:.3f}")
    return result
This function takes two parameters: (1) the seed for the random number generator, and (2) the number of values to average. The arguments are passed as a tuple to simplify the caller code.
To run this function using the pool of workers, we need to create a log sink and a pool:
sink = Sink(filename="multiprocessing_pool.log",
            mode="w",
            maxBytes=20000,
            backupCount=3,
            format="%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s")
pool = Pool(processes=2, initializer=init_logger, initargs=(sink.queue,))
The sink process is then started:
sink.start()
The tasks are defined as a tuple, each element of which is a tuple containing two elements: the seed and the count.
task_size = 10000
tasks = ((3264328, task_size), (87529, task_size), (64209, task_size), (87529, task_size))
The Pool.imap method is then used to run the tasks:
for r in pool.imap(worker_task, tasks):
    print(f"{r:.3f}")
The clean-up consists of stopping the sink process and terminating the pool:
sink.stop()
pool.close()
pool.terminate()
pool.join()
After running the code, a multiprocessing_pool.log file is created in the working folder, containing something similar to this:
2025-03-13 20:10:04,969 SpawnPoolWorker-1 SpawnPoolWorker-1 DEBUG Worker task started as process "SpawnPoolWorker-1"; seed=3264328; count=10000
2025-03-13 20:10:04,989 SpawnPoolWorker-1 SpawnPoolWorker-1 INFO Worker process "SpawnPoolWorker-1" finished calculations, result=499.338
2025-03-13 20:10:04,978 SpawnPoolWorker-2 SpawnPoolWorker-2 DEBUG Worker task started as process "SpawnPoolWorker-2"; seed=87529; count=10000
2025-03-13 20:10:04,989 SpawnPoolWorker-1 SpawnPoolWorker-1 DEBUG Worker task started as process "SpawnPoolWorker-1"; seed=64209; count=10000
2025-03-13 20:10:04,997 SpawnPoolWorker-2 SpawnPoolWorker-2 INFO Worker process "SpawnPoolWorker-2" finished calculations, result=500.582
2025-03-13 20:10:05,003 SpawnPoolWorker-1 SpawnPoolWorker-1 INFO Worker process "SpawnPoolWorker-1" finished calculations, result=494.367
2025-03-13 20:10:04,998 SpawnPoolWorker-2 SpawnPoolWorker-2 DEBUG Worker task started as process "SpawnPoolWorker-2"; seed=87529; count=10000
2025-03-13 20:10:05,007 SpawnPoolWorker-2 SpawnPoolWorker-2 INFO Worker process "SpawnPoolWorker-2" finished calculations, result=500.582
The result of running each task is also printed in the console:
499.338
500.582
494.367
500.582
The second and the last values are the same since the seed values in the tasks are equal.
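This is just the usual determinism of the seeded generator: two workers seeded with the same value draw identical sequences. A quick single-process check (illustrative only, not part of the pool code):
import random

# Seeding the generator twice with the same value reproduces the same draws,
# which is why the duplicated seed 87529 yields the same mean twice.
random.seed(87529)
first = [random.randint(0, 1000) for _ in range(5)]
random.seed(87529)
second = [random.randint(0, 1000) for _ in range(5)]
assert first == second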
There are quite a lot of moving parts in this solution, and some implementation details are exposed that could be hidden from the user. A class that adds the logging functionality to the standard Pool would be a much better solution. It can be implemented easily if we require that the pool only be used as a context manager, which is recommended anyway.
Here is a possible implementation:
class LoggingPool:
    def __init__(self,
                 processes=None,
                 initializer=None,
                 initargs=None,
                 maxtasksperchild=None,
                 level=logging.DEBUG,
                 *handler_args,
                 **handler_kwargs):
        self.level = level
        self.initializer = initializer
        self.sink = Sink(*handler_args, **handler_kwargs)
        pool_init_args = tuple() if initargs is None else initargs
        self.pool = Pool(processes=processes,
                         initializer=self._initialize,
                         initargs=pool_init_args,
                         maxtasksperchild=maxtasksperchild)

    def _initialize(self, *args):
        # Runs in each worker: attach a QueueHandler to the root logger,
        # then call the user-supplied initializer, if any.
        handler = logging.handlers.QueueHandler(self.sink.queue)
        root_logger = logging.getLogger()
        root_logger.addHandler(handler)
        root_logger.setLevel(self.level)
        if self.initializer is not None:
            self.initializer(*args)

    def __enter__(self):
        self.sink.start()
        return self.pool.__enter__()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.sink.stop()
        self.pool.__exit__(exc_type, exc_val, exc_tb)
An instance of multiprocessing.Pool is created in the constructor. The initializer code is moved into the internal method _initialize, which also supports calling the user's initialisation code for the worker with the user-supplied parameters.
The sink process is started and stopped when entering and exiting the context. The __enter__ method returns the result of calling the corresponding method of the Pool class.
The LoggingPool is thus a drop-in replacement for the standard Pool class:
with LoggingPool(processes=2,
                 filename="multiprocessing_pool.log") as pool:
    for r in pool.imap(worker_task, tasks):
        print(f"{r:.3f}")
The Python logging package is remarkably flexible and, as we saw, can be adapted to work with a multiprocessing pool with minimal changes to the user code.
The code is available on GitHub. If you find a bug, or have questions or suggestions, please use the GitHub issue tracker or contact me directly.