Documentation for ToMeDa

tomeda.baseclass

logger module-attribute

logger: TraceLogger = getLogger(__name__)

TomedaBaseClass

TomedaBaseClass(
    dataset_root: type[pydantic.BaseModel],
    param: TomedaParameter,
)

This class is the base class for all ToMeDa classes. It provides the functionality to parse the gatherer file, evaluate its fields, and write the resulting JSON file.

The class builds on pydantic's BaseModel and uses pydantic's ModelMetaclass to construct the schema class dynamically. An instance is populated from the gatherer file, its fields are evaluated, and the validated result is then written out as JSON.
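A minimal usage sketch is shown below. The schema model, parameter object, and paths are illustrative assumptions, not part of the documented API:

from pathlib import Path

from tomeda.baseclass import TomedaBaseClass

# hypothetical schema module: any pydantic.BaseModel subclass works here
from my_schema import Dataset

param = ...  # a configured TomedaParameter instance (project-specific)

base = TomedaBaseClass(dataset_root=Dataset, param=param)

# typical pipeline: validate a filled gatherer file, then emit metadata.json
base.validate_gatherer_file(Path("gatherer.nt"), Path("/data/collector"))
base.write_json(Path("/data/output"))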

Source code in tomeda/baseclass.py
def __init__(
    self,
    dataset_root: type[pydantic.BaseModel],
    param: TomedaParameter,
) -> None:
    self.root = dataset_root
    self.root_validated = None
    self.param = param

param instance-attribute

param = param

root instance-attribute

root = dataset_root

root_validated instance-attribute

root_validated = None

check_if_recommendations_are_met

check_if_recommendations_are_met()
Source code in tomeda/baseclass.py
def check_if_recommendations_are_met(self):
    def check_for_recommendations_recursively(
        current_class: ModelMetaclass,
    ) -> None:
        # get (type_class , attribute_name)
        nonlocal stack, recommender_stack
        types_and_names: list[tuple[ModelMetaclass, str]] = []
        for field in current_class.__fields__:
            model_field = current_class.__fields__[field]
            name = model_field.name
            type_ = self.get_type_of(model_field)
            types_and_names.append((type_, name))

        for cls_type, cls_name in types_and_names:
            if cls_name in stack:
                continue  # recursion blocker

            stack.append(cls_name)
            origin_path = inspect.getfile(self.root)
            origin_file_name = Path(origin_path).stem

            if (
                hasattr(current_class, "_recommended")
                and cls_name in current_class._recommended
            ):
                msg_ = (
                    f" '{cls_name}' found in "
                    f"{str(self.root_validated.__class__.__name__)}."
                    f"{'.'.join(stack)}  "
                )
                recommender_stack.append(msg_)

            if origin_file_name in str(cls_type):
            check_for_recommendations_recursively(cls_type)
            stack.pop(-1)

    stack: list[str] = []
    recommender_stack: list[str] = []

    if self.root_validated is None:
        raise ValueError("No validated root available - validate first")

    logger.info(f"Checking for recommendations")

    check_for_recommendations_recursively(self.root_validated)

    unique_messages = set(recommender_stack)
    if unique_messages:
        logger.warning(
            f"{len(unique_messages)} Recommendations not met!"
        )
        for msg in unique_messages:
            logger.warning(f" * {msg}")
    else:
        logger.info("All recommended fields are set")

create_flat_structure

create_flat_structure(output_file: Path)
Source code in tomeda/baseclass.py
def create_flat_structure(self, output_file: Path):
    if isinstance(output_file, list):
        output_file = output_file[0]
    output_file = output_file.with_suffix(".txt")
    class_structure, _ = self.get_class_structure()
    flattened_keys = self._flatten_keys(class_structure)
    file_handler = TomedaFileHandler(
        output_file, overwrite=self.param.force_overwrite
    )
    file_handler.write(flattened_keys)

create_gatherer_file_nested_text

create_gatherer_file_nested_text(output_file: Path)
Source code in tomeda/baseclass.py
def create_gatherer_file_nested_text(self, output_file: Path):
    if isinstance(output_file, list):
        output_file = output_file[0]
    output_file = output_file.with_suffix(".nt.template")
    file_handler = TomedaFileHandler(
        output_file, overwrite=self.param.force_overwrite
    )

    if file_handler.is_existing() and not file_handler.overwrite:
        logger.warning(
            "File %s already exists. Will not overwrite. "
            "Specify '--overwrite' to overwrite the file.",
            output_file,
        )
        return

    schema_file = self.param.schema_module
    schema_root_class = self.param.schema_class

    content = [
        "# This is a 'ToMeDa Collector File' for metadata collection written in NestedText language",
        "# https://nestedtext.org/",
        "#",
        "# This very file is derived from",
        f"# {schema_file}:{schema_root_class}",
        "#",
        "# The Collector file can be specified in different ways:",
        "#    @file | @bash | @literal",
        "#",
        "# 1. File Extraction: Specify values by:",
        "#      a file to read from and its location within the file",
        "#      The list-index is 1 indexed (starts at 1), to specify a comma use ',' or \",\".",
        "#",
        "#      metadataKey: @file <file>,<keyword>,<delimiter>[,<list-delimiter>,<list-index>]",
        "#      e.g. metadataKey: @file /path/to/file,keyword,delimiter",
        "#",
        "# 2. Bash Execution: Specify values by bash commands:",
        "#      Commands after '@bash' are bash commands and will be executed as such",
        "#      The return value of the command will be written to the gatherer file",
        "#      e.g. metadataKey: @bash $USER",
        "#      e.g. metadataKey: @bash cat myfile | grep keyword",
        "#",
        "# 3. Literal Definition: Specify values by literal definition:",
        "#      metadataKey: @literal value",
        "#",
        "",
    ]  # pylint: disable=line-too-long

    class_structure, _ = self.get_class_structure()
    nt_s: list[str] = nt.dumps(class_structure).split("\n")
    content += [line for line in nt_s if "{}" not in line]
    file_handler.write(content)

    logger.info(f"Created empty gatherer file: {output_file}")

create_info_table

create_info_table(output_file: Path)
Source code in tomeda/baseclass.py
def create_info_table(self, output_file: Path):
    if isinstance(output_file, list):
        output_file = output_file[0]
    structure, info = self.get_class_structure()

    if not isinstance(output_file, Path):
        raise TomedaNotAPathError(
            output_file,
            f"Expected a Path object, but" f" got {type(output_file)}",
        )

    file_handler_info = TomedaFileHandler(
        output_file.with_suffix(".nt"), overwrite=self.param.force_overwrite
    )
    file_handler_info.write(nt.dumps(info))

    required = self.get_required(structure, info)
    file_handler_required = TomedaFileHandler(
        output_file.with_suffix(".required.nt"),
        overwrite=self.param.force_overwrite,
    )
    file_handler_required.write(required)

get_class_structure

get_class_structure() -> tuple[dict, dict]

Retrieve the structure of the class.

This method recursively traverses the class structure to extract the class attributes, types, and other related information. The returned structure includes whether the attribute allows multiple values, is required or not, and other extra information.

Returns: tuple[dict, dict] : A tuple containing two dictionaries (Structure / Info).

The first dictionary represents the class structure with each attribute as a key, and its value is either a nested dictionary (for complex types) or an empty dictionary (for basic types). The second dictionary contains additional information about each attribute, including its name, title, description, type, and whether it allows multiple values and is required.

Notes

This method makes use of several nested functions:

* loop_through_classes_recursively : Traverses the class attributes and constructs the class structure.
* is_basic_type : Checks if a given type is basic (i.e., not defined in the current file).
* assemble_line : Assembles a string representation of the current attribute path.
* gather_info : Gathers and stores additional information about an attribute.

These helper functions are used to organize the logic of the get_class_structure method and make it easier to understand.
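
Given an instance base as in the class-level sketch above, the two return values might look like this for a small schema (field names are hypothetical; the info keys follow the description above):

structure, info = base.get_class_structure()

# structure: nested dict mirroring the schema, with empty dicts at the
# leaves and single-element lists where multiple values are allowed, e.g.
#   {"metadata": {"title": {}, "authors": [{"name": {}, "email": {}}]}}

# info: one flat entry per dotted attribute path, e.g.
#   info["metadata.title"] == {
#       "name": "metadata.title",
#       "title": "Title",
#       "description": "...",
#       "allow_multiples": False,
#       "required": True,
#       "type": "str",
#   }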

Source code in tomeda/baseclass.py
def get_class_structure(self) -> tuple[dict, dict]:
    """
    Retrieve the structure of the class.

    This method recursively traverses the class structure to extract the
    class attributes, types, and other related information. The returned
    structure includes whether the attribute allows multiple values,
    is required or not, and other extra information.

    Returns: tuple[dict, dict] : A tuple containing two dictionaries
    (Structure / Info).

    The first dictionary represents the class structure with each
    attribute as a key, and its value is either a nested dictionary (for
    complex types) or an empty dictionary (for basic types). The second
    dictionary contains additional information about each attribute,
    including its name, title, description, type, and whether it allows
    multiple values and is required.

    Notes
    -----
    This method makes use of several nested functions:
        * `loop_through_classes_recursively` : Traverses the class
            attributes and constructs the class structure.
        * `is_basic_type` : Checks if a given type is basic (i.e., not
            defined in the current file).
        * `assemble_line` : Assembles a string representation of the current
            attribute path.
        * `gather_info` : Gathers and stores additional information about an
            attribute.

    These helper functions are used to organize the logic of the
    `get_class_structure` method and make it easier to understand.
    """

    def loop_through_classes_recursively(
        cls: ModelMetaclass, dict_o: dict
    ) -> None:
        # get (type_class , attribute_name)
        nonlocal stack, prepend_string, stack_required, info

        types_names_multiple: list[
            tuple[ModelMetaclass, str, bool, bool, dict]
        ] = []

        for field in cls.__fields__:
            model_field: ModelField = cls.__fields__[field]
            name: str = model_field.name
            required: bool = model_field.required
            type_: ModelMetaclass = self.get_type_of(model_field)

            multiple = self.get_multiple_of(model_field)
            extra_info = self.get_extra_info(model_field)
            types_names_multiple.append(
                (type_, name, multiple, required, extra_info)
            )

        for (
            cls_type,
            cls_name,
            multiple_allowed,
            required,
            extra_info,
        ) in types_names_multiple:
            if (cls_type, cls_name) in stack:
                continue  # recursion blocker

            stack.append((cls_type, cls_name))
            stack_required.append(required)

            if isinstance(dict_o, list):
                dict_o = dict_o[0]
            if dict_o.get(cls_name, None) is None:
                if multiple_allowed is False:
                    dict_o[cls_name] = dict()
                else:
                    dict_o[cls_name] = [{}]

            def is_basic_type(cls_type_: ModelMetaclass) -> bool:
                # a type that is not defined in the schema module
                # counts as a basic type
                origin_path = inspect.getfile(self.root)
                origin_file_name = Path(origin_path).stem
                return origin_file_name not in str(cls_type_)

            def assemble_line() -> str:
                prepend_string_str = "".join(prepend_string)
                stack_str = ".".join([val[1] for val in stack])
                return prepend_string_str + stack_str

            def gather_info(
                cls_type_: ModelMetaclass, extra_info_: dict, info_: dict
            ) -> None:
                type__ = (
                    str(cls_type_)
                    .removeprefix("<class '")
                    .removesuffix("'>")
                )
                if "EmailStr" in type__:
                    type__ = "email"

                controlled_vocabulary = None

                name_ = ".".join([val[1] for val in stack])
                if type__.startswith("<enum"):
                    # Get the first member of the enum
                    first_member = next(iter(cls_type_))
                    # Get the underlying data type of the enum value
                    value_type = type(first_member.value)

                    type__ = (
                        str(value_type)
                        .removeprefix("<class '")
                        .removesuffix("'>")
                    )

                    controlled_vocabulary = [
                        member.value for member in cls_type_
                    ]

                # extract 'read' type from complete qualified name
                # (e.g. 'dataset.metadata.provenance' -> 'provenance')

                type__ = type__.split(".")[-1]

                line_info = {
                    "name": name_,
                    "title": extra_info_.get("title"),
                    "description": extra_info_.get("description"),
                    "allow_multiples": multiple_allowed,
                    "required": required,
                    "type": type__,
                }

                if controlled_vocabulary:
                    line_info[
                        "controlledVocabulary"
                    ] = controlled_vocabulary

                info_[name_] = line_info

            gather_info(cls_type, extra_info, info)

            if is_basic_type(cls_type):
                # basic types are leaves; nothing further to traverse
                line_ = assemble_line()
            else:
                # update forward references to allow "wrong" order of
                # classes in the metadata definition file
                # cls_type.update_forward_refs()
                loop_through_classes_recursively(cls_type, dict_o[cls_name])

            stack_required.pop(-1)
            stack.pop(-1)

    info = {}
    stack: list[tuple[ModelMetaclass, str]] = []
    stack_required: list[bool] = []
    prepend_string: list[str] = []
    dict_output: dict = {}

    loop_through_classes_recursively(self.root, dict_output)

    return dict_output, info

get_extra_info staticmethod

get_extra_info(model_field: ModelField) -> dict
Source code in tomeda/baseclass.py
@staticmethod
def get_extra_info(model_field: ModelField) -> dict:
    extra_info_keys = ["title", "description"]

    extra_info = model_field.field_info.extra.copy() or {}

    extra_info.update(
        {
            key: getattr(model_field.field_info, key)
            for key in extra_info_keys
            if getattr(model_field.field_info, key)
        }
    )

    return extra_info

get_multiple_of staticmethod

get_multiple_of(model_field: ModelField) -> bool

Check whether the model field wraps sub-fields (e.g. a list or Union), i.e. whether multiple values are allowed.
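
In pydantic v1, container fields such as list[str] expose sub_fields, which is what this check keys on. A small sketch, assuming pydantic v1 and a throwaway model:

from pydantic import BaseModel

from tomeda.baseclass import TomedaBaseClass

class Example(BaseModel):
    tags: list[str]  # container type -> sub_fields is set in pydantic v1
    name: str        # plain type -> sub_fields is None

print(TomedaBaseClass.get_multiple_of(Example.__fields__["tags"]))  # True
print(TomedaBaseClass.get_multiple_of(Example.__fields__["name"]))  # False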

Source code in tomeda/baseclass.py
@staticmethod
def get_multiple_of(model_field: ModelField) -> bool:
    """
    check if model field is something like a Union and extract "true" data
    types
    """
    if (
        hasattr(model_field, "sub_fields")
        and model_field.sub_fields is not None
    ):
        if not isinstance(model_field.sub_fields, list):
            raise TomedaUnexpectedTypeError(
                model_field.sub_fields,
                "Value 'sub_fields' is not of type 'list', but of type"
                f" {type(model_field.sub_fields)}",
            )

        multiple: bool = True
    else:
        multiple: bool = False

    return multiple

get_required staticmethod

get_required(structure, info)
Source code in tomeda/baseclass.py
@staticmethod
def get_required(structure, info):
    def traverse_json(json_data: dict | list, callback_: Callable):
        nonlocal stack
        if isinstance(json_data, dict):
            for key, value in json_data.items():
                stack.append(key)
                continue_ = callback_(key, value, stack)
                if continue_:
                    traverse_json(value, callback_)
                stack.pop(-1)

        elif isinstance(json_data, list):
            for item in json_data:
                traverse_json(item, callback_)

    stack = []
    required = []

    def callback(key, value, stack_):
        nonlocal required, info
        info_key = ".".join(stack_)
        info_ = info[info_key]
        if info_["required"]:
            if not value:
                required.append(info_key)
            return True
        else:
            return False

    traverse_json(structure, callback)
    return required

get_type_of staticmethod

get_type_of(model_field: ModelField) -> ModelMetaclass

Check whether the model field wraps sub-fields (e.g. a Union or list) and extract the underlying ("true") data type.
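
A companion sketch, again assuming pydantic v1: for container fields the element type is taken from sub_fields[0], otherwise the field's own type is returned:

from pydantic import BaseModel

from tomeda.baseclass import TomedaBaseClass

class Example(BaseModel):
    tags: list[str]
    count: int

print(TomedaBaseClass.get_type_of(Example.__fields__["tags"]))   # <class 'str'>
print(TomedaBaseClass.get_type_of(Example.__fields__["count"]))  # <class 'int'>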

Source code in tomeda/baseclass.py
@staticmethod
def get_type_of(model_field: ModelField) -> ModelMetaclass:
    """
    check if model field is something like a Union and extract "true" data
    types
    """
    if (
        hasattr(model_field, "sub_fields")
        and model_field.sub_fields is not None
    ):
        if not isinstance(model_field.sub_fields[0], ModelField):
            raise TomedaUnexpectedTypeError(
                model_field.sub_fields[0],
                "First element of list 'sub_fields' is not of type"
                f" 'ModelField', but of type"
                f"{type(model_field.sub_fields[0])}",
            )

        type_: ModelMetaclass = model_field.sub_fields[0].type_

        # validate that the type of the subfields is the same type as type_
        for sub_field in model_field.sub_fields:
            if not (sub_field.type_ == type_):
                raise TomedaUnexpectedTypeError(
                    sub_field.type_,
                    f"Value {model_field.name} has two different types: "
                    f"{type_} and  {sub_field.type_}: Not Supported",
                )
    else:
        type_: ModelMetaclass = model_field.type_

    return type_

validate_gatherer_file

validate_gatherer_file(
    file: Path, collector_root: Path
) -> None
Source code in tomeda/baseclass.py
def validate_gatherer_file(self, file: Path, collector_root: Path) -> None:
    if isinstance(file, list):
        file = file[0]
    if not isinstance(file, Path):
        raise TomedaNotAPathError(
            file, f"Expected a Path object, but got {type(file)}"
        )

    try:
        logger.debug(f"Starting validation for gatherer file: {file}")
        logger.debug(f"Starting parse operation on file: {file}")

        dict_o = self._parse_gatherer_file(file)

        logger.debug(f"Starting filter operation on file: {file}")

        dict_o = self._filter_gatherer_file(dict_o)

        logger.debug(f"Starting evaluation on file: {file}")

        dict_evaluated = self._evaluate_fields(dict_o, collector_root)

        logger.debug(f"Starting PyDantic parsing on file: {file}")

        self.root_validated = self.root.parse_obj(dict_evaluated)

        logger.info(f"Successfully parsed and validated file: {file}")
    except Exception as e:
        logger.exception(
            f"Exception occurred while validating file {file}: {str(e)}"
        )
        raise

write_json

write_json(output_file: Path) -> None
Source code in tomeda/baseclass.py
def write_json(self, output_file: Path) -> None:
    if self.root_validated is None:
        raise ValueError("No validated gatherer file available")

    if isinstance(output_file, list):
        output_file = output_file[0]

    if not isinstance(output_file, Path):
        raise TomedaNotAPathError(
            output_file,
            f"Expected a Path object, but got {type(output_file)}",
        )

    output_file = (output_file / "metadata").with_suffix(".json")
    file_handler = TomedaFileHandler(Path(output_file))
    data_: str = self.root_validated.json(indent=4, exclude_unset=True)
    file_handler.write(data_)

    logger.info(f"Wrote json to {output_file}")

get_42

get_42()
Source code in tomeda/baseclass.py
def get_42():
    return 42