Архитектура MongoDb. Часть 4. Шардинг, Map/Reduce в MongoDb

Рубрика:

Модель шардинга

Для того, чтобы выровнять нагрузку на запрос записи, мы можем использовать шарды MongoDb. При установке шардинга коллекция может быть разделена (ключом разделения) на чанки  (chunks), которые являются диапазоном ключей, и получить блоки, распределенные по нескольким шардам (каждый шард будет набором реплик). Шардинг MongoDb эффективно обеспечивают неограниченный размер для коллекции данных, что важно для любого случая с большими данными.

Для повторения итераций, в шардинг модели, для каждой коллекции, которая хранится в кластере шардинга, будет определен один ключ для шардинга. Пространство ключей для ключа шардинга делится на непрерывные диапазоны  ключей, называемые чанками (chunks), которые размещены по соответствующим шардам.

# To define the partition key
db.runcommand({shardcollection: "testdb.person",
         key: {firstname:1, lastname:1}})

В установке шарда клиентская библиотека соединяется с сервером маршрутизации без запоминания состояния "MongoS", который ведет себя как "MongoD". Сервер маршрутизации играет важную роль в передаче клиентского запроса к соответствующему серверу шардинга согласно характеристикам запроса.

Для запросов  insert/delete/update, содержащих ключ шардинга, основанных на информации отображения чанка/шарда (полученной от конфиг сервера и кэша локально), сервер маршрутизации передаст запрос к соответствующему основному серверу, размещающему чанк, диапазон ключей которого покрывает ключ шардинга измененного документа. Учитывая определенный ключ для шардинга, может быть однозначно определен основной сервер, содержащий этот чанк.

В случае запроса, сервер маршрутизации исследует, является ли ключ для шардинга частью критериев выбора, и если так, то только "направит" запрос к соответствующему шарду (основному или вторичному). Однако, если ключ шардинга не будет частью критериев выбора, то сервер маршрутизации "рассеет" запрос по каждому шарду (выбирая один элемент каждого шарда), который произведет этот локальный поиск, а результат будет собран в сервере маршрутизации и возвратится клиенту. Когда запрос требует, чтобы результат был отсортирован, и если ключ шардинга будет включен в порядок сортировки, то сервер маршрутизации направит запрос последовательно к нескольким шардам, т.к.  клиент выполняет итерации результата. Если сортировка включает другой ключ, который не является ключом для шардинга, сервер маршрутизации распространит запрос по всем шардам, которые выполнят эту локальную сортировку, и затем объединят результат в сервере маршрутизации (в основном распределенная сортировка с объединением).

Если объем данных вставленных в чанк приближается к его полной емкости, необходимо будет разделить этот чанк. Сервер маршрутизации может обнаружить эту ситуацию статистически на основе числа запросов, которые он передает, а также запросов другого существующего сервера маршрутизации. Затем сервер маршрутизации свяжется с основным сервером шарда, который содержит полный чанк и сделает запрос на разделение чанка. Сервер шарда вычислит среднюю точку диапазона ключей, которая может равномерно распределить данные, а затем разделить блок и обновить сервер конфигурации о его точке разделения. Обратите внимание, что до сих пор не происходило никакого перемещения данных, поскольку данные все еще находятся в том же сервере шарда.

С другой стороны, есть другой "балансирующий" процесс, работающий в одном из серверов маршрутизации, чье задание состоит в том, чтобы удостовериться, что каждый шард содержит приблизительно одинаковое число чанков. Если будет обнаружено условие дисбаланса, балансировщик свяжется с занятым шардом, чтобы запустить процесс миграции чанка. Этот процесс миграции происходит онлайн, где передатчик связывается с приемником, чтобы инициировать передачу данных, и данные начнут копироваться от передатчика к приемнику. Этот процесс может занять определенное время в зависимости от объема данных, во время которого в передатчике может происходить обновление. Эти изменения будут прослеживаться и когда копирование закончится, обновление передастся в место назначения. После нескольких раундов применения обновлений, с финальным раундом передача останавливается, и передатчик приостанавливает все запросы, прибывающие от сервера маршрутизации. После последнего раунда изменений, примененных к месту назначения, оно обновит сервер конфигурации о новой конфигурации шарда и сообщит шарду, который передавал данные (который все еще удерживается от запроса), возвратить StaleConfigException серверу маршрутизации, а он в свою очередь перечитает последнюю конфигурацию от configServer и повторно представит предыдущие запросы. Через какое-то время данные в передатчике будут физически удалены.

Возможно, что при условии обновления высокой частоты, изменения, применяемые к месту назначения, неспособны догнать частоту обновлений, которые происходят в передатчике. Когда обнаруживается такая ситуация, прерывается весь процесс миграции. Сервер маршрутизации может выбрать другой чанк, чтобы потом снова запустить миграцию.

Выполнение Map/Reduce

MongoDb предусматривает Map/Reduce фреймворк, чтобы выполнять параллельную обработку данных. Это понятие подобно Hadoop Map/Reduce, но со следующими небольшими различиями:

  • Он берет входные данные из результата запроса каталога HDFS, а не коллекции
  • Выход Reduce может быть добавлен к существующей коллекции, а не пустому каталогу HDFS

Map/Reduce в работе Mongo также немного отличающимся способом следующим образом:

1. Клиент определяет функцию картирования, уменьшает функцию, отправляет запрос, который определяет объем входных данных и выходной коллекции, которые хранят выходной результат.

2. Клиент отправляет запрос в сервер маршрутизации MongoS

3. MongoS передает запрос к соответствующим шардам (маршрут, или распространение зависит от того, появляется ли в запросе ключ шардинга).

4. Основная БД каждого шарда выполняет запрос и вывод канала к определенной пользователем функции map, которые выдают набор пар значений ключа, хранящихся в буфере памяти. Когда буфер памяти будет полон, будет вызвана определяемая пользователем функция reduce, которые частично уменьшают пары значений ключа в буфере памяти, результат сохраняется в локальной коллекции.

5. Когда шаг (4) завершится, функция reduce будет выполняться над всем предыдущем частично уменьшенном результате, чтобы поместить один уменьшенный результат на этом сервере.

6. Когда шаг (5) заканчивается, MongoS уведомляет соответствующие серверы шардов, которые будут хранить результирующую коллекцию. Если она будет неразделима, то только один шард будет уведомлен, в противном случае все шарды будут уведомлены.

7. Основная БД шарда(ов), хранящего результирующую коллекцию, запросит, чтобы каждый шард собрал ранее частично уменьшенные данные. Она запросит результат на основе своего соответствующего диапазона ключей.

8. Основная БД снова выполняет функцию reduce() уже над частично уменьшенным результатом. Затем сохраняет финальный результат локально. Если пользователь предоставит функцию финализации, то она также будет вызвана.

Вот простой пример для создания инвертированного индекса от документа к темам:

db.book.insert({title:"NOSQL",
             about:["software", "db"]})
db.book.insert({title:"Java programming",
             about:["software", "program"]})
db.book.insert({title:"Mongo",
             about:["db", "technology"]})
db.book.insert({title:"Oracle",
             about:["db", "software"]})
db.book.find()
m = function() {
 for (var i in this.about) {
     emit(this.about[i], this.title)
 }
}
r = function(k, vals) {
 return({topic:k, title:vals})
}
db.book.mapReduce(m, r, {query:{},
               out:{replace:"mroutput"}})
db.mroutput.find()

Читайте предыдущие части серии "Архитектура MongoDB":

Часть 1. Отличие от СУБД, обработка запросов в MongoDb

Часть 2. Модель хранения, обновления и транзакции в MongoDb

Часть 3. Модель репликации MongoDB

Автор: 
Ricky Ho, http://horicky.blogspot.com/2012/04/mongodb-architecture.html Ricky Ho, http://horicky.blogspot.com/2012/04/mongodb-architecture.html