PostgreSQL Source Code: src/backend/executor/nodeAgg.c File Reference (original) (raw)
3280{
3288 int max_aggno;
3289 int max_transno;
3290 int numaggrefs;
3291 int numaggs;
3292 int numtrans;
3293 int phase;
3294 int phaseidx;
3296 Bitmapset *all_grouped_cols = NULL;
3297 int numGroupingSets = 1;
3298 int numPhases;
3299 int numHashes;
3300 int i = 0;
3301 int j = 0;
3304
3305
3307
3308
3309
3310
3315
3324 aggstate->peragg = NULL;
3332 aggstate->sort_in = NULL;
3334
3335
3336
3337
3338 numPhases = (use_hashing ? 1 : 2);
3339 numHashes = (use_hashing ? 1 : 0);
3340
3341
3342
3343
3344
3345
3347 {
3349
3350 foreach(l, node->chain)
3351 {
3353
3354 numGroupingSets = Max(numGroupingSets,
3356
3357
3358
3359
3360
3362 ++numPhases;
3363 else
3364 ++numHashes;
3365 }
3366 }
3367
3368 aggstate->maxsets = numGroupingSets;
3369 aggstate->numphases = numPhases;
3370
3373
3374
3375
3376
3377
3378
3379
3380
3381
3382
3383
3384
3385
3386
3387
3390
3391 for (i = 0; i < numGroupingSets; ++i)
3392 {
3395 }
3396
3397 if (use_hashing)
3399
3401
3402
3403
3404
3405
3406
3407
3409 eflags &= ~EXEC_FLAG_REWIND;
3412
3413
3414
3415
3420
3424
3425
3426
3427
3428
3429 if (numPhases > 2)
3430 {
3433
3434
3435
3436
3437
3438
3439
3440
3441
3442
3443
3444
3445
3446
3447
3451 }
3452
3453
3454
3455
3458
3459
3460
3461
3462
3463
3464
3465
3466
3467
3468
3469
3470
3471
3474
3475
3476
3477
3479 max_aggno = -1;
3480 max_transno = -1;
3481 foreach(l, aggstate->aggs)
3482 {
3484
3485 max_aggno = Max(max_aggno, aggref->aggno);
3486 max_transno = Max(max_transno, aggref->aggtransno);
3487 }
3488 aggstate->numaggs = numaggs = max_aggno + 1;
3489 aggstate->numtrans = numtrans = max_transno + 1;
3490
3491
3492
3493
3494
3496
3498 if (numHashes)
3499 {
3504 }
3505
3506 phase = 0;
3507 for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx)
3508 {
3509 Agg *aggnode;
3510 Sort *sortnode;
3511
3512 if (phaseidx > 0)
3513 {
3516 }
3517 else
3518 {
3519 aggnode = node;
3520 sortnode = NULL;
3521 }
3522
3523 Assert(phase <= 1 || sortnode);
3524
3527 {
3531
3534 perhash = &aggstate->perhash[i];
3535
3536
3537 phasedata->aggnode = node;
3539
3540
3541 perhash->aggnode = aggnode;
3542
3544
3545 for (j = 0; j < aggnode->numCols; ++j)
3547
3549
3550 all_grouped_cols = bms_add_members(all_grouped_cols, cols);
3551 continue;
3552 }
3553 else
3554 {
3556 int num_sets;
3557
3559
3560 if (num_sets)
3561 {
3564
3565 i = 0;
3567 {
3570
3571
3572 for (j = 0; j < current_length; ++j)
3574
3577
3578 ++i;
3579 }
3580
3583 }
3584 else
3585 {
3586 Assert(phaseidx == 0);
3587
3590 }
3591
3592
3593
3594
3596 {
3597
3598
3599
3600
3603
3604
3605 for (int k = 0; k < phasedata->numsets; k++)
3606 {
3608
3609
3610 if (length == 0)
3611 continue;
3612
3613
3614 if (phasedata->eqfunctions[length - 1] != NULL)
3615 continue;
3616
3619 length,
3620 aggnode->grpColIdx,
3621 aggnode->grpOperators,
3622 aggnode->grpCollations,
3624 }
3625
3626
3627 if (aggnode->numCols > 0 &&
3629 {
3633 aggnode->grpColIdx,
3634 aggnode->grpOperators,
3635 aggnode->grpCollations,
3637 }
3638 }
3639
3640 phasedata->aggnode = aggnode;
3642 phasedata->sortnode = sortnode;
3643 }
3644 }
3645
3646
3647
3648
3649 i = -1;
3652
3653
3654
3655
3656
3660
3663
3664 aggstate->peragg = peraggs;
3665 aggstate->pertrans = pertransstates;
3666
3667
3670 * (numGroupingSets + numHashes));
3672
3674 {
3675 for (i = 0; i < numGroupingSets; i++)
3676 {
3678 * numaggs);
3679 }
3680
3681 aggstate->pergroups = pergroups;
3682 pergroups += numGroupingSets;
3683 }
3684
3685
3686
3687
3688 if (use_hashing)
3689 {
3691 uint64 totalGroups = 0;
3692
3697
3698
3700
3704
3705
3706
3707
3708
3709
3710
3711 for (int k = 0; k < aggstate->num_hashes; k++)
3713
3719
3720
3723
3725
3726
3728 }
3729
3730
3731
3732
3733
3734
3735
3737 {
3741 }
3742 else
3743 {
3747 }
3748
3749
3750
3751
3752
3753 foreach(l, aggstate->aggs)
3754 {
3759 int numAggTransFnArgs;
3760 int numDirectArgs;
3764 Oid finalfn_oid;
3765 Oid serialfn_oid,
3766 deserialfn_oid;
3767 Oid aggOwner;
3768 Expr *finalfnexpr;
3769 Oid aggtranstype;
3770
3771
3772 Assert(aggref->agglevelsup == 0);
3773
3775
3776 peragg = &peraggs[aggref->aggno];
3777
3778
3779 if (peragg->aggref != NULL)
3780 continue;
3781
3782 peragg->aggref = aggref;
3783 peragg->transno = aggref->aggtransno;
3784
3785
3789 elog(ERROR, "cache lookup failed for aggregate %u",
3792
3793
3800
3801
3802 aggtranstype = aggref->aggtranstype;
3804
3805
3808 else
3809 peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
3810
3813
3814
3815
3816
3817
3818 if (aggtranstype == INTERNALOID)
3819 {
3820
3821
3822
3823
3824
3826 {
3827
3829
3830 if ((aggform->aggserialfn))
3831 elog(ERROR, "serialfunc not provided for serialization aggregation");
3832 serialfn_oid = aggform->aggserialfn;
3833 }
3834
3835
3837 {
3838
3840
3841 if ((aggform->aggdeserialfn))
3842 elog(ERROR, "deserialfunc not provided for deserialization aggregation");
3843 deserialfn_oid = aggform->aggdeserialfn;
3844 }
3845 }
3846
3847
3848 {
3850
3854 elog(ERROR, "cache lookup failed for function %u",
3858
3860 {
3861 aclresult = object_aclcheck(ProcedureRelationId, finalfn_oid, aggOwner,
3867 }
3869 {
3870 aclresult = object_aclcheck(ProcedureRelationId, serialfn_oid, aggOwner,
3876 }
3878 {
3879 aclresult = object_aclcheck(ProcedureRelationId, deserialfn_oid, aggOwner,
3885 }
3886 }
3887
3888
3889
3890
3891
3892
3894 aggTransFnInputTypes);
3895
3896
3898
3899
3900 if (aggform->aggfinalextra)
3901 peragg->numFinalArgs = numAggTransFnArgs + 1;
3902 else
3904
3905
3908
3909
3910
3911
3912
3914 {
3917 aggtranstype,
3918 aggref->aggtype,
3919 aggref->inputcollid,
3920 finalfn_oid,
3921 &finalfnexpr);
3924 }
3925
3926
3930
3931
3932
3933
3934
3935 pertrans = &pertransstates[aggref->aggtransno];
3936 if (pertrans->aggref == NULL)
3937 {
3938 Datum textInitVal;
3940 bool initValueIsNull;
3941 Oid transfn_oid;
3942
3943
3944
3945
3946
3947
3949 {
3950 transfn_oid = aggform->aggcombinefn;
3951
3952
3954 elog(ERROR, "combinefn not set for aggregate function");
3955 }
3956 else
3957 transfn_oid = aggform->aggtransfn;
3958
3964
3965
3966
3967
3968
3970 Anum_pg_aggregate_agginitval,
3971 &initValueIsNull);
3972 if (initValueIsNull)
3974 else
3976
3978 {
3979 Oid combineFnInputTypes[] = {aggtranstype,
3980 aggtranstype};
3981
3982
3983
3984
3985
3986
3988
3989
3991 aggref, transfn_oid, aggtranstype,
3992 serialfn_oid, deserialfn_oid,
3994 combineFnInputTypes, 2);
3995
3996
3997
3998
3999
4000
4001
4002 if (pertrans->transfn.fn_strict && aggtranstype == INTERNALOID)
4004 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
4005 errmsg("combine function with transition type %s must not be declared STRICT",
4007 }
4008 else
4009 {
4010
4011 if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
4013 else
4015
4017 aggref, transfn_oid, aggtranstype,
4018 serialfn_oid, deserialfn_oid,
4020 aggTransFnInputTypes,
4021 numAggTransFnArgs);
4022
4023
4024
4025
4026
4027
4028
4029
4030
4031
4033 {
4034 if (numAggTransFnArgs <= numDirectArgs ||
4036 aggtranstype))
4038 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
4039 errmsg("aggregate %u needs to have compatible input type and transition type",
4041 }
4042 }
4043 }
4044 else
4047 }
4048
4049
4050
4051
4052
4053
4054
4055
4056
4057
4058
4061 (errcode(ERRCODE_GROUPING_ERROR),
4062 errmsg("aggregate function calls cannot be nested")));
4063
4064
4065
4066
4067
4068
4069
4070
4071 for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++)
4072 {
4074 bool dohash = false;
4075 bool dosort = false;
4076
4077
4079 continue;
4080
4082 {
4083
4084
4085
4086
4087 dohash = true;
4088 dosort = true;
4089 }
4091 {
4092
4093
4094
4095
4096
4097 continue;
4098 }
4101 {
4102 dohash = false;
4103 dosort = true;
4104 }
4106 {
4107 dohash = true;
4108 dosort = false;
4109 }
4110 else
4112
4114 false);
4115
4116
4118 }
4119
4120 return aggstate;
4121}
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
int bms_next_member(const Bitmapset *a, int prevbit)
Bitmapset * bms_add_member(Bitmapset *a, int x)
Bitmapset * bms_add_members(Bitmapset *a, const Bitmapset *b)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
ExprState * ExecInitQual(List *qual, PlanState *parent)
List * ExecInitExprList(List *nodes, PlanState *parent)
ExprState * ExecBuildAggTrans(AggState *aggstate, AggStatePerPhase phase, bool doSort, bool doHash, bool nullcheck)
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
const TupleTableSlotOps TTSOpsVirtual
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
void ExecCreateScanSlotFromOuterPlan(EState *estate, ScanState *scanstate, const TupleTableSlotOps *tts_ops)
void ExecAssignExprContext(EState *estate, PlanState *planstate)
void ExecAssignProjectionInfo(PlanState *planstate, TupleDesc inputDesc)
const TupleTableSlotOps * ExecGetResultSlotOps(PlanState *planstate, bool *isfixed)
struct AggStatePerTransData * AggStatePerTrans
struct AggStatePerAggData * AggStatePerAgg
#define EXEC_FLAG_BACKWARD
#define EXEC_FLAG_EXPLAIN_ONLY
char * format_type_be(Oid type_oid)
#define HeapTupleIsValid(tuple)
static void * GETSTRUCT(const HeapTupleData *tuple)
List * lcons_int(int datum, List *list)
char * get_func_name(Oid funcid)
static void find_hash_columns(AggState *aggstate)
static Datum GetAggInitVal(Datum textInitVal, Oid transtype)
Size hash_agg_entry_size(int numTrans, Size tupleWidth, Size transitionSpace)
static void build_pertrans_for_aggref(AggStatePerTrans pertrans, AggState *aggstate, EState *estate, Aggref *aggref, Oid transfn_oid, Oid aggtranstype, Oid aggserialfn, Oid aggdeserialfn, Datum initValue, bool initValueIsNull, Oid *inputTypes, int numArguments)
static TupleTableSlot * ExecAgg(PlanState *pstate)
static void build_hash_tables(AggState *aggstate)
static void hash_create_memory(AggState *aggstate)
#define DO_AGGSPLIT_DESERIALIZE(as)
#define DO_AGGSPLIT_SERIALIZE(as)
#define InvokeFunctionExecuteHook(objectId)
void build_aggregate_finalfn_expr(Oid *agg_input_types, int num_finalfn_inputs, Oid agg_state_type, Oid agg_result_type, Oid agg_input_collation, Oid finalfn_oid, Expr **finalfnexpr)
int get_aggregate_argtypes(Aggref *aggref, Oid *inputTypes)
bool IsBinaryCoercible(Oid srctype, Oid targettype)
FormData_pg_aggregate * Form_pg_aggregate
#define list_nth_node(type, list, n)
FormData_pg_proc * Form_pg_proc
static Datum ObjectIdGetDatum(Oid X)
Bitmapset ** grouped_cols
ExprState * evaltrans_cache[2][2]
AggStatePerGroup * all_pergroups
int hash_planned_partitions
TupleTableSlot * hash_spill_wslot
TupleTableSlot * sort_slot
const TupleTableSlotOps * outerops
ExecProcNodeMtd ExecProcNode
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)