-
Notifications
You must be signed in to change notification settings - Fork 135
Produce updates to kafka #1443
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Produce updates to kafka #1443
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,3 +7,4 @@ | |
| docs/sphinx/_build/ | ||
| nipap-www/nipap_www.egg-info | ||
| nipap/nipap.egg-info | ||
| /.idea/ | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,5 @@ | ||
| __version__ = "0.32.7" | ||
| __db_version__ = 7 | ||
| __db_version__ = 8 | ||
| __author__ = "Kristian Larsson, Lukas Garberg" | ||
| __author_email__ = "[email protected], [email protected]" | ||
| __copyright__ = "Copyright 2011-2014, Kristian Larsson, Lukas Garberg" | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -529,7 +529,17 @@ | |
| RETURN (part_one::bigint << 32) + part_two::bigint; | ||
| END; | ||
| $_$ LANGUAGE plpgsql IMMUTABLE STRICT; | ||
| """ | ||
| CREATE OR REPLACE FUNCTION tf_kafka_produce_event() RETURNS trigger AS $$ | ||
| BEGIN | ||
| IF TG_OP = 'DELETE' THEN | ||
| INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD)::jsonb); | ||
| ELSIF OLD IS DISTINCT FROM NEW THEN | ||
| INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(NEW)::jsonb); | ||
| END IF; | ||
| RETURN NEW; | ||
| END; | ||
| $$ LANGUAGE plpgsql;""" | ||
|
|
||
| ip_net = """ | ||
| -------------------------------------------- | ||
|
|
@@ -538,7 +548,7 @@ | |
| -- | ||
| -------------------------------------------- | ||
| COMMENT ON DATABASE %s IS 'NIPAP database - schema version: 7'; | ||
| COMMENT ON DATABASE %s IS 'NIPAP database - schema version: 8'; | ||
| CREATE EXTENSION IF NOT EXISTS ip4r; | ||
| CREATE EXTENSION IF NOT EXISTS hstore; | ||
|
|
@@ -790,7 +800,21 @@ | |
| CREATE INDEX ip_net_log__prefix__index ON ip_net_log(prefix_id); | ||
| CREATE INDEX ip_net_log__pool__index ON ip_net_log(pool_id); | ||
| """ | ||
| -- | ||
| -- Kafka event table and triggers | ||
| -- | ||
| -- This table is used as a queue for the external kafka_producer process. | ||
| -- Triggers on the core tables insert events here. The daemon will enable or | ||
| -- disable these triggers at startup depending on configuration. | ||
| -- | ||
| CREATE TABLE IF NOT EXISTS kafka_produce_event ( | ||
| id SERIAL PRIMARY KEY, | ||
| table_name TEXT NOT NULL, | ||
| event_type TEXT NOT NULL, | ||
| payload JSONB, | ||
| processed BOOLEAN DEFAULT FALSE, | ||
| created_at TIMESTAMP WITH TIME ZONE DEFAULT now() | ||
| );""" | ||
|
|
||
| triggers = """ | ||
| -- | ||
|
|
@@ -1768,7 +1792,25 @@ | |
| WHEN (OLD.ipv4_default_prefix_length IS DISTINCT FROM NEW.ipv4_default_prefix_length | ||
| OR OLD.ipv6_default_prefix_length IS DISTINCT FROM NEW.ipv6_default_prefix_length) | ||
| EXECUTE PROCEDURE tf_ip_net_pool__iu_before(); | ||
| """ | ||
| -- Triggers that write to kafka_produce_event | ||
| CREATE TRIGGER trigger_kafka_ip_net_plan | ||
| AFTER INSERT OR UPDATE OR DELETE | ||
| ON ip_net_plan | ||
| FOR EACH ROW | ||
| EXECUTE PROCEDURE tf_kafka_produce_event(); | ||
| CREATE TRIGGER trigger_kafka_ip_net_vrf | ||
| AFTER INSERT OR UPDATE OR DELETE | ||
| ON ip_net_vrf | ||
| FOR EACH ROW | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| EXECUTE PROCEDURE tf_kafka_produce_event(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| CREATE TRIGGER trigger_kafka_ip_net_pool | ||
| AFTER INSERT OR UPDATE OR DELETE | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| ON ip_net_pool | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| FOR EACH ROW | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| EXECUTE PROCEDURE tf_kafka_produce_event();""" | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
|
|
||
| upgrade = [ | ||
| """ | ||
|
|
@@ -2272,4 +2314,59 @@ | |
| -- update database schema version | ||
| COMMENT ON DATABASE %s IS 'NIPAP database - schema version: 7'; | ||
| """, | ||
| """ | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. continuation line missing indentation or outdented |
||
| -- | ||
| -- Upgrade from NIPAP database schema version 7 to 8 | ||
| -- | ||
| -- | ||
| -- Kafka event table and triggers | ||
| -- | ||
| -- This table is used as a queue for the external kafka_producer process. | ||
| -- Triggers on the core tables insert events here. The daemon will enable or | ||
| -- disable these triggers at startup depending on configuration. | ||
| -- | ||
| CREATE TABLE IF NOT EXISTS kafka_produce_event ( | ||
| id SERIAL PRIMARY KEY, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| table_name TEXT NOT NULL, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| event_type TEXT NOT NULL, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| payload JSONB, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| processed BOOLEAN DEFAULT FALSE, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| created_at TIMESTAMP WITH TIME ZONE DEFAULT now() | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| ); | ||
| CREATE OR REPLACE FUNCTION tf_kafka_produce_event() RETURNS trigger AS $$ | ||
| BEGIN | ||
| IF TG_OP = 'DELETE' THEN | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD)::jsonb); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| ELSIF OLD IS DISTINCT FROM NEW THEN | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| INSERT INTO kafka_produce_event (table_name, event_type, payload) VALUES (TG_TABLE_NAME, TG_OP, row_to_json(NEW)::jsonb); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| END IF; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| RETURN NEW; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| END; | ||
| $$ LANGUAGE plpgsql; | ||
| -- Triggers that write to kafka_produce_event | ||
| CREATE TRIGGER trigger_kafka_ip_net_plan | ||
| AFTER INSERT OR UPDATE OR DELETE | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| ON ip_net_plan | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| FOR EACH ROW | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| EXECUTE PROCEDURE tf_kafka_produce_event(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| CREATE TRIGGER trigger_kafka_ip_net_vrf | ||
| AFTER INSERT OR UPDATE OR DELETE | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| ON ip_net_vrf | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| FOR EACH ROW | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| EXECUTE PROCEDURE tf_kafka_produce_event(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| CREATE TRIGGER trigger_kafka_ip_net_pool | ||
| AFTER INSERT OR UPDATE OR DELETE | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| ON ip_net_pool | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| FOR EACH ROW | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| EXECUTE PROCEDURE tf_kafka_produce_event(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation contains tabs |
||
| -- update database schema version | ||
| COMMENT ON DATABASE %s IS 'NIPAP database - schema version: 8'; | ||
| """, | ||
| ] | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tab before operator