initial boilerplate
This commit is contained in:
@@ -0,0 +1,38 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
// Connect opens a connection pool to Postgres.
|
||||
// pgxpool manages multiple connections automatically — you rarely need to
|
||||
// think about it; just pass the pool around and pgx picks an idle connection.
|
||||
func Connect(databaseURL string) (*pgxpool.Pool, error) {
|
||||
if databaseURL == "" {
|
||||
return nil, fmt.Errorf("DATABASE_URL is not set")
|
||||
}
|
||||
|
||||
config, err := pgxpool.ParseConfig(databaseURL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse database url: %w", err)
|
||||
}
|
||||
|
||||
// Pool settings — tune these later based on your load
|
||||
config.MaxConns = 25
|
||||
config.MinConns = 2
|
||||
|
||||
pool, err := pgxpool.NewWithConfig(context.Background(), config)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create pool: %w", err)
|
||||
}
|
||||
|
||||
// Ping to verify the connection works at startup
|
||||
if err := pool.Ping(context.Background()); err != nil {
|
||||
return nil, fmt.Errorf("ping database: %w", err)
|
||||
}
|
||||
|
||||
return pool, nil
|
||||
}
|
||||
@@ -0,0 +1,161 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
// migration holds a SQL statement and a human-readable name.
|
||||
// We run them in order and track which ones have already run.
|
||||
type migration struct {
|
||||
name string
|
||||
sql string
|
||||
}
|
||||
|
||||
var migrations = []migration{
|
||||
{
|
||||
name: "create_receipts",
|
||||
sql: `
|
||||
CREATE TABLE IF NOT EXISTS receipts (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
store_name TEXT,
|
||||
receipt_date DATE NOT NULL,
|
||||
image_path TEXT,
|
||||
city TEXT,
|
||||
country TEXT DEFAULT 'US',
|
||||
submitted_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "create_canonical_items",
|
||||
sql: `
|
||||
-- The "dictionary" of known items we track.
|
||||
-- User submissions get mapped to these via fuzzy matching.
|
||||
CREATE TABLE IF NOT EXISTS canonical_items (
|
||||
name TEXT PRIMARY KEY, -- e.g. "milk_whole_1gal"
|
||||
display_name TEXT NOT NULL, -- e.g. "Whole Milk, 1 Gallon"
|
||||
category TEXT NOT NULL, -- e.g. "dairy"
|
||||
unit TEXT, -- e.g. "gallon", "lb", "dozen"
|
||||
aliases TEXT[] DEFAULT '{}' -- ["1 gal whole milk", "milk whole gal"]
|
||||
);
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "create_line_items",
|
||||
sql: `
|
||||
CREATE TABLE IF NOT EXISTS line_items (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
receipt_id UUID NOT NULL REFERENCES receipts(id) ON DELETE CASCADE,
|
||||
raw_name TEXT NOT NULL,
|
||||
canonical_name TEXT REFERENCES canonical_items(name),
|
||||
price_cents INT NOT NULL CHECK (price_cents > 0),
|
||||
quantity NUMERIC(8,3) NOT NULL DEFAULT 1,
|
||||
-- Stored computed column: unit price in cents
|
||||
unit_price_cents INT GENERATED ALWAYS AS
|
||||
(ROUND(price_cents / quantity)::INT) STORED
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS line_items_receipt_id ON line_items(receipt_id);
|
||||
CREATE INDEX IF NOT EXISTS line_items_canonical ON line_items(canonical_name);
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "create_price_snapshots",
|
||||
sql: `
|
||||
-- Pre-aggregated monthly averages, rebuilt by a background job.
|
||||
-- Charts read from here rather than scanning all line_items each time.
|
||||
CREATE TABLE IF NOT EXISTS price_snapshots (
|
||||
canonical_name TEXT NOT NULL REFERENCES canonical_items(name),
|
||||
year_month DATE NOT NULL, -- always the 1st of the month
|
||||
avg_price_cents INT NOT NULL,
|
||||
sample_count INT NOT NULL DEFAULT 0,
|
||||
PRIMARY KEY (canonical_name, year_month)
|
||||
);
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "seed_canonical_items",
|
||||
sql: `
|
||||
INSERT INTO canonical_items (name, display_name, category, unit, aliases) VALUES
|
||||
('milk_whole_1gal', 'Whole Milk, 1 Gallon', 'dairy', 'gallon',
|
||||
ARRAY['whole milk gallon', '1 gal whole milk', 'milk whl gal']),
|
||||
('eggs_large_dozen', 'Large Eggs, 1 Dozen', 'dairy', 'dozen',
|
||||
ARRAY['large eggs 12ct', 'eggs large dozen', 'grade a large eggs']),
|
||||
('bread_white_loaf', 'White Bread, 1 Loaf', 'bakery', 'loaf',
|
||||
ARRAY['white bread', 'sandwich bread', 'bread loaf']),
|
||||
('ground_beef_1lb', 'Ground Beef, 1 lb (80%)', 'meat', 'lb',
|
||||
ARRAY['ground beef lb', '80/20 ground beef', 'hamburger meat']),
|
||||
('olive_oil_16oz', 'Olive Oil, 16 oz', 'pantry', 'bottle',
|
||||
ARRAY['olive oil 16oz', 'extra virgin olive oil', 'evoo 16oz']),
|
||||
('butter_salted_1lb','Salted Butter, 1 lb', 'dairy', 'lb',
|
||||
ARRAY['butter salted pound', 'salted butter 4 sticks']),
|
||||
('chicken_breast_1lb','Chicken Breast, 1 lb', 'meat', 'lb',
|
||||
ARRAY['boneless chicken breast', 'chicken breast lb']),
|
||||
('orange_juice_52oz','Orange Juice, 52 oz', 'beverages','carton',
|
||||
ARRAY['oj 52oz', 'orange juice carton', 'florida natural oj'])
|
||||
ON CONFLICT (name) DO NOTHING;
|
||||
`,
|
||||
},
|
||||
}
|
||||
|
||||
// Migrate runs all pending migrations in order.
|
||||
// It creates a simple tracking table on first run.
|
||||
func Migrate(pool *pgxpool.Pool) error {
|
||||
ctx := context.Background()
|
||||
|
||||
// Create the migrations tracking table if it doesn't exist
|
||||
_, err := pool.Exec(ctx, `
|
||||
CREATE TABLE IF NOT EXISTS schema_migrations (
|
||||
name TEXT PRIMARY KEY,
|
||||
applied_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create migrations table: %w", err)
|
||||
}
|
||||
|
||||
for _, m := range migrations {
|
||||
// Check if already applied
|
||||
var exists bool
|
||||
err := pool.QueryRow(ctx,
|
||||
"SELECT EXISTS(SELECT 1 FROM schema_migrations WHERE name = $1)", m.name,
|
||||
).Scan(&exists)
|
||||
if err != nil {
|
||||
return fmt.Errorf("check migration %s: %w", m.name, err)
|
||||
}
|
||||
|
||||
if exists {
|
||||
continue
|
||||
}
|
||||
|
||||
// Run the migration inside a transaction so it's atomic
|
||||
tx, err := pool.Begin(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("begin migration %s: %w", m.name, err)
|
||||
}
|
||||
|
||||
if _, err := tx.Exec(ctx, m.sql); err != nil {
|
||||
_ = tx.Rollback(ctx)
|
||||
return fmt.Errorf("run migration %s: %w", m.name, err)
|
||||
}
|
||||
|
||||
if _, err := tx.Exec(ctx,
|
||||
"INSERT INTO schema_migrations (name) VALUES ($1)", m.name,
|
||||
); err != nil {
|
||||
_ = tx.Rollback(ctx)
|
||||
return fmt.Errorf("record migration %s: %w", m.name, err)
|
||||
}
|
||||
|
||||
if err := tx.Commit(ctx); err != nil {
|
||||
return fmt.Errorf("commit migration %s: %w", m.name, err)
|
||||
}
|
||||
|
||||
slog.Info("migration applied", "name", m.name)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,287 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
// Queries holds the database connection pool.
|
||||
// All database operations are methods on this type.
|
||||
// This pattern (a "repository") keeps SQL out of your handlers.
|
||||
type Queries struct {
|
||||
pool *pgxpool.Pool
|
||||
}
|
||||
|
||||
func New(pool *pgxpool.Pool) *Queries {
|
||||
return &Queries{pool: pool}
|
||||
}
|
||||
|
||||
// ---- Receipts ----
|
||||
|
||||
type CreateReceiptParams struct {
|
||||
StoreName string
|
||||
ReceiptDate time.Time
|
||||
ImagePath string
|
||||
City string
|
||||
Country string
|
||||
}
|
||||
|
||||
type Receipt struct {
|
||||
ID uuid.UUID
|
||||
StoreName string
|
||||
ReceiptDate time.Time
|
||||
ImagePath string
|
||||
City string
|
||||
Country string
|
||||
SubmittedAt time.Time
|
||||
}
|
||||
|
||||
func (q *Queries) CreateReceipt(ctx context.Context, p CreateReceiptParams) (Receipt, error) {
|
||||
var r Receipt
|
||||
err := q.pool.QueryRow(ctx, `
|
||||
INSERT INTO receipts (store_name, receipt_date, image_path, city, country)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
RETURNING id, store_name, receipt_date, image_path, city, country, submitted_at
|
||||
`, p.StoreName, p.ReceiptDate, p.ImagePath, p.City, p.Country,
|
||||
).Scan(&r.ID, &r.StoreName, &r.ReceiptDate, &r.ImagePath, &r.City, &r.Country, &r.SubmittedAt)
|
||||
|
||||
if err != nil {
|
||||
return Receipt{}, fmt.Errorf("create receipt: %w", err)
|
||||
}
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func (q *Queries) GetReceiptByID(ctx context.Context, id uuid.UUID) (Receipt, error) {
|
||||
var r Receipt
|
||||
err := q.pool.QueryRow(ctx, `
|
||||
SELECT id, store_name, receipt_date, image_path, city, country, submitted_at
|
||||
FROM receipts WHERE id = $1
|
||||
`, id).Scan(&r.ID, &r.StoreName, &r.ReceiptDate, &r.ImagePath, &r.City, &r.Country, &r.SubmittedAt)
|
||||
|
||||
if err != nil {
|
||||
return Receipt{}, fmt.Errorf("get receipt: %w", err)
|
||||
}
|
||||
return r, nil
|
||||
}
|
||||
|
||||
// ---- Line Items ----
|
||||
|
||||
type CreateLineItemParams struct {
|
||||
ReceiptID uuid.UUID
|
||||
RawName string
|
||||
CanonicalName string // may be empty if not matched yet
|
||||
PriceCents int
|
||||
Quantity float64
|
||||
}
|
||||
|
||||
func (q *Queries) CreateLineItem(ctx context.Context, p CreateLineItemParams) error {
|
||||
var canonical *string
|
||||
if p.CanonicalName != "" {
|
||||
canonical = &p.CanonicalName
|
||||
}
|
||||
|
||||
_, err := q.pool.Exec(ctx, `
|
||||
INSERT INTO line_items (receipt_id, raw_name, canonical_name, price_cents, quantity)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
`, p.ReceiptID, p.RawName, canonical, p.PriceCents, p.Quantity)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("create line item: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ---- Canonical Items ----
|
||||
|
||||
type CanonicalItem struct {
|
||||
Name string
|
||||
DisplayName string
|
||||
Category string
|
||||
Unit string
|
||||
Aliases []string
|
||||
}
|
||||
|
||||
func (q *Queries) ListCanonicalItems(ctx context.Context) ([]CanonicalItem, error) {
|
||||
rows, err := q.pool.Query(ctx, `
|
||||
SELECT name, display_name, category, unit, aliases
|
||||
FROM canonical_items
|
||||
ORDER BY category, display_name
|
||||
`)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list canonical items: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var items []CanonicalItem
|
||||
for rows.Next() {
|
||||
var item CanonicalItem
|
||||
if err := rows.Scan(&item.Name, &item.DisplayName, &item.Category, &item.Unit, &item.Aliases); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, item)
|
||||
}
|
||||
return items, rows.Err()
|
||||
}
|
||||
|
||||
// ---- Price History ----
|
||||
|
||||
type PricePoint struct {
|
||||
YearMonth time.Time
|
||||
AvgPriceCents int
|
||||
SampleCount int
|
||||
}
|
||||
|
||||
// GetPriceHistory returns monthly average prices for a canonical item.
|
||||
func (q *Queries) GetPriceHistory(ctx context.Context, canonicalName string) ([]PricePoint, error) {
|
||||
rows, err := q.pool.Query(ctx, `
|
||||
SELECT year_month, avg_price_cents, sample_count
|
||||
FROM price_snapshots
|
||||
WHERE canonical_name = $1
|
||||
ORDER BY year_month ASC
|
||||
`, canonicalName)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get price history: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var points []PricePoint
|
||||
for rows.Next() {
|
||||
var p PricePoint
|
||||
if err := rows.Scan(&p.YearMonth, &p.AvgPriceCents, &p.SampleCount); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
points = append(points, p)
|
||||
}
|
||||
return points, rows.Err()
|
||||
}
|
||||
|
||||
// TopMover represents the biggest price change over a period.
|
||||
type TopMover struct {
|
||||
CanonicalName string
|
||||
DisplayName string
|
||||
Category string
|
||||
PriceThen int
|
||||
PriceNow int
|
||||
PctChange float64
|
||||
}
|
||||
|
||||
// GetTopMovers finds items with the largest price change over the last N months.
|
||||
func (q *Queries) GetTopMovers(ctx context.Context, months int, limit int) ([]TopMover, error) {
|
||||
rows, err := q.pool.Query(ctx, `
|
||||
WITH
|
||||
-- Most recent snapshot per item
|
||||
latest AS (
|
||||
SELECT DISTINCT ON (canonical_name)
|
||||
canonical_name, avg_price_cents AS price_now, year_month
|
||||
FROM price_snapshots
|
||||
ORDER BY canonical_name, year_month DESC
|
||||
),
|
||||
-- Snapshot closest to N months ago
|
||||
old AS (
|
||||
SELECT DISTINCT ON (ps.canonical_name)
|
||||
ps.canonical_name, ps.avg_price_cents AS price_then
|
||||
FROM price_snapshots ps
|
||||
WHERE ps.year_month <= (now() - ($1 || ' months')::interval)::date
|
||||
ORDER BY ps.canonical_name, ps.year_month DESC
|
||||
)
|
||||
SELECT
|
||||
l.canonical_name,
|
||||
ci.display_name,
|
||||
ci.category,
|
||||
o.price_then,
|
||||
l.price_now,
|
||||
ROUND(((l.price_now - o.price_then)::numeric / o.price_then) * 100, 1) AS pct_change
|
||||
FROM latest l
|
||||
JOIN old o ON l.canonical_name = o.canonical_name
|
||||
JOIN canonical_items ci ON l.canonical_name = ci.name
|
||||
ORDER BY ABS(pct_change) DESC
|
||||
LIMIT $2
|
||||
`, months, limit)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get top movers: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var movers []TopMover
|
||||
for rows.Next() {
|
||||
var m TopMover
|
||||
if err := rows.Scan(&m.CanonicalName, &m.DisplayName, &m.Category,
|
||||
&m.PriceThen, &m.PriceNow, &m.PctChange); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
movers = append(movers, m)
|
||||
}
|
||||
return movers, rows.Err()
|
||||
}
|
||||
|
||||
// InflationSummary returns the overall purchasing power change since a base year.
|
||||
type InflationSummary struct {
|
||||
BaseYear int
|
||||
CurrentYear int
|
||||
BasketThen int // average cents across tracked items at base year
|
||||
BasketNow int // average cents across tracked items now
|
||||
PurchasingPower float64 // e.g. 0.58 means $1 in baseYear = $0.58 today
|
||||
TotalPctChange float64 // e.g. 72.4 means prices are 72.4% higher
|
||||
}
|
||||
|
||||
func (q *Queries) GetInflationSummary(ctx context.Context, baseYear int) (InflationSummary, error) {
|
||||
var s InflationSummary
|
||||
err := q.pool.QueryRow(ctx, `
|
||||
WITH
|
||||
base AS (
|
||||
SELECT AVG(avg_price_cents)::int AS avg_price
|
||||
FROM price_snapshots
|
||||
WHERE EXTRACT(YEAR FROM year_month) = $1
|
||||
),
|
||||
current AS (
|
||||
SELECT AVG(avg_price_cents)::int AS avg_price
|
||||
FROM price_snapshots
|
||||
WHERE year_month >= date_trunc('year', now()) - interval '1 year'
|
||||
)
|
||||
SELECT
|
||||
$1,
|
||||
EXTRACT(YEAR FROM now())::int,
|
||||
base.avg_price,
|
||||
current.avg_price,
|
||||
ROUND((base.avg_price::numeric / current.avg_price) * 100, 1),
|
||||
ROUND(((current.avg_price - base.avg_price)::numeric / base.avg_price) * 100, 1)
|
||||
FROM base, current
|
||||
`, baseYear).Scan(
|
||||
&s.BaseYear, &s.CurrentYear,
|
||||
&s.BasketThen, &s.BasketNow,
|
||||
&s.PurchasingPower, &s.TotalPctChange,
|
||||
)
|
||||
if err != nil {
|
||||
return InflationSummary{}, fmt.Errorf("get inflation summary: %w", err)
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// RebuildSnapshots re-aggregates all line_items into price_snapshots.
|
||||
// Run this as a nightly cron job or after a batch of new submissions.
|
||||
func (q *Queries) RebuildSnapshots(ctx context.Context) error {
|
||||
_, err := q.pool.Exec(ctx, `
|
||||
INSERT INTO price_snapshots (canonical_name, year_month, avg_price_cents, sample_count)
|
||||
SELECT
|
||||
canonical_name,
|
||||
date_trunc('month', r.receipt_date)::date AS year_month,
|
||||
ROUND(AVG(li.unit_price_cents))::int AS avg_price_cents,
|
||||
COUNT(*) AS sample_count
|
||||
FROM line_items li
|
||||
JOIN receipts r ON li.receipt_id = r.id
|
||||
WHERE li.canonical_name IS NOT NULL
|
||||
GROUP BY canonical_name, year_month
|
||||
ON CONFLICT (canonical_name, year_month)
|
||||
DO UPDATE SET
|
||||
avg_price_cents = EXCLUDED.avg_price_cents,
|
||||
sample_count = EXCLUDED.sample_count
|
||||
`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("rebuild snapshots: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user