Interaction with language model

This software extends markdown to enable interaction with a language model. The user exchanges information with the language model, which in turn is given information about the document being edited. This idea follows the concept of the notebook, where text and programmatic execution interact; the difference is that the programmatic execution takes place in the language model.

The exchange with the language model takes place through metadata blocks. Metadata blocks are a standard feature of many markdown dialects, including Pandoc markdown and R markdown, where their main use is providing a header for the document. Here, however, these blocks can be placed anywhere in the document. Internally, metadata blocks contain YAML specifications.
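For instance, in Pandoc-style markdown a metadata block is a YAML section delimited by `---` lines (the delimiter style shown here is an assumption carried over from Pandoc; the fields are ordinary YAML properties):

```yaml
---
title: My Document
author: Jane Doe
---
```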

There are three main ways in which the interaction takes place. In chat mode, the user and the language model exchange messages within the metadata blocks. To initiate an exchange with the language model, the user writes a line in a metadata block starting with '?: ', or a YAML property 'query: ' containing the text for the chat. The language model replies within the block. Further messages marked with '+: ', or the YAML property 'message: ', continue the chat (to start a new chat, one deletes the previous chat from the block or marks the new exchange with 'query: ' or '?: '). In this and all other modalities, metadata fields written by the language model are prefixed with '~'. Fields written by the language model that are not meant for modification by the user are prefixed with '~~'.
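A sketch of a chat exchange inside a metadata block (the '~answer' key is hypothetical; the text above specifies only the '?: ', '+: ', and '~' prefix conventions, not the name of the reply field):

```yaml
?: What does this section argue?
~answer: It argues that the exchange with the model can live in metadata blocks.
+: Can you expand on the notebook analogy?
```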

In edit mode, the user asks the language model to edit part of the text. To initiate an edit exchange, the user puts a request in a metadata block starting with '=: ', or a YAML property 'edit: '. The model responds by creating a heading for the old text (if there is any), ###### old text, and one for the new text, ###### new text, followed by the new or edited text.
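A sketch of an edit exchange (the request wording is invented, and the `---` block delimiters are a Pandoc-style assumption; the heading markers are as described above):

```markdown
---
=: Make this paragraph more concise.
---

###### old text

The paragraph as it stood before the edit.

###### new text

The paragraph, tightened by the model.
```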

In batch mode, a whole markdown document is scanned by the program and edited by the model. Specific code may be developed to provide the edits (we refer to this as a 'batch model'). The code saves the edited markdown, and the user can inspect or edit the output if necessary. Batch mode is designed to allow repeated scans of the document, enabling rounds of interaction with the user. In the RAG batch model, properties are added to the metadata blocks (such as the questions the text answers). The user can edit, add, or replace these properties. At successive scans, the batch model inserts the properties wherever they are missing (for new text, for example).
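A sketch of the annotations the RAG batch model might add (the key name '~questions' is illustrative; the text above specifies only that model-written properties carry the '~' prefix):

```yaml
~questions:
  - What is batch mode?
  - How does the RAG batch model annotate the text?
```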

In chat and edit mode, what the user writes in the metadata block are prompts concerning the text the block annotates. In batch mode, the prompts are part of the batch model.

Generic scan module

Operations on markdown files to support LM markdown use. Here, scan checks that the markdown is well-formed, adds a header if missing, and returns a list of blocks with a header block first, or a list of blocks containing error blocks for any problems found.

Main functions

- scan: general checks on blocklist, mainly header
- markdown_scan: checks on markdown file (load)
- save_scan: saves markdown with timestamp verification

Behaviour

Functions in this module use the custom LoggerBase class from the lmm.utils.logging package for error handling. The logger is passed as the last argument to functions that require it. Errors are logged rather than raised, except for validation errors in markdown_scan and save_scan.

File size limits: markdown_scan accepts max_size_mb (default 50.0) and warn_size_mb (default 10.0) parameters. Files exceeding warn_size_mb trigger a warning; files exceeding max_size_mb are not loaded, and an error is logged.
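The thresholding behaviour can be pictured with a small self-contained sketch. This is illustrative only: `size_status` is not a library function, and markdown_scan reports through its logger rather than returning a status string.

```python
def size_status(size_bytes: int,
                max_size_mb: float = 50.0,
                warn_size_mb: float = 10.0) -> str:
    """Classify a file size against the warn/max thresholds."""
    size_mb = size_bytes / (1024 * 1024)
    if size_mb > max_size_mb:
        return "error"    # file is not loaded; an error is logged
    if size_mb > warn_size_mb:
        return "warning"  # file is loaded, but a warning is logged
    return "ok"

print(size_status(5 * 1024 * 1024))   # → ok
print(size_status(20 * 1024 * 1024))  # → warning
print(size_status(60 * 1024 * 1024))  # → error
```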

blocklist_scan(blocks, default_title='Title')

General check that the markdown is suitable for work, returning a list of blocks with a header block first.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `blocks` | `list[Block]` | the list of blocks to process. | required |
| `default_title` | `str` | the default title to use when no title is found or when the title is "Title". | `'Title'` |

Returns:

| Type | Description |
| --- | --- |
| `list[Block]` | the processed list of blocks. If the input contains only ErrorBlocks, returns the list as-is without adding a header, as ErrorBlocks signal that the block list is not valid. |

Examples:

>>> from lmm.markdown.parse_markdown import parse_markdown_text
>>> blocks = parse_markdown_text("# My Document\n\nSome text")
>>> result = blocklist_scan(blocks)
>>> isinstance(result[0], HeaderBlock)
True
>>> result[0].content['title']
'My Document'
Source code in lmm/scan/scan.py
def blocklist_scan(blocks: list[Block], default_title: str = "Title") -> list[Block]:
    """General check that the markdown is suitable for work,
    returning a list of blocks with a header block first.

    Args:
        blocks: the list of blocks to process.
        default_title: the default title to use when no title is found
            or when the title is "Title".

    Returns:
        the processed list of blocks. If the input contains only
        ErrorBlocks, returns the list as-is without adding a header,
        as ErrorBlocks signal that the block list is not valid.

    Examples:
        ```
        >>> from lmm.markdown.parse_markdown import parse_markdown_text
        >>> blocks = parse_markdown_text("# My Document\\n\\nSome text")
        >>> result = blocklist_scan(blocks)
        >>> isinstance(result[0], HeaderBlock)
        True
        >>> result[0].content['title']
        'My Document'
        ```
    """

    if not blocks:  # Empty list
        return [HeaderBlock.from_default()]

    # Validate first block and ensure first block is header,
    # creating one if necessary
    match blocks[0]:
        case HeaderBlock() | MetadataBlock() as bl:
            if (
                'title' not in bl.content
                or bl.content['title'] == "Title"
            ):
                bl.content['title'] = default_title
                if not bl.comment:
                    bl.comment = "**Default title added**"
            # replace first with header
            blocks[0] = HeaderBlock._from_metadata_block(bl)
        case HeadingBlock() as bl:
            metadata: MetadataDict = {'title': bl.content}
            blocks.insert(
                0,
                HeaderBlock(
                    content=metadata,
                    comment="**Default header added**",
                ),
            )
        case TextBlock():
            metadata: MetadataDict = {'title': default_title}
            blocks.insert(
                0,
                HeaderBlock(
                    content=metadata,
                    comment="**Default header added**",
                ),
            )
        case ErrorBlock():
            pass

    return blocks

markdown_scan(sourcefile, save=True, *, max_size_mb=50.0, warn_size_mb=10.0, logger=logger)

General check that the markdown is suitable for work, returning a list of blocks with a header block first. When a title is missing, uses the filename stem as the default title.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `sourcefile` | `str \| Path` | the file to load the markdown from | required |
| `save` | `bool \| str \| Path` | if False, does not save; if True, saves back to the original markdown file; if a filename, saves to that file. | `True` |
| `max_size_mb` | `float` | the maximum size, in MB, of the file to load | `50.0` |
| `warn_size_mb` | `float` | the size of the input file that triggers a warning | `10.0` |
| `logger` | `LoggerBase` | a logger object (defaults to console logging) | `logger` |

Returns:

| Type | Description |
| --- | --- |
| `list[Block]` | the processed list of blocks. |

Note

If an error occurs and the blocklist becomes empty, the source file is not altered.

Examples:

# Scan a markdown file and save changes. Timestamp added
blocks = markdown_scan("document.md", save=True)

# Scan without saving
blocks = markdown_scan("document.md", save=False)

# Scan and save to different file, timestamp added
blocks = markdown_scan("source.md", save="output.md")
Source code in lmm/scan/scan.py
@validate_call(config={'arbitrary_types_allowed': True})
def markdown_scan(
    sourcefile: str | Path,
    save: bool | str | Path = True,
    *,
    max_size_mb: float = 50.0,
    warn_size_mb: float = 10.0,
    logger: LoggerBase = logger,
) -> list[Block]:
    """General check that the markdown is suitable for work,
    returning a list of blocks with a header block first. When
    a title is missing, uses the filename stem as the default title.

    Args:
        sourcefile: the file to load the markdown from
        save: if False, does not save; if True, saves back to
            original markdown file; if a filename, saves to
            file.
        max_size_mb: the max size, in MB, of the file to load
        warn_size_mb: the size of the input file that results in
            a warning
        logger: a logger object (defaults to console logging)

    Returns:
        the processed list of blocks.

    Note:
        if an error occurs and the blocklist becomes empty,
        it does not alter the source file.

    Examples:
        ```python
        # Scan a markdown file and save changes. Timestamp added
        blocks = markdown_scan("document.md", save=True)

        # Scan without saving
        blocks = markdown_scan("document.md", save=False)

        # Scan and save to different file, timestamp added
        blocks = markdown_scan("source.md", save="output.md")
        ```
    """

    # Source validation
    source: Path | None = iou.validate_file(sourcefile, logger)
    if not source:
        return []
    # For type-checking
    source = Path(source)

    # load_blocks is guaranteed to return an empty list or a list
    # of blocks.
    blocks: list[Block] = mkd.load_blocks(
        source,
        max_size_mb=max_size_mb,
        warn_size_mb=warn_size_mb,
        logger=logger,
    )
    if not blocks:  # Empty list check
        logger.warning(f"No blocks found in file: {source}")
        return []
    if mkd.blocklist_haserrors(blocks):
        logger.warning(f"Errors found while scanning {source}")

    # Use blocklist_scan with filename stem as default title
    # This ensures missing titles are replaced with the filename
    blocks = blocklist_scan(blocks, default_title=source.stem)
    if not blocks:
        return []

    # Save and return
    match save:
        case False:
            pass
        case True:
            save_scan(source, blocks, logger=logger)
        case str() | Path():
            save_markdown(save, blocks, logger=logger)
        case _:  # ignore
            pass

    return blocks

save_scan(destfile, blocks, *, verify_unchanged=True, logger=logger)

Save blocks to markdown file with optional timestamp verification.

This function provides a safe save mechanism that can verify the file hasn't been modified since it was loaded, preventing accidental overwrites of concurrent changes. A timestamp is stored in the header metadata block using the key '~last_modified'.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `destfile` | `str \| Path` | destination file path (string or Path object) | required |
| `blocks` | `list[Block]` | list of Block objects to save (must have a HeaderBlock first) | required |
| `verify_unchanged` | `bool` | if True, check the timestamp to verify the file hasn't changed since load. Defaults to True for safety. | `True` |
| `logger` | `LoggerBase` | logger object for error reporting | `logger` |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if saved successfully, False otherwise |

Examples:

>>> # Basic save to new file
>>> from lmm.scan.scan import markdown_scan, save_scan
>>> blocks = markdown_scan("test.md", save=False)
>>> save_scan("output.md", blocks)
True

>>> # Load, modify, and save with verification
>>> blocks = markdown_scan("test.md", save=False)
>>> blocks[0].content['author'] = 'New Author'
>>> save_scan("test.md", blocks, verify_unchanged=True)
True

>>> # Force save without verification
>>> save_scan("test.md", blocks, verify_unchanged=False)
True
Note

- The timestamp is stored in blocks[0].content['~last_modified']
- If verify_unchanged=True and timestamps don't match, returns False
- Missing timestamps are handled gracefully (first save or legacy file)
- Errors are logged through the logger object
Source code in lmm/scan/scan.py
@validate_call(config={'arbitrary_types_allowed': True})
def save_scan(
    destfile: str | Path,
    blocks: list[Block],
    *,
    verify_unchanged: bool = True,
    logger: LoggerBase = logger,
) -> bool:
    """
    Save blocks to markdown file with optional timestamp verification.

    This function provides a safe save mechanism that can verify the file
    hasn't been modified since it was loaded, preventing accidental overwrites
    of concurrent changes. A timestamp is stored in the header metadata block
    using the key '~last_modified'.

    Args:
        destfile: Destination file path (string or Path object)
        blocks: List of Block objects to save (must have HeaderBlock first)
        verify_unchanged: If True, check timestamp to verify file hasn't 
            changed since load. Defaults to True for safety.
        logger: Logger object for error reporting

    Returns:
        True if saved successfully, False otherwise

    Examples:
        ```
        >>> # Basic save to new file
        >>> from lmm.scan.scan import markdown_scan, save_scan
        >>> blocks = markdown_scan("test.md", save=False)
        >>> save_scan("output.md", blocks)
        True

        >>> # Load, modify, and save with verification
        >>> blocks = markdown_scan("test.md", save=False)
        >>> blocks[0].content['author'] = 'New Author'
        >>> save_scan("test.md", blocks, verify_unchanged=True)
        True

        >>> # Force save without verification
        >>> save_scan("test.md", blocks, verify_unchanged=False)
        True
        ```

    Note:
        - The timestamp is stored in blocks[0].content['~last_modified']
        - If verify_unchanged=True and timestamps don't match, returns False
        - Missing timestamps are handled gracefully (first save or legacy file)
        - Errors are logged through the logger object
    """
    # Validate inputs
    if not blocks:
        logger.error("Cannot save empty block list")
        return False

    if not isinstance(blocks[0], HeaderBlock):
        logger.error("First block must be a HeaderBlock")
        return False

    # Convert to Path
    dest_path = Path(destfile)

    # Verify timestamp if requested and file exists
    if verify_unchanged and dest_path.exists():
        try:
            # Load existing file to check timestamp
            existing_blocks = markdown_scan(dest_path, save=False, logger=logger)

            if not existing_blocks:
                logger.warning(
                    f"Could not load existing file {dest_path} for "
                    "timestamp verification, proceeding anyway"
                )
            else:
                # Get timestamps
                existing_timestamp: str = existing_blocks[0].content.get(LAST_MODIFIED_KEY) # type: ignore
                current_timestamp: str = blocks[0].content.get(LAST_MODIFIED_KEY) # type: ignore

                # Compare timestamps
                if existing_timestamp and current_timestamp:
                    if existing_timestamp != current_timestamp:
                        logger.warning(
                            f"File {dest_path} has been modified since load. "
                            f"Expected timestamp: {current_timestamp}, "
                            f"found: {existing_timestamp}. Save aborted."
                        )
                        return False
                elif existing_timestamp and not current_timestamp:
                    logger.info(
                        f"Blocks to save have no timestamp, but file {dest_path} "
                        "does. This may indicate the blocks were not loaded via "
                        "save_scan. Proceeding with save."
                    )
                # If neither has timestamp or only current has one, proceed

        except Exception as e:
            logger.error(f"Error during timestamp verification: {e}")
            return False

    # Update timestamp with current time
    blocks[0].content[LAST_MODIFIED_KEY] = datetime.now().isoformat()

    # Save using save_markdown
    success = save_markdown(dest_path, blocks, logger)

    if success:
        logger.info(f"Successfully saved {dest_path}")

    return success

scan(sourcefile, save=True, *, max_size_mb=50.0, warn_size_mb=10.0, logger=logger)

General check that the markdown is suitable for work. This is a wrapper around markdown_scan that catches exceptions and logs them, suitable for command-line interface use.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `sourcefile` | `str \| Path` | the file to load the markdown from | required |
| `save` | `bool \| str \| Path` | if False, does not save; if True, saves back to the original markdown file; if a filename, saves to that file. | `True` |
| `max_size_mb` | `float` | the maximum size, in MB, of the file to load | `50.0` |
| `warn_size_mb` | `float` | the size of the input file that triggers a warning | `10.0` |
| `logger` | `LoggerBase` | a logger object (defaults to console logging) | `logger` |

Returns:

| Type | Description |
| --- | --- |
| `None` | None. Errors are logged instead of raised. |

Source code in lmm/scan/scan.py
def scan(
    sourcefile: str | Path,
    save: bool | str | Path = True,
    *,
    max_size_mb: float = 50.0,
    warn_size_mb: float = 10.0,
    logger: LoggerBase = logger,
) -> None:
    """General check that the markdown is suitable for work.
    This is a wrapper around markdown_scan that catches exceptions and 
    logs them, suitable for command-line interface use.

    Args:
        sourcefile: the file to load the markdown from
        save: if False, does not save; if True, saves back to
            original markdown file; if a filename, saves to
            file.
        max_size_mb: the max size, in MB, of the file to load
        warn_size_mb: the size of the input file that results in
            a warning
        logger: a logger object (defaults to console logging)

    Returns:
        None. Errors are logged instead of raised.
    """

    try:
        markdown_scan(
            sourcefile,
            save,
            max_size_mb=max_size_mb,
            warn_size_mb=warn_size_mb,
            logger=logger,
        )
    except (OSError, ValueError, RuntimeError) as e:
        logger.error(f"Error scanning {sourcefile}: {e}")

Scan module for LLM interaction

Operations on markdown blocks to interface with language models.

Main functions

- scan_messages: looks for queries, messages, and edit prompts, and passes them to the language model, allowing the interaction
- markdown_messages: applies scan_messages to a file
- remove_messages: removes message content from metadata

blocklist_clear_messages(blocks, keys=None)

Remove language model interactions from metadata. If specific keys are given, only those keys are removed.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `blocks` | `list[Block]` | the block list to handle | required |
| `keys` | `list[str] \| None` | the keys to remove; otherwise, removes the keys used in message exchanges. | `None` |
Source code in lmm/scan/scan_messages.py
def blocklist_clear_messages(
    blocks: list[Block], keys: list[str] | None = None
) -> list[Block]:
    """Remove language model interactions from metadata. If specific
    keys are specified, only remove those keys.

    Args:
        blocks: the block list to handle
        keys (opts): specify the keys to remove. Otherwise, will
            remove the keys used in message exchanges.
    """

    if keys is not None:
        return clear_metadata_properties(blocks, keys)

    blocklist: list[Block] = []
    for b in blocks:
        if isinstance(b, MetadataBlock):
            newb: MetadataBlock = b.deep_copy()
            kks: dict_keys[str, MetadataValue] = newb.content.keys()
            if QUERY_KEY in kks:
                newb.content.pop(QUERY_KEY)
            if MESSAGE_KEY in kks:
                newb.content.pop(MESSAGE_KEY)
            if EDIT_KEY in kks:
                newb.content.pop(EDIT_KEY)
            if CHAT_KEY in kks:
                newb.content.pop(CHAT_KEY)

            if len(newb.content) > 0 or bool(newb.private_):
                blocklist.append(newb)
        else:
            blocklist.append(b)

    return blocklist

blocklist_messages(blocks, logger=logger)

Carries out the interaction with the language model, returning a list of blocks with a header block first.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `blocks` | `list[Block]` | markdown blocks to process | required |
| `logger` | `LoggerBase` | a logger object (defaults to console logging) | `logger` |

Returns:

| Type | Description |
| --- | --- |
| `list[Block]` | the processed list of blocks. |

Source code in lmm/scan/scan_messages.py
def blocklist_messages(
    blocks: list[Block], logger: LoggerBase = logger
) -> list[Block]:
    """
    Carries out the interaction with the language model,
    returning a list of blocks with a header block first.

    Args:
        blocks: markdown blocks to process

    Returns:
        the processed list of blocks.
    """

    if not blocks:
        return []

    blocks = blocklist_scan(blocks)
    if blocklist_haserrors(blocks):
        logger.warning("Problems in markdown, fix before continuing")
        return blocks

    root: MarkdownTree = blocks_to_tree(
        blocklist_copy(blocks), logger
    )
    if not root:
        return []

    processed_root: MarkdownNode = _process_chain(root, logger)
    return tree_to_blocks(processed_root)

markdown_clear_messages(sourcefile, keys=None, save=True, logger=logger)

Removes the messages from a markdown. If keys is specified, removes the metadata properties specified by keys.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `sourcefile` | `str \| Path` | the file to load the markdown from | required |
| `keys` | `list[str] \| None` | the keys of messages or any property to remove | `None` |
| `save` | `bool \| str \| Path` | if False, does not save; if True, saves back to the original markdown file; if a filename, saves to that file. | `True` |
| `logger` | `LoggerBase` | a logger object (defaults to console logging) | `logger` |
Note

If an error occurs and the blocklist becomes empty, the source file is not altered.

Source code in lmm/scan/scan_messages.py
@validate_call(config={'arbitrary_types_allowed': True})
def markdown_clear_messages(
    sourcefile: str | Path,
    keys: list[str] | None = None,
    save: bool | str | Path = True,
    logger: LoggerBase = logger,
) -> list[Block]:
    """
    Removes the messages from a markdown. If keys is specified,
    removes the metadata properties specified by keys.

    Args:
        sourcefile: the file to load the markdown from
        keys (optional): the keys of messages or any property to
            remove
        save: if False, does not save; if True, saves back to
            original markdown file; if a filename, saves to
            file.

    Note:
        if an error occurs and the blocklist becomes empty,
        it does not alter the source file.
    """

    SAVE_FILE = False
    blocks: list[Block] = markdown_scan(
        sourcefile, SAVE_FILE, logger=logger
    )
    if not blocks:
        return []

    if blocklist_haserrors(blocks):
        save_markdown(sourcefile, blocks, logger)
        logger.warning("Problems in markdown, fix before continuing")
        return []

    blocks = blocklist_clear_messages(blocks, keys)

    match save:
        case False:
            pass
        case True:
            save_markdown(sourcefile, blocks, logger)
        case str() | Path():
            save_markdown(save, blocks, logger)
        case _:  # ignore
            pass

    return blocks

markdown_messages(sourcefile, save=True, *, max_size_mb=50.0, warn_size_mb=10.0, logger=logger)

Carries out the interaction with the language model, returning a list of blocks with a header block first.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `sourcefile` | `str \| Path` | the file to load the markdown from | required |
| `save` | `bool \| str \| Path` | if False, does not save; if True, saves back to the original markdown file; if a filename, saves to that file. | `True` |
| `max_size_mb` | `float` | the maximum size, in MB, of the file to load | `50.0` |
| `warn_size_mb` | `float` | the size of the input file that triggers a warning | `10.0` |
| `logger` | `LoggerBase` | a logger object (defaults to console logging) | `logger` |
Note

If an error occurs and the blocklist becomes empty, the source file is not altered.

Source code in lmm/scan/scan_messages.py
@validate_call(config={'arbitrary_types_allowed': True})
def markdown_messages(
    sourcefile: str | Path,
    save: bool | str | Path = True,
    *,
    max_size_mb: float = 50.0,
    warn_size_mb: float = 10.0,
    logger: LoggerBase = logger,
) -> list[Block]:
    """
    Carries out the interaction with the language model,
    returning a list of blocks with a header block first.

    Args:
        sourcefile: the file to load the markdown from
        save: if False, does not save; if True, saves back to
            original markdown file; if a filename, saves to
            file.

    Note:
        if an error occurs and the blocklist becomes empty,
        it does not alter the source file.
    """

    SAVE_FILE = False
    blocks: list[Block] = markdown_scan(
        sourcefile,
        SAVE_FILE,
        max_size_mb=max_size_mb,
        warn_size_mb=warn_size_mb,
        logger=logger,
    )
    if not blocks:
        return []
    if blocklist_haserrors(blocks):
        save_markdown(sourcefile, blocks, logger)
        logger.warning("Problems in markdown, fix before continuing")
        return []

    root: MarkdownTree = blocks_to_tree(
        blocklist_copy(blocks), logger
    )
    if not root:
        return []

    processed_root: MarkdownNode = _process_chain(root, logger)
    blocks = tree_to_blocks(processed_root)
    if not blocks:
        return []

    match save:
        case False:
            pass
        case True:
            save_markdown(sourcefile, blocks, logger)
        case str() | Path():
            save_markdown(save, blocks, logger)
        case _:  # ignore
            pass

    return blocks

scan_clear_messages(sourcefile, keys=None, save=True, logger=logger)

Removes the messages from a markdown. If keys is specified, removes the metadata properties specified by keys.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `sourcefile` | `str \| Path` | the file to load the markdown from | required |
| `keys` | `list[str] \| None` | the keys of messages or any property to remove | `None` |
| `save` | `bool \| str \| Path` | if False, does not save; if True, saves back to the original markdown file; if a filename, saves to that file. | `True` |
| `logger` | `LoggerBase` | a logger object (defaults to console logging) | `logger` |
Note

stub of markdown_clear_messages for interface build

Source code in lmm/scan/scan_messages.py
def scan_clear_messages(
    sourcefile: str | Path,
    keys: list[str] | None = None,
    save: bool | str | Path = True,
    logger: LoggerBase = logger,
) -> None:
    """
    Removes the messages from a markdown. If keys is specified,
    removes the metadata properties specified by keys.

    Args:
        sourcefile: the file to load the markdown from
        keys (optional): the keys of messages or any property to
            remove
        save: if False, does not save; if True, saves back to
            original markdown file; if a filename, saves to
            file.

    Note:
        stub of markdown_clear_messages for interface build
    """

    try:
        markdown_clear_messages(sourcefile, keys, save, logger)
    except Exception as e:
        logger.error(str(e))

scan_messages(sourcefile, save=True, *, max_size_mb=50.0, warn_size_mb=10.0, logger=logger)

Carries out the interaction with the language model, returning a list of blocks with a header block first.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `sourcefile` | `str \| Path` | the file to load the markdown from | required |
| `save` | `bool \| str \| Path` | if False, does not save; if True, saves back to the original markdown file; if a filename, saves to that file. | `True` |
| `max_size_mb` | `float` | the maximum size, in MB, of the file to load | `50.0` |
| `warn_size_mb` | `float` | the size of the input file that triggers a warning | `10.0` |
| `logger` | `LoggerBase` | a logger object (defaults to console logging) | `logger` |
Note

stub of markdown_messages for interface build

Source code in lmm/scan/scan_messages.py
def scan_messages(
    sourcefile: str | Path,
    save: bool | str | Path = True,
    *,
    max_size_mb: float = 50.0,
    warn_size_mb: float = 10.0,
    logger: LoggerBase = logger,
) -> None:
    """
    Carries out the interaction with the language model,
    returning a list of blocks with a header block first.

    Args:
        sourcefile: the file to load the markdown from
        save: if False, does not save; if True, saves back to
            original markdown file; if a filename, saves to
            file.

    Note:
        stub of markdown_messages for interface build
    """

    try:
        markdown_messages(
            sourcefile,
            save,
            max_size_mb=max_size_mb,
            warn_size_mb=warn_size_mb,
            logger=logger,
        )
    except Exception as e:
        logger.error(str(e))

Scan module for RAG

Operations on markdown blocks to prepare them for RAG (Retrieval Augmented Generation) by enhancing them with metadata. This module changes the markdown document through its tree representation and higher-order traversal functions.

The operations that are supported by the module are

  1. Validating the markdown structure and ensuring a proper header block
  2. Adding unique IDs to blocks for tracking
  3. Building hierarchical titles for headings based on document structure
  4. Adding potential questions that sections of text answer using a language model
  5. Adding summaries to heading nodes based on their content using a language model
This functionality is implemented by the utility functions:

- add_titles_to_headings
- add_id_to_nodes
- add_questions
- add_summaries

The functions blocklist_rag and markdown_rag use these functions to carry out the operations specified by an options record, ScanOpts. The advantage of gathering these functions in a superordinate function is that the latter can ensure that the specifications are consistent and that the functions are applied in the right order.

Main superordinate functions

- blocklist_rag: processes a blocklist, adding metadata annotations
- markdown_rag: applies blocklist_rag to a file

Behaviour

markdown_rag raises validation errors if called with wrong types. All other functions report errors through the logger (no side effects through raised exceptions).
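This error-reporting convention can be sketched as follows (a minimal illustration of the pattern, not the library's actual code; `safe_annotate` is a hypothetical helper):

```python
import logging

logger = logging.getLogger("lmm")

def safe_annotate(annotate, text: str) -> str:
    # helpers report errors through the logger and return a neutral
    # value instead of propagating the exception to the caller
    try:
        return annotate(text)
    except Exception as e:
        logger.error(str(e))
        return ""
```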

ScanOpts

Bases: BaseModel

This options structure gathers the parameters for annotating the markdown (represented as a list of markdown blocks). All options default to no-op.

Options

  • titles: add hierarchical titles to heading blocks
  • questions: add potential questions to blocks
  • questions_threshold: min word count to trigger questions
  • summaries: add content summaries to heading blocks
  • summary_threshold: min word count to trigger summaries
  • textid: adds a text id to text blocks
  • headingid: adds a heading id to headings
  • textUUID: adds a UUID to text blocks
  • headingUUID: adds a UUID to headings

Example of use
opts = ScanOpts(titles = True) # add titles
blocks = scan_rag(blocks, opts)
Source code in lmm/scan/scan_rag.py
class ScanOpts(BaseModel):
    """
    This options structure gathers the parameters for annotating
    the markdown (represented as a list of markdown blocks).
    All options default to no-op.

    Options:
        titles: add hierarchical titles to heading blocks
        questions: add potential questions to blocks
        questions_threshold: min word count to trigger questions
        summaries: add content summaries to heading blocks
        summary_threshold: min word count to trigger summaries
        textid: adds a text id to text blocks
        headingid: adds a heading id to headings
        textUUID: adds a UUID to text blocks
        headingUUID: adds a UUID to headings

    Example of use:
        ```python
        opts = ScanOpts(titles = True) # add titles
        blocks = scan_rag(blocks, opts)
        ```
    """

    titles: bool = Field(
        default=False,
        description="Enable generation of hierarchical titles for "
        + "heading blocks based on document structure",
    )
    questions: bool = Field(
        default=False,
        description="Enable generation of potential questions that "
        + "text sections answer using language models",
    )
    questions_threshold: int = Field(
        default=15,
        gt=-1,
        description="Minimum word count threshold to trigger question"
        + " generation (ignored if questions=False)",
    )
    summaries: bool = Field(
        default=False,
        description="Enable generation of content summaries for "
        + "heading blocks using language models",
    )
    summary_threshold: int = Field(
        default=50,
        gt=-1,
        description="Minimum word count threshold to trigger summary "
        + "generation (ignored if summaries=False)",
    )
    remove_messages: bool = Field(
        default=False,
        description="Remove language model messages and metadata from"
        + " the processed document. Cleans up irrelevant metadata"
        + "created during interaction with the language model prior"
        + " to ingesting",
    )
    textid: bool = Field(
        default=False,
        description="Add unique text identifiers to text blocks for "
        + "tracking and reference in the vector database",
    )
    headingid: bool = Field(
        default=False,
        description="Add unique heading identifiers to heading blocks"
        + " for tracking and reference in the vector database",
    )
    textUUID: bool = Field(
        default=False,
        description="Add universally unique identifiers (UUIDs) to "
        + "text blocks for creation of id's in vector database",
    )
    headingUUID: bool = Field(
        default=False,
        description="Add universally unique identifiers (UUIDs) to "
        + "heading blocks for creation of group id's in vector database",
    )
    language_model_settings: (
        Settings | LanguageModelSettings | None
    ) = Field(
        default=None,
        description="A Settings object, a LanguageModelSettings "
        "object, or None. If provided, overrides settings in "
        "config.toml.",
    )

    model_config = ConfigDict(extra='forbid')

add_id_to_nodes(root_node, textid, headingid, base_hash, logger, *, filt_func=lambda _: True)

Add unique identifiers to text and heading blocks in a markdown tree. These identifiers may be used when ingesting the document, to create the id's used by the vector database, such that new versions of the same blocks are overwritten in the database.

This function traverses the markdown tree and assigns unique identifiers to TextBlock and HeadingBlock nodes. The identifiers are constructed using:

  1. A base hash derived from the document's title or a provided base_hash
  2. A sequential counter for text and heading blocks

The function adds two types of metadata identifiers:

  • 'textid': Unique identifier for TextBlock nodes
  • 'headingid': Unique identifier for HeadingBlock nodes

Parameters:

  • root_node (MarkdownNode, required): The root node of the markdown tree to process
  • textid (bool, required): add id to text nodes
  • headingid (bool, required): add id to heading nodes
  • base_hash (str, required): A base hash to use for identifier generation. If not provided, a hash is generated from the root node's content (and will differ from that content).
  • logger (LoggerBase, required): a logger object
  • filt_func (Callable[[MarkdownNode], bool], default lambda _: True): a predicate to filter the nodes where the id should be added

Identifier format:

  • For text blocks: "{base_hash}.{sequential_number}", e.g. "abc123.1", "abc123.2"
  • For heading blocks: "{base_hash}.h{sequential_number}", e.g. "abc123.h1", "abc123.h2"

Note
  • Identifiers are always added irrespective of whether they already exist in the node's metadata.
  • The base_hash function produces a hash derived from the docid, ensuring uniqueness across different documents while maintaining deterministic IDs for the same document content.
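The identifier scheme described above can be illustrated with a short sketch (`make_ids` is a hypothetical helper; the real generation happens inside add_id_to_nodes during tree traversal):

```python
def make_ids(base_hash: str, n_text: int, n_heading: int):
    # text blocks get "{base_hash}.{n}", heading blocks "{base_hash}.h{n}",
    # each with its own sequential counter
    text_ids = [f"{base_hash}.{i}" for i in range(1, n_text + 1)]
    heading_ids = [f"{base_hash}.h{i}" for i in range(1, n_heading + 1)]
    return text_ids, heading_ids
```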
Source code in lmm/scan/scan_rag.py
def add_id_to_nodes(
    root_node: MarkdownNode,
    textid: bool,
    headingid: bool,
    base_hash: str | None,
    logger: LoggerBase,
    *,
    filt_func: Callable[[MarkdownNode], bool] = lambda _: True,
) -> None:
    """Add unique identifiers to text and heading blocks in a markdown
    tree. These identifiers may be used when ingesting the document,
    to create the id's used by the vector database, such that new
    versions of the same blocks are overwritten in the database.

    This function traverses the markdown tree and assigns unique
    identifiers to TextBlock and HeadingBlock nodes. The identifiers
    are constructed using:
    1. A base hash derived from the document's title or a provided
        base_hash
    2. A sequential counter for text and heading blocks

    The function adds two types of metadata identifiers:
    - 'textid': Unique identifier for TextBlock nodes
    - 'headingid': Unique identifier for HeadingBlock nodes

    Args:
        root_node (MarkdownNode): The root node of the markdown tree
            to process
        textid: add id to text nodes (boolean)
        headingid: add id to heading nodes (boolean)
        base_hash (str, optional): A base hash to use for identifier
            generation. If not provided, a hash is generated from the
            root node's content (and will differ from that content).
        logger: a logger object.
        filt_func: a predicate to filter the nodes where the id should
            be added.

    Identifier Format:
    - For text blocks: "{base_hash}.{sequential_number}"
      Example: "abc123.1", "abc123.2"
    - For heading blocks: "{base_hash}.h{sequential_number}"
      Example: "abc123.h1", "abc123.h2"

    Note:
        - Identifiers are _always_ added irrespective of whether they
            already exist in the node's metadata.
        - The base_hash function produces a hash derived from the docid,
            ensuring uniqueness across different documents while maintaining
            deterministic IDs for the same document content.
    """

    textid = bool(textid)
    headingid = bool(headingid)
    if not (textid or headingid):
        return

    if not base_hash:
        from lmm.utils.hash import base_hash as hash_func

        title: str = root_node.get_content()
        docid: str = root_node.get_metadata_string_for_key(  # type: ignore
            DOCID_KEY, title  # title is default if DOCID missing
        )
        # don't just use docid, make it same length and intuitively
        # not something to tamper with.
        base_hash = hash_func(docid)

    counter: dict[str, int] = {'text': 0, 'heading': 0}
    textkey = TEXTID_KEY
    headingkey = HEADINGID_KEY

    def _add_id(node: MarkdownNode) -> None:
        if not filt_func(node):
            return
        match node.block:
            case TextBlock() if textid:
                counter['text'] += 1
                node.metadata[textkey] = (
                    f"{base_hash}.{counter['text']}"
                )
            case HeadingBlock() if headingid:
                counter['heading'] += 1
                node.metadata[headingkey] = (
                    f"{base_hash}.h{counter['heading']}"
                )
            case _:
                pass

    pre_order_traversal(root_node, _add_id)

add_questions(root, opts, logger, *, filt_func=lambda _: True)

Add questions answered by text using a language model. Will not add questions to the header node, but to all heading nodes in the document.

Parameters:

  • root (MarkdownNode, required): a markdown node to start the traversal
  • opts (ScanOpts, required): options defining thresholds for computing questions
  • logger (LoggerBase, required): a logger object
  • filt_func (Callable[[MarkdownNode], bool], default lambda _: True): a predicate to filter the heading nodes to add questions to

Returns: None.

Source code in lmm/scan/scan_rag.py
def add_questions(
    root: MarkdownNode,
    opts: ScanOpts,
    logger: LoggerBase,
    *,
    filt_func: Callable[[MarkdownNode], bool] = lambda _: True,
) -> None:
    """Add questions answered by text using a language model. Will
    not add questions to the header node, but to all heading nodes
    in the document.

    Args:
        root: a markdown node to start the traversal
        opts: options defining thresholds for computing questions
        logger: a logger object
        filt_func: a predicate to filter the heading nodes to add
            questions to.

    Returns: None.
    """

    def llm_questions(text: str) -> str:
        if len(text.split()) < opts.questions_threshold:
            return ""
        response: str = ""
        try:
            kernel: RunnableType = create_runnable(
                "question_generator", opts.language_model_settings
            )
            response = kernel.invoke({'text': text})
        except ConnectionError:
            logger.error(
                "Could not connect to language models.\n"
                + "Check the internet connection."
            )
        except Exception as e:
            logger.error(
                "Error in using the language model to create "
                f"questions: {e}"
            )

        # replace numbers
        pattern = r"\s*\d+[.)]\s*"
        response = re.sub(pattern, "~_~", response)
        return " - ".join(response.split("~_~"))

    # do not call questions at header node.
    if root.is_header_node():
        for node in root.children:
            post_order_hashed_aggregation(
                node,
                llm_questions,
                QUESTIONS_KEY,
                filter_func=filt_func,
            )
    else:
        post_order_hashed_aggregation(
            root, llm_questions, QUESTIONS_KEY, filter_func=filt_func
        )
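
The numbered-list cleanup at the end of llm_questions can be seen in isolation (a standalone re-creation of the regex step above, not the library code itself):

```python
import re

def numbers_to_dashes(response: str) -> str:
    # replace enumeration markers like "1. " or "2) " with a sentinel,
    # then join the remaining fragments with " - "
    pattern = r"\s*\d+[.)]\s*"
    return " - ".join(re.sub(pattern, "~_~", response).split("~_~"))
```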

add_summaries(root, opts, logger, *, filt_func=lambda _: True)

Add summaries of text to metadata of headings using a language model.

Parameters:

  • root (MarkdownNode, required): a markdown node to start the traversal
  • opts (ScanOpts, required): options defining thresholds for computing summaries
  • logger (LoggerBase, required): a logger object
  • filt_func (Callable[[MarkdownNode], bool], default lambda _: True): a predicate to filter the heading nodes where a summary will be added
Source code in lmm/scan/scan_rag.py
def add_summaries(
    root: MarkdownNode,
    opts: ScanOpts,
    logger: LoggerBase,
    *,
    filt_func: Callable[[MarkdownNode], bool] = lambda _: True,
) -> None:
    """Add summaries of text to metadata of headings using a
    language model.

    Args:
        root: a markdown node to start the traversal
        opts: options defining thresholds for computing summaries
        logger: a logger object
        filt_func: a predicate function to filter the heading
            nodes where a summary will be added.
    """

    def llm_add_summary(text: str) -> str:
        if len(text.split()) < opts.summary_threshold:
            return ""
        response: str = ""
        try:
            kernel: RunnableType = create_runnable(
                runnable_name="summarizer",
                user_settings=opts.language_model_settings,
            )
            response = kernel.invoke({'text': text})
        except ConnectionError:
            logger.error(
                "Could not connect to language models.\n"
                + "Check the internet connection."
            )
        except Exception as e:
            logger.error(
                "Error in using the language model to create "
                f"summaries: {e}"
            )

        return response

    post_order_hashed_aggregation(
        root, llm_add_summary, SUMMARY_KEY, filter_func=filt_func
    )

add_titles_to_headings(root, logger, *, key=TITLES_KEY, filt_func=lambda _: True)

Recursively add titles to heading blocks in a markdown tree.

This function maps the nodes of a markdown tree in a pre-order manner, collecting and concatenating the content of ancestor headings for each heading block. It adds a metadata field to HeadingBlock nodes, which represents the full hierarchical path of headings leading to that specific heading.
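The title concatenation amounts to joining the non-empty ancestor heading contents with " - " (a hypothetical standalone sketch of the rule, not the traversal code itself):

```python
def hierarchical_title(ancestors: list[str]) -> str:
    # keep only non-empty heading contents, join with " - "
    return " - ".join(t for t in ancestors if t)
```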

Parameters:

  • root (MarkdownNode, required): The root node of the markdown tree to process, or any other parent node
  • logger (LoggerBase, required): a logger object
  • key (str, default TITLES_KEY): the key under which the titles are added
  • filt_func (Callable[[MarkdownNode], bool], default lambda _: True): A predicate the nodes must satisfy for titles to be added
Note
  • Only non-empty heading contents are included in the titles
  • The titles are added to the key field in the node's metadata
Source code in lmm/scan/scan_rag.py
def add_titles_to_headings(
    root: MarkdownNode,
    logger: LoggerBase,
    *,
    key: str = TITLES_KEY,
    filt_func: Callable[[MarkdownNode], bool] = lambda _: True,
) -> None:
    """Recursively add titles to heading blocks in a markdown tree.

    This function maps the nodes of a markdown tree in a pre-order
    collecting and concatenating the content of ancestor headings
    for each heading block. It adds a metadata field to HeadingBlock
    nodes, which represents the full hierarchical path of headings
    leading to that specific heading.

    Args:
        root: The root node of the markdown tree to process, or
            any other parent node
        logger: a logger object
        key: the key under which the titles are added
        filt_func: A predicate the nodes must satisfy for titles to
            be added

    Note:
        - Only non-empty heading contents are included in the titles
        - The titles are added to the key field in the node's metadata
    """

    def map_func(node: MarkdownNode) -> None:
        # recursively add content of headings to key in metadata
        if isinstance(node, HeadingNode):
            if not filt_func(node):
                return
            if node.parent:
                titles: str = str(
                    node.parent.get_metadata_for_key(key, "")
                )
            else:
                titles = ""
            title: str = node.get_content()
            node.set_metadata_for_key(
                key,
                titles
                + (" - " if titles else "")
                + (title if title else ""),
            )
        return

    pre_order_traversal(root, map_func)

blocklist_rag(blocks, opts=ScanOpts(), logger=_logger)

Prepares the blocklist structure for RAG (Retrieval Augmented Generation) by enhancing it with metadata.

Parameters:

  • blocks (list[Block], required): a markdown block list
  • opts (ScanOpts, default ScanOpts()): a ScanOpts object
  • logger (LoggerBase, default _logger): a logger object (defaults to console logging)

Returns:

  • list[Block]: List of enhanced markdown blocks, or an empty list if processing fails

Note

The function adds several metadata fields to blocks:

  • docid: Unique document identifier
  • titles: Hierarchical heading path
  • textid/headingid: Unique block identifiers
  • questions: Potential questions answered by the text
  • summary: Content summaries for heading blocks

The function will return an empty list if the input block list contains error blocks. It will add a default header if the header is missing, and a docid field to the header if this is missing.

Reentrant Processing: Running blocklist_rag multiple times on the same blocks is idempotent if the content hasn't changed. IDs are always regenerated to ensure integrity, but summaries/questions are only recomputed if the text content has changed (detected via hash comparison). This ensures external systems never need to reference internal IDs, as the parsed markdown is only used to export IDs, never to look them up.
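The hash-gated recomputation can be sketched as follows (the hashing scheme and the key name "txthash" are illustrative assumptions, not the module's actual implementation):

```python
import hashlib

def text_hash(text: str) -> str:
    # short content hash used to detect changed text
    return hashlib.sha256(text.encode("utf-8")).hexdigest()[:12]

def maybe_recompute(metadata: dict, text: str, compute) -> dict:
    # recompute the expensive annotation only when the text changed
    h = text_hash(text)
    if metadata.get("txthash") != h:
        metadata["annotation"] = compute(text)
        metadata["txthash"] = h
    return metadata
```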

Example of use
opts = ScanOpts(titles = True) # add titles
blocks = blocklist_rag(blocks, opts)

# override language model from config.toml
opts = ScanOpts(
    questions = True,               # add questions
    language_model_settings = LanguageModelSettings(
        model = "OpenAI/gpt-4o"
    )
)
blocks = blocklist_rag(blocks, opts)
Source code in lmm/scan/scan_rag.py
def blocklist_rag(
    blocks: list[Block],
    opts: ScanOpts = ScanOpts(),
    logger: LoggerBase = _logger,
) -> list[Block]:
    """
    Prepares the blocklist structure for RAG (Retrieval Augmented
    Generation) by enhancing it with metadata.

    Args:
        blocks: a markdown block list
        opts: a ScanOpts object
        logger: a logger object (defaults to console logging)

    Returns:
        list[Block]: List of enhanced markdown blocks, or empty list
            if processing fails

    Note:
        The function adds several metadata fields to blocks:

        - docid: Unique document identifier
        - titles: Hierarchical heading path
        - textid/headingid: Unique block identifiers
        - questions: Potential questions answered by the text
        - summary: Content summaries for heading blocks

        The function will return an empty list if the input block
        list contains error blocks. It will add a default header
        if the header is missing, and a docid field to the header
        if this is missing.

        Reentrant Processing:
            Running blocklist_rag multiple times on the same blocks is
            idempotent if the content hasn't changed. IDs are always
            regenerated to ensure integrity, but summaries/questions are
            only recomputed if the text content has changed (detected via
            hash comparison). This ensures external systems never need to
            reference internal IDs, as the parsed markdown is only used
            to export IDs, never to look them up.

    Example of use:
        ```python
        opts = ScanOpts(titles = True) # add titles
        blocks = blocklist_rag(blocks, opts)

        # override language model from config.toml
        opts = ScanOpts(
            questions = True,               # add questions
            language_model_settings = LanguageModelSettings(
                model = "OpenAI/gpt-4o"
            )
        )
        blocks = blocklist_rag(blocks, opts)
        ```
    """

    # Validation
    build_titles = bool(opts.titles)
    build_questions = bool(opts.questions)
    build_summaries = bool(opts.summaries)
    build_textids = bool(opts.textid)
    build_headingids = bool(opts.headingid)
    build_textUUID = bool(opts.textUUID)
    build_headingUUID = bool(opts.headingUUID)
    if build_textUUID and (not build_textids):
        logger.info("bloclist_rag: text id's built to form UUID")
        build_textids = True
    if build_headingUUID and (not build_headingids):
        logger.info("blocklist_rag: heading id's built to form UUID")
        build_headingids = True

    if not (
        build_titles
        or build_questions
        or build_summaries
        or build_textids
        or build_headingids
        or build_textUUID
        or build_headingUUID
    ):
        logger.info("No RAG changes specified for document.")
        return blocks

    # Validate for lm markdown
    blocks = blocklist_scan(blocks)

    # Further document validation
    if not blocks:
        raise RuntimeError(
            "Unreachable code reached: scan function "
            + "should not return an empty list"
        )
    if len(blocks) == 1 and isinstance(blocks[0], ErrorBlock):
        logger.error("Load failed:\n" + str(blocks[0].get_content()))
        return []
    if blocklist_haserrors(blocks):
        # convert markdown errors into logger errors
        for b in blocklist_errors(blocks):
            logger.error(b.get_content())
        logger.error("Errors in markdown. Fix before continuing.")
        return []

    # Preproc text blocks prior to annotations
    blocks = blocklist_copy(blocks)

    # this removes metadata properties that are not relevant
    # to RAG, such as chats
    if opts.remove_messages:
        blocks = blocklist_clear_messages(blocks)

    # Process directives
    root: MarkdownTree = blocks_to_tree(blocks, logger)
    if not root:
        return []
    logger.info("Processing " + root.get_content())

    # add titles for internal use
    add_titles_to_headings(root, logger, key=TITLES_TEMP_KEY)

    # add docid. This should identify the document uniquely.
    # It may be provided by the human user too.
    if DOCID_KEY not in root.metadata:
        # generate a random string to form doc id
        root.metadata[DOCID_KEY] = generate_random_string()

    # Add titles to headings
    if build_titles:
        logger.info("Adding titles to heading metadata.")
        add_titles_to_headings(
            root, logger, key=TITLES_KEY, filt_func=_filt_func
        )

    # Add an id to all heading and text blocks
    add_id_to_nodes(
        root,
        build_textids,
        build_headingids,
        root.get_metadata_string_for_key(DOCID_KEY),
        logger,
        filt_func=_filt_func,
    )

    # Add UUID to text and heading nodes
    def add_headingUUID_func(node: MarkdownNode) -> None:
        if not _filt_func(node):
            return
        if node.has_metadata_key(UUID_KEY):
            return
        if isinstance(node, HeadingNode):
            uuid_base: str | None = node.get_metadata_string_for_key(
                HEADINGID_KEY
            )
            if uuid_base is not None:
                node.set_metadata_for_key(
                    UUID_KEY,
                    generate_uuid(uuid_base),
                )
            else:
                # should not happen given we have generated heading id's
                logger.warning("Could not set uuid for object")

    if build_headingUUID:
        logger.info("Adding UUIDs to headings.")
        pre_order_traversal(root, add_headingUUID_func)

    def add_textUUID_func(node: MarkdownNode) -> None:
        if not _filt_func(node):
            return
        if node.has_metadata_key(UUID_KEY):
            return
        if isinstance(node, TextNode):
            uuid_base: str | None = node.get_metadata_string_for_key(
                TEXTID_KEY
            )
            if uuid_base is not None:
                node.set_metadata_for_key(
                    UUID_KEY,
                    generate_uuid(uuid_base),
                )
            else:
                # should not happen given we have generated TXTID's
                logger.warning("Could not set uuid for object")

    if build_textUUID:
        logger.info("Adding UUIDs to text blocks.")
        pre_order_traversal(root, add_textUUID_func)

    # Add source
    def add_source_func(node: MarkdownNode) -> None:
        if not _filt_func(node):
            return
        if isinstance(node, HeadingNode):
            if node.is_header_node():
                return
            source: str | None = node.fetch_metadata_string_for_key(
                DOCID_KEY
            )
            if source:
                node.metadata[SOURCE_KEY] = source

    pre_order_traversal(root, add_source_func)

    # Add a summary to heading nodes that is recomputed after changes
    if build_summaries:
        logger.info("Adding summaries about text.")
        add_summaries(root, opts, filt_func=_filt_func, logger=logger)

    # Add questions that the text answers, recomputed if text changes
    # (will use summaries if existing)
    if build_questions:
        logger.info("Adding questions about text.")
        add_questions(root, opts, filt_func=_filt_func, logger=logger)

    # check meta-data without text
    def _warn_empty_text(node: MarkdownNode) -> None:
        if node.is_header_node():
            pass
        elif isinstance(node, HeadingNode):
            if node.metadata:
                if len(node.get_text_children()) == 0:
                    node.metadata[WARNING_KEY] = (
                        "**Add text under this "
                        + "heading to avoid removal of "
                        + "metadata when ingesting**"
                    )
                elif WARNING_KEY in node.metadata:
                    node.metadata.pop(WARNING_KEY, "")
                else:
                    pass
        elif isinstance(node, TextNode):
            if node.metadata:
                if not node.get_content():
                    node.metadata[WARNING_KEY] = (
                        "**Add text under this "
                        + "metadata to avoid removal of "
                        + "metadata when ingesting**"
                    )
                elif WARNING_KEY in node.metadata:
                    node.metadata.pop(WARNING_KEY, "")
                else:
                    pass
        else:
            pass

    post_order_traversal(root, _warn_empty_text)

    # Re-create blocklist
    blocks = tree_to_blocks(root)

    # remove internal titles
    blocks = clear_metadata_properties(blocks, [TITLES_TEMP_KEY])

    return blocks

get_changed_titles(blocks, logger)

List the titles of all changed text. These are the headings that would be updated in a scan operation.

Parameters:

  • blocks (list[Block], required): the block list to evaluate
  • logger (LoggerBase, required): a logger object

Returns:

  • list[str]: a list of strings containing the headings with changed content

Source code in lmm/scan/scan_rag.py
def get_changed_titles(
    blocks: list[Block], logger: LoggerBase
) -> list[str]:
    """List the titles of all changed text. This is the
    headings that would be updated in a scan operation.

    Args:
        blocks: the block list to evaluate
        logger: a logger object

    Returns:
        a list of strings containing the headings with
        changed content
    """
    from ..markdown.treeutils import (
        get_nodes_with_metadata,
        get_headingnodes,
    )
    from .scan_keys import TXTHASH_KEY
    from ..utils.logging import LoglistLogger

    internal_logger = LoglistLogger()

    blocklist: list[Block] = blocklist_copy(blocks)
    root: HeadingNode | None = blocks_to_tree(blocklist, logger)
    if root is None:
        return []

    TITLES_KEY = "~__TITLES__"
    OUTPUT_KEY = "~__OUTPUT__"

    # check there are any hashes
    nodes: list[HeadingNode] = get_nodes_with_metadata(
        root, TXTHASH_KEY, HeadingNode
    )
    if not nodes:
        logger.info(
            "No hashes in document (the document has "
            "not been scanned yet)."
        )
        return []

    # add titles to report
    add_titles_to_headings(root, internal_logger, key=TITLES_KEY)
    if internal_logger.count_logs(level=logging.ERROR):
        for log in internal_logger.get_logs(logging.ERROR):
            logger.error(log)
        return []

    # execute hashing. Create output first as otherwise will
    # be reformed irrespective of hash.
    def _add_metadata_func(n: MarkdownNode) -> None:
        n.metadata[OUTPUT_KEY] = "fixed"

    post_order_traversal(root, _add_metadata_func)
    post_order_hashed_aggregation(
        root,
        lambda _: "changed",
        OUTPUT_KEY,
        True,
        filter_func=_filt_func,
        logger=internal_logger,
    )
    if internal_logger.count_logs(level=logging.ERROR):
        for log in internal_logger.get_logs(logging.ERROR):
            logger.error(log)
        return []

    # get nodes when hash discrepancy led to recompute
    nodes: list[HeadingNode] = get_headingnodes(
        root,
        True,
        lambda n: n.get_metadata_string_for_key(OUTPUT_KEY, "")
        == "changed"
        and not n.is_header_node(),
    )

    titles: list[str] = []
    for n in nodes:
        title: str | None = n.get_metadata_string_for_key(TITLES_KEY)
        if title:
            titles.append(title)

    return titles
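The change-detection idea behind this function can be sketched independently of the tree machinery: a stored content hash is compared with a hash of the current text, and headings whose hashes no longer match are reported. The `text_hash` helper and the dictionaries below are hypothetical stand-ins, not library code.

```python
import hashlib

# Sketch: a heading is "changed" when the hash stored at the last scan
# no longer matches the hash of its current text.
def text_hash(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

stored = {"Intro": text_hash("old text"), "Methods": text_hash("same")}
current = {"Intro": "new text", "Methods": "same"}

changed = [title for title, text in current.items()
           if stored[title] != text_hash(text)]
# changed lists only the headings whose content hash differs
```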

markdown_rag(sourcefile, opts=ScanOpts(), save=True, *, max_size_mb=50.0, warn_size_mb=10.0, logger=_logger)

Scans the markdown file and adds information required for the ingestion in the vector database.

opts defines what operations are conducted on the document, but if the header of the document contains an opts field, the specifications in the header are used.

Parameters:

    sourcefile (str | Path, required):
        the file to load the markdown from
    opts (ScanOpts, default ScanOpts()):
        a ScanOpts object with the following options:
            titles (False): add hierarchical titles to headings
            questions (False): add questions to headings
            questions_threshold (15): ignored if questions == False
            summaries (False): add summaries to headings
            summary_threshold (50): ignored if summaries == False
            remove_messages (False)
            textid (False): add textid to text blocks
            headingid (False): add headingid to headings
            textUUID (False): add UUID to text blocks
            headingUUID (False): add UUID to heading blocks
            pool_threshold (0): pooling of text blocks
    save (bool | str | Path, default True):
        if False, does not save; if True, saves back to the original
        markdown file; if a filename, saves to that file
    max_size_mb (float, default 50.0):
        the max size, in MB, of the file to load
    warn_size_mb (float, default 10.0):
        the size of the input file that results in a warning
    logger (LoggerBase, default _logger):
        a logger object. Defaults to console logger.

Returns:

    list[Block]:
        a list of blocks, starting with a header block.

Note

if an error occurs and the blocklist becomes empty, it does not alter the source file.

Source code in lmm/scan/scan_rag.py
@validate_call(config={'arbitrary_types_allowed': True})
def markdown_rag(
    sourcefile: str | Path,
    opts: ScanOpts = ScanOpts(),
    save: bool | str | Path = True,
    *,
    max_size_mb: float = 50.0,
    warn_size_mb: float = 10.0,
    logger: LoggerBase = _logger,
) -> list[Block]:
    """
    Scans the markdown file and adds information required for the
    ingestion in the vector database.

    opts defines what operations are conducted on the document,
    but if the header of the document contains an opts field,
    the specifications in the header are used.

    Args:
        sourcefile: the file to load the markdown from
        opts: a ScanOpts object with the following options:
            titles (False)    add hierarchical titles to headings
            questions (False) add questions to headings
            questions_threshold (15) ignored if questions == False
            summaries (False) add summaries to headings
            summary_threshold (50) ignored if summaries == False
            remove_messages (False)
            textid (False)    add textid to text blocks
            headingid (False) add headingid to headings
            textUUID (False)  add UUID to text blocks
            headingUUID (False) add UUID to heading blocks
            pool_threshold (0) pooling of text blocks
        save: if False, does not save; if True, saves back to
            original markdown file; if a filename, saves to
            file. Defaults to True.
        max_size_mb: the max size, in MB, of the file to load
        warn_size_mb: the size of the input file that results in
            a warning
        logger: a logger object. Defaults to console logger.

    Returns:
        a list of blocks, starting with a header block.

    Note: if an error occurs and the blocklist becomes empty,
        it does not alter the source file.
    """

    blocks: list[Block] = markdown_scan(
        sourcefile,
        False,
        max_size_mb=max_size_mb,
        warn_size_mb=warn_size_mb,
        logger=logger,
    )
    if not blocks:
        return []
    if blocklist_haserrors(blocks):
        save_markdown(sourcefile, blocks, logger)
        logger.warning("Problems in markdown, fix before continuing")
        return []

    # Take over options if specified in header. The isinstance check
    # will always be true since markdown_scan provides a default
    # header if it is missing, but we check for pyright's benefit
    if isinstance(blocks[0], HeaderBlock):
        header: HeaderBlock = blocks[0]
        options: dict[str, object] = header.get_key_type(
            OPTIONS_KEY, dict, {}
        )
        if bool(options):
            logger.info("Reading opts specifications from header")
            try:
                # types checked and coerced by the pydantic model
                opts = ScanOpts(**options)  # type: ignore
            except Exception as e:
                logger.error(f"Invalid scan specification:\n{e}")
                return []
    else:
        raise RuntimeError(
            "Unreachable code reached: header block missing"
        )

    blocks = blocklist_rag(blocks, opts, logger)
    if not blocks:
        return []

    match save:
        case False:
            pass
        case True:
            save_markdown(sourcefile, blocks, logger)
        case str() | Path():
            save_markdown(save, blocks, logger)

    return blocks
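As the source above shows, a header-specified opts field replaces the opts argument wholesale rather than merging field by field. A minimal sketch of this resolution, using a hypothetical dataclass stand-in for ScanOpts with only a subset of its fields:

```python
from dataclasses import dataclass, fields

# Hypothetical stand-in for ScanOpts (subset of fields, for illustration).
@dataclass
class ScanOptsSketch:
    titles: bool = False
    questions: bool = False
    questions_threshold: int = 15

def resolve_opts(opts: ScanOptsSketch, header: dict) -> ScanOptsSketch:
    """If the document header carries an 'opts' mapping, it wins and
    the caller's opts argument is discarded entirely."""
    options = header.get("opts", {})
    if options:
        valid = {f.name for f in fields(ScanOptsSketch)}
        unknown = set(options) - valid
        if unknown:
            raise ValueError(f"Invalid scan specification: {unknown}")
        return ScanOptsSketch(**options)
    return opts  # no header override: keep the caller's opts

header = {"opts": {"titles": True, "questions_threshold": 5}}
resolved = resolve_opts(ScanOptsSketch(questions=True), header)
```

Note that `resolved.questions` is False here: the caller's `questions=True` is dropped because the header specification replaces, not merges, the argument.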

scan_rag(sourcefile, *, titles=False, questions=False, questions_threshold=15, summaries=False, summary_threshold=50, remove_messages=False, save=True, max_size_mb=50.0, warn_size_mb=10.0, logger=_logger)

Convenience wrapper around markdown_rag with individual parameters.

This function provides a flattened interface for markdown_rag,
accepting individual boolean parameters instead of a ScanOpts object.
Useful for command-line interfaces and simple scripts.

Args:
    sourcefile: the file to load the markdown from
    titles (False): add hierarchical titles to headings
    questions (False): add questions to headings
    questions_threshold (15): ignored if questions == False
    summaries (False): add summaries to headings
    summary_threshold (50): ignored if summaries == False
    remove_messages (False)
    save: if False, does not save; if True, saves back to original
        markdown file; if a filename, saves to file. Defaults to True.
    max_size_mb: the max size, in MB, of the file to load
    warn_size_mb: the size of the input file that results in a warning
    logger: a logger object. Defaults to console logger.

Returns:
    None

Source code in lmm/scan/scan_rag.py
def scan_rag(
    sourcefile: str | Path,
    *,
    titles: bool = False,
    questions: bool = False,
    questions_threshold: int = 15,
    summaries: bool = False,
    summary_threshold: int = 50,
    remove_messages: bool = False,
    save: bool | str | Path = True,
    max_size_mb: float = 50.0,
    warn_size_mb: float = 10.0,
    logger: LoggerBase = _logger,
) -> None:
    """Convenience wrapper around markdown_rag with individual parameters.

    This function provides a flattened interface for markdown_rag,
    accepting individual boolean parameters instead of a ScanOpts object.
    Useful for command-line interfaces and simple scripts.

    Args:
        sourcefile: the file to load the markdown from
        titles (False)    add hierarchical titles to headings
        questions (False) add questions to headings
        questions_threshold (15) ignored if questions == False
        summaries (False) add summaries to headings
        summary_threshold (50) ignored if summaries == False
        remove_messages (False)
        save: if False, does not save; if True, saves back to
            original markdown file; if a filename, saves to
            file. Defaults to True.
        max_size_mb: the max size, in MB, of the file to load
        warn_size_mb: the size of the input file that results in
            a warning
        logger: a logger object. Defaults to console logger.

    Returns: None

    """

    try:
        opts = ScanOpts(
            titles=titles,
            questions=questions,
            questions_threshold=questions_threshold,
            summaries=summaries,
            summary_threshold=summary_threshold,
            remove_messages=remove_messages,
        )
        markdown_rag(
            sourcefile,
            opts,
            save,
            max_size_mb=max_size_mb,
            warn_size_mb=warn_size_mb,
            logger=logger,
        )
    except Exception as e:
        logger.error(str(e))

Scan module for block splitting

Splits a blocklist using a splitter.

This implementation uses langchain to split the text of text blocks. Metadata is inherited from the original block, except for textid and UUID fields.

Metadata are not inherited from previous blocks or headings.

In general, text blocks will have been populated with metadata prior to calling this function.

Main classes

NullTextSplitter: a splitter that does not split (scan_split becomes a no-op)

Main functions

scan_split: takes a blocklist and splits the text blocks

NullTextSplitter

Bases: TextSplitter

A langchain text splitter that does not split

Source code in lmm/scan/scan_split.py
class NullTextSplitter(TextSplitter):
    """A langchain text splitter that does not split"""

    def split_text(self, text: str) -> list[str]:
        return [text]

blocks_to_splitted_blocks(blocks, text_splitter)

Transform a blocklist by applying text splitting to the text block prior to ingestion. Metadata are inherited from the original block.

Parameters:

    blocks (list[Block], required):
        a list of markdown blocks
    text_splitter (TextSplitter, required):
        a langchain text splitter

Returns:

    list[Block]:
        a list of markdown blocks

Source code in lmm/scan/scan_split.py
def blocks_to_splitted_blocks(
    blocks: list[Block], text_splitter: TextSplitter
) -> list[Block]:
    """Transform a blocklist by applying text splitting to the text
    block prior to ingestion. Metadata are inherited from the original
    block.

    Args:
        blocks: a list of markdown blocks
        text_splitter: a langchain text splitter

    Returns:
        a list of markdown blocks
    """

    if isinstance(text_splitter, NullTextSplitter):
        return blocks

    def _split_text_block(bl: TextBlock) -> list[TextBlock]:
        doc: Document = Document(
            page_content=bl.get_content(), metadata={}
        )
        docs = text_splitter.split_documents([doc])
        return [TextBlock(content=d.page_content) for d in docs]

    # split
    newblocks: list[Block] = []
    for b in blocks:
        if isinstance(b, TextBlock):
            splits = _split_text_block(b)
            if newblocks and isinstance(newblocks[-1], MetadataBlock):
                curmeta: MetadataBlock = newblocks[-1].deep_copy()
                newblocks.append(splits[0])
                # do not inherit textid's and UUID's
                curmeta.content.pop(TEXTID_KEY, "")
                curmeta.content.pop(UUID_KEY, "")
                for s in splits[1:]:
                    if curmeta.content:
                        newblocks.append(curmeta.deep_copy())
                    newblocks.append(s)
            else:
                newblocks.extend(splits)
        else:
            newblocks.append(b)
    return newblocks
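The inheritance rule applied in the loop above can be sketched with plain dictionaries: the first split keeps the original metadata, and later splits receive copies with the identity keys removed so that textid's and UUID's are never duplicated. The helper and the key names are illustrative assumptions, not library code.

```python
# Hypothetical key names standing in for TEXTID_KEY and UUID_KEY.
TEXTID_KEY, UUID_KEY = "textid", "UUID"

def inherit_metadata(meta: dict, n_splits: int) -> list[dict]:
    """Return one metadata dict per split chunk: the first keeps the
    original metadata, the rest inherit it minus identity fields."""
    inherited = {k: v for k, v in meta.items()
                 if k not in (TEXTID_KEY, UUID_KEY)}
    return [dict(meta)] + [dict(inherited) for _ in range(n_splits - 1)]

metas = inherit_metadata({"textid": "t1", "topic": "RAG"}, 3)
```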

markdown_split(sourcefile, save=False, text_splitter=defaultSplitter, logger=logger)

Interface to apply split to documents (interactive use)

Parameters:

    sourcefile (str | Path, required):
        the file containing the markdown document to split
    save (bool | str | Path, default False):
        if False, does not save; if True, saves the split document back
        to the original file; if a filename, saves to that file
    text_splitter (TextSplitter, default defaultSplitter):
        a langchain text splitter (defaults to a character text
        splitter, chunk size 1000, overlap 200). To switch off
        splitting, use NullTextSplitter
    logger (LoggerBase, default logger):
        a logger object

Note

if an error occurs and the blocklist becomes empty, it does not alter the source file.

Source code in lmm/scan/scan_split.py
@validate_call(config={'arbitrary_types_allowed': True})
def markdown_split(
    sourcefile: str | Path,
    save: bool | str | Path = False,
    text_splitter: TextSplitter = defaultSplitter,
    logger: LoggerBase = logger,
) -> list[Block]:
    """Interface to apply split to documents (interactive use)

    Args:
        sourcefile: the file containing the markdown document to split
        save: a boolean value indicating whether the split document
            should be saved to disk
        text_splitter (opt): a langchain text splitter
            (defaults to a character text splitter, chunk size
            1000, overlap 200). To switch off splitting, use
            NullTextSplitter

    Note:
        if an error occurs and the blocklist becomes empty,
        it does not alter the source file.
    """

    blocks = load_blocks(sourcefile, logger=logger)
    if not blocks:
        return []
    if blocklist_haserrors(blocks):
        save_blocks(sourcefile, blocks, logger)
        logger.warning("Problems in markdown, fix before continuing")
        return []

    blocks = scan_split(blocks, text_splitter)
    if not blocks:
        return []

    match save:
        case False:
            pass
        case True:
            save_blocks(sourcefile, blocks, logger)
        case str() | Path():
            save_blocks(save, blocks, logger)
        case _:  # ignore
            pass

    return blocks

scan_split(blocks, text_splitter=defaultSplitter)

Scan syntax for splitter

Parameters:

    blocks (list[Block], required):
        a list of markdown blocks
    text_splitter (TextSplitter, default defaultSplitter):
        a langchain text splitter (defaults to a character text
        splitter, chunk size 1000, overlap 200). To switch off
        splitting, use NullTextSplitter

Returns:

    list[Block]:
        a list of markdown blocks

Source code in lmm/scan/scan_split.py
def scan_split(
    blocks: list[Block], text_splitter: TextSplitter = defaultSplitter
) -> list[Block]:
    """Scan syntax for splitter

    Args:
        blocks: a list of markdown blocks
        text_splitter (opt): a langchain text splitter
            (defaults to a character text splitter, chunk size
            1000, overlap 200). To switch off splitting, use
            NullTextSplitter

    Returns:
        a list of markdown blocks
    """
    return blocks_to_splitted_blocks(blocks, text_splitter)

Scan module for Chunks

Converts a list of markdown blocks into a list of Chunk objects, which include all the information needed for ingestion into a vector database. The list of markdown blocks will have been preprocessed as necessary, i.e. split into smaller text blocks and endowed with metadata and a UUID identification code.

When using a vector database to store information, data may be used to obtain embeddings (the semantic representation of the content, which the database uses to identify text matching a query based on similarity), and to select the parts of the information that are stored in the database and retrieved when records are selected. These two sets of information are often the same, but they need not be. The Chunk class and its member methods collect and organize this information. It constitutes a framework-neutral replacement for the Document class commonly used by frameworks in RAG applications.

Embeddings are increasingly supported in a variety of configurations. Besides the data selected for storage, portions of the data may be selected to compute the embeddings. This module defines an encoding model to map the data selected for embedding to the embedding type supported by the database engine. In what follows, the metadata properties used to generate embeddings are called 'annotations', to distinguish them from other properties (among others, metadata properties used for housekeeping purposes).

The annotation model not only specifies which metadata properties are included in the embedding, but also whether to look for them in the ancestors of the markdown text, represented as a hierarchical tree where headings are the nodes in the hierarchy. The encoding model further specifies how the annotations are used in dense and sparse encodings.

Example:

import logging

from lmm.markdown.parse_markdown import (
    blocklist_haserrors,
)
from lmm.scan.scan import markdown_scan
from lmm.scan.scan_rag import blocklist_rag, ScanOpts
from lmm.scan.scan_keys import TITLES_KEY
from lmm.utils.logging import LoglistLogger
from lmm_education.config.config import (
    AnnotationModel,
    EncodingModel,
)
from lmm_education.stores.chunks import blocks_to_chunks

logger = LoglistLogger()

# the starting point is a list of blocks, such as one originated
# from parsing a markdown file
blocks = markdown_scan("mymarkdown.md")
if blocklist_haserrors(blocks):
    raise ValueError("Errors in markdown")

# add metadata for annotations (here titles)
blocks = blocklist_rag(blocks, ScanOpts(titles=True), logger)
if logger.count_logs(level=logging.ERROR) > 0:
    raise ValueError("\n".join(logger.get_logs(logging.ERROR)))

# transform to chunks specifying titles for annotations
encoding_model = EncodingModel.SPARSE_CONTENT
chunks = blocks_to_chunks(
    blocks,
    annotation_model=AnnotationModel(
        inherited_properties=[TITLES_KEY]
    ),
    encoding_model=encoding_model,
    logger=logger,
)

# now chunks can be ingested
from lmm_education.stores.vector_store_qdrant import (
    upload,
    client_from_config,
    encoding_to_qdrantembedding_model as to_embedding_model,
)
from lmm_education.config.config import (
    ConfigSettings,
    LocalStorage,
)

settings = ConfigSettings(
    storage=LocalStorage(folder="./test_storage")
)
points = upload(
    client=client_from_config(settings, logger),
    collection_name="documents",
    model=to_embedding_model(encoding_model),
    chunks=chunks,
    logger=logger,
)

if logger.count_logs(level=logging.ERROR) > 0:
    raise ValueError("Could not ingest blocks")
Note

no embedding is computed here; this is done by the upload function in the example above.

Responsibilities

define encoding models
complement metadata of headings required by encoding and ingestion (such as titles)
implement the encoding model when transforming blocks to chunks (collect the adequate information in dense_encoding and sparse_encoding)

Main functions

blocks_to_chunks: list of blocks to list of chunks
chunks_to_blocks: the inverse transformation (for inspection and verification)

Behaviour

Functions in this module generally use a logger argument for error reporting, but may raise standard exceptions (e.g. ValueError) for invalid configurations.

Reviewed on 24.10.2025

AnnotationModel

Bases: BaseModel

Specifies what metadata properties are selected to form annotations and how. Also selects properties to be indexed for filtering.

The AnnotationModel is meant to allow users to add annotations to an encoding model by specifying them in the config file. Note that an annotation model is implicit when using scan_rag to generate metadata properties such as questions, etc.

Attributes:

    inherited_properties (list[str]):
        properties are sought among ancestors.
    own_properties (list[str]):
        limit properties to those owned by the node.
    filters (list[str]):
        properties that should be indexed to allow filter searches.

Source code in lmm/scan/chunks.py
class AnnotationModel(BaseModel):
    """
    Specifies what metadata properties are selected to form
    annotations and how. Also selects properties to be indexed
    for filtering.

    The AnnotationModel is meant to allow users to add annotations
    to an encoding model by specifying them in the config file.
    Note that an annotation model is implicit when using scan_rag
    to generate metadata properties such as questions, etc.

    Attributes:
        inherited_properties: properties are sought among ancestors.
        own_properties: limit properties to those owned by node.
        filters: properties that should be indexed to allow filter
            searches.
    """

    inherited_properties: list[str] = Field(
        default=[],
        description="Metadata properties inherited from ancestors",
    )
    own_properties: list[str] = Field(
        default=[],
        description="Metadata properties of the node",
    )
    filters: list[str] = Field(
        default=[],
        description="Metadata properties to be indexed for filtering",
    )

    def add_inherited_properties(
        self, props: str | list[str]
    ) -> None:
        """Add properties to the list of inherited properties."""
        if isinstance(props, str):
            props = [props]
        for p in props:
            if p not in self.inherited_properties:
                self.inherited_properties.append(p)

    def add_own_properties(self, props: str | list[str]) -> None:
        """Add properties to the list of own properties."""
        if isinstance(props, str):
            props = [props]
        for p in props:
            if p not in self.own_properties:
                self.own_properties.append(p)

    def has_property(self, prop: str) -> bool:
        """Check if a property is in the model."""
        return (
            prop in self.inherited_properties
            or prop in self.own_properties
        )

    def has_properties(self) -> bool:
        """Check if the model has any properties."""
        return (
            len(self.inherited_properties) > 0
            or len(self.own_properties) > 0
        )

add_inherited_properties(props)

Add properties to the list of inherited properties.

Source code in lmm/scan/chunks.py
def add_inherited_properties(
    self, props: str | list[str]
) -> None:
    """Add properties to the list of inherited properties."""
    if isinstance(props, str):
        props = [props]
    for p in props:
        if p not in self.inherited_properties:
            self.inherited_properties.append(p)

add_own_properties(props)

Add properties to the list of own properties.

Source code in lmm/scan/chunks.py
def add_own_properties(self, props: str | list[str]) -> None:
    """Add properties to the list of own properties."""
    if isinstance(props, str):
        props = [props]
    for p in props:
        if p not in self.own_properties:
            self.own_properties.append(p)

has_properties()

Check if the model has any properties.

Source code in lmm/scan/chunks.py
def has_properties(self) -> bool:
    """Check if the model has any properties."""
    return (
        len(self.inherited_properties) > 0
        or len(self.own_properties) > 0
    )

has_property(prop)

Check if a property is in the model.

Source code in lmm/scan/chunks.py
def has_property(self, prop: str) -> bool:
    """Check if a property is in the model."""
    return (
        prop in self.inherited_properties
        or prop in self.own_properties
    )
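The bookkeeping these methods perform can be sketched without pydantic; the class below is a hypothetical stand-in that reproduces the de-duplicating add and the membership check.

```python
# Hypothetical, dependency-free stand-in for AnnotationModel showing
# how properties are accumulated without duplicates and queried.
class AnnotationModelSketch:
    def __init__(self):
        self.inherited_properties: list[str] = []
        self.own_properties: list[str] = []

    def add_inherited_properties(self, props):
        if isinstance(props, str):
            props = [props]
        for p in props:
            if p not in self.inherited_properties:  # de-duplicate
                self.inherited_properties.append(p)

    def has_property(self, prop: str) -> bool:
        return (prop in self.inherited_properties
                or prop in self.own_properties)

m = AnnotationModelSketch()
m.add_inherited_properties(["titles", "titles", "questions"])
```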

Chunk

Bases: BaseModel

Class for storing a piece of text and associated metadata, with an additional uuid field for its identification in the database. Each instance of this class becomes a record or 'point' in the database.

The fields content and metadata contain information that will be stored in the database. The field content is meant to contain the text. The field metadata contains an associative array. (In some databases, there is no difference in the way material is stored, i.e. text is one field among many possible others; the distinction is present in many frameworks, however).

The field annotations contains concatenated metadata strings that, depending on the encoding model, may end up in the sparse or in the dense encoding.

The fields dense_encoding and sparse_encoding contain the text that is used for embedding using the respective approaches.

The uuid field contains the id of the database record.

Source code in lmm/scan/chunks.py
class Chunk(BaseModel):
    """
    Class for storing a piece of text and associated metadata, with
    an additional uuid field for its identification in the database.
    Each instance of this class becomes a record or 'point' in the
    database.

    The fields `content` and `metadata` contain information that will
    be stored in the database. The field `content` is meant to contain
    the text. The field `metadata` contains an associative array. (In
    some databases, there is no difference in the way material is
    stored, i.e. text is one field among many possible others; the
    distinction is present in many frameworks, however).

    The field `annotations` contains concatenated metadata strings
    that, depending on the encoding model, may end up in the sparse
    or in the dense encoding.

    The fields `dense_encoding` and `sparse_encoding` contain the text
    that is used for embedding using the respective approaches.

    The `uuid` field contains the id of the database record.
    """

    content: str = Field(
        description="The textual content for storage in the database"
        + " in the content field of the payload"
    )
    metadata: MetadataDict = Field(
        default={},
        description="Metadata of the original text block"
        + " for storage in the database payload as fields",
    )
    annotations: str = Field(
        default="",
        description="Selected parts of the metadata that may be used "
        + "for encoding",
    )
    dense_encoding: str = Field(
        default="",
        description="The content selected for dense encoding",
    )
    sparse_encoding: str = Field(
        default="",
        description="The content selected for sparse encoding",
    )
    uuid: str = Field(
        default="",
        description="Identification of the record in the database",
    )

    def get_uuid(self) -> str:
        """Return the UUID of the document. Lazily creates the
        UUID if missing, storing it in the object to ensure
        consistency."""
        if not self.uuid:
            self.uuid = str(uuid4())
        return self.uuid

get_uuid()

Return the UUID of the document. Lazily creates the UUID if missing, storing it in the object to ensure consistency.

Source code in lmm/scan/chunks.py
def get_uuid(self) -> str:
    """Return the UUID of the document. Lazily creates the
    UUID if missing, storing it in the object to ensure
    consistency."""
    if not self.uuid:
        self.uuid = str(uuid4())
    return self.uuid
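The lazy-UUID behaviour can be exercised in isolation; ChunkSketch below is a hypothetical stand-in showing that the id is created on first access and then reused, so repeated calls identify the same database record.

```python
from uuid import uuid4

# Hypothetical stand-in for Chunk, reduced to the lazy-UUID logic.
class ChunkSketch:
    def __init__(self, content: str, uuid: str = ""):
        self.content = content
        self.uuid = uuid

    def get_uuid(self) -> str:
        if not self.uuid:
            self.uuid = str(uuid4())  # create once, store for consistency
        return self.uuid

c = ChunkSketch("some text")
first = c.get_uuid()
```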

EncodingModel

Bases: StrEnum

Enum for encoding strategies

Attributes:

    NONE: no encoding (no embedding).
    CONTENT: the textual content of the chunk is also used for the embedding.
    MERGED: merge textual content and annotations in a larger piece of text for the embedding.
    MULTIVECTOR: textual content and annotations are encoded by multivectors.
    SPARSE: use annotations only and use sparse encoding.
    SPARSE_CONTENT: annotations for sparse encoding, textual content for dense encoding.
    SPARSE_MERGED: annotations for sparse encoding, merged annotations and textual content for dense encoding.
    SPARSE_MULTIVECTOR: annotations for sparse encoding, annotations and textual content for multivector encoding.

Source code in lmm/scan/chunks.py
class EncodingModel(StrEnum):
    """
    Enum for encoding strategies

    Attributes:
        NONE: no encoding (no embedding).
        CONTENT: the textual content of the chunk is also used for
            the embedding
        MERGED: merge textual content and annotations in a larger
            piece of text for the embedding
        MULTIVECTOR: textual content and annotations are encoded
            by multivectors
        SPARSE: use annotations only and use sparse encoding
        SPARSE_CONTENT: annotations for sparse encoding, textual
            content for dense encoding
        SPARSE_MERGED: annotations for sparse encoding, merged
            annotations and textual content for dense encoding
        SPARSE_MULTIVECTOR: annotations for sparse encoding,
            annotations and textual content for multivector encoding
    """

    # No encoding
    NONE = "none"

    # Encode only textual content in dense vector
    CONTENT = "content"

    # Encode textual content merged with metadata
    # annotations in dense vectors
    MERGED = "merged"

    # Encode content and annotations using multivectors
    MULTIVECTOR = "multivector"

    # Sparse encoding of annotations only
    SPARSE = "sparse"

    # Sparse annotations, dense encoding of content
    SPARSE_CONTENT = "sparse_content"

    # Sparse annotations, dense encoding of merged
    # content and annotations
    SPARSE_MERGED = "sparse_merged"

    # Sparse annotations, multivector encoding of merged
    # content and annotations
    SPARSE_MULTIVECTOR = "sparse_multivector"
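As a sketch of how such a string enum is typically consumed, the hypothetical helper below checks whether a strategy requires annotations (it subclasses `str, Enum` rather than StrEnum for wider Python compatibility, and covers only a subset of the values above):

```python
from enum import Enum

# Hypothetical subset of the encoding strategies, as a string enum.
class EncodingModelSketch(str, Enum):
    NONE = "none"
    CONTENT = "content"
    SPARSE = "sparse"
    SPARSE_CONTENT = "sparse_content"

def needs_annotations(model: EncodingModelSketch) -> bool:
    """Sparse and hybrid strategies embed annotations, so they require
    an annotation model with at least one property."""
    return model in (EncodingModelSketch.SPARSE,
                     EncodingModelSketch.SPARSE_CONTENT)
```

Because the members are strings, a value read from a config file can be converted directly, e.g. `EncodingModelSketch("sparse_content")`.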

blocks_to_chunks(blocklist, encoding_model, annotation_model=AnnotationModel(), logger=default_logger)

Transform a blocklist into a list of Chunk objects.

Implements the encoding model by collecting appropriate data and metadata.

Parameters:

    blocklist (list[Block], required):
        a list of markdown blocks
    encoding_model (EncodingModel, required):
        how to allocate information to dense and sparse encoding
    annotation_model (AnnotationModel | list[str], default AnnotationModel()):
        the fields from the metadata to use for encoding. This field is
        ignored if the encoding model makes no use of annotations
    logger (LoggerBase, default default_logger):
        a logger object.

Returns:

    list[Chunk]:
        a list of Chunk objects

Note

this function only encodes text blocks. Markdown documents consisting only of headings and metadata are considered empty.
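How an encoding model allocates content and annotations to the dense and sparse encodings can be sketched as a lookup; the `allocate` function below is an illustration of the strategy descriptions in EncodingModel, not library code.

```python
# Illustrative mapping from an encoding model value to the text placed
# in the dense and sparse encoding fields of a chunk.
def allocate(content: str, annotations: str, model: str) -> dict:
    merged = annotations + "\n" + content if annotations else content
    return {
        "none": {"dense": "", "sparse": ""},
        "content": {"dense": content, "sparse": ""},
        "merged": {"dense": merged, "sparse": ""},
        "sparse": {"dense": "", "sparse": annotations},
        "sparse_content": {"dense": content, "sparse": annotations},
        "sparse_merged": {"dense": merged, "sparse": annotations},
    }[model]

enc = allocate("body text", "Q: what is RAG?", "sparse_content")
```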

Source code in lmm/scan/chunks.py
def blocks_to_chunks(
    blocklist: list[Block],
    encoding_model: EncodingModel,
    annotation_model: AnnotationModel | list[str] = AnnotationModel(),
    logger: LoggerBase = default_logger,
) -> list[Chunk]:
    """
    Transform a blocklist into a list of `Chunk` objects.

    Implements the encoding model by collecting appropriate data
        and metadata.

    Args:
        blocklist: a list of markdown blocks
        encoding_model: how to allocate information to dense and
            sparse encoding
        annotation_model: the fields from the metadata to use for
            encoding. This field is ignored if the encoding model
            makes no use of annotations
        logger: a logger object.

    Returns:
        a list of `Chunk` objects

    Note:
        this function only encodes text blocks. Markdown documents
        consisting only of headings and metadata are considered
        empty.
    """

    if not blocklist:
        return []

    if isinstance(annotation_model, list):
        annotation_model = AnnotationModel(
            inherited_properties=annotation_model
        )

    # check there are annotations when sparse or hybrid models
    # are used
    if encoding_model == EncodingModel.SPARSE:
        if not annotation_model.has_properties():
            logger.error(
                f"{encoding_model} specified, but no annotations in model"
            )
            return []

    if encoding_model in [
        EncodingModel.SPARSE_CONTENT,
        EncodingModel.SPARSE_MULTIVECTOR,
        EncodingModel.SPARSE_MERGED,
        EncodingModel.MULTIVECTOR,
        EncodingModel.MERGED,
    ]:
        if not annotation_model.has_properties():
            logger.warning(
                f"{encoding_model} specified, but no annotations in model"
            )

    # collect or create required metadata for RAG: uuid, textid
    blocks: list[Block] = blocklist_rag(
        blocklist_copy(blocklist),
        ScanOpts(textid=True, textUUID=True),
        logger,
    )
    if blocklist_haserrors(blocks):
        logger.error("blocks_to_chunks called with error blocks")
        return []

    root: MarkdownTree = blocks_to_tree(blocks, logger)
    if root is None:
        return []

    # get rid of skipped nodes
    root = prune_tree(
        root, lambda x: not x.get_metadata_for_key(SKIP_KEY, False)
    )
    if root is None:
        logger.info("Markdown skipped (skip directive in header)")
        return []

    # integrate text node metadata by collecting metadata from parent,
    # unless metadata are already specified in the text node. These
    # metadata will be stored in the database as payload. This will
    # not inherit specific properties from ancestors, only the first
    # metadata block on the ancestor's path. We exclude metadata
    # properties that are used to chat and housekeeping.
    rootnode: MarkdownNode = inherit_metadata(root, exclude=_exclude_set)

    # map a text node with the inherited metadata to a Chunk object
    def _textnode_to_chunk(n: TextNode) -> Chunk:
        """Create a Chunk from a TextNode."""
        # annotations
        annlist: list[str] = []
        value: str | None = None
        for key in annotation_model.inherited_properties:
            value = n.fetch_metadata_string_for_key(key, False)
            if value:
                annlist.append(value.strip())
        for key in annotation_model.own_properties:
            value = n.get_metadata_string_for_key(key)
            if value:
                annlist.append(value.strip())

        # metadata for payload
        meta: MetadataDict = copy.deepcopy(n.metadata)
        for key in _exclude_set:
            meta.pop(key, None)
        chunk: Chunk = Chunk(
            content=n.get_content(),
            annotations=", ".join(annlist),
            uuid=str(meta.pop(UUID_KEY, "")),
            metadata=meta,
        )

        # determine content to be encoded according to encoding model
        match encoding_model:
            case EncodingModel.NONE:
                # no encoding
                pass

            case EncodingModel.CONTENT | EncodingModel.MULTIVECTOR:
                # encode only the content of the text blocks or
                # encode the content and metadata annotations using
                # multivectors
                chunk.dense_encoding = chunk.content

            case EncodingModel.MERGED:
                # encode the content merged with metadata annotations
                chunk.dense_encoding = (
                    f"{chunk.annotations}. {chunk.content}"
                    if chunk.annotations
                    else chunk.content
                )

            case EncodingModel.SPARSE:
                # sparse encoding of metadata annotations only
                chunk.sparse_encoding = chunk.annotations

            case (
                EncodingModel.SPARSE_CONTENT
                | EncodingModel.SPARSE_MULTIVECTOR
            ):
                # sparse encoding of metadata annotations, dense
                #   encoding of content or
                # sparse encoding of metadata annotations, multidense
                #   encoding of content
                chunk.sparse_encoding = chunk.annotations
                chunk.dense_encoding = chunk.content

            case EncodingModel.SPARSE_MERGED:
                # sparse encoding of metadata annotations, dense
                # encoding of merged content and annotations
                chunk.sparse_encoding = chunk.annotations
                chunk.dense_encoding = (
                    f"{chunk.annotations}. {chunk.content}"
                    if chunk.annotations
                    else chunk.content
                )

            case _:
                raise ValueError(
                    f"Unsupported encoding model: {encoding_model}"
                )
        return chunk

    # we exclude from chunking nodes that were skipped.
    chunks = traverse_tree_nodetype(
        rootnode,
        _textnode_to_chunk,
        TextNode,
    )
    return [c for c in chunks if c.content]
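
The allocation performed by the `match` statement above can be sketched in isolation. The following is a minimal, self-contained illustration; the `Chunk` dataclass here is a simplified stand-in for lmm's `Chunk`, not the real class, and the string model names stand in for the `EncodingModel` enum values:

```python
from dataclasses import dataclass

@dataclass
class Chunk:  # simplified stand-in for lmm's Chunk
    content: str
    annotations: str = ""
    dense_encoding: str = ""
    sparse_encoding: str = ""

def allocate(chunk: Chunk, model: str) -> Chunk:
    # mirrors the match statement in blocks_to_chunks
    merged = (
        f"{chunk.annotations}. {chunk.content}"
        if chunk.annotations
        else chunk.content
    )
    if model in ("content", "multivector"):
        chunk.dense_encoding = chunk.content
    elif model == "merged":
        chunk.dense_encoding = merged
    elif model == "sparse":
        chunk.sparse_encoding = chunk.annotations
    elif model in ("sparse_content", "sparse_multivector"):
        chunk.sparse_encoding = chunk.annotations
        chunk.dense_encoding = chunk.content
    elif model == "sparse_merged":
        chunk.sparse_encoding = chunk.annotations
        chunk.dense_encoding = merged
    return chunk

c = allocate(Chunk("The cat sat.", annotations="animals, pets"), "sparse_merged")
# c.sparse_encoding == "animals, pets"
# c.dense_encoding == "animals, pets. The cat sat."
```

In the hybrid models the annotations thus drive the sparse (keyword-style) index while the dense vector carries the content, merged or not.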

chunks_to_blocks(chunks, sep='', key_chunk='~chunk')

Transform a list of Chunk objects to a list of blocks.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| chunks | list[Chunk] | a list of Chunk objects | required |
| sep | str | an optional separator to visualize the breaks between chunks | '' |
| key_chunk | str | the metadata key where the chunk is copied into | '~chunk' |

Returns:

| Type | Description |
| --- | --- |
| list[Block] | a list of markdown blocks that can be serialized as a Markdown document |

Note

the content of the chunk is split into a metadata block and a text block, containing the 'content' value of the chunk.

Source code in lmm/scan/chunks.py
def chunks_to_blocks(
    chunks: list[Chunk], sep: str = "", key_chunk: str = "~chunk"
) -> list[Block]:
    """
    Transform a list of `Chunk` objects to a list of blocks.

    Args:
        chunks: a list of `Chunk` objects
        sep: an optional separator to visualize the breaks
            between chunks
        key_chunk: the metadata key where the chunk is copied into

    Returns:
        a list of markdown blocks that can be serialized as
            a Markdown document

    Note:
        the content of the chunk is split into a metadata block
            and a text block, containing the 'content' value of the chunk.
    """

    from lmm.markdown.parse_yaml import MetadataPrimitive

    blocks: list[Block] = []
    for c in chunks:
        if sep:
            blocks.append(TextBlock(content=sep))
        if c.metadata:
            blockmeta = c.metadata.copy()
            meta: dict[
                str, MetadataPrimitive | list[MetadataPrimitive]
            ] = {
                'uuid': c.uuid,
                'content': "<block content>",
                'annotations': c.annotations,
                'dense_encoding': c.dense_encoding,
                'sparse_encoding': c.sparse_encoding,
            }
            blockmeta[key_chunk] = meta
            blocks.append(MetadataBlock(content=blockmeta))
        blocks.append(TextBlock(content=c.content))

    return blocks

serialize_chunks(chunks, sep='', key_chunk='~chunk')

Serialize a list of Chunk objects for debug/inspection purposes. See chunks_to_blocks for more details.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| chunks | list[Chunk] | a list of Chunk objects | required |
| sep | str | an optional separator to visualize the breaks between chunks | '' |
| key_chunk | str | the metadata key where the chunk is copied into | '~chunk' |

Returns:

| Type | Description |
| --- | --- |
| str | a string representation of the chunks. |

Source code in lmm/scan/chunks.py
def serialize_chunks(
    chunks: list[Chunk], sep: str = "", key_chunk: str = "~chunk"
) -> str:
    """
    Serialize a list of `Chunk` objects for debug/inspection purposes.
    See chunks_to_blocks for more details.

    Args:
        chunks: a list of `Chunk` objects
        sep: an optional separator to visualize the breaks
            between chunks
        key_chunk: the metadata key where the chunk is copied into

    Returns:
        a string representation of the chunks.
    """

    # lazy load
    from lmm.markdown.parse_markdown import serialize_blocks

    return serialize_blocks(chunks_to_blocks(chunks, sep, key_chunk))

Scan module for Scan Keys

Keys used in scan functions

Scan module for Scan Utilities

Utilities for scan modules.

Main functions
  • preproc_for_markdown
  • post_order_hashed_aggregation
Behaviour

Exported functions in this module generally raise ValueError for invalid arguments or internal state errors. They also accept a LoggerBase object to log warnings and information about the aggregation process.

aggregate_hash(node, filter_func)

Create a hash from the text of the node, or of the descendants of the node. If the text is empty, an empty string is returned.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| node | MarkdownNode | the node to compute the hash for | required |
| filter_func | Callable[[MarkdownNode], bool] | a function to filter the nodes whose content should be hashed | required |

Returns:

| Type | Description |
| --- | --- |
| str | a string of 22 characters, or an empty string if there is no content in the tree. |

Source code in lmm/scan/scanutils.py
def aggregate_hash(
    node: MarkdownNode,
    filter_func: Callable[[MarkdownNode], bool],
) -> str:
    """
    Create a hash from the text of the node, or of the descendants
    of the node. If the text is empty, an empty string is returned.

    Args:
        node: the node to compute the hash for
        filter_func: a function to filter the nodes whose
            content should be hashed

    Returns:
        a string of 22 characters, or an empty string if there is
            no content in the tree.
    """

    if node.is_text_node():
        return (
            base_hash(node.get_content()) if filter_func(node) else ""
        )

    buffer: list[str] = []
    for child in node.children:
        if not filter_func(child):
            continue

        if child.is_text_node():
            buffer.append(child.get_content())
        else:
            buffer.append(aggregate_hash(child, filter_func))

    return base_hash("".join(buffer))
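
The 22-character return length is consistent with a base64url-encoded 128-bit digest: 16 bytes encode to 24 base64 characters, 22 once the `==` padding is stripped. The sketch below illustrates that shape; it is an assumption about `base_hash`, whose actual definition is not shown on this page:

```python
import base64
import hashlib

def base_hash_sketch(text: str) -> str:
    # 16-byte MD5 digest -> 24 base64 chars with '==' padding -> 22 without
    digest = hashlib.md5(text.encode("utf-8")).digest()
    return base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=")

print(len(base_hash_sketch("some markdown content")))  # 22
```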

post_order_hashed_aggregation(root_node, aggregate_func, output_key, hashed=True, hash_key=TXTHASH_KEY, *, filter_func=lambda _: True, logger=ConsoleLogger())

Executes a post-order traversal on the markdown tree, with bottom-up aggregation of the synthetic attributes in the parent nodes from the content data member of children text nodes. The synthetic attribute is computed by aggregate_func and recursively stored in the output_key field of the metadata member of the parent node.

This function differs from tree.extract_content in that a hash is computed to verify that the content was changed before calling the aggregate function.

Note

aggregate_func is only called if there is content to aggregate. This avoids calls to LLMs without content. In addition, aggregate_func itself may autonomously return an empty result for insufficient content.

If a heading child lacks a synthetic attribute as a result of this, the aggregation algorithm will descend into that child's subtree to find text to give more material to aggregate_func.

Content collection strategy

Parent nodes collect content from their children as follows:
- From text children: the raw text content is collected.
- From heading children: if the child has a synthetic output (output_key in metadata), that output is collected. If not, the algorithm recurses into the child's subtree to collect raw text from deeper levels.

This means that parent aggregation operates on children's synthetic outputs, not their raw text.

Single-heading-child optimisation

When a non-root heading node has exactly one child which is also a heading node, the child's synthetic output is copied to the parent instead of calling aggregate_func (since the result would be identical). This copy cascades correctly in chains (H1->H2->H3): post-order processes H3 first, then H2 copies from H3, then H1 copies from H2. When a child is later added (no longer only-child), the node enters the normal aggregation path; the old copied output is invalidated by hash mismatch (hashed=True) or overwritten (hashed=False with output_key deleted).

Manual edits

Manual edits to synthetic properties are overwritten on recomputation. Use frozen: true in metadata to preserve them.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| root_node | MarkdownNode | The root node of the markdown tree | required |
| aggregate_func | Callable[[str], str] | Function to process the collected content before storing. The collected content is provided as a string. The function may return an empty string if there is no/not enough material to synthesise, leaving it for synthesis at the next level. This implies that at the next level text will be recursively collected from all children nodes to attempt to compute the synthetic attribute. | required |
| output_key | str | the key in the metadata where the synthesised attributes should be stored | required |
| hashed | bool | if true, stores a hash of the content used for aggregation, and recomputes the aggregation if the content changes. If false, the aggregation is computed only if the output key is missing from the metadata or its value is empty (see summary below) | True |
| hash_key | str | the key in the metadata where the hash is read and stored. | TXTHASH_KEY |
| filter_func | Callable[[MarkdownNode], bool] | a predicate function on the nodes to be aggregated. Only nodes where filter_func(node) is True will be aggregated. Nodes excluded by filter_func are excluded from both aggregation and production of synthetic attributes (the branch is completely pruned) | lambda _: True |
| logger | LoggerBase | a logger object. | ConsoleLogger() |

Behaviour under different conditions

hashed = True (default)
- Computes a hash of the content of text nodes under each heading, ignoring synthetic outputs.
- If the node already has both output_key and hash_key in metadata, and the stored hash matches the newly-computed hash, the synthetic property is not recomputed.
- If the hash differs, or output_key is missing, or hash_key is missing, recomputes the synthetic property and stores it in the metadata together with the new hash.

Hence, when hashed = True, changes to raw text trigger recomputation, while changes to the synthetic outputs of children do not (allowing manual editing of synthetic properties).

hashed = False
- If the node already has output_key in metadata with a truthy value, no recomputation takes place (the old property is retained).
- If output_key is missing, the synthetic property is computed and stored in the metadata.
- No hash is ever stored or checked.

This is a "compute once" mode. To force recomputation, one must delete the output_key from the node's metadata manually, or use the extract_content function.

frozen: true in metadata
If a node has a frozen property set to true, no aggregation will take place on that node or any of its descendants. This means that the aggregation process itself is frozen.

Behaviour

Raises ValueError if validation fails for any of the following:
- hashed is True and output_key equals hash_key
- output_key is None or an empty string

Source code in lmm/scan/scanutils.py
def post_order_hashed_aggregation(
    root_node: MarkdownNode,
    aggregate_func: Callable[[str], str],
    output_key: str,
    hashed: bool = True,
    hash_key: str = TXTHASH_KEY,
    *,
    filter_func: Callable[[MarkdownNode], bool] = lambda _: True,
    logger: LoggerBase = ConsoleLogger(),
) -> None:
    """
    Executes a post-order traversal on the markdown tree, with
    bottom-up aggregation of the synthetic attributes in the parent
    nodes from the content data member of children text nodes. The
    synthetic attribute is computed by aggregate_func and recursively
    stored in the output_key field of the metadata member of the
    parent node.

    This function differs from tree.extract_content in that a
    hash is computed to verify that the content was changed before
    calling the aggregate function.

    Note:
        aggregate_func is only called if there is content to
        aggregate. This avoids calls to LLMs without content. In
        addition, aggregate_func itself may autonomously return empty
        for insufficient content.

        If a heading child lacks a synthetic attribute as a result of
        this, the aggregation algorithm will descend into that child's
        subtree to find text to give more material to aggregate_func.

    Content collection strategy:
        Parent nodes collect content from their children as follows:
        - From text children: the raw text content is collected.
        - From heading children: if the child has a synthetic output
          (output_key in metadata), that output is collected. If not,
          the algorithm recurses into the child's subtree to collect
          raw text from deeper levels.
        This means that parent aggregation operates on children's
        synthetic outputs, not their raw text.

    Single-heading-child optimisation:
        When a non-root heading node has exactly one child which is
        also a heading node, the child's synthetic output is copied
        to the parent instead of calling aggregate_func (since the
        result would be identical). This copy cascades correctly
        in chains (H1->H2->H3): post-order processes H3 first,
        then H2 copies from H3, then H1 copies from H2. When a
        child is later added (no longer only-child), the node
        enters the normal aggregation path; the old copied output
        is invalidated by hash mismatch (hashed=True) or
        overwritten (hashed=False with output_key deleted).

    Manual edits:
        Manual edits to synthetic properties are overwritten on
        recomputation. Use ``frozen: true`` in metadata to preserve
        them.

    Args:
        root_node: The root node of the markdown tree
        aggregate_func: Function to process the collected content
            before storing. The collected content is provided as a
            string. The function may return an empty string if
            there is no/not enough material to synthesise, leaving
            it for synthesis at the next level. This implies that
            at the next level text will be recursively collected
            from all children nodes to attempt to compute the
            synthetic attribute.
        output_key: the key in the metadata where the synthesised
            attributes should be stored
        hashed: if true, stores a hash of the content used for
            aggregation, and if the content changes recomputes the
            aggregation. If false, the aggregation is computed only
            if the output key is missing from the metadata or its
            value is empty (see summary below)
        hash_key: the key in the metadata where the hash is read
            and stored.
        filter_func: a predicate function on the nodes to be
            aggregated. Only nodes where filter_func(node) is True
            will be aggregated. This means that nodes excluded by
            the filter_func will be excluded for both aggregation
            and production of synthetic attributes (the branch is
            completely pruned)
        logger: a logger object.

    Behaviour under different conditions

    `hashed = True` (default)
    - Computes a hash of the content of text nodes under each heading,
        ignoring synthetic outputs.
    - If the node already has both `output_key` and `hash_key` in
        metadata, and the stored hash matches the newly-computed hash,
        no new synthetic property is recomputed.
    - If hash differs, or `output_key` is missing, or `hash_key` is
        missing, recomputes the synthetic property and stores it in
        the metadata together with the new hash.
    Hence, when `hashed = True`, changes to raw text trigger
    recomputation, while changes to synthetic outputs of children do
    not (allowing manual editing of synthetic properties).

    `hashed = False`
    - If the node already has `output_key` in metadata with a truthy
        value, no recomputation takes place (the old property is
        retained).
    - If `output_key` is missing, the synthetic property is computed
        and stored in the metadata.
    - No hash is ever stored or checked.
    This is a "compute once" mode. To force recomputation, one must
    delete the `output_key` from the node's metadata manually, or
    use the `extract_content` function.

    `frozen: true` in metadata
    If a node has a `frozen` property set to true, no aggregation
    will take place on that node and all its descendants. This
    means that the aggregation process itself is frozen.

    Behaviour:
        Raises ValueError: If validation fails for any of the
        following:
            - hashed is True and output_key equals hash_key
            - output_key is None or empty string
    """

    # this to inform type checker about assumption on node type
    def _is_heading_node(
        node: MarkdownNode,
    ) -> TypeGuard[HeadingNode]:
        return isinstance(node, HeadingNode)

    # this again for type checker, setting None to ""
    def _node_property(
        node: MarkdownNode, key: str, append: str = ""
    ) -> str:
        prpty: str | None = node.get_metadata_string_for_key(key, "")
        return (prpty + append) if prpty else ""

    # Validate output_key (treated as coding error)
    if not output_key or not output_key.strip():
        raise ValueError(
            "output_key must be a non-empty string. "
            f"Received: {repr(output_key)}"
        )
    output_key = output_key.strip()

    # Validate that output_key and hash_key are different when
    # hashing is enabled (treated as coding error)
    if hashed and output_key == hash_key:
        raise ValueError(
            "output_key and hash_key cannot be the same when "
            f"hashed=True. Both are set to '{output_key}'. This "
            "would cause the hash value to overwrite the aggregated "
            "output."
        )

    if root_node.is_header_node() and not filter_func(root_node):
        logger.warning("Aggregation skipped for document")
        return

    delimiter: str = "\n\n"
    any_content_processed = False

    def _process_node(node: MarkdownNode) -> None:
        nonlocal any_content_processed
        # Skip leaf nodes (they don't have children to synthesise)
        if node.is_text_node():
            return

        if not _is_heading_node(node):
            # this does not defend against coding errors, it
            # just satisfies type checker
            raise ValueError(
                "Unreachable code reached: unexpected node type"
            )

        # do not compute aggregation if there is a parent node
        # with a "frozen" property to prevent updates
        if node.fetch_metadata_for_key(FREEZE_KEY, True, False):
            logger.info("Skipped (frozen)")
            return

        # no children: nothing to aggregate
        if node.count_children() == 0:
            return
        # single-heading-child optimisation: copy the child's
        # synthetic output instead of re-aggregating identical
        # content (root is exempt so it always aggregates)
        if (
            node != root_node
            and node.count_children() == 1
            and isinstance(node.children[0], HeadingNode)
        ):
            child = node.children[0]
            child_output = child.get_metadata_string_for_key(
                output_key, ""
            )
            if child_output:
                if not node.metadata:
                    node.metadata = {}
                node.metadata[output_key] = child_output
                if hashed:
                    node.metadata[hash_key] = (
                        aggregate_hash(node, filter_func)
                    )
                any_content_processed = True
            return

        # collect content from children (it is a heading node)
        collected_content: list[str] = []

        def _collect_text(node: MarkdownNode) -> None:
            # Recursively collects text from a node

            if not filter_func(node):
                return

            for child in node.children:

                if not filter_func(child):
                    continue

                if child.is_text_node():
                    # Collect content from direct TextBlock children
                    collected_content.append(child.get_content())
                else:
                    # Collect synthetic outputs from heading children
                    # that have them, and if not look in children
                    text: str | None = (
                        child.get_metadata_string_for_key(output_key)
                    )

                    if text:
                        collected_content.append(text)
                    else:  # recursion to headings down the tree
                        _collect_text(child)

        # start the recursion
        _collect_text(node)

        # If we collected any content, process it and store it in
        # metadata
        if collected_content:
            joined_content = delimiter.join(collected_content)

            # If there is the output, check that the joined content
            # corresponds to the hash
            if hashed:
                new_hash = aggregate_hash(node, filter_func)
                if (
                    node.metadata
                    and output_key in node.metadata
                    and node.metadata[output_key]
                    and hash_key in node.metadata
                ):
                    if node.metadata[hash_key] == new_hash:
                        logger.info(
                            _node_property(
                                node,
                                TITLES_TEMP_KEY,
                                " skipped: text unchanged",
                            )
                        )
                        any_content_processed = True
                        return
            # If not hashed, check that output is already there
            else:
                if (
                    node.metadata
                    and output_key in node.metadata
                    and node.metadata[output_key]
                ):
                    any_content_processed = True
                    logger.info(
                        _node_property(
                            node,
                            TITLES_TEMP_KEY,
                            f" skipped: {output_key} present",
                        )
                    )
                    return

            # the hash differs or the output is missing. we need to
            # recompute
            logger.info(
                "Aggregating " + _node_property(node, TITLES_TEMP_KEY)
            )
            synth_content = aggregate_func(joined_content)
            if not synth_content:
                return

            # Initialize metadata dictionary if it doesn't exist
            if not node.metadata:
                node.metadata = {}

            # Store the synthesized property in metadata
            node.metadata[output_key] = synth_content
            if hashed:
                node.metadata[hash_key] = new_hash  # type: ignore
                # ignore: bound if hashed

            # Mark that we processed at least some content
            any_content_processed = True

    post_order_traversal(root_node, _process_node)

    # Warn if no content was processed (all nodes were filtered out,
    # or aggregate_func refused to compute aggregation)
    if not any_content_processed:
        heading_titles: str = _node_property(
            root_node, TITLES_TEMP_KEY, ": "
        )
        if root_node.is_root_node():
            logger.warning(
                heading_titles
                + "No aggregation was performed. This may indicate an "
                "overly restrictive filter, non-aggregable metadata, "
                "or an empty/small document.",
            )
        else:
            if len(root_node.get_text_children()) > 0:
                logger.warning(
                    heading_titles + "No aggregation was performed."
                )
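
The hash-gating logic above can be illustrated on a toy tree. This sketch uses dict-based nodes and a sha256 stand-in for the library's hash function, both assumptions for illustration only; it shows the core idea of recomputing the synthetic property only when the raw text under a node has changed:

```python
import hashlib

def base_hash(text: str) -> str:
    # stand-in for the library's hash function
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

def raw_text(node: dict) -> str:
    # concatenate the raw text of a node and all its descendants
    return node.get("text", "") + "".join(
        raw_text(c) for c in node.get("children", [])
    )

def aggregate(node: dict, func, output_key="summary", hash_key="txthash"):
    """Post-order, hash-gated aggregation on a toy dict tree."""
    if "text" in node:  # text leaf: nothing to aggregate
        return
    collected = []
    for child in node.get("children", []):
        aggregate(child, func, output_key, hash_key)  # post-order
        if "text" in child:
            collected.append(child["text"])
        elif child.get("meta", {}).get(output_key):
            collected.append(child["meta"][output_key])
    if not collected:
        return
    new_hash = base_hash(raw_text(node))  # hash raw text only
    meta = node.setdefault("meta", {})
    if meta.get(output_key) and meta.get(hash_key) == new_hash:
        return  # unchanged raw text: keep the existing property
    meta[output_key] = func("\n\n".join(collected))
    meta[hash_key] = new_hash

tree = {"children": [{"text": "alpha"}, {"text": "beta"}]}
aggregate(tree, lambda s: s.upper())     # computes "ALPHA\n\nBETA"
aggregate(tree, lambda s: "recomputed")  # skipped: raw text unchanged
```

The second call is a no-op because the stored hash still matches, which is exactly why manual edits to the synthetic property survive until the underlying text changes.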

preproc_for_markdown(response)

Pre-processes a string for markdown rendering, specifically handling LaTeX-style delimiters.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| response | str | The string to be processed. | required |

Returns:

| Type | Description |
| --- | --- |
| str | The processed string with updated delimiters. |

Source code in lmm/scan/scanutils.py
def preproc_for_markdown(response: str) -> str:
    """
    Pre-processes a string for markdown rendering, specifically
    handling LaTeX-style delimiters.

    Args:
        response: The string to be processed.

    Returns:
        The processed string with updated delimiters.
    """
    # replace LaTeX display-math delimiters '\[' and '\]' with '$$',
    # and inline delimiters '\(' and '\)' with '$', for markdown rendering
    response = re.sub(r"\\\[|\\\]", "$$", response)
    response = re.sub(r"\\\(|\\\)", "$", response)
    return response
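
As a usage sketch, reproducing the two substitutions above on a sample string:

```python
import re

def preproc_for_markdown(response: str) -> str:
    # same substitutions as the function above
    response = re.sub(r"\\\[|\\\]", "$$", response)
    response = re.sub(r"\\\(|\\\)", "$", response)
    return response

text = r"Euler: \[ e^{i\pi} + 1 = 0 \] and inline \( x^2 \)."
print(preproc_for_markdown(text))
# Euler: $$ e^{i\pi} + 1 = 0 $$ and inline $ x^2 $.
```

Note that backslash commands inside the math (such as `\pi`) are untouched; only the four delimiter sequences are rewritten.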