在使用pulsar io connector 时, 我没有找到相应的pause, resume 操作

Hi, ALL
我在使用pulsar io connector 时,没有找到对应的pause, resume 操作, 目前还没有提供这个支持,还是我没有找到,有知道的小伙伴 分享一下,谢谢

thanks

pulsar io connector 支持 create, deploy, update,stop, restart, reload, delete 操作,暂时不支持pause, resume ,可以参考以下文档 How to use Pulsar connectors | Apache Pulsar

1 Like

非常感谢您的回复,想问一下后续支持pause, resume? :wink:

hi,
目前我的使用方案是,使用stop, start 来代替pause,resume 操作,但是遇到一个问题,我使用版本是2.9.3,使用的debezium-oracle-source

  1. 当使用
    value.converter: “org.apache.kafka.connect.json.JsonConverter”
    key.converter: “org.apache.kafka.connect.json.JsonConverter” 时, 使用stop, start 来代替pause,resume 目前没有遇到问题
  2. 当 使用
    value.converter: “org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter”
    key.converter: “org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter” 时, 在stop 后,再start connector, 会出现无法解析debezium 内 offset 的现象,错误信息:
    2022-07-25T03:06:02,094-0400 [public/default/pulsar-oracle-source-0] ERROR org.apache.kafka.connect.storage.OffsetStorageReaderImpl - CRITICAL: Failed to deserialize offset data when getting offsets for task
    with namespace pulsar-kafka-connect-adaptor. No value for this data will be returned, which may break the task or cause it to skip some data. This could either be due to an error in the connector implementati
    on or incompatible schema.
    org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic pulsar-kafka-connect-adaptor to Avro:
    at org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110) ~[kafka-connect-avro-converter-shaded-2.9.3.jar:2.9.3]
    at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:128) [connect-runtime-2.7.0.jar:?]
    at io.debezium.connector.common.OffsetReader.offsets(OffsetReader.java:42) [debezium-core-1.7.1.Final.jar:1.7.1.Final]
    at io.debezium.connector.common.BaseSourceTask.getPreviousOffsets(BaseSourceTask.java:308) [debezium-core-1.7.1.Final.jar:1.7.1.Final]
    at io.debezium.connector.oracle.OracleConnectorTask.start(OracleConnectorTask.java:60) [debezium-connector-oracle-1.7.1.Final.jar:1.7.1.Final]
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:133) [debezium-core-1.7.1.Final.jar:1.7.1.Final]
    at org.apache.pulsar.io.kafka.connect.AbstractKafkaConnectSource.open(AbstractKafkaConnectSource.java:137) [pulsar-io-kafka-connect-adaptor-2.9.3.jar:2.9.3]
    at org.apache.pulsar.io.kafka.connect.KafkaConnectSource.open(KafkaConnectSource.java:62) [pulsar-io-kafka-connect-adaptor-2.9.3.jar:2.9.3]
    at org.apache.pulsar.io.debezium.DebeziumSource.open(DebeziumSource.java:99) [pulsar-io-debezium-core-2.9.3.jar:2.9.3]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupInput(JavaInstanceRunnable.java:757) [org.apache.pulsar-pulsar-functions-instance-2.9.3.jar:2.9.3]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setup(JavaInstanceRunnable.java:232) [org.apache.pulsar-pulsar-functions-instance-2.9.3.jar:2.9.3]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:260) [org.apache.pulsar-pulsar-functions-instance-2.9.3.jar:2.9.3]
    at java.lang.Thread.run(Thread.java:834) [?:?]
    Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 4
    Caused by: java.io.IOException: Cannot get schema from schema registry!
    at org.apache.pulsar.kafka.shade.io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndIdFromRegistry(MockSchemaRegistryClient.java:131) ~[kafka-connect-avro-converter-shaded-2.9.3.jar:2.9.3]
    at org.apache.pulsar.kafka.shade.io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getBySubjectAndId(MockSchemaRegistryClient.java:202) ~[kafka-connect-avro-converter-shaded-2.9.3.jar:2.9.3]
    at org.apache.pulsar.kafka.shade.io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getById(MockSchemaRegistryClient.java:179) ~[kafka-connect-avro-converter-shaded-2.9.3.jar:2.9.3]
    at org.apache.pulsar.kafka.shade.io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:107) ~[kafka-connect-avro-converter-shaded-2.9.3.jar:2.9.3]
    at org.apache.pulsar.kafka.shade.io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:206) ~[kafka-connect-avro-converter-shaded-2.9.3.jar:2.9.3]
    at org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:148) ~[kafka-connect-avro-converter-shaded-2.9.3.jar:2.9.3]
    at org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:93) ~[kafka-connect-avro-converter-shaded-2.9.3.jar:2.9.3]
    … 12 more
    2022-07-25T03:06:02,101-0400 [public/default/pulsar-oracle-source-0] INFO io.debezium.connector.common.BaseSourceTask - No previous offsets found

不知道这个是不是bug, 还是说是我配置有问题

首先你可以尝试把debezium-oracle-source 的配置文件中database.server.name 和 name的值修改一下,其次snapshot.mode的值建议设置为 “schema_only”,修改完再重新启动试试。

如果还有相同的错误麻烦把完整的配置贴一下看看~

Hi,
我按照您提供的方法试过了,报错的问题解决了,但是引出来一个更大的问题:

  1. stop connector
  2. update connector info
  3. oracle 源 有数据变更
  4. start connector

会出现,在stop,start 期间变更的数据丢失问题.

我的理解是:
value.converter & key.converter 这两个参数

  1. 不应该控制 debezium 的offset的topic的存储方式, 应该仅仅是影响存储的table数据.
    或者
  2. 在设置为avro 情况下,解析debezium 的offset时,提供解析方案.

:yum:

在stop,start 期间变更的数据丢失问题是因为snapshot.mode的值设置为 “schema_only”,每次start都会从最新的位置开始读取binlog。看来你的业务场景还是需要设置snapshot.mode = "initial"来保证能捕获所有binlog的变化。snapshot.mode参数的具体含义可以参考官方文档Debezium connector for MySQL :: Debezium Documentation

你的上述理解没错的。之所以一开始切换value.converter & key.converter两个参数切换到avro时会报错是因为你在同一个database里去做测试,debezium source在每次启动时都会从表对应offset-topic里读取上次的offset,你上次记录的json schema的 offset,而本次启动时avro格式的schema,因此会兼容。之所以修改了database.server.name就生效了是因为每张表的offset-topic生成规则中包含了它的值。

非常感谢您的回复, 我想这里可能有些误解,value.converter & key.converter 这两个参数在使用json 或avro的时候,在做stop, start操作期间并不会修改配置文件的内容,仅仅只是stop, start 操作
目前我测试得到的结果是:

  1. 使用json,做start,stop 代替pause,resume 目前可正常解析json 格式的offset.
  2. 使用avro,做start,stop 代替pause,resume 目前无法解析avro 格式的offset.

@weibo.liu 建议遇到新的问题重新起一个 topic,下面的讨论显然已经不是 io connector 支不支持 pause / resume 的问题了。论坛不是微信,可以每个主题单独讨论哈。