Java: StreamCorruptedException on a Chronicle Queue makes the queue unusable

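We got a StreamCorruptedException from a Chronicle Queue (chronicle-queue-5.20.106, Red Hat Linux release 6.10); the stack trace is pasted below. At the time, a completely separate process was performing very heavy IO/disk operations. We believe this paused the queue for more than 15 seconds and caused the corruption.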

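Even after a restart, the queue cannot start because it is corrupted. The only way forward is to delete it and start fresh, which means losing millions of records.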

Please help with a solution or a workaround. Thanks.

STACKTRACE

2020-11-18 09:55:38,905536 [4b54debf-f9e2-4c70-9152-f05fe840bc92] [TableStoreWriteLock] (WARN) Couldn't acquire write lock after 15000 ms for the lock file:/local/data/metadata.cq4t, overriding the lock. Lock was held by me
2020-11-18 09:55:38,905795 [4b54debf-f9e2-4c70-9152-f05fe840bc92] [TableStoreWriteLock] (WARN) Forced unlock for the lock file:/local/data/metadata.cq4t, unlocked: true net.openhft.chronicle.core.StackTrace: Forced unlock on Reader STRESSTEST01
        at net.openhft.chronicle.queue.impl.table.AbstractTSQueueLock.forceUnlockIfProcessIsDead(AbstractTSQueueLock.java:52)
        at net.openhft.chronicle.queue.impl.single.TableStoreWriteLock.lock(TableStoreWriteLock.java:70)
        at net.openhft.chronicle.queue.impl.single.StoreAppender.writingDocument(StoreAppender.java:349)
        at net.openhft.chronicle.queue.impl.single.StoreAppender.writingDocument(StoreAppender.java:325)

2020-11-18 09:55:42,364992 [] [ChronicleTxn] (ERROR) Error on commit java.lang.IllegalStateException: java.io.StreamCorruptedException: Data at 138604 overwritten? Expected: 0 was c3
        at net.openhft.chronicle.queue.impl.single.StoreAppender$StoreAppenderContext.close(StoreAppender.java:842)
        at net.openhft.chronicle.queue.impl.single.StoreAppender$StoreAppenderContext.close(StoreAppender.java:782)

Error on restart

java.lang.UnsupportedOperationException: Unknown_4
        at net.openhft.chronicle.wire.BinaryWire$BinaryValueIn.cantRead(BinaryWire.java:3648)
        at net.openhft.chronicle.wire.BinaryWire$BinaryValueIn.bytes(BinaryWire.java:2591)

Test class that simulates the problem

import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.wire.DocumentContext;

import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

public class SimulateStreamCorruptedException {
private static final int NO_OF_DOCUMENTS_TO_INSERT = 100_000;
private static final int NO_OF_THREADS = 50;
private String textToWrite = "This is a sample text to be written and value is ";
private String dbFolder = System.getProperty("dbFolder","/tmp/chroniclequeue");
private AtomicLong noOfDocuments = new AtomicLong();

public static void main(String[] args) throws InterruptedException {
    SimulateStreamCorruptedException simulator = new SimulateStreamCorruptedException();
    simulator.simulateError();
}

private void simulateError() throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(NO_OF_THREADS);
    ScheduledExecutorService preTouchScheduler = Executors.newScheduledThreadPool(1);
    try(ChronicleQueue queue = getQueue()) {
        preTouchScheduler.scheduleAtFixedRate(() -> queue.acquireAppender().pretouch(), 0, 1, TimeUnit.SECONDS);
        IntStream.rangeClosed(1, NO_OF_THREADS).forEach(i -> startWriterThread(queue,i,latch));
        latch.await();
    } finally {
        preTouchScheduler.shutdownNow();
    }
}

private void startWriterThread(ChronicleQueue queue,int threadCount,CountDownLatch latch) {
    Runnable task = () -> {
        System.out.println("Starting the writing for Thread-"+threadCount);
        IntStream.rangeClosed(1, NO_OF_DOCUMENTS_TO_INSERT).forEach(i -> {
            try(DocumentContext dc = queue.acquireAppender().writingDocument()) {
                String text = textToWrite+(threadCount+i);
                dc.wire().write().bytes(text.getBytes());
                simulatePause();
            }
        });
        System.out.println("Completed the writing for Thread-"+threadCount);
        latch.countDown();
    };
    new Thread(task).start();
}

private void simulatePause() {
    if(noOfDocuments.incrementAndGet()%100==0) {
        try {Thread.sleep(20*1000);}
        catch (InterruptedException e) {e.printStackTrace();}
    }
}

private ChronicleQueue getQueue() {
    File folder = new File(dbFolder);
    if(!folder.exists()) folder.mkdirs();
    return ChronicleQueue.singleBuilder(folder)
            .rollCycle(RollCycles.DAILY)
            .strongAppenders(true)
            .build();
}

}


1 answer

  1. # Answer 1

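    If your application can pause for 15 seconds, there is no possible fix on the Chronicle Queue side. You should reconsider how your software works: Chronicle's tools are built for ultra-low latency, and we think in terms of microsecond latencies, not seconds.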

    If the lock is forcibly unlocked, as happened here, the data will be irreversibly corrupted.

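    One workaround, however, is to increase the timeout. The default is 15000 ms, but when creating the queue you can raise it with builder#timeoutMS() to a value that works in your environment.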
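
To make the reasoning in this answer concrete, here is a minimal sketch of why a forced unlock is dangerous and why a longer timeout avoids it. It is a toy model in plain Java with simulated time, not Chronicle Queue's actual implementation: the class `ForcedUnlockModel`, the `TimeoutLock` type, the one-second polling interval, and the four-slot `log` array are all assumptions made up for illustration. Only the 15000 ms default and the 20 second pause come from the thread above.

```java
public class ForcedUnlockModel {

    /** Toy lock (hypothetical): a waiter overrides it once it has waited timeoutMs. */
    static final class TimeoutLock {
        private final long timeoutMs;
        private String holder; // null when the lock is free

        TimeoutLock(long timeoutMs) {
            this.timeoutMs = timeoutMs;
        }

        /** Returns true if 'who' holds the lock after this call. */
        boolean tryAcquire(String who, long waitedMs) {
            if (holder != null && waitedMs >= timeoutMs) {
                holder = null; // forced unlock: the stalled holder is not told
            }
            if (holder == null) {
                holder = who;
                return true;
            }
            return false;
        }

        void release(String who) {
            if (who.equals(holder)) {
                holder = null;
            }
        }
    }

    /** Returns true if writer A finds its reserved slot overwritten after its pause. */
    static boolean simulate(long timeoutMs, long pauseMs) {
        TimeoutLock lock = new TimeoutLock(timeoutMs);
        int[] log = new int[4]; // 0 means "empty slot"
        int writePos = 0;

        // Writer A takes the lock at t=0, reserves the next slot, then stalls.
        lock.tryAcquire("A", 0);
        int reservedByA = writePos;

        // Writer B polls once per simulated second while A is stalled.
        for (long t = 1000; t < pauseMs; t += 1000) {
            if (lock.tryAcquire("B", t)) {
                log[writePos++] = 0xB; // B writes into the slot A had reserved
                lock.release("B");
                break;
            }
        }

        // A wakes up and expects its reserved slot to still be empty.
        boolean corrupted = log[reservedByA] != 0;
        if (!corrupted) {
            log[writePos++] = 0xA;
        }
        lock.release("A");
        return corrupted;
    }

    public static void main(String[] args) {
        // 15 s timeout, 20 s pause: the lock is overridden while A is stalled.
        System.out.println("timeout=15000 pause=20000 corrupted=" + simulate(15_000, 20_000));
        // 30 s timeout, 20 s pause: B keeps waiting and A commits safely.
        System.out.println("timeout=30000 pause=20000 corrupted=" + simulate(30_000, 20_000));
    }
}
```

In this model the first case reports corruption and the second does not, which is the trade-off the answer describes: a timeout longer than the worst pause prevents the override, at the cost of other writers waiting longer. With Chronicle Queue itself, the corresponding setting would be applied on the builder, for example `ChronicleQueue.singleBuilder(folder).timeoutMS(60_000)`, where 60_000 is only an illustrative value to be tuned for your environment.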