Skip to content

Commit 1322133

Browse files
authored
fix: Fix date type not supported in LiteralConverter::ConvertLiteralsFromString (#217)
1 parent 994b9f5 commit 1322133

4 files changed

Lines changed: 202 additions & 2 deletions

File tree

include/paimon/defs.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ struct PAIMON_EXPORT Options {
113113
static const char BLOB_TARGET_FILE_SIZE[];
114114

115115
/// "partition.default-name" - The default partition name in case the dynamic partition column
116-
/// value is null/empty string.
116+
/// value is null/empty string. Default is "__DEFAULT_PARTITION__".
117117
static const char PARTITION_DEFAULT_NAME[];
118118

119119
/// "file.compression" - The default file compression is zstd. For faster read and write, it is

src/paimon/common/predicate/literal_converter.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ Result<Literal> LiteralConverter::ConvertLiteralsFromString(const FieldType& typ
7272
}
7373
return Literal(value.value());
7474
}
75+
case FieldType::DATE: {
76+
PAIMON_ASSIGN_OR_RAISE(int32_t value, StringUtils::StringToDate(value_str));
77+
return Literal(FieldType::DATE, value);
78+
}
7579
case FieldType::BIGINT: {
7680
auto value = StringUtils::StringToValue<int64_t>(value_str);
7781
if (value == std::nullopt) {
@@ -98,7 +102,8 @@ Result<Literal> LiteralConverter::ConvertLiteralsFromString(const FieldType& typ
98102
return Literal(type, value_str.data(), value_str.size());
99103
default:
100104
return Status::Invalid(
101-
fmt::format("Do not support type {}", FieldTypeUtils::FieldTypeToString(type)));
105+
fmt::format("Do not support type {} in ConvertLiteralsFromString",
106+
FieldTypeUtils::FieldTypeToString(type)));
102107
}
103108
}
104109

src/paimon/common/predicate/literal_converter_test.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,11 @@ TEST_F(LiteralConverterTest, TestDateLiteral) {
400400
CheckResult(field_array,
401401
std::vector<Literal>({Literal(FieldType::DATE, 0l), Literal(FieldType::DATE, 4l),
402402
Literal(FieldType::DATE, -5l), Literal(FieldType::DATE)}));
403+
CheckLiteralsFromString(
404+
FieldType::DATE, {"1", "0", "1970-01-02", "1969-12-31"},
405+
std::vector<Literal>({Literal(FieldType::DATE, 1), Literal(FieldType::DATE, 0),
406+
Literal(FieldType::DATE, 1), Literal(FieldType::DATE, -1)}));
407+
403408
CheckLiteralFromRow(arrow::date32(), {0, 4, -5, NullType()}, FieldType::DATE,
404409
{Literal(FieldType::DATE, 0l), Literal(FieldType::DATE, 4l),
405410
Literal(FieldType::DATE, -5l), Literal(FieldType::DATE)});

test/inte/write_and_read_inte_test.cpp

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -663,6 +663,196 @@ TEST_P(WriteAndReadInteTest, TestPKWithSequenceFieldPartialInPKField) {
663663
ASSERT_TRUE(success);
664664
}
665665

666+
TEST_P(WriteAndReadInteTest, TestWriteSamePartitionTwiceWithAllBasicTypesForAppend) {
667+
arrow::FieldVector fields = {
668+
arrow::field("f_bool", arrow::boolean()), arrow::field("f_int8", arrow::int8()),
669+
arrow::field("f_int16", arrow::int16()), arrow::field("f_int32", arrow::int32()),
670+
arrow::field("f_int64", arrow::int64()), arrow::field("f_float", arrow::float32()),
671+
arrow::field("f_double", arrow::float64()), arrow::field("f_string", arrow::utf8()),
672+
arrow::field("f_date", arrow::date32()), arrow::field("f_value", arrow::int32())};
673+
auto schema = arrow::schema(fields);
674+
auto [file_format, file_system] = GetParam();
675+
std::map<std::string, std::string> options = {
676+
{Options::FILE_FORMAT, file_format},
677+
{Options::TARGET_FILE_SIZE, "1024"},
678+
{Options::BUCKET, "1"},
679+
{Options::BUCKET_KEY, "f_value"},
680+
{Options::FILE_SYSTEM, file_system},
681+
{Options::PARTITION_GENERATE_LEGACY_NAME, "false"},
682+
{Options::PARTITION_DEFAULT_NAME, "null"}};
683+
if (file_system == "jindo") {
684+
options = AddOptionsForJindo(options);
685+
}
686+
ASSERT_OK_AND_ASSIGN(
687+
auto helper, TestHelper::Create(test_dir_, schema,
688+
/*partition_keys=*/
689+
{"f_bool", "f_int8", "f_int16", "f_int32", "f_int64",
690+
"f_float", "f_double", "f_string", "f_date"},
691+
/*primary_keys=*/{}, options, /*is_streaming_mode=*/true));
692+
int64_t commit_identifier = 0;
693+
694+
{
695+
std::map<std::string, std::string> partition_map = {
696+
{"f_bool", "true"}, {"f_int8", "1"}, {"f_int16", "100"},
697+
{"f_int32", "10000"}, {"f_int64", "100000"}, {"f_float", "1.5"},
698+
{"f_double", "2.5"}, {"f_string", "hello"}, {"f_date", "1970-01-02"}};
699+
700+
// First write to the same partition
701+
std::string data1 = R"([
702+
[true, 1, 100, 10000, 100000, 1.5, 2.5, "hello", 1, 10],
703+
[true, 1, 100, 10000, 100000, 1.5, 2.5, "hello", 1, 20]
704+
])";
705+
ASSERT_OK_AND_ASSIGN(
706+
std::unique_ptr<RecordBatch> batch1,
707+
TestHelper::MakeRecordBatch(arrow::struct_(fields), data1, partition_map,
708+
/*bucket=*/0, {}));
709+
ASSERT_OK_AND_ASSIGN(auto commit_msgs,
710+
helper->WriteAndCommit(std::move(batch1), commit_identifier++,
711+
/*expected_commit_messages=*/std::nullopt));
712+
713+
// Second write to the same partition
714+
std::string data2 = R"([
715+
[true, 1, 100, 10000, 100000, 1.5, 2.5, "hello", 1, 30],
716+
[true, 1, 100, 10000, 100000, 1.5, 2.5, "hello", 1, 40]
717+
])";
718+
ASSERT_OK_AND_ASSIGN(
719+
std::unique_ptr<RecordBatch> batch2,
720+
TestHelper::MakeRecordBatch(arrow::struct_(fields), data2, partition_map,
721+
/*bucket=*/0, {}));
722+
ASSERT_OK_AND_ASSIGN(commit_msgs,
723+
helper->WriteAndCommit(std::move(batch2), commit_identifier++,
724+
/*expected_commit_messages=*/std::nullopt));
725+
}
726+
{
727+
// write all null for partition fields
728+
std::map<std::string, std::string> partition_map = {
729+
{"f_bool", "null"}, {"f_int8", "null"}, {"f_int16", "null"},
730+
{"f_int32", "null"}, {"f_int64", "null"}, {"f_float", "null"},
731+
{"f_double", "null"}, {"f_string", "null"}, {"f_date", "null"}};
732+
733+
// First write to the same partition
734+
std::string data1 = R"([
735+
[null, null, null, null, null, null, null, null, null, 50]
736+
])";
737+
ASSERT_OK_AND_ASSIGN(
738+
std::unique_ptr<RecordBatch> batch1,
739+
TestHelper::MakeRecordBatch(arrow::struct_(fields), data1, partition_map,
740+
/*bucket=*/0, {}));
741+
ASSERT_OK_AND_ASSIGN(auto commit_msgs,
742+
helper->WriteAndCommit(std::move(batch1), commit_identifier++,
743+
/*expected_commit_messages=*/std::nullopt));
744+
745+
// Second write to the same partition
746+
std::string data2 = R"([
747+
[null, null, null, null, null, null, null, null, null, 60]
748+
])";
749+
ASSERT_OK_AND_ASSIGN(
750+
std::unique_ptr<RecordBatch> batch2,
751+
TestHelper::MakeRecordBatch(arrow::struct_(fields), data2, partition_map,
752+
/*bucket=*/0, {}));
753+
ASSERT_OK_AND_ASSIGN(commit_msgs,
754+
helper->WriteAndCommit(std::move(batch2), commit_identifier++,
755+
/*expected_commit_messages=*/std::nullopt));
756+
}
757+
// Read and verify
758+
arrow::FieldVector fields_with_row_kind = fields;
759+
fields_with_row_kind.insert(fields_with_row_kind.begin(),
760+
arrow::field("_VALUE_KIND", arrow::int8()));
761+
auto data_type = arrow::struct_(fields_with_row_kind);
762+
ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<Split>> data_splits,
763+
helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt));
764+
std::string expected_data = R"([
765+
[0, true, 1, 100, 10000, 100000, 1.5, 2.5, "hello", 1, 10],
766+
[0, true, 1, 100, 10000, 100000, 1.5, 2.5, "hello", 1, 20],
767+
[0, true, 1, 100, 10000, 100000, 1.5, 2.5, "hello", 1, 30],
768+
[0, true, 1, 100, 10000, 100000, 1.5, 2.5, "hello", 1, 40],
769+
[0, null, null, null, null, null, null, null, null, null, 50],
770+
[0, null, null, null, null, null, null, null, null, null, 60]
771+
])";
772+
ASSERT_OK_AND_ASSIGN(bool success,
773+
helper->ReadAndCheckResult(data_type, data_splits, expected_data));
774+
ASSERT_TRUE(success);
775+
}
776+
777+
TEST_P(WriteAndReadInteTest, TestWriteSamePartitionTwiceWithAllBasicTypesForPk) {
778+
arrow::FieldVector fields = {
779+
arrow::field("f_bool", arrow::boolean()), arrow::field("f_int8", arrow::int8()),
780+
arrow::field("f_int16", arrow::int16()), arrow::field("f_int32", arrow::int32()),
781+
arrow::field("f_int64", arrow::int64()), arrow::field("f_float", arrow::float32()),
782+
arrow::field("f_double", arrow::float64()), arrow::field("f_string", arrow::utf8()),
783+
arrow::field("f_date", arrow::date32()), arrow::field("f_value", arrow::int32()),
784+
arrow::field("pk", arrow::utf8())};
785+
auto schema = arrow::schema(fields);
786+
auto [file_format, file_system] = GetParam();
787+
std::map<std::string, std::string> options = {{Options::FILE_FORMAT, file_format},
788+
{Options::TARGET_FILE_SIZE, "1024"},
789+
{Options::BUCKET, "1"},
790+
{Options::FILE_SYSTEM, file_system},
791+
{Options::PARTITION_GENERATE_LEGACY_NAME, "true"},
792+
{Options::PARTITION_DEFAULT_NAME, "null"}};
793+
if (file_system == "jindo") {
794+
options = AddOptionsForJindo(options);
795+
}
796+
ASSERT_OK_AND_ASSIGN(
797+
auto helper, TestHelper::Create(test_dir_, schema,
798+
/*partition_keys=*/
799+
{"f_bool", "f_int8", "f_int16", "f_int32", "f_int64",
800+
"f_float", "f_double", "f_string", "f_date"},
801+
/*primary_keys=*/
802+
{"pk", "f_bool", "f_int8", "f_int16", "f_int32", "f_int64",
803+
"f_float", "f_double", "f_string", "f_date"},
804+
options, /*is_streaming_mode=*/true));
805+
int64_t commit_identifier = 0;
806+
807+
{
808+
std::map<std::string, std::string> partition_map = {
809+
{"f_bool", "true"}, {"f_int8", "1"}, {"f_int16", "100"},
810+
{"f_int32", "10000"}, {"f_int64", "100000"}, {"f_float", "1.5"},
811+
{"f_double", "2.5"}, {"f_string", "hello"}, {"f_date", "1970-01-02"}};
812+
813+
// First write to the same partition
814+
std::string data1 = R"([
815+
[true, 1, 100, 10000, 100000, 1.5, 2.5, "hello", 1, 10, "pk1"],
816+
[true, 1, 100, 10000, 100000, 1.5, 2.5, "hello", 1, 20, "pk2"]
817+
])";
818+
ASSERT_OK_AND_ASSIGN(
819+
std::unique_ptr<RecordBatch> batch1,
820+
TestHelper::MakeRecordBatch(arrow::struct_(fields), data1, partition_map,
821+
/*bucket=*/0, {}));
822+
ASSERT_OK_AND_ASSIGN(auto commit_msgs,
823+
helper->WriteAndCommit(std::move(batch1), commit_identifier++,
824+
/*expected_commit_messages=*/std::nullopt));
825+
826+
// Second write to the same partition
827+
std::string data2 = R"([
828+
[true, 1, 100, 10000, 100000, 1.5, 2.5, "hello", 1, 30, "pk1"],
829+
[true, 1, 100, 10000, 100000, 1.5, 2.5, "hello", 1, 40, "pk3"]
830+
])";
831+
ASSERT_OK_AND_ASSIGN(
832+
std::unique_ptr<RecordBatch> batch2,
833+
TestHelper::MakeRecordBatch(arrow::struct_(fields), data2, partition_map,
834+
/*bucket=*/0, {}));
835+
ASSERT_OK_AND_ASSIGN(commit_msgs,
836+
helper->WriteAndCommit(std::move(batch2), commit_identifier++,
837+
/*expected_commit_messages=*/std::nullopt));
838+
}
839+
// Read and verify
840+
arrow::FieldVector fields_with_row_kind = fields;
841+
fields_with_row_kind.insert(fields_with_row_kind.begin(),
842+
arrow::field("_VALUE_KIND", arrow::int8()));
843+
auto data_type = arrow::struct_(fields_with_row_kind);
844+
ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<Split>> data_splits,
845+
helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt));
846+
std::string expected_data = R"([
847+
[0, true, 1, 100, 10000, 100000, 1.5, 2.5, "hello", 1, 30, "pk1"],
848+
[0, true, 1, 100, 10000, 100000, 1.5, 2.5, "hello", 1, 20, "pk2"],
849+
[0, true, 1, 100, 10000, 100000, 1.5, 2.5, "hello", 1, 40, "pk3"]
850+
])";
851+
ASSERT_OK_AND_ASSIGN(bool success,
852+
helper->ReadAndCheckResult(data_type, data_splits, expected_data));
853+
ASSERT_TRUE(success);
854+
}
855+
666856
std::vector<std::pair<std::string, std::string>> GetTestValuesForWriteAndReadInteTest() {
667857
std::vector<std::pair<std::string, std::string>> values = {{"parquet", "local"}};
668858
#ifdef PAIMON_ENABLE_ORC

0 commit comments

Comments
 (0)