-
Notifications
You must be signed in to change notification settings - Fork 410
[CELEBORN-2032] Create reader should change to peer by taskAttemptId #3320
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
cc @AngersZhuuuu @RexXiong @onebox-li PTAL. |
val originLocations = fileGroups.partitionGroups.get(partitionId) | ||
val hasReplicate = originLocations.asScala.exists(p => p != null && p.hasPeer) | ||
var locations = | ||
if (encodedAttemptId % 2 == 1 && hasReplicate) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not all replicas of the partition location are available, Therefore hasReplicate is true not mean the partition location has peer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
val originLocations = fileGroups.partitionGroups.get(partitionId) | ||
val hasReplicate = originLocations.asScala.exists(p => p != null && p.hasPeer) | ||
var locations = | ||
if (encodedAttemptId % 2 == 1 && hasReplicate) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we use context.attemptNumber()
?
@@ -568,6 +568,13 @@ private PartitionReader createReader( | |||
Optional<PartitionReaderCheckpointMetadata> checkpointMetadata) | |||
throws IOException, InterruptedException { | |||
|
|||
// CELEBORN-2032. For the first creation of a reader (non-retries) and | |||
// attemptNumber % 2 = 1, we should read the replica data first. | |||
if (location.hasPeer() && fetchChunkRetryCnt == 0 && attemptNumber % 2 == 1) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO we need remove this, as the location may already change to replica, and the Partitionlocation
should consistent with pbStreamHandler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense.
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## main #3320 +/- ##
==========================================
- Coverage 63.54% 63.07% -0.46%
==========================================
Files 343 347 +4
Lines 20812 21296 +484
Branches 1835 1879 +44
==========================================
+ Hits 13222 13431 +209
- Misses 6630 6897 +267
- Partials 960 968 +8 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
What changes were proposed in this pull request?
In the dual-replica scenario, when creating a reader, we should select the replica based on taskAttemptId. Usually, taskAttempt0 selects primary partitionLocation, task Attempt1 selects replica partitionLocation, and so on. This will provide better fault tolerance.
Why are the changes needed?
Since #3079, we deleted the code logic which should use replica data when task attempt is odd, but if the data of primary partitionLocation is corrupted and CelebornInputStream#fillBuffer throws exception, such as decompression failure or some other exceptions, the replica prititionLocation will not be used when the task is retried. In fact, if taskAttempt1 uses the replica partitionLocation, taskAttempt1 can run successfully.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing UTs.