// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
= Change Data Capture Extension

WARNING: CDC is an experimental feature. The API or design architecture may change.

== Overview
The link:https://github.com/apache/ignite-extensions/tree/master/modules/cdc-ext[Change Data Capture Extension] module provides two ways to set up cross-cluster replication based on CDC:

. link:https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java[Ignite2IgniteCdcStreamer] - streams changes to the destination cluster through a client node.
. link:https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java[Ignite2KafkaCdcStreamer] combined with link:https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java[KafkaToIgniteCdcStreamer] - streams changes to the destination cluster using link:https://kafka.apache.org[Apache Kafka] as a transport.

NOTE: A link:https://github.com/apache/ignite/blob/master/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionConflictResolver.java[CacheVersionConflictResolver] should be defined for each cache replicated between clusters.


== Ignite to Ignite CDC streamer
This streamer starts a client node that connects to the destination cluster.
Once the connection is established, all changes captured by CDC are replicated to the destination cluster.

NOTE: Instances of `ignite-cdc.sh` with a configured streamer should be started on each server node of the source cluster to capture all changes.

image:../../assets/images/integrations/CDC-ignite2ignite.svg[]

=== Configuration

[cols="20%,45%,35%",opts="header"]
|===
|Name |Description | Default value
| `caches` | Set of cache names to replicate. | null
| `destinationIgniteConfiguration` | Ignite configuration of the client node that will connect to the destination cluster to replicate changes. | null
| `onlyPrimary` | Flag to handle changes only on the primary node. | `false`
| `maxBatchSize` | Maximum number of events to be sent to the destination cluster in a single batch. | `1024`
|===
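
As a sketch of how the options above fit together, the streamer can be set as the CDC consumer in the `ignite-cdc.sh` Spring XML configuration. This is an illustration, not a canonical configuration: the cache name `my-cache` and the `destination.cfg` bean are placeholder assumptions.

```xml
<bean class="org.apache.ignite.cdc.CdcConfiguration">
    <property name="consumer">
        <bean class="org.apache.ignite.cdc.IgniteToIgniteCdcStreamer">
            <!-- Caches to replicate; "my-cache" is a placeholder name. -->
            <property name="caches">
                <util:list>
                    <value>my-cache</value>
                </util:list>
            </property>
            <!-- Client node configuration used to connect to the destination cluster;
                 "destination.cfg" is assumed to be an IgniteConfiguration bean defined
                 elsewhere in the same file. -->
            <property name="destinationIgniteConfiguration" ref="destination.cfg"/>
            <property name="onlyPrimary" value="false"/>
            <property name="maxBatchSize" value="1024"/>
        </bean>
    </property>
</bean>
```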

=== Metrics

[opts="header"]
|===
|Name |Description
| `EventsCount` | Count of messages applied to the destination cluster.
| `LastEventTime` | Timestamp of the last applied event.
|===

== CDC replication using Kafka

This approach to replicating changes between clusters requires setting up two applications:

. `ignite-cdc.sh` with `org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer`, which captures changes from the source cluster and writes them to a Kafka topic.
. `kafka-to-ignite.sh`, which reads changes from the Kafka topic and writes them to the destination cluster.

NOTE: Instances of `ignite-cdc.sh` with a configured streamer should be started on each server node of the source cluster to capture all changes.

image:../../assets/images/integrations/CDC-ignite2kafka.svg[]

=== IgniteToKafkaCdcStreamer Configuration

[cols="20%,45%,35%",opts="header"]
|===
|Name |Description | Default value
| `caches` | Set of cache names to replicate. | null
| `kafkaProperties` | Kafka producer properties. | null
| `topic` | Name of the Kafka topic. | null
| `kafkaParts` | Number of Kafka topic partitions. | null
| `onlyPrimary` | Flag to handle changes only on the primary node. | `false`
| `maxBatchSize` | Maximum number of concurrently produced Kafka records. When the streamer reaches this number, it waits for Kafka acknowledgements and then commits the CDC offset. | `1024`
| `kafkaRequestTimeout` | Kafka request timeout in milliseconds. | `3000`
|===
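
For illustration, a minimal `IgniteToKafkaCdcStreamer` bean using these options might be sketched as follows; the cache name, topic name, partition count, and the `kafkaProperties` bean are placeholder assumptions:

```xml
<bean class="org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer">
    <!-- Caches to replicate; "my-cache" is a placeholder name. -->
    <property name="caches">
        <util:list>
            <value>my-cache</value>
        </util:list>
    </property>
    <!-- Placeholder topic name and partition count. -->
    <property name="topic" value="ignite-cdc"/>
    <property name="kafkaParts" value="16"/>
    <property name="onlyPrimary" value="false"/>
    <property name="maxBatchSize" value="1024"/>
    <!-- Standard Kafka producer properties (bootstrap.servers, serializers, ...),
         assumed to be defined as a java.util.Properties bean in the same file. -->
    <property name="kafkaProperties" ref="kafkaProperties"/>
</bean>
```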

=== IgniteToKafkaCdcStreamer Metrics

[opts="header"]
|===
|Name |Description
| `EventsCount` | Count of messages sent to Kafka.
| `LastEventTime` | Timestamp of the last sent event.
| `BytesSent` | Number of bytes sent to Kafka.
|===

=== `kafka-to-ignite.sh` application

This application should be started near the destination cluster.
`kafka-to-ignite.sh` reads CDC events from a Kafka topic and applies them to the destination cluster.

IMPORTANT: `kafka-to-ignite.sh` implements a fail-fast approach: it simply fails in case of any error. The restart procedure should be configured with OS tools.

The number of application instances does not need to correlate with the number of destination server nodes;
it should simply be enough to handle the source cluster load.
Each instance of the application processes a configured subset of topic partitions to spread the load.
A `KafkaConsumer` is created for each partition to ensure fair reads.

==== Installation

. Build the `cdc-ext` module with Maven:
+
```console
 $~/src/ignite-extensions/> mvn clean package -DskipTests
 $~/src/ignite-extensions/> ls modules/cdc-ext/target | grep zip
ignite-cdc-ext.zip
```

. Unpack the `ignite-cdc-ext.zip` archive to the `$IGNITE_HOME` folder.

Now you have an additional binary, `$IGNITE_HOME/bin/kafka-to-ignite.sh`, and the `$IGNITE_HOME/libs/optional/ignite-cdc-ext` module.

NOTE: Enable the `ignite-cdc-ext` module to be able to run `kafka-to-ignite.sh`.

==== Configuration

The application is configured using POJO classes or a Spring XML file, like a regular Ignite node.
The `kafka-to-ignite.sh` configuration file should contain the following beans, which are loaded during startup:

. `IgniteConfiguration` bean: configuration of the client node that will connect to the destination cluster.
. `java.util.Properties` bean with the name `kafkaProperties`: single Kafka consumer configuration.
. `org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration` bean: options specific to the `kafka-to-ignite.sh` application.

[cols="20%,45%,35%",opts="header"]
|===
|Name |Description | Default value
| `caches` | Set of cache names to replicate. | null
| `topic` | Name of the Kafka topic. | null
| `kafkaPartsFrom` | Lowest Kafka partition number (inclusive). | -1
| `kafkaPartsTo` | Highest Kafka partition number (exclusive). | -1
| `kafkaRequestTimeout` | Kafka request timeout in milliseconds. | `3000`
| `maxBatchSize` | Maximum number of events to be sent to the destination cluster in a single batch. | `1024`
| `threadCount` | Number of threads that run the consumers. Each thread polls records from its dedicated partitions in a round-robin manner. | `16`
|===
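
As an illustration of the `kafkaProperties` and streamer configuration beans, a sketch might look like the following; the broker address, consumer group, topic name, partition range, and thread count are placeholder assumptions, and an `IgniteConfiguration` bean for the client node must also be defined in the same file:

```xml
<!-- Single Kafka consumer configuration; the bean name "kafkaProperties" is required. -->
<util:properties id="kafkaProperties">
    <prop key="bootstrap.servers">localhost:9092</prop>
    <prop key="group.id">kafka-to-ignite</prop>
</util:properties>

<bean class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
    <!-- Placeholder topic name. -->
    <property name="topic" value="ignite-cdc"/>
    <!-- This instance consumes partitions [0, 8); a second instance
         could take [8, 16) to split the load. -->
    <property name="kafkaPartsFrom" value="0"/>
    <property name="kafkaPartsTo" value="8"/>
    <property name="threadCount" value="4"/>
</bean>
```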

==== Logging

`kafka-to-ignite.sh` uses the same logging configuration as the Ignite node does. The only difference is that the log is written to the `kafka-ignite-streamer.log` file.

== CacheVersionConflictResolver implementation

It is expected that CDC streamers will be configured with `onlyPrimary=false` in most real-world deployments to ensure fault tolerance.
This means the streamer sends the same change up to `CacheConfiguration#backups` + 1 times.
At the same time, concurrent updates of the same key can occur in the replicated clusters.
A `CacheVersionConflictResolver` is used by the Ignite node to select or merge the new (from the update request) and the existing (stored in the cluster) entry versions.
The selected entry version is the one actually stored in the cluster.

NOTE: The default implementation only selects the correct entry and never merges.

A link:https://github.com/apache/ignite/blob/master/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionConflictResolver.java[CacheVersionConflictResolver] should be defined for each cache replicated between clusters.

A default link:https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java[implementation] is available in cdc-ext.

=== Configuration

[cols="20%,45%,35%",opts="header"]
|===
|Name |Description | Default value
| `clusterId` | Local cluster id. Can be any value from 1 to 31. | null
| `caches` | Set of cache names to handle with this plugin instance. | null
| `conflictResolveField` | Value field to resolve conflicts with. Optional. Field values must implement `java.lang.Comparable`. | null
|===

=== Conflict resolution algorithm

Replicated changes contain some additional data: specifically, the entry version from the source cluster is supplied with the changed data.
The default conflict resolution algorithm is based on the entry version and the `conflictResolveField`.
The conflict resolution field should contain a user-provided, monotonically increasing value, such as a query id or timestamp.

. Changes from the "local" cluster always win.
. If both the old and the new entry are from the same cluster, version comparison is used to determine the order.
. If `conflictResolveField` is provided, then field value comparison is used to determine the order.
. Otherwise, conflict resolution fails and the update is ignored.

=== Configuration example
Configuration is done via an Ignite node plugin:

```xml
<property name="pluginProviders">
    <bean class="org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider">
        <property name="clusterId" value="1" />
        <property name="caches">
            <util:list>
                <value>queryId</value>
            </util:list>
        </property>
    </bean>
</property>
```