Mule Db Bulk Insert Using bulkMode

0

In this article we will see how to perform multiple SQL requests in a single bulk insert. We will enable bulk mode using the bulkMode attribute. When it is set to true, multiple insert operations are performed in bulk.

This improves the performance of the application, as the number of individual query executions is reduced.

Database setup

We will insert multiple employees.

db-schema.sql:

-- Schema for the bulk insert example: drops any existing EMP table and creates it again.
drop table if exists `EMP`;
-- EMP holds one row per employee; ID is the primary key and both columns are mandatory.
CREATE TABLE `EMP` (
  `ID` VARCHAR(10) NOT NULL,
  `NAME` VARCHAR(100) NOT NULL,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

applicationContext.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring context for the bulk insert example.
  Declares the MySQL data source and runs db-schema.sql against it.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"       
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-3.0.xsd
		">
	<!-- Executes the classpath script db-schema.sql (drop and create of EMP) using the dataSource bean below. -->
	<jdbc:initialize-database data-source="dataSource">
		<jdbc:script location="classpath:db-schema.sql" />
	</jdbc:initialize-database>
	
	<!--
	  Data source for the local MySQL "test" database.
	  NOTE(review): driver class and credentials are hard-coded for the example; externalize them for real use.
	-->
	<bean id="dataSource"
		class="org.springframework.jdbc.datasource.DriverManagerDataSource">
		<property name="driverClassName" value="com.mysql.jdbc.Driver" />
		<property name="url" value="jdbc:mysql://localhost/test" />
		<property name="username" value="root" />
		<property name="password" value="admin" />
	</bean>
</beans>

Enable bulk insert

In order to enable bulk mode, we will set bulkMode to true. We then have to submit collections of data, in our case the employees.

muleContext.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Mule configuration for the bulk insert example.
  Defines the insertBulk flow and the MySQL connector configuration it uses.
-->
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:spring="http://www.springframework.org/schema/beans" 
      xmlns:springjdbc="http://www.springframework.org/schema/jdbc"
      xmlns:db="http://www.mulesoft.org/schema/mule/db"
      xsi:schemaLocation="
            http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
            http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd
                  http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
                  http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-3.0.xsd">
    <!-- Pulls in the Spring beans (dataSource and schema initialization) from applicationContext.xml. -->
    <spring:beans>
        <spring:import resource="applicationContext.xml"/>
    </spring:beans>
    
	<!-- Receives a message on vm://insertBulk (request-response) and inserts its payload into EMP. -->
	<flow name="insertBulk">
		<inbound-endpoint address="vm://insertBulk"
			exchange-pattern="request-response" />

		<!--
		  bulkMode="true": the payload is expected to be a collection (here, a list of employees)
		  and the insert is performed in bulk for its elements.
		  The query reads the empId and empName properties through the #[payload...] expressions.
		-->
		<db:insert config-ref="dbConfig" bulkMode="true">
			<db:parameterized-query>INSERT INTO EMP(ID, NAME) VALUES
				(#[payload.empId], #[payload.empName])</db:parameterized-query>
		</db:insert>
	</flow>
	
	<!--
	  MySQL connection used by the db:insert above; the user is passed as a connection property.
	  NOTE(review): credentials are hard-coded for the example and duplicate those in applicationContext.xml.
	-->
	<db:mysql-config name="dbConfig"
		url="jdbc:mysql://localhost:3306/test" password="admin">
		<db:connection-properties>
			<db:property key="user" value="root" />
		</db:connection-properties>
	</db:mysql-config>
</mule>

Emp:

package com.javarticles.mule;

/**
 * Immutable employee value object used as one element of the bulk insert payload.
 *
 * <p>The getters expose the {@code empId} and {@code empName} properties that the
 * Mule flow's {@code #[payload.empId]} and {@code #[payload.empName]} expressions read.
 */
public class Emp {
    // Both fields are assigned once in the constructor and never changed afterwards.
    private final String empId;
    private final String empName;

    /**
     * Creates an employee.
     *
     * @param empId   employee identifier, stored in the {@code ID} column
     * @param empName employee name, stored in the {@code NAME} column
     */
    public Emp(String empId, String empName) {
        this.empId = empId;
        this.empName = empName;
    }

    /**
     * @return the employee identifier passed to the constructor
     */
    public String getEmpId() {
        return empId;
    }

    /**
     * @return the employee name passed to the constructor
     */
    public String getEmpName() {
        return empName;
    }
}

We submit a list of employee objects to the inbound endpoint vm://insertBulk. After the insert, we verify that the employees now exist in the table.

MuleDbBulkInsertExample:

package com.javarticles.mule;

import java.util.ArrayList;
import java.util.List;

import javax.sql.DataSource;

import org.mule.api.MuleContext;
import org.mule.api.MuleException;
import org.mule.api.MuleMessage;
import org.mule.api.context.MuleContextBuilder;
import org.mule.api.context.MuleContextFactory;
import org.mule.config.DefaultMuleConfiguration;
import org.mule.config.spring.SpringXmlConfigurationBuilder;
import org.mule.context.DefaultMuleContextBuilder;
import org.mule.context.DefaultMuleContextFactory;
import org.mule.module.client.MuleClient;
import org.mule.module.db.internal.resolver.database.DbConfigResolver;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.init.DatabasePopulator;
import org.springframework.jdbc.datasource.init.DatabasePopulatorUtils;

/**
 * Runnable example: starts an embedded Mule context from {@code muleContext.xml},
 * sends a list of employees to the {@code vm://insertBulk} endpoint and then checks
 * that each employee can be read back from the {@code EMP} table.
 */
public class MuleDbBulkInsertExample {

    /**
     * Builds and starts the Mule context, submits two employees for bulk insert,
     * and always disposes the context afterwards.
     *
     * @param args unused
     * @throws Exception if the Mule context cannot be created or started, or the
     *                   request to the flow fails
     */
    public static void main(String[] args) throws Exception {
        DefaultMuleConfiguration dmc = new DefaultMuleConfiguration();
        dmc.setId("muleexample");
        dmc.setWorkingDirectory("/esb/mule");
        SpringXmlConfigurationBuilder configBuilder = new SpringXmlConfigurationBuilder(
                "muleContext.xml");
        MuleContextBuilder contextBuilder = new DefaultMuleContextBuilder();
        contextBuilder.setMuleConfiguration(dmc);
        MuleContextFactory contextFactory = new DefaultMuleContextFactory();
        MuleContext muleContext = contextFactory.createMuleContext(
                configBuilder, contextBuilder);
        try {
            // Start inside the try block so a failed start still disposes the context.
            muleContext.start();

            MuleClient muleClient = new MuleClient(muleContext);
            // The "dataSource" bean comes from applicationContext.xml via the Mule registry.
            DataSource dataSource = (DataSource) muleContext.getRegistry().get(
                    "dataSource");
            JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);

            List<Emp> empList = new ArrayList<Emp>();
            empList.add(new Emp("01", "Joe"));
            empList.add(new Emp("02", "Sam"));
            insertNode(muleClient, jdbcTemplate, empList, "vm://insertBulk");
        } finally {
            muleContext.dispose();
        }
    }

    /**
     * Sends the employee list to the given endpoint, prints the response payload
     * and verifies that the employees are present in the database.
     *
     * @param muleClient  client used to send the request
     * @param jt          template used to read the inserted rows back
     * @param empList     employees to insert; sent as a single message payload
     * @param endpointUri endpoint of the flow that performs the bulk insert
     * @throws Exception if sending the message or reading the response fails
     */
    private static void insertNode(MuleClient muleClient, JdbcTemplate jt,
            List<Emp> empList, String endpointUri) throws Exception {
        System.out.println("Invoke " + endpointUri);
        MuleMessage response = muleClient.send(endpointUri, empList, null);
        System.out.println("Response: " + response.getPayloadAsString());
        verifyEmployees(jt, empList);
    }

    /**
     * Looks up each employee by id and prints the stored name.
     *
     * <p>Each employee is checked independently: a missing row is reported on
     * standard error and does not stop the remaining checks.
     *
     * @param jdbcTemplate template used to query the {@code EMP} table
     * @param empList      employees expected to exist
     */
    private static void verifyEmployees(JdbcTemplate jdbcTemplate,
            List<Emp> empList) {
        for (Emp emp : empList) {
            try {
                // Bind the id as a parameter instead of concatenating it into the SQL text.
                String name = jdbcTemplate.queryForObject(
                        "SELECT NAME FROM EMP WHERE ID = ?", String.class,
                        emp.getEmpId());
                System.out.println("Emp name " + name);
            } catch (EmptyResultDataAccessException e) {
                // queryForObject found no row for this id: the insert did not persist it.
                System.err.println("Emp with id " + emp.getEmpId()
                        + " not found in EMP");
            }
        }
    }
}

Output:

Invoke vm://insertBulk
Response: {1,1}
Emp name Joe
Emp name Sam

Download the source code

This was an example of a Mule DB bulk insert.

You can download the source code here: muleDbBulkInsertExample.zip

About Author

Ram’s expertise lies in test-driven development and refactoring. He is passionate about open source technologies and loves blogging on various Java and open-source technologies like Spring.
You can reach him at [email protected]

Comments are closed.