PostgreSQL Source Code Walkthrough (11): Inserting Data #10 (PortalRunMulti and Por...)
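Overview
This article briefly walks through the part of the PostgreSQL source code that handles inserting data. It focuses on the implementation logic of the PortalRunMulti and PortalRun functions, both of which live in pquery.c.
I. Basic Information
The data structures, macro definitions, and helper functions that PortalRunMulti relies on.
Data structures / macro definitions
1. Portal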
typedef struct PortalData *Portal;

typedef struct PortalData
{
    /* Bookkeeping data */
    const char *name;           /* portal's name */
    const char *prepStmtName;   /* source prepared statement (NULL if none) */
    MemoryContext portalContext;    /* subsidiary memory for portal */
    ResourceOwner resowner;     /* resources owned by portal */
    void        (*cleanup) (Portal portal); /* cleanup hook */

    /*
     * State data for remembering which subtransaction(s) the portal was
     * created or used in.  If the portal is held over from a previous
     * transaction, both subxids are InvalidSubTransactionId.  Otherwise,
     * createSubid is the creating subxact and activeSubid is the last subxact
     * in which we ran the portal.
     */
    SubTransactionId createSubid;   /* the creating subxact */
    SubTransactionId activeSubid;   /* the last subxact with activity */

    /* The query or queries the portal will execute */
    const char *sourceText;     /* text of query (as of 8.4, never NULL) */
    const char *commandTag;     /* command tag for original query */
    List       *stmts;          /* list of PlannedStmts */
    CachedPlan *cplan;          /* CachedPlan, if stmts are from one */
    ParamListInfo portalParams; /* params to pass to query */
    QueryEnvironment *queryEnv; /* environment for query */

    /* Features/options */
    PortalStrategy strategy;    /* see above */
    int         cursorOptions;  /* DECLARE CURSOR option bits */
    bool        run_once;       /* portal will only be run once */

    /* Status data */
    PortalStatus status;        /* see above */
    bool        portalPinned;   /* a pinned portal can't be dropped */
    bool        autoHeld;       /* was automatically converted from pinned to
                                 * held (see HoldPinnedPortals()) */

    /* If not NULL, Executor is active; call ExecutorEnd eventually: */
    QueryDesc  *queryDesc;      /* info needed for executor invocation */

    /* If portal returns tuples, this is their tupdesc: */
    TupleDesc   tupDesc;        /* descriptor for result tuples */
    /* and these are the format codes to use for the columns: */
    int16      *formats;        /* a format code for each column */

    /*
     * Where we store tuples for a held cursor or a PORTAL_ONE_RETURNING or
     * PORTAL_UTIL_SELECT query.  (A cursor held past the end of its
     * transaction no longer has any active executor state.)
     */
    Tuplestorestate *holdStore; /* store for holdable cursors */
    MemoryContext holdContext;  /* memory containing holdStore */

    /*
     * Snapshot under which tuples in the holdStore were read.  We must keep a
     * reference to this snapshot if there is any possibility that the tuples
     * contain TOAST references, because releasing the snapshot could allow
     * recently-dead rows to be vacuumed away, along with any toast data
     * belonging to them.  In the case of a held cursor, we avoid needing to
     * keep such a snapshot by forcibly detoasting the data.
     */
    Snapshot    holdSnapshot;   /* registered snapshot, or NULL if none */

    /*
     * atStart, atEnd and portalPos indicate the current cursor position.
     * portalPos is zero before the first row, N after fetching N'th row of
     * query.  After we run off the end, portalPos = # of rows in query, and
     * atEnd is true.  Note that atStart implies portalPos == 0, but not the
     * reverse: we might have backed up only as far as the first row, not to
     * the start.  Also note that various code inspects atStart and atEnd, but
     * only the portal movement routines should touch portalPos.
     */
    bool        atStart;
    bool        atEnd;
    uint64      portalPos;

    /* Presentation data, primarily used by the pg_cursors system view */
    TimestampTz creation_time;  /* time at which this portal was defined */
    bool        visible;        /* include this portal in pg_cursors? */
} PortalData;
2. List
typedef struct ListCell ListCell;

typedef struct List
{
    NodeTag     type;           /* T_List, T_IntList, or T_OidList */
    int         length;
    ListCell   *head;
    ListCell   *tail;
} List;

struct ListCell
{
    union
    {
        void       *ptr_value;
        int         int_value;
        Oid         oid_value;
    }           data;
    ListCell   *next;
};
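The List shown above is a singly linked chain of cells with head and tail pointers. As a rough sketch of how such a structure is appended to and traversed, the following uses simplified stand-in types. ToyCell, ToyList, toy_lappend_int, toy_sum and toy_free are hypothetical names made up for this example; real PostgreSQL code would use the List API (for instance lappend_int() and foreach()) and palloc rather than malloc.

```c
#include <assert.h>
#include <stdlib.h>

/* Simplified stand-ins for ListCell / List (integer payload only). */
typedef struct ToyCell
{
    int             int_value;
    struct ToyCell *next;
} ToyCell;

typedef struct ToyList
{
    int      length;
    ToyCell *head;
    ToyCell *tail;
} ToyList;

/* Append one integer; the tail pointer keeps this O(1). */
void
toy_lappend_int(ToyList *list, int value)
{
    ToyCell *cell = malloc(sizeof(ToyCell));

    assert(cell != NULL);
    cell->int_value = value;
    cell->next = NULL;
    if (list->tail != NULL)
        list->tail->next = cell;
    else
        list->head = cell;      /* first cell of an empty list */
    list->tail = cell;
    list->length++;
}

/* Walk the cells from head to tail and add up their values. */
int
toy_sum(const ToyList *list)
{
    int sum = 0;

    for (const ToyCell *lc = list->head; lc != NULL; lc = lc->next)
        sum += lc->int_value;
    return sum;
}

/* Release every cell and reset the list to empty. */
void
toy_free(ToyList *list)
{
    ToyCell *lc = list->head;

    while (lc != NULL)
    {
        ToyCell *next = lc->next;

        free(lc);
        lc = next;
    }
    list->head = list->tail = NULL;
    list->length = 0;
}
```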
3. Snapshot
typedef struct SnapshotData *Snapshot;

/*
 * Struct representing all kind of possible snapshots.
 *
 * There are several different kinds of snapshots:
 * * Normal MVCC snapshots
 * * MVCC snapshots taken during recovery (in Hot-Standby mode)
 * * Historic MVCC snapshots used during logical decoding
 * * snapshots passed to HeapTupleSatisfiesDirty()
 * * snapshots passed to HeapTupleSatisfiesNonVacuumable()
 * * snapshots used for SatisfiesAny, Toast, Self where no members are
 *   accessed.
 *
 * TODO: It's probably a good idea to split this struct using a NodeTag
 * similar to how parser and executor nodes are handled, with one type for
 * each different kind of snapshot to avoid overloading the meaning of
 * individual fields.
 */
typedef struct SnapshotData
{
    SnapshotSatisfiesFunc satisfies;    /* tuple test function */

    /*
     * The remaining fields are used only for MVCC snapshots, and are normally
     * just zeroes in special snapshots.  (But xmin and xmax are used
     * specially by HeapTupleSatisfiesDirty, and xmin is used specially by
     * HeapTupleSatisfiesNonVacuumable.)
     *
     * An MVCC snapshot can never see the effects of XIDs >= xmax. It can see
     * the effects of all older XIDs except those listed in the snapshot. xmin
     * is stored as an optimization to avoid needing to search the XID arrays
     * for most tuples.
     */
    TransactionId xmin;         /* all XID < xmin are visible to me */
    TransactionId xmax;         /* all XID >= xmax are invisible to me */

    /*
     * For normal MVCC snapshot this contains the all xact IDs that are in
     * progress, unless the snapshot was taken during recovery in which case
     * it's empty. For historic MVCC snapshots, the meaning is inverted, i.e.
     * it contains *committed* transactions between xmin and xmax.
     *
     * note: all ids in xip[] satisfy xmin <= xip[i] < xmax
     */
    TransactionId *xip;
    uint32      xcnt;           /* # of xact ids in xip[] */

    /*
     * For non-historic MVCC snapshots, this contains subxact IDs that are in
     * progress (and other transactions that are in progress if taken during
     * recovery). For historic snapshot it contains *all* xids assigned to the
     * replayed transaction, including the toplevel xid.
     *
     * note: all ids in subxip[] are >= xmin, but we don't bother filtering
     * out any that are >= xmax
     */
    TransactionId *subxip;
    int32       subxcnt;        /* # of xact ids in subxip[] */
    bool        suboverflowed;  /* has the subxip array overflowed? */

    bool        takenDuringRecovery;    /* recovery-shaped snapshot? */
    bool        copied;         /* false if it's a static snapshot */

    CommandId   curcid;         /* in my xact, CID < curcid are visible */

    /*
     * An extra return value for HeapTupleSatisfiesDirty, not used in MVCC
     * snapshots.
     */
    uint32      speculativeToken;

    /*
     * Book-keeping information, used by the snapshot manager
     */
    uint32      active_count;   /* refcount on ActiveSnapshot stack */
    uint32      regd_count;     /* refcount on RegisteredSnapshots */
    pairingheap_node ph_node;   /* link in the RegisteredSnapshots heap */

    TimestampTz whenTaken;      /* timestamp when snapshot was taken */
    XLogRecPtr  lsn;            /* position in the WAL stream when taken */
} SnapshotData;
Dependent functions
1. lfirst_*
/*
* NB: There is an unfortunate legacy from a previous incarnation of
* the List API: the macro lfirst() was used to mean "the data in this
* cons cell". To avoid changing every usage of lfirst(), that meaning
* has been kept. As a result, lfirst() takes a ListCell and returns
* the data it contains; to get the data in the first cell of a
* List, use linitial(). Worse, lsecond() is more closely related to
* linitial() than lfirst(): given a List, lsecond() returns the data
* in the second cons cell.
*/
#define lnext(lc) ((lc)->next)
#define lfirst(lc) ((lc)->data.ptr_value)
#define lfirst_int(lc) ((lc)->data.int_value)
#define lfirst_oid(lc) ((lc)->data.oid_value)
#define lfirst_node(type,lc) castNode(type, lfirst(lc))
/*
 * castNode(type, ptr) casts ptr to "type *", and if assertions are enabled,
 * verifies that the node has the appropriate type (using its nodeTag()).
 *
 * Use an inline function when assertions are enabled, to avoid multiple
 * evaluations of the ptr argument (which could e.g. be a function call).
 */
#ifdef USE_ASSERT_CHECKING
static inline Node *
castNodeImpl(NodeTag type, void *ptr)
{
    Assert(ptr == NULL || nodeTag(ptr) == type);
    return (Node *) ptr;
}
#define castNode(_type_, nodeptr) ((_type_ *) castNodeImpl(T_##_type_, nodeptr))
#else
#define castNode(_type_, nodeptr) ((_type_ *) (nodeptr))
#endif                          /* USE_ASSERT_CHECKING */
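The naming quirk described in the comment above is easy to trip over: lfirst() operates on a cell, while linitial() and lsecond() operate on a list. The following sketch reproduces that distinction, together with a castNode-style checked cast, on toy types. Every identifier prefixed with toy_ or Toy is hypothetical and exists only for this illustration; the tag check uses the standard assert() instead of PostgreSQL's Assert().

```c
#include <assert.h>
#include <stddef.h>

/* Toy node tags; every toy node starts with its tag, as Node does. */
typedef enum ToyTag
{
    T_ToyInt = 1,
    T_ToyStr = 2
} ToyTag;

typedef struct ToyInt
{
    ToyTag type;
    int    value;
} ToyInt;

typedef struct ToyCell
{
    void           *ptr_value;
    struct ToyCell *next;
} ToyCell;

typedef struct ToyList
{
    int      length;
    ToyCell *head;
    ToyCell *tail;
} ToyList;

/* Takes a CELL and returns the data stored in it. */
#define toy_lfirst(lc)      ((lc)->ptr_value)
/* Take a LIST and return the data of its first / second cell. */
#define toy_linitial(l)     toy_lfirst((l)->head)
#define toy_lsecond(l)      toy_lfirst((l)->head->next)

/* The tag is the first member, so a node pointer also points at its tag. */
#define toy_nodeTag(ptr)    (*(const ToyTag *) (ptr))

/* Inline helper so the pointer argument is evaluated exactly once. */
static inline void *
toy_cast_impl(ToyTag tag, void *ptr)
{
    assert(ptr == NULL || toy_nodeTag(ptr) == tag);
    return ptr;
}

#define toy_castNode(_type_, ptr) ((_type_ *) toy_cast_impl(T_##_type_, ptr))

/* Read the integer stored in the second element of a list of ToyInt. */
int
toy_second_int(ToyList *list)
{
    return toy_castNode(ToyInt, toy_lsecond(list))->value;
}
```

Passing a list to toy_lfirst would not compile here because ToyList has no ptr_value member, which is exactly the mistake the legacy naming invites.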
2. Snapshot-related functions
// To be covered later, together with MVCC
GetTransactionSnapshot
RegisterSnapshot
PushCopiedSnapshot
UpdateActiveSnapshotCommandId
PopActiveSnapshot
3. ProcessQuery
// Covered in the previous installment
4. CommandCounterIncrement
/*
 *  CommandCounterIncrement
 */
void
CommandCounterIncrement(void)
{
    /*
     * If the current value of the command counter hasn't been "used" to mark
     * tuples, we need not increment it, since there's no need to distinguish
     * a read-only command from others.  This helps postpone command counter
     * overflow, and keeps no-op CommandCounterIncrement operations cheap.
     */
    if (currentCommandIdUsed)
    {
        /*
         * Workers synchronize transaction state at the beginning of each
         * parallel operation, so we can't account for new commands after that
         * point.
         */
        if (IsInParallelMode() || IsParallelWorker())
            elog(ERROR, "cannot start commands during a parallel operation");

        currentCommandId += 1;
        if (currentCommandId == InvalidCommandId)
        {
            currentCommandId -= 1;
            ereport(ERROR,
                    (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                     errmsg("cannot have more than 2^32-2 commands in a transaction")));
        }
        currentCommandIdUsed = false;

        /* Propagate new command ID into static snapshots */
        SnapshotSetCommandId(currentCommandId);

        /*
         * Make any catalog changes done by the just-completed command visible
         * in the local syscache.  We obviously don't need to do this after a
         * read-only command.  (But see hacks in inval.c to make real sure we
         * don't think a command that queued inval messages was read-only.)
         */
        AtCCI_LocalCache();
    }
}
5. MemoryContextDeleteChildren
/*
 * MemoryContextDeleteChildren
 *      Delete all the descendants of the named context and release all
 *      space allocated therein.  The named context itself is not touched.
 */
void
MemoryContextDeleteChildren(MemoryContext context)
{
    AssertArg(MemoryContextIsValid(context));

    /*
     * MemoryContextDelete will delink the child from me, so just iterate as
     * long as there is a child.
     */
    while (context->firstchild != NULL)
        MemoryContextDelete(context->firstchild);
}
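The loop in MemoryContextDeleteChildren never advances a cursor of its own: it relies on each delete call unlinking the child from its parent, so firstchild keeps changing until it becomes NULL. The sketch below shows that pattern on a toy context tree. ToyContext and the toy_context_* functions are hypothetical; they track only the parent and child links, use malloc-family allocation, and do not model the memory a real context owns.

```c
#include <assert.h>
#include <stdlib.h>

/* Toy context: only the tree links, no allocated blocks. */
typedef struct ToyContext
{
    struct ToyContext *parent;
    struct ToyContext *firstchild;
    struct ToyContext *nextchild;   /* next sibling under the same parent */
} ToyContext;

/* Create a context and push it onto the front of the parent's child list. */
ToyContext *
toy_context_create(ToyContext *parent)
{
    ToyContext *ctx = calloc(1, sizeof(ToyContext));

    assert(ctx != NULL);
    ctx->parent = parent;
    if (parent != NULL)
    {
        ctx->nextchild = parent->firstchild;
        parent->firstchild = ctx;
    }
    return ctx;
}

void toy_context_delete_children(ToyContext *ctx);

/* Delete a context: its descendants first, then unlink and free it. */
void
toy_context_delete(ToyContext *ctx)
{
    assert(ctx != NULL);
    toy_context_delete_children(ctx);

    if (ctx->parent != NULL)
    {
        /* Find the link that points at ctx and bypass it. */
        ToyContext **link = &ctx->parent->firstchild;

        while (*link != ctx)
            link = &(*link)->nextchild;
        *link = ctx->nextchild;
    }
    free(ctx);
}

/*
 * Delete every descendant but leave ctx itself alone.  Each call to
 * toy_context_delete unlinks the child, so firstchild changes on every
 * iteration and the loop ends when no child is left.
 */
void
toy_context_delete_children(ToyContext *ctx)
{
    assert(ctx != NULL);
    while (ctx->firstchild != NULL)
        toy_context_delete(ctx->firstchild);
}
```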
II. Source Code Walkthrough
1. PortalRun
/*
* PortalRun
* Run a portal's query or queries.
*
* count <= 0 is interpreted as a no-op: the destination gets started up
* and shut down, but nothing else happens. Also, count == FETCH_ALL is
* interpreted as "all rows". Note that count is ignored in multi-query
* situations, where we always run the portal to completion.
*
* isTopLevel: true if query is being executed at backend "top level"
* (that is, directly from a client command message)
*
* dest: where to send output of primary (canSetTag) query
*
* altdest: where to send output of non-primary queries
*
* completionTag: points to a buffer of size COMPLETION_TAG_BUFSIZE
* in which to store a command completion status string.
* May be NULL if caller doesn't want a status string.
*
* Returns true if the portal's execution is complete, false if it was
* suspended due to exhaustion of the count parameter.
*/
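/*
Input:
    see PortalRunMulti
Output:
    boolean: true if the portal's execution is complete, false if it was
    suspended because the count limit was exhausted (false does not
    indicate failure)
*/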
bool
PortalRun(Portal portal, long count, bool isTopLevel, bool run_once,
          DestReceiver *dest, DestReceiver *altdest,
          char *completionTag)
{
    bool        result;         // return value
    uint64      nprocessed;
    ResourceOwner saveTopTransactionResourceOwner;  // saved top-level transaction resource owner
    MemoryContext saveTopTransactionContext;    // saved top-level transaction memory context
    Portal      saveActivePortal;   // saved active portal
    ResourceOwner saveResourceOwner;
    MemoryContext savePortalContext;
    MemoryContext saveMemoryContext;

    AssertArg(PortalIsValid(portal));

    TRACE_POSTGRESQL_QUERY_EXECUTE_START();

    /* Initialize completion tag to empty string */
    if (completionTag)
        completionTag[0] = '