Friday 24 May 2019

Reindexing StormCrawler's URL Status Index

StormCrawler holds the frontier and the page fetch status in the "status" index. In the most common setup this is a sharded Elasticsearch index, and every document contains the page URL, the fetch status, the time when the URL should be fetched, and further information as metadata. The index uses the SHA-256 of the URL as document id. How URLs are distributed among shards is directly related to crawler politeness: a crawler needs to limit the load on any particular server. If all URLs of the same host or domain are kept in the same shard, the spouts (we run one spout per shard) can already ensure that only a limited number of URLs per host or domain is emitted into the topology during a certain time interval. A controlled flow of URLs supports the fetcher bolt, which ultimately enforces politeness and guarantees a minimum delay between successive requests sent to the same server.
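To make the id scheme a bit more tangible, here is a minimal sketch of how the document id of a URL could be derived on the command line. It assumes the id is the hex-encoded SHA-256 digest of the URL string and that GNU `sha256sum` is available; the helper name `status_doc_id`, the example URL and the routing value in the commented lookup are purely illustrative.

```shell
# Hypothetical helper: derive the document id of a URL in the status index,
# assuming the id is the hex-encoded SHA-256 digest of the URL string.
status_doc_id() {
  # printf avoids the trailing newline that echo would add to the hashed input
  printf '%s' "$1" | sha256sum | cut -d' ' -f1
}

url='https://www.example.com/news/article.html'   # illustrative URL
id=$(status_doc_id "$url")
echo "$id"

# With routing enabled, a lookup by id also needs the routing key
# (assumed here to be the domain name), e.g.:
# curl "http://localhost:9200/status/_doc/$id?routing=example.com"
```

Because the id depends only on the URL while the shard depends on the routing key, the same URL can end up in two shards if the routing key changes over time.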

Motivation

The status index grows over time. Sooner or later, unless you decide to delete it and start the crawler from scratch, you will hit one of the possible reasons to reindex it:
  1. change the number of shards (to allow the index to grow)
  2. fix domain names acting as sharding key (see the discussion in #684)
  3. strip metadata to save storage space
  4. apply current URL filters and normalization rules to all URLs in the status index
  5. remove the document type – previously the documents in the "status" index were of type "status". In Elasticsearch 7.0 document types are deprecated and consequently SC 1.14 also removed the document types in all indexes (status, metrics, content)
  6. merge/append indexes
  7. pull or push to another ES cluster
  8. ... (there are some more good reasons, for sure)
In my case it was a combination of points 1, 2, 3 and 5 applied to a status index holding 250 million URLs of Common Crawl's news crawler. At the same time, the server was upgraded and I had to move the index to a new machine (point 7). Enough reasons to try the reindexing topology which was introduced in StormCrawler 1.14 (#688). Of course, some of the points (but not all) could also be achieved with standard Elasticsearch tools.

Configure the Reindex Topology

Multiple scenarios are possible: both Elasticsearch indexes are on the same machine or cluster (using index aliases), or you pull or push from/to a remote index. I decided to run the reindex topology on the machine hosting the new index and pull the data from the old index. To easily get around the Elasticsearch authentication, I simply forwarded the remote ES port to the local machine by running ssh -R 9201:localhost:9200 <ip_new> -fN & on the old machine. The old index is now visible on port 9201 on the new machine.

In any case, you first need to upgrade Elasticsearch on both the source and the target machine/cluster to version 7.0, which is required by StormCrawler 1.14. In other words, both Elasticsearch indexes must be compatible with the Elasticsearch client used by StormCrawler running the reindexing topology.

The topology is easily configured as Storm Flux:

    name: "reindexer"
    
    includes:
      - resource: true
        file: "/crawler-default.yaml"
        override: false
    
      - resource: false
        file: "crawler-conf.yaml"
        override: false   # let the properties defined in the flux take precedence
    
      - resource: false
        file: "es-conf.yaml"
        override: false
    
    config:
      # topology settings
      topology.max.spout.pending: 600
      topology.workers: 1
      # old index (remote, forwarded to port 9201)
      es.status.addresses: "http://localhost:9201"
      es.status.index.name: "status"
      es.status.routing.fieldname: "metadata.hostname"
      es.status.concurrentRequests: 1
      # new index (local)
      es.status2.addresses: "localhost"
      es.status2.index.name: "status"
      es.status2.routing: true
      es.status2.routing.fieldname: "metadata.hostname"
      es.status2.bulkActions: 500
      es.status2.flushInterval: "1s"
      es.status2.concurrentRequests: 1
      es.status2.settings:
        cluster.name: "elasticsearch"
    
    spouts:
      - id: "spout"
        className: "com.digitalpebble.stormcrawler.elasticsearch.persistence.ScrollSpout"
        parallelism: 10   # must be equal to number of shards in the old index
    
    bolts:
    #  - id: "filter"
    #    className: "com.digitalpebble.stormcrawler.bolt.URLFilterBolt"
    #    parallelism: 10
      - id: "status"
        className: "com.digitalpebble.stormcrawler.elasticsearch.persistence.StatusUpdaterBolt"
        parallelism: 4
        constructorArgs:
          - "status2"
    
    streams:
      - from: "spout"
    #   to: "filter"
    #   grouping:
    #     type: FIELDS
    #     args: ["url"]
    #     streamId: "status"
    # - from: "filter"
        to: "status"
        grouping:
          streamId: "status"
          type: CUSTOM
          customClass:
            className: "com.digitalpebble.stormcrawler.util.URLStreamGrouping"
            constructorArgs:
              - "byDomain"

The parts needed to plug in a URL filter bolt are commented out. Of course, you should review all settings carefully so that they fit your situation. One important point is the number of spouts, which must be equal to the number of shards in the old index.

After some failed trials I decided on defensive performance settings:
  • a moderate number of tuples pending in the topology: topology.max.spout.pending: 600. With 10 spouts, at most 6000 tuples can be pending.
  • four status bolts without concurrent requests (es.status2.concurrentRequests: 1).

To speed up the reindexing you might also change the refresh interval of the new Elasticsearch index by calling:
    curl -H Content-Type:application/json -XPUT 'http://localhost:9200/status/_settings' \
        --data '{"index" : {"refresh_interval" : -1 }}'

Don't forget to set it back to the default once the reindexing is done:
    curl ... --data '{"index" : {"refresh_interval" : null }}'

Running the Topology

The reindex topology is started as Flux via:
    java -cp crawler-1.14.jar org.apache.storm.flux.Flux --remote reindex-flux.yaml

Depending on the size of your index, it may run for a long time. I achieved about 6,000 reindexed documents per second, which means that the entire 250 million documents were reindexed in about 12 hours.
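If you want a rough idea of the run time for your own index, the estimate is simple arithmetic. The sketch below assumes a constant throughput, which a real run will not have; the helper name `eta_hours` is made up for illustration.

```shell
# Hypothetical helper: estimated run time in hours for a given number of
# documents and an assumed constant throughput in documents per second.
eta_hours() {   # usage: eta_hours DOCS DOCS_PER_SECOND
  awk -v docs="$1" -v rate="$2" 'BEGIN { printf "%.1f\n", docs / rate / 3600 }'
}

eta_hours 250000000 6000   # prints 11.6
```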

Verifying the New Status Index

The topology has finished, so let's check the new index and whether everything has been reindexed. Let's first get the metrics of the old status index (on port 9201):
    %> curl -H Content-Type:application/json -XGET 'http://localhost:9201/_cat/indices/status?v'
    health status index  uuid                   pri rep docs.count docs.deleted store.size pri.store.size
    yellow open   status xFN4EGHYRaGw8hi4qxMWrg  10   1  250305363      6038664      101gb          101gb
and compare them with those of the new status index:
    %> curl -H Content-Type:application/json -XGET 'http://localhost:9200/_cat/indices/status?v'
    health status index  uuid                   pri rep docs.count docs.deleted store.size pri.store.size
    yellow open   status ZFNT8FovT4eTl3140iOUvw  16   1  250278257         5938     87.2gb         87.2gb
Great! The new index now has 16 shards and almost 14 GB of storage (101 GB vs. 87.2 GB) have been saved by stripping unneeded metadata. All 250 million documents have been reindexed. But stop: not all documents are there, a few thousand are missing! Panic! What's going on? I checked the logs for errors: nothing. Also: why are there deleted documents in the new index?

Ok, let's first calculate the loss: 250305363 - 250278257 = 27106 (0.01%). Well, it's probably not worth redoing the procedure: either the links are outdated or the crawler will find them again. Anyway, I was interested in figuring out the reason. But how to find the missing URLs? It's not trivial to compare two lists of 250 million items.

The solution is to first get the counts of items per domain by aggregating on the field "metadata.hostname", which is used for routing documents to shards. The idea is to find the domains where the counts differ and then compare only the per-domain lists. Let's do it:
    curl -s -H Content-Type:application/json -XGET http://localhost:9200/status/_search --data '{
      "aggs" : {
        "agg" : {
          "terms" : {
            "field" : "metadata.hostname",
            "size" : 100000
          }
        }
      }
    }' | jq --raw-output '.aggregations.agg.buckets[] | [(.doc_count|tostring),.key] | join("\t")' \
       | rev | sort | rev >domain_counts_new_index.txt
A short explanation of what this command does:
  1. we send an aggregation query to the Elasticsearch index which counts documents per domain name (kept in "metadata.hostname")
  2. the JSON output is processed by jq and the output is written into a text file with two columns – count and domain name
  3. the list is sorted using reversed strings – this keeps the counts together per top-level domain, domain, subdomain which makes the comparison easier
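The reversed-string sort in step 3 may look odd at first, so here is a small stand-alone sketch of it on three of the rows shown in this post. `LC_ALL=C` is an assumption added only to make the order reproducible; with a locale-aware sort the order of the lines can differ. The function name `sort_by_reversed_host` is made up for illustration.

```shell
# Sort "count<TAB>host" lines by the reversed line, so that hosts sharing
# the same top-level domain and domain end up next to each other.
sort_by_reversed_host() {
  rev | LC_ALL=C sort | rev
}

printf '76\twww.athletics.africa\n108\tchri.ca\n4\tathletics.africa\n' \
  | sort_by_reversed_host
# Output (tab-separated):
# 108  chri.ca
# 4    athletics.africa
# 76   www.athletics.africa
```

Since the host name is the last column, reversing each line turns the top-level domain into the leading sort key, and the two athletics.africa rows stay adjacent.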
The procedure to get the per-domain counts from the old index is the same. After we have both lists we can compare them side by side:
    %> diff -y domain_counts_old_index.txt domain_counts_new_index.txt
    ...
    4       athletics.africa              | 80      athletics.africa
    76      www.athletics.africa          <
    108     chri.ca                         108     chri.ca
    2       www.alta-frequenza.corsica    | 2       alta-frequenza.corsica
    2       corsenetinfos.corsica         | 3       corsenetinfos.corsica
    1       www.corsenetinfos.corsica     <
    ...
The very first difference already revealed the reason for the differences between the old and the new index! Remember the issue with domain names changing over time with different versions of the public suffix list? It was one of the reasons why the reindexing topology was introduced (see #684 and news-crawler#28). If the routing key is not stable, it might happen that the same URL is indexed twice with different routing keys in two shards. Exactly this happened for some of the domains affected by this issue. Here is one example:
    6       hr.de                |  19      hr.de
    2       reportage.hr.de      <
    1       daserste.hr.de       <
    11      www.hr.de            <
In the new index there are only 19 items although the sum of 6 + 2 + 1 + 11 is 20. The missing item is a URL that was stored twice in the old index under two different routing keys. I've checked the remaining differences and the assumption has proven true: only domain names with recently introduced TLDs (.africa was registered by ICANN in 2017) or with misclassified suffixes (co.uk is a valid suffix but hr.de is not) are affected.
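Checking such a group by hand gets tedious, so here is a minimal sketch of how the old counts of a domain and all of its subdomains could be summed up for comparison with the single row in the new index. It assumes the tab-separated "count, host" format of the files created above; the helper name `sum_counts_for_domain` is made up for illustration.

```shell
# Hypothetical helper: add up the counts of a domain and all of its subdomains
# in a "count<TAB>host" file such as domain_counts_old_index.txt.
sum_counts_for_domain() {   # usage: sum_counts_for_domain FILE DOMAIN
  awk -F'\t' -v d="$2" '
    # match the domain itself or any host ending in ".<domain>"
    $2 == d || (length($2) > length(d) && substr($2, length($2) - length(d)) == ("." d)) { sum += $1 }
    END { print sum + 0 }
  ' "$1"
}

# The four hr.de rows of the old index from the example above
old_counts=$(mktemp)
printf '6\thr.de\n2\treportage.hr.de\n1\tdaserste.hr.de\n11\twww.hr.de\n' > "$old_counts"
sum_counts_for_domain "$old_counts" hr.de   # prints 20, one more than the 19 in the new index
rm -f "$old_counts"
```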

Everything is fine now and the only open point is to bring the new index into production and restart the crawl topology on the new machine!

Monday 13 May 2019

What's new in StormCrawler 1.14

StormCrawler 1.14 was released yesterday and, as usual, contains loads of improvements and bugfixes.


This release contains a number of breaking changes, mostly related to the move to Elasticsearch 7. We recommend that all users upgrade to this version as it contains very important fixes and performance improvements.

Dependency upgrades

  • crawler-commons 1.0 (#693)
  • okhttp 3.14.0 (#692)
  • guava 27.1 (#702)
  • icu4j 64.1 (#702)
  • httpclient 4.5.8 (#702)
  • Snakeyaml 1.24 (#702)
  • wiremock 2.22.0 (#702)
  • rometools 1.12.0 (#702)
  • Elasticsearch 7.0.0 (#708)

Core



  • Track how long a spout has been without any URLs in its buffer (#685)
  • Change ack mechanism for StatusUpdaterBolts (#689)
  • Robots URL filter to get instructions from cache only (#700)
  • Allow indexing under canonical URL if in the same domain, not just host (#703)
  • /bugfix/ URLs ending with a space are fetched over and over again (#704)
  • ParseFilter to normalise the mime-type of documents into simple values (#707)
  • Robot rules should check the cache in case of a redirection (#709)
  • /bugfix/ Fix the logic around sitemap = false (#710)
  • Reduce logging of exceptions in FetcherBolt (#719)

Elasticsearch



  • Asynchronous spouts (i.e. ES) can send queries after max delay since previous one ended (#683)
  • StatusUpdaterBolt to load config from non-default param names (#687)
  • Add a ScrollSpout to read all the documents from a shard (#688 and #690) - see in our guest post how this can be used to reindex a status index.
  • ES IndexerBolt: check success of batches before acking tuples (#647)
  • /bugfix/ URLs with content that breaks ES get refetched over and over again (#705)
  • /bugfix/ URLs without valid host name (and routing) stay DISCOVERED forever (#706)
  • /bugfix/ ESSeedInjector: no URLs injected because URL filter does not subscribe to status stream (#715)
  • MetricsConsumer to include topology ID in metrics (#714)

WARC

  • Generate WARC request records (#509)
  • WARC format improvements (#691)

Tika


  • Set mimetype whitelist for Tika Parser (#712)

*********

I will be running a workshop on StormCrawler next month at the Web Archiving Conference in Zagreb and will give a presentation jointly with Sebastian Nagel of Common Crawl. I will come with loads of presents generously given by our friends at Elastic.


As usual, thanks to all contributors and users.

Happy crawling!