A Coding Implementation on Loguru for Designing Robust, Structured, Concurrent, and Production-Ready Python Logging Pipelines
In this tutorial, we implement a practical use case with Loguru, a powerful, flexible, and production-ready logging library for Python. We start by building a clean, idempotent logging setup that can be safely rerun without duplicating handlers or producing messy output. From there, we move step by step through structured logging, contextual logging, custom log levels, global patching, callable formatters, and in-memory sinks. We also handle real-world logging needs such as rich exception traces, JSON log files, custom rotation, compression, retention, async logging, threaded execution, multiprocessing-safe logging, and standard logging module interception. By keeping everything in a Colab-ready workflow, we make it easy to test, inspect, and understand how Loguru can support debugging, monitoring, and observability in serious Python applications.
!pip install -q loguru nest_asyncio
import os, sys, time, json, glob, gzip, shutil, asyncio, logging, itertools, multiprocessing
from collections import deque
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from loguru import logger
try:
    import nest_asyncio; nest_asyncio.apply()
except Exception as e:
    print("nest_asyncio not applied:", e)
WORKDIR = "/content/loguru_demo" if os.path.isdir("/content") else "/tmp/loguru_demo"
os.makedirs(WORKDIR, exist_ok=True); os.chdir(WORKDIR)
for f in glob.glob("*"):
    try: os.remove(f)
    except OSError: pass
print(f"Working directory: {WORKDIR}\n")
RESULTS = []
def check(name, condition, detail=""):
    ok = bool(condition); RESULTS.append((name, ok))
    print(f" [{'PASS' if ok else 'FAIL'}] {name}" + (f" — {detail}" if detail else ""))
def banner(t): print(f"\n{'='*64}\n {t}\n{'='*64}")
_seq = itertools.count(1)
def global_patcher(record):
    record["extra"].setdefault("env", "colab")
    record["extra"]["seq"] = next(_seq)
_NOISE = {"env", "seq", "app"}
def console_formatter(record):
    fmt = ("<green>{time:HH:mm:ss.SSS}</green> | <level>{level: <8}</level> | "
           "<cyan>{name}:{function}:{line}</cyan> - <level>{message}</level>")
    if any(k not in _NOISE for k in record["extra"]):
        fmt += " | <yellow>{extra}</yellow>"
    return fmt + "\n{exception}"
We install Loguru and supporting dependencies, import all required libraries, and prepare a clean working directory for the tutorial. We also create a small verification helper to test each feature as the tutorial runs. We then define a global patcher and console formatter so that every log record carries useful metadata and appears in a readable format.
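Because the patcher and the formatter are ordinary functions that receive a record, their logic can be checked without attaching any handler. The sketch below reuses the patcher from above and isolates the formatter's "is there anything beyond the noise keys?" test in a helper named `wants_extra`, which is a hypothetical name introduced only for illustration. Plain dicts stand in for Loguru records here; real records carry many more fields.

```python
import itertools

_seq = itertools.count(1)
_NOISE = {"env", "seq", "app"}

def global_patcher(record):
    # Same logic as the tutorial's patcher: default env, increasing sequence number.
    record["extra"].setdefault("env", "colab")
    record["extra"]["seq"] = next(_seq)

def wants_extra(record):
    # Hypothetical helper mirroring the formatter's condition:
    # show {extra} only when a key outside _NOISE is present.
    return any(k not in _NOISE for k in record["extra"])

# Assumption: minimal dicts stand in for Loguru records.
plain = {"extra": {"app": "loguru-advanced"}}
bound = {"extra": {"app": "loguru-advanced", "user_id": 42}}
for rec in (plain, bound):
    global_patcher(rec)

print(plain["extra"])
print(bound["extra"])
print(wants_extra(plain), wants_extra(bound))
```

The record that only carries `app`, `env`, and `seq` keeps the short console format, while the record with `user_id` gets the extra column appended.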
class MemorySink:
    def __init__(self, capacity=2000): self.buffer = deque(maxlen=capacity)
    def write(self, message): self.buffer.append(message.record)
    def flush(self): pass
    def has_level(self, name): return any(r["level"].name == name for r in self.buffer)
    def find(self, pred): return [r for r in self.buffer if pred(r)]
MAX_BYTES = 1500
def size_rotation(message, file):
    return file.tell() + len(message) > MAX_BYTES
def gzip_compression(filepath):
    with open(filepath, "rb") as fi, gzip.open(filepath + ".gz", "wb") as fo:
        shutil.copyfileobj(fi, fo)
    os.remove(filepath)
def keep_latest_retention(files):
    for old in sorted(files, key=os.path.getmtime, reverse=True)[3:]:
        try: os.remove(old)
        except OSError: pass
class InterceptHandler(logging.Handler):
    def emit(self, record):
        try: level = logger.level(record.levelname).name
        except ValueError: level = record.levelno
        frame, depth = logging.currentframe(), 2
        while frame and frame.f_code.co_filename == logging.__file__:
            frame, depth = frame.f_back, depth + 1
        (logger.opt(depth=depth, exception=record.exc_info)
               .bind(stdlib_logger=record.name)
               .log(level, record.getMessage()))
def mp_worker(n):
    logger.bind(child=os.getpid()).info("hello from child item {}", n)
    return os.getpid()
We create reusable logging components that make the tutorial more practical and production-like. We define an in-memory sink, custom file rotation, compression, and retention functions to control how logs are stored. We also build a standard logging interceptor and a multiprocessing worker to connect Loguru to external libraries and child processes.
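The three file-management callables are plain functions, so their behavior can be tried in isolation before they are passed to `logger.add`. The following sketch repeats them unchanged and drives them by hand against a temporary directory. The file names (`events.log`, `old_0.log.gz`, and so on) and the explicit modification times are assumptions made up for this check.

```python
import gzip, os, shutil, tempfile

MAX_BYTES = 1500

def size_rotation(message, file):
    # Rotate when writing this message would push the file past MAX_BYTES.
    return file.tell() + len(message) > MAX_BYTES

def gzip_compression(filepath):
    # Compress the closed log file, then delete the uncompressed original.
    with open(filepath, "rb") as fi, gzip.open(filepath + ".gz", "wb") as fo:
        shutil.copyfileobj(fi, fo)
    os.remove(filepath)

def keep_latest_retention(files):
    # Keep the three most recently modified files, delete the rest.
    for old in sorted(files, key=os.path.getmtime, reverse=True)[3:]:
        try: os.remove(old)
        except OSError: pass

workdir = tempfile.mkdtemp()

# 1) Rotation predicate: 1490 bytes already written.
path = os.path.join(workdir, "events.log")
with open(path, "w") as fh:
    fh.write("x" * 1490)
    fits = size_rotation("short", fh)        # 1490 + 5 stays under the limit
    overflows = size_rotation("y" * 20, fh)  # 1490 + 20 exceeds the limit

# 2) Compression: the original disappears, the archive round-trips.
gzip_compression(path)
with gzip.open(path + ".gz", "rt") as fh:
    restored = fh.read()

# 3) Retention: five archives with increasing mtimes, three survive.
files = []
for i in range(5):
    p = os.path.join(workdir, f"old_{i}.log.gz")
    open(p, "w").close()
    os.utime(p, (1000 + i, 1000 + i))
    files.append(p)
keep_latest_retention(files)
survivors = sorted(os.path.basename(p) for p in files if os.path.exists(p))
print(fits, overflows, survivors)
shutil.rmtree(workdir)
```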
banner("1) logger.configure(): handlers + custom level + extra + patcher")
mem = MemorySink()
logger.configure(
    handlers=[
        {"sink": sys.stderr, "format": console_formatter, "level": "DEBUG",
         "colorize": True, "backtrace": True, "diagnose": True},
        {"sink": mem, "level": "DEBUG", "format": "{message}"},
        {"sink": "structured.jsonl", "serialize": True, "level": "DEBUG",
         "enqueue": True},
        {"sink": "errors.log", "level": "ERROR", "enqueue": True,
         "backtrace": True, "diagnose": False,
         "format": "{time:YYYY-MM-DD HH:mm:ss} | {level} | "
                   "{name}:{function}:{line} | {message}"},
    ],
    levels=[{"name": "NOTICE", "no": 22, "color": "<blue><bold>", "icon": ""}],
    extra={"app": "loguru-advanced"},
    patcher=global_patcher,
)
logger.debug("debug"); logger.info("info"); logger.success("SUCCESS level ships built-in")
logger.warning("warning"); logger.log("NOTICE", "custom level between INFO and SUCCESS")
banner("2) bind() / contextualize() / patch()")
logger.bind(user_id=42, request_id="abc-123").info("bound context")
with logger.contextualize(task="batch-job", run=7):
    logger.info("inside contextualized block")
logger.patch(lambda r: r["extra"].update(epoch=round(time.time()))).info("per-call patched record")
banner("3) @logger.catch + context-manager form")
def inner(d): return d["a"] / d["b"]
def outer(d): return inner(d)
@logger.catch(reraise=False)
def compute(d): return outer(d)
compute({"a": 1, "b": 0})
with logger.catch(message="handled inside a with-block"):
    raise ValueError("boom in block")
banner("4) opt(lazy=True), inline colors, record access")
logger.opt(lazy=True).debug("lazy sum = {}", lambda: sum(i*i for i in range(1_000_000)))
logger.opt(colors=True).info("inline <red>colors</red> <green>work</green>")
logger.opt(record=True).info("emitted from source line {record[line]}")
We configure Loguru with multiple handlers, including console output, memory capture, JSON logging, and error logging. We then demonstrate structured logging with bound context, contextual blocks, patched records, and a custom log level. We also explore exception handling and useful opt() features such as lazy evaluation, inline colors, and record access.
banner("5) custom rotation/compression/retention (forces real rotation)")
ev_id = logger.add("events_{time:HHmmss_SSS}.log",
                   rotation=size_rotation, compression=gzip_compression,
                   retention=keep_latest_retention, enqueue=True, level="DEBUG",
                   format="{time:HH:mm:ss.SSS} | {level: <8} | {message}")
for i in range(80):
    logger.bind(idx=i).debug("rotating event line number {}", i)
logger.complete(); logger.remove(ev_id)
print(f" archives created: {sorted(glob.glob('events_*.gz'))}")
banner("6a) ThreadPoolExecutor with per-thread contextualize()")
thread_caps = []
tid = logger.add(thread_caps.append, level="DEBUG", format="{message}",
                 filter=lambda r: "worker_id" in r["extra"])
def worker(n):
    with logger.contextualize(worker_id=n):
        logger.info("thread work item {}", n)
    return n * n
with ThreadPoolExecutor(max_workers=8) as ex:
    sq = list(ex.map(worker, range(8)))
logger.complete(); logger.remove(tid)
worker_ids = {m.record["extra"]["worker_id"] for m in thread_caps}
banner("6b) async coroutine sink + await logger.complete()")
async def run_async_demo():
    sunk = []
    async def async_sink(message):
        await asyncio.sleep(0); sunk.append(message.record["message"])
    sid = logger.add(async_sink, level="DEBUG", catch=True)
    async def task(n):
        with logger.contextualize(coro=n):
            logger.info("async task {} start", n)
            await asyncio.sleep(0.01)
            logger.success("async task {} done", n)
    await asyncio.gather(*(task(i) for i in range(5)))
    await logger.complete()
    logger.remove(sid)
    return sunk
try:
    async_msgs = asyncio.run(run_async_demo())
except RuntimeError:
    async_msgs = asyncio.get_event_loop().run_until_complete(run_async_demo())
print(f" async sink received {len(async_msgs)} messages")
We demonstrate custom file management by automatically rotating, compressing, and retaining log files. We then test thread-safe logging by running multiple workers, each with its own contextual metadata. We also add an asynchronous coroutine sink to see how Loguru handles async tasks and correctly drains pending logs.
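The per-thread and per-task isolation seen above follows from `contextualize()` being built on Python's `contextvars`, as the Loguru documentation describes. The sketch below reproduces that idea with a bare `ContextVar`. It is a simplified stand-in written for illustration, not Loguru's implementation, and the names `_extra`, `thread_worker`, and `coro_worker` are hypothetical.

```python
import asyncio
import contextvars
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager

# Hypothetical stand-in for the context-local "extra" dict.
_extra = contextvars.ContextVar("extra", default={})

@contextmanager
def contextualize(**kwargs):
    # Set a merged copy for the current context, restore the old value on exit.
    token = _extra.set({**_extra.get(), **kwargs})
    try:
        yield
    finally:
        _extra.reset(token)

def thread_worker(n):
    with contextualize(worker_id=n):
        return _extra.get()["worker_id"]

async def coro_worker(n):
    with contextualize(coro=n):
        await asyncio.sleep(0.01)  # other tasks run while this one is suspended
        return _extra.get()["coro"]

async def main():
    return await asyncio.gather(*(coro_worker(i) for i in range(5)))

with ThreadPoolExecutor(max_workers=4) as ex:
    thread_seen = list(ex.map(thread_worker, range(8)))
coro_seen = asyncio.run(main())
leaked = _extra.get()  # value visible in the main context afterwards

print(thread_seen, coro_seen, leaked)
```

Each worker reads back its own value even though tasks interleave, and nothing leaks into the caller's context once the blocks exit.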
banner("7) intercept stdlib `logging` and filter a chatty library")
logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
lib_caps = []
def lib_filter(record):
    if record["extra"].get("stdlib_logger") == "chatty":
        return record["level"].no >= logger.level("WARNING").no
    return True
lid = logger.add(lib_caps.append, level="DEBUG", format="{message}",
                 filter=lambda r: ("stdlib_logger" in r["extra"]) and lib_filter(r))
logging.getLogger("chatty").info("noisy info (should be filtered out)")
logging.getLogger("chatty").warning("noisy warning (kept)")
logging.getLogger("important").debug("important debug (kept)")
logger.complete(); logger.remove(lid)
banner("8) SELF-TESTS")
logger.complete(); time.sleep(0.2)
try:
    rec = json.loads(open("structured.jsonl").read().splitlines()[-1])
    check("JSON sink serializes records", {"text", "record"} <= set(rec))
except Exception as e:
    check("JSON sink serializes records", False, str(e))
try:
    err_txt = open("errors.log").read()
    check("errors.log captured ZeroDivisionError", "ZeroDivisionError" in err_txt)
except Exception as e:
    check("errors.log captured ZeroDivisionError", False, str(e))
check("custom rotation produced .gz archives", len(glob.glob("events_*.gz")) >= 1,
      f"{len(glob.glob('events_*.gz'))} archive(s)")
check("custom NOTICE level recorded", mem.has_level("NOTICE"))
check("SUCCESS level recorded", mem.has_level("SUCCESS"))
check("bound user_id=42 present", bool(mem.find(lambda r: r["extra"].get("user_id") == 42)))
check("contextualize task present", bool(mem.find(lambda r: r["extra"].get("task") == "batch-job")))
check("global patcher stamped env", bool(mem.find(lambda r: r["extra"].get("env") == "colab")))
check("exception captured in a record", bool(mem.find(lambda r: r["exception"] is not None)))
check("threads logged all 8 workers", worker_ids == set(range(8)), str(sorted(worker_ids)))
check("async sink got 10 messages", len(async_msgs) == 10, f"{len(async_msgs)} msgs")
kept = {m.record["extra"]["stdlib_logger"] + ":" + m.record["level"].name for m in lib_caps}
check("library INFO filtered, rest kept",
      ("chatty:INFO" not in kept) and ("chatty:WARNING" in kept) and ("important:DEBUG" in kept),
      str(sorted(kept)))
We intercept Python’s built-in logging module and route standard library logs into Loguru. We apply source-aware filtering so that noisy logs from one library can be suppressed while important messages are still kept. We then run self-tests to verify JSON logging, error capture, archive creation, context propagation, exception records, threaded logs, async logs, and filtering behavior.
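The source-aware filter is a predicate over a record, so its decision table can be inspected on its own. In the sketch below, a `namedtuple` called `Level` and a helper called `fake` are hypothetical stand-ins for Loguru's record structure, and `WARNING_NO = 30` hard-codes the severity of Loguru's built-in WARNING level instead of looking it up through `logger.level("WARNING").no`.

```python
from collections import namedtuple

# Hypothetical stand-in for record["level"], which exposes .name and .no.
Level = namedtuple("Level", "name no")
WARNING_NO = 30  # severity of Loguru's built-in WARNING level

def lib_filter(record):
    # For the noisy library, keep WARNING and above; keep everything else as-is.
    if record["extra"].get("stdlib_logger") == "chatty":
        return record["level"].no >= WARNING_NO
    return True

def fake(source, name, no):
    # Assumption: a minimal dict with only the fields the filter reads.
    return {"extra": {"stdlib_logger": source}, "level": Level(name, no)}

samples = [
    fake("chatty", "INFO", 20),
    fake("chatty", "WARNING", 30),
    fake("important", "DEBUG", 10),
]
kept = [f'{r["extra"]["stdlib_logger"]}:{r["level"].name}' for r in samples if lib_filter(r)]
print(kept)
```

The chatty INFO record is dropped while the chatty WARNING and the DEBUG message from the other logger pass through, which is the same outcome the self-test checks.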
banner("9) throughput: enqueue=False vs enqueue=True")
def bench(enqueue, n=15000):
    logger.remove()
    sid = logger.add(lambda m: None, level="DEBUG", format="{message}", enqueue=enqueue)
    t0 = time.perf_counter()
    for i in range(n): logger.bind(i=i).debug("benchmark {}", i)
    logger.complete(); dt = time.perf_counter() - t0
    logger.remove(sid)
    return n / dt
try:
    sync_tput = bench(False); async_tput = bench(True)
    print(f" direct : {sync_tput:,.0f} msg/s")
    print(f" enqueue : {async_tput:,.0f} msg/s (non-blocking, process/thread-safe)")
    check("benchmark completed", sync_tput > 0 and async_tput > 0)
except Exception as e:
    check("benchmark completed", False, str(e))
banner("10) multiprocessing with enqueue=True (fork)")
try:
    logger.remove()
    mp_id = logger.add("mp.log", enqueue=True, level="DEBUG",
                       format="{extra[child]} | {message}")
    ctx = multiprocessing.get_context("fork")
    with ProcessPoolExecutor(max_workers=4, mp_context=ctx) as ex:
        pids = list(ex.map(mp_worker, range(4)))
    logger.complete(); logger.remove(mp_id); time.sleep(0.1)
    lines = open("mp.log").read().splitlines()
    check("multiprocessing logged from children", len(lines) >= 4,
          f"{len(lines)} lines from {len(set(pids))} PIDs")
except Exception as e:
    check("multiprocessing logged from children", False, f"unsupported here: {e}")
logger.remove()
logger.add(sys.stderr, level="INFO", colorize=True,
format="<green>{time:HH:mm:ss}</green> | <level>{level}</level> | {message}")
banner("RESULTS")
passed = sum(ok for _, ok in RESULTS)
for name, ok in RESULTS:
    print(f" {'PASS' if ok else 'FAIL'} {name}")
print(f"\n {passed}/{len(RESULTS)} checks passed")
print(f" files: {sorted(glob.glob('*'))}")
(logger.success if passed == len(RESULTS) else logger.warning)(
    "Loguru tutorial complete!" if passed == len(RESULTS)
    else f"Completed with {len(RESULTS)-passed} failed check(s)"
)
We benchmark Loguru throughput by comparing direct logging with enqueue-based logging. We then test multiprocessing-safe logging by writing messages from child processes into a shared log file. Finally, we remove the remaining handlers, print all test results, list the generated files, and report whether every check passed.
In conclusion, we built a complete and robust logging system using Loguru that goes far beyond basic print-style debugging. We learned how to configure multiple sinks, capture structured JSON records, add contextual metadata, preserve useful exception information, manage rotating log files, filter noisy third-party libraries, and handle concurrent workloads across threads, async tasks, and processes. We also included self-verification checks and a small benchmark to confirm that the logging pipeline works correctly and to assess its performance behavior.
The post A Coding Implementation on Loguru for Designing Robust, Structured, Concurrent, and Production-Ready Python Logging Pipelines appeared first on MarkTechPost.
📌 Source: MarkTechPost