Skip to content

Commit 870ecfa

Browse files
jpechanegunnarmorling
authored andcommitted
DBZ-1052 Emit tx BEGIN/END messages
1 parent 01126bf commit 870ecfa

File tree

4 files changed

+90
-16
lines changed

4 files changed

+90
-16
lines changed

proto/pg_logicaldec.proto

+2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ enum Op {
88
INSERT = 0;
99
UPDATE = 1;
1010
DELETE = 2;
11+
BEGIN = 3;
12+
COMMIT = 4;
1113
}
1214

1315
message Point {

src/decoderbufs.c

+76-10
Original file line numberDiff line numberDiff line change
@@ -164,16 +164,6 @@ static void pg_decode_shutdown(LogicalDecodingContext *ctx) {
164164
MemoryContextDelete(data->context);
165165
}
166166

167-
/* BEGIN callback */
168-
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
169-
ReorderBufferTXN *txn) {
170-
}
171-
172-
/* COMMIT callback */
173-
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
174-
ReorderBufferTXN *txn, XLogRecPtr commit_lsn) {
175-
}
176-
177167
/* print tuple datums (only used for debug-mode) */
178168
static void print_tuple_datums(StringInfo out, Decoderbufs__DatumMessage **tup,
179169
size_t n) {
@@ -491,6 +481,82 @@ static void add_metadata_to_msg(Decoderbufs__TypeInfo **tmsg,
491481
}
492482
}
493483

484+
/* BEGIN callback */
485+
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
486+
ReorderBufferTXN *txn) {
487+
488+
DecoderData *data;
489+
MemoryContext old;
490+
Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT;
491+
elog(DEBUG1, "Entering begin callback");
492+
493+
494+
/* Avoid leaking memory by using and resetting our own context */
495+
data = ctx->output_plugin_private;
496+
old = MemoryContextSwitchTo(data->context);
497+
498+
rmsg.op = DECODERBUFS__OP__BEGIN;
499+
rmsg.has_op = true;
500+
rmsg.transaction_id = txn->xid;
501+
rmsg.has_transaction_id = true;
502+
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
503+
rmsg.has_commit_time = true;
504+
505+
/* write msg */
506+
OutputPluginPrepareWrite(ctx, true);
507+
if (data->debug_mode) {
508+
print_row_msg(ctx->out, &rmsg);
509+
} else {
510+
size_t psize = decoderbufs__row_message__get_packed_size(&rmsg);
511+
void *packed = palloc(psize);
512+
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
513+
appendBinaryStringInfo(ctx->out, packed, ssize);
514+
}
515+
OutputPluginWrite(ctx, true);
516+
517+
/* Cleanup, freeing memory */
518+
MemoryContextSwitchTo(old);
519+
MemoryContextReset(data->context);
520+
}
521+
522+
/* COMMIT callback */
523+
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
524+
ReorderBufferTXN *txn, XLogRecPtr commit_lsn) {
525+
526+
DecoderData *data;
527+
MemoryContext old;
528+
Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT;
529+
elog(DEBUG1, "Entering commit callback");
530+
531+
532+
/* Avoid leaking memory by using and resetting our own context */
533+
data = ctx->output_plugin_private;
534+
old = MemoryContextSwitchTo(data->context);
535+
536+
rmsg.op = DECODERBUFS__OP__COMMIT;
537+
rmsg.has_op = true;
538+
rmsg.transaction_id = txn->xid;
539+
rmsg.has_transaction_id = true;
540+
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
541+
rmsg.has_commit_time = true;
542+
543+
/* write msg */
544+
OutputPluginPrepareWrite(ctx, true);
545+
if (data->debug_mode) {
546+
print_row_msg(ctx->out, &rmsg);
547+
} else {
548+
size_t psize = decoderbufs__row_message__get_packed_size(&rmsg);
549+
void *packed = palloc(psize);
550+
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
551+
appendBinaryStringInfo(ctx->out, packed, ssize);
552+
}
553+
OutputPluginWrite(ctx, true);
554+
555+
/* Cleanup, freeing memory */
556+
MemoryContextSwitchTo(old);
557+
MemoryContextReset(data->context);
558+
}
559+
494560
/* callback for individual changed tuples */
495561
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
496562
Relation relation, ReorderBufferChange *change) {

src/proto/pg_logicaldec.pb-c.c

+9-5
Original file line numberDiff line numberDiff line change
@@ -565,17 +565,21 @@ const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor =
565565
(ProtobufCMessageInit) decoderbufs__row_message__init,
566566
NULL,NULL,NULL /* reserved[123] */
567567
};
568-
static const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[3] =
568+
static const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[5] =
569569
{
570570
{ "INSERT", "DECODERBUFS__OP__INSERT", 0 },
571571
{ "UPDATE", "DECODERBUFS__OP__UPDATE", 1 },
572572
{ "DELETE", "DECODERBUFS__OP__DELETE", 2 },
573+
{ "BEGIN", "DECODERBUFS__OP__BEGIN", 3 },
574+
{ "COMMIT", "DECODERBUFS__OP__COMMIT", 4 },
573575
};
574576
static const ProtobufCIntRange decoderbufs__op__value_ranges[] = {
575-
{0, 0},{0, 3}
577+
{0, 0},{0, 5}
576578
};
577-
static const ProtobufCEnumValueIndex decoderbufs__op__enum_values_by_name[3] =
579+
static const ProtobufCEnumValueIndex decoderbufs__op__enum_values_by_name[5] =
578580
{
581+
{ "BEGIN", 3 },
582+
{ "COMMIT", 4 },
579583
{ "DELETE", 2 },
580584
{ "INSERT", 0 },
581585
{ "UPDATE", 1 },
@@ -587,9 +591,9 @@ const ProtobufCEnumDescriptor decoderbufs__op__descriptor =
587591
"Op",
588592
"Decoderbufs__Op",
589593
"decoderbufs",
590-
3,
594+
5,
591595
decoderbufs__op__enum_values_by_number,
592-
3,
596+
5,
593597
decoderbufs__op__enum_values_by_name,
594598
1,
595599
decoderbufs__op__value_ranges,

src/proto/pg_logicaldec.pb-c.h

+3-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)