Pull to refresh

Пул соединений MySQL и как это можно использовать для распараллеливания

Reading time14 min
Views17K
Я хочу поделиться со всеми читателями интересной темой, с которой столкнулся совсем недавно, и мне она понравилась. Развитие этой темы доставило мне удовольствие и добавило немного опыта в копилку. Вероятно, многие, а может и нет, сталкивались с пулом соединений БД. После ознакомления с этой интересной опцией мне захотелось написать статью и поделиться ею с вами. Возможно, статья получится немного длинной, но я думаю, что данный пост кому-то все же будет интересно почитать, и его заинтересует данная тема. Может быть наработки с этой статьи кто-то использует в своем проекте, в любом случае, мне будет интересно ее писать и рассказывать ее вам.

Занимаюсь фрилансом. Как-то свел меня один мой знакомый с владельцами автошколы и там меня попросили сделать программу для формирования своей коллекции билетов. Что-то вроде своей книги билетов ПДД. Все вопросы хранятся в СУБД MySQL в трех таблицах, где в одной коллекция вопросов, в другой тематика этих вопросов и в последней ответы на вопросы. В результате 800 вопросов с картинкой или без нее, включая ответы. Первоначальный вариант формирования представлял собой обычный метод с последовательным формированием вопросов внутри. Мне довольно интересна тема многопоточного выполнения программы поэтому после создания вполне рабочего метода, я решил сделать все более удобным, а заодно прибавил скорости выборки.

Начнем с того, что для формирования своей версии билетов ПДД необходимо учитывать тематику вопросов. При этом всего 12 тем. Первые 8 тем состоят из 40 вопросов, оставшиеся из 120. Каждой теме соответствует определенный номер в билете, при этом первым 8 темам соответствует 1 вопрос из билета, другим 3 вопроса из билета. Для хранения сгенерированной версии вопросов используется словарь Dictionary, где каждый ключ хранит в себе список вопросов по определенной тематике. При этом порядок вопросов должен быть всегда разным, т.е. необходим метод, который будет генерировать последовательность чисел от 0 до 40 без повторений, так получится выбирать любой билет и соответственно вопрос из него. С учетом всего получается такой алгоритм формирования всех вопросов:

  • получаем все темы, которые у нас есть;
  • из тем формируем список номеров вопросов для каждой из тем;
  • генерируем случайные числа;
  • на основе списка вопросов и последовательности случайных чисел делаем выборку и формируем список вопросов по тематике;
  • заносим каждый список в словарь.

Реализация и работа первых двух шагов зависит от логики и на мой взгляд может выполняться один раз при запросе на выборку. Третий шаг зависит от того, как сильно мы хотим сделать вопросы разными, перед каждой тематикой делать новую генерацию, либо один раз для всех тем. Я выбрал генерировать только один раз, на мой взгляд этого достаточно. Четвертый шаг будет работать чаще всего и в этом месте следует больше всего уходит времени при выборке. Последний шаг простой и его просто пропустим.

Подключение к БД


Рассмотрим подключение к БД. Для этого используем абстрактный класс, который будет хранить в себе строку соединения, Connection и DataReader:

public abstract class SqlBase 
	{
		protected static readonly String Connect;
		protected readonly MySqlConnection SqlConnection;
		protected MySqlDataReader SqlDataReader;

		static SqlBase()
		{
			Connect = String.Format("Database={0};Data Source={1};User ID={2};Password={3};CharSet=utf8",
				Settings.Default.Database, Settings.Default.DataSource, Settings.Default.UserId, Settings.Default.Password);
		}
		protected SqlBase()
		{
			try
			{
				this.SqlConnection = new MySqlConnection(Connect);
				this.SqlConnection.Open();
			}
			catch (Exception ex)
			{
				throw new Exception(ex.Message, ex);
			}
		}
	}

Мне нравятся абстрактные классы тем, что это они позволяют создавать и скрывать в себе удобную логику работы, позволяя в производном полностью сосредоточиться на другой конкретике и не беспокоится о других деталях, возложив обязанности на базовый класс. В данном случае создавая экземпляр производного класса, мы автоматически получаем необходимые условия для работы с БД и нам остается реализовать логику самого класса для формирования выборки вопросов:

public sealed class SqlPerceptionQuestions : SqlBase
	{
        public Dictionary<Int32, List<Question>> GetQuestion()
        {
           Generation();
           GetTheme();
           return Request();
        }
        private Dictionary<Int32, List<Question>> Request()
        {
           var _collectionDictionary = new Dictionary<Int32, List<Question>>();
           
           for(int ctr = 0; ctr < 12; ctr++)
           {
              using (var _questions = new SqlQuestions())
              {
				if (ctr < 8)
				{
					_collectionDictionary[ctr] = _questions.GetQuestionSmall((Int16)ctr);
				}
				else
				{
					_collectionDictionary[ctr] = _questions.GetQuestionGreat((Int16)ctr);
				}          
              }
           return _collectionDictionary;
          }

        private async void GetTheme()
		{
			//логика метода
		}
            //метод для получения данных для темы с 40 вопросами
            private List<Question> GetQuestionSmall(Int16 numTheme)
			{
				var _listQuestions = new List<Question>();

				for (Int16 numCard = 0; numCard < 40; numCard++)
				{
					_listQuestions.Add(GetQuestion(numCard, numTheme));
				}
				return _listQuestions;
			}

           //метод для получения данных для темы со 120 вопросами
			private List<Question> GetQuestionGreat(Int16 numTheme)
			{
				var _listQuestions = new List<Question>();

				for (Int16 numQuestion = 0; numQuestion < 3; numQuestion++)
					for (int numCard = 0; numCard < 40; numCard++)
					{
						_listQuestions.Add(GetQuestion(numQuestion, numTheme, numQuestion));
					}

				return _listQuestions;
			}

            // метод для получения одного вопроса из БД по параметрам
			private Question GetQuestion(Int16 numCard, Int16 numTheme, Int16 numQuestion = 0)
			{
				//логика метода
			}

            //получение ответа на вопрос
			private List<String> GetResponse(Int32 questions_id)
			{
				//логика метода
			}
    }


Это самая простая и синхронная версия, которая работает около 2 секунд и 200-400 миллисекунд, что соответственно будет блокировать пользовательский интерфейс на все это время. Это уже неплохо работающая версия, так как самая первая реализация работала довольно долго, около 6 секунд. После улучшения вышло всего около 2 секунд.

Создание асинхронной версии выборки


Все хорошо и все уже работает, но так ли, как должно быть? Ведь у нас синхронный метод(блокирующий) и не консольное приложение. Необходима правильная и вполне работающая программа, которая не будет блокироваться даже на половину секунды, а будет исправно работать при любой нагрузке. Для этого первым делом перепишем метод GetQuestion(). Сделаем его асинхронным в соответствии с паттерном TAP(Task-based Asynchronous Pattern). Кому интересно почитайте в интернете или есть довольно хорошая книга, которая мне очень нравится — «Асинхронное программирование в C# 5.0» Алекс Дэвис, где очень хорошо описывается данная тематика, либо загляните сюда. Перепишем его и он будет выглядеть так:

public async Task<Dictionary<Int32, List<Question>>> GetQuestionAsync()
		{
			return await Task.Factory.StartNew(() =>
			{
				Generation();
				GetTheme();
				return Request();
			
			}, TaskCreationOptions.LongRunning);
		}

Рассмотрим самое интересное в этом методе: Task.Factory.StartNew(). Начиная с версии .NET 4.5 можно использовать версию Task.Run(), которая отличается от предыдущей более простым объявлением с меньшим числом параметров при создании. По сути Task.Run() она представляет собой просто более удобную оболочку над Task.Factory.StartNew() и очень подходит для простого создания асинхронных задач, но при этом обладает немного меньшей гибкостью управления. Если необходим более точный контроль над тем, какой поток производит вычисления или как он планируется, используем Task.Factory.StartNew(). Если интересно загляните сюда. В этом случае, я использовал этот вариант по причине того, что я также указал такой параметр, как TaskCreationOptions.LongRunning, что помечает данную задачу как длительную и означает, что данный рабочий элемент будет выполняться на протяжении длительного периода времени и может заблокировать другие рабочие элементы. Также данный флаг предоставляет сведения для TaskScheduler, что следует ожидать избыточной подписки и это позволит создать больше потоков, чем количество доступных аппаратных потоков. С помощью этого параметра можно полностью избежать ThreadPool, включая глобальные и локальные очереди. Подробнее читайте «Планировщик заданий».

Таким образом мы получаем уже асинхронное выполнение формирования коллекции данных и не блокируем основной поток.

Распараллеливание выборки


После этого переходим к распараллеливанию, что даст нам прирост по скорости выборки данных. Но для этого придется переделать немного основной класс, изменить базовый и добавить еще один для обеспечения параллельности работы цикла, чтобы избежать гонок данных, критических секций и проблем с переменными для разных потоков.
Для начала создадим новый вложенный класс в классе SqlPerceptionQuestionsSqlQuestions. В него перенесем методы для получения вопросов и ответов на них, а также сделаем его производным от SqlBase, при этом во внешнем классе оставим метод для получения тем, его мы будем вызывать только один раз, а также формирование последовательности чисел.

Код класса SqlQuestions:

		internal sealed class SqlQuestions : SqlBase
		{
			internal List<Question> GetQuestionSmall(Int16 numTheme)
			{
				var _listQuestions = new List<Question>();

				for (Int16 numCard = 0; numCard < 40; numCard++)
				{
					_listQuestions.Add(GetQuestion(numCard, numTheme));
				}
				return _listQuestions;
			}

			internal List<Question> GetQuestionGreat(Int16 numTheme)
			{
				var _listQuestions = new List<Question>();

				for (Int16 numQuestion = 0; numQuestion < 3; numQuestion++)
					for (int numCard = 0; numCard < 40; numCard++)
					{
						_listQuestions.Add(GetQuestion(numQuestion, numTheme, numQuestion));
					}

				return _listQuestions;
			}

			private Question GetQuestion(Int16 numCard, Int16 numTheme, Int16 numQuestion = 0)
			{
				//логика метода
			}

			private List<String> GetResponse(Int32 questions_id)
			{
                //логика метода
			}
		}

Для распараллеливания цикла будем использовать Parallel.For(). Это довольно удобный способ организовать загрузку данных в несколько потоков. Но это чревато также тем, что нам необходимо будет создать как минимум несколько соединений с БД, так как один Connection способен обрабатывать один DataReader. Перепишем метод .Request():

private Dictionary<Int32, List<Question>> Request()
		{
			var _collectionDictionary = new Dictionary<Int32, List<Question>>();
			var _po = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };

			Parallel.For(0, 12, _po, ctr =>
				{
					using (var _questions = new SqlQuestions())
					{
						if (ctr < 8)
						{
							_collectionDictionary[ctr] = _questions.GetQuestionSmall((Int16)ctr);
						}
						else
						{
							_collectionDictionary[ctr] = _questions.GetQuestionGreat((Int16)ctr);
						}
					}
				});

			return _collectionDictionary;
		}

А после открытия соединения его необходимо закрыть. При этом все это будет происходить в цикле. Для реализации всего этого я и решил создать отдельный производный класс SqlQuestions. Для закрытия соединения будем вызывать .Dispose(), в котором пропишем что нам необходимо делать при закрытии. Для этого сначала объявим метод .Dispose() в базовом классе:

	public void Dispose()
		{
			Dispose(true);
			GC.SuppressFinalize(this);
		}
		protected abstract void Dispose(Boolean disposing);

А реализовывать его будем по-разному в производных классах. Почему так? Начнем с того, что же происходит при создании подключения к БД и ее закрытии? Если открыть соединение, а затем его закрыть, то оно на некоторое время (около 3-х минут) помещается в пул соединений MySQL, и если открыть новое, то соединение берется из пула, что означает что время и ресурсы на повторное открытие не используются. Давайте запустим наши новые методы и посмотрим сколько времени уходит на открытие соединений БД, для этого вставим в базовом классе Stopwatch в код, где происходит открытие соединения и посмотрим что у нас на выходе. Код:

protected SqlBase()
		{
			Stopwatch st = new Stopwatch();
			st.Start();
			try
			{
				this.SqlConnection = new MySqlConnection(Connect);
				this.SqlConnection.Open();
			}
			catch (MySqlException ex)
			{
				throw new Exception(ex.Message, ex);
			}
			st.Stop();
			Debug.WriteLine("Время открытия соединения БД : " + st.Elapsed.Seconds.ToString() + " секунд " + st.Elapsed.Milliseconds.ToString() + " миллисекунд");
		}

Время открытия
image

Первое соединение самое долгое, здесь открывается соединение для класса SqlPerceptionQuestions, которое будет открыто на все время работы метода. Последующие соединения — это те, которые были открыты при в цикле при создании экземпляров класса SqlQuestions. С учетом кол-ва процессоров у меня на компьютере, которых 4 получаем, что в цикле максимум будет открыто 4 соединения. Получаем, что всего изначально будет открыто 5 соединений, при этом в цикле будут как открываться, так и закрываться. Поэтому первые 5 соединений требуют времени на открытие, а после когда в цикле будут закрываться старые соединения и открываться новые, то на них не будет уходить время и ресурсы, так как соединения есть в пуле и они просто выдаются каждый раз, когда требуются. Из-за этого и реализуется немного по-разному очистка классов. В классе SqlPerceptionQuestions метод будет выглядеть так:

protected override void Dispose(bool disposing)
		{
			if (!this.disposed)
			{
				if (SqlDataReader != null)
				{
					SqlDataReader.Close();
					SqlDataReader.Dispose();
				}
				SqlConnection.Close();
				SqlConnection.Dispose();
				MySqlConnection.ClearAllPools();
			}

			disposed = true;
		}

А в классе SqlQuestions точно также, за исключением строки MySqlConnection.ClearAllPools(); Ведь, если мы ее оставим, то получим такую вот ситуацию:

Время открытия
image

Как мы видим, постоянная очистка пула потоков приводит к постоянному открытию соединения с вытекающими последствиями.

Пул соединений MySql


Рассмотрим немного подробнее этот момент. Connector/Net MySql поддерживает пул соединений для повышения производительности и масштабируемости баз данных приложений с интенсивным использованием. Данная функция включена по умолчанию. Ее можно отключить или изменить характеристики с помощью опций строки подключения. Строка подключения поддерживает опции относительно пула соединений:

Наименование опции Значение по
умолчанию
Подробности
Cache Server Properties, CacheServerProperties Отключено Определяет, обновлять ли параметры некоторых системных переменных(SHOW VARIABLES) каждый раз, когда соединение из пула возвращается. Включение этой опции ускоряет соединение с контекстом пула соединений. Вашему приложению не сообщается об изменениях конфигурации, сделанных другими соединениями. Данная опция добавлена начиная с версии Connector/Net 6.3.
Connection Lifetime, ConnectionLifeTime 0 Когда соединение возвращается в пул, время его создания сравнивается с текущим временем и, если оно превышает значение ConnectionLifeTime, то данное соединение уничтожается. Это полезно в кластерных конфигурациях для осуществления балансировки нагрузки между рабочим сервером и сервером, находящимся в онлайне. Значение zero(0) заставляет соединения в пуле находится в режиме ожидания максимально возможное время.
Connection Reset, ConnectionReset false Если истина, состояние соединения при извлечении из пула сбрасывается. Значение по умолчанию позволяет избежать дополнительного цикла обработки сервером для получения соединения, но при этом состояние соединения не сбрасывается.
Maximum Pool Size, Max Pool Size, MaximumPoolsize, maxpoolsize 100 Максимальное число соединений, которым дозволено находиться в пуле.
Minimum Pool Size, Min Pool Size, MinimumPoolSize, minpoolsize 0 Минимальное число соединений, которым дозволено находиться в пуле.
Pooling true Если истина, то объект MySqlConnection берется из пула, если необходимо, он создается и добавляется в соответствующий пул. Определенные значения: true, false, yes и no.

Используя данные параметры, можно управлять пулом так, как необходимо. К примеру:
  • Connection Reset сбрасывает контекст соединения и если его использовать не по умолчанию, то время на получение соединения из пула незначительно увеличивается.
  • Minimum Pool Size позволяет установить в случае необходимости количество соединений в пуле, которые будут существовать неограниченно длительное время, если количество соединений меньше или равно значению Minimum Pool Size.
  • Отключение Pooling приведет к таким же результатам, если все время очищать пул потоков (как в примере данной программы)

По умолчанию, начиная с MySQL Connector / Net 6.2, есть фоновое задание, которое выполняется каждые три минуты и удаляет соединения из пула, которые находятся в режиме ожидания (не используются) в течение более трех минут. Очистка пула освобождает ресурсы как на клиенте, так и на стороне сервера. Это потому, что на стороне клиента каждое соединение использует сокет, так и на стороне сервера каждое соединение использует сокет и поток.

Параллельный стек вызовов


Ради интереса, если поставить точку остановы, например, в методе .GetQuestion() и посмотреть параллельный стек вызовов, увидим:

Параллельный стек
image

Как видно из скриншота, мы находимся в одном из потоков, который приостановлен, а по стеку вызовов определяем, что данный метод был вызван из метода, который загружает малую коллекцию вопросов (40). Слева от него расположены еще 3 потока, причем два из них остановлены в этот момент на строке добавления вопроса в коллекцию, которые также обрабатывают малую коллекцию вопросов. И последний из них, 4 поток занимается обработкой и получает ответ в этот момент на вопрос, только уже для вопроса из большой коллекции (120). Все эти 4 потока были созданы в параллельном цикле и работают практически одновременно с остальными потоками итераций цикла. Эти потоки входят в общее число потоков в программе, которых 8, где оставшиеся 4 решают другие задачи программы.

Последний штрих — обработка исключений


И, наконец, для работы программы нам необходима обработка исключений. Вдруг с БД изменились поля или какие-то еще параметры или в самой программе возникла непредвиденная ошибка. Перепишем метод GetQuestionAsync():

public async Task<Dictionary<Int32, List<Question>>> GetQuestionAsync()
		{
			return await Task.Factory.StartNew(() =>
			{
				try
				{
					Generation();
					GetTheme();
					return Request();
				}
				catch (AggregateException ex)
				{
					throw new AggregateException(ex.Message, ex.InnerExceptions);
				}
				catch (Exception ex)
				{
					throw new AggregateException(ex.Message, ex.InnerException);
				}
			}, TaskCreationOptions.LongRunning);
		}

Обработка исключений AggregateException связана с тем, что цикл Parallel.For, если возникнет исключение, возбудит ошибку такого типа и, следовательно, ее необходимо обработать и передать вызывающей стороне. Вполне логично, что параллельный цикл генерирует ошибку такого рода. Рассмотрим данный момент поподробнее: для этого я изменил Sql-запрос в .GetQuestion(), заведомо неправильно указав один из параметров, который в таблице БД не существует. Получаем:

Ошибка
image

При этом если продолжить отладку, то всего данное исключение возникнет 4 раза, что вполне логично. Для того, чтобы обработать все 4, пусть и относящиеся к одной причине, необходимо как-то их расположить, для чего исключение AggregateException подходит.

Обработка Exception связана с тем, что если возникнет исключение в методе .GetTheme(), то там возникнет одно исключение и его также надо перехватить.

Вызывающий код устроен так:

private async void Button_Click(object sender, RoutedEventArgs e)
		{
			Stopwatch st = new Stopwatch();
			st.Start();
			try
			{
				SqlQuest = new SqlPerceptionQuestions();
				collectionQuest = await SqlQuest.GetQuestionAsync();
			}
			catch (AggregateException ex)
			{
				ex.ShowError();
			}
            catch(Exception ex)
			{
				ex.ShowError();
			}
			finally
			{
				if (SqlQuest != null)
					SqlQuest.Dispose();
			}
			st.Stop();
			Debug.WriteLine("Время выполнения метода: " + st.Elapsed.Seconds.ToString() + " секунд " + st.Elapsed.Milliseconds.ToString() + " миллисекунд");
			Debugger.Break();
		}


Напоследок...


Вообще началось это с того, что когда я писал параллельную версию цикла, я подумал, а как сильно влияет частое открытие и закрытие соединения с БД. Посещая форумы и расспрашивая умных людей я узнал про пул соединений. Потом задался вопросом, а необходимо ли его создавать или он создается в неявном виде? Немного поэкспериментировал и почитав документацию MySQL, в результате пришел к выводу, что очень уж удачная это вещь, пул соединений!

Весь код после изменений
//Базовый класс
public abstract class SqlBase : IDisposable
	{
		protected static readonly String Connect;
		protected readonly MySqlConnection SqlConnection;
		protected MySqlDataReader SqlDataReader;

		protected Boolean disposed;

		static SqlBase()
		{
			Connect = String.Format("Database={0};Data Source={1};User ID={2};Password={3};CharSet=utf8;CacheServerProperties=true",
				Settings.Default.Database, Settings.Default.DataSource, Settings.Default.UserId, Settings.Default.Password);
		}
		protected SqlBase()
		{
			Stopwatch st = new Stopwatch();
			st.Start();
			try
			{
				this.SqlConnection = new MySqlConnection(Connect);
				this.SqlConnection.Open();
			}
			catch (Exception ex)
			{
				throw new Exception(ex.Message, ex);
			}
			st.Stop();
			Debug.WriteLine("Время открытия соединения БД : " + st.Elapsed.Seconds.ToString() + " секунд " + st.Elapsed.Milliseconds.ToString() + " миллисекунд");
		}

		~SqlBase()
		{
			Dispose(false);
		}

		public void Dispose()
		{
			Dispose(true);
			GC.SuppressFinalize(this);
		}
		protected abstract void Dispose(Boolean disposing);
	}

//класс для выполнения запросов выборки
public sealed class SqlPerceptionQuestions : SqlBase
	{
		public async Task<Dictionary<Int32, List<Question>>> GetQuestionAsync()
		{
			return await Task.Factory.StartNew(() =>
			{
				try
				{
					Generation();
					GetTheme();
					return Request();
				}
				catch (AggregateException ex)
				{
					throw new AggregateException(ex.Message, ex.InnerExceptions);
				}
				catch (Exception ex)
				{
					throw new AggregateException(ex.Message, ex.InnerException);
				}
			}, TaskCreationOptions.LongRunning);
		}

		private Dictionary<Int32, List<Question>> Request()
		{
			var _collectionDictionary = new Dictionary<Int32, List<Question>>();
			var _po = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };

			Parallel.For(0, 12, _po, ctr =>
				{
					using (var _questions = new SqlQuestions())
					{
						if (ctr < 8)
						{
							_collectionDictionary[ctr] = _questions.GetQuestionSmall((Int16)ctr);
						}
						else
						{
							_collectionDictionary[ctr] = _questions.GetQuestionGreat((Int16)ctr);
						}
					}
				});

			return _collectionDictionary;
		}

		private void GetTheme()
		{
			
		}

		private void Generation()
		{
			
		}

		protected override void Dispose(bool disposing)
		{
			if (!this.disposed)
			{
				if (SqlDataReader != null)
				{
					SqlDataReader.Close();
					SqlDataReader.Dispose();
				}
				SqlConnection.Close();
				SqlConnection.Dispose();
				MySqlConnection.ClearAllPools();
			}

			disposed = true;
		}
       //вложенный класс для осуществления удобного формирования параллельных запросов
		internal sealed class SqlQuestions : SqlBase
		{
			internal List<Question> GetQuestionSmall(Int16 numTheme)
			{
				var _listQuestions = new List<Question>();

				for (Int16 numCard = 0; numCard < 40; numCard++)
				{
					_listQuestions.Add(GetQuestion(numCard, numTheme));
				}
				return _listQuestions;
			}

			internal List<Question> GetQuestionGreat(Int16 numTheme)
			{
				var _listQuestions = new List<Question>();

				for (Int16 numQuestion = 0; numQuestion < 3; numQuestion++)
					for (int numCard = 0; numCard < 40; numCard++)
					{
						_listQuestions.Add(GetQuestion(numQuestion, numTheme, numQuestion));
					}

				return _listQuestions;
			}

			private Question GetQuestion(Int16 numCard, Int16 numTheme, Int16 numQuestion = 0)
			{
				
			}

			private List<String> GetResponse(Int32 questions_id)
			{
				
			}

			protected override void Dispose(bool disposing)
			{
				if (!this.disposed)
				{
					if (SqlDataReader != null)
					{
						SqlDataReader.Close();
						SqlDataReader.Dispose();
					}
					SqlConnection.Close();
					SqlConnection.Dispose();
				}
				disposed = true;
			}
		}
	}

Tags:
Hubs:
Total votes 9: ↑6 and ↓3+3
Comments7

Articles