Data Preprocessor#

The Data Preprocessor reads node, edge and respective feature data from a data source, and produces preprocessed / transformed versions of all this data, for subsequent components to use. It uses Tensorflow Transform to achieve data transformation in a distributed fashion, and allows for transformations like categorical encoding, scaling, normalization, casting and more.

Input#

  • job_name (AppliedTaskIdentifier): which uniquely identifies an end-to-end task.

  • task_config_uri (Uri): Path which points to a “template” GbmlConfig proto yaml file.

  • resource_config_uri (Uri): Path which points to a GiGLResourceConfig yaml

  • Optional: custom_worker_image_uri: Path to docker file to be used for dataflow worker harness image

What does it do?#

The Data Preprocessor undertakes the following actions

  • Reads frozen GbmlConfig proto yaml, which contains a pointer to a user-defined instance of the DataPreprocessorConfig class (see dataPreprocessorConfigClsPath field of datasetConfig.dataPreprocessorConfig). This class houses logic for

    • Preparing datasets for ingestion and transformation (see prepare_for_pipeline)

    • Defining transformation imperatives for different node types (get_nodes_preprocessing_spec)

    • Defining transformation imperatives for different edge types (get_edges_preprocessing_spec)

    Custom arguments can also be passed into the DataPreprocessorConfig class by including them in the dataPreprocessorArgs field inside datasetConfig.dataPreprocessorConfig section of GbmlConfig.

  • Builds a GraphMetadata proto instance, which contains information about the node types (e.g. “user”) and edge types in the graph (e.g. “user-friends-user”), and assigns them corresponding “condensed” integer node and edge types.

  • Runs an “enumeration” step to internally map all the node ids to integers to mitigate space overhead. Other components operate on these enumerated identifiers to reduce storage footprint, memory overhead, and network traffic.

  • For each node and edge type, spins up a Dataflow job which manifests a Tensorflow Transform pipeline to operationalize the user-defined transformations specified in the get_nodes_preprocessing_spec and get_edges_preprocessing_spec functions inside the user-specified DataPreprocessorConfig instance. The pipelines write out transformed features as TFRecords, and a schema to help parse them, the inferred Tensorflow transform function for each feature-set, and other metadata to GCS.

How do I run it?#

Import GiGL

from gigl.src.data_preprocessor.data_preprocessor import DataPreprocessor
from gigl.common import UriFactory
from gigl.src.common.types import AppliedTaskIdentifier

data_preprocessor = DataPreprocessor()

data_preprocessor.run(
    applied_task_identifier=AppliedTaskIdentifier("my_gigl_job_name"),
    task_config_uri=UriFactory.create_uri("gs://my-temp-assets-bucket/task_config.yaml"),
    resource_config_uri=UriFactory.create_uri("gs://my-temp-assets-bucket/resource_config.yaml")
    custom_worker_image_uri="gcr.io/project/directory/dataflow_image:x.x.x",  # Optional
)

Command Line

python -m \
    gigl.src.data_preprocessor.data_preprocessor \
    --job_name my_gigl_job_name \
    --task_config_uri "gs://my-temp-assets-bucket/task_config.yaml"
    --resource_config_uri="gs://my-temp-assets-bucket/resource_config.yaml"

Output#

Upon completing the Dataflow jobs referenced in the last bullet point of What above, the component writes out a PreprocessedMetadata proto to URI specified by the preprocessedMetadataUri field in the sharedConfig section of the frozen GbmlConfig.

This proto houses information about

  • The inferred GraphMetadata

  • A map of all condensed node types to NodeMetadataOutput protos

  • A map of all condensed edge types to EdgeMetadataOutput protos

NodeMetadataOutput and EdgeMetadataOutput protos store information about the paths mentioned in the above bullet point, and relevant metadata including the fields in each TFExample which store node/edge identifiers, feature keys, labels, etc. PreprocessedMetadata will be read from this URI by other components.

Custom Usage#

  • The actions this component undertakes are largely determined by the imperative transformation logic specified in the user-provided DataPreprocessorConfig class instance. This leaves much to user control. Please take a look at the instance provided at the dataPreprocessorConfigClsPath field of datasetConfig.dataPreprocessorConfig in order to learn more. For an example dataPreprocessorConfig, see here

  • In order to customize transformation logic for existing node features, take a look at preprocessing functions in Tensorflow Transform documentation. In order to add or remove node and edge features, you can modify the logic in feature_spec_fn and preprocessing_fn housed by NodeDataPreprocessingSpec and EdgeDataPreprocessingSpec. You can use the build_ingestion_feature_spec_fn function to conveniently generate feature specs which allow you to ingest and then transform these fields

  • Note that the identifier fields (indicating node id, edge src node id, or edge dst node id) are always designated as integer types due to the enumeration steps which precedes the Tensorflow Transform jobs.

Other#

  • Design: The design of this component is intended to leave maximal flexibility to the user in defining how they want to preprocess and transform their data. These steps are unlikely to be the same across many different custom pipelines (e.g. which fields to categorically encode, which to normalize, etc.) and thus we opted for a user-defined class to house as much code as possible which could be natively written by someone familiar with Tensorflow Transform.

  • Debugging: The core logic of this component executes in Dataflow. A link to the Dataflow job will be printed in the logs of the component, which can be used to navigate to the Dataflow console and see fine-grained logging of the Dataflow pipeline.