Skip to content

Commit ea8b49a

Browse files
committed
portable date python changes
1 parent c66a62f commit ea8b49a

5 files changed

Lines changed: 38 additions & 2 deletions

File tree

sdks/python/apache_beam/io/jdbc.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,10 @@ def __init__(
360360
of the output PCollection elements. This bypasses automatic
361361
schema inference during pipeline construction.
362362
"""
363+
# override new portable Date type with the current Jdbc type
364+
# TODO: switch JdbcIO to return portable Date type
365+
LogicalType.register_logical_type(JdbcDateType)
366+
363367
classpath = classpath or DEFAULT_JDBC_CLASSPATH
364368

365369
dataSchema = None

sdks/python/apache_beam/portability/common_urns.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,3 +92,4 @@
9292
var_bytes = LogicalTypes.Enum.VAR_BYTES
9393
fixed_char = LogicalTypes.Enum.FIXED_CHAR
9494
var_char = LogicalTypes.Enum.VAR_CHAR
95+
date = LogicalTypes.Enum.DATE

sdks/python/apache_beam/transforms/managed_iceberg_it_test.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
# limitations under the License.
1616
#
1717

18+
import datetime
1819
import os
1920
import unittest
2021
import uuid
@@ -49,7 +50,8 @@ def _create_row(self, num: int):
4950
bytes_=bytes(num),
5051
bool_=(num % 2 == 0),
5152
float_=(num + float(num) / 100),
52-
arr_=[num, num, num])
53+
arr_=[num, num, num],
54+
date_=datetime.date.today() - datetime.timedelta(days=num)))
5355

5456
def test_write_read_pipeline(self):
5557
iceberg_config = {

sdks/python/apache_beam/typehints/schemas.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
bytes <-----> BYTES
3535
ByteString ------> BYTES
3636
Timestamp <-----> LogicalType(urn="beam:logical_type:micros_instant:v1")
37+
datetime.date <-----> LogicalType(urn="beam:logical_type:date:v1")
3738
Decimal <-----> LogicalType(urn="beam:logical_type:fixed_decimal:v1")
3839
Mapping <-----> MapType
3940
Sequence <-----> ArrayType
@@ -1004,6 +1005,33 @@ def to_language_type(self, value):
10041005
return Timestamp(seconds=int(value.seconds), micros=int(value.micros))
10051006

10061007

1008+
@LogicalType._register_internal
class Date(NoArgumentLogicalType[datetime.date, np.int64]):
  """Date logical type that maps ``datetime.date`` to days since the epoch.

  The representation is the signed number of whole days between the value
  and ``EPOCH`` (1970-01-01); dates before the epoch yield negative counts.
  """
  # Reference day that corresponds to a representation value of 0.
  EPOCH = datetime.date(1970, 1, 1)

  @classmethod
  def urn(cls):
    """Returns the URN identifying this logical type."""
    return common_urns.date.urn

  @classmethod
  def representation_type(cls):
    # type: () -> type

    """Returns the type used for the encoded value (``np.int64``)."""
    return np.int64

  @classmethod
  def language_type(cls):
    """Returns the Python type exposed to users (``datetime.date``)."""
    return datetime.date

  def to_representation_type(self, value):
    # type: (datetime.date) -> np.int64

    """Converts a ``datetime.date`` to its day offset from ``EPOCH``."""
    return (value - self.EPOCH).days

  def to_language_type(self, value):
    # type: (np.int64) -> datetime.date

    """Converts a day offset from ``EPOCH`` back to a ``datetime.date``."""
    # datetime.timedelta rejects numpy integer scalars with a TypeError, so
    # coerce to a builtin int first (mirrors the int() coercion used by the
    # timestamp conversion elsewhere in this module).
    return self.EPOCH + datetime.timedelta(days=int(value))
1033+
1034+
10071035
@LogicalType._register_internal
10081036
class PythonCallable(NoArgumentLogicalType[PythonCallableWithSource, str]):
10091037
"""A logical type for PythonCallableSource objects."""
@@ -1244,7 +1272,6 @@ def argument(self):
12441272
# TODO: A temporary fix for missing jdbc logical types.
12451273
# See the discussion in https://github.com/apache/beam/issues/35738 for
12461274
# more detail.
1247-
@LogicalType._register_internal
12481275
class JdbcDateType(LogicalType[datetime.date, MillisInstant, str]):
12491276
"""
12501277
For internal use only; no backwards-compatibility guarantees.

sdks/python/apache_beam/typehints/schemas_test.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
# pytype: skip-file
2121

2222
import dataclasses
23+
import datetime
2324
import itertools
2425
import pickle
2526
import unittest
@@ -105,6 +106,7 @@ class ComplexSchema(NamedTuple):
105106
optional_array: Optional[Sequence[np.float32]]
106107
array_optional: Sequence[Optional[bool]]
107108
timestamp: Timestamp
109+
date: datetime.date
108110

109111

110112
def get_test_beam_fieldtype_protos():

0 commit comments

Comments
 (0)