Jak nad standardním logging postavit korelační ID, která se sama propíšou do všech vnořených logů přes ContextVar, hashují citlivé hodnoty — a jak bufferovat logy, dokud ještě neznáte ID tasku.

Problém

V asynchronní aplikaci běží desítky tasků naráz a jejich logy se v jednom streamu prolínají. Bez korelačního ID nepoznáte, které řádky patří k sobě:

level=INFO message=Task started.
level=INFO message=Downloading source...
level=INFO message=Task started.
level=ERROR message=Download failed.

Které „started" patří k tomu „failed"? Klasické řešení je protáhnout task_id parametrem skrz každou funkci, která loguje. To je otrava a stejně to nevydrží — jakmile log vznikne o tři vrstvy hlouběji v knihovně, kontext je pryč.

K tomu dva přidružené požadavky:

  • Citlivá ID nechci v logu plaintextem (userId, e-mail) — ale chci je dohledat. Potřebuju stabilní pseudonym, ne čitelnou hodnotu.
  • Korelační ID někdy ještě neznám, když chci začít logovat. Na startu tasku zaloguju „task started", ale samotné ID tasku se přiřadí až o pár řádků níž.

Všechno tohle jde vyřešit bez externí knihovny (structlog, loguru) — stačí contextvars a vlastní logging.Filter.

Markery, které se nesou samy

Jádro je ContextVar se slovníkem markerů a context manager, který do něj přidává:

from contextvars import ContextVar

markers_var: ContextVar[dict] = ContextVar("markers", default={})

@contextmanager
def logmarker(**kwargs):
    old_markers = markers_var.get()
    new_markers = hashed_markers(**kwargs)
    token = markers_var.set({**old_markers, **new_markers})
    try:
        yield
    finally:
        markers_var.reset(token)

ContextVar je asyncio-aware — hodnota se izoluje per-task a automaticky se nese skrz await. Vnořené logmarker() bloky se slévají, takže markery z vnějšího kontextu zůstávají dostupné uvnitř:

with logmarker(process="extract", task=task_id):
    logger.info("Task started.")        # markers=[process=... task=...]
    with logmarker(source=source_id):
        logger.info("Downloading...")   # markers=[process=... task=... source=...]

Hashování citlivých hodnot

Hodnoty markerů se neukládají syrově. Projdou přes SHA256 → base32 → prvních 8 znaků:

def short_hash(obj, length=8):
    data = stable_string(obj).encode("utf-8")
    digest = hashlib.sha256(data).digest()
    b32 = base64.b32encode(digest).decode("ascii").rstrip("=")
    return b32[:length]

def hashed_markers(**kwargs):
    return {k: short_hash(v) if not k.startswith("_") else v for k, v in kwargs.items()}

Hash je stabilní — stejný userId dá vždycky stejný 8znakový kód, takže napříč logy (i napříč službami) půjde grepnout, ale samotnou hodnotu z něj nikdo nezíská.

Konvence: klíče začínající podtržítkem se nehashují. Slouží pro meta/vizuální pole, která nejsou citlivá (_cls_name, _visual) — tam chcete čitelnou hodnotu.

stable_string ještě řeší serializaci čehokoliv: nejdřív zkusí json.dumps(sort_keys=True), a když to neprojde, spadne na pickle + base85. Tím se hashování nikdy nerozbije na neserializovatelném vstupu.

Filtr, který markery vstříkne do každého logu

Markery se do log záznamu dostanou přes vlastní logging.Filter, který sedí na handleru. Filtr ze slovníku poskládá marker_str a přilepí ho na record:

class MarkerFilter(logging.Filter):
    def filter(self, record):
        markers = markers_var.get()
        record.marker_str = " ".join(
            f"{k}={v}" if not k.startswith("_") else v
            for k, v in markers.items()
        ) if markers else ""
        ...
        return True

Formatter pak marker_str jen vypíše:

class HumanReadableFormatter(logging.Formatter):
    def format(self, record):
        markers = getattr(record, "marker_str", "")
        return (
            f"timestamp={self.formatTime(record)} "
            f"logger={record.name} level={record.levelname} "
            f"markers=[{markers}] message={record.getMessage()}"
        )

Důležité je, že se MarkerFilter registruje na root handleru — funguje tedy i pro logy z knihoven třetích stran, aniž by o markerech cokoliv věděly.

DelayedLogmarker — bufferování, než znáte ID

Teď to zajímavé. Co když chcete logovat dřív, než znáte marker? Typicky na startu tasku: chcete zalogovat průběh inicializace, ale ID tasku se ustaví až někde v jejím průběhu.

DelayedLogmarker to řeší tak, že logy během bloku nezahazuje ani nevypisuje — pozdrží je, a vypíše je zpětně, jakmile markery doplníte. Funguje to přes druhý ContextVar, který drží buffer:

buffer_var: ContextVar[list | None] = ContextVar("buffer_var", default=None)

A MarkerFilter má kvůli tomu druhou půlku, kterou jsem výš vynechal:

    def filter(self, record):
        markers = markers_var.get()
        record.marker_str = ...  # jako výše

        buffer_list = buffer_var.get()
        if buffer_list is None:
            return True                  # běžný režim — log projde
        else:
            buffer_list.append(
                lambda: logging.getLogger(record.name).handle(record)
            )
            return False                 # buffer režim — log se pozdrží

Klíčový trik: v buffer režimu filtr vrátí False (log se hned nevypíše) a místo toho si uloží closure record.handle do bufferu. Záznam je zachycený celý — až ho později pustíte, projde znovu celou cestou formatterem, takže dostane i markery, které v té chvíli ještě neexistovaly.

Použití:

with DelayedLogmarker(process="extract") as delayed:
    logger.info("Initializing task...")      # pozdrženo
    task_id = await create_task()
    delayed.set_markers_and_flush(task=task_id)
    # ↑ teď se "Initializing task..." vypíše — už s task=<hash>
    logger.info("Task ready.")               # vypíše se rovnou

Když set_markers_and_flush nezavoláte, dotáhne ho __exit__ automaticky — buffer se vždycky vyprázdní a žádný log se neztratí. Opakované volání naopak hlídá a vyhodí RuntimeError, aby nedošlo k dvojímu flushnutí.

Napojení na Sentry zadarmo

Protože markery žijí v ContextVar, jdou přečíst odkudkoliv — třeba v Sentry before_send hooku, který je promítne jako tagy:

def get_sentry_logmarkers(event, hint):
    markers = {f"logmarker_{k}": str(v) for k, v in markers_var.get().items()}
    event.setdefault("tags", {}).update(markers)
    return event

V Sentry pak filtrujete chyby podle logmarker_task stejným hashem, jaký vidíte v logu. Korelace mezi logem a chybou je tím hotová bez jediného řádku navíc v aplikačním kódu.

Obecnější pozorování

  • ContextVar je správný nástroj pro per-task kontext v asyncio. Thread-local by selhal — coroutiny sdílejí vlákno. ContextVar se izoluje per-task a nese se skrz await automaticky.
  • Registrace filtru na root handleru dává markery i logům z knihoven, které o systému nevědí. Žádné protahování parametrů.
  • Bufferování přes uloženou closure record.handle je elegantní: nemusíte řešit re-formátování, stačí pozdržet volání. Záznam si pamatuje všechno sám.
  • Stabilní hash jako pseudonym je levný kompromis mezi „nelogovat citlivé hodnoty vůbec" a „logovat je plaintextem". Dohledatelnost zůstává, hodnota mizí.
  • Cena za eleganci je dvojí význam jednoho filtruMarkerFilter zároveň vstřikuje markery i řídí bufferování. Bez znalosti DelayedLogmarker ten return False v filter() vypadá jako bug.