Выгрузка лога PostgreSQL из облака AWS

    Или немного прикладной тетрисологии.
    Всё новое- хорошо забытое старое.

    Эпиграфы


    Постановка задачи


    Необходимо периодически загружать текущий лог-файл PostgreSQL из облака AWS на локальный Linux хост. Не в реальном времени, но, скажем так, с небольшой задержкой.
    Период загрузки обновления лог-файла — 5 минут.
    Лог-файл, в AWS, ротируется каждый час.

    Используемые инструменты


    Для загрузки лог-файла на хост используется bash-скрипт, вызывающий AWS API «aws rds download-db-log-file-portion».

    Параметры


    • --db-instance-identifier: Имя инстанса в AWS;
    • --log-file-name: имя текущего сформированного лог-файла
    • --max-item: Общее количество элементов, возвращаемых в выходных данных команды.Размер порции загружаемого файла.
    • --starting-token: Метка начальной порции

    В данном конкретном случае, задача загрузки логов возникла по ходу работ над мониторингом производительности запросов PostgreSQL.

    Да и просто — интересная задача, для тренировки и разнообразия в ходе рабочего времени.
    Предположу, что задача в силу обыденности уже решена. Но быстрый гугл решений не подсказал, а искать более углубленно не было особого желания. В любом случае — неплохая тренировка.

    Формализация задачи


    Конечный лог-файл представляет собой множество строк переменной длины. Графически, лог-файл можно представить, примерно так:



    Уже что-то напоминает? При чём тут «тетрис»? А вот, при чем.

    Если представить возможные варианты, возникающие при загрузке очередного файла графически (для простоты, в данном случае, пусть строки имеют одну длину), получатся стандартные фигуры тетриса:

    1) Файл загружен целиком и является конечным. Размер порции больше размера конечного файла:



    2) Файл имеет продолжение. Размер порции меньше размера конечного файла:



    3) Файл является продолжением предыдущего файла и имеет продолжение. Размер порции меньше размера остатка конечного файла:



    4) Файл является продолжением предыдущего файла и является конечным. Размер порции больше размера остатка конечного файла:



    Задача — собрать прямоугольник или поиграть в тетрис, на новом уровне.



    Проблемы, возникающие по ходу решения задачи


    1) Склеить строку из 2-х порций



    В общем-то никаких особых проблем не возникло. Стандартная задача из начального курса программирования.

    Оптимальный размер порции


    А вот это, несколько интереснее.

    К сожалению, нет возможности использовать смещение после метки начальной порции:
    As you already know the option --starting-token is used to specify where to start paginating. This option takes String values which would mean that if you try to add an offset value in front of the Next Token string, the option will not be taken into consideration as an offset.
    И поэтому, приходится читать кусками-порциями.

    Если читать большими порциями, то количество чтений будет минимальным, но объем будет максимальным.

    Если читать маленькими порциями, то наоборот, количество чтений будет максимальным, но зато объем будет минимальным.

    Поэтому, для сокращения трафика и для общей красоты решения, пришлось придумать некое решение, к сожалению, немного смахивающее на костыль.

    Для иллюстрации, рассмотрим процесс загрузки лог-файла в 2-х сильно упрощенных вариантах. Количество чтений в обоих случаях зависит от размера порции.

    1) Загружаем малыми порциями:



    2) Загружаем большими порциями:



    Как обычно, оптимальное решение-посредине.

    Размер порции минимальный, но в процессе чтения, размер можно увеличивать, для сокращения числа чтений.

    Нужно отметить, что полностью задача подбора оптимального размера считываемой порции пока не решена и требует более глубокой проработки и анализа. Может, быть, чуть позже.

    Общее описание реализации


    Используемые сервисные таблицы


    CREATE TABLE endpoint
    (
    id SERIAL ,
    host text 
    );
    
    TABLE database
    (
    id SERIAL , 
    …
    last_aws_log_time text ,
    last_aws_nexttoken text ,
    aws_max_item_size integer 
    );
    last_aws_log_time — временная метка последнего загруженного лог-файла в формате YYYY-MM-DD-HH24.
    last_aws_nexttoken — текстовая метка последней загруженной порции.
    aws_max_item_size- эмпирическим путем, подобранный начальный размер порции.
    

    Полный текст скрипта


    download_aws_piece.sh
    #!/bin/bash
    #########################################################
    # download_aws_piece.sh
    # downloan piece of log from AWS
    # version HABR
     let min_item_size=1024
     let max_item_size=1048576
     let growth_factor=3
     let growth_counter=1
     let growth_counter_max=3
    
     echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:''STARTED'
     
     AWS_LOG_TIME=$1
     echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:AWS_LOG_TIME='$AWS_LOG_TIME
      
     database_id=$2
     echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:database_id='$database_id
     RESULT_FILE=$3 
      
     endpoint=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE_DATABASE -A -t -c "select e.host from endpoint e join database d on e.id = d.endpoint_id where d.id = $database_id "`
     echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:endpoint='$endpoint
      
     db_instance=`echo $endpoint | awk -F"." '{print toupper($1)}'`
     
     echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:db_instance='$db_instance
    
     LOG_FILE=$RESULT_FILE'.tmp_log'
     TMP_FILE=$LOG_FILE'.tmp'
     TMP_MIDDLE=$LOG_FILE'.tmp_mid'  
     TMP_MIDDLE2=$LOG_FILE'.tmp_mid2'  
      
     current_aws_log_time=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select last_aws_log_time from database where id = $database_id "`
    
     echo $(date +%Y%m%d%H%M)':      download_aws_piece.sh:current_aws_log_time='$current_aws_log_time
      
      if [[ $current_aws_log_time != $AWS_LOG_TIME  ]];
      then
        is_new_log='1'
    	if ! psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -q -c "update database set last_aws_log_time = '$AWS_LOG_TIME' where id = $database_id "
    	then
    	  echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: FATAL_ERROR - update database set last_aws_log_time .'
    	  exit 1
    	fi
      else
        is_new_log='0'
      fi
      
      echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:is_new_log='$is_new_log
      
      let last_aws_max_item_size=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select aws_max_item_size from database where id = $database_id "`
      echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: last_aws_max_item_size='$last_aws_max_item_size
      
      let count=1
      if [[ $is_new_log == '1' ]];
      then    
    	echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: START DOWNLOADING OF NEW AWS LOG'
    	if ! aws rds download-db-log-file-portion \
    		--max-items $last_aws_max_item_size \
    		--region REGION \
    		--db-instance-identifier  $db_instance \
    		--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
    	then
    		echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
    		exit 2
    	fi  	
      else
        next_token=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -c "select last_aws_nexttoken from database where id = $database_id "`
    	
    	if [[ $next_token == '' ]];
    	then
    	  next_token='0'	  
    	fi
    	
    	echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: CONTINUE DOWNLOADING OF AWS LOG'
    	if ! aws rds download-db-log-file-portion \
    	    --max-items $last_aws_max_item_size \
    		--starting-token $next_token \
    		--region REGION \
    		--db-instance-identifier  $db_instance \
    		--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
    	then
    		echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
    		exit 3
    	fi       
    	
    	line_count=`cat  $LOG_FILE | wc -l`
    	let lines=$line_count-1
    	  
    	tail -$lines $LOG_FILE > $TMP_MIDDLE 
    	mv -f $TMP_MIDDLE $LOG_FILE
      fi
      
      next_token_str=`cat $LOG_FILE | grep NEXTTOKEN` 
      next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
      
      grep -v NEXTTOKEN $LOG_FILE  > $TMP_FILE 
      
      if [[ $next_token == '' ]];
      then
    	  cp $TMP_FILE $RESULT_FILE
    	  
    	  echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:  NEXTTOKEN NOT FOUND - FINISH '
    	  rm $LOG_FILE 
    	  rm $TMP_FILE
    	  rm $TMP_MIDDLE
              rm $TMP_MIDDLE2	  
    	  exit 0  
      else
    	psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
      fi
      
      first_str=`tail -1 $TMP_FILE`
      
      line_count=`cat  $TMP_FILE | wc -l`
      let lines=$line_count-1    
      
      head -$lines $TMP_FILE  > $RESULT_FILE
    
    ###############################################
    # MAIN CIRCLE
      let count=2
      while [[ $next_token != '' ]];
      do 
        echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: count='$count
    	
    	echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: START DOWNLOADING OF AWS LOG'
    	if ! aws rds download-db-log-file-portion \
                 --max-items $last_aws_max_item_size \
                 --starting-token $next_token \
                 --region REGION \
                 --db-instance-identifier  $db_instance \
                 --log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
    	then
    		echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
    		exit 4
    	fi
    
    	next_token_str=`cat $LOG_FILE | grep NEXTTOKEN` 
    	next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
    
    	TMP_FILE=$LOG_FILE'.tmp'
    	grep -v NEXTTOKEN $LOG_FILE  > $TMP_FILE  
    	
    	last_str=`head -1 $TMP_FILE`
      
        if [[ $next_token == '' ]];
    	then
    	  concat_str=$first_str$last_str
    	  	  
    	  echo $concat_str >> $RESULT_FILE
    		 
    	  line_count=`cat  $TMP_FILE | wc -l`
    	  let lines=$line_count-1
    	  
    	  tail -$lines $TMP_FILE >> $RESULT_FILE
    	  
    	  echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:  NEXTTOKEN NOT FOUND - FINISH '
    	  rm $LOG_FILE 
    	  rm $TMP_FILE
    	  rm $TMP_MIDDLE
              rm $TMP_MIDDLE2	  
    	  exit 0  
    	fi
    	
        if [[ $next_token != '' ]];
    	then
    		let growth_counter=$growth_counter+1
    		if [[ $growth_counter -gt $growth_counter_max ]];
    		then
    			let last_aws_max_item_size=$last_aws_max_item_size*$growth_factor
    			let growth_counter=1
    		fi
    	
    		if [[ $last_aws_max_item_size -gt $max_item_size ]]; 
    		then
    			let last_aws_max_item_size=$max_item_size
    		fi 
    
    	  psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
    	  
    	  concat_str=$first_str$last_str
    	  	  
    	  echo $concat_str >> $RESULT_FILE
    		 
    	  line_count=`cat  $TMP_FILE | wc -l`
    	  let lines=$line_count-1
    	  
    	  #############################
    	  #Get middle of file
    	  head -$lines $TMP_FILE > $TMP_MIDDLE
    	  
    	  line_count=`cat  $TMP_MIDDLE | wc -l`
    	  let lines=$line_count-1
    	  tail -$lines $TMP_MIDDLE > $TMP_MIDDLE2
    	  
    	  cat $TMP_MIDDLE2 >> $RESULT_FILE	  
    	  
    	  first_str=`tail -1 $TMP_FILE`	  
    	fi
    	  
        let count=$count+1
    
      done
    #
    #################################################################
    
    exit 0  
    

    Фрагменты скрипта с некоторыми пояснениями:


    Входные параметры скрипта:

    • Временная метка имени лог-файла в формате YYYY-MM-DD-HH24: AWS_LOG_TIME=$1
    • ID Базы данных: database_id=$2
    • Имя собранного лог-файла: RESULT_FILE=$3

    Получить временную метку последнего загруженного лог-файла:

    current_aws_log_time=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select last_aws_log_time from database where id = $database_id "`

    Если временная метка последнего загруженного лог-файла не совпадает с входным параметром — загружается новый лог-файл:

    if [[ $current_aws_log_time != $AWS_LOG_TIME  ]];
      then
        is_new_log='1'
    	if ! psql -h ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -c "update database set last_aws_log_time = '$AWS_LOG_TIME' where id = $database_id "
    	then
    	  echo '***download_aws_piece.sh -FATAL_ERROR - update database set last_aws_log_time .'
    	  exit 1
    	fi
      else
        is_new_log='0'
      fi
    

    Получаем значение метки nexttoken из загруженного файла:

      next_token_str=`cat $LOG_FILE | grep NEXTTOKEN` 
      next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
    

    Признаком окончания загрузки служит пустое значение nexttoken.

    В цикле считаем порции файла, попутно, сцепляя строки и увеличивая размер порции:

    Главный цикл
    # MAIN CIRCLE
      let count=2
      while [[ $next_token != '' ]];
      do 
        echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: count='$count
    	
    	echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: START DOWNLOADING OF AWS LOG'
    	if ! aws rds download-db-log-file-portion \
         --max-items $last_aws_max_item_size \
    	 --starting-token $next_token \
         --region REGION \
         --db-instance-identifier  $db_instance \
         --log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
    	then
    		echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
    		exit 4
    	fi
    
    	next_token_str=`cat $LOG_FILE | grep NEXTTOKEN` 
    	next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
    
    	TMP_FILE=$LOG_FILE'.tmp'
    	grep -v NEXTTOKEN $LOG_FILE  > $TMP_FILE  
    	
    	last_str=`head -1 $TMP_FILE`
      
        if [[ $next_token == '' ]];
    	then
    	  concat_str=$first_str$last_str
    	  	  
    	  echo $concat_str >> $RESULT_FILE
    		 
    	  line_count=`cat  $TMP_FILE | wc -l`
    	  let lines=$line_count-1
    	  
    	  tail -$lines $TMP_FILE >> $RESULT_FILE
    	  
    	  echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:  NEXTTOKEN NOT FOUND - FINISH '
    	  rm $LOG_FILE 
    	  rm $TMP_FILE
    	  rm $TMP_MIDDLE
             rm $TMP_MIDDLE2	  
    	  exit 0  
    	fi
    	
        if [[ $next_token != '' ]];
    	then
    		let growth_counter=$growth_counter+1
    		if [[ $growth_counter -gt $growth_counter_max ]];
    		then
    			let last_aws_max_item_size=$last_aws_max_item_size*$growth_factor
    			let growth_counter=1
    		fi
    	
    		if [[ $last_aws_max_item_size -gt $max_item_size ]]; 
    		then
    			let last_aws_max_item_size=$max_item_size
    		fi 
    
    	  psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
    	  
    	  concat_str=$first_str$last_str
    	  	  
    	  echo $concat_str >> $RESULT_FILE
    		 
    	  line_count=`cat  $TMP_FILE | wc -l`
    	  let lines=$line_count-1
    	  
    	  #############################
    	  #Get middle of file
    	  head -$lines $TMP_FILE > $TMP_MIDDLE
    	  
    	  line_count=`cat  $TMP_MIDDLE | wc -l`
    	  let lines=$line_count-1
    	  tail -$lines $TMP_MIDDLE > $TMP_MIDDLE2
    	  
    	  cat $TMP_MIDDLE2 >> $RESULT_FILE	  
    	  
    	  first_str=`tail -1 $TMP_FILE`	  
    	fi
    	  
        let count=$count+1
    
      done
    



    Что же дальше ?


    Итак, первая промежуточная задача — «загрузить лог-файл с облака» решена. Что делать с загруженным логом?
    Для начала необходимо разобрать лог-файл и выделить из него собственно запросы.
    Задача не сильно сложная. Простейший bash-script вполне справляется.
    upload_log_query.sh
    #!/bin/bash
    #########################################################
    # upload_log_query.sh
    # Upload table table from dowloaded aws file 
    # version HABR
    ###########################################################  
    echo 'TIMESTAMP:'$(date +%c)' Upload log_query table '
    source_file=$1
    echo 'source_file='$source_file
    database_id=$2
    echo 'database_id='$database_id
    
    beginer=' '
    first_line='1'
    let "line_count=0"
    sql_line=' '
    sql_flag=' '    
    space=' '
    cat $source_file | while read line
    do
      line="$space$line"
    
      if [[ $first_line == "1" ]]; then
        beginer=`echo $line | awk -F" " '{ print $1}' `
        first_line='0'
      fi
    
      current_beginer=`echo $line | awk -F" " '{ print $1}' `
    
      if [[ $current_beginer == $beginer ]]; then
        if [[ $sql_flag == '1' ]]; then
         sql_flag='0' 
         log_date=`echo $sql_line | awk -F" " '{ print $1}' `
         log_time=`echo $sql_line | awk -F" " '{ print $2}' `
         duration=`echo $sql_line | awk -F" " '{ print $5}' `
    
         #replace ' to ''
         sql_modline=`echo "$sql_line" | sed 's/'\''/'\'''\''/g'`
         sql_line=' '
    
    	 ################
    	 #PROCESSING OF THE SQL-SELECT IS HERE
         if ! psql -h ENDPOINT.rds.amazonaws.com -U USER -d DATABASE -v ON_ERROR_STOP=1 -A -t -c "select log_query('$ip_port',$database_id , '$log_date' , '$log_time' , '$duration' , '$sql_modline' )" 
         then
            echo 'FATAL_ERROR - log_query '
            exit 1
         fi
    	 ################
    
        fi #if [[ $sql_flag == '1' ]]; then
    
        let "line_count=line_count+1"
    
        check=`echo $line | awk -F" " '{ print $8}' `
        check_sql=${check^^}    
    
        #echo 'check_sql='$check_sql
        
        if [[ $check_sql == 'SELECT' ]]; then
         sql_flag='1'    
         sql_line="$sql_line$line"
    	 ip_port=`echo $sql_line | awk -F":" '{ print $4}' `
        fi
      else       
    
        if [[ $sql_flag == '1' ]]; then
          sql_line="$sql_line$line"
        fi   
        
      fi #if [[ $current_beginer == $beginer ]]; then
    
    done
    


    Теперь с выделенным из лог-файла запросом, можно работать.

    А полезных возможностей открывается несколько.

    Разобранные запросы надо где-то хранить. Для этого используется сервисная таблица log_query
    CREATE TABLE log_query
    (
       id SERIAL ,
       queryid bigint ,
       query_md5hash text not null ,
       database_id integer not null ,  
       timepoint timestamp without time zone not null,
       duration double precision not null ,
       query text not null ,
       explained_plan text[],
       plan_md5hash text  , 
       explained_plan_wo_costs text[],
       plan_hash_value text  ,
       baseline_id integer ,
       ip text ,
       port text 
    );
    ALTER TABLE log_query ADD PRIMARY KEY (id);
    ALTER TABLE log_query ADD CONSTRAINT queryid_timepoint_unique_key UNIQUE (queryid, timepoint );
    ALTER TABLE log_query ADD CONSTRAINT query_md5hash_timepoint_unique_key UNIQUE (query_md5hash, timepoint );
    
    CREATE INDEX log_query_timepoint_idx ON log_query (timepoint);
    CREATE INDEX log_query_queryid_idx ON log_query (queryid);
    ALTER TABLE log_query ADD CONSTRAINT database_id_fk FOREIGN KEY (database_id) REFERENCES database (id) ON DELETE CASCADE ;
    


    Обработка разобранного запроса осуществляется в plpgsql функции «log_query».
    log_query.sql
    --log_query.sql
    --verison HABR
    CREATE OR REPLACE FUNCTION log_query( ip_port text ,log_database_id integer , log_date text , log_time text , duration text , sql_line text   ) RETURNS boolean AS $$
    DECLARE
      result boolean ;
      log_timepoint timestamp without time zone ;
      log_duration double precision ; 
      pos integer ;
      log_query text ;
      activity_string text ;
      log_md5hash text ;
      log_explain_plan text[] ;
      
      log_planhash text ;
      log_plan_wo_costs text[] ; 
      
      database_rec record ;
      
      pg_stat_query text ; 
      test_log_query text ;
      log_query_rec record;
      found_flag boolean;
      
      pg_stat_history_rec record ;
      port_start integer ;
      port_end integer ;
      client_ip text ;
      client_port text ;
      log_queryid bigint ;
      log_query_text text ;
      pg_stat_query_text text ; 
    BEGIN
      result = TRUE ;
    
      RAISE NOTICE '***log_query';
      
      port_start = position('(' in ip_port);
      port_end = position(')' in ip_port);
      client_ip = substring( ip_port from 1 for port_start-1 );
      client_port = substring( ip_port from port_start+1 for port_end-port_start-1 );
    
      SELECT e.host , d.name , d.owner_pwd 
      INTO database_rec
      FROM database d JOIN endpoint e ON e.id = d.endpoint_id
      WHERE d.id = log_database_id ;
      
      log_timepoint = to_timestamp(log_date||' '||log_time,'YYYY-MM-DD HH24-MI-SS');
      log_duration = duration:: double precision; 
    
      
      pos = position ('SELECT' in UPPER(sql_line) );
      log_query = substring( sql_line from pos for LENGTH(sql_line));
      log_query = regexp_replace(log_query,' +',' ','g');
      log_query = regexp_replace(log_query,';+','','g');
      log_query = trim(trailing ' ' from log_query);
     
    
      log_md5hash = md5( log_query::text );
      
      --Explain execution plan--
      EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')'; 
      
      log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
      log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
        
      PERFORM dblink_disconnect('LINK1');
      --------------------------
      BEGIN
    	INSERT INTO log_query
    	(
    		query_md5hash ,
    		database_id , 
    		timepoint ,
    		duration ,
    		query ,
    		explained_plan ,
    		plan_md5hash , 
    		explained_plan_wo_costs , 
    		plan_hash_value , 
    		ip , 
    		port
    	) 
    	VALUES 
    	(
    		log_md5hash ,
    		log_database_id , 
    		log_timepoint , 
    		log_duration , 
    		log_query ,
    		log_explain_plan , 
    		md5(log_explain_plan::text) ,
    		log_plan_wo_costs , 
    		md5(log_plan_wo_costs::text),
    		client_ip , 
    		client_port		
    	);
    	activity_string = 	'New query has logged '||
    						' database_id = '|| log_database_id ||
    						' query_md5hash='||log_md5hash||
    						' , timepoint = '||to_char(log_timepoint,'YYYYMMDD HH24:MI:SS');
    					
    	RAISE NOTICE '%',activity_string;					
    					 
    	PERFORM pg_log( log_database_id , 'log_query' , activity_string);  
    
    	EXCEPTION
    	  WHEN unique_violation THEN
    		RAISE NOTICE '*** unique_violation *** query already has logged';
    	END;
    
    	SELECT 	queryid
    	INTO   	log_queryid
    	FROM 	log_query 
    	WHERE 	query_md5hash = log_md5hash AND
    			timepoint = log_timepoint;
    
    	IF log_queryid IS NOT NULL 
    	THEN 
    	  RAISE NOTICE 'log_query with query_md5hash = % and timepoint = % has already has a QUERYID = %',log_md5hash,log_timepoint , log_queryid ;
    	  RETURN result;
    	END IF;
    	
    	------------------------------------------------
    	RAISE NOTICE 'Update queryid';	
    	
    	SELECT * 
    	INTO log_query_rec
    	FROM log_query
    	WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ; 
    	
    	log_query_rec.query=regexp_replace(log_query_rec.query,';+','','g');
    	
    	FOR pg_stat_history_rec IN
    	 SELECT 
             queryid ,
    	  query 
    	 FROM 
             pg_stat_db_queries 
         WHERE  
          database_id = log_database_id AND
           queryid is not null 
    	LOOP
    	  pg_stat_query = pg_stat_history_rec.query ; 
    	  pg_stat_query=regexp_replace(pg_stat_query,'\n+',' ','g');
    	  pg_stat_query=regexp_replace(pg_stat_query,'\t+',' ','g');
    	  pg_stat_query=regexp_replace(pg_stat_query,' +',' ','g');
    	  pg_stat_query=regexp_replace(pg_stat_query,'\$.','%','g');
    	
    	  log_query_text = trim(trailing ' ' from log_query_rec.query);
    	  pg_stat_query_text = pg_stat_query; 
    	
    	  
    	  --SELECT log_query_rec.query like pg_stat_query INTO found_flag ; 
    	  IF (log_query_text LIKE pg_stat_query_text) THEN
    		found_flag = TRUE ;
    	  ELSE
    		found_flag = FALSE ;
    	  END IF;	  
    	  
    	  
    	  IF found_flag THEN
    	    
    		UPDATE log_query SET queryid = pg_stat_history_rec.queryid WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
    		activity_string = 	' updated queryid = '||pg_stat_history_rec.queryid||
    		                    ' for log_query with id = '||log_query_rec.id               
    		   				    ;						
    	    RAISE NOTICE '%',activity_string;	
    		EXIT ;
    	  END IF ;
    	  
    	END LOOP ;
    	
      RETURN result ;
    END
    $$ LANGUAGE plpgsql;
    

    При обработке используется сервисная таблица pg_stat_db_queries, содержащая снимок текущих запросов из таблицы pg_stat_history (Использование таблицы описано здесь — Мониторинг производительности запросов PostgreSQL. Часть 1 — репортинг)

    TABLE pg_stat_db_queries
    (
       database_id integer,  
       queryid bigint ,  
       query text , 
       max_time double precision 
    );
    

    TABLE pg_stat_history 
    (
    …
    database_id integer ,
    …
    queryid bigint ,
    …
    max_time double precision	 , 	
    …
    );
    

    Функция позволяет осуществить ряд полезных возможностей по обработке запросов из лог-файла.
    А именно:

    Возможность №1 — История выполнения запросов


    Очень полезно для начала решения инцидента производительности. Сначала ознакомиться с историей — а когда началось замедление?

    Затем, по классике – поискать внешние причины. Может быть просто резко увеличилась загрузка базы данных и конкретный запрос ни при чем.

    Добавить новую запись в таблицу log_query
      port_start = position('(' in ip_port);
      port_end = position(')' in ip_port);
      client_ip = substring( ip_port from 1 for port_start-1 );
      client_port = substring( ip_port from port_start+1 for port_end-port_start-1 );
    
      SELECT e.host , d.name , d.owner_pwd 
      INTO database_rec
      FROM database d JOIN endpoint e ON e.id = d.endpoint_id
      WHERE d.id = log_database_id ;
      
      log_timepoint = to_timestamp(log_date||' '||log_time,'YYYY-MM-DD HH24-MI-SS');
      log_duration = to_number(duration,'99999999999999999999D9999999999'); 
    
      
      pos = position ('SELECT' in UPPER(sql_line) );
      log_query = substring( sql_line from pos for LENGTH(sql_line));
      log_query = regexp_replace(log_query,' +',' ','g');
      log_query = regexp_replace(log_query,';+','','g');
      log_query = trim(trailing ' ' from log_query);
     
      RAISE NOTICE 'log_query=%',log_query ;   
    
      log_md5hash = md5( log_query::text );
      
      --Explain execution plan--
      EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')'; 
      
      log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
      log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
        
      PERFORM dblink_disconnect('LINK1');
      --------------------------
      BEGIN
    	INSERT INTO log_query
    	(
    		query_md5hash ,
    		database_id , 
    		timepoint ,
    		duration ,
    		query ,
    		explained_plan ,
    		plan_md5hash , 
    		explained_plan_wo_costs , 
    		plan_hash_value , 
    		ip , 
    		port
    	) 
    	VALUES 
    	(
    		log_md5hash ,
    		log_database_id , 
    		log_timepoint , 
    		log_duration , 
    		log_query ,
    		log_explain_plan , 
    		md5(log_explain_plan::text) ,
    		log_plan_wo_costs , 
    		md5(log_plan_wo_costs::text),
    		client_ip , 
    		client_port		
    	);
    

    Возможность №2 — Сохранять планы выполнения запросов


    На этом месте может возникнуть возражение-уточнение-комментарий: «Но ведь уже есть autoexplain». Есть то он есть, а что толку, если план выполнения хранится в том же лог-файле и для того, чтобы его сохранить для дальнейшего анализа, придётся парсить лог-файл?

    Мне же нужно было:

    во-первых: хранить план выполнения в сервисной таблице базы данных мониторинга;
    во-вторых: иметь возможности сравнивать планы выполнения между собой, что бы сразу видеть, что план выполнения запроса изменился.

    Запрос с конкретными параметрами выполнения имеется. Получить и сохранить его план выполнения, используя EXPLAIN — задача элементарная.

    Более того, используя выражение EXPLAIN (COSTS FALSE), можно получить каркас плана, который и будет использован для получения hash-значения плана, что поможет при последующем анализе истории изменения плана выполнения.

    Получить шаблон плана выполнения
      --Explain execution plan--
      EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')'; 
      
      log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
      log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
        
      PERFORM dblink_disconnect('LINK1');
    

    Возможность №3 — Использование лога запросов для мониторинга


    Поскольку метрики производительности настроены не на текст запроса, а на его ID, нужно связывать запросы из лог-файла с запросами для которых настроены метрики производительности.

    Ну хотя бы для того, чтобы иметь точное время возникновения инцидента производительности.

    Таким образом, при возникновении инцидента производительности для ID запроса, будет ссылка на конкретный запрос с конкретными значениями параметра и точным временем выполнения и длительности запроса. Получить данную информацию используя только представление pg_stat_statements — нельзя.

    Найти queryid запроса и обновить запись в таблице log_query
    SELECT * 
    	INTO log_query_rec
    	FROM log_query
    	WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ; 
    	
    	log_query_rec.query=regexp_replace(log_query_rec.query,';+','','g');
    	
    	FOR pg_stat_history_rec IN
    	 SELECT 
          queryid ,
    	  query 
    	 FROM 
           pg_stat_db_queries 
         WHERE  
    	   database_id = log_database_id AND
           queryid is not null 
    	LOOP
    	  pg_stat_query = pg_stat_history_rec.query ; 
    	  pg_stat_query=regexp_replace(pg_stat_query,'\n+',' ','g');
    	  pg_stat_query=regexp_replace(pg_stat_query,'\t+',' ','g');
    	  pg_stat_query=regexp_replace(pg_stat_query,' +',' ','g');
    	  pg_stat_query=regexp_replace(pg_stat_query,'\$.','%','g');
    	
    	  log_query_text = trim(trailing ' ' from log_query_rec.query);
    	  pg_stat_query_text = pg_stat_query; 
    	  
    	  --SELECT log_query_rec.query like pg_stat_query INTO found_flag ; 
    	  IF (log_query_text LIKE pg_stat_query_text) THEN
    		found_flag = TRUE ;
    	  ELSE
    		found_flag = FALSE ;
    	  END IF;	  
    	  
    	  
    	  IF found_flag THEN
    	    
    		UPDATE log_query SET queryid = pg_stat_history_rec.queryid WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
    		activity_string = 	' updated queryid = '||pg_stat_history_rec.queryid||
    		                    ' for log_query with id = '||log_query_rec.id		                    
    		   				    ;						
    					
    	    RAISE NOTICE '%',activity_string;	
    		EXIT ;
    	  END IF ;
    	  
    	END LOOP ;
    

    Послесловие


    Описанная методика в итоге, нашла себе применение в разрабатываемой системе мониторинга производительности запросов PostgreSQL, позволив иметь больше информации для анализа при решении возникающих инцидентов производительности запросов.

    Хотя, конечно, на мой личный авторский взгляд, нужно будет еще поработать над алгоритмом выбора и изменения размера загружаемой порции. Задача пока не решена в общем случае. Наверное, будет интересно.

    Но это уже совсем другая история …
    ICL Services
    102.38
    Цифровые технологии для бизнеса
    Share post

    Comments 0

    Only users with full accounts can post comments. Log in, please.