Двигатель таблиц Distributed
Чтобы создать движок таблиц Distributed в облаке, вы можете использовать функции таблиц remote и remoteSecure. Синтаксис Distributed(...)
не может быть использован в ClickHouse Cloud.
Таблицы с движком Distributed не хранят собственные данные, но позволяют выполнять распределенную обработку запросов на нескольких серверах. Чтение автоматически параллелизуется. Во время чтения используются индексы таблицы на удалённых серверах, если они есть.
Создание таблицы
Из таблицы
Когда таблица Distributed
указывает на таблицу на текущем сервере, вы можете принять схему этой таблицы:
Параметры Distributed
cluster
cluster
- имя кластера в конфигурационном файле сервера
database
database
- имя удаленной базы данных
table
table
- имя удаленной таблицы
sharding_key
sharding_key
- (по желанию) ключ шардирования
Указание sharding_key
необходимо для следующего:
- Для
INSERT
в распределенную таблицу (так как движок таблицы нуждается вsharding_key
, чтобы определить, как разделить данные). Однако, если настройкаinsert_distributed_one_random_shard
включена, тоINSERT
не нуждаются в ключе шардирования. - Для использования с
optimize_skip_unused_shards
, так какsharding_key
необходим для определения, какие шары должны быть запрошены.
policy_name
policy_name
- (по желанию) имя политики, оно будет использоваться для хранения временных файлов для фоновой отправки.
Смотрите также
- Настройка distributed_foreground_insert
- MergeTree для примеров
Настройки Distributed
fsync_after_insert
fsync_after_insert
- выполнить fsync
для данных файла после фоновой вставки в Distributed. Гарантирует, что ОС записала все введенные данные на диск инициаторного узла.
fsync_directories
fsync_directories
- выполнить fsync
для каталогов. Гарантирует, что ОС обновила метаданные каталога после операций, связанных с фоновой вставкой в распределенной таблице (после вставки, после отправки данных на шары и т.д.).
skip_unavailable_shards
skip_unavailable_shards
- Если true, ClickHouse безмолвно пропускает недоступные шары. Шар помечается как недоступный, когда: 1) Шар не может быть достигнут из-за сбоя соединения. 2) Шар невозможно разрешить через DNS. 3) Таблица не существует на шаре. По умолчанию false.
bytes_to_throw_insert
bytes_to_throw_insert
- если больше этого количества сжатых байтов будет ожидать фоновой вставки, произойдёт исключение. 0 - не выбрасывать. По умолчанию 0.
bytes_to_delay_insert
bytes_to_delay_insert
- если больше этого количества сжатых байтов будет ожидать фоновой вставки, запрос будет задержан. 0 - не задерживать. По умолчанию 0.
max_delay_to_insert
max_delay_to_insert
- максимальная задержка вставки данных в распределенную таблицу в секундах, если имеется много ожидающих байтов для фоновой отправки. По умолчанию 60.
background_insert_batch
background_insert_batch
- то же самое, что и distributed_background_insert_batch
background_insert_split_batch_on_failure
background_insert_split_batch_on_failure
- то же самое, что и distributed_background_insert_split_batch_on_failure
background_insert_sleep_time_ms
background_insert_sleep_time_ms
- то же самое, что и distributed_background_insert_sleep_time_ms
background_insert_max_sleep_time_ms
background_insert_max_sleep_time_ms
- то же самое, что и distributed_background_insert_max_sleep_time_ms
flush_on_detach
flush_on_detach
- Сбросить данные на удаленные узлы при DETACH/DROP/выключении сервера. По умолчанию true.
Настройки надёжности (fsync_...
):
- Влияют только на фоновые вставки (т.е.
distributed_foreground_insert=false
), когда данные сначала хранятся на диске инициаторного узла, а затем в фоне отправляются на шары. - Могут значительно снизить производительность вставок.
- Влияют на запись данных, хранящихся внутри папки распределенной таблицы в узел, который принял вашу вставку. Если вам нужны гарантии записи данных в подлежащие таблицы MergeTree - смотрите настройки надёжности (
...fsync...
) вsystem.merge_tree_settings
.
Для Настроек предела вставок (..._insert
) смотрите также:
- Настройка distributed_foreground_insert
- Настройка prefer_localhost_replica
bytes_to_throw_insert
обрабатывается передbytes_to_delay_insert
, так что вы не должны устанавливать его меньше, чемbytes_to_delay_insert
.
Пример
Данные будут считываться со всех серверов в кластере logs
, из таблицы default.hits
, расположенной на каждом сервере в кластере. Данные не только считываются, но и частично обрабатываются на удалённых серверах (насколько это возможно). Например, для запроса с GROUP BY
данные будут агрегистрироваться на удалённых серверах, и промежуточные состояния агрегатных функций будут отправляться на сервер запрашивающего. Затем данные будут дополнительно агрегироваться.
Вместо имени базы данных вы можете использовать константное выражение, которое возвращает строку. Например: currentDatabase()
.
Кластеры
Кластеры настраиваются в конфигурационном файле сервера:
Здесь определён кластер с именем logs
, который состоит из двух шардов, каждый из которых содержит две реплики. Шар относится к серверам, которые содержат различные части данных (для чтения всех данных вы должны получить доступ ко всем шарам). Реплики - это дублирующие серверы (для чтения всех данных вы можете получить доступ к данным на любой из реплик).
Имена кластеров не должны содержать точки.
Параметры host
, port
и, по желанию, user
, password
, secure
, compression
, bind_host
указаны для каждого сервера:
host
– Адрес удаленного сервера. Вы можете использовать либо доменное имя, либо IPv4 или IPv6 адрес. Если вы указываете доменное имя, сервер выполняет DNS-запрос, когда он запускается, а результат сохраняется, пока сервер работает. Если DNS-запрос не удаётся, сервер не запускается. Если вы измените DNS-запись, перезапустите сервер.port
– TCP порт для активности мессенджера (tcp_port
в конфигурации, обычно установлен на 9000). Не путать сhttp_port
.user
– Имя пользователя для подключения к удалённому серверу. Значение по умолчанию - пользовательdefault
. Этот пользователь должен иметь доступ к подключению к указанному серверу. Доступ настраивается в файлеusers.xml
. Для получения дополнительной информации смотрите раздел Права доступа.password
– Пароль для подключения к удаленному серверу (не скрыт). Значение по умолчанию: пустая строка.secure
- Использовать ли защищенное SSL/TLS соединение. Обычно также требует указания порта (порт по умолчанию для безопасности9440
). Сервер должен слушать на<tcp_port_secure>9440</tcp_port_secure>
и быть настроен с правильными сертификатами.compression
- Использовать сжатие данных. Значение по умолчанию:true
.bind_host
- Исходный адрес, который следует использовать при подключении к удалённому серверу с этого узла. Поддерживается только IPv4 адрес. Предназначен для использования в сложных сценариях развертывания, когда необходимо установить исходный IP-адрес, используемый запросами ClickHouse.
При указании реплик одна из доступных реплик будет выбрана для каждого из шардов при чтении. Вы можете настроить алгоритм балансировки нагрузки (предпочтение для доступа к какой реплике) – см. настройку load_balancing. Если соединение с сервером не установлено, будет предпринята попытка подключения с коротким временем ожидания. Если соединение не удалось, будет выбрана следующая реплика и так далее для всех реплик. Если попытка соединения не удалась для всех реплик, эта попытка будет повторена тем же образом несколько раз. Это способствует устойчивости, но не обеспечивает полной отказоустойчивости: удаленный сервер может принять соединение, но может не работать или работать плохо.
Вы можете указать только один из шардов (в этом случае обработка запроса должна называться удаленной, а не распределенной) или любое количество шардов. В каждом шаре вы можете указать от одной до любого количества реплик. Вы можете указать разное количество реплик для каждого шара.
Вы можете указать столько кластеров, сколько требуется в конфигурации.
Чтобы просмотреть свои кластеры, используйте таблицу system.clusters
.
Двигатель Distributed
позволяет работать с кластером так же, как с локальным сервером. Однако конфигурацию кластера нельзя указывать динамически, она должна быть настроена в конфигурационном файле сервера. Обычно все серверы в кластере будут иметь одинаковую конфигурацию кластера (хотя это не требуется). Кластеры из конфигурационного файла обновляются на лету, без перезапуска сервера.
Если вам нужно отправлять запрос к неизвестному набору шардов и реплик каждый раз, вам не нужно создавать таблицу Distributed
– используйте функция таблицы remote
вместо этого. См. раздел Функции таблиц.
Запись данных
Существует два метода записи данных в кластер:
Во-первых, вы можете определить, на какие серверы записывать какие данные, и выполнять запись непосредственно на каждом шаре. Другими словами, выполняйте прямые операторы INSERT
на удаленных таблицах в кластере, на который указывает таблица Distributed
. Это наиболее гибкое решение, так как вы можете использовать любую схему шардирования, даже ту, которая является нетривиальной из-за требований предметной области. Это также наиболее оптимальное решение, поскольку данные могут записываться на разные шары полностью независимо.
Во-вторых, вы можете выполнять операторы INSERT
на таблице Distributed
. В этом случае таблица самостоятельно распределит вставленные данные между серверами. Для записи в таблицу Distributed
необходимо, чтобы был настроен параметр sharding_key
(если шардов больше одного).
Каждый шар может иметь <weight>
, определенный в конфигурационном файле. Значение по умолчанию - 1
. Данные распределяются между шарами по количеству, пропорциональному весу шара. Все веса шардов суммируются, затем вес каждого шара делится на общий, чтобы определить пропорцию каждого шара. Например, если имеется два шара, и первый имеет вес 1, а второй имеет вес 2, первый получит одну треть (1 / 3) вставленных строк, а второй получит две трети (2 / 3).
Каждый шар может иметь параметр internal_replication
, определенный в конфигурационном файле. Если этот параметр установлен в true
, операция записи выбирает первую здоровую реплику и записывает данные в неё. Используйте это, если таблицы, лежащие в основе таблицы Distributed
, являются реплицируемыми таблицами (например, любая из движков таблиц Replicated*MergeTree
). Одна из реплик таблицы получит запись, и она будет автоматически реплицирована на другие реплики.
Если internal_replication
установлен в false
(по умолчанию), данные записываются во все реплики. В этом случае таблица Distributed
сама реплицирует данные. Это хуже, чем использование реплицируемых таблиц, потому что консистентность реплик не проверяется и со временем они будут содержать немного разные данные.
Чтобы выбрать шар, в который отправляется строка данных, анализируется выражение шардирования, и берётся его остаток от деления на общий вес шардов. Затем строка отправляется в тот шар, который соответствует полупромежутку остатков от prev_weights
до prev_weights + weight
, где prev_weights
- это общий вес шардов с наименьшим номером, а weight
- это вес этого шара. Например, если имеется два шара, и первый имеет вес 9, тогда как второй имеет вес 10, строка будет отправлена в первый шар для остатков из диапазона [0, 9), а во второй для остатков из диапазона [9, 19).
Выражение шардирования может быть любым выражением из констант и столбцов таблицы, которое возвращает целое число. Например, вы можете использовать выражение rand()
для случайного распределения данных или UserID
для распределения по остатку от деления ID пользователя (в этом случае данные одного пользователя будут находиться на одном шаре, что упрощает выполнение IN
и JOIN
по пользователям). Если один из столбцов распределен недостаточно равномерно, вы можете обернуть его в хеш-функцию, например, intHash64(UserID)
.
Простое деление по остаткам - это ограниченное решение для шардирования и не всегда подходит. Оно работает для средних и больших объемов данных (десятки серверов), но не для очень больших объемов данных (сотни серверов и более). В последнем случае используйте схему шардирования, необходимую для предметной области, а не записи в таблицах Distributed
.
Вы должны беспокоиться о схеме шардирования в следующих случаях:
- Используются запросы, которые требуют объединения данных (
IN
илиJOIN
) по определенному ключу. Если данные шардированы по этому ключу, вы можете использовать локальныйIN
илиJOIN
, вместоGLOBAL IN
илиGLOBAL JOIN
, что в разы эффективнее. - Используется большое количество серверов (сотни или более) с множеством мелких запросов, например, запросов на данные отдельных клиентов (например, веб-сайты, рекламодатели или партнёры). Чтобы мелкие запросы не затрагивали весь кластер, имеет смысл размещать данные для одного клиента на одном шаре. В качестве альтернативы можно настроить двухуровневое шардирование: разделить весь кластер на "слои", где слой может состоять из нескольких шардов. Данные для одного клиента находятся на одном слое, но к слою могут добавляться шары по мере необходимости, и данные распределяются случайным образом внутри них. Таблицы
Distributed
создаются для каждого слоя, а одна общая распределенная таблица создается для глобальных запросов.
Данные записываются в фоне. При вставке в таблицу блок данных просто записывается в локальную файловую систему. Данные отправляются на удаленные серверы в фоне, как можно скорее. Периодичность отправки данных регулируется настройками distributed_background_insert_sleep_time_ms и distributed_background_insert_max_sleep_time_ms. Двигатель Distributed
отправляет каждый файл с вставленными данными отдельно, но вы можете включить пакетную отправку файлов с помощью настройки distributed_background_insert_batch. Эта настройка улучшает производительность кластера, более эффективно используя ресурсы локального сервера и сети. Вы должны проверить, что данные отправляются успешно, проверив список файлов (данные, ожидающие отправки) в каталоге таблицы: /var/lib/clickhouse/data/database/table/
. Число потоков, выполняющих задачи в фоне, может быть установлено с помощью настройки background_distributed_schedule_pool_size.
Если сервер перестал существовать или произошел грубый перезапуск (например, из-за аппаратного сбоя) после вставки в таблицу Distributed
, вставленные данные могут быть потеряны. Если в каталоге таблицы обнаружена повреждённая часть данных, она перемещается в подкаталог broken
и больше не используется.
Чтение данных
При запросе таблицы Distributed
запросы SELECT
отправляются на все шары и работают независимо от того, как данные распределены по шарам (они могут быть распределены совершенно случайно). Когда вы добавляете новый шар, вам не нужно переносить старые данные в него. Вместо этого вы можете записывать новые данные в него, используя более высокий вес – данные будут распределены немного неравномерно, но запросы будут работать корректно и эффективно.
Когда опция max_parallel_replicas
активирована, обработка запросов параллелизуется по всем репликам в пределах одного шара. Для больше информации ознакомьтесь с разделом max_parallel_replicas.
Чтобы узнать больше о том, как обрабатываются распределенные запросы in
и global in
, обратитесь к этой документации.
Виртуальные столбцы
_shard_num
_shard_num
— Содержит значение shard_num
из таблицы system.clusters
. Тип: UInt32.
Смотрите также
- Описание Виртуальные столбцы
- Настройка background_distributed_schedule_pool_size
- Функции shardNum() и shardCount()