I have a data flow that inserts about 100M records into MongoDB. I'm using Beam running on a Flink cluster to handle the concurrency, and for this job I've enabled 30 task managers. Based on historical experience, I expected the process to complete within 24 hours, but when I started the job and monitored its progress, it did not behave as expected: the MongoDB inserts were extremely slow and the load on the MongoDB cluster increased dramatically. At that rate the job would have taken about a year to complete, which is totally unacceptable.

It turned out that the index on the target collection was missing, so every insert resulted in a collection scan, and this got worse and worse as more data was inserted. Each write is actually an upsert, so MongoDB had to scan the existing documents in the collection to check whether a matching record exists to update before performing a fresh insert. After I created the index on the target collection, MongoDB performance improved immediately and the server load dropped off dramatically. The job is now running well and should complete within 12 hours.
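
For reference, this is roughly how the missing compound index can be created directly with the MongoDB Java driver. This is only a sketch: the collection name and the database handle are placeholders of mine, and the field list simply mirrors the upsert lookup keys described later in this post.

import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Indexes;
import org.bson.Document;

public class CreateUpsertIndex {
    // hypothetical collection name; the fields match the upsert lookup keys
    static void createUpsertIndex(MongoDatabase database) {
        MongoCollection<Document> collection = database.getCollection("xxxCollection");
        collection.createIndex(
                Indexes.ascending("id", "key1", "key2", "key3"),
                new IndexOptions().unique(true));
    }
}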

MongoDB has various types of indexes. In my case I need compound indexes, because the flow first searches by several keys to update an existing document before falling back to an insert. There are many good reasons to use this kind of upsert operation; one benefit is that it makes the write atomic and idempotent. Atomicity and idempotency are very important in a big data processing system, because they allow the flow to be re-run without any problem.

There is a framework named Morphia that makes this work very easy: we can create JPA-like entity classes and manage the MongoDB indexes directly in the code.

import org.mongodb.morphia.annotations.Entity;
import org.mongodb.morphia.annotations.Field;
import org.mongodb.morphia.annotations.Index;
import org.mongodb.morphia.annotations.IndexOptions;
import org.mongodb.morphia.annotations.Indexes;
import org.mongodb.morphia.utils.IndexType;

@Entity(noClassnameStored = true)
@Indexes(
    {
        // unique compound index used for the upsert lookup
        @Index(fields = {
            @Field("id"),
            @Field("key1"),
            @Field("key2"),
            @Field("key3")
        }, options = @IndexOptions(unique = true)),
        // compound indexes used for online queries, ordered by v1 or v2
        @Index(fields = {
            @Field("key1"),
            @Field("key2"),
            @Field("key3"),
            @Field(value = "v1", type = IndexType.DESC)
        }),
        @Index(fields = {
            @Field("key1"),
            @Field("key2"),
            @Field("key3"),
            @Field(value = "v2", type = IndexType.DESC)
        }),
        // index for querying by last update time
        @Index(fields = {
            @Field(value = "lastUpdated", type = IndexType.DESC)
        })
    }
)
public class XXXEntity {}
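
Note that the annotations above only declare the indexes; Morphia actually builds them when the class is mapped and ensureIndexes() is called on the datastore. Here is a minimal bootstrap sketch, assuming Morphia 1.x and placeholder connection details of my own:

import com.mongodb.MongoClient;
import org.mongodb.morphia.Datastore;
import org.mongodb.morphia.Morphia;

public class MongoBootstrap {
    static Datastore createDatastore() {
        // hypothetical host and database name, for illustration only
        MongoClient mongoClient = new MongoClient("localhost", 27017);
        Morphia morphia = new Morphia();

        // register the annotated entity so Morphia knows about its @Indexes
        morphia.map(XXXEntity.class);

        Datastore datastore = morphia.createDatastore(mongoClient, "xxxDb");
        // build any missing indexes declared on the mapped entities
        datastore.ensureIndexes();
        return datastore;
    }
}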

In XXXEntity I'm defining four indexes. The first index is used for the upsert lookup. The second and third indexes are used for online queries that can be ordered by v1 or v2; if a query only needs key1, key2, key3 and v1 (or v2), the search performance is even better, because it becomes a covered query that can be answered from the index alone. To perform an atomic update in MongoDB we can do it as shown below. The last parameter of datastore.update() means createIfMissing: when it is set to true, Morphia will insert a new record if the query does not match any existing document.

import java.time.Clock;
import java.time.LocalDateTime;

import org.mongodb.morphia.query.Query;
import org.mongodb.morphia.query.UpdateOperations;
import org.mongodb.morphia.query.UpdateResults;

// the key fields are only written when a new document is inserted
UpdateOperations<XXXEntity> updateOperations = datastore
        .createUpdateOperations(XXXEntity.class)
        .setOnInsert("id", xxxPojo.getId())
        .setOnInsert("key1", xxxPojo.getKey1())
        .setOnInsert("key2", xxxPojo.getKey2())
        .setOnInsert("key3", xxxPojo.getKey3())
        .set("v1", xxxPojo.getV1())
        .set("v2", xxxPojo.getV2())
        // set the update time based on the write time (regardless of what the caller requested)
        .set("lastUpdated", LocalDateTime.now(Clock.systemUTC()));

// the query matches the unique compound index (id, key1, key2, key3);
// the collection is resolved from the @Entity mapping of XXXEntity
Query<XXXEntity> query = datastore
        .createQuery(XXXEntity.class)
        .field("id").equal(xxxPojo.getId())
        .field("key1").equal(xxxPojo.getKey1())
        .field("key2").equal(xxxPojo.getKey2())
        .field("key3").equal(xxxPojo.getKey3());

// the third argument is createIfMissing: insert a new document when no match is found
UpdateResults results = datastore.update(query, updateOperations, true);

Because we have defined the index on (id, key1, key2, key3), MongoDB does not need to do a collection scan for this query; it uses the index to find the matching document.
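
To illustrate the covered-query point from earlier: if an online query only asks for the fields stored in the (key1, key2, key3, v1) index, MongoDB can answer it from the index alone. The sketch below uses the plain MongoDB Java driver; the collection name and filter values are placeholders of mine, and note that _id has to be excluded from the projection, otherwise the query is no longer covered.

import static com.mongodb.client.model.Filters.and;
import static com.mongodb.client.model.Filters.eq;
import static com.mongodb.client.model.Projections.exclude;
import static com.mongodb.client.model.Projections.fields;
import static com.mongodb.client.model.Projections.include;
import static com.mongodb.client.model.Sorts.descending;

import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;

public class CoveredQueryExample {
    static void coveredQuery(MongoDatabase database) {
        // hypothetical collection name
        MongoCollection<Document> collection = database.getCollection("xxxCollection");
        // project only indexed fields so the query can be served from the index
        for (Document doc : collection
                .find(and(eq("key1", "a"), eq("key2", "b"), eq("key3", "c")))
                .projection(fields(include("key1", "key2", "key3", "v1"), exclude("_id")))
                .sort(descending("v1"))) {
            System.out.println(doc.toJson());
        }
    }
}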

Before the index was created, I could only insert 5 to 6 records per second, with most of the time spent on collection scans and very heavy IO load. After the index was created, the insert rate went up to 2263 records per second.
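
If you want to verify that the lookup now uses the index instead of a collection scan, you can run an explain command against the find query. This is just a sketch with placeholder names and values; the winning plan should contain an IXSCAN stage rather than COLLSCAN.

import com.mongodb.client.MongoDatabase;
import org.bson.Document;

public class ExplainUpsertLookup {
    static void explainLookup(MongoDatabase database) {
        // hypothetical collection name and filter values
        Document find = new Document("find", "xxxCollection")
                .append("filter", new Document("id", "someId")
                        .append("key1", "a")
                        .append("key2", "b")
                        .append("key3", "c"));

        // the queryPlanner output includes the winning plan; look for IXSCAN instead of COLLSCAN
        Document explain = database.runCommand(
                new Document("explain", find).append("verbosity", "queryPlanner"));
        System.out.println(explain.toJson());
    }
}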