多线程数据核对实战指南:文件 vs 数据库

多线程数据核对实战指南:文件 vs 数据库

技术教程gslnedu2025-03-06 12:33:279A+A-

任务背景

曾经手里有一份 超大的数据文件,我需要解析这个文件并提取每行特定的数据,文件是无法打开的。直接上实操,首先是设计出一个流程,推敲演练,最终总结出如下4大过程

  1. 解析文件内容,提取关键数据。
  2. 从数据库中读取对应数据。
  3. 将两者进行核对。
  4. 异常处理。
  5. 将核对结果批量入库。

但问题是:文件太大,内存有限!别担心,我们可以使用用 多线程 + 文件分块 的魔法来解决!


任务目标

  1. 高效解析文件:将文件切分成块,确保每块内容完整。
  2. 多线程读取:用多线程解析文件块和数据库数据。
  3. 数据核对:通过队列实现文件数据与数据库数据的核对。
  4. 批量入库:将核对结果高效写入数据库。

方案设计

流程图

流程图是GPS导航,清晰指引数据流向

方案设计图

核心步骤

给出关键代码片段

1.文件切分

  • 目标:将大文件切分成小块,确保每块内容完整。
  • 方法
    • 按行切分,避免切割到半行数据。
    • 每块大小根据内存限制动态调整。
public List splitFile(String filePath, int blockSize) throws IOException {
    List blocks = new ArrayList<>();
    try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
        String line;
        StringBuilder buffer = new StringBuilder();
        int currentSize = 0;
        while ((line = reader.readLine()) != null) {
            buffer.append(line).append("\n");
            currentSize += line.length();
            if (currentSize >= blockSize) {
                blocks.add(new FileBlock(buffer.toString()));
                buffer = new StringBuilder();
                currentSize = 0;
            }
        }
        if (buffer.length() > 0) {
            blocks.add(new FileBlock(buffer.toString()));
        }
    }
    return blocks;
}

2.多线程解析文件

  • 目标:用多线程解析文件块,提取关键数据。
  • 方法
    • 每个线程处理一个文件块。
    • 将解析结果放入核对队列。
public class FileParser implements Runnable {
    private String blockContent;
    private BlockingQueue queue;

    public FileParser(String blockContent, BlockingQueue queue) {
        this.blockContent = blockContent;
        this.queue = queue;
    }

    @Override
    public void run() {
        String[] lines = blockContent.split("\n");
        for (String line : lines) {
            Data data = parseLine(line); // 解析每行数据
            queue.put(data); // 放入核对队列
        }
    }

    private Data parseLine(String line) {
        // 解析逻辑
        return new Data();
    }
}

3.多线程读取数据库

  • 目标:用多线程从数据库读取数据。
  • 方法
    • 每个线程读取一部分数据。
    • 将读取结果放入核对队列。
public class DBReader implements Runnable {
    private BlockingQueue queue;
    private int startId;
    private int endId;

    public DBReader(BlockingQueue queue, int startId, int endId) {
        this.queue = queue;
        this.startId = startId;
        this.endId = endId;
    }

    @Override
    public void run() {
        List dbData = fetchDataFromDB(startId, endId); // 从数据库读取数据
        for (Data data : dbData) {
            queue.put(data); // 放入核对队列
        }
    }

    private List fetchDataFromDB(int startId, int endId) {
        // 数据库查询逻辑
        return new ArrayList<>();
    }
}

4.数据核对

  • 目标:核对文件数据和数据库数据。
  • 方法
    • 从队列中取出数据,进行比对。
    • 将核对结果放入批量入库队列。
public class DataChecker implements Runnable {
    private BlockingQueue fileQueue;
    private BlockingQueue dbQueue;
    private BlockingQueue resultQueue;

    public DataChecker(BlockingQueue fileQueue, BlockingQueue dbQueue, BlockingQueue resultQueue) {
        this.fileQueue = fileQueue;
        this.dbQueue = dbQueue;
        this.resultQueue = resultQueue;
    }

    @Override
    public void run() {
        while (true) {
            Data fileData = fileQueue.take();
            Data dbData = dbQueue.take();
            if (fileData.equals(dbData)) {
                resultQueue.put(fileData); // 核对通过,放入结果队列
            }
        }
    }
}

5.批量入库

  • 目标:将核对结果批量写入数据库。
  • 方法
    • 从结果队列中取出数据,批量插入。
public class BatchInserter implements Runnable {
    private BlockingQueue resultQueue;

    public BatchInserter(BlockingQueue resultQueue) {
        this.resultQueue = resultQueue;
    }

    @Override
    public void run() {
        List batch = new ArrayList<>();
        while (true) {
            Data data = resultQueue.take();
            batch.add(data);
            if (batch.size() >= 1000) { // 每1000条批量插入一次
                insertBatch(batch);
                batch.clear();
            }
        }
    }

    private void insertBatch(List batch) {
        // 批量插入逻辑
    }
}

异常处理四重防御

异常防御机制是防弹装甲,抵御各种意外攻击。

1.文件解析异常捕获

public class FileParser implements Runnable {
    @Override
    public void run() {
        try {
            // 解析逻辑...
        } catch (Exception e) {
            ErrorTracker.log("文件解析异常", e);
            ErrorQueue.put(new ErrorData(blockId, e)); // 记录错误块
        } finally {
            CompletionCounter.fileBlockDone(); // 完成计数器
        }
    }
}

2.数据库查询重试机制

public List fetchDataWithRetry(int page, int size) {
    int retry = 0;
    while (retry < 3) {
        try {
            return jdbcTemplate.query("SELECT ... LIMIT ?,?", page*size, size);
        } catch (SQLException e) {
            ErrorTracker.log("数据库查询异常", e);
            retry++;
            Thread.sleep(1000 * retry); // 指数退避
        }
    }
    throw new RetryFailedException("数据库查询重试失败");
}

3.核对完整性保障

public class DataChecker {
    private AtomicInteger fileCount = new AtomicInteger(0);
    private AtomicInteger dbCount = new AtomicInteger(0);

    public void run() {
        while (!isDone()) { // 双重判断
            Data fileData = fileQueue.poll(1, TimeUnit.SECONDS);
            Data dbData = dbQueue.poll(1, TimeUnit.SECONDS);
            
            if (fileData != null) fileCount.incrementAndGet();
            if (dbData != null) dbCount.incrementAndGet();
            
            // 核对逻辑...
        }
        
        // 最终校验
        if (fileCount.get() != dbCount.get()) {
            ErrorTracker.log("数据总量不匹配: 文件数据=" + fileCount + " 数据库数据=" + dbCount);
        }
    }
}

4.批量插入容错设计

public class BatchInserter {
    public void insertWithFallback(List batch) {
        try {
            jdbcTemplate.batchUpdate("INSERT...", batch);
        } catch (DataAccessException e) {
            ErrorTracker.log("批量插入失败", e);
            // 分片重试:将大分片拆成小分片
            if (batch.size() > 1) {
                insertWithFallback(batch.subList(0, batch.size()/2));
                insertWithFallback(batch.subList(batch.size()/2, batch.size()));
            } else {
                ErrorQueue.put(batch.get(0)); // 单条进入错误队列
            }
        }
    }
}

完整性保障策略

完整性校验 是质量检测仪,确保不遗漏任何数据

保障机制

实现方式

原子计数器

使用AtomicInteger统计文件/数据库数据量

双重完成检测

1. 生产者完成标记
2. 队列空状态检测

最终一致性校验

核对结束后对比文件行数与数据库记录数

错误数据重试

错误队列数据定时重新投入核对流程

水位线监控

实时监控各队列数据积压情况,动态调整线程数


性能优化技巧

智能优化策略 是涡轮增压引擎,让处理速度持续飙升

  1. 动态分块策略
// 根据系统实时状态自动调整分块大小
int dynamicBlockSize = Runtime.getRuntime().freeMemory() > 512MB ? 64MB : 16MB;
  1. 双缓冲队列设计
// 主队列 + 溢出磁盘队列(防止内存溢出)
BlockingQueue overflowQueue = new DiskBackedBlockingQueue<>(); 
  1. 智能线程池管理
// 根据任务类型动态调整线程数
ExecutorService executor = new ThreadPoolExecutor(
    corePoolSize, 
    maxPoolSize,
    60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(1000),
    new SmartThreadFactory() // 监控线程负载
);

最终效果

运行效果

  1. 文件切分:确保每块内容完整。
  2. 多线程解析:高效提取文件数据。
  3. 多线程读取数据库:快速获取数据库数据。
  4. 数据核对:通过队列实现高效比对。
  5. 批量入库:将核对结果高效写入数据库。

总结

通过 文件切分 + 多线程 + 队列 的方案,我们可以轻松解决了 大文件解析 和 数据核对 的难题!

这个方案像给数据处理流程装上了 防弹衣 + GPS追踪器,并且这套方案的思路对以下极端情况:

文件解析中途崩溃、数据库连接闪断、核对数据量级差异、网络波动导致插入失败,系统仍能保证:

  1. 零数据丢失
  2. 完整核对覆盖
  3. 自动恢复能力
  4. 实时状态可观测

欢迎讨论并提出建议!

点击这里复制本文地址 以上内容由朽木教程网整理呈现,请务必在转载分享时注明本文地址!如对内容有疑问,请联系我们,谢谢!
qrcode

朽木教程网 © All Rights Reserved.  蜀ICP备2024111239号-8