#!/usr/bin/env bash
# End-to-end schema-evolution test: brings up a Kafka + Schema Registry +
# Debezium Connect + MySQL stack via docker-compose, streams CDC records,
# alters the source table, and inspects the resulting Avro schema versions.
# Run with the single argument "clean" to tear everything down again.

# Generated artifacts; the clean() function deletes them.
readonly compose_file="schema-test-docker-compose.yml"
readonly config_file="schema-test-source-config.json"
readonly connect_log="connect.log"
readonly records_file="records.json"
6
# Run docker-compose against the generated compose file, forwarding all
# arguments. "$@" (not $*) preserves each argument's quoting/spacing, and
# the file name expansion is quoted defensively.
compose(){
  sudo docker-compose -f "${compose_file}" "$@"
}
10
# Tear down the compose stack (with volumes) and delete the generated files.
# rm -f: clean may be invoked before any files exist ("./script clean"),
# so missing files must not produce errors; -- guards against odd names.
clean(){
  echo "Shutting down services"
  compose down -v
  echo "Deleting files"
  rm -f -- "${compose_file}" "${config_file}" "${connect_log}" "${records_file}"
}
17
# "clean" as the first argument tears the stack down, removes the generated
# files, and exits immediately; everything below is the actual test run.
case "${1:-}" in
  clean)
    clean
    exit 0
    ;;
esac
22
# Fail fast if jq (used below to parse REST/registry responses) is missing.
# command -v is the portable probe; the hint goes to stderr.
if ! command -v jq >/dev/null; then
  echo "You'll need jq installed, see https://stedolan.github.io/jq/download/" >&2
  exit 1
fi
27
# Fail fast if docker-compose is missing; the whole stack depends on it.
if ! command -v docker-compose >/dev/null; then
  echo "You'll need docker-compose (and docker) installed, see https://docs.docker.com/compose/install/" >&2
  exit 1
fi
32
33echo "creating ${config_file}"
34tee ./${config_file} <<EOF
35{
36 "name": "source-test",
37 "config": {
38 "connector.class": "io.debezium.connector.mysql.MySqlConnector",
39 "ddl.parser.mode": "antlr",
40 "database.history.store.only.monitored.tables.ddl": "true",
41 "database.history.skip.unparseable.ddl": "true",
42 "tasks.max": "1",
43 "database.hostname": "mysql",
44 "database.port": "3306",
45 "database.user": "root",
46 "database.password": "root",
47 "database.server.id": "1337",
48 "database.serverTimezone": "Europe/Berlin",
49 "table.whitelist": "test.schema_test",
50 "database.history.kafka.bootstrap.servers": "broker:9092",
51 "database.history.kafka.topic": "cdc-history-test",
52 "include.schema.changes": "true",
53 "min.row.count.to.stream.results": "0",
54 "database.history.kafka.recovery.poll.interval.ms": "1000",
55 "database.history.kafka.recovery.attempts": "12000",
56 "max.batch.size": "16384",
57 "max.queue.size": "65536",
58 "decimal.handling.mode": "precise",
59 "snapshot.mode": "when_needed",
60 "connect.timeout.ms": "120000",
61 "database.server.name": "test-db",
62 "transforms": "route",
63 "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
64 "transforms.route.regex": "(.*)\\\\.(.*)\\\\.(.*)",
65 "transforms.route.replacement": "cdc-data-\$2-\$3"
66 }
67}
68EOF
69
70echo "creating ${compose_file}"
71tee ./${compose_file} <<EOF
72# Based on https://github.com/confluentinc/cp-docker-images/blob/master/examples/cp-all-in-one/docker-compose.yml
73version: '2'
74services:
75 zookeeper:
76 image: confluentinc/cp-zookeeper:5.0.0
77 hostname: zookeeper
78 container_name: zookeeper
79 ports:
80 - "2181:2181"
81 environment:
82 ZOOKEEPER_CLIENT_PORT: 2181
83 ZOOKEEPER_TICK_TIME: 2000
84
85 broker:
86 image: confluentinc/cp-kafka:5.0.0
87 hostname: broker
88 container_name: broker
89 depends_on:
90 - zookeeper
91 ports:
92 - 9092:9092
93 environment:
94 KAFKA_BROKER_ID: 1
95 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
96 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
97 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
98 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
99 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
100 KAFKA_GROUP_MIN_SESSION_TIMEOUT_MS: 1000
101
102 schema-registry:
103 image: confluentinc/cp-schema-registry:5.0.0-beta30
104 hostname: schema-registry
105 container_name: schema-registry
106 depends_on:
107 - zookeeper
108 - broker
109 ports:
110 - 8081:8081
111 environment:
112 SCHEMA_REGISTRY_HOST_NAME: schema-registry
113 SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
114
115 rest-proxy:
116 image: confluentinc/cp-kafka-rest:5.0.0
117 depends_on:
118 - zookeeper
119 - broker
120 - schema-registry
121 ports:
122 - 8082:8082
123 hostname: rest-proxy
124 container_name: rest-proxy
125 environment:
126 KAFKA_REST_HOST_NAME: rest-proxy
127 KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:9092'
128 KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
129 KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
130
131 connect:
132 image: debezium/connect:0.8.3.Final
133 depends_on:
134 - mysql
135 - broker
136 ports:
137 - 9595:9595/tcp
138 - 9200:9200/tcp
139 - 8083:8083/tcp
140 environment:
141 BOOTSTRAP_SERVERS: "broker:9092"
142 HOST_NAME: "connect"
143 ADVERTISED_HOST_NAME: "connect"
144 ADVERTISED_PORT: "8083"
145 GROUP_ID: "connect-worker-1"
146 OFFSET_FLUSH_INTERVAL_MS: "30000"
147 OFFSET_FLUSH_TIMEOUT_MS: "25000"
148 CONFIG_STORAGE_TOPIC: "kafka-connect-configs-worker-1"
149 OFFSET_STORAGE_TOPIC: "kafka-connect-offsets-worker-1"
150 CONNECT_STATUS_STORAGE_TOPIC: "kafka-connect-status-worker-1"
151 KEY_CONVERTER: "io.confluent.connect.avro.AvroConverter"
152 VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
153 INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
154 INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
155 CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "true"
156 CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
157 CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
158 CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
159 CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
160 CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
161 CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
162 CONNECT_INTERNAL_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
163 CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
164 CONNECT_ACCESS_CONTROL_ALLOW_ORIGIN: "*"
165 CONNECT_ACCESS_CONTROL_ALLOW_METHODS: "GET,POST,PUT,DELETE,HEAD"
166 KAFKA_HEAP_OPTS: "-Xms2G -Xmx20G -Xloggc:/var/log/jvm-logs/verbose-gc.log -verbose:gc -XX:+PrintGCDateStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/log/jvm-logs"
167 KAFKA_JMX_PORT: 9200
168 KAFKA_JMX_OPTS: '-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false'
169 KAFKA_JMX_HOSTNAME: "localhost"
170
171 mysql:
172 image: mysql:5.6
173 ports:
174 - 3306:3306
175 environment:
176 MYSQL_ROOT_PASSWORD: root
177 TZ: Europe/Berlin
178 command: --log-bin=mysqld-bin --binlog-format=row --server-id=1
179EOF
180
echo "docker-compose up"
# Bring the stack up in the background; its output is not interesting here.
compose up >/dev/null 2>&1 &

echo "Wait for kafka to be up (mysql will be long ready by then)"

# The REST proxy (port 8082) only answers once the broker is reachable;
# poll it for up to ~30 seconds (300 tries x 0.1s).
tries_left=300
while ! curl -s "http://localhost:8082/topics" >/dev/null; do
  sleep 0.1s
  tries_left="$((tries_left - 1))"
  if [[ "${tries_left}" = "0" ]]; then
    echo "timeout during waiting for kafka!"
    exit 1
  fi
done
195
echo "Create table in database"
# Seed the source database: the schema and the table Debezium will stream.
mysql -u root -proot -h 127.0.0.1 -P 3306 <<EOF || exit
  create schema if not exists test;

  use test;

  create table schema_test (
    id int primary key,
    record_type enum('internal', 'external') not null default 'internal',
    comment varchar(64)
  );
EOF
210
echo "Start connector"
# Register the Debezium source connector with the Connect REST API;
# abort (propagating curl's status) if the request fails.
curl -s -X POST \
  -H "Content-Type: application/json" \
  --data "@${config_file}" \
  "localhost:8083/connectors" || exit
echo ""
214
echo "Insert data into table"
# First CDC payload: two explicit rows plus one relying on the column default.
mysql -u root -proot -h 127.0.0.1 -P 3306 test <<EOF || exit
  insert into schema_test values
    (1, 'internal', 'I\'m not null'),
    (2, 'external', 'I\'m not null')
  ;
  insert into schema_test (id, comment) values (3, 'I use the default');

EOF
226
echo "Wait until schema subject published"
# Poll the schema registry until the value subject exists; jq extracts the
# registry's error_code, and 40401 means "subject not found".
wait_attempts=60
while
  error_code="$(curl -s localhost:8081/subjects/cdc-data-test-schema_test-value/versions/ | jq '.error_code?')"
  [[ "${error_code}" = "40401" ]] # this error code means schema not found
do
  sleep 1s
  wait_attempts="$((wait_attempts - 1))"
  if [[ "${wait_attempts}" = "0" ]]; then
    # Message fixed: this loop waits for the schema subject, not for kafka.
    echo "timeout during waiting for schema subject!"
    exit 1
  fi
done
240
echo "Wait until version published"
# Loop until the subject's version list is non-empty. NB: ${versions} is
# read again later to detect when a *second* schema version appears.
wait_attempts=60
until
  versions="$(curl -s localhost:8081/subjects/cdc-data-test-schema_test-value/versions/)"
  [[ "${versions}" != "[]" ]]
do
  sleep 1s
  wait_attempts="$((wait_attempts - 1))"
  if [[ "${wait_attempts}" = "0" ]]; then
    echo "timeout during waiting for schema!"
    exit 1
  fi
done
254
echo "Register consumer"
# Create an Avro consumer instance on the REST proxy; earliest offset so the
# snapshot records are included.
consumer_url="http://localhost:8082/consumers/my_avro_consumer"
curl -s -X POST \
  -H "Content-Type: application/vnd.kafka.v2+json" \
  --data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' \
  "${consumer_url}" || exit

echo "Subscribe to topic"
curl -s -X POST \
  -H "Content-Type: application/vnd.kafka.v2+json" \
  --data '{"topics":["cdc-data-test-schema_test"]}' \
  "${consumer_url}/instances/my_consumer_instance/subscription" || exit
260
echo "Get messages"
# Fetch the consumed Avro records via the REST proxy, pretty-print them into
# ${records_file}, then show just the row images ("after" state).
# (jq reads the file directly — no useless cat; redirect target is quoted.)
curl -s -X GET -H "Accept: application/vnd.kafka.avro.v2+json" http://localhost:8082/consumers/my_avro_consumer/instances/my_consumer_instance/records | jq '.' > "${records_file}" || exit
jq -c '.[].value.after["test_db.test.schema_test.Value"]' "${records_file}"
264
echo "Creating additional table"
# A second table (not in table.whitelist) to exercise DDL handling.
mysql -u root -proot -h 127.0.0.1 -P 3306 <<EOF || exit
  use test;

  create table migration_test (
    id varchar(20) null,
    mgb_no varchar(20) null
  );
EOF
276
echo "Adding records to new table"
mysql -u root -proot -h 127.0.0.1 -P 3306 test <<EOF || exit
  insert into migration_test values
    ('1', '42'),
    ('2', '23')
  ;
EOF
286
echo "Creating unique index"
mysql -u root -proot -h 127.0.0.1 -P 3306 <<EOF || exit
  use test;
  create unique index migration_test_mgb_no_uindex
    on migration_test (mgb_no);
;
EOF
296
echo "Alter table and insert some rows"
# Add a column mid-table, then insert a row using the new shape. This is the
# DDL change whose new schema version the next wait loop looks for.
# Fixed: the trailing "|| exit" was missing here, unlike every other mysql
# step, so a failure would silently continue.
(
mysql -u root -proot -h 127.0.0.1 -P 3306 <<EOF
  use test;
  alter table schema_test add column one_more varchar(100) after comment;

  insert into schema_test values
    (23, 'internal', 'I\'m not null', 'blubb')
  ;
;
EOF
) || exit
309
echo "Wait until second schema version published"
# ${versions} still holds the pre-ALTER version list; poll until the registry
# reports something different, i.e. the new schema version has landed.
wait_attempts=60
until
  current_versions="$(curl -s localhost:8081/subjects/cdc-data-test-schema_test-value/versions/)"
  [[ "${current_versions}" != "${versions}" ]]
do
  sleep 1s
  wait_attempts="$((wait_attempts - 1))"
  if [[ "${wait_attempts}" = "0" ]]; then
    echo "timeout during waiting for schema!"
    exit 1
  fi
done
323
echo "Get messages"
# Append the post-ALTER batch to ${records_file} and show the row images
# again. (jq reads the file directly — no useless cat; redirect quoted.)
curl -s -X GET -H "Accept: application/vnd.kafka.avro.v2+json" http://localhost:8082/consumers/my_avro_consumer/instances/my_consumer_instance/records | jq '.' >> "${records_file}" || exit
jq -c '.[].value.after["test_db.test.schema_test.Value"]' "${records_file}"
327
echo "Show schema"
# Print the latest value schema's "before" field definition — specifically
# .type[1].fields, the second branch of that field's union type.
curl -s localhost:8081/subjects/cdc-data-test-schema_test-value/versions/latest \
  | jq -r '.schema | fromjson | .fields[] | select( .name == "before") | .type[1].fields' \
  || exit
330
echo "select data from mysql"
# Final state of the table as MySQL sees it, for comparison with the records.
mysql -u root -proot -h 127.0.0.1 -P 3306 test --execute="select * from schema_test"
333
echo "dumping connect log at ${connect_log}, records are at ${records_file}"
# Keep the Connect worker's container log around for post-mortem inspection.
compose logs connect >> "${connect_log}"
336
printf "remove files and shut down? (yN): "
# -r keeps backslashes in the reply literal instead of treating them as escapes.
read -r ans
if [[ "$ans" = "y" || "$ans" = "Y" ]]; then
  clean
  exit 0
fi

# ${0##*/} = this script's own name, so the hint stays correct if it is renamed.
echo "You can run ${0##*/} clean to clean up afterwards"