Cloud providers such as AWS, Azure and GCP make it possible to train machine learning models in the cloud. Unlike training on one's own computer, the data set is not necessarily stored on the hard disk of the executing machine (VM). Instead, possible storage locations include blob or cloud storage (data lakes), databases and data warehouses. In this article, we look at different concepts for reading this data with the best possible performance.
All ideas are illustrated with Python sample code. The examples shown refer to Google Cloud Storage but can fundamentally be applied to many of the data sources mentioned.
Instance 1: File as data source
This case is most comparable to local training: the training data set is read from a single large file in the cloud. This is often the case with tabular data, where the entire data set is stored in a format such as CSV or Parquet.
The code below shows a way to flexibly read data in any file format from cloud storage. The BlobHandler class manages the operations for reading from the cloud, while the StreamConverter class determines how a given file format is interpreted and converted (here, CSV files are converted into a Pandas DataFrame).
import io
from abc import ABC, abstractmethod
from typing import Union, Iterable

from google.cloud.storage import Client, Blob


class StreamConverter(ABC):
    # Converts a raw byte stream into a data structure usable for training.
    @abstractmethod
    def from_stream(self, stream: io.BytesIO) -> Iterable:
        ...


class BlobHandler:
    # Reads blobs from a bucket and delegates parsing to a StreamConverter.
    def __init__(self, bucket: str, io_handler: StreamConverter):
        self.client = Client()
        self.bucket = self.client.get_bucket(bucket)
        self.io_handler = io_handler

    def read_from_blob(self, blob_or_uri: Union[str, Blob]) -> Iterable:
        with io.BytesIO() as stream:
            self.client.download_blob_to_file(blob_or_uri, stream)
            stream.seek(0)
            out = self.io_handler.from_stream(stream)
        return out


import pandas as pd


class CSV2PandasConverter(StreamConverter):
    # Interprets the byte stream as CSV and returns a Pandas DataFrame.
    def from_stream(self, stream: io.BytesIO) -> pd.DataFrame:
        return pd.read_csv(stream)
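To illustrate how the two classes interact, here is a minimal usage sketch. The bucket name and file path are hypothetical, and authentication via the usual Google Cloud credentials is assumed:

# Hypothetical bucket and object names -- replace with your own.
handler = BlobHandler("my-training-data", CSV2PandasConverter())
df = handler.read_from_blob("gs://my-training-data/dataset.csv")
print(df.shape)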
Instance 2: Reading from multiple files – parallel reading
In practice, the data source often comprises a large number of files rather than just one, for example with unstructured data such as images, audio files and text messages.
Some cloud sources have an advantage over local hard disks here: almost any required number of files can be read in parallel. Asynchronous or multithreaded techniques are particularly suitable for exploiting this advantage; instead of reading one file after the next, a defined number of files is read simultaneously. The available bandwidth is thus used optimally, and the read/write speed can be increased significantly.
Here is an example of how our familiar code can be extended with multithreading:
from typing import Optional, List
import concurrent.futures


def read_parallel(self, pth: Optional[str] = None, max_parallel_downloads: int = 50) -> List[Iterable]:
    # List all blobs under the given prefix and download them concurrently.
    blobs = self.bucket.list_blobs(prefix=pth)
    with concurrent.futures.ThreadPoolExecutor(max_parallel_downloads) as executor:
        futures = []
        for blob in blobs:
            futures.append(executor.submit(self.read_from_blob, blob))
        chunks = [future.result() for future in concurrent.futures.as_completed(futures)]
    return chunks


BlobHandler.read_parallel = read_parallel
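As a brief usage sketch (bucket name and prefix are hypothetical), all CSV files under a prefix can be read concurrently and concatenated into a single DataFrame:

handler = BlobHandler("my-training-data", CSV2PandasConverter())  # hypothetical bucket
chunks = handler.read_parallel(pth="tabular/2023/")                # hypothetical prefix
df = pd.concat(chunks, ignore_index=True)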
Instance 3: Working with large amounts of data
Reading data in chunks
Large data volumes in big-data applications quickly exceed the available main memory, making it impossible to load the entire data set at once (on a single machine). This leads to the following restrictions:
- Limited choice of algorithms: for training, many model types require the entire data set at once. If the data set is too large, one has to rely on models that can be trained via batch or online learning, so that only a part or a single sample of the data set is required per iteration.
- The file format must allow the data to be read in portions. A prerequisite for this is one of the following:
  - The file is stored in a row-based format.
  - Alternatively, the data set is already split across several smaller files (many small files).
Basically, there are two ways of storing tabular data: column-by-column (e.g. Parquet) and row-by-row (e.g. CSV, Avro). If individual samples/rows are to be read from a file without loading it completely, this is only possible with row-based formats.
Python iterators are well suited for reading large data sets in portioned batches. The next example shows how our existing code can be extended to read a file in Avro format so that only one batch of a defined size is held in main memory at a time.
from contextlib import contextmanager
from typing import Iterator

import fastavro


@contextmanager
def stream_from_blob(self, blob_or_uri: Union[str, Blob]) -> Iterator[Iterable]:
    # Keep the download stream open while the caller consumes it.
    with io.BytesIO() as stream:
        self.client.download_blob_to_file(blob_or_uri, stream)
        stream.seek(0)
        yield self.io_handler.from_stream(stream)


BlobHandler.stream_from_blob = stream_from_blob


class Avro2PandasChunkGen(StreamConverter):
    chunksize: int = 1000  # number of rows per emitted DataFrame

    def from_stream(self, stream: io.BytesIO) -> Iterator[pd.DataFrame]:
        # Read Avro records lazily and emit DataFrames of at most `chunksize` rows.
        record_stream = fastavro.reader(stream)
        while True:
            records = []
            for _, record in zip(range(self.chunksize), record_stream):
                records.append(record)
            if len(records) == 0:
                break
            yield pd.DataFrame.from_records(records)
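Putting the two pieces together, batch-wise consumption could look roughly as follows. The bucket name, blob path and the train_on_batch function are placeholders; the generator must be consumed inside the with block, while the stream is still open:

handler = BlobHandler("my-training-data", Avro2PandasChunkGen())  # hypothetical bucket
with handler.stream_from_blob("gs://my-training-data/dataset.avro") as batches:
    for batch_df in batches:
        train_on_batch(batch_df)  # placeholder for a batch/online-learning step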
Reading during training
A fundamental problem with large amounts of data is the time-consuming reading process. The techniques described so far assume that a batch of the data set is read first and the model is trained on it afterwards. When training in batches, however, this alternation wastes time: while data is being read, the model sits idle.
Reading data is not computationally intensive; it depends mainly on the available bandwidth. It therefore makes sense to load the next batch of data while the model is still being trained on the data already loaded.
The basic idea is outlined below: a producer thread reads the data record by record, while a consumer generator yields the records that have already been read.
from queue import Queue
from threading import Thread


class PrefetchAvro2PandasChunkGen(StreamConverter):

    def __init__(self):
        # Thread-safe queue used to hand records from the producer to the consumer.
        self.stack: Queue = Queue()

    def _produce(self, stream):
        for record in fastavro.reader(stream):
            self.stack.put(record)
        self.stack.put(None)  # sentinel signalling the end of the stream

    def _consume(self):
        # Parentheses around the assignment expression are required here,
        # otherwise `record` would hold the boolean result of the comparison.
        while (record := self.stack.get()) is not None:
            yield record

    def from_stream(self, stream: io.BytesIO) -> Iterator:
        producer_thread = Thread(target=self._produce, args=(stream,))
        producer_thread.start()
        for record in self._consume():
            yield record
        producer_thread.join()
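Used together with BlobHandler.stream_from_blob, the producer thread downloads and parses records in the background while the training loop consumes them. A minimal sketch with hypothetical names:

handler = BlobHandler("my-training-data", PrefetchAvro2PandasChunkGen())  # hypothetical bucket
with handler.stream_from_blob("gs://my-training-data/dataset.avro") as records:
    for record in records:
        process(record)  # placeholder: e.g. collect records into batches and train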
Instance 4: Implementation with TensorFlow
The previous sections have shown that reading machine-learning data efficiently is a complex undertaking: multiple threads are required for optimal performance, and care must be taken not to run out of memory. The TensorFlow developers are aware of this problem. In addition to implementing deep-learning architectures, I/O and preprocessing are now also a focus of the open-source project.
TensorFlow offers ready-made interfaces for cloud sources such as blob storage and BigQuery, as well as a variety of other sources.
import tensorflow as tf

COLUMN_NAMES = [str(i) for i in range(14)]
FIELD_DEFAULTS = [[float(i)] for i in range(14)]


def csv_to_record(line: str) -> dict:
    # Parse a batch of CSV lines into a dict of named feature tensors.
    fields = tf.io.decode_csv(line, FIELD_DEFAULTS)
    records = dict(zip(COLUMN_NAMES, fields))
    return records


def create_dataset(list_of_files: List[str]) -> tf.data.Dataset:
    dataset = tf.data.TextLineDataset(list_of_files) \
        .batch(batch_size=1000) \
        .map(csv_to_record, num_parallel_calls=tf.data.AUTOTUNE) \
        .prefetch(buffer_size=tf.data.AUTOTUNE)  # overlap reading/parsing with training
    return dataset
The code here shows how several CSV files are read and processed in batches: the parsing is parallelized, and the next batches are prefetched while the model trains. Just a few lines of code thus implement the techniques presented so far.
The flexibility of tf.data.Dataset and the ability to export it as a NumPy iterator allow versatile use and make it relevant even for models that are not built with TensorFlow. As an added bonus, TensorFlow offers its own profiler, which helps identify performance bottlenecks in the pipeline.
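For example, the batches can be consumed outside of TensorFlow roughly like this (the file paths and the partial_fit call are placeholders):

files = ["gs://my-training-data/part-0.csv", "gs://my-training-data/part-1.csv"]  # hypothetical paths
dataset = create_dataset(files)
for batch in dataset.as_numpy_iterator():
    # batch is a dict mapping column names to NumPy arrays of up to 1000 rows
    partial_fit(batch)  # placeholder for any batch-capable model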
Try it out for yourself. Ideally, start with one of the TensorFlow tutorials.