Table of Contents
序
本文主要研究一下nifi的AbstractBinlogTableEventWriter
AbstractBinlogTableEventWriter
nifi-1.11.4/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java
public abstract class AbstractBinlogTableEventWriter<T extends BinlogTableEventInfo> extends AbstractBinlogEventWriter<T> {
protected void writeJson(T event) throws IOException {
super.writeJson(event);
if (event.getDatabaseName() != null) {
jsonGenerator.writeStringField("database", event.getDatabaseName());
} else {
jsonGenerator.writeNullField("database");
}
if (event.getTableName() != null) {
jsonGenerator.writeStringField("table_name", event.getTableName());
} else {
jsonGenerator.writeNullField("table_name");
}
if (event.getTableId() != null) {
jsonGenerator.writeNumberField("table_id", event.getTableId());
} else {
jsonGenerator.writeNullField("table_id");
}
}
// Default implementation for table-related binlog events
@Override
public long writeEvent(ProcessSession session, String transitUri, T eventInfo, long currentSequenceId, Relationship relationship) {
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, (outputStream) -> {
super.startJson(outputStream, eventInfo);
writeJson(eventInfo);
// Nothing in the body
super.endJson();
});
flowFile = session.putAllAttributes(flowFile, getCommonAttributes(currentSequenceId, eventInfo));
session.transfer(flowFile, relationship);
session.getProvenanceReporter().receive(flowFile, transitUri);
return currentSequenceId + 1;
}
}
复制代码
- AbstractBinlogTableEventWriter继承了AbstractBinlogEventWriter,其泛型基类为BinlogTableEventInfo,它有四个子类,分别是DDLEventWriter、InsertRowsWriter、UpdateRowsWriter、DeleteRowsWriter
DDLEventWriter
nifi-1.11.4/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DDLEventWriter.java
public class DDLEventWriter extends AbstractBinlogTableEventWriter<DDLEventInfo> {
@Override
public long writeEvent(ProcessSession session, String transitUri, DDLEventInfo eventInfo, long currentSequenceId, Relationship relationship) {
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, (outputStream) -> {
super.startJson(outputStream, eventInfo);
super.writeJson(eventInfo);
jsonGenerator.writeStringField("query", eventInfo.getQuery());
super.endJson();
});
flowFile = session.putAllAttributes(flowFile, getCommonAttributes(currentSequenceId, eventInfo));
session.transfer(flowFile, relationship);
session.getProvenanceReporter().receive(flowFile, transitUri);
return currentSequenceId + 1;
}
}
复制代码
- DDLEventWriter继承了AbstractBinlogTableEventWriter,其writeEvent方法写入DDLEventInfo
InsertRowsWriter
nifi-1.11.4/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java
public class InsertRowsWriter extends AbstractBinlogTableEventWriter<InsertRowsEventInfo> {
/**
* Creates and transfers a new flow file whose contents are the JSON-serialized value of the specified event, and the sequence ID attribute set
*
* @param session A reference to a ProcessSession from which the flow file(s) will be created and transferred
* @param eventInfo An event whose value will become the contents of the flow file
* @return The next available CDC sequence ID for use by the CDC processor
*/
@Override
public long writeEvent(final ProcessSession session, String transitUri, final InsertRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship) {
final AtomicLong seqId = new AtomicLong(currentSequenceId);
for (Serializable[] row : eventInfo.getRows()) {
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, outputStream -> {
super.startJson(outputStream, eventInfo);
super.writeJson(eventInfo);
final BitSet bitSet = eventInfo.getIncludedColumns();
writeRow(eventInfo, row, bitSet);
super.endJson();
});
flowFile = session.putAllAttributes(flowFile, getCommonAttributes(seqId.get(), eventInfo));
session.transfer(flowFile, relationship);
session.getProvenanceReporter().receive(flowFile, transitUri);
seqId.getAndIncrement();
}
return seqId.get();
}
protected void writeRow(InsertRowsEventInfo event, Serializable[] row, BitSet includedColumns) throws IOException {
jsonGenerator.writeArrayFieldStart("columns");
int i = includedColumns.nextSetBit(0);
while (i != -1) {
jsonGenerator.writeStartObject();
jsonGenerator.writeNumberField("id", i + 1);
ColumnDefinition columnDefinition = event.getColumnByIndex(i);
Integer columnType = null;
if (columnDefinition != null) {
jsonGenerator.writeStringField("name", columnDefinition.getName());
columnType = columnDefinition.getType();
jsonGenerator.writeNumberField("column_type", columnType);
}
if (row[i] == null) {
jsonGenerator.writeNullField("value");
} else {
jsonGenerator.writeObjectField("value", MySQLCDCUtils.getWritableObject(columnType, row[i]));
}
jsonGenerator.writeEndObject();
i = includedColumns.nextSetBit(i + 1);
}
jsonGenerator.writeEndArray();
}
}
复制代码
- InsertRowsWriter继承了AbstractBinlogTableEventWriter,其writeEvent方法写入InsertRowsEventInfo
UpdateRowsWriter
nifi-1.11.4/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java
public class UpdateRowsWriter extends AbstractBinlogTableEventWriter<UpdateRowsEventInfo> {
/**
* Creates and transfers a new flow file whose contents are the JSON-serialized value of the specified event, and the sequence ID attribute set
*
* @param session A reference to a ProcessSession from which the flow file(s) will be created and transferred
* @param eventInfo An event whose value will become the contents of the flow file
* @return The next available CDC sequence ID for use by the CDC processor
*/
@Override
public long writeEvent(final ProcessSession session, String transitUri, final UpdateRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship) {
final AtomicLong seqId = new AtomicLong(currentSequenceId);
for (Map.Entry<Serializable[], Serializable[]> row : eventInfo.getRows()) {
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, outputStream -> {
super.startJson(outputStream, eventInfo);
super.writeJson(eventInfo);
final BitSet bitSet = eventInfo.getIncludedColumns();
writeRow(eventInfo, row, bitSet);
super.endJson();
});
flowFile = session.putAllAttributes(flowFile, getCommonAttributes(seqId.get(), eventInfo));
session.transfer(flowFile, relationship);
session.getProvenanceReporter().receive(flowFile, transitUri);
seqId.getAndIncrement();
}
return seqId.get();
}
protected void writeRow(UpdateRowsEventInfo event, Map.Entry<Serializable[], Serializable[]> row, BitSet includedColumns) throws IOException {
jsonGenerator.writeArrayFieldStart("columns");
int i = includedColumns.nextSetBit(0);
while (i != -1) {
jsonGenerator.writeStartObject();
jsonGenerator.writeNumberField("id", i + 1);
ColumnDefinition columnDefinition = event.getColumnByIndex(i);
Integer columnType = null;
if (columnDefinition != null) {
jsonGenerator.writeStringField("name", columnDefinition.getName());
columnType = columnDefinition.getType();
jsonGenerator.writeNumberField("column_type", columnType);
}
Serializable[] oldRow = row.getKey();
Serializable[] newRow = row.getValue();
if (oldRow[i] == null) {
jsonGenerator.writeNullField("last_value");
} else {
jsonGenerator.writeObjectField("last_value", MySQLCDCUtils.getWritableObject(columnType, oldRow[i]));
}
if (newRow[i] == null) {
jsonGenerator.writeNullField("value");
} else {
jsonGenerator.writeObjectField("value", MySQLCDCUtils.getWritableObject(columnType, newRow[i]));
}
jsonGenerator.writeEndObject();
i = includedColumns.nextSetBit(i + 1);
}
jsonGenerator.writeEndArray();
}
}
复制代码
- UpdateRowsWriter继承了AbstractBinlogTableEventWriter,其writeEvent方法写入UpdateRowsEventInfo
DeleteRowsWriter
nifi-1.11.4/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java
public class DeleteRowsWriter extends AbstractBinlogTableEventWriter<DeleteRowsEventInfo> {
/**
* Creates and transfers a new flow file whose contents are the JSON-serialized value of the specified event, and the sequence ID attribute set
*
* @param session A reference to a ProcessSession from which the flow file(s) will be created and transferred
* @param eventInfo An event whose value will become the contents of the flow file
* @return The next available CDC sequence ID for use by the CDC processor
*/
@Override
public long writeEvent(final ProcessSession session, String transitUri, final DeleteRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship) {
final AtomicLong seqId = new AtomicLong(currentSequenceId);
for (Serializable[] row : eventInfo.getRows()) {
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, outputStream -> {
super.startJson(outputStream, eventInfo);
super.writeJson(eventInfo);
final BitSet bitSet = eventInfo.getIncludedColumns();
writeRow(eventInfo, row, bitSet);
super.endJson();
});
flowFile = session.putAllAttributes(flowFile, getCommonAttributes(seqId.get(), eventInfo));
session.transfer(flowFile, relationship);
session.getProvenanceReporter().receive(flowFile, transitUri);
seqId.getAndIncrement();
}
return seqId.get();
}
protected void writeRow(DeleteRowsEventInfo event, Serializable[] row, BitSet includedColumns) throws IOException {
jsonGenerator.writeArrayFieldStart("columns");
int i = includedColumns.nextSetBit(0);
while (i != -1) {
jsonGenerator.writeStartObject();
jsonGenerator.writeNumberField("id", i + 1);
ColumnDefinition columnDefinition = event.getColumnByIndex(i);
Integer columnType = null;
if (columnDefinition != null) {
jsonGenerator.writeStringField("name", columnDefinition.getName());
columnType = columnDefinition.getType();
jsonGenerator.writeNumberField("column_type", columnType);
}
if (row[i] == null) {
jsonGenerator.writeNullField("value");
} else {
jsonGenerator.writeObjectField("value", MySQLCDCUtils.getWritableObject(columnType, row[i]));
}
jsonGenerator.writeEndObject();
i = includedColumns.nextSetBit(i + 1);
}
jsonGenerator.writeEndArray();
}
}
复制代码
- DeleteRowsWriter继承了AbstractBinlogTableEventWriter,其writeEvent方法写入DeleteRowsEventInfo
小结
AbstractBinlogTableEventWriter继承了AbstractBinlogEventWriter,其泛型基类为BinlogTableEventInfo,它有四个子类,分别是DDLEventWriter、InsertRowsWriter、UpdateRowsWriter、DeleteRowsWriter
评论前必须登录!
注册