|
21 | 21 | import com.datastax.oss.driver.api.core.cql.BoundStatement; |
22 | 22 | import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder; |
23 | 23 | import com.datastax.oss.driver.api.core.cql.PreparedStatement; |
| 24 | +import com.datastax.oss.driver.api.core.cql.ResultSet; |
24 | 25 | import com.datastax.oss.driver.api.core.cql.Row; |
25 | 26 | import com.datastax.oss.driver.api.core.metadata.TokenMap; |
26 | 27 | import com.datastax.oss.driver.api.core.servererrors.QueryValidationException; |
@@ -305,33 +306,51 @@ private boolean shouldInitializeTable() { |
305 | 306 | .orElse(true); |
306 | 307 | } |
307 | 308 |
|
| 309 | + private static int getCassandraMajorVersion(final CqlSession session) { |
| 310 | + try { |
| 311 | + ResultSet rs = session.execute("SELECT release_version FROM system.local"); |
| 312 | + Row row = rs.one(); |
| 313 | + String version = row.getString("release_version"); |
| 314 | + return Integer.parseInt(version.split("\\.")[0]); |
| 315 | + } catch (final Exception e) { |
| 316 | + return 0; |
| 317 | + } |
| 318 | + } |
| 319 | + |
308 | 320 | private static void initializeTable(final CqlSession session, final String keyspaceName, final String tableName, final Configuration configuration) { |
| 321 | + int cassandraMajorVersion = getCassandraMajorVersion(session); |
309 | 322 | CreateTableWithOptions createTable = createTable(keyspaceName, tableName) |
310 | 323 | .ifNotExists() |
311 | 324 | .withPartitionKey(KEY_COLUMN_NAME, DataTypes.BLOB) |
312 | 325 | .withClusteringColumn(COLUMN_COLUMN_NAME, DataTypes.BLOB) |
313 | 326 | .withColumn(VALUE_COLUMN_NAME, DataTypes.BLOB); |
314 | 327 |
|
315 | 328 | createTable = compactionOptions(createTable, configuration); |
316 | | - createTable = compressionOptions(createTable, configuration); |
| 329 | + createTable = compressionOptions(createTable, configuration, cassandraMajorVersion); |
317 | 330 | createTable = gcGraceSeconds(createTable, configuration); |
318 | 331 | createTable = speculativeRetryOptions(createTable, configuration); |
319 | 332 |
|
320 | 333 | session.execute(createTable.build()); |
321 | 334 | } |
322 | 335 |
|
323 | 336 | private static CreateTableWithOptions compressionOptions(final CreateTableWithOptions createTable, |
324 | | - final Configuration configuration) { |
| 337 | + final Configuration configuration, |
| 338 | + final int cassandraMajorVersion) { |
325 | 339 | if (!configuration.get(CF_COMPRESSION)) { |
326 | 340 | // No compression |
327 | 341 | return createTable.withNoCompression(); |
328 | 342 | } |
329 | 343 |
|
330 | 344 | String compressionType = configuration.get(CF_COMPRESSION_TYPE); |
331 | 345 | int chunkLengthInKb = configuration.get(CF_COMPRESSION_BLOCK_SIZE); |
| 346 | + Map<String, Object> options; |
| 347 | + |
| 348 | + if (cassandraMajorVersion >= 5) |
| 349 | + options = ImmutableMap.of("class", compressionType, "chunk_length_in_kb", chunkLengthInKb); |
| 350 | + else |
| 351 | + options = ImmutableMap.of("sstable_compression", compressionType, "chunk_length_kb", chunkLengthInKb); |
332 | 352 |
|
333 | | - return createTable.withOption("compression", |
334 | | - ImmutableMap.of("sstable_compression", compressionType, "chunk_length_kb", chunkLengthInKb)); |
| 353 | + return createTable.withOption("compression", options); |
335 | 354 | } |
336 | 355 |
|
337 | 356 | static CreateTableWithOptions compactionOptions(final CreateTableWithOptions createTable, |
|
0 commit comments