Hive Notes, Part 3
Still investigating how the default partition that can appear in a WHERE clause is handled.
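For reference, the name in question comes from the hive.exec.default.partition.name property (ConfVars.DEFAULTPARTITIONNAME), whose default value is __HIVE_DEFAULT_PARTITION__. A minimal sketch of reading it; the demo class itself is mine, not Hive's:

import org.apache.hadoop.hive.conf.HiveConf;

// Minimal demo (my own class): read the reserved default-partition name.
public class DefaultPartitionNameDemo {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    String name = HiveConf.getVar(conf, HiveConf.ConfVars.DEFAULTPARTITIONNAME);
    System.out.println(name); // "__HIVE_DEFAULT_PARTITION__" unless overridden
  }
}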
Investigating the ql project
grep -nIR 'DEFAULTPARTITIONNAME' --include '*.java' ./ql/src/java/org/apache/hadoop/hive/ql/
./ql/src/java/org/apache/hadoop/hive/ql//exec/ColumnStatsTask.java:364: this.conf.getVar(ConfVars.DEFAULTPARTITIONNAME) : partVal.toString());
./ql/src/java/org/apache/hadoop/hive/ql//exec/DDLTask.java:1105: if (part.getName().equals(conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME))) {
./ql/src/java/org/apache/hadoop/hive/ql//exec/TableScanOperator.java:258: defaultPartitionName = HiveConf.getVar(hconf, HiveConf.ConfVars.DEFAULTPARTITIONNAME);
./ql/src/java/org/apache/hadoop/hive/ql//metadata/Hive.java:2569: String defaultPartitionName = HiveConf.getVar(conf, ConfVars.DEFAULTPARTITIONNAME);
./ql/src/java/org/apache/hadoop/hive/ql//optimizer/ppr/PartitionPruner.java:476: String defaultPartitionName = conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
./ql/src/java/org/apache/hadoop/hive/ql//parse/BaseSemanticAnalyzer.java:1474: String defaultPartitionName = HiveConf.getVar(conf, HiveConf.ConfVars.DEFAULTPARTITIONNAME);
./ql/src/java/org/apache/hadoop/hive/ql//parse/DDLSemanticAnalyzer.java:244: reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.DEFAULTPARTITIONNAME));
./ql/src/java/org/apache/hadoop/hive/ql//parse/SemanticAnalyzer.java:6594: conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
./ql/src/java/org/apache/hadoop/hive/ql//stats/StatsUtils.java:438: ci.getType().getTypeName(), conf.getVar(ConfVars.DEFAULTPARTITIONNAME)));
Hmm, I guess the org.apache.hadoop.hive.ql.parse package is the place to dig.
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer
/**
 * BaseSemanticAnalyzer.
 *
 */
public abstract class BaseSemanticAnalyzer {
  // ...
  private static boolean getPartExprNodeDesc(ASTNode astNode, HiveConf conf,
      Map<ASTNode, ExprNodeDesc> astExprNodeMap) throws SemanticException {

    if (astNode == null) {
      return true;
    } else if ((astNode.getChildren() == null) || (astNode.getChildren().size() == 0)) {
      return astNode.getType() != HiveParser.TOK_PARTVAL;
    }

    TypeCheckCtx typeCheckCtx = new TypeCheckCtx(null);
    String defaultPartitionName = HiveConf.getVar(conf, HiveConf.ConfVars.DEFAULTPARTITIONNAME);
    boolean result = true;
    for (Node childNode : astNode.getChildren()) {
      ASTNode childASTNode = (ASTNode) childNode;

      if (childASTNode.getType() != HiveParser.TOK_PARTVAL) {
        result = getPartExprNodeDesc(childASTNode, conf, astExprNodeMap) && result;
      } else {
        boolean isDynamicPart = childASTNode.getChildren().size() <= 1;
        result = !isDynamicPart && result;
        if (!isDynamicPart) {
          ASTNode partVal = (ASTNode) childASTNode.getChildren().get(1);
          if (!defaultPartitionName.equalsIgnoreCase(unescapeSQLString(partVal.getText()))) {
            astExprNodeMap.put((ASTNode) childASTNode.getChildren().get(0),
                TypeCheckProcFactory.genExprNode(partVal, typeCheckCtx).get(partVal));
          }
        }
      }
    }
    return result;
  }
  // ...
}
So when the default partition value shows up in the abstract syntax tree (AST), it gets special treatment. What exactly does astExprNodeMap record? Judging from the put() call, it maps each partition column's name node to the typed expression (ExprNodeDesc) generated from its value, and the default partition value deliberately gets no entry, presumably because it is a marker rather than a real typed value.
Having to walk the AST with things like childASTNode.getChildren().get(0) is painful; I wish there were a comment documenting the grammar.
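From those get(0)/get(1) accesses, my reading (not verified against the grammar file) is that a TOK_PARTVAL node carries the partition column name as child 0 and, for static specs only, the value literal as child 1, so a single child means a dynamic partition. A toy sketch with a stand-in node class of my own, just to pin that shape down:

import java.util.Arrays;
import java.util.List;

// Toy stand-in for ASTNode, purely to illustrate the inferred child layout.
public class PartValShapeDemo {
  static class Tok {
    final String text;
    final List<Tok> children;
    Tok(String text, Tok... children) {
      this.text = text;
      this.children = Arrays.asList(children);
    }
  }

  public static void main(String[] args) {
    // Assumed shape for "PARTITION (ds = '2008-04-08', hr)" -- hr is dynamic.
    Tok partSpec = new Tok("TOK_PARTSPEC",
        new Tok("TOK_PARTVAL", new Tok("ds"), new Tok("'2008-04-08'")),
        new Tok("TOK_PARTVAL", new Tok("hr")));

    for (Tok partVal : partSpec.children) {
      boolean isDynamicPart = partVal.children.size() <= 1; // same test as the Hive code above
      System.out.println(partVal.children.get(0).text
          + (isDynamicPart ? " -> dynamic" : " = " + partVal.children.get(1).text));
    }
  }
}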
org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer
/**
 * DDLSemanticAnalyzer.
 *
 */
public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
  // ...
  public DDLSemanticAnalyzer(QueryState queryState, Hive db) throws SemanticException {
    super(queryState, db);
    reservedPartitionValues = new HashSet<String>();
    // Partition can't have this name
    reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.DEFAULTPARTITIONNAME));
    reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.DEFAULT_ZOOKEEPER_PARTITION_NAME));
    // Partition value can't end in this suffix
    reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.METASTORE_INT_ORIGINAL));
    reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.METASTORE_INT_ARCHIVED));
    reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.METASTORE_INT_EXTRACTED));
    hiveAuthorizationTaskFactory = createAuthorizationTaskFactory(conf, db);
  }
  // ...
  /**
   * Certain partition values are used by hive. e.g. the default partition
   * in dynamic partitioning and the intermediate partition values used in the
   * archiving process. Naturally, prohibit the user from creating partitions
   * with these reserved values. The check in this function is more
   * restrictive than the actual limitation, but it's simpler. Should be okay
   * since the reserved names are fairly long and uncommon.
   */
  private void validatePartitionValues(Map<String, String> partSpec)
      throws SemanticException {

    for (Entry<String, String> e : partSpec.entrySet()) {
      for (String s : reservedPartitionValues) {
        String value = e.getValue();
        if (value != null && value.contains(s)) {
          throw new SemanticException(ErrorMsg.RESERVED_PART_VAL.getMsg(
              "(User value: " + e.getValue() + " Reserved substring: " + s + ")"));
        }
      }
    }
  }
  // ...
}
So the default partition value is registered as a reserved value, and any partition value containing one of those reserved strings is rejected with a SemanticException. By the way, reservedPartitionValues is a field of DDLSemanticAnalyzer.
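Note that the test is contains() rather than equals(), so even a value that merely embeds a reserved name is rejected; that is what the doc comment means by being "more restrictive than the actual limitation". A quick toy of mine (class name and sample values are made up):

import java.util.Set;

// Toy mirror of validatePartitionValues: substring containment, not equality.
public class ReservedPartValDemo {
  public static void main(String[] args) {
    Set<String> reserved = Set.of("__HIVE_DEFAULT_PARTITION__");
    String[] candidates = {
        "2008-04-08",                    // accepted
        "__HIVE_DEFAULT_PARTITION__",    // rejected: exact match
        "a__HIVE_DEFAULT_PARTITION__b"   // rejected too: mere substring
    };
    for (String v : candidates) {
      boolean rejected = reserved.stream().anyMatch(v::contains);
      System.out.println(v + " -> " + (rejected ? "rejected" : "accepted"));
    }
  }
}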
org.apache.hadoop.hive.ql.parse.SemanticAnalyzer
/**
 * Implementation of the semantic analyzer. It generates the query plan.
 * There are other specific semantic analyzers for some hive operations such as
 * DDLSemanticAnalyzer for ddl operations.
 */
public class SemanticAnalyzer extends BaseSemanticAnalyzer {
  // ...
  @SuppressWarnings("nls")
  protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
      throws SemanticException {

    RowResolver inputRR = opParseCtx.get(input).getRowResolver();
    QBMetaData qbm = qb.getMetaData();
    Integer dest_type = qbm.getDestTypeForAlias(dest);

    Table dest_tab = null; // destination table if any
    boolean destTableIsAcid = false; // should the destination table be written to using ACID
    boolean destTableIsTemporary = false;
    boolean destTableIsMaterialization = false;
    Partition dest_part = null; // destination partition if any
    Path queryTmpdir = null; // the intermediate destination directory
    Path dest_path = null; // the final destination directory
    TableDesc table_desc = null;
    int currentTableId = 0;
    boolean isLocal = false;
    SortBucketRSCtx rsCtx = new SortBucketRSCtx();
    DynamicPartitionCtx dpCtx = null;
    LoadTableDesc ltd = null;
    ListBucketingCtx lbCtx = null;
    Map<String, String> partSpec = null;

    switch (dest_type.intValue()) {
    case QBMetaData.DEST_TABLE: {

      dest_tab = qbm.getDestTableForAlias(dest);
      destTableIsAcid = AcidUtils.isAcidTable(dest_tab);
      destTableIsTemporary = dest_tab.isTemporary();

      // Is the user trying to insert into an external table?
      if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_INSERT_INTO_EXTERNAL_TABLES)) &&
          (dest_tab.getTableType().equals(TableType.EXTERNAL_TABLE))) {
        throw new SemanticException(
            ErrorMsg.INSERT_EXTERNAL_TABLE.getMsg(dest_tab.getTableName()));
      }

      partSpec = qbm.getPartSpecForAlias(dest);
      dest_path = dest_tab.getPath();

      // If the query here is an INSERT_INTO and the target is an immutable table,
      // verify that our destination is empty before proceeding
      if (dest_tab.isImmutable() &&
          qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName())) {
        try {
          FileSystem fs = dest_path.getFileSystem(conf);
          if (!MetaStoreUtils.isDirEmpty(fs, dest_path)) {
            LOG.warn("Attempted write into an immutable table : "
                + dest_tab.getTableName() + " : " + dest_path);
            throw new SemanticException(
                ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(dest_tab.getTableName()));
          }
        } catch (IOException ioe) {
          LOG.warn("Error while trying to determine if immutable table has any data : "
              + dest_tab.getTableName() + " : " + dest_path);
          throw new SemanticException(ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(ioe.getMessage()));
        }
      }

      // check for partition
      List<FieldSchema> parts = dest_tab.getPartitionKeys();
      if (parts != null && parts.size() > 0) { // table is partitioned
        if (partSpec == null || partSpec.size() == 0) { // user did NOT specify partition
          throw new SemanticException(generateErrorMessage(
              qb.getParseInfo().getDestForClause(dest),
              ErrorMsg.NEED_PARTITION_ERROR.getMsg()));
        }
        dpCtx = qbm.getDPCtx(dest);
        if (dpCtx == null) {
          dest_tab.validatePartColumnNames(partSpec, false);
          dpCtx = new DynamicPartitionCtx(dest_tab, partSpec,
              conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
              conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE));
          qbm.setDPCtx(dest, dpCtx);
        }
        if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONING)) { // allow DP
          throw new SemanticException(generateErrorMessage(
              qb.getParseInfo().getDestForClause(dest),
              ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg()));
        }
        if (dpCtx.getSPPath() != null) {
          dest_path = new Path(dest_tab.getPath(), dpCtx.getSPPath());
        }
        if ((dest_tab.getNumBuckets() > 0)) {
          dpCtx.setNumBuckets(dest_tab.getNumBuckets());
        }
      }

      boolean isNonNativeTable = dest_tab.isNonNative();
      if (isNonNativeTable) {
        queryTmpdir = dest_path;
      } else {
        queryTmpdir = ctx.getTempDirForPath(dest_path);
      }
      if (dpCtx != null) {
        // set the root of the temporary path where dynamic partition columns will populate
        dpCtx.setRootPath(queryTmpdir);
      }

      // this table_desc does not contain the partitioning columns
      table_desc = Utilities.getTableDesc(dest_tab);

      // Add sorting/bucketing if needed
      input = genBucketingSortingDest(dest, input, qb, table_desc, dest_tab, rsCtx);

      idToTableNameMap.put(String.valueOf(destTableId), dest_tab.getTableName());
      currentTableId = destTableId;
      destTableId++;

      lbCtx = constructListBucketingCtx(dest_tab.getSkewedColNames(),
          dest_tab.getSkewedColValues(), dest_tab.getSkewedColValueLocationMaps(),
          dest_tab.isStoredAsSubDirectories(), conf);

      // Create the work for moving the table
      // NOTE: specify Dynamic partitions in dest_tab for WriteEntity
      if (!isNonNativeTable) {
        AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
        if (destTableIsAcid) {
          acidOp = getAcidType(table_desc.getOutputFileFormatClass());
          checkAcidConstraints(qb, table_desc, dest_tab);
        }
        ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp);
        ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
            dest_tab.getTableName()));
        ltd.setLbCtx(lbCtx);
        loadTableWork.add(ltd);
      } else {
        // This is a non-native table.
        // We need to set stats as inaccurate.
        setStatsForNonNativeTable(dest_tab);
      }

      WriteEntity output = null;
      // Here only register the whole table for post-exec hook if no DP present
      // in the case of DP, we will register WriteEntity in MoveTask when the
      // list of dynamically created partitions are known.
      if ((dpCtx == null || dpCtx.getNumDPCols() == 0)) {
        output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable));
        if (!outputs.add(output)) {
          throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
              .getMsg(dest_tab.getTableName()));
        }
      }
      if ((dpCtx != null) && (dpCtx.getNumDPCols() >= 0)) {
        // No static partition specified
        if (dpCtx.getNumSPCols() == 0) {
          output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable), false);
          outputs.add(output);
        }
        // part of the partition specified
        // Create a DummyPartition in this case. Since the metastore does not store partial
        // partitions currently, we need to store dummy partitions
        else {
          try {
            String ppath = dpCtx.getSPPath();
            ppath = ppath.substring(0, ppath.length() - 1);
            DummyPartition p =
                new DummyPartition(dest_tab, dest_tab.getDbName()
                    + "@" + dest_tab.getTableName() + "@" + ppath,
                    partSpec);
            output = new WriteEntity(p, WriteEntity.WriteType.INSERT, false);
            outputs.add(output);
          } catch (HiveException e) {
            throw new SemanticException(e.getMessage(), e);
          }
        }
      }
      ctx.getLoadTableOutputMap().put(ltd, output);
      break;
    }
    case QBMetaData.DEST_PARTITION: {

      dest_part = qbm.getDestPartitionForAlias(dest);
      dest_tab = dest_part.getTable();
      destTableIsAcid = AcidUtils.isAcidTable(dest_tab);
      if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_INSERT_INTO_EXTERNAL_TABLES)) &&
          dest_tab.getTableType().equals(TableType.EXTERNAL_TABLE)) {
        throw new SemanticException(
            ErrorMsg.INSERT_EXTERNAL_TABLE.getMsg(dest_tab.getTableName()));
      }

      Path tabPath = dest_tab.getPath();
      Path partPath = dest_part.getDataLocation();

      // If the query here is an INSERT_INTO and the target is an immutable table,
      // verify that our destination is empty before proceeding
      if (dest_tab.isImmutable() &&
          qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName())) {
        try {
          FileSystem fs = partPath.getFileSystem(conf);
          if (!MetaStoreUtils.isDirEmpty(fs, partPath)) {
            LOG.warn("Attempted write into an immutable table partition : "
                + dest_tab.getTableName() + " : " + partPath);
            throw new SemanticException(
                ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(dest_tab.getTableName()));
          }
        } catch (IOException ioe) {
          LOG.warn("Error while trying to determine if immutable table partition has any data : "
              + dest_tab.getTableName() + " : " + partPath);
          throw new SemanticException(ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(ioe.getMessage()));
        }
      }

      // if the table is in a different dfs than the partition,
      // replace the partition's dfs with the table's dfs.
      dest_path = new Path(tabPath.toUri().getScheme(), tabPath.toUri()
          .getAuthority(), partPath.toUri().getPath());

      queryTmpdir = ctx.getTempDirForPath(dest_path);
      table_desc = Utilities.getTableDesc(dest_tab);

      // Add sorting/bucketing if needed
      input = genBucketingSortingDest(dest, input, qb, table_desc, dest_tab, rsCtx);

      idToTableNameMap.put(String.valueOf(destTableId), dest_tab.getTableName());
      currentTableId = destTableId;
      destTableId++;

      lbCtx = constructListBucketingCtx(dest_part.getSkewedColNames(),
          dest_part.getSkewedColValues(), dest_part.getSkewedColValueLocationMaps(),
          dest_part.isStoredAsSubDirectories(), conf);
      AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
      if (destTableIsAcid) {
        acidOp = getAcidType(table_desc.getOutputFileFormatClass());
        checkAcidConstraints(qb, table_desc, dest_tab);
      }
      ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp);
      ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
          dest_tab.getTableName()));
      ltd.setLbCtx(lbCtx);
      loadTableWork.add(ltd);
      if (!outputs.add(new WriteEntity(dest_part, (ltd.getReplace() ?
          WriteEntity.WriteType.INSERT_OVERWRITE :
          WriteEntity.WriteType.INSERT)))) {
        throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
            .getMsg(dest_tab.getTableName() + "@" + dest_part.getName()));
      }
      break;
    }
    case QBMetaData.DEST_LOCAL_FILE:
      isLocal = true;
      // fall through
    case QBMetaData.DEST_DFS_FILE: {
      dest_path = new Path(qbm.getDestFileForAlias(dest));

      if (isLocal) {
        // for local directory - we always write to map-red intermediate
        // store and then copy to local fs
        queryTmpdir = ctx.getMRTmpPath();
      } else {
        // otherwise write to the file system implied by the directory
        // no copy is required. we may want to revisit this policy in future
        try {
          Path qPath = FileUtils.makeQualified(dest_path, conf);
          queryTmpdir = ctx.getTempDirForPath(qPath);
        } catch (Exception e) {
          throw new SemanticException("Error creating temporary folder on: "
              + dest_path, e);
        }
      }
      String cols = "";
      String colTypes = "";
      ArrayList<ColumnInfo> colInfos = inputRR.getColumnInfos();

      // CTAS case: the file output format and serde are defined by the create
      // table command rather than taking the default value
      List<FieldSchema> field_schemas = null;
      CreateTableDesc tblDesc = qb.getTableDesc();
      if (tblDesc != null) {
        field_schemas = new ArrayList<FieldSchema>();
        destTableIsTemporary = tblDesc.isTemporary();
        destTableIsMaterialization = tblDesc.isMaterialization();
      }

      boolean first = true;
      for (ColumnInfo colInfo : colInfos) {
        String[] nm = inputRR.reverseLookup(colInfo.getInternalName());
        if (nm[1] != null) { // non-null column alias
          colInfo.setAlias(nm[1]);
        }

        String colName = colInfo.getInternalName(); // default column name
        if (field_schemas != null) {
          FieldSchema col = new FieldSchema();
          if (!("".equals(nm[0])) && nm[1] != null) {
            colName = unescapeIdentifier(colInfo.getAlias()).toLowerCase(); // remove ``
          }
          colName = fixCtasColumnName(colName);
          col.setName(colName);
          String typeName = colInfo.getType().getTypeName();
          // CTAS should NOT create a VOID type
          if (typeName.equals(serdeConstants.VOID_TYPE_NAME)) {
            throw new SemanticException(ErrorMsg.CTAS_CREATES_VOID_TYPE
                .getMsg(colName));
          }
          col.setType(typeName);
          field_schemas.add(col);
        }

        if (!first) {
          cols = cols.concat(",");
          colTypes = colTypes.concat(":");
        }
        first = false;
        cols = cols.concat(colName);

        // Replace VOID type with string when the output is a temp table or
        // local files.
        // A VOID type can be generated under the query:
        //
        //     select NULL from tt;
        // or
        //     insert overwrite local directory "abc" select NULL from tt;
        //
        // where there is no column type to which the NULL value should be
        // converted.
        //
        String tName = colInfo.getType().getTypeName();
        if (tName.equals(serdeConstants.VOID_TYPE_NAME)) {
          colTypes = colTypes.concat(serdeConstants.STRING_TYPE_NAME);
        } else {
          colTypes = colTypes.concat(tName);
        }
      }

      // update the create table descriptor with the resulting schema.
      if (tblDesc != null) {
        tblDesc.setCols(new ArrayList<FieldSchema>(field_schemas));
      }

      boolean isDestTempFile = true;
      if (!ctx.isMRTmpFileURI(dest_path.toUri().toString())) {
        idToTableNameMap.put(String.valueOf(destTableId), dest_path.toUri().toString());
        currentTableId = destTableId;
        destTableId++;
        isDestTempFile = false;
      }

      boolean isDfsDir = (dest_type.intValue() == QBMetaData.DEST_DFS_FILE);
      loadFileWork.add(new LoadFileDesc(tblDesc, queryTmpdir, dest_path, isDfsDir, cols,
          colTypes));

      if (tblDesc == null) {
        if (qb.getIsQuery()) {
          String fileFormat;
          if (SessionState.get().isHiveServerQuery() &&
              conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)) {
            fileFormat = "SequenceFile";
            HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, fileFormat);
            table_desc =
                PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat,
                    ThriftJDBCBinarySerDe.class);
            // Set the fetch formatter to be a no-op for the ListSinkOperator, since we'll
            // write out formatted thrift objects to SequenceFile
            conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, NoOpFetchFormatter.class.getName());
          } else {
            fileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
            table_desc =
                PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat,
                    LazySimpleSerDe.class);
          }
        } else {
          table_desc = PlanUtils.getDefaultTableDesc(qb.getDirectoryDesc(), cols, colTypes);
        }
      } else {
        table_desc = PlanUtils.getTableDesc(tblDesc, cols, colTypes);
      }

      if (!outputs.add(new WriteEntity(dest_path, !isDfsDir, isDestTempFile))) {
        throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
            .getMsg(dest_path.toUri().toString()));
      }
      break;
    }
    default:
      throw new SemanticException("Unknown destination type: " + dest_type);
    }

    input = genConversionSelectOperator(dest, qb, input, table_desc, dpCtx);
    inputRR = opParseCtx.get(input).getRowResolver();

    ArrayList<ColumnInfo> vecCol = new ArrayList<ColumnInfo>();

    if (updating() || deleting()) {
      vecCol.add(new ColumnInfo(VirtualColumn.ROWID.getName(), VirtualColumn.ROWID.getTypeInfo(),
          "", true));
    } else {
      try {
        StructObjectInspector rowObjectInspector = (StructObjectInspector) table_desc
            .getDeserializer(conf).getObjectInspector();
        List<? extends StructField> fields = rowObjectInspector
            .getAllStructFieldRefs();
        for (int i = 0; i < fields.size(); i++) {
          vecCol.add(new ColumnInfo(fields.get(i).getFieldName(), TypeInfoUtils
              .getTypeInfoFromObjectInspector(fields.get(i)
                  .getFieldObjectInspector()), "", false));
        }
      } catch (Exception e) {
        throw new SemanticException(e.getMessage(), e);
      }
    }

    RowSchema fsRS = new RowSchema(vecCol);

    // The output files of a FileSink can be merged if they are either not being written to a table
    // or are being written to a table which is not bucketed
    // and the table is not sorted
    boolean canBeMerged = (dest_tab == null || !((dest_tab.getNumBuckets() > 0) ||
        (dest_tab.getSortCols() != null && dest_tab.getSortCols().size() > 0)));

    // If this table is working with ACID semantics, turn off merging
    canBeMerged &= !destTableIsAcid;

    // Generate the partition columns from the parent input
    if (dest_type.intValue() == QBMetaData.DEST_TABLE
        || dest_type.intValue() == QBMetaData.DEST_PARTITION) {
      genPartnCols(dest, input, qb, table_desc, dest_tab, rsCtx);
    }

    FileSinkDesc fileSinkDesc = new FileSinkDesc(
        queryTmpdir,
        table_desc,
        conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT),
        currentTableId,
        rsCtx.isMultiFileSpray(),
        canBeMerged,
        rsCtx.getNumFiles(),
        rsCtx.getTotalFiles(),
        rsCtx.getPartnCols(),
        dpCtx,
        dest_path);

    fileSinkDesc.setHiveServerQuery(SessionState.get().isHiveServerQuery());

    // If this is an insert, update, or delete on an ACID table then mark that so the
    // FileSinkOperator knows how to properly write to it.
    if (destTableIsAcid) {
      AcidUtils.Operation wt = updating() ? AcidUtils.Operation.UPDATE :
          (deleting() ? AcidUtils.Operation.DELETE : AcidUtils.Operation.INSERT);
      fileSinkDesc.setWriteType(wt);
      acidFileSinks.add(fileSinkDesc);
    }

    fileSinkDesc.setTemporary(destTableIsTemporary);
    fileSinkDesc.setMaterialization(destTableIsMaterialization);

    /* Set List Bucketing context. */
    if (lbCtx != null) {
      lbCtx.processRowSkewedIndex(fsRS);
      lbCtx.calculateSkewedValueSubDirList();
    }
    fileSinkDesc.setLbCtx(lbCtx);

    // set the stats publishing/aggregating key prefix
    // the same as directory name. The directory name
    // can be changed in the optimizer but the key should not be changed
    // it should be the same as the MoveWork's sourceDir.
    fileSinkDesc.setStatsAggPrefix(fileSinkDesc.getDirName().toString());
    if (!destTableIsMaterialization &&
        HiveConf.getVar(conf, HIVESTATSDBCLASS).equalsIgnoreCase(StatDB.fs.name())) {
      String statsTmpLoc = ctx.getTempDirForPath(dest_path).toString();
      fileSinkDesc.setStatsTmpDir(statsTmpLoc);
      LOG.debug("Set stats collection dir : " + statsTmpLoc);
    }

    if (dest_part != null) {
      try {
        String staticSpec = Warehouse.makePartPath(dest_part.getSpec());
        fileSinkDesc.setStaticSpec(staticSpec);
      } catch (MetaException e) {
        throw new SemanticException(e);
      }
    } else if (dpCtx != null) {
      fileSinkDesc.setStaticSpec(dpCtx.getSPPath());
    }

    Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
        fileSinkDesc, fsRS, input), inputRR);

    if (ltd != null && SessionState.get() != null) {
      SessionState.get().getLineageState()
          .mapDirToFop(ltd.getSourcePath(), (FileSinkOperator) output);
    } else if (queryState.getCommandType().equals(HiveOperation.CREATETABLE_AS_SELECT.getOperationName())) {
      Path tlocation = null;
      String tName = Utilities.getDbTableName(tableDesc.getTableName())[1];
      try {
        Warehouse wh = new Warehouse(conf);
        tlocation = wh.getTablePath(db.getDatabase(tableDesc.getDatabaseName()), tName);
      } catch (MetaException|HiveException e) {
        throw new SemanticException(e);
      }
      SessionState.get().getLineageState()
          .mapDirToFop(tlocation, (FileSinkOperator) output);
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("Created FileSink Plan for clause: " + dest + " dest_path: "
          + dest_path + " row schema: " + inputRR.toString());
    }

    FileSinkOperator fso = (FileSinkOperator) output;
    fso.getConf().setTable(dest_tab);
    fsopToTable.put(fso, dest_tab);

    // the following code is used to collect column stats when
    // hive.stats.autogather=true
    // and it is an insert overwrite or insert into table
    if (dest_tab != null && conf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)
        && conf.getBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER)
        && ColumnStatsAutoGatherContext.canRunAutogatherStats(fso)) {
      if (dest_type.intValue() == QBMetaData.DEST_TABLE) {
        genAutoColumnStatsGatheringPipeline(qb, table_desc, partSpec, input, qb.getParseInfo()
            .isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName()));
      } else if (dest_type.intValue() == QBMetaData.DEST_PARTITION) {
        genAutoColumnStatsGatheringPipeline(qb, table_desc, dest_part.getSpec(), input, qb
            .getParseInfo().isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName()));
      }
    }

    return output;
  }
  // ...
}
This method is long.
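The one place DEFAULTPARTITIONNAME appears in here is the DynamicPartitionCtx constructor in the DEST_TABLE case. That fits my working hypothesis, which I have not yet verified in this code base: during a dynamic-partition insert, a NULL or empty partition-column value gets folded into the default partition name when rows are routed to partition directories. A toy sketch of that folding, with a made-up helper of my own:

// Hypothetical helper (mine, not Hive's): fold a NULL/empty dynamic-partition
// value into the configured default partition name.
public class DefaultPartitionFoldDemo {
  static String partitionDirName(String col, String value, String defaultName) {
    String v = (value == null || value.isEmpty()) ? defaultName : value;
    return col + "=" + v;
  }

  public static void main(String[] args) {
    String def = "__HIVE_DEFAULT_PARTITION__";
    System.out.println(partitionDirName("ds", "2008-04-08", def)); // ds=2008-04-08
    System.out.println(partitionDirName("ds", null, def));         // ds=__HIVE_DEFAULT_PARTITION__
  }
}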
In any case, this looks like the right place to keep reading.
That's it for today. Until I can grasp the overall design intent, I keep getting lost in the source.