
Add postgresql storage backend #667

Open
fsvm88 wants to merge 19 commits into domainaware:master from fsvm88:feature/postgresql-backend

Conversation


@fsvm88 fsvm88 commented Mar 7, 2026

Hi, I've been using the project for a while, and in my 15-year IT career (systems/cloud engineer) I have always struggled with Elasticsearch.
I feel it's clunky and heavy, I don't understand the architecture, and every attempt at learning it has pushed me away (much like Kafka). It takes ~5 minutes to start a single-node cluster on fast HDDs (so I have to work around it by making the parsedmarc container wait ~5 minutes before attempting to connect), and I always have to search for the right command-line incantations and environment variables to make it actually work. I'm unable to perform basic or periodic maintenance because I just can't understand it (and the curl interface doesn't really help).

I recently ran into this article and it strongly resonated, so I decided to give it a shot and add a Postgres backend to this project as an alternative to ElasticSearch. Disclaimer: it's 99% LLM.

Claude Opus 4.6 basically one-shotted the Python code on the first try. I built a test Docker image and have run it for the past week with zero issues so far (though I have not received forensic reports or SMTP TLS reports).

The Grafana dashboard went through a couple of iterations to refine the queries, and it also works well so far (again, no forensic or SMTP TLS reports to show).

Postgres is much easier and more transparent to manage: it's plain SQL, it needs no customized command-line tooling, I can perform maintenance and actually understand what's happening, and it's also very efficient:

POD                          CONTAINER      CPU  RAM
parsedmarc-5b56455f78-9tl6m  elasticsearch  17m  1750Mi
parsedmarc-5b56455f78-9tl6m  parsedmarc     1m   146Mi
parsedmarc-5b56455f78-9tl6m  postgres       1m   62Mi

I hope it's a useful addition to the project. Let me know if you need me to change or fix anything :)

- Add parsedmarc/postgres.py with PostgreSQLClient class:
  - __init__: accepts connection_string or individual host/port/user/password/database params
  - create_tables(): idempotent DDL for all 10 normalized tables
    (dmarc_aggregate_report, dmarc_aggregate_record + dkim/spf/policy_override,
     dmarc_forensic_report + sample_address,
     smtp_tls_report + smtp_tls_policy + smtp_tls_failure_detail)
  - save_aggregate_report_to_postgresql()
  - save_forensic_report_to_postgresql()
  - save_smtp_tls_report_to_postgresql()
  - PostgreSQLError and AlreadySaved exception classes
  - Duplicate detection via INSERT ... ON CONFLICT DO NOTHING

- Wire CLI (parsedmarc/cli.py):
  - Add postgresql_* opts to Namespace defaults
  - Parse [postgresql] INI config section
  - Initialize PostgreSQLClient and call create_tables() at startup
  - Call save_*_to_postgresql() in process_reports() for all three report types

- Add psycopg[binary]>=3.1.0 to pyproject.toml dependencies

- Document [postgresql] config section in docs/source/usage.md
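The duplicate-detection approach mentioned above (INSERT ... ON CONFLICT DO NOTHING) can be sketched as follows. This is illustrative only — the table and column names here are simplified assumptions, not the PR's exact schema:

```sql
-- Sketch only; the actual DDL in the PR has more columns and tables.
CREATE TABLE IF NOT EXISTS dmarc_aggregate_report (
    id        BIGSERIAL PRIMARY KEY,
    report_id TEXT NOT NULL UNIQUE,
    org_name  TEXT
);

-- Re-inserting the same report_id is silently skipped; RETURNING yields
-- no row, which the client can treat as the AlreadySaved condition.
INSERT INTO dmarc_aggregate_report (report_id, org_name)
VALUES ('example.com!2026-03-07', 'Example Org')
ON CONFLICT (report_id) DO NOTHING
RETURNING id;
```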
Copilot AI review requested due to automatic review settings March 7, 2026 23:31
Contributor

Copilot AI left a comment


Pull request overview

This PR adds a PostgreSQL storage backend to parsedmarc as an alternative to Elasticsearch/OpenSearch. It includes the database client module, CLI integration, documentation, and a Grafana dashboard for PostgreSQL.

Changes:

  • New parsedmarc/postgres.py module: PostgreSQLClient class with methods to save aggregate, forensic, and SMTP TLS reports to PostgreSQL with auto-created schema
  • parsedmarc/cli.py: Wires the new PostgreSQL client into the existing report-processing pipeline and config file parsing
  • docs/source/usage.md: Documents the new [postgresql] config section
  • grafana/Grafana-DMARC_Reports-PostgreSQL.json: New Grafana dashboard targeting the PostgreSQL schema
  • pyproject.toml: Adds psycopg[binary]>=3.1.0 as a mandatory dependency

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 7 comments.

File — Description
parsedmarc/postgres.py — New client module with DDL and INSERT logic for all three report types
parsedmarc/cli.py — Integration of PostgreSQL client into the CLI config and report dispatch
docs/source/usage.md — Documentation for the new [postgresql] config section
grafana/Grafana-DMARC_Reports-PostgreSQL.json — New Grafana dashboard for the PostgreSQL schema
pyproject.toml — Adds psycopg[binary] as a required dependency


Comment thread parsedmarc/postgres.py Outdated
Comment on lines +601 to +610
policy.get("domain"),
policy.get("policy", {}).get("policy_type"),
policy.get("policy", {}).get("policy_string") or [],
policy.get("policy", {}).get("mx_host") or [],
policy.get("summary", {}).get(
"total_successful_session_count"
),
policy.get("summary", {}).get(
"total_failure_session_count"
),

Copilot AI Mar 7, 2026


The field mapping for the SMTP TLS policy insert is incorrect. The parsed policy dictionary (produced by _parse_smtp_tls_report_policy() in parsedmarc/__init__.py) uses flat keys, not the nested raw JSON structure. Specifically:

  • policy.get("domain") should be policy.get("policy_domain")
  • policy.get("policy", {}).get("policy_type") should be policy.get("policy_type")
  • policy.get("policy", {}).get("policy_string") or [] should be policy.get("policy_strings") or []
  • policy.get("policy", {}).get("mx_host") or [] should be policy.get("mx_host_patterns") or []
  • policy.get("summary", {}).get("total_successful_session_count") should be policy.get("successful_session_count")
  • policy.get("summary", {}).get("total_failure_session_count") should be policy.get("failed_session_count")

All these fields will be stored as NULL because the keys don't exist in the parsed dict.

Suggested change
policy.get("domain"),
policy.get("policy", {}).get("policy_type"),
policy.get("policy", {}).get("policy_string") or [],
policy.get("policy", {}).get("mx_host") or [],
policy.get("summary", {}).get(
"total_successful_session_count"
),
policy.get("summary", {}).get(
"total_failure_session_count"
),
policy.get("policy_domain"),
policy.get("policy_type"),
policy.get("policy_strings") or [],
policy.get("mx_host_patterns") or [],
policy.get("successful_session_count"),
policy.get("failed_session_count"),

Comment thread parsedmarc/postgres.py Outdated
receiving_mx_hostname TEXT,
receiving_mx_helo TEXT,
additional_info_uri TEXT,
failure_reason_code TEXT,

Copilot AI Mar 7, 2026


The ip_address column in the smtp_tls_failure_detail table definition has no corresponding field in the parsed SMTP TLS failure detail dictionaries. The parser in parsedmarc/__init__.py only produces sending_mta_ip and receiving_ip fields for failure details — there is no ip_address field. As a result, detail.get("ip_address") will always return None and the column will always be empty. This column should either be removed from the schema and INSERT statement, or documented to explain where the value comes from.

Suggested change
failure_reason_code TEXT,
failure_reason_code TEXT,
-- NOTE: ip_address is currently not populated by the parser.
-- It is retained for backward/forward compatibility and will
-- typically remain NULL; use sending_mta_ip/receiving_ip instead.

Comment thread parsedmarc/postgres.py Outdated
Comment on lines +624 to +639
ip_address
) VALUES (
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s
)
""",
(
policy_db_id,
detail.get("result_type"),
detail.get("failed_session_count"),
detail.get("sending_mta_ip"),
detail.get("receiving_ip"),
detail.get("receiving_mx_hostname"),
detail.get("receiving_mx_helo"),
detail.get("additional_info_uri"),
detail.get("failure_reason_code"),
detail.get("ip_address"),

Copilot AI Mar 7, 2026


The ip_address insert parameter will always be None because detail.get("ip_address") has no corresponding key in the parsed SMTP TLS failure detail dictionary. This is consistent with the spurious column in the DDL (see the smtp_tls_failure_detail table schema).

Comment thread parsedmarc/postgres.py Outdated
Comment on lines +54 to +72
if connection_string:
conninfo = connection_string
else:
parts: list[str] = []
if host:
parts.append(f"host={host}")
if port:
parts.append(f"port={port}")
if user:
parts.append(f"user={user}")
if password:
parts.append(f"password={password}")
if database:
parts.append(f"dbname={database}")
conninfo = " ".join(parts)

logger.debug("Connecting to PostgreSQL")
try:
self._conn: psycopg.Connection = psycopg.connect(conninfo)

Copilot AI Mar 7, 2026


When building the libpq keyword-value connection string manually, special characters in password, user, host, or database values are not escaped. The libpq format requires that values containing spaces, single quotes, or backslashes be quoted or escaped (e.g., password='p@ss w0rd' with backslash-escaped quotes inside). A value like my password would produce password=my password, which is malformed and will fail to connect.

Instead of building a keyword-value connection string, psycopg.connect() in psycopg 3 accepts the individual parameters directly as keyword arguments (host, port, user, password, dbname), which avoids any escaping concerns entirely.

Suggested change
if connection_string:
conninfo = connection_string
else:
parts: list[str] = []
if host:
parts.append(f"host={host}")
if port:
parts.append(f"port={port}")
if user:
parts.append(f"user={user}")
if password:
parts.append(f"password={password}")
if database:
parts.append(f"dbname={database}")
conninfo = " ".join(parts)
logger.debug("Connecting to PostgreSQL")
try:
self._conn: psycopg.Connection = psycopg.connect(conninfo)
logger.debug("Connecting to PostgreSQL")
try:
if connection_string:
# Use the provided libpq connection string or URI directly.
self._conn: psycopg.Connection = psycopg.connect(connection_string)
else:
# Pass individual parameters as keyword arguments to avoid
# manual conninfo construction and escaping issues.
self._conn = psycopg.connect(
host=host,
port=port,
user=user,
password=password,
dbname=database,
)

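To make the escaping hazard concrete: libpq keyword-value strings require values containing spaces, single quotes, or backslashes to be quoted, with backslashes and quotes escaped inside. A minimal sketch of that rule — quote_libpq_value is a hypothetical helper written here for illustration, not part of psycopg; in real code, pass host/user/password/dbname directly to psycopg.connect() (or use psycopg.conninfo.make_conninfo), which handles this for you:

```python
def quote_libpq_value(value: str) -> str:
    """Quote a value for a libpq keyword=value connection string.

    libpq requires single quotes around values containing spaces,
    quotes, or backslashes, with backslash and single quote escaped.
    """
    if value and not any(c in value for c in " '\\"):
        return value  # simple values need no quoting
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


if __name__ == "__main__":
    print(quote_libpq_value("parsedmarc"))   # parsedmarc
    print(quote_libpq_value("my password"))  # 'my password'
```

Naive f-string concatenation skips this step entirely, which is why a password like "my password" produces a malformed conninfo.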
Comment thread pyproject.toml
"tqdm>=4.31.1",
"urllib3>=1.25.7",
"xmltodict>=0.12.0",
"psycopg[binary]>=3.1.0",

Copilot AI Mar 7, 2026


The psycopg[binary] dependency is added to the mandatory dependencies list, which means it is now a required dependency for all users of parsedmarc, even those who never use PostgreSQL. This will pull in a binary package (psycopg-binary) for every installation. Other backends like kafka-python-ng, boto3 (S3), pygelf, and opensearch-py are already required dependencies in the same way, so this is consistent with the existing pattern. However, if the project wanted to keep PostgreSQL optional (similar to an extras_require pattern), this could be moved to an optional extra. Consider discussing with maintainers whether this should be optional.

Contributor


I would rather see this as optional, because these are binaries that might not exist for every platform.

Comment thread parsedmarc/postgres.py Outdated
Raises:
PostgreSQLError: If a database error occurs.
"""
sample = report.get("sample", {}) or {}

Copilot AI Mar 7, 2026


This is a critical data mapping bug. report["sample"] is a raw RFC 822 email string (as assigned in parsedmarc/__init__.py at parsed_report["sample"] = sample), not a dict. The expression report.get("sample", {}) or {} will return the non-empty string when one is present, and then sample.get("date"), sample.get("subject"), sample.get("body"), etc. on line 505-516 will raise AttributeError: 'str' object has no attribute 'get'.

The parsed sample data (as a structured dict containing date, subject, body, headers, from, to, etc.) lives in report["parsed_sample"], not report["sample"]. This variable should be sample = report.get("parsed_sample", {}) or {}.

Suggested change
sample = report.get("sample", {}) or {}
sample = report.get("parsed_sample", {}) or {}

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@seanthegeek
Contributor

@fsvm88 Please look at the review.

fsvm88 added 4 commits March 8, 2026 10:07
The parsed policy dictionary returned by _parse_smtp_tls_report_policy()
uses flat keys (policy_domain, policy_type, policy_strings,
mx_host_patterns, successful_session_count, failed_session_count),
not the nested raw JSON structure. The previous code used nested
lookups like policy.get('policy', {}).get('policy_type') which
would always resolve to None, storing NULL for all policy fields.
…e_detail

The parser (_parse_smtp_tls_failure_details) never produces an
'ip_address' key — it only provides 'sending_mta_ip' and
'receiving_ip'. The column was always NULL. Removed from both the
DDL and the INSERT statement.
…fo string

The previous code built a libpq keyword-value connection string by
concatenating f-strings, which does not escape special characters in
values (e.g. spaces or quotes in passwords). psycopg.connect()
natively accepts host, port, user, password, and dbname as keyword
arguments, which avoids escaping issues entirely.
…t data

report['sample'] contains the raw RFC 822 email string, not a dict.
Calling .get('date'), .get('subject'), etc. on a string raises
AttributeError. The structured dict with date, subject, body,
headers, from, to, etc. lives in report['parsed_sample'].
Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.



Comment thread parsedmarc/postgres.py Outdated
Comment on lines +361 to +362
record.get("begin_date"),
record.get("end_date"),

Copilot AI Mar 8, 2026


The save_aggregate_report_to_postgresql method reads record.get("begin_date") and record.get("end_date") to populate the interval_begin and interval_end columns, but the parsed record dictionary uses "interval_begin" and "interval_end" as field names (set by _append_parsed_record in parsedmarc/__init__.py, lines 272-273 and 289-290). Since neither "begin_date" nor "end_date" exists in individual records (those keys belong to the report metadata level), these will always be None, causing the interval_begin and interval_end columns to always be empty.

Suggested change
record.get("begin_date"),
record.get("end_date"),
record.get("interval_begin"),
record.get("interval_end"),

Comment thread docs/source/usage.md
Comment thread parsedmarc/postgres.py
Comment on lines +260 to +267
try:
with self._conn.transaction():
with self._conn.cursor() as cur:
for stmt in ddl_statements:
cur.execute(stmt)
logger.debug("PostgreSQL tables verified / created")
except psycopg.Error as exc:
raise PostgreSQLError(str(exc)) from exc

Copilot AI Mar 8, 2026


The create_tables method only creates the tables but no indexes beyond the primary keys and the UNIQUE constraints (which create implicit indexes). The Grafana dashboard queries repeatedly filter by rpt.begin_date (e.g., WHERE $__timeFilter(rpt.begin_date)) and join on r.report_id and r.header_from. As data grows, queries will become slow without indexes on dmarc_aggregate_report.begin_date, dmarc_aggregate_record.report_id, and dmarc_aggregate_record.header_from. Consider adding CREATE INDEX IF NOT EXISTS statements for these columns in create_tables().

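The kind of indexes the review suggests can be sketched as follows (the index names here are illustrative, not the ones the PR ultimately added):

```sql
-- Cover the Grafana dashboard's common filters and joins.
CREATE INDEX IF NOT EXISTS idx_aggregate_report_begin_date
    ON dmarc_aggregate_report (begin_date);
CREATE INDEX IF NOT EXISTS idx_aggregate_record_report_id
    ON dmarc_aggregate_record (report_id);
CREATE INDEX IF NOT EXISTS idx_aggregate_record_header_from
    ON dmarc_aggregate_record (header_from);
```

Because these use IF NOT EXISTS, they are safe to run in the same idempotent create_tables() pass as the DDL.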
fsvm88 added 3 commits March 8, 2026 10:27
…imestamps

The per-record INSERT used record.get("begin_date") and
record.get("end_date"), but the parser stores these fields as
"interval_begin" and "interval_end" on each record.  The keys
begin_date/end_date only exist in report_metadata.  This caused the
interval_begin and interval_end columns to always be NULL.
The [postgresql] configuration section did not mention that the
[general] save_aggregate, save_forensic, and save_smtp_tls flags
must be set to True for data to actually be written to PostgreSQL.
Add an explanatory note after the configuration examples.
The Grafana dashboard queries repeatedly filter on
dmarc_aggregate_report.begin_date, join on
dmarc_aggregate_record.report_id, and filter on
dmarc_aggregate_record.header_from.  Without indexes these queries
will degrade to sequential scans as data grows.

Add CREATE INDEX IF NOT EXISTS statements for:
- dmarc_aggregate_report(begin_date)
- dmarc_aggregate_record(report_id)
- dmarc_aggregate_record(header_from)
- dmarc_forensic_report(arrival_date_utc)
- smtp_tls_report(begin_date)
- smtp_tls_policy(report_id)
Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.



Comment thread parsedmarc/postgres.py
Comment on lines +95 to +96
begin_date TIMESTAMPTZ NOT NULL,
end_date TIMESTAMPTZ NOT NULL,

Copilot AI Mar 8, 2026


The begin_date and end_date values stored in the dmarc_aggregate_report table come from meta.get("begin_date") which is a string produced by timestamp_to_human(). This function uses datetime.fromtimestamp() (local time, not UTC), so the string does not include timezone information. When PostgreSQL inserts this into a TIMESTAMPTZ NOT NULL column, it interprets the string using the current PostgreSQL session timezone rather than UTC.

For deployments where the PostgreSQL session timezone is not set to UTC, the stored timestamps will be incorrect. The interval_begin/interval_end fields in dmarc_aggregate_record have the same issue.

To fix this, the UTC-aware begin_dt datetime objects (already computed in the parser for UTC) should be used, or the timezone-aware string format "YYYY-MM-DD HH:MM:SS+00" should be generated. Alternatively, the column type could be changed to TEXT to match the forensic report approach, though TIMESTAMPTZ is the correct semantic type here.

Suggested change
begin_date TIMESTAMPTZ NOT NULL,
end_date TIMESTAMPTZ NOT NULL,
begin_date TEXT NOT NULL,
end_date TEXT NOT NULL,

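The timezone hazard the review describes can be avoided by producing timezone-aware UTC values rather than naive local-time strings. A minimal sketch, assuming the raw epoch seconds from the XML report are available (helper names here are illustrative):

```python
from datetime import datetime, timezone


def epoch_to_utc_datetime(epoch_seconds: int) -> datetime:
    """UTC-aware datetime; psycopg adapts this to TIMESTAMPTZ directly."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)


def epoch_to_utc_string(epoch_seconds: int) -> str:
    """Explicit '+00' suffix so PostgreSQL never guesses the timezone."""
    return epoch_to_utc_datetime(epoch_seconds).strftime("%Y-%m-%d %H:%M:%S+00")


if __name__ == "__main__":
    print(epoch_to_utc_string(0))  # 1970-01-01 00:00:00+00
```

Either form inserts unambiguously into a TIMESTAMPTZ column regardless of the PostgreSQL session timezone.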
Comment thread parsedmarc/postgres.py Outdated
Comment on lines +178 to +179
arrival_date TEXT,
arrival_date_utc TEXT,

Copilot AI Mar 8, 2026


The arrival_date_utc and arrival_date columns in dmarc_forensic_report are declared as TEXT, but the Grafana dashboard queries them with ::TIMESTAMPTZ casts (e.g., f.arrival_date_utc::TIMESTAMPTZ BETWEEN $__timeFrom() AND $__timeTo()). The index idx_forensic_report_arrival_date is created on the TEXT column, but this index will not be utilized when the column is used with a TIMESTAMPTZ cast in a WHERE clause.

Declaring these columns as TIMESTAMPTZ would improve query performance and type safety. The arrival_date_utc value from the parser is already a UTC string in %Y-%m-%d %H:%M:%S format and would insert cleanly into a TIMESTAMPTZ column if the UTC offset is appended (e.g., by using +00 suffix).

Suggested change
arrival_date TEXT,
arrival_date_utc TEXT,
arrival_date TIMESTAMPTZ,
arrival_date_utc TIMESTAMPTZ,

Comment thread parsedmarc/postgres.py
Comment on lines +54 to +73
logger.debug("Connecting to PostgreSQL")
try:
if connection_string:
# Use the provided libpq connection string or URI directly.
self._conn: psycopg.Connection = psycopg.connect(
connection_string
)
else:
# Pass individual parameters as keyword arguments to avoid
# manual conninfo construction and escaping issues.
self._conn = psycopg.connect(
host=host,
port=port,
user=user,
password=password,
dbname=database,
)
self._conn.autocommit = False
except psycopg.Error as exc:
raise PostgreSQLError(str(exc)) from exc

Copilot AI Mar 8, 2026


The PostgreSQLClient holds a single persistent connection (self._conn) for the lifetime of the process. When parsedmarc runs in watch mode (mailbox_watch = true), the process runs indefinitely and calls process_reports repeatedly. If the PostgreSQL server drops the connection (e.g., due to a server restart, idle_in_transaction_session_timeout, tcp_keepalives_idle expiry, or idle connection reaping), all subsequent save_* calls will raise a psycopg.OperationalError, which gets wrapped as PostgreSQLError and logged — but the client is never reconnected. The tool will silently stop persisting reports to PostgreSQL without terminating.

Consider adding connection health checking or reconnection logic (e.g., checking self._conn.closed before use and reconnecting if needed), or using a connection pool.

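The reconnect-on-use pattern the review suggests can be sketched in pure Python. FakeConnection stands in for a psycopg Connection (only the .closed attribute is modeled); in the real client, _connect() would call psycopg.connect() with the stored parameters:

```python
class FakeConnection:
    """Stand-in for psycopg.Connection; models only .closed."""
    def __init__(self):
        self.closed = False


class Client:
    def __init__(self):
        self._connects = 0          # counts (re)connection attempts
        self._conn = self._connect()

    def _connect(self):
        self._connects += 1
        return FakeConnection()     # real code: psycopg.connect(...)

    def _ensure_connected(self):
        # Reconnect transparently if the server dropped the connection.
        if self._conn is None or self._conn.closed:
            self._conn = self._connect()

    def save_report(self, report: dict):
        self._ensure_connected()    # called at the top of every public method
        # ... INSERT logic would go here ...


if __name__ == "__main__":
    c = Client()
    c.save_report({})
    c._conn.closed = True           # simulate a dropped connection
    c.save_report({})
    print(c._connects)              # 2: one initial connect, one reconnect
```

This keeps watch-mode processes persisting reports after a server restart instead of logging errors indefinitely.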
]
}
],
"refresh": "10s",

Copilot AI Mar 8, 2026


The dashboard default refresh interval is set to "10s" (10 seconds). For a DMARC reporting dashboard where data changes infrequently (reports arrive via email at most a few times a day), a 10-second auto-refresh creates unnecessary polling load on the PostgreSQL server. Consider using "1m" or "5m" as a more appropriate default, or set "refresh": "" to disable auto-refresh by default and let users enable it manually.

Suggested change
"refresh": "10s",
"refresh": "1m",

fsvm88 added 4 commits March 8, 2026 10:48
…d timezone ambiguity

timestamp_to_human() produces timezone-naive local-time strings
(e.g. '2024-01-15 09:30:00') without any UTC offset indicator.
Storing these in TIMESTAMPTZ columns causes PostgreSQL to interpret
them using the session timezone rather than UTC, resulting in
incorrect timestamps when the session timezone differs.

Change begin_date, end_date, interval_begin and interval_end in
aggregate report/record tables, and begin_date/end_date in the
smtp_tls_report table, from TIMESTAMPTZ to TEXT to match the actual
data format produced by the parser.
The Grafana dashboard queries cast arrival_date_utc with
::TIMESTAMPTZ (e.g. f.arrival_date_utc::TIMESTAMPTZ BETWEEN ...).
When the column is TEXT, the index on arrival_date_utc cannot be
utilised for these casts.

Change arrival_date and arrival_date_utc from TEXT to TIMESTAMPTZ
for proper type safety and index usage. Add _ensure_utc_suffix()
helper to append '+00' to the parser's UTC strings so PostgreSQL
interprets them correctly.
When parsedmarc runs in watch mode (mailbox_watch = true), the
process can stay alive indefinitely.  If PostgreSQL drops the
connection (server restart, idle timeout, TCP keep-alive expiry),
all subsequent save_* calls would fail silently.

Store connection parameters at init time and add _connect() /
_ensure_connected() methods.  Each public method now calls
_ensure_connected() before using the connection, transparently
re-establishing it if the previous connection was lost.
DMARC reports arrive infrequently (at most a few times per day),
so a 10-second auto-refresh creates unnecessary polling load on
PostgreSQL.  A 1-minute interval is a much more appropriate default.
Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.



Comment thread parsedmarc/postgres.py Outdated
Comment thread parsedmarc/postgres.py Outdated
Comment thread parsedmarc/postgres.py
…amps and contact_info before insert

- Change begin_date/end_date columns from TEXT to TIMESTAMPTZ in
  dmarc_aggregate_report, dmarc_aggregate_record (interval_begin/end),
  and smtp_tls_report so Grafana $__timeFilter/$__timeGroup macros
  work correctly.

- Add _naive_local_to_timestamptz() helper for aggregate report dates
  produced by timestamp_to_human() (local-time, no offset). Re-parses
  and attaches the local timezone offset for unambiguous TIMESTAMPTZ
  inserts.

- Wrap record-level interval_begin/interval_end (already UTC) and
  SMTP TLS begin_date/end_date (ISO 8601 with Z) with
  _ensure_utc_suffix() as a safety net.

- Improve _ensure_utc_suffix() negative-offset detection to only check
  after position 10 in the string, avoiding false positives from date
  separator hyphens in YYYY-MM-DD.

- Add _normalize_arrival_date() for forensic arrival_date: parses
  RFC 2822 or ISO 8601 via human_timestamp_to_datetime, returns clean
  UTC string with +00 offset.

- Add _contact_info_to_text() to handle Union[str, List[str]] from
  TLS-RPT contact-info field; joins list values with comma separator
  to prevent psycopg type error on TEXT column insert.
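The _contact_info_to_text idea from that last bullet can be sketched as below. The helper name comes from the commit message; the exact separator and None handling here are assumptions:

```python
from typing import List, Optional, Union


def contact_info_to_text(value: Union[str, List[str], None]) -> Optional[str]:
    """Flatten TLS-RPT contact-info (str or list of str) to a single
    TEXT-compatible string, joining list values with a comma separator."""
    if value is None:
        return None
    if isinstance(value, list):
        return ", ".join(str(v) for v in value)
    return str(value)


if __name__ == "__main__":
    print(contact_info_to_text(["mailto:a@example.com", "mailto:b@example.com"]))
```

Without this normalization, psycopg would raise a type-adaptation error when handed a Python list for a TEXT column parameter.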
Add 33 tests covering the PostgreSQL backend:

- TestPostgreSQLHelpers (16 tests): Unit tests for pure helper functions
  - _ensure_utc_suffix: None, empty, naive UTC, already-has-offset, Z suffix
  - _naive_local_to_timestamptz: None, empty, valid conversion, bad format
  - _normalize_arrival_date: None, empty, ISO, RFC 2822, already UTC, unparseable
  - _contact_info_to_text: None, string, list, empty list, numeric

- TestPostgreSQLClientSave (9 tests): Mock-based integration tests
  - Aggregate: INSERT calls, AlreadySaved on conflict, timestamp normalization
  - Forensic: INSERT calls with sample addresses
  - SMTP TLS: INSERT calls with policy+failure, AlreadySaved, contact_info list
  - Reconnection: _ensure_connected reconnects on closed connection

- TestPostgreSQLWithSamples (3 tests): End-to-end with real sample data
  - Parses real aggregate/forensic/smtp_tls samples from samples/ directory
  - Feeds parsed data through mock save methods to verify no type errors
  - Validates that real parser output is compatible with the PostgreSQL backend

All tests use unittest.mock to avoid requiring a live PostgreSQL instance.
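The mock-based approach described above can be sketched as follows — a MagicMock stands in for the psycopg connection so save methods can be exercised without a live PostgreSQL instance. MiniClient is an illustrative miniature, not the PR's actual client:

```python
from unittest.mock import MagicMock


class MiniClient:
    """Illustrative miniature of a save method using a cursor context manager."""
    def __init__(self, conn):
        self._conn = conn

    def save_report(self, report: dict):
        with self._conn.cursor() as cur:
            cur.execute(
                "INSERT INTO reports (org_name) VALUES (%s)",
                (report.get("org_name"),),
            )


if __name__ == "__main__":
    conn = MagicMock()
    # The cursor yielded by the `with` block is the __enter__ return value.
    cursor = conn.cursor.return_value.__enter__.return_value
    MiniClient(conn).save_report({"org_name": "Example Org"})
    sql, params = cursor.execute.call_args.args
    print(params)  # ('Example Org',)
```

MagicMock auto-supports the context-manager protocol, so no real connection or transaction is needed to assert on the SQL and parameters passed to execute().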
@fsvm88
Author

fsvm88 commented Mar 9, 2026

I have re-checked all columns for their type, and added tests too (manually checked; they make sense and pass).

I'll build another docker image and test it for a week or so, but I think most of the work is done.

Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.

Comments suppressed due to low confidence (1)

tests.py:173

  • The call name is imported from unittest.mock but is never used anywhere in the test file. This is an unused import that will cause a ruff F401 lint failure in CI (ruff check .).
        sample_paths = glob("samples/aggregate/*")


Comment thread parsedmarc/postgres.py
Comment on lines +565 to +678
def save_forensic_report_to_postgresql(self, report: dict) -> None:
"""Saves a parsed forensic (RUF) DMARC report to PostgreSQL.

Args:
report: A parsed forensic report dictionary as returned by
:func:`parsedmarc.parse_report_file`.

Raises:
PostgreSQLError: If a database error occurs.
"""
self._ensure_connected()
sample = report.get("parsed_sample", {}) or {}
src = report.get("source", {}) or {}

try:
with self._conn.transaction():
with self._conn.cursor() as cur:
cur.execute(
"""
INSERT INTO dmarc_forensic_report (
feedback_type, user_agent, version,
original_envelope_id, original_mail_from,
original_rcpt_to, arrival_date, arrival_date_utc,
authentication_results, delivery_result,
auth_failure, authentication_mechanisms,
dkim_domain, reported_domain, sample_headers_only,
source_ip_address, source_country,
source_reverse_dns, source_base_domain,
source_name, source_type,
sample, sample_date, sample_subject,
sample_body, sample_has_defects,
sample_headers, sample_from, sample_to
) VALUES (
%s, %s, %s,
%s, %s,
%s, %s, %s,
%s, %s,
%s, %s,
%s, %s, %s,
%s, %s,
%s, %s,
%s, %s,
%s, %s, %s,
%s, %s,
%s, %s, %s
)
RETURNING id
""",
(
report.get("feedback_type"),
report.get("user_agent"),
report.get("version"),
report.get("original_envelope_id"),
report.get("original_mail_from"),
report.get("original_rcpt_to"),
_normalize_arrival_date(
report.get("arrival_date")
),
_ensure_utc_suffix(
report.get("arrival_date_utc")
),
report.get("authentication_results"),
report.get("delivery_result"),
report.get("auth_failure") or [],
report.get("authentication_mechanisms") or [],
report.get("dkim_domain"),
report.get("reported_domain"),
report.get("sample_headers_only"),
src.get("ip_address"),
src.get("country"),
src.get("reverse_dns"),
src.get("base_domain"),
src.get("name"),
src.get("type"),
report.get("sample"),
sample.get("date"),
sample.get("subject"),
sample.get("body"),
sample.get("has_defects"),
psycopg_types.json.Jsonb(sample.get("headers"))
if sample.get("headers")
else None,
psycopg_types.json.Jsonb(sample.get("from"))
if sample.get("from")
else None,
psycopg_types.json.Jsonb(sample.get("to"))
if sample.get("to")
else None,
),
)
report_db_id: int = cur.fetchone()[0]

for addr_type in ("to", "cc", "bcc", "reply_to"):
entries = sample.get(addr_type) or []
if isinstance(entries, dict):
entries = [entries]
for entry in entries:
cur.execute(
"""
INSERT INTO dmarc_forensic_sample_address
(report_id, address_type,
display_name, address)
VALUES (%s, %s, %s, %s)
""",
(
report_db_id,
addr_type,
entry.get("display_name"),
entry.get("address"),
),
)

except psycopg.Error as exc:
raise PostgreSQLError(str(exc)) from exc

Copilot AI Mar 10, 2026


The dmarc_forensic_report table has no uniqueness constraint and save_forensic_report_to_postgresql performs no duplicate detection before inserting. Unlike the aggregate and SMTP TLS reports (which use ON CONFLICT DO NOTHING), forensic reports can be inserted multiple times if the same email is reprocessed (e.g., in watch mode if an email fails to archive). The other backends (Elasticsearch, OpenSearch) perform duplicate detection by querying for an existing report with the same arrival date, sender, and subject before inserting. Consider adding a similar check or a uniqueness constraint.

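One way to get the suggested duplicate protection is a composite uniqueness constraint mirroring the arrival-date/sender/subject check the other backends perform. A sketch only — the constraint name and chosen columns are illustrative:

```sql
ALTER TABLE dmarc_forensic_report
    ADD CONSTRAINT uq_forensic_report
    UNIQUE (arrival_date_utc, original_mail_from, sample_subject);

-- With the constraint in place, re-inserting the same report becomes
-- a no-op via:
--   INSERT INTO dmarc_forensic_report (...) VALUES (...)
--   ON CONFLICT ON CONSTRAINT uq_forensic_report DO NOTHING;
```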
@seanthegeek
Contributor

@fsvm88 I won't merge this until after the DMARCbis RFCs are published and #659 is merged, which will change forensic reports to failure reports and add new aggregate report fields. That way, this PR can be kept up to date with the major changes (table names, fields, etc.)

@fsvm88
Author

fsvm88 commented Mar 10, 2026

@fsvm88 I won't merge this until after the DMARCbis RFCs are published and #659 is merged, which will change forensic reports to failure reports and add new aggregate report fields. That way, this PR can be kept up to date with the major changes (table names, fields, etc.)

No problem, makes sense, I'm not in a rush 👍

I'll keep testing and cleaning up whatever I run into
