我手头有一个小小的特殊任务,我不知道如何最好地实现一个解决方案
我有三台工作站通过InfiniBand连接到运行Ubuntu20.04 LTS的NAS,带宽为40gbps。此NAS配备一个2TB NVMe SSD作为写缓存,7个RAID0单元作为主存储器
这些工作站将向该NAS输出原始数据供以后使用,每台计算机每天将输出大约6TB的数据文件,每个文件的大小从100-300 GB不等。为了防止网络变得过于拥挤,我让他们先将数据输出到NVMe缓存,然后我计划从那里分发数据文件—每个RAID0单元并发分发一个文件,以最大化磁盘IO。例如,file1转到array0,file2转到array1,file3转到array2,依此类推
现在,我正在NAS端编写一个脚本(最好作为systemd
服务,但我可以使用nohup
)来监视缓存,并将文件发送到这些RAID阵列
这是我的想法,多亏了this post,这离我的目标非常接近
import queue, threading, os, time
import shutil
bfr_drive = '/home/test_folder' # cache
ext = ".dat" # data file extension
array = 0 # simluated array as t0-t6
fileList = [] # list of files to be moved from cache to storage
destPath = '/home/test_folder/t'
fileQueue = queue.Queue()
class ThreadedCopy:
totalFiles = 0
copyCount = 0
array = 0
lock = threading.Lock()
def __init__(self):
for file_name in os.listdir(bfr_drive):
if file_name.endswith(ext):
fileList.append(os.path.join(bfr_drive, file_name))
fileList.sort()
self.totalFiles = len(fileList)
print (str(self.totalFiles) + " files to copy.")
self.threadWorkerCopy(fileList)
def CopyWorker(self):
global array
while True:
fileName = fileQueue.get()
shutil.copy(fileName, destPath+str(array))
array += 1
if array > 6:
array = 0
fileQueue.task_done()
with self.lock:
self.copyCount += 1
percent = (self.copyCount * 100) / self.totalFiles
print (str(percent) + " percent copied.")
def threadWorkerCopy(self, fileNameList):
# global array
for i in range(4):
t = threading.Thread(target=self.CopyWorker)
t.daemon = True
t.start()
# array += 1
# if array > 6:
# array = 0
print ("current array is:" + str(array)) # output prints array0 for 4 times, did not iterate
for fileName in fileNameList:
fileQueue.put(fileName)
fileQueue.join()
ThreadedCopy()
现在,Python脚本可以成功地分发文件,但只能在for i in range(4)
中的数字之后。例如,如果我将其设置为4,那么工人将对前4个文件使用相同的路径(array0),只有这样,他们才会开始在数组中迭代到1、2、3等等
有人能指出我如何分发这些文件吗?我认为我正朝着正确的方向前进,然而,我就是不明白为什么那些工人一开始就被困在同一个目录下
编辑:我有一个早期版本的代码,路径迭代在生成过程threadWorkerCopy
。我现在让它移到实际的worker函数CopyWorker
。这个问题仍然存在
问题是您不会在工作线程中生成
array
的新值,而只是在threadWorkerCopy
中创建线程时才生成。结果将取决于系统上的实际计时。每个工作线程在读取值时都将使用
array
的值。这可能与threadWorkerCopy
增加值或之后同时进行,因此您可能会得到不同目录中的文件或所有文件都在同一目录中要为每个复制进程获取一个新的编号,必须在工作线程中增加
array
中的编号。在这种情况下,必须防止两个或多个线程同时访问array
。您可以使用另一个锁来实现这一点为了进行测试,我将目录列表替换为示例文件名的硬编码列表,并将复制替换为打印值
这会打印如下内容(在不同运行之间可能会发生变化):
注意事项:
我将行
fileQueue.task_done()
移到了CopyWorker
的末尾。否则,我不会得到所有百分比输出行,有时还会收到错误消息也许您应该在主线程结束之前等待所有工作线程的结束
我没有检查代码中是否还有其他错误
更改问题中的代码后编辑:
修改后的代码仍然存在一个问题,即工作线程仍将在
fileQueue.task_done()
之后执行一些输出,以便主线程可能在工作线程之前结束修改后的代码包含工作线程访问
array
时的竞争条件,因此该行为可能是意外的相关问题 更多 >
编程相关推荐