Skip to content

Commit

Permalink
Fix truncate copy job when WRITE_TRUNCATE in BigQuery batch load (apa…
Browse files Browse the repository at this point in the history
…che#25101)

* Fix truncate copyjob when WRITE_TRUNCATE in BigQuery batch load
  • Loading branch information
Abacn authored Jan 20, 2023
1 parent 5e1ebee commit e379c23
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 2 deletions.
3 changes: 2 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,13 @@

* Avoids Cassandra syntax error when user-defined query has no where clause in it (Java) ([#24829](https://github.com/apache/beam/issues/24829)).
* Fixed JDBC connection failures (Java) during handshake due to deprecated TLSv1(.1) protocol for the JDK. ([#24623](https://github.com/apache/beam/issues/24623))
* Fixed Python BigQuery Batch Load write may truncate valid data when deposition sets to WRITE_TRUNCATE and incoming data is large (Python) ([#24623](https://github.com/apache/beam/issues/24535)).

## Known Issues

* ([#X](https://github.com/apache/beam/issues/X)).

# [2.44.0] - Unreleased
# [2.44.0] - 2023-01-12

## Highlights

Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
Original file line number Diff line number Diff line change
Expand Up @@ -1093,7 +1093,7 @@ def _load_data(
load_job_project_id=self.load_job_project_id),
schema_mod_job_name_pcv))

if self.create_disposition == 'WRITE_TRUNCATE':
if self.write_disposition == 'WRITE_TRUNCATE':
# All loads going to the same table must be processed together so that
# the truncation happens only once. See BEAM-24535.
finished_temp_tables_load_job_ids_list_pc = (
Expand Down
41 changes: 41 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,47 @@ def test_multiple_partition_files(self):
equal_to([6]),
label='CheckCopyJobCount')

@mock.patch(
'apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs.process',
wraps=lambda *x: None)
def test_multiple_partition_files_write_truncate(self, mock_call_process):
destination = 'project1:dataset1.table1'

job_reference = bigquery_api.JobReference()
job_reference.projectId = 'project1'
job_reference.jobId = 'job_name1'
result_job = mock.Mock()
result_job.jobReference = job_reference

mock_job = mock.Mock()
mock_job.status.state = 'DONE'
mock_job.status.errorResult = None
mock_job.jobReference = job_reference

bq_client = mock.Mock()
bq_client.jobs.Get.return_value = mock_job

bq_client.jobs.Insert.return_value = result_job
bq_client.tables.Delete.return_value = None

with TestPipeline('DirectRunner') as p:
_ = (
p
| beam.Create(_ELEMENTS, reshuffle=False)
| bqfl.BigQueryBatchFileLoads(
destination,
custom_gcs_temp_location=self._new_tempdir(),
test_client=bq_client,
validate=False,
temp_file_format=bigquery_tools.FileFormat.JSON,
max_file_size=45,
max_partition_size=80,
max_files_per_partition=2,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))

# TriggerCopyJob only processes once
self.assertEqual(mock_call_process.call_count, 1)

@parameterized.expand([
param(is_streaming=False, with_auto_sharding=False),
param(is_streaming=True, with_auto_sharding=False),
Expand Down

0 comments on commit e379c23

Please sign in to comment.