
Real-time Search on 40 billion records/month with Solr


Solr is a blazing-fast search solution built on top of Lucene; it brings all of Lucene's capabilities, plus many more, as an enterprise search server. Some months ago, we had a meeting with one of our clients, and they had the following requirements.

  1. An SEO/SEM analytics search portal accessed by 10k+ users/day (expected to grow to 100k/day within 4-5 months)
  2. Data for 3 countries (will grow to 8 countries)
  3. 40 billion records (rows) every month (will grow to 80 billion)
  4. 6 TB of data in MySQL
  5. Page loading time under 1.5 sec (not just search time)
  6. A very affordable solution in terms of hardware and operational costs, since the client is a startup

They wanted to store all 40 billion records (selected columns) in Solr for search. This is a one-time indexing run (for that month), but it should finish as quickly as possible, within a maximum time frame of 2-3 days.

It looked challenging to me: as far as I knew, the largest Solr deployments I had read about on the Internet held around 8 billion records (I know there are companies with much bigger numbers in their indexes, like Twitter/GitHub/StackOverflow).

But it was a challenge, and I decided to give it a try.

So what does the application do? It is a heavily search-dependent application.

There are around 14 + 10 widgets (across two tabs) in the UI.

The user enters a keyword in the UI. Based on the keyword, each widget fires an AJAX call to the back-end in parallel. The back-end calls Solr first to get the related IDs/documents, then queries MySQL to get the relevant pre-computed/semi-computed information, and passes it on to the UI.

Due to the large volume, we faced several challenges with indexing, searching, and scaling the overall system. Here, I am going to share our design approaches for importing data faster from MySQL, improving indexing performance, distributed search, doc values (a life saver), Redis, and the overall system architecture.

Terminology:
Since the word 'index' is sometimes used to mean a single monolithic index (a single core) and sometimes a collection of the same type, I will use the terms below in this article.

  • Core/Shard: a single core (a single process)
  • Index/Collection: a collection of cores for a single type (a category, e.g. data from a single table) spread across multiple machines/processes

Database Import:
As in a typical design, we have data in MySQL and we want to index it so that our website can show results to the end user very quickly. We tried a POC with Solr's JDBC data import. It was pretty straightforward: we defined mappings in the data-config file for all 19 tables and we were good to go. It worked like a charm, and our data was indexed in Solr.
But as soon as we tried it with a bigger data set, it was very slow. So we decided to write our own importer, with fewer features but able to scale up to our expectations.

We wrote an importer that reads selected columns of each table from the database and forwards this data to Solr for indexing.

Below is the design of the importer.
The importer has two major components: a QueryExecutor that executes queries on MySQL, and a SolrIndexer.

The QueryExecutor takes a table name and determines the maximum ID (a long primary key) available in the table by running "select max(ID) from table t". It does not use "select count(ID) from table t", as the count would be misleading unless the IDs in the database are sequential and there are no deletes.

Once the QueryExecutor determines the maximum ID, it fires 10 worker threads (QueryWorkers) at a time, each fetching a batch of rows; e.g. the first QueryWorker fetches IDs 1 to 2000, the next one fetches 2001 to 4000, and so on.

Here we run queries like "select * from table t where ID >= 1 and ID <= 2000". We could instead use the database's LIMIT m,n clause, but that would be slower, as the database has to scan rows before applying the limit. In case you want to use LIMIT, there are articles readily available on Google about this.

Since each query is stateless apart from its "from" and "to" IDs, we can fire multiple queries concurrently.

The data from these queries is added to an indexing pipeline using a callback. In the indexing pipeline, once the number of documents crosses a batch limit, the batch gets added as an IndexingJob (a BlockingQueue-based implementation). The IndexingExecutor is more or less similar to the QueryExecutor: it takes these IndexingJobs from the BlockingQueue and hands them over to IndexingWorker threads (SolrIndexer). These indexing worker threads keep flushing data to Solr, with a retry in case there is an error while flushing.
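For illustration, here is a minimal sketch of this pipeline in Java with JDBC and SolrJ. The class names mirror the components described above, but the batch size, thread counts, field mapping, and error handling are simplified assumptions, not the exact production code.

```java
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.common.SolrInputDocument;

public class Importer {
    private static final int BATCH_SIZE = 2000;       // rows per QueryWorker range
    private static final int QUERY_WORKERS = 10;
    private static final int INDEX_WORKERS = 4;        // assumption
    private final BlockingQueue<List<SolrInputDocument>> jobs = new LinkedBlockingQueue<>(100);

    public void run(String jdbcUrl, String table, String solrUrl) throws Exception {
        long maxId;
        // determine the upper bound of IDs, not the row count
        try (Connection c = DriverManager.getConnection(jdbcUrl);
             Statement s = c.createStatement();
             ResultSet rs = s.executeQuery("select max(ID) from " + table)) {
            rs.next();
            maxId = rs.getLong(1);
        }

        // QueryWorkers: each fetches a stateless ID range
        ExecutorService queryPool = Executors.newFixedThreadPool(QUERY_WORKERS);
        for (long from = 1; from <= maxId; from += BATCH_SIZE) {
            final long lo = from, hi = Math.min(from + BATCH_SIZE - 1, maxId);
            queryPool.submit(() -> fetchRange(jdbcUrl, table, lo, hi));
        }

        // SolrIndexer workers: drain IndexingJobs from the queue and flush to Solr
        ExecutorService indexPool = Executors.newFixedThreadPool(INDEX_WORKERS);
        for (int i = 0; i < INDEX_WORKERS; i++) {
            indexPool.submit(() -> indexLoop(new HttpSolrServer(solrUrl)));
        }
        // shutdown/poison-pill handling omitted for brevity
    }

    private void fetchRange(String jdbcUrl, String table, long lo, long hi) {
        List<SolrInputDocument> batch = new ArrayList<>();
        try (Connection c = DriverManager.getConnection(jdbcUrl);
             Statement s = c.createStatement();
             ResultSet rs = s.executeQuery(
                 "select * from " + table + " where ID >= " + lo + " and ID <= " + hi)) {
            while (rs.next()) {
                SolrInputDocument doc = new SolrInputDocument();
                doc.addField("id", rs.getLong("ID"));
                // ... map the other selected columns here ...
                batch.add(doc);
            }
            jobs.put(batch);                            // callback into the indexing pipeline
        } catch (Exception e) {
            e.printStackTrace();                        // real code logs and retries
        }
    }

    private void indexLoop(SolrServer solr) {
        try {
            while (true) {
                List<SolrInputDocument> job = jobs.take();
                try {
                    solr.add(job);
                } catch (Exception first) {
                    solr.add(job);                      // single retry on a flush error
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```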

Indexing:

To scale indexing, I applied several learnings gained from an earlier project. We did not use the EmbeddedSolrServer here, given the data volume and the fact that there is only one update for the month; we needed to do bulk indexing. We could have used Hadoop and run multiple indexing jobs from multiple machines, but that would complicate the problem without helping much, since we would later need to merge these small indices down to a small number so that we only have to query a small number of shards.

All tables – Same Index:
In this approach, we had a single distributed index for all tables, with month_tablename_pk as the identifier and extra fields for dbname, month, tablename, and the primary key value (ID). This works well, but we cannot parallelize the import process per table, and every query has to hit all shards (200+) across all Solr servers.
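For illustration, a row from any table could be mapped into this shared index roughly as below; the helper name and any fields beyond those mentioned above are assumptions.

```java
import org.apache.solr.common.SolrInputDocument;

public class SharedIndexDocs {
    // Build one document for the single shared index, keyed by month_tablename_pk
    public static SolrInputDocument toDoc(String dbName, String month,
                                          String tableName, long pk) {
        SolrInputDocument doc = new SolrInputDocument();
        doc.addField("id", month + "_" + tableName + "_" + pk); // month_tablename_pk identifier
        doc.addField("dbname", dbName);
        doc.addField("month", month);
        doc.addField("tablename", tableName);
        doc.addField("pk", pk);
        // ... plus the selected columns of that row ...
        return doc;
    }
}
```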

Index per table:
Our final choice was to create one index per table, so whenever we have to query, we can query that index directly. At the same time, we can run indexing for each table separately (if system resources allow).

Index per table – how many records per shard:
As advised on the Solr forums/mailing list, we changed the indexer to keep 100 million records per shard and ran it. One of our tables is very large, with 12 billion records, and we didn't realize beforehand that this would create 120 shards. With 120 shards, queries were very slow, since the search head now has to aggregate the responses from all of them.

We thought about what could be done and decided to merge these shards using Lucene's IndexMergeTool. In our experience it took around 2.5 minutes per 1 GB of index, and it was slower for bigger indices. We had close to 280 GB of data for this table, so this option was not viable and we had to move on.

In the next indexing run, we configured 1.5 billion records per shard, giving us 8 shards, but unfortunately that was not faster either. After this we configured 750 million records per shard, so now we have 15-16 shards. But this configuration is not a global policy: there are tables with only 1 billion records in total, and if we applied the same setting to them, their performance would suffer (we actually saw this when we tried 1.5 billion documents per shard).

Search:

Doc values (life saver): FieldCache OOM
The index data was available on the Solr servers and was working fine, but every couple of days (2-3) an OutOfMemory dump would get generated, and the Solr server processes would either become unresponsive or get killed. We were not sure of the reason, so we decided to analyze it with Eclipse MAT (Memory Analyzer), and we could see that most of the memory was taken up by a HashMap. The caches looked like the suspect, so we decreased the number of elements in each cache significantly, but the problem still wasn't solved.

There were multiple threads about this issue, and we could see that the number of entries in the FieldCache kept increasing, with no options available to control it. We searched for this issue on Google a lot, and in one thread we came to know about DocValues. We started exploring DocValues, and it looked like the solution to our issue. Before going ahead we tried a POC based on David Arthur's article on DocValues (http://searchhub.org/2013/04/02/fun-with-docvalues-in-solr-4-2/), and it looked promising to us.

Luckily we were already on Solr 4.3.1, and DocValues had been introduced in Solr 4.2. As we expected (hoped), we didn't have an OutOfMemory after that. We monitored our systems regularly for a few weeks, and to date we have not seen this issue again. Thanks to everyone who worked on bringing this feature into Lucene/Solr.

Optimization:
How we optimize the Solr cores

As part of index generation, the number of segments on each core generally ranges from 32 to 600 (due to the merge factor). In one instance, a 121 GB index had 2149 segments. With that many segments, search performance was awfully slow (the first query used to take more than 10 minutes).

As we know, fewer segments improve search performance drastically, but decreasing the merge factor (and maxSegmentsInMB) would hurt indexing performance. We settled on an approach to overcome this: once the data is indexed (and committed), we run an optimizer on the indexes. This optimizer fetches all the available cores and sorts them in ascending order by number of segments and index size. It starts with the small cores, so those finish quickly. The optimizer then applies optimization to each shard in an incremental manner, partially inspired by HathiTrust's blog.

When we call optimize on a shard with 'maxSegments', under the hood it starts merging segments until the 'maxSegments' parameter is met. During this time it may need up to 3 times the index size on disk, with very heavy disk IO and heap memory usage. To mitigate this, the optimizer runs optimization gradually: it sets maxSegments to 90% of the current number of segments and keeps going until it reaches a target defined by index size. For example, if the current number of segments is 600 and the maximum allowed is 25, it will set maxSegments to 540 and run the optimization; once the index is optimized, it will set maxSegments to 486, and it keeps doing this until the number of segments is under 25.
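A minimal sketch of this loop with SolrJ could look like the following; how the current segment count is obtained (here it is simply passed in by the caller) and the core URL are assumptions.

```java
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;

public class GradualOptimizer {

    // Step the segment count down by ~10% per pass instead of one big merge
    public static void optimizeGradually(String coreUrl, int currentSegments,
                                         int targetSegments) throws Exception {
        SolrServer core = new HttpSolrServer(coreUrl);
        int segments = currentSegments;
        while (segments > targetSegments) {
            // next step: 90% of the current segment count, but never below the target
            int maxSegments = Math.max(targetSegments, (int) (segments * 0.9));
            core.optimize(true, true, maxSegments);  // waitFlush, waitSearcher, maxSegments
            segments = maxSegments;                  // e.g. 600 -> 540 -> 486 -> ... -> 25
        }
    }

    public static void main(String[] args) throws Exception {
        // e.g. a 600-segment core being brought down to 25 segments (hypothetical core URL)
        optimizeGradually("http://localhost:8983/solr/table1_shard1", 600, 25);
    }
}
```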

Besides this, since we have 200+ shards (cores) across multiple Solr processes, this approach takes a lot of time (though it keeps JVM memory usage low), so to speed it up we run optimization in parallel on 3 Solr servers (different processes) at a time.

Cache populator – warming up the kernel/Solr caches
Since our data is distributed (partitioned) across multiple shards and multiple Solr servers, we cannot run local warmup queries using the QuerySenderListener. We also can't replay queries from the previous month's production logs, as their query parameters (IDs) would be different. Instead, we take advantage of the php-app, which has all the business logic defined and provides a simple interface to fire queries.

We take all the available (most popular) keywords/domains and fire all the queries that need to be executed before opening this data to the public. This helps us in two ways: it warms up the kernel cache, and it populates the Solr caches so that queries hitting Solr can be served from the queryResultCache.
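A minimal sketch of such a cache populator is below; the php-app endpoint, parameter name, keyword list, and thread count are assumptions.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachePopulator {

    public static void main(String[] args) throws Exception {
        // most popular keywords/domains to replay before opening the month's data
        List<String> popularKeywords = Arrays.asList("ipod", "solr", "example.com");
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (final String keyword : popularKeywords) {
            pool.submit(() -> warm(keyword));
        }
        pool.shutdown();
    }

    private static void warm(String keyword) {
        try {
            // hypothetical php-app endpoint that runs the full widget query flow
            URL url = new URL("http://php-app.internal/search?keyword="
                    + URLEncoder.encode(keyword, "UTF-8"));
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(conn.getInputStream()))) {
                while (in.readLine() != null) {
                    // drain the response; we only care about warming the kernel/Solr caches
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```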

Search Head:

To do a distributed search in Solr, we need one core to act as the master shard. Its job is to collect responses from all the shards listed in the shards parameter, aggregate them, sort them by score, apply limiting based on the number of records to fetch (rows=number), and then send requests back to the shards from which it has to retrieve the fields. So, as we can see, there are 2 internal requests for each search request.

To scale this, we decided to keep a node for each country (or multiple replicas for load balancing) that does just this and holds no data. It is inspired by GitHub's Elasticsearch deployment architecture. To get more throughput, we need to properly tune settings like the number of threads, connection timeout, thread wait period, etc. for the HttpShardHandler in solrconfig.xml.

For example, a distributed query looks like this:
curl 'http://localhost:8983/solr/select?shards=localhost:8983/solr,localhost:7574/solr&indent=true&q=ipod+solr'
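The same query can be fired from SolrJ by setting the shards parameter explicitly; the host names below simply mirror the curl example and are placeholders.

```java
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;

public class SearchHeadQuery {
    public static void main(String[] args) throws Exception {
        // the search head (no data) fans the request out to the listed shards
        SolrServer searchHead = new HttpSolrServer("http://localhost:8983/solr");
        SolrQuery q = new SolrQuery("ipod solr");
        q.set("shards", "localhost:8983/solr,localhost:7574/solr"); // data shards to aggregate
        q.setRows(10);                                              // limit applied by the search head
        QueryResponse rsp = searchHead.query(q);
        System.out.println("hits: " + rsp.getResults().getNumFound());
    }
}
```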

Searcher – Search webapp (Tomcat):
Searcher is a web application, inspired by Twitter's Blender, that abstracts away the whole backend (Solr/Redis). It queries multiple countries' index data based on the request type, and it holds all the cluster mappings, such as all the shards for a particular table/country and the search head server for it. It serves as a combined proxy and application layer.

In proxy mode, based on the request type, it forwards requests to a specific country's search head, which internally collects and aggregates the responses from the backend cluster (multiple shards) and passes them back to Searcher.

In application mode, Searcher queries multiple Solr servers (behind the search head) for multiple months' data and aggregates the results itself before passing them to the php-app. Before querying the Solr servers, it checks Redis first; if the response already exists (because the same query was executed earlier), it skips querying Solr.

Redis:
Redis is an in-memory database that can persist to disk. It is an advanced key-value store and provides very fast PUT and GET operations.
In our case, to show trends for a specific keyword/domain, we need to query multiple months' data in real time. That means we have to hit all the months' data in the backend and aggregate it in one place. The problem is that this approach does not scale: when multiple users fire queries again and again for the same domain/keyword, the same computation has to be done every time.

Since our data does not change for the whole month, we added Redis as a cache layer in between, so if results are already there in Redis, we can get the response directly from it without hitting the search backend. It also offloads some pressure from the Solr servers. Since Redis keeps everything in memory, it can eat all of your system RAM if not used properly; to avoid this we use Redis hashes and also set "maxmemory" to 4GB with an LRU policy (maxmemory-policy allkeys-lru).
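A minimal sketch of this cache-aside flow with Jedis and Redis hashes is below; the key/field naming, the serialized value, and the month-to-shard mapping are assumptions (maxmemory and the LRU policy are set in redis.conf, not in code).

```java
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import redis.clients.jedis.Jedis;

public class TrendCache {
    private final Jedis redis = new Jedis("localhost", 6379);
    private final SolrServer solr = new HttpSolrServer("http://localhost:8983/solr/searchhead");

    public String trendFor(String keyword, String month) throws Exception {
        String key = "trend:" + keyword;            // one hash per keyword
        String cached = redis.hget(key, month);     // one hash field per month
        if (cached != null) {
            return cached;                          // served from Redis, Solr is not touched
        }
        SolrQuery q = new SolrQuery("keyword:" + keyword);
        q.set("shards", shardsFor(month));          // each month's data lives on its own shards
        long hits = solr.query(q).getResults().getNumFound();
        String result = String.valueOf(hits);       // real code stores the aggregated widget payload
        redis.hset(key, month, result);             // next request for this keyword/month is a hit
        return result;
    }

    private String shardsFor(String month) {
        // hypothetical mapping from month to its shard list
        return "localhost:8983/solr,localhost:7574/solr";
    }
}
```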

Architecture:
As explained earlier, we have a typical technology stack consisting of LNMP (Linux, Nginx, MySQL, PHP) along with Apache Solr, Tomcat, Redis, and HAProxy.

HAProxy is used as the load balancer: it balances the incoming requests and redirects them to an Nginx server, which communicates with PHP-FPM (FastCGI Process Manager) over a socket. The php-app is the main driver; it queries Searcher and then MySQL to generate a response for each request and sends it to the client, where it is rendered using jQuery/AJAX.

Technology Stack:

PHP5, MySQL, Redis, Tomcat, Solr, ZooKeeper, Nginx, HAProxy

Comparison between SSD and SAS in our case

Initially we used SSDs (Solid State Drives), with both MySQL and Solr on them, and we were able to import around 20 million records/minute, but this was going beyond the budget, so we switched to SAS 15k drives, which are much cheaper than SSDs. Our throughput dropped by roughly half, to 11 million records/minute. We are still using SAS 15k drives because they are cost-effective, but we plan to upgrade to SSDs later.

Key learnings:
1. Design the system with proper data partitioning at both the indexing and search levels.
2. Keep optimizing shards every day (if they change frequently), but don't do it too often, as it discards the existing caches.
3. Run warmup queries just after optimization, before opening the index for use.
4. Co-locate the index data, so that aggregation in distributed search is not hurt too much by the network.
5. Use filter queries (fq) as much as possible, as they improve performance thanks to the filter cache.
6. Tune the indexing parameters like ramBufferSizeMB and mergeFactor, and the HttpShardHandler's various configurations.
7. Use hashes in Redis to minimize memory usage.
8. Keep an eye on garbage collection (GC), especially full GCs.
9. Monitor your system/cluster health.
10. Keep your JVM heap size on the lower side (proportional to the machine's RAM), as a bigger heap leads to frequent GCs, and leave enough RAM for the kernel.
11. Don't use soft commits if you don't need them. If you do, commit reasonably often, as under the hood a hashmap is used and it can grow into a large structure, slowing down your restarts.
12. Tune the cache configurations properly, as that can improve response time drastically.
13. Don't optimize an index with a large number of segments down to a very small number of segments in one go, as the optimization (merging) can take up to 3 times the index size on disk, with very high heap usage and disk IO.

I work as a freelance Big Data/Search consultant in Hyderabad, India. If you liked this post, you can follow me on Twitter or subscribe to my feed to get notified of new posts. If you think I could help you and your company and you'd like to work with me, please contact me directly.
 
 

  1. Karthik
    December 22, 2014 at 9:28 pm

    Awesome Rahul, great stuff! Once again you proved that through hard work and perseverance we can build great things with minimal resources. Keep up the good work.

  2. Ashu
    July 18, 2015 at 10:23 am

    I have been asked to design a Solr schema for an application in which a product is sold by multiple stores, and each store has its own quantity and price. Keeping this requirement in view, I can think of the schema designs below.

    1. Denormalize the data from the store and product tables. In this case there is a lot of redundancy: if 7M products are sold by 100 stores, that becomes 100 x 7M records. I don't think this is a good design, because there is a lot of redundancy in each document and 7M documents become 700M documents in Solr. Apart from this, on the product listing page I need to show each product along with the minimum price among the stores. To get this I need to group the store products and run a group query sorted by price ascending, so that the minimum-price document is retrieved per group. According to the Google Group, grouping/facet performance is bad in Solr.

    2. If I keep product documents separate from store documents, then I need to execute more than one query to get the product attributes and the store product with the minimum price. This may be a performance issue.

    3. Solr has introduced new block-join indexing functionality. Since price and quantity change frequently, it is difficult to modify these fields in a block, because the whole block needs to be re-indexed every time.

    Please suggest the best practice for designing the schema in this scenario.
