Use Bull with NestJS




This is the third part of the file uploading example using NestJS and Angular 11, following Part 1 and Part 2. In this article, I will use Bull to process the uploaded files through a queue.


What is Bull?

  • Bull is a high-performance queue based on Redis for Node-based applications.
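  • Queues are very useful when we are developing microservices.
  • They make communication between microservices smoother, because the sender does not have to wait for the receiver.
  • They are also useful in situations like a network failure, because jobs stay in the queue until a consumer can process them.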
  • You may read the official documentation for more details.

Use Bull with NestJS

  • NestJS provides the @nestjs/bull package as a wrapper on top of Bull.
  • Because Bull is based on Redis, you need to install a Redis server before you start using Bull.

Install Redis

  • Installing Redis differs according to your operating system.
  • I am not going to explain this here.
  • You can install a Redis server locally and start it before continuing.


Now your Redis server is up and running. If you also install a Redis client, it will be very easy to see the job statuses and the queue contents. Now let's install the Bull package for NestJS.

Install Bull on Nest

  • Go to the automobile-backend application
  • Run the following commands to install the Bull wrapper for NestJS

npm install --save @nestjs/bull bull
npm install --save-dev @types/bull



How to use Bull?


I am going to handle the file uploading process using a queue. Once the backend server receives a file, the file is added to a queue as a job. The jobs in the queue are then processed one by one. Bull provides many functions to handle these kinds of problems. Let's configure our application step by step. Here is the flow.

  • Import Bull module to the root
  • Register a queue
  • Add files into the queue 
  • Create a service to process queue
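
To make the flow above concrete before bringing in Bull, here is a minimal in-memory sketch of the same idea: a handler is registered under a job name, jobs are added, and they are processed oldest first. The InMemoryQueue class and its method names are hypothetical and only for illustration. Bull stores jobs in Redis rather than in an array, and it processes them asynchronously.

```typescript
// Illustrative in-memory queue; this is NOT Bull.
// Bull keeps jobs in Redis, so they survive a restart of the application.
type Job<T> = { name: string; data: T };
type Handler<T> = (job: Job<T>) => void;

class InMemoryQueue<T> {
  private jobs: Job<T>[] = [];
  private handlers = new Map<string, Handler<T>>();

  // Register the function that processes jobs with the given name.
  process(name: string, handler: Handler<T>): void {
    this.handlers.set(name, handler);
  }

  // Put a job at the back of the queue.
  add(name: string, data: T): void {
    this.jobs.push({ name, data });
  }

  // Process the waiting jobs one by one, oldest first.
  // Returns the number of jobs that were handled.
  drain(): number {
    let handled = 0;
    while (this.jobs.length > 0) {
      const job = this.jobs.shift()!;
      const handler = this.handlers.get(job.name);
      if (!handler) {
        throw new Error(`No handler for job "${job.name}"`);
      }
      handler(job);
      handled++;
    }
    return handled;
  }
}

// Usage: two uploaded files are queued and then handled in order.
const processed: string[] = [];
const queue = new InMemoryQueue<{ path: string }>();
queue.process('csv', (job) => {
  processed.push(job.data.path);
});
queue.add('csv', { path: 'csv/a.csv' });
queue.add('csv', { path: 'csv/b.csv' });
const handledCount = queue.drain();
```

The producer (add) and the consumer (process) only share the job name, which is the same separation the controller and the processor have in the rest of this article.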


Import Bull module to the root


  • Add the Bull module to the automobile.module.ts file as follows.



        import { Module } from '@nestjs/common';
        import { AutomobileController } from './automobile.controller';
        import { AutomobileService } from './automobile.service';
        import { TypeOrmModule } from '@nestjs/typeorm';
        import { Vehicle } from './vehicle';
        import { BullModule } from '@nestjs/bull';

        @Module({
          controllers: [AutomobileController],
          providers: [AutomobileService],
          imports: [
            TypeOrmModule.forFeature([Vehicle]),
            BullModule.forRoot({
              redis: {
                host: 'localhost',
                port: 6379,
              },
            }),
            BullModule.registerQueue({
              name: 'upload-queue',
            }),
          ],
        })
        export class AutomobileModule {}



  • BullModule.forRoot sets the connection details of the locally running Redis server that we started earlier.
  • My Redis server is running on port 6379.
  • Then I registered a queue called "upload-queue" to handle the uploaded files.
  • This registered queue name is used both when adding data to the queue and in the processor.
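
The host and port are hard-coded in the module above. If you want to change them per environment, one option is to read them from environment variables. The following is only a sketch: the function name redisOptionsFromEnv and the variable names REDIS_HOST and REDIS_PORT are my assumptions, not something Bull or NestJS defines.

```typescript
// Shape of the `redis` object passed to BullModule.forRoot in this article.
interface RedisConnection {
  host: string;
  port: number;
}

// Build the Redis connection settings from environment variables.
// REDIS_HOST and REDIS_PORT are hypothetical names chosen for this sketch.
function redisOptionsFromEnv(
  env: Record<string, string | undefined>,
): RedisConnection {
  const host = env.REDIS_HOST ?? 'localhost';
  const port = Number(env.REDIS_PORT ?? 6379);
  if (!Number.isInteger(port) || port <= 0 || port > 65535) {
    throw new Error(`Invalid REDIS_PORT: ${env.REDIS_PORT}`);
  }
  return { host, port };
}

// Possible use in the module (not executed here):
//   BullModule.forRoot({ redis: redisOptionsFromEnv(process.env) })
const localDefaults = redisOptionsFromEnv({});
```

With no variables set, the function falls back to the same localhost and 6379 values used in the module.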

Now we need to refactor our backend service. Remember that I am still working on the project that I completed in the last article. I am going to refactor automobile.controller.ts and add a new processor for file processing.


Refactoring controller to serve queue operations


I am not going to use the existing automobile.service.ts file. Instead, I will create a new file for the CSV file operations. Here is the refactored controller.



        import { Controller, Post, UploadedFile, UseInterceptors } from '@nestjs/common';
        import { FileInterceptor } from '@nestjs/platform-express';
        import { extname } from 'path';
        import { diskStorage } from 'multer';
        import { InjectQueue } from '@nestjs/bull';
        import { Queue } from 'bull';

        @Controller('/api/vehicles')
        export class AutomobileController {

          // Inject the queue registered as 'upload-queue' in the module
          constructor(@InjectQueue('upload-queue') private fileQueue: Queue) {}

          @Post('/upload')
          @UseInterceptors(FileInterceptor('csv', {
            storage: diskStorage({
              destination: './csv',
              filename: (req, file, cb) => {
                // 32 random hex characters; Math.floor keeps every digit in the range 0-f
                const randomName = Array(32).fill(null).map(() => Math.floor(Math.random() * 16).toString(16)).join('');
                cb(null, `${randomName}${extname(file.originalname)}`);
              },
            }),
          }))
          async uploadCsv(@UploadedFile() file) {
            // Add the stored file's metadata to the queue as a 'csv' job
            await this.fileQueue.add('csv', { file: file });
          }
        }




  • As you can see, I removed the injection of automobile.service.ts and injected the queue in the constructor instead.
  • The queue name is "upload-queue", as I registered it in the automobile.module.ts file.
  • The received file is added to the queue as a job named "csv".
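
The controller builds the stored file name from Math.random(). As a possible alternative, Node's built-in crypto module can produce the random part. This is a sketch under my own assumptions: the helper name randomFileName is hypothetical and is not part of the project.

```typescript
import { randomBytes } from 'crypto';
import { extname } from 'path';

// Return 32 random hex characters followed by the original file extension.
// randomFileName is a hypothetical helper introduced for this sketch.
function randomFileName(originalName: string): string {
  const randomName = randomBytes(16).toString('hex'); // 16 bytes = 32 hex characters
  return `${randomName}${extname(originalName)}`;
}

// Possible use inside the multer `filename` callback (not executed here):
//   filename: (req, file, cb) => cb(null, randomFileName(file.originalname))
const exampleName = randomFileName('vehicles.csv');
```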

Adding a Processor


The processor handles the data added to the queue. A queue can have one processor class with many processes (job handlers). The @Processor and @Process decorators mark the processor class and its process methods.

  • Create a new directory inside the vehicle module.
  • Create a new file (upload.processor.ts) in the created directory.
  • The data operations can be done in the relevant process.
  • Add the processor class to the providers array of the module, so that Nest can create it.



        import { Process, Processor } from '@nestjs/bull';
        import { InjectRepository } from '@nestjs/typeorm';
        import { Job } from 'bull';
        import { Vehicle } from 'src/automobile/vehicle';
        import { Repository } from 'typeorm';

        @Processor('upload-queue')
        export class UploadProcessor {

          constructor(
            @InjectRepository(Vehicle) private vehicleRepository: Repository<Vehicle>) { }

          // Handles the jobs that the controller adds with the name 'csv'
          @Process('csv')
          async handleCsvFiles(job: Job) {
            const csv = require('csvtojson');
            // The path stored by multer is relative to the working directory
            const csvFilePath = process.cwd() + '/' + job.data.file.path;
            // Convert the CSV rows to objects and save them
            const vehicleArray = await csv().fromFile(csvFilePath);
            await this.vehicleRepository.save(vehicleArray);
          }
        }



  • The registered queue name (upload-queue) is passed to the @Processor decorator.
  • The process name (csv) is the name that we used in the controller when adding data to the queue.
  • The business logic goes inside the process method.
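
The processor trusts that job.data contains a file with a path. A small check before touching the file system can make a malformed job fail with a clear message. The sketch below is an assumption on my side: the CsvJobData interface and the helpers isCsvJobData and csvPathFromJob are hypothetical names, and only the path field is checked.

```typescript
import { resolve } from 'path';

// The part of the job payload that the processor needs.
interface CsvJobData {
  file: { path: string };
}

// Type guard: true only when data looks like { file: { path: string } }.
function isCsvJobData(data: unknown): data is CsvJobData {
  if (typeof data !== 'object' || data === null) return false;
  const file = (data as { file?: unknown }).file;
  if (typeof file !== 'object' || file === null) return false;
  return typeof (file as { path?: unknown }).path === 'string';
}

// Turn the relative path stored in the job into an absolute path.
function csvPathFromJob(data: unknown, baseDir: string = process.cwd()): string {
  if (!isCsvJobData(data)) {
    throw new Error('Job data does not contain an uploaded file path');
  }
  return resolve(baseDir, data.file.path);
}

// Possible use inside the process method (not executed here):
//   const csvFilePath = csvPathFromJob(job.data);
const examplePath = csvPathFromJob({ file: { path: 'csv/a.csv' } }, '/app');
```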

If you look carefully, you will see that I am not returning anything. All the functions are void. Previously I returned the list of imported data. That is not suitable when you are dealing with queues, because the request finishes before the job is processed. To tell the client about the result, we can use a socket service (socket.io, socket cluster) for notifications. I am not going to do this in this article. I will do it in a future article.
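
Until such notifications exist, one small step is to return the id of the queued job, so the client at least has a reference to it. Bull's add() resolves to a job object that has an id. The sketch below uses a minimal QueueLike interface instead of the real Bull Queue type so that it can run without Redis. QueueLike, UploadedCsv and enqueueCsv are hypothetical names, and how the client would later use the id is not covered here.

```typescript
// Minimal stand-in for the part of a Bull queue used in this sketch.
interface QueueLike<T> {
  add(name: string, data: T): Promise<{ id: string | number }>;
}

// Only the field of the uploaded file that this sketch needs.
interface UploadedCsv {
  path: string;
}

// Add the file to the queue and return the job id instead of nothing.
async function enqueueCsv(
  queue: QueueLike<{ file: UploadedCsv }>,
  file: UploadedCsv,
): Promise<{ jobId: string | number }> {
  const job = await queue.add('csv', { file });
  return { jobId: job.id };
}

// Usage with a fake queue that records what was added.
const seenJobs: string[] = [];
const fakeQueue: QueueLike<{ file: UploadedCsv }> = {
  add: (name, data) => {
    seenJobs.push(`${name}:${data.file.path}`);
    return Promise.resolve({ id: 7 });
  },
};
const enqueueResult = enqueueCsv(fakeQueue, { path: 'csv/a.csv' });
```

In the controller, the same idea would mean returning the result of this call from uploadCsv.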

I made one change in the client application: I refactored the submit() method in the upload.component.ts file.



        submit() {
          const formData = new FormData();
          formData.append('csv', this.myForm.get('fileSource')!.value);

          this.http.post('http://localhost:3000/api/vehicles/upload', formData)
            .subscribe((data) => {
              alert("Data imported successfully")
            })
        }




Now everything is in place. Run both the server and the client, then check how it works. You can find the code in the Git repository below. Please visit the link below to find the next article.


Part 04: GraphQL and PostGraphile with NestJS







Reviewed by Ravi Yasas on 11:32 AM
