@@ -610,8 +610,7 @@ public void commitSegmentFile(String realtimeTableName, CommittingSegmentDescrip
610610 // Copy the segment file to the controller
611611 String segmentLocation = committingSegmentDescriptor .getSegmentLocation ();
612612 Preconditions .checkArgument (segmentLocation != null , "Segment location must be provided" );
613- if (segmentLocation .regionMatches (true , 0 , CommonConstants .Segment .PEER_SEGMENT_DOWNLOAD_SCHEME , 0 ,
614- CommonConstants .Segment .PEER_SEGMENT_DOWNLOAD_SCHEME .length ())) {
613+ if (isPeerUrl (segmentLocation )) {
615614 LOGGER .info ("No moving needed for segment on peer servers: {}" , segmentLocation );
616615 return ;
617616 }
@@ -635,6 +634,11 @@ public void commitSegmentFile(String realtimeTableName, CommittingSegmentDescrip
635634 committingSegmentDescriptor .setSegmentLocation (uriToMoveTo );
636635 }
637636
637+ private boolean isPeerUrl (String segmentLocation ) {
638+ return segmentLocation .regionMatches (true , 0 , CommonConstants .Segment .PEER_SEGMENT_DOWNLOAD_SCHEME , 0 ,
639+ CommonConstants .Segment .PEER_SEGMENT_DOWNLOAD_SCHEME .length ());
640+ }
641+
638642 /**
639643 * This method is invoked after the realtime segment is uploaded but before a response is sent to the server.
640644 * It updates the propertystore segment metadata from IN_PROGRESS to DONE, and also creates new propertystore
@@ -674,9 +678,9 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
674678 // Step-1: Update PROPERTYSTORE
675679 LOGGER .info ("Committing segment metadata for segment: {}" , committingSegmentName );
676680 long startTimeNs1 = System .nanoTime ();
677- SegmentZKMetadata committingSegmentZKMetadata =
678- toCommitting ? updateSegmentZKMetadataToCommitting (realtimeTableName , committingSegmentDescriptor )
679- : updateSegmentZKMetadataToDone (realtimeTableName , committingSegmentDescriptor , Status .IN_PROGRESS );
681+ SegmentZKMetadata committingSegmentZKMetadata = toCommitting
682+ ? updateSegmentZKMetadataToCommitting (realtimeTableName , committingSegmentDescriptor )
683+ : updateSegmentZKMetadataToDone (realtimeTableName , committingSegmentDescriptor , Status .IN_PROGRESS );
680684
681685 preProcessNewSegmentZKMetadata ();
682686
@@ -692,7 +696,7 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
692696 LOGGER .info ("Updating Idealstate for previous: {} and new segment: {}" , committingSegmentName ,
693697 newConsumingSegmentName );
694698 long startTimeNs3 = System .nanoTime ();
695- Map <String , Map <String , String >> instanceStatesMapAfterStep3 = Collections . emptyMap () ;
699+ Map <String , Map <String , String >> instanceStatesMapAfterStep3 ;
696700 boolean newConsumingSegmentInIdealState = false ;
697701
698702 // When multiple segments of the same table complete around the same time it is possible that
@@ -791,8 +795,10 @@ private SegmentZKMetadata updateSegmentZKMetadataToDone(String realtimeTableName
791795 Preconditions .checkState (segmentMetadata != null , "Failed to find segment metadata from descriptor for segment: %s" ,
792796 segmentName );
793797 String segmentLocation = committingSegmentDescriptor .getSegmentLocation ();
794- String downloadUrl =
795- isPeerURL (segmentLocation ) ? CommonConstants .Segment .METADATA_URI_FOR_PEER_DOWNLOAD : segmentLocation ;
798+ Preconditions .checkArgument (segmentLocation != null , "Segment location must be provided" );
799+ String downloadUrl = isPeerUrl (segmentLocation )
800+ ? CommonConstants .Segment .METADATA_URI_FOR_PEER_DOWNLOAD
801+ : segmentLocation ;
796802 SegmentZKMetadataUtils .updateCommittingSegmentZKMetadata (realtimeTableName , segmentZKMetadata , segmentMetadata ,
797803 downloadUrl , committingSegmentDescriptor .getSegmentSizeBytes (), committingSegmentDescriptor .getNextOffset ());
798804 persistSegmentZKMetadata (realtimeTableName , segmentZKMetadata , stat .getVersion ());
@@ -979,6 +985,7 @@ public void commitSegmentMetadataToDone(String realtimeTableName,
979985 FlushThresholdUpdater flushThresholdUpdater =
980986 _flushThresholdUpdateManager .getFlushThresholdUpdater (streamConfig );
981987 flushThresholdUpdater .onSegmentCommit (streamConfig , committingSegmentDescriptor , committingSegmentZKMetadata );
988+ updateCommittingSegmentSizeGauge (streamConfig , committingSegmentDescriptor .getSegmentSizeBytes ());
982989 } catch (Exception e ) {
983990 LOGGER .error ("Caught exception while updating flush threshold for table: {}, segment: {}" , realtimeTableName ,
984991 segmentName , e );
@@ -988,11 +995,6 @@ public void commitSegmentMetadataToDone(String realtimeTableName,
988995 }
989996 }
990997
991- private boolean isPeerURL (String segmentLocation ) {
992- return segmentLocation != null && segmentLocation .toLowerCase ()
993- .startsWith (CommonConstants .Segment .PEER_SEGMENT_DOWNLOAD_SCHEME );
994- }
995-
996998 /**
997999 * Creates and persists segment ZK metadata for the new CONSUMING segment.
9981000 */
@@ -1041,13 +1043,35 @@ private void createNewSegmentZKMetadata(TableConfig tableConfig, StreamConfig st
10411043 FlushThresholdUpdater flushThresholdUpdater = _flushThresholdUpdateManager .getFlushThresholdUpdater (streamConfig );
10421044 if (committingSegmentZKMetadata != null ) {
10431045 flushThresholdUpdater .onSegmentCommit (streamConfig , committingSegmentDescriptor , committingSegmentZKMetadata );
1046+ long segmentSize = committingSegmentDescriptor .getSegmentSizeBytes ();
1047+ // Segment size might not be available for pauseless ingestion
1048+ if (segmentSize > 0 ) {
1049+ updateCommittingSegmentSizeGauge (streamConfig , segmentSize );
1050+ }
10441051 }
10451052 flushThresholdUpdater .updateFlushThreshold (streamConfig , newSegmentZKMetadata ,
10461053 getMaxNumPartitionsPerInstance (instancePartitions , numPartitions , numReplicas ));
1054+ updateFlushThresholdGauge (streamConfig , newSegmentZKMetadata .getSizeThresholdToFlushSegment ());
10471055
10481056 persistSegmentZKMetadata (realtimeTableName , newSegmentZKMetadata , -1 );
10491057 }
10501058
1059+ private void updateCommittingSegmentSizeGauge (StreamConfig streamConfig , long segmentSize ) {
1060+ String realtimeTableName = streamConfig .getTableNameWithType ();
1061+ String topicName = streamConfig .getTopicName ();
1062+ _controllerMetrics .setOrUpdateTableGauge (realtimeTableName , ControllerGauge .COMMITTING_SEGMENT_SIZE , segmentSize );
1063+ _controllerMetrics .setOrUpdateTableGauge (realtimeTableName , topicName ,
1064+ ControllerGauge .COMMITTING_SEGMENT_SIZE_WITH_TOPIC , segmentSize );
1065+ }
1066+
1067+ private void updateFlushThresholdGauge (StreamConfig streamConfig , int flushThreshold ) {
1068+ String realtimeTableName = streamConfig .getTableNameWithType ();
1069+ String topicName = streamConfig .getTopicName ();
1070+ _controllerMetrics .setOrUpdateTableGauge (realtimeTableName , ControllerGauge .NUM_ROWS_THRESHOLD , flushThreshold );
1071+ _controllerMetrics .setOrUpdateTableGauge (realtimeTableName , topicName ,
1072+ ControllerGauge .NUM_ROWS_THRESHOLD_WITH_TOPIC , flushThreshold );
1073+ }
1074+
10511075 private String computeStartOffset (String nextOffset , StreamConfig streamConfig , int partitionId ) {
10521076 if (!streamConfig .isEnableOffsetAutoReset () || streamConfig .isBackfillTopic ()) {
10531077 return nextOffset ;
0 commit comments