Date conversion exception for invalid date formats in CDS streaming jobs

Description

Problem:

An exception occurs in the CDS streaming jobs when incoming data contains dates in an invalid format, for example: 2024-07-24T13:41:18.459000Z.
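The sample value is a UTC timestamp with six fractional-second digits. As a minimal sketch of a tolerant conversion, the standard `java.time.Instant.parse` accepts zero to nine fractional digits. The class `LenientTimestampParser` and its method `tryParse` are hypothetical names used only for illustration; this is not the implementation of `LabsConversionUtils.date`, whose accepted formats are not shown in this issue.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.Optional;

// Hypothetical helper, not part of the Carol codebase.
final class LenientTimestampParser {

    private LenientTimestampParser() {}

    /**
     * Parses an ISO-8601 UTC timestamp such as 2024-07-24T13:41:18.459000Z.
     * Returns Optional.empty() instead of throwing when the value is
     * null, blank, or not parseable.
     */
    static Optional<Instant> tryParse(String raw) {
        if (raw == null || raw.isBlank()) {
            return Optional.empty();
        }
        try {
            // Instant.parse uses DateTimeFormatter.ISO_INSTANT, which
            // accepts between 0 and 9 fractional-second digits.
            return Optional.of(Instant.parse(raw.trim()));
        } catch (DateTimeParseException e) {
            return Optional.empty();
        }
    }
}
```

Returning an `Optional` leaves the decision about what to do with an unparseable value (reject the record, store null, log) to the caller rather than failing inside the conversion.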

Stacktrace:

java.lang.IllegalArgumentException: Failed to parse [2024-07-24T13:41:18.459000Z] as a Date (type String)
	at com.totvslabs.framework.core.common.utils.LabsConversionUtils.date(LabsConversionUtils.java:511)
	at com.totvslabs.framework.core.common.utils.LabsConversionUtils.date(LabsConversionUtils.java:440)
	at com.totvslabs.mdm.pipeline.cds.bigquery.BigQuerySchemaHelper.convertType(BigQuerySchemaHelper.java:55)
	at com.totvslabs.mdm.pipeline.cds.component.StagingToTableRow.apply(StagingToTableRow.java:132)
	at com.totvslabs.mdm.pipeline.bigquery.staging.StagingToKvTenantStagingTypeTableRow.apply(StagingToKvTenantStagingTypeTableRow.java:42)
	at com.totvslabs.mdm.pipeline.bigquery.staging.StagingToKvTenantStagingTypeTableRow.apply(StagingToKvTenantStagingTypeTableRow.java:28)
	at org.apache.beam.sdk.transforms.MapElements$2.processElement(MapElements.java:151)
	at org.apache.beam.sdk.transforms.MapElements$2$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423)
	at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:76)
	at org.apache.beam.sdk.transforms.FlatMapElements$3.processElement(FlatMapElements.java:169)
	at org.apache.beam.sdk.transforms.FlatMapElements$3$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423)
	at com.totvslabs.mdm.pipeline.cds.writer.CarolStagingParquetWriter.createBigQueryOutput(CarolStagingParquetWriter.java:456)
	at com.totvslabs.mdm.pipeline.cds.writer.CarolStagingParquetWriter.lambda$getGenericRecordsFromPubSubMessage$1(CarolStagingParquetWriter.java:224)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
	at com.totvslabs.mdm.pipeline.cds.writer.CarolStagingParquetWriter.lambda$getGenericRecordsFromPubSubMessage$2(CarolStagingParquetWriter.java:235)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
	at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
	at com.totvslabs.mdm.pipeline.cds.writer.CarolStagingParquetWriter.getGenericRecordsFromPubSubMessage(CarolStagingParquetWriter.java:239)
	at com.totvslabs.mdm.pipeline.cds.writer.CarolStagingParquetWriter$StagingProcessor.processElement(CarolStagingParquetWriter.java:582)
	at com.totvslabs.mdm.pipeline.cds.writer.CarolStagingParquetWriter$StagingProcessor$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:185)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:411)
	at com.totvslabs.mdm.pipeline.cds.component.CarolStagingParse$1.processElement(CarolStagingParse.java:70)
	at com.totvslabs.mdm.pipeline.cds.component.CarolStagingParse$1$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$OnTimerArgumentProvider.outputWithTimestamp(SimpleDoFnRunner.java:850)
	at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:85)
	at org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.flushBatch(GroupIntoBatches.java:660)
	at org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.onBufferingTimer(GroupIntoBatches.java:601)
	at org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn$OnTimerInvoker$tsendOfBuffering$dHMtZW5kT2ZCdWZmZXJpbmc.invokeOnTimer(Unknown Source)
	at org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DoFnInvokerBase.invokeOnTimer(ByteBuddyDoFnInvokerFactory.java:242)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.onTimer(SimpleDoFnRunner.java:205)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processUserTimer(SimpleParDoFn.java:366)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$600(SimpleParDoFn.java:79)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$1.processTimer(SimpleParDoFn.java:454)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:483)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:358)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:94)
	at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1433)
	at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:155)
	at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$4.run(StreamingDataflowWorker.java:1056)
	at org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:163)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

Points of attention:

We need to investigate whether the intake requests received a 200 response (meaning the Carol platform accepted the request); if so, we may be losing data in this same scenario.

The risk with this error in this flow is that a single record with invalid data may invalidate the entire batch being processed by the streaming job. In that case the data is not reprocessed, and the customer, who expects that the data was sent successfully, will not be able to see it in the Staging schema table.

The errors are related to the internal (metadata) fields of the stagings: mdmCreated and mdmLastUpdated.

Logs: https://cloudlogging.app.goo.gl/eWouJiimgU5eiFir6

image-20240730-180310.png

Acceptance criteria:

  • Correctly handle the date conversion that should be happening in production.
  • TODO: Align with @Robson Thanael Poffo on whether the best strategy is to handle this type of exception at this point in the streaming pipeline flow, so that it does not compromise the rest of the record's information, much less the batch being processed along with it.
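One possible shape for the strategy under discussion, handling the exception per record so that a single bad value does not fail the whole batch, is sketched below in plain Java. The class `BatchConversionSketch`, its `Result` type, and the `rejected` list are hypothetical names for illustration only; in the actual pipeline the rejected records would need to go to whatever error or dead-letter destination the team agrees on.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the pipeline's actual code.
final class BatchConversionSketch {

    /** Converted values plus the raw inputs that could not be converted. */
    static final class Result {
        final List<Instant> converted = new ArrayList<>();
        final List<String> rejected = new ArrayList<>();
    }

    private BatchConversionSketch() {}

    /**
     * Converts each raw timestamp independently. A failure on one element
     * is recorded in `rejected` and does not interrupt the remaining ones.
     */
    static Result convertBatch(List<String> rawTimestamps) {
        Result result = new Result();
        for (String raw : rawTimestamps) {
            if (raw == null) {
                result.rejected.add(null);
                continue;
            }
            try {
                result.converted.add(Instant.parse(raw));
            } catch (DateTimeParseException e) {
                // Keep the raw value so it can be logged or reprocessed later.
                result.rejected.add(raw);
            }
        }
        return result;
    }
}
```

The trade-off is that rejected records must be made visible somewhere (logs, metrics, or a separate output); otherwise the data loss described above would simply become silent.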

Activity

MARCOS STUMPF 2 September 2024, 20:25 Jira Internal Users

@André Pereira de Oliveira I only see implementation in the defect subtasks; I believe this was a case where the branch and commit were not linked to the main card, correct?

Automation for Jira 2 August 2024, 20:19 Jira Internal Users

@Renan Fernando Schroeder ,
@André Pereira de Oliveira ,

This issue is planned to be delivered by 2024-09-27. You can confirm this by checking the Due Date field of this issue.

Dates already planned for this issue: 2024-09-27

If the External Issue Link field is filled in with a link to a valid issue in Jira Produção, the customer will also be notified in Jira Produção.