Java: out-of-memory problem in a Kafka Connect source connector
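I have written a custom Kafka Connect plugin that reads files and publishes Avro messages to a topic. I use LineNumberReader to read each line of the file, combine the record with a schema, and publish it to the topic. Records are only published once the poll method returns the list of SourceRecord objects. Sometimes, when the file is large, I run out of memory. I have researched the problem and read many blog posts and questions, but nothing seems to work for me. I also read about the commit() method in the SourceTask class, but it is not very clear to me. Has anyone run into a similar problem? Code snippets below: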
@Override
public List<SourceRecord> poll() throws InterruptedException {
    final List<SourceRecord> results = new ArrayList<>();
    //Get reader based on certain params:
    FileReader reader = myReader;
    while (reader.hasNext()) {
        results.add(getSourceRecord(file, reader.currentOffset(), reader.next()));
    }
    return results;
}
private SourceRecord getSourceRecord(String fileName, Offset offset, Struct struct) {
    return new SourceRecord(
        // Source partition: identifies the file being read.
        Collections.singletonMap("path", fileName),
        // Source offset: position within that file.
        Collections.singletonMap("offset", offset.getRecordOffset()),
        config.getWriteTopic(),
        struct.schema(),
        struct
    );
}
public boolean hasNext() {
    if (currentLine != null) {
        return true;
    } else if (finished) {
        return false;
    } else {
        try {
            while (true) {
                String line = reader.readLine();
                offset.setOffset(reader.getLineNumber());
                if (line == null) {
                    finished = true;
                    return false;
                }
                currentLine = line;
                //Removed for brevity
            }
        } catch (IOException e) {
            // Assumption: the error handling is not shown in the post.
            throw new UncheckedIOException(e);
        }
    }
}
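Note on the poll method above: it loops until the reader is exhausted, so every record of the file sits in the results list before anything is handed back to the framework. One possible way to bound memory is to return at most a fixed number of records per poll call and let the framework call poll again for the rest. The following is only a minimal, dependency-free sketch of that idea, not the connector's actual code: BatchingLineSource, Line, and maxBatchSize are hypothetical names, and plain lines stand in for SourceRecord objects. It returns null once the input is exhausted, which, as far as I understand the SourceTask.poll() contract, is an acceptable way to signal that no data is currently available.

```java
import java.io.IOException;
import java.io.LineNumberReader;
import java.io.Reader;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the task's reader; not part of Kafka Connect. */
final class BatchingLineSource {

    /** One line of input plus the line number that would be stored as its offset. */
    static final class Line {
        final int lineNumber;
        final String text;

        Line(int lineNumber, String text) {
            this.lineNumber = lineNumber;
            this.text = text;
        }
    }

    private final LineNumberReader reader;
    private final int maxBatchSize; // assumed to come from connector config
    private boolean finished;

    BatchingLineSource(Reader in, int maxBatchSize) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        this.reader = new LineNumberReader(in);
        this.maxBatchSize = maxBatchSize;
    }

    /**
     * Returns at most maxBatchSize lines per call, so only one batch is
     * held in memory at a time. Returns null once the input is exhausted.
     */
    List<Line> poll() throws IOException {
        if (finished) {
            return null;
        }
        List<Line> batch = new ArrayList<>();
        while (batch.size() < maxBatchSize) {
            String text = reader.readLine();
            if (text == null) {
                finished = true;
                break;
            }
            // getLineNumber() is the 1-based number of the line just read.
            batch.add(new Line(reader.getLineNumber(), text));
        }
        return batch.isEmpty() ? null : batch;
    }
}
```

In a real task, each Line would be converted with getSourceRecord, and the reader would have to stay open across poll calls rather than being re-created each time. The batch size is a trade-off: smaller batches use less heap but mean more poll round trips.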
0 answers