Skip to content

Bugfix for maxPendingProcessRecordsInput is 0#1708

Merged
lucienlu-aws merged 4 commits into
awslabs:masterfrom
lucienlu-aws:max-pending-batches-fix
Mar 2, 2026
Merged

Bugfix for maxPendingProcessRecordsInput is 0#1708
lucienlu-aws merged 4 commits into
awslabs:masterfrom
lucienlu-aws:max-pending-batches-fix

Conversation

@lucienlu-aws

@lucienlu-aws lucienlu-aws commented Feb 25, 2026

Copy link
Copy Markdown
Contributor

Issue #, if available:

Description of changes:

  • Add maxPendingProcessRecordsInput config to PollingConfig #1674 introduced a bug when users specify maxPendingProcessRecordsInput to be 0. This commit fixes it, details below
  • Limit maxPendingProcessRecordsInput to have max value of 5
  • [small fix] Update initial lastEventDeliveryTime to be now instead of epoch to avoid WARN log at KCL startup
                    "{}: Record delivery time to shard consumer is high at {} millis. Check the ExecutorStateEvent logs"
                            + " to see the state of the executor service. Also check if the RecordProcessor's processing "
                            + "time is high. ",

Symptom

When maxPendingProcessRecordsInput is 0 and the KCL application initializes a ShardConsumer at a checkpoint that is not at tip (i.e. could be TRIM_HORIZON with 2 batches of records or a sequence number that has at least two batches of records after it), there is a 60 second delay when retrieving the second record.

Fix

  • When maxPendingProcessRecordsInput is 0, don't fetch records from Kinesis until Publisher has demand
  • Do not attempt to drainQueue in subcsroption.request() and NotifyingSubscriber

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you update it to Instant.now()? This is to avoid the false large delay warning for the first batch of records

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

}

public PollingConfig maxPendingProcessRecordsInput(int maxPendingProcessRecordsInput) {
if (maxPendingProcessRecordsInput > 5) {

@chenylee-aws chenylee-aws Mar 2, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should probably use DEFAULT_MAX_PENDING_PROCESS_RECORDS_INPUT_LIMIT here as well.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

Comment on lines 49 to 51
public static final int DEFAULT_MAX_PENDING_PROCESS_RECORDS_INPUT_LIMIT = 5;

public static final int DEFAULT_MAX_PENDING_PROCESS_RECORDS_INPUT = 4;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you make these two private?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

@lucienlu-aws lucienlu-aws merged commit 69ebb91 into awslabs:master Mar 2, 2026
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants