网站首页 > 精选教程 正文
Flink CDC在配置mysql时,可以指定几种方式来选择位点: INITIAL、EARLIEST_OFFSET、LATEST_OFFSET、SPECIFIC_OFFSETS、TIMESTAMP、SNAPSHOT。
INITIAL: 全量与增量
EARLIEST_OFFSET:最早位点
LATEST_OFFSET:最近的位点
SPECIFIC_OFFSETS:指定位点
TIMESTAMP:指定时间点
SNAPSHOT:全量
源码分析
设置该类型的cdc同步任务,机制会检查当前存在的binlog文件列表,因为每个文件是按顺序排列,同时对应的时间也是有顺序的,最终是通过二分法进行查找。
public static void main(String[] args) {
MySqlSource.<Data>builder()
.startupOptions(StartupOptions.timestamp(System.currentTimeMillis()))
.build();
}
当设置了cdc任务的类型为TIMESTAMP时,会通过以下的方法来获取对应的binlogfile,具体查看类 BinlogOffsetUtils.java
public static BinlogOffset initializeEffectiveOffset(
BinlogOffset offset, MySqlConnection connection) {
BinlogOffsetKind offsetKind = offset.getOffsetKind();
switch (offsetKind) {
case EARLIEST:
return BinlogOffset.ofBinlogFilePosition("", 0);
case TIMESTAMP:
// 遍历当前所有存在的binlogfile文件,取每个文件的文件头来判断时间
// 所以一定是当前整个文件的数据,也是按binlogfile文件名来读取数据的
return DebeziumUtils.findBinlogOffset(offset.getTimestampSec() * 1000, connection);
case LATEST:
return DebeziumUtils.currentBinlogOffset(connection);
default:
return offset;
}
}
public static BinlogOffset findBinlogOffset(long targetMs, MySqlConnection connection) {
MySqlConnection.MySqlConnectionConfiguration config = connection.connectionConfig();
BinaryLogClient client =
new BinaryLogClient(
config.hostname(), config.port(), config.username(), config.password());
List<String> binlogFiles = new ArrayList<>();
JdbcConnection.ResultSetConsumer rsc =
rs -> {
while (rs.next()) {
String fileName = rs.getString(1);
long fileSize = rs.getLong(2);
if (fileSize > 0) {
binlogFiles.add(fileName);
}
}
};
try {
// 获取mysql系统内存在的binlog
connection.query("SHOW BINARY LOGS", rsc);
LOG.info("Total search binlog: {}", binlogFiles);
if (binlogFiles.isEmpty()) {
return BinlogOffset.ofBinlogFilePosition("", 0);
}
// 搜索最接近的binlog文件
String binlogName = searchBinlogName(client, targetMs, binlogFiles);
return BinlogOffset.ofBinlogFilePosition(binlogName, 0);
} catch (Exception e) {
throw new FlinkRuntimeException(e);
}
}
private static String searchBinlogName(
BinaryLogClient client, long targetMs, List<String> binlogFiles)
throws IOException, InterruptedException {
int startIdx = 0;
int endIdx = binlogFiles.size() - 1;
// 因为binlog文件名是递增的,同时时间也是递增的
// 以二分法进行查找
while (startIdx <= endIdx) {
int mid = startIdx + (endIdx - startIdx) / 2;
long midTs = getBinlogTimestamp(client, binlogFiles.get(mid));
if (midTs < targetMs) {
startIdx = mid + 1;
} else if (targetMs < midTs) {
endIdx = mid - 1;
} else {
return binlogFiles.get(mid);
}
}
return endIdx < 0 ? binlogFiles.get(0) : binlogFiles.get(endIdx);
}
从以上的逻辑可以看到,当指定了timestamp时,会从最接近的那个binlog文件开始从头开始读取数据,那会不会多读很多数据呢?答案是否定的,当从找到的binlog文件中读取数据后,真正在处理的时候,会再判断一次当前的事件是否在指定的时间范围内,代码在 MySqlBinlogSplitReadTask.java
protected void handleEvent(
MySqlPartition partition, MySqlOffsetContext offsetContext, Event event) {
// 当从binlog读取数据后,进行一次过滤
if (!eventFilter.test(event)) {
return;
}
super.handleEvent(partition, offsetContext, event);
// check do we need to stop for read binlog for snapshot split.
if (isBoundedRead()) {
final BinlogOffset currentBinlogOffset =
RecordUtils.getBinlogPosition(offsetContext.getOffset());
// reach the high watermark, the binlog reader should finished
if (currentBinlogOffset.isAtOrAfter(binlogSplit.getEndingOffset())) {
// send binlog end event
try {
signalEventDispatcher.dispatchWatermarkEvent(
binlogSplit,
currentBinlogOffset,
SignalEventDispatcher.WatermarkKind.BINLOG_END);
} catch (InterruptedException e) {
LOG.error("Send signal event error.", e);
errorHandler.setProducerThrowable(
new DebeziumException("Error processing binlog signal event", e));
}
// tell reader the binlog task finished
((StoppableChangeEventSourceContext) context).stopChangeEventSource();
}
}
}
eventFilter由BinlogSplitReader在创建MySqlBinlogSplitReadTask时处理。
private Predicate<Event> createEventFilter(BinlogOffset startingOffset) {
// 当是TIMESTAMP类型时,需要将小于指定时间的事件进行移除
if (BinlogOffsetKind.TIMESTAMP.equals(startingOffset.getOffsetKind())) {
long startTimestampSec = startingOffset.getTimestampSec();
return event ->
EventType.HEARTBEAT.equals(event.getHeader().getEventType())
|| event.getHeader().getTimestamp() >= startTimestampSec * 1000;
}
return event -> true;
}
猜你喜欢
- 2024-12-29 java的时间戳的长度为什么是固定的?它是如何做到的
- 2024-12-29 在java中进行日期时间比较的4种方法
- 2024-12-29 如何使用Java读取Excel文件到List>格式
- 2024-12-29 零基础,自学JAVA编程需要多长时间才能学完?
- 2024-12-29 从零开始学习java一般需要多长时间?
- 2024-12-29 Java学习需要多长时间?
- 2024-12-29 工作5年里,第一次用Java 读取 Elasticsearch 的数据
- 2024-12-29 Java系统开发从入门到精通第四讲(文字版)
- 2024-12-29 python获取时间戳(10位和13位)
- 2024-12-29 Java系统学习需要多长时间?
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- nginx反向代理 (57)
- nginx日志 (56)
- nginx限制ip访问 (62)
- mac安装nginx (55)
- java和mysql (59)
- java中final (62)
- win10安装java (72)
- java启动参数 (64)
- java链表反转 (64)
- 字符串反转java (72)
- java逻辑运算符 (59)
- java 请求url (65)
- java信号量 (57)
- java定义枚举 (59)
- java字符串压缩 (56)
- java中的反射 (59)
- java 三维数组 (55)
- java插入排序 (68)
- java线程的状态 (62)
- java异步调用 (55)
- java中的异常处理 (62)
- java锁机制 (54)
- java静态内部类 (55)
- java怎么添加图片 (60)
- java 权限框架 (55)
本文暂时没有评论,来添加一个吧(●'◡'●)