The easiest way to move and transform data between PostgreSQL databases using Logical Replication.
> ℹ️ **pg_flo** is in active development. The design and architecture are continuously improving. PRs/issues are very much welcome 🙏
- Real-time Data Streaming - Capture inserts, updates, deletes, and DDL changes in near real-time
- Fast Initial Loads - Parallel copy of existing data with automatic follow-up continuous replication
- Powerful Transformations - Filter and transform data on-the-fly (see rules)
- Flexible Routing - Route to different tables and remap columns (see routing)
- Production Ready - Supports resumable streaming, DDL tracking, and more
- Real-time data replication between PostgreSQL databases
- ETL pipelines with data transformation
- Data re-routing, masking and filtering
- Database migration with zero downtime
- Event streaming from PostgreSQL
- Docker
- PostgreSQL database with `wal_level=logical`
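If you're not sure whether logical replication is enabled, both checking and enabling it use standard PostgreSQL statements (changing `wal_level` requires a server restart):

```sql
-- Must report 'logical' for pg_flo to work
SHOW wal_level;

-- Enable it, then restart PostgreSQL
ALTER SYSTEM SET wal_level = logical;
```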
```shell
docker pull shayonj/pg_flo:latest
```
Choose one:
- Environment variables
- YAML configuration file (example)
- CLI flags
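For the environment-variable option, a common convention for CLI tools is to mirror each config key as an upper-cased, prefixed variable. The names below are assumptions derived from the config-file keys, not confirmed names — check `pg_flo --help` for your version before relying on them:

```shell
# Hypothetical environment-variable equivalents of the config-file keys
export PG_FLO_HOST=localhost
export PG_FLO_PORT=5432
export PG_FLO_DBNAME=myapp
export PG_FLO_USER=replicator
export PG_FLO_NATS_URL=nats://localhost:4222
```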
```shell
# Start NATS server
docker run -d --name pg_flo_nats \
  --network host \
  -v /path/to/nats-server.conf:/etc/nats/nats-server.conf \
  nats:latest \
  -c /etc/nats/nats-server.conf

# Start replicator (using config file)
docker run -d --name pg_flo_replicator \
  --network host \
  -v /path/to/config.yaml:/etc/pg_flo/config.yaml \
  shayonj/pg_flo:latest \
  replicator --config /etc/pg_flo/config.yaml

# Start worker
docker run -d --name pg_flo_worker \
  --network host \
  -v /path/to/config.yaml:/etc/pg_flo/config.yaml \
  shayonj/pg_flo:latest \
  worker postgres --config /etc/pg_flo/config.yaml
```
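The mounted `nats-server.conf` is not spelled out in this README. A minimal configuration matching pg_flo's 8MB message-size limit might look like the sketch below — treat the JetStream block and paths as assumptions about your deployment:

```
# nats-server.conf (minimal sketch — adjust for your deployment)
max_payload: 8MB        # pg_flo messages can be up to 8MB

jetstream {
  store_dir: "/data"    # durable storage so streams survive restarts
}
```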
```yaml
# Replicator settings
host: "localhost"
port: 5432
dbname: "myapp"
user: "replicator"
password: "secret"
group: "users"
tables:
  - "users"

# Worker settings (postgres sink)
target-host: "dest-db"
target-dbname: "myapp"
target-user: "writer"
target-password: "secret"

# Common settings
nats-url: "nats://localhost:4222"
```
View full configuration options →
pg_flo uses two main components:
- Replicator: Captures PostgreSQL changes via logical replication
- Worker: Processes and routes changes through NATS
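Put together, the flow looks roughly like this (simplified sketch):

```text
source PostgreSQL           NATS              sink (postgres, file, …)
┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│  replicator  │ ──▶ │   messages   │ ──▶ │    worker    │
└──────────────┘     └──────────────┘     └──────────────┘
 logical decoding       buffering          transform + route
```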
Groups are used to:
- Identify replication processes
- Isolate replication slots and publications
- Run multiple instances on the same database
- Maintain state for resumability
- Enable parallel processing
```shell
# Example: Separate groups for different tables
pg_flo replicator --group users_orders --tables users,orders
pg_flo replicator --group products --tables products
```
- **Stream Only** (default) - real-time streaming of changes

  ```shell
  pg_flo replicator --stream
  ```

- **Copy Only** - one-time parallel copy of existing data

  ```shell
  pg_flo replicator --copy --max-copy-workers-per-table 4
  ```

- **Copy and Stream** - initial parallel copy followed by continuous streaming

  ```shell
  pg_flo replicator --copy-and-stream --max-copy-workers-per-table 4
  ```
- `stdout`: Console output
- `file`: File writing
- `postgres`: Database replication
- `webhook`: HTTP endpoints
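The sink is chosen as the first argument to `worker` (this README shows `postgres` and `file`). Following that same pattern, a `stdout` worker is handy for inspecting what flows through a group before wiring up a real sink — treat the exact invocation as a sketch and confirm with `pg_flo worker --help`:

```shell
# Print decoded changes to the console for debugging (sketch, not verified)
pg_flo worker stdout --group users --config /etc/pg_flo/config.yaml
```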
Routing configuration is defined in a separate YAML file:
```yaml
# routing.yaml
users:
  source_table: users
  destination_table: customers
  column_mappings:
    - source: id
      destination: customer_id
```

```shell
# Apply routing configuration
pg_flo worker postgres --routing-config /path/to/routing.yaml
```
Rules are defined in a separate YAML file:
```yaml
# rules.yaml
users:
  - type: exclude_columns
    columns: [password, ssn]
  - type: mask_columns
    columns: [email]
```

```shell
# Apply transformation rules
pg_flo worker file --rules-config /path/to/rules.yaml
```
View transformation options →
```shell
pg_flo worker postgres \
  --config /etc/pg_flo/config.yaml \
  --routing-config routing.yaml \
  --rules-config rules.yaml
```
Best practices:
- Run one worker per group
- Use groups to replicate different tables independently
- Scale horizontally using multiple groups
Example scaling setup:
```shell
# Group: sales
pg_flo replicator --group sales --tables sales
pg_flo worker postgres --group sales

# Group: inventory
pg_flo replicator --group inventory --tables inventory
pg_flo worker postgres --group inventory
```
- NATS message size: 8MB (configurable)
- One worker per group recommended
- PostgreSQL logical replication prerequisites required
- Tables must have one of the following for replication:
  - A primary key
  - A unique constraint on `NOT NULL` columns
  - `REPLICA IDENTITY FULL` set
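To see which replica identity a table currently uses, you can query the standard PostgreSQL catalog (`relreplident` is `d` for default/primary key, `i` for index, `f` for full, `n` for nothing):

```sql
SELECT relname, relreplident
FROM pg_class
WHERE relname IN ('users', 'orders', 'audit_logs');
```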
Example table configurations:
```sql
-- Using a primary key (recommended)
CREATE TABLE users (
  id SERIAL PRIMARY KEY,
  email TEXT,
  name TEXT
);

-- Using a unique constraint
CREATE TABLE orders (
  order_id TEXT NOT NULL,
  customer_id TEXT NOT NULL,
  data JSONB,
  CONSTRAINT orders_unique UNIQUE (order_id, customer_id)
);
ALTER TABLE orders REPLICA IDENTITY USING INDEX orders_unique;

-- Using all columns (higher performance overhead)
CREATE TABLE audit_logs (
  id SERIAL,
  action TEXT,
  data JSONB
);
ALTER TABLE audit_logs REPLICA IDENTITY FULL;
```
```shell
make build
make test
make lint

# E2E tests
./internal/scripts/e2e_local.sh
```
Contributions welcome! Please open an issue or submit a pull request.
Apache License 2.0. View license →