2828import org .apache .calcite .rel .RelFieldCollation ;
2929import org .apache .calcite .schema .SchemaPlus ;
3030import org .apache .calcite .tools .Frameworks ;
31+ import org .apache .calcite .util .ImmutableBitSet ;
32+ import org .apache .calcite .util .mapping .Mappings ;
3133import org .apache .ignite .cache .CacheMode ;
3234import org .apache .ignite .cache .affinity .AffinityFunction ;
3335import org .apache .ignite .cluster .ClusterNode ;
4244import org .apache .ignite .internal .processors .query .calcite .trait .TraitUtils ;
4345import org .apache .ignite .internal .processors .query .calcite .type .IgniteTypeFactory ;
4446import org .apache .ignite .internal .processors .query .calcite .util .AbstractService ;
47+ import org .apache .ignite .internal .processors .query .calcite .util .Commons ;
4548import org .apache .ignite .internal .processors .query .schema .SchemaChangeListener ;
4649import org .apache .ignite .internal .processors .query .schema .management .IndexDescriptor ;
4750import org .apache .ignite .internal .processors .subscription .GridInternalSubscriptionProcessor ;
@@ -160,33 +163,24 @@ public void subscriptionProcessor(GridInternalSubscriptionProcessor subscription
160163 }
161164
162165 /** {@inheritDoc} */
163- @ Override public synchronized void onSchemaCreated (String schemaName ) {
166+ @ Override public void onSchemaCreated (String schemaName ) {
164167 igniteSchemas .putIfAbsent (schemaName , new IgniteSchema (schemaName ));
165168 rebuild ();
166169 }
167170
168171 /** {@inheritDoc} */
169- @ Override public synchronized void onSchemaDropped (String schemaName ) {
172+ @ Override public void onSchemaDropped (String schemaName ) {
170173 igniteSchemas .remove (schemaName );
171174 rebuild ();
172175 }
173176
174177 /** {@inheritDoc} */
175- @ Override public synchronized void onSqlTypeCreated (
178+ @ Override public void onSqlTypeCreated (
176179 String schemaName ,
177180 GridQueryTypeDescriptor typeDesc ,
178181 GridCacheContextInfo <?, ?> cacheInfo
179182 ) {
180- IgniteSchema schema = igniteSchemas .computeIfAbsent (schemaName , IgniteSchema ::new );
181-
182- String tblName = typeDesc .tableName ();
183-
184- CacheTableDescriptorImpl desc =
185- new CacheTableDescriptorImpl (cacheInfo , typeDesc , affinityIdentity (cacheInfo .config ()));
186-
187- schema .addTable (tblName , new CacheTableImpl (ctx , desc ));
188-
189- rebuild ();
183+ publishTable (schemaName , typeDesc .tableName (), createTable (typeDesc , cacheInfo ));
190184 }
191185
192186 /** {@inheritDoc} */
@@ -196,7 +190,19 @@ public void subscriptionProcessor(GridInternalSubscriptionProcessor subscription
196190 GridCacheContextInfo <?, ?> cacheInfo ,
197191 List <QueryField > cols
198192 ) {
199- onSqlTypeCreated (schemaName , typeDesc , cacheInfo );
193+ IgniteCacheTable oldTbl = table (schemaName , typeDesc .tableName ());
194+ assert oldTbl != null ;
195+
196+ IgniteCacheTable newTbl = createTable (typeDesc , cacheInfo );
197+
198+ // Recreate indexes for the new table without columns shift.
199+ for (IgniteIndex idx : oldTbl .indexes ().values ()) {
200+ CacheIndexImpl idx0 = (CacheIndexImpl )idx ;
201+
202+ newTbl .addIndex (new CacheIndexImpl (idx0 .collation (), idx0 .name (), idx0 .queryIndex (), newTbl ));
203+ }
204+
205+ publishTable (schemaName , typeDesc .tableName (), newTbl );
200206 }
201207
202208 /** {@inheritDoc} */
@@ -206,7 +212,53 @@ public void subscriptionProcessor(GridInternalSubscriptionProcessor subscription
206212 GridCacheContextInfo <?, ?> cacheInfo ,
207213 List <String > cols
208214 ) {
209- onSqlTypeCreated (schemaName , typeDesc , cacheInfo );
215+ IgniteCacheTable oldTbl = table (schemaName , typeDesc .tableName ());
216+ assert oldTbl != null ;
217+
218+ IgniteCacheTable newTbl = createTable (typeDesc , cacheInfo );
219+
220+ // Recreate indexes for the new table with columns shift.
221+ int colsCnt = oldTbl .descriptor ().columnDescriptors ().size ();
222+ ImmutableBitSet .Builder retainedCols = ImmutableBitSet .builder ();
223+ retainedCols .set (0 , colsCnt );
224+
225+ for (String droppedCol : cols )
226+ retainedCols .clear (oldTbl .descriptor ().columnDescriptor (droppedCol ).fieldIndex ());
227+
228+ Mappings .TargetMapping mapping = Commons .mapping (retainedCols .build (), colsCnt );
229+
230+ for (IgniteIndex idx : oldTbl .indexes ().values ()) {
231+ CacheIndexImpl idx0 = (CacheIndexImpl )idx ;
232+
233+ newTbl .addIndex (new CacheIndexImpl (RelCollations .permute (idx0 .collation (), mapping ), idx0 .name (),
234+ idx0 .queryIndex (), newTbl ));
235+ }
236+
237+ publishTable (schemaName , typeDesc .tableName (), newTbl );
238+ }
239+
240+ /** */
241+ private IgniteCacheTable createTable (
242+ GridQueryTypeDescriptor typeDesc ,
243+ GridCacheContextInfo <?, ?> cacheInfo
244+ ) {
245+ CacheTableDescriptorImpl desc =
246+ new CacheTableDescriptorImpl (cacheInfo , typeDesc , affinityIdentity (cacheInfo .config ()));
247+
248+ return new CacheTableImpl (ctx , desc );
249+ }
250+
251+ /** */
252+ private void publishTable (
253+ String schemaName ,
254+ String tblName ,
255+ IgniteTable tbl
256+ ) {
257+ IgniteSchema schema = igniteSchemas .computeIfAbsent (schemaName , IgniteSchema ::new );
258+
259+ schema .addTable (tblName , tbl );
260+
261+ rebuild ();
210262 }
211263
212264 /** */
@@ -217,7 +269,7 @@ private static Object affinityIdentity(CacheConfiguration<?, ?> ccfg) {
217269 }
218270
219271 /** {@inheritDoc} */
220- @ Override public synchronized void onSqlTypeDropped (
272+ @ Override public void onSqlTypeDropped (
221273 String schemaName ,
222274 GridQueryTypeDescriptor typeDesc ,
223275 boolean destroy
@@ -230,12 +282,13 @@ private static Object affinityIdentity(CacheConfiguration<?, ?> ccfg) {
230282 }
231283
232284 /** {@inheritDoc} */
233- @ Override public synchronized void onIndexCreated (String schemaName , String tblName , String idxName ,
234- IndexDescriptor idxDesc ) {
235- IgniteSchema schema = igniteSchemas .get (schemaName );
236- assert schema != null ;
237-
238- IgniteCacheTable tbl = (IgniteCacheTable )schema .getTable (tblName );
285+ @ Override public void onIndexCreated (
286+ String schemaName ,
287+ String tblName ,
288+ String idxName ,
289+ IndexDescriptor idxDesc
290+ ) {
291+ IgniteCacheTable tbl = table (schemaName , tblName );
239292 assert tbl != null ;
240293
241294 RelCollation idxCollation = deriveSecondaryIndexCollation (idxDesc , tbl );
@@ -269,11 +322,8 @@ private static Object affinityIdentity(CacheConfiguration<?, ?> ccfg) {
269322 }
270323
271324 /** {@inheritDoc} */
272- @ Override public synchronized void onIndexDropped (String schemaName , String tblName , String idxName ) {
273- IgniteSchema schema = igniteSchemas .get (schemaName );
274- assert schema != null ;
275-
276- IgniteTable tbl = (IgniteTable )schema .getTable (tblName );
325+ @ Override public void onIndexDropped (String schemaName , String tblName , String idxName ) {
326+ IgniteTable tbl = table (schemaName , tblName );
277327 assert tbl != null ;
278328
279329 tbl .removeIndex (idxName );
@@ -283,21 +333,15 @@ private static Object affinityIdentity(CacheConfiguration<?, ?> ccfg) {
283333
284334 /** {@inheritDoc} */
285335 @ Override public void onIndexRebuildStarted (String schemaName , String tblName ) {
286- IgniteSchema schema = igniteSchemas .get (schemaName );
287- assert schema != null ;
288-
289- IgniteTable tbl = (IgniteTable )schema .getTable (tblName );
336+ IgniteTable tbl = table (schemaName , tblName );
290337 assert tbl != null ;
291338
292339 tbl .markIndexRebuildInProgress (true );
293340 }
294341
295342 /** {@inheritDoc} */
296343 @ Override public void onIndexRebuildFinished (String schemaName , String tblName ) {
297- IgniteSchema schema = igniteSchemas .get (schemaName );
298- assert schema != null ;
299-
300- IgniteTable tbl = (IgniteTable )schema .getTable (tblName );
344+ IgniteTable tbl = table (schemaName , tblName );
301345 assert tbl != null ;
302346
303347 tbl .markIndexRebuildInProgress (false );
@@ -328,6 +372,16 @@ private static Object affinityIdentity(CacheConfiguration<?, ?> ccfg) {
328372 return schema != null ? calciteSchema .getSubSchema (schema ) : calciteSchema ;
329373 }
330374
375+ /** */
376+ private IgniteCacheTable table (String schemaName , String tableName ) {
377+ IgniteSchema schema = igniteSchemas .get (schemaName );
378+
379+ if (schema != null )
380+ return (IgniteCacheTable )schema .getTable (tableName );
381+
382+ return null ;
383+ }
384+
331385 /** */
332386 private void rebuild () {
333387 SchemaPlus newCalciteSchema = Frameworks .createRootSchema (false );
0 commit comments