Skip to content

Commit

Permalink
Issue GoogleCloudDataproc#908: setting the preferred min parallelism …
Browse files Browse the repository at this point in the history
…to be not more than max parallelism (GoogleCloudDataproc#915)

* fix for issue 908, setting the preferred min parallelism to be not more than max parallelism

* adding to changes.md

* addressing review comments

* addressing review comments
  • Loading branch information
suryasoma authored Mar 7, 2023
1 parent fb1586e commit 0a62392
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* GAX has been upgraded to version 2.23.0
* gRPC has been upgraded to version 1.53.0
* Netty has been upgraded to version 4.1.89.Final
* Issue #908: Making sure that `preferred_min_stream_count` must be less than or equal to `max_stream_count`

## 0.28.1 - 2023-02-27

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,13 @@ public ReadSessionResponse create(
log.debug("using default max parallelism [{}]", defaultMaxStreamCount);
return defaultMaxStreamCount;
});
int minStreamCount = preferredMinStreamCount;
if (minStreamCount > maxStreamCount) {
minStreamCount = maxStreamCount;
log.warn(
"preferred min parallelism is larger than the max parallelism, therefore setting it to max parallelism [{}]",
minStreamCount);
}
Instant sessionPrepEndTime = Instant.now();

ReadSession readSession =
Expand All @@ -144,7 +151,7 @@ public ReadSessionResponse create(
.setTable(tablePath)
.build())
.setMaxStreamCount(maxStreamCount)
.setPreferredMinStreamCount(preferredMinStreamCount)
.setPreferredMinStreamCount(minStreamCount)
.build());

if (readSession != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,4 +215,58 @@ public void testCustomMaxStreamCount() throws Exception {
assertThat(createReadSessionRequest.getPreferredMinStreamCount())
.isEqualTo(30); // 3 * given default parallelism
}

@Test
public void testMinStreamCountGreaterThanMaxStreamCount() throws Exception {
// setting up
when(bigQueryClient.getTable(any())).thenReturn(table);
mockBigQueryRead.reset();
mockBigQueryRead.addResponse(
ReadSession.newBuilder().addStreams(ReadStream.newBuilder().setName("0")).build());
BigQueryClientFactory mockBigQueryClientFactory = mock(BigQueryClientFactory.class);
when(mockBigQueryClientFactory.getBigQueryReadClient()).thenReturn(client);

ReadSessionCreatorConfig config =
new ReadSessionCreatorConfigBuilder()
.setPreferredMinParallelism(OptionalInt.of(21_000))
.setMaxParallelism(OptionalInt.of(10))
.build();
ReadSessionCreator creator =
new ReadSessionCreator(config, bigQueryClient, mockBigQueryClientFactory);
ReadSessionResponse readSessionResponse =
creator.create(table.getTableId(), ImmutableList.of(), Optional.empty());
assertThat(readSessionResponse).isNotNull();
assertThat(readSessionResponse.getReadSession().getStreamsCount()).isEqualTo(1);
CreateReadSessionRequest createReadSessionRequest =
(CreateReadSessionRequest) mockBigQueryRead.getRequests().get(0);
assertThat(createReadSessionRequest.getMaxStreamCount()).isEqualTo(10);
assertThat(createReadSessionRequest.getPreferredMinStreamCount()).isEqualTo(10);
}

@Test
public void testMaxStreamCountWithoutMinStreamCount() throws Exception {
// setting up
when(bigQueryClient.getTable(any())).thenReturn(table);
mockBigQueryRead.reset();
mockBigQueryRead.addResponse(
ReadSession.newBuilder().addStreams(ReadStream.newBuilder().setName("0")).build());
BigQueryClientFactory mockBigQueryClientFactory = mock(BigQueryClientFactory.class);
when(mockBigQueryClientFactory.getBigQueryReadClient()).thenReturn(client);

ReadSessionCreatorConfig config =
new ReadSessionCreatorConfigBuilder()
.setDefaultParallelism(20)
.setMaxParallelism(OptionalInt.of(10))
.build();
ReadSessionCreator creator =
new ReadSessionCreator(config, bigQueryClient, mockBigQueryClientFactory);
ReadSessionResponse readSessionResponse =
creator.create(table.getTableId(), ImmutableList.of(), Optional.empty());
assertThat(readSessionResponse).isNotNull();
assertThat(readSessionResponse.getReadSession().getStreamsCount()).isEqualTo(1);
CreateReadSessionRequest createReadSessionRequest =
(CreateReadSessionRequest) mockBigQueryRead.getRequests().get(0);
assertThat(createReadSessionRequest.getMaxStreamCount()).isEqualTo(10);
assertThat(createReadSessionRequest.getPreferredMinStreamCount()).isEqualTo(10);
}
}

0 comments on commit 0a62392

Please sign in to comment.