How do I run code after all threads have finished running?

I have a multithreaded web crawler that downloads a website and stores it in a database (which takes around 4 minutes). To make the crawling faster, I used the Node.js `cluster` module, but I have a problem: I want to move on to the next iteration of the `while` loop only after all the threads have finished their work, not as soon as they start. How do I make sure all my threads have finished before moving on?

Here is the relevant code in the main while loop:

// Asker's crawl loop. Each pass splits `queue` into one contiguous chunk per
// thread; the master forks that many cluster workers, and a worker crawls the
// chunk matching its id.
// The problem described above: the master branch only calls cluster.fork()
// and falls through, so nothing waits for the workers before the next pass.
while (indexSize !== indexSizeLimit) {
        const queueLength = queue.length;
        const numberOfThreads = Math.min(numberOfCPUs, queueLength);
        const threadAllocations = Array(numberOfThreads).fill(0);
        let queuesAllocated = 0;
        const queueChunks = [];

        // Hands out queue items round-robin, so per-thread counts differ by at
        // most one, then cuts `queue` into contiguous slices of those sizes.
        // NOTE(review): if `queue` is empty, numberOfThreads is 0, the inner
        // `for` never runs, and this `while (true)` never exits.
        function fillQueueChunks() {
          loop: while (true) {
            for (let i = 0; i < numberOfThreads; i++) {
              threadAllocations[i] += 1;
              queuesAllocated += 1;

              if (queuesAllocated === queueLength) {
                break loop;
              };
            };
          };

          let start = 0;

          for (let threadAllocation of threadAllocations) {
            const end = start + threadAllocation;

            queueChunks.push(queue.slice(start, end));

            start = end;
          };
        };

        fillQueueChunks();
        
        // Find out how to make multithreading finish, and then move on with the loop.
        if (cluster.isMaster) {
          for (let i = 0; i < numberOfThreads; i++) {
            cluster.fork();
          };
        } else {
          // Assumes worker ids map onto 1..numberOfThreads (hence the `- 1`);
          // NOTE(review): confirm this still holds when workers are forked again
          // on later passes.
          const chunk = queueChunks[cluster.worker.id - 1];

          await Promise.all(chunk.map(function (url) {
            // NOTE(review): if request() or save() rejects inside this async
            // executor, resolve is never called and Promise.all never settles.
            return new Promise(async function (resolve) {
              const webcode = await request(url);

              if (webcode !== "Failure") {
                // NOTE(review): this runs in the worker process, so presumably
                // only the worker's own copy of indexSize changes; confirm.
                indexSize += 1;

                const document = new Document(url, webcode);
                const hrefs = document.hrefs();
                const hrefsQuery = Query(hrefs);
                // Also make sure it is not included in indexed webpages.
                const hrefIndividualized = hrefsQuery.individualize();

                // Bare expression statement: no effect (placeholder).
                hrefIndividualized;
                
                // Do something with hrefIndividualized in regards to maintaining a queue in the database.
                // And in adding a nextQueue which to replace the queue in code with.
                
                await document.save();
              };

              resolve("Written");
            });
          }));

          // Ends this worker process once its whole chunk has been handled.
          process.exit(0);
        };
      };

37 thoughts on “How do I run code after all threads have finished running?”

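  1. Wrap the forking in a promise. In the parent (master) process, listen for the workers' `disconnect` events; when the number of disconnects equals the number of workers, resolve the promise. Keep in mind that cluster workers are separate processes, not threads: variables they modify (such as `indexSize` or `nextQueue`) are not shared with the parent, so any results have to be sent back explicitly, e.g. with `process.send()`.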

    Here is what I have

    // Crawl in rounds. Each round splits the current queue into one contiguous
    // chunk per worker process and waits until every worker has finished. It
    // then merges the links the workers discovered into the next round's queue.
    //
    // Worker processes do not share memory with the primary: assignments such
    // as `indexSize += 1` or `nextQueue = ...` made inside a worker are never
    // seen here. Work and results therefore travel over the cluster IPC channel
    // (`worker.send` / `process.send`).

    // Splits `items` into `count` contiguous slices whose lengths differ by at
    // most one; the earlier slices receive the extra items.
    const splitIntoChunks = function (items, count) {
      const chunks = [];
      const baseSize = Math.floor(items.length / count);
      const remainder = items.length % count;
      let start = 0;

      for (let i = 0; i < count; i++) {
        const end = start + baseSize + (i < remainder ? 1 : 0);
        chunks.push(items.slice(start, end));
        start = end;
      }

      return chunks;
    };

    // Crawls one chunk of queue items ({ _id: url } objects) inside a worker.
    // Each page is fetched and saved, and its links with no matching Site are
    // queued. A failing item is logged and does not discard the results of the
    // other items.
    // Returns { indexed, nextQueue } for the primary to merge.
    const crawlChunk = async function (queueJob) {
      const outcomes = await Promise.allSettled(queueJob.map(async function (queueItem) {
        const url = queueItem._id;
        const webcode = await request(url);
        let indexed = 0;
        let fresh = [];

        if (webcode !== "Failure") {
          const document = new Document(url, webcode);
          const hrefsQuery = Query(document.hrefs());

          await document.save();
          indexed = 1;

          const hrefs = hrefsQuery.individualize();
          // Must be awaited: indexing the pending Promise itself yields
          // undefined, which would silently filter out every link.
          const incidences = await Promise.all(hrefs.map(function (href) {
            return Site.countDocuments({
              url: href
            });
          }));

          fresh = hrefs
            .filter(function (_, i) {
              return incidences[i] === 0;
            })
            .map(function (href) {
              return {
                _id: href
              };
            });

          if (fresh.length > 0) {
            await Queue.insertMany(fresh);
          }
        }

        await Queue.deleteOne({
          _id: url
        });

        return { indexed, fresh };
      }));

      let indexed = 0;
      let nextQueue = [];

      for (const outcome of outcomes) {
        if (outcome.status === "fulfilled") {
          indexed += outcome.value.indexed;
          nextQueue = nextQueue.concat(outcome.value.fresh);
        } else {
          console.error("Failed to crawl queue item:", outcome.reason);
        }
      }

      return { indexed, nextQueue };
    };

    if (cluster.isWorker) {
      // Workers re-execute this file from the top, so they must not run the
      // round loop themselves. Ask the primary for a chunk, crawl it, report the
      // result, and exit once the report has been handed to the IPC channel.
      const queueJob = await new Promise(function (resolve) {
        process.once("message", resolve);
        process.send({ type: "ready" });
      });
      const result = await crawlChunk(queueJob);

      await new Promise(function (resolve) {
        process.send({ type: "result", ...result }, resolve);
      });
      process.exit(0);
    } else {
      // `<` rather than `!==`: a single round can index several pages at once
      // and overshoot the limit. The empty-queue check ends the crawl when no
      // new links were found; otherwise Math.min(...) would yield zero workers.
      while (indexSize < indexSizeLimit && queue.length > 0) {
        const numberOfThreads = Math.min(numberOfCPUs, queue.length);
        const queueChunks = splitIntoChunks(queue, numberOfThreads);

        const results = await new Promise(function (resolve) {
          const collected = [];
          let threadsDone = 0;

          for (const chunk of queueChunks) {
            const worker = cluster.fork();

            worker.on("message", function (message) {
              if (message && message.type === "ready") {
                worker.send(chunk);
              } else if (message && message.type === "result") {
                collected.push(message);
              }
            });

            // Count each worker as done when its IPC channel closes, whether it
            // exited normally or crashed. Listeners are attached per worker so
            // they do not accumulate on `cluster` across rounds.
            worker.on("disconnect", function () {
              threadsDone += 1;

              if (threadsDone === queueChunks.length) {
                resolve(collected);
              }
            });
          }
        });

        // Merge the workers' reports, dropping links found by more than one.
        const nextById = new Map();

        for (const { indexed, nextQueue } of results) {
          indexSize += indexed;

          for (const item of nextQueue) {
            nextById.set(item._id, item);
          }
        }

        queue = [...nextById.values()];
      }
    }
    
    Reply

Leave a Comment