有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

多线程Java行集的多线程处理

我正在拉下一个充满数据的表,我需要处理这个问题,对每一行做一些格式化,然后推出一个RESTAPI

我使用一个PostgreSQL数据库和java实现,其思想是将所有数据向下拉,获得行数,并旋转线程以一次处理一个块

我已经建立了连接,将表拉入缓存行集中,并使用last()getRow()beforeFirst()获取行数

我试图找到一种方法来分割出一大块行集并将其交给他人处理,但我似乎看不到有任何方法可以做到这一点

有限制x和其他东西,但我想避免使用如此大的数据进行大量数据库调用

任何想法都将不胜感激

这就是我所看到的

RowSet rst = RowSetProvider.newFactory().createCachedRowSet();
rst.setUrl(url);
rst.setUsername(username);
rst.setPassword(password);

String cmd = "select * from event_log";

rst.setCommand(cmd);
rst.execute();

ResultSetMetaData rsmd = rst.getMetaData();
int columnsNumber = rsmd.getColumnCount();

rst.last();
int size = rst.getRow();
int maxPerThread = 1000;
rst.beforeFirst();

int threadsToCreate = size / maxPerThread;

for (int loopCount = 0; loopCount < threadsToCreate; loopCount++)
{
    //Create chunk
    //Create thread
    //Pass chunk into thread and start it
    //Once chunk is finished then thread and chunk are destroyed
}

共 (1) 个答案

  1. # 1 楼答案

    这是思考JDBC交互的正确方式:

    • 所有查询都像一个特殊视图:SELECT foo, bar BETWEEN a AND b AS baz FROM foo INNER JOIN whatever;-这有效地创建了一个新的临时表
    • ResultSet是一个实时交互的概念:ResultSet不是返回数据的转储。这就像FileInputStream和磁盘上的文件之间的关系:ResultSet有一些方法为您提供数据,这些数据可能是通过与数据库“live”聊天来获取的。ResultSet本身只有几个句柄,而不是实际数据,尽管它可能会进行一些缓存,但您不知道

    因此:

    • 结果集是完全不可并行的。如果您与多个线程共享一个ResultSet对象,那么您编写了一个bug,并且无法从中恢复
    • 在许多数据库中,“askforthelength”相当于运行整个查询过程,因此速度相当慢。您可能不想这样做,而且从“我想同时处理我收到的信息”的角度来看,没有真正的理由这样做。你选择了错误的方法
    • 结果集可以(并且通常,出于性能原因,应该是!)配置为“仅向前”,意思是:您可以通过调用.next()前进一行,一旦这样做,就不能返回。这大大减少了DB服务器上的负载,因为它不必准备好正确响应请求以跳回到起点

    以下是我建议你做的:

    • 您有一个“控制器”线程,该线程具有结果集并运行查询
    • 一旦查询返回,您就不知道有多少条记录。但您确实知道需要并行化多少—需要同时处理这些数据的线程数
    • 因此,答案是:以ExecutorPool的形式启动那么多线程。然后,让您的控制器拉入行(调用resultSet.next(),并通过调用所有不同的.getFoo(idxOrColName)方法将所有数据拉入java类型),将所有数据编组到单个java对象中。我建议您编写一个表示一行数据的POJO,并为每行创建一个POJO
    • 然后,您的控制器线程将获取此对象并将此对象视为“作业”

    现在,您已经将问题简化为一种基本的forkjoin样式的策略:您有一个生成作业的线程,还有一些代码可以接受单个作业并完成它。我刚刚描述了ExecutorPool和friends的设计目的

    处理器线程不能访问ResultSet对象,这一点至关重要。并行地从DB中提取行是没有意义的,因为DB不是并行的,并且不能比单个线程更快地提供此信息。在这里,您唯一可以获得的并行化胜利是以并行方式处理数据,这就是为什么如果没有更大的变化,就无法改进上述模型的原因

    如果你正在寻找剧烈的重新设计,你需要“预块”。比方说,你已经知道你有一个有一百万行的数据库,每行都有一个完全随机的ID。你还知道你有X个处理器线程,其中X是一个动态数,取决于很多因素,比如你运行的硬件有多少CPU核

    然后:

    你启动了X个线程。你告诉每个线程它的索引(因此,如果你有7个线程,一个有“索引0”,另一个有“索引1”,一直到“索引6”),以及总共有多少个线程

    然后,每个线程运行以下查询:

    SELECT * FROM jobs WHERE unid % 7 = 5;

    这是第6个作业线程将运行的查询

    这保证了每个线程运行的作业数量是相等的

    一般来说,这比以前的模型效率低,因为这很可能意味着DB仅为d进行更多的工作(运行相同的查询7倍,而不是只运行一次),任何给定的工作线程都可能在其他线程仍在运行时开始空闲,而与此相对的是,控制器将作业拉出并分发到模型中,您将不会遇到一个线程已完成而其他线程仍有大量作业剩余的情况

    注意:行集和结果集的工作方式完全相同。事实上,行集的DB版本(JdbcRowSet)是作为结果集的轻量级包装器实现的