Skip to content

Commit 0e14724

Browse files
authored
Async sample row keys (#1369)
* Add AsyncSampleRowKeys to DataClient. * Implement AsyncSampleRowKeys. It's straightforward: run the RPC, accumulate the response stream, and retry on failure. * Add an integration test for AsyncSampleRowKeys. * Increase the timeout for integration tests. The AsyncSampleRowKeys test has to write more data to verify that the implementation does something reasonable. This takes several seconds, hence the need to increase the timeout.
1 parent 63ee8eb commit 0e14724

14 files changed

Lines changed: 569 additions & 1 deletion

google/cloud/bigtable/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,8 @@ add_library(bigtable_client
153153
instance_update_config.h
154154
instance_update_config.cc
155155
internal/async_bulk_apply.h
156+
internal/async_sample_row_keys.h
157+
internal/async_sample_row_keys.cc
156158
internal/async_retry_op.h
157159
internal/async_retry_unary_rpc.h
158160
internal/bulk_mutator.h
@@ -300,6 +302,7 @@ set(bigtable_client_unit_tests
300302
internal/table_admin_test.cc
301303
internal/table_async_apply_test.cc
302304
internal/table_async_bulk_apply_test.cc
305+
internal/table_async_sample_row_keys_test.cc
303306
internal/table_test.cc
304307
mutations_test.cc
305308
table_admin_test.cc

google/cloud/bigtable/bigtable_client.bzl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ bigtable_client_HDRS = [
1717
"instance_config.h",
1818
"instance_update_config.h",
1919
"internal/async_bulk_apply.h",
20+
"internal/async_sample_row_keys.h",
2021
"internal/async_retry_op.h",
2122
"internal/async_retry_unary_rpc.h",
2223
"internal/bulk_mutator.h",
@@ -68,6 +69,7 @@ bigtable_client_SRCS = [
6869
"instance_admin.cc",
6970
"instance_config.cc",
7071
"instance_update_config.cc",
72+
"internal/async_sample_row_keys.cc",
7173
"internal/bulk_mutator.cc",
7274
"internal/completion_queue_impl.cc",
7375
"internal/common_client.cc",

google/cloud/bigtable/bigtable_client_unit_tests.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ bigtable_client_unit_tests = [
2424
"internal/table_admin_test.cc",
2525
"internal/table_async_apply_test.cc",
2626
"internal/table_async_bulk_apply_test.cc",
27+
"internal/table_async_sample_row_keys_test.cc",
2728
"internal/table_test.cc",
2829
"mutations_test.cc",
2930
"table_admin_test.cc",

google/cloud/bigtable/ci/run_integration_tests.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ set +e
3232
success=""
3333
readonly TIMEOUT_CMD="$(which timeout)"
3434
if [ -n "${TIMEOUT_CMD}" ]; then
35-
timeout="${TIMEOUT_CMD} 15s"
35+
timeout="${TIMEOUT_CMD} 20s"
3636
else
3737
timeout="env"
3838
fi

google/cloud/bigtable/data_client.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,14 @@ class DefaultDataClient : public DataClient {
9595
btproto::SampleRowKeysRequest const& request) override {
9696
return impl_.Stub()->SampleRowKeys(context, request);
9797
}
98+
std::unique_ptr<::grpc::ClientAsyncReaderInterface<
99+
::google::bigtable::v2::SampleRowKeysResponse>>
100+
AsyncSampleRowKeys(
101+
::grpc::ClientContext* context,
102+
const ::google::bigtable::v2::SampleRowKeysRequest& request,
103+
::grpc::CompletionQueue* cq, void* tag) override {
104+
return impl_.Stub()->AsyncSampleRowKeys(context, request, cq, tag);
105+
}
98106

99107
std::unique_ptr<grpc::ClientReaderInterface<btproto::MutateRowsResponse>>
100108
MutateRows(grpc::ClientContext* context,

google/cloud/bigtable/data_client.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ class Table;
2929
} // namespace noex
3030
namespace internal {
3131
class AsyncBulkMutator;
32+
class AsyncSampleRowKeys;
3233
class BulkMutator;
3334
} // namespace internal
3435

@@ -82,6 +83,7 @@ class DataClient {
8283
friend class Table;
8384
friend class noex::Table;
8485
friend class internal::AsyncBulkMutator;
86+
friend class internal::AsyncSampleRowKeys;
8587
friend class internal::BulkMutator;
8688
friend class RowReader;
8789
//@{
@@ -111,6 +113,12 @@ class DataClient {
111113
grpc::ClientReaderInterface<google::bigtable::v2::SampleRowKeysResponse>>
112114
SampleRowKeys(grpc::ClientContext* context,
113115
google::bigtable::v2::SampleRowKeysRequest const& request) = 0;
116+
virtual std::unique_ptr<::grpc::ClientAsyncReaderInterface<
117+
::google::bigtable::v2::SampleRowKeysResponse>>
118+
AsyncSampleRowKeys(
119+
::grpc::ClientContext* context,
120+
const ::google::bigtable::v2::SampleRowKeysRequest& request,
121+
::grpc::CompletionQueue* cq, void* tag) = 0;
114122
virtual std::unique_ptr<
115123
grpc::ClientReaderInterface<google::bigtable::v2::MutateRowsResponse>>
116124
MutateRows(grpc::ClientContext* context,
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Copyright 2018 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/bigtable/internal/async_sample_row_keys.h"
16+
#include "google/cloud/bigtable/internal/table.h"
17+
#include "google/cloud/bigtable/rpc_retry_policy.h"
18+
#include "google/cloud/bigtable/table_strong_types.h"
19+
#include <numeric>
20+
21+
namespace google {
22+
namespace cloud {
23+
namespace bigtable {
24+
inline namespace BIGTABLE_CLIENT_NS {
25+
namespace internal {
26+
27+
AsyncSampleRowKeys::AsyncSampleRowKeys(
28+
std::shared_ptr<DataClient> client,
29+
bigtable::AppProfileId const& app_profile_id,
30+
bigtable::TableId const& table_name)
31+
: client_(std::move(client)) {
32+
bigtable::internal::SetCommonTableOperationRequest<
33+
google::bigtable::v2::SampleRowKeysRequest>(
34+
request_, app_profile_id.get(), table_name.get());
35+
}
36+
37+
} // namespace internal
38+
} // namespace BIGTABLE_CLIENT_NS
39+
} // namespace bigtable
40+
} // namespace cloud
41+
} // namespace google
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
// Copyright 2018 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_ASYNC_SAMPLE_ROW_KEYS_H_
16+
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_ASYNC_SAMPLE_ROW_KEYS_H_
17+
18+
#include "google/cloud/bigtable/async_operation.h"
19+
#include "google/cloud/bigtable/bigtable_strong_types.h"
20+
#include "google/cloud/bigtable/completion_queue.h"
21+
#include "google/cloud/bigtable/data_client.h"
22+
#include "google/cloud/bigtable/idempotent_mutation_policy.h"
23+
#include "google/cloud/bigtable/internal/async_retry_op.h"
24+
#include "google/cloud/bigtable/internal/bulk_mutator.h"
25+
#include "google/cloud/bigtable/internal/row_key_sample.h"
26+
#include "google/cloud/bigtable/table_strong_types.h"
27+
#include "google/cloud/internal/invoke_result.h"
28+
#include "google/cloud/internal/make_unique.h"
29+
30+
namespace google {
31+
namespace cloud {
32+
namespace bigtable {
33+
inline namespace BIGTABLE_CLIENT_NS {
34+
namespace internal {
35+
36+
/**
37+
* A SampleRowKeys call bound with client, table and app_profile_id.
38+
*
39+
* It satisfies the requirements to be used as the `Operation` parameter in
40+
* `AsyncRetryOp`.
41+
*
42+
* It encapsulates calling this RPC and accumulates the result. In case of an
43+
* error, all partially accumulated data is dropped.
44+
*/
45+
class AsyncSampleRowKeys {
46+
public:
47+
using Request = google::bigtable::v2::SampleRowKeysRequest;
48+
using Response = std::vector<RowKeySample>;
49+
50+
AsyncSampleRowKeys(std::shared_ptr<DataClient> client,
51+
bigtable::AppProfileId const& app_profile_id,
52+
bigtable::TableId const& table_name);
53+
54+
/** Start the bound asynchronous request.
55+
*
56+
* @tparam Functor the type of the function-like object that will receive the
57+
* results.
58+
*
59+
* @tparam valid_callback_type a formal parameter, uses `std::enable_if<>` to
60+
* disable this template if the functor does not match the expected
61+
* signature.
62+
*
63+
* @param cq the completion queue to run the asynchronous operations.
64+
*
65+
* @param context the gRPC context used for this request
66+
*
67+
* @param callback the functor which will be fired in an unspecified thread
68+
* once the response stream completes
69+
*/
70+
template <typename Functor,
71+
typename std::enable_if<
72+
google::cloud::internal::is_invocable<Functor, CompletionQueue&,
73+
grpc::Status&>::value,
74+
int>::type valid_callback_type = 0>
75+
void Start(CompletionQueue& cq,
76+
std::unique_ptr<grpc::ClientContext>&& context,
77+
Functor&& callback) {
78+
cq.MakeUnaryStreamRpc(
79+
*client_, &DataClient::AsyncSampleRowKeys, request_, std::move(context),
80+
[this](CompletionQueue&, const grpc::ClientContext&,
81+
google::bigtable::v2::SampleRowKeysResponse& response) {
82+
response_.emplace_back(RowKeySample{std::move(response.row_key()),
83+
response.offset_bytes()});
84+
},
85+
FinishedCallback<Functor>(*this, std::forward<Functor>(callback)));
86+
}
87+
88+
Response AccumulatedResult() { return response_; }
89+
90+
private:
91+
template <typename Functor,
92+
typename std::enable_if<
93+
google::cloud::internal::is_invocable<Functor, CompletionQueue&,
94+
grpc::Status&>::value,
95+
int>::type valid_callback_type = 0>
96+
struct FinishedCallback {
97+
FinishedCallback(AsyncSampleRowKeys& parent, Functor&& callback)
98+
: parent_(parent), callback_(callback) {}
99+
100+
void operator()(CompletionQueue& cq, grpc::ClientContext& context,
101+
grpc::Status& status) {
102+
if (not status.ok()) {
103+
// The sample must be a consistent sample of the rows in the table. On
104+
// failure we must forget the previous responses and accumulate only
105+
// new values.
106+
parent_.response_ = Response();
107+
}
108+
callback_(cq, status);
109+
}
110+
111+
// The user of AsyncSampleRowKeys has to make sure that it is not destructed
112+
// before all callbacks return, so we have a guarantee that this reference
113+
// is valid for as long as we don't call callback_.
114+
AsyncSampleRowKeys& parent_;
115+
Functor callback_;
116+
};
117+
118+
private:
119+
std::shared_ptr<bigtable::DataClient> client_;
120+
Request request_;
121+
Response response_;
122+
};
123+
124+
/**
125+
* Perform an `AsyncSampleRowKeys` operation request with retries.
126+
*
127+
* @tparam Functor the type of the function-like object that will receive the
128+
* results. It must satisfy (using C++17 types):
129+
* static_assert(std::is_invocable_v<
130+
* Functor, CompletionQueue&, std::vector<RowKeySample>&,
131+
* grpc::Status&>);
132+
*
133+
* @tparam valid_callback_type a formal parameter, uses `std::enable_if<>` to
134+
* disable this template if the functor does not match the expected
135+
* signature.
136+
*/
137+
template <typename Functor,
138+
typename std::enable_if<
139+
google::cloud::internal::is_invocable<Functor, CompletionQueue&,
140+
std::vector<RowKeySample>&,
141+
grpc::Status&>::value,
142+
int>::type valid_callback_type = 0>
143+
class AsyncRetrySampleRowKeys
144+
: public AsyncRetryOp<ConstantIdempotencyPolicy, Functor,
145+
AsyncSampleRowKeys> {
146+
public:
147+
AsyncRetrySampleRowKeys(char const* error_message,
148+
std::unique_ptr<RPCRetryPolicy> rpc_retry_policy,
149+
std::unique_ptr<RPCBackoffPolicy> rpc_backoff_policy,
150+
MetadataUpdatePolicy metadata_update_policy,
151+
std::shared_ptr<bigtable::DataClient> client,
152+
bigtable::AppProfileId const& app_profile_id,
153+
bigtable::TableId const& table_name,
154+
Functor&& callback)
155+
: AsyncRetryOp<ConstantIdempotencyPolicy, Functor, AsyncSampleRowKeys>(
156+
error_message, std::move(rpc_retry_policy),
157+
// SampleRowKeys does not modify the table, so retrying it is always
158+
// safe; hence the constant `true` idempotency policy below.
159+
std::move(rpc_backoff_policy), ConstantIdempotencyPolicy(true),
160+
std::move(metadata_update_policy), std::forward<Functor>(callback),
161+
AsyncSampleRowKeys(client, std::move(app_profile_id),
162+
std::move(table_name))) {}
163+
};
164+
165+
} // namespace internal
166+
} // namespace BIGTABLE_CLIENT_NS
167+
} // namespace bigtable
168+
} // namespace cloud
169+
} // namespace google
170+
171+
#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_ASYNC_SAMPLE_ROW_KEYS_H_

google/cloud/bigtable/internal/table.h

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "google/cloud/bigtable/idempotent_mutation_policy.h"
2323
#include "google/cloud/bigtable/internal/async_bulk_apply.h"
2424
#include "google/cloud/bigtable/internal/async_retry_unary_rpc.h"
25+
#include "google/cloud/bigtable/internal/async_sample_row_keys.h"
2526
#include "google/cloud/bigtable/internal/bulk_mutator.h"
2627
#include "google/cloud/bigtable/internal/row_key_sample.h"
2728
#include "google/cloud/bigtable/metadata_update_policy.h"
@@ -281,6 +282,35 @@ class Table {
281282
return result;
282283
}
283284

285+
/**
286+
* Make an asynchronous request to get sample row keys.
287+
*
288+
* @param cq the completion queue that will execute the asynchronous calls,
289+
* the application must ensure that one or more threads are blocked on
290+
* `cq.Run()`.
291+
* @param callback a functor to be called when the operation completes. It
292+
* must satisfy (using C++17 types):
293+
* static_assert(std::is_invocable_v<
294+
* Functor, CompletionQueue&, std::vector<RowKeySample>&,
295+
* grpc::Status&>);
296+
*
297+
* @tparam Functor the type of the callback.
298+
*/
299+
template <typename Functor,
300+
typename std::enable_if<
301+
google::cloud::internal::is_invocable<
302+
Functor, CompletionQueue&, std::vector<RowKeySample>&,
303+
grpc::Status&>::value,
304+
int>::type valid_callback_type = 0>
305+
void AsyncSampleRowKeys(CompletionQueue& cq, Functor&& callback) {
306+
auto op =
307+
std::make_shared<bigtable::internal::AsyncRetrySampleRowKeys<Functor>>(
308+
__func__, rpc_retry_policy_->clone(), rpc_backoff_policy_->clone(),
309+
metadata_update_policy_, client_, app_profile_id_, table_name_,
310+
std::forward<Functor>(callback));
311+
312+
op->Start(cq);
313+
}
284314
//@}
285315

286316
private:

0 commit comments

Comments
 (0)