Skip to content

Commit

Permalink
Close COPY TO/FROM PROGRAM descriptors in case of error
Browse files Browse the repository at this point in the history
Commit 110b825 added an ifThrow argument to the close_program_pipes function.
That commit contained a call to close_program_pipes(cstate, false) on errors.
But then this function call was lost. And now, in case of errors, open pipes are
not closed until the end of the session.

This patch adds a callback for closing handles on errors, which is called before
deleting the memory context of the copy state.
  • Loading branch information
RekGRpth authored and Stolb27 committed May 2, 2024
1 parent 2fd27bb commit 6dafe31
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 9 deletions.
3 changes: 2 additions & 1 deletion src/backend/access/external/url_execute.c
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,8 @@ pclose_with_stderr(int pid, int *pipes, StringInfo sinfo)
/* close the data pipe. we can now read from error pipe without being blocked */
close(pipes[EXEC_DATA_P]);

read_err_msg(pipes[EXEC_ERR_P], sinfo);
if (sinfo->data)
read_err_msg(pipes[EXEC_ERR_P], sinfo);

close(pipes[EXEC_ERR_P]);

Expand Down
36 changes: 28 additions & 8 deletions src/backend/commands/copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ static GpDistributionData *GetDistributionPolicyForPartition(GpDistributionData
MemoryContext context);
static unsigned int
GetTargetSeg(GpDistributionData *distData, Datum *baseValues, bool *baseNulls);
static ProgramPipes *open_program_pipes(char *command, bool forwrite);
static ProgramPipes *open_program_pipes(CopyState cstate, bool forwrite);
static void close_program_pipes(CopyState cstate, bool ifThrow);
static void cdbFlushInsertBatches(List *resultRels,
CopyState cstate,
Expand Down Expand Up @@ -2258,7 +2258,7 @@ BeginCopyToOnSegment(QueryDesc *queryDesc)

if (cstate->is_program)
{
cstate->program_pipes = open_program_pipes(cstate->filename, true);
cstate->program_pipes = open_program_pipes(cstate, true);
cstate->copy_file = fdopen(cstate->program_pipes->pipes[0], PG_BINARY_W);

if (cstate->copy_file == NULL)
Expand Down Expand Up @@ -2494,7 +2494,7 @@ BeginCopyTo(Relation rel,

if (is_program)
{
cstate->program_pipes = open_program_pipes(cstate->filename, true);
cstate->program_pipes = open_program_pipes(cstate, true);
cstate->copy_file = fdopen(cstate->program_pipes->pipes[0], PG_BINARY_W);

if (cstate->copy_file == NULL)
Expand Down Expand Up @@ -4697,7 +4697,7 @@ BeginCopyFrom(Relation rel,

if (cstate->is_program)
{
cstate->program_pipes = open_program_pipes(cstate->filename, false);
cstate->program_pipes = open_program_pipes(cstate, false);
cstate->copy_file = fdopen(cstate->program_pipes->pipes[0], PG_BINARY_R);
if (cstate->copy_file == NULL)
ereport(ERROR,
Expand Down Expand Up @@ -7953,9 +7953,21 @@ GetTargetSeg(GpDistributionData *distData, Datum *baseValues, bool *baseNulls)
return target_seg;
}

static void
close_program_pipes_on_reset(void *arg)
{
if (!IsAbortInProgress())
return;

CopyState cstate = arg;

close_program_pipes(cstate, false);
}

static ProgramPipes*
open_program_pipes(char *command, bool forwrite)
open_program_pipes(CopyState cstate, bool forwrite)
{
char *command = cstate->filename;
int save_errno;
pqsigfunc save_SIGPIPE;
/* set up extvar */
Expand Down Expand Up @@ -7993,6 +8005,12 @@ open_program_pipes(char *command, bool forwrite)
errmsg("can not start command: %s", command)));
}

MemoryContextCallback *callback = MemoryContextAlloc(cstate->copycontext, sizeof(MemoryContextCallback));

callback->arg = cstate;
callback->func = close_program_pipes_on_reset;
MemoryContextRegisterResetCallback(cstate->copycontext, callback);

return program_pipes;
}

Expand All @@ -8002,8 +8020,7 @@ close_program_pipes(CopyState cstate, bool ifThrow)
Assert(cstate->is_program);

int ret = 0;
StringInfoData sinfo;
initStringInfo(&sinfo);
StringInfoData sinfo = {0};

if (cstate->copy_file)
{
Expand All @@ -8016,8 +8033,11 @@ close_program_pipes(CopyState cstate, bool ifThrow)
{
return;
}


if (ifThrow)
initStringInfo(&sinfo);
ret = pclose_with_stderr(cstate->program_pipes->pid, cstate->program_pipes->pipes, &sinfo);
cstate->program_pipes = NULL;

if (ret == 0 || !ifThrow)
{
Expand Down
11 changes: 11 additions & 0 deletions src/test/regress/expected/copy2.out
Original file line number Diff line number Diff line change
Expand Up @@ -421,3 +421,14 @@ SELECT * FROM copy_eoc_marker;
(1 row)

DROP TABLE copy_eoc_marker;
-- Ensure we close COPY TO/FROM PROGRAM descriptors in case of error
CREATE TABLE test_copy_error (a SMALLINT) DISTRIBUTED BY (a);
COPY test_copy_error FROM PROGRAM 'seq 30000 90000 | cat -';
ERROR: value "32768" is out of range for type smallint
CONTEXT: COPY test_copy_error, line 2769, column a: "32768"
\! pgrep cat;
INSERT INTO test_copy_error SELECT CASE WHEN i = 10 THEN 0 ELSE i END FROM generate_series(1, 100) i;
COPY (SELECT 1 / a FROM test_copy_error) TO PROGRAM 'cat -';
ERROR: division by zero
\! pgrep cat;
DROP TABLE test_copy_error;
9 changes: 9 additions & 0 deletions src/test/regress/sql/copy2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -354,3 +354,12 @@ COPY copy_eoc_marker FROM stdin LOG ERRORS SEGMENT REJECT LIMIT 5;
\.
SELECT * FROM copy_eoc_marker;
DROP TABLE copy_eoc_marker;

-- Ensure we close COPY TO/FROM PROGRAM descriptors in case of error
CREATE TABLE test_copy_error (a SMALLINT) DISTRIBUTED BY (a);
COPY test_copy_error FROM PROGRAM 'seq 30000 90000 | cat -';
\! pgrep cat;
INSERT INTO test_copy_error SELECT CASE WHEN i = 10 THEN 0 ELSE i END FROM generate_series(1, 100) i;
COPY (SELECT 1 / a FROM test_copy_error) TO PROGRAM 'cat -';
\! pgrep cat;
DROP TABLE test_copy_error;

0 comments on commit 6dafe31

Please sign in to comment.