Pulsar sink数据到postgresql中数据一直是null

2022-08-10T06:52:48,814+0000 [pulsar-client-io-1-1] INFO  com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized
2022-08-10T06:52:48,852+0000 [public/default/postgresql-storage-sink-0] INFO  org.apache.pulsar.client.impl.schema.AutoConsumeSchema - Configure topic schema \x00\x00\x00\x00\x00\x00\x00\x00 for topic persistent://public/default/dbserver1.public.table1-partition-0 : {"key":{"name":"KafKaJson","schema":{"type":"record","name":"Key","namespace":"dbserver1.public.table1","fields":[{"name":"id","type":"int"}],"connect.name":"dbserver1.public.table1.Key"},"type":"JSON","properties":{"__AVRO_READ_OFFSET__":"0"}},"value":{"name":"KafKaJson","schema":{"type":"record","name":"Envelope","namespace":"dbserver1.public.table1","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"id","type":"int"},{"name":"name","type":["null","string"]}],"connect.name":"dbserver1.public.table1.Value"}]},{"name":"after","type":["null","Value"]},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.postgresql","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"sequence","type":["null","string"]},{"name":"schema","type":"string"},{"name":"table","type":"string"},{"name":"txId","type":["null","long"]},{"name":"lsn","type":["null","long"]},{"name":"xmin","type":["null","long"]}],"connect.name":"io.debezium.connector.postgresql.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"]},{"name":"transaction","type":["null",{"type":"record","name":"ConnectDefault","namespace":"org.apache.pulsar.kafka.shade.io.confluent.connect.avro","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}]}]}],"connect.name":"dbserver1.public.table1.Envelope"},"type":"J
SON","properties":{"__AVRO_READ_OFFSET__":"0"}}}
2022-08-10T06:52:48,853+0000 [public/default/postgresql-storage-sink-0] INFO  org.apache.pulsar.client.impl.schema.generic.MultiVersionGenericAvroReader - Load schema reader for version(0), schema is : {"type":"record","name":"Key","namespace":"dbserver1.public.table1","fields":[{"name":"id","type":"int"}],"connect.name":"dbserver1.public.table1.Key"}
2022-08-10T06:52:48,859+0000 [public/default/postgresql-storage-sink-0] INFO  org.apache.pulsar.client.impl.schema.generic.MultiVersionGenericAvroReader - Load schema reader for version(0), schema is : {"type":"record","name":"Envelope","namespace":"dbserver1.public.table1","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"id","type":"int"},{"name":"name","type":["null","string"],"default":null}],"connect.name":"dbserver1.public.table1.Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.postgresql","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"sequence","type":["null","string"],"default":null},{"name":"schema","type":"string"},{"name":"table","type":"string"},{"name":"txId","type":["null","long"],"default":null},{"name":"lsn","type":["null","long"],"default":null},{"name":"xmin","type":["null","long"],"default":null}],"connect.name":"io.debezium.connector.postgresql.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"ConnectDefault","namespace":"org.apache.pulsar.kafka.shade.io.confluent.connect.avro","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}]}],"default":null}],"connect.name":"dbserver1.public.table1.Envelope"}
2022-08-10T06:52:49,253+0000 [pool-5-thread-1] ERROR org.apache.pulsar.io.jdbc.JdbcAbstractSink - Got exception 
org.postgresql.util.PSQLException: ERROR: null value in column "id" of relation "test4" violates not-null constraint
  Detail: Failing row contains (null, null).
        at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2565) ~[postgresql-42.2.25.jar:42.2.25]
        at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2297) ~[postgresql-42.2.25.jar:42.2.25]
        at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:322) ~[postgresql-42.2.25.jar:42.2.25]
        at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:481) ~[postgresql-42.2.25.jar:42.2.25]
        at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:401) ~[postgresql-42.2.25.jar:42.2.25]
        at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:164) ~[postgresql-42.2.25.jar:42.2.25]
        at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:153) ~[postgresql-42.2.25.jar:42.2.25]
        at org.apache.pulsar.io.jdbc.JdbcAbstractSink.flush(JdbcAbstractSink.java:203) ~[pulsar-io-jdbc-core-2.9.2.jar:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at java.lang.Thread.run(Thread.java:829) [?:?]

请提供更多的信息,pulsar版本信息,postgresql版本信息,安装部署方法等环境信息。

pulsar版本为2.10版本,pg数据库版本为postgresql13,安装部署方式是容器话部署,单节点

Hi @wangtony1993 ,建议可以提 issue 到 pulsar 的 Github repo。
因为这样的问题需要大量上下文信息(日志和配置等),还需要准备相应的环境,论坛里可能没有对应的资源做相应复现。

或者你可以提供一个配置好的可访问的环境吗,这样可以点对点针对性排查下。