Skip to content

[CELEBORN-2032] Create reader should change to peer by taskAttemptId #3320

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

leixm
Copy link
Contributor

@leixm leixm commented Jun 10, 2025

What changes were proposed in this pull request?

In the dual-replica scenario, when creating a reader, we should select the replica based on taskAttemptId. Usually, taskAttempt0 selects primary partitionLocation, task Attempt1 selects replica partitionLocation, and so on. This will provide better fault tolerance.

Why are the changes needed?

Since #3079, we deleted the code logic which should use replica data when task attempt is odd, but if the data of primary partitionLocation is corrupted and CelebornInputStream#fillBuffer throws exception, such as decompression failure or some other exceptions, the replica prititionLocation will not be used when the task is retried. In fact, if taskAttempt1 uses the replica partitionLocation, taskAttempt1 can run successfully.

image

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing UTs.

@leixm
Copy link
Contributor Author

leixm commented Jun 10, 2025

cc @AngersZhuuuu @RexXiong @onebox-li PTAL.

@turboFei turboFei added the 0.6.0 label Jun 10, 2025
@turboFei turboFei requested a review from RexXiong June 10, 2025 21:36
@turboFei turboFei removed the 0.6.0 label Jun 11, 2025
val originLocations = fileGroups.partitionGroups.get(partitionId)
val hasReplicate = originLocations.asScala.exists(p => p != null && p.hasPeer)
var locations =
if (encodedAttemptId % 2 == 1 && hasReplicate) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not all replicas of the partition location are available, Therefore hasReplicate is true not mean the partition location has peer.

@AngersZhuuuu AngersZhuuuu changed the title CELEBORN-2032. Create reader should change to peer by taskAttemptId [CELEBORN-2032] Create reader should change to peer by taskAttemptId Jun 12, 2025
Copy link
Contributor

@FMX FMX left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@turboFei turboFei requested a review from RexXiong June 16, 2025 23:27
val originLocations = fileGroups.partitionGroups.get(partitionId)
val hasReplicate = originLocations.asScala.exists(p => p != null && p.hasPeer)
var locations =
if (encodedAttemptId % 2 == 1 && hasReplicate) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use context.attemptNumber()?

@@ -568,6 +568,13 @@ private PartitionReader createReader(
Optional<PartitionReaderCheckpointMetadata> checkpointMetadata)
throws IOException, InterruptedException {

// CELEBORN-2032. For the first creation of a reader (non-retries) and
// attemptNumber % 2 = 1, we should read the replica data first.
if (location.hasPeer() && fetchChunkRetryCnt == 0 && attemptNumber % 2 == 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO we need remove this, as the location may already change to replica, and the Partitionlocationshould consistent with pbStreamHandler

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense.

Copy link

codecov bot commented Jun 19, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 63.07%. Comparing base (fff9725) to head (514155b).
Report is 80 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3320      +/-   ##
==========================================
- Coverage   63.54%   63.07%   -0.46%     
==========================================
  Files         343      347       +4     
  Lines       20812    21296     +484     
  Branches     1835     1879      +44     
==========================================
+ Hits        13222    13431     +209     
- Misses       6630     6897     +267     
- Partials      960      968       +8     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants