Zum Hauptinhalt springen

Cloudanbieter wie AWS, Azure und GCP bieten die Möglichkeit, Machine-Learning-Modelle in der Cloud zu trainieren. Anders als beim Training auf dem eigenen Rechner ist in der Cloud der Speicherort des Datensatzes nicht zwingend die Festplatte der ausführenden Maschine (VM). Mögliche Speicherorte für die Datensätze sind stattdessen sogenannte Blob bzw. Cloud Storages (Data Lakes) sowie Datenbanken oder Data Warehouses. In diesem Artikel wollen wir unterschiedliche Konzepte betrachten, wie Daten mit bestmöglicher Performance eingelesen werden können.

Sämtliche Ideen werden mit Python-Beispielcode illustriert. Die gezeigten Beispiele beziehen sich auf Google Cloud Storage, lassen sich aber grundsätzlich auf viele der genannten Datenquellen anwenden.

Fall 1: Eine Datei als Datenquelle

Dieser Fall ist am ehesten mit lokalem Training vergleichbar: Eine große Datei in der Cloud, aus der der Trainingsdatensatz eingelesen werden soll. Das kommt oft bei tabellarischen Daten vor, bei denen der gesamte Datensatz in Speicherformaten wie CSV, Parquet etc. zur Verfügung steht.

Im Code unten ist eine Möglichkeit dargestellt, wie Daten flexibel in beliebigen Dateiformaten aus dem Cloud Storage eingelesen werden können. Die BlobHandler-Klasse regelt den Lesevorgang aus der Cloud, eine StreamConverter-Klasse bestimmt, wie das gegebene Dateiformat interpretiert und umgewandelt werden soll. (Hier: Verstehe das Dateiformat als CSV und konvertiere es in einen Pandas DataFrame.)

import io
from abc import ABC, abstractmethod
from typing import Union, Iterable
from google.cloud.storage import Client, Blob


class StreamConverter(ABC):
    @abstractmethod
    def from_stream(self, stream: io.BytesIO) -> Iterable:
        ...


class BlobHandler:
    def __init__(self, bucket: str, io_handler: StreamConverter):
        self.client = Client()
        self.bucket = self.client.get_bucket(bucket)
        self.io_handler = io_handler

    def read_from_blob(self, blob_or_uri: Union[str, Blob]) -> Iterable:
        with io.BytesIO() as stream:
            self.client.download_blob_to_file(blob_or_uri, stream)
            stream.seek(0)
            out = self.io_handler.from_stream(stream)
        return out
import pandas as pd
    
class CSV2PandasConverter(StreamConverter):
    def from_stream(self, stream: io.BytesIO) -> pd.DataFrame:
        return pd.read_csv(stream)

 

Fall 2: Einlesen aus mehreren Dateien – paralleles Einlesen

Oft gibt es den Anwendungsfall, dass nicht eine, sondern eine große Anzahl an Dateien als Datenquelle dient. So ist das zum Beispiel oft bei unstrukturierten Daten wie Bildern, Audiodateien oder Textnachrichten der Fall.

Einige Cloud-Quellen haben gegenüber der lokalen Festplatte den Vorteil, dass nahezu beliebig viele Dateien parallel eingelesen werden können. Um das auszunutzen, ist die Verwendung von asynchronen Ansätzen besonders gut geeignet – statt eine Datei nach der nächsten einzulesen, wird eine definierte Anzahl an Dateien parallel eingelesen. Die gegebene Bandbreite wird dadurch optimal genutzt und die Lese-/Schreibgeschwindigkeit kann deutlich gesteigert werden.

Hier ein Beispiel, wie der bereits bekannte Code mit Multithreading erweitert werden kann:

from typing import Optional, List
import concurrent.futures
    
def read_parallel(self, pth: Optional[str] = None, max_parallel_downloads: int = 50) -> List[Iterable]:
    blobs = self.bucket.list_blobs(prefix=pth)

    with concurrent.futures.ThreadPoolExecutor(max_parallel_downloads) as executor:
        futures = []
        for blob in blobs:
            futures.append(executor.submit(self.read_from_blob, blob))

    chuncks = [future.result() for future in concurrent.futures.as_completed(futures)]
    return chuncks

BlobHandler.read_parallel = read_parallel

 

Fall 3: Arbeiten mit großen Datenmengen

Lesen der Daten in Chunks

Große Datenmengen im Big-Data-Bereich übersteigen schnell die Größe des vorhandenen Arbeitsspeichers, wodurch es unmöglich wird, den gesamten Datensatz auf einmal (auf einer Maschine) zu laden. Das führt zu folgenden Einschränkungen:

  1. Die Wahl der Algorithmen wird beschränkt: Viele Modelltypen benötigen den gesamten Datensatz auf einmal, damit sie trainiert werden können. Ist der Datensatz zu groß, ist man auf Modelle angewiesen, die mit Batch- oder Online-Learning trainiert werden können. So wird pro Iteration immer nur ein Teil bzw. nur eine einzelne Stichprobe des Datensatzes benötigt.
  2. Das Dateiformat muss dafür ausgelegt sein, portioniert eingelesen zu werden. Voraussetzung hierfür ist:
  • Die Datei muss in einem zeilenbasierten Speicherformat vorliegen,
  • oder der Datensatz muss bereits in mehreren portionierten Dateien gespeichert sein (viele kleine Dateien).

Grundsätzlich gibt es zwei Methoden, tabellarische Daten zu speichern – spaltenweise (‚columnar‘ – z. B. Parquet) und zeilenweise (z. B. CSV, Avro). Wenn eine oder mehrere Stichproben/Zeilen aus einer Datei gelesen werden sollen, ohne die Datei komplett geladen zu haben, ist das nur mit zeilenbasierten Formaten möglich.

Pythons Iteratoren sind gut dafür geeignet, einen großen Datensatz in Batches portioniert einzulesen. Das nächste Beispiel zeigt, wie der vorhandene Code erweitert werden kann, um eine Datei im Avro-Format einzulesen, sodass sich jeweils nur ein Batch mit definierter Größe im Arbeitsspeicher befindet.

from contextlib import contextmanager
from typing import Iterator
import fastavro

@contextmanager
def stream_from_blob(self, blob_or_uri: Union[str, Blob]) -> Iterator[Iterable]:
    with io.BytesIO() as stream:
        self.client.download_blob_to_file(blob_or_uri, stream)
        stream.seek(0)
        yield self.io_handler.from_stream(stream)
        
BlobHandler.stream_from_blob = stream_from_blob


class Avro2PandasChunckGen(StreamConverter):
    chuncksize: int

    def from_stream(self, stream: io.BytesIO) -> Iterator[pd.DataFrame]:
        record_stream = fastavro.reader(stream)
        while True:
            records = []
            for i, record in zip(range(self.chuncksize), record_stream):
                records.append(record)
            if len(records) == 0:
                break
            yield pd.DataFrame.from_records(records)

 

Einlesen während des Trainings

Ein grundsätzliches Problem mit großen Datenmengen ist der zeitintensive Lesevorgang. Bei den bisher gezeigten Ansätzen wird davon ausgegangen, dass zuerst der Datensatz (bzw. ein Batch des Datensatzes) eingelesen und im Anschluss das Modell trainiert wird. Ist jedoch das Modell im Stande, mit Batches trainiert zu werden, ist das verschwendete Zeit.

Das Einlesen von Daten ist kaum rechenintensiv und hauptsächlich von der gegebenen Bandbreite abhängig. Daher bietet es sich an, die nächsten Datenzeilen zu laden, während das Modell – auf Basis schon geladener Daten – trainiert wird.

Im Folgenden ist die grundsätzliche Idee skizziert: Ein Producer Thread liest die Daten zeilenweise ein, während ein Consumer die bereits gelesenen Daten zur Verfügung stellt.

from queue import Queue
from threading import Thread

class PrefetchAvro2PandasChunckGen(StreamConverter):
    stack: Queue = Queue()

    def _produce(self, stream):
        for record in fastavro.reader(stream):
            self.stack.put(record)
        self.stack.put(None)

    def _consume(self):
        while record := self.stack.get() is not None:
            yield record

    def from_stream(self, stream: io.BytesIO) -> Iterator:

        producer_thread = Thread(target=lambda: self._produce(stream))
        producer_thread.start()

        for record in self._consume():
            yield record

        producer_thread.join()

 

Fall 4: Umsetzung mit TensorFlow

Die bisherigen Abschnitte haben gezeigt, dass das effiziente Einlesen von Machine-Learning-Daten ein komplexes Unterfangen ist – für optimale Performance sind mehrere Threads notwendig. Außerdem ist darauf zu achten, nicht ‚out of memory‘ zu laufen. Dieses Problems ist sich TensorFlow bewusst. Neben der Implementation von Deep-Learning-Architekturen sind mittlerweile auch IO und Preprocessing im Fokus des Open-Source-Projektes.

TensorFlow bietet vorgefertigte Schnittstellen für diverse Cloud-Quellen wie Blob Storage, BigQuery und eine Vielzahl anderer Quellen.

import tensorflow as tf

COLUMN_NAMES = [str(i) for i in range(14)]
FIELD_DEFAULTS = [[float(i)] for i in range(14)]

def csv_to_record(line: str):
    fields = tf.io.decode_csv(line, FIELD_DEFAULTS)
    records = dict(zip(COLUMN_NAMES, fields))
    return records

def create_dataset(list_of_files: List[str]) -> tf.data.Dataset:
    dataset = tf.data.TextLineDataset(list_of_files) \
        .prefetch(buffer_size=tf.data.AUTOTUNE) \
        .batch(batch_size=1000) \
        .map(csv_to_record, num_parallel_calls=tf.data.AUTOTUNE)
    return dataset

 

Der dargestellte Code zeigt, wie mehrere CSV-Files parallel eingelesen und in Chunks prozessiert werden. Bisher vorgestellte Ansätze können auf diese Weise mit nur wenigen Zeilen Code umgesetzt werden.
Die Flexibilität des tf.Datasets und die Exportmöglichkeit als Numpy-Iterator machen es vielseitig einsetzbar und auch für Nicht-TensorFlow-Modelle interessant. Als Bonus on top bietet TensorFlow einen eigenen Profiler, der dabei hilft, Performance Bottlenecks in der Pipeline zu identifizieren.

 

 

 

Probiere es doch am besten selbst mal aus. Idealerweise startest du mit einem der TensorFlow Tutorials.

Starte hier!

 

 

 

Dein Ansprechpartner
Laurenz Reitsam
Consultant
Laurenz ist Data Scientist der sich neben Machine Learning und Datenanalysen auch für DevOps und Infrastruktur begeistert. Er ist davon überzeugt, dass ein Modell nur dann ein gutes Modell sein kann, wenn es seinen Weg in die Produktion schafft.
#Pythonist #GCP #DataScience