PostgreSQL
PostgreSQL is a powerful open-source object-relational database management system (ORDBMS).
This document describes how to connect PostgreSQL as a data source in the ClickPipes platform.
Supported Versions and Architectures
- Versions: PostgreSQL 9.x to 16.x
- Architectures: Stand-alone or Replication architectures
Supported Data Types
Category | Data Types |
---|---|
Strings & Text | character, character varying, text |
Numeric | integer, bigint, smallint, numeric, real, double precision |
Binary | bytea |
Bit | bit, bit varying |
Boolean | boolean |
Date & Time | timestamp without time zone, timestamp with time zone, date, time without time zone, time with time zone, interval |
Spatial Data | geometry, point, polygon, circle, path, box, line, lseg |
Network Types | inet, cidr, macaddr |
Identifier | uuid, oid, regproc, regprocedure, regoper, regoperator, regclass, regtype, regconfig, regdictionary |
Text Search | tsvector, tsquery |
Others | xml, json, array |
Limitations
- To capture incremental events for partitioned parent tables, PostgreSQL version 13 or above must be used, and the pgoutput plugin must be selected.
Considerations
- When using log-based plugins that rely on replication slots (e.g., wal2json), too many shared mining processes may cause WAL log accumulation, increasing disk pressure. It's recommended to reduce the number of mining processes or promptly delete unnecessary CDC tasks and replication slots.
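One way to judge whether WAL is piling up behind a replication slot is to compare the slot's restart_lsn (exposed by the pg_replication_slots view) with the server's current WAL position. On PostgreSQL 10 and later the server can compute this itself with pg_wal_lsn_diff; the sketch below does the same arithmetic client-side, assuming LSNs in their usual `high/low` hexadecimal text form. The function names are illustrative and are not part of ClickPipes.

```python
def lsn_to_int(lsn: str) -> int:
    """Convert a PostgreSQL LSN such as '0/3E38E60' to an absolute byte position."""
    high, low = lsn.split("/")
    # The part before the slash is the high 32 bits, the part after is the low 32 bits.
    return (int(high, 16) << 32) | int(low, 16)


def retained_wal_bytes(current_lsn: str, restart_lsn: str) -> int:
    """Bytes of WAL between a slot's restart_lsn and the current WAL position."""
    return lsn_to_int(current_lsn) - lsn_to_int(restart_lsn)


if __name__ == "__main__":
    # Hypothetical values: a slot whose restart_lsn lags the current position.
    print(retained_wal_bytes("0/3E39000", "0/3E38E60"))
```

A value that keeps growing for a slot that no task is reading from suggests the slot is a candidate for removal.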
Preparation
Log in to the PostgreSQL database as an administrator.
Create a user and grant permissions.
Run a command in the following format to create an account for data synchronization/development tasks.
CREATE USER username WITH PASSWORD 'password';
- username: Username.
- password: Password.
Run commands in the following format to grant permissions to the account, depending on what the account needs to read.
- Read Full Data Only
-- Switch to the database to be authorized
\c database_name
-- Grant table read permission for the target schema
GRANT SELECT ON ALL TABLES IN SCHEMA schema_name TO username;
-- Grant USAGE permission to schema
GRANT USAGE ON SCHEMA schema_name TO username;
- Read Full and Incremental Data
-- Switch to the database to be authorized
\c database_name
-- Grant table read permission for the target schema
GRANT SELECT ON ALL TABLES IN SCHEMA schema_name TO username;
-- Grant USAGE permission to schema
GRANT USAGE ON SCHEMA schema_name TO username;
-- Grant replication permission
ALTER USER username REPLICATION;
- database_name: Database name.
- schema_name: Schema name.
- username: Username.
Run a command in the following format to set the replica identity to FULL (the entire row is used as the identifier). This setting determines which fields are recorded in the log when a row is updated or deleted.
Tip: If you only need to read full data from PostgreSQL, you can skip this and the subsequent steps.
ALTER TABLE schema_name.table_name REPLICA IDENTITY FULL;
- schema_name: Schema name.
- table_name: Table name.
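Because the replica identity must be set per table, it can be convenient to generate the statements for a list of tables. The following is a minimal sketch, not a ClickPipes feature: the helper names are hypothetical, and identifiers are double-quoted (with embedded quotes doubled), which makes PostgreSQL treat them case-sensitively.

```python
def quote_ident(name: str) -> str:
    """Double-quote a PostgreSQL identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def replica_identity_full_sql(schema: str, tables: list) -> list:
    """Build one ALTER TABLE ... REPLICA IDENTITY FULL statement per table."""
    return [
        f"ALTER TABLE {quote_ident(schema)}.{quote_ident(table)} REPLICA IDENTITY FULL;"
        for table in tables
    ]


if __name__ == "__main__":
    # Hypothetical table names, used only to show the generated statements.
    for statement in replica_identity_full_sql("public", ["orders", "customers"]):
        print(statement)
```

The generated statements can then be reviewed and run with psql or any other SQL client.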
Log in to the server hosting PostgreSQL, and choose the decoder plugin to install based on business needs and version:
- Wal2json (Recommended): Suitable for PostgreSQL 9.4 and above. It converts WAL logs to JSON format and is simple to use, but requires source tables to have primary keys; otherwise, delete operations cannot be synchronized.
- Pgoutput: An internal logical replication protocol introduced in PostgreSQL 10; no additional installation is needed. For tables with primary keys and replica identity set to default, the before in update events will be empty, which can be solved by setting replica identity full. Additionally, if you do not have database-level CREATE permissions, you need to run the following commands to create the required PUBLICATION:
CREATE PUBLICATION dbz_publication_root FOR ALL TABLES WITH (PUBLISH_VIA_PARTITION_ROOT = TRUE);
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
- Decoderbufs: Suitable for PostgreSQL 9.6 and above. It uses Google Protocol Buffers to parse WAL logs but requires more complex configuration.
- Walminer: Does not rely on logical replication and does not require setting wal_level to logical or adjusting replication slot configuration, but requires superuser permissions.
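To make "converts WAL logs to JSON format" concrete, the sketch below decodes a wal2json payload into plain row dictionaries. The sample payload is the one shown in the optional plugin test later in this section; the function name is hypothetical, and only the columnnames/columnvalues fields seen in that sample are read, so delete events are not handled here.

```python
import json

# Sample payload copied from the plugin test output in this document.
SAMPLE = (
    '{"change":[{"kind":"insert","schema":"public","table":"test_decode",'
    '"columnnames":["uid","name","age","score"],'
    '"columntypes":["integer","character varying(50)","integer","numeric"],'
    '"columnvalues":[1,"Jack",18,89]}]}'
)


def wal2json_rows(payload: str) -> list:
    """Turn each entry of the 'change' array into a dict with kind, table and row."""
    rows = []
    for change in json.loads(payload).get("change", []):
        names = change.get("columnnames", [])
        values = change.get("columnvalues", [])
        rows.append({
            "kind": change["kind"],
            "table": f'{change["schema"]}.{change["table"]}',
            "row": dict(zip(names, values)),  # pair each column name with its value
        })
    return rows


if __name__ == "__main__":
    for event in wal2json_rows(SAMPLE):
        print(event["kind"], event["table"], event["row"])
```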
Next, we will demonstrate the installation process using Wal2json as an example.
Tip: In this example, PostgreSQL version 12 is installed on CentOS 7. If your environment differs, you will need to adjust the installation steps for development package versions, environment variable paths, etc.
Add the repository package.
yum install https://download.postgresql.org/pub/repos/yum/reporpms/EL-7-x86_64/pgdg-redhat-repo-latest.noarch.rpm
Install the PostgreSQL 12 development package.
yum install -y postgresql12-devel
Set environment variables and activate them.
export PATH=$PATH:/usr/pgsql-12/bin
source /etc/profile
Install environment dependencies, including llvm, clang, gcc, etc.
yum install -y devtoolset-7-llvm centos-release-scl devtoolset-7-gcc* llvm5.0
Execute the following commands in sequence to complete the installation of the plugin.
# Clone and enter the directory
git clone https://github.com/eulerto/wal2json.git && cd wal2json
# Enter the scl's devtoolset environment
scl enable devtoolset-7 bash
# Compile and install
make && make install
Modify the postgresql.conf configuration file and set the wal_level value to logical.
Tip: If the PostgreSQL version is 9.4, 9.5, or 9.6, also increase the values of max_replication_slots and max_wal_senders (e.g., set to 10).
Modify the pg_hba.conf configuration file, adding the following content to ensure ClickPipes can access the database.
# Replace username with the actual username
local replication username trust
# 0.0.0.0/0 matches any IPv4 address; narrow it to the ClickPipes host's address where possible
host replication username 0.0.0.0/0 md5
host replication username ::1/128 trust
Restart the PostgreSQL service during off-peak hours.
service postgresql-12.service restart
(Optional) Test the log plugin.
Connect to the postgres database, switch to the database to be synchronized, and create a test table.
-- Suppose the database to be synchronized is demodata, and the schema is public
\c demodata
CREATE TABLE public.test_decode
(
    uid integer not null
        constraint users_pk
            primary key,
    name varchar(50),
    age integer,
    score decimal
);
Create a replication slot, using the wal2json plugin as an example.
SELECT * FROM pg_create_logical_replication_slot('slot_test', 'wal2json');
Insert a record into the test table.
INSERT INTO public.test_decode (uid, name, age, score)
VALUES (1, 'Jack', 18, 89);
Listen to the log and check if there is information about the insert operation.
SELECT * FROM pg_logical_slot_peek_changes('slot_test', null, null);
Example return (displayed vertically):
lsn | 0/3E38E60
xid | 610
data | {"change":[{"kind":"insert","schema":"public","table":"test_decode","columnnames":["uid","name","age","score"],"columntypes":["integer","character varying(50)","integer","numeric"],"columnvalues":[1,"Jack",18,89]}]}
If there are no issues, delete the replication slot and the test table.
SELECT * FROM pg_drop_replication_slot('slot_test');
DROP TABLE public.test_decode;
(Optional) If you need to use the last update timestamp for incremental synchronization, perform the following steps.
Execute the following command in the source database to create a common function. Replace the schema name as needed.
CREATE OR REPLACE FUNCTION schema_name.update_lastmodified_column()
RETURNS TRIGGER LANGUAGE plpgsql AS $$
BEGIN
    NEW.last_update = now();
    RETURN NEW;
END;
$$;
Create fields and triggers; this must be done for each table, for example, a table named mytable.
-- Create the last_update field
ALTER TABLE schema_name.mytable ADD COLUMN last_update timestamp DEFAULT now();
-- Create a trigger (the function is schema-qualified to match where it was created)
CREATE TRIGGER trg_uptime BEFORE UPDATE ON schema_name.mytable FOR EACH ROW EXECUTE PROCEDURE
schema_name.update_lastmodified_column();
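Once the last_update column is maintained by the trigger, timestamp-based incremental reads typically follow a watermark pattern: remember the newest last_update already read, then ask only for rows newer than it. The sketch below illustrates that pattern and is not how ClickPipes itself is implemented. The function names are hypothetical, the `%s` placeholder assumes a psycopg-style driver, and the schema, table, and column names are interpolated directly, so they must come from trusted configuration.

```python
from datetime import datetime


def build_incremental_query(schema: str, table: str, column: str = "last_update") -> str:
    """SQL selecting rows modified after a watermark, which is bound as one parameter."""
    return (
        f"SELECT * FROM {schema}.{table} "
        f"WHERE {column} > %s ORDER BY {column}"
    )


def advance_watermark(current: datetime, rows: list, column: str = "last_update") -> datetime:
    """Return the newest timestamp seen so far; unchanged when no rows came back."""
    return max([current] + [row[column] for row in rows])


if __name__ == "__main__":
    print(build_incremental_query("public", "mytable"))
    # Hypothetical batch of rows, as dicts keyed by column name.
    batch = [{"uid": 1, "last_update": datetime(2024, 1, 2)}]
    print(advance_watermark(datetime(2024, 1, 1), batch))
```

Note that a timestamp watermark only sees inserts and updates; rows deleted from the source table leave nothing behind for such a query to find.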
Enable SSL Connection (Optional)
To further enhance the security of the data pipeline, you can enable SSL (Secure Sockets Layer) encryption for PostgreSQL, providing encrypted network connections at the transport layer. This improves communication data security while ensuring data integrity.
Log in to the device hosting the PostgreSQL database and execute the following commands to create a self-signed certificate.
# Generate root certificate private key (pem file)
openssl genrsa -out ca.key 2048
# Generate root certificate signing request file (csr file)
openssl req -new -key ca.key -out ca.csr -subj "/C=CN/ST=myprovince/L=mycity/O=myorganization/OU=mygroup/CN=myCA"
# Create a self-signed root certificate, valid for one year:
openssl x509 -req -days 365 -extensions v3_ca -signkey ca.key -in ca.csr -out ca.crt
Execute the following commands in sequence to generate the server private key and certificate.
# Generate server private key
openssl genrsa -out server.key 2048
# Generate server certificate request file
openssl req -new -key server.key -out server.csr -subj "/C=CN/ST=myprovince/L=mycity/O=myorganization/OU=mygroup/CN=myServer"
# Use self-signed CA certificate to issue server certificate, valid for one year
openssl x509 -req -days 365 -extensions v3_req -CA ca.crt -CAkey ca.key -CAcreateserial -in server.csr -out server.crt
(Optional) Execute openssl verify -CAfile ca.crt server.crt to verify whether the server certificate is correctly signed.
Execute the following commands in sequence to generate the client private key and certificate.
# Generate client private key
openssl genrsa -out client.key 2048
# Generate certificate request file for user1 (with full client certificate verification, the CN must match the database user name)
openssl req -new -key client.key -out client.csr -subj "/C=CN/ST=myprovince/L=mycity/O=myorganization/OU=mygroup/CN=user1"
# Use root certificate to issue client certificate
openssl x509 -req -days 365 -extensions v3_req -CA ca.crt -CAkey ca.key -CAcreateserial -in client.csr -out client.crt
(Optional) Execute openssl verify -CAfile ca.crt client.crt to verify whether the client certificate is correctly signed.
Modify the following PostgreSQL configuration files to enable SSL and specify the relevant certificate/key files.
- postgresql.conf
ssl = on
ssl_ca_file = 'ca.crt'
ssl_cert_file = 'server.crt'
ssl_crl_file = ''
ssl_key_file = 'server.key'
ssl_ciphers = 'HIGH:MEDIUM:+3DES:!aNULL' # allowed SSL ciphers
ssl_prefer_server_ciphers = on
- pg_hba.conf
hostssl all all all trust clientcert=verify-ca
Source Config
- Connection Settings
- Name: Enter a unique name that has business significance.
- Type: Supports using PostgreSQL as a source or target database.
- Host: Database connection IP address or hostname.
- Port: Database service port.
- Database: The name of the database, i.e., one connection corresponds to one database. If there are multiple databases, multiple data connections need to be created.
- Schema: Schema name.
- User: Database username.
- Password: Password corresponding to the database username.
- Log Plugin Name: To read data changes from PostgreSQL and achieve incremental data synchronization, you need to follow the guidance in the Preparation section to select and install the appropriate plugin.
- SSL Settings: Choose whether to enable SSL to connect to the data source, which can further enhance data security. After enabling this function, you need to upload CA files, client certificates, and fill in the client password.
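Before entering these settings, it can help to try the same values with a libpq-based client such as psql. The sketch below assembles a libpq connection URI from the fields above; it is only a convenience for such a test, not something ClickPipes requires. The function name and sample values are hypothetical, while sslmode, sslrootcert, sslcert, and sslkey are standard libpq connection parameters.

```python
from typing import Optional
from urllib.parse import quote, urlencode


def build_postgres_uri(host: str, port: int, database: str, user: str,
                       password: str, ssl_files: Optional[dict] = None) -> str:
    """Assemble a libpq-style URI; ssl_files may hold 'ca', 'cert' and 'key' paths."""
    # Percent-encode credentials so characters such as '@' or ':' stay unambiguous.
    auth = f"{quote(user, safe='')}:{quote(password, safe='')}"
    uri = f"postgresql://{auth}@{host}:{port}/{quote(database, safe='')}"
    if ssl_files:
        params = {
            "sslmode": "verify-ca",              # verify the server certificate against the CA
            "sslrootcert": ssl_files["ca"],      # CA certificate, e.g. ca.crt
            "sslcert": ssl_files["cert"],        # client certificate, e.g. client.crt
            "sslkey": ssl_files["key"],          # client private key, e.g. client.key
        }
        uri += "?" + urlencode(params)
    return uri


if __name__ == "__main__":
    # Hypothetical host and credentials, reusing the certificate file names from this document.
    print(build_postgres_uri(
        "localhost", 5432, "demodata", "user1", "secret",
        ssl_files={"ca": "ca.crt", "cert": "client.crt", "key": "client.key"},
    ))
```

The resulting string can be passed directly to psql as its connection argument.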