diff --git a/CHANGELOG.md b/CHANGELOG.md index 1162e93..6b4bab2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,33 @@ +## v2.40.0 (2026-05-06) + +### Documentation + +* docs: README restructured (#39) + +- Installation moved further up, above the massive table +- Quick start shows a quick example +- Sklearn removed as a proper usage pattern +- Reduced size by ~40% +- Much more readable ([`28c70ba`](https://github.com/ExpediaGroup/kamae/commit/28c70ba817d079b21fe668d5fc11dbf1492d1c6e)) + +### Feature + +* feat: pairwise sim and array reduce max (#44) + +* adding modules for pairwise similarity + +* tests for pairwise similarity + +* adding the new modules on README and tests of serialisation + +* formatting issues + +* fix header ([`579612b`](https://github.com/ExpediaGroup/kamae/commit/579612b13b3e638522ee577abbeb4d592bfcfce3)) + + ## v2.39.1 (2026-04-14) ### Fix diff --git a/README.md b/README.md index 3101bcc..761d453 100644 --- a/README.md +++ b/README.md @@ -80,6 +80,7 @@ For Scikit-learn support (experimental, unmaintained), see [sklearn examples](ex | AbsoluteValue | Applies the `abs(x)` transform. | [Link](src/kamae/tensorflow/layers/absolute_value.py) | [Link](src/kamae/spark/transformers/absolute_value.py) | Not yet implemented | | ArrayConcatenate | Assembles multiple features into a single array. | [Link](src/kamae/tensorflow/layers/array_concatenate.py) | [Link](src/kamae/spark/transformers/array_concatenate.py) | [Link](src/kamae/sklearn/transformers/array_concatenate.py) | | ArrayCrop | Crops or pads a feature array to a consistent size. | [Link](src/kamae/tensorflow/layers/array_crop.py) | [Link](src/kamae/spark/transformers/array_crop.py) | Not yet implemented | +| ArrayReduceMax | Reduces the last dimension of a tensor by taking the maximum. | [Link](src/kamae/tensorflow/layers/array_reduce_max.py) | [Link](src/kamae/spark/transformers/array_reduce_max.py) | Not yet implemented | | ArraySplit | Splits a feature array into multiple features. 
| [Link](src/kamae/tensorflow/layers/array_split.py) | [Link](src/kamae/spark/transformers/array_split.py) | [Link](src/kamae/sklearn/transformers/array_split.py) | | ArraySubtractMinimum | Subtracts the minimum element in an array from the rest to compute a timestamp difference. Ignores padded values. | [Link](src/kamae/tensorflow/layers/array_subtract_minimum.py) | [Link](src/kamae/spark/transformers/array_subtract_minimum.py) | Not yet implemented | | BearingAngle | Compute the bearing angle (https://en.wikipedia.org/wiki/Bearing_(navigation)) between two pairs of lat/long. | [Link](src/kamae/tensorflow/layers/bearing_angle.py) | [Link](src/kamae/spark/transformers/bearing_angle.py) | Not yet implemented | @@ -124,6 +125,7 @@ For Scikit-learn support (experimental, unmaintained), see [sklearn examples](ex | NumericalIfStatement | Performs a simple if else statement with a given operator. Value to check, result if true or false can be constants or features. | [Link](src/kamae/tensorflow/layers/numerical_if_statement.py) | [Link](src/kamae/spark/transformers/numerical_if_statement.py) | Not yet implemented | | OneHotEncode | Transforms a string to a one-hot array. | [Link](src/kamae/tensorflow/layers/one_hot_encode.py) | [Link](src/kamae/spark/estimators/one_hot_encode.py) | Not yet implemented | | OrdinalArrayEncode | Encodes strings in an array according to the order in which they appear. Only for 2D tensors. | [Link](src/kamae/tensorflow/layers/ordinal_array_encoder.py) | [Link](src/kamae/spark/estimators/ordinal_array_encoder.py) | Not yet implemented | +| PairwiseCosineSimilarity | Computes the cosine similarity between an embedding and a list of candidate embeddings. | [Link](src/kamae/tensorflow/layers/pairwise_cosine_similarity.py) | [Link](src/kamae/spark/transformers/pairwise_cosine_similarity.py) | Not yet implemented | | Round | Rounds a floating feature to the nearest integer using `ceil`, `floor` or a standard `round` op. 
| [Link](src/kamae/tensorflow/layers/round.py) | [Link](src/kamae/spark/transformers/round.py) | Not yet implemented | | RoundToDecimal | Rounds a floating feature to the nearest decimal precision. | [Link](src/kamae/tensorflow/layers/round_to_decimal.py) | [Link](src/kamae/spark/transformers/round_to_decimal.py) | Not yet implemented | | SharedOneHotEncode | Transforms a string to a one-hot array, using labels across multiple inputs to determine the one-hot size. | [Link](src/kamae/tensorflow/layers/one_hot_encode.py) | [Link](src/kamae/spark/estimators/shared_one_hot_encode.py) | Not yet implemented | diff --git a/src/kamae/__init__.py b/src/kamae/__init__.py index 1280b61..b020649 100644 --- a/src/kamae/__init__.py +++ b/src/kamae/__init__.py @@ -19,5 +19,5 @@ transformation layers. """ -__version__ = "2.39.1" +__version__ = "2.40.0" __name__ = "kamae" diff --git a/src/kamae/spark/transformers/__init__.py b/src/kamae/spark/transformers/__init__.py index f9f767d..76ce6b7 100644 --- a/src/kamae/spark/transformers/__init__.py +++ b/src/kamae/spark/transformers/__init__.py @@ -15,6 +15,7 @@ from .absolute_value import AbsoluteValueTransformer # noqa: F401 from .array_concatenate import ArrayConcatenateTransformer # noqa: F401 from .array_crop import ArrayCropTransformer # noqa: F401 +from .array_reduce_max import ArrayReduceMaxTransformer # noqa: F401 from .array_split import ArraySplitTransformer # noqa: F401 from .array_subtract_minimum import ArraySubtractMinimumTransformer # noqa: F401 from .base import BaseTransformer # noqa: F401 @@ -64,6 +65,9 @@ from .numerical_if_statement import NumericalIfStatementTransformer # noqa: F401 from .one_hot_encode import OneHotEncodeTransformer # noqa: F401 from .ordinal_array_encode import OrdinalArrayEncodeTransformer # noqa: F401 +from .pairwise_cosine_similarity import ( # noqa: F401 + PairwiseCosineSimilarityTransformer, +) from .round import RoundTransformer # noqa: F401 from .round_to_decimal import 
RoundToDecimalTransformer # noqa: F401 from .shared_one_hot_encode import SharedOneHotEncodeTransformer # noqa: F401 diff --git a/src/kamae/spark/transformers/array_reduce_max.py b/src/kamae/spark/transformers/array_reduce_max.py new file mode 100644 index 0000000..0b498da --- /dev/null +++ b/src/kamae/spark/transformers/array_reduce_max.py @@ -0,0 +1,95 @@ +# Copyright [2024] Expedia, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import List, Optional + +import pyspark.sql.functions as F +import tensorflow as tf +from pyspark import keyword_only +from pyspark.ml.param import Param, Params, TypeConverters +from pyspark.sql import DataFrame +from pyspark.sql.types import DataType, DoubleType, FloatType + +from kamae.spark.params import SingleInputSingleOutputParams +from kamae.spark.utils import single_input_single_output_array_transform +from kamae.tensorflow.layers import ArrayReduceMaxLayer + +from .base import BaseTransformer + + +class ArrayReduceMaxTransformer( + BaseTransformer, + SingleInputSingleOutputParams, +): + """ + Reduces an array column to its maximum element. + + Input: Array[Float/Double] of size N. + Output: Float/Double scalar (the maximum element). + + Returns defaultValue when the array is empty or null. 
+ """ + + defaultValue = Param( + Params._dummy(), + "defaultValue", + "Value to return when the array is empty or null.", + typeConverter=TypeConverters.toFloat, + ) + + @keyword_only + def __init__( + self, + inputCol: Optional[str] = None, + outputCol: Optional[str] = None, + inputDtype: Optional[str] = None, + outputDtype: Optional[str] = None, + layerName: Optional[str] = None, + defaultValue: float = 0.0, + ) -> None: + super().__init__() + self._setDefault(defaultValue=0.0) + kwargs = self._input_kwargs + self.setParams(**kwargs) + + def setDefaultValue(self, value: float) -> "ArrayReduceMaxTransformer": + return self._set(defaultValue=value) + + def getDefaultValue(self) -> float: + return self.getOrDefault(self.defaultValue) + + @property + def compatible_dtypes(self) -> Optional[List[DataType]]: + return [FloatType(), DoubleType()] + + def _transform(self, dataset: DataFrame) -> DataFrame: + input_col = F.col(self.getInputCol()) + default = self.getDefaultValue() + + output_col = single_input_single_output_array_transform( + input_col=input_col, + input_col_datatype=self.get_column_datatype( + dataset=dataset, column_name=self.getInputCol() + ), + func=lambda x: F.coalesce(F.array_max(x), F.lit(default)), + ) + return dataset.withColumn(self.getOutputCol(), output_col) + + def get_tf_layer(self) -> tf.keras.layers.Layer: + return ArrayReduceMaxLayer( + name=self.getLayerName(), + input_dtype=self.getInputTFDtype(), + output_dtype=self.getOutputTFDtype(), + default_value=self.getDefaultValue(), + ) diff --git a/src/kamae/spark/transformers/pairwise_cosine_similarity.py b/src/kamae/spark/transformers/pairwise_cosine_similarity.py new file mode 100644 index 0000000..9165392 --- /dev/null +++ b/src/kamae/spark/transformers/pairwise_cosine_similarity.py @@ -0,0 +1,136 @@ +# Copyright [2024] Expedia, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import List, Optional + +import pyspark.sql.functions as F +import tensorflow as tf +from pyspark import keyword_only +from pyspark.ml.param import Param, Params, TypeConverters +from pyspark.sql import Column, DataFrame +from pyspark.sql.types import ArrayType, DataType, DoubleType, FloatType + +from kamae.spark.params import MultiInputSingleOutputParams +from kamae.tensorflow.layers import PairwiseCosineSimilarityLayer + +from .base import BaseTransformer + + +class PairwiseCosineSimilarityTransformer( + BaseTransformer, + MultiInputSingleOutputParams, +): + """ + Computes pairwise cosine similarity between a query embedding and each + candidate embedding packed into a flat array. + + Input 0: query embedding as Array[Float] of size D. + Input 1: flat candidate embeddings as Array[Float] of size N*D. + Output: Array[Float] of size N containing cosine similarities. 
+ """ + + embeddingDim = Param( + Params._dummy(), + "embeddingDim", + "Dimension of each embedding vector.", + typeConverter=TypeConverters.toInt, + ) + + @keyword_only + def __init__( + self, + inputCols: Optional[List[str]] = None, + outputCol: Optional[str] = None, + inputDtype: Optional[str] = None, + outputDtype: Optional[str] = None, + layerName: Optional[str] = None, + embeddingDim: Optional[int] = None, + ) -> None: + super().__init__() + kwargs = self._input_kwargs + self.setParams(**kwargs) + + def setEmbeddingDim(self, value: int) -> "PairwiseCosineSimilarityTransformer": + return self._set(embeddingDim=value) + + def getEmbeddingDim(self) -> int: + return self.getOrDefault(self.embeddingDim) + + @property + def compatible_dtypes(self) -> Optional[List[DataType]]: + return [FloatType(), DoubleType()] + + def setInputCols(self, value: List[str]) -> "PairwiseCosineSimilarityTransformer": + if len(value) != 2: + raise ValueError( + f"Expected 2 input columns, received {len(value)} instead." 
+ ) + return self._set(inputCols=value) + + def _transform(self, dataset: DataFrame) -> DataFrame: + input_col_names = self.getInputCols() + embedding_dim = self.getEmbeddingDim() + + query_col = F.col(input_col_names[0]) + flat_candidates_col = F.col(input_col_names[1]) + + for col_name in input_col_names: + dtype = self.get_column_datatype(dataset=dataset, column_name=col_name) + if not isinstance(dtype, ArrayType): + raise TypeError(f"Expected ArrayType for {col_name}, got {dtype}.") + + num_candidates = (F.size(flat_candidates_col) / F.lit(embedding_dim)).cast( + "int" + ) + indices = F.sequence(F.lit(0), num_candidates - F.lit(1)) + + query_norm = F.sqrt( + F.aggregate( + query_col, + F.lit(0.0).cast("double"), + lambda acc, x: acc + (x * x).cast("double"), + ) + ) + + def cosine_sim_at_index(idx: Column) -> Column: + candidate = F.slice( + flat_candidates_col, + idx * F.lit(embedding_dim) + F.lit(1), + embedding_dim, + ) + zipped = F.arrays_zip(query_col.alias("q"), candidate.alias("c")) + dot = F.aggregate( + zipped, + F.lit(0.0).cast("double"), + lambda acc, pair: acc + (pair["q"] * pair["c"]).cast("double"), + ) + cand_norm = F.sqrt( + F.aggregate( + candidate, + F.lit(0.0).cast("double"), + lambda acc, x: acc + (x * x).cast("double"), + ) + ) + return F.coalesce(dot / (query_norm * cand_norm), F.lit(0.0)) + + similarities = F.transform(indices, cosine_sim_at_index) + return dataset.withColumn(self.getOutputCol(), similarities) + + def get_tf_layer(self) -> tf.keras.layers.Layer: + return PairwiseCosineSimilarityLayer( + name=self.getLayerName(), + input_dtype=self.getInputTFDtype(), + output_dtype=self.getOutputTFDtype(), + embedding_dim=self.getEmbeddingDim(), + ) diff --git a/src/kamae/tensorflow/layers/__init__.py b/src/kamae/tensorflow/layers/__init__.py index bc63216..da97195 100644 --- a/src/kamae/tensorflow/layers/__init__.py +++ b/src/kamae/tensorflow/layers/__init__.py @@ -15,6 +15,7 @@ from .absolute_value import AbsoluteValueLayer # noqa: F401 
from .array_concatenate import ArrayConcatenateLayer # noqa: F401 from .array_crop import ArrayCropLayer # noqa: F401 +from .array_reduce_max import ArrayReduceMaxLayer # noqa: F401 from .array_split import ArraySplitLayer # noqa: F401 from .array_subtract_minimum import ArraySubtractMinimumLayer # noqa: F401 from .bearing_angle import BearingAngleLayer # noqa: F401 @@ -59,6 +60,7 @@ from .numerical_if_statement import NumericalIfStatementLayer # noqa: F401 from .one_hot_encode import OneHotEncodeLayer, OneHotLayer # noqa: F401 from .ordinal_array_encode import OrdinalArrayEncodeLayer # noqa: F401 +from .pairwise_cosine_similarity import PairwiseCosineSimilarityLayer # noqa: F401 from .round import RoundLayer # noqa: F401 from .round_to_decimal import RoundToDecimalLayer # noqa: F401 from .standard_scale import StandardScaleLayer # noqa: F401 diff --git a/src/kamae/tensorflow/layers/array_reduce_max.py b/src/kamae/tensorflow/layers/array_reduce_max.py new file mode 100644 index 0000000..7482bc2 --- /dev/null +++ b/src/kamae/tensorflow/layers/array_reduce_max.py @@ -0,0 +1,64 @@ +# Copyright [2024] Expedia, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from typing import Any, Dict, Iterable, List, Optional + +import tensorflow as tf + +import kamae +from kamae.tensorflow.typing import Tensor +from kamae.tensorflow.utils import enforce_single_tensor_input + +from .base import BaseLayer + + +@tf.keras.utils.register_keras_serializable(package=kamae.__name__) +class ArrayReduceMaxLayer(BaseLayer): + """ + Reduces the last dimension of a tensor by taking the maximum. + + Input: (..., N) + Output: (...) + """ + + def __init__( + self, + name: Optional[str] = None, + input_dtype: Optional[str] = None, + output_dtype: Optional[str] = None, + default_value: float = 0.0, + **kwargs: Any, + ) -> None: + super().__init__( + name=name, input_dtype=input_dtype, output_dtype=output_dtype, **kwargs + ) + self.default_value = default_value + + @property + def compatible_dtypes(self) -> Optional[List[tf.dtypes.DType]]: + return [tf.bfloat16, tf.float16, tf.float32, tf.float64] + + @enforce_single_tensor_input + def _call(self, inputs: Tensor, **kwargs: Any) -> Tensor: + result = tf.reduce_max(inputs, axis=-1) + return tf.where( + tf.math.is_nan(result), + tf.constant(self.default_value, dtype=result.dtype), + result, + ) + + def get_config(self) -> Dict[str, Any]: + config = super().get_config() + config.update({"default_value": self.default_value}) + return config diff --git a/src/kamae/tensorflow/layers/pairwise_cosine_similarity.py b/src/kamae/tensorflow/layers/pairwise_cosine_similarity.py new file mode 100644 index 0000000..c3664a6 --- /dev/null +++ b/src/kamae/tensorflow/layers/pairwise_cosine_similarity.py @@ -0,0 +1,90 @@ +# Copyright [2024] Expedia, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Dict, Iterable, List, Optional + +import tensorflow as tf + +import kamae +from kamae.tensorflow.typing import Tensor +from kamae.tensorflow.utils import enforce_multiple_tensor_input + +from .base import BaseLayer + + +@tf.keras.utils.register_keras_serializable(package=kamae.__name__) +class PairwiseCosineSimilarityLayer(BaseLayer): + """ + Computes pairwise cosine similarity between a query embedding and + each candidate embedding packed in a flat array. + + Input 0: (..., D) -- query embedding + Input 1: (..., N * D) -- flat candidate embeddings + Output: (..., N) -- cosine similarity per candidate + """ + + def __init__( + self, + name: Optional[str] = None, + input_dtype: Optional[str] = None, + output_dtype: Optional[str] = None, + embedding_dim: int = 32, + **kwargs: Any, + ) -> None: + super().__init__( + name=name, input_dtype=input_dtype, output_dtype=output_dtype, **kwargs + ) + self.embedding_dim = embedding_dim + + @property + def compatible_dtypes(self) -> Optional[List[tf.dtypes.DType]]: + return [tf.bfloat16, tf.float16, tf.float32, tf.float64] + + @enforce_multiple_tensor_input + def _call(self, inputs: Iterable[Tensor], **kwargs: Any) -> Tensor: + if len(inputs) != 2: + raise ValueError(f"Expected 2 inputs, received {len(inputs)} instead.") + + query = inputs[0] # (..., D) + flat_candidates = inputs[1] # (..., N*D) + + # Reshape: (..., N*D) -> (..., N, D) + orig_shape = tf.shape(flat_candidates) + num_candidates = orig_shape[-1] // self.embedding_dim + new_shape = tf.concat( + [orig_shape[:-1], [num_candidates, 
self.embedding_dim]], axis=0 + ) + candidates = tf.reshape(flat_candidates, new_shape) + + # (..., D) -> (..., 1, D) for broadcasting + query_expanded = tf.expand_dims(query, axis=-2) + + # L2 normalize along embedding dimension + q_norm = tf.nn.l2_normalize(query_expanded, axis=-1) + c_norm = tf.nn.l2_normalize(candidates, axis=-1) + + # Dot product along last axis: (..., N) + similarities = tf.reduce_sum(tf.multiply(q_norm, c_norm), axis=-1) + + # Zero-vector → NaN from normalization → replace with 0.0 + return tf.where( + tf.math.is_nan(similarities), + tf.zeros_like(similarities), + similarities, + ) + + def get_config(self) -> Dict[str, Any]: + config = super().get_config() + config.update({"embedding_dim": self.embedding_dim}) + return config diff --git a/tests/kamae/spark/transformers/test_array_reduce_max.py b/tests/kamae/spark/transformers/test_array_reduce_max.py new file mode 100644 index 0000000..ea8430d --- /dev/null +++ b/tests/kamae/spark/transformers/test_array_reduce_max.py @@ -0,0 +1,126 @@ +# Copyright [2024] Expedia, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +import numpy as np +import pytest +import tensorflow as tf +from pyspark.sql.types import ArrayType, FloatType, StructField, StructType + +from kamae.spark.transformers import ArrayReduceMaxTransformer + + +class TestArrayReduceMaxTransformer: + @pytest.fixture(scope="class") + def input_df(self, spark_session): + return spark_session.createDataFrame( + [ + ([3.0, 1.0, 2.0],), + ([0.0, 5.0, 4.0],), + ([-3.0, -1.0, -2.0],), + ], + ["values"], + ) + + def test_returns_maximum_of_each_row(self, input_df): + transformer = ArrayReduceMaxTransformer(inputCol="values", outputCol="result") + result = transformer.transform(input_df).select("result").collect() + + assert [row.result for row in result] == pytest.approx([3.0, 5.0, -1.0]) + + def test_default_value_for_empty_array(self, spark_session): + schema = StructType( + [StructField("values", ArrayType(FloatType()), nullable=True)] + ) + + df = spark_session.createDataFrame( + [([],)], + schema=schema, + ) + + transformer = ArrayReduceMaxTransformer( + inputCol="values", outputCol="result", defaultValue=-99.0 + ) + result = transformer.transform(df).select("result").collect() + + assert result[0].result == pytest.approx(-99.0) + + @pytest.mark.parametrize( + "rows, input_dtype, output_dtype, default_value", + [ + # default dtypes + ( + [[3.0, 1.0, 2.0], [0.0, 5.0, 4.0], [-3.0, -1.0, -2.0]], + None, + None, + 0.0, + ), + # float input, double output + ( + [[3.0, 1.0, 2.0], [0.0, 5.0, 4.0], [-3.0, -1.0, -2.0]], + "float", + "double", + 0.0, + ), + # double input, float output + ( + [[3.0, 1.0, 2.0], [0.0, 5.0, 4.0], [-3.0, -1.0, -2.0]], + "double", + "float", + 0.0, + ), + # different array length (5 elements) + ( + [[5.0, 3.0, 1.0, 4.0, 2.0], [-1.0, -3.0, -2.0, -5.0, -4.0]], + None, + None, + 0.0, + ), + # non-default defaultValue is forwarded correctly to TF layer + ( + [[1.0, 2.0, 3.0], [-5.0, -4.0, -6.0]], + None, + None, + -99.0, + ), + ], + ) + def test_spark_tf_parity( + self, spark_session, rows, input_dtype, 
output_dtype, default_value + ): + transformer = ArrayReduceMaxTransformer( + inputCol="values", + outputCol="result", + inputDtype=input_dtype, + outputDtype=output_dtype, + defaultValue=default_value, + ) + + spark_df = spark_session.createDataFrame([(row,) for row in rows], ["values"]) + spark_values = ( + transformer.transform(spark_df) + .select("result") + .rdd.map(lambda r: r[0]) + .collect() + ) + + inputs = tf.constant(rows, dtype=tf.float32) + keras_values = transformer.get_tf_layer()(inputs).numpy().tolist() + + np.testing.assert_almost_equal( + spark_values, + keras_values, + decimal=4, + err_msg="Spark and TensorFlow outputs are not equal", + ) diff --git a/tests/kamae/spark/transformers/test_pairwise_cosine_similarity.py b/tests/kamae/spark/transformers/test_pairwise_cosine_similarity.py new file mode 100644 index 0000000..a737451 --- /dev/null +++ b/tests/kamae/spark/transformers/test_pairwise_cosine_similarity.py @@ -0,0 +1,168 @@ +# Copyright [2024] Expedia, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import numpy as np +import pytest +import tensorflow as tf + +from kamae.spark.transformers import PairwiseCosineSimilarityTransformer + + +class TestPairwiseCosineSimilarityTransformer: + @pytest.fixture(scope="class") + def input_df(self, spark_session): + # query: [1, 0], candidates packed flat: [1, 0, 0, 1] → 2 candidates of dim=2 + return spark_session.createDataFrame( + [ + ( + [1.0, 0.0], + [1.0, 0.0, 0.0, 1.0], + ), # identical + orthogonal → [1.0, 0.0] + ( + [0.0, 1.0], + [0.0, 1.0, 1.0, 0.0], + ), # identical + orthogonal → [1.0, 0.0] + ], + ["query", "candidates"], + ) + + def test_returns_cosine_similarity_per_candidate(self, input_df): + transformer = PairwiseCosineSimilarityTransformer( + inputCols=["query", "candidates"], + outputCol="scores", + embeddingDim=2, + ) + result = transformer.transform(input_df).select("scores").collect() + + np.testing.assert_array_almost_equal(result[0].scores, [1.0, 0.0]) + np.testing.assert_array_almost_equal(result[1].scores, [1.0, 0.0]) + + def test_opposite_vectors_give_minus_one(self, spark_session): + df = spark_session.createDataFrame( + [([1.0, 0.0], [-1.0, 0.0])], + ["query", "candidates"], + ) + transformer = PairwiseCosineSimilarityTransformer( + inputCols=["query", "candidates"], + outputCol="scores", + embeddingDim=2, + ) + result = transformer.transform(df).select("scores").collect() + + np.testing.assert_array_almost_equal(result[0].scores, [-1.0]) + + def test_wrong_number_of_input_cols_raises(self): + with pytest.raises(ValueError): + PairwiseCosineSimilarityTransformer( + inputCols=["a"], + outputCol="scores", + embeddingDim=2, + ) + + @pytest.mark.parametrize( + "queries, flat_candidates, embedding_dim, input_dtype, output_dtype", + [ + # default dtypes, dim=2, 2 candidates + ( + [[1.0, 0.0], [0.0, 1.0]], + [[1.0, 0.0, 0.0, 1.0], [0.0, 1.0, 1.0, 0.0]], + 2, + None, + None, + ), + # float input, double output + ( + [[1.0, 0.0], [0.0, 1.0]], + [[1.0, 0.0, 0.0, 1.0], [0.0, 1.0, 1.0, 0.0]], + 2, + 
"float", + "double", + ), + # double input, float output + ( + [[1.0, 0.0], [0.0, 1.0]], + [[1.0, 0.0, 0.0, 1.0], [0.0, 1.0, 1.0, 0.0]], + 2, + "double", + "float", + ), + # dim=3, 3 candidates + ( + [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0]], + [ + [1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0], + [0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0], + ], + 3, + None, + None, + ), + # opposite vectors → similarity -1.0 + ( + [[1.0, 0.0]], + [[-1.0, 0.0]], + 2, + None, + None, + ), + # zero-vector query → both sides must return 0.0 + ( + [[0.0, 0.0]], + [[1.0, 0.0, 0.0, 1.0]], + 2, + None, + None, + ), + ], + ) + def test_spark_tf_parity( + self, + spark_session, + queries, + flat_candidates, + embedding_dim, + input_dtype, + output_dtype, + ): + transformer = PairwiseCosineSimilarityTransformer( + inputCols=["query", "candidates"], + outputCol="scores", + embeddingDim=embedding_dim, + inputDtype=input_dtype, + outputDtype=output_dtype, + ) + + spark_df = spark_session.createDataFrame( + list(zip(queries, flat_candidates)), + ["query", "candidates"], + ) + spark_values = ( + transformer.transform(spark_df) + .select("scores") + .rdd.map(lambda r: r[0]) + .collect() + ) + + tf_queries = tf.constant(queries, dtype=tf.float32) + tf_candidates = tf.constant(flat_candidates, dtype=tf.float32) + keras_values = ( + transformer.get_tf_layer()([tf_queries, tf_candidates]).numpy().tolist() + ) + + np.testing.assert_almost_equal( + spark_values, + keras_values, + decimal=4, + err_msg="Spark and TensorFlow outputs are not equal", + ) diff --git a/tests/kamae/tensorflow/layers/test_array_reduce_max.py b/tests/kamae/tensorflow/layers/test_array_reduce_max.py new file mode 100644 index 0000000..9224392 --- /dev/null +++ b/tests/kamae/tensorflow/layers/test_array_reduce_max.py @@ -0,0 +1,53 @@ +# Copyright [2024] Expedia, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import numpy as np +import tensorflow as tf + +from kamae.tensorflow.layers import ArrayReduceMaxLayer + + +class TestArrayReduceMaxLayer: + def test_returns_maximum_of_each_row(self): + layer = ArrayReduceMaxLayer() + inputs = tf.constant([[3.0, 1.0, 2.0], [0.0, 5.0, 4.0]]) + + result = layer(inputs).numpy() + + np.testing.assert_array_almost_equal(result, [3.0, 5.0]) + + def test_negative_values(self): + layer = ArrayReduceMaxLayer() + inputs = tf.constant([[-3.0, -1.0, -2.0], [-10.0, -5.0, -7.0]]) + + result = layer(inputs).numpy() + + np.testing.assert_array_almost_equal(result, [-1.0, -5.0]) + + def test_single_element_array(self): + layer = ArrayReduceMaxLayer() + inputs = tf.constant([[42.0]]) + + result = layer(inputs).numpy() + + np.testing.assert_array_almost_equal(result, [42.0]) + + def test_default_value_returned_for_nan_input(self): + layer = ArrayReduceMaxLayer(default_value=-99.0) + inputs = tf.constant([[float("nan"), float("nan")]]) + + result = layer(inputs).numpy() + + np.testing.assert_array_almost_equal(result, [-99.0]) diff --git a/tests/kamae/tensorflow/layers/test_pairwise_cosine_similarity.py b/tests/kamae/tensorflow/layers/test_pairwise_cosine_similarity.py new file mode 100644 index 0000000..d57043f --- /dev/null +++ b/tests/kamae/tensorflow/layers/test_pairwise_cosine_similarity.py @@ -0,0 +1,80 @@ +# Copyright [2024] Expedia, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import numpy as np +import tensorflow as tf + +from kamae.tensorflow.layers import PairwiseCosineSimilarityLayer + + +class TestPairwiseCosineSimilarityLayer: + def test_identical_vectors_give_similarity_one(self): + # query [1, 0] vs candidate [1, 0] → cosine = 1.0 + layer = PairwiseCosineSimilarityLayer(embedding_dim=2) + query = tf.constant([[1.0, 0.0]]) + candidates = tf.constant([[1.0, 0.0]]) # 1 candidate, dim=2 + + result = layer([query, candidates]).numpy() + + np.testing.assert_array_almost_equal(result, [[1.0]]) + + def test_opposite_vectors_give_similarity_minus_one(self): + # query [1, 0] vs candidate [-1, 0] → cosine = -1.0 + layer = PairwiseCosineSimilarityLayer(embedding_dim=2) + query = tf.constant([[1.0, 0.0]]) + candidates = tf.constant([[-1.0, 0.0]]) + + result = layer([query, candidates]).numpy() + + np.testing.assert_array_almost_equal(result, [[-1.0]]) + + def test_orthogonal_vectors_give_similarity_zero(self): + # query [1, 0] vs candidate [0, 1] → cosine = 0.0 + layer = PairwiseCosineSimilarityLayer(embedding_dim=2) + query = tf.constant([[1.0, 0.0]]) + candidates = tf.constant([[0.0, 1.0]]) + + result = layer([query, candidates]).numpy() + + np.testing.assert_array_almost_equal(result, [[0.0]]) + + def test_multiple_candidates_flat_packed(self): + # query [1, 0], candidates: [1, 0] and [0, 1] packed as [1, 0, 0, 1] + # expected: [1.0, 0.0] + layer = PairwiseCosineSimilarityLayer(embedding_dim=2) + query = tf.constant([[1.0, 0.0]]) + candidates = tf.constant([[1.0, 0.0, 0.0, 1.0]]) + + result = layer([query, candidates]).numpy() + 
+ np.testing.assert_array_almost_equal(result, [[1.0, 0.0]]) + + def test_zero_query_vector_gives_zero_similarity(self): + layer = PairwiseCosineSimilarityLayer(embedding_dim=2) + query = tf.constant([[0.0, 0.0]]) + candidates = tf.constant([[1.0, 0.0]]) + + result = layer([query, candidates]).numpy() + + np.testing.assert_array_almost_equal(result, [[0.0]]) + + def test_batch_of_queries(self): + # Two rows, each with query vs one candidate of dim=2 + layer = PairwiseCosineSimilarityLayer(embedding_dim=2) + query = tf.constant([[1.0, 0.0], [0.0, 1.0]]) + candidates = tf.constant([[1.0, 0.0], [0.0, 1.0]]) # same vectors + + result = layer([query, candidates]).numpy() + + np.testing.assert_array_almost_equal(result, [[1.0], [1.0]]) diff --git a/tests/kamae/tensorflow/test_layer_serialisation.py b/tests/kamae/tensorflow/test_layer_serialisation.py index 1d297e0..40ec92b 100644 --- a/tests/kamae/tensorflow/test_layer_serialisation.py +++ b/tests/kamae/tensorflow/test_layer_serialisation.py @@ -37,6 +37,7 @@ AbsoluteValueLayer, ArrayConcatenateLayer, ArrayCropLayer, + ArrayReduceMaxLayer, ArraySplitLayer, ArraySubtractMinimumLayer, BearingAngleLayer, @@ -82,6 +83,7 @@ OneHotEncodeLayer, OneHotLayer, OrdinalArrayEncodeLayer, + PairwiseCosineSimilarityLayer, RoundLayer, RoundToDecimalLayer, StandardScaleLayer, @@ -130,6 +132,12 @@ {"array_length": 3, "pad_value": "-1"}, False, ), + ( + ArrayReduceMaxLayer, + [tf.random.normal((32, 10, 5))], + {"default_value": 0.0}, + False, + ), ( BearingAngleLayer, [ @@ -179,6 +187,12 @@ None, False, ), + ( + PairwiseCosineSimilarityLayer, + [tf.random.normal((32, 8)), tf.random.normal((32, 40))], + {"embedding_dim": 8}, + False, + ), (CurrentDateLayer, [tf.constant(100, shape=(100, 10, 1))], None, False), (CurrentDateTimeLayer, [tf.constant(100, shape=(100, 10, 1))], None, True), ( diff --git a/tests/kamae/test_kamae.py b/tests/kamae/test_kamae.py index 349b877..ceb0ee9 100644 --- a/tests/kamae/test_kamae.py +++ 
b/tests/kamae/test_kamae.py @@ -14,7 +14,7 @@ from kamae import __version__ -semantic_release_version = "2.39.1" +semantic_release_version = "2.40.0" def test_version():