Обсуждение: Rolling my own replication

Поиск
Список
Период
Сортировка

Rolling my own replication

От
Rob Brown-Bayliss
Дата:
Hello, I would like some thoughts on the folowing plan.  I plan to
replicate several remote databases with a master db.

The setup is in a small retail chain, so each store will have it's own
postgres db making changes during the day.  After hours it will didla up
head office twice.

The first time it will upload all it's new data, then upload any updated
data.  Then disconect allowing other stores ti dial up.

The second dial up, some several hours later it will then downloadall
new and changed data from the master db, their by being in sync with all
stores before sunrise (on a good day :-)

All replicated tables have the folowing columns:

CREATE TABLE "sale_lines" (
    "loc_seq_pkey" text DEFAULT set_primary_key() NOT NULL,
    "timestamp" timestamp DEFAULT 'now()',
    "version" int4 DEFAULT 0,
    "f_new" character varying,
    "f_update" character varying,
    PRIMARY KEY ("loc_seq_pkey")
);

The loc_seq_pkey is basically a sequence on the locla machine with the
store location id prepended. eg : 1-46 is location 1, 46th new row added
anywhere in the db.  This alows me to look at an incoming new row to be
added and say "No. thats mine, no need to add it" and then check to see
if it need updating instead.

There is a triger on all replicated tables:

CREATE FUNCTION "version_control" ( ) RETURNS opaque AS 'BEGIN
    IF TG_OP = ''UPDATE'' THEN
        IF NEW.f_update = ''RESET'' THEN
            NEW.f_update := ''NO'';
            NEW.f_new := ''FALSE'';
        ELSE
            NEW.f_update := ''YES'';
            NEW.version := OLD.version + 1;
        END IF;
        RETURN NEW;
    END IF;
    IF TG_OP = ''INSERT'' THEN
        NEW.f_new := ''TRUE'';
        RETURN NEW;
    END IF;
    IF TG_OP = ''DELETE'' THEN
        RETURN OLD;
    END IF;
END;
' LANGUAGE 'plpgsql';



I was thinking of useing teh version field for checking if the data has
been changed buy other sites before being sent back to the original
site.


Is this going to work or will it bog down and die?







--

*
*  Rob Brown-Bayliss
*

Re: Rolling my own replication

От
Rob Brown-Bayliss
Дата:
On Fri, 2002-07-26 at 00:30, Shanmugasundaram Doraisamy wrote:
> Dear Rob,
>       Your posting was something simillar to what we were just going to
> start.  We have not yet formulated a way of doing it.  We will keep you
> posted if we hit upon any solution. Also please do let us know if it
> worked and if you don't mind the final code that worked.  Thanks in
> advance,
>
> Yours sincerely,
> Shan.

Teh upload of updated rows workes, but I have only tested it on two
machines over a lan, with 10 or so over a dial up I am expecting timing
issues...

The code is fiarly simple at the moment, and I shall be cleaning it up
soon.  But I am still a bit unsure of how it will all work in the field,
and then with more than one slave I am unsure yet how to handle multiple
updates to the same data, or to even check for them at the moment...

basically I have a table in the database listing all tables I need to
sync with the master db,  A conection is opend to each database.

I select * from eactable where f_new = True then take the results and
insert them into the master like so (in python):

def send_new():
    "Hunts through the tables for new rows to sync with master"
    Lcr = DBcon.cursor()
    Mcr = MasterDBcon.cursor()
    sql = "SELECT * FROM syncro_tables"
    if verbose:
        print sql
    #ask master for tables to update.
    Mcr.execute(sql)
    tables = Mcr.fetchall()
    for i in tables:
        if log:
            syslog(LOG_INFO,"Scanning table " + i[0] + " for new data.")
        sql = "SELECT * from "
        sql = sql + i[0]
        sql = sql + " WHERE f_new='TRUE'"
        if verbose:
            print "Scanning table " + i[0] + " for new data."
            print sql
        Lcr.execute(sql)
        for j in range(Lcr.rowcount):
            result = Lcr.fetchone()
            sql = "INSERT INTO " + i[0] + " VALUES ("
            for k in range(len(result)):
                if result[k] == None:
                    sql = sql + "NULL, "
                else:
                    sql = sql + "'" + str(result[k]) + "', "
            sql = sql[:-2]
            sql = sql + ")"
            if verbose:
                print sql
            try:
                Mcr.execute(sql)
            except Error, Msg:
                msg = "SQL Statement was -> "
                msg = msg + sql
                if verbose:
                    print Msg, msg
                if log:
                    syslog(LOG_WARNING, "SQL Error on Master Database")
                    syslog(LOG_WARNING, str(Msg))
                    syslog(LOG_WARNING, msg)
            MasterDBcon.commit()
        #reset new flag
        sql = "UPDATE " + i[0] + " SET f_update='RESET' WHERE f_new='FALSE'"
        if verbose:
            print sql
        Lcr.execute(sql)
    Lcr.close()
    DBcon.commit()
    Mcr.close()
    MasterDBcon.commit()


def send_update():
    "Hunts through the tables for updated rows to sync with master"
    Lcr = DBcon.cursor()
    Mcr = MasterDBcon.cursor()
    sql = "SELECT * FROM syncro_tables"
    if verbose:
        print sql
    #ask master for tables to update.
    Mcr.execute(sql)
    tables = Mcr.fetchall()
    for i in tables:
        if log:
            syslog(LOG_INFO,"Scanning table " + i[0] + " for updated data.")
        sql = "select attname from pg_class,pg_attribute where
(pg_class.relname='" + i[0] +"') and
(pg_class.oid=pg_attribute.attrelid) and (pg_attribute.attnum >= 0)
order by attnum"
        Lcr.execute(sql)
        columns = Lcr.fetchall()
        if verbose:
            print "Columns in table " + i[0] +": "
            for l in range(len(columns)):
                print columns[l]
        sql = "SELECT * from "
        sql = sql + i[0]
        sql = sql + " WHERE f_update='YES' "
        if verbose:
            print "Scanning table " + i[0] + " for updated data."
            print sql
        Lcr.execute(sql)
        # build update and update master
        for j in range(Lcr.rowcount):
            result = Lcr.fetchone()
            sql = "UPDATE " + i[0] + " SET "
            # range from 5 as we dont want to update control fields
            for k in range(5,len(result)):
                cname = columns[k]
                sql = sql + cname[0] + " = "
                if result[k] == None:
                    sql = sql + "NULL, "
                else:
                    sql = sql + "'" + str(result[k]) + "', "
            sql = sql[:-2]
            sql = sql + " WHERE (sequence_key = '" + str(result[0]) + "') and
(location_key = '" +str(result[1]) + "')"
            if verbose:
                print sql
            try:
                Mcr.execute(sql)
            except Error, Msg:
                msg = "SQL Statement was -> "
                msg = msg + sql
                if verbose:
                    print Msg, msg
                if log:
                    syslog(LOG_WARNING, "SQL Error on Master Database")
                    syslog(LOG_WARNING, str(Msg))
                    syslog(LOG_WARNING, msg)
            MasterDBcon.commit()
        #reset new flag
        sql = "UPDATE " + i[0] + " SET f_update='RESET' WHERE f_update='YES' "
        if verbose:
            print sql
        Lcr.execute(sql)
    Lcr.close()
    DBcon.commit()
    Mcr.close()
    MasterDBcon.commit()




--

*
*  Rob Brown-Bayliss
*

Re: Rolling my own replication

От
Rob Brown-Bayliss
Дата:
On Fri, 2002-07-26 at 00:30, Shanmugasundaram Doraisamy wrote:
> Dear Rob,
>       Your posting was something simillar to what we were just going to
> start.  We have not yet formulated a way of doing it.  We will keep you
> posted if we hit upon any solution. Also please do let us know if it
> worked and if you don't mind the final code that worked.  Thanks in
> advance,
>
> Yours sincerely,
> Shan.

Teh upload of updated rows workes, but I have only tested it on two
machines over a lan, with 10 or so over a dial up I am expecting timing
issues...

The code is fiarly simple at the moment, and I shall be cleaning it up
soon.  But I am still a bit unsure of how it will all work in the field,
and then with more than one slave I am unsure yet how to handle multiple
updates to the same data, or to even check for them at the moment...

basically I have a table in the database listing all tables I need to
sync with the master db,  A conection is opend to each database.

I select * from eactable where f_new = True then take the results and
insert them into the master like so (in python):

def send_new():
    "Hunts through the tables for new rows to sync with master"
    Lcr = DBcon.cursor()
    Mcr = MasterDBcon.cursor()
    sql = "SELECT * FROM syncro_tables"
    if verbose:
        print sql
    #ask master for tables to update.
    Mcr.execute(sql)
    tables = Mcr.fetchall()
    for i in tables:
        if log:
            syslog(LOG_INFO,"Scanning table " + i[0] + " for new data.")
        sql = "SELECT * from "
        sql = sql + i[0]
        sql = sql + " WHERE f_new='TRUE'"
        if verbose:
            print "Scanning table " + i[0] + " for new data."
            print sql
        Lcr.execute(sql)
        for j in range(Lcr.rowcount):
            result = Lcr.fetchone()
            sql = "INSERT INTO " + i[0] + " VALUES ("
            for k in range(len(result)):
                if result[k] == None:
                    sql = sql + "NULL, "
                else:
                    sql = sql + "'" + str(result[k]) + "', "
            sql = sql[:-2]
            sql = sql + ")"
            if verbose:
                print sql
            try:
                Mcr.execute(sql)
            except Error, Msg:
                msg = "SQL Statement was -> "
                msg = msg + sql
                if verbose:
                    print Msg, msg
                if log:
                    syslog(LOG_WARNING, "SQL Error on Master Database")
                    syslog(LOG_WARNING, str(Msg))
                    syslog(LOG_WARNING, msg)
            MasterDBcon.commit()
        #reset new flag
        sql = "UPDATE " + i[0] + " SET f_update='RESET' WHERE f_new='FALSE'"
        if verbose:
            print sql
        Lcr.execute(sql)
    Lcr.close()
    DBcon.commit()
    Mcr.close()
    MasterDBcon.commit()


def send_update():
    "Hunts through the tables for updated rows to sync with master"
    Lcr = DBcon.cursor()
    Mcr = MasterDBcon.cursor()
    sql = "SELECT * FROM syncro_tables"
    if verbose:
        print sql
    #ask master for tables to update.
    Mcr.execute(sql)
    tables = Mcr.fetchall()
    for i in tables:
        if log:
            syslog(LOG_INFO,"Scanning table " + i[0] + " for updated data.")
        sql = "select attname from pg_class,pg_attribute where
(pg_class.relname='" + i[0] +"') and
(pg_class.oid=pg_attribute.attrelid) and (pg_attribute.attnum >= 0)
order by attnum"
        Lcr.execute(sql)
        columns = Lcr.fetchall()
        if verbose:
            print "Columns in table " + i[0] +": "
            for l in range(len(columns)):
                print columns[l]
        sql = "SELECT * from "
        sql = sql + i[0]
        sql = sql + " WHERE f_update='YES' "
        if verbose:
            print "Scanning table " + i[0] + " for updated data."
            print sql
        Lcr.execute(sql)
        # build update and update master
        for j in range(Lcr.rowcount):
            result = Lcr.fetchone()
            sql = "UPDATE " + i[0] + " SET "
            # range from 5 as we dont want to update control fields
            for k in range(5,len(result)):
                cname = columns[k]
                sql = sql + cname[0] + " = "
                if result[k] == None:
                    sql = sql + "NULL, "
                else:
                    sql = sql + "'" + str(result[k]) + "', "
            sql = sql[:-2]
            sql = sql + " WHERE (sequence_key = '" + str(result[0]) + "') and
(location_key = '" +str(result[1]) + "')"
            if verbose:
                print sql
            try:
                Mcr.execute(sql)
            except Error, Msg:
                msg = "SQL Statement was -> "
                msg = msg + sql
                if verbose:
                    print Msg, msg
                if log:
                    syslog(LOG_WARNING, "SQL Error on Master Database")
                    syslog(LOG_WARNING, str(Msg))
                    syslog(LOG_WARNING, msg)
            MasterDBcon.commit()
        #reset new flag
        sql = "UPDATE " + i[0] + " SET f_update='RESET' WHERE f_update='YES' "
        if verbose:
            print sql
        Lcr.execute(sql)
    Lcr.close()
    DBcon.commit()
    Mcr.close()
    MasterDBcon.commit()




--

*
*  Rob Brown-Bayliss
*