home *** CD-ROM | disk | FTP | other *** search
/ Chip 2001 January / Chip_2001-01_cd1.bin / tema / mysql / mysql-3.23.28g-win-source.exe / sql / slave.cpp < prev    next >
C/C++ Source or Header  |  2000-11-22  |  32KB  |  1,174 lines

  1. /* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
  2.    
  3.    This program is free software; you can redistribute it and/or modify
  4.    it under the terms of the GNU General Public License as published by
  5.    the Free Software Foundation; either version 2 of the License, or
  6.    (at your option) any later version.
  7.    
  8.    This program is distributed in the hope that it will be useful,
  9.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  10.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  11.    GNU General Public License for more details.
  12.    
  13.    You should have received a copy of the GNU General Public License
  14.    along with this program; if not, write to the Free Software
  15.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  16.  
  17.  
  18. #include "mysql_priv.h"
  19. #include <mysql.h>
  20. #include "mini_client.h"
  21. #include "slave.h"
  22. #include <thr_alarm.h>
  23. #include <my_dir.h>
  24.  
  25. bool slave_running = 0;
  26. pthread_t slave_real_id;
  27. MASTER_INFO glob_mi;
  28. HASH replicate_do_table, replicate_ignore_table;
  29. DYNAMIC_ARRAY replicate_wild_do_table, replicate_wild_ignore_table;
  30. bool do_table_inited = 0, ignore_table_inited = 0;
  31. bool wild_do_table_inited = 0, wild_ignore_table_inited = 0;
  32. bool table_rules_on = 0;
  33. #ifndef DBUG_OFF
  34. int disconnect_slave_event_count = 0;
  35. static int events_till_disconnect = -1;
  36. static int stuck_count = 0;
  37. #endif
  38.  
  39.  
  40. static inline void skip_load_data_infile(NET* net);
  41. static inline bool slave_killed(THD* thd);
  42. static int init_slave_thread(THD* thd);
  43. static void safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
  44. static void safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
  45. static int safe_sleep(THD* thd, int sec);
  46. static int request_table_dump(MYSQL* mysql, char* db, char* table);
  47. static int create_table_from_dump(THD* thd, NET* net, const char* db,
  48.                   const char* table_name);
  49. static inline char* rewrite_db(char* db);
  50. static void free_table_ent(TABLE_RULE_ENT* e)
  51. {
  52.   my_free((gptr) e, MYF(0));
  53. }
  54.  
  55. static byte* get_table_key(TABLE_RULE_ENT* e, uint* len,
  56.                my_bool not_used __attribute__((unused)))
  57. {
  58.   *len = e->key_len;
  59.   return (byte*)e->db;
  60. }
  61.  
  62.  
  63. void init_table_rule_hash(HASH* h, bool* h_inited)
  64. {
  65.   hash_init(h, TABLE_RULE_HASH_SIZE,0,0,
  66.         (hash_get_key) get_table_key,
  67.         (void (*)(void*)) free_table_ent, 0);
  68.   *h_inited = 1;
  69. }
  70.  
  71. void init_table_rule_array(DYNAMIC_ARRAY* a, bool* a_inited)
  72. {
  73.   init_dynamic_array(a, sizeof(TABLE_RULE_ENT*), TABLE_RULE_ARR_SIZE,
  74.              TABLE_RULE_ARR_SIZE);
  75.   *a_inited = 1;
  76. }
  77.  
  78. static TABLE_RULE_ENT* find_wild(DYNAMIC_ARRAY *a, const char* key, int len)
  79. {
  80.   uint i;
  81.   const char* key_end = key + len;
  82.   
  83.   for(i = 0; i < a->elements; i++)
  84.     {
  85.       TABLE_RULE_ENT* e ;
  86.       get_dynamic(a, (gptr)&e, i);
  87.       if(!wild_case_compare(key, key_end, (const char*)e->db,
  88.                 (const char*)(e->db + e->key_len),'\\'))
  89.     return e;
  90.     }
  91.   
  92.   return 0;
  93. }
  94.  
  95. int tables_ok(THD* thd, TABLE_LIST* tables)
  96. {
  97.   for(; tables; tables = tables->next)
  98.     {
  99.       if(!tables->updating) 
  100.     continue;
  101.       char hash_key[2*NAME_LEN+2];
  102.       char* p;
  103.       p = strmov(hash_key, tables->db ? tables->db : thd->db);
  104.       *p++ = '.';
  105.       uint len = strmov(p, tables->real_name) - hash_key ;
  106.       if(do_table_inited) // if there are any do's
  107.     {
  108.       if(hash_search(&replicate_do_table, (byte*) hash_key, len))
  109.         return 1;
  110.     }
  111.       if(ignore_table_inited) // if there are any do's
  112.     {
  113.       if(hash_search(&replicate_ignore_table, (byte*) hash_key, len))
  114.         return 0; 
  115.     }
  116.       if(wild_do_table_inited && find_wild(&replicate_wild_do_table,
  117.                        hash_key, len)) return 1;
  118.       if(wild_ignore_table_inited && find_wild(&replicate_wild_ignore_table,
  119.                        hash_key, len)) return 0;
  120.     }
  121.  
  122.   return !do_table_inited && !wild_do_table_inited;
  123.   // if no explicit rule found
  124.   // and there was a do list, do not replicate. If there was
  125.   // no do list, go ahead
  126. }
  127.  
  128.  
  129. int add_table_rule(HASH* h, const char* table_spec)
  130. {
  131.   char* dot = strchr(table_spec, '.');
  132.   if(!dot) return 1;
  133.   uint len = (uint)strlen(table_spec);
  134.   if(!len) return 1;
  135.   TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT)
  136.                          + len, MYF(MY_WME));
  137.   if(!e) return 1;
  138.   e->db = (char*)e + sizeof(TABLE_RULE_ENT);
  139.   e->tbl_name = e->db + (dot - table_spec) + 1;
  140.   e->key_len = len;
  141.   memcpy(e->db, table_spec, len);
  142.   (void)hash_insert(h, (byte*)e);
  143.   return 0;
  144. }
  145.  
  146. int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec)
  147. {
  148.   char* dot = strchr(table_spec, '.');
  149.   if(!dot) return 1;
  150.   uint len = (uint)strlen(table_spec);
  151.   if(!len) return 1;
  152.   TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT)
  153.                          + len, MYF(MY_WME));
  154.   if(!e) return 1;
  155.   e->db = (char*)e + sizeof(TABLE_RULE_ENT);
  156.   e->tbl_name = e->db + (dot - table_spec) + 1;
  157.   e->key_len = len;
  158.   memcpy(e->db, table_spec, len);
  159.   insert_dynamic(a, (gptr)&e);
  160.   return 0;
  161. }
  162.  
  163.  
  164. static inline bool slave_killed(THD* thd)
  165. {
  166.   return abort_slave || abort_loop || thd->killed;
  167. }
  168.  
  169. static inline void skip_load_data_infile(NET* net)
  170. {
  171.   (void)my_net_write(net, "\xfb/dev/null", 10);
  172.   (void)net_flush(net);
  173.   (void)my_net_read(net); // discard response
  174.   send_ok(net); // the master expects it
  175. }
  176.  
  177. static inline char* rewrite_db(char* db)
  178. {
  179.   if(replicate_rewrite_db.is_empty() || !db) return db;
  180.   I_List_iterator<i_string_pair> it(replicate_rewrite_db);
  181.   i_string_pair* tmp;
  182.  
  183.   while((tmp=it++))
  184.     {
  185.       if(!strcmp(tmp->key, db))
  186.     return tmp->val;
  187.     }
  188.  
  189.   return db;
  190. }
  191.  
  192. int db_ok(const char* db, I_List<i_string> &do_list,
  193.       I_List<i_string> &ignore_list )
  194. {
  195.   if(do_list.is_empty() && ignore_list.is_empty())
  196.     return 1; // ok to replicate if the user puts no constraints
  197.  
  198.   if(!db)
  199.     return 0; // if the user has specified restrictions on which databases to replicate
  200.   // and db was not selected, do not replicate
  201.  
  202.   if(!do_list.is_empty()) // if the do's are not empty
  203.     {
  204.       I_List_iterator<i_string> it(do_list);
  205.       i_string* tmp;
  206.  
  207.       while((tmp=it++))
  208.     {
  209.       if(!strcmp(tmp->ptr, db))
  210.         return 1; // match
  211.     }
  212.       return 0;
  213.     }
  214.   else // there are some elements in the don't, otherwise we cannot get here
  215.     {
  216.       I_List_iterator<i_string> it(ignore_list);
  217.       i_string* tmp;
  218.  
  219.       while((tmp=it++))
  220.     {
  221.       if(!strcmp(tmp->ptr, db))
  222.         return 0; // match
  223.     }
  224.       
  225.       return 1;
  226.       
  227.     }
  228. }
  229.  
  230. static void init_strvar_from_file(char* var, int max_size, FILE* f,
  231.                    char* default_val)
  232. {
  233.  
  234.   if(fgets(var, max_size, f)) 
  235.     {
  236.       char* last_p = strend(var) - 1;
  237.       if(*last_p == '\n') *last_p = 0; // if we stopped on newline, kill it
  238.       else
  239.     while( (fgetc(f) != '\n' && !feof(f)));
  240.       // if we truncated a line or stopped on last char, remove all chars
  241.       // up to and including newline
  242.     }
  243.   else if(default_val)
  244.    strmake(var,  default_val, max_size);
  245. }
  246.  
  247. static void init_intvar_from_file(int* var, FILE* f,
  248.                    int default_val)
  249. {
  250.   char buf[32];
  251.   
  252.   if(fgets(buf, sizeof(buf), f)) 
  253.     {
  254.       *var = atoi(buf);
  255.     }
  256.   else if(default_val)
  257.    *var = default_val;
  258. }
  259.  
  260.  
  261. static int create_table_from_dump(THD* thd, NET* net, const char* db,
  262.                   const char* table_name)
  263. {
  264.   uint packet_len = my_net_read(net); // read create table statement
  265.   TABLE_LIST tables;
  266.   int error = 0;
  267.   
  268.   if(packet_len == packet_error)
  269.     {
  270.       send_error(&thd->net, ER_MASTER_NET_READ);
  271.       return 1;
  272.     }
  273.   if(net->read_pos[0] == 255) // error from master
  274.     {
  275.       net->read_pos[packet_len] = 0;
  276.       net_printf(&thd->net, ER_MASTER, net->read_pos + 3);
  277.       return 1;
  278.     }
  279.   thd->command = COM_TABLE_DUMP;
  280.   thd->query = sql_alloc(packet_len + 1);
  281.   if(!thd->query)
  282.     {
  283.       sql_print_error("create_table_from_dump: out of memory");
  284.       net_printf(&thd->net, ER_GET_ERRNO, "Out of memory");
  285.       return 1;
  286.     }
  287.   memcpy(thd->query, net->read_pos, packet_len);
  288.   thd->query[packet_len] = 0;
  289.   thd->current_tablenr = 0;
  290.   thd->query_error = 0;
  291.   thd->net.no_send_ok = 1;
  292.   thd->proc_info = "Creating table from master dump";
  293.   char* save_db = thd->db;
  294.   thd->db = thd->last_nx_db; // in case we are creating in a different
  295.   // database
  296.   mysql_parse(thd, thd->query, packet_len); // run create table
  297.   thd->db = save_db; // leave things the way the were before
  298.   
  299.   if(thd->query_error)
  300.     {
  301.       close_thread_tables(thd); // mysql_parse takes care of the error send
  302.       return 1;
  303.     }
  304.  
  305.   bzero((char*) &tables,sizeof(tables));
  306.   tables.db = (char*)db;
  307.   tables.name = tables.real_name = (char*)table_name;
  308.   tables.lock_type = TL_WRITE;
  309.   thd->proc_info = "Opening master dump table";
  310.   if(!open_ltable(thd, &tables, TL_WRITE))
  311.     {
  312.       // open tables will send the error
  313.       sql_print_error("create_table_from_dump: could not open created table");
  314.       close_thread_tables(thd);
  315.       return 1;
  316.     }
  317.   
  318.   handler *file = tables.table->file;
  319.   thd->proc_info = "Reading master dump table data";
  320.   if(file->net_read_dump(net))
  321.     {
  322.       net_printf(&thd->net, ER_MASTER_NET_READ);
  323.       sql_print_error("create_table_from_dump::failed in\
  324.  handler::net_read_dump()");
  325.       close_thread_tables(thd);
  326.       return 1;
  327.     }
  328.  
  329.   HA_CHECK_OPT check_opt;
  330.   check_opt.init();
  331.   check_opt.quick = 1;
  332.   thd->proc_info = "rebuilding the index on master dump table";
  333.   Vio* save_vio = thd->net.vio;
  334.   thd->net.vio = 0; // we do not want repair() to spam us with messages
  335.   // just send them to the error log, and report the failure in case of
  336.   // problems
  337.   if(file->repair(thd,&check_opt ))
  338.     {
  339.       net_printf(&thd->net, ER_INDEX_REBUILD,tables.table->real_name );
  340.       error = 1;
  341.     }
  342.   thd->net.vio = save_vio;
  343.   close_thread_tables(thd);
  344.   
  345.   thd->net.no_send_ok = 0;
  346.   return error; 
  347. }
  348.  
  349. int fetch_nx_table(THD* thd, MASTER_INFO* mi)
  350. {
  351.   MYSQL* mysql = mc_mysql_init(NULL);
  352.   int error = 1;
  353.   int nx_errno = 0;
  354.   if(!mysql)
  355.     {
  356.       sql_print_error("fetch_nx_table: Error in mysql_init()");
  357.       nx_errno = ER_GET_ERRNO;
  358.       goto err;
  359.     }
  360.  
  361.   safe_connect(thd, mysql, mi);
  362.   if(slave_killed(thd))
  363.     goto err;
  364.  
  365.   if(request_table_dump(mysql, thd->last_nx_db, thd->last_nx_table))
  366.     {
  367.       nx_errno = ER_GET_ERRNO;
  368.       sql_print_error("fetch_nx_table: failed on table dump request ");
  369.       goto err;
  370.     }
  371.  
  372.   if(create_table_from_dump(thd, &mysql->net, thd->last_nx_db,
  373.                 thd->last_nx_table))
  374.     {
  375.       // create_table_from_dump will have sent the error alread
  376.       sql_print_error("fetch_nx_table: failed on create table ");
  377.       goto err;
  378.     }
  379.   
  380.   error = 0;
  381.  err:
  382.   if(mysql)
  383.     {
  384.      mc_mysql_close(mysql);
  385.      mysql = 0;
  386.     }
  387.   if(nx_errno && thd->net.vio)
  388.     send_error(&thd->net, nx_errno, "Error in fetch_nx_table");
  389.   
  390.   return error;
  391. }
  392.  
  393. int init_master_info(MASTER_INFO* mi)
  394. {
  395.   FILE* file;
  396.   MY_STAT stat_area;
  397.   char fname[FN_REFLEN+128];
  398.   fn_format(fname, master_info_file, mysql_data_home, "", 4+16+32);
  399.   
  400.  
  401.   // we need a mutex while we are changing master info parameters to
  402.   // keep other threads from reading bogus info
  403.  
  404.   pthread_mutex_lock(&mi->lock);
  405.   mi->pending = 0;
  406.   
  407.   if(!my_stat(fname, &stat_area, MYF(0))) // we do not want any messages
  408.     // if the file does not exist
  409.     {
  410.       file = my_fopen(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME));
  411.       if(!file)
  412.     {
  413.       pthread_mutex_unlock(&mi->lock);
  414.       return 1;
  415.     }
  416.       mi->log_file_name[0] = 0;
  417.       mi->pos = 4; // skip magic number
  418.       mi->file = file;
  419.       
  420.       if(master_host)
  421.         strmake(mi->host, master_host, sizeof(mi->host) - 1);
  422.       if(master_user)
  423.         strmake(mi->user, master_user, sizeof(mi->user) - 1);
  424.       if(master_password)
  425.         strmake(mi->password, master_password, sizeof(mi->password) - 1);
  426.       mi->port = master_port;
  427.       mi->connect_retry = master_connect_retry;
  428.       
  429.       if(flush_master_info(mi))
  430.     {
  431.       pthread_mutex_unlock(&mi->lock);
  432.       return 1;
  433.     }
  434.     }
  435.   else
  436.     {
  437.       file = my_fopen(fname, O_RDWR|O_BINARY, MYF(MY_WME));
  438.       if(!file)
  439.     {
  440.       pthread_mutex_unlock(&mi->lock);
  441.       return 1;
  442.     }
  443.       
  444.       if(!fgets(mi->log_file_name, sizeof(mi->log_file_name), file))
  445.     {
  446.       sql_print_error("Error reading log file name from master info file ");
  447.       pthread_mutex_unlock(&mi->lock);
  448.           return 1;
  449.     }
  450.  
  451.       *(strend(mi->log_file_name) - 1) = 0; // kill \n
  452.       char buf[FN_REFLEN];
  453.       if(!fgets(buf, sizeof(buf), file))
  454.     {
  455.       sql_print_error("Error reading log file position from master info file");
  456.       pthread_mutex_unlock(&mi->lock);
  457.       return 1;
  458.     }
  459.  
  460.       mi->pos = atoi(buf);
  461.       mi->file = file;
  462.       init_strvar_from_file(mi->host, sizeof(mi->host), file, master_host);
  463.       init_strvar_from_file(mi->user, sizeof(mi->user), file, master_user); 
  464.       init_strvar_from_file(mi->password, sizeof(mi->password), file,
  465.              master_password);
  466.       
  467.       init_intvar_from_file((int*)&mi->port, file, master_port);    
  468.       init_intvar_from_file((int*)&mi->connect_retry, file,
  469.                 master_connect_retry);
  470.       
  471.     }
  472.   
  473.   mi->inited = 1;
  474.   pthread_mutex_unlock(&mi->lock);
  475.   
  476.   return 0;
  477. }
  478.  
  479. int show_master_info(THD* thd)
  480. {
  481.   DBUG_ENTER("show_master_info");
  482.   List<Item> field_list;
  483.   field_list.push_back(new Item_empty_string("Master_Host",
  484.                              sizeof(glob_mi.host)));
  485.   field_list.push_back(new Item_empty_string("Master_User",
  486.                              sizeof(glob_mi.user)));
  487.   field_list.push_back(new Item_empty_string("Master_Port", 6));
  488.   field_list.push_back(new Item_empty_string("Connect_retry", 6));
  489.   field_list.push_back( new Item_empty_string("Log_File",
  490.                              FN_REFLEN));
  491.   field_list.push_back(new Item_empty_string("Pos", 12));
  492.   field_list.push_back(new Item_empty_string("Slave_Running", 3));
  493.   field_list.push_back(new Item_empty_string("Replicate_do_db", 20));
  494.   field_list.push_back(new Item_empty_string("Replicate_ignore_db", 20));
  495.   if(send_fields(thd, field_list, 1))
  496.     DBUG_RETURN(-1);
  497.  
  498.   String* packet = &thd->packet;
  499.   packet->length(0);
  500.   
  501.   pthread_mutex_lock(&glob_mi.lock);
  502.   net_store_data(packet, glob_mi.host);
  503.   net_store_data(packet, glob_mi.user);
  504.   net_store_data(packet, (uint32) glob_mi.port);
  505.   net_store_data(packet, (uint32) glob_mi.connect_retry);
  506.   net_store_data(packet, glob_mi.log_file_name);
  507.   net_store_data(packet, (longlong)glob_mi.pos);
  508.   pthread_mutex_unlock(&glob_mi.lock);
  509.   pthread_mutex_lock(&LOCK_slave);
  510.   net_store_data(packet, slave_running ? "Yes":"No");
  511.   pthread_mutex_unlock(&LOCK_slave);
  512.   net_store_data(packet, &replicate_do_db);
  513.   net_store_data(packet, &replicate_ignore_db);
  514.   
  515.   if(my_net_write(&thd->net, (char*)thd->packet.ptr(), packet->length()))
  516.     DBUG_RETURN(-1);
  517.  
  518.   send_eof(&thd->net);
  519.   DBUG_RETURN(0);
  520. }
  521.  
  522. int flush_master_info(MASTER_INFO* mi)
  523. {
  524.   FILE* file = mi->file;
  525.   char lbuf[22];
  526.   
  527.   if(my_fseek(file, 0L, MY_SEEK_SET, MYF(MY_WME)) == MY_FILEPOS_ERROR ||
  528.      fprintf(file, "%s\n%s\n%s\n%s\n%s\n%d\n%d\n",
  529.         mi->log_file_name, llstr(mi->pos, lbuf), mi->host, mi->user, mi->password,
  530.          mi->port, mi->connect_retry) < 0 ||
  531.      fflush(file))
  532.     {
  533.       sql_print_error("Write error flushing master_info: %d", errno);
  534.       return 1;
  535.     }
  536.  
  537.   return 0;
  538. }
  539.  
  540.  
  541. static int init_slave_thread(THD* thd)
  542. {
  543.   DBUG_ENTER("init_slave_thread");
  544.   thd->system_thread = thd->bootstrap = 1;
  545.   thd->client_capabilities = 0;
  546.   my_net_init(&thd->net, 0);
  547.   thd->max_packet_length=thd->net.max_packet;
  548.   thd->master_access= ~0;
  549.   thd->priv_user = 0;
  550.   thd->slave_thread = 1;
  551.   thd->options = (((opt_log_slave_updates) ? OPTION_BIN_LOG:0)
  552.     | OPTION_AUTO_COMMIT | OPTION_AUTO_IS_NULL) ;
  553.   thd->system_thread = 1;
  554.   thd->client_capabilities = CLIENT_LOCAL_FILES;
  555.   slave_real_id=thd->real_id=pthread_self();
  556.   pthread_mutex_lock(&LOCK_thread_count);
  557.   thd->thread_id = thread_id++;
  558.   pthread_mutex_unlock(&LOCK_thread_count);
  559.  
  560.   if (init_thr_lock() ||
  561.       my_pthread_setspecific_ptr(THR_THD,  thd) ||
  562.       my_pthread_setspecific_ptr(THR_MALLOC, &thd->mem_root) ||
  563.       my_pthread_setspecific_ptr(THR_NET,  &thd->net))
  564.   {
  565.     close_connection(&thd->net,ER_OUT_OF_RESOURCES); // is this needed?
  566.     end_thread(thd,0);
  567.     DBUG_RETURN(-1);
  568.   }
  569.  
  570.   thd->mysys_var=my_thread_var;
  571.   thd->dbug_thread_id=my_thread_id();
  572. #ifndef __WIN__
  573.   sigset_t set;
  574.   VOID(sigemptyset(&set));            // Get mask in use
  575.   VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
  576. #endif
  577.  
  578.   thd->mem_root.free=thd->mem_root.used=0;    // Probably not needed
  579.   if (thd->max_join_size == (ulong) ~0L)
  580.     thd->options |= OPTION_BIG_SELECTS;
  581.  
  582.   thd->proc_info="Waiting for master update";
  583.   thd->version=refresh_version;
  584.   thd->set_time();
  585.  
  586.   DBUG_RETURN(0);
  587. }
  588.  
  589. static int safe_sleep(THD* thd, int sec)
  590. {
  591.   thr_alarm_t alarmed;
  592.   thr_alarm_init(&alarmed);
  593.   time_t start_time= time((time_t*) 0);
  594.   time_t end_time= start_time+sec;
  595.   ALARM  alarm_buff;
  596.  
  597.   while (start_time < end_time)
  598.   {
  599.     int nap_time = (int) (end_time - start_time);
  600.     thr_alarm(&alarmed, 2 * nap_time,&alarm_buff); // the only reason we are asking for alarm is so that
  601.     // we will be woken up in case of murder, so if we do not get killed, set the alarm
  602.     // so it goes off after we wake up naturally
  603.     sleep(nap_time);
  604.     if (thr_alarm_in_use(&alarmed)) // if we wake up before the alarm goes off, hit the button
  605.       thr_end_alarm(&alarmed);     // so it will not wake up the wife and kids :-)
  606.     
  607.     if (slave_killed(thd))
  608.       return 1;
  609.     start_time=time((time_t*) 0);
  610.   }
  611.   return 0;
  612. }
  613.  
  614.  
  615. static int request_dump(MYSQL* mysql, MASTER_INFO* mi)
  616. {
  617.   char buf[FN_REFLEN + 10];
  618.   int len;
  619.   int binlog_flags = 0; // for now
  620.   char* logname = mi->log_file_name;
  621.   int4store(buf, mi->pos);
  622.   int2store(buf + 4, binlog_flags);
  623.   int4store(buf + 6, server_id);
  624.   len = (uint) strlen(logname);
  625.   memcpy(buf + 10, logname,len);
  626.   if(mc_simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1))
  627.     // something went wrong, so we will just reconnect and retry later
  628.     // in the future, we should do a better error analysis, but for
  629.     // now we just fill up the error log :-)
  630.     {
  631.       sql_print_error("Error on COM_BINLOG_DUMP: %s, will retry in %d secs",
  632.               mc_mysql_error(mysql), master_connect_retry);
  633.       return 1;
  634.     }
  635.  
  636.   return 0;
  637. }
  638.  
  639. static int request_table_dump(MYSQL* mysql, char* db, char* table)
  640. {
  641.   char buf[1024];
  642.   char * p = buf;
  643.   uint table_len = (uint) strlen(table);
  644.   uint db_len = (uint) strlen(db);
  645.   if(table_len + db_len > sizeof(buf) - 2)
  646.     {
  647.       sql_print_error("request_table_dump: Buffer overrun");
  648.       return 1;
  649.     } 
  650.   
  651.   *p++ = db_len;
  652.   memcpy(p, db, db_len);
  653.   p += db_len;
  654.   *p++ = table_len;
  655.   memcpy(p, table, table_len);
  656.   
  657.   if(mc_simple_command(mysql, COM_TABLE_DUMP, buf, p - buf + table_len, 1))
  658.     {
  659.       sql_print_error("request_table_dump: Error sending the table dump \
  660. command");
  661.       return 1;
  662.     }
  663.  
  664.   return 0;
  665. }
  666.  
  667.  
  668. static uint read_event(MYSQL* mysql, MASTER_INFO *mi)
  669. {
  670.   uint len = packet_error;
  671.   int read_errno = EINTR; // for convinience lets think we start by
  672.   // being in the interrupted state :-)
  673.   // my_real_read() will time us out
  674.   // we check if we were told to die, and if not, try reading again
  675. #ifndef DBUG_OFF
  676.   if(disconnect_slave_event_count && !(events_till_disconnect--))
  677.     return packet_error;      
  678. #endif
  679.   
  680.   while (!abort_loop && !abort_slave && len == packet_error && read_errno == EINTR )
  681.   {
  682.     len = mc_net_safe_read(mysql);
  683.     read_errno = errno;
  684.   }
  685.   if(abort_loop || abort_slave)
  686.     return packet_error;
  687.   if (len == packet_error || (int) len < 1)
  688.   {
  689.     sql_print_error("Error reading packet from server: %s (read_errno %d,\
  690. server_errno=%d)",
  691.             mc_mysql_error(mysql), read_errno, mc_mysql_errno(mysql));
  692.     return packet_error;
  693.   }
  694.  
  695.   if(len == 1)
  696.     {
  697.      sql_print_error("Received 0 length packet from server, looks like master shutdown: %s (%d)",
  698.             mc_mysql_error(mysql), read_errno);
  699.      return packet_error;
  700.     }
  701.   
  702.   DBUG_PRINT("info",( "len=%u, net->read_pos[4] = %d\n",
  703.               len, mysql->net.read_pos[4]));
  704.  
  705.   return len - 1;   
  706. }
  707.  
  708.  
  709. static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
  710. {
  711.   Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1,
  712.                          event_len);
  713.   
  714.   if (ev)
  715.   {
  716.     int type_code = ev->get_type_code();
  717.     if(ev->server_id == ::server_id)
  718.       {
  719.     if(type_code == LOAD_EVENT)
  720.       skip_load_data_infile(net);
  721.     
  722.     mi->inc_pos(event_len);
  723.     flush_master_info(mi);
  724.     delete ev;     
  725.     return 0; // avoid infinite update loops
  726.       }
  727.   
  728.     thd->server_id = ev->server_id; // use the original server id for logging
  729.     thd->set_time(); // time the query
  730.     ev->when = time(NULL);
  731.     
  732.     switch(type_code)
  733.     {
  734.     case QUERY_EVENT:
  735.     {
  736.       Query_log_event* qev = (Query_log_event*)ev;
  737.       int q_len = qev->q_len;
  738.       init_sql_alloc(&thd->mem_root, 8192,0);
  739.       thd->db = rewrite_db((char*)qev->db);
  740.       if(db_ok(thd->db, replicate_do_db, replicate_ignore_db))
  741.       {
  742.     thd->query = (char*)qev->query;
  743.     thd->set_time((time_t)qev->when);
  744.     thd->current_tablenr = 0;
  745.     VOID(pthread_mutex_lock(&LOCK_thread_count));
  746.     thd->query_id = query_id++;
  747.     VOID(pthread_mutex_unlock(&LOCK_thread_count));
  748.     thd->last_nx_table = thd->last_nx_db = 0;
  749.     thd->query_error = 0; // clear error
  750.     thd->net.last_errno = 0;
  751.     thd->net.last_error[0] = 0;
  752.     mysql_parse(thd, thd->query, q_len);
  753.     int expected_error,actual_error;
  754.     if((expected_error = qev->error_code) !=
  755.        (actual_error = thd->net.last_errno) && expected_error)
  756.       {
  757.         sql_print_error("Slave: did not get the expected error\
  758.  running query from master - expected: '%s', got '%s'",
  759.                 ER(expected_error),
  760.                 actual_error ? ER(actual_error):"no error"
  761.                 );
  762.         thd->query_error = 1;
  763.       }
  764.     else if(expected_error == actual_error)
  765.       thd->query_error = 0;
  766.       }
  767.       thd->db = 0;// prevent db from being freed
  768.       thd->query = 0; // just to be sure
  769.       thd->convert_set = 0; // assume no convert for next query
  770.       // unless set explictly
  771.       close_thread_tables(thd);
  772.       free_root(&thd->mem_root,0);
  773.       
  774.       if (thd->query_error)
  775.       {
  776.     sql_print_error("Slave:  error running query '%s' ",
  777.             qev->query);
  778.     delete ev;
  779.     return 1;
  780.       }
  781.       
  782.       delete ev;
  783.         
  784.       if(thd->fatal_error)
  785.       {
  786.     sql_print_error("Slave: Fatal error running query '%s' ",
  787.             thd->query);
  788.     return 1;
  789.       }
  790.  
  791.       mi->inc_pos(event_len);
  792.       flush_master_info(mi);
  793.       break;
  794.     }
  795.       
  796.     case LOAD_EVENT:
  797.     {
  798.       Load_log_event* lev = (Load_log_event*)ev;
  799.       init_sql_alloc(&thd->mem_root, 8192,0);
  800.       thd->db = rewrite_db((char*)lev->db);
  801.       thd->query = 0;
  802.       thd->query_error = 0;
  803.         
  804.       if(db_ok(thd->db, replicate_do_db, replicate_ignore_db))
  805.       {
  806.     thd->set_time((time_t)lev->when);
  807.     thd->current_tablenr = 0;
  808.     VOID(pthread_mutex_lock(&LOCK_thread_count));
  809.     thd->query_id = query_id++;
  810.     VOID(pthread_mutex_unlock(&LOCK_thread_count));
  811.  
  812.     enum enum_duplicates handle_dup = DUP_IGNORE;
  813.     if(lev->sql_ex.opt_flags && REPLACE_FLAG)
  814.       handle_dup = DUP_REPLACE;
  815.     sql_exchange ex((char*)lev->fname, lev->sql_ex.opt_flags &&
  816.             DUMPFILE_FLAG );
  817.     String field_term(&lev->sql_ex.field_term, 1),
  818.       enclosed(&lev->sql_ex.enclosed, 1),
  819.       line_term(&lev->sql_ex.line_term,1),
  820.       escaped(&lev->sql_ex.escaped, 1),
  821.       line_start(&lev->sql_ex.line_start, 1);
  822.         
  823.     ex.field_term = &field_term;
  824.     if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY)
  825.       ex.field_term->length(0);
  826.         
  827.     ex.enclosed = &enclosed;
  828.     if(lev->sql_ex.empty_flags & ENCLOSED_EMPTY)
  829.       ex.enclosed->length(0);
  830.  
  831.     ex.line_term = &line_term;
  832.     if(lev->sql_ex.empty_flags & LINE_TERM_EMPTY)
  833.       ex.line_term->length(0);
  834.  
  835.     ex.line_start = &line_start;
  836.     if(lev->sql_ex.empty_flags & LINE_START_EMPTY)
  837.       ex.line_start->length(0);
  838.  
  839.     ex.escaped = &escaped;
  840.     if(lev->sql_ex.empty_flags & ESCAPED_EMPTY)
  841.       ex.escaped->length(0);
  842.  
  843.     ex.opt_enclosed = (lev->sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
  844.     if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY)
  845.       ex.field_term->length(0);
  846.         
  847.     ex.skip_lines = lev->skip_lines;
  848.         
  849.     TABLE_LIST tables;
  850.     bzero((char*) &tables,sizeof(tables));
  851.     tables.db = thd->db;
  852.     tables.name = tables.real_name = (char*)lev->table_name;
  853.     tables.lock_type = TL_WRITE;
  854.         
  855.     if (open_tables(thd, &tables))
  856.     {
  857.       sql_print_error("Slave:  error opening table %s ",
  858.               tables.name);
  859.       delete ev;
  860.       return 1;
  861.     }
  862.  
  863.     List<Item> fields;
  864.     lev->set_fields(fields);
  865.     thd->net.vio = net->vio;
  866.     // mysql_load will use thd->net to read the file
  867.     thd->net.pkt_nr = net->pkt_nr;
  868.     // make sure the client does get confused
  869.     // about the packet sequence
  870.     if(mysql_load(thd, &ex, &tables, fields, handle_dup, 1,
  871.               TL_WRITE))
  872.       thd->query_error = 1;
  873.     net->pkt_nr = thd->net.pkt_nr;
  874.       }
  875.       else // we will just ask the master to send us /dev/null if we do not want to
  876.     // load the data :-)
  877.       {
  878.     skip_load_data_infile(net);
  879.       }
  880.         
  881.       thd->net.vio = 0; 
  882.       thd->db = 0;// prevent db from being freed
  883.       close_thread_tables(thd);
  884.       if(thd->query_error)
  885.       {
  886.     int sql_error = thd->net.last_errno;
  887.     if(!sql_error)
  888.       sql_error = ER_UNKNOWN_ERROR;
  889.         
  890.     sql_print_error("Slave:  error '%s' running load data infile ",
  891.             ER(sql_error));
  892.     delete ev;
  893.     return 1;
  894.       }
  895.       delete ev;
  896.         
  897.       if(thd->fatal_error)
  898.       {
  899.     sql_print_error("Slave: Fatal error running query '%s' ",
  900.             thd->query);
  901.     return 1;
  902.       }
  903.  
  904.       mi->inc_pos(event_len);
  905.       flush_master_info(mi);
  906.       break;
  907.     }
  908.  
  909.     case START_EVENT:
  910.       mi->inc_pos(event_len);
  911.       flush_master_info(mi);
  912.       break;
  913.                   
  914.     case STOP_EVENT:
  915.       mi->inc_pos(event_len);
  916.       flush_master_info(mi);
  917.       break;
  918.     case ROTATE_EVENT:
  919.     {
  920.       Rotate_log_event* rev = (Rotate_log_event*)ev;
  921.       int ident_len = rev->ident_len;
  922.       memcpy(mi->log_file_name, rev->new_log_ident,ident_len );
  923.       mi->log_file_name[ident_len] = 0;
  924.       mi->pos = 4; // skip magic number
  925.       flush_master_info(mi);
  926.       break;
  927.     }
  928.  
  929.     case INTVAR_EVENT:
  930.     {
  931.       Intvar_log_event* iev = (Intvar_log_event*)ev;
  932.       switch(iev->type)
  933.       {
  934.       case LAST_INSERT_ID_EVENT:
  935.     thd->last_insert_id_used = 1;
  936.     thd->last_insert_id = iev->val;
  937.     break;
  938.       case INSERT_ID_EVENT:
  939.     thd->next_insert_id = iev->val;
  940.     break;
  941.         
  942.       }
  943.       mi->inc_pending(event_len);
  944.       break;
  945.     }
  946.     }
  947.                   
  948.   }
  949.   else
  950.   {
  951.     sql_print_error("Could not parse log event entry, check the master for binlog corruption\n\
  952.  This may also be a network problem, or just a bug in the master or slave code");
  953.     return 1;
  954.   }
  955.   return 0;      
  956. }
  957.       
  958. // slave thread
  959.  
  960. pthread_handler_decl(handle_slave,arg __attribute__((unused)))
  961. {
  962.   THD *thd;; // needs to be first for thread_stack
  963.   MYSQL *mysql = NULL ;
  964.  
  965.   if(!server_id)
  966.     {
  967.      sql_print_error("Server id not set, will not start slave");
  968.      pthread_exit((void*)1);
  969.     }
  970.   
  971.   pthread_mutex_lock(&LOCK_slave);
  972.   if(slave_running)
  973.     {
  974.       pthread_mutex_unlock(&LOCK_slave);
  975.       pthread_exit((void*)1);  // safety just in case
  976.     }
  977.   slave_running = 1;
  978.   abort_slave = 0;
  979.   pthread_mutex_unlock(&LOCK_slave);
  980.   
  981.   int error = 1;
  982.   bool retried_once = 0;
  983.   ulonglong last_failed_pos = 0;
  984.   
  985.   my_thread_init(); // needs to be up here, otherwise we get a coredump
  986.   // trying to use DBUG_ stuff
  987.   thd = new THD; // note that contructor of THD uses DBUG_ !
  988.   thd->set_time();
  989.   DBUG_ENTER("handle_slave");
  990.  
  991.   pthread_detach_this_thread();
  992.   if(init_slave_thread(thd) || init_master_info(&glob_mi))
  993.     goto err;
  994.   thd->thread_stack = (char*)&thd; // remember where our stack is
  995.  
  996.   threads.append(thd);
  997.   
  998.   DBUG_PRINT("info",("master info: log_file_name=%s, position=%d",
  999.              glob_mi.log_file_name, glob_mi.pos));
  1000.  
  1001.   mysql = mc_mysql_init(NULL);
  1002.   if(!mysql)
  1003.     {
  1004.       sql_print_error("Slave thread: error in mc_mysql_init()");
  1005.       goto err;
  1006.     }
  1007.   
  1008.   thd->proc_info = "connecting to master";
  1009.   safe_connect(thd, mysql, &glob_mi);
  1010.   
  1011.   while(!slave_killed(thd))
  1012.     {
  1013.       thd->proc_info = "requesting binlog dump";
  1014.       if(request_dump(mysql, &glob_mi))
  1015.     {
  1016.       sql_print_error("Failed on request_dump()");
  1017.       if(slave_killed(thd))
  1018.            goto err;
  1019.       
  1020.       thd->proc_info = "waiting to reconnect after a failed dump request";
  1021.       if(mysql->net.vio)
  1022.         vio_close(mysql->net.vio);
  1023.       // first time retry immediately, assuming that we can recover
  1024.       // right away - if first time fails, sleep between re-tries
  1025.       // hopefuly the admin can fix the problem sometime
  1026.       if(retried_once)
  1027.         safe_sleep(thd, glob_mi.connect_retry);
  1028.       else
  1029.         retried_once = 1;
  1030.       
  1031.       if(slave_killed(thd))
  1032.           goto err;
  1033.  
  1034.       thd->proc_info = "reconnecting after a failed dump request";
  1035.           sql_print_error("Slave: failed dump request, reconnecting to \
  1036. try again, master_log_pos=%ld", last_failed_pos = glob_mi.pos );
  1037.       safe_reconnect(thd, mysql, &glob_mi);
  1038.       if(slave_killed(thd))
  1039.           goto err;
  1040.  
  1041.       continue;
  1042.     }
  1043.  
  1044.  
  1045.       while(!slave_killed(thd))
  1046.     {
  1047.       thd->proc_info = "reading master update";
  1048.       uint event_len = read_event(mysql, &glob_mi);
  1049.       if(slave_killed(thd))
  1050.         goto err;
  1051.       
  1052.       if (event_len == packet_error)
  1053.       {
  1054.         thd->proc_info = "waiting to reconnect after a failed read";
  1055.         if(mysql->net.vio)
  1056.            vio_close(mysql->net.vio);
  1057.         if(retried_once) // punish repeat offender with sleep
  1058.           safe_sleep(thd, glob_mi.connect_retry);
  1059.         else
  1060.           retried_once = 1; 
  1061.         
  1062.         if(slave_killed(thd))
  1063.           goto err;
  1064.         thd->proc_info = "reconnecting after a failed read";
  1065.         sql_print_error("Slave: Failed reading log event, \
  1066. reconnecting to retry, master_log_pos=%ld", last_failed_pos = glob_mi.pos);
  1067.         safe_reconnect(thd, mysql, &glob_mi);
  1068.         if(slave_killed(thd))
  1069.           goto err;
  1070.         break;
  1071.       }
  1072.       
  1073.       thd->proc_info = "processing master log event"; 
  1074.       if(exec_event(thd, &mysql->net, &glob_mi, event_len))
  1075.         {
  1076.           sql_print_error("Error running query, slave aborted. Fix the problem, and re-start\
  1077.  the slave thread with mysqladmin start-slave");
  1078.           goto err;
  1079.           // there was an error running the query
  1080.           // abort the slave thread, when the problem is fixed, the user
  1081.           // should restart the slave with mysqladmin start-slave
  1082.         }
  1083.       
  1084.       // successful exec with offset advance,
  1085.       // the slave repents and his sins are forgiven!
  1086.       if(glob_mi.pos > last_failed_pos)
  1087.         {
  1088.          retried_once = 0;
  1089. #ifndef DBUG_OFF
  1090.          stuck_count = 0;
  1091. #endif
  1092.         }
  1093. #ifndef DBUG_OFF
  1094.       else
  1095.         {
  1096.           stuck_count++;
  1097.         // show a little mercy, allow slave to read one more event
  1098.            // before cutting him off - otherwise he gets stuck
  1099.            // on Invar events, since they do not advance the offset
  1100.            // immediately
  1101.           if(stuck_count > 2)
  1102.             events_till_disconnect++;
  1103.         }
  1104. #endif      
  1105.  
  1106.     }
  1107.     }
  1108.  
  1109.   error = 0;
  1110.  err:
  1111.   thd->query = thd->db = 0; // extra safety
  1112.   if(mysql)
  1113.     {
  1114.       mc_mysql_close(mysql);
  1115.       mysql = 0;
  1116.     }
  1117.   thd->proc_info = "waiting for slave mutex on exit";
  1118.   pthread_mutex_lock(&LOCK_slave);
  1119.   slave_running = 0;
  1120.   abort_slave = 0;
  1121.   pthread_cond_broadcast(&COND_slave_stopped); // tell the world we are done
  1122.   pthread_mutex_unlock(&LOCK_slave);
  1123.   delete thd;
  1124.   my_thread_end();
  1125.   pthread_exit(0);
  1126.   DBUG_RETURN(0);                // Can't return anything here
  1127. }
  1128.  
  1129. static void safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
  1130.   // will try to connect until successful
  1131. {
  1132. #ifndef DBUG_OFF
  1133.   events_till_disconnect = disconnect_slave_event_count;
  1134. #endif  
  1135.   while(!slave_killed(thd) &&
  1136.     !mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
  1137.               mi->port, 0, 0))
  1138.   {
  1139.     sql_print_error(
  1140.             "Slave thread: error connecting to master:%s, retry in %d sec",
  1141.             mc_mysql_error(mysql), mi->connect_retry);
  1142.     safe_sleep(thd, mi->connect_retry);
  1143.   }
  1144.   
  1145.   mysql_log.write(thd, COM_CONNECT_OUT, "%s@%s:%d",
  1146.           mi->user, mi->host, mi->port);
  1147.   
  1148. }
  1149.  
  1150. // will try to connect until successful
  1151.  
  1152. static void safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
  1153. {
  1154.   mi->pending = 0; // if we lost connection after reading a state set event
  1155.   // we will be re-reading it, so pending needs to be cleared
  1156. #ifndef DBUG_OFF
  1157.   events_till_disconnect = disconnect_slave_event_count;
  1158. #endif
  1159.   while(!slave_killed(thd) && mc_mysql_reconnect(mysql))
  1160.   {
  1161.     sql_print_error("Slave thread: error connecting to master:\
  1162. %s, retry in %d sec",
  1163.             mc_mysql_error(mysql), mi->connect_retry);
  1164.      safe_sleep(thd, mi->connect_retry);
  1165.   }
  1166.   
  1167. }
  1168.  
  1169. #ifdef __GNUC__
  1170. template class I_List_iterator<i_string>;
  1171. template class I_List_iterator<i_string_pair>;
  1172. #endif
  1173.  
  1174.