Skip to content

Commit b253e01

Browse files
Jsephmr-salty
authored andcommitted
fix: add logic to ObjectWriteStreambuf for handling jumps in upload ranges to fix #3280 (#3283)
* fix: add logic to ObjectWriteStreambuf for handling jumps in upload ranges
1 parent bf12752 commit b253e01

2 files changed

Lines changed: 170 additions & 15 deletions

File tree

google/cloud/storage/internal/object_streambuf.cc

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,10 @@ StatusOr<ResumableUploadResponse> ObjectWriteStreambuf::Flush() {
368368

369369
auto chunk_count = actual_size / UploadChunkRequest::kChunkSizeQuantum;
370370
auto chunk_size = chunk_count * UploadChunkRequest::kChunkSizeQuantum;
371+
// GCS upload returns an updated range header that sets the next expected
372+
// byte. Check to make sure it remains consistent with the bytes stored in the
373+
// buffer.
374+
auto expected_next_byte = upload_session_->next_expected_byte() + chunk_size;
371375

372376
hash_validator_->Update(pbase(), chunk_size);
373377
StatusOr<ResumableUploadResponse> result;
@@ -376,9 +380,27 @@ StatusOr<ResumableUploadResponse> ObjectWriteStreambuf::Flush() {
376380
if (!last_response_) {
377381
return last_response_;
378382
}
379-
std::copy(pbase() + chunk_size, epptr(), pbase());
383+
auto actual_next_byte = upload_session_->next_expected_byte();
384+
auto bytes_uploaded = static_cast<int64_t>(chunk_size);
385+
if (actual_next_byte < expected_next_byte) {
386+
bytes_uploaded -= expected_next_byte - actual_next_byte;
387+
if (bytes_uploaded < 0) {
388+
std::ostringstream error_message;
389+
error_message << "Could not continue upload stream. GCS requested byte "
390+
<< actual_next_byte << " which has already been uploaded.";
391+
return Status(StatusCode::kAborted, error_message.str());
392+
}
393+
} else if (actual_next_byte > expected_next_byte) {
394+
std::ostringstream error_message;
395+
error_message << "Could not continue upload stream. "
396+
<< "GCS requested unexpected byte. (expected: "
397+
<< expected_next_byte << ", actual: " << actual_next_byte
398+
<< ")";
399+
return Status(StatusCode::kAborted, error_message.str());
400+
}
401+
std::copy(pbase() + bytes_uploaded, epptr(), pbase());
380402
setp(pbase(), epptr());
381-
pbump(static_cast<int>(actual_size - chunk_size));
403+
pbump(static_cast<int>(actual_size - bytes_uploaded));
382404
return last_response_;
383405
}
384406

google/cloud/storage/internal/object_streambuf_test.cc

Lines changed: 146 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ using ::testing::_;
3333
using ::testing::HasSubstr;
3434
using ::testing::InSequence;
3535
using ::testing::Invoke;
36+
using ::testing::InvokeWithoutArgs;
3637
using ::testing::Return;
3738
using ::testing::ReturnRef;
3839
using ::testing::SizeIs;
@@ -110,11 +111,13 @@ TEST(ObjectWriteStreambufTest, EmptyTrailer) {
110111
std::string const payload(quantum, '*');
111112

112113
int count = 0;
114+
size_t next_byte = 0;
113115
EXPECT_CALL(*mock, UploadChunk(_)).WillOnce(Invoke([&](std::string const& p) {
114116
++count;
115117
EXPECT_EQ(1, count);
116118
EXPECT_EQ(payload, p);
117119
auto last_committed_byte = payload.size() - 1;
120+
next_byte = last_committed_byte + 1;
118121
return make_status_or(ResumableUploadResponse{
119122
"", last_committed_byte, {}, ResumableUploadResponse::kInProgress, {}});
120123
}));
@@ -132,7 +135,9 @@ TEST(ObjectWriteStreambufTest, EmptyTrailer) {
132135
ResumableUploadResponse::kInProgress,
133136
{}});
134137
}));
135-
EXPECT_CALL(*mock, next_expected_byte()).WillOnce(Return(quantum));
138+
EXPECT_CALL(*mock, next_expected_byte()).WillRepeatedly(Invoke([&]() {
139+
return next_byte;
140+
}));
136141

137142
ObjectWriteStreambuf streambuf(
138143
std::move(mock), quantum,
@@ -152,16 +157,18 @@ TEST(ObjectWriteStreambufTest, FlushAfterLargePayload) {
152157
auto const quantum = UploadChunkRequest::kChunkSizeQuantum;
153158
std::string const payload_1(3 * quantum, '*');
154159
std::string const payload_2("trailer");
155-
160+
size_t next_byte = 0;
156161
{
157162
InSequence seq;
158-
EXPECT_CALL(*mock, UploadChunk(_))
159-
.WillOnce(Return(make_status_or(
160-
ResumableUploadResponse{"",
161-
payload_1.size() - 1,
162-
{},
163-
ResumableUploadResponse::kInProgress,
164-
{}})));
163+
EXPECT_CALL(*mock, UploadChunk(_)).WillOnce(InvokeWithoutArgs([&]() {
164+
next_byte = payload_1.size();
165+
return make_status_or(
166+
ResumableUploadResponse{"",
167+
payload_1.size() - 1,
168+
{},
169+
ResumableUploadResponse::kInProgress,
170+
{}});
171+
}));
165172
EXPECT_CALL(
166173
*mock, UploadFinalChunk(payload_2, payload_1.size() + payload_2.size()))
167174
.WillOnce(Return(make_status_or(
@@ -171,7 +178,9 @@ TEST(ObjectWriteStreambufTest, FlushAfterLargePayload) {
171178
ResumableUploadResponse::kInProgress,
172179
{}})));
173180
}
174-
EXPECT_CALL(*mock, next_expected_byte()).WillOnce(Return(3 * quantum));
181+
EXPECT_CALL(*mock, next_expected_byte()).WillRepeatedly(Invoke([&]() {
182+
return next_byte;
183+
}));
175184

176185
ObjectWriteStreambuf streambuf(
177186
std::move(mock), 3 * quantum,
@@ -194,11 +203,13 @@ TEST(ObjectWriteStreambufTest, FlushAfterFullQuantum) {
194203
std::string const payload_2(quantum, '*');
195204

196205
int count = 0;
206+
size_t next_byte = 0;
197207
EXPECT_CALL(*mock, UploadChunk(_)).WillOnce(Invoke([&](std::string const& p) {
198208
++count;
199209
EXPECT_EQ(1, count);
200210
auto expected = payload_1 + payload_2.substr(0, quantum - payload_1.size());
201211
EXPECT_EQ(expected, p);
212+
next_byte += p.size();
202213
return make_status_or(ResumableUploadResponse{
203214
"", quantum - 1, {}, ResumableUploadResponse::kInProgress, {}});
204215
}));
@@ -217,7 +228,9 @@ TEST(ObjectWriteStreambufTest, FlushAfterFullQuantum) {
217228
ResumableUploadResponse::kInProgress,
218229
{}});
219230
}));
220-
EXPECT_CALL(*mock, next_expected_byte()).WillOnce(Return(quantum));
231+
EXPECT_CALL(*mock, next_expected_byte()).WillRepeatedly(Invoke([&]() {
232+
return next_byte;
233+
}));
221234

222235
ObjectWriteStreambuf streambuf(
223236
std::move(mock), quantum,
@@ -238,11 +251,13 @@ TEST(ObjectWriteStreambufTest, OverflowFlushAtFullQuantum) {
238251
std::string const payload(quantum, '*');
239252

240253
int count = 0;
254+
size_t next_byte = 0;
241255
EXPECT_CALL(*mock, UploadChunk(_)).WillOnce(Invoke([&](std::string const& p) {
242256
++count;
243257
EXPECT_EQ(1, count);
244258
auto expected = payload;
245259
EXPECT_EQ(expected, p);
260+
next_byte += p.size();
246261
return make_status_or(ResumableUploadResponse{
247262
"", quantum - 1, {}, ResumableUploadResponse::kInProgress, {}});
248263
}));
@@ -260,7 +275,9 @@ TEST(ObjectWriteStreambufTest, OverflowFlushAtFullQuantum) {
260275
ResumableUploadResponse::kInProgress,
261276
{}});
262277
}));
263-
EXPECT_CALL(*mock, next_expected_byte()).WillOnce(Return(quantum));
278+
EXPECT_CALL(*mock, next_expected_byte()).WillRepeatedly(Invoke([&]() {
279+
return next_byte;
280+
}));
264281
EXPECT_CALL(*mock, done).WillRepeatedly(Return(false));
265282

266283
ObjectWriteStreambuf streambuf(
@@ -274,6 +291,118 @@ TEST(ObjectWriteStreambufTest, OverflowFlushAtFullQuantum) {
274291
EXPECT_STATUS_OK(response);
275292
}
276293

294+
/// @test verify that bytes not accepted by GCS will be re-uploaded next Flush.
295+
TEST(ObjectWriteStreambufTest, SomeBytesNotAccepted) {
296+
auto mock = google::cloud::internal::make_unique<
297+
testing::MockResumableUploadSession>();
298+
299+
auto const quantum = UploadChunkRequest::kChunkSizeQuantum;
300+
std::string const payload = std::string(quantum, '*') + "abcde";
301+
302+
size_t next_byte = 0;
303+
uint64_t const bytes_uploaded_first_try = quantum - 1;
304+
EXPECT_CALL(*mock, UploadChunk(_)).WillOnce(Invoke([&](std::string const& p) {
305+
auto expected = payload.substr(0, quantum);
306+
EXPECT_EQ(expected, p);
307+
next_byte += bytes_uploaded_first_try;
308+
return make_status_or(
309+
ResumableUploadResponse{"",
310+
bytes_uploaded_first_try - 1,
311+
{},
312+
ResumableUploadResponse::kInProgress,
313+
{}});
314+
}));
315+
EXPECT_CALL(*mock, UploadFinalChunk(_, _))
316+
.WillOnce(Invoke([&](std::string const& p, std::uint64_t s) {
317+
EXPECT_EQ(p, payload.substr(bytes_uploaded_first_try));
318+
EXPECT_EQ(payload.size(), s);
319+
auto last_committed_byte = payload.size() - 1;
320+
return make_status_or(
321+
ResumableUploadResponse{"{}",
322+
last_committed_byte,
323+
{},
324+
ResumableUploadResponse::kInProgress,
325+
{}});
326+
}));
327+
EXPECT_CALL(*mock, next_expected_byte()).WillRepeatedly(Invoke([&]() {
328+
return next_byte;
329+
}));
330+
EXPECT_CALL(*mock, done).WillRepeatedly(Return(false));
331+
332+
ObjectWriteStreambuf streambuf(
333+
std::move(mock), quantum,
334+
google::cloud::internal::make_unique<NullHashValidator>());
335+
336+
std::ostream output(&streambuf);
337+
output << payload;
338+
auto response = streambuf.Close();
339+
EXPECT_STATUS_OK(response);
340+
}
341+
342+
/// @test verify that the upload steam transitions to a bad state if the next
343+
/// expected byte jumps.
344+
TEST(ObjectWriteStreambufTest, NextExpectedByteJumpsAhead) {
345+
auto mock = google::cloud::internal::make_unique<
346+
testing::MockResumableUploadSession>();
347+
348+
auto const quantum = UploadChunkRequest::kChunkSizeQuantum;
349+
std::string const payload = std::string(quantum * 2, '*');
350+
351+
size_t next_byte = 0;
352+
EXPECT_CALL(*mock, UploadChunk(_)).WillOnce(Invoke([&](std::string const& p) {
353+
next_byte += quantum * 2;
354+
auto expected = payload.substr(0, quantum);
355+
EXPECT_EQ(expected, p);
356+
return make_status_or(ResumableUploadResponse{
357+
"", next_byte - 1, {}, ResumableUploadResponse::kInProgress, {}});
358+
}));
359+
EXPECT_CALL(*mock, next_expected_byte()).WillRepeatedly(Invoke([&]() {
360+
return next_byte;
361+
}));
362+
EXPECT_CALL(*mock, done).WillRepeatedly(Return(false));
363+
std::string id = "id";
364+
EXPECT_CALL(*mock, session_id).WillOnce(ReturnRef(id));
365+
366+
ObjectWriteStreambuf streambuf(
367+
std::move(mock), quantum,
368+
google::cloud::internal::make_unique<NullHashValidator>());
369+
std::ostream output(&streambuf);
370+
output << payload;
371+
EXPECT_FALSE(output.good());
372+
EXPECT_EQ(streambuf.last_status().code(), StatusCode::kAborted);
373+
}
374+
375+
/// @test verify that the upload steam transitions to a bad state if the next
376+
/// expected byte decreases.
377+
TEST(ObjectWriteStreambufTest, NextExpectedByteDecreases) {
378+
auto mock = google::cloud::internal::make_unique<
379+
testing::MockResumableUploadSession>();
380+
381+
auto const quantum = UploadChunkRequest::kChunkSizeQuantum;
382+
std::string const payload = std::string(quantum * 2, '*');
383+
384+
auto next_byte = quantum;
385+
EXPECT_CALL(*mock, UploadChunk(_)).WillOnce(InvokeWithoutArgs([&]() {
386+
next_byte--;
387+
return make_status_or(ResumableUploadResponse{
388+
"", next_byte - 1, {}, ResumableUploadResponse::kInProgress, {}});
389+
}));
390+
EXPECT_CALL(*mock, next_expected_byte()).WillRepeatedly(Invoke([&]() {
391+
return next_byte;
392+
}));
393+
EXPECT_CALL(*mock, done).WillRepeatedly(Return(false));
394+
std::string id = "id";
395+
EXPECT_CALL(*mock, session_id).WillOnce(ReturnRef(id));
396+
397+
ObjectWriteStreambuf streambuf(
398+
std::move(mock), quantum,
399+
google::cloud::internal::make_unique<NullHashValidator>());
400+
std::ostream output(&streambuf);
401+
output << payload;
402+
EXPECT_FALSE(output.good());
403+
EXPECT_EQ(streambuf.last_status().code(), StatusCode::kAborted);
404+
}
405+
277406
/// @test Verify that a stream flushes when mixing operations that add one
278407
/// character at a time and operations that add buffers.
279408
TEST(ObjectWriteStreambufTest, MixPutcPutn) {
@@ -286,11 +415,13 @@ TEST(ObjectWriteStreambufTest, MixPutcPutn) {
286415
std::string const payload_2(quantum, '*');
287416

288417
int count = 0;
418+
size_t next_byte = 0;
289419
EXPECT_CALL(*mock, UploadChunk(_)).WillOnce(Invoke([&](std::string const& p) {
290420
++count;
291421
EXPECT_EQ(1, count);
292422
auto expected = payload_1 + payload_2.substr(0, quantum - payload_1.size());
293423
EXPECT_EQ(expected, p);
424+
next_byte += p.size();
294425
return make_status_or(ResumableUploadResponse{
295426
"", quantum - 1, {}, ResumableUploadResponse::kInProgress, {}});
296427
}));
@@ -309,7 +440,9 @@ TEST(ObjectWriteStreambufTest, MixPutcPutn) {
309440
ResumableUploadResponse::kInProgress,
310441
{}});
311442
}));
312-
EXPECT_CALL(*mock, next_expected_byte()).WillOnce(Return(quantum));
443+
EXPECT_CALL(*mock, next_expected_byte()).WillRepeatedly(Invoke([&]() {
444+
return next_byte;
445+
}));
313446

314447
ObjectWriteStreambuf streambuf(
315448
std::move(mock), quantum,

0 commit comments

Comments
 (0)