Discussion: Rolling my own replication
Hello,

I would like some thoughts on the following plan.

I plan to replicate several remote databases with a master db. The setup is in a small retail chain, so each store will have its own postgres db making changes during the day.

After hours each store will dial up head office twice. The first time it will upload all its new data, then upload any updated data, then disconnect, allowing other stores to dial up.

On the second dial up, some several hours later, it will download all new and changed data from the master db, thereby being in sync with all stores before sunrise (on a good day :-)

All replicated tables have the following columns:

    CREATE TABLE "sale_lines" (
        "loc_seq_pkey" text DEFAULT set_primary_key() NOT NULL,
        "timestamp" timestamp DEFAULT 'now()',
        "version" int4 DEFAULT 0,
        "f_new" character varying,
        "f_update" character varying,
        PRIMARY KEY ("loc_seq_pkey")
    );

The loc_seq_pkey is basically a sequence on the local machine with the store location id prepended, e.g. 1-46 is location 1, 46th new row added anywhere in the db. This allows me to look at an incoming new row to be added and say "No, that's mine, no need to add it" and then check to see if it needs updating instead.

There is a trigger on all replicated tables:

    CREATE FUNCTION "version_control" ( ) RETURNS opaque AS '
    BEGIN
        IF TG_OP = ''UPDATE'' THEN
            IF NEW.f_update = ''RESET'' THEN
                NEW.f_update := ''NO'';
                NEW.f_new := ''FALSE'';
            ELSE
                NEW.f_update := ''YES'';
                NEW.version := OLD.version + 1;
            END IF;
            RETURN NEW;
        END IF;
        IF TG_OP = ''INSERT'' THEN
            NEW.f_new := ''TRUE'';
            RETURN NEW;
        END IF;
        IF TG_OP = ''DELETE'' THEN
            RETURN OLD;
        END IF;
    END;
    ' LANGUAGE 'plpgsql';

I was thinking of using the version field for checking if the data has been changed by other sites before being sent back to the original site.

Is this going to work or will it bog down and die?

--
*
*  Rob Brown-Bayliss
*
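The key scheme and the version check described above can be sketched in a few lines. This is only an illustration under stated assumptions, not the poster's code: the helper names (`owner_of`, `classify_incoming`) are hypothetical, location ids are assumed to be integers, and a row carrying this store's own prefix that is missing locally is assumed to have been deleted here and is left alone. It is written for current Python 3.

```python
def owner_of(loc_seq_pkey):
    """Return the location id encoded in a key such as '1-46'."""
    location, _, _sequence = loc_seq_pkey.partition("-")
    return int(location)


def classify_incoming(key, incoming_version, my_location, local_versions):
    """Decide what to do with a row downloaded from the master.

    local_versions maps loc_seq_pkey -> version currently held locally.
    Returns 'insert', 'update' or 'skip'.
    """
    local_version = local_versions.get(key)
    if local_version is None:
        # Unknown keys from other stores are new rows. A missing row with
        # our own prefix is skipped (assumption: it was deleted locally).
        return "insert" if owner_of(key) != my_location else "skip"
    # The row exists locally: only a higher version carries newer changes.
    return "update" if incoming_version > local_version else "skip"


# Store 1 already holds '1-46' at version 0 and '2-7' at version 3.
local = {"1-46": 0, "2-7": 3}
print(classify_incoming("1-46", 0, 1, local))  # skip: ours and unchanged
print(classify_incoming("1-46", 2, 1, local))  # update: changed elsewhere
print(classify_incoming("3-12", 0, 1, local))  # insert: new row from store 3
```

Comparing versions alone tells a store that a row changed, not whether two stores changed it independently; that case needs an extra rule on the master.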
On Fri, 2002-07-26 at 00:30, Shanmugasundaram Doraisamy wrote:
> Dear Rob,
> Your posting was something similar to what we were just going to
> start. We have not yet formulated a way of doing it. We will keep you
> posted if we hit upon any solution. Also please do let us know if it
> worked and if you don't mind the final code that worked. Thanks in
> advance,
>
> Yours sincerely,
> Shan.

The upload of updated rows works, but I have only tested it on two machines over a LAN; with 10 or so over a dial up I am expecting timing issues...

The code is fairly simple at the moment, and I shall be cleaning it up soon. But I am still a bit unsure of how it will all work in the field, and with more than one slave I am unsure yet how to handle multiple updates to the same data, or even how to check for them at the moment...

Basically I have a table in the database listing all tables I need to sync with the master db. A connection is opened to each database. I select * from each table where f_new = True, then take the results and insert them into the master like so (in python):

    def send_new():
        "Hunts through the tables for new rows to sync with master"
        Lcr = DBcon.cursor()
        Mcr = MasterDBcon.cursor()
        sql = "SELECT * FROM syncro_tables"
        if verbose: print sql
        #ask master for tables to update.
        Mcr.execute(sql)
        tables = Mcr.fetchall()
        for i in tables:
            if log: syslog(LOG_INFO,"Scanning table " + i[0] + " for new data.")
            sql = "SELECT * from "
            sql = sql + i[0]
            sql = sql + " WHERE f_new='TRUE'"
            if verbose:
                print "Scanning table " + i[0] + " for new data."
                print sql
            Lcr.execute(sql)
            for j in range(Lcr.rowcount):
                result = Lcr.fetchone()
                sql = "INSERT INTO " + i[0] + " VALUES ("
                for k in range(len(result)):
                    if result[k] is None:
                        sql = sql + "NULL, "
                    else:
                        sql = sql + "'" + str(result[k]) + "', "
                sql = sql[:-2]
                sql = sql + ")"
                if verbose: print sql
                try:
                    Mcr.execute(sql)
                except Error, Msg:
                    msg = "SQL Statement was -> "
                    msg = msg + sql
                    if verbose: print Msg, msg
                    if log:
                        syslog(LOG_WARNING, "SQL Error on Master Database")
                        syslog(LOG_WARNING, str(Msg))
                        syslog(LOG_WARNING, msg)
                # commit per row so one failed insert does not block the rest
                MasterDBcon.commit()
            #reset new flag on the rows just uploaded; the trigger turns
            #f_update='RESET' into f_new='FALSE', f_update='NO'
            sql = "UPDATE " + i[0] + " SET f_update='RESET' WHERE f_new='TRUE'"
            if verbose: print sql
            Lcr.execute(sql)
        Lcr.close()
        DBcon.commit()
        Mcr.close()
        MasterDBcon.commit()

    def send_update():
        "Hunts through the tables for updated rows to sync with master"
        Lcr = DBcon.cursor()
        Mcr = MasterDBcon.cursor()
        sql = "SELECT * FROM syncro_tables"
        if verbose: print sql
        #ask master for tables to update.
        Mcr.execute(sql)
        tables = Mcr.fetchall()
        for i in tables:
            if log: syslog(LOG_INFO,"Scanning table " + i[0] + " for updated data.")
            # column names in table order, skipping system columns
            sql = ("select attname from pg_class,pg_attribute where (pg_class.relname='"
                   + i[0] + "') and (pg_class.oid=pg_attribute.attrelid)"
                   " and (pg_attribute.attnum >= 0) order by attnum")
            Lcr.execute(sql)
            columns = Lcr.fetchall()
            if verbose:
                print "Columns in table " + i[0] +": "
                for l in range(len(columns)): print columns[l]
            sql = "SELECT * from "
            sql = sql + i[0]
            sql = sql + " WHERE f_update='YES' "
            if verbose:
                print "Scanning table " + i[0] + " for updated data."
                print sql
            Lcr.execute(sql)
            # build update and update master
            for j in range(Lcr.rowcount):
                result = Lcr.fetchone()
                sql = "UPDATE " + i[0] + " SET "
                # range from 5 as we dont want to update control fields
                for k in range(5,len(result)):
                    cname = columns[k]
                    sql = sql + cname[0] + " = "
                    if result[k] is None:
                        sql = sql + "NULL, "
                    else:
                        sql = sql + "'" + str(result[k]) + "', "
                sql = sql[:-2]
                sql = sql + " WHERE (sequence_key = '" + str(result[0]) + "') and (location_key = '" +str(result[1]) + "')"
                if verbose: print sql
                try:
                    Mcr.execute(sql)
                except Error, Msg:
                    msg = "SQL Statement was -> "
                    msg = msg + sql
                    if verbose: print Msg, msg
                    if log:
                        syslog(LOG_WARNING, "SQL Error on Master Database")
                        syslog(LOG_WARNING, str(Msg))
                        syslog(LOG_WARNING, msg)
                # commit per row so one failed update does not block the rest
                MasterDBcon.commit()
            #reset update flag
            sql = "UPDATE " + i[0] + " SET f_update='RESET' WHERE f_update='YES' "
            if verbose: print sql
            Lcr.execute(sql)
        Lcr.close()
        DBcon.commit()
        Mcr.close()
        MasterDBcon.commit()

--
*
*  Rob Brown-Bayliss
*
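The thread leaves open how to notice that two stores changed the same row. One possible approach, sketched here and not taken from the posted code, is an optimistic check: the master applies an update only if its `version` still equals the version the store last received. The assumptions are loud ones: `push_update` and `base_version` are hypothetical names, the store is assumed to remember the version it last downloaded, Python's built-in `sqlite3` stands in for PostgreSQL so the example runs anywhere, and the version is bumped in the statement itself because the stand-in database has no trigger. Values are passed as parameters instead of being quoted into the string, so a value such as O'Brien cannot break the statement. The sketch is written for current Python 3.

```python
import sqlite3


def push_update(master, table, key, base_version, changes):
    """Apply `changes` on the master only if nobody else updated the row.

    Returns True when the row was updated, False when the master's version
    no longer matches base_version (another store got there first).
    """
    # Identifiers cannot be bound as parameters, so they are quoted here;
    # in the thread's design table names come from the trusted
    # syncro_tables list.
    assignments = ", ".join('"%s" = ?' % column for column in changes)
    sql = ('UPDATE "%s" SET %s, version = version + 1 '
           'WHERE loc_seq_pkey = ? AND version = ?' % (table, assignments))
    cursor = master.execute(sql, list(changes.values()) + [key, base_version])
    return cursor.rowcount == 1


# Usage: two stores both start from version 0 of row '1-46'.
master = sqlite3.connect(":memory:")
master.execute("CREATE TABLE sale_lines (loc_seq_pkey TEXT PRIMARY KEY, "
               "version INTEGER DEFAULT 0, customer TEXT)")
master.execute("INSERT INTO sale_lines VALUES ('1-46', 0, 'Smith')")

first = push_update(master, "sale_lines", "1-46", 0, {"customer": "O'Brien"})
second = push_update(master, "sale_lines", "1-46", 0, {"customer": "Jones"})
print(first, second)  # the second upload is rejected as a conflict
```

A rejected upload can then be logged for manual review or retried after the store downloads the current row. With a PostgreSQL driver the placeholder style will likely differ (psycopg2, for example, uses `%s` rather than `?`).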