Load and save markdown documents

Load and save module: ioutils

Utilities to read/write markdown files to/from disk and handle errors consistently for the lmm package.

The module ioutils provides robust I/O operations for markdown files with comprehensive error handling, file size validation, encoding detection, and integration with the lmm markdown logging system.

Key features:
  • Automatic encoding detection with fallback strategies
  • Configurable file size limits with warnings and hard limits
  • Comprehensive error handling through the LoggerBase abstraction
  • Integration with markdown parsing and block structures
  • Support for both file paths and direct string content

Logger Usage Patterns

The module supports different logger implementations for various use cases:

  1. ConsoleLogger - for interactive development and debugging:

     >>> from lmm.utils.logging import ConsoleLogger
     >>> logger = ConsoleLogger(name)
     >>> content = load_markdown("file.md", logger=logger)
     # Errors and warnings printed to the console

  2. FileLogger - for production logging to files:

     >>> from lmm.utils.logging import FileLogger
     >>> from pathlib import Path
     >>> logger = FileLogger(name, Path("app.log"))
     >>> content = load_markdown("file.md", logger=logger)
     # Errors and warnings written to app.log

  3. ExceptionConsoleLogger - for strict error handling:

     >>> from lmm.utils.logging import ExceptionConsoleLogger
     >>> logger = ExceptionConsoleLogger(name)
     >>> content = load_markdown("file.md", logger=logger)
     # Raises RuntimeError on any error condition

  4. LoglistLogger - for testing and programmatic access:

     >>> from lmm.utils.logging import LoglistLogger
     >>> logger = LoglistLogger()
     >>> content = load_markdown("file.md", logger=logger)
     >>> errors = logger.get_logs(logging.ERROR)  # error-level only

Module Relationships

This module serves as the I/O layer between file system operations and the markdown parsing system:

File System ←→ ioutils.py ←→ parse_markdown.py ←→ Application

  • Depends on lmm.utils.ioutils for basic file validation
  • Depends on lmm.utils.logging for error reporting abstraction
  • Integrates with lmm.markdown.parse_markdown for block structures
  • Used by higher-level modules for markdown file processing
Performance Characteristics
  • File size checking: O(1) - single stat() call
  • Encoding detection: O(n) where n is detection sample size (1-10KB)
  • UTF-8 detection: Fast path with 1KB sample
  • Chardet detection: Slower but more accurate with 10KB sample
  • Memory usage: Proportional to file size (entire file loaded into memory)
  • Recommended limits: 50MB max, 10MB warning (configurable)

For large files, consider:
  • Increasing the max_size_mb parameter if needed
  • Using streaming approaches for files > 100MB
  • Monitoring memory usage in production environments

Main Functions
  • load_markdown: Load markdown files with encoding detection and size validation
  • save_markdown: Save markdown content to files with error handling
  • report_error_blocks: Report and filter error blocks from parsed markdown
  • convert_dollar_latex_delimiters: Convert LaTeX $/$$ delimiters to \( \)/\[ \]
  • convert_backslash_latex_delimiters: Convert LaTeX \( \)/\[ \] delimiters to $/$$
Behaviour

Functions use the LoggerBase abstraction instead of raising exceptions directly. Error-handling behaviour depends on the logger implementation:
  • ConsoleLogger/FileLogger: log errors, return empty strings/False
  • ExceptionConsoleLogger: raises RuntimeError on errors
  • LoglistLogger: accumulates errors for programmatic access
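A toy sketch of this contract, using stand-in logger classes rather than the real lmm.utils.logging implementations: a lenient logger lets the caller continue with a fallback value, while the exception-raising variant converts the same error into a RuntimeError.

```python
class ConsoleLogger:
    """Lenient: report the error, let the caller return a fallback."""
    def error(self, msg: str) -> None:
        print(f"ERROR: {msg}")

class ExceptionConsoleLogger(ConsoleLogger):
    """Strict: turn every reported error into a RuntimeError."""
    def error(self, msg: str) -> None:
        raise RuntimeError(msg)

def load(path: str, logger: ConsoleLogger) -> str:
    """Mirrors the load_markdown contract: errors go to the logger."""
    try:
        with open(path, encoding="utf-8") as f:
            return f.read()
    except OSError as e:
        logger.error(str(e))
        return ""  # lenient loggers get an empty string back
```

With a ConsoleLogger the call site sees "", with an ExceptionConsoleLogger the same failure propagates as an exception.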

convert_backslash_latex_delimiters(response)

Convert LaTeX delimiters from \[ \] and \( \) format to $$ and $ format. This is the inverse of convert_dollar_latex_delimiters.

Source code in lmm/markdown/ioutils.py
def convert_backslash_latex_delimiters(response: str) -> str:
    r"""
    Convert LaTeX delimiters from \[ \] and \( \) format to $$ and $ format.
    This is the inverse of convert_dollar_latex_delimiters.
    """

    # Convert \[...\] expressions (display math) to $$...$$
    # Match \[ followed by content, then \]
    pattern = r'\\\[\s*(.*?)\s*\\\]'
    replacement = r'$$\1$$'
    response = re.sub(pattern, replacement, response, flags=re.DOTALL)

    # Convert \(...\) expressions (inline math) to $...$
    # Match \( followed by content, then \)
    pattern = r'\\\(\s*(.*?)\s*\\\)'
    replacement = r'$\1$'
    response = re.sub(pattern, replacement, response, flags=re.DOTALL)

    return response
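For example, a self-contained restatement of the two substitutions above applied to a short string:

```python
import re

def convert_backslash_latex_delimiters(response: str) -> str:
    # display math: \[ ... \]  ->  $$...$$ (surrounding whitespace stripped)
    response = re.sub(r'\\\[\s*(.*?)\s*\\\]', r'$$\1$$',
                      response, flags=re.DOTALL)
    # inline math: \( ... \)  ->  $...$
    response = re.sub(r'\\\(\s*(.*?)\s*\\\)', r'$\1$',
                      response, flags=re.DOTALL)
    return response

print(convert_backslash_latex_delimiters(r"Euler: \( e^{i\pi} + 1 = 0 \)"))
# → Euler: $e^{i\pi} + 1 = 0$
```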

convert_dollar_latex_delimiters(response)

Convert LaTeX delimiters from $$ and $ format to \[ \] and \( \) format. Avoids converting escaped dollar signs (\$).

Parameters:
  • response (str): String containing LaTeX with dollar delimiters. Required.

Returns:
  • str: String with converted LaTeX delimiters.

Source code in lmm/markdown/ioutils.py
def convert_dollar_latex_delimiters(response: str) -> str:
    r"""
    Convert LaTeX delimiters from $$ and $ format to \[ \] and \( \) format.
    Avoids converting escaped dollar signs (\$).

    Args:
        response: String containing LaTeX with dollar delimiters

    Returns:
        String with converted LaTeX delimiters
    """

    # Convert $$ expressions (display math) - avoid escaped \$$
    # Negative lookbehind (?<!\\) ensures we don't match \$$
    pattern = r'(?<!\\)\$\$\s*(.*?)\s*\$\$'
    replacement = r'\\[\1\\]'
    response = re.sub(pattern, replacement, response, flags=re.DOTALL)

    # Convert $ expressions (inline math) - avoid escaped \$
    pattern = r'(?<!\\)\$\s*(.*?)\s*\$'
    replacement = r'\\(\1\\)'
    response = re.sub(pattern, replacement, response, flags=re.DOTALL)

    return response
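The escaped-dollar handling is worth seeing in action. A self-contained restatement of the substitutions above: the negative lookbehind leaves \$ alone while both math forms are converted, and display math is handled first so that $$ pairs are never mistaken for two inline delimiters.

```python
import re

def convert_dollar_latex_delimiters(response: str) -> str:
    # display math first: $$ ... $$ -> \[ ... \]; (?<!\\) skips escaped \$
    response = re.sub(r'(?<!\\)\$\$\s*(.*?)\s*\$\$', r'\\[\1\\]',
                      response, flags=re.DOTALL)
    # then inline math: $ ... $ -> \( ... \)
    response = re.sub(r'(?<!\\)\$\s*(.*?)\s*\$', r'\\(\1\\)',
                      response, flags=re.DOTALL)
    return response

print(convert_dollar_latex_delimiters(r"$$a+b$$ costs \$5, and $x$"))
# → \[a+b\] costs \$5, and \(x\)
```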

load_markdown(source, logger=logger, max_size_mb=50.0, warn_size_mb=10.0, encoding=None, auto_detect_encoding=True)

Loads a text file (intended for markdown files). The purpose of this function is to catch errors through a LoggerBase object, instead of raising errors in the I/O.

Parameters:
  • source (str | Path): the source file. If the source is a multiline string, or if it is not a file, the string itself is returned. Required.
  • logger (LoggerBase): a logger object (defaults to console).
  • max_size_mb (float): maximum file size in MB (default: 50.0).
  • warn_size_mb (float): file size in MB that triggers a warning (default: 10.0).
  • encoding (str | None): specific encoding to use. If None and auto_detect_encoding is True, the encoding is detected automatically (default: None).
  • auto_detect_encoding (bool): whether to automatically detect the file encoding (default: True).

Returns:
  • str: the loaded markdown content as a string. Returns an empty string ("") on error.

Note

I/O errors will be conveyed to the logger object. Use an ExceptionConsoleLogger object to raise errors.

Source code in lmm/markdown/ioutils.py
@validate_call(config={'arbitrary_types_allowed': True})
def load_markdown(
    source: str | Path,
    logger: LoggerBase = logger,
    max_size_mb: float = 50.0,
    warn_size_mb: float = 10.0,
    encoding: str | None = None,
    auto_detect_encoding: bool = True,
) -> str:
    """
    Loads a text file (intended for markdown files).
    The purpose of this function is to catch errors through
    a LoggerBase object, instead of raising errors in the I/O.

    Args:
        source (str | Path): the source file. If the source is a
            multiline string, or if it is not a file, returns the
            string itself.
        logger (LoggerBase): a logger object (defaults to console).
        max_size_mb (float): maximum file size in MB (default: 50.0).
        warn_size_mb (float): file size in MB to trigger warning
            (default: 10.0).
        encoding (str | None): specific encoding to use. If None and
            auto_detect_encoding is True, encoding will be detected
            automatically.
        auto_detect_encoding (bool): whether to automatically detect
            file encoding (default: True).

    Returns:
        str: The loaded markdown content as a string. Returns empty string ("") on error.

    Note:
        I/O errors will be conveyed to the logger object. Use an
        ExceptionConsoleLogger object to raise errors.
    """

    # Make the source a Path object if it points to file
    if isinstance(source, str):
        source = string_to_path_or_string(source)

    # Load if Path object, or return
    if isinstance(source, Path):
        if validate_file(source, logger) is None:
            return ""

        # Check file size limits
        if not _check_file_size(
            source, max_size_mb, warn_size_mb, logger
        ):
            return ""

        # Determine encoding to use
        file_encoding = encoding
        if file_encoding is None and auto_detect_encoding:
            file_encoding = _detect_encoding(source, logger)
        elif file_encoding is None:
            file_encoding = 'utf-8'  # Default fallback

        try:
            # Handle potential encoding errors gracefully
            if file_encoding == 'utf-8':
                content = source.read_text(
                    encoding=file_encoding, errors='replace'
                )
            else:
                content = source.read_text(encoding=file_encoding)
        except (IOError, OSError) as e:
            logger.error(f"I/O error reading file {source}: {e}")
            return ""
        except UnicodeDecodeError as e:
            logger.error(
                f"Encoding error reading file {source} with "
                f"{file_encoding}: {e}"
            )
            # Try UTF-8 with error replacement as last resort
            try:
                content = source.read_text(
                    encoding='utf-8', errors='replace'
                )
                logger.warning(
                    "Fallback to UTF-8 with error replacement "
                    f"for {source}"
                )
            except Exception as fallback_e:
                logger.error(
                    f"Final fallback failed for {source}: "
                    f"{fallback_e}"
                )
                return ""
        except Exception as e:
            logger.error(
                f"Unexpected error reading file {source}: {e}"
            )
            return ""
    else:
        content = source

    return content
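The encoding fallback chain in the source above (requested encoding first, then UTF-8 with replacement characters as a last resort) can be exercised in isolation. A minimal sketch under the assumption that a readable result with U+FFFD markers is preferable to failing the load:

```python
from pathlib import Path
import tempfile

def read_with_fallback(path: Path, encoding: str = "utf-8") -> str:
    try:
        return path.read_text(encoding=encoding)
    except UnicodeDecodeError:
        # last resort: decode as UTF-8, replacing undecodable bytes
        return path.read_text(encoding="utf-8", errors="replace")

with tempfile.TemporaryDirectory() as d:
    p = Path(d) / "note.md"
    p.write_bytes("caf\xe9".encode("latin-1"))  # 0xE9 is invalid UTF-8 here
    text = read_with_fallback(p)
    print(text)  # → caf� (the é becomes U+FFFD)
```

In the real function this path is only reached when auto-detection picked a non-UTF-8 encoding that then fails to decode.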

report_error_blocks(blocks, logger=logger)

Checks the existence of error blocks. If there are any, they are reported to the logger object.

Parameters:
  • blocks (list[Block]): the block list to check for error blocks. Required.
  • logger (LoggerBase): a logger object, defaulting to a console logger, which reports the errors.

Returns:
  • list[Block]: a list without error blocks.

Note

I/O errors are conveyed through the logger object. Use an ExceptionConsoleLogger object to raise errors. Use blocklist_errors to filter the block list for error blocks.

Source code in lmm/markdown/ioutils.py
def report_error_blocks(
    blocks: list[Block], logger: LoggerBase = logger
) -> list[Block]:
    """
    Checks the existence of error blocks. If there are any, they are
    reported to the logger object.

    Args:
        blocks: the block list to check for error blocks
        logger (LoggerBase): a logger object, defaulting to
            a console logger, which reports the errors.

    Returns:
        a list without error blocks.

    Note:
        I/O errors are conveyed through the logger object. Use an
        ExceptionConsoleLogger object to raise errors.
        Use blocklist_errors to filter the block list for error
        blocks.
    """
    if not blocks:
        return []

    errblocks: list[ErrorBlock] = blocklist_errors(blocks)
    if not errblocks:  # all ok
        return blocks

    # Handle single error block (usually file loading failure)
    if len(blocks) == 1 and len(errblocks) == 1:
        _report_single_error_block(errblocks[0], logger)
        return []

    # Handle multiple error blocks
    _report_multiple_error_blocks(errblocks, logger)
    return [b for b in blocks if not isinstance(b, ErrorBlock)]
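The report-then-filter behaviour can be illustrated with toy stand-ins (a plain class instead of the real pydantic ErrorBlock, a list instead of a LoggerBase):

```python
class ErrorBlock:
    """Toy stand-in for the real pydantic ErrorBlock."""
    def __init__(self, content: str):
        self.content = content

def report_error_blocks(blocks: list, log: list) -> list:
    """Report every error block to `log`, return the list without them."""
    errblocks = [b for b in blocks if isinstance(b, ErrorBlock)]
    if not errblocks:
        return blocks
    for err in errblocks:
        log.append(f"parse error: {err.content}")
    if len(blocks) == 1:
        return []  # a lone error usually means the whole load failed
    return [b for b in blocks if not isinstance(b, ErrorBlock)]

log: list = []
blocks = ["# Title", ErrorBlock("empty heading"), "Some text"]
clean = report_error_blocks(blocks, log)
```

The good blocks survive, and the error is recorded once per error block.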

save_markdown(dest, content, logger=logger)

Save markdown content, either a block list or a string, to a file.

Parameters:
  • dest (str | Path | io.TextIOBase): the file, or open text stream, to save the markdown to. Required.
  • content (list[Block] | str): the content of the markdown, a block list or a string. Required.
  • logger (LoggerBase): a logger object, defaulting to a console logger.

Returns:
  • bool: a boolean indicating success or failure.

Note

I/O errors are conveyed through the logger object. Use an ExceptionConsoleLogger object to raise errors.

For critical save failures, the error is also logged to a ConsoleLogger to ensure visibility even if the provided logger is not console-based.

Source code in lmm/markdown/ioutils.py
@validate_call(config={'arbitrary_types_allowed': True})
def save_markdown(
    dest: str | Path | io.TextIOBase,
    content: list[Block] | str,
    logger: LoggerBase = logger,
) -> bool:
    """
    Save markdown blocks to a file.

    Args:
        dest (str | Path): the file to save the markdown to.
        content: the content of the markdown, a block list or a string
        logger (LoggerBase): a logger object, defaulting to
            a console logger.

    Returns:
        a boolean indicating success or failure.

    Note:
        I/O errors are conveyed through the logger object. Use an
        ExceptionConsoleLogger object to raise errors.

        For critical save failures, the error is also logged to a 
        ConsoleLogger to ensure visibility even if the provided logger 
        is not console-based.
    """
    if not content:
        logger.warning("Empty markdown")
        return False

    # Serialize content if it is a list of blocks
    match content:
        case str():
            pass
        case list():
            content = serialize_blocks(content)
        case _:
            logger.critical('Invalid object given to serialize')
            return False

    try:
        if isinstance(dest, io.TextIOBase):
            dest.write(content)
        else:
            # Check save path
            save_path = Path(dest)
            # Create parent directories if they don't exist
            save_path.parent.mkdir(parents=True, exist_ok=True)

            with open(save_path, 'w', encoding='utf-8') as file:
                file.write(content)

    except (IOError, OSError) as e:
        error_msg = f"I/O error saving markdown to {dest}: {str(e)}"
        logger.error(error_msg)
        # Ensure error reaches console even if logger isn't console-based
        from lmm.utils.logging import ConsoleLogger
        console = ConsoleLogger("lmm.markdown.ioutils")
        console.error(error_msg)
        return False
    except Exception as e:
        error_msg = f"Unexpected error saving markdown to {dest}: {str(e)}"
        logger.error(error_msg)
        # Ensure error reaches console even if logger isn't console-based
        from lmm.utils.logging import ConsoleLogger
        console = ConsoleLogger("lmm.markdown.ioutils")
        console.error(error_msg)
        return False

    return True
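The directory-creation step above is the part most often forgotten in ad-hoc save code. A minimal standalone sketch of the same pattern (hypothetical helper name, not part of the package):

```python
from pathlib import Path
import tempfile

def save_text(dest, content: str) -> bool:
    """Create missing parent directories, then write UTF-8 text."""
    try:
        path = Path(dest)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(content, encoding="utf-8")
        return True
    except OSError:
        return False

with tempfile.TemporaryDirectory() as d:
    target = Path(d) / "deep" / "nested" / "doc.md"
    ok = save_text(target, "# Hello\n")  # parents created on demand
```

Because mkdir uses exist_ok=True, the call is safe whether or not the directories already exist.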

Parse markdown into block lists

Markdown documents are parsed into lists of block objects. These objects may be of three types: metadata (including the header, MetadataBlock), heading (HeadingBlock), and text (TextBlock). Serializing this list provides text that can be saved back to disk as a markdown file.

The parser covers a simplified version of Pandoc markdown. The parsing leaves the content of the text blocks unchanged. Unlike the pandoc parser, it maintains the position of the metadata blocks in the text. The parsed result is a flat list, reflecting the sequential nature of the markdown file.

ErrorBlock

Bases: BaseModel

This object represents a portion of the markdown document that gave rise to parsing errors.

Important functions:
  • serialize(): a textual representation of the error
  • get_content(): the string with the error description
  • self.origin: the markdown text that gave rise to the error

Note

ErrorBlocks use identity equality (is) rather than value equality (==). This ensures that error instances remain distinct even when they have identical content, which is important for tracking multiple similar errors separately.
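A toy illustration of the identity-equality contract (a plain class, not the real pydantic model): two blocks with identical content still compare unequal, so each error instance can be tracked separately.

```python
class ToyErrorBlock:
    def __init__(self, content: str):
        self.content = content
    def __eq__(self, obj: object) -> bool:
        return self is obj          # identity, not value
    def __ne__(self, obj: object) -> bool:
        return not self.__eq__(obj)

a = ToyErrorBlock("bad heading")
b = ToyErrorBlock("bad heading")
print(a == b, a == a)  # → False True
```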

Source code in lmm/markdown/parse_markdown.py
class ErrorBlock(BaseModel):
    """This object represents a portion of the markdown document
    that gave rise to parsing errors.

    Important functions:
    serialize()     a textual representation of the error
    get_content()   the string with the error description
    self.origin     the markdown text that gave rise to the error

    Note:
        ErrorBlocks use identity equality (is) rather than value
        equality (==). This ensures that error instances remain
        distinct even when they have identical content, which is
        important for tracking multiple similar errors separately.
    """

    content: str = ""
    errormsg: str = ""
    origin: str = ""
    type: Literal['error'] = 'error'

    def __eq__(self, obj: object) -> bool:
        return self is obj

    def __ne__(self, obj: object) -> bool:
        return not self.__eq__(obj)

    def serialize(self) -> str:
        """A textual representation of the error. When parsed, it will
        reconstitute the markdown text that gave rise to the error."""
        content = "** ERROR: " + self.content + "**\n"
        if self.errormsg:
            content += self.errormsg + "\n"
        if self.origin:
            content += "\n" + self.origin + "\n\n"
        return content

    def get_info(self) -> str:
        """Printable block properties and content."""
        info = "\n-------------\nError block\n"
        info += self.content
        info = info if info else "empty error block"
        return info

    def get_content(self) -> str:
        """Returns the error message"""
        return self.content

    def deep_copy(self) -> 'ErrorBlock':
        return self.model_copy(deep=True)

get_content()

Returns the error message

Source code in lmm/markdown/parse_markdown.py
def get_content(self) -> str:
    """Returns the error message"""
    return self.content

get_info()

Printable block properties and content.

Source code in lmm/markdown/parse_markdown.py
def get_info(self) -> str:
    """Printable block properties and content."""
    info = "\n-------------\nError block\n"
    info += self.content
    info = info if info else "empty error block"
    return info

serialize()

A textual representation of the error. When parsed, it will reconstitute the markdown text that gave rise to the error.

Source code in lmm/markdown/parse_markdown.py
def serialize(self) -> str:
    """A textual representation of the error. When parsed, it will
    reconstitute the markdown text that gave rise to the error."""
    content = "** ERROR: " + self.content + "**\n"
    if self.errormsg:
        content += self.errormsg + "\n"
    if self.origin:
        content += "\n" + self.origin + "\n\n"
    return content

HeaderBlock

Bases: MetadataBlock

This object represents the header block of a markdown document. It is the first block of the block list obtained from loading a markdown file with load_markdown. The behaviour of functions in this package when a header block is inserted by code at a position other than the first is undefined.

Important functions:
  • serialize(): reconstitute a text representation of the metadata
  • get_content(): the metadata
  • get_key(key, default): a metadata value indexed by key

Source code in lmm/markdown/parse_markdown.py
class HeaderBlock(MetadataBlock):
    """This object represents the header block of a markdown document.
    It is the first block of the block list obtained from loading a
    markdown file with load_markdown.
    The behaviour of functions in this package when a header block
    is inserted by code in a position other than the first is
    undefined.

    Important functions:
    serialize()     reconstitute a text representation of the metadata
    get_content()   the metadata
    get_key(key, default) a metadata value indexed by key
    """

    type: Literal['header'] = 'header'  # type: ignore

    def get_info(self) -> str:
        """Printable block properties and content."""
        info = "\n-------------\nHeader block"
        info += f" # {self.comment}\n" if self.comment else "\n"
        info += (
            pya.dump_yaml(self.content) if self.content else "<empty>"
        )
        if self.private_:
            info += "\n\nAdditional data:" + pya.dump_yaml(
                self.private_
            )
        return info

    def deep_copy(self) -> 'HeaderBlock':
        return self.model_copy(deep=True)

    @staticmethod
    def _from_metadata_block(
        block: MetadataBlock,
    ) -> 'HeaderBlock':
        if 'title' not in block.content:
            block.content['title'] = "Title"
        hblock = HeaderBlock(
            content=block.content,
            comment=block.comment,
            private_=block.private_,
        )
        return hblock

    @staticmethod
    def _from_tokens(
        stack: list[tuple['Token', str]],
        mapped_keys: Mapping[str, str] | None = None,
    ) -> 'HeaderBlock | ErrorBlock':
        block = MetadataBlock._from_tokens(stack, mapped_keys)
        if isinstance(block, MetadataBlock):
            return HeaderBlock._from_metadata_block(block)
        return block

    @staticmethod
    def _from_dict(
        dct: MetadataDict | dict[object, object],
    ) -> 'HeaderBlock|ErrorBlock':
        block = MetadataBlock._from_dict(dct)
        match block:
            case ErrorBlock():
                return block
            case MetadataBlock():
                return HeaderBlock._from_metadata_block(block)

    @staticmethod
    def from_default(source: str = "") -> 'HeaderBlock':
        """Instantiate a default header block."""
        if not source:
            source = "Title"
        return HeaderBlock(content={'title': source})

from_default(source='') staticmethod

Instantiate a default header block.

Source code in lmm/markdown/parse_markdown.py
@staticmethod
def from_default(source: str = "") -> 'HeaderBlock':
    """Instantiate a default header block."""
    if not source:
        source = "Title"
    return HeaderBlock(content={'title': source})

get_info()

Printable block properties and content.

Source code in lmm/markdown/parse_markdown.py
def get_info(self) -> str:
    """Printable block properties and content."""
    info = "\n-------------\nHeader block"
    info += f" # {self.comment}\n" if self.comment else "\n"
    info += (
        pya.dump_yaml(self.content) if self.content else "<empty>"
    )
    if self.private_:
        info += "\n\nAdditional data:" + pya.dump_yaml(
            self.private_
        )
    return info

HeadingBlock

Bases: BaseModel

This object represents a heading of the markdown document. A heading is a single line starting with one to six '#' characters followed by a space, and the title text.

Important functions:
  • serialize(): reconstitutes a text representation of the heading
  • get_content(): the title given by the heading text

Source code in lmm/markdown/parse_markdown.py
class HeadingBlock(BaseModel):
    """This object represents a heading of the markdown document.
    A heading is a single line starting with one to six '#'
    characters followed by a space, and the title text.

    Important functions:
    serialize()     reconstitutes a text representation of the heading
    get_content()   the title given by the heading text
    """

    level: int
    content: str
    attributes: str = ""
    type: Literal['heading'] = 'heading'

    def __eq__(self, obj: object) -> bool:
        if not isinstance(obj, HeadingBlock):
            return False
        if not (
            self.level == obj.level
            and self.content == obj.content
            and self.attributes == obj.attributes
        ):
            return False
        return True

    def __ne__(self, obj: object) -> bool:
        return not self.__eq__(obj)

    def serialize(self) -> str:
        """A parsable textual representation of the block."""
        strrep = "#" * self.level + " " + self.content
        if self.attributes:
            strrep = strrep + " {" + self.attributes + "}"
        return strrep + "\n"

    def get_info(self) -> str:
        """Printable block properties and content."""
        info = "\n-------------\nHeading block\n"
        info += str(self.content) if self.content else "<empty>"
        return info

    def get_content(self) -> str:
        """Returns the heading text"""
        return self.content

    def deep_copy(self) -> 'HeadingBlock':
        return self.model_copy(deep=True)

    @staticmethod
    def _from_tokens(
        stack: list[tuple['Token', str]],
    ) -> 'HeadingBlock | ErrorBlock':
        # we assume that this is a single token with a heading content
        if len(stack) > 1:
            # throw error: this should not happen
            raise RuntimeError(
                "Unexpected token stack: Heading block should only "
                + "contain one line"
            )

        content = stack[0][1]

        # empty heading
        m = re.match(r'^#{1,6}\s*$', content)
        if m:
            return ErrorBlock(
                content="Empty heading content (a series of '#'"
                + " followed by space)",
                origin=content,
            )

        # check attributes: text delimited by '{' '}' at end of line
        m = re.search(r'\s+\{(.*?)\}\s*$', content)
        if m:
            # Extract the content before the attributes
            content = content[: m.start()].strip()
            attr_text = m.group(1).strip()
        else:
            attr_text = ""

        # parse heading at last, 1 to 6 '#' (guaranteed by
        # tokenization) followed by space and text
        m = re.search(r'^(#+)\s+(.+)', content)
        if not m:
            if attr_text:
                return ErrorBlock(
                    content="The heading specifies attributes, but "
                    + "there is no heading text",
                    origin=stack[0][1],
                )
            else:
                return ErrorBlock(
                    content="Cannot parse heading content",
                    origin=stack[0][1],
                )
        try:
            block = HeadingBlock(
                level=len(m.group(1)),
                content=m.group(2).strip(),
                attributes=attr_text,
            )
        except Exception as e:
            return ErrorBlock(
                content="Could not parse heading",
                errormsg=str(e),
                origin=stack[0][1],
            )
        return block
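The serialize/parse pair can be seen on a single heading. A standalone sketch that mirrors HeadingBlock.serialize() and the attribute regex from _from_tokens above:

```python
import re

def serialize_heading(level: int, content: str, attributes: str = "") -> str:
    # mirrors HeadingBlock.serialize() above
    s = "#" * level + " " + content
    if attributes:
        s += " {" + attributes + "}"
    return s + "\n"

line = serialize_heading(2, "Methods", ".unnumbered")
print(repr(line))  # → '## Methods {.unnumbered}\n'

# the attribute block is recovered the same way _from_tokens does
m = re.search(r'\s+\{(.*?)\}\s*$', line.rstrip("\n"))
```

Serializing and re-parsing a heading is a round trip: the level comes back from the '#' count, the attributes from the trailing {...} group.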

get_content()

Returns the heading text

Source code in lmm/markdown/parse_markdown.py
def get_content(self) -> str:
    """Returns the heading text"""
    return self.content

get_info()

Printable block properties and content.

Source code in lmm/markdown/parse_markdown.py
def get_info(self) -> str:
    """Printable block properties and content."""
    info = "\n-------------\nHeading block\n"
    info += str(self.content) if self.content else "<empty>"
    return info

serialize()

A parsable textual representation of the block.

Source code in lmm/markdown/parse_markdown.py
def serialize(self) -> str:
    """A parsable textual representation of the block."""
    strrep = "#" * self.level + " " + self.content
    if self.attributes:
        strrep = strrep + " {" + self.attributes + "}"
    return strrep + "\n"

MetadataBlock

Bases: BaseModel

This object represents the data of a metadata block in a markdown document.

Important functions:
  • serialize(): reconstitute a text representation of the metadata
  • get_content(): the metadata
  • get_key(key, default): a metadata value indexed by key

Source code in lmm/markdown/parse_markdown.py
class MetadataBlock(BaseModel):
    """This object represents the data of a metadata block in a
    markdown document.

    Important functions:
    serialize()     reconstitute a text representation of the metadata
    get_content()   the metadata
    get_key(key, default) a metadata value indexed by key
    """

    content: MetadataDict = {}
    comment: str = ""
    private_: list[object] = []
    type: Literal['metadata'] = 'metadata'

    def __eq__(self, obj: object) -> bool:
        if not isinstance(obj, MetadataBlock | HeaderBlock):
            return False
        if not (
            self.content == obj.content
            and self.comment == obj.comment
        ):
            return False
        if len(self.private_) != len(obj.private_):
            return False
        for i, d in enumerate(self.private_):
            try:
                if not d == obj.private_[i]:
                    return False
            except Exception:
                # Comparison may fail for complex objects
                return False
        return True

    def __ne__(self, obj: object) -> bool:
        return not self.__eq__(obj)

    def serialize(self) -> str:
        """A parsable textual representation of the block."""
        strrep = "---"
        if self.comment:
            strrep = strrep + " # " + self.comment
        # reconstitute original yaml block (see parse_yaml.py)
        content: str = pya.serialize_yaml_parse(
            (self.content, self.private_)
        )
        strrep = strrep + '\n' + content
        return strrep + "---\n"

    def get_info(self) -> str:
        """Printable block properties and content."""
        info = "\n-------------\nMetadata block"
        info += f" # {self.comment}\n" if self.comment else "\n"
        info += (
            pya.dump_yaml(self.content) if self.content else "<empty>"
        )
        if self.private_:
            info += "\n\nAdditional data:\n" + pya.dump_yaml(
                self.private_
            )
        return info

    def get_content(self) -> MetadataDict:
        """Returns a dictionary with the metadata."""
        return self.content

    def get_key(
        self, key: str, default: MetadataValue = None
    ) -> MetadataValue:
        """Returns the value of a key in the metadata."""
        return (
            self.content[key]
            if key in self.content.keys()
            else default
        )

    def get_key_type(
        self, key: str, value_type: type[T], default: T
    ) -> T:
        """Get a metadata value if of type T. The argument
        default must be given.

        Args:
            key: the key (a string)
            value_type: the type of the value. Due to the limitations
                of the Python type system, no algebraic or parametric
                type may be used here.
            default: (of type value_type) a default return value.

        Returns:
            a value of type value_type.

        Note:
            To code for a checkable None (but violate the type model)
            ```python
            value = get_key_type('title', str | None, None)
            ```
            Here, value is of type str | None, and the function will
            return None if 'title' is not found or is not a
            string.
        """
        value: MetadataValue = (
            self.content[key] if key in self.content.keys() else None
        )
        if isinstance(value, value_type):
            return value
        else:
            return default

    def deep_copy(self) -> 'MetadataBlock':
        return self.model_copy(deep=True)

    @staticmethod
    def _from_tokens(
        stack: list[tuple['Token', str]],
        mapped_keys: Mapping[str, str] | None = None,
    ) -> 'MetadataBlock | ErrorBlock':
        if not stack:
            # this is a programming error
            raise ValueError(
                "Invalid call to _from_tokens with empty list."
            )

        # check for comments
        comment_match = stack[0][1].strip().split('#', 1)
        comment = (
            comment_match[1].strip() if len(comment_match) > 1 else ''
        )

        # we assume the first and last tokens to be metadata markers
        content = '\n'.join([y for (_, y) in stack[1:-1]])

        # Check YAML content size to prevent YAML bombs
        if len(content) > MAX_YAML_BLOCK_SIZE:
            offending_meta = '\n'.join([y for (_, y) in stack])
            return ErrorBlock(
                content=f"Metadata block too large ({len(content)} bytes, max {MAX_YAML_BLOCK_SIZE})",
                origin=offending_meta[:500] + "..." if len(offending_meta) > 500 else offending_meta,
            )

        # first use yaml parser, catching any error
        try:
            yamldata: Any = yaml.safe_load(content)
        except yaml.YAMLError as e:
            offending_meta = '\n'.join([y for (_, y) in stack])
            return ErrorBlock(
                content="\nYAML parse error in metadata block.",
                errormsg=str(e),
                origin=offending_meta,
            )

        # this returns the part of the yaml block that we want to
        # use here in 'part', the rest of the block in 'whole'.
        # See parse_yaml.py for explanation.
        try:
            part: dict[str, MetadataValue]
            whole: list[object]
            part, whole = pya.split_yaml_parse(yamldata, mapped_keys)
        except ValueError as e:
            # These are metadata fields rejected by split_yaml_parse
            offending_meta = '\n'.join([y for (_, y) in stack])
            return ErrorBlock(
                content="Invalid markdown header.",
                errormsg=str(e),
                origin=offending_meta,
            )

        if (not part) and (not whole):
            return ErrorBlock(
                content="Invalid or empty metadata block.",
                origin="---\n---",
            )
        # We should be able to cope with this now
        # if not part:
        #     invalid_meta = '\n'.join([y for (_, y) in stack])
        #     return ErrorBlock(
        #         content="Metadata contains a dictionary or a list"
        #         + "of dictionaries that not acceptable for use in"
        #         + " LM markdown",
        #         origin=invalid_meta,
        #     )

        try:
            block = MetadataBlock(
                content=part, private_=whole, comment=comment
            )
        except (ValidationError, TypeError, ValueError, KeyError):
            # First fallback: try to store in private_ field
            try:
                block = MetadataBlock(
                    content={},
                    private_=[part] + whole,
                    comment=(
                        "Invalid (too deeply nested?) metadata, "
                        "ignored by model."
                    )
                )
            except ValidationError:
                # Pydantic errors ensue from nested dictionaries
                return ErrorBlock(
                    content="Could not parse metadata:"
                    + " YAML object type not supported.",
                    errormsg="",  # a convoluted pydantic message
                    origin='\n'.join([y for (_, y) in stack]),
                )
            except (TypeError, ValueError, KeyError) as e:
                return ErrorBlock(
                    content="Could not parse metadata.",
                    errormsg=str(e),
                    origin='\n'.join([y for (_, y) in stack]),
                )
        return block

    @staticmethod
    def _from_dict(
        dct: MetadataDict | dict[object, object],
    ) -> 'MetadataBlock|ErrorBlock':
        if not pya.is_metadata_dict(dct):
            return ErrorBlock(content="Invalid data for metadata.")
        # now dct is a metadata dict
        try:
            block = MetadataBlock(content=dct)
        except ValidationError:
            # This is a pydantic type error
            return ErrorBlock(
                content="Invalid dictionary for metadata "
                + "(too deep nesting, or invalid data types)."
            )
        except Exception as e:
            return ErrorBlock(
                content="Invalid dictionary for metadata",
                errormsg=str(e),
            )
        return block

get_content()

Returns a dictionary with the metadata.

Source code in lmm/markdown/parse_markdown.py
def get_content(self) -> MetadataDict:
    """Returns a dictionary with the metadata."""
    return self.content

get_info()

Printable block properties and content.

Source code in lmm/markdown/parse_markdown.py
def get_info(self) -> str:
    """Printable block properties and content."""
    info = "\n-------------\nMetadata block"
    info += f" # {self.comment}\n" if self.comment else "\n"
    info += (
        pya.dump_yaml(self.content) if self.content else "<empty>"
    )
    if self.private_:
        info += "\n\nAdditional data:\n" + pya.dump_yaml(
            self.private_
        )
    return info

get_key(key, default=None)

Returns the value of a key in the metadata.

Source code in lmm/markdown/parse_markdown.py
def get_key(
    self, key: str, default: MetadataValue = None
) -> MetadataValue:
    """Returns the value of a key in the metadata."""
    return (
        self.content[key]
        if key in self.content.keys()
        else default
    )

get_key_type(key, value_type, default)

Get a metadata value if of type T. The argument default must be given.

Parameters:

- key (str, required): the key (a string)
- value_type (type[T], required): the type of the value. Due to the limitations of the Python type system, no algebraic or parametric type may be used here.
- default (T, required): (of type value_type) a default return value.

Returns:

- T: a value of type value_type.

Note

To code for a checkable None (but violate the type model)

value = get_key_type('title', str | None, None)

Here, value is of type str | None, and the function will return None if 'title' is not found or is not a string.

Source code in lmm/markdown/parse_markdown.py
def get_key_type(
    self, key: str, value_type: type[T], default: T
) -> T:
    """Get a metadata value if of type T. The argument
    default must be given.

    Args:
        key: the key (a string)
        value_type: the type of the value. Due to the limitations
            of the Python type system, no algebraic or parametric
            type may be used here.
        default: (of type value_type) a default return value.

    Returns:
        a value of type value_type.

    Note:
        To code for a checkable None (but violate the type model)
        ```python
        value = get_key_type('title', str | None, None)
        ```
        Here, value is of type str | None, and the function will
        return None if 'title' is not found or is not a
        string.
    """
    value: MetadataValue = (
        self.content[key] if key in self.content.keys() else None
    )
    if isinstance(value, value_type):
        return value
    else:
        return default
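The type-gated lookup above is simple enough to sketch standalone. The following is a hypothetical minimal version of the same pattern over a plain dictionary, not the lmm class method:

```python
def get_key_type(content: dict, key: str, value_type, default):
    """Return content[key] only if it has the expected type,
    otherwise the caller-supplied default."""
    value = content.get(key)
    return value if isinstance(value, value_type) else default
```

As the Note above explains, passing a union such as `str | None` as value_type together with a None default yields a checkable None.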

serialize()

A parsable textual representation of the block.

Source code in lmm/markdown/parse_markdown.py
def serialize(self) -> str:
    """A parsable textual representation of the block."""
    strrep = "---"
    if self.comment:
        strrep = strrep + " # " + self.comment
    # reconstitute original yaml block (see parse_yaml.py)
    content: str = pya.serialize_yaml_parse(
        (self.content, self.private_)
    )
    strrep = strrep + '\n' + content
    return strrep + "---\n"
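The fence-building logic in serialize() can be illustrated with a minimal standalone sketch. The flat `key: value` body below is a simplified stand-in for pya.serialize_yaml_parse, which handles the full YAML round-trip:

```python
def serialize_meta(content: dict, comment: str = "") -> str:
    # Opening fence, with an optional '# comment' appended.
    head = "---" + (f" # {comment}" if comment else "")
    # Simplified YAML body: flat key/value pairs only.
    body = "".join(f"{k}: {v}\n" for k, v in content.items())
    return head + "\n" + body + "---\n"
```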

TextBlock

Bases: BaseModel

This object represents a text block from the markdown document. The text block starts after a heading, a metadata block, or a blank line, and ends with a blank line or the end of the document.

Important functions:

- serialize(): reconstitutes a text representation of the block
- get_content(): returns a string with the text content
- extend(): extends the text with that of another text block

Source code in lmm/markdown/parse_markdown.py
class TextBlock(BaseModel):
    """This object represents a text block from the markdown document.
    The text block starts after a heading, a metadata block, or a
    blank line, and ends with a blank line or the end of the document.

    Important functions:
    serialize()     reconstitutes a text representation of the block
    get_content()   returns a string with the text content
    extend()        extends the text with that of another text block
    """

    content: str
    type: Literal['text'] = 'text'

    def __eq__(self, obj: object) -> bool:
        if not isinstance(obj, TextBlock):
            return False
        if not self.content == obj.content:
            return False
        return True

    def __ne__(self, obj: object) -> bool:
        return not self.__eq__(obj)

    def serialize(self) -> str:
        """A parsable textual representation of the block."""
        return self.content + "\n"

    def get_info(self) -> str:
        """Printable block properties and content."""
        info = "\n-------------\nText block\n"
        if self.content:
            content = self.content.split()
            if len(content) > MAX_INFO_WORDS:
                content = content[:MAX_INFO_WORDS - 1] + ["..."]
            info += " ".join(content) if content else "<empty>"
        else:
            info += "<empty>"
        return info

    def get_word_count(self) -> int:
        """Get the word count in the text block"""
        return len(self.content.split())

    def get_content(self) -> str:
        """Returns the text of the text block"""
        return self.content

    def is_empty(self) -> bool:
        return not self.content

    def extend(self, text: 'str | TextBlock') -> 'TextBlock':
        """Extend the content of the block with new text or with
        the content of another text block. The new content is
        added at the end of the block.
        Returns: a (reference to) the modified block."""
        value: str
        match text:
            case str():
                value = text
            case TextBlock() as block:
                value = block.get_content()
        if self.is_empty():
            self.content = value
        else:
            self.content = self.content + "\n\n" + value

        return self

    def deep_copy(self) -> 'TextBlock':
        return self.model_copy(deep=True)

    @staticmethod
    def _from_tokens(stack: list[tuple['Token', str]]) -> 'TextBlock':
        # we assume that the first token is a content
        # and the last one is a blank line
        content = '\n'.join([y for (_, y) in stack[0:-1]])
        # we clear printed output of error blocks to allow re-scanning
        if content.startswith("** ERROR:"):
            return TextBlock(content="")

        return TextBlock(content=content)

    @staticmethod
    def from_text(text: str) -> 'TextBlock':
        """Instantiate a new text block from text."""
        return TextBlock(content=text)

extend(text)

Extend the content of the block with new text or with the content of another text block. The new content is added at the end of the block. Returns: a (reference to) the modified block.

Source code in lmm/markdown/parse_markdown.py
def extend(self, text: 'str | TextBlock') -> 'TextBlock':
    """Extend the content of the block with new text or with
    the content of another text block. The new content is
    added at the end of the block.
    Returns: a (reference to) the modified block."""
    value: str
    match text:
        case str():
            value = text
        case TextBlock() as block:
            value = block.get_content()
    if self.is_empty():
        self.content = value
    else:
        self.content = self.content + "\n\n" + value

    return self
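The joining rule in extend() (take the new text as-is when the block is empty, otherwise separate old and new content with a blank line) reduces to a one-liner over plain strings:

```python
def extend_text(existing: str, new: str) -> str:
    # An empty block adopts the new text; otherwise a blank line
    # separates the old content from the appended text.
    return new if not existing else existing + "\n\n" + new
```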

from_text(text) staticmethod

Instantiate a new text block from text.

Source code in lmm/markdown/parse_markdown.py
@staticmethod
def from_text(text: str) -> 'TextBlock':
    """Instantiate a new text block from text."""
    return TextBlock(content=text)

get_content()

Returns the text of the text block

Source code in lmm/markdown/parse_markdown.py
def get_content(self) -> str:
    """Returns the text of the text block"""
    return self.content

get_info()

Printable block properties and content.

Source code in lmm/markdown/parse_markdown.py
def get_info(self) -> str:
    """Printable block properties and content."""
    info = "\n-------------\nText block\n"
    if self.content:
        content = self.content.split()
        if len(content) > MAX_INFO_WORDS:
            content = content[:MAX_INFO_WORDS - 1] + ["..."]
        info += " ".join(content) if content else "<empty>"
    else:
        info += "<empty>"
    return info

get_word_count()

Get the word count in the text block

Source code in lmm/markdown/parse_markdown.py
def get_word_count(self) -> int:
    """Get the word count in the text block"""
    return len(self.content.split())

serialize()

A parsable textual representation of the block.

Source code in lmm/markdown/parse_markdown.py
def serialize(self) -> str:
    """A parsable textual representation of the block."""
    return self.content + "\n"

blocklist_copy(blocks, filter_func=None)

Return a deep copy of the blocklist.

Parameters:

- blocks (list[Block], required): the block list to copy.
- filter_func (Callable[[Block], bool] | None, default None): a predicate function to filter the blocks. If not provided, all blocks are returned.

Returns:

- list[Block]: A copy of the block list.

Examples:

A deep copy of all text blocks:

blockscopy = blocklist_copy(
    blocklist, lambda b: isinstance(b, TextBlock)
)

Notes: To copy by reference, standard Python syntax may be used:

```python
blockscopy = [b for b in blocks if isinstance(b, TextBlock)]
```
Source code in lmm/markdown/parse_markdown.py
def blocklist_copy(
    blocks: list[Block],
    filter_func: Callable[[Block], bool] | None = None,
) -> list[Block]:
    """Return a deep copy of the blocklist.

    Args:
        blocks: the block list to copy.
        filter_func: a predicate function to filter the blocks. If
            not provided, all blocks are returned.

    Returns:
        A copy of the block list.

    Examples:
        A deep copy of all text blocks:
        ```python
        blockscopy = blocklist_copy(
            blocklist, lambda b: isinstance(b, TextBlock)
        )
        ```
    Notes:
        To copy by reference, standard Python syntax may be used:

        ```python
        blockscopy = [b for b in blocks if isinstance(b, TextBlock)]
        ```
    """
    return (
        [b.deep_copy() for b in blocks]
        if filter_func is None
        else [b.deep_copy() for b in blocks if filter_func(b)]
    )
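The contrast with the by-reference copy mentioned in the Notes can be shown with a standalone sketch built on copy.deepcopy, a stand-in here for the pydantic model_copy(deep=True) the blocks actually use:

```python
import copy

def blocklist_copy(blocks, filter_func=None):
    # Deep-copy each (optionally filtered) block so that mutating
    # the copies leaves the original list untouched.
    return [copy.deepcopy(b)
            for b in blocks
            if filter_func is None or filter_func(b)]
```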

blocklist_errors(blocks)

Return a list of errors in the block list.

Source code in lmm/markdown/parse_markdown.py
def blocklist_errors(blocks: list[Block]) -> list[ErrorBlock]:
    """Return a list of errors in the block list."""
    return [
        block.deep_copy()
        for block in blocks
        if isinstance(block, ErrorBlock)
    ]

blocklist_get_info(blocks)

Collect info on all blocks in the list

Source code in lmm/markdown/parse_markdown.py
def blocklist_get_info(blocks: list[Block]) -> str:
    """Collect info on all blocks in the list"""
    return "\n".join([x.get_info() for x in blocks])

blocklist_haserrors(blocks)

Check if the block list contains errors.

Source code in lmm/markdown/parse_markdown.py
def blocklist_haserrors(blocks: list[Block]) -> bool:
    """Check if the block list contains errors."""
    for b in blocks:
        if isinstance(b, ErrorBlock):
            return True
    return False

blocklist_map(blocks, map_func, filter_func=lambda _: True)

Apply map_func to all blocks that satisfy the predicate filter_func.

Example

Write a function that applies a replacement text to all contents of text blocks.

def blocklist_replace(
    blocks: list[Block], target: str, replacement: str
) -> list[Block]:
    def replace_block(block: Block) -> Block:
        return (
            TextBlock.from_text(
                block.get_content().replace(
                    target, replacement
                )
            )
            if isinstance(block, TextBlock)
            else block
        )

    return blocklist_map(blocks, replace_block)
Source code in lmm/markdown/parse_markdown.py
def blocklist_map(
    blocks: list[Block],
    map_func: Callable[[Block], Block],
    filter_func: Callable[[Block], bool] = lambda _: True,
) -> list[Block]:
    """Apply map_func to all blocks that satisfy the predicate
    filter_func.

    Example:
        Write a function that applies a replacement text to all
        contents of text blocks.
        ```python
        def blocklist_replace(
            blocks: list[Block], target: str, replacement: str
        ) -> list[Block]:
            def replace_block(block: Block) -> Block:
                return (
                    TextBlock.from_text(
                        block.get_content().replace(
                            target, replacement
                        )
                    )
                    if isinstance(block, TextBlock)
                    else block
                )

            return blocklist_map(blocks, replace_block)
        ```
    """
    return [
        map_func(b.deep_copy()) if filter_func(b) else b
        for b in blocks
    ]

load_blocks(source, *, max_size_mb=DEFAULT_MAX_SIZE_MB, warn_size_mb=DEFAULT_WARN_SIZE_MB, logger)

Load a pandoc markdown file into structured blocks. Used in development.

This function loads the entire file into memory for parsing. For very large files, this may consume significant memory. The file size limits are enforced to prevent excessive memory usage.

Parameters:

- source (str | Path, required): Path to a markdown file.
- max_size_mb (float, default DEFAULT_MAX_SIZE_MB): the max size, in MB, of the file to load. Files larger than this will not be loaded and an error will be logged.
- warn_size_mb (float, default DEFAULT_WARN_SIZE_MB): the size of the input file that results in a warning being logged. Use this to be notified when processing larger files.
- logger (LoggerBase, required): a logger object (defaults to console logging)

Returns:

- list[Block]: List of Block objects (HeaderBlock, MetadataBlock, HeadingBlock, TextBlock, ErrorBlock) representing the parsed content. Returns an empty list if the file cannot be loaded.

Note

This function processes the entire file in memory and is not suitable for streaming very large files. For files approaching the max_size_mb limit, consider processing in smaller chunks or using alternative approaches.

Source code in lmm/markdown/parse_markdown.py
def load_blocks(
    source: str | Path,
    *,
    max_size_mb: float = DEFAULT_MAX_SIZE_MB,
    warn_size_mb: float = DEFAULT_WARN_SIZE_MB,
    logger: LoggerBase,
) -> list[Block]:
    """Load a pandoc markdown file into structured blocks. Used in
    development.

    This function loads the entire file into memory for parsing. For
    very large files, this may consume significant memory. The file
    size limits are enforced to prevent excessive memory usage.

    Args:
        source: Path to a markdown file.
        max_size_mb: the max size, in MB, of the file to load. Files
            larger than this will not be loaded and an error will be
            logged.
        warn_size_mb: the size of the input file that results in
            a warning being logged. Use this to be notified when
            processing larger files.
        logger: a logger object (defaults to console logging)

    Returns:
        List of Block objects (HeaderBlock, MetadataBlock,
            HeadingBlock, TextBlock, ErrorBlock) representing
            the parsed content. Returns an empty list if the file
            cannot be loaded.

    Note:
        This function processes the entire file in memory and is not
        suitable for streaming very large files. For files approaching
        the max_size_mb limit, consider processing in smaller chunks
        or using alternative approaches.
    """

    # Load the markdown
    from .ioutils import load_markdown

    content = load_markdown(source, logger, max_size_mb, warn_size_mb)
    if not content:
        return []

    # Parse it
    blocks = parse_markdown_text(content, logger=logger)

    # Check for errors in the block list and log them to console
    from .ioutils import report_error_blocks

    report_error_blocks(blocks, logger)

    # Returns all blocks, also error blocks
    return blocks
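The interplay of warn_size_mb and max_size_mb amounts to a two-threshold gate applied before the file is read. A sketch of that policy (not the lmm implementation):

```python
def check_size(num_bytes: int, max_size_mb: float, warn_size_mb: float) -> str:
    # Above the hard limit the file is refused; above the soft
    # limit it is loaded, but a warning is logged.
    size_mb = num_bytes / (1024 * 1024)
    if size_mb > max_size_mb:
        return "error"
    if size_mb > warn_size_mb:
        return "warn"
    return "ok"
```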

parse_markdown_text(content, mapped_keys=None, logger=None)

Parse a pandoc markdown text into structured blocks.

Parameters:

- content (str, required): a string containing a markdown document.
- mapped_keys (Mapping[str, str] | None, default None): a dictionary mapping keys to a replacement value, used to replace short forms of the user's metadata entries (for example, ?: maps to query: for a mapped key of {'?': "query"}). Does not affect keys in the header block.
- logger (LoggerBase | None, default None): a logger object. This function does not raise or log errors, because parse errors are propagated as error blocks. If you pass a logger, you get warnings about block and heading markers preceded by space, which are not errors strictly speaking, but may be unintended typos with consequences.

Returns:

- list[Block]: List of Block objects (HeaderBlock, MetadataBlock, HeadingBlock, TextBlock, ErrorBlock) representing the parsed content.

Source code in lmm/markdown/parse_markdown.py
def parse_markdown_text(
    content: str,
    mapped_keys: Mapping[str, str] | None = None,
    logger: LoggerBase | None = None,
) -> list[Block]:
    """Parse a pandoc markdown text into structured blocks.

    Args:
        content: a string containing markdown document.
        mapped_keys: a dictionary mapping keys to a replacement value,
            used to replace short-form of metadata entries of the
            user (for example, ?: maps to query: for a mapped key of
            {'?': "query"}). Does not affect keys in the header block.
        logger: a logger object. This function does not raise or log
            errors, because parse errors are propagated as error
            blocks. If you pass a logger, you get warnings about
            block and heading markers preceded by space, which are
            not errors strictly speaking, but may be unintended
            typos with consequences.

    Returns:
        List of Block objects (HeaderBlock, MetadataBlock,
        HeadingBlock, TextBlock, ErrorBlock) representing
        the parsed content.

    Related functions:
        - serialize_blocks: Recreates Markdown text from blocks
        - blocklist_haserrors: Checks if parsing was successful
        - blocklist_errors: Returns list of error blocks
        - blocklist_get_info: Return description of the blocks
    """

    if not content:
        return []

    # preproc
    lines: list[str] = content.splitlines()

    # check for possible unintended misspecified blocks or headings
    if logger is not None:
        for i, line in enumerate(lines):
            if re.match(r'^\s+---(\s*|$)', line):
                logger.warning(
                    "A metadata marker preceded by space "
                    f"found at line {i + 1}. Are you sure? It"
                    " will be parsed as text."
                )

            if re.match(r'^\s+#\s+\w+', line):
                logger.warning(
                    "A heading marker preceded by space "
                    f"found at line {i + 1}. Are you sure? It"
                    " will be parsed as text."
                )

    # proc
    tokens = _tokenizer(lines)
    blocks = _parser(tokens, mapped_keys)

    return blocks
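The space-preceded-marker check uses the two regular expressions visible in the source. Run standalone over a few sample lines:

```python
import re

lines = ["---", " --- not a fence", "  # not a heading"]
# 1-based line numbers whose markers are preceded by whitespace
# and will therefore be parsed as plain text.
suspicious = [
    i + 1
    for i, line in enumerate(lines)
    if re.match(r'^\s+---(\s*|$)', line)
    or re.match(r'^\s+#\s+\w+', line)
]
# suspicious == [2, 3]
```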

save_blocks(file_name, blocks, logger)

Write a list of Block objects to a markdown file. Used in development.

Parameters:

- file_name (str | Path, required): Path to the output file (string or Path object)
- blocks (list[Block], required): List of Block objects to be serialized
- logger (LoggerBase, required): Logger object for error reporting

Returns:

- bool: Boolean indicating success or failure

Source code in lmm/markdown/parse_markdown.py
def save_blocks(
    file_name: str | Path, blocks: list[Block], logger: LoggerBase
) -> bool:
    """Write a list of Block objects to a markdown file. Used in
    development.

    Args:
        file_name: Path to the output file (string or Path object)
        blocks: List of Block objects to be serialized
        logger: Logger object for error reporting

    Returns:
        Boolean indicating success or failure
    """
    from .ioutils import save_markdown

    return save_markdown(file_name, blocks, logger)

save_blocks_debug(file_name, blocks, sep='', logger=ConsoleLogger())

A debug version of save_blocks, with a separator string added to make clear where the block boundaries are. Used in development.

Source code in lmm/markdown/parse_markdown.py
def save_blocks_debug(
    file_name: str | Path, 
    blocks: list[Block], 
    sep: str = "",
    logger: LoggerBase = ConsoleLogger(),
) -> None:
    """A debug version of save_blocks, with a separator string
    added to make clear where the block boundaries are. Used in
    development."""

    from .ioutils import save_markdown

    content = ""
    for b in blocks:
        content += b.serialize()
        if isinstance(b, TextBlock):
            content += sep + "\n"
        content += "\n"

    save_markdown(file_name, content, logger)

serialize_blocks(blocks)

Convert a list of Block objects to a markdown string.

Joins the string representations of all blocks, adding blank lines between blocks as appropriate based on their types. No blank line is added after header blocks or before heading blocks.

Parameters:

- blocks (list[Block], required): List of Block objects to convert

Returns:

- str: A string containing the markdown representation of the blocks.

Source code in lmm/markdown/parse_markdown.py
def serialize_blocks(blocks: list[Block]) -> str:
    """Convert a list of Block objects to a markdown string.

    Joins the string representations of all blocks, adding blank lines
    between blocks as appropriate based on their types. No blank line
    is added after header blocks or before heading blocks.

    Args:
        blocks: List of Block objects to convert

    Returns:
        A string containing the markdown representation of the blocks.

    Related functions:
        - parse_markdown_text: Converts a markdown string into a
            parsed block list.
    """
    if not blocks:
        return ""

    last_block = blocks[0]
    content = last_block.serialize()
    trailing_block_types = (HeaderBlock, HeadingBlock, TextBlock)
    for block in blocks[1:]:
        if isinstance(last_block, trailing_block_types):
            content += '\n'
        content += block.serialize()
        last_block = block

    return (
        content[:-1]
        if content[-1] == "\n" and isinstance(last_block, TextBlock)
        else content
    )
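The separator rule above (a blank line only after header, heading, and text blocks) can be illustrated with plain (kind, text) tuples standing in for Block objects; this is a simplified sketch, not the lmm implementation:

```python
def join_blocks(blocks: list[tuple[str, str]]) -> str:
    """Join (kind, text) pairs; each text is assumed to end in a newline.

    A blank line is inserted only after kinds in trailing_kinds,
    mirroring the trailing_block_types check in serialize_blocks.
    """
    if not blocks:
        return ""
    trailing_kinds = {"header", "heading", "text"}
    last_kind, content = blocks[0]
    for kind, text in blocks[1:]:
        if last_kind in trailing_kinds:
            content += "\n"  # blank line between the two blocks
        content += text
        last_kind = kind
    return content
```

A heading followed by text yields a blank line between them, while a kind outside the set is joined directly.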

Utilities to work with block lists: blockutils

Utilities to work with lists of markdown blocks.

Note: Most functions mutate the content of block lists in place. To avoid this or maintain referential transparency, call blocklist_copy() from lmm.markdown.parse_markdown before using these functions.

Main Functions:

- compose(): Compose multiple block processing functions
- clear_metadata(): Remove metadata blocks from lists
- clear_metadata_properties(): Remove specific properties from metadata blocks
- merge_textblocks(): Merge contiguous text blocks
- unmerge_textblocks(): Split merged text blocks at blank lines
- merge_textblocks_if(): Conditionally merge text blocks based on predicate
- merge_equation_blocks(): Merge text blocks separated by equations
- merge_code_blocks(): Merge text blocks separated by code blocks
- merge_short_textblocks(): Merge short text blocks based on word count

Behaviour: These functions do not raise exceptions under normal usage. They accept well-formed block lists and return transformed block lists. No custom logger is used; the functions follow a functional programming style.

clear_metadata(blocks)

Remove all metadata blocks from the block list.

Parameters:

- blocks (list[Block], required): List of markdown blocks to filter

Returns:

- list[Block]: New list with all MetadataBlock instances removed

Source code in lmm/markdown/blockutils.py (lines 69-79)
def clear_metadata(blocks: list[Block]) -> list[Block]:
    """
    Remove all metadata blocks from the block list.

    Args:
        blocks: List of markdown blocks to filter

    Returns:
        New list with all MetadataBlock instances removed
    """
    return [b for b in blocks if b.type != "metadata"]

clear_metadata_properties(blocks, keys)

Remove key/value properties from metadata blocks as specified by keys.

Metadata blocks with no remaining properties after removal are deleted unless they contain private metadata (private_ field).

Parameters:

- blocks (list[Block], required): List of markdown blocks to process
- keys (list[str], required): List of property keys to remove from metadata blocks

Returns:

- list[Block]: New list with specified properties removed from MetadataBlock instances. MetadataBlocks that become empty (no content and no private_ data) are excluded from the result.

Source code in lmm/markdown/blockutils.py (lines 82-113)
def clear_metadata_properties(
    blocks: list[Block], keys: list[str]
) -> list[Block]:
    """
    Remove key/value properties from metadata blocks as specified by keys.

    Metadata blocks with no remaining properties after removal are deleted unless
    they contain private metadata (private_ field).

    Args:
        blocks: List of markdown blocks to process
        keys: List of property keys to remove from metadata blocks

    Returns:
        New list with specified properties removed from MetadataBlock instances.
        MetadataBlocks that become empty (no content and no private_ data) are
        excluded from the result.
    """
    if not keys:
        return blocks

    blocklist: list[Block] = []
    for b in blocks:
        if isinstance(b, MetadataBlock):
            newb: MetadataBlock = b.deep_copy()
            for k in keys:
                newb.content.pop(k, None)
            if len(newb.content) > 0 or bool(newb.private_):
                blocklist.append(newb)
        else:
            blocklist.append(b)
    return blocklist

compose(*funcs)

Compose multiple functions that process lists of Block objects. Functions are applied from left to right, so compose(f, g, h)(x) is equivalent to h(g(f(x))).

Parameters:

- *funcs (BlockFunc, default ()): Variable number of functions, each taking a list[Block] and returning a list[Block]

Returns:

- BlockFunc: A function that applies all input functions in sequence. If no functions are provided, returns the identity function. If one function is provided, returns that function.

Source code in lmm/markdown/blockutils.py (lines 42-63)
def compose(*funcs: BlockFunc) -> BlockFunc:
    """
    Compose multiple functions that process lists of Block objects.
    Functions are applied from left to right, so compose(f, g, h)(x)
    is equivalent to h(g(f(x))).

    Args:
        *funcs: Variable number of functions, each taking a
            list[Block] and returning a list[Block]

    Returns:
        A function that applies all input functions in sequence. If no
        functions are provided, returns the identity function. If one
        function is provided, returns that function.
    """
    if not funcs:
        return lambda x: x

    def _compose_two(f: BlockFunc, g: BlockFunc) -> BlockFunc:
        return lambda x: g(f(x))

    return reduce(_compose_two, funcs)
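The left-to-right ordering can be verified with plain list functions standing in for BlockFunc; a standard-library-only sketch:

```python
from functools import reduce
from typing import Callable

def compose(*funcs: Callable) -> Callable:
    """Left-to-right composition: compose(f, g, h)(x) == h(g(f(x)))."""
    if not funcs:
        return lambda x: x  # identity when no functions are given

    def _compose_two(f: Callable, g: Callable) -> Callable:
        return lambda x: g(f(x))

    return reduce(_compose_two, funcs)

# Two toy "block" processors operating on lists of strings.
def drop_empty(xs: list[str]) -> list[str]:
    return [x for x in xs if x]

def upper(xs: list[str]) -> list[str]:
    return [x.upper() for x in xs]

pipeline = compose(drop_empty, upper)  # drop_empty runs first, then upper
```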

merge_code_blocks(blocks, linecount=12)

Merge text blocks together that are separated by code blocks of size less than or equal to linecount.

Parameters:

- blocks (list[Block], required): List of markdown blocks to process
- linecount (int, default 12): Maximum number of lines in code blocks that will trigger merging

Returns:

- list[Block]: New list where TextBlock instances are merged when separated by small code blocks (markdown fenced code blocks with ``` delimiters).

Source code in lmm/markdown/blockutils.py (lines 271-299)
def merge_code_blocks(
    blocks: list[Block], linecount: int = 12
) -> list[Block]:
    """
    Merge text blocks together that are separated by code blocks of size less
    or equal to linecount.

    Args:
        blocks: List of markdown blocks to process
        linecount: Maximum number of lines in code blocks that will trigger
            merging (default: 12)

    Returns:
        New list where TextBlock instances are merged when separated by small
        code blocks (markdown fenced code blocks with ``` delimiters).
    """

    def _is_code_block(b: TextBlock) -> bool:
        content: str = b.get_content()
        return (
            re.match(
                r"^```(\{[^\n]*\}|(\w+))?\n(.*?)\n```$",
                content,
                re.DOTALL,
            )
            is not None
        ) and (content.count("\n") <= (linecount + 1))

    return merge_textblocks_if(blocks, _is_code_block)
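The fenced-code detection can be exercised on its own; the regular expression below is the one used by _is_code_block, wrapped in a standalone helper for illustration:

```python
import re

def looks_like_code_block(content: str, linecount: int = 12) -> bool:
    # An opening fence with an optional language tag or {attributes},
    # a body, and a closing fence; at most linecount body lines.
    matches = re.match(
        r"^```(\{[^\n]*\}|(\w+))?\n(.*?)\n```$", content, re.DOTALL
    )
    return matches is not None and content.count("\n") <= linecount + 1
```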

merge_equation_blocks(blocks)

Merge text blocks together that are separated by equations.

Equations are identified as text blocks matching the pattern $$...$$ (LaTeX display math delimiters).

Parameters:

- blocks (list[Block], required): List of markdown blocks to process

Returns:

- list[Block]: New list where TextBlock instances are merged when separated by equation blocks.

Source code in lmm/markdown/blockutils.py (lines 302-323)
def merge_equation_blocks(blocks: list[Block]) -> list[Block]:
    """
    Merge text blocks together that are separated by equations.

    Equations are identified as text blocks matching the pattern $$...$$
    (LaTeX display math delimiters).

    Args:
        blocks: List of markdown blocks to process

    Returns:
        New list where TextBlock instances are merged when separated by
        equation blocks.
    """

    def _is_eq_block(block: TextBlock) -> bool:
        return (
            re.match(r"^\s*\$\$.*\$\$\s*$", block.get_content())
            is not None
        )

    return merge_textblocks_if(blocks, _is_eq_block)
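The display-math pattern can likewise be tested in isolation; note that re.match without re.DOTALL recognizes only single-line $$...$$ blocks:

```python
import re

def is_equation(content: str) -> bool:
    # Optional whitespace around a $$...$$ display-math span.
    return re.match(r"^\s*\$\$.*\$\$\s*$", content) is not None
```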

merge_short_textblocks(blocks, wordthresh=120)

Merge short text blocks together, as defined by a word-count threshold.

Text blocks with fewer than wordthresh words are merged with the next text block. This continues until a block meets or exceeds the threshold.

Parameters:

- blocks (list[Block], required): List of markdown blocks to process
- wordthresh (int, default 120): Minimum word count threshold for text blocks

Returns:

- list[Block]: New list where short consecutive TextBlock instances have been merged.

Source code in lmm/markdown/blockutils.py (lines 364-404)
def merge_short_textblocks(
    blocks: list[Block], wordthresh: int = 120
) -> list[Block]:
    """
    Merges short text blocks together, defined by a word count threshold.

    Text blocks with fewer than wordthresh words are merged with the next
    text block. This continues until a block meets or exceeds the threshold.

    Args:
        blocks: List of markdown blocks to process
        wordthresh: Minimum word count threshold for text blocks (default: 120)

    Returns:
        New list where short consecutive TextBlock instances have been merged.
    """

    if not blocks:
        return []

    blocklist: list[Block] = []
    curblock: Block = blocks[0].deep_copy()
    for b in blocks[1:]:
        match b:
            case TextBlock() as bl:
                if (
                    isinstance(curblock, TextBlock)
                    and len(curblock.get_content().split())
                    < wordthresh
                ):
                    curblock.extend(bl)
                else:
                    # reduce
                    blocklist.append(curblock)
                    curblock = bl.deep_copy()
            case _ as bl:  # reduce
                blocklist.append(curblock)
                curblock = bl
    blocklist.append(curblock)  # reduce

    return blocklist
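The threshold logic can be sketched on plain strings; a simplification, since the real TextBlock.extend also handles block state:

```python
def merge_short(texts: list[str], wordthresh: int = 120) -> list[str]:
    """Merge each text into the previous one while the accumulated
    text is still below the word threshold."""
    if not texts:
        return []
    merged = [texts[0]]
    for t in texts[1:]:
        if len(merged[-1].split()) < wordthresh:
            merged[-1] = merged[-1] + "\n\n" + t  # extend current block
        else:
            merged.append(t)  # threshold reached: start a new block
    return merged
```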

merge_textblocks(blocks)

Merge contiguous text blocks into larger blocks.

Parameters:

- blocks (list[Block], required): List of markdown blocks to process

Returns:

- list[Block]: New list where consecutive TextBlock instances have been merged using serialize_blocks to create combined content.

Example
# three blocks
blocks = [
    HeadingBlock(content="Title"),
    TextBlock(content="Text 1"),
    TextBlock(content="Text 2"),
]
# creates two blocks, heading and text
newblocks = merge_textblocks(blocks)
Source code in lmm/markdown/blockutils.py (lines 116-162)
def merge_textblocks(blocks: list[Block]) -> list[Block]:
    """
    Merge contiguous text blocks into larger blocks.

    Args:
        blocks: List of markdown blocks to process

    Returns:
        New list where consecutive TextBlock instances have been merged using
        serialize_blocks to create combined content.

    Example:
        ```python
        # three blocks
        blocks = [
            HeadingBlock(content="Title"),
            TextBlock(content="Text 1"),
            TextBlock(content="Text 2"),
        ]
        # creates two blocks, heading and text
        newblocks = merge_textblocks(blocks)
        ```
    """
    blocklist: list[Block] = []
    text_stack: list[Block] = []
    for b in blocks:
        if isinstance(b, TextBlock):
            # shift
            text_stack.append(b)
        else:
            if len(text_stack) > 0:
                # we have something else than a text block,
                # reduce existing text blocks...
                blocklist.append(
                    TextBlock(content=serialize_blocks(text_stack))
                )
                text_stack.clear()
            # always reduce other blocks
            blocklist.append(b)

    if len(text_stack) > 0:
        # residual text blocks at end of document
        blocklist.append(
            TextBlock(content=serialize_blocks(text_stack))
        )

    return blocklist
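The shift/reduce pattern in the loop above can be shown with tagged tuples in place of Block instances; a minimal sketch, whereas the real function joins text runs via serialize_blocks:

```python
def merge_runs(items: list[tuple[str, str]]) -> list[tuple[str, str]]:
    """Merge consecutive ('text', ...) items; leave other kinds as-is."""
    out: list[tuple[str, str]] = []
    stack: list[str] = []
    for kind, content in items:
        if kind == "text":
            stack.append(content)  # shift onto the text stack
        else:
            if stack:  # reduce the pending text run
                out.append(("text", "\n\n".join(stack)))
                stack.clear()
            out.append((kind, content))
    if stack:  # residual text run at the end
        out.append(("text", "\n\n".join(stack)))
    return out
```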

merge_textblocks_if(blocks, test_func)

Merge text blocks together that are separated by blocks for which test_func(block) is true.

Parameters:

- blocks (list[Block], required): List of markdown blocks to process
- test_func (Callable[[TextBlock], bool], required): Predicate function that takes a TextBlock and returns True if the block should act as a separator triggering merges

Returns:

- list[Block]: New list where TextBlock instances are merged when separated by blocks for which test_func returns True.

Example
blocks = [
    TextBlock(content="Text 1"),
    TextBlock(content="Lext 2"),
    TextBlock(content="Text 3"),
]
# This creates one single block
newblocks = merge_textblocks_if(blocks,
    lambda x: x.get_content().startswith("Lext"))

# These will also be one single block
newblocks = merge_textblocks_if(blocks[0:1],
    lambda x: x.get_content().startswith("Lext"))
newblocks = merge_textblocks_if(blocks[1:2],
    lambda x: x.get_content().startswith("Lext"))

# This leaves blocks unchanged
newblocks = merge_textblocks_if(blocks,
    lambda x: x.get_content().startswith("Q"))
Source code in lmm/markdown/blockutils.py (lines 187-268)
def merge_textblocks_if(
    blocks: list[Block], test_func: Callable[[TextBlock], bool]
) -> list[Block]:
    """
    Merge text blocks together that are separated by blocks for which
    test_func(block) is true.

    Args:
        blocks: List of markdown blocks to process
        test_func: Predicate function that takes a TextBlock and returns True
            if the block should act as a separator triggering merges

    Returns:
        New list where TextBlock instances are merged when separated by blocks
        for which test_func returns True.

    Example:
        ```python
        blocks = [
            TextBlock(content="Text 1"),
            TextBlock(content="Lext 2"),
            TextBlock(content="Text 3"),
        ]
        # This creates one single block
        newblocks = merge_textblocks_if(blocks,
            lambda x: x.get_content().startswith("Lext"))

        # These will also be one single block
        newblocks = merge_textblocks_if(blocks[0:1],
            lambda x: x.get_content().startswith("Lext"))
        newblocks = merge_textblocks_if(blocks[1:2],
            lambda x: x.get_content().startswith("Lext"))

        # This leaves blocks unchanged
        newblocks = merge_textblocks_if(blocks,
            lambda x: x.get_content().startswith("Q"))
        ```
    """

    if not blocks:
        return []

    test_func_withnone: Callable[[TextBlock | None], bool] = (
        lambda x: (  # noqa: E731
            test_func(x) if x is not None else False
        )
    )

    blocklist: list[Block] = []
    curblock: Block = blocks[0].deep_copy()
    lastappend: TextBlock | None = None
    if isinstance(curblock, TextBlock) and test_func(curblock):
        lastappend = curblock
    for b in blocks[1:]:
        match b:
            case TextBlock() as bl if test_func(bl):
                if isinstance(curblock, TextBlock):
                    curblock.extend(bl)  # shift
                    lastappend = bl
                else:
                    # reduce
                    blocklist.append(curblock)
                    curblock = bl.deep_copy()
                    lastappend = bl
            case TextBlock() as bl:
                if isinstance(
                    curblock, TextBlock
                ) and test_func_withnone(lastappend):
                    curblock.extend(bl)  # shift
                    lastappend = bl
                else:
                    # reduce
                    blocklist.append(curblock)
                    curblock = bl.deep_copy()
                    lastappend = None
            case _:  # reduce
                blocklist.append(curblock)
                curblock = b
                lastappend = None
    blocklist.append(curblock)  # reduce

    return blocklist

unmerge_textblocks(blocks)

Unmerge text blocks separated by blank lines. This function is the inverse of merge_textblocks.

Parameters:

- blocks (list[Block], required): List of markdown blocks to process

Returns:

- list[Block]: New list where TextBlock instances have been split at blank lines using parse_markdown_text.

Source code in lmm/markdown/blockutils.py (lines 165-184)
def unmerge_textblocks(blocks: list[Block]) -> list[Block]:
    """
    Unmerge text blocks separated by blank lines. This function is the inverse
    of merge_textblocks.

    Args:
        blocks: List of markdown blocks to process

    Returns:
        New list where TextBlock instances have been split at blank lines using
        parse_markdown_text.
    """

    blocklist: list[Block] = []
    for b in blocks:
        if isinstance(b, TextBlock):
            blocklist.extend(parse_markdown_text(b.get_content()))
        else:
            blocklist.append(b)
    return blocklist
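The blank-line splitting that parse_markdown_text performs on text content can be approximated with a single regular expression; a sketch, not the real parser:

```python
import re

def split_paragraphs(text: str) -> list[str]:
    # Split on runs of blank lines; drop empty fragments.
    return [p for p in re.split(r"\n\s*\n", text) if p.strip()]
```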

Yaml parser

Interface to the pyyaml package.

A layer of functions that works with the output of safe_load, handling lists of dictionaries and covering edge cases. YAML can contain many kinds of content that are incompatible with a vector database and irrelevant for interaction with a language model. The aim here is to isolate an object that is represented in Python as a dictionary with string keys. This dictionary will be used to exchange messages with the language model.

Conformant YAML objects consist of dictionaries, or lists of dictionaries, of type dict[str, elementary_type], where elementary_type is one of int, float, bool, or str.

This module defines the types MetadataDict and MetadataValue, union types defining the set of dictionaries and dictionary values that are deemed conformant for use with LM markdown.

The YAML object contained in a metadata block is decomposed into two components, 'part' and 'whole'. The 'part' component is the one that may be used in the rest of the application, containing a conformant dictionary. The 'whole' component is kept aside and recombined with 'part' when the full YAML object is reconstituted.

YAML objects consisting only of literals will raise an exception, since it is conceivable that the user intended something different. Byte and imaginary literals are put in 'whole'.

Main functions

- split_yaml_parse: split a yaml block into part and whole
- desplit_yaml_parse: recompose a yaml block
- serialize_yaml_parse: serialization utility

Behaviour: raises an exception for yaml data that is not parsable or that contains structures other than those of the MetadataDict type.

desplit_yaml_parse(split_parse)

Reconstitute the original yaml object from the tuple constructed by split_yaml_parse. Dictionaries that were split because some values were not elementary remain split.

Parameters:

- split_parse (tuple[Mapping[str, MetadataValue], list[object]] | None, required): a tuple of two elements, the 'part' and the 'whole'.

Returns:

- Any: an object with yaml data.

Source code in lmm/markdown/parse_yaml.py (lines 231-261)
def desplit_yaml_parse(
    split_parse: (
        tuple[Mapping[str, MetadataValue], list[object]] | None
    ),
) -> Any:
    """
    Reconstitute the original yaml object from the tuple
    constructed by split_yaml_parse. Dictionaries that were split
    because some values were not elementary remain split.

    Args:
        split_parse: a tuple of two elements, the 'part' and
            the 'whole'.

    Returns:
        an object with yaml data.
    """
    if split_parse is None:
        return None
    part, whole = split_parse
    if part == {} and whole == []:
        return None
    if not whole:
        return part
    if part == {}:
        if len(whole) == 1:
            return whole[0]
        else:
            return whole
    else:
        return [part] + whole
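The branch structure reduces to four cases, which a standalone copy of the logic makes easy to check:

```python
def desplit(split_parse):
    # Mirrors desplit_yaml_parse: empty -> None, only part -> part,
    # only whole -> unwrap singleton lists, both -> part + whole.
    if split_parse is None:
        return None
    part, whole = split_parse
    if part == {} and whole == []:
        return None
    if not whole:
        return part
    if part == {}:
        return whole[0] if len(whole) == 1 else whole
    return [part] + whole
```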

dump_yaml(x)

Takes a parsed yaml object and serializes it to a string. Delegates to yaml.safe_dump.

Parameters:

- x (Any, required): a parsed yaml object.

Returns:

- str: the string serialization of the object.

Source code in lmm/markdown/parse_yaml.py (lines 284-312)
def dump_yaml(x: Any) -> str:
    """
    Takes a parsed yaml object and serializes it to a string.
    Delegates to yaml.safe_dump.

    Args:
        x: a parsed yaml object.

    Returns:
        the string serialization of the object.
    """
    if x is None:
        return ""

    y: str = (
        yaml.safe_dump(
            x,
            default_flow_style=False,
            width=float("Inf"),
            encoding="utf-8",
            allow_unicode=True,
            indent=1,
            sort_keys=False,
        )
        .decode("utf-8")
        .replace("'''", "'")
        .replace("__NEWLINE__", "\n")
    )
    return re.sub(r"\n\.\.\.\n$", "", y)
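The final re.sub removes the YAML document-end marker that safe_dump appends for scalar documents; in isolation:

```python
import re

def strip_document_end(y: str) -> str:
    # Remove a trailing "...\n" document-end marker, as dump_yaml does.
    return re.sub(r"\n\.\.\.\n$", "", y)
```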

is_metadata_dict(data)

Utility function to represent type information when matching

Source code in lmm/markdown/parse_yaml.py (lines 112-118)
def is_metadata_dict(data: object) -> TypeGuard[MetadataDict]:
    """
    Utility function to represent type information when matching
    """
    if not _is_string_dict(data):
        return False
    return all([_is_metadata_type(value) for value in data.values()])

is_metadata_primitive(value)

Utility function to represent type information when matching

Source code in lmm/markdown/parse_yaml.py (lines 92-98)
def is_metadata_primitive(
    value: object,
) -> TypeGuard[MetadataPrimitive]:
    """
    Utility function to represent type information when matching
    """
    return isinstance(value, (int, float, str, bool))

serialize_yaml_parse(split_parse)

Reconstitute a yaml string from the tuple constructed by split_yaml_parse. Dictionaries that were split because some values were not elementary remain split.

Parameters:

- split_parse (tuple[Mapping[str, MetadataValue], list[object]] | None, required): a tuple with 'part' and 'whole'

Returns:

- str: a string representing a yaml object.

Source code in lmm/markdown/parse_yaml.py (lines 264-281)
def serialize_yaml_parse(
    split_parse: (
        tuple[Mapping[str, MetadataValue], list[object]] | None
    ),
) -> str:
    """
    Reconstitute a yaml string from the tuple
    constructed by split_yaml_parse. Dictionaries that were split
    because some values were not elementary remain split.

    Args:
        split_parse: a tuple with 'part' and 'whole'

    Returns:
        a string representing a yaml object.
    """
    yamldata = desplit_yaml_parse(split_parse)
    return dump_yaml(yamldata)

split_yaml_parse(yamldata, mapped_keys=None)

Constrain the output of parsed yaml objects to a tuple that represents a conformant ParsedYaml type together with the original object.

Parameters:

- yamldata (object | None, required): the output of yaml.safe_load()
- mapped_keys (Mapping[str, str] | None, default None): a dict-type to replace keys in the parsed yaml object

Returns:

- ParsedYaml: a tuple. The first member is a conformant dictionary with strings as keys and values of conformant types; the second member is a list of yaml data that could not be parsed. Raises an error if the yaml object contains data, such as literals, that are not dictionaries.

Source code in lmm/markdown/parse_yaml.py (lines 147-228)
def split_yaml_parse(
    yamldata: object | None,
    mapped_keys: Mapping[str, str] | None = None,
) -> ParsedYaml:
    """
    Constrain output of parsed yaml objects to a tuple that
    represents a conformant ParsedYaml type, and the original
    object

    Args:
        yamldata: the output of yaml.safe_load()
        mapped_keys: a dict-type to replace keys in the parsed
            yaml object

    Returns:
        a tuple. In the first member of the tuple a conformant
        dictionary with strings as keys and values of conformant
        types. The second member of the tuple is a list of yaml
        data that could not be parsed.

    Behaviour: raises an error if the yaml object contains data,
        such as literals, that are not dictionaries.
    """

    part: MetadataDict = {}
    whole: list[object] = []
    match yamldata:
        case None | [] | [None]:
            pass
        case list() as value if value == [{}]:
            pass
        case list() as value if value == [[]]:
            pass
        case list() if is_metadata_dict(yamldata[0]):
            # set reference to chosen element of the list
            part = yamldata[0]
            if len(yamldata) > 1:
                whole = yamldata[1:]
        case list() if _is_string_dict(yamldata[0]):
            # heterogeneous dict in first position
            part, buff = _split_metadata_dict(yamldata[0])
            whole = (
                (buff + yamldata[1:]) if len(yamldata) > 1 else buff
            )
        case list():
            # invalid dictionary in first element or list of non-dict
            whole = yamldata
        case dict() if is_metadata_dict(yamldata):
            # we keep whole to empty, as there is no list
            part = yamldata
        case dict() if _is_string_dict(yamldata):
            # heterogeneous dict
            part, whole = _split_metadata_dict(yamldata)
        case dict():
            # invalid dict, keep empty dictionary in part
            whole = [yamldata]
        case _ as lit if _is_metadata_type(lit):
            # someone is specifying data as a literal
            raise ValueError(
                "Data in markdown header must follow a property.\n"
                + "Specify the data like this:\n"
                + f"property_name: {lit}"
            )
        case _ as prim if _is_primitive_type(prim):
            whole = [prim]
        case _:
            # non-dictionary
            raise ValueError(
                "Invalid YAML object type for markdown header (not"
                + " a dict or list)"
            )

    if mapped_keys is not None and bool(part):
        newpart: MetadataDict = {}
        for key in part.keys():
            if key in mapped_keys:
                newpart[mapped_keys[key]] = part[key]
            else:
                newpart[key] = part[key]
        part = newpart

    return part, whole