# Data Preprocessor
The Data Preprocessor reads node, edge, and respective feature data from a data source, and produces preprocessed/transformed versions of all this data for subsequent components to use. It uses TensorFlow Transform to perform the transformation in a distributed fashion, and supports transformations such as categorical encoding, scaling, normalization, casting, and more.
## Input
- `job_name` (`AppliedTaskIdentifier`): uniquely identifies an end-to-end task.
- `task_config_uri` (`Uri`): Path which points to a “template” `GbmlConfig` proto yaml file.
- `resource_config_uri` (`Uri`): Path which points to a `GiGLResourceConfig` yaml.
- Optional: `custom_worker_image_uri`: Path to the Docker image to be used for the Dataflow worker harness.
## What does it do?
The Data Preprocessor undertakes the following actions:

- Reads the frozen `GbmlConfig` proto yaml, which contains a pointer to a user-defined instance of the `DataPreprocessorConfig` class (see the `dataPreprocessorConfigClsPath` field of `datasetConfig.dataPreprocessorConfig`). This class houses logic for:
  - Preparing datasets for ingestion and transformation (see `prepare_for_pipeline`)
  - Defining transformation imperatives for different node types (`get_nodes_preprocessing_spec`)
  - Defining transformation imperatives for different edge types (`get_edges_preprocessing_spec`)

  Custom arguments can also be passed into the `DataPreprocessorConfig` class by including them in the `dataPreprocessorArgs` field inside the `datasetConfig.dataPreprocessorConfig` section of `GbmlConfig`. (An illustrative skeleton of such a class is sketched after this list.)
- Builds a `GraphMetadata` proto instance, which contains information about the node types (e.g. “user”) and edge types (e.g. “user-friends-user”) in the graph, and assigns them corresponding “condensed” integer node and edge types.
- Runs an “enumeration” step that internally maps all node ids to integers to mitigate space overhead. Other components operate on these enumerated identifiers to reduce storage footprint, memory overhead, and network traffic.
- For each node and edge type, spins up a Dataflow job which runs a TensorFlow Transform pipeline to operationalize the user-defined transformations specified in the `get_nodes_preprocessing_spec` and `get_edges_preprocessing_spec` functions of the user-specified `DataPreprocessorConfig` instance. The pipelines write the transformed features as TFRecords, a schema to help parse them, the inferred TensorFlow Transform function for each feature set, and other metadata to GCS.
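For orientation, below is a minimal, hypothetical skeleton of what such a user-defined config class might look like. The method names (`prepare_for_pipeline`, `get_nodes_preprocessing_spec`, `get_edges_preprocessing_spec`) come from the description above; the constructor arguments, return payloads, and the fact that the class stands alone here (rather than subclassing GiGL's `DataPreprocessorConfig`) are illustrative assumptions, so consult the actual interface for exact signatures.

```python
# Illustrative skeleton only: a real implementation would subclass GiGL's
# DataPreprocessorConfig and return GiGL node/edge preprocessing spec objects.
# The return payloads below are placeholders, not the actual GiGL types.
from typing import Any, Dict


class MyDataPreprocessorConfig:
    def __init__(self, **data_preprocessor_args: Any):
        # Custom arguments supplied via datasetConfig.dataPreprocessorConfig.dataPreprocessorArgs
        # arrive here (e.g. source table names or feature lists).
        self.args = data_preprocessor_args

    def prepare_for_pipeline(self) -> None:
        # One-time setup before ingestion/transformation, e.g. staging source
        # tables or validating that the expected raw data exists.
        pass

    def get_nodes_preprocessing_spec(self) -> Dict[str, Any]:
        # Per node type: where raw node data lives and how to transform it
        # (identifier field, feature spec, TensorFlow Transform preprocessing_fn).
        return {"user": {"identifier": "user_id", "features": ["age", "country"]}}

    def get_edges_preprocessing_spec(self) -> Dict[str, Any]:
        # Per edge type: src/dst identifier fields and edge-feature transformations.
        return {"user-friends-user": {"src": "src_id", "dst": "dst_id"}}
```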
## How do I run it?
**Import GiGL**
```python
from gigl.src.data_preprocessor.data_preprocessor import DataPreprocessor
from gigl.common import UriFactory
from gigl.src.common.types import AppliedTaskIdentifier

data_preprocessor = DataPreprocessor()
data_preprocessor.run(
    applied_task_identifier=AppliedTaskIdentifier("my_gigl_job_name"),
    task_config_uri=UriFactory.create_uri("gs://my-temp-assets-bucket/task_config.yaml"),
    resource_config_uri=UriFactory.create_uri("gs://my-temp-assets-bucket/resource_config.yaml"),
    custom_worker_image_uri="gcr.io/project/directory/dataflow_image:x.x.x",  # Optional
)
```
**Command Line**
```bash
python -m \
    gigl.src.data_preprocessor.data_preprocessor \
    --job_name="my_gigl_job_name" \
    --task_config_uri="gs://my-temp-assets-bucket/task_config.yaml" \
    --resource_config_uri="gs://my-temp-assets-bucket/resource_config.yaml"
```
## Output
Upon completion of the Dataflow jobs referenced in the last bullet point of the What does it do? section above, the component writes out a `PreprocessedMetadata` proto to the URI specified by the `preprocessedMetadataUri` field in the `sharedConfig` section of the frozen `GbmlConfig`.

This proto houses information about:

- The inferred `GraphMetadata`
- A map of all condensed node types to `NodeMetadataOutput` protos
- A map of all condensed edge types to `EdgeMetadataOutput` protos

The `NodeMetadataOutput` and `EdgeMetadataOutput` protos store the paths mentioned in the bullet point above, along with relevant metadata, including the fields in each `TFExample` which store node/edge identifiers, feature keys, labels, etc. The `PreprocessedMetadata` proto will be read from this URI by other components.
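As a rough illustration of how the written artifacts fit together, the sketch below reads back a set of transformed node TFRecords using the schema exported by TensorFlow Transform. The directory layout and file glob are hypothetical placeholders; in practice the actual paths are recorded in the `PreprocessedMetadata` proto described above.

```python
import tensorflow as tf
import tensorflow_transform as tft

# Hypothetical paths; the real paths are recorded in PreprocessedMetadata.
transform_output_dir = "gs://my-bucket/preprocessed/user/tft_transform_output"
tfrecord_glob = "gs://my-bucket/preprocessed/user/features/*.tfrecord"

# The TFT output bundles the schema of the transformed features, so downstream
# code can parse the TFRecords without hard-coding feature names and dtypes.
tft_output = tft.TFTransformOutput(transform_output_dir)
feature_spec = tft_output.transformed_feature_spec()

dataset = tf.data.TFRecordDataset(tf.io.gfile.glob(tfrecord_glob))
parsed = dataset.map(lambda record: tf.io.parse_single_example(record, feature_spec))

for example in parsed.take(1):
    print(example)
```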
## Custom Usage
The actions this component undertakes are largely determined by the imperative transformation logic specified in the user-provided `DataPreprocessorConfig` class instance. This leaves much to user control. Please take a look at the instance referenced by the `dataPreprocessorConfigClsPath` field of `datasetConfig.dataPreprocessorConfig` to learn more. For an example `DataPreprocessorConfig`, see here.

To customize transformation logic for existing node features, take a look at the preprocessing functions in the TensorFlow Transform documentation. To add or remove node and edge features, you can modify the logic in the `feature_spec_fn` and `preprocessing_fn` housed by `NodeDataPreprocessingSpec` and `EdgeDataPreprocessingSpec`. You can use the `build_ingestion_feature_spec_fn` function to conveniently generate feature specs which allow you to ingest and then transform these fields. A hedged example of such a `preprocessing_fn` is shown below.

Note that the identifier fields (indicating node id, edge src node id, or edge dst node id) are always designated as integer types due to the enumeration step which precedes the TensorFlow Transform jobs.
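For example, a `preprocessing_fn` along these lines could z-score a numeric node feature and integer-encode a categorical one. The feature names are hypothetical and the wiring into a `NodeDataPreprocessingSpec` is omitted; only the TFT calls themselves (`tft.scale_to_z_score`, `tft.compute_and_apply_vocabulary`) come from the standard TensorFlow Transform API.

```python
from typing import Dict

import tensorflow as tf
import tensorflow_transform as tft


def preprocessing_fn(inputs: Dict[str, tf.Tensor]) -> Dict[str, tf.Tensor]:
    """Illustrative TFT preprocessing_fn for a hypothetical "user" node type."""
    outputs = {}
    # Pass the (already enumerated, integer) node identifier through untouched.
    outputs["user_id"] = inputs["user_id"]
    # Standardize a numeric feature to zero mean / unit variance over the dataset.
    outputs["age_scaled"] = tft.scale_to_z_score(inputs["age"])
    # Map a string categorical feature to integer ids via a learned vocabulary.
    outputs["country_id"] = tft.compute_and_apply_vocabulary(inputs["country"])
    return outputs
```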
## Other
Design: The design of this component is intended to leave maximal flexibility to the user in defining how they want to preprocess and transform their data. These steps are unlikely to be the same across different custom pipelines (e.g. which fields to categorically encode, which to normalize, etc.), so we opted for a user-defined class to house as much of this code as possible, written natively by someone familiar with TensorFlow Transform.
Debugging: The core logic of this component executes in Dataflow. A link to the Dataflow job will be printed in the logs of the component, which can be used to navigate to the Dataflow console and see fine-grained logging of the Dataflow pipeline.