Skip to content

Commit 012f825

Browse files
authored
[Java] Optimize beam pubsub message to serialized pubsub protobuf message (#37896)
1 parent 3aad50b commit 012f825

2 files changed

Lines changed: 220 additions & 8 deletions

File tree

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818
package org.apache.beam.sdk.io.gcp.pubsub;
1919

2020
import com.google.protobuf.ByteString;
21+
import com.google.protobuf.CodedOutputStream;
2122
import com.google.protobuf.InvalidProtocolBufferException;
23+
import java.io.IOException;
2224
import java.util.Map;
25+
import javax.annotation.Nullable;
2326
import org.apache.beam.sdk.transforms.SerializableFunction;
2427
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
2528

@@ -28,26 +31,58 @@ public final class PubsubMessages {
2831
private PubsubMessages() {}
2932

3033
/**
 * Converts a Beam {@link PubsubMessage} into a {@code com.google.pubsub.v1.PubsubMessage} proto.
 *
 * <p>The payload is always copied into the proto's data field. Attributes, message id and
 * ordering key are optional: in the added (+) lines of this diff each one is set on the builder
 * only when it is non-null and non-empty, whereas the removed (-) lines checked for null only.
 *
 * @param input the Beam message to convert
 * @return the built proto message
 */
public static com.google.pubsub.v1.PubsubMessage toProto(PubsubMessage input) {
31-
Map<String, String> attributes = input.getAttributeMap();
34+
@Nullable Map<String, String> attributes = input.getAttributeMap();
3235
com.google.pubsub.v1.PubsubMessage.Builder message =
3336
com.google.pubsub.v1.PubsubMessage.newBuilder()
3437
.setData(ByteString.copyFrom(input.getPayload()));
3538
// TODO(https://github.com/apache/beam/issues/19787) this should not be null
// The guard below gains an isEmpty() check so an empty map never reaches putAllAttributes.
36-
if (attributes != null) {
39+
if (attributes != null && !attributes.isEmpty()) {
3740
message.putAllAttributes(attributes);
3841
}
// Same pattern for the message id: empty strings are now skipped as well as null.
39-
String messageId = input.getMessageId();
40-
if (messageId != null) {
42+
@Nullable String messageId = input.getMessageId();
43+
if (messageId != null && !messageId.isEmpty()) {
4144
message.setMessageId(messageId);
4245
}
4346

// And for the ordering key.
44-
String orderingKey = input.getOrderingKey();
45-
if (orderingKey != null) {
47+
@Nullable String orderingKey = input.getOrderingKey();
48+
if (orderingKey != null && !orderingKey.isEmpty()) {
4649
message.setOrderingKey(orderingKey);
4750
}
4851
return message.build();
4952
}
5053

54+
// Optimization of toProto(input).toByteArray()
55+
private static byte[] toSerializedPubsubMessageProto(PubsubMessage input) {
56+
@Nullable Map<String, String> attributes = input.getAttributeMap();
57+
@Nullable String messageId = input.getMessageId();
58+
@Nullable String orderingKey = input.getOrderingKey();
59+
if ((attributes == null || attributes.isEmpty())
60+
&& (messageId == null || messageId.isEmpty())
61+
&& (orderingKey == null || orderingKey.isEmpty())) {
62+
// Optimize the case where we are just sending a payload.
63+
byte[] payload = input.getPayload();
64+
if (payload == null || payload.length == 0) {
65+
return new byte[0];
66+
}
67+
int size =
68+
CodedOutputStream.computeByteArraySize(
69+
com.google.pubsub.v1.PubsubMessage.DATA_FIELD_NUMBER, payload);
70+
byte[] serialized = new byte[size];
71+
try {
72+
CodedOutputStream output = CodedOutputStream.newInstance(serialized);
73+
output.writeByteArray(com.google.pubsub.v1.PubsubMessage.DATA_FIELD_NUMBER, payload);
74+
output.checkNoSpaceLeft();
75+
} catch (IOException e) {
76+
// Should not happen since we are writing to a byte array of the exact size.
77+
throw new RuntimeException(
78+
"Unexpected error while serializing PubsubMessage to a byte array.", e);
79+
}
80+
return serialized;
81+
}
82+
// Fallback to general case by building up a protobuf and serializing it.
83+
return toProto(input).toByteArray();
84+
}
85+
5186
public static PubsubMessage fromProto(com.google.pubsub.v1.PubsubMessage input) {
5287
return new PubsubMessage(
5388
input.getData().toByteArray(),
@@ -56,12 +91,13 @@ public static PubsubMessage fromProto(com.google.pubsub.v1.PubsubMessage input)
5691
input.getOrderingKey());
5792
}
5893

59-
// Convert the PubsubMessage to a PubsubMessage proto, then return its serialized representation.
94+
// Convert the beam PubsubMessage to a serialized com.google.pubsub.v1.PubsubMessage proto
95+
// representation.
6096
public static class ParsePayloadAsPubsubMessageProto
6197
implements SerializableFunction<PubsubMessage, byte[]> {
6298
@Override
6399
public byte[] apply(PubsubMessage input) {
// The diff swaps the direct toProto(input).toByteArray() call (-) for
// toSerializedPubsubMessageProto(input) (+), which is described in this file as an
// optimization of that same expression.
64-
return toProto(input).toByteArray();
100+
return toSerializedPubsubMessageProto(input);
65101
}
66102
}
67103

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.gcp.pubsub;
19+
20+
import static org.junit.Assert.assertArrayEquals;
21+
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertTrue;
23+
24+
import java.nio.charset.StandardCharsets;
25+
import java.util.Collections;
26+
import java.util.Map;
27+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
28+
import org.junit.Test;
29+
import org.junit.runner.RunWith;
30+
import org.junit.runners.JUnit4;
31+
32+
/** Tests for {@link PubsubMessages}. */
33+
@RunWith(JUnit4.class)
34+
public class PubsubMessagesTest {
35+
36+
@Test
37+
public void testRoundTripToProto() {
38+
byte[] payload = "test-payload".getBytes(StandardCharsets.UTF_8);
39+
Map<String, String> attributes = ImmutableMap.of("key1", "value1", "key2", "value2");
40+
String messageId = "test-message-id";
41+
String orderingKey = "test-ordering-key";
42+
43+
PubsubMessage originalMessage = new PubsubMessage(payload, attributes, messageId, orderingKey);
44+
PubsubMessage roundTrippedMessage =
45+
PubsubMessages.fromProto(PubsubMessages.toProto(originalMessage));
46+
47+
assertArrayEquals(originalMessage.getPayload(), roundTrippedMessage.getPayload());
48+
assertEquals(originalMessage.getAttributeMap(), roundTrippedMessage.getAttributeMap());
49+
assertEquals(originalMessage.getMessageId(), roundTrippedMessage.getMessageId());
50+
assertEquals(originalMessage.getOrderingKey(), roundTrippedMessage.getOrderingKey());
51+
}
52+
53+
@Test
54+
public void testRoundTripToProto_emptyAttributes() {
55+
byte[] payload = "test-payload".getBytes(StandardCharsets.UTF_8);
56+
Map<String, String> attributes = Collections.emptyMap();
57+
String messageId = "test-message-id";
58+
String orderingKey = "test-ordering-key";
59+
60+
PubsubMessage originalMessage = new PubsubMessage(payload, attributes, messageId, orderingKey);
61+
PubsubMessage roundTrippedMessage =
62+
PubsubMessages.fromProto(PubsubMessages.toProto(originalMessage));
63+
64+
assertArrayEquals(originalMessage.getPayload(), roundTrippedMessage.getPayload());
65+
assertEquals(originalMessage.getAttributeMap(), roundTrippedMessage.getAttributeMap());
66+
assertEquals(originalMessage.getMessageId(), roundTrippedMessage.getMessageId());
67+
assertEquals(originalMessage.getOrderingKey(), roundTrippedMessage.getOrderingKey());
68+
}
69+
70+
@Test
71+
public void testRoundTripToProto_nullAttributes() {
72+
byte[] payload = "test-payload".getBytes(StandardCharsets.UTF_8);
73+
String messageId = "test-message-id";
74+
String orderingKey = "test-ordering-key";
75+
76+
PubsubMessage originalMessage = new PubsubMessage(payload, null, messageId, orderingKey);
77+
PubsubMessage roundTrippedMessage =
78+
PubsubMessages.fromProto(PubsubMessages.toProto(originalMessage));
79+
80+
assertArrayEquals(originalMessage.getPayload(), roundTrippedMessage.getPayload());
81+
// PubsubMessage.fromProto returns an empty map when proto attributes map is empty
82+
assertEquals(Collections.emptyMap(), roundTrippedMessage.getAttributeMap());
83+
assertEquals(originalMessage.getMessageId(), roundTrippedMessage.getMessageId());
84+
assertEquals(originalMessage.getOrderingKey(), roundTrippedMessage.getOrderingKey());
85+
}
86+
87+
@Test
88+
public void testRoundTripToProto_nullMessageIdAndOrderingKey() {
89+
byte[] payload = "test-payload".getBytes(StandardCharsets.UTF_8);
90+
Map<String, String> attributes = ImmutableMap.of("key", "value");
91+
92+
PubsubMessage originalMessage = new PubsubMessage(payload, attributes, null, null);
93+
PubsubMessage roundTrippedMessage =
94+
PubsubMessages.fromProto(PubsubMessages.toProto(originalMessage));
95+
96+
assertArrayEquals(originalMessage.getPayload(), roundTrippedMessage.getPayload());
97+
assertEquals(originalMessage.getAttributeMap(), roundTrippedMessage.getAttributeMap());
98+
// protobuf translates null string into empty string.
99+
assertTrue(roundTrippedMessage.getMessageId().isEmpty());
100+
assertTrue(roundTrippedMessage.getOrderingKey().isEmpty());
101+
}
102+
103+
@Test
104+
public void testRoundTripToProto_messageIdAndOrderingKey() {
105+
byte[] payload = "test-payload".getBytes(StandardCharsets.UTF_8);
106+
Map<String, String> attributes = ImmutableMap.of("key", "value");
107+
108+
PubsubMessage originalMessage =
109+
new PubsubMessage(payload, attributes, "messageId", "orderingKey");
110+
PubsubMessage roundTrippedMessage =
111+
PubsubMessages.fromProto(PubsubMessages.toProto(originalMessage));
112+
113+
assertArrayEquals(originalMessage.getPayload(), roundTrippedMessage.getPayload());
114+
assertEquals(originalMessage.getAttributeMap(), roundTrippedMessage.getAttributeMap());
115+
assertEquals(originalMessage.getOrderingKey(), roundTrippedMessage.getOrderingKey());
116+
assertEquals(originalMessage.getMessageId(), roundTrippedMessage.getMessageId());
117+
}
118+
119+
@Test
120+
public void testParsePayloadAsPubsubMessageProto() {
121+
byte[] payload = "test-payload".getBytes(StandardCharsets.UTF_8);
122+
PubsubMessage originalMessage = new PubsubMessage(payload, null, null, null);
123+
124+
byte[] serialized =
125+
new PubsubMessages.ParsePayloadAsPubsubMessageProto().apply(originalMessage);
126+
PubsubMessage roundTrippedMessage =
127+
new PubsubMessages.ParsePubsubMessageProtoAsPayload().apply(serialized);
128+
129+
assertArrayEquals(originalMessage.getPayload(), roundTrippedMessage.getPayload());
130+
assertEquals(Collections.emptyMap(), roundTrippedMessage.getAttributeMap());
131+
assertTrue(
132+
roundTrippedMessage.getMessageId() == null || roundTrippedMessage.getMessageId().isEmpty());
133+
assertTrue(
134+
roundTrippedMessage.getOrderingKey() == null
135+
|| roundTrippedMessage.getOrderingKey().isEmpty());
136+
}
137+
138+
@Test
139+
public void testParsePayloadAsPubsubMessageProto_emptyPayload() {
140+
byte[] payload = new byte[0];
141+
PubsubMessage originalMessage = new PubsubMessage(payload, null, null, null);
142+
143+
byte[] serialized =
144+
new PubsubMessages.ParsePayloadAsPubsubMessageProto().apply(originalMessage);
145+
PubsubMessage roundTrippedMessage =
146+
new PubsubMessages.ParsePubsubMessageProtoAsPayload().apply(serialized);
147+
148+
assertArrayEquals(originalMessage.getPayload(), roundTrippedMessage.getPayload());
149+
assertEquals(Collections.emptyMap(), roundTrippedMessage.getAttributeMap());
150+
assertTrue(
151+
roundTrippedMessage.getMessageId() == null || roundTrippedMessage.getMessageId().isEmpty());
152+
assertTrue(
153+
roundTrippedMessage.getOrderingKey() == null
154+
|| roundTrippedMessage.getOrderingKey().isEmpty());
155+
}
156+
157+
@Test
158+
public void testParsePayloadAsPubsubMessageProto_withAttributes() {
159+
byte[] payload = "test-payload".getBytes(StandardCharsets.UTF_8);
160+
Map<String, String> attributes = ImmutableMap.of("key1", "value1", "key2", "value2");
161+
PubsubMessage originalMessage = new PubsubMessage(payload, attributes, null, null);
162+
163+
byte[] serialized =
164+
new PubsubMessages.ParsePayloadAsPubsubMessageProto().apply(originalMessage);
165+
PubsubMessage roundTrippedMessage =
166+
new PubsubMessages.ParsePubsubMessageProtoAsPayload().apply(serialized);
167+
168+
assertArrayEquals(originalMessage.getPayload(), roundTrippedMessage.getPayload());
169+
assertEquals(originalMessage.getAttributeMap(), roundTrippedMessage.getAttributeMap());
170+
assertTrue(
171+
roundTrippedMessage.getMessageId() == null || roundTrippedMessage.getMessageId().isEmpty());
172+
assertTrue(
173+
roundTrippedMessage.getOrderingKey() == null
174+
|| roundTrippedMessage.getOrderingKey().isEmpty());
175+
}
176+
}

0 commit comments

Comments
 (0)