Przejdź do treści

Jak poskromiłam Elasticsearch i pomogłam bibliotekarzom? (część II)


Poprzedni:

W poprzednim artykule opisałam, jak stworzyłam własny pseudojęzyk umożliwiający pisanie skomplikowanych zapytań, przeszukujących rekordy bibliograficzne zapisane w formacie MARC 21. Wspominałam, że jako bazę danych wykorzystaliśmy Elasticsearch, oferujący kosmiczne możliwości. Tak więc, zgodnie z obietnicą, czas odsłonić kurtynę i zaprezentować naszą gwiazdę w pełnym świetle reflektorów.

Jeśli jeszcze nie przeczytaliście poprzedniego wpisu, zachęcam serdecznie. Inaczej ciężko Wam będzie połapać się w dalszej części tej historii...

Za kulisami biblioteki

Jak już wcześniej wspominałam, rekordy bibliograficzne, na których pracujemy są zapisane w formacie MARC 21, a wszystkie aktualne dane można pobrać ze strony: data.bn.org.pl. Udostępnione są tu zarówno konkretne typy materiałów, takie jak: książki, e-booki czy czasopisma, jak również zrzut całej bazy danych – plik bibs-all.marc, który (na dzień 08.08.2022) zajmuje 11.2 GB!

Już samo pobranie tego pliku zajmuje trochę czasu, ale nie musimy się tym zbytnio przejmować, gdyż w naszej aplikacji wszystko dzieje się automatycznie i oczywiście asynchronicznie – co 24h pobierany jest nowy plik, a wszystkie dane odpowiednio aktualizowane (ale o tym za chwilę).

Po pobraniu pliku następuje jego dynamiczne parsowanie i tutaj z wielką pomocą przychodzi biblioteka pymarc (sugeruję przyznanie twórcy Pokojowej Nagrody Nobla, gdyż dzięki niemu nie ucierpiał żaden człowiek, zwierzę ani – co najważniejsze – laptop – if you know what i mean 😉 ).

Biblioteka pymarc udostępnia klasę MARCReader, umożliwiającą odczyt rekordówch z pliku .marc. Przykładowy, odkodowany przez nią rekord wygląda tak:

=LDR  01585naa a2200457 i 4500
=001  b0000004768946
=005  20200506044404.6
=008  100513s2010\\\\pl\\\\\\\\\\\|000\0\pol\\
=009  991032413619705066
=015  \\$aPrasa 2010/08/26
=035  \\$a991032413619705066
=035  \\$a(PL-WaBN)b47689468-48omnis_nlop
=035  \\$ab47689468
=035  \\$a(OCoLC)998619526
=035  \\$a(PL)b0000004768946
=035  \\$a(EXLNZ-48OMNIS_NETWORK)9910333249005606
=040  \\$aWA M$cWA M
=100  1\$aMachcewicz, Paweł$d(1966- )
=245  10$aProf. Machcewicz: Muzeum II wojny pokaże nawet rok 1989 /$cPaweł Machcewicz ; rozm. przepr. Anna Cieślak.
=260  \\$c26 VIII 2010.
=336  \\$aTekst$btxt$2rdacontent
=337  \\$aBez urządzenia pośredniczącego$bn$2rdamedia
=338  \\$aWolumin$bnc$2rdacarrier
=380  \\$aArtykuły
=610  27$aMuzeum II Wojny Światowej (Gdańsk)$xprojekty$y21 w.$2DBN
=650  \7$aII wojna światowa (1939-1945)$2DBN
=651  \7$aGdańsk (woj. pomorskie)$xmuzealnictwo$y21 w.$2DBN
=700  1\$aCieślak, Anna
=773  0\$iW:$tPolska (Metropolia Warszawska, wyd. zasadnicze).$x1898-3081.$g2010, nr 183, s. 10-11$w(PL)b0000002035638
=902  \\$a180212$c100513$dm$eb$f-$g0
=909  \\$ab0000047689468
=915  \\$aPrasa 2010/08/26
=916  \\$aMachcewicz, Paweł   (1966- )
=918  1\$aCieślak, Anna
=951  \7$aGdańsk (woj. pomorskie)$xmuzealnictwo$y21 w.$2DBN
=980  \\$aArtykuły
=996  \\$a.b47689468
=996  \\$a.b47689468$9local
=998  \\$ao
=999  \\$aDBN01$ckor
=999  \\$aWA M$bES

Piękne, prawda?

Jeśli pamiętacie, jak wyglądał oryginalny rekord takiego pliku (mała podpowiedź tu), zapewne dostrzegacie już jaką magię 🪄 wykonała ta biblioteka.

Pojedynczy “dokument”

Iterując po wszystkich rekordach, na podstawie każdego z nich tworzymy tzw. “dokument” , czyli obiekt w postaci słownika {klucz : wartość}, który zostanie zapisany w Elasticsearchu. Kluczami są oczywiście etykiety pól, a wartościami – przypisane im dane.

Jak wygląda cała operacja? Zajrzyjmy w główną funkcję:

class MARCRecordsParser: 
    ...
    def create_es_doc_from_marc_record(self, record: Record):
        doc = {}
        dict_record_fields = record.as_dict()['fields']

        for field in dict_record_fields:
            (tag, values), = field.items()
            doc.update({tag: MARCRecordsParser.get_values_by_field(record, tag)})  # all the main fields

            if type(values) == dict:  # subfields
                subfields_list = values['subfields']
                subfields_dict = self._prepare_subfields_dict(subfields_list)
                es_subfields_dict = {
                    f'{tag}_{subfield_tag}': subfield_value for subfield_tag, subfield_value in subfields_dict.items()
                }
                doc = self.update_dict(doc, es_subfields_dict)

        extra_fields = self.prepare_extra_fields(record)
        doc.update(extra_fields)

        return doc

    @staticmethod
    def get_values_by_field(pymarc_rcd: Record, field: str) -> [str]:
        return [v.value() for v in pymarc_rcd.get_fields(field)]

    def _prepare_subfields_dict(self, subfields_list: List[dict]):
        result = {}
        for subfield in subfields_list:
            (subfield_tag, subfield_value), = subfield.items()
            if subfield_tag in result:
                result[subfield_tag].append(subfield_value)
            else:
                result[subfield_tag] = [subfield_value]
        return result

    def prepare_extra_fields(self, record: Record) -> dict:
        return {
            'mms_id': self.get_mms_id(record),
            'publication_date': self.get_publication_date(record),
            'publication_country': self.get_publication_country(record),
            'isbn': self.get_isbn(record),
            'language_of_original': self.get_language_of_original(record),
            'language_of_intermediate_translation': self.get_language_of_intermediate_translation(record),
            'udc': self.get_udc(record),
            'other_classification_number': self.get_other_classification_number(record),
            'creator': self.get_creator(record),
            'title': self.get_title(record),
            'title_of_original': self.get_title_of_original(record),
            'edition': self.get_edition(record),
            'publication_place': self.get_publication_place(record),
            'extent': self.get_extent(record),
            'form_of_work': self.get_form_of_work(record),
            'audience_characteristics': self.get_audience_characteristics(record),
            'contributor_characteristics': self.get_contributor_characteristics(record),
            'genre': self.get_genre(record),
            'cocreator': self.get_cocreator(record),
            'cocreator_only_translator': self.get_cocreator_only_translator(record),
            'cocreator_without_translator': self.get_cocreator_without_translator(record),
            'publisher_uniform_name': self.get_publisher_uniform_name(record),
            'series_personal': self.get_series_personal(record),
            'series_title': self.get_series_title(record),
            'modification_date': self.get_modification_date(record)

        }

Dzięki użyciu metody as_dict() zamieniamy strukturę rekordu MARC do standardowego słownika, gdzie pod kluczem fields zapisane są wszystkie pola. Przechodząc po liście pól, dodajemy je (wraz z wartościami) do zmiennej doc. Metoda get_values_by_field() pozwala wygodnie wyciągnąć wszystkie wartości przypisane do danego pola.

Następnie musimy pobrać wszystkie podpola głównych pól, uwzględniając fakt, że mogą się one powtarzać. Stąd też powstała dodatkowa metoda prepare_subfields_dict(), która weryfikuje, czy dane podpole pojawiło się już na liście i w zależności od sytuacji – dodaje nową wartość lub tworzy nową parę {klucz : wartość} z danym podpolem.

Na koniec dodajemy jeszcze dodatkowe pola, które mają ułatwić pracę bibliotekarzom, a są to m.in.: tytuł, autor, współautor, język, data i miejsce publikacji, data modyfikacji i parę innych. Część tych danych znajduje się w konkretnym polu, inne zaś są zbiorem wartości z kilku różnych pól. Pozwolę sobie jednak pominąć metody pobierające te pola, gdyż nie mają one większego znaczenia w kontekście tego artykułu.

Indeksacja Elasticsearcha

Przedstawiona wyżej funkcja create_es_doc_from_marc_record tworzy pojedynczy “dokument”, który zostanie przesłany do Elasticsearcha. Spójrzmy więc, w jaki sposób jest ona wywoływana dla całego zbioru rekordów.

@celery.task(name='reindex_marc_records')
def reindex_marc_records(marc_file: str, download_file: bool = True):
    if download_file:
        filename = download_marc_file(marc_file, MARC_RECORDS_URL)

    try:
        ElasticSearchConnector.create_index()
        hash_manager = HashManager(logger=logger)
        hash_manager.clear_hashes_file()
        bulk_data = []

        marc_records_parser = MARCRecordsParser(logger=logger)
        counter = 0
        file_url = os.path.join(os.sep, STATIC_FILES_EXPORT_PATH, marc_file)

        with open(file_url, 'rb') as fh:
            reader = MARCReader(fh)
            for idx, record in enumerate(reader, 1):
                if not record:
                    continue
                control_number = marc_records_parser.get_control_number(record)
                data_dict = marc_records_parser.create_es_doc_from_marc_record(record)
                op_dict = {
                    "index": {
                        "_index": ES_INDEX,
                        "_id": control_number,
                    }
                }
                bulk_data.append(op_dict)
                bulk_data.append(data_dict)
                hash_manager.add_hash(record)

                if len(bulk_data) and len(bulk_data) % BATCH_SIZE == 0:
                    ElasticSearchConnector.bulk_records(body=bulk_data)
                    time.sleep(SLEEP_TIME_BETWEEN_RECORDS_BULK)
                    bulk_data.clear()
                    logger.info(f'---> Indexing next batch of records (from {idx=})')
                    counter += len(bulk_data)

            if len(bulk_data) > 0:  # adding last batch of data
                ElasticSearchConnector.bulk_records(body=bulk_data)
                logger.info(f'---> Indexing next {len(bulk_data)} records (from {idx=})')
                counter += len(bulk_data)

            logger.info(f'---> Indexed records number: {counter})')

            hash_manager.flush_hashes_buffer()

        return {'indexed': counter}

    except elasticsearch.exceptions.ConnectionError as e:
        logger.info(f'---> Could not establish connection with Elasticsearch ({str(e)}')

        return {'message': 'Could not establish connection with Elasticsearch.'}

Metoda reindex_marc_records() wywołuje aktualizację rekordów. Jak widać w pierwszej linii kodu, jest to też task Celery, który uruchamia się asynchronicznie w tle.

Na początku pobierany jest nowy plik z danymi (lub nie, jeśli w parametrze download_file przekazemy False – wtedy analizowany jest ostatnio pobrany plik). Później tworzony jest nowy indeks ES (metoda ElasticSearchConnector.create_index() usuwa istniejący indeks i tworzy nowy o takiej samej nazwie). Linie wykorzystujące klasę HashManager omówię za chwilę.

W pętli iterującej po pliku pobierane są kolejne rekordy MARC, z których tworzone są opisane wcześniej “dokumenty” i dodawane do listy bulk_data. Zgodnie z konwencją ES-a, aby poprawnie zaindeksować wiele rekordów naraz, należy oddzielić je słownikami zawierającymi dwa klucze: _index – nazwa indeksu, do którego chcemy zapisać rekord oraz _id – unikalny identyfikator rekordu. W tym słowniku można dodać też inne metadane, ale nie są one konieczne.

W ten sposób zapełniana jest lista bulk_data. Aby zapewnić optymalne wykorzystanie zasobów nie wysyłamy ogromnej listy w jednym zapytaniu, ale zbieramy je “batchami”, czyli grupami o rozmiarze przypisanym do zmiennej BATCH_SIZE (który eksperymentalnie ustawiliśmy na 500). Pomiędzy kolejnymi requestami dajemy też 2 sekundy odstępu (zdefiniowane w zmiennej SLEEP_TIME_BETWEEN_RECORDS_BULK), aby ES mógł odsapnąć.

Dodatkowo zapisujemy w logach informacje o ilości zaindeksowanych rekordów.

A czym jest tajemniczy HashManager, który celowo pominęłam wcześniej?

Aktualizacja rekordów

Pełna reindeksacja danych wykonywana jest tylko przy pierwszym uruchomieniu aplikacji oraz w uzasadnionych przypadkach.

Z kolei co 24h wykonywana jest aktualizacja rekordów.

Czym różni się ona od pełnej reindeksacji?

Nie usuwamy całego indeksu i nie tworzymy na nowo wszystkich obiektów, ale porównujemy rekordy pobrane z nowego pliku z już istniejącymi w bazie i 1) aktualizujemy tylko te, które zostały zmodyfikowane, 2) dodajemy nowe oraz 3) usuwamy takie, które nie pojawiły się w nowym pliku.

Do weryfikacji tego, czy dany rekord został zmodyfikowany służy klasa HashManager, z której przedstawię tylko te najważniejsze metody.

class HashManager:

   ...
    def find_deleted_records(self):
        old_data_keys = self.old_data.keys()
        new_data_keys = self.new_data.keys()
        deleted_keys = list(set(old_data_keys) - set(new_data_keys))
        return deleted_keys

    def generate_hash(self, data: dict):
        dhash = hashlib.md5()
        encoded = json.dumps(data, sort_keys=True).encode()
        dhash.update(encoded)
        return dhash.hexdigest()

    def has_record_different_hash(self, record: Record, control_number: str):
        dict_record = record.as_dict()
        dict_record_hash = self.generate_hash(dict_record)
        old_record_hash = self.old_data.get(control_number)
        # adding record to new_data
        self.new_data[control_number] = dict_record_hash

        if not old_record_hash or dict_record_hash != old_record_hash:
            # the object does not exists or has changed
            return True

        return False

Podczas aktualizacji dla każdego rekordu generowany jest nowy hash – generate_hash(), a jego wartość porównywana jest z zapisanym w pliku hashem przypisanym do tego samego identyfikatora. Jeśli wartości są sobie równe – oznacza to, że rekord się nie zmienił i pętla przechodzi do kolejnej iteracji.

Jesli wartości są różne, tworzony jest nowy “dokument” , zapisywany w ES pod tym samym identyfikatorem (czyli następuje aktualizacja rekordu), a w pliku zapisywany jest nowy hash przypisany do tego identyfikatora.

Po zakończeniu pętli iterującej rekordy MARC wywoływana jest jeszcze metoda find_deleted_records(). Porównuje ona starą i nową listę identyfikatorów i znajduje te, które zniknęły – jeśli ktoś usunął je z bazy BN – oznacza to, że musimy usunąć je także z bazy Elasticsearcha.

Podsumowanie

Tak prezentuje się budowa bazy Elasticsearcha na podstawie rekordów bibliograficznych MARC. Cały proces dla bazy danych zawierającej ponad 5mln rekordów zajmuje ok. 1.5h. Podczas aktualizacji reindeksowane są jedynie wybrane “dokumenty”, co skraca ten czas do kilkunastu minut.

W połączeniu z wyszukiwarką opisaną w części 1 (oraz oczywiście warstwą frontendową) udało nam się stworzyć naprawdę ciekawą aplikację, dzięki której bibliotekarze mogą tworzyć i zapisywać własne zapytania, otrzymywać wyniki w postaci czytelnej i przejrzystej tabeli oraz generować pliki .csv zawierające zarówno wszystkie, jak też wybrane przez nich pola wyszukanych rekordów.

Co sądzicie o takiej aplikacji? Może macie pomysł, gdzie jeszcze podobne rozwiązanie ułatwiłoby czyjąś pracę? Czekam na komentarze!

3.7 3 votes
Article Rating
guest
0 komentarzy
Inline Feedbacks
View all comments