Merged
27 changes: 27 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -2,6 +2,33 @@



## v2.40.0 (2026-05-06)

### Documentation

* docs: README restructured (#39)

- Installation moved further up, above the massive table
- Quick start shows a quick example
- Sklearn removed as a proper usage pattern
- Reduced size by ~40%
- Much more readable ([`28c70ba`](https://github.com/ExpediaGroup/kamae/commit/28c70ba817d079b21fe668d5fc11dbf1492d1c6e))

### Feature

* feat: pairwise sim and array reduce max (#44)

* adding modules for pairwise similarity

* tests for pairwise similarity

* adding the new modules on README and tests of serialisation

* formatting issues

* fix header ([`579612b`](https://github.com/ExpediaGroup/kamae/commit/579612b13b3e638522ee577abbeb4d592bfcfce3))


## v2.39.1 (2026-04-14)

### Fix
2 changes: 2 additions & 0 deletions README.md
@@ -80,6 +80,7 @@ For Scikit-learn support (experimental, unmaintained), see [sklearn examples](ex
| AbsoluteValue | Applies the `abs(x)` transform. | [Link](src/kamae/tensorflow/layers/absolute_value.py) | [Link](src/kamae/spark/transformers/absolute_value.py) | Not yet implemented |
| ArrayConcatenate | Assembles multiple features into a single array. | [Link](src/kamae/tensorflow/layers/array_concatenate.py) | [Link](src/kamae/spark/transformers/array_concatenate.py) | [Link](src/kamae/sklearn/transformers/array_concatenate.py) |
| ArrayCrop | Crops or pads a feature array to a consistent size. | [Link](src/kamae/tensorflow/layers/array_crop.py) | [Link](src/kamae/spark/transformers/array_crop.py) | Not yet implemented |
| ArrayReduceMax | Reduces the last dimension of a tensor by taking the maximum. | [Link](src/kamae/tensorflow/layers/array_reduce_max.py) | [Link](src/kamae/spark/transformers/array_reduce_max.py) | Not yet implemented |
| ArraySplit | Splits a feature array into multiple features. | [Link](src/kamae/tensorflow/layers/array_split.py) | [Link](src/kamae/spark/transformers/array_split.py) | [Link](src/kamae/sklearn/transformers/array_split.py) |
| ArraySubtractMinimum | Subtracts the minimum element in an array from the rest to compute a timestamp difference. Ignores padded values. | [Link](src/kamae/tensorflow/layers/array_subtract_minimum.py) | [Link](src/kamae/spark/transformers/array_subtract_minimum.py) | Not yet implemented |
| BearingAngle | Computes the bearing angle (https://en.wikipedia.org/wiki/Bearing_(navigation)) between two pairs of lat/long. | [Link](src/kamae/tensorflow/layers/bearing_angle.py) | [Link](src/kamae/spark/transformers/bearing_angle.py) | Not yet implemented |
@@ -124,6 +125,7 @@ For Scikit-learn support (experimental, unmaintained), see [sklearn examples](ex
| NumericalIfStatement | Performs a simple if-else statement with a given operator. The value to check and the results if true or false can be constants or features. | [Link](src/kamae/tensorflow/layers/numerical_if_statement.py) | [Link](src/kamae/spark/transformers/numerical_if_statement.py) | Not yet implemented |
| OneHotEncode | Transforms a string to a one-hot array. | [Link](src/kamae/tensorflow/layers/one_hot_encode.py) | [Link](src/kamae/spark/estimators/one_hot_encode.py) | Not yet implemented |
| OrdinalArrayEncode | Encodes strings in an array according to the order in which they appear. Only for 2D tensors. | [Link](src/kamae/tensorflow/layers/ordinal_array_encoder.py) | [Link](src/kamae/spark/estimators/ordinal_array_encoder.py) | Not yet implemented |
| PairwiseCosineSimilarity | Computes the cosine similarity between an embedding and a list of candidate embeddings. | [Link](src/kamae/tensorflow/layers/pairwise_cosine_similarity.py) | [Link](src/kamae/spark/transformers/pairwise_cosine_similarity.py) | Not yet implemented |
| Round | Rounds a floating feature to the nearest integer using `ceil`, `floor` or a standard `round` op. | [Link](src/kamae/tensorflow/layers/round.py) | [Link](src/kamae/spark/transformers/round.py) | Not yet implemented |
| RoundToDecimal | Rounds a floating feature to the nearest decimal precision. | [Link](src/kamae/tensorflow/layers/round_to_decimal.py) | [Link](src/kamae/spark/transformers/round_to_decimal.py) | Not yet implemented |
| SharedOneHotEncode | Transforms a string to a one-hot array, using labels across multiple inputs to determine the one-hot size. | [Link](src/kamae/tensorflow/layers/one_hot_encode.py) | [Link](src/kamae/spark/estimators/shared_one_hot_encode.py) | Not yet implemented |
2 changes: 1 addition & 1 deletion src/kamae/__init__.py
@@ -19,5 +19,5 @@
transformation layers.
"""

__version__ = "2.39.1"
__version__ = "2.40.0"
__name__ = "kamae"
4 changes: 4 additions & 0 deletions src/kamae/spark/transformers/__init__.py
@@ -15,6 +15,7 @@
from .absolute_value import AbsoluteValueTransformer # noqa: F401
from .array_concatenate import ArrayConcatenateTransformer # noqa: F401
from .array_crop import ArrayCropTransformer # noqa: F401
from .array_reduce_max import ArrayReduceMaxTransformer # noqa: F401
from .array_split import ArraySplitTransformer # noqa: F401
from .array_subtract_minimum import ArraySubtractMinimumTransformer # noqa: F401
from .base import BaseTransformer # noqa: F401
@@ -64,6 +65,9 @@
from .numerical_if_statement import NumericalIfStatementTransformer # noqa: F401
from .one_hot_encode import OneHotEncodeTransformer # noqa: F401
from .ordinal_array_encode import OrdinalArrayEncodeTransformer # noqa: F401
from .pairwise_cosine_similarity import ( # noqa: F401
PairwiseCosineSimilarityTransformer,
)
from .round import RoundTransformer # noqa: F401
from .round_to_decimal import RoundToDecimalTransformer # noqa: F401
from .shared_one_hot_encode import SharedOneHotEncodeTransformer # noqa: F401
95 changes: 95 additions & 0 deletions src/kamae/spark/transformers/array_reduce_max.py
@@ -0,0 +1,95 @@
# Copyright [2024] Expedia, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Optional

import pyspark.sql.functions as F
import tensorflow as tf
from pyspark import keyword_only
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.sql import DataFrame
from pyspark.sql.types import DataType, DoubleType, FloatType

from kamae.spark.params import SingleInputSingleOutputParams
from kamae.spark.utils import single_input_single_output_array_transform
from kamae.tensorflow.layers import ArrayReduceMaxLayer

from .base import BaseTransformer


class ArrayReduceMaxTransformer(
BaseTransformer,
SingleInputSingleOutputParams,
):
"""
Reduces an array column to its maximum element.

Input: Array[Float/Double] of size N.
Output: Float/Double scalar (the maximum element).

Returns defaultValue when the array is empty or null.
"""

defaultValue = Param(
Params._dummy(),
"defaultValue",
"Value to return when the array is empty or null.",
typeConverter=TypeConverters.toFloat,
)

@keyword_only
def __init__(
self,
inputCol: Optional[str] = None,
outputCol: Optional[str] = None,
inputDtype: Optional[str] = None,
outputDtype: Optional[str] = None,
layerName: Optional[str] = None,
defaultValue: float = 0.0,
) -> None:
super().__init__()
self._setDefault(defaultValue=0.0)
kwargs = self._input_kwargs
self.setParams(**kwargs)

def setDefaultValue(self, value: float) -> "ArrayReduceMaxTransformer":
return self._set(defaultValue=value)

def getDefaultValue(self) -> float:
return self.getOrDefault(self.defaultValue)

@property
def compatible_dtypes(self) -> Optional[List[DataType]]:
return [FloatType(), DoubleType()]

def _transform(self, dataset: DataFrame) -> DataFrame:
input_col = F.col(self.getInputCol())
default = self.getDefaultValue()

output_col = single_input_single_output_array_transform(
input_col=input_col,
input_col_datatype=self.get_column_datatype(
dataset=dataset, column_name=self.getInputCol()
),
func=lambda x: F.coalesce(F.array_max(x), F.lit(default)),
)
return dataset.withColumn(self.getOutputCol(), output_col)

def get_tf_layer(self) -> tf.keras.layers.Layer:
return ArrayReduceMaxLayer(
name=self.getLayerName(),
input_dtype=self.getInputTFDtype(),
output_dtype=self.getOutputTFDtype(),
default_value=self.getDefaultValue(),
)
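The Spark transform above reduces to the expression `F.coalesce(F.array_max(x), F.lit(default))`: take the array maximum, and fall back to the default when the array is empty or null. A minimal plain-Python sketch of that semantics (illustrative only, not the library API):

```python
def array_reduce_max(values, default=0.0):
    """Return the max of `values`, or `default` when the array is empty or None.

    Mirrors F.coalesce(F.array_max(x), F.lit(default)): array_max yields null
    for empty/null arrays, and coalesce substitutes the default.
    """
    if not values:  # covers both None and an empty list
        return default
    return max(values)
```

For example, `array_reduce_max([1.0, 3.5, 2.0])` gives `3.5`, while `array_reduce_max([], default=-1.0)` gives `-1.0`.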
136 changes: 136 additions & 0 deletions src/kamae/spark/transformers/pairwise_cosine_similarity.py
@@ -0,0 +1,136 @@
# Copyright [2024] Expedia, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Optional

import pyspark.sql.functions as F
import tensorflow as tf
from pyspark import keyword_only
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.sql import Column, DataFrame
from pyspark.sql.types import ArrayType, DataType, DoubleType, FloatType

from kamae.spark.params import MultiInputSingleOutputParams
from kamae.tensorflow.layers import PairwiseCosineSimilarityLayer

from .base import BaseTransformer


class PairwiseCosineSimilarityTransformer(
BaseTransformer,
MultiInputSingleOutputParams,
):
"""
Computes pairwise cosine similarity between a query embedding and each
candidate embedding packed into a flat array.

Input 0: query embedding as Array[Float] of size D.
Input 1: flat candidate embeddings as Array[Float] of size N*D.
Output: Array[Float] of size N containing cosine similarities.
"""

embeddingDim = Param(
Params._dummy(),
"embeddingDim",
"Dimension of each embedding vector.",
typeConverter=TypeConverters.toInt,
)

@keyword_only
def __init__(
self,
inputCols: Optional[List[str]] = None,
outputCol: Optional[str] = None,
inputDtype: Optional[str] = None,
outputDtype: Optional[str] = None,
layerName: Optional[str] = None,
embeddingDim: Optional[int] = None,
) -> None:
super().__init__()
kwargs = self._input_kwargs
self.setParams(**kwargs)

def setEmbeddingDim(self, value: int) -> "PairwiseCosineSimilarityTransformer":
return self._set(embeddingDim=value)

def getEmbeddingDim(self) -> int:
return self.getOrDefault(self.embeddingDim)

@property
def compatible_dtypes(self) -> Optional[List[DataType]]:
return [FloatType(), DoubleType()]

def setInputCols(self, value: List[str]) -> "PairwiseCosineSimilarityTransformer":
if len(value) != 2:
raise ValueError(
f"Expected 2 input columns, received {len(value)} instead."
)
return self._set(inputCols=value)

def _transform(self, dataset: DataFrame) -> DataFrame:
input_col_names = self.getInputCols()
embedding_dim = self.getEmbeddingDim()

query_col = F.col(input_col_names[0])
flat_candidates_col = F.col(input_col_names[1])

for col_name in input_col_names:
dtype = self.get_column_datatype(dataset=dataset, column_name=col_name)
if not isinstance(dtype, ArrayType):
raise TypeError(f"Expected ArrayType for {col_name}, got {dtype}.")

num_candidates = (F.size(flat_candidates_col) / F.lit(embedding_dim)).cast(
"int"
)
indices = F.sequence(F.lit(0), num_candidates - F.lit(1))

query_norm = F.sqrt(
F.aggregate(
query_col,
F.lit(0.0).cast("double"),
lambda acc, x: acc + (x * x).cast("double"),
)
)

def cosine_sim_at_index(idx: Column) -> Column:
candidate = F.slice(
flat_candidates_col,
idx * F.lit(embedding_dim) + F.lit(1),
embedding_dim,
)
zipped = F.arrays_zip(query_col.alias("q"), candidate.alias("c"))
dot = F.aggregate(
zipped,
F.lit(0.0).cast("double"),
lambda acc, pair: acc + (pair["q"] * pair["c"]).cast("double"),
)
cand_norm = F.sqrt(
F.aggregate(
candidate,
F.lit(0.0).cast("double"),
lambda acc, x: acc + (x * x).cast("double"),
)
)
return F.coalesce(dot / (query_norm * cand_norm), F.lit(0.0))

similarities = F.transform(indices, cosine_sim_at_index)
return dataset.withColumn(self.getOutputCol(), similarities)

def get_tf_layer(self) -> tf.keras.layers.Layer:
return PairwiseCosineSimilarityLayer(
name=self.getLayerName(),
input_dtype=self.getInputTFDtype(),
output_dtype=self.getOutputTFDtype(),
embedding_dim=self.getEmbeddingDim(),
)
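The math in `_transform` above can be sketched in plain Python (illustrative only; the function and its names are hypothetical, not part of kamae). For each of the N candidates packed into the flat array, it slices out a D-sized vector, computes the dot product with the query, and normalizes by both vector norms, with zero-norm cases coalesced to 0.0 just as the Spark code does:

```python
import math

def pairwise_cosine_similarity(query, flat_candidates, embedding_dim):
    """Cosine similarity between `query` (length D) and each of the N
    candidate vectors packed into `flat_candidates` (length N*D)."""
    n = len(flat_candidates) // embedding_dim
    q_norm = math.sqrt(sum(x * x for x in query))
    sims = []
    for i in range(n):
        # Slice out candidate i (the Spark code uses 1-indexed F.slice)
        cand = flat_candidates[i * embedding_dim:(i + 1) * embedding_dim]
        dot = sum(q * c for q, c in zip(query, cand))
        c_norm = math.sqrt(sum(x * x for x in cand))
        denom = q_norm * c_norm
        # Mirror F.coalesce(..., F.lit(0.0)): zero vectors yield 0, not NaN
        sims.append(dot / denom if denom else 0.0)
    return sims
```

With `query = [1.0, 0.0]` and candidates `[1, 0]`, `[0, 1]`, `[-1, 0]` packed flat, this yields `[1.0, 0.0, -1.0]`.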
2 changes: 2 additions & 0 deletions src/kamae/tensorflow/layers/__init__.py
@@ -15,6 +15,7 @@
from .absolute_value import AbsoluteValueLayer # noqa: F401
from .array_concatenate import ArrayConcatenateLayer # noqa: F401
from .array_crop import ArrayCropLayer # noqa: F401
from .array_reduce_max import ArrayReduceMaxLayer # noqa: F401
from .array_split import ArraySplitLayer # noqa: F401
from .array_subtract_minimum import ArraySubtractMinimumLayer # noqa: F401
from .bearing_angle import BearingAngleLayer # noqa: F401
@@ -59,6 +60,7 @@
from .numerical_if_statement import NumericalIfStatementLayer # noqa: F401
from .one_hot_encode import OneHotEncodeLayer, OneHotLayer # noqa: F401
from .ordinal_array_encode import OrdinalArrayEncodeLayer # noqa: F401
from .pairwise_cosine_similarity import PairwiseCosineSimilarityLayer # noqa: F401
from .round import RoundLayer # noqa: F401
from .round_to_decimal import RoundToDecimalLayer # noqa: F401
from .standard_scale import StandardScaleLayer # noqa: F401
64 changes: 64 additions & 0 deletions src/kamae/tensorflow/layers/array_reduce_max.py
@@ -0,0 +1,64 @@
# Copyright [2024] Expedia, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, Dict, Iterable, List, Optional

import tensorflow as tf

import kamae
from kamae.tensorflow.typing import Tensor
from kamae.tensorflow.utils import enforce_single_tensor_input

from .base import BaseLayer


@tf.keras.utils.register_keras_serializable(package=kamae.__name__)
class ArrayReduceMaxLayer(BaseLayer):
"""
Reduces the last dimension of a tensor by taking the maximum.

Input: (..., N)
Output: (...)
"""

def __init__(
self,
name: Optional[str] = None,
input_dtype: Optional[str] = None,
output_dtype: Optional[str] = None,
default_value: float = 0.0,
**kwargs: Any,
) -> None:
super().__init__(
name=name, input_dtype=input_dtype, output_dtype=output_dtype, **kwargs
)
self.default_value = default_value

@property
def compatible_dtypes(self) -> Optional[List[tf.dtypes.DType]]:
return [tf.bfloat16, tf.float16, tf.float32, tf.float64]

@enforce_single_tensor_input
def _call(self, inputs: Tensor, **kwargs: Any) -> Tensor:
result = tf.reduce_max(inputs, axis=-1)
return tf.where(
tf.math.is_nan(result),
tf.constant(self.default_value, dtype=result.dtype),
result,
)

def get_config(self) -> Dict[str, Any]:
config = super().get_config()
config.update({"default_value": self.default_value})
return config
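The layer's reduction can be sketched in plain Python for the 2-D case (illustrative only, not the library API): take the maximum over the trailing dimension, then substitute `default_value` wherever the result is NaN, as `_call` does with `tf.where(tf.math.is_nan(...))`:

```python
import math

def reduce_max_last_axis(rows, default_value=0.0):
    """`rows` is a 2-D list shaped (batch, N); returns one value per row,
    replacing NaN maxima with `default_value`."""
    out = []
    for row in rows:
        m = max(row)
        out.append(default_value if math.isnan(m) else m)
    return out
```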